This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandaye/mandaye/auth/authform.py

375 lines
15 KiB
Python

"""
Dispatcher for basic form-based authentication
"""
import Cookie
import base64
import copy
import re
import traceback
import urllib
import mandaye
from cookielib import CookieJar
from datetime import datetime
from lxml.html import fromstring
from urlparse import parse_qs
from mandaye import config, __version__
from mandaye.exceptions import MandayeException
from mandaye.log import logger
from mandaye.http import HTTPResponse, HTTPHeader, HTTPRequest
from mandaye.response import _500, _302, _401
from mandaye.response import template_response
from mandaye.server import get_response
from mandaye.backends.default import backend
try:
from Crypto.Cipher import AES
except ImportError:
config.encrypt_sp_password = False
class AuthForm(object):
    """Dispatcher that replays a login form on a service provider (SP).

    The SP credentials are associated with the unique id stored in the
    beaker session and are replayed against the login form described by
    the mapper's ``form_values`` dict.
    """

    def __init__(self, env, mapper):
        """
        env: WSGI environment
        mapper: mapper's module like mandaye.mappers.linuxfr

        Raises MandayeException when mapper.form_values is missing a
        mandatory key.
        """
        self.env = env
        self.urls = mapper.urls
        self.site_name = self.env["mandaye.config"]["site_name"]
        self.form_values = mapper.form_values
        if 'form_headers' not in self.form_values:
            self.form_values['form_headers'] = {
                'Content-Type': 'application/x-www-form-urlencoded',
                'User-Agent': 'Mozilla/5.0 Mandaye/%s' % __version__
            }
        required_keys = ('post_fields', 'username_field', 'login_url')
        if [key for key in required_keys if key not in self.form_values]:
            logger.critical("Bad configuration: AuthForm form_values dict must have "
                            "these keys: post_fields, username_field and login_url")
            raise MandayeException('AuthForm bad configuration')
        if 'form_attrs' not in self.form_values and \
                'post_url' not in self.form_values:
            # Only logged, not raised: replay() answers with a 500 response
            # in that case because it cannot find where to post.
            logger.critical("Bad configuration: you must set form_attrs or post_url")
        if config.encrypt_secret and 'password_field' not in self.form_values:
            logger.critical("Bad configuration: AuthForm form_values dict must have "
                            "a password_field key if you want to encode a password.")
            raise MandayeException('AuthForm bad configuration')
        self.login_url = self.form_values.get('login_url')

    @staticmethod
    def _absolute_url(env, url):
        """ Return url unchanged when it contains a scheme, otherwise
        prefix it with the URL of env['target']
        """
        if "://" in url:
            return url
        return env['target'].geturl() + '/' + url

    @staticmethod
    def _query_args(env):
        """ Parse the query string of the request
        A missing QUERY_STRING key is handled like an empty query string
        Return a dict mapping each parameter name to its list of values
        """
        return parse_qs(env.get('QUERY_STRING', ''))

    def get_default_mapping(self):
        """ Return the default mapping of this dispatcher: an empty list
        """
        return []

    def encrypt_pwd(self, password):
        """ This method allows you to encrypt a password
        To use this feature you must set encrypt_sp_password to True
        in your configuration and set a secret in encrypt_secret
        Return the encrypted password encoded in base64, or None if no
        secret is configured or if the encryption failed
        """
        if not config.encrypt_secret:
            logger.warning("You must set a secret to use pwd encryption")
            return None
        logger.debug("Encrypt password")
        try:
            # NOTE(review): the IV is a constant. Changing it would make the
            # passwords already stored undecipherable, so it is kept as is.
            cipher = AES.new(config.encrypt_secret, AES.MODE_CFB, "0000000000000000")
            return base64.b64encode(cipher.encrypt(password))
        except Exception as e:
            if config.debug:
                traceback.print_exc()
            logger.warning('Password encrypting failed %s' % e)
        return None

    def decrypt_pwd(self, password):
        """ This method allows you to decrypt a password encrypted with
        the encrypt_pwd method. To use this feature you must set
        encrypt_sp_password to True in your configuration and
        set a secret in encrypt_secret
        Return the decrypted password, or None if no secret is configured
        or if the decryption failed
        """
        if not config.encrypt_secret:
            logger.warning("You must set a secret to use pwd decryption")
            return None
        logger.debug("Decrypt password")
        try:
            # Same key, mode and IV as encrypt_pwd
            cipher = AES.new(config.encrypt_secret, AES.MODE_CFB, "0000000000000000")
            return cipher.decrypt(base64.b64decode(password))
        except Exception as e:
            if config.debug:
                traceback.print_exc()
            logger.warning('Decrypting password failed: %s' % e)
        return None

    def get_current_unique_id(self, env):
        """ Return the unique id stored in the beaker session, or None
        when the session has none (login() answers 401 in that case)
        """
        return env['beaker.session'].get('unique_id')

    def replay(self, env, post_values):
        """ replay the login / password
        env: WSGI env with beaker session and the target
        post_values: dict with the field name (key) and the field value (value)
        Return the response to the form post, the login page response when
        its code is 502, or a 500 response when the form or its action
        can't be found
        """
        cj = CookieJar()
        request = HTTPRequest()
        auth_form = None
        # The absolute login url is kept on the instance for the next calls
        self.login_url = self._absolute_url(env, self.login_url)
        login = get_response(env, request, self.login_url, cj)
        if login.code == 502:
            return login
        # if there is a form parse it
        if 'form_attrs' in self.form_values:
            form_attrs = self.form_values['form_attrs']
            for form in fromstring(login.msg).forms:
                # the first form matching every configured attribute wins
                if all(form.get(key) == value
                       for key, value in form_attrs.items()):
                    auth_form = form
                    break
            if auth_form is None:
                logger.critical("%s %s: can't find login form on %s" %
                                (env['HTTP_HOST'], env['PATH_INFO'], self.login_url))
                return _500(env['PATH_INFO'], "Replay: Can't find login form")
            # Start from the values already in the form (hidden fields, ...)
            params = {}
            for field in auth_form.inputs:
                # not every form control carries a type attribute
                if not field.name or getattr(field, 'type', None) == 'button':
                    continue
                if field.value:
                    params[field.name] = field.value.encode('utf-8')
                else:
                    params[field.name] = ''
            # the replayed credentials override the values found in the page
            params.update(post_values)
        else:
            params = post_values
        action = self.form_values.get('post_url')
        if not action:
            # no post_url configured: post to the action of the parsed form
            if auth_form is None or not auth_form.action:
                logger.critical("%s %s: don't find form action on %s" %
                                (env['HTTP_HOST'], env['PATH_INFO'], self.login_url))
                return _500(env['PATH_INFO'], 'Replay: form action not found')
            action = auth_form.action
        action = self._absolute_url(env, action)
        cookies = login.cookies
        headers = HTTPHeader()
        headers.load_from_dict(self.form_values['form_headers'])
        params = urllib.urlencode(params)
        request = HTTPRequest(cookies, headers, "POST", params)
        return get_response(env, request, action, cj)

    def _save_association(self, env, unique_id, post_values):
        """ save an association in the database
        env: wsgi environment
        unique_id: idp unique id
        post_values: dict with the post values; when encrypt_sp_password
        is set its password entry is replaced in place by the result of
        encrypt_pwd (None if the encryption failed)
        """
        logger.debug('AuthForm._save_association: save a new association')
        sp_login = post_values[self.form_values['username_field']]
        if config.encrypt_sp_password:
            password_field = self.form_values['password_field']
            post_values[password_field] = self.encrypt_pwd(post_values[password_field])
        service_provider = backend.ManagerServiceProvider.get_or_create(self.site_name)
        idp_user = backend.ManagerIDPUser.get_or_create(unique_id)
        sp_user = backend.ManagerSPUser.get_or_create(sp_login, post_values,
                                                      idp_user, service_provider)
        session = env['beaker.session']
        session['unique_id'] = unique_id
        session[self.site_name] = sp_user.id
        session.save()

    def associate_submit(self, env, values, condition, request, response):
        """ Associate your login / password into your database
        condition: python expression evaluated after the replay; it sees the
        local variables of this method, `response` being the replay response
        Return the replay response or a redirection to next_url on success,
        a redirection to associate_url with type=badlogin on failure and a
        redirection to connection_url when there is no unique id in session
        """
        logger.debug("Trying to associate a user")
        unique_id = env['beaker.session'].get('unique_id')
        # Parsed before anything else: the failure redirection needs it
        # even when the request has no body. Keep only the first value.
        qs = dict((key, value[0])
                  for key, value in self._query_args(env).items())
        if request.msg:
            if not unique_id:
                logger.warning("Association failed: user isn't login on Mandaye")
                return _302(self.urls.get('connection_url'))
            if isinstance(request.msg, str):
                body = request.msg
            else:
                body = request.msg.read()
            # blank values are kept so that an empty field is still a field
            post = parse_qs(body, keep_blank_values=True)
            post_fields = self.form_values['post_fields']
            post_values = {}
            for field in post_fields:
                if field not in post:
                    logger.info('Association auth failed: form not correctly filled')
                    qs['type'] = 'badlogin'
                    return _302(self.urls.get('associate_url') + "?%s" % urllib.urlencode(qs))
                post_values[field] = post[field][0]
            response = self.replay(env, post_values)
            # NOTE(review): eval() of an expression handed in by the caller;
            # it must never be built from request data.
            if condition and eval(condition):
                logger.debug("Replay works: save the association")
                self._save_association(env, unique_id, post_values)
                if 'next_url' in qs:
                    return _302(qs['next_url'], response.cookies)
                return response
        logger.info('Auth failed: Bad password or login')
        qs['type'] = 'badlogin'
        return _302(self.urls.get('associate_url') + "?%s" % urllib.urlencode(qs))

    def _login_sp_user(self, sp_user, env, condition, values):
        """ Log in sp user
        sp_user: the service provider user whose post_values are replayed
        condition: python expression evaluated after the replay; it sees the
        local variables of this method, `response` being the replay response
        Return the replay response or a redirection to next_url on success,
        a redirection to associate_url with type=failed otherwise
        """
        if not sp_user.login:
            return _500(env['PATH_INFO'],
                        'Invalid values for AuthFormDispatcher.login')
        # work on a copy: the stored values must keep the encrypted password
        post_values = copy.copy(sp_user.post_values)
        if config.encrypt_sp_password:
            password = self.decrypt_pwd(post_values[self.form_values['password_field']])
            post_values[self.form_values['password_field']] = password
        response = self.replay(env, post_values)
        qs = self._query_args(env)
        # NOTE(review): eval() of an expression handed in by the caller;
        # it must never be built from request data.
        if not (condition and eval(condition)):
            return _302(self.urls.get('associate_url') + "?type=failed")
        sp_user.last_connection = datetime.now()
        backend.ManagerSPUser.save()
        env['beaker.session'][self.site_name] = sp_user.id
        env['beaker.session'].save()
        if 'next_url' in qs:
            return _302(qs['next_url'][0], response.cookies)
        return response

    def login(self, env, values, condition, request, response):
        """ Automatic login on a site with a form
        Return a 401 response without unique id, a redirection to
        associate_url with type=first when no sp user is associated, else
        the result of _login_sp_user
        """
        # Specific method to get current idp unique id
        unique_id = self.get_current_unique_id(env)
        logger.debug('Trying to login on Mandaye')
        if not unique_id:
            return _401('Access denied: invalid token')
        session = env['beaker.session']
        # FIXME: hack to force beaker to generate an id
        # sometimes beaker doesn't do it by itself
        session.regenerate_id()
        session['unique_id'] = unique_id
        session.save()
        logger.debug('User %s successfully login' % session['unique_id'])
        idp_user = backend.ManagerIDPUser.get_or_create(unique_id)
        service_provider = backend.ManagerServiceProvider.get_or_create(self.site_name)
        sp_user = backend.ManagerSPUser.get_last_connected(idp_user, service_provider)
        if not sp_user:
            logger.debug('User %s is not associate' % session['unique_id'])
            return _302(self.urls.get('associate_url') + "?type=first")
        return self._login_sp_user(sp_user, env, condition, values)

    def logout(self, env, values, request, response):
        """ Destroy the Beaker session
        Return the response unchanged
        """
        logger.debug('Logout from Mandaye')
        env['beaker.session'].delete()
        return response

    def local_logout(self, env, values, request, response):
        """ Destroy the Beaker session, expire the cookies of the request
        and redirect the user
        The target is, in this order: the RelayState query string parameter,
        the next_url query string parameter, values['next_url'], then '/'
        """
        logger.info('SP logout initiated by Mandaye')
        self.logout(env, values, request, response)
        qs = self._query_args(env)
        if 'RelayState' in qs:
            next_url = qs['RelayState'][0]
        elif 'next_url' in qs:
            next_url = qs['next_url'][0]
        else:
            next_url = values.get('next_url')
        req_cookies = request.cookies
        for cookie in req_cookies.values():
            # a date in the past asks the browser to drop the cookie
            cookie['expires'] = 'Thu, 01 Jan 1970 00:00:01 GMT'
            cookie['path'] = '/'
        return _302(next_url or '/', req_cookies)

    def change_user(self, env, values, request, response):
        """ Multi accounts feature
        Change the current login user
        You must call this method into a response filter
        The sp user is the one given by the id parameter of the query
        string or, without it, the last connected one of the session's user
        """
        # TODO: need to logout the first
        unique_id = env['beaker.session'].get('unique_id')
        qs = self._query_args(env)
        if 'id' not in qs and not unique_id:
            return _401('Access denied: beaker session invalid or not qs id')
        if 'id' in qs:
            # NOTE(review): nothing here checks that this sp user is
            # associated with the session's unique id - confirm it is
            # enforced elsewhere.
            sp_user_id = qs['id'][0]
            sp_user = backend.ManagerSPUser.get_by_id(sp_user_id)
        else:
            service_provider = backend.ManagerServiceProvider.get(self.site_name)
            idp_user = backend.ManagerIDPUser.get(unique_id)
            sp_user = backend.ManagerSPUser.get_last_connected(idp_user, service_provider)
        if not sp_user:
            return _302(self.urls.get('associate_url'))
        return self._login_sp_user(sp_user, env, 'response.code==302', values)

    def disassociate(self, env, values, request, response):
        """ Disassociate an account with the Mandaye account
        You need to put the id of the sp user you want to disassociate
        in the query string (..?id=42) or use by service provider name
        (..?sp_name=)
        A next_url in the query string overrides values['next_url'] and a
        logout parameter triggers local_logout after the disassociation
        """
        unique_id = env['beaker.session'].get('unique_id')
        if not unique_id:
            return _401('Access denied: no session')
        qs = self._query_args(env)
        next_url = values.get('next_url') or '/'
        if 'next_url' in qs:
            next_url = qs['next_url'][0]
        if 'id' in qs:
            # NOTE(review): nothing here checks that this sp user is
            # associated with the session's unique id - confirm it is
            # enforced elsewhere.
            sp_id = qs['id'][0]
            sp_user = backend.ManagerSPUser.get_by_id(sp_id)
            if not sp_user:
                return _401('Access denied: bad id')
            backend.ManagerSPUser.delete(sp_user)
            if backend.ManagerSPUser.get_sp_users(unique_id, self.site_name):
                # other accounts remain on this site: switch to the last
                # connected one (no id in the query string)
                env['QUERY_STRING'] = ''
                return self.change_user(env, values, request, response)
        elif 'sp_name' in qs:
            sp_name = qs['sp_name'][0]
            for sp_user in backend.ManagerSPUser.get_sp_users(unique_id, sp_name):
                backend.ManagerSPUser.delete(sp_user)
        else:
            return _401('Access denied: no id or sp name')
        if 'logout' in qs:
            # local_logout reads next_url from values; give it a copy so the
            # caller's dict isn't modified
            logout_values = dict(values)
            logout_values['next_url'] = next_url
            return self.local_logout(env, logout_values, request, response)
        return _302(next_url)