tests: split auth_oidc manager tests (#85934)

This commit is contained in:
Benjamin Dauvergne 2024-01-23 21:46:53 +01:00
parent f13c7ca24e
commit eceb4b2424
7 changed files with 659 additions and 602 deletions

View File

View File

@ -0,0 +1,270 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import importlib
import json
import random
import uuid
import py
import pytest
import responses
from django.contrib.auth import get_user_model
from jwcrypto.jwk import JWK, JWKSet
from authentic2.a2_rbac.models import OrganizationalUnit
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.utils import crypto
from authentic2_auth_oidc.models import OIDCAccount, OIDCClaimMapping, OIDCProvider
from tests.utils import call_command, check_log
User = get_user_model()
def test_oidc_register_issuer(db, tmpdir, monkeypatch):
oidc_conf_f = py.path.local(__file__).dirpath('openid_configuration.json')
with oidc_conf_f.open() as f:
oidc_conf = json.load(f)
def register_issuer(
name,
client_id,
client_secret,
issuer=None,
openid_configuration=None,
verify=True,
timeout=None,
ou=None,
):
ou = OrganizationalUnit.objects.get(default=True)
# skipping jwkset retrieval in mocked function
jwkset = JWKSet()
jwkset.add(JWK.generate(kty='RSA', size=512, kid='auie'))
jwkset.add(JWK.generate(kty='EC', size=256, kid='tsrn'))
return OIDCProvider.objects.create(
name=name,
slug='test',
client_id='abc',
client_secret='def',
enabled=True,
ou=ou,
issuer=issuer,
strategy='create',
jwkset_json=jwkset.export(as_dict=True),
authorization_endpoint=openid_configuration['authorization_endpoint'],
token_endpoint=openid_configuration['token_endpoint'],
userinfo_endpoint=openid_configuration['userinfo_endpoint'],
end_session_endpoint=openid_configuration['end_session_endpoint'],
)
oidc_cmd = importlib.import_module('authentic2_auth_oidc.management.commands.oidc-register-issuer')
monkeypatch.setattr(oidc_cmd, 'register_issuer', register_issuer)
oidc_conf = py.path.local(__file__).dirpath('openid_configuration.json').strpath
call_command(
'oidc-register-issuer',
'--openid-configuration',
oidc_conf,
'--issuer',
'issuer',
'--client-id',
'auie',
'--client-secret',
'tsrn',
'somename',
)
provider = OIDCProvider.objects.get(name='somename')
assert provider.issuer == 'issuer'
@responses.activate
@pytest.mark.parametrize('deletion_number,deletion_valid', [(2, True), (5, True), (10, False)])
def test_oidc_sync_provider(
db, app, admin, settings, caplog, deletion_number, deletion_valid, nologtoconsole
):
oidc_provider = OIDCProvider.objects.create(
issuer='https://some.provider',
name='Some Provider',
slug='some-provider',
ou=get_default_ou(),
)
OIDCClaimMapping.objects.create(
authenticator=oidc_provider,
attribute='username',
idtoken_claim=False,
claim='username',
)
OIDCClaimMapping.objects.create(
authenticator=oidc_provider,
attribute='email',
idtoken_claim=False,
claim='email',
)
# last one, with an idtoken claim
OIDCClaimMapping.objects.create(
authenticator=oidc_provider,
attribute='last_name',
idtoken_claim=True,
claim='family_name',
)
# typo in template string
OIDCClaimMapping.objects.create(
authenticator=oidc_provider,
attribute='first_name',
idtoken_claim=True,
claim='given_name',
)
User = get_user_model()
for i in range(100):
user = User.objects.create(
first_name='John%s' % i,
last_name='Doe%s' % i,
username='john.doe.%s' % i,
email='john.doe.%s@ad.dre.ss' % i,
ou=get_default_ou(),
)
identifier = uuid.UUID(user.uuid).bytes
sector_identifier = 'some.provider'
cipher_args = [
settings.SECRET_KEY.encode('utf-8'),
identifier,
sector_identifier,
]
sub = crypto.aes_base64url_deterministic_encrypt(*cipher_args).decode('utf-8')
OIDCAccount.objects.create(user=user, provider=oidc_provider, sub=sub)
def synchronization_get_modified_response():
# randomized batch of modified users
modified_users = random.sample(list(User.objects.all()), 20)
results = []
for count, user in enumerate(modified_users):
user_json = user.to_json()
user_json['username'] = f'modified_{count}'
user_json['first_name'] = 'Mod'
user_json['last_name'] = 'Ified'
# mocking claim resolution by oidc provider
user_json['given_name'] = 'Mod'
user_json['family_name'] = 'Ified'
# add user sub to response
try:
account = OIDCAccount.objects.get(user=user)
except OIDCAccount.DoesNotExist:
pass
else:
user_json['sub'] = account.sub
results.append(user_json)
return {'results': results}
responses.post(
'https://some.provider/api/users/synchronization/',
json={
'unknown_uuids': [
account.sub for account in random.sample(list(OIDCAccount.objects.all()), deletion_number)
]
},
)
responses.get('https://some.provider/api/users/', json=synchronization_get_modified_response())
with check_log(caplog, 'no provider supporting synchronization'):
call_command('oidc-sync-provider', '-v1')
oidc_provider.a2_synchronization_supported = True
oidc_provider.save()
with check_log(caplog, 'no provider supporting synchronization'):
call_command('oidc-sync-provider', '--provider', 'whatever', '-v1')
with check_log(caplog, 'got 20 users'):
call_command('oidc-sync-provider', '-v1')
if deletion_valid:
# existing users check
assert OIDCAccount.objects.count() == 100 - deletion_number
else:
assert OIDCAccount.objects.count() == 100
assert caplog.records[3].levelname == 'ERROR'
assert 'deletion ratio is abnormally high' in caplog.records[3].message
# users update
assert User.objects.filter(username__startswith='modified').count() in range(20 - deletion_number, 21)
assert User.objects.filter(first_name='Mod', last_name='Ified').count() in range(20 - deletion_number, 21)
@responses.activate
def test_auth_oidc_refresh_jwkset_json(db, app, admin, settings, caplog):
jwkset_url = 'https://www.example.com/common/discovery/v3.0/keys'
kid_rsa = '123'
kid_ec = '456'
def generate_remote_jwkset_json():
key_rsa = JWK.generate(kty='RSA', size=512, kid=kid_rsa)
key_ec = JWK.generate(kty='EC', size=256, kid=kid_ec)
jwkset = JWKSet()
jwkset.add(key_rsa)
jwkset.add(key_ec)
return jwkset.export(as_dict=True)
responses.get(
jwkset_url,
json={
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
**generate_remote_jwkset_json(),
},
)
issuer = ('https://www.example.com',)
provider = OIDCProvider.objects.create(
ou=get_default_ou(),
name='Foo',
slug='foo',
client_id='abc',
client_secret='def',
enabled=True,
issuer=issuer,
authorization_endpoint='%s/authorize' % issuer,
token_endpoint='%s/token' % issuer,
end_session_endpoint='%s/logout' % issuer,
userinfo_endpoint='%s/user_info' % issuer,
token_revocation_endpoint='%s/revoke' % issuer,
jwkset_url=jwkset_url,
idtoken_algo=OIDCProvider.ALGO_RSA,
claims_parameter_supported=False,
button_label='Connect with Foo',
)
assert {key['kid'] for key in provider.jwkset_json['keys']} == {'123', '456'}
kid_rsa = 'abcdefg'
kid_ec = 'hijklmn'
responses.get(
jwkset_url,
json={
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
**generate_remote_jwkset_json(),
},
)
call_command('oidc-refresh-jwkset-json', '-v1')
provider.refresh_from_db()
assert {key['kid'] for key in provider.jwkset_json['keys']} == {'abcdefg', 'hijklmn'}

View File

@ -0,0 +1,385 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import pytest
import responses
from django.utils.html import escape
from jwcrypto.jwk import JWK, JWKSet
from webtest import Upload
from authentic2.a2_rbac.models import Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.apps.authenticators.models import AddRoleAction
from authentic2.manager.utils import label_from_role
from authentic2.models import Attribute
from authentic2_auth_oidc.models import OIDCAccount, OIDCClaimMapping, OIDCProvider
from tests.utils import assert_event, login, request_select2
from .test_misc import oidc_provider, oidc_provider_jwkset # pylint: disable=unused-import
@pytest.mark.freeze_time('2022-04-19 14:00')
@responses.activate
def test_authenticators_oidc(app, superuser, ou1, ou2):
resp = login(app, superuser, path='/manage/authenticators/')
resp = resp.click('Add new authenticator')
resp.form['name'] = 'Test'
resp.form['authenticator'] = 'oidc'
resp = resp.form.submit()
assert '/edit/' in resp.location
assert_event('authenticator.creation', user=superuser, session=app.session)
provider = OIDCProvider.objects.filter(slug='test', ou=get_default_ou()).get()
resp = app.get(provider.get_absolute_url())
assert 'extra-actions-menu-opener' in resp.text
assert 'Creation date: April 19, 2022, 2 p.m.' in resp.text
assert 'Last modification date: April 19, 2022, 2 p.m.' in resp.text
assert 'Issuer' not in resp.text
assert 'Enable' not in resp.text
assert 'configuration is not complete' in resp.text
app.get('/manage/authenticators/%s/toggle/' % provider.pk, status=403)
resp = resp.click('Edit')
assert 'enabled' not in resp.form.fields
assert 'last_sync_time' not in resp.form.fields
assert resp.pyquery('input#id_client_id').val() == ''
assert resp.pyquery('input#id_client_secret').val() == ''
resp.form['ou'] = ou1.pk
resp.form['issuer'] = 'https://oidc.example.com'
resp.form['scopes'] = 'profile email'
resp.form['strategy'] = 'create'
resp.form['authorization_endpoint'] = 'https://oidc.example.com/authorize'
resp.form['token_endpoint'] = 'https://oidc.example.com/token'
resp.form['userinfo_endpoint'] = 'https://oidc.example.com/user_info'
resp.form['idtoken_algo'] = 2
resp.form['button_label'] = 'Test'
resp.form['button_description'] = 'test'
resp.form['client_id'] = 'auie'
resp.form['client_secret'] = 'tsrn'
resp = resp.form.submit().follow()
assert_event('authenticator.edit', user=superuser, session=app.session)
assert 'Issuer: https://oidc.example.com' in resp.text
assert 'Scopes: profile email' in resp.text
resp = app.get('/manage/authenticators/')
assert 'OpenID Connect - Test' in resp.text
assert 'class="section disabled"' in resp.text
assert 'OIDC provider linked to' not in resp.text
resp = resp.click('Configure', index=1)
resp = resp.click('Enable').follow()
assert 'Authenticator has been enabled.' in resp.text
assert_event('authenticator.enable', user=superuser, session=app.session)
resp = resp.click('Journal of edits')
assert 'enable' in resp.text
assert (
'edit (ou, issuer, scopes, strategy, client_id, button_label, idtoken_algo, '
'client_secret, token_endpoint, userinfo_endpoint, button_description, authorization_endpoint)'
in resp.text
)
assert 'creation' in resp.text
jwkset_url = 'https://www.example.com/common/discovery/v3.0/keys'
kid_rsa = '123'
def generate_remote_jwkset_json():
key_rsa = JWK.generate(kty='RSA', size=512, kid=kid_rsa)
jwkset = JWKSet()
jwkset.add(key_rsa)
return jwkset.export(as_dict=True)
responses.get(
jwkset_url,
json={
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
**generate_remote_jwkset_json(),
},
)
provider.refresh_from_db()
provider.jwkset_url = jwkset_url
provider.save()
resp = app.get('/manage/authenticators/%s/edit/' % provider.pk)
assert resp.pyquery('input#id_jwkset_url')[0].value == jwkset_url
assert 'disabled' in resp.pyquery('textarea#id_jwkset_json')[0].keys()
assert '"kid": "123"' in resp.pyquery('textarea#id_jwkset_json')[0].text
assert (
resp.pyquery('div[aria-labelledby="id_jwkset_json_title"] div.hint')[0].text
== 'JSON is fetched from the WebKey Set URL'
)
resp = app.get('/manage/authenticators/')
assert 'class="section disabled"' not in resp.text
assert 'OIDC provider linked to https://oidc.example.com with scopes profile, email.' not in resp.text
# same name
resp = resp.click('Add new authenticator')
resp.form['name'] = 'test'
resp.form['authenticator'] = 'oidc'
resp = resp.form.submit().follow()
assert OIDCProvider.objects.filter(slug='test-1').count() == 1
OIDCProvider.objects.filter(slug='test-1').delete()
# no name
resp = app.get('/manage/authenticators/add/')
resp.form['authenticator'] = 'oidc'
resp = resp.form.submit()
assert 'This field is required' in resp.text
resp = app.get('/manage/authenticators/')
resp = resp.click('Configure', index=1)
resp = resp.click('Disable').follow()
assert 'Authenticator has been disabled.' in resp.text
assert_event('authenticator.disable', user=superuser, session=app.session)
resp = app.get('/manage/authenticators/')
assert 'class="section disabled"' in resp.text
resp = resp.click('Configure', index=1)
resp = resp.click('Delete')
resp = resp.form.submit().follow()
assert not OIDCProvider.objects.filter(slug='test').exists()
assert_event('authenticator.deletion', user=superuser, session=app.session)
def test_authenticators_oidc_claims(app, superuser):
authenticator = OIDCProvider.objects.create(slug='idp1')
resp = login(app, superuser, path=authenticator.get_absolute_url())
resp = resp.click('Add', href='claim')
resp.form['claim'] = 'email'
resp.form['attribute'].select(text='Email address (email)')
resp.form['verified'].select(text='verified claim')
resp.form['required'] = True
resp.form['idtoken_claim'] = True
resp = resp.form.submit()
assert_event('authenticator.related_object.creation', user=superuser, session=app.session)
assert '#open:oidcclaimmapping' in resp.location
resp = resp.follow()
assert 'email → Email address (email), verified, required, idtoken' in resp.text
resp = resp.click('email')
resp.form['attribute'].select(text='First name (first_name)')
resp = resp.form.submit().follow()
assert 'email → First name (first_name), verified, required, idtoken' in resp.text
assert_event('authenticator.related_object.edit', user=superuser, session=app.session)
resp = resp.click('Remove')
resp = resp.form.submit().follow()
assert 'email' not in resp.text
assert_event('authenticator.related_object.deletion', user=superuser, session=app.session)
def test_authenticators_oidc_claims_disabled_attribute(app, superuser):
authenticator = OIDCProvider.objects.create(slug='idp1')
attr = Attribute.objects.create(kind='string', name='test_attribute', label='Test attribute')
resp = login(app, superuser, path=authenticator.get_absolute_url())
resp = resp.click('Add', href='claim')
assert resp.pyquery('select#id_attribute option[value=test_attribute]')
attr.disabled = True
attr.save()
resp = app.get(authenticator.get_absolute_url())
resp = resp.click('Add', href='claim')
assert not resp.pyquery('select#id_attribute option[value=test_attribute]')
def test_authenticators_oidc_add_role(app, superuser, role_ou1):
authenticator = OIDCProvider.objects.create(slug='idp1')
resp = login(app, superuser, path=authenticator.get_absolute_url())
resp = resp.click('Add', href='role')
select2_json = request_select2(app, resp, term='role_ou1')
assert len(select2_json['results']) == 1
resp.form['role'].force_value(select2_json['results'][0]['id'])
resp = resp.form.submit().follow()
assert 'role_ou1' in resp.text
def test_authenticators_oidc_export(app, superuser, simple_role):
authenticator = OIDCProvider.objects.create(slug='idp1', order=42, ou=get_default_ou(), enabled=True)
OIDCClaimMapping.objects.create(authenticator=authenticator, claim='test', attribute='hop')
AddRoleAction.objects.create(authenticator=authenticator, role=simple_role)
resp = login(app, superuser, path=authenticator.get_absolute_url())
export_resp = resp.click('Export')
resp = app.get('/manage/authenticators/import/')
resp.form['authenticator_json'] = Upload('export.json', export_resp.body, 'application/json')
resp = resp.form.submit()
assert '/authenticators/%s/' % authenticator.pk in resp.location
resp = resp.follow()
assert 'Authenticator has been updated.' in resp.text
assert OIDCProvider.objects.count() == 1
assert OIDCClaimMapping.objects.count() == 1
assert AddRoleAction.objects.count() == 1
OIDCProvider.objects.all().delete()
OIDCClaimMapping.objects.all().delete()
AddRoleAction.objects.all().delete()
resp = app.get('/manage/authenticators/import/')
resp.form['authenticator_json'] = Upload('export.json', export_resp.body, 'application/json')
resp = resp.form.submit().follow()
assert 'Authenticator has been created.' in resp.text
authenticator = OIDCProvider.objects.get()
assert authenticator.slug == 'idp1'
assert authenticator.order == 1
assert authenticator.ou == get_default_ou()
assert authenticator.enabled is False
assert OIDCClaimMapping.objects.filter(
authenticator=authenticator, claim='test', attribute='hop'
).exists()
assert AddRoleAction.objects.filter(authenticator=authenticator, role=simple_role).exists()
def test_authenticators_oidc_import_errors(app, superuser, simple_role):
resp = login(app, superuser, path='/manage/authenticators/import/')
resp.form['authenticator_json'] = Upload('export.json', b'not-json', 'application/json')
resp = resp.form.submit()
assert 'File is not in the expected JSON format.' in resp.text
resp.form['authenticator_json'] = Upload('export.json', b'{}', 'application/json')
resp = resp.form.submit()
assert escape('Missing "authenticator_type" key.') in resp.text
resp.form['authenticator_json'] = Upload(
'export.json', b'{"authenticator_type": "xxx"}', 'application/json'
)
resp = resp.form.submit()
assert 'Invalid authenticator_type: xxx.' in resp.text
resp.form['authenticator_json'] = Upload(
'export.json', b'{"authenticator_type": "x.y"}', 'application/json'
)
resp = resp.form.submit()
assert 'Unknown authenticator_type: x.y.' in resp.text
authenticator = OIDCProvider.objects.create(slug='idp1', order=42, ou=get_default_ou(), enabled=True)
AddRoleAction.objects.create(authenticator=authenticator, role=simple_role)
export_resp = app.get('/manage/authenticators/%s/export/' % authenticator.pk)
export = json.loads(export_resp.text)
del export['slug']
resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit()
assert 'Missing slug.' in resp.text
export = json.loads(export_resp.text)
export['ou'] = {'slug': 'xxx'}
resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit()
assert escape("Organization unit not found: {'slug': 'xxx'}.") in resp.text
export = json.loads(export_resp.text)
del export['related_objects'][0]['object_type']
resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit()
assert escape('Missing "object_type" key.') in resp.text
export = json.loads(export_resp.text)
del export['related_objects'][0]['role']
resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit()
assert escape('Missing "role" key in add role action.') in resp.text
export = json.loads(export_resp.text)
export['related_objects'][0]['role'] = {'slug': 'xxx'}
resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit()
assert escape("Role not found: {'slug': 'xxx'}.") in resp.text
def test_authenticators_add_role_actions(app, admin, simple_role, role_ou1):
authenticator = OIDCProvider.objects.create(slug='idp1', ou=get_default_ou(), enabled=True)
authenticator.save()
action = AddRoleAction.objects.create(authenticator=authenticator, role=simple_role)
login(app, admin)
resp = app.get(authenticator.get_absolute_url())
assert resp.pyquery(
f'a[href="/manage/authenticators/{authenticator.pk}/addroleaction/{action.pk}/edit/"]'
).text() == label_from_role(simple_role)
resp = resp.click(href=f'/manage/authenticators/{authenticator.pk}/addroleaction/add/')
select2_json = request_select2(app, resp, term='role_ou1')
assert len(select2_json['results']) == 1
resp.form['role'].force_value(select2_json['results'][0]['id'])
resp.form['condition'] = '{% %}'
resp = resp.form.submit()
assert 'template syntax error: Could not parse the remainder:' in resp.text
resp.form['role'] = role_ou1.id
resp.form['condition'] = '"Admin" in attributes.groups'
resp = resp.form.submit().follow()
action = AddRoleAction.objects.get(
authenticator=authenticator, role=role_ou1, condition='"Admin" in attributes.groups'
)
assert resp.pyquery(
f'a[href="/manage/authenticators/{authenticator.pk}/addroleaction/{action.pk}/edit/"]'
).text() == '%s (depending on condition)' % label_from_role(role_ou1)
def test_authenticators_oidc_related_objects_permissions(app, simple_user, simple_role):
authenticator = OIDCProvider.objects.create(slug='idp1', order=42, ou=get_default_ou(), enabled=True)
authenticator.save()
mapping = OIDCClaimMapping.objects.create(authenticator=authenticator, claim='test', attribute='hop')
action = AddRoleAction.objects.create(authenticator=authenticator, role=simple_role)
simple_user.roles.add(simple_role.get_admin_role()) # grant user access to /manage/
role = Role.objects.get(name='Manager of authenticators')
login(app, simple_user, path='/')
app.get(authenticator.get_absolute_url(), status=403)
app.get(f'/manage/authenticators/{authenticator.pk}/oidcclaimmapping/{mapping.pk}/edit/', status=403)
app.get(f'/manage/authenticators/{authenticator.pk}/addroleaction/{action.pk}/delete/', status=403)
app.get(f'/manage/authenticators/{authenticator.pk}/addroleaction/add/', status=403)
simple_user.roles.add(role)
app.get(authenticator.get_absolute_url())
app.get(f'/manage/authenticators/{authenticator.pk}/oidcclaimmapping/{mapping.pk}/edit/')
app.get(f'/manage/authenticators/{authenticator.pk}/addroleaction/{action.pk}/delete/')
app.get(f'/manage/authenticators/{authenticator.pk}/addroleaction/add/')
def test_manager_user_sidebar(app, superuser, simple_user, oidc_provider):
login(app, superuser, '/manage/')
response = app.get('/manage/users/%s/' % simple_user.id)
assert 'OIDC' not in response
OIDCAccount.objects.create(user=simple_user, provider=oidc_provider, sub='1234')
response = app.get('/manage/users/%s/' % simple_user.id)
assert 'OIDC' in response
assert 'Server' in response
assert '1234' in response

View File

@ -52,8 +52,7 @@ from authentic2_auth_oidc.backends import OIDCBackend
from authentic2_auth_oidc.models import OIDCAccount, OIDCClaimMapping, OIDCProvider
from authentic2_auth_oidc.utils import IDToken, IDTokenError, parse_id_token, register_issuer
from authentic2_auth_oidc.views import oidc_login
from . import utils
from tests import utils
pytestmark = pytest.mark.django_db
@ -466,7 +465,6 @@ def test_providers_on_login_page(oidc_provider, app):
# two frontends should be present on login page
assert response.pyquery('p#oidc-p-server')
OIDCProvider.objects.create(
id=2,
ou=get_default_ou(),
name='OIDCIDP 2',
slug='oidcidp-2',
@ -1182,19 +1180,6 @@ def test_multiple_users_with_same_email(app, caplog, code, oidc_provider_jwkset,
assert 'too many users' in caplog.records[-1].message
def test_manager_user_sidebar(app, superuser, simple_user, oidc_provider):
utils.login(app, superuser, '/manage/')
response = app.get('/manage/users/%s/' % simple_user.id)
assert 'OIDC' not in response
OIDCAccount.objects.create(user=simple_user, provider=oidc_provider, sub='1234')
response = app.get('/manage/users/%s/' % simple_user.id)
assert 'OIDC' in response
assert 'Server' in response
assert '1234' in response
def test_strategy_find_username(app, caplog, code, oidc_provider, oidc_provider_jwkset, simple_user):
# no mapping please
OIDCClaimMapping.objects.all().delete()

View File

@ -17,8 +17,6 @@
import datetime
import importlib
import json
import random
import uuid
from io import BufferedReader, BufferedWriter, TextIOWrapper
import py
@ -28,7 +26,6 @@ import webtest
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.utils.timezone import now
from jwcrypto.jwk import JWK, JWKSet
from mellon.models import Issuer, UserSAMLIdentifier
from authentic2.a2_rbac.models import (
@ -44,10 +41,9 @@ from authentic2.a2_rbac.utils import get_default_ou, get_operation
from authentic2.apps.journal.models import Event
from authentic2.custom_user.models import DeletedUser
from authentic2.models import UserExternalId
from authentic2.utils import crypto
from authentic2_auth_oidc.models import OIDCAccount, OIDCClaimMapping, OIDCProvider
from authentic2_auth_oidc.models import OIDCAccount, OIDCProvider
from .utils import call_command, check_log, login
from .utils import call_command, login
User = get_user_model()
@ -644,63 +640,6 @@ def test_load_ldif(db, monkeypatch, tmpdir):
)
def test_oidc_register_issuer(db, tmpdir, monkeypatch):
oidc_conf_f = py.path.local(__file__).dirpath('openid_configuration.json')
with oidc_conf_f.open() as f:
oidc_conf = json.load(f)
def register_issuer(
name,
client_id,
client_secret,
issuer=None,
openid_configuration=None,
verify=True,
timeout=None,
ou=None,
):
ou = OrganizationalUnit.objects.get(default=True)
# skipping jwkset retrieval in mocked function
jwkset = JWKSet()
jwkset.add(JWK.generate(kty='RSA', size=512, kid='auie'))
jwkset.add(JWK.generate(kty='EC', size=256, kid='tsrn'))
return OIDCProvider.objects.create(
name=name,
slug='test',
client_id='abc',
client_secret='def',
enabled=True,
ou=ou,
issuer=issuer,
strategy='create',
jwkset_json=jwkset.export(as_dict=True),
authorization_endpoint=openid_configuration['authorization_endpoint'],
token_endpoint=openid_configuration['token_endpoint'],
userinfo_endpoint=openid_configuration['userinfo_endpoint'],
end_session_endpoint=openid_configuration['end_session_endpoint'],
)
oidc_cmd = importlib.import_module('authentic2_auth_oidc.management.commands.oidc-register-issuer')
monkeypatch.setattr(oidc_cmd, 'register_issuer', register_issuer)
oidc_conf = py.path.local(__file__).dirpath('openid_configuration.json').strpath
call_command(
'oidc-register-issuer',
'--openid-configuration',
oidc_conf,
'--issuer',
'issuer',
'--client-id',
'auie',
'--client-secret',
'tsrn',
'somename',
)
provider = OIDCProvider.objects.get(name='somename')
assert provider.issuer == 'issuer'
def test_resetpassword(simple_user):
call_command('resetpassword', 'user')
old_pass = simple_user.password
@ -851,182 +790,3 @@ def test_clean_user_exports(settings, app, superuser, freezer):
call_command('clean-user-exports')
with pytest.raises(webtest.app.AppError):
resp.click('Download CSV')
@responses.activate
@pytest.mark.parametrize('deletion_number,deletion_valid', [(2, True), (5, True), (10, False)])
def test_oidc_sync_provider(
db, app, admin, settings, caplog, deletion_number, deletion_valid, nologtoconsole
):
oidc_provider = OIDCProvider.objects.create(
issuer='https://some.provider',
name='Some Provider',
slug='some-provider',
ou=get_default_ou(),
)
OIDCClaimMapping.objects.create(
authenticator=oidc_provider,
attribute='username',
idtoken_claim=False,
claim='username',
)
OIDCClaimMapping.objects.create(
authenticator=oidc_provider,
attribute='email',
idtoken_claim=False,
claim='email',
)
# last one, with an idtoken claim
OIDCClaimMapping.objects.create(
authenticator=oidc_provider,
attribute='last_name',
idtoken_claim=True,
claim='family_name',
)
# typo in template string
OIDCClaimMapping.objects.create(
authenticator=oidc_provider,
attribute='first_name',
idtoken_claim=True,
claim='given_name',
)
User = get_user_model()
for i in range(100):
user = User.objects.create(
first_name='John%s' % i,
last_name='Doe%s' % i,
username='john.doe.%s' % i,
email='john.doe.%s@ad.dre.ss' % i,
ou=get_default_ou(),
)
identifier = uuid.UUID(user.uuid).bytes
sector_identifier = 'some.provider'
cipher_args = [
settings.SECRET_KEY.encode('utf-8'),
identifier,
sector_identifier,
]
sub = crypto.aes_base64url_deterministic_encrypt(*cipher_args).decode('utf-8')
OIDCAccount.objects.create(user=user, provider=oidc_provider, sub=sub)
def synchronization_get_modified_response():
# randomized batch of modified users
modified_users = random.sample(list(User.objects.all()), 20)
results = []
for count, user in enumerate(modified_users):
user_json = user.to_json()
user_json['username'] = f'modified_{count}'
user_json['first_name'] = 'Mod'
user_json['last_name'] = 'Ified'
# mocking claim resolution by oidc provider
user_json['given_name'] = 'Mod'
user_json['family_name'] = 'Ified'
# add user sub to response
try:
account = OIDCAccount.objects.get(user=user)
except OIDCAccount.DoesNotExist:
pass
else:
user_json['sub'] = account.sub
results.append(user_json)
return {'results': results}
responses.post(
'https://some.provider/api/users/synchronization/',
json={
'unknown_uuids': [
account.sub for account in random.sample(list(OIDCAccount.objects.all()), deletion_number)
]
},
)
responses.get('https://some.provider/api/users/', json=synchronization_get_modified_response())
with check_log(caplog, 'no provider supporting synchronization'):
call_command('oidc-sync-provider', '-v1')
oidc_provider.a2_synchronization_supported = True
oidc_provider.save()
with check_log(caplog, 'no provider supporting synchronization'):
call_command('oidc-sync-provider', '--provider', 'whatever', '-v1')
with check_log(caplog, 'got 20 users'):
call_command('oidc-sync-provider', '-v1')
if deletion_valid:
# existing users check
assert OIDCAccount.objects.count() == 100 - deletion_number
else:
assert OIDCAccount.objects.count() == 100
assert caplog.records[3].levelname == 'ERROR'
assert 'deletion ratio is abnormally high' in caplog.records[3].message
# users update
assert User.objects.filter(username__startswith='modified').count() in range(20 - deletion_number, 21)
assert User.objects.filter(first_name='Mod', last_name='Ified').count() in range(20 - deletion_number, 21)
@responses.activate
def test_auth_oidc_refresh_jwkset_json(db, app, admin, settings, caplog):
jwkset_url = 'https://www.example.com/common/discovery/v3.0/keys'
kid_rsa = '123'
kid_ec = '456'
def generate_remote_jwkset_json():
key_rsa = JWK.generate(kty='RSA', size=512, kid=kid_rsa)
key_ec = JWK.generate(kty='EC', size=256, kid=kid_ec)
jwkset = JWKSet()
jwkset.add(key_rsa)
jwkset.add(key_ec)
return jwkset.export(as_dict=True)
responses.get(
jwkset_url,
json={
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
**generate_remote_jwkset_json(),
},
)
issuer = ('https://www.example.com',)
provider = OIDCProvider.objects.create(
ou=get_default_ou(),
name='Foo',
slug='foo',
client_id='abc',
client_secret='def',
enabled=True,
issuer=issuer,
authorization_endpoint='%s/authorize' % issuer,
token_endpoint='%s/token' % issuer,
end_session_endpoint='%s/logout' % issuer,
userinfo_endpoint='%s/user_info' % issuer,
token_revocation_endpoint='%s/revoke' % issuer,
jwkset_url=jwkset_url,
idtoken_algo=OIDCProvider.ALGO_RSA,
claims_parameter_supported=False,
button_label='Connect with Foo',
)
assert {key['kid'] for key in provider.jwkset_json['keys']} == {'123', '456'}
kid_rsa = 'abcdefg'
kid_ec = 'hijklmn'
responses.get(
jwkset_url,
json={
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
**generate_remote_jwkset_json(),
},
)
call_command('oidc-refresh-jwkset-json', '-v1')
provider.refresh_from_db()
assert {key['kid'] for key in provider.jwkset_json['keys']} == {'abcdefg', 'hijklmn'}

View File

@ -17,19 +17,15 @@
import json
import pytest
import responses
from django import VERSION as DJ_VERSION
from django.utils.html import escape
from jwcrypto.jwk import JWK, JWKSet
from webtest import Upload
from authentic2.a2_rbac.models import Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.apps.authenticators.models import AddRoleAction, BaseAuthenticator, LoginPasswordAuthenticator
from authentic2.manager.utils import label_from_role
from authentic2.models import Attribute
from authentic2_auth_fc.models import FcAuthenticator
from authentic2_auth_oidc.models import OIDCClaimMapping, OIDCProvider
from authentic2_auth_oidc.models import OIDCProvider
from authentic2_auth_saml.models import SAMLAuthenticator, SetAttributeAction
from .utils import assert_event, login, logout, request_select2
@ -220,345 +216,6 @@ def test_authenticators_password_export(app, superuser):
assert LoginPasswordAuthenticator.objects.get().button_description == 'test'
@pytest.mark.freeze_time('2022-04-19 14:00')
@responses.activate
def test_authenticators_oidc(app, superuser, ou1, ou2):
resp = login(app, superuser, path='/manage/authenticators/')
resp = resp.click('Add new authenticator')
resp.form['name'] = 'Test'
resp.form['authenticator'] = 'oidc'
resp = resp.form.submit()
assert '/edit/' in resp.location
assert_event('authenticator.creation', user=superuser, session=app.session)
provider = OIDCProvider.objects.filter(slug='test', ou=get_default_ou()).get()
resp = app.get(provider.get_absolute_url())
assert 'extra-actions-menu-opener' in resp.text
assert 'Creation date: April 19, 2022, 2 p.m.' in resp.text
assert 'Last modification date: April 19, 2022, 2 p.m.' in resp.text
assert 'Issuer' not in resp.text
assert 'Enable' not in resp.text
assert 'configuration is not complete' in resp.text
app.get('/manage/authenticators/%s/toggle/' % provider.pk, status=403)
resp = resp.click('Edit')
assert 'enabled' not in resp.form.fields
assert 'last_sync_time' not in resp.form.fields
assert resp.pyquery('input#id_client_id').val() == ''
assert resp.pyquery('input#id_client_secret').val() == ''
resp.form['ou'] = ou1.pk
resp.form['issuer'] = 'https://oidc.example.com'
resp.form['scopes'] = 'profile email'
resp.form['strategy'] = 'create'
resp.form['authorization_endpoint'] = 'https://oidc.example.com/authorize'
resp.form['token_endpoint'] = 'https://oidc.example.com/token'
resp.form['userinfo_endpoint'] = 'https://oidc.example.com/user_info'
resp.form['idtoken_algo'] = 2
resp.form['button_label'] = 'Test'
resp.form['button_description'] = 'test'
resp.form['client_id'] = 'auie'
resp.form['client_secret'] = 'tsrn'
resp = resp.form.submit().follow()
assert_event('authenticator.edit', user=superuser, session=app.session)
assert 'Issuer: https://oidc.example.com' in resp.text
assert 'Scopes: profile email' in resp.text
resp = app.get('/manage/authenticators/')
assert 'OpenID Connect - Test' in resp.text
assert 'class="section disabled"' in resp.text
assert 'OIDC provider linked to' not in resp.text
resp = resp.click('Configure', index=1)
resp = resp.click('Enable').follow()
assert 'Authenticator has been enabled.' in resp.text
assert_event('authenticator.enable', user=superuser, session=app.session)
resp = resp.click('Journal of edits')
assert 'enable' in resp.text
assert (
'edit (ou, issuer, scopes, strategy, client_id, button_label, idtoken_algo, '
'client_secret, token_endpoint, userinfo_endpoint, button_description, authorization_endpoint)'
in resp.text
)
assert 'creation' in resp.text
jwkset_url = 'https://www.example.com/common/discovery/v3.0/keys'
kid_rsa = '123'
def generate_remote_jwkset_json():
key_rsa = JWK.generate(kty='RSA', size=512, kid=kid_rsa)
jwkset = JWKSet()
jwkset.add(key_rsa)
return jwkset.export(as_dict=True)
responses.get(
jwkset_url,
json={
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
**generate_remote_jwkset_json(),
},
)
provider.refresh_from_db()
provider.jwkset_url = jwkset_url
provider.save()
resp = app.get('/manage/authenticators/%s/edit/' % provider.pk)
assert resp.pyquery('input#id_jwkset_url')[0].value == jwkset_url
assert 'disabled' in resp.pyquery('textarea#id_jwkset_json')[0].keys()
assert '"kid": "123"' in resp.pyquery('textarea#id_jwkset_json')[0].text
assert (
resp.pyquery('div[aria-labelledby="id_jwkset_json_title"] div.hint')[0].text
== 'JSON is fetched from the WebKey Set URL'
)
resp = app.get('/manage/authenticators/')
assert 'class="section disabled"' not in resp.text
assert 'OIDC provider linked to https://oidc.example.com with scopes profile, email.' not in resp.text
# same name
resp = resp.click('Add new authenticator')
resp.form['name'] = 'test'
resp.form['authenticator'] = 'oidc'
resp = resp.form.submit().follow()
assert OIDCProvider.objects.filter(slug='test-1').count() == 1
OIDCProvider.objects.filter(slug='test-1').delete()
# no name
resp = app.get('/manage/authenticators/add/')
resp.form['authenticator'] = 'oidc'
resp = resp.form.submit()
assert 'This field is required' in resp.text
resp = app.get('/manage/authenticators/')
resp = resp.click('Configure', index=1)
resp = resp.click('Disable').follow()
assert 'Authenticator has been disabled.' in resp.text
assert_event('authenticator.disable', user=superuser, session=app.session)
resp = app.get('/manage/authenticators/')
assert 'class="section disabled"' in resp.text
resp = resp.click('Configure', index=1)
resp = resp.click('Delete')
resp = resp.form.submit().follow()
assert not OIDCProvider.objects.filter(slug='test').exists()
assert_event('authenticator.deletion', user=superuser, session=app.session)
def test_authenticators_oidc_claims(app, superuser):
authenticator = OIDCProvider.objects.create(slug='idp1')
resp = login(app, superuser, path=authenticator.get_absolute_url())
resp = resp.click('Add', href='claim')
resp.form['claim'] = 'email'
resp.form['attribute'].select(text='Email address (email)')
resp.form['verified'].select(text='verified claim')
resp.form['required'] = True
resp.form['idtoken_claim'] = True
resp = resp.form.submit()
assert_event('authenticator.related_object.creation', user=superuser, session=app.session)
assert '#open:oidcclaimmapping' in resp.location
resp = resp.follow()
assert 'email → Email address (email), verified, required, idtoken' in resp.text
resp = resp.click('email')
resp.form['attribute'].select(text='First name (first_name)')
resp = resp.form.submit().follow()
assert 'email → First name (first_name), verified, required, idtoken' in resp.text
assert_event('authenticator.related_object.edit', user=superuser, session=app.session)
resp = resp.click('Remove')
resp = resp.form.submit().follow()
assert 'email' not in resp.text
assert_event('authenticator.related_object.deletion', user=superuser, session=app.session)
def test_authenticators_oidc_claims_disabled_attribute(app, superuser):
authenticator = OIDCProvider.objects.create(slug='idp1')
attr = Attribute.objects.create(kind='string', name='test_attribute', label='Test attribute')
resp = login(app, superuser, path=authenticator.get_absolute_url())
resp = resp.click('Add', href='claim')
assert resp.pyquery('select#id_attribute option[value=test_attribute]')
attr.disabled = True
attr.save()
resp = app.get(authenticator.get_absolute_url())
resp = resp.click('Add', href='claim')
assert not resp.pyquery('select#id_attribute option[value=test_attribute]')
def test_authenticators_oidc_add_role(app, superuser, role_ou1):
authenticator = OIDCProvider.objects.create(slug='idp1')
resp = login(app, superuser, path=authenticator.get_absolute_url())
resp = resp.click('Add', href='role')
select2_json = request_select2(app, resp, term='role_ou1')
assert len(select2_json['results']) == 1
resp.form['role'].force_value(select2_json['results'][0]['id'])
resp = resp.form.submit().follow()
assert 'role_ou1' in resp.text
def test_authenticators_oidc_export(app, superuser, simple_role):
authenticator = OIDCProvider.objects.create(slug='idp1', order=42, ou=get_default_ou(), enabled=True)
OIDCClaimMapping.objects.create(authenticator=authenticator, claim='test', attribute='hop')
AddRoleAction.objects.create(authenticator=authenticator, role=simple_role)
resp = login(app, superuser, path=authenticator.get_absolute_url())
export_resp = resp.click('Export')
resp = app.get('/manage/authenticators/import/')
resp.form['authenticator_json'] = Upload('export.json', export_resp.body, 'application/json')
resp = resp.form.submit()
assert '/authenticators/%s/' % authenticator.pk in resp.location
resp = resp.follow()
assert 'Authenticator has been updated.' in resp.text
assert OIDCProvider.objects.count() == 1
assert OIDCClaimMapping.objects.count() == 1
assert AddRoleAction.objects.count() == 1
OIDCProvider.objects.all().delete()
OIDCClaimMapping.objects.all().delete()
AddRoleAction.objects.all().delete()
resp = app.get('/manage/authenticators/import/')
resp.form['authenticator_json'] = Upload('export.json', export_resp.body, 'application/json')
resp = resp.form.submit().follow()
assert 'Authenticator has been created.' in resp.text
authenticator = OIDCProvider.objects.get()
assert authenticator.slug == 'idp1'
assert authenticator.order == 1
assert authenticator.ou == get_default_ou()
assert authenticator.enabled is False
assert OIDCClaimMapping.objects.filter(
authenticator=authenticator, claim='test', attribute='hop'
).exists()
assert AddRoleAction.objects.filter(authenticator=authenticator, role=simple_role).exists()
def test_authenticators_oidc_import_errors(app, superuser, simple_role):
resp = login(app, superuser, path='/manage/authenticators/import/')
resp.form['authenticator_json'] = Upload('export.json', b'not-json', 'application/json')
resp = resp.form.submit()
assert 'File is not in the expected JSON format.' in resp.text
resp.form['authenticator_json'] = Upload('export.json', b'{}', 'application/json')
resp = resp.form.submit()
assert escape('Missing "authenticator_type" key.') in resp.text
resp.form['authenticator_json'] = Upload(
'export.json', b'{"authenticator_type": "xxx"}', 'application/json'
)
resp = resp.form.submit()
assert 'Invalid authenticator_type: xxx.' in resp.text
resp.form['authenticator_json'] = Upload(
'export.json', b'{"authenticator_type": "x.y"}', 'application/json'
)
resp = resp.form.submit()
assert 'Unknown authenticator_type: x.y.' in resp.text
authenticator = OIDCProvider.objects.create(slug='idp1', order=42, ou=get_default_ou(), enabled=True)
AddRoleAction.objects.create(authenticator=authenticator, role=simple_role)
export_resp = app.get('/manage/authenticators/%s/export/' % authenticator.pk)
export = json.loads(export_resp.text)
del export['slug']
resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit()
assert 'Missing slug.' in resp.text
export = json.loads(export_resp.text)
export['ou'] = {'slug': 'xxx'}
resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit()
assert escape("Organization unit not found: {'slug': 'xxx'}.") in resp.text
export = json.loads(export_resp.text)
del export['related_objects'][0]['object_type']
resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit()
assert escape('Missing "object_type" key.') in resp.text
export = json.loads(export_resp.text)
del export['related_objects'][0]['role']
resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit()
assert escape('Missing "role" key in add role action.') in resp.text
export = json.loads(export_resp.text)
export['related_objects'][0]['role'] = {'slug': 'xxx'}
resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit()
assert escape("Role not found: {'slug': 'xxx'}.") in resp.text
def test_authenticators_add_role_actions(app, admin, simple_role, role_ou1):
authenticator = OIDCProvider.objects.create(slug='idp1', ou=get_default_ou(), enabled=True)
authenticator.save()
action = AddRoleAction.objects.create(authenticator=authenticator, role=simple_role)
login(app, admin)
resp = app.get(authenticator.get_absolute_url())
assert resp.pyquery(
f'a[href="/manage/authenticators/{authenticator.pk}/addroleaction/{action.pk}/edit/"]'
).text() == label_from_role(simple_role)
resp = resp.click(href=f'/manage/authenticators/{authenticator.pk}/addroleaction/add/')
select2_json = request_select2(app, resp, term='role_ou1')
assert len(select2_json['results']) == 1
resp.form['role'].force_value(select2_json['results'][0]['id'])
resp.form['condition'] = '{% %}'
resp = resp.form.submit()
assert 'template syntax error: Could not parse the remainder:' in resp.text
resp.form['role'] = role_ou1.id
resp.form['condition'] = '"Admin" in attributes.groups'
resp = resp.form.submit().follow()
action = AddRoleAction.objects.get(
authenticator=authenticator, role=role_ou1, condition='"Admin" in attributes.groups'
)
assert resp.pyquery(
f'a[href="/manage/authenticators/{authenticator.pk}/addroleaction/{action.pk}/edit/"]'
).text() == '%s (depending on condition)' % label_from_role(role_ou1)
def test_authenticators_oidc_related_objects_permissions(app, simple_user, simple_role):
authenticator = OIDCProvider.objects.create(slug='idp1', order=42, ou=get_default_ou(), enabled=True)
authenticator.save()
mapping = OIDCClaimMapping.objects.create(authenticator=authenticator, claim='test', attribute='hop')
action = AddRoleAction.objects.create(authenticator=authenticator, role=simple_role)
simple_user.roles.add(simple_role.get_admin_role()) # grant user access to /manage/
role = Role.objects.get(name='Manager of authenticators')
login(app, simple_user, path='/')
app.get(authenticator.get_absolute_url(), status=403)
app.get(f'/manage/authenticators/{authenticator.pk}/oidcclaimmapping/{mapping.pk}/edit/', status=403)
app.get(f'/manage/authenticators/{authenticator.pk}/addroleaction/{action.pk}/delete/', status=403)
app.get(f'/manage/authenticators/{authenticator.pk}/addroleaction/add/', status=403)
simple_user.roles.add(role)
app.get(authenticator.get_absolute_url())
app.get(f'/manage/authenticators/{authenticator.pk}/oidcclaimmapping/{mapping.pk}/edit/')
app.get(f'/manage/authenticators/{authenticator.pk}/addroleaction/{action.pk}/delete/')
app.get(f'/manage/authenticators/{authenticator.pk}/addroleaction/add/')
def test_authenticators_fc(app, superuser):
resp = login(app, superuser, path='/manage/authenticators/')