tests: simplify FranceConnect tests (#48042)

This commit is contained in:
Benjamin Dauvergne 2020-10-26 19:38:34 +01:00
parent 9f08f5c475
commit e28713c583
3 changed files with 315 additions and 617 deletions

View File

@ -14,129 +14,149 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import contextlib
import datetime
import json
import urllib.parse as urlparse
import uuid
from jwcrypto import jwk, jwt
import httmock
import pytest
import django_webtest
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django_rbac.utils import get_ou_model
from authentic2 import hooks as a2_hooks
from authentic2.manager.utils import get_ou_count
from authentic2_auth_fc.models import FcAccount
from django.http import QueryDict
from django.urls import reverse
from django.utils.http import urlencode
from django.utils.timezone import now
CARTMAN_FC_INFO = {
"token": {
"access_token": "cartmane_access_token",
"token_type": "Bearer",
"expires_in": 1200,
"id_token": "cartman_token_id"
},
"sub": "c11661ed00014db58149c8a886c8180d",
"user_info": {
"birthcountry": "99404",
"birthdate": "2006-06-06",
"birthplace": "southpark",
"email": "ecartman@ou_southpark.org",
"family_name": "CARTMAN",
"gender": "male",
"given_name": "Eric",
"preferred_username": "CARTMAN",
"sub": "c11661ed00014db58149c8a886c8180d"
}
}
from authentic2.models import Service
from authentic2.utils import make_url
from ..utils import assert_equals_url
CLIENT_ID = 'xxx'
CLIENT_SECRET = 'yyy'
def create_user(**kwargs):
User = get_user_model()
password = kwargs.pop('password', None) or kwargs['username']
federation = kwargs.pop('federation', None)
user, created = User.objects.get_or_create(**kwargs)
if password:
user.set_password(password)
user.save()
class FranceConnectMock:
exp = None
if federation:
create_fc_federation(user, federation)
return user
def create_fc_federation(user, info):
kwargs = {
'user': user,
'token': json.dumps(info['token']),
'user_info': json.dumps(info['user_info']),
'sub': info['sub']
}
return FcAccount.objects.create(**kwargs)
@pytest.fixture
def app(request, db):
wtm = django_webtest.WebTestMixin()
wtm._patch_settings()
request.addfinalizer(wtm._unpatch_settings)
return django_webtest.DjangoTestApp(extra_environ={'HTTP_HOST': 'testserver'})
@pytest.fixture
def fc_settings(settings):
settings.A2_FC_ENABLE = True
settings.A2_FC_CLIENT_ID = 'xxx'
settings.A2_FC_CLIENT_SECRET = 'yyy'
return settings
@pytest.fixture
def ou_southpark(db):
OU = get_ou_model()
return OU.objects.create(name='southpark', slug='southpark')
@pytest.fixture
def admin(db):
return create_user(username='admin', is_superuser=True, is_staff=True)
@pytest.fixture
def user_cartman(db, ou_southpark):
return create_user(username='ecartman', first_name='eric', last_name='cartman',
email='ecartman@southpark.org', ou=ou_southpark, federation=CARTMAN_FC_INFO)
@pytest.fixture(autouse=True)
def clear_cache():
OU = get_ou_model()
cache.clear()
for cached_el in (OU.cached, a2_hooks.get_hooks, get_ou_count):
cached_el.cache.clear()
class AllHook(object):
def __init__(self):
self.calls = {}
self.sub = '1234'
self.id_token = {
'aud': 'xxx',
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
}
self.user_info = {
'family_name': 'Frédérique',
'given_name': 'Ÿuñe',
'email': 'john.doe@example.com',
}
self.access_token = str(uuid.uuid4())
self.client_id = CLIENT_ID
self.client_secret = CLIENT_SECRET
self.scopes = {'openid', 'profile', 'email'}
self.callback_params = {'service': 'portail', 'next': '/idp/'}
def __call__(self, hook_name, *args, **kwargs):
calls = self.calls.setdefault(hook_name, [])
calls.append({'args': args, 'kwargs': kwargs})
def handle_authorization(self, app, url, **kwargs):
assert url.startswith('https://fcp.integ01')
parsed_url = urlparse.urlparse(url)
query = QueryDict(parsed_url.query)
assert_equals_url(query['redirect_uri'], self.callback_url)
assert query['client_id'] == self.client_id
assert set(query['scope'].split()) == self.scopes
assert query['state']
assert query['nonce']
assert query['response_type'] == 'code'
assert query['acr_values'] == 'eidas1'
self.state = query['state']
self.nonce = query['nonce']
self.code = str(uuid.uuid4().hex)
return app.get(
make_url(self.callback_url, params={'code': self.code, 'state': self.state}), **kwargs)
def __getattr__(self, name):
return self.calls.get(name, [])
@property
def callback_url(self):
return 'http://testserver' + reverse('fc-login-or-link') + '?' + urlencode(self.callback_params)
def clear(self):
self.calls = {}
def login_with_fc_fixed_params(self, app):
if app.session:
app.session.flush()
response = app.get('/login/?' + urlencode(self.callback_params))
response = response.click(href='callback')
return self.handle_authorization(app, response.location, status=302)
def login_with_fc(self, app, path):
if app.session:
app.session.flush()
response = app.get(path)
self.callback_params = {k: v for k, v in QueryDict(urlparse.urlparse(response.location).query).items()}
response = response.follow()
response = response.click(href='callback')
return self.handle_authorization(app, response.location, status=302).follow()
def access_token_response(self, url, request):
formdata = QueryDict(request.body)
assert set(formdata.keys()) == {'code', 'client_id', 'client_secret',
'redirect_uri', 'grant_type'}
assert formdata['code'] == self.code
assert formdata['client_id'] == self.client_id
assert formdata['client_secret'] == self.client_secret
assert formdata['grant_type'] == 'authorization_code'
assert_equals_url(formdata['redirect_uri'], self.callback_url)
# make response
id_token = self.id_token.copy()
id_token.update({
'sub': self.sub,
'nonce': self.nonce,
'exp': int((self.exp or (now() + datetime.timedelta(seconds=60))).timestamp()),
})
id_token.update(self.user_info)
return json.dumps({
'access_token': self.access_token,
'id_token': self.hmac_jwt(id_token, self.client_secret)
})
def hmac_jwt(self, payload, key):
header = {'alg': 'HS256'}
k = jwk.JWK(kty='oct', k=base64.b64encode(key.encode('utf-8')).decode('ascii'))
t = jwt.JWT(header=header, claims=payload)
t.make_signed_token(k)
return t.serialize()
def user_info_response(self, url, request):
assert request.headers['Authorization'] == 'Bearer %s' % self.access_token
user_info = self.user_info.copy()
user_info['sub'] = self.sub
return json.dumps(user_info)
@contextlib.contextmanager
def __call__(self):
with httmock.HTTMock(
httmock.urlmatch(path=r'.*/token$')(self.access_token_response),
httmock.urlmatch(path=r'.*userinfo$')(self.user_info_response)):
yield None
def handle_logout(self, app, url):
assert url.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/logout')
parsed_url = urlparse.urlparse(url)
query = QueryDict(parsed_url.query)
assert_equals_url(query['post_logout_redirect_uri'], 'http://testserver' + reverse('fc-logout'))
assert query['state']
self.state = query['state']
return app.get(reverse('fc-logout') + '?state=' + self.state)
@pytest.fixture
def hooks(settings):
if hasattr(settings, 'A2_HOOKS'):
hooks = settings.A2_HOOKS
else:
hooks = settings.A2_HOOKS = {}
hook = hooks['__all__'] = AllHook()
yield hook
hook.clear()
del settings.A2_HOOKS['__all__']
def franceconnect(settings, service):
settings.A2_FC_ENABLE = True
settings.A2_FC_CLIENT_ID = CLIENT_ID
settings.A2_FC_CLIENT_SECRET = CLIENT_SECRET
Service.objects.create(name='portail', slug='portail')
mock_object = FranceConnectMock()
with mock_object():
yield mock_object

View File

@ -15,84 +15,36 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import re
import httmock
import mock
import json
import base64
from jwcrypto import jwk, jwt
import datetime
import mock
import requests
from django.contrib.auth import get_user_model
from django.urls import reverse
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
from authentic2.models import Service
from authentic2_auth_fc import models
from authentic2_auth_fc.utils import requests_retry_session
from ..utils import login
from ..utils import login, get_link_from_mail
User = get_user_model()
@pytest.fixture(autouse=True)
def service(db):
return Service.objects.create(name='portail', slug='portail')
def path(url):
return urlparse.urlparse(url).path
def get_links_from_mail(mail):
'''Extract links from mail sent by Django'''
return re.findall('https?://[^ \n]*', mail.body)
def hmac_jwt(payload, key):
header = {'alg': 'HS256'}
k = jwk.JWK(
kty='oct', k=force_text(base64.b64encode(key.encode('utf-8'))))
t = jwt.JWT(header=header, claims=payload)
t.make_signed_token(k)
return t.serialize()
def test_login_redirect(app, fc_settings):
def test_login_redirect(app, franceconnect):
url = reverse('fc-login-or-link')
response = app.get(url, status=302)
assert response['Location'].startswith('https://fcp.integ01')
def check_authorization_url(url):
callback = reverse('fc-login-or-link')
assert url.startswith('https://fcp.integ01')
query_string = url.split('?')[1]
parsed = {x: y[0] for x, y in urlparse.parse_qs(query_string).items()}
assert 'redirect_uri' in parsed
assert callback in parsed['redirect_uri']
assert 'client_id' in parsed
assert parsed['client_id'] == 'xxx'
assert 'scope' in parsed
assert set(parsed['scope'].split()) == set(['openid', 'profile', 'email'])
assert 'state' in parsed
assert 'nonce' in parsed
assert parsed['state'] == parsed['nonce']
assert 'response_type' in parsed
assert parsed['response_type'] == 'code'
assert parsed['acr_values'] == 'eidas1'
return parsed['state']
def test_login_with_condition(app, fc_settings, settings):
def test_login_with_condition(settings, app, franceconnect):
# open the page first time so session cookie can be set
response = app.get('/login/')
assert 'fc-button' in response
@ -105,225 +57,135 @@ def test_login_with_condition(app, fc_settings, settings):
assert 'fc-button' not in response
def test_login_autorun(app, fc_settings, settings):
def test_login_autorun(settings, app, franceconnect):
# hide password block
settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'remote_addr==\'0.0.0.0\''}}
response = app.get('/login/')
assert response['Location'] == reverse('fc-login-or-link')
@pytest.mark.parametrize('exp', [now() + datetime.timedelta(seconds=1000),
now() - datetime.timedelta(seconds=1000)])
def test_login_simple(app, fc_settings, caplog, hooks, exp):
def test_no_create(app, franceconnect):
response = app.get('/login/?service=portail&next=/idp/')
response = response.click(href='callback')
location = response['Location']
state = check_authorization_url(location)
@httmock.urlmatch(path=r'.*/token$')
def access_token_response(url, request):
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
'grant_type'])
assert parsed['code'] == 'zzz'
assert parsed['client_id'] == 'xxx'
assert parsed['client_secret'] == 'yyy'
assert parsed['grant_type'] == 'authorization_code'
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
parsed_callback = urlparse.urlparse(callback)
assert parsed_redirect.path == parsed_callback.path
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
id_token = {
'sub': '1234',
'aud': 'xxx',
'nonce': state,
'exp': int(exp.timestamp()),
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
}
return json.dumps({
'access_token': 'uuu',
'id_token': hmac_jwt(id_token, 'yyy')
})
@httmock.urlmatch(path=r'.*userinfo$')
def user_info_response(url, request):
assert request.headers['Authorization'] == 'Bearer uuu'
return json.dumps({
'sub': '1234',
'family_name': u'Frédérique',
'given_name': u'Ÿuñe',
})
callback = reverse('fc-login-or-link')
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302)
franceconnect.handle_authorization(app, response.location, status=302)
assert User.objects.count() == 0
fc_settings.A2_FC_CREATE = True
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302)
if exp < now():
assert User.objects.count() == 0
else:
assert User.objects.count() == 1
if User.objects.count():
user = User.objects.get()
assert user.verified_attributes.first_name == u'Ÿuñe'
assert user.verified_attributes.last_name == u'Frédérique'
assert path(response['Location']) == '/idp/'
assert hooks.event[1]['kwargs']['name'] == 'login'
assert hooks.event[1]['kwargs']['service'] == 'portail'
# we must be connected
assert app.session['_auth_user_id']
assert app.session.get_expire_at_browser_close()
assert models.FcAccount.objects.count() == 1
# test unlink cancel case
response = app.get('/accounts/')
response = response.click('Delete link')
assert len(response.pyquery('[name=cancel][formnovalidate]')) == 1
response = response.form.submit(name='cancel')
response = response.follow()
# test unlink submit case
response = app.get('/accounts/')
response = response.click('Delete link')
response.form.set('new_password1', 'ikKL1234')
response.form.set('new_password2', 'ikKL1234')
response = response.form.submit(name='unlink')
assert 'The link with the FranceConnect account has been deleted' in response.text
assert models.FcAccount.objects.count() == 0
continue_url = response.pyquery('a#a2-continue').attr['href']
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
assert app.session['fc_states'][state]['next'] == '/accounts/'
response = app.get(reverse('fc-logout') + '?state=' + state)
assert path(response['Location']) == '/accounts/'
def test_login_email_is_unique(app, fc_settings, caplog):
callback = reverse('fc-login-or-link')
response = app.get(callback, status=302)
location = response['Location']
state = check_authorization_url(location)
def test_create(settings, app, franceconnect, hooks):
# test direct creation
settings.A2_FC_CREATE = True
@httmock.urlmatch(path=r'.*/token$')
def access_token_response(url, request):
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
'grant_type'])
assert parsed['code'] == 'zzz'
assert parsed['client_id'] == 'xxx'
assert parsed['client_secret'] == 'yyy'
assert parsed['grant_type'] == 'authorization_code'
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
parsed_callback = urlparse.urlparse(callback)
assert parsed_redirect.path == parsed_callback.path
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
exp = now() + datetime.timedelta(seconds=1000)
id_token = {
'sub': '1234',
'aud': 'xxx',
'nonce': state,
'exp': int(exp.timestamp()),
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
}
return json.dumps({
'access_token': 'uuu',
'id_token': hmac_jwt(id_token, 'yyy')
})
response = app.get('/login/?service=portail&next=/idp/')
response = response.click(href='callback')
@httmock.urlmatch(path=r'.*userinfo$')
def user_info_response(url, request):
assert request.headers['Authorization'] == 'Bearer uuu'
return json.dumps({
'sub': '1234',
'family_name': u'Frédérique',
'given_name': u'Ÿuñe',
'email': 'jOhn.dOe@eXample.com',
})
assert User.objects.count() == 0
response = franceconnect.handle_authorization(app, response.location, status=302)
assert User.objects.count() == 1
user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
user = User.objects.get()
assert user.verified_attributes.first_name == 'Ÿuñe'
assert user.verified_attributes.last_name == 'Frédérique'
assert path(response['Location']) == '/idp/'
assert hooks.event[1]['kwargs']['name'] == 'login'
assert hooks.event[1]['kwargs']['service'] == 'portail'
# we must be connected
assert app.session['_auth_user_id']
assert app.session.get_expire_at_browser_close()
assert models.FcAccount.objects.count() == 1
# test unlink cancel case
response = app.get('/accounts/')
response = response.click('Delete link')
assert len(response.pyquery('[name=cancel][formnovalidate]')) == 1
response = response.form.submit(name='cancel')
response = response.follow()
# test unlink submit case
response = app.get('/accounts/')
response = response.click('Delete link')
response.form.set('new_password1', 'ikKL1234')
response.form.set('new_password2', 'ikKL1234')
response = response.form.submit(name='unlink')
assert 'The link with the FranceConnect account has been deleted' in response.text
assert models.FcAccount.objects.count() == 0
continue_url = response.pyquery('a#a2-continue').attr['href']
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
assert app.session['fc_states'][state]['next'] == '/accounts/'
response = app.get(reverse('fc-logout') + '?state=' + state)
assert path(response['Location']) == '/accounts/'
def test_create_expired(settings, app, franceconnect, hooks):
# test direct creation failure on an expired id_token
settings.A2_FC_CREATE = True
franceconnect.exp = now() - datetime.timedelta(seconds=30)
response = app.get('/login/?service=portail&next=/idp/')
response = response.click(href='callback')
assert User.objects.count() == 0
response = franceconnect.handle_authorization(app, response.location, status=302)
assert User.objects.count() == 0
def test_login_email_is_unique(settings, app, franceconnect, caplog):
settings.A2_EMAIL_IS_UNIQUE = True
user = User(
email='john.doe@example.com',
first_name='John',
last_name='Doe')
user.set_password('toto')
user.save()
fc_settings.A2_EMAIL_IS_UNIQUE = True
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
franceconnect.user_info['email'] = user.email
assert User.objects.count() == 1
assert app.session['_auth_user_id']
franceconnect.login_with_fc_fixed_params(app)
assert User.objects.count() == 1
assert app.session['_auth_user_id'] == str(user.pk)
# logout, test unlinking when logging with password
app.session.flush()
response = app.get('/login/')
response.form.set('username', User.objects.get().email)
response.form.set('password', 'toto')
response = response.form.submit(name='login-password-submit').follow()
response = app.get('/accounts/')
def test_unlink_after_login_with_password(app, franceconnect, simple_user):
models.FcAccount.objects.create(user=simple_user, user_info='{}')
response = login(app, simple_user, path='/accounts/')
response = response.click('Delete link')
assert 'new_password1' not in response.form.fields
response = response.form.submit(name='unlink').follow()
assert 'The link with the FranceConnect account has been deleted' in response.text
# no logout from FC since we are not logged to it
assert response.request.path == '/accounts/'
def test_login_email_is_unique_and_already_linked(app, fc_settings, caplog):
callback = reverse('fc-login-or-link')
response = app.get(callback, status=302)
location = response['Location']
state = check_authorization_url(location)
def test_unlink_after_login_with_fc(app, franceconnect, simple_user):
models.FcAccount.objects.create(user=simple_user, sub=franceconnect.sub, user_info='{}')
EMAIL = 'john.doe@example.com'
SUB = '1234'
user = User.objects.create(email=EMAIL, first_name='John', last_name='Doe')
response = franceconnect.login_with_fc(app, path='/accounts/')
response = response.click('Delete link')
response.form.set('new_password1', 'ikKL1234')
response.form.set('new_password2', 'ikKL1234')
response = response.form.submit(name='unlink')
assert 'The link with the FranceConnect account has been deleted' in response.text
assert models.FcAccount.objects.count() == 0
continue_url = response.pyquery('a#a2-continue').attr['href']
response = franceconnect.handle_logout(app, continue_url)
assert path(response.location) == '/accounts/'
def test_login_email_is_unique_and_already_linked(settings, app, franceconnect, caplog):
settings.A2_EMAIL_IS_UNIQUE = True
# setup an already linked user account
user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
models.FcAccount.objects.create(user=user, sub='4567', token='xxx', user_info='{}')
@httmock.urlmatch(path=r'.*/token$')
def access_token_response(url, request):
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
'grant_type'])
assert parsed['code'] == 'zzz'
assert parsed['client_id'] == 'xxx'
assert parsed['client_secret'] == 'yyy'
assert parsed['grant_type'] == 'authorization_code'
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
parsed_callback = urlparse.urlparse(callback)
assert parsed_redirect.path == parsed_callback.path
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
exp = now() + datetime.timedelta(seconds=1000)
id_token = {
'sub': SUB,
'aud': 'xxx',
'nonce': state,
'exp': int(exp.timestamp()),
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
}
return json.dumps({
'access_token': 'uuu',
'id_token': hmac_jwt(id_token, 'yyy')
})
@httmock.urlmatch(path=r'.*userinfo$')
def user_info_response(url, request):
assert request.headers['Authorization'] == 'Bearer uuu'
return json.dumps({
'sub': '1234',
'family_name': u'Frédérique',
'given_name': u'Ÿuñe',
'email': EMAIL,
})
fc_settings.A2_EMAIL_IS_UNIQUE = True
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
assert 'is already used' in str(response)
assert User.objects.count() == 1
response = app.get('/login/?service=portail&next=/idp/')
response = response.click(href='callback')
response = franceconnect.handle_authorization(app, response.location, status=302)
assert models.FcAccount.objects.count() == 1
assert 'is already used' in app.cookies['messages']
assert '_auth_user_id' not in app.session
def test_requests_proxies_support(app, fc_settings, caplog):
def test_requests_proxies_support(settings, app):
session = requests_retry_session()
assert session.proxies == {}
other_session = requests.Session()
@ -331,7 +193,8 @@ def test_requests_proxies_support(app, fc_settings, caplog):
session = requests_retry_session(session=other_session)
assert session is other_session
assert session.proxies == {'http': 'http://example.net'}
fc_settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
session = requests_retry_session()
assert session.proxies == {'https': 'http://pubproxy.com/api/proxy'}
@ -341,95 +204,65 @@ def test_requests_proxies_support(app, fc_settings, caplog):
assert mocked_send.call_args[1]['proxies'] == {'https': 'http://pubproxy.com/api/proxy'}
def test_password_reset(app, mailoutbox):
def test_no_password_with_fc_account_can_reset_password(app, db, mailoutbox):
user = User.objects.create(email='john.doe@example.com')
# No FC account, forbidden to set a password
response = app.get('/login/')
response = response.click('Reset it!').maybe_follow()
response.form['email'] = user.email
assert len(mailoutbox) == 0
response = response.form.submit()
assert len(mailoutbox) == 1
url = get_links_from_mail(mailoutbox[0])[0]
url = get_link_from_mail(mailoutbox[0])
response = app.get(url).follow().follow()
assert '_auth_user_id' not in app.session
assert 'not possible to reset' in response
# With FC account, can set a password
models.FcAccount.objects.create(user=user, sub='xxx', token='aaa')
response = app.get(url).maybe_follow()
assert 'new_password1' in response.form.fields
response = app.get('/login/')
response = response.click('Reset it!').maybe_follow()
response.form['email'] = user.email
assert len(mailoutbox) == 1
response = response.form.submit()
assert len(mailoutbox) == 2
url = get_link_from_mail(mailoutbox[1])
response = app.get(url, status=200)
response.form.set('new_password1', 'ikKL1234')
response.form.set('new_password2', 'ikKL1234')
response = response.form.submit().follow()
assert '_auth_user_id' in app.session
def test_registration1(app, fc_settings, caplog, hooks):
exp = now() + datetime.timedelta(seconds=1000)
response = app.get('/login/?service=portail&next=/idp/')
response = response.click(href="callback")
# 1. Try a login
# 2. Verify we come back to login page
# 3. Check presence of registration link
# 4. Follow it
location = response['Location']
state = check_authorization_url(location)
@httmock.urlmatch(path=r'.*/token$')
def access_token_response(url, request):
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
'grant_type'])
assert parsed['code'] == 'zzz'
assert parsed['client_id'] == 'xxx'
assert parsed['client_secret'] == 'yyy'
assert parsed['grant_type'] == 'authorization_code'
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
parsed_callback = urlparse.urlparse(callback)
assert parsed_redirect.path == parsed_callback.path
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
id_token = {
'sub': '1234',
'aud': 'xxx',
'nonce': state,
'exp': int(exp.timestamp()),
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
'email': 'john.doe@example.com',
}
return json.dumps({
'access_token': 'uuu',
'id_token': hmac_jwt(id_token, 'yyy')
})
@httmock.urlmatch(path=r'.*userinfo$')
def user_info_response(url, request):
assert request.headers['Authorization'] == 'Bearer uuu'
return json.dumps({
'sub': '1234',
'family_name': u'Frédérique',
'given_name': u'Ÿuñe',
'email': 'john.doe@example.com',
})
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
def test_registration1(settings, app, franceconnect, caplog, hooks):
response = franceconnect.login_with_fc_fixed_params(app)
assert User.objects.count() == 0
assert path(response['Location']) == '/login/'
assert path(response.location) == '/login/'
response = response.follow()
response = response.click(href='/accounts/fc/register')
location = response['Location']
location.startswith('http://testserver/accounts/activate/')
response.location.startswith('http://testserver/accounts/activate/')
assert User.objects.count() == 0
response = response.follow()
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
assert response.location.startswith('/fc/callback/')
# a new user has been created
assert User.objects.count() == 1
# but no FcAccount
assert models.FcAccount.objects.count() == 0
# we must be connected
assert app.session['_auth_user_id']
parsed_location = urlparse.urlparse(response['Location'])
parsed_callback = urlparse.urlparse(callback)
assert parsed_location.path == parsed_callback.path
assert (urlparse.parse_qs(parsed_location.query) ==
urlparse.parse_qs(parsed_callback.query))
# hook must have been called
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
response = response.follow()
location = response['Location']
state = check_authorization_url(location)
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
# a new redirect to FC is done
response = franceconnect.handle_authorization(app, response.location)
# FcAccount now exists
assert models.FcAccount.objects.count() == 1
user = User.objects.get()
assert user.verified_attributes.first_name == u'Ÿuñe'
assert user.verified_attributes.last_name == u'Frédérique'
assert user.verified_attributes.first_name == 'Ÿuñe'
assert user.verified_attributes.last_name == 'Frédérique'
response = app.get('/accounts/')
response = response.click('Delete link')
response.form.set('new_password1', 'ikKL1234')
@ -438,214 +271,60 @@ def test_registration1(app, fc_settings, caplog, hooks):
assert 'The link with the FranceConnect account has been deleted' in response.text
assert models.FcAccount.objects.count() == 0
continue_url = response.pyquery('a#a2-continue').attr['href']
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
assert app.session['fc_states'][state]['next'] == '/accounts/'
response = app.get(reverse('fc-logout') + '?state=' + state)
assert path(response['Location']) == '/accounts/'
response = franceconnect.handle_logout(app, continue_url)
assert path(response.location) == '/accounts/'
def test_registration2(app, fc_settings, caplog, hooks):
exp = now() + datetime.timedelta(seconds=1000)
def test_registration2(settings, app, franceconnect, hooks):
response = app.get('/login/?service=portail&next=/idp/')
response = response.click("Register")
response = response.click(href='callback')
# 1. Try a login
# 2. Verify we come back to login page
# 3. Check presence of registration link
# 4. Follow it
location = response['Location']
state = check_authorization_url(location)
franceconnect.callback_params['registration'] = ''
response = franceconnect.handle_authorization(app, response.location)
@httmock.urlmatch(path=r'.*/token$')
def access_token_response(url, request):
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
'grant_type'])
assert parsed['code'] == 'zzz'
assert parsed['client_id'] == 'xxx'
assert parsed['client_secret'] == 'yyy'
assert parsed['grant_type'] == 'authorization_code'
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
parsed_callback = urlparse.urlparse(callback)
assert parsed_redirect.path == parsed_callback.path
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
id_token = {
'sub': '1234',
'aud': 'xxx',
'nonce': state,
'exp': int(exp.timestamp()),
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
'email': 'john.doe@example.com',
}
return json.dumps({
'access_token': 'uuu',
'id_token': hmac_jwt(id_token, 'yyy')
})
@httmock.urlmatch(path=r'.*userinfo$')
def user_info_response(url, request):
assert request.headers['Authorization'] == 'Bearer uuu'
return json.dumps({
'sub': '1234',
'family_name': u'Frédérique',
'given_name': u'Ÿuñe',
'email': 'john.doe@example.com',
})
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
assert User.objects.count() == 0
assert path(response['Location']) == '/accounts/fc/register/'
assert path(response.location) == '/accounts/fc/register/'
response = response.follow()
location = response['Location']
location.startswith('http://testserver/accounts/activate/')
response.location.startswith('http://testserver/accounts/activate/')
response = response.follow()
assert User.objects.count() == 1
user = User.objects.get()
assert user.verified_attributes.first_name is None
assert user.verified_attributes.last_name is None
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
# we must be connected
assert app.session['_auth_user_id']
# remove the registration parameter
callback = callback.replace('&registration=', '')
callback = callback.replace('?registration=', '?')
callback = callback.replace('?&', '?')
parsed_location = urlparse.urlparse(response['Location'])
parsed_callback = urlparse.urlparse(callback)
assert parsed_location.path == parsed_callback.path
assert (urlparse.parse_qs(parsed_location.query) ==
urlparse.parse_qs(parsed_callback.query))
response = response.follow()
location = response['Location']
state = check_authorization_url(location)
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
assert models.FcAccount.objects.count() == 1
del franceconnect.callback_params['registration']
response = franceconnect.handle_authorization(app, response.location)
user = User.objects.get()
assert user.verified_attributes.first_name == u'Ÿuñe'
assert user.verified_attributes.last_name == u'Frédérique'
response = app.get('/accounts/')
response = response.click('Delete link')
response.form.set('new_password1', 'ikKL1234')
response.form.set('new_password2', 'ikKL1234')
response = response.form.submit(name='unlink')
assert 'The link with the FranceConnect account has been deleted' in response.text
assert models.FcAccount.objects.count() == 0
continue_url = response.pyquery('a#a2-continue').attr['href']
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
assert app.session['fc_states'][state]['next'] == '/accounts/'
response = app.get(reverse('fc-logout') + '?state=' + state)
assert path(response['Location']) == '/accounts/'
assert user.verified_attributes.first_name == 'Ÿuñe'
assert user.verified_attributes.last_name == 'Frédérique'
def test_can_change_password(app, fc_settings, caplog, hooks):
exp = now() + datetime.timedelta(seconds=1000)
response = app.get('/login/?service=portail&next=/idp/')
response = response.click("Register")
response = response.click(href='callback')
# 1. Try a login
# 2. Verify we come back to login page
# 3. Check presence of registration link
# 4. Follow it
location = response['Location']
state = check_authorization_url(location)
def test_can_change_password(settings, app, franceconnect):
user = User.objects.create(email='john.doe@example.com')
models.FcAccount.objects.create(user=user, sub=franceconnect.sub)
@httmock.urlmatch(path=r'.*/token$')
def access_token_response(url, request):
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
'grant_type'])
assert parsed['code'] == 'zzz'
assert parsed['client_id'] == 'xxx'
assert parsed['client_secret'] == 'yyy'
assert parsed['grant_type'] == 'authorization_code'
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
parsed_callback = urlparse.urlparse(callback)
assert parsed_redirect.path == parsed_callback.path
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
id_token = {
'sub': '1234',
'aud': 'xxx',
'nonce': state,
'exp': int(exp.timestamp()),
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
'email': 'john.doe@example.com',
}
return json.dumps({
'access_token': 'uuu',
'id_token': hmac_jwt(id_token, 'yyy')
})
@httmock.urlmatch(path=r'.*userinfo$')
def user_info_response(url, request):
assert request.headers['Authorization'] == 'Bearer uuu'
return json.dumps({
'sub': '1234',
'family_name': u'Frédérique',
'given_name': u'Ÿuñe',
'email': 'john.doe@example.com',
})
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
assert User.objects.count() == 0
assert path(response['Location']) == '/accounts/fc/register/'
response = response.follow()
location = response['Location']
location.startswith('http://testserver/accounts/activate/')
response = response.follow()
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
# we must be connected
assert app.session['_auth_user_id']
# remove the registration parameter
callback = callback.replace('&registration=', '')
callback = callback.replace('?registration=', '?')
callback = callback.replace('?&', '?')
parsed_location = urlparse.urlparse(response['Location'])
parsed_callback = urlparse.urlparse(callback)
assert parsed_location.path == parsed_callback.path
assert (urlparse.parse_qs(parsed_location.query) ==
urlparse.parse_qs(parsed_callback.query))
response = response.follow()
location = response['Location']
state = check_authorization_url(location)
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
assert models.FcAccount.objects.count() == 1
user = User.objects.get()
assert user.verified_attributes.first_name == u'Ÿuñe'
assert user.verified_attributes.last_name == u'Frédérique'
response = app.get('/accounts/')
response = franceconnect.login_with_fc(app, path='/accounts/')
assert len(response.pyquery('[href*="password/change"]')) == 0
response = response.click('Logout')
response = franceconnect.handle_logout(app, response.location).follow()
assert '_auth_user_id' not in app.session
# Login with password
user = User.objects.get()
user.username = 'test'
user.set_password('test')
user.save()
app.session.flush()
response = app.get('/login/')
response.form.set('username', User.objects.get().email)
response.form.set('password', 'test')
response = response.form.submit(name='login-password-submit').follow()
response = app.get('/accounts/')
response = login(app, user, path='/accounts/')
assert len(response.pyquery('[href*="password/change"]')) > 0
response = response.click('Logout').follow()
# Relogin with FC
app.session.flush()
response = app.get('/login/?service=portail&next=/accounts/')
response = response.click(href='callback')
location = response['Location']
state = check_authorization_url(location)
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
with httmock.HTTMock(access_token_response, user_info_response):
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
# we must be connected
assert app.session['_auth_user_id']
assert path(response['Location']) == '/accounts/'
response = response.follow()
response = franceconnect.login_with_fc(app, path='/accounts/')
assert len(response.pyquery('[href*="password/change"]')) == 0
# Unlink
@ -653,20 +332,17 @@ def test_can_change_password(app, fc_settings, caplog, hooks):
response.form.set('new_password1', 'ikKL1234')
response.form.set('new_password2', 'ikKL1234')
response = response.form.submit(name='unlink')
assert 'The link with the FranceConnect account has been deleted' in response.text
continue_url = response.pyquery('a#a2-continue').attr['href']
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
assert app.session['fc_states'][state]['next'] == '/accounts/'
response = app.get(reverse('fc-logout') + '?state=' + state)
assert path(response['Location']) == '/accounts/'
response = response.follow()
response = franceconnect.handle_logout(app, continue_url).follow()
assert len(response.pyquery('[href*="password/change"]')) > 0
def test_invalid_next_url(app, fc_settings, caplog, hooks):
def test_invalid_next_url(app, franceconnect):
assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ'
def test_manager_user_sidebar(app, fc_settings, superuser, simple_user):
def test_manager_user_sidebar(app, superuser, simple_user):
login(app, superuser, '/manage/')
response = app.get('/manage/users/%s/' % simple_user.id)
assert 'FranceConnect' not in response

