This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandaye/mandaye/auth/authform.py

419 lines
17 KiB
Python

"""
Dispatcher for basic form-based authentication
"""
import Cookie
import base64
import copy
import re
import os
import traceback
import urllib
import mandaye
from cookielib import CookieJar
from datetime import datetime
from lxml.html import fromstring
from urlparse import parse_qs
from mandaye import config, __version__
from mandaye.exceptions import MandayeException
from mandaye.log import logger
from mandaye.http import HTTPResponse, HTTPHeader, HTTPRequest
from mandaye.response import _500, _302, _401
from mandaye.response import template_response
from mandaye.server import get_response
from mandaye.backends.default import Association
# PyCrypto is an optional dependency: when it cannot be imported, service
# provider password encryption is switched off in the configuration.
try:
    from Crypto.Cipher import AES
except ImportError:
    config.encrypt_sp_password = False
class AuthForm(object):
    """ Authentication dispatcher replaying a login form on a service provider

    The mapper supplies `urls` and `form_values`; credentials posted by the
    user are stored as an association and replayed against the login form.
    """

    # Fixed IV used by encrypt_pwd / decrypt_pwd.
    # NOTE(review): a constant IV weakens CFB mode; it is kept as is because
    # changing it would make previously stored passwords undecipherable.
    _AES_IV = "0000000000000000"

    def __init__(self, env, mapper):
        """
        env: WSGI environment
        mapper: mapper's module like mandaye.mappers.linuxfr
        Raise MandayeException if a mandatory form_values key is missing
        """
        self.env = env
        self.urls = mapper.urls
        self.site_name = self.env["mandaye.config"]["site_name"]
        # This is the mapper's own dict (not a copy): the default headers
        # set below are therefore written into the mapper itself.
        self.form_values = mapper.form_values
        if 'form_headers' not in self.form_values:
            self.form_values['form_headers'] = {
                'Content-Type': 'application/x-www-form-urlencoded',
                'User-Agent': 'Mozilla/5.0 Mandaye/%s' % __version__
            }
        mandatory = ('post_fields', 'username_field', 'login_url')
        if [key for key in mandatory if key not in self.form_values]:
            logger.critical("Bad configuration: AuthForm form_values dict must have "
                            "these keys: post_fields, username_field and login_url")
            raise MandayeException('AuthForm bad configuration')
        if 'form_attrs' not in self.form_values and \
                'post_url' not in self.form_values:
            # Only logged here; replay() answers with a 500 in that case.
            logger.critical("Bad configuration: you must set form_attrs or post_url")
        if config.encrypt_secret and 'password_field' not in self.form_values:
            logger.critical("Bad configuration: AuthForm form_values dict must have "
                            "a password_field key if you want to encode a password.")
            raise MandayeException('AuthForm bad configuration')
        self.login_url = self.form_values.get('login_url')

    def get_default_mapping(self):
        """ Return the default mapping: the logout url, plus the auto
        connection filter on '/' when config.a2_auto_connection is set
        """
        mapping = [
            {
                'path': r'%s$' % self.urls.get('logout_url', '/mandaye/logout'),
                'on_response': [{'auth': 'slo'}]
            },
        ]
        if config.a2_auto_connection:
            mapping.append({
                'path': r'/',
                'response': {
                    'filter': self.auto_connection,
                    'condition': self.is_connected_a2
                }
            })
        return mapping

    def encrypt_pwd(self, password):
        """ This method allows you to encrypt a password
        To use this feature you must set encrypt_sp_password to True
        in your configuration and set a secret in encrypt_secret
        Return the base64 encoded encrypted password, or None when no
        secret is set or when the encryption fails
        """
        if not config.encrypt_secret:
            logger.warning("You must set a secret to use pwd encryption")
            return None
        logger.debug("Encrypt password")
        try:
            cipher = AES.new(config.encrypt_secret, AES.MODE_CFB, self._AES_IV)
            return base64.b64encode(cipher.encrypt(password))
        except Exception as e:
            # Best effort: the failure is logged and None is returned
            if config.debug:
                traceback.print_exc()
            logger.warning('Password encrypting failed %s', e)
        return None

    def decrypt_pwd(self, password):
        """ This method allows you to decrypt a password encrypted with
        the encrypt_pwd method. To use this feature you must set
        encrypt_sp_password to True in your configuration and
        set a secret in encrypt_secret
        Return the decrypted password, or None when no secret is set or
        when the decryption fails
        """
        if not config.encrypt_secret:
            logger.warning("You must set a secret to use pwd decryption")
            return None
        logger.debug("Decrypt password")
        try:
            cipher = AES.new(config.encrypt_secret, AES.MODE_CFB, self._AES_IV)
            return cipher.decrypt(base64.b64decode(password))
        except Exception as e:
            # Best effort: the failure is logged and None is returned
            if config.debug:
                traceback.print_exc()
            logger.warning('Decrypting password failed: %r', e)
        return None

    def get_current_unique_id(self, env):
        """ Return the unique_id stored in the beaker session or None
        """
        return env['beaker.session'].get('unique_id')

    def _find_login_form(self, page):
        """ Return the first form of the html page whose attributes match
        every form_attrs entry, or None if there is none
        """
        wanted = self.form_values['form_attrs']
        for form in fromstring(page).forms:
            if all(form.get(key) == value for key, value in wanted.items()):
                return form
        return None

    def _form_defaults(self, form):
        """ Return a dict with the current value of every named field of
        the form (buttons excluded)
        """
        params = {}
        for field in form.inputs:
            # getattr: presumably select / textarea elements expose no
            # `type` attribute -- TODO confirm against lxml
            if not field.name or getattr(field, 'type', None) == 'button':
                continue
            if field.value:
                params[field.name] = field.value.encode('utf-8')
            else:
                params[field.name] = ''
        return params

    def replay(self, env, post_values):
        """ replay the login / password
        env: WSGI env with beaker session and the target
        post_values: dict with the field name (key) and the field value (value)
        Return the response to the login POST; the login page response if
        its code is 502; a 500 response if the form or its action is missing
        """
        logger.debug("authform.replay post_values: %r", post_values)
        cj = CookieJar()
        request = HTTPRequest()
        action = self.form_values.get('post_url')
        auth_form = None
        if "://" not in self.login_url:
            # NOTE(review): os.path.join is a filesystem helper; it drops the
            # target url when login_url starts with '/'. Left unchanged.
            self.login_url = os.path.join(env['target'].geturl(), self.login_url)
        login = get_response(env, request, self.login_url, cj)
        if login.code == 502:
            return login
        # if there is a form parse it
        if 'form_attrs' in self.form_values:
            auth_form = self._find_login_form(login.msg)
            if auth_form is None:
                logger.critical("%r %r: can't find login form on %r",
                                env['HTTP_HOST'], env['PATH_INFO'], self.login_url)
                return _500(env['PATH_INFO'], "Replay: Can't find login form")
            # values already in the form, overridden by the replayed ones
            params = self._form_defaults(auth_form)
            params.update(post_values)
        else:
            params = post_values
        if 'post_url' not in self.form_values:
            # auth_form is None when neither form_attrs nor post_url is set
            if auth_form is None or not auth_form.action:
                logger.critical("%r %r: don't find form action on %r",
                                env['HTTP_HOST'], env['PATH_INFO'], self.login_url)
                return _500(env['PATH_INFO'], 'Replay: form action not found')
            action = auth_form.action
        if "://" not in action:
            # relative action: resolve it against the login url without
            # its query string
            login_url = re.sub(r'\?.*$', '', self.login_url)
            action = os.path.join(login_url, action)
        cookies = login.cookies
        headers = HTTPHeader()
        headers.load_from_dict(self.form_values['form_headers'])
        params = urllib.urlencode(params)
        request = HTTPRequest(cookies, headers, "POST", params)
        return get_response(env, request, action, cj)

    def _save_association(self, env, unique_id, post_values):
        """ save an association in the database
        env: wsgi environment
        unique_id: idp unique id
        post_values: dict with the post values (left untouched)
        """
        logger.debug('AuthForm._save_association: save a new association')
        sp_login = post_values[self.form_values['username_field']]
        if config.encrypt_sp_password:
            # work on a copy so that the caller keeps its clear text values
            post_values = dict(post_values)
            password_field = self.form_values['password_field']
            post_values[password_field] = self.encrypt_pwd(post_values[password_field])
        asso_id = Association.update_or_create(self.site_name, sp_login,
                                               post_values, unique_id)
        session = env['beaker.session']
        old_association_id = session.get('old_association_id')
        if old_association_id and old_association_id != asso_id:
            Association.delete(old_association_id)
            session['old_association_id'] = None
        session['unique_id'] = unique_id
        session[self.site_name] = asso_id
        session.save()

    def associate_submit(self, env, values, request, response):
        """ Associate your login / password into your database
        values: dict with a 'condition' key, a python expression evaluated
        after the replay to tell if the login worked
        Return a redirection or the replay response; None if the request
        has no body
        """
        logger.debug("Trying to associate a user")
        unique_id = env['beaker.session'].get('unique_id')
        if not request.msg:
            # NOTE(review): nothing is done for an empty body -- confirm
            # that callers cope with a None result
            return None
        if not unique_id:
            logger.warning("Association failed: user isn't login on Mandaye")
            return _302(self.urls.get('connection_url'))
        if isinstance(request.msg, str):
            body = request.msg
        else:
            body = request.msg.read()
        # True is keep_blank_values (the request object used to be passed
        # there and only acted as a truthy flag)
        post = parse_qs(body, True)
        logger.debug("association post: %r", post)
        qs = parse_qs(env['QUERY_STRING'])
        # keep only the first value of each query string parameter
        for key, value in qs.items():
            qs[key] = value[0]
        post_fields = self.form_values['post_fields']
        post_values = {}
        for field in post_fields:
            if field not in post:
                logger.info('Association auth failed: form not correctly filled')
                logger.info('%r is missing', field)
                qs['type'] = 'badlogin'
                return _302(self.urls.get('associate_url') + "?%s" % urllib.urlencode(qs))
            post_values[field] = post[field][0]
        response = self.replay(env, post_values)
        # SECURITY: the condition is evaluated with eval() and sees the local
        # names of this method (e.g. `response`); it must only ever come from
        # trusted mapper configuration.
        if eval(values['condition']):
            logger.debug("Replay works: save the association")
            self._save_association(env, unique_id, post_values)
            if 'next_url' in qs:
                return _302(qs['next_url'], response.cookies)
            return response
        logger.info('Auth failed: Bad password or login')
        qs['type'] = 'badlogin'
        return _302(self.urls.get('associate_url') + "?%s" % urllib.urlencode(qs))

    def _login_sp_user(self, association, env, condition, values):
        """ Log in sp user
        association: dict with the id, sp_login and sp_post_values keys
        condition: python expression evaluated after the replay
        Return the replay response or a redirection
        """
        if not association['sp_login']:
            return _500(env['PATH_INFO'],
                        'Invalid values for AuthFormDispatcher.login')
        post_values = copy.copy(association['sp_post_values'])
        if config.encrypt_sp_password:
            password = self.decrypt_pwd(post_values[self.form_values['password_field']])
            post_values[self.form_values['password_field']] = password
        response = self.replay(env, post_values)
        qs = parse_qs(env['QUERY_STRING'])
        session = env['beaker.session']
        # SECURITY: eval() on the condition, which sees the local names of
        # this method (e.g. `response`); only trusted configuration must
        # reach this point.
        if condition and eval(condition):
            Association.update_last_connection(association['id'])
            session['old_association_id'] = None
            session[self.site_name] = association['id']
            session.save()
            if 'next_url' in qs:
                return _302(qs['next_url'][0], response.cookies)
            return response
        # replay failed: remember the association so that it can be replaced
        session['old_association_id'] = association['id']
        session.save()
        return _302(self.urls.get('associate_url') + "?type=failed")

    def login(self, env, values, request, response):
        """ Automatic login on a site with a form
        Return a 401 without unique id, a redirection to the association
        page without association, else the result of the replay
        """
        # Specific method to get current idp unique id
        unique_id = self.get_current_unique_id(env)
        logger.debug('Trying to login on Mandaye')
        if not unique_id:
            return _401('Access denied: invalid token')
        # FIXME: hack to force beaker to generate an id
        # sometimes beaker doesn't do it by itself
        session = env['beaker.session']
        session.regenerate_id()
        session['unique_id'] = unique_id
        session.save()
        logger.debug('User %s successfully login', session['unique_id'])
        association = Association.get_last_connected(self.site_name, unique_id)
        if not association:
            logger.debug('User %s is not associate', session['unique_id'])
            return _302(self.urls.get('associate_url') + "?type=first")
        return self._login_sp_user(association, env, values['condition'], values)

    def logout(self, env, values, request, response):
        """ Destroy the Beaker session
        """
        logger.debug('Logout from Mandaye')
        env['beaker.session'].delete()
        return response

    def auto_connection(self, env, values, request, response):
        """ Redirect to the connection url
        """
        connection_url = self.urls["connection_url"]
        logger.debug("Redirection using url : %s", connection_url)
        return _302(connection_url)

    def local_logout(self, env, values, request, response):
        """ Destroy the Mandaye session then log out from the service provider
        The next url is read from the RelayState or next_url query string
        parameters, then from values['next_url']
        """
        logger.info('SP logout initiated by Mandaye')
        self.logout(env, values, request, response)
        qs = parse_qs(env['QUERY_STRING'])
        next_url = None
        if 'RelayState' in qs:
            next_url = qs['RelayState'][0]
        elif 'next_url' in qs:
            next_url = qs['next_url'][0]
        elif 'next_url' in values:
            next_url = values['next_url']
        vhost = self.env['mandaye.config']
        logout_url = None
        if 'sp_logout_url' in vhost:
            logout_url = vhost.get('sp_logout_url')
        elif 'saml2_sp_logout_url' in vhost:
            logger.warning("Deprecated saml2_sp_logout_url optioin use sp_logout_url instead")
            logout_url = vhost.get('saml2_sp_logout_url')
        req_cookies = request.cookies
        if not logout_url:
            logger.warning('sp_logout_url not set into vhost configuration only removing cookies')
            # expire every cookie of the request
            for cookie in req_cookies.values():
                cookie['expires'] = 'Thu, 01 Jan 1970 00:00:01 GMT'
                cookie['path'] = vhost.get('cookies_path', '/')
            return _302(next_url or '/', req_cookies)
        request = HTTPRequest(req_cookies)
        response = get_response(env, request, logout_url)
        if next_url:
            return _302(next_url, response.cookies)
        return response

    def change_user(self, env, values, request, response):
        """ Multi accounts feature
        Change the current login user
        You must call this method into a response filter
        The association is read from the `id` query string parameter or,
        without it, is the last connected one of the current user
        """
        # TODO: need to logout the first
        # .get: a session without unique_id must reach the 401 below
        unique_id = env['beaker.session'].get('unique_id')
        qs = parse_qs(env['QUERY_STRING'])
        if 'id' not in qs and not unique_id:
            return _401('Access denied: beaker session invalid or not qs id')
        if 'id' in qs:
            # NOTE(review): the id is not checked against unique_id here --
            # confirm that the backend restricts it to the current user
            association = Association.get_by_id(qs['id'][0])
        else:
            association = Association.get_last_connected(self.site_name, unique_id)
        if not association:
            return _302(self.urls.get('associate_url'))
        return self._login_sp_user(association, env, 'response.code==302', values)

    def disassociate(self, env, values, request, response):
        """ Disassociate an account with the Mandaye account
        You need to put the id of the sp user you want to disassociate
        in the query string (..?id=42) or use by service provider name
        (..?sp_name=)
        """
        if 'unique_id' not in env['beaker.session']:
            return _401('Access denied: no session')
        unique_id = env['beaker.session']['unique_id']
        qs = parse_qs(env['QUERY_STRING'])
        # the query string has priority over the configured next url
        next_url = values.get('next_url') or '/'
        if 'next_url' in qs:
            next_url = qs['next_url'][0]
        if 'id' in qs:
            asso_id = qs['id'][0]
            if not Association.has_id(asso_id):
                return _401('Access denied: bad id')
            Association.delete(asso_id)
            if Association.get(self.site_name, unique_id):
                # another association is left: switch to it
                env['QUERY_STRING'] = ''
                return self.change_user(env, values, request, response)
        elif 'sp_name' in qs:
            for asso in Association.get(qs['sp_name'][0], unique_id):
                Association.delete(asso['id'])
        else:
            return _401('Access denied: no id or sp name')
        values['next_url'] = next_url
        if 'logout' in qs:
            return self.local_logout(env, values, request, response)
        return _302(next_url)

    def is_connected_a2(self, env, request, response):
        """ Auto connection which only works with Authentic2
        Return True if the request has an A2_OPENED_SESSION cookie and
        the beaker session has no unique_id
        """
        if 'A2_OPENED_SESSION' in request.cookies and \
                'unique_id' not in env['beaker.session']:
            logger.info('Trying an auto connection')
            return True
        return False