This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
larpe/larpe/branches/idwsf/larpe/sessions.py

147 lines
4.4 KiB
Python

import random
from quixote.session import Session, SessionManager
from quixote import get_request
from quixote.html import htmltext
from storage import StorableObject
from users import User
class BasicSession(Session, StorableObject):
    """Quixote session that is also a ``StorableObject``.

    On top of whatever ``Session`` tracks, a session keeps:

    * ``users`` -- mapping of provider id to user id,
    * ``lasso_session_dumps`` -- mapping keyed by provider id,
    * ``after_url``, ``anonymous_key``, ``tempfiles``, ``message``,
    * ``magictokens`` -- mapping of token to arbitrary data.
    """

    # presumably the storage name used by StorableObject -- TODO confirm
    _names = 'sessions'

    # Class-level defaults.  They are kept so that instances created
    # without going through __init__ still resolve these attributes.
    # The two dicts must never be mutated in place through the class:
    # they are shared by every session.  __init__ and set_user() make
    # sure each session works on a dict of its own.
    users = {}
    after_url = None
    anonymous_key = None
    lasso_session_dumps = {}
    tempfiles = None
    magictokens = None
    message = None

    def __init__(self, *args, **kwargs):
        """Initialise the Quixote session and its per-session mappings.

        All arguments are forwarded unchanged to ``Session.__init__``.
        """
        Session.__init__(self, *args, **kwargs)
        # Shadow the shared class-level dicts with private ones so that
        # data recorded for one session cannot show up in another.
        self.users = {}
        self.lasso_session_dumps = {}

    def has_info(self):
        """Return a true value if the session holds anything at all.

        The result is the first truthy attribute found, or whatever
        ``Session.has_info`` returns when none of them is set.
        """
        return (self.users or self.after_url or self.anonymous_key or
                self.tempfiles or self.lasso_session_dumps or
                self.message or self.magictokens or
                Session.has_info(self))

    # A session is reported dirty exactly when it has information.
    is_dirty = has_info

    def get_session_id(self):
        """Return the session id (the ``id`` attribute)."""
        return self.id

    def set_session_id(self, session_id):
        """Set the session id (the ``id`` attribute)."""
        self.id = session_id

    # ``session_id`` is an alias for ``id``.
    session_id = property(get_session_id, set_session_id)

    def get_anonymous_key(self, generate=False):
        """Return the anonymous key of this session.

        If no key is set and ``generate`` is true, a random integer key
        is drawn and remembered first.  Without a key and without
        ``generate`` the current (false) value is returned.
        """
        if self.anonymous_key:
            return self.anonymous_key
        if generate:
            # NOTE(review): a drawn value of 0 is falsy and is therefore
            # treated as "no key" by the check above.
            self.anonymous_key = random.randint(0, 1000000000)
        return self.anonymous_key

    def display_message(self):
        """Return the pending message as markup and clear it.

        ``message`` is interpolated into a template with two
        placeholders (a CSS class prefix and the text), so it has to be
        a 2-item tuple.  Returns ``''`` when there is no message.
        """
        if not self.message:
            return ''
        # NOTE(review): the interpolation happens before the result is
        # wrapped in htmltext, so nothing is quoted here; whoever sets
        # ``message`` must make sure both parts are safe markup.
        s = htmltext('<div class="%snotice">%s</div>' % self.message)
        self.message = None
        return s

    def add_magictoken(self, token, data):
        """Remember ``data`` under ``token``."""
        if not self.magictokens:
            self.magictokens = {}
        self.magictokens[token] = data

    def get_by_magictoken(self, token, default=None):
        """Return the data stored under ``token``, or ``default``."""
        if not self.magictokens:
            return default
        return self.magictokens.get(token, default)

    def get_user(self, provider_id):
        """Return the ``User`` recorded for ``provider_id``.

        Returns ``None`` when no user id is recorded for that provider.
        When ``User.get`` raises ``KeyError`` for the recorded id, a new
        ``User`` carrying only that id is returned instead.
        """
        user_id = self.users.get(provider_id, None)
        if not user_id:
            return None
        try:
            user = User.get(user_id)
        except KeyError:
            user = User()
            user.id = user_id
        return user

    def set_user(self, user_id, provider_id):
        """Record ``user_id`` as the user for ``provider_id``."""
        if 'users' not in self.__dict__:
            # This instance was created without __init__ and still sees
            # the class-level dict; copy it before writing so the entry
            # stays private to this session.
            self.users = dict(self.users)
        self.users[provider_id] = user_id
class StorageSessionManager(SessionManager):
    """Session manager whose mapping operations go to ``BasicSession``.

    Look-ups, listing, saving and removal are all delegated to the
    ``BasicSession`` class methods or to ``session.store()``.
    """

    def __init__(self):
        """Set up the manager to create ``BasicSession`` objects."""
        SessionManager.__init__(self, session_class=BasicSession)

    def forget_changes(self, session):
        """Do nothing; there is no pending state to discard here."""

    def __getitem__(self, session_id):
        """Return the session with ``session_id``.

        Raises ``KeyError`` (carrying the id) when it cannot be found.
        """
        try:
            return BasicSession.get(session_id)
        except KeyError:
            raise KeyError(session_id)

    def get(self, session_id, default=None):
        """Return the session with ``session_id``, or ``default``."""
        try:
            return BasicSession.get(session_id)
        except KeyError:
            return default

    def commit_changes(self, session):
        """Store ``session`` if there is one and it has an id."""
        if session and session.id:
            session.store()

    def keys(self):
        """Return ``BasicSession.keys()``."""
        return BasicSession.keys()

    def values(self):
        """Return ``BasicSession.values()``."""
        return BasicSession.values()

    def items(self):
        """Return ``BasicSession.items()``."""
        return BasicSession.items()

    def has_key(self, session_id):
        """Return ``BasicSession.has_key(session_id)``."""
        return BasicSession.has_key(session_id)

    def __setitem__(self, session_id, session):
        """Store ``session``; ``session_id`` itself is not used."""
        session.store()

    def __delitem__(self, session_id):
        """Remove the stored session with ``session_id``.

        A false ``session_id`` is ignored.  An ``OSError`` from the
        removal is reported as ``KeyError`` (carrying the id).
        """
        if not session_id:
            return
        try:
            BasicSession.remove_object(session_id)
        except OSError:
            raise KeyError(session_id)

    def expire_session(self, provider_id=None):
        """Log the current request's session out of ``provider_id``.

        When ``provider_id`` is given, its entries are dropped from the
        session's ``users`` and ``lasso_session_dumps`` and the session
        is stored.  The session as a whole is expired only once no user
        is left in it.  Nothing happens for a session without an id.
        """
        session = get_request().session
        if session.id is None:
            return
        if provider_id:
            # pop() with a default removes the entry if present and is
            # a no-op otherwise.
            session.users.pop(provider_id, None)
            session.lasso_session_dumps.pop(provider_id, None)
            session.store()
        if not session.users:
            SessionManager.expire_session(self)