summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBenjamin Dauvergne <bdauvergne@entrouvert.com>2019-06-15 06:55:53 (GMT)
committerBenjamin Dauvergne <bdauvergne@entrouvert.com>2019-07-29 22:30:31 (GMT)
commit53c5124fd03332e8cc92cdc7ecdb2846ebd13970 (patch)
tree5619882aa2152f65944b8dce701e8a3cbb27f996
parent6aeece1d25ac7f968d6e4be89710c7914d073be1 (diff)
downloadauthentic-wip/33991-link-by-email-link-on-login.zip
authentic-wip/33991-link-by-email-link-on-login.tar.gz
authentic-wip/33991-link-by-email-link-on-login.tar.bz2
-rw-r--r--tests/auth_fc/test_auth_fc.py261
1 files changed, 154 insertions, 107 deletions
diff --git a/tests/auth_fc/test_auth_fc.py b/tests/auth_fc/test_auth_fc.py
index fcd0e92..48ddb31 100644
--- a/tests/auth_fc/test_auth_fc.py
+++ b/tests/auth_fc/test_auth_fc.py
@@ -15,6 +15,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import contextlib
import pytest
import re
import urlparse
@@ -31,7 +32,8 @@ from django.core.urlresolvers import reverse
from django.contrib.auth import get_user_model
from django.utils.timezone import now
-from authentic2.utils import timestamp_from_datetime
+from authentic2.a2_rbac.utils import get_default_ou
+from authentic2.utils import timestamp_from_datetime, make_url
from authentic2_auth_fc import models
from authentic2_auth_fc.utils import requests_retry_session
@@ -40,6 +42,96 @@ from authentic2_auth_fc.utils import requests_retry_session
User = get_user_model()
+class FcMock(object):
+ code = 'zzz'
+ sub = '1234'
+ client_id = 'xxx'
+ client_secret = 'yyy'
+ access_token = 'uuu'
+ iss = 'https://fcp.integ01.dev-franceconnect.fr/'
+ user_info__family_name = u'Frédérique'
+ user_info__given_name = u'Ÿuñe'
+
+ def __init__(self, **kwargs):
+ self.__dict__.update(**kwargs)
+
+ def check_authorization_url(self, url):
+ callback = reverse('fc-login-or-link')
+ assert url.startswith('https://fcp.integ01')
+ query_string = url.split('?')[1]
+ parsed = {x: y[0] for x, y in urlparse.parse_qs(query_string).items()}
+ assert 'redirect_uri' in parsed
+ assert callback in parsed['redirect_uri']
+ assert 'client_id' in parsed
+ assert parsed['client_id'] == 'xxx'
+ assert 'scope' in parsed
+ assert set(parsed['scope'].split()) == set(['openid', 'profile', 'birth', 'email'])
+ assert 'state' in parsed
+ assert 'nonce' in parsed
+ assert parsed['state'] == parsed['nonce']
+ assert 'response_type' in parsed
+ assert parsed['response_type'] == 'code'
+ return parsed['state'], parsed['redirect_uri']
+
+ @contextlib.contextmanager
+ def mock(self, response_or_url, callback=None, **kwargs):
+ old_dict = self.__dict__.copy()
+ self.__dict__.update(**kwargs)
+ exp = getattr(self, 'exp', timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)))
+
+ if hasattr(response_or_url, 'location'):
+ location = response_or_url.location
+ else:
+ location = response_or_url
+
+ state, redirect_uri = self.check_authorization_url(location)
+
+ @httmock.urlmatch(path=r'.*/token$')
+ def access_token_response(url, request):
+ parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
+ assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
+ 'grant_type'])
+ assert parsed['code'] == self.code
+ assert parsed['client_id'] == self.client_id
+ assert parsed['client_secret'] == self.client_secret
+ assert parsed['grant_type'] == 'authorization_code'
+ assert parsed['redirect_uri'] == redirect_uri
+ id_token = {
+ 'sub': self.sub,
+ 'aud': self.client_id,
+ 'nonce': state,
+ 'exp': exp,
+ 'iss': self.iss,
+ }
+ return json.dumps({
+ 'access_token': self.access_token,
+ 'id_token': hmac_jwt(id_token, 'yyy')
+ })
+
+ @httmock.urlmatch(path=r'.*userinfo$')
+ def user_info_response(url, request):
+ assert request.headers['Authorization'] == 'Bearer uuu'
+ user_info = {
+ 'sub': self.sub,
+ }
+ for key in dir(self):
+ if key.startswith('user_info__'):
+ user_info[key[len('user_info__'):]] = getattr(self, key)
+ return json.dumps(user_info)
+
+ callback_url = make_url(redirect_uri, params={'code': self.code, 'state': state})
+ try:
+ with httmock.HTTMock(access_token_response, user_info_response):
+ yield callback_url
+ finally:
+ self.__dict__.update(old_dict)
+
+
+@pytest.fixture
+def fc(fc_settings):
+ yield FcMock()
+
+
def path(url):
return urlparse.urlparse(url).path
@@ -88,126 +180,81 @@ def check_authorization_url(url):
return parsed['state']
-@pytest.mark.parametrize('exp', [timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
- timestamp_from_datetime(now() - datetime.timedelta(seconds=1000))])
-def test_login_simple(app, fc_settings, caplog, hooks, exp):
+def test_nocreate(app, fc_settings, fc, caplog, hooks):
response = app.get('/login/?service=portail&next=/idp/')
response = response.click(href='callback')
- location = response['Location']
- state = check_authorization_url(location)
- @httmock.urlmatch(path=r'.*/token$')
- def access_token_response(url, request):
- parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
- assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
- 'grant_type'])
- assert parsed['code'] == 'zzz'
- assert parsed['client_id'] == 'xxx'
- assert parsed['client_secret'] == 'yyy'
- assert parsed['grant_type'] == 'authorization_code'
- assert callback in parsed['redirect_uri']
- id_token = {
- 'sub': '1234',
- 'aud': 'xxx',
- 'nonce': state,
- 'exp': exp,
- 'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
- }
- return json.dumps({
- 'access_token': 'uuu',
- 'id_token': hmac_jwt(id_token, 'yyy')
- })
+ with fc.mock(response) as callback_url:
+ response = app.get(callback_url)
- @httmock.urlmatch(path=r'.*userinfo$')
- def user_info_response(url, request):
- assert request.headers['Authorization'] == 'Bearer uuu'
- return json.dumps({
- 'sub': '1234',
- 'family_name': u'Frédérique',
- 'given_name': u'Ÿuñe',
- })
+ assert User.objects.count() == 0
+ assert response.location.startswith('/login/')
+
+
+def test_create_expired(app, fc_settings, fc, caplog, hooks):
+ fc_settings.A2_FC_CREATE = True
+
+ response = app.get('/login/?service=portail&next=/idp/')
+ response = response.click(href='callback')
+
+ exp = timestamp_from_datetime(now() - datetime.timedelta(seconds=1000))
+ with fc.mock(response, exp=exp) as callback_url:
+ response = app.get(callback_url)
- callback = reverse('fc-login-or-link')
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302)
assert User.objects.count() == 0
+
+
+def test_create(app, fc_settings, fc, caplog, hooks):
fc_settings.A2_FC_CREATE = True
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302)
- if exp < timestamp_from_datetime(now()):
- assert User.objects.count() == 0
- else:
- assert User.objects.count() == 1
- if User.objects.count():
- user = User.objects.get()
- assert user.verified_attributes.first_name == u'Ÿuñe'
- assert user.verified_attributes.last_name == u'Frédérique'
- assert path(response['Location']) == '/idp/'
- assert hooks.event[1]['kwargs']['name'] == 'login'
- assert hooks.event[1]['kwargs']['service'] == 'portail'
- # we must be connected
- assert app.session['_auth_user_id']
- assert models.FcAccount.objects.count() == 1
- response = app.get('/accounts/')
- response = response.click('Delete link')
- response.form.set('new_password1', 'ikKL1234')
- response.form.set('new_password2', 'ikKL1234')
- response = response.form.submit(name='unlink')
- assert 'The link with the FranceConnect account has been deleted' in response.content
- assert models.FcAccount.objects.count() == 0
- continue_url = response.pyquery('a#a2-continue').attr['href']
- state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
- assert app.session['fc_states'][state]['next'] == '/accounts/'
- response = app.get(reverse('fc-logout') + '?state=' + state)
- assert path(response['Location']) == '/accounts/'
-
-
-def test_login_email_is_unique(app, fc_settings, caplog):
- callback = reverse('fc-login-or-link')
- response = app.get(callback, status=302)
- location = response['Location']
- state = check_authorization_url(location)
- @httmock.urlmatch(path=r'.*/token$')
- def access_token_response(url, request):
- parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
- assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
- 'grant_type'])
- assert parsed['code'] == 'zzz'
- assert parsed['client_id'] == 'xxx'
- assert parsed['client_secret'] == 'yyy'
- assert parsed['grant_type'] == 'authorization_code'
- assert callback in parsed['redirect_uri']
- id_token = {
- 'sub': '1234',
- 'aud': 'xxx',
- 'nonce': state,
- 'exp': timestamp_from_datetime(now() + datetime.timedelta(seconds=1000)),
- 'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
- }
- return json.dumps({
- 'access_token': 'uuu',
- 'id_token': hmac_jwt(id_token, 'yyy')
- })
+ response = app.get('/login/?service=portail&next=/idp/')
+ response = response.click(href='callback')
- @httmock.urlmatch(path=r'.*userinfo$')
- def user_info_response(url, request):
- assert request.headers['Authorization'] == 'Bearer uuu'
- return json.dumps({
- 'sub': '1234',
- 'family_name': u'Frédérique',
- 'given_name': u'Ÿuñe',
- 'email': 'jOhn.dOe@eXample.com',
- })
+ with fc.mock(response) as callback_url:
+ response = app.get(callback_url)
+ assert User.objects.count() == 1
+ user = User.objects.get()
+ assert user.verified_attributes.first_name == u'Ÿuñe'
+ assert user.verified_attributes.last_name == u'Frédérique'
+ assert path(response['Location']) == '/idp/'
+ assert hooks.event[1]['kwargs']['name'] == 'login'
+ assert hooks.event[1]['kwargs']['service'] == 'portail'
+ # we must be connected
+ assert app.session['_auth_user_id']
+ assert models.FcAccount.objects.count() == 1
+ response = app.get('/accounts/')
+ response = response.click('Delete link')
+ response.form.set('new_password1', 'ikKL1234')
+ response.form.set('new_password2', 'ikKL1234')
+ response = response.form.submit(name='unlink')
+ assert 'The link with the FranceConnect account has been deleted' in response.content
+ assert models.FcAccount.objects.count() == 0
+ continue_url = response.pyquery('a#a2-continue').attr['href']
+ state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
+ assert app.session['fc_states'][state]['next'] == '/accounts/'
+ response = app.get(reverse('fc-logout') + '?state=' + state)
+ assert path(response['Location']) == '/accounts/'
- user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
+
+def test_link_by_email(app, fc, fc_settings, caplog):
+ user = User.objects.create(
+ email='john.doe@example.com',
+ first_name='John',
+ last_name='Doe',
+ ou=get_default_ou())
user.set_password('toto')
user.save()
- fc_settings.A2_EMAIL_IS_UNIQUE = True
- with httmock.HTTMock(access_token_response, user_info_response):
- response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
+
+ response = app.get('/login/?service=portail&next=/idp/')
+ response = response.click(href='callback')
+
+ assert models.FcAccount.objects.count() == 0
+ with fc.mock(response, user_info__email='jOhn.dOe@eXample.com') as callback_url:
+ response = app.get(callback_url)
+
assert User.objects.count() == 1
assert app.session['_auth_user_id']
+ assert models.FcAccount.objects.filter(user=user, sub=fc.sub, order=0).count() == 1
# logout, test unlinking when logging with password
app.session.flush()