This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandaye/mandaye/backends/sql.py

187 lines
7.1 KiB
Python

import copy
from datetime import datetime
from mandaye.backends.default import storage_conn
from mandaye.log import logger
from mandaye.models import IDPUser, SPUser, ServiceProvider
class Association(object):
"""
association dictionnary return by the following methods:
{
'id': '', # identifier of your association (must be unique)
'sp_name': '', # name of the service provider (defined in the mappers)
'sp_login': '', # login on the service provider
'sp_post_values': '', # the post values for sp login form
'idp_unique_id:': '', # the unique identifier of the identity provider (ex.: a saml NameID)
'idp_name': '', # identity provide name
'last_connection': datetime.datetime, # last connection with this association
'creation_date': datetime.datetime, # creation date of this association
}
"""
@staticmethod
def sp_user2association(sp_user):
return {
'id': sp_user.id,
'sp_name': sp_user.service_provider.name,
'sp_login': sp_user.login,
'sp_post_values': sp_user.post_values,
'idp_unique_id': sp_user.idp_user.unique_id,
'idp_name': sp_user.idp_user.idp_id,
'last_connection': sp_user.last_connection,
'creation_date': sp_user.creation_date
}
@staticmethod
def get(sp_name, idp_unique_id, idp_name='default'):
""" return a list of dict with associations that matching all of this options """
associations = []
sp_users = storage_conn.query(SPUser).\
join(IDPUser).\
join(ServiceProvider).\
filter(ServiceProvider.name==sp_name).\
filter(IDPUser.unique_id==idp_unique_id).\
filter(IDPUser.idp_id==idp_name).\
all()
for sp_user in sp_users:
association = Association.sp_user2association(sp_user)
associations.append(association)
return associations
@staticmethod
def get_by_id(asso_id):
""" return an dict of the association with the id or None if it doesn't exist """
sp_user = storage_conn.query(SPUser).\
filter(SPUser.id==asso_id).first()
if sp_user:
return Association.sp_user2association(sp_user)
return None
@staticmethod
def has_id(asso_id):
""" return a boolean """
if storage_conn.query(SPUser).\
filter(SPUser.id==asso_id).\
count():
return True
return False
@staticmethod
def update_or_create(sp_name, sp_login, sp_post_values, idp_unique_id,
idp_name='default', creation_date=None, last_connection_date=None):
""" update or create an associtaion which match the following values
return the association id
"""
sp_user = storage_conn.query(SPUser).\
join(IDPUser).\
join(ServiceProvider).\
filter(SPUser.login==sp_login).\
filter(ServiceProvider.name==sp_name).\
filter(IDPUser.unique_id==idp_unique_id).\
filter(IDPUser.idp_id==idp_name).\
first()
if sp_user:
sp_user.post_values = sp_post_values
storage_conn.add(sp_user)
logger.info('Modify association for %r (%r)', sp_login, idp_unique_id)
else:
service_provider = storage_conn.query(ServiceProvider).\
filter(ServiceProvider.name==sp_name).first()
if not service_provider:
logger.info('New service provider %r', sp_name)
service_provider = ServiceProvider(name=sp_name)
storage_conn.add(service_provider)
idp_user = storage_conn.query(IDPUser).\
filter(IDPUser.unique_id==idp_unique_id).\
filter(IDPUser.idp_id==idp_name).\
first()
if not idp_user:
logger.info('New IDP user %r', idp_unique_id)
idp_user = IDPUser(
unique_id=idp_unique_id,
idp_id=idp_name
)
storage_conn.add(idp_user)
creation_date = creation_date or datetime.utcnow()
last_connection_date = last_connection_date or datetime.utcnow()
sp_user = SPUser(
login=sp_login,
post_values=sp_post_values,
idp_user=idp_user,
service_provider=service_provider,
creation_date=creation_date,
last_connection=last_connection_date
)
storage_conn.add(sp_user)
try:
storage_conn.commit()
except:
logger.error("update_or_create transaction failed so we rollback")
storage_conn.rollback()
raise
logger.info('New association %r with %r', sp_login, idp_unique_id)
return sp_user.id
@staticmethod
def delete(asso_id):
""" delete the association which has the following asso_id """
sp_user = storage_conn.query(SPUser).get(asso_id)
if not sp_user:
logger.warning("Association deletion failed: sp user %r doesn't exist", asso_id)
return
logger.info("Disassociate account %r (%r)", sp_user.login, sp_user.idp_user.unique_id)
storage_conn.delete(sp_user)
try:
storage_conn.commit()
except:
logger.error("delete transaction failed so we rollback")
storage_conn.rollback()
raise
@staticmethod
def get_last_connected(sp_name, idp_unique_id, idp_name='default'):
""" get the last connecting association which match the parameters
return a dict of the association
"""
sp_user = storage_conn.query(SPUser).\
join(IDPUser).\
join(ServiceProvider).\
filter(ServiceProvider.name==sp_name).\
filter(IDPUser.unique_id==idp_unique_id).\
filter(IDPUser.idp_id==idp_name).\
order_by(SPUser.last_connection.desc()).\
first()
if sp_user:
return Association.sp_user2association(sp_user)
else:
return None
@staticmethod
def update_last_connection(asso_id):
""" update the association last conenction time with the current time
return None
"""
sp_user = storage_conn.query(SPUser).get(asso_id)
if not sp_user:
logger.warning("Update last connecting failed: sp user %r doesn't exist", asso_id)
return
sp_user.last_connection = datetime.utcnow()
storage_conn.add(sp_user)
@staticmethod
def has_sp_login(sp_login, sp_name):
sp_user = storage_conn.query(SPUser).\
join(ServiceProvider).\
filter(ServiceProvider.name==sp_name).\
filter(SPUser.login==sp_login).\
count()
if sp_user:
return True
return False