diff --git a/tests/auth_fc/conftest.py b/tests/auth_fc/conftest.py
index 8b17fc6d6..b0df872c2 100644
--- a/tests/auth_fc/conftest.py
+++ b/tests/auth_fc/conftest.py
@@ -14,129 +14,149 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
+import base64
+import contextlib
+import datetime
import json
+import urllib.parse as urlparse
+import uuid
+
+from jwcrypto import jwk, jwt
+import httmock
import pytest
-import django_webtest
-from django.contrib.auth import get_user_model
-from django.core.cache import cache
-from django_rbac.utils import get_ou_model
-
-from authentic2 import hooks as a2_hooks
-from authentic2.manager.utils import get_ou_count
-from authentic2_auth_fc.models import FcAccount
+from django.http import QueryDict
+from django.urls import reverse
+from django.utils.http import urlencode
+from django.utils.timezone import now
-CARTMAN_FC_INFO = {
- "token": {
- "access_token": "cartmane_access_token",
- "token_type": "Bearer",
- "expires_in": 1200,
- "id_token": "cartman_token_id"
- },
- "sub": "c11661ed00014db58149c8a886c8180d",
- "user_info": {
- "birthcountry": "99404",
- "birthdate": "2006-06-06",
- "birthplace": "southpark",
- "email": "ecartman@ou_southpark.org",
- "family_name": "CARTMAN",
- "gender": "male",
- "given_name": "Eric",
- "preferred_username": "CARTMAN",
- "sub": "c11661ed00014db58149c8a886c8180d"
- }
-}
+from authentic2.models import Service
+from authentic2.utils import make_url
+
+from ..utils import assert_equals_url
+
+CLIENT_ID = 'xxx'
+CLIENT_SECRET = 'yyy'
-def create_user(**kwargs):
- User = get_user_model()
- password = kwargs.pop('password', None) or kwargs['username']
- federation = kwargs.pop('federation', None)
- user, created = User.objects.get_or_create(**kwargs)
- if password:
- user.set_password(password)
- user.save()
+class FranceConnectMock:
+ exp = None
- if federation:
- create_fc_federation(user, federation)
- return user
-
-
-def create_fc_federation(user, info):
- kwargs = {
- 'user': user,
- 'token': json.dumps(info['token']),
- 'user_info': json.dumps(info['user_info']),
- 'sub': info['sub']
- }
- return FcAccount.objects.create(**kwargs)
-
-
-@pytest.fixture
-def app(request, db):
- wtm = django_webtest.WebTestMixin()
- wtm._patch_settings()
- request.addfinalizer(wtm._unpatch_settings)
- return django_webtest.DjangoTestApp(extra_environ={'HTTP_HOST': 'testserver'})
-
-
-@pytest.fixture
-def fc_settings(settings):
- settings.A2_FC_ENABLE = True
- settings.A2_FC_CLIENT_ID = 'xxx'
- settings.A2_FC_CLIENT_SECRET = 'yyy'
- return settings
-
-
-@pytest.fixture
-def ou_southpark(db):
- OU = get_ou_model()
- return OU.objects.create(name='southpark', slug='southpark')
-
-
-@pytest.fixture
-def admin(db):
- return create_user(username='admin', is_superuser=True, is_staff=True)
-
-
-@pytest.fixture
-def user_cartman(db, ou_southpark):
- return create_user(username='ecartman', first_name='eric', last_name='cartman',
- email='ecartman@southpark.org', ou=ou_southpark, federation=CARTMAN_FC_INFO)
-
-
-@pytest.fixture(autouse=True)
-def clear_cache():
- OU = get_ou_model()
-
- cache.clear()
- for cached_el in (OU.cached, a2_hooks.get_hooks, get_ou_count):
- cached_el.cache.clear()
-
-
-class AllHook(object):
def __init__(self):
- self.calls = {}
+ self.sub = '1234'
+ self.id_token = {
+ 'aud': 'xxx',
+ 'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
+ }
+ self.user_info = {
+ 'family_name': 'Frédérique',
+ 'given_name': 'Ÿuñe',
+ 'email': 'john.doe@example.com',
+ }
+ self.access_token = str(uuid.uuid4())
+ self.client_id = CLIENT_ID
+ self.client_secret = CLIENT_SECRET
+ self.scopes = {'openid', 'profile', 'email'}
+ self.callback_params = {'service': 'portail', 'next': '/idp/'}
- def __call__(self, hook_name, *args, **kwargs):
- calls = self.calls.setdefault(hook_name, [])
- calls.append({'args': args, 'kwargs': kwargs})
+ def handle_authorization(self, app, url, **kwargs):
+ assert url.startswith('https://fcp.integ01')
+ parsed_url = urlparse.urlparse(url)
+ query = QueryDict(parsed_url.query)
+ assert_equals_url(query['redirect_uri'], self.callback_url)
+ assert query['client_id'] == self.client_id
+ assert set(query['scope'].split()) == self.scopes
+ assert query['state']
+ assert query['nonce']
+ assert query['response_type'] == 'code'
+ assert query['acr_values'] == 'eidas1'
+ self.state = query['state']
+ self.nonce = query['nonce']
+ self.code = str(uuid.uuid4().hex)
+ return app.get(
+ make_url(self.callback_url, params={'code': self.code, 'state': self.state}), **kwargs)
- def __getattr__(self, name):
- return self.calls.get(name, [])
+ @property
+ def callback_url(self):
+ return 'http://testserver' + reverse('fc-login-or-link') + '?' + urlencode(self.callback_params)
- def clear(self):
- self.calls = {}
+ def login_with_fc_fixed_params(self, app):
+ if app.session:
+ app.session.flush()
+ response = app.get('/login/?' + urlencode(self.callback_params))
+ response = response.click(href='callback')
+ return self.handle_authorization(app, response.location, status=302)
+
+ def login_with_fc(self, app, path):
+ if app.session:
+ app.session.flush()
+ response = app.get(path)
+ self.callback_params = {k: v for k, v in QueryDict(urlparse.urlparse(response.location).query).items()}
+ response = response.follow()
+ response = response.click(href='callback')
+ return self.handle_authorization(app, response.location, status=302).follow()
+
+ def access_token_response(self, url, request):
+ formdata = QueryDict(request.body)
+ assert set(formdata.keys()) == {'code', 'client_id', 'client_secret',
+ 'redirect_uri', 'grant_type'}
+ assert formdata['code'] == self.code
+ assert formdata['client_id'] == self.client_id
+ assert formdata['client_secret'] == self.client_secret
+ assert formdata['grant_type'] == 'authorization_code'
+ assert_equals_url(formdata['redirect_uri'], self.callback_url)
+
+ # make response
+ id_token = self.id_token.copy()
+ id_token.update({
+ 'sub': self.sub,
+ 'nonce': self.nonce,
+ 'exp': int((self.exp or (now() + datetime.timedelta(seconds=60))).timestamp()),
+ })
+ id_token.update(self.user_info)
+ return json.dumps({
+ 'access_token': self.access_token,
+ 'id_token': self.hmac_jwt(id_token, self.client_secret)
+ })
+
+ def hmac_jwt(self, payload, key):
+ header = {'alg': 'HS256'}
+ k = jwk.JWK(kty='oct', k=base64.b64encode(key.encode('utf-8')).decode('ascii'))
+ t = jwt.JWT(header=header, claims=payload)
+ t.make_signed_token(k)
+ return t.serialize()
+
+ def user_info_response(self, url, request):
+ assert request.headers['Authorization'] == 'Bearer %s' % self.access_token
+ user_info = self.user_info.copy()
+ user_info['sub'] = self.sub
+ return json.dumps(user_info)
+
+ @contextlib.contextmanager
+ def __call__(self):
+ with httmock.HTTMock(
+ httmock.urlmatch(path=r'.*/token$')(self.access_token_response),
+ httmock.urlmatch(path=r'.*userinfo$')(self.user_info_response)):
+ yield None
+
+ def handle_logout(self, app, url):
+ assert url.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/logout')
+ parsed_url = urlparse.urlparse(url)
+ query = QueryDict(parsed_url.query)
+ assert_equals_url(query['post_logout_redirect_uri'], 'http://testserver' + reverse('fc-logout'))
+ assert query['state']
+ self.state = query['state']
+ return app.get(reverse('fc-logout') + '?state=' + self.state)
@pytest.fixture
-def hooks(settings):
- if hasattr(settings, 'A2_HOOKS'):
- hooks = settings.A2_HOOKS
- else:
- hooks = settings.A2_HOOKS = {}
- hook = hooks['__all__'] = AllHook()
- yield hook
- hook.clear()
- del settings.A2_HOOKS['__all__']
+def franceconnect(settings, service):
+ settings.A2_FC_ENABLE = True
+ settings.A2_FC_CLIENT_ID = CLIENT_ID
+ settings.A2_FC_CLIENT_SECRET = CLIENT_SECRET
+
+ Service.objects.create(name='portail', slug='portail')
+ mock_object = FranceConnectMock()
+ with mock_object():
+ yield mock_object
diff --git a/tests/auth_fc/test_auth_fc.py b/tests/auth_fc/test_auth_fc.py
index 1dae183ad..da539b3f2 100644
--- a/tests/auth_fc/test_auth_fc.py
+++ b/tests/auth_fc/test_auth_fc.py
@@ -15,84 +15,36 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-import pytest
-import re
-import httmock
-import mock
-import json
-import base64
-from jwcrypto import jwk, jwt
import datetime
+import mock
import requests
from django.contrib.auth import get_user_model
from django.urls import reverse
-from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
-from authentic2.models import Service
-
from authentic2_auth_fc import models
from authentic2_auth_fc.utils import requests_retry_session
-from ..utils import login
+from ..utils import login, get_link_from_mail
User = get_user_model()
-@pytest.fixture(autouse=True)
-def service(db):
- return Service.objects.create(name='portail', slug='portail')
-
-
def path(url):
return urlparse.urlparse(url).path
-def get_links_from_mail(mail):
- '''Extract links from mail sent by Django'''
- return re.findall('https?://[^ \n]*', mail.body)
-
-
-def hmac_jwt(payload, key):
- header = {'alg': 'HS256'}
- k = jwk.JWK(
- kty='oct', k=force_text(base64.b64encode(key.encode('utf-8'))))
- t = jwt.JWT(header=header, claims=payload)
- t.make_signed_token(k)
- return t.serialize()
-
-
-def test_login_redirect(app, fc_settings):
+def test_login_redirect(app, franceconnect):
url = reverse('fc-login-or-link')
response = app.get(url, status=302)
assert response['Location'].startswith('https://fcp.integ01')
-def check_authorization_url(url):
- callback = reverse('fc-login-or-link')
- assert url.startswith('https://fcp.integ01')
- query_string = url.split('?')[1]
- parsed = {x: y[0] for x, y in urlparse.parse_qs(query_string).items()}
- assert 'redirect_uri' in parsed
- assert callback in parsed['redirect_uri']
- assert 'client_id' in parsed
- assert parsed['client_id'] == 'xxx'
- assert 'scope' in parsed
- assert set(parsed['scope'].split()) == set(['openid', 'profile', 'email'])
- assert 'state' in parsed
- assert 'nonce' in parsed
- assert parsed['state'] == parsed['nonce']
- assert 'response_type' in parsed
- assert parsed['response_type'] == 'code'
- assert parsed['acr_values'] == 'eidas1'
- return parsed['state']
-
-
-def test_login_with_condition(app, fc_settings, settings):
+def test_login_with_condition(settings, app, franceconnect):
# open the page first time so session cookie can be set
response = app.get('/login/')
assert 'fc-button' in response
@@ -105,225 +57,135 @@ def test_login_with_condition(app, fc_settings, settings):
assert 'fc-button' not in response
-def test_login_autorun(app, fc_settings, settings):
+def test_login_autorun(settings, app, franceconnect):
# hide password block
settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'remote_addr==\'0.0.0.0\''}}
response = app.get('/login/')
assert response['Location'] == reverse('fc-login-or-link')
-@pytest.mark.parametrize('exp', [now() + datetime.timedelta(seconds=1000),
- now() - datetime.timedelta(seconds=1000)])
-def test_login_simple(app, fc_settings, caplog, hooks, exp):
+def test_no_create(app, franceconnect):
response = app.get('/login/?service=portail&next=/idp/')
response = response.click(href='callback')
- location = response['Location']
- state = check_authorization_url(location)
-
- @httmock.urlmatch(path=r'.*/token$')
- def access_token_response(url, request):
- parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
- assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
- 'grant_type'])
- assert parsed['code'] == 'zzz'
- assert parsed['client_id'] == 'xxx'
- assert parsed['client_secret'] == 'yyy'
- assert parsed['grant_type'] == 'authorization_code'
- parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
- parsed_callback = urlparse.urlparse(callback)
- assert parsed_redirect.path == parsed_callback.path
- for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
- urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
- id_token = {
- 'sub': '1234',
- 'aud': 'xxx',
- 'nonce': state,
- 'exp': int(exp.timestamp()),
- 'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
- }
- return json.dumps({
- 'access_token': 'uuu',
- 'id_token': hmac_jwt(id_token, 'yyy')
- })
-
- @httmock.urlmatch(path=r'.*userinfo$')
- def user_info_response(url, request):
- assert request.headers['Authorization'] == 'Bearer uuu'
- return json.dumps({
- 'sub': '1234',
- 'family_name': u'Frédérique',
- 'given_name': u'Ÿuñe',
- })
-
- callback = reverse('fc-login-or-link')
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302)
+ franceconnect.handle_authorization(app, response.location, status=302)
assert User.objects.count() == 0
- fc_settings.A2_FC_CREATE = True
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302)
- if exp < now():
- assert User.objects.count() == 0
- else:
- assert User.objects.count() == 1
- if User.objects.count():
- user = User.objects.get()
- assert user.verified_attributes.first_name == u'Ÿuñe'
- assert user.verified_attributes.last_name == u'Frédérique'
- assert path(response['Location']) == '/idp/'
- assert hooks.event[1]['kwargs']['name'] == 'login'
- assert hooks.event[1]['kwargs']['service'] == 'portail'
- # we must be connected
- assert app.session['_auth_user_id']
- assert app.session.get_expire_at_browser_close()
- assert models.FcAccount.objects.count() == 1
-
- # test unlink cancel case
- response = app.get('/accounts/')
- response = response.click('Delete link')
- assert len(response.pyquery('[name=cancel][formnovalidate]')) == 1
- response = response.form.submit(name='cancel')
- response = response.follow()
-
- # test unlink submit case
- response = app.get('/accounts/')
- response = response.click('Delete link')
- response.form.set('new_password1', 'ikKL1234')
- response.form.set('new_password2', 'ikKL1234')
- response = response.form.submit(name='unlink')
- assert 'The link with the FranceConnect account has been deleted' in response.text
- assert models.FcAccount.objects.count() == 0
- continue_url = response.pyquery('a#a2-continue').attr['href']
- state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
- assert app.session['fc_states'][state]['next'] == '/accounts/'
- response = app.get(reverse('fc-logout') + '?state=' + state)
- assert path(response['Location']) == '/accounts/'
-def test_login_email_is_unique(app, fc_settings, caplog):
- callback = reverse('fc-login-or-link')
- response = app.get(callback, status=302)
- location = response['Location']
- state = check_authorization_url(location)
+def test_create(settings, app, franceconnect, hooks):
+ # test direct creation
+ settings.A2_FC_CREATE = True
- @httmock.urlmatch(path=r'.*/token$')
- def access_token_response(url, request):
- parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
- assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
- 'grant_type'])
- assert parsed['code'] == 'zzz'
- assert parsed['client_id'] == 'xxx'
- assert parsed['client_secret'] == 'yyy'
- assert parsed['grant_type'] == 'authorization_code'
- parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
- parsed_callback = urlparse.urlparse(callback)
- assert parsed_redirect.path == parsed_callback.path
- for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
- urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
- exp = now() + datetime.timedelta(seconds=1000)
- id_token = {
- 'sub': '1234',
- 'aud': 'xxx',
- 'nonce': state,
- 'exp': int(exp.timestamp()),
- 'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
- }
- return json.dumps({
- 'access_token': 'uuu',
- 'id_token': hmac_jwt(id_token, 'yyy')
- })
+ response = app.get('/login/?service=portail&next=/idp/')
+ response = response.click(href='callback')
- @httmock.urlmatch(path=r'.*userinfo$')
- def user_info_response(url, request):
- assert request.headers['Authorization'] == 'Bearer uuu'
- return json.dumps({
- 'sub': '1234',
- 'family_name': u'Frédérique',
- 'given_name': u'Ÿuñe',
- 'email': 'jOhn.dOe@eXample.com',
- })
+ assert User.objects.count() == 0
+ response = franceconnect.handle_authorization(app, response.location, status=302)
+ assert User.objects.count() == 1
- user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
+ user = User.objects.get()
+ assert user.verified_attributes.first_name == 'Ÿuñe'
+ assert user.verified_attributes.last_name == 'Frédérique'
+ assert path(response['Location']) == '/idp/'
+ assert hooks.event[1]['kwargs']['name'] == 'login'
+ assert hooks.event[1]['kwargs']['service'] == 'portail'
+ # we must be connected
+ assert app.session['_auth_user_id']
+ assert app.session.get_expire_at_browser_close()
+ assert models.FcAccount.objects.count() == 1
+
+ # test unlink cancel case
+ response = app.get('/accounts/')
+ response = response.click('Delete link')
+ assert len(response.pyquery('[name=cancel][formnovalidate]')) == 1
+ response = response.form.submit(name='cancel')
+ response = response.follow()
+
+ # test unlink submit case
+ response = app.get('/accounts/')
+ response = response.click('Delete link')
+ response.form.set('new_password1', 'ikKL1234')
+ response.form.set('new_password2', 'ikKL1234')
+ response = response.form.submit(name='unlink')
+ assert 'The link with the FranceConnect account has been deleted' in response.text
+ assert models.FcAccount.objects.count() == 0
+ continue_url = response.pyquery('a#a2-continue').attr['href']
+ state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
+ assert app.session['fc_states'][state]['next'] == '/accounts/'
+ response = app.get(reverse('fc-logout') + '?state=' + state)
+ assert path(response['Location']) == '/accounts/'
+
+
+def test_create_expired(settings, app, franceconnect, hooks):
+ # test direct creation failure on an expired id_token
+ settings.A2_FC_CREATE = True
+ franceconnect.exp = now() - datetime.timedelta(seconds=30)
+
+ response = app.get('/login/?service=portail&next=/idp/')
+ response = response.click(href='callback')
+
+ assert User.objects.count() == 0
+ response = franceconnect.handle_authorization(app, response.location, status=302)
+ assert User.objects.count() == 0
+
+
+def test_login_email_is_unique(settings, app, franceconnect, caplog):
+ settings.A2_EMAIL_IS_UNIQUE = True
+ user = User(
+ email='john.doe@example.com',
+ first_name='John',
+ last_name='Doe')
user.set_password('toto')
user.save()
- fc_settings.A2_EMAIL_IS_UNIQUE = True
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
+ franceconnect.user_info['email'] = user.email
+
assert User.objects.count() == 1
- assert app.session['_auth_user_id']
+ franceconnect.login_with_fc_fixed_params(app)
+ assert User.objects.count() == 1
+ assert app.session['_auth_user_id'] == str(user.pk)
- # logout, test unlinking when logging with password
- app.session.flush()
- response = app.get('/login/')
- response.form.set('username', User.objects.get().email)
- response.form.set('password', 'toto')
- response = response.form.submit(name='login-password-submit').follow()
- response = app.get('/accounts/')
+def test_unlink_after_login_with_password(app, franceconnect, simple_user):
+ models.FcAccount.objects.create(user=simple_user, user_info='{}')
+
+ response = login(app, simple_user, path='/accounts/')
response = response.click('Delete link')
assert 'new_password1' not in response.form.fields
response = response.form.submit(name='unlink').follow()
assert 'The link with the FranceConnect account has been deleted' in response.text
+ # no logout from FC since we are not logged to it
assert response.request.path == '/accounts/'
-def test_login_email_is_unique_and_already_linked(app, fc_settings, caplog):
- callback = reverse('fc-login-or-link')
- response = app.get(callback, status=302)
- location = response['Location']
- state = check_authorization_url(location)
+def test_unlink_after_login_with_fc(app, franceconnect, simple_user):
+ models.FcAccount.objects.create(user=simple_user, sub=franceconnect.sub, user_info='{}')
- EMAIL = 'john.doe@example.com'
- SUB = '1234'
- user = User.objects.create(email=EMAIL, first_name='John', last_name='Doe')
+ response = franceconnect.login_with_fc(app, path='/accounts/')
+ response = response.click('Delete link')
+ response.form.set('new_password1', 'ikKL1234')
+ response.form.set('new_password2', 'ikKL1234')
+ response = response.form.submit(name='unlink')
+ assert 'The link with the FranceConnect account has been deleted' in response.text
+ assert models.FcAccount.objects.count() == 0
+ continue_url = response.pyquery('a#a2-continue').attr['href']
+ response = franceconnect.handle_logout(app, continue_url)
+ assert path(response.location) == '/accounts/'
+
+
+def test_login_email_is_unique_and_already_linked(settings, app, franceconnect, caplog):
+ settings.A2_EMAIL_IS_UNIQUE = True
+
+ # setup an already linked user account
+ user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
models.FcAccount.objects.create(user=user, sub='4567', token='xxx', user_info='{}')
-
- @httmock.urlmatch(path=r'.*/token$')
- def access_token_response(url, request):
- parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
- assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
- 'grant_type'])
- assert parsed['code'] == 'zzz'
- assert parsed['client_id'] == 'xxx'
- assert parsed['client_secret'] == 'yyy'
- assert parsed['grant_type'] == 'authorization_code'
- parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
- parsed_callback = urlparse.urlparse(callback)
- assert parsed_redirect.path == parsed_callback.path
- for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
- urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
- exp = now() + datetime.timedelta(seconds=1000)
- id_token = {
- 'sub': SUB,
- 'aud': 'xxx',
- 'nonce': state,
- 'exp': int(exp.timestamp()),
- 'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
- }
- return json.dumps({
- 'access_token': 'uuu',
- 'id_token': hmac_jwt(id_token, 'yyy')
- })
-
- @httmock.urlmatch(path=r'.*userinfo$')
- def user_info_response(url, request):
- assert request.headers['Authorization'] == 'Bearer uuu'
- return json.dumps({
- 'sub': '1234',
- 'family_name': u'Frédérique',
- 'given_name': u'Ÿuñe',
- 'email': EMAIL,
- })
-
- fc_settings.A2_EMAIL_IS_UNIQUE = True
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
- assert 'is already used' in str(response)
- assert User.objects.count() == 1
+ response = app.get('/login/?service=portail&next=/idp/')
+ response = response.click(href='callback')
+ response = franceconnect.handle_authorization(app, response.location, status=302)
+ assert models.FcAccount.objects.count() == 1
+ assert 'is already used' in app.cookies['messages']
assert '_auth_user_id' not in app.session
-def test_requests_proxies_support(app, fc_settings, caplog):
+def test_requests_proxies_support(settings, app):
session = requests_retry_session()
assert session.proxies == {}
other_session = requests.Session()
@@ -331,7 +193,8 @@ def test_requests_proxies_support(app, fc_settings, caplog):
session = requests_retry_session(session=other_session)
assert session is other_session
assert session.proxies == {'http': 'http://example.net'}
- fc_settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
+
+ settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
session = requests_retry_session()
assert session.proxies == {'https': 'http://pubproxy.com/api/proxy'}
@@ -341,95 +204,65 @@ def test_requests_proxies_support(app, fc_settings, caplog):
assert mocked_send.call_args[1]['proxies'] == {'https': 'http://pubproxy.com/api/proxy'}
-def test_password_reset(app, mailoutbox):
+def test_no_password_with_fc_account_can_reset_password(app, db, mailoutbox):
user = User.objects.create(email='john.doe@example.com')
+ # No FC account, forbidden to set a password
response = app.get('/login/')
response = response.click('Reset it!').maybe_follow()
response.form['email'] = user.email
assert len(mailoutbox) == 0
response = response.form.submit()
assert len(mailoutbox) == 1
- url = get_links_from_mail(mailoutbox[0])[0]
+ url = get_link_from_mail(mailoutbox[0])
+ response = app.get(url).follow().follow()
+ assert '_auth_user_id' not in app.session
+ assert 'not possible to reset' in response
+
+ # With FC account, can set a password
models.FcAccount.objects.create(user=user, sub='xxx', token='aaa')
- response = app.get(url).maybe_follow()
- assert 'new_password1' in response.form.fields
+ response = app.get('/login/')
+ response = response.click('Reset it!').maybe_follow()
+ response.form['email'] = user.email
+ assert len(mailoutbox) == 1
+ response = response.form.submit()
+ assert len(mailoutbox) == 2
+ url = get_link_from_mail(mailoutbox[1])
+ response = app.get(url, status=200)
+ response.form.set('new_password1', 'ikKL1234')
+ response.form.set('new_password2', 'ikKL1234')
+ response = response.form.submit().follow()
+ assert '_auth_user_id' in app.session
-def test_registration1(app, fc_settings, caplog, hooks):
- exp = now() + datetime.timedelta(seconds=1000)
- response = app.get('/login/?service=portail&next=/idp/')
- response = response.click(href="callback")
- # 1. Try a login
- # 2. Verify we come back to login page
- # 3. Check presence of registration link
- # 4. Follow it
- location = response['Location']
- state = check_authorization_url(location)
-
- @httmock.urlmatch(path=r'.*/token$')
- def access_token_response(url, request):
- parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
- assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
- 'grant_type'])
- assert parsed['code'] == 'zzz'
- assert parsed['client_id'] == 'xxx'
- assert parsed['client_secret'] == 'yyy'
- assert parsed['grant_type'] == 'authorization_code'
- parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
- parsed_callback = urlparse.urlparse(callback)
- assert parsed_redirect.path == parsed_callback.path
- for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
- urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
- id_token = {
- 'sub': '1234',
- 'aud': 'xxx',
- 'nonce': state,
- 'exp': int(exp.timestamp()),
- 'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
- 'email': 'john.doe@example.com',
- }
- return json.dumps({
- 'access_token': 'uuu',
- 'id_token': hmac_jwt(id_token, 'yyy')
- })
-
- @httmock.urlmatch(path=r'.*userinfo$')
- def user_info_response(url, request):
- assert request.headers['Authorization'] == 'Bearer uuu'
- return json.dumps({
- 'sub': '1234',
- 'family_name': u'Frédérique',
- 'given_name': u'Ÿuñe',
- 'email': 'john.doe@example.com',
- })
-
- callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
+def test_registration1(settings, app, franceconnect, caplog, hooks):
+ response = franceconnect.login_with_fc_fixed_params(app)
assert User.objects.count() == 0
- assert path(response['Location']) == '/login/'
+ assert path(response.location) == '/login/'
response = response.follow()
response = response.click(href='/accounts/fc/register')
- location = response['Location']
- location.startswith('http://testserver/accounts/activate/')
+ response.location.startswith('http://testserver/accounts/activate/')
+ assert User.objects.count() == 0
response = response.follow()
- assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
+ assert response.location.startswith('/fc/callback/')
+ # a new user has been created
+ assert User.objects.count() == 1
+ # but no FcAccount
+ assert models.FcAccount.objects.count() == 0
# we must be connected
assert app.session['_auth_user_id']
- parsed_location = urlparse.urlparse(response['Location'])
- parsed_callback = urlparse.urlparse(callback)
- assert parsed_location.path == parsed_callback.path
- assert (urlparse.parse_qs(parsed_location.query) ==
- urlparse.parse_qs(parsed_callback.query))
+ # hook must have been called
+ assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
+
response = response.follow()
- location = response['Location']
- state = check_authorization_url(location)
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
+ # a new redirect to FC is done
+ response = franceconnect.handle_authorization(app, response.location)
+
+ # FcAccount now exists
assert models.FcAccount.objects.count() == 1
user = User.objects.get()
- assert user.verified_attributes.first_name == u'Ÿuñe'
- assert user.verified_attributes.last_name == u'Frédérique'
+ assert user.verified_attributes.first_name == 'Ÿuñe'
+ assert user.verified_attributes.last_name == 'Frédérique'
+
response = app.get('/accounts/')
response = response.click('Delete link')
response.form.set('new_password1', 'ikKL1234')
@@ -438,214 +271,60 @@ def test_registration1(app, fc_settings, caplog, hooks):
assert 'The link with the FranceConnect account has been deleted' in response.text
assert models.FcAccount.objects.count() == 0
continue_url = response.pyquery('a#a2-continue').attr['href']
- state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
- assert app.session['fc_states'][state]['next'] == '/accounts/'
- response = app.get(reverse('fc-logout') + '?state=' + state)
- assert path(response['Location']) == '/accounts/'
+ response = franceconnect.handle_logout(app, continue_url)
+ assert path(response.location) == '/accounts/'
-def test_registration2(app, fc_settings, caplog, hooks):
- exp = now() + datetime.timedelta(seconds=1000)
+def test_registration2(settings, app, franceconnect, hooks):
response = app.get('/login/?service=portail&next=/idp/')
response = response.click("Register")
response = response.click(href='callback')
- # 1. Try a login
- # 2. Verify we come back to login page
- # 3. Check presence of registration link
- # 4. Follow it
- location = response['Location']
- state = check_authorization_url(location)
+ franceconnect.callback_params['registration'] = ''
+ response = franceconnect.handle_authorization(app, response.location)
- @httmock.urlmatch(path=r'.*/token$')
- def access_token_response(url, request):
- parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
- assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
- 'grant_type'])
- assert parsed['code'] == 'zzz'
- assert parsed['client_id'] == 'xxx'
- assert parsed['client_secret'] == 'yyy'
- assert parsed['grant_type'] == 'authorization_code'
- parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
- parsed_callback = urlparse.urlparse(callback)
- assert parsed_redirect.path == parsed_callback.path
- for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
- urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
- id_token = {
- 'sub': '1234',
- 'aud': 'xxx',
- 'nonce': state,
- 'exp': int(exp.timestamp()),
- 'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
- 'email': 'john.doe@example.com',
- }
- return json.dumps({
- 'access_token': 'uuu',
- 'id_token': hmac_jwt(id_token, 'yyy')
- })
-
- @httmock.urlmatch(path=r'.*userinfo$')
- def user_info_response(url, request):
- assert request.headers['Authorization'] == 'Bearer uuu'
- return json.dumps({
- 'sub': '1234',
- 'family_name': u'Frédérique',
- 'given_name': u'Ÿuñe',
- 'email': 'john.doe@example.com',
- })
-
- callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
assert User.objects.count() == 0
- assert path(response['Location']) == '/accounts/fc/register/'
+ assert path(response.location) == '/accounts/fc/register/'
response = response.follow()
- location = response['Location']
- location.startswith('http://testserver/accounts/activate/')
+ response.location.startswith('http://testserver/accounts/activate/')
response = response.follow()
+ assert User.objects.count() == 1
+ user = User.objects.get()
+ assert user.verified_attributes.first_name is None
+ assert user.verified_attributes.last_name is None
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
# we must be connected
assert app.session['_auth_user_id']
- # remove the registration parameter
- callback = callback.replace('®istration=', '')
- callback = callback.replace('?registration=', '?')
- callback = callback.replace('?&', '?')
- parsed_location = urlparse.urlparse(response['Location'])
- parsed_callback = urlparse.urlparse(callback)
- assert parsed_location.path == parsed_callback.path
- assert (urlparse.parse_qs(parsed_location.query) ==
- urlparse.parse_qs(parsed_callback.query))
response = response.follow()
- location = response['Location']
- state = check_authorization_url(location)
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
- assert models.FcAccount.objects.count() == 1
+
+ del franceconnect.callback_params['registration']
+ response = franceconnect.handle_authorization(app, response.location)
user = User.objects.get()
- assert user.verified_attributes.first_name == u'Ÿuñe'
- assert user.verified_attributes.last_name == u'Frédérique'
- response = app.get('/accounts/')
- response = response.click('Delete link')
- response.form.set('new_password1', 'ikKL1234')
- response.form.set('new_password2', 'ikKL1234')
- response = response.form.submit(name='unlink')
- assert 'The link with the FranceConnect account has been deleted' in response.text
- assert models.FcAccount.objects.count() == 0
- continue_url = response.pyquery('a#a2-continue').attr['href']
- state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
- assert app.session['fc_states'][state]['next'] == '/accounts/'
- response = app.get(reverse('fc-logout') + '?state=' + state)
- assert path(response['Location']) == '/accounts/'
+ assert user.verified_attributes.first_name == 'Ÿuñe'
+ assert user.verified_attributes.last_name == 'Frédérique'
-def test_can_change_password(app, fc_settings, caplog, hooks):
- exp = now() + datetime.timedelta(seconds=1000)
- response = app.get('/login/?service=portail&next=/idp/')
- response = response.click("Register")
- response = response.click(href='callback')
- # 1. Try a login
- # 2. Verify we come back to login page
- # 3. Check presence of registration link
- # 4. Follow it
- location = response['Location']
- state = check_authorization_url(location)
+def test_can_change_password(settings, app, franceconnect):
+ user = User.objects.create(email='john.doe@example.com')
+ models.FcAccount.objects.create(user=user, sub=franceconnect.sub)
- @httmock.urlmatch(path=r'.*/token$')
- def access_token_response(url, request):
- parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
- assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
- 'grant_type'])
- assert parsed['code'] == 'zzz'
- assert parsed['client_id'] == 'xxx'
- assert parsed['client_secret'] == 'yyy'
- assert parsed['grant_type'] == 'authorization_code'
- parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
- parsed_callback = urlparse.urlparse(callback)
- assert parsed_redirect.path == parsed_callback.path
- for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
- urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
- id_token = {
- 'sub': '1234',
- 'aud': 'xxx',
- 'nonce': state,
- 'exp': int(exp.timestamp()),
- 'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
- 'email': 'john.doe@example.com',
- }
- return json.dumps({
- 'access_token': 'uuu',
- 'id_token': hmac_jwt(id_token, 'yyy')
- })
-
- @httmock.urlmatch(path=r'.*userinfo$')
- def user_info_response(url, request):
- assert request.headers['Authorization'] == 'Bearer uuu'
- return json.dumps({
- 'sub': '1234',
- 'family_name': u'Frédérique',
- 'given_name': u'Ÿuñe',
- 'email': 'john.doe@example.com',
- })
-
- callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
- assert User.objects.count() == 0
- assert path(response['Location']) == '/accounts/fc/register/'
- response = response.follow()
- location = response['Location']
- location.startswith('http://testserver/accounts/activate/')
- response = response.follow()
- assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
- assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
- # we must be connected
- assert app.session['_auth_user_id']
- # remove the registration parameter
- callback = callback.replace('®istration=', '')
- callback = callback.replace('?registration=', '?')
- callback = callback.replace('?&', '?')
- parsed_location = urlparse.urlparse(response['Location'])
- parsed_callback = urlparse.urlparse(callback)
- assert parsed_location.path == parsed_callback.path
- assert (urlparse.parse_qs(parsed_location.query) ==
- urlparse.parse_qs(parsed_callback.query))
- response = response.follow()
- location = response['Location']
- state = check_authorization_url(location)
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
- assert models.FcAccount.objects.count() == 1
- user = User.objects.get()
- assert user.verified_attributes.first_name == u'Ÿuñe'
- assert user.verified_attributes.last_name == u'Frédérique'
- response = app.get('/accounts/')
+ response = franceconnect.login_with_fc(app, path='/accounts/')
assert len(response.pyquery('[href*="password/change"]')) == 0
+ response = response.click('Logout')
+ response = franceconnect.handle_logout(app, response.location).follow()
+ assert '_auth_user_id' not in app.session
# Login with password
- user = User.objects.get()
+ user.username = 'test'
user.set_password('test')
user.save()
- app.session.flush()
- response = app.get('/login/')
- response.form.set('username', User.objects.get().email)
- response.form.set('password', 'test')
- response = response.form.submit(name='login-password-submit').follow()
- response = app.get('/accounts/')
+
+ response = login(app, user, path='/accounts/')
assert len(response.pyquery('[href*="password/change"]')) > 0
+ response = response.click('Logout').follow()
# Relogin with FC
- app.session.flush()
- response = app.get('/login/?service=portail&next=/accounts/')
- response = response.click(href='callback')
- location = response['Location']
- state = check_authorization_url(location)
- callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
- # we must be connected
- assert app.session['_auth_user_id']
- assert path(response['Location']) == '/accounts/'
- response = response.follow()
+ response = franceconnect.login_with_fc(app, path='/accounts/')
assert len(response.pyquery('[href*="password/change"]')) == 0
# Unlink
@@ -653,20 +332,17 @@ def test_can_change_password(app, fc_settings, caplog, hooks):
response.form.set('new_password1', 'ikKL1234')
response.form.set('new_password2', 'ikKL1234')
response = response.form.submit(name='unlink')
+ assert 'The link with the FranceConnect account has been deleted' in response.text
continue_url = response.pyquery('a#a2-continue').attr['href']
- state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
- assert app.session['fc_states'][state]['next'] == '/accounts/'
- response = app.get(reverse('fc-logout') + '?state=' + state)
- assert path(response['Location']) == '/accounts/'
- response = response.follow()
+ response = franceconnect.handle_logout(app, continue_url).follow()
assert len(response.pyquery('[href*="password/change"]')) > 0
-def test_invalid_next_url(app, fc_settings, caplog, hooks):
+def test_invalid_next_url(app, franceconnect):
assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ'
-def test_manager_user_sidebar(app, fc_settings, superuser, simple_user):
+def test_manager_user_sidebar(app, superuser, simple_user):
login(app, superuser, '/manage/')
response = app.get('/manage/users/%s/' % simple_user.id)
assert 'FranceConnect' not in response
diff --git a/tests/auth_fc/test_auth_fc_api.py b/tests/auth_fc/test_auth_fc_api.py
index 20160045c..8a6f91908 100644
--- a/tests/auth_fc/test_auth_fc_api.py
+++ b/tests/auth_fc/test_auth_fc_api.py
@@ -18,8 +18,9 @@
from authentic2_auth_fc.models import FcAccount
-def test_api_fc_unlink(app, admin, user_cartman):
- url = '/api/users/%s/fc-unlink/' % user_cartman.uuid
+def test_api_fc_unlink(app, admin, simple_user):
+ FcAccount.objects.create(user=simple_user)
+ url = '/api/users/%s/fc-unlink/' % simple_user.uuid
# test unauthorized caller
app.delete(url, status=401)
# test unauthorized method
@@ -27,13 +28,14 @@ def test_api_fc_unlink(app, admin, user_cartman):
app.get(url, status=405)
# test success
app.delete(url, status=204)
- assert FcAccount.objects.filter(user=user_cartman).exists() is False
+ assert FcAccount.objects.filter(user=simple_user).exists() is False
-def test_api_user_franceconnect(settings, app, admin, user_cartman):
+def test_api_user_franceconnect(settings, app, admin, simple_user):
settings.A2_FC_ENABLE = True
+ FcAccount.objects.create(user=simple_user, sub='1234')
- url = '/api/users/%s/' % user_cartman.uuid
+ url = '/api/users/%s/' % simple_user.uuid
# test unauthorized method
app.authorization = ('Basic', (admin.username, admin.username))
response = app.get(url)
@@ -48,7 +50,7 @@ def test_api_user_franceconnect(settings, app, admin, user_cartman):
assert content.get('unlink_url').startswith('http://')
assert content.get('unlink_url').endswith('/unlink/')
- unlink_url = '/api/users/%s/fc-unlink/' % user_cartman.uuid
+ unlink_url = '/api/users/%s/fc-unlink/' % simple_user.uuid
app.delete(unlink_url, status=204)
response = app.get(url + '?full')