This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandaye/mandaye/auth/vincennes.py

140 lines
5.6 KiB
Python

""" Vincennes authentifications
"""
import base64
import traceback
from Crypto.Cipher import AES
from datetime import datetime, timedelta
from urlparse import parse_qs
from mandaye.auth.authform import AuthForm
from mandaye.log import logger
from mandaye.models import Site, ExtUser, LocalUser
from mandaye.db import sql_session
from mandaye.response import _502, _302
from mandaye.server import get_response
class VincennesAuth(AuthForm):
    """Specific authentication class for Vincennes.

    The user is identified by an encrypted ``token`` query-string parameter
    that is decrypted with ``config.secret``.  The other methods redirect the
    browser to the Vincennes 'idp' (``self.url``) or log the user in once a
    valid token is present.
    """

    def __init__(self, form_values, site_name, url):
        """Build the authenticator.

        form_values, site_name: forwarded unchanged to AuthForm.__init__
        url: Vincennes 'idp' url
        """
        self.url = url
        AuthForm.__init__(self, form_values, site_name)

    def _parse_qs(self, query):
        """Split a raw query string into a dict.

        Values are returned exactly as received: no percent-decoding and no
        '+' to space conversion is applied.
        NOTE(review): presumably this is why urlparse.parse_qs is not used
        (it would alter a raw base64 token) -- confirm.

        A parameter without '=' maps to an empty string, empty segments
        (e.g. a trailing '&') are ignored, and when a key is repeated the
        last occurrence wins.
        """
        res = {}
        for pair in query.split('&'):
            if not pair:
                continue
            # partition() never fails, unlike indexing the result of split(),
            # which raised IndexError on a segment without '='.
            key, _sep, value = pair.partition('=')
            res[key] = value
        return res

    def _decode_token(self, token, secret):
        """Decrypt a base64 encoded token and return its payload as a dict.

        Raises an exception (ValueError, or whatever the base64, AES or
        parsing layers raise) when the token cannot be used.
        """
        import ast

        cipher = AES.new(secret, AES.MODE_CFB)
        decode = cipher.decrypt(base64.b64decode(token))
        if not decode:
            # String exceptions ('raise "..."') are rejected by Python >= 2.6.
            raise ValueError("Empty token")
        # The first 16 bytes are not part of the payload.
        # NOTE(review): presumably the AES block that carries the IV --
        # confirm with the token issuer.
        payload = decode[16:].strip()
        # SECURITY: this data comes from the request.  The previous code ran
        # eval() on it, which executes arbitrary expressions if the token is
        # forged or tampered with.  ast.literal_eval only accepts Python
        # literals (dict, str, numbers, ...).  If the issuer ever sends
        # anything other than a literal dict this must be revisited.
        info = ast.literal_eval(payload)
        if not isinstance(info, dict):
            raise ValueError("Token payload is not a dict")
        return info

    def get_current_login(self, env):
        """Return the current Vincennes pseudo.

        env: WSGI environ; only env['QUERY_STRING'] is read.

        Returns the 'pseudo' entry of the decrypted token, or None when
        there is no query string, no 'token' parameter, the token cannot be
        decoded, or its 'date' differs from the current UTC time by more
        than config.token_timeout seconds.
        """
        from mandaye import config

        query_string = env.get('QUERY_STRING')
        if not query_string:
            return None
        query = self._parse_qs(query_string)
        if 'token' not in query:
            return None
        try:
            info = self._decode_token(query['token'], config.secret)
            token_date = None
            if 'date' in info:
                # NOTE(review): the last 6 characters (presumably a '+HH:MM'
                # UTC offset) are dropped and the rest is compared to
                # utcnow() as is, so the offset is never applied -- confirm
                # that the issuer always sends UTC times.
                token_date = datetime.strptime(info['date'][:-6],
                                               '%Y-%m-%dT%H:%M:%S')
        except Exception as e:
            # Deliberately broad: any failure while reading a token supplied
            # by the client means "not logged in", never a server error.
            if config.debug:
                traceback.print_exc()
            logger.info("Invalid token %s" % e)
            return None
        # NOTE(review): a token without a 'date' entry skips the freshness
        # check entirely; kept as in the previous implementation.
        if token_date is not None:
            delta = abs(datetime.utcnow() - token_date)
            if delta > timedelta(seconds=config.token_timeout):
                # Log a real number of seconds (a timedelta used to be
                # formatted here, e.g. '0:10:00 seconds').
                logger.warning('Vincennes token timeout (delta of %s seconds)' \
                        % (delta.days * 86400 + delta.seconds))
                return None
        return info.get('pseudo')

    def connection(self, env, values, request, response):
        """Connection to the compte citoyen.

        Returns a 302 redirect to self.url with two parameters: next_url
        (scheme + host of the current request followed by
        values['next_url']) and service (self.site_name).
        request and response are not used.
        """
        dest_url = "%s://%s%s" % (env['mandaye.scheme'], env['HTTP_HOST'],
                                  values.get('next_url'))
        # NOTE(review): dest_url is inserted without URL-encoding; left as is
        # because the format expected by the idp is not visible from here.
        location = "%s?next_url=%s&service=%s" % \
                (self.url, dest_url, self.site_name)
        return _302(location)

    def auto_login(self, env, values, condition, request, response):
        """Try to auto login the user if he is already logged in on
        www.vincennes.fr.

        Always returns a 302 redirect:
        - to the session's next_url (or '/') with the request cookies when
          no valid token is found;
        - to values['associate_url'] when no ExtUser is associated with the
          login for this site;
        - to the session's next_url (or '/') with the cookies returned by
          _login_ext_user() otherwise.
        The login is stored in the session under the 'login' key.
        """
        from urllib import quote

        logger.debug('Trying to auto log user on %s' % self.site_name)
        login = self.get_current_login(env)
        session = env['beaker.session']
        # get() instead of has_key(): auto_connection() disables next_url by
        # setting it to None, in which case the key exists but is not a
        # usable path.
        path = session.get('next_url')
        if not path:
            logger.warning('Auto login without mandaye_next_url automatically redirect to /')
            path = '/'
        if not login:
            logger.debug('Auto login failed because the user is not connected on vincennes.fr')
            return _302(path, request.cookies)
        session['login'] = login
        session.save()
        # Most recently used association between this login and this site.
        ext_user = sql_session().query(ExtUser).\
                join(LocalUser).\
                join(Site).\
                filter(LocalUser.login == login).\
                filter(Site.name == self.site_name).\
                order_by(ExtUser.last_connection.desc()).\
                first()
        if not ext_user:
            logger.debug("No association found redirect to the association page %s" % values.get('associate_url'))
            # path may contain its own '?' and '&' (see auto_connection), so
            # it is percent-encoded to survive as a single parameter.
            return _302(values.get('associate_url') +
                        "?type=first&next_url=%s" % quote(path))
        response = self._login_ext_user(ext_user, env, condition, values)
        logger.info("User %s has been successfully auto login on %s" % (login, self.site_name))
        return _302(path, response.cookies)

    def auto_connection(self, env, values, request, response):
        """Try to auto connect a user on www.vincennes.fr.

        A redirect to the idp is returned only when the session has neither
        a truthy entry for self.site_name nor a truthy next_url, and either
        the request has no referer and values['empty_referer'] is set, or
        values['autologin_from'] is contained in the referer.  In that case
        the current path (and query string) is saved in the session as
        next_url.

        Otherwise any next_url left in the session is reset to None and the
        given response is returned unchanged.
        NOTE(review): the request method is not checked here.
        """
        referer = request.headers.getheader('referer')
        session = env['beaker.session']
        if not session.get(self.site_name) and not session.get('next_url'):
            if referer:
                # Guard against an unset option: 'None in referer' raises
                # TypeError.
                autologin_from = values.get('autologin_from')
                referer_ok = autologin_from is not None \
                        and autologin_from in referer
            else:
                referer_ok = values.get('empty_referer')
            if referer_ok:
                dest_url = "%s://%s%s" % (env['mandaye.scheme'], env['HTTP_HOST'],
                                          values.get('next_url'))
                # NOTE(review): dest_url is not URL-encoded, as in
                # connection().
                target = "%s?service=%s&pass=1&next_url=%s" % \
                        (self.url, self.site_name, dest_url)
                # Remember where the user was going so that auto_login() can
                # send him back there.
                if env.get('QUERY_STRING'):
                    session['next_url'] = env['PATH_INFO'] + "?%s" % env['QUERY_STRING']
                else:
                    session['next_url'] = env['PATH_INFO']
                session.save()
                logger.debug('Auto connection call %s' % target)
                return _302(target)
        if session.get('next_url'):
            logger.debug('Disable next_url in the session')
            session['next_url'] = None
            session.save()
        return response