118 lines
3.5 KiB
Python
118 lines
3.5 KiB
Python
""" These models are used if you are using form replay authentication
|
|
"""
|
|
import collections
|
|
import json
|
|
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy import Column, Integer, String, DateTime
|
|
from sqlalchemy import ForeignKey
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.ext.mutable import Mutable
|
|
from sqlalchemy.orm import column_property, relationship, backref
|
|
from sqlalchemy.types import TypeDecorator, Unicode
|
|
|
|
# Declarative base class that every mapped model in this module subclasses.
Base = declarative_base()
|
|
|
|
class JSONEncodedDict(TypeDecorator):
    """Represent a JSON-serializable structure as a json-encoded string.

    Values are serialized with ``json.dumps`` before being bound to a
    statement and parsed back with ``json.loads`` when a row is loaded.
    ``None`` is passed through unchanged in both directions.
    """

    impl = Unicode

    def process_bind_param(self, value, dialect):
        """Serialize ``value`` to JSON text before it is sent to the database.

        :param value: the Python object assigned to the column, or ``None``.
        :param dialect: the dialect in use (unused here).
        :returns: the JSON document as text, or ``None`` if ``value`` is
            ``None``.
        """
        if value is None:
            return None
        encoded = json.dumps(value)
        # On Python 2 json.dumps returns a byte string, which has to be
        # decoded for the Unicode column.  On Python 3 the result is already
        # text and has no ``decode`` method, so only decode actual bytes.
        if isinstance(encoded, bytes):
            encoded = encoded.decode('utf-8')
        return encoded

    def process_result_value(self, value, dialect):
        """Parse the stored JSON text back into a Python object.

        :param value: the raw column value, or ``None``.
        :param dialect: the dialect in use (unused here).
        :returns: the decoded object, or ``None`` if ``value`` is ``None``.
        """
        if value is None:
            return None
        return json.loads(value)
|
|
|
|
class MutationDict(Mutable, dict):
    """A ``dict`` that emits a change event whenever it is modified in place.

    Every mutating ``dict`` method calls ``self.changed()`` after the
    underlying operation, so that in-place edits are not silently lost.
    """

    @classmethod
    def coerce(cls, key, value):
        """ Convert plain dictionaries to MutationDict. """
        if isinstance(value, MutationDict):
            return value
        if isinstance(value, dict):
            return MutationDict(value)
        # Not a dict at all: this call will raise ValueError
        return Mutable.coerce(key, value)

    def __setitem__(self, key, value):
        """ Detect dictionary set events and emit change events. """
        dict.__setitem__(self, key, value)
        self.changed()

    def __delitem__(self, key):
        """ Detect dictionary del events and emit change events. """
        dict.__delitem__(self, key)
        self.changed()

    # The methods below bypass __setitem__/__delitem__ in CPython's dict
    # implementation, so each one has to emit its own change event.

    def update(self, *args, **kwargs):
        """ Detect bulk updates and emit change events. """
        dict.update(self, *args, **kwargs)
        self.changed()

    def setdefault(self, key, default=None):
        """ Detect setdefault calls and emit change events. """
        result = dict.setdefault(self, key, default)
        self.changed()
        return result

    def pop(self, *args):
        """ Detect pop calls and emit change events. """
        # A missing key without a default raises KeyError before changed().
        result = dict.pop(self, *args)
        self.changed()
        return result

    def popitem(self):
        """ Detect popitem calls and emit change events. """
        result = dict.popitem(self)
        self.changed()
        return result

    def clear(self):
        """ Detect clear calls and emit change events. """
        dict.clear(self)
        self.changed()
|
|
|
|
# Associate MutationDict with the JSONEncodedDict column type (see
# SQLAlchemy's Mutable.associate_with for the exact semantics).
MutationDict.associate_with(JSONEncodedDict)
|
|
|
|
class ServiceProvider(Base):
    """A service provider, stored in the ``service_provider`` table."""

    __tablename__ = 'service_provider'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Required name, at most 50 characters, unique across the table.
    name = Column(String(50), unique=True, nullable=False)

    def __init__(self, name):
        """Create a service provider called ``name``."""
        self.name = name

    def __repr__(self):
        """Return a short debug representation showing the provider name."""
        template = "<ServiceProvider('%s')>"
        return template % self.name
|
|
|
|
class IDPUser(Base):
    """A user as known by an identity provider (``idp_user`` table).

    Related ``SPUser`` rows are available through ``sp_users``; each of them
    gets an ``idp_user`` back-reference to this object.
    """

    __tablename__ = 'idp_user'

    id = Column(Integer, primary_key=True)
    # Nameid, pseudo, email, ...
    unique_id = Column(String(150), nullable=False)
    # Entityid
    idp_id = Column(String(150), nullable=False)
    sp_users = relationship("SPUser", backref=backref('idp_user'))

    def __init__(self, unique_id=None, idp_id=None):
        """Create an identity-provider user.

        :param unique_id: identifier of the user at the identity provider.
        :param idp_id: identifier of the identity provider itself.
        """
        self.unique_id = unique_id
        self.idp_id = idp_id

    def __repr__(self):
        """Return a short debug representation with the id and unique_id."""
        # ``id`` is None until the row has been flushed; ``%d`` would raise
        # TypeError in that case, while ``%s`` renders integers identically.
        return "<IDPUser %s '%s'>" % (self.id, self.unique_id)
|
|
|
|
|
|
class SPUser(Base):
    """An account on a service provider (``sp_user`` table).

    Holds the login and the form values (``post_values``) stored for that
    account, and links it to a ``ServiceProvider`` and optionally to an
    ``IDPUser``.
    """

    __tablename__ = 'sp_user'

    id = Column(Integer, primary_key=True)
    login = Column(String(150), nullable=False)
    post_values = Column(JSONEncodedDict, nullable=False)
    # Pass the callable, not its result: ``datetime.utcnow()`` would be
    # evaluated once at import time and every row would share that timestamp.
    creation_date = Column(DateTime, default=datetime.utcnow, nullable=False)
    last_connection = Column(DateTime, default=datetime.utcnow)

    idp_user_id = Column(Integer, ForeignKey('idp_user.id'))
    service_provider_id = Column(Integer, ForeignKey('service_provider.id'),
                                 nullable=False)
    service_provider = relationship("ServiceProvider", backref=backref('users'))

    def __init__(self, login=None, post_values=None, idp_user=None, service_provider=None,
                 creation_date=None, last_connection=None):
        """Create a service-provider account.

        :param login: login of the account on the service provider.
        :param post_values: form values stored for this account.
        :param idp_user: the ``IDPUser`` this account is attached to.
        :param service_provider: the ``ServiceProvider`` owning the account.
        :param creation_date: explicit creation timestamp; when omitted the
            column default applies.
        :param last_connection: explicit last-connection timestamp; when
            omitted the column default applies.
        """
        self.login = login
        self.post_values = post_values
        self.idp_user = idp_user
        self.service_provider = service_provider
        # Only override the column defaults when a value was actually given.
        if creation_date:
            self.creation_date = creation_date
        if last_connection:
            self.last_connection = last_connection

    def __repr__(self):
        """Return a short debug representation showing the primary key."""
        # ``id`` is None until the row has been flushed; ``%d`` would raise
        # TypeError in that case, while ``%s`` renders integers identically.
        return "<SPUser '%s'>" % (self.id)
|
|
|