444 lines
15 KiB
Python
444 lines
15 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import copy
|
|
import json
|
|
import hashlib
|
|
import os
|
|
import time
|
|
|
|
from quixote import get_publisher
|
|
|
|
from quixote.session import Session as QuixoteSession
|
|
from quixote.session import SessionManager as QuixoteSessionManager
|
|
from quixote.util import randbytes
|
|
|
|
from django.conf import settings
|
|
from django.utils.encoding import force_bytes, force_text
|
|
from django.core.signing import Signer, BadSignature
|
|
|
|
from . import misc
|
|
from .storage import StorableObject
|
|
from .publisher import get_cfg
|
|
from .upload_storage import get_storage_object
|
|
from quixote.publish import get_session, get_session_manager, get_request
|
|
|
|
|
|
class QommonSession(QuixoteSession):
|
|
def __init__(self, id):
|
|
QuixoteSession.__init__(self, id)
|
|
env = get_request().environ
|
|
# add support for X_FORWARDED_FOR
|
|
self.__remote_address = env.get('X_FORWARDED_FOR', env.get('REMOTE_ADDR'))
|
|
|
|
|
|
class CaptchaSession(object):
|
|
MAX_CAPTCHA_TOKENS = 8
|
|
_captcha_tokens = None
|
|
won_captcha = False
|
|
|
|
def create_captcha_token(self):
|
|
if not self._captcha_tokens:
|
|
self._captcha_tokens = []
|
|
token = {'token': randbytes(8), 'answer': None}
|
|
self._captcha_tokens.append(token)
|
|
extra = len(self._captcha_tokens) - self.MAX_CAPTCHA_TOKENS
|
|
if extra > 0:
|
|
del self._captcha_tokens[:extra]
|
|
return token
|
|
|
|
def get_captcha_token(self, token):
|
|
if not self._captcha_tokens:
|
|
return None
|
|
try:
|
|
return [x for x in self._captcha_tokens if x.get('token') == token][0]
|
|
except IndexError:
|
|
return None
|
|
|
|
def remove_captcha_token(self, token):
|
|
token = self.get_captcha_token(token)
|
|
if token:
|
|
self._captcha_tokens.remove(token)
|
|
|
|
def has_info(self):
|
|
return self.won_captcha or self._captcha_tokens
|
|
|
|
|
|
class Session(QommonSession, CaptchaSession, StorableObject):
|
|
_names = 'sessions'
|
|
|
|
name_identifier = None
|
|
lasso_session_dump = None
|
|
lasso_session_index = None
|
|
lasso_anonymous_identity_dump = None
|
|
lasso_identity_provider_id = None
|
|
message = None
|
|
saml_authn_context = None
|
|
saml_idp_cookie = None
|
|
ident_idp_token = None
|
|
has_uploads = False
|
|
jsonp_display_values = None
|
|
extra_variables = None
|
|
expire = None
|
|
forced = False
|
|
# should only be overwritten by authentication methods
|
|
extra_user_variables = None
|
|
|
|
username = None # only set on password authentication
|
|
|
|
def force(self):
|
|
# add some data in the session, it will force a cookie to be set, this
|
|
# is used so we get a session identifier fixed even on the first page
|
|
# of a form.
|
|
self.forced = True
|
|
get_session_manager().maintain_session(self)
|
|
|
|
def set_expire(self, expire):
|
|
self.expire = expire
|
|
|
|
def set_duration(self, duration):
|
|
self.set_expire(time.time() + duration)
|
|
|
|
def is_expired(self):
|
|
if self.expire:
|
|
return time.time() >= self.expire
|
|
duration = get_publisher().get_site_option('session_max_age')
|
|
if duration is None:
|
|
return False
|
|
try:
|
|
duration = int(duration)
|
|
except ValueError:
|
|
return False
|
|
return (time.time() - self.get_access_time()) > duration
|
|
|
|
def has_info(self):
|
|
return self.name_identifier or \
|
|
self.lasso_session_dump or self.message or \
|
|
self.lasso_anonymous_identity_dump or \
|
|
self.lasso_identity_provider_id or \
|
|
self.saml_authn_context or \
|
|
self.ident_idp_token or \
|
|
self.has_uploads or \
|
|
self.jsonp_display_values or \
|
|
self.extra_variables or \
|
|
CaptchaSession.has_info(self) or \
|
|
self.expire or \
|
|
self.extra_user_variables or \
|
|
self.forced or \
|
|
QuixoteSession.has_info(self)
|
|
is_dirty = has_info
|
|
|
|
def migrate(self):
|
|
# abuse migrate (as it is called after a session is loaded) to keep
|
|
# track of original content, to avoid saving it when there's no change.
|
|
self.__orig_dict__ = copy.deepcopy(self.__dict__)
|
|
|
|
def __getstate__(self):
|
|
odict = copy.copy(self.__dict__)
|
|
if '__orig_dict__' in odict:
|
|
del odict['__orig_dict__']
|
|
return odict
|
|
|
|
def store(self, *args, **kwargs):
|
|
current_dict = copy.copy(self.__dict__)
|
|
orig_dict = current_dict.pop('__orig_dict__', {})
|
|
current_dict.pop('_access_time', None)
|
|
orig_dict.pop('_access_time', None)
|
|
if current_dict != orig_dict:
|
|
return super(Session, self).store(*args, **kwargs)
|
|
|
|
def get_session_id(self):
|
|
return self.id
|
|
|
|
def set_session_id(self, session_id):
|
|
self.id = session_id
|
|
|
|
session_id = property(get_session_id, set_session_id)
|
|
|
|
def get_form_token_filepath(self, token):
|
|
return os.path.join(get_publisher().form_tokens_dir, token)
|
|
|
|
def create_form_token(self):
|
|
token = super(Session, self).create_form_token()
|
|
open(self.get_form_token_filepath(token), 'wb').close()
|
|
return token
|
|
|
|
def has_form_token(self, token):
|
|
if not token:
|
|
return False
|
|
has_form_token = super(Session, self).has_form_token(token)
|
|
if not os.path.exists(self.get_form_token_filepath(token)):
|
|
has_form_token = False
|
|
return has_form_token
|
|
|
|
def remove_form_token(self, token):
|
|
super(Session, self).remove_form_token(token)
|
|
self.store()
|
|
try:
|
|
os.unlink(self.get_form_token_filepath(token))
|
|
except OSError:
|
|
pass
|
|
|
|
def clean_form_tokens(self):
|
|
dirname = os.path.join(get_publisher().app_dir, 'form_tokens')
|
|
for token in self._form_tokens:
|
|
try:
|
|
os.unlink(os.path.join(dirname, token))
|
|
except OSError:
|
|
pass
|
|
|
|
def has_user(self):
|
|
user_id = QuixoteSession.get_user(self)
|
|
if user_id and not str(user_id).startswith('anonymous-'):
|
|
return True
|
|
return False
|
|
|
|
def get_user(self):
|
|
user_id = QuixoteSession.get_user(self)
|
|
if user_id:
|
|
if str(user_id).startswith('anonymous-'):
|
|
user = get_publisher().user_class()
|
|
user.id = user_id
|
|
user.anonymous = True
|
|
user.name_identifiers = [ self.name_identifier ]
|
|
user.lasso_dump = self.lasso_anonymous_identity_dump
|
|
return user
|
|
else:
|
|
try:
|
|
return get_publisher().user_class.get(user_id)
|
|
except KeyError:
|
|
pass
|
|
return None
|
|
|
|
def set_user(self, user_id):
|
|
self.id = None # force a new session id to be assigned
|
|
self.extra_user_variables = None
|
|
QuixoteSession.set_user(self, user_id)
|
|
if str(user_id).startswith('anonymous-'):
|
|
# do not store connection time for anonymous users
|
|
return
|
|
try:
|
|
user = get_publisher().user_class.get(user_id)
|
|
user.last_seen = time.time()
|
|
user.store()
|
|
except KeyError:
|
|
pass
|
|
|
|
def display_message(self):
|
|
if not self.message or not self.message[1]:
|
|
return ''
|
|
from quixote.html import htmltext
|
|
# force_text are required for pre-python3 sessions
|
|
s = htmltext('<div id="messages"><ul class="messages">'
|
|
'<li class="%s">%s</li></ul></div>') % (
|
|
force_text(self.message[0]),
|
|
force_text(self.message[1]))
|
|
self.message = None
|
|
return s
|
|
|
|
def get_user_object(self):
|
|
return self.get_user()
|
|
|
|
def get_authentication_context(self):
|
|
for context in get_publisher().get_supported_authentication_contexts():
|
|
contexts = get_publisher().get_authentication_saml_contexts(context)
|
|
if self.saml_authn_context in contexts:
|
|
return context
|
|
return None
|
|
|
|
def get_signer(self):
|
|
return Signer(settings.SECRET_KEY + self.id)
|
|
|
|
def add_tempfile(self, upload, storage=None):
|
|
dirname = os.path.join(get_publisher().app_dir, 'tempfiles')
|
|
if not os.path.exists(dirname):
|
|
os.mkdir(dirname)
|
|
token = randbytes(8)
|
|
upload.time = time.time()
|
|
upload.token = token
|
|
upload.storage = storage
|
|
get_storage_object(upload.storage).save_tempfile(upload)
|
|
|
|
self.has_uploads = True
|
|
if not self.id:
|
|
# the token is signed with the session id so we need to have it
|
|
# created now.
|
|
get_session_manager().maintain_session(self)
|
|
|
|
signer = self.get_signer()
|
|
data = {
|
|
'orig_filename': upload.orig_filename,
|
|
'base_filename': upload.base_filename,
|
|
'content_type': upload.content_type,
|
|
'charset': upload.charset,
|
|
'size': getattr(upload, 'size', None),
|
|
'session': self.id,
|
|
'token': signer.sign(token),
|
|
'unsigned_token': token,
|
|
'storage': upload.storage,
|
|
'storage-attrs': getattr(upload, 'storage_attrs', None),
|
|
}
|
|
filename = os.path.join(get_publisher().app_dir, 'tempfiles', upload.token)
|
|
with open(filename + '.json', 'w') as fd:
|
|
json.dump(data, fd, indent=2)
|
|
|
|
return data
|
|
|
|
def get_tempfile(self, token):
|
|
if not token:
|
|
return None
|
|
if not self.id: # missing session
|
|
return None
|
|
signer = self.get_signer()
|
|
try:
|
|
value = signer.unsign(token)
|
|
except (BadSignature, UnicodeDecodeError):
|
|
return None
|
|
dirname = os.path.join(get_publisher().app_dir, 'tempfiles')
|
|
filename = os.path.join(dirname, value + '.json')
|
|
if not os.path.exists(filename):
|
|
return None
|
|
return misc.json_loads(open(filename).read())
|
|
|
|
def get_tempfile_path(self, token):
|
|
temp = self.get_tempfile(token)
|
|
if not temp:
|
|
return None
|
|
dirname = os.path.join(get_publisher().app_dir, 'tempfiles')
|
|
filename = os.path.join(dirname, temp['unsigned_token'])
|
|
return filename
|
|
|
|
def get_tempfile_content(self, token):
|
|
temp = self.get_tempfile(token)
|
|
if not temp:
|
|
return temp
|
|
|
|
return get_storage_object(temp.get('storage')).get_tempfile(temp)
|
|
|
|
def add_extra_variables(self, **kwargs):
|
|
if not self.extra_variables:
|
|
self.extra_variables = {}
|
|
self.extra_variables.update(kwargs)
|
|
|
|
def get_substitution_variables(self, prefix='session_'):
|
|
d = {}
|
|
d[prefix + 'hash_id'] = hashlib.sha1(force_bytes(self.id)).hexdigest()
|
|
if self.extra_variables:
|
|
for k, v in self.extra_variables.items():
|
|
d[prefix + 'var_' + k] = v
|
|
if self.extra_user_variables:
|
|
for k, v in self.extra_user_variables.items():
|
|
d[prefix + 'var_user_' + k] = v
|
|
return d
|
|
|
|
@classmethod
|
|
def get_sessions_for_saml(cls, name_identifier=Ellipsis, session_indexes=()):
|
|
return (x for x in cls.values()
|
|
if (not session_indexes
|
|
or x.lasso_session_index in session_indexes)
|
|
and name_identifier in (x.name_identifier or []))
|
|
|
|
|
|
class QommonSessionManager(QuixoteSessionManager):
|
|
def start_request(self):
|
|
QuixoteSessionManager.start_request(self)
|
|
session_cfg = get_cfg('session', {})
|
|
ip_linked = session_cfg.get('ip_linked', False)
|
|
if ip_linked:
|
|
env = get_request().environ
|
|
ip = env.get('X_FORWARDED_FOR', env.get('REMOTE_ADDR'))
|
|
session = get_session()
|
|
if ip != session._remote_address:
|
|
# clean cookie
|
|
get_session_manager().expire_session()
|
|
# start a new cookie
|
|
QuixoteSessionManager.start_request(self)
|
|
session = get_session()
|
|
|
|
|
|
class StorageSessionManager(QommonSessionManager):
|
|
|
|
def forget_changes(self, session):
|
|
pass
|
|
|
|
def __getitem__(self, session_id):
|
|
try:
|
|
session = self.session_class.get(session_id)
|
|
if session.is_expired():
|
|
try:
|
|
session.remove_self()
|
|
except OSError:
|
|
pass
|
|
raise KeyError
|
|
return session
|
|
except KeyError:
|
|
raise KeyError
|
|
|
|
def get(self, session_id, default = None):
|
|
try:
|
|
return self[session_id]
|
|
except KeyError:
|
|
return default
|
|
except ValueError: # happens for "insecure string pickle"
|
|
return default
|
|
|
|
def commit_changes(self, session):
|
|
if session and session.id:
|
|
session.store()
|
|
|
|
def keys(self):
|
|
return self.session_class.keys()
|
|
|
|
def values(self):
|
|
return self.session_class.values()
|
|
|
|
def items(self):
|
|
return self.session_class.items()
|
|
|
|
def has_key(self, session_id):
|
|
return self.session_class.has_key(session_id)
|
|
|
|
def __setitem__(self, session_id, session):
|
|
session.store()
|
|
|
|
def __delitem__(self, session_id):
|
|
if not session_id:
|
|
return
|
|
try:
|
|
session = self.session_class.get(session_id)
|
|
except KeyError:
|
|
session = None
|
|
try:
|
|
self.session_class.remove_object(session_id)
|
|
except OSError:
|
|
raise KeyError
|
|
|
|
if session:
|
|
session.clean_form_tokens()
|
|
|
|
def get_sessions_for_saml(self, name_identifier=Ellipsis, session_indexes=()):
|
|
return self.session_class.get_sessions_for_saml(name_identifier, session_indexes)
|
|
|
|
def get_session_for_saml(self, name_identifier = None, session_index = None):
|
|
if session_index:
|
|
session_indexes = (session_index,)
|
|
else:
|
|
session_indexes = ()
|
|
for session in self.get_sessions_for_saml(name_identifier,
|
|
session_indexes):
|
|
return session
|
|
return None
|