diff --git a/src/authentic2_auth_oidc/migrations/0008_auto_20201102_1142.py b/src/authentic2_auth_oidc/migrations/0008_auto_20201102_1142.py new file mode 100644 index 000000000..8e99bce95 --- /dev/null +++ b/src/authentic2_auth_oidc/migrations/0008_auto_20201102_1142.py @@ -0,0 +1,25 @@ +# Generated by Django 2.2.17 on 2020-11-02 10:42 + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('authentic2_auth_oidc', '0007_auto_20200317_1732'), + ] + + operations = [ + migrations.AlterField( + model_name='oidcaccount', + name='sub', + field=models.CharField(max_length=256, verbose_name='sub'), + ), + migrations.AlterUniqueTogether( + name='oidcaccount', + unique_together={('provider', 'sub')}, + ), + ] diff --git a/src/authentic2_auth_oidc/models.py b/src/authentic2_auth_oidc/models.py index 3a894c865..9e3a74b60 100644 --- a/src/authentic2_auth_oidc/models.py +++ b/src/authentic2_auth_oidc/models.py @@ -259,8 +259,7 @@ class OIDCAccount(models.Model): on_delete=models.CASCADE) sub = models.CharField( verbose_name=_('sub'), - max_length=256, - unique=True) + max_length=256) def __str__(self): return u'{0} on {1} linked to {2}'.format(self.sub, self.provider and self.provider.issuer, @@ -268,3 +267,8 @@ class OIDCAccount(models.Model): def __repr__(self): return '' % (self.sub, self.provider and self.provider.issuer) + + class Meta: + unique_together = [ + ('provider', 'sub'), + ] diff --git a/tests/test_auth_oidc.py b/tests/test_auth_oidc.py index e12c4f420..018ee288d 100644 --- a/tests/test_auth_oidc.py +++ b/tests/test_auth_oidc.py @@ -31,6 +31,7 @@ from jwcrypto.jwt import JWT from httmock import urlmatch, HTTMock from django.contrib.auth import get_user_model +from django.db import IntegrityError, transaction from django.urls import reverse from django.utils.encoding import force_text, force_str from django.http import QueryDict @@ -43,7 +44,7 @@ from django_rbac.utils import get_ou_model from authentic2_auth_oidc.utils import ( parse_id_token, IDToken, get_providers, has_providers, register_issuer, IDTokenError) -from authentic2_auth_oidc.models import OIDCProvider, OIDCClaimMapping +from authentic2_auth_oidc.models import OIDCProvider, OIDCClaimMapping, OIDCAccount from authentic2.models import Attribute from authentic2.models import AttributeValue from authentic2.utils import last_authentication_event @@ -53,6 +54,8 @@ from . import utils pytestmark = pytest.mark.django_db +User = get_user_model() + def test_base64url_decode(): with pytest.raises(ValueError): @@ -152,25 +155,29 @@ def oidc_provider_rsa(request, db, oidc_provider_jwkset): def make_oidc_provider( + name='Server', + slug=None, + issuer=None, + max_auth_age=10, + strategy=OIDCProvider.STRATEGY_CREATE, idtoken_algo=OIDCProvider._meta.get_field('idtoken_algo').default, jwkset=None, claims_parameter_supported=False): - - if jwkset is not None: - jwkset = json.loads(jwkset.export()) - + slug = slug or name.lower() + issuer = issuer or ('https://%s.example.com' % slug) + jwkset = json.loads(jwkset.export()) if jwkset else None provider = OIDCProvider.objects.create( ou=get_default_ou(), - name='OIDIDP', - slug='oididp', - issuer='http://server.example.com', - authorization_endpoint='https://server.example.com/authorize', - token_endpoint='https://server.example.com/token', - end_session_endpoint='https://server.example.com/logout', - userinfo_endpoint='https://server.example.com/user_info', - token_revocation_endpoint='https://server.example.com/revoke', - max_auth_age=10, - strategy=OIDCProvider.STRATEGY_CREATE, + name=name, + slug=slug, + issuer=issuer, + authorization_endpoint='%s/authorize' % issuer, + token_endpoint='%s/token' % issuer, + end_session_endpoint='%s/logout' % issuer, + userinfo_endpoint='%s/user_info' % issuer, + token_revocation_endpoint='%s/revoke' % issuer, + max_auth_age=max_auth_age, + strategy=strategy, jwkset_json=jwkset, idtoken_algo=idtoken_algo, claims_parameter_supported=claims_parameter_supported, @@ -350,7 +357,7 @@ def login_callback_url(oidc_provider): def test_providers_on_login_page(oidc_provider, app): response = app.get('/login/') # two frontends should be present on login page - assert response.pyquery('p#oidc-p-oididp') + assert response.pyquery('p#oidc-p-server') OIDCProvider.objects.create( id=2, ou=get_default_ou(), @@ -370,31 +377,15 @@ def test_providers_on_login_page(oidc_provider, app): ) response = app.get('/login/') - assert response.pyquery('p#oidc-p-oididp') + assert response.pyquery('p#oidc-p-server') assert response.pyquery('p#oidc-p-oidcidp-2') def test_login_with_conditional_authenticators(oidc_provider, app, settings, caplog): - OIDCProvider.objects.create( - id=2, - ou=get_default_ou(), - name='My IDP', - slug='myidp', - issuer='https://idp2.example.com/', - authorization_endpoint='https://idp2.example.com/authorize', - token_endpoint='https://idp2.example.com/token', - end_session_endpoint='https://idp2.example.com/logout', - userinfo_endpoint='https://idp*é.example.com/user_info', - token_revocation_endpoint='https://idp2.example.com/revoke', - max_auth_age=10, - strategy=OIDCProvider.STRATEGY_CREATE, - jwkset_json=None, - idtoken_algo=OIDCProvider.ALGO_RSA, - claims_parameter_supported=False - ) + make_oidc_provider(name='My IDP', slug='myidp') response = app.get('/login/') assert 'My IDP' in response - assert 'OIDIDP' in response + assert 'Server' in response settings.AUTH_FRONTENDS_KWARGS = { 'oidc': { @@ -404,31 +395,31 @@ def test_login_with_conditional_authenticators(oidc_provider, app, settings, cap } } response = app.get('/login/') - assert 'OIDIDP' in response + assert 'Server' in response assert 'My IDP' not in response settings.AUTH_FRONTENDS_KWARGS = { 'oidc': { 'show_condition': { 'myid': 'remote_addr==\'0.0.0.0\'', - 'oididp': 'remote_addr==\'127.0.0.1\'' + 'server': 'remote_addr==\'127.0.0.1\'' } } } response = app.get('/login/') - assert 'OIDIDP' in response + assert 'Server' in response assert 'My IDP' in response settings.AUTH_FRONTENDS_KWARGS = { 'oidc': { 'show_condition': { 'myidp': 'remote_addr==\'0.0.0.0\'', - 'oididp': 'remote_addr==\'127.0.0.1\'' + 'server': 'remote_addr==\'127.0.0.1\'' } } } response = app.get('/login/') - assert 'OIDIDP' in response + assert 'Server' in response assert 'My IDP' not in response settings.AUTH_FRONTENDS_KWARGS = { @@ -437,19 +428,19 @@ def test_login_with_conditional_authenticators(oidc_provider, app, settings, cap } } response = app.get('/login/') - assert 'OIDIDP' in response + assert 'Server' in response assert 'My IDP' in response settings.AUTH_FRONTENDS_KWARGS = { 'oidc': { 'show_condition': { 'myidp': 'remote_addr==\'127.0.0.1\' and \'backoffice\' not in login_hint', - 'oididp': '\'backoffice\' in login_hint', + 'server': '\'backoffice\' in login_hint', } } } response = app.get('/login/') - assert 'OIDIDP' not in response + assert 'Server' not in response assert 'My IDP' in response # As we do not create a session on each access to the login page, we need @@ -462,13 +453,13 @@ def test_login_with_conditional_authenticators(oidc_provider, app, settings, cap app.set_cookie(force_str(settings.SESSION_COOKIE_NAME), force_str(session.session_key)) response = app.get('/login/') - assert 'OIDIDP' in response + assert 'Server' in response assert 'My IDP' not in response def test_login_autorun(oidc_provider, app, settings): response = app.get('/login/') - assert 'OIDIDP' in response + assert 'Server' in response # hide password block settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'remote_addr==\'0.0.0.0\''}} @@ -504,7 +495,6 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks): assert claims['userinfo']['family_name']['essential'] assert claims['userinfo']['ou'] is None - User = get_user_model() assert User.objects.count() == 0 with utils.check_log(caplog, 'failed to contact the token_endpoint'): @@ -592,7 +582,7 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks): def test_show_on_login_page(app, oidc_provider): response = app.get('/login/') - assert 'oidc-a-oididp' in response.text + assert 'oidc-a-server' in response.text # do not show this provider on login page anymore oidc_provider.show = False @@ -602,7 +592,7 @@ def test_show_on_login_page(app, oidc_provider): get_providers.cache.clear() has_providers.cache.clear() response = app.get('/login/') - assert 'oidc-a-oididp' not in response.text + assert 'oidc-a-server' not in response.text def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwkset, simple_user): @@ -611,7 +601,6 @@ def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwks oidc_provider.strategy = oidc_provider.STRATEGY_FIND_UUID oidc_provider.save() - User = get_user_model() assert User.objects.count() == 1 response = app.get('/').maybe_follow() @@ -654,7 +643,6 @@ def test_strategy_create(app, caplog, code, oidc_provider, oidc_provider_jwkset) oidc_provider.ou.email_is_unique = True oidc_provider.ou.save() - User = get_user_model() User.objects.all().delete() response = app.get('/').maybe_follow() @@ -729,7 +717,6 @@ def test_invalid_kid(app, caplog, code, oidc_provider_rsa, # no mapping please OIDCClaimMapping.objects.all().delete() - User = get_user_model() assert User.objects.count() == 1 response = app.get('/').maybe_follow() @@ -802,7 +789,6 @@ def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider ) oidc_provider.save() - User = get_user_model() assert User.objects.count() == 0 response = app.get('/').maybe_follow() @@ -846,3 +832,16 @@ def test_lost_state(app, caplog, code, oidc_provider, oidc_provider_jwkset, hook assert '_auth_user_id' not in app.session # we are automatically redirected to our destination assert response.location == '/accounts/oidc/login/%s/?next=/whatever/' % oidc_provider.pk + + +def test_multiple_accounts(db): + user1 = User.objects.create() + user2 = User.objects.create() + provider1 = make_oidc_provider(name='Provider1') + provider2 = make_oidc_provider(name='Provider2') + OIDCAccount.objects.create(user=user1, provider=provider1, sub='1234') + with pytest.raises(IntegrityError): + with transaction.atomic(): + OIDCAccount.objects.create(user=user1, provider=provider2, sub='4567') + OIDCAccount.objects.create(user=user2, provider=provider2, sub='1234') +