tests: simplify FranceConnect tests (#48042)
This commit is contained in:
parent
9f08f5c475
commit
e28713c583
|
@ -14,129 +14,149 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import base64
|
||||
import contextlib
|
||||
import datetime
|
||||
import json
|
||||
import urllib.parse as urlparse
|
||||
import uuid
|
||||
|
||||
from jwcrypto import jwk, jwt
|
||||
import httmock
|
||||
import pytest
|
||||
import django_webtest
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.core.cache import cache
|
||||
from django_rbac.utils import get_ou_model
|
||||
|
||||
from authentic2 import hooks as a2_hooks
|
||||
from authentic2.manager.utils import get_ou_count
|
||||
from authentic2_auth_fc.models import FcAccount
|
||||
from django.http import QueryDict
|
||||
from django.urls import reverse
|
||||
from django.utils.http import urlencode
|
||||
from django.utils.timezone import now
|
||||
|
||||
|
||||
CARTMAN_FC_INFO = {
|
||||
"token": {
|
||||
"access_token": "cartmane_access_token",
|
||||
"token_type": "Bearer",
|
||||
"expires_in": 1200,
|
||||
"id_token": "cartman_token_id"
|
||||
},
|
||||
"sub": "c11661ed00014db58149c8a886c8180d",
|
||||
"user_info": {
|
||||
"birthcountry": "99404",
|
||||
"birthdate": "2006-06-06",
|
||||
"birthplace": "southpark",
|
||||
"email": "ecartman@ou_southpark.org",
|
||||
"family_name": "CARTMAN",
|
||||
"gender": "male",
|
||||
"given_name": "Eric",
|
||||
"preferred_username": "CARTMAN",
|
||||
"sub": "c11661ed00014db58149c8a886c8180d"
|
||||
}
|
||||
}
|
||||
from authentic2.models import Service
|
||||
from authentic2.utils import make_url
|
||||
|
||||
from ..utils import assert_equals_url
|
||||
|
||||
CLIENT_ID = 'xxx'
|
||||
CLIENT_SECRET = 'yyy'
|
||||
|
||||
|
||||
def create_user(**kwargs):
|
||||
User = get_user_model()
|
||||
password = kwargs.pop('password', None) or kwargs['username']
|
||||
federation = kwargs.pop('federation', None)
|
||||
user, created = User.objects.get_or_create(**kwargs)
|
||||
if password:
|
||||
user.set_password(password)
|
||||
user.save()
|
||||
class FranceConnectMock:
|
||||
exp = None
|
||||
|
||||
if federation:
|
||||
create_fc_federation(user, federation)
|
||||
return user
|
||||
|
||||
|
||||
def create_fc_federation(user, info):
|
||||
kwargs = {
|
||||
'user': user,
|
||||
'token': json.dumps(info['token']),
|
||||
'user_info': json.dumps(info['user_info']),
|
||||
'sub': info['sub']
|
||||
}
|
||||
return FcAccount.objects.create(**kwargs)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app(request, db):
|
||||
wtm = django_webtest.WebTestMixin()
|
||||
wtm._patch_settings()
|
||||
request.addfinalizer(wtm._unpatch_settings)
|
||||
return django_webtest.DjangoTestApp(extra_environ={'HTTP_HOST': 'testserver'})
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fc_settings(settings):
|
||||
settings.A2_FC_ENABLE = True
|
||||
settings.A2_FC_CLIENT_ID = 'xxx'
|
||||
settings.A2_FC_CLIENT_SECRET = 'yyy'
|
||||
return settings
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ou_southpark(db):
|
||||
OU = get_ou_model()
|
||||
return OU.objects.create(name='southpark', slug='southpark')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def admin(db):
|
||||
return create_user(username='admin', is_superuser=True, is_staff=True)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user_cartman(db, ou_southpark):
|
||||
return create_user(username='ecartman', first_name='eric', last_name='cartman',
|
||||
email='ecartman@southpark.org', ou=ou_southpark, federation=CARTMAN_FC_INFO)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def clear_cache():
|
||||
OU = get_ou_model()
|
||||
|
||||
cache.clear()
|
||||
for cached_el in (OU.cached, a2_hooks.get_hooks, get_ou_count):
|
||||
cached_el.cache.clear()
|
||||
|
||||
|
||||
class AllHook(object):
|
||||
def __init__(self):
|
||||
self.calls = {}
|
||||
self.sub = '1234'
|
||||
self.id_token = {
|
||||
'aud': 'xxx',
|
||||
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
||||
}
|
||||
self.user_info = {
|
||||
'family_name': 'Frédérique',
|
||||
'given_name': 'Ÿuñe',
|
||||
'email': 'john.doe@example.com',
|
||||
}
|
||||
self.access_token = str(uuid.uuid4())
|
||||
self.client_id = CLIENT_ID
|
||||
self.client_secret = CLIENT_SECRET
|
||||
self.scopes = {'openid', 'profile', 'email'}
|
||||
self.callback_params = {'service': 'portail', 'next': '/idp/'}
|
||||
|
||||
def __call__(self, hook_name, *args, **kwargs):
|
||||
calls = self.calls.setdefault(hook_name, [])
|
||||
calls.append({'args': args, 'kwargs': kwargs})
|
||||
def handle_authorization(self, app, url, **kwargs):
|
||||
assert url.startswith('https://fcp.integ01')
|
||||
parsed_url = urlparse.urlparse(url)
|
||||
query = QueryDict(parsed_url.query)
|
||||
assert_equals_url(query['redirect_uri'], self.callback_url)
|
||||
assert query['client_id'] == self.client_id
|
||||
assert set(query['scope'].split()) == self.scopes
|
||||
assert query['state']
|
||||
assert query['nonce']
|
||||
assert query['response_type'] == 'code'
|
||||
assert query['acr_values'] == 'eidas1'
|
||||
self.state = query['state']
|
||||
self.nonce = query['nonce']
|
||||
self.code = str(uuid.uuid4().hex)
|
||||
return app.get(
|
||||
make_url(self.callback_url, params={'code': self.code, 'state': self.state}), **kwargs)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return self.calls.get(name, [])
|
||||
@property
|
||||
def callback_url(self):
|
||||
return 'http://testserver' + reverse('fc-login-or-link') + '?' + urlencode(self.callback_params)
|
||||
|
||||
def clear(self):
|
||||
self.calls = {}
|
||||
def login_with_fc_fixed_params(self, app):
|
||||
if app.session:
|
||||
app.session.flush()
|
||||
response = app.get('/login/?' + urlencode(self.callback_params))
|
||||
response = response.click(href='callback')
|
||||
return self.handle_authorization(app, response.location, status=302)
|
||||
|
||||
def login_with_fc(self, app, path):
|
||||
if app.session:
|
||||
app.session.flush()
|
||||
response = app.get(path)
|
||||
self.callback_params = {k: v for k, v in QueryDict(urlparse.urlparse(response.location).query).items()}
|
||||
response = response.follow()
|
||||
response = response.click(href='callback')
|
||||
return self.handle_authorization(app, response.location, status=302).follow()
|
||||
|
||||
def access_token_response(self, url, request):
|
||||
formdata = QueryDict(request.body)
|
||||
assert set(formdata.keys()) == {'code', 'client_id', 'client_secret',
|
||||
'redirect_uri', 'grant_type'}
|
||||
assert formdata['code'] == self.code
|
||||
assert formdata['client_id'] == self.client_id
|
||||
assert formdata['client_secret'] == self.client_secret
|
||||
assert formdata['grant_type'] == 'authorization_code'
|
||||
assert_equals_url(formdata['redirect_uri'], self.callback_url)
|
||||
|
||||
# make response
|
||||
id_token = self.id_token.copy()
|
||||
id_token.update({
|
||||
'sub': self.sub,
|
||||
'nonce': self.nonce,
|
||||
'exp': int((self.exp or (now() + datetime.timedelta(seconds=60))).timestamp()),
|
||||
})
|
||||
id_token.update(self.user_info)
|
||||
return json.dumps({
|
||||
'access_token': self.access_token,
|
||||
'id_token': self.hmac_jwt(id_token, self.client_secret)
|
||||
})
|
||||
|
||||
def hmac_jwt(self, payload, key):
|
||||
header = {'alg': 'HS256'}
|
||||
k = jwk.JWK(kty='oct', k=base64.b64encode(key.encode('utf-8')).decode('ascii'))
|
||||
t = jwt.JWT(header=header, claims=payload)
|
||||
t.make_signed_token(k)
|
||||
return t.serialize()
|
||||
|
||||
def user_info_response(self, url, request):
|
||||
assert request.headers['Authorization'] == 'Bearer %s' % self.access_token
|
||||
user_info = self.user_info.copy()
|
||||
user_info['sub'] = self.sub
|
||||
return json.dumps(user_info)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def __call__(self):
|
||||
with httmock.HTTMock(
|
||||
httmock.urlmatch(path=r'.*/token$')(self.access_token_response),
|
||||
httmock.urlmatch(path=r'.*userinfo$')(self.user_info_response)):
|
||||
yield None
|
||||
|
||||
def handle_logout(self, app, url):
|
||||
assert url.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/logout')
|
||||
parsed_url = urlparse.urlparse(url)
|
||||
query = QueryDict(parsed_url.query)
|
||||
assert_equals_url(query['post_logout_redirect_uri'], 'http://testserver' + reverse('fc-logout'))
|
||||
assert query['state']
|
||||
self.state = query['state']
|
||||
return app.get(reverse('fc-logout') + '?state=' + self.state)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def hooks(settings):
|
||||
if hasattr(settings, 'A2_HOOKS'):
|
||||
hooks = settings.A2_HOOKS
|
||||
else:
|
||||
hooks = settings.A2_HOOKS = {}
|
||||
hook = hooks['__all__'] = AllHook()
|
||||
yield hook
|
||||
hook.clear()
|
||||
del settings.A2_HOOKS['__all__']
|
||||
def franceconnect(settings, service):
|
||||
settings.A2_FC_ENABLE = True
|
||||
settings.A2_FC_CLIENT_ID = CLIENT_ID
|
||||
settings.A2_FC_CLIENT_SECRET = CLIENT_SECRET
|
||||
|
||||
Service.objects.create(name='portail', slug='portail')
|
||||
mock_object = FranceConnectMock()
|
||||
with mock_object():
|
||||
yield mock_object
|
||||
|
|
|
@ -15,84 +15,36 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import pytest
|
||||
import re
|
||||
import httmock
|
||||
import mock
|
||||
import json
|
||||
import base64
|
||||
from jwcrypto import jwk, jwt
|
||||
import datetime
|
||||
import mock
|
||||
|
||||
import requests
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from django.utils.timezone import now
|
||||
|
||||
from authentic2.models import Service
|
||||
|
||||
from authentic2_auth_fc import models
|
||||
from authentic2_auth_fc.utils import requests_retry_session
|
||||
|
||||
from ..utils import login
|
||||
from ..utils import login, get_link_from_mail
|
||||
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def service(db):
|
||||
return Service.objects.create(name='portail', slug='portail')
|
||||
|
||||
|
||||
def path(url):
|
||||
return urlparse.urlparse(url).path
|
||||
|
||||
|
||||
def get_links_from_mail(mail):
|
||||
'''Extract links from mail sent by Django'''
|
||||
return re.findall('https?://[^ \n]*', mail.body)
|
||||
|
||||
|
||||
def hmac_jwt(payload, key):
|
||||
header = {'alg': 'HS256'}
|
||||
k = jwk.JWK(
|
||||
kty='oct', k=force_text(base64.b64encode(key.encode('utf-8'))))
|
||||
t = jwt.JWT(header=header, claims=payload)
|
||||
t.make_signed_token(k)
|
||||
return t.serialize()
|
||||
|
||||
|
||||
def test_login_redirect(app, fc_settings):
|
||||
def test_login_redirect(app, franceconnect):
|
||||
url = reverse('fc-login-or-link')
|
||||
response = app.get(url, status=302)
|
||||
assert response['Location'].startswith('https://fcp.integ01')
|
||||
|
||||
|
||||
def check_authorization_url(url):
|
||||
callback = reverse('fc-login-or-link')
|
||||
assert url.startswith('https://fcp.integ01')
|
||||
query_string = url.split('?')[1]
|
||||
parsed = {x: y[0] for x, y in urlparse.parse_qs(query_string).items()}
|
||||
assert 'redirect_uri' in parsed
|
||||
assert callback in parsed['redirect_uri']
|
||||
assert 'client_id' in parsed
|
||||
assert parsed['client_id'] == 'xxx'
|
||||
assert 'scope' in parsed
|
||||
assert set(parsed['scope'].split()) == set(['openid', 'profile', 'email'])
|
||||
assert 'state' in parsed
|
||||
assert 'nonce' in parsed
|
||||
assert parsed['state'] == parsed['nonce']
|
||||
assert 'response_type' in parsed
|
||||
assert parsed['response_type'] == 'code'
|
||||
assert parsed['acr_values'] == 'eidas1'
|
||||
return parsed['state']
|
||||
|
||||
|
||||
def test_login_with_condition(app, fc_settings, settings):
|
||||
def test_login_with_condition(settings, app, franceconnect):
|
||||
# open the page first time so session cookie can be set
|
||||
response = app.get('/login/')
|
||||
assert 'fc-button' in response
|
||||
|
@ -105,225 +57,135 @@ def test_login_with_condition(app, fc_settings, settings):
|
|||
assert 'fc-button' not in response
|
||||
|
||||
|
||||
def test_login_autorun(app, fc_settings, settings):
|
||||
def test_login_autorun(settings, app, franceconnect):
|
||||
# hide password block
|
||||
settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'remote_addr==\'0.0.0.0\''}}
|
||||
response = app.get('/login/')
|
||||
assert response['Location'] == reverse('fc-login-or-link')
|
||||
|
||||
|
||||
@pytest.mark.parametrize('exp', [now() + datetime.timedelta(seconds=1000),
|
||||
now() - datetime.timedelta(seconds=1000)])
|
||||
def test_login_simple(app, fc_settings, caplog, hooks, exp):
|
||||
def test_no_create(app, franceconnect):
|
||||
response = app.get('/login/?service=portail&next=/idp/')
|
||||
response = response.click(href='callback')
|
||||
location = response['Location']
|
||||
state = check_authorization_url(location)
|
||||
|
||||
@httmock.urlmatch(path=r'.*/token$')
|
||||
def access_token_response(url, request):
|
||||
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
||||
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
||||
'grant_type'])
|
||||
assert parsed['code'] == 'zzz'
|
||||
assert parsed['client_id'] == 'xxx'
|
||||
assert parsed['client_secret'] == 'yyy'
|
||||
assert parsed['grant_type'] == 'authorization_code'
|
||||
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
||||
parsed_callback = urlparse.urlparse(callback)
|
||||
assert parsed_redirect.path == parsed_callback.path
|
||||
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
||||
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
||||
id_token = {
|
||||
'sub': '1234',
|
||||
'aud': 'xxx',
|
||||
'nonce': state,
|
||||
'exp': int(exp.timestamp()),
|
||||
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
||||
}
|
||||
return json.dumps({
|
||||
'access_token': 'uuu',
|
||||
'id_token': hmac_jwt(id_token, 'yyy')
|
||||
})
|
||||
|
||||
@httmock.urlmatch(path=r'.*userinfo$')
|
||||
def user_info_response(url, request):
|
||||
assert request.headers['Authorization'] == 'Bearer uuu'
|
||||
return json.dumps({
|
||||
'sub': '1234',
|
||||
'family_name': u'Frédérique',
|
||||
'given_name': u'Ÿuñe',
|
||||
})
|
||||
|
||||
callback = reverse('fc-login-or-link')
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302)
|
||||
franceconnect.handle_authorization(app, response.location, status=302)
|
||||
assert User.objects.count() == 0
|
||||
fc_settings.A2_FC_CREATE = True
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302)
|
||||
if exp < now():
|
||||
assert User.objects.count() == 0
|
||||
else:
|
||||
assert User.objects.count() == 1
|
||||
if User.objects.count():
|
||||
user = User.objects.get()
|
||||
assert user.verified_attributes.first_name == u'Ÿuñe'
|
||||
assert user.verified_attributes.last_name == u'Frédérique'
|
||||
assert path(response['Location']) == '/idp/'
|
||||
assert hooks.event[1]['kwargs']['name'] == 'login'
|
||||
assert hooks.event[1]['kwargs']['service'] == 'portail'
|
||||
# we must be connected
|
||||
assert app.session['_auth_user_id']
|
||||
assert app.session.get_expire_at_browser_close()
|
||||
assert models.FcAccount.objects.count() == 1
|
||||
|
||||
# test unlink cancel case
|
||||
response = app.get('/accounts/')
|
||||
response = response.click('Delete link')
|
||||
assert len(response.pyquery('[name=cancel][formnovalidate]')) == 1
|
||||
response = response.form.submit(name='cancel')
|
||||
response = response.follow()
|
||||
|
||||
# test unlink submit case
|
||||
response = app.get('/accounts/')
|
||||
response = response.click('Delete link')
|
||||
response.form.set('new_password1', 'ikKL1234')
|
||||
response.form.set('new_password2', 'ikKL1234')
|
||||
response = response.form.submit(name='unlink')
|
||||
assert 'The link with the FranceConnect account has been deleted' in response.text
|
||||
assert models.FcAccount.objects.count() == 0
|
||||
continue_url = response.pyquery('a#a2-continue').attr['href']
|
||||
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
|
||||
assert app.session['fc_states'][state]['next'] == '/accounts/'
|
||||
response = app.get(reverse('fc-logout') + '?state=' + state)
|
||||
assert path(response['Location']) == '/accounts/'
|
||||
|
||||
|
||||
def test_login_email_is_unique(app, fc_settings, caplog):
|
||||
callback = reverse('fc-login-or-link')
|
||||
response = app.get(callback, status=302)
|
||||
location = response['Location']
|
||||
state = check_authorization_url(location)
|
||||
def test_create(settings, app, franceconnect, hooks):
|
||||
# test direct creation
|
||||
settings.A2_FC_CREATE = True
|
||||
|
||||
@httmock.urlmatch(path=r'.*/token$')
|
||||
def access_token_response(url, request):
|
||||
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
||||
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
||||
'grant_type'])
|
||||
assert parsed['code'] == 'zzz'
|
||||
assert parsed['client_id'] == 'xxx'
|
||||
assert parsed['client_secret'] == 'yyy'
|
||||
assert parsed['grant_type'] == 'authorization_code'
|
||||
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
||||
parsed_callback = urlparse.urlparse(callback)
|
||||
assert parsed_redirect.path == parsed_callback.path
|
||||
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
||||
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
||||
exp = now() + datetime.timedelta(seconds=1000)
|
||||
id_token = {
|
||||
'sub': '1234',
|
||||
'aud': 'xxx',
|
||||
'nonce': state,
|
||||
'exp': int(exp.timestamp()),
|
||||
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
||||
}
|
||||
return json.dumps({
|
||||
'access_token': 'uuu',
|
||||
'id_token': hmac_jwt(id_token, 'yyy')
|
||||
})
|
||||
response = app.get('/login/?service=portail&next=/idp/')
|
||||
response = response.click(href='callback')
|
||||
|
||||
@httmock.urlmatch(path=r'.*userinfo$')
|
||||
def user_info_response(url, request):
|
||||
assert request.headers['Authorization'] == 'Bearer uuu'
|
||||
return json.dumps({
|
||||
'sub': '1234',
|
||||
'family_name': u'Frédérique',
|
||||
'given_name': u'Ÿuñe',
|
||||
'email': 'jOhn.dOe@eXample.com',
|
||||
})
|
||||
assert User.objects.count() == 0
|
||||
response = franceconnect.handle_authorization(app, response.location, status=302)
|
||||
assert User.objects.count() == 1
|
||||
|
||||
user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
|
||||
user = User.objects.get()
|
||||
assert user.verified_attributes.first_name == 'Ÿuñe'
|
||||
assert user.verified_attributes.last_name == 'Frédérique'
|
||||
assert path(response['Location']) == '/idp/'
|
||||
assert hooks.event[1]['kwargs']['name'] == 'login'
|
||||
assert hooks.event[1]['kwargs']['service'] == 'portail'
|
||||
# we must be connected
|
||||
assert app.session['_auth_user_id']
|
||||
assert app.session.get_expire_at_browser_close()
|
||||
assert models.FcAccount.objects.count() == 1
|
||||
|
||||
# test unlink cancel case
|
||||
response = app.get('/accounts/')
|
||||
response = response.click('Delete link')
|
||||
assert len(response.pyquery('[name=cancel][formnovalidate]')) == 1
|
||||
response = response.form.submit(name='cancel')
|
||||
response = response.follow()
|
||||
|
||||
# test unlink submit case
|
||||
response = app.get('/accounts/')
|
||||
response = response.click('Delete link')
|
||||
response.form.set('new_password1', 'ikKL1234')
|
||||
response.form.set('new_password2', 'ikKL1234')
|
||||
response = response.form.submit(name='unlink')
|
||||
assert 'The link with the FranceConnect account has been deleted' in response.text
|
||||
assert models.FcAccount.objects.count() == 0
|
||||
continue_url = response.pyquery('a#a2-continue').attr['href']
|
||||
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
|
||||
assert app.session['fc_states'][state]['next'] == '/accounts/'
|
||||
response = app.get(reverse('fc-logout') + '?state=' + state)
|
||||
assert path(response['Location']) == '/accounts/'
|
||||
|
||||
|
||||
def test_create_expired(settings, app, franceconnect, hooks):
|
||||
# test direct creation failure on an expired id_token
|
||||
settings.A2_FC_CREATE = True
|
||||
franceconnect.exp = now() - datetime.timedelta(seconds=30)
|
||||
|
||||
response = app.get('/login/?service=portail&next=/idp/')
|
||||
response = response.click(href='callback')
|
||||
|
||||
assert User.objects.count() == 0
|
||||
response = franceconnect.handle_authorization(app, response.location, status=302)
|
||||
assert User.objects.count() == 0
|
||||
|
||||
|
||||
def test_login_email_is_unique(settings, app, franceconnect, caplog):
|
||||
settings.A2_EMAIL_IS_UNIQUE = True
|
||||
user = User(
|
||||
email='john.doe@example.com',
|
||||
first_name='John',
|
||||
last_name='Doe')
|
||||
user.set_password('toto')
|
||||
user.save()
|
||||
fc_settings.A2_EMAIL_IS_UNIQUE = True
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
|
||||
franceconnect.user_info['email'] = user.email
|
||||
|
||||
assert User.objects.count() == 1
|
||||
assert app.session['_auth_user_id']
|
||||
franceconnect.login_with_fc_fixed_params(app)
|
||||
assert User.objects.count() == 1
|
||||
assert app.session['_auth_user_id'] == str(user.pk)
|
||||
|
||||
# logout, test unlinking when logging with password
|
||||
app.session.flush()
|
||||
response = app.get('/login/')
|
||||
response.form.set('username', User.objects.get().email)
|
||||
response.form.set('password', 'toto')
|
||||
response = response.form.submit(name='login-password-submit').follow()
|
||||
|
||||
response = app.get('/accounts/')
|
||||
def test_unlink_after_login_with_password(app, franceconnect, simple_user):
|
||||
models.FcAccount.objects.create(user=simple_user, user_info='{}')
|
||||
|
||||
response = login(app, simple_user, path='/accounts/')
|
||||
response = response.click('Delete link')
|
||||
assert 'new_password1' not in response.form.fields
|
||||
response = response.form.submit(name='unlink').follow()
|
||||
assert 'The link with the FranceConnect account has been deleted' in response.text
|
||||
# no logout from FC since we are not logged to it
|
||||
assert response.request.path == '/accounts/'
|
||||
|
||||
|
||||
def test_login_email_is_unique_and_already_linked(app, fc_settings, caplog):
|
||||
callback = reverse('fc-login-or-link')
|
||||
response = app.get(callback, status=302)
|
||||
location = response['Location']
|
||||
state = check_authorization_url(location)
|
||||
def test_unlink_after_login_with_fc(app, franceconnect, simple_user):
|
||||
models.FcAccount.objects.create(user=simple_user, sub=franceconnect.sub, user_info='{}')
|
||||
|
||||
EMAIL = 'john.doe@example.com'
|
||||
SUB = '1234'
|
||||
user = User.objects.create(email=EMAIL, first_name='John', last_name='Doe')
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/')
|
||||
response = response.click('Delete link')
|
||||
response.form.set('new_password1', 'ikKL1234')
|
||||
response.form.set('new_password2', 'ikKL1234')
|
||||
response = response.form.submit(name='unlink')
|
||||
assert 'The link with the FranceConnect account has been deleted' in response.text
|
||||
assert models.FcAccount.objects.count() == 0
|
||||
continue_url = response.pyquery('a#a2-continue').attr['href']
|
||||
response = franceconnect.handle_logout(app, continue_url)
|
||||
assert path(response.location) == '/accounts/'
|
||||
|
||||
|
||||
def test_login_email_is_unique_and_already_linked(settings, app, franceconnect, caplog):
|
||||
settings.A2_EMAIL_IS_UNIQUE = True
|
||||
|
||||
# setup an already linked user account
|
||||
user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
|
||||
models.FcAccount.objects.create(user=user, sub='4567', token='xxx', user_info='{}')
|
||||
|
||||
@httmock.urlmatch(path=r'.*/token$')
|
||||
def access_token_response(url, request):
|
||||
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
||||
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
||||
'grant_type'])
|
||||
assert parsed['code'] == 'zzz'
|
||||
assert parsed['client_id'] == 'xxx'
|
||||
assert parsed['client_secret'] == 'yyy'
|
||||
assert parsed['grant_type'] == 'authorization_code'
|
||||
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
||||
parsed_callback = urlparse.urlparse(callback)
|
||||
assert parsed_redirect.path == parsed_callback.path
|
||||
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
||||
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
||||
exp = now() + datetime.timedelta(seconds=1000)
|
||||
id_token = {
|
||||
'sub': SUB,
|
||||
'aud': 'xxx',
|
||||
'nonce': state,
|
||||
'exp': int(exp.timestamp()),
|
||||
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
||||
}
|
||||
return json.dumps({
|
||||
'access_token': 'uuu',
|
||||
'id_token': hmac_jwt(id_token, 'yyy')
|
||||
})
|
||||
|
||||
@httmock.urlmatch(path=r'.*userinfo$')
|
||||
def user_info_response(url, request):
|
||||
assert request.headers['Authorization'] == 'Bearer uuu'
|
||||
return json.dumps({
|
||||
'sub': '1234',
|
||||
'family_name': u'Frédérique',
|
||||
'given_name': u'Ÿuñe',
|
||||
'email': EMAIL,
|
||||
})
|
||||
|
||||
fc_settings.A2_EMAIL_IS_UNIQUE = True
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
|
||||
assert 'is already used' in str(response)
|
||||
assert User.objects.count() == 1
|
||||
response = app.get('/login/?service=portail&next=/idp/')
|
||||
response = response.click(href='callback')
|
||||
response = franceconnect.handle_authorization(app, response.location, status=302)
|
||||
assert models.FcAccount.objects.count() == 1
|
||||
assert 'is already used' in app.cookies['messages']
|
||||
assert '_auth_user_id' not in app.session
|
||||
|
||||
|
||||
def test_requests_proxies_support(app, fc_settings, caplog):
|
||||
def test_requests_proxies_support(settings, app):
|
||||
session = requests_retry_session()
|
||||
assert session.proxies == {}
|
||||
other_session = requests.Session()
|
||||
|
@ -331,7 +193,8 @@ def test_requests_proxies_support(app, fc_settings, caplog):
|
|||
session = requests_retry_session(session=other_session)
|
||||
assert session is other_session
|
||||
assert session.proxies == {'http': 'http://example.net'}
|
||||
fc_settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
|
||||
|
||||
settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
|
||||
session = requests_retry_session()
|
||||
assert session.proxies == {'https': 'http://pubproxy.com/api/proxy'}
|
||||
|
||||
|
@ -341,95 +204,65 @@ def test_requests_proxies_support(app, fc_settings, caplog):
|
|||
assert mocked_send.call_args[1]['proxies'] == {'https': 'http://pubproxy.com/api/proxy'}
|
||||
|
||||
|
||||
def test_password_reset(app, mailoutbox):
|
||||
def test_no_password_with_fc_account_can_reset_password(app, db, mailoutbox):
|
||||
user = User.objects.create(email='john.doe@example.com')
|
||||
# No FC account, forbidden to set a password
|
||||
response = app.get('/login/')
|
||||
response = response.click('Reset it!').maybe_follow()
|
||||
response.form['email'] = user.email
|
||||
assert len(mailoutbox) == 0
|
||||
response = response.form.submit()
|
||||
assert len(mailoutbox) == 1
|
||||
url = get_links_from_mail(mailoutbox[0])[0]
|
||||
url = get_link_from_mail(mailoutbox[0])
|
||||
response = app.get(url).follow().follow()
|
||||
assert '_auth_user_id' not in app.session
|
||||
assert 'not possible to reset' in response
|
||||
|
||||
# With FC account, can set a password
|
||||
models.FcAccount.objects.create(user=user, sub='xxx', token='aaa')
|
||||
response = app.get(url).maybe_follow()
|
||||
assert 'new_password1' in response.form.fields
|
||||
response = app.get('/login/')
|
||||
response = response.click('Reset it!').maybe_follow()
|
||||
response.form['email'] = user.email
|
||||
assert len(mailoutbox) == 1
|
||||
response = response.form.submit()
|
||||
assert len(mailoutbox) == 2
|
||||
url = get_link_from_mail(mailoutbox[1])
|
||||
response = app.get(url, status=200)
|
||||
response.form.set('new_password1', 'ikKL1234')
|
||||
response.form.set('new_password2', 'ikKL1234')
|
||||
response = response.form.submit().follow()
|
||||
assert '_auth_user_id' in app.session
|
||||
|
||||
|
||||
def test_registration1(app, fc_settings, caplog, hooks):
|
||||
exp = now() + datetime.timedelta(seconds=1000)
|
||||
response = app.get('/login/?service=portail&next=/idp/')
|
||||
response = response.click(href="callback")
|
||||
# 1. Try a login
|
||||
# 2. Verify we come back to login page
|
||||
# 3. Check presence of registration link
|
||||
# 4. Follow it
|
||||
location = response['Location']
|
||||
state = check_authorization_url(location)
|
||||
|
||||
@httmock.urlmatch(path=r'.*/token$')
|
||||
def access_token_response(url, request):
|
||||
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
||||
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
||||
'grant_type'])
|
||||
assert parsed['code'] == 'zzz'
|
||||
assert parsed['client_id'] == 'xxx'
|
||||
assert parsed['client_secret'] == 'yyy'
|
||||
assert parsed['grant_type'] == 'authorization_code'
|
||||
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
||||
parsed_callback = urlparse.urlparse(callback)
|
||||
assert parsed_redirect.path == parsed_callback.path
|
||||
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
||||
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
||||
id_token = {
|
||||
'sub': '1234',
|
||||
'aud': 'xxx',
|
||||
'nonce': state,
|
||||
'exp': int(exp.timestamp()),
|
||||
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
||||
'email': 'john.doe@example.com',
|
||||
}
|
||||
return json.dumps({
|
||||
'access_token': 'uuu',
|
||||
'id_token': hmac_jwt(id_token, 'yyy')
|
||||
})
|
||||
|
||||
@httmock.urlmatch(path=r'.*userinfo$')
|
||||
def user_info_response(url, request):
|
||||
assert request.headers['Authorization'] == 'Bearer uuu'
|
||||
return json.dumps({
|
||||
'sub': '1234',
|
||||
'family_name': u'Frédérique',
|
||||
'given_name': u'Ÿuñe',
|
||||
'email': 'john.doe@example.com',
|
||||
})
|
||||
|
||||
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
||||
def test_registration1(settings, app, franceconnect, caplog, hooks):
|
||||
response = franceconnect.login_with_fc_fixed_params(app)
|
||||
assert User.objects.count() == 0
|
||||
assert path(response['Location']) == '/login/'
|
||||
assert path(response.location) == '/login/'
|
||||
response = response.follow()
|
||||
response = response.click(href='/accounts/fc/register')
|
||||
location = response['Location']
|
||||
location.startswith('http://testserver/accounts/activate/')
|
||||
response.location.startswith('http://testserver/accounts/activate/')
|
||||
assert User.objects.count() == 0
|
||||
response = response.follow()
|
||||
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
|
||||
assert response.location.startswith('/fc/callback/')
|
||||
# a new user has been created
|
||||
assert User.objects.count() == 1
|
||||
# but no FcAccount
|
||||
assert models.FcAccount.objects.count() == 0
|
||||
# we must be connected
|
||||
assert app.session['_auth_user_id']
|
||||
parsed_location = urlparse.urlparse(response['Location'])
|
||||
parsed_callback = urlparse.urlparse(callback)
|
||||
assert parsed_location.path == parsed_callback.path
|
||||
assert (urlparse.parse_qs(parsed_location.query) ==
|
||||
urlparse.parse_qs(parsed_callback.query))
|
||||
# hook must have been called
|
||||
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
|
||||
|
||||
response = response.follow()
|
||||
location = response['Location']
|
||||
state = check_authorization_url(location)
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
||||
# a new redirect to FC is done
|
||||
response = franceconnect.handle_authorization(app, response.location)
|
||||
|
||||
# FcAccount now exists
|
||||
assert models.FcAccount.objects.count() == 1
|
||||
user = User.objects.get()
|
||||
assert user.verified_attributes.first_name == u'Ÿuñe'
|
||||
assert user.verified_attributes.last_name == u'Frédérique'
|
||||
assert user.verified_attributes.first_name == 'Ÿuñe'
|
||||
assert user.verified_attributes.last_name == 'Frédérique'
|
||||
|
||||
response = app.get('/accounts/')
|
||||
response = response.click('Delete link')
|
||||
response.form.set('new_password1', 'ikKL1234')
|
||||
|
@ -438,214 +271,60 @@ def test_registration1(app, fc_settings, caplog, hooks):
|
|||
assert 'The link with the FranceConnect account has been deleted' in response.text
|
||||
assert models.FcAccount.objects.count() == 0
|
||||
continue_url = response.pyquery('a#a2-continue').attr['href']
|
||||
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
|
||||
assert app.session['fc_states'][state]['next'] == '/accounts/'
|
||||
response = app.get(reverse('fc-logout') + '?state=' + state)
|
||||
assert path(response['Location']) == '/accounts/'
|
||||
response = franceconnect.handle_logout(app, continue_url)
|
||||
assert path(response.location) == '/accounts/'
|
||||
|
||||
|
||||
def test_registration2(app, fc_settings, caplog, hooks):
|
||||
exp = now() + datetime.timedelta(seconds=1000)
|
||||
def test_registration2(settings, app, franceconnect, hooks):
|
||||
response = app.get('/login/?service=portail&next=/idp/')
|
||||
response = response.click("Register")
|
||||
response = response.click(href='callback')
|
||||
# 1. Try a login
|
||||
# 2. Verify we come back to login page
|
||||
# 3. Check presence of registration link
|
||||
# 4. Follow it
|
||||
location = response['Location']
|
||||
state = check_authorization_url(location)
|
||||
franceconnect.callback_params['registration'] = ''
|
||||
response = franceconnect.handle_authorization(app, response.location)
|
||||
|
||||
@httmock.urlmatch(path=r'.*/token$')
|
||||
def access_token_response(url, request):
|
||||
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
||||
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
||||
'grant_type'])
|
||||
assert parsed['code'] == 'zzz'
|
||||
assert parsed['client_id'] == 'xxx'
|
||||
assert parsed['client_secret'] == 'yyy'
|
||||
assert parsed['grant_type'] == 'authorization_code'
|
||||
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
||||
parsed_callback = urlparse.urlparse(callback)
|
||||
assert parsed_redirect.path == parsed_callback.path
|
||||
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
||||
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
||||
id_token = {
|
||||
'sub': '1234',
|
||||
'aud': 'xxx',
|
||||
'nonce': state,
|
||||
'exp': int(exp.timestamp()),
|
||||
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
||||
'email': 'john.doe@example.com',
|
||||
}
|
||||
return json.dumps({
|
||||
'access_token': 'uuu',
|
||||
'id_token': hmac_jwt(id_token, 'yyy')
|
||||
})
|
||||
|
||||
@httmock.urlmatch(path=r'.*userinfo$')
|
||||
def user_info_response(url, request):
|
||||
assert request.headers['Authorization'] == 'Bearer uuu'
|
||||
return json.dumps({
|
||||
'sub': '1234',
|
||||
'family_name': u'Frédérique',
|
||||
'given_name': u'Ÿuñe',
|
||||
'email': 'john.doe@example.com',
|
||||
})
|
||||
|
||||
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
||||
assert User.objects.count() == 0
|
||||
assert path(response['Location']) == '/accounts/fc/register/'
|
||||
assert path(response.location) == '/accounts/fc/register/'
|
||||
response = response.follow()
|
||||
location = response['Location']
|
||||
location.startswith('http://testserver/accounts/activate/')
|
||||
response.location.startswith('http://testserver/accounts/activate/')
|
||||
response = response.follow()
|
||||
assert User.objects.count() == 1
|
||||
user = User.objects.get()
|
||||
assert user.verified_attributes.first_name is None
|
||||
assert user.verified_attributes.last_name is None
|
||||
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
|
||||
assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
|
||||
# we must be connected
|
||||
assert app.session['_auth_user_id']
|
||||
# remove the registration parameter
|
||||
callback = callback.replace('®istration=', '')
|
||||
callback = callback.replace('?registration=', '?')
|
||||
callback = callback.replace('?&', '?')
|
||||
parsed_location = urlparse.urlparse(response['Location'])
|
||||
parsed_callback = urlparse.urlparse(callback)
|
||||
assert parsed_location.path == parsed_callback.path
|
||||
assert (urlparse.parse_qs(parsed_location.query) ==
|
||||
urlparse.parse_qs(parsed_callback.query))
|
||||
response = response.follow()
|
||||
location = response['Location']
|
||||
state = check_authorization_url(location)
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
||||
assert models.FcAccount.objects.count() == 1
|
||||
|
||||
del franceconnect.callback_params['registration']
|
||||
response = franceconnect.handle_authorization(app, response.location)
|
||||
user = User.objects.get()
|
||||
assert user.verified_attributes.first_name == u'Ÿuñe'
|
||||
assert user.verified_attributes.last_name == u'Frédérique'
|
||||
response = app.get('/accounts/')
|
||||
response = response.click('Delete link')
|
||||
response.form.set('new_password1', 'ikKL1234')
|
||||
response.form.set('new_password2', 'ikKL1234')
|
||||
response = response.form.submit(name='unlink')
|
||||
assert 'The link with the FranceConnect account has been deleted' in response.text
|
||||
assert models.FcAccount.objects.count() == 0
|
||||
continue_url = response.pyquery('a#a2-continue').attr['href']
|
||||
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
|
||||
assert app.session['fc_states'][state]['next'] == '/accounts/'
|
||||
response = app.get(reverse('fc-logout') + '?state=' + state)
|
||||
assert path(response['Location']) == '/accounts/'
|
||||
assert user.verified_attributes.first_name == 'Ÿuñe'
|
||||
assert user.verified_attributes.last_name == 'Frédérique'
|
||||
|
||||
|
||||
def test_can_change_password(app, fc_settings, caplog, hooks):
|
||||
exp = now() + datetime.timedelta(seconds=1000)
|
||||
response = app.get('/login/?service=portail&next=/idp/')
|
||||
response = response.click("Register")
|
||||
response = response.click(href='callback')
|
||||
# 1. Try a login
|
||||
# 2. Verify we come back to login page
|
||||
# 3. Check presence of registration link
|
||||
# 4. Follow it
|
||||
location = response['Location']
|
||||
state = check_authorization_url(location)
|
||||
def test_can_change_password(settings, app, franceconnect):
|
||||
user = User.objects.create(email='john.doe@example.com')
|
||||
models.FcAccount.objects.create(user=user, sub=franceconnect.sub)
|
||||
|
||||
@httmock.urlmatch(path=r'.*/token$')
|
||||
def access_token_response(url, request):
|
||||
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
||||
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
||||
'grant_type'])
|
||||
assert parsed['code'] == 'zzz'
|
||||
assert parsed['client_id'] == 'xxx'
|
||||
assert parsed['client_secret'] == 'yyy'
|
||||
assert parsed['grant_type'] == 'authorization_code'
|
||||
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
||||
parsed_callback = urlparse.urlparse(callback)
|
||||
assert parsed_redirect.path == parsed_callback.path
|
||||
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
||||
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
||||
id_token = {
|
||||
'sub': '1234',
|
||||
'aud': 'xxx',
|
||||
'nonce': state,
|
||||
'exp': int(exp.timestamp()),
|
||||
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
||||
'email': 'john.doe@example.com',
|
||||
}
|
||||
return json.dumps({
|
||||
'access_token': 'uuu',
|
||||
'id_token': hmac_jwt(id_token, 'yyy')
|
||||
})
|
||||
|
||||
@httmock.urlmatch(path=r'.*userinfo$')
|
||||
def user_info_response(url, request):
|
||||
assert request.headers['Authorization'] == 'Bearer uuu'
|
||||
return json.dumps({
|
||||
'sub': '1234',
|
||||
'family_name': u'Frédérique',
|
||||
'given_name': u'Ÿuñe',
|
||||
'email': 'john.doe@example.com',
|
||||
})
|
||||
|
||||
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
||||
assert User.objects.count() == 0
|
||||
assert path(response['Location']) == '/accounts/fc/register/'
|
||||
response = response.follow()
|
||||
location = response['Location']
|
||||
location.startswith('http://testserver/accounts/activate/')
|
||||
response = response.follow()
|
||||
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
|
||||
assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
|
||||
# we must be connected
|
||||
assert app.session['_auth_user_id']
|
||||
# remove the registration parameter
|
||||
callback = callback.replace('®istration=', '')
|
||||
callback = callback.replace('?registration=', '?')
|
||||
callback = callback.replace('?&', '?')
|
||||
parsed_location = urlparse.urlparse(response['Location'])
|
||||
parsed_callback = urlparse.urlparse(callback)
|
||||
assert parsed_location.path == parsed_callback.path
|
||||
assert (urlparse.parse_qs(parsed_location.query) ==
|
||||
urlparse.parse_qs(parsed_callback.query))
|
||||
response = response.follow()
|
||||
location = response['Location']
|
||||
state = check_authorization_url(location)
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
||||
assert models.FcAccount.objects.count() == 1
|
||||
user = User.objects.get()
|
||||
assert user.verified_attributes.first_name == u'Ÿuñe'
|
||||
assert user.verified_attributes.last_name == u'Frédérique'
|
||||
response = app.get('/accounts/')
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/')
|
||||
assert len(response.pyquery('[href*="password/change"]')) == 0
|
||||
response = response.click('Logout')
|
||||
response = franceconnect.handle_logout(app, response.location).follow()
|
||||
assert '_auth_user_id' not in app.session
|
||||
|
||||
# Login with password
|
||||
user = User.objects.get()
|
||||
user.username = 'test'
|
||||
user.set_password('test')
|
||||
user.save()
|
||||
app.session.flush()
|
||||
response = app.get('/login/')
|
||||
response.form.set('username', User.objects.get().email)
|
||||
response.form.set('password', 'test')
|
||||
response = response.form.submit(name='login-password-submit').follow()
|
||||
response = app.get('/accounts/')
|
||||
|
||||
response = login(app, user, path='/accounts/')
|
||||
assert len(response.pyquery('[href*="password/change"]')) > 0
|
||||
response = response.click('Logout').follow()
|
||||
|
||||
# Relogin with FC
|
||||
app.session.flush()
|
||||
response = app.get('/login/?service=portail&next=/accounts/')
|
||||
response = response.click(href='callback')
|
||||
location = response['Location']
|
||||
state = check_authorization_url(location)
|
||||
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
|
||||
with httmock.HTTMock(access_token_response, user_info_response):
|
||||
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
||||
# we must be connected
|
||||
assert app.session['_auth_user_id']
|
||||
assert path(response['Location']) == '/accounts/'
|
||||
response = response.follow()
|
||||
response = franceconnect.login_with_fc(app, path='/accounts/')
|
||||
assert len(response.pyquery('[href*="password/change"]')) == 0
|
||||
|
||||
# Unlink
|
||||
|
@ -653,20 +332,17 @@ def test_can_change_password(app, fc_settings, caplog, hooks):
|
|||
response.form.set('new_password1', 'ikKL1234')
|
||||
response.form.set('new_password2', 'ikKL1234')
|
||||
response = response.form.submit(name='unlink')
|
||||
assert 'The link with the FranceConnect account has been deleted' in response.text
|
||||
continue_url = response.pyquery('a#a2-continue').attr['href']
|
||||
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
|
||||
assert app.session['fc_states'][state]['next'] == '/accounts/'
|
||||
response = app.get(reverse('fc-logout') + '?state=' + state)
|
||||
assert path(response['Location']) == '/accounts/'
|
||||
response = response.follow()
|
||||
response = franceconnect.handle_logout(app, continue_url).follow()
|
||||
assert len(response.pyquery('[href*="password/change"]')) > 0
|
||||
|
||||
|
||||
def test_invalid_next_url(app, fc_settings, caplog, hooks):
|
||||
def test_invalid_next_url(app, franceconnect):
|
||||
assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ'
|
||||
|
||||
|
||||
def test_manager_user_sidebar(app, fc_settings, superuser, simple_user):
|
||||
def test_manager_user_sidebar(app, superuser, simple_user):
|
||||
login(app, superuser, '/manage/')
|
||||
response = app.get('/manage/users/%s/' % simple_user.id)
|
||||
assert 'FranceConnect' not in response
|
||||
|
|
|
@ -18,8 +18,9 @@
|
|||
from authentic2_auth_fc.models import FcAccount
|
||||
|
||||
|
||||
def test_api_fc_unlink(app, admin, user_cartman):
|
||||
url = '/api/users/%s/fc-unlink/' % user_cartman.uuid
|
||||
def test_api_fc_unlink(app, admin, simple_user):
|
||||
FcAccount.objects.create(user=simple_user)
|
||||
url = '/api/users/%s/fc-unlink/' % simple_user.uuid
|
||||
# test unauthorized caller
|
||||
app.delete(url, status=401)
|
||||
# test unauthorized method
|
||||
|
@ -27,13 +28,14 @@ def test_api_fc_unlink(app, admin, user_cartman):
|
|||
app.get(url, status=405)
|
||||
# test success
|
||||
app.delete(url, status=204)
|
||||
assert FcAccount.objects.filter(user=user_cartman).exists() is False
|
||||
assert FcAccount.objects.filter(user=simple_user).exists() is False
|
||||
|
||||
|
||||
def test_api_user_franceconnect(settings, app, admin, user_cartman):
|
||||
def test_api_user_franceconnect(settings, app, admin, simple_user):
|
||||
settings.A2_FC_ENABLE = True
|
||||
FcAccount.objects.create(user=simple_user, sub='1234')
|
||||
|
||||
url = '/api/users/%s/' % user_cartman.uuid
|
||||
url = '/api/users/%s/' % simple_user.uuid
|
||||
# test unauthorized method
|
||||
app.authorization = ('Basic', (admin.username, admin.username))
|
||||
response = app.get(url)
|
||||
|
@ -48,7 +50,7 @@ def test_api_user_franceconnect(settings, app, admin, user_cartman):
|
|||
assert content.get('unlink_url').startswith('http://')
|
||||
assert content.get('unlink_url').endswith('/unlink/')
|
||||
|
||||
unlink_url = '/api/users/%s/fc-unlink/' % user_cartman.uuid
|
||||
unlink_url = '/api/users/%s/fc-unlink/' % simple_user.uuid
|
||||
app.delete(unlink_url, status=204)
|
||||
|
||||
response = app.get(url + '?full')
|
||||
|
|
Loading…
Reference in New Issue