wcs/tests/test_fc_auth.py

345 lines
12 KiB
Python

import urlparse
import base64
import json
import urllib
from quixote import cleanup, get_session_manager
from utilities import get_app, create_temporary_pub
import mock
PROFILE = {
'fields': [
{
'kind': 'string',
'description': '',
'required': True,
'user_visible': True,
'label': u'Prenoms',
'disabled': False,
'user_editable': True,
'asked_on_registration': True,
'name': 'prenoms'
},
{
'kind': 'string',
'description': '',
'required': True,
'user_visible': True,
'label': 'Nom',
'disabled': False,
'user_editable': True,
'asked_on_registration': True,
'name': 'nom'
},
{
'kind': 'string',
'description': '',
'required': True,
'user_visible': True,
'label': 'Email',
'disabled': False,
'user_editable': True,
'asked_on_registration': True,
'name': 'email'
},
]
}
def base64url_encode(v):
return base64.urlsafe_b64encode(v).strip('=')
def setup_module(module):
cleanup()
global pub
pub = create_temporary_pub()
def setup_user_profile(pub):
if not pub.cfg:
pub.cfg = {}
# create some roles
from wcs.ctl.check_hobos import CmdCheckHobos
# setup an hobo profile
CmdCheckHobos().update_profile(PROFILE, pub)
pub.cfg['users']['field_name'] = ['_prenoms', '_nom']
pub.cfg['debug'] = {'logger': True}
pub.user_class.wipe()
pub.write_cfg()
FC_CONFIG = {
'client_id': '123',
'client_secret': 'xyz',
'platform': 'dev-particulier',
'scopes': 'identite_pivot',
'user_field_mappings': [
{
'field_varname': 'prenoms',
'value': '[given_name ""]',
'verified': 'always',
},
{
'field_varname': 'nom',
'value': '[family_name ""]',
'verified': 'always',
},
{
'field_varname': 'email',
'value': '[email ""]',
'verified': 'always',
},
]
}
def setup_fc_environment(pub):
if not pub.cfg:
pub.cfg = {}
pub.cfg['identification'] = {
'methods': ['fc'],
}
pub.cfg['fc'] = FC_CONFIG
pub.user_class.wipe()
pub.write_cfg()
def get_session(app):
try:
session_id = app.cookies.values()[0]
except IndexError:
return None
else:
session_id = session_id.strip('"')
return get_session_manager().session_class.get(session_id)
def test_fc_login_page(caplog):
setup_user_profile(pub)
setup_fc_environment(pub)
app = get_app(pub)
resp = app.get('/')
resp = app.get('/login/')
assert resp.status_int == 302
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
qs = urlparse.parse_qs(resp.location.split('?')[1])
nonce = qs['nonce'][0]
state = qs['state'][0]
id_token = {
'nonce': nonce,
}
token_result = {
'access_token': 'abcd',
'id_token': '.%s.' % base64url_encode(json.dumps(id_token)),
}
user_info_result = {
'sub': 'ymca',
'given_name': 'John',
'family_name': 'Doe',
'email': 'john.doe@example.com',
}
assert pub.user_class.count() == 0
with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \
mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page:
http_post_request.return_value = (None, 200, json.dumps(token_result), None)
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
'code': '1234', 'state': state,
}))
assert pub.user_class.count() == 1
user = pub.user_class.select()[0]
assert user.form_data == {'_email': 'john.doe@example.com', '_nom': 'Doe', '_prenoms': 'John'}
assert set(user.verified_fields) == set(['_nom', '_prenoms', '_email'])
assert user.email == 'john.doe@example.com'
assert user.name_identifiers == ['ymca']
assert user.name == 'John Doe'
# Verify we are logged in
session = get_session(app)
assert session.user == user.id
assert session.extra_user_variables['fc_given_name'] == 'John'
assert session.extra_user_variables['fc_family_name'] == 'Doe'
assert session.extra_user_variables['fc_email'] == 'john.doe@example.com'
assert session.extra_user_variables['fc_sub'] == 'ymca'
resp = app.get('/logout')
splitted = urlparse.urlsplit(resp.location)
assert urlparse.urlunsplit((splitted.scheme, splitted.netloc, splitted.path, '', '')) \
== 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout'
assert urlparse.parse_qs(splitted.query)['post_logout_redirect_uri'] == ['http://example.net']
assert urlparse.parse_qs(splitted.query)['id_token_hint']
assert not get_session(app)
# Test error handling path
resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
'state': state,
'error': 'access_denied',
}))
assert 'user did not authorize login' in caplog.records[-1].message
resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
'state': state,
'error': 'whatever',
}))
assert 'whatever' in caplog.records[-1].message
# Login existing user
def logme(login_url):
resp = app.get(login_url)
assert resp.status_int == 302
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
qs = urlparse.parse_qs(resp.location.split('?')[1])
state = qs['state'][0]
id_token['nonce'] = qs['nonce'][0]
token_result['id_token'] = '.%s.' % base64url_encode(json.dumps(id_token))
with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \
mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page:
http_post_request.return_value = (None, 200, json.dumps(token_result), None)
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
'code': '1234', 'state': state,
}))
return resp
app.get('/logout')
resp = logme('/login/')
new_session = get_session(app)
assert session.id != new_session.id, 'no new session created'
assert pub.user_class.count() == 1, 'existing user has not been used'
assert new_session.user == user.id
# Login with next url
app.get('/logout')
resp = logme('/login/?next=/foo/bar/')
assert resp.status_int == 302
assert resp.location.endswith('/foo/bar/')
# Direct login link
app.get('/logout')
resp = logme('/ident/fc/login')
new_session = get_session(app)
assert session.id != new_session.id, 'no new session created'
assert pub.user_class.count() == 1, 'existing user has not been used'
assert new_session.user == user.id
app.get('/logout')
resp = logme('/ident/fc/login?next=/foo/bar/')
assert resp.status_int == 302
assert resp.location.endswith('/foo/bar/')
# User with missing attributes
resp = app.get('/logout')
resp = app.get('/login/')
assert resp.status_int == 302
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
qs = urlparse.parse_qs(resp.location.split('?')[1])
state = qs['state'][0]
id_token['nonce'] = qs['nonce'][0]
token_result['id_token'] = '.%s.' % base64url_encode(json.dumps(id_token))
bad_user_info_result = {
'sub': 'ymca2',
'given_name': 'John',
'family_name': 'Deux',
# 'email': 'john.deux@example.com', # missing
}
with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \
mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page:
http_post_request.return_value = (None, 200, json.dumps(token_result), None)
http_get_page.return_value = (None, 200, json.dumps(bad_user_info_result), None)
resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
'code': '1234', 'state': state,
}))
assert pub.user_class.count() == 1, 'an invalid user (no email) has been created'
session = get_session(app)
assert not session
def test_fc_settings():
setup_user_profile(pub)
app = get_app(pub)
resp = app.get('/backoffice/settings/identification/')
resp.forms[0]['methods$elementfc'].checked = True
resp = resp.forms[0].submit().follow()
assert 'FranceConnect' in resp.body
resp = resp.click('FranceConnect')
resp = resp.forms[0].submit('user_field_mappings$add_element')
resp = resp.forms[0].submit('user_field_mappings$add_element')
resp.forms[0]['client_id'].value = '123'
resp.forms[0]['client_secret'].value = 'xyz'
resp.forms[0]['platform'].value = 'Development citizens'
resp.forms[0]['scopes'].value = 'identite_pivot'
resp.forms[0]['user_field_mappings$element0$field_varname'] = 'prenoms'
resp.forms[0]['user_field_mappings$element0$value$value_template'] = '[given_name ""]'
resp.forms[0]['user_field_mappings$element0$value$type'] = 'template'
resp.forms[0]['user_field_mappings$element0$verified'] = 'Always'
resp.forms[0]['user_field_mappings$element1$field_varname'] = 'nom'
resp.forms[0]['user_field_mappings$element1$value$value_template'] = '[family_name ""]'
resp.forms[0]['user_field_mappings$element1$value$type'] = 'template'
resp.forms[0]['user_field_mappings$element1$verified'] = 'Always'
resp.forms[0]['user_field_mappings$element2$field_varname'] = 'email'
resp.forms[0]['user_field_mappings$element2$value$value_template'] = '[email ""]'
resp.forms[0]['user_field_mappings$element2$value$type'] = 'template'
resp.forms[0]['user_field_mappings$element2$verified'] = 'Always'
resp = resp.forms[0].submit('submit').follow()
assert pub.cfg['fc'] == FC_CONFIG
def test_fc_settings_no_user_profile():
FC_CONFIG = {
'client_id': '123',
'client_secret': 'xyz',
'platform': 'dev-particulier',
'scopes': 'identite_pivot',
'user_field_mappings': [
{
'field_varname': '__name',
'value': '[given_name ""] [family_name ""]',
'verified': 'always',
},
{
'field_varname': '__email',
'value': '[email ""]',
'verified': 'always',
},
]
}
pub.cfg = {'misc': {'charset': 'utf-8'}}
pub.user_class.wipe()
pub.write_cfg()
app = get_app(pub)
resp = app.get('/backoffice/settings/identification/')
resp.forms[0]['methods$elementfc'].checked = True
resp = resp.forms[0].submit().follow()
assert 'FranceConnect' in resp.body
resp = resp.click('FranceConnect')
resp = resp.forms[0].submit('user_field_mappings$add_element')
resp = resp.forms[0].submit('user_field_mappings$add_element')
resp.forms[0]['client_id'].value = '123'
resp.forms[0]['client_secret'].value = 'xyz'
resp.forms[0]['platform'].value = 'Development citizens'
resp.forms[0]['scopes'].value = 'identite_pivot'
resp.forms[0]['user_field_mappings$element0$field_varname'] = '__name'
resp.forms[0]['user_field_mappings$element0$value$value_template'] = '[given_name ""] [family_name ""]'
resp.forms[0]['user_field_mappings$element0$value$type'] = 'template'
resp.forms[0]['user_field_mappings$element0$verified'] = 'Always'
resp.forms[0]['user_field_mappings$element2$field_varname'] = '__email'
resp.forms[0]['user_field_mappings$element2$value$value_template'] = '[email ""]'
resp.forms[0]['user_field_mappings$element2$value$type'] = 'template'
resp.forms[0]['user_field_mappings$element2$verified'] = 'Always'
resp = resp.forms[0].submit('submit').follow()
assert pub.cfg['fc'] == FC_CONFIG