This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandaye/mandaye/auth/authform.py

120 lines
4.5 KiB
Python

"""
Dispatcher for basic form-based authentication
"""
import Cookie
import httplib
import urllib
from urlparse import parse_qs
import mandaye
from mandaye.models import Site, ExtUser, LocalUser
from mandaye.http import HTTPResponse, HTTPHeader
from mandaye.response import _500, _302, _401
from mandaye.response import template_response
class AuthForm(object):
    """Dispatcher that authenticates on a target site by replaying its
    HTML login form with credentials stored for the current local user.
    """

    def __init__(self, local_auth, form_action, from_headers,
                 form_values, username_field, password_field):
        """Store the description of the login form to replay.

        local_auth: instance of your local authentication system
        form_action: path the login form is POSTed to
        from_headers: stored as-is (not used by this class itself)
        form_values: dict of extra fields sent along with the credentials
        username_field: name of the form field carrying the username
        password_field: name of the form field carrying the password
        """
        self.local_auth = local_auth
        self.form_action = form_action
        self.from_headers = from_headers
        self.form_values = form_values
        self.username_field = username_field
        self.password_field = password_field

    def _replay_params(self, username, password):
        """Return a fresh dict of form fields for one replay.

        The configured ``form_values`` are copied so that credentials never
        end up stored on the (shared) dispatcher instance.
        """
        params = dict(self.form_values or {})
        params[self.username_field] = username
        params[self.password_field] = password
        return params

    def _replay(self, env, username, password):
        """Replay the login / password against the target login form.

        POSTs the url-encoded form to ``self.form_action`` on the host
        ``env['target'].hostname`` and returns the httplib response.
        """
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        # TODO: find the login / password in the default or redirect
        params = urllib.urlencode(self._replay_params(username, password))
        # NOTE(review): only the hostname is used, so a non-default port or
        # an https target is presumably not honoured -- confirm.
        conn = httplib.HTTPConnection(env['target'].hostname)
        try:
            conn.request("POST", self.form_action, params, headers)
            response = conn.getresponse()
        finally:
            # Always release the socket, even when the request fails.
            conn.close()
        # NOTE(review): the connection is closed before callers read the
        # body; confirm response.read() still yields data for keep-alive
        # responses.
        return response

    def associate(self, env, values, request):
        """Render the associate page using ``values['template']``."""
        return template_response(values['template'], values)

    def associate_submit(self, env, values, condition, request, response):
        """Associate your login / password with Mandaye.

        Reads ``username`` and ``password`` from the POSTed form, replays
        them on the target site and, when ``condition`` evaluates to true,
        stores them for the local user identified by the session ``pseudo``
        before redirecting to ``values['connection_url']``.

        Returns ``response`` unchanged when there is no request body, when
        the credentials are missing from it, or when ``condition`` is false.
        """
        pseudo = env['beaker.session'].get('pseudo')
        if request.msg:
            post = parse_qs(request.msg.read())
            if 'username' not in post or 'password' not in post:
                # Incomplete form: nothing to replay or to store.
                return response
            replay_response = self._replay(env, post['username'][0],
                                           post['password'][0])
            # SECURITY: ``condition`` is evaluated as Python code and must
            # only ever come from trusted configuration.  It may refer to
            # the local names of this method (e.g. ``replay_response``), so
            # do not rename them.
            if eval(condition):
                if not pseudo:
                    return _302(values.get('connection_url'))
                site = mandaye.sql_session.query(Site).\
                    filter_by(name=values.get('site_name')).first()
                if not site:
                    site = Site(values.get('site_name'))
                    mandaye.sql_session.add(site)
                local_user = mandaye.sql_session.query(LocalUser).\
                    filter_by(token=pseudo).first()
                if not local_user:
                    local_user = LocalUser(token=pseudo)
                    mandaye.sql_session.add(local_user)
                ext_user = mandaye.sql_session.query(ExtUser).\
                    join(LocalUser).\
                    filter(LocalUser.token == pseudo).first()
                if not ext_user:
                    ext_user = ExtUser()
                    mandaye.sql_session.add(ext_user)
                ext_user.login = post['username'][0]
                # NOTE(review): the password is stored as submitted; confirm
                # whether ExtUser protects it at rest.
                ext_user.password = post['password'][0]
                ext_user.local_user = local_user
                mandaye.sql_session.commit()
                return _302(values.get('connection_url'))
        return response

    def check_auth(self, env, values, request=None):
        """Placeholder, not implemented.

        values: dict with username, login
        """
        pass

    def login(self, env, values, condition, request):
        """Automatic login on a site with a form.

        Looks up the external credentials associated with the current local
        login and replays them.  Returns the replayed response when
        ``condition`` evaluates to true; redirects to
        ``values['associate_url']`` when no association exists or the
        condition fails; returns a 401 without a local login and a 500 when
        the stored credentials are incomplete.
        """
        login = self.local_auth.get_current_login(env)
        if not login:
            return _401('Access denied: invalid token')
        ext_user = mandaye.sql_session.query(ExtUser).\
            join(LocalUser).\
            filter(LocalUser.token == login).\
            first()
        if not ext_user:
            return _302(values.get('associate_url'))
        if not ext_user.login or not ext_user.password:
            return _500(env['PATH_INFO'],
                        'Invalid values for AuthFormDispatcher.login')
        response = self._replay(env, ext_user.login, ext_user.password)
        headers = HTTPHeader()
        headers.load_from_list(response.getheaders())
        cookies = Cookie.BaseCookie(response.msg.getheader('set-cookie'))
        # ``response`` is deliberately rebound to the Mandaye response:
        # ``condition`` may refer to it by that name.
        response = HTTPResponse(response.status, response.reason, headers,
                                response.read(), cookies)
        # SECURITY: ``condition`` is evaluated as Python code and must only
        # ever come from trusted configuration.
        if condition and eval(condition):
            return response
        # TODO: find a better solution
        return _302(values.get('associate_url'))

    def connection(self, values, response):
        """Placeholder, not implemented."""
        pass