This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandaye/mandaye/auth/authform.py

258 lines
11 KiB
Python

"""
Dispatcher for basic auth form authentifications
"""
import Cookie
import logging
import re
import urllib
import mandaye
from cookielib import CookieJar
from datetime import datetime
from lxml.html import fromstring
from urlparse import parse_qs
from mandaye.models import Site, ExtUser, LocalUser
from mandaye.http import HTTPResponse, HTTPHeader, HTTPRequest
from mandaye.response import _500, _302, _401
from mandaye.response import template_response
from mandaye.server import get_response
class AuthForm(object):
def __init__(self, local_auth, form_values, site_name):
""" local_auth: instance of your local authentification system
form_values: dict example :
{
'form_action': '/myform',
'form_url': '/myform',
'form_attrs': { 'name': 'form40', },
'username_field': 'user',
'password_field': 'pwd'
}
site_name: str with the site name
"""
self.local_auth = local_auth
if not form_values.has_key('form_headers'):
form_values['form_headers'] = {
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': 'Mozilla/5.0 Mandaye/0.0'
}
if not form_values.has_key('form_url') or \
not form_values.has_key('form_attrs') or \
not form_values.has_key('username_field') or \
not form_values.has_key('password_field'):
logging.critical("Bad configuration: AuthForm form_values dict must have \
this keys: form_url, form_attrs, username_field and password_field")
# TODO: manage Mandaye exceptions
raise BaseException, 'AuthForm bad configuration'
self.form_url = form_values['form_url']
self.form_values = form_values
self.site_name = site_name
def replay(self, env, username, password, extra_values={}):
""" replay the login / password
env: WSGI env with beaker session and the target
extra_values: dict with the field name (key) and the field value (value)
"""
if not "://" in self.form_url:
self.form_url = env['target'].geturl() + '/' + self.form_url
cj = CookieJar()
request = HTTPRequest()
login = get_response(env, request, self.form_url, cj)
if login.code == 502:
return login
html = fromstring(login.msg)
auth_form = None
for form in html.forms:
is_good = True
for key, value in self.form_values['form_attrs'].iteritems():
if form.get(key) != value:
is_good = False
if is_good:
auth_form = form
break
if auth_form == None:
logging.critical("%s %s: can't find login form on %s" %
(env['HTTP_HOST'], env['PATH_INFO'], self.form_url))
return _500(env['PATH_INFO'], "Replay: Can't find login form")
if not self.form_values.has_key('from_action'):
if not auth_form.action:
logging.critical("%s %s: don't find form action on %s" %
(env['HTTP_HOST'], env['PATH_INFO'], self.form_url))
return _500(env['PATH_INFO'], 'Replay: form action not found')
action = auth_form.action
else:
action = self.form_values['form_action']
if not "://" in action:
action = env['target'].geturl() + '/' + action
cookies = login.cookies
headers = HTTPHeader()
headers.load_from_dict(self.form_values['form_headers'])
params = {}
for input in auth_form.inputs:
if input.name and input.type != 'button':
if input.value:
params[input.name] = input.value
else:
params[input.name] = ''
params[self.form_values['username_field']] = username
params[self.form_values['password_field']] = password
for key, value in extra_values.iteritems():
params[key] = value
params = urllib.urlencode(params)
request = HTTPRequest(cookies, headers, "POST", params)
return get_response(env, request, action, cj)
def associate_submit(self, env, values, condition, request, response):
""" Associate your login / password into your database
"""
login = env['beaker.session'].get('login')
if request.msg:
post = parse_qs(request.msg.read(), request)
if not post.has_key('username') or not post.has_key('password'):
logging.info('Auth failed: form not correctly filled')
return _302(values.get('associate_url') + "?type=badlogin")
username = post['username'][0]
# TODO: generized this part (use a generic key / value table)
extra_values = {}
if post.has_key('birthdate') and self.form_values.has_key('birthdate_field'):
birthdate_field = self.form_values['birthdate_field']
extra_values = {birthdate_field: post['birthdate'][0]}
response = self.replay(env, username,
post['password'][0], extra_values)
if eval(condition):
if not login:
return _302(values.get('connection_url'))
else:
site = mandaye.sql_session.query(Site).\
filter_by(name=self.site_name).first()
if not site:
site = Site(self.site_name)
mandaye.sql_session.add(site)
local_user = mandaye.sql_session.query(LocalUser).\
filter_by(login=login).first()
if not local_user:
local_user = LocalUser(login=login)
mandaye.sql_session.add(local_user)
ext_user = mandaye.sql_session.query(ExtUser).\
join(LocalUser).\
filter(LocalUser.login==login).\
filter(ExtUser.login==username).\
first()
if not ext_user:
ext_user = ExtUser()
mandaye.sql_session.add(ext_user)
logging.info('New association: %s on site %s' % \
(ext_user.login, self.site_name))
ext_user.login = post['username'][0]
ext_user.password = post['password'][0]
ext_user.local_user = local_user
ext_user.site = site
# TODO: generalize this
if post.has_key('birthdate'):
ext_user.birthdate = post['birthdate'][0]
mandaye.sql_session.commit()
return _302(values.get('connection_url'))
logging.info('Auth failed: Bad password or login for %s on %s' % \
(post['username'][0], self.site_name))
return _302(values.get('associate_url') + "?type=badlogin")
def _login_ext_user(self, ext_user, env, condition, values):
""" Log in an external user
"""
if not ext_user.login or not ext_user.password:
return _500(env['PATH_INFO'],
'Invalid values for AuthFormDispatcher.login')
# TODO: generized this condition
extra_values = {}
if ext_user.birthdate and self.form_values.has_key('birthdate_field'):
extra_values = { self.form_values['birthdate_field']: ext_user.birthdate }
response = self.replay(env, ext_user.login,
ext_user.password, extra_values)
if condition and eval(condition):
ext_user.last_connection = datetime.now()
mandaye.sql_session.commit()
env['beaker.session'][self.site_name] = ext_user.login
env['beaker.session'].save()
return response
else:
return _302(values.get('associate_url') + "?type=failed")
def login(self, env, values, condition, request, response):
""" Automatic login on a site with a form
"""
login = self.local_auth.get_current_login(env)
if not login:
return _401('Access denied: invalid token')
env['beaker.session']['login'] = login
env['beaker.session'].save()
ext_user = mandaye.sql_session.query(ExtUser).\
join(LocalUser).\
join(Site).\
filter(LocalUser.login==login).\
filter(Site.name==self.site_name).\
order_by(ExtUser.last_connection.desc()).\
first()
if not ext_user:
return _302(values.get('associate_url') + "?type=first")
return self._login_ext_user(ext_user, env, condition, values)
def change_user(self, env, values, request, response):
""" Multi accounts feature
Change the current login user
You must call this method into a response filter
This method must have a query string with a username parameter
"""
# TODO: need to logout the first
login = env['beaker.session']['login']
qs = parse_qs(env['QUERY_STRING'])
if not login and not qs.has_key('username'):
return _401('Access denied: beaker session invalid or not username')
username = qs['username'][0]
ext_user = mandaye.sql_session.query(ExtUser).\
join(LocalUser).\
join(Site).\
filter(LocalUser.login==login).\
filter(Site.name==self.site_name).\
filter(ExtUser.login==username).\
first()
if not ext_user:
return _302(values.get('associate_url'))
return self._login_ext_user(ext_user, env, 'response.code==302', values)
def disassociate(self, env, values, request, response):
""" Multi accounts feature
Disassociate an account with the Mandaye account
You need to put the username you want to disassociate
in the query string (..?username=toto)
"""
login = env['beaker.session']['login']
qs = parse_qs(env['QUERY_STRING'])
if not login and not qs.has_key('username'):
return _401('Access denied: beaker session invalid or not username')
username = qs['username'][0]
ext_user = mandaye.sql_session.query(ExtUser).\
join(LocalUser).\
join(Site).\
filter(LocalUser.login==login).\
filter(Site.name==self.site_name).\
filter(ExtUser.login==username).\
first()
if ext_user:
mandaye.sql_session.delete(ext_user)
mandaye.sql_session.commit()
return _302(values.get('next_url'))