This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic-old/authentic/sessions.py

181 lines
5.3 KiB
Python

from quixote.session import Session, SessionManager
from qommon.storage import StorableObject
from quixote import get_request, get_response
from qommon.sessions import CaptchaSession
import identities
import time
class BasicSession(Session, CaptchaSession, StorableObject):
    '''Quixote session that is also a StorableObject.

    Class-level attributes below are the defaults every new session starts
    with; instances override them as a request progresses.
    '''
    _names = 'sessions'

    lasso_login_dump = None
    lasso_logout_dump = None
    lasso_session_dump = None
    lasso_proxy_session_dump = None
    after_url = None
    name_identifiers = None
    proxied_idp = None
    question_key = None
    saml2 = False
    in_slo_get = False
    in_slo_sp = False
    authentication_method = None
    # read by display_message() as a (css class prefix, text) pair
    message = None
    ssl = False
    # key -> (expiration, value) mapping used by remember(); the double
    # underscore makes it name-mangled, so clean_data() leaves it alone
    __remember = None
    # identifier of the last requesting service
    _service = None

    # attributes whose truthy value means the session holds information
    _has_info_keys = [ "lasso_login_dump", "lasso_session_dump",
            "question_key", "after_url", "name_identifiers", "proxied_idp",
            "authentication_method", "message", "session" ]

    # attributes that clean_data() must leave untouched
    # (one-element tuples: the trailing comma is required, otherwise these
    # are plain strings and `in` performs a substring search)
    _not_cleaned = ('_service',)
    # attributes that clean_data() deletes instead of resetting
    _to_remove = ('login_tokens',)

    def __init__(self, id = None):
        '''Create the session and give it its own name_identifiers list.'''
        Session.__init__(self, id)
        self.name_identifiers = []

    def has_info(self):
        '''Return True if any tracked attribute is set, or if one of the
        Session / CaptchaSession parent classes reports information.'''
        for key in self._has_info_keys:
            if getattr(self, key, None):
                return True
        return Session.has_info(self) or CaptchaSession.has_info(self)
    is_dirty = has_info

    def get_session_id(self):
        '''Return the session identifier (alias of self.id).'''
        return self.id

    def set_session_id(self, session_id):
        '''Set the session identifier (alias of self.id).'''
        self.id = session_id
    session_id = property(get_session_id, set_session_id)

    def get_user_object(self):
        '''Return the identity matching self.user, or None when the
        identity store raises KeyError for it.'''
        try:
            return identities.get_store().get_identity(self.user)
        except KeyError:
            return None

    def get_sessions_by_user(cls, user):
        '''Return the stored sessions whose user attribute equals user.'''
        return cls.select(lambda x: x.user == user, ignore_errors = True)
    get_sessions_by_user = classmethod(get_sessions_by_user)

    def display_message(self):
        '''Return the pending message as an HTML notice and clear it.

        Returns an empty string when there is no message or its text
        (second element) is empty.
        '''
        if not self.message or not self.message[1]:
            return ''
        from quixote.html import htmltext
        # NOTE(review): the message is interpolated before being wrapped in
        # htmltext, so it is presumably emitted unescaped -- confirm that
        # callers only store trusted text in self.message.
        s = htmltext('<div class="%snotice">%s</div>' % self.message)
        self.message = None
        return s

    def remember(self, key, value = None, expiration = None):
        '''Store and/or read back a value kept with an expiration time.

        When value is not None it is recorded under key together with
        expiration.  In every case the value currently recorded under key
        is returned if its expiration lies in the future (compared with
        time.time()); otherwise None is returned.
        '''
        if value is not None:
            self.__remember = self.__remember or {}
            self.__remember[key] = (expiration, value)
        if self.__remember is None:
            return None
        t = self.__remember.get(key)
        # An entry stored without expiration is treated as already expired;
        # the explicit None test keeps that outcome without relying on
        # ordering comparisons between None and a float.
        # NOTE(review): confirm "no expiration" is not meant as "forever".
        if t and t[0] is not None and t[0] > time.time():
            return t[1]
        return None

    def get_service(self):
        '''Accessor for the _service, allows service to work on old sessions'''
        return getattr(self, '_service', None)

    def set_service(self, value):
        '''Record the identifier of the last requesting service.'''
        self._service = value
    service = property(get_service, set_service)

    def clean_data(self):
        '''Reset the public instance attributes and drop the session id.

        Attributes whose name starts with an underscore, or is listed in
        _not_cleaned, are kept.  Attributes listed in _to_remove are
        deleted; the others get back the Session class value when one
        exists, else None.
        '''
        # build the list first: the loop below mutates self.__dict__
        fields_to_clean = [ f for f in self.__dict__
                if f not in self._not_cleaned and not f.startswith('_') ]
        for field in fields_to_clean:
            if field in self._to_remove:
                delattr(self, field)
            elif hasattr(Session, field):
                setattr(self, field, getattr(Session, field))
            else:
                setattr(self, field, None)
        self.id = None
class StorageSessionManager(SessionManager):
    '''Session manager that delegates persistence to the session class.

    Every lookup goes through self.session_class (BasicSession by default)
    so that reads, writes and deletions all hit the same storage.
    '''

    def __init__(self, session_class = BasicSession):
        SessionManager.__init__(self, session_class = session_class)

    def forget_changes(self, session):
        '''Nothing to undo.'''
        pass

    def __getitem__(self, session_id):
        '''Return the stored session; raise KeyError(session_id) if absent.'''
        try:
            return self.session_class.get(session_id)
        except KeyError:
            raise KeyError(session_id)

    def get(self, session_id, default = None):
        '''Return the stored session, or default when the lookup raises
        KeyError.'''
        try:
            return self.session_class.get(session_id)
        except KeyError:
            return default

    def commit_changes(self, session):
        '''Store the session, provided it exists and has an id.'''
        if session and session.id:
            session.store()

    # if HTTPS on set secure flag on the cookie, always set the HTTPOnly flag
    def _set_cookie(self, value, **attrs):
        if get_request().environ.get('HTTPS'):
            attrs['secure'] = 1
        attrs['HTTPOnly'] = 1
        return SessionManager._set_cookie(self, value, **attrs)

    # if a new session is requested under SSL configuration set a SSL flag
    def new_session(self, id):
        session = SessionManager.new_session(self, id)
        if get_request().environ.get('HTTPS'):
            session.ssl = True
        return session

    def keys(self):
        return self.session_class.keys()

    def values(self):
        return self.session_class.values(ignore_errors = True)

    def items(self):
        return self.session_class.items()

    def has_key(self, session_id):
        return self.session_class.has_key(session_id)

    def __setitem__(self, session_id, session):
        '''Store the session; session_id itself is not used here.'''
        session.store()

    def __delitem__(self, session_id):
        '''Remove the stored session.

        A false session_id is ignored.  An OSError raised by the removal
        is reported as KeyError(session_id).
        '''
        if not session_id:
            return
        try:
            self.session_class.remove_object(session_id)
        except OSError:
            raise KeyError(session_id)

    def finish_successful_request(self):
        SessionManager.finish_successful_request(self)

    def expire_session(self):
        '''Expire the session attached to the current request.'''
        # Delete the data from disk
        # Clean some of the session fields
        # Keep others
        session = get_request().session
        if not session:
            # nothing attached to the request: nothing to expire
            return
        if session.id:
            try:
                del self[session.id]
            except KeyError:
                # already absent from storage
                pass
        # clean the in-memory object even if it was never stored
        session.clean_data()