View File

@ -18,8 +18,9 @@
from authentic2_auth_fc.models import FcAccount
def test_api_fc_unlink(app, admin, user_cartman):
url = '/api/users/%s/fc-unlink/' % user_cartman.uuid
def test_api_fc_unlink(app, admin, simple_user):
FcAccount.objects.create(user=simple_user)
url = '/api/users/%s/fc-unlink/' % simple_user.uuid
# test unauthorized caller
app.delete(url, status=401)
# test unauthorized method
@ -27,13 +28,14 @@ def test_api_fc_unlink(app, admin, user_cartman):
app.get(url, status=405)
# test success
app.delete(url, status=204)
assert FcAccount.objects.filter(user=user_cartman).exists() is False
assert FcAccount.objects.filter(user=simple_user).exists() is False
def test_api_user_franceconnect(settings, app, admin, user_cartman):
def test_api_user_franceconnect(settings, app, admin, simple_user):
settings.A2_FC_ENABLE = True
FcAccount.objects.create(user=simple_user, sub='1234')
url = '/api/users/%s/' % user_cartman.uuid
url = '/api/users/%s/' % simple_user.uuid
# test unauthorized method
app.authorization = ('Basic', (admin.username, admin.username))
response = app.get(url)
@ -48,7 +50,7 @@ def test_api_user_franceconnect(settings, app, admin, user_cartman):
assert content.get('unlink_url').startswith('http://')
assert content.get('unlink_url').endswith('/unlink/')
unlink_url = '/api/users/%s/fc-unlink/' % user_cartman.uuid
unlink_url = '/api/users/%s/fc-unlink/' % simple_user.uuid
app.delete(unlink_url, status=204)
response = app.get(url + '?full')