This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
larpe/larpe/trunk/larpe/site_authentication.ptl

321 lines
14 KiB
Plaintext

import libxml2
import urllib
import urlparse
import httplib
import re
import os
import socket
import base64
from quixote import get_request, get_response, get_session, redirect, get_publisher
from quixote.directory import Directory
from quixote.http_request import parse_header
import lasso
from qommon import get_logger
from qommon.form import *
from qommon.errors import ConnectionError, ConfigurationError, LoginError
from qommon.misc import http_post_request, http_get_page
from qommon.template import *
from larpe.plugins import site_authentication_plugins
import misc
from users import User
from federations import Federation
class SiteAuthentication:
def __init__(self, host):
self.host = host
self.output_filters = []
def federate(self, username, password, provider_id, cookies, select):
user = get_session().get_user(provider_id)
if user is not None:
Federation(username, password, self.host.id, user.name_identifiers[0], cookies, select).store()
def sso_local_login(self, federation):
status, data = self.local_auth_check_dispatch(
federation.username, federation.password, federation.select_fields)
success, return_content = self.check_auth(status, data)
if success:
session = get_session()
if hasattr(session, 'cookies'):
federation.set_cookies(session.cookies)
federation.store()
return return_content
else:
return redirect('local_auth')
def local_auth [html] (self, first_time=True):
response = get_response()
response.set_content_type('text/html')
if hasattr(get_response(), str('breadcrumb')):
del get_response().breadcrumb
get_response().filter['default_org'] = '%s - %s' % (self.host.label, _('Local authentication'))
get_response().filter['body_class'] = 'login'
form = self.form_local_auth()
form.add_submit('submit', _('Submit'))
#form.add_submit('cancel', _('Cancel'))
# if form.get_widget('cancel').parse():
# return redirect('.')
authentication_failure = None
if form.is_submitted() and not form.has_errors():
try:
return self.submit_local_auth_form(form)
except LoginError:
authentication_failure = _('Authentication failure')
get_logger().info('local auth page : %s' % authentication_failure)
except ConnectionError, err:
authentication_failure = _('Connection failed : %s') % err
get_logger().info('local auth page : %s' % authentication_failure)
except ConfigurationError, err:
authentication_failure = _('This service provider is not fully configured : %s') % err
get_logger().info('local auth page : %s' % authentication_failure)
except Exception, err:
authentication_failure = _('Unknown error : %s' % err)
get_logger().info('local auth page : %s' % authentication_failure)
if authentication_failure:
'<div class="errornotice">%s</div>' % authentication_failure
'<p>'
_('Please type your login and password for this Service Provider.')
_('Your local account will be federated with your Liberty Alliance account.')
'</p>'
form.render()
# Also used in admin/hosts.ptl
def form_local_auth(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'username', title = _('Username'), required = True,
size = 30)
form.add(PasswordWidget, 'password', title = _('Password'), required = True,
size = 30)
for name, values in self.host.select_fields.iteritems():
options = []
if values:
for value in values:
options.append(value)
form.add(SingleSelectWidget, name, title = name.capitalize(),
value = values[0], options = options)
return form
def submit_local_auth_form(self, form):
username = form.get_widget('username').parse()
password = form.get_widget('password').parse()
select = {}
for name, values in self.host.select_fields.iteritems():
if form.get_widget(name):
select[name] = form.get_widget(name).parse()
return self.local_auth_check(username, password, select)
def local_auth_check(self, username, password, select=None):
select = select or {}
status, data = self.local_auth_check_dispatch(username, password, select)
if status == 0:
raise
success, return_content = self.check_auth(status, data)
if success:
if misc.get_current_protocol() == lasso.PROTOCOL_SAML_2_0:
provider_id = self.host.saml2_provider_id
else:
provider_id = self.host.provider_id
session = get_session()
if hasattr(session, 'cookies'):
self.federate(username, password, provider_id, session.cookies, select)
else:
self.federate(username, password, provider_id, None, select)
return return_content
raise LoginError()
def local_auth_check_dispatch(self, username, password, select=None):
select = select or {}
if self.host.auth_mode == 'http_basic':
return self.local_auth_check_http_basic(username, password)
elif self.host.auth_mode == 'form' and hasattr(self.host, 'auth_check_url') \
and self.host.auth_check_url is not None:
return self.local_auth_check_post(username, password, select)
else:
raise ConfigurationError('No authentication form was found')
def local_auth_check_post(self, username, password, select=None):
select = select or {}
url = self.host.auth_check_url
# Build request body
if self.host.post_parameters:
body_params = {}
# Login field
if self.host.post_parameters[self.host.login_field_name]['enabled'] is True:
body_params[self.host.login_field_name] = username
# Password field
if self.host.post_parameters[self.host.password_field_name]['enabled'] is True:
body_params[self.host.password_field_name] = password
# Select fields
for name, value in select.iteritems():
if self.host.post_parameters[name]['enabled'] is True:
body_params[name] = value
# Other fields (hidden, submit and custom)
for name, value in self.host.other_fields.iteritems():
if self.host.post_parameters[name]['enabled'] is True:
body_params[name] = self.host.post_parameters[name]['value']
body = urllib.urlencode(body_params)
else:
# XXX: Legacy (to be removed later) Send all parameters for sites configured with a previous version of Larpe
body = '%s=%s&%s=%s' % (self.host.login_field_name, username, self.host.password_field_name, password)
# Add select fields to the body
for name, value in select.iteritems():
body += '&%s=%s' % (name, value)
# Add hidden fields to the body
if self.host.send_hidden_fields:
for name, value in self.host.other_fields.iteritems():
body += '&%s=%s' % (name, value)
# Build request HTTP headers
if self.host.http_headers:
headers = {}
if self.host.http_headers['X-Forwarded-For']['enabled'] is True:
headers['X-Forwarded-For'] = get_request().get_environ('REMOTE_ADDR', '-')
for name, value in self.host.http_headers.iteritems():
if value['enabled'] is True and value['immutable'] is False:
headers[name] = value['value']
else:
# XXX: (to be removed later) Send default headers for sites configured with a previous version of Larpe
headers = { 'Content-Type': 'application/x-www-form-urlencoded',
'X-Forwarded-For': get_request().get_environ('REMOTE_ADDR', '-'),
'X-Forwarded-Host': self.host.reversed_hostname }
# Send request
response, status, data, auth_headers = http_post_request(url, body, headers, self.host.use_proxy)
cookies = response.getheader('Set-Cookie', None)
self.host.cookies = []
if cookies is not None:
cookies_list = []
cookies_set_list = []
for cookie in cookies.split(', '):
# Drop the path and other attributes
cookie_only = cookie.split('; ')[0]
regexp = re.compile('=')
if regexp.search(cookie_only) is None:
continue
# Split name and value
cookie_split = cookie_only.split('=')
cookie_name = cookie_split[0]
cookie_value = cookie_split[1]
cookies_list.append('%s=%s' % (cookie_name, cookie_value))
set_cookie = '%s=%s; path=/' % (cookie_name, cookie_value)
cookies_set_list.append(set_cookie)
self.host.cookies.append(cookie_name)
cookies_headers = '\r\nSet-Cookie: '.join(cookies_set_list)
get_response().set_header('Set-Cookie', cookies_headers)
self.host.store()
get_session().cookies = '; '.join(cookies_list)
else:
get_logger().warn('No cookie from local authentication')
return response.status, data
def local_auth_check_http_basic(self, username, password):
url = self.host.auth_form_url
hostname, query = urllib.splithost(url[5:])
conn = httplib.HTTPConnection(hostname)
auth_header = 'Basic %s' % base64.encodestring('%s:%s' % (username, password))
try:
conn.request('GET', query, headers={'Authorization': auth_header})
except socket.gaierror, err:
print err
conn.close()
return 0, None
else:
response = conn.getresponse()
conn.close()
return response.status, response.read()
def check_auth(self, status, data):
success = False
return_content = ''
# If status is 500, fail without checking other criterias
if status // 100 == 5:
success = False
return_content = redirect(self.host.get_return_url())
# For http auth, only check status code
elif self.host.auth_mode == 'http_basic':
# If failed, status code should be 401
if status // 100 == 2 or status // 100 == 3:
success = True
return_content = redirect(self.host.get_return_url())
else:
if self.host.auth_system == 'password':
# If there is a password field, authentication probably failed
regexp = re.compile("""<input[^>]*?type=["']?password["']?[^>]*?>""", re.DOTALL | re.IGNORECASE)
if not regexp.findall(data):
success = True
return_content = redirect(self.host.get_return_url())
elif self.host.auth_system == 'status':
match_status = int(self.host.auth_match_status)
if match_status == status:
success = True
return_content = redirect(self.host.get_return_url())
elif self.host.auth_system == 'match_text':
# If the auth_match_text is not matched, it means the authentication is successful
regexp = re.compile(self.host.auth_match_text, re.DOTALL)
if not regexp.findall(data):
success = True
return_content = redirect(self.host.get_return_url())
return success, return_content
def local_logout(self, federation=None, user=None, cookies=None):
if cookies is None and federation is None and user is not None:
federations = Federation.select(lambda x: user.name_identifiers[0] in x.name_identifiers)
if federations:
cookies = federations[0].cookies
# Logout request to the site
url = self.host.logout_url
if url is not None and cookies is not None:
try:
http_get_page(url, {'Cookie': cookies})
except ConnectionError, err:
get_logger().warning(_("%s logout failed") % url)
get_logger().debug(err)
# Remove cookies from the browser
# TODO: this should be removed because this only works
# with a 'direct' logout
if hasattr(self.host, 'cookies'):
for cookie in self.host.cookies:
get_response().expire_cookie(cookie, path='/')
def local_defederate(self, session, provider_id):
if session is None:
return
user = session.get_user(provider_id)
if user is not None:
federations = Federation.select(lambda x: user.name_identifiers[0] in x.name_identifiers)
for federation in federations:
self.local_logout(provider_id, federation)
federation.remove_name_identifier(user.name_identifiers[0])
federation.store()
def get_site_authentication(host):
    """Return the site authentication object to use for `host`.

    When `host.site_authentication_plugin` is set, the class registered under
    that name in `site_authentication_plugins` is instantiated with the host;
    otherwise the generic SiteAuthentication is used.
    """
    plugin_name = host.site_authentication_plugin
    if plugin_name is not None:
        authentication_class = site_authentication_plugins.get(plugin_name)
        return authentication_class(host)
    return SiteAuthentication(host)