This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandaye/mandaye/auth/authform.py

338 lines
14 KiB
Python

"""
Dispatcher for form-based authentication (login form replay)
"""
import Cookie
import base64
import copy
import re
import traceback
import urllib
import mandaye
from cookielib import CookieJar
from datetime import datetime
from lxml.html import fromstring
from urlparse import parse_qs
from mandaye import config, VERSION
from mandaye.exceptions import MandayeException
from mandaye.log import logger
from mandaye.http import HTTPResponse, HTTPHeader, HTTPRequest
from mandaye.response import _500, _302, _401
from mandaye.response import template_response
from mandaye.server import get_response
from mandaye.config.backend import ManagerIDPUser, ManagerSPUser,\
ManagerServiceProvider
try:
from Crypto.Cipher import AES
except ImportError:
config.encrypt_sp_password = False
class AuthForm(object):
    """ Replay a login form on a service provider and keep track of the
    association between an identity provider user and its service
    provider accounts.

    NOTE(review): login() calls self.get_current_unique_id(env), which is
    not defined in the part of the file visible here; presumably it is
    provided by a subclass -- confirm.
    """

    def __init__(self, form_values, site_name):
        """
        form_values: dict example :
        {
            'form_action': '/myform',
            'form_url': '/myform',
            'form_attrs': { 'name': 'form40', },
            'username_field': 'user',
            'password_field': 'pass',
            'post_fields': ['birthdate', 'card_number']
        }
        form_url, form_attrs, post_fields and username_field are obligatory
        password_field is obligatory too when config.encrypt_secret is set
        site_name: str with the site name

        When form_values has no 'form_headers' key, a default one is added
        to the dict given by the caller (the dict is modified in place).
        Raise MandayeException if an obligatory key is missing.
        """
        if 'form_headers' not in form_values:
            form_values['form_headers'] = {
                'Content-Type': 'application/x-www-form-urlencoded',
                'User-Agent': 'Mozilla/5.0 Mandaye/%s' % VERSION
            }
        required_keys = ('form_url', 'form_attrs', 'post_fields',
                         'username_field')
        if [key for key in required_keys if key not in form_values]:
            logger.critical("Bad configuration: AuthForm form_values dict "
                            "must have these keys: form_url, form_attrs, "
                            "post_fields and username_field")
            raise MandayeException('AuthForm bad configuration')
        if config.encrypt_secret and 'password_field' not in form_values:
            logger.critical("Bad configuration: AuthForm form_values dict "
                            "must have a password_field key if you want to "
                            "encode a password.")
            raise MandayeException('AuthForm bad configuration')
        self.form_url = form_values['form_url']
        self.form_values = form_values
        self.site_name = site_name

    def _encrypt_pwd(self, post_values):
        """ This method allows you to encrypt a password
        To use this feature you must set encrypt_sp_password to True
        in your configuration and set a secret in encrypt_secret
        post_values: contains the post values
        return None and modify post_values
        """
        password_field = self.form_values.get('password_field')
        if not password_field:
            logger.warning("You must set a password_field to encode a password")
            return
        if not config.encrypt_secret:
            logger.warning("You must set a secret to use pwd encryption")
            return
        logger.debug("Encrypt password")
        password = post_values[password_field]
        try:
            # NOTE(review): the IV is a fixed constant, so equal passwords
            # give equal ciphertexts. It is left untouched because changing
            # it would make the passwords already stored undecryptable.
            cipher = AES.new(config.encrypt_secret, AES.MODE_CFB,
                             "0000000000000000")
            password = base64.b64encode(cipher.encrypt(password))
            post_values[password_field] = password
        except Exception as e:
            # Best effort: on failure the password is left as it was
            if config.debug:
                traceback.print_exc()
            logger.warning('Password encrypting failed %s' % e)

    def _decrypt_pwd(self, post_values):
        """ This method allows you to decrypt a password encrypted with
        the _encrypt_pwd method. To use this feature you must set
        encrypt_sp_password to True in your configuration and
        set a secret in encrypt_secret
        post_values: contains the post values
        return None and modify post_values
        """
        if not config.encrypt_secret:
            logger.warning("You must set a secret to use pwd decryption")
            return
        logger.debug("Decrypt password")
        password_field = self.form_values['password_field']
        password = post_values[password_field]
        try:
            # Same key, mode and IV as in _encrypt_pwd
            cipher = AES.new(config.encrypt_secret, AES.MODE_CFB,
                             "0000000000000000")
            password = cipher.decrypt(base64.b64decode(password))
            post_values[password_field] = password
        except Exception as e:
            # Best effort: on failure the password is left as it was
            if config.debug:
                traceback.print_exc()
            logger.warning('Decrypting password failed: %s' % e)

    def _get_password(self, post_values):
        """ Return the password found in post_values
        post_values: dict with the post values (not modified)
        return the password, encrypted if config.encrypt_sp_password is set,
        or None if there is no password_field in the configuration
        """
        password_field = self.form_values.get('password_field')
        if password_field is None:
            return None
        if config.encrypt_sp_password:
            # _encrypt_pwd works in place on a dict: give it a scratch one
            # so that the caller's post_values stay untouched
            scratch = {password_field: post_values[password_field]}
            self._encrypt_pwd(scratch)
            return scratch[password_field]
        return post_values[password_field]

    def _find_auth_form(self, forms):
        """ Return the first form whose attributes match every
        key / value of form_attrs, None if there is no such form
        forms: iterable of objects with a get(attribute_name) method
        """
        wanted_attrs = self.form_values['form_attrs']
        for form in forms:
            if all(form.get(key) == value
                   for key, value in wanted_attrs.items()):
                return form
        return None

    def _resolve_action(self, auth_form):
        """ Return the url where the form must be posted: the configured
        form_action if there is one, else the action of the form itself
        """
        return self.form_values.get('form_action') or auth_form.action

    @staticmethod
    def _build_form_params(form_inputs, post_values):
        """ Build the dict of parameters to post
        form_inputs: the controls of the form (name, value and maybe type)
        post_values: dict of values, they override the ones of the form
        """
        params = {}
        for field in form_inputs:
            # Not every control has a type attribute (textarea, select)
            if field.name and getattr(field, 'type', None) != 'button':
                params[field.name] = field.value or ''
        params.update(post_values)
        return params

    def replay(self, env, post_values):
        """ replay the login / password
        env: WSGI env with beaker session and the target
        post_values: dict with the field name (key) and the field value (value)
        return the response of the form submission, the response of the
        form page itself if its code is 502, or a 500 response if the form
        or its action can't be found
        """
        if "://" not in self.form_url:
            self.form_url = env['target'].geturl() + '/' + self.form_url
        cj = CookieJar()
        request = HTTPRequest()
        login = get_response(env, request, self.form_url, cj)
        if login.code == 502:
            return login
        html = fromstring(login.msg)
        auth_form = self._find_auth_form(html.forms)
        if auth_form is None:
            logger.critical("%s %s: can't find login form on %s" %
                    (env['HTTP_HOST'], env['PATH_INFO'], self.form_url))
            return _500(env['PATH_INFO'], "Replay: Can't find login form")
        action = self._resolve_action(auth_form)
        if not action:
            logger.critical("%s %s: don't find form action on %s" %
                    (env['HTTP_HOST'], env['PATH_INFO'], self.form_url))
            return _500(env['PATH_INFO'], 'Replay: form action not found')
        if "://" not in action:
            action = env['target'].geturl() + '/' + action
        headers = HTTPHeader()
        headers.load_from_dict(self.form_values['form_headers'])
        params = urllib.urlencode(
            self._build_form_params(auth_form.inputs, post_values))
        # Post with the cookies received with the form page
        request = HTTPRequest(login.cookies, headers, "POST", params)
        return get_response(env, request, action, cj)

    def _save_association(self, env, unique_id, post_values):
        """ save an association in the database
        env: wsgi environment
        unique_id: idp unique id
        post_values: dict with the post values, the password is encrypted
        in place if config.encrypt_sp_password is set
        """
        sp_login = post_values[self.form_values['username_field']]
        if config.encrypt_sp_password:
            self._encrypt_pwd(post_values)
        service_provider = ManagerServiceProvider.get_or_create(self.site_name)
        idp_user = ManagerIDPUser.get_or_create(unique_id)
        sp_user = ManagerSPUser.get_or_create(sp_login, post_values,
                idp_user, service_provider)
        sp_user.login = sp_login
        sp_user.post_values = post_values
        sp_user.idp_user = idp_user
        sp_user.last_connection = datetime.now()
        sp_user.service_provider = service_provider
        ManagerSPUser.save()
        env['beaker.session']['unique_id'] = unique_id
        env['beaker.session'][self.site_name] = sp_user.id
        env['beaker.session'].save()

    def associate_submit(self, env, values, condition, request, response):
        """ Associate your login / password into your database
        env: wsgi environment
        values: dict with connection_url and associate_url
        condition: str with a python expression, evaluated after the replay
        (it can use the response of the replay), true if the login worked
        request: the request, its msg contains the posted form
        return a redirection or the response of the replay
        """
        logger.debug("Trying to associate a user")
        unique_id = env['beaker.session'].get('unique_id')
        # The query string is sent back with every failure redirection,
        # even when the request has no body: read it first
        qs = dict((key, value[0]) for key, value
                  in parse_qs(env.get('QUERY_STRING', '')).items())
        if request.msg:
            if not unique_id:
                logger.warning("Association failed: user isn't login on Mandaye")
                return _302(values.get('connection_url'))
            # NOTE(review): the request object was given as second argument
            # (keep_blank_values), blank fields are kept as it presumably
            # always was the case -- confirm
            post = parse_qs(request.msg.read(), keep_blank_values=True)
            post_values = {}
            for field in self.form_values['post_fields']:
                if field not in post:
                    logger.info('Association auth failed: form not correctly filled')
                    qs['type'] = 'badlogin'
                    return _302(values.get('associate_url') + "?%s" % urllib.urlencode(qs))
                post_values[field] = post[field][0]
            response = self.replay(env, post_values)
            # NOTE(review): condition is evaluated with eval, it must only
            # come from the trusted configuration
            if condition and eval(condition):
                logger.debug("Replay works: save the association")
                self._save_association(env, unique_id, post_values)
                if 'next_url' in qs:
                    # NOTE(review): next_url comes from the query string
                    # and is not checked
                    return _302(qs['next_url'], response.cookies)
                return response
            logger.info('Auth failed: Bad password or login for %s on %s' % \
                    (post_values.get(self.form_values['username_field']),
                     self.site_name))
        qs['type'] = 'badlogin'
        return _302(values.get('associate_url') + "?%s" % urllib.urlencode(qs))

    def _login_sp_user(self, sp_user, env, condition, values):
        """ Log in sp user
        sp_user: the service provider user to log in
        env: wsgi environment
        condition: str with a python expression, evaluated after the replay
        (it can use the response of the replay), true if the login worked
        values: dict with associate_url
        return the response of the replay, or a redirection to the
        association page if the login failed
        """
        if not sp_user.login:
            return _500(env['PATH_INFO'],
                    'Invalid values for AuthFormDispatcher.login')
        # Work on a copy: decrypting must not modify the stored values
        post_values = copy.deepcopy(sp_user.post_values)
        if config.encrypt_sp_password:
            self._decrypt_pwd(post_values)
        response = self.replay(env, post_values)
        # NOTE(review): condition is evaluated with eval, it must only
        # come from the trusted configuration
        if condition and eval(condition):
            sp_user.last_connection = datetime.now()
            ManagerSPUser.save()
            env['beaker.session'][self.site_name] = sp_user.id
            env['beaker.session'].save()
            return response
        return _302(values.get('associate_url') + "?type=failed")

    def login(self, env, values, condition, request, response):
        """ Automatic login on a site with a form
        return a 401 if there is no current idp unique id, a redirection
        to the association page if the user has no associated account,
        else the result of _login_sp_user
        """
        logger.debug('Trying to login on Mandaye')
        # Specific method to get current idp unique id
        unique_id = self.get_current_unique_id(env)
        if not unique_id:
            return _401('Access denied: invalid token')
        # FIXME: hack to force beaker to generate an id
        # sometimes beaker doesn't do it by himself
        env['beaker.session'].regenerate_id()
        env['beaker.session']['unique_id'] = unique_id
        env['beaker.session'].save()
        logger.debug('User %s successfully login' % env['beaker.session']['unique_id'])
        idp_user = ManagerIDPUser.get(unique_id)
        service_provider = ManagerServiceProvider.get(self.site_name)
        sp_user = ManagerSPUser.get_last_connected(idp_user, service_provider)
        if not sp_user:
            logger.debug('User %s is not associate' % env['beaker.session']['unique_id'])
            return _302(values.get('associate_url') + "?type=first")
        return self._login_sp_user(sp_user, env, condition, values)

    def logout(self, env, values, request, response):
        """ Destroy the Beaker session
        return the response unchanged
        """
        logger.debug('Logout from Mandaye')
        env['beaker.session'].delete()
        return response

    def change_user(self, env, values, request, response):
        """ Multi accounts feature
        Change the current login user
        You must call this method into a response filter
        This method must have a query string with an id parameter
        """
        # TODO: need to logout the first
        unique_id = env['beaker.session'].get('unique_id')
        qs = parse_qs(env['QUERY_STRING'])
        if not unique_id or 'id' not in qs:
            return _401('Access denied: beaker session invalid or not qs id')
        # NOTE(review): the id is required but the account used is the last
        # connected one, not the one with this id -- confirm the intent
        service_provider = ManagerServiceProvider.get(self.site_name)
        idp_user = ManagerIDPUser.get(unique_id)
        sp_user = ManagerSPUser.get_last_connected(idp_user, service_provider)
        if not sp_user:
            return _302(values.get('associate_url'))
        return self._login_sp_user(sp_user, env, 'response.code==302', values)

    def disassociate(self, env, values, request, response):
        """ Multi accounts feature
        Disassociate an account with the Mandaye account
        You need to put the id of the account you want to disassociate
        in the query string (..?id=42), add a logout parameter to destroy
        the session too
        """
        session = env['beaker.session']
        # The session is filled with unique_id by login and
        # _save_association, login is only kept as a fallback
        identity = session.get('unique_id') or session.get('login')
        if not identity:
            return _401('Access denied: no session')
        qs = parse_qs(env['QUERY_STRING'])
        if 'id' not in qs:
            return _401('Access denied: beaker session invalid or not id')
        # NOTE(review): nothing checks that this account belongs to the
        # user of the session
        sp_user = ManagerSPUser.get_by_id(qs['id'][0])
        if not sp_user:
            return _401('Access denied: bad id')
        ManagerSPUser.delete(sp_user)
        if 'logout' in qs:
            self.logout(env, values, request, response)
        return _302(values.get('next_url'))