346 lines
12 KiB
Python
346 lines
12 KiB
Python
import base64
|
|
import json
|
|
|
|
from django.utils.encoding import force_bytes, force_text
|
|
from django.utils.six.moves.urllib import parse as urllib
|
|
from django.utils.six.moves.urllib import parse as urlparse
|
|
from quixote import cleanup, get_session_manager
|
|
|
|
from utilities import get_app, create_temporary_pub
|
|
import mock
|
|
|
|
PROFILE = {
|
|
'fields': [
|
|
{
|
|
'kind': 'string',
|
|
'description': '',
|
|
'required': True,
|
|
'user_visible': True,
|
|
'label': u'Prenoms',
|
|
'disabled': False,
|
|
'user_editable': True,
|
|
'asked_on_registration': True,
|
|
'name': 'prenoms'
|
|
},
|
|
{
|
|
'kind': 'string',
|
|
'description': '',
|
|
'required': True,
|
|
'user_visible': True,
|
|
'label': 'Nom',
|
|
'disabled': False,
|
|
'user_editable': True,
|
|
'asked_on_registration': True,
|
|
'name': 'nom'
|
|
},
|
|
{
|
|
'kind': 'string',
|
|
'description': '',
|
|
'required': True,
|
|
'user_visible': True,
|
|
'label': 'Email',
|
|
'disabled': False,
|
|
'user_editable': True,
|
|
'asked_on_registration': True,
|
|
'name': 'email'
|
|
},
|
|
]
|
|
}
|
|
|
|
|
|
def base64url_encode(v):
|
|
return base64.urlsafe_b64encode(force_bytes(v)).strip(b'=')
|
|
|
|
|
|
def setup_module(module):
|
|
cleanup()
|
|
global pub
|
|
pub = create_temporary_pub()
|
|
|
|
|
|
def setup_user_profile(pub):
|
|
if not pub.cfg:
|
|
pub.cfg = {}
|
|
# create some roles
|
|
from wcs.ctl.check_hobos import CmdCheckHobos
|
|
|
|
# setup an hobo profile
|
|
CmdCheckHobos().update_profile(PROFILE, pub)
|
|
pub.cfg['users']['field_name'] = ['_prenoms', '_nom']
|
|
pub.cfg['debug'] = {'logger': True}
|
|
pub.user_class.wipe()
|
|
pub.write_cfg()
|
|
|
|
FC_CONFIG = {
|
|
'client_id': '123',
|
|
'client_secret': 'xyz',
|
|
'platform': 'dev-particulier',
|
|
'scopes': 'identite_pivot',
|
|
'user_field_mappings': [
|
|
{
|
|
'field_varname': 'prenoms',
|
|
'value': '[given_name ""]',
|
|
'verified': 'always',
|
|
},
|
|
{
|
|
'field_varname': 'nom',
|
|
'value': '[family_name ""]',
|
|
'verified': 'always',
|
|
},
|
|
{
|
|
'field_varname': 'email',
|
|
'value': '[email ""]',
|
|
'verified': 'always',
|
|
},
|
|
]
|
|
|
|
}
|
|
|
|
|
|
def setup_fc_environment(pub):
|
|
if not pub.cfg:
|
|
pub.cfg = {}
|
|
pub.cfg['identification'] = {
|
|
'methods': ['fc'],
|
|
}
|
|
pub.cfg['fc'] = FC_CONFIG
|
|
pub.user_class.wipe()
|
|
pub.write_cfg()
|
|
|
|
|
|
def get_session(app):
|
|
try:
|
|
session_id = list(app.cookies.values())[0]
|
|
except IndexError:
|
|
return None
|
|
else:
|
|
session_id = session_id.strip('"')
|
|
return get_session_manager().session_class.get(session_id)
|
|
|
|
|
|
def test_fc_login_page(caplog):
|
|
setup_user_profile(pub)
|
|
setup_fc_environment(pub)
|
|
app = get_app(pub)
|
|
resp = app.get('/')
|
|
resp = app.get('/login/')
|
|
assert resp.status_int == 302
|
|
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
|
|
qs = urlparse.parse_qs(resp.location.split('?')[1])
|
|
nonce = qs['nonce'][0]
|
|
state = qs['state'][0]
|
|
|
|
id_token = {
|
|
'nonce': nonce,
|
|
}
|
|
token_result = {
|
|
'access_token': 'abcd',
|
|
'id_token': '.%s.' % force_text(base64url_encode(json.dumps(id_token))),
|
|
}
|
|
user_info_result = {
|
|
'sub': 'ymca',
|
|
'given_name': 'John',
|
|
'family_name': 'Doe',
|
|
'email': 'john.doe@example.com',
|
|
}
|
|
|
|
assert pub.user_class.count() == 0
|
|
with mock.patch('wcs.qommon.ident.franceconnect.http_post_request') as http_post_request, \
|
|
mock.patch('wcs.qommon.ident.franceconnect.http_get_page') as http_get_page:
|
|
http_post_request.return_value = (None, 200, json.dumps(token_result), None)
|
|
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
|
|
resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
|
|
'code': '1234', 'state': state,
|
|
}))
|
|
assert pub.user_class.count() == 1
|
|
user = pub.user_class.select()[0]
|
|
assert user.form_data == {'_email': 'john.doe@example.com', '_nom': 'Doe', '_prenoms': 'John'}
|
|
assert set(user.verified_fields) == set(['_nom', '_prenoms', '_email'])
|
|
assert user.email == 'john.doe@example.com'
|
|
assert user.name_identifiers == ['ymca']
|
|
assert user.name == 'John Doe'
|
|
|
|
# Verify we are logged in
|
|
session = get_session(app)
|
|
assert session.user == user.id
|
|
assert session.extra_user_variables['fc_given_name'] == 'John'
|
|
assert session.extra_user_variables['fc_family_name'] == 'Doe'
|
|
assert session.extra_user_variables['fc_email'] == 'john.doe@example.com'
|
|
assert session.extra_user_variables['fc_sub'] == 'ymca'
|
|
|
|
resp = app.get('/logout')
|
|
splitted = urlparse.urlsplit(resp.location)
|
|
assert urlparse.urlunsplit((splitted.scheme, splitted.netloc, splitted.path, '', '')) \
|
|
== 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout'
|
|
assert urlparse.parse_qs(splitted.query)['post_logout_redirect_uri'] == ['http://example.net']
|
|
assert urlparse.parse_qs(splitted.query)['id_token_hint']
|
|
assert not get_session(app)
|
|
|
|
# Test error handling path
|
|
resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
|
|
'state': state,
|
|
'error': 'access_denied',
|
|
}))
|
|
assert 'user did not authorize login' in caplog.records[-1].message
|
|
resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
|
|
'state': state,
|
|
'error': 'whatever',
|
|
}))
|
|
assert 'whatever' in caplog.records[-1].message
|
|
|
|
# Login existing user
|
|
def logme(login_url):
|
|
resp = app.get(login_url)
|
|
assert resp.status_int == 302
|
|
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
|
|
qs = urlparse.parse_qs(resp.location.split('?')[1])
|
|
state = qs['state'][0]
|
|
id_token['nonce'] = qs['nonce'][0]
|
|
token_result['id_token'] = '.%s.' % force_text(base64url_encode(json.dumps(id_token)))
|
|
|
|
with mock.patch('wcs.qommon.ident.franceconnect.http_post_request') as http_post_request, \
|
|
mock.patch('wcs.qommon.ident.franceconnect.http_get_page') as http_get_page:
|
|
http_post_request.return_value = (None, 200, json.dumps(token_result), None)
|
|
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
|
|
resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
|
|
'code': '1234', 'state': state,
|
|
}))
|
|
return resp
|
|
app.get('/logout')
|
|
resp = logme('/login/')
|
|
new_session = get_session(app)
|
|
assert session.id != new_session.id, 'no new session created'
|
|
assert pub.user_class.count() == 1, 'existing user has not been used'
|
|
assert new_session.user == user.id
|
|
|
|
# Login with next url
|
|
app.get('/logout')
|
|
resp = logme('/login/?next=/foo/bar/')
|
|
assert resp.status_int == 302
|
|
assert resp.location.endswith('/foo/bar/')
|
|
|
|
# Direct login link
|
|
app.get('/logout')
|
|
resp = logme('/ident/fc/login')
|
|
new_session = get_session(app)
|
|
assert session.id != new_session.id, 'no new session created'
|
|
assert pub.user_class.count() == 1, 'existing user has not been used'
|
|
assert new_session.user == user.id
|
|
app.get('/logout')
|
|
resp = logme('/ident/fc/login?next=/foo/bar/')
|
|
assert resp.status_int == 302
|
|
assert resp.location.endswith('/foo/bar/')
|
|
|
|
# User with missing attributes
|
|
resp = app.get('/logout')
|
|
resp = app.get('/login/')
|
|
assert resp.status_int == 302
|
|
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
|
|
qs = urlparse.parse_qs(resp.location.split('?')[1])
|
|
state = qs['state'][0]
|
|
id_token['nonce'] = qs['nonce'][0]
|
|
token_result['id_token'] = '.%s.' % force_text(base64url_encode(json.dumps(id_token)))
|
|
bad_user_info_result = {
|
|
'sub': 'ymca2',
|
|
'given_name': 'John',
|
|
'family_name': 'Deux',
|
|
# 'email': 'john.deux@example.com', # missing
|
|
}
|
|
with mock.patch('wcs.qommon.ident.franceconnect.http_post_request') as http_post_request, \
|
|
mock.patch('wcs.qommon.ident.franceconnect.http_get_page') as http_get_page:
|
|
http_post_request.return_value = (None, 200, json.dumps(token_result), None)
|
|
http_get_page.return_value = (None, 200, json.dumps(bad_user_info_result), None)
|
|
resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
|
|
'code': '1234', 'state': state,
|
|
}))
|
|
assert pub.user_class.count() == 1, 'an invalid user (no email) has been created'
|
|
session = get_session(app)
|
|
assert (not session or not session.user)
|
|
|
|
|
|
def test_fc_settings():
|
|
setup_user_profile(pub)
|
|
app = get_app(pub)
|
|
resp = app.get('/backoffice/settings/identification/')
|
|
resp.forms[0]['methods$elementfc'].checked = True
|
|
resp = resp.forms[0].submit().follow()
|
|
|
|
assert 'FranceConnect' in resp.text
|
|
resp = resp.click('FranceConnect')
|
|
resp = resp.forms[0].submit('user_field_mappings$add_element')
|
|
resp = resp.forms[0].submit('user_field_mappings$add_element')
|
|
resp.forms[0]['client_id'].value = '123'
|
|
resp.forms[0]['client_secret'].value = 'xyz'
|
|
resp.forms[0]['platform'].value = 'Development citizens'
|
|
resp.forms[0]['scopes'].value = 'identite_pivot'
|
|
|
|
resp.forms[0]['user_field_mappings$element0$field_varname'] = 'prenoms'
|
|
resp.forms[0]['user_field_mappings$element0$value$value_template'] = '[given_name ""]'
|
|
resp.forms[0]['user_field_mappings$element0$value$type'] = 'template'
|
|
resp.forms[0]['user_field_mappings$element0$verified'] = 'Always'
|
|
|
|
resp.forms[0]['user_field_mappings$element1$field_varname'] = 'nom'
|
|
resp.forms[0]['user_field_mappings$element1$value$value_template'] = '[family_name ""]'
|
|
resp.forms[0]['user_field_mappings$element1$value$type'] = 'template'
|
|
resp.forms[0]['user_field_mappings$element1$verified'] = 'Always'
|
|
|
|
resp.forms[0]['user_field_mappings$element2$field_varname'] = 'email'
|
|
resp.forms[0]['user_field_mappings$element2$value$value_template'] = '[email ""]'
|
|
resp.forms[0]['user_field_mappings$element2$value$type'] = 'template'
|
|
resp.forms[0]['user_field_mappings$element2$verified'] = 'Always'
|
|
|
|
resp = resp.forms[0].submit('submit').follow()
|
|
assert pub.cfg['fc'] == FC_CONFIG
|
|
|
|
|
|
def test_fc_settings_no_user_profile():
|
|
FC_CONFIG = {
|
|
'client_id': '123',
|
|
'client_secret': 'xyz',
|
|
'platform': 'dev-particulier',
|
|
'scopes': 'identite_pivot',
|
|
'user_field_mappings': [
|
|
{
|
|
'field_varname': '__name',
|
|
'value': '[given_name ""] [family_name ""]',
|
|
'verified': 'always',
|
|
},
|
|
{
|
|
'field_varname': '__email',
|
|
'value': '[email ""]',
|
|
'verified': 'always',
|
|
},
|
|
]
|
|
|
|
}
|
|
|
|
pub.cfg = {'misc': {'charset': 'utf-8'}}
|
|
pub.user_class.wipe()
|
|
pub.write_cfg()
|
|
app = get_app(pub)
|
|
resp = app.get('/backoffice/settings/identification/')
|
|
resp.forms[0]['methods$elementfc'].checked = True
|
|
resp = resp.forms[0].submit().follow()
|
|
|
|
assert 'FranceConnect' in resp.text
|
|
resp = resp.click('FranceConnect')
|
|
resp = resp.forms[0].submit('user_field_mappings$add_element')
|
|
resp = resp.forms[0].submit('user_field_mappings$add_element')
|
|
resp.forms[0]['client_id'].value = '123'
|
|
resp.forms[0]['client_secret'].value = 'xyz'
|
|
resp.forms[0]['platform'].value = 'Development citizens'
|
|
resp.forms[0]['scopes'].value = 'identite_pivot'
|
|
|
|
resp.forms[0]['user_field_mappings$element0$field_varname'] = '__name'
|
|
resp.forms[0]['user_field_mappings$element0$value$value_template'] = '[given_name ""] [family_name ""]'
|
|
resp.forms[0]['user_field_mappings$element0$value$type'] = 'template'
|
|
resp.forms[0]['user_field_mappings$element0$verified'] = 'Always'
|
|
|
|
resp.forms[0]['user_field_mappings$element2$field_varname'] = '__email'
|
|
resp.forms[0]['user_field_mappings$element2$value$value_template'] = '[email ""]'
|
|
resp.forms[0]['user_field_mappings$element2$value$type'] = 'template'
|
|
resp.forms[0]['user_field_mappings$element2$verified'] = 'Always'
|
|
|
|
resp = resp.forms[0].submit('submit').follow()
|
|
assert pub.cfg['fc'] == FC_CONFIG
|