auth_oidc: make account unique on (provider, user) and (provider, sub) (#48174)
This commit is contained in:
parent
0dac935c96
commit
5d28c9034c
|
@ -0,0 +1,25 @@
|
|||
# Generated by Django 2.2.17 on 2020-11-02 10:42
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
('authentic2_auth_oidc', '0007_auto_20200317_1732'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name='oidcaccount',
|
||||
name='sub',
|
||||
field=models.CharField(max_length=256, verbose_name='sub'),
|
||||
),
|
||||
migrations.AlterUniqueTogether(
|
||||
name='oidcaccount',
|
||||
unique_together={('provider', 'sub')},
|
||||
),
|
||||
]
|
|
@ -259,8 +259,7 @@ class OIDCAccount(models.Model):
|
|||
on_delete=models.CASCADE)
|
||||
sub = models.CharField(
|
||||
verbose_name=_('sub'),
|
||||
max_length=256,
|
||||
unique=True)
|
||||
max_length=256)
|
||||
|
||||
def __str__(self):
|
||||
return u'{0} on {1} linked to {2}'.format(self.sub, self.provider and self.provider.issuer,
|
||||
|
@ -268,3 +267,8 @@ class OIDCAccount(models.Model):
|
|||
|
||||
def __repr__(self):
|
||||
return '<OIDCAccount %r on %r>' % (self.sub, self.provider and self.provider.issuer)
|
||||
|
||||
class Meta:
|
||||
unique_together = [
|
||||
('provider', 'sub'),
|
||||
]
|
||||
|
|
|
@ -31,6 +31,7 @@ from jwcrypto.jwt import JWT
|
|||
from httmock import urlmatch, HTTMock
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.db import IntegrityError, transaction
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text, force_str
|
||||
from django.http import QueryDict
|
||||
|
@ -43,7 +44,7 @@ from django_rbac.utils import get_ou_model
|
|||
from authentic2_auth_oidc.utils import (
|
||||
parse_id_token, IDToken, get_providers, has_providers, register_issuer,
|
||||
IDTokenError)
|
||||
from authentic2_auth_oidc.models import OIDCProvider, OIDCClaimMapping
|
||||
from authentic2_auth_oidc.models import OIDCProvider, OIDCClaimMapping, OIDCAccount
|
||||
from authentic2.models import Attribute
|
||||
from authentic2.models import AttributeValue
|
||||
from authentic2.utils import last_authentication_event
|
||||
|
@ -53,6 +54,8 @@ from . import utils
|
|||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
def test_base64url_decode():
|
||||
with pytest.raises(ValueError):
|
||||
|
@ -152,25 +155,29 @@ def oidc_provider_rsa(request, db, oidc_provider_jwkset):
|
|||
|
||||
|
||||
def make_oidc_provider(
|
||||
name='Server',
|
||||
slug=None,
|
||||
issuer=None,
|
||||
max_auth_age=10,
|
||||
strategy=OIDCProvider.STRATEGY_CREATE,
|
||||
idtoken_algo=OIDCProvider._meta.get_field('idtoken_algo').default,
|
||||
jwkset=None,
|
||||
claims_parameter_supported=False):
|
||||
|
||||
if jwkset is not None:
|
||||
jwkset = json.loads(jwkset.export())
|
||||
|
||||
slug = slug or name.lower()
|
||||
issuer = issuer or ('https://%s.example.com' % slug)
|
||||
jwkset = json.loads(jwkset.export()) if jwkset else None
|
||||
provider = OIDCProvider.objects.create(
|
||||
ou=get_default_ou(),
|
||||
name='OIDIDP',
|
||||
slug='oididp',
|
||||
issuer='http://server.example.com',
|
||||
authorization_endpoint='https://server.example.com/authorize',
|
||||
token_endpoint='https://server.example.com/token',
|
||||
end_session_endpoint='https://server.example.com/logout',
|
||||
userinfo_endpoint='https://server.example.com/user_info',
|
||||
token_revocation_endpoint='https://server.example.com/revoke',
|
||||
max_auth_age=10,
|
||||
strategy=OIDCProvider.STRATEGY_CREATE,
|
||||
name=name,
|
||||
slug=slug,
|
||||
issuer=issuer,
|
||||
authorization_endpoint='%s/authorize' % issuer,
|
||||
token_endpoint='%s/token' % issuer,
|
||||
end_session_endpoint='%s/logout' % issuer,
|
||||
userinfo_endpoint='%s/user_info' % issuer,
|
||||
token_revocation_endpoint='%s/revoke' % issuer,
|
||||
max_auth_age=max_auth_age,
|
||||
strategy=strategy,
|
||||
jwkset_json=jwkset,
|
||||
idtoken_algo=idtoken_algo,
|
||||
claims_parameter_supported=claims_parameter_supported,
|
||||
|
@ -350,7 +357,7 @@ def login_callback_url(oidc_provider):
|
|||
def test_providers_on_login_page(oidc_provider, app):
|
||||
response = app.get('/login/')
|
||||
# two frontends should be present on login page
|
||||
assert response.pyquery('p#oidc-p-oididp')
|
||||
assert response.pyquery('p#oidc-p-server')
|
||||
OIDCProvider.objects.create(
|
||||
id=2,
|
||||
ou=get_default_ou(),
|
||||
|
@ -370,31 +377,15 @@ def test_providers_on_login_page(oidc_provider, app):
|
|||
)
|
||||
|
||||
response = app.get('/login/')
|
||||
assert response.pyquery('p#oidc-p-oididp')
|
||||
assert response.pyquery('p#oidc-p-server')
|
||||
assert response.pyquery('p#oidc-p-oidcidp-2')
|
||||
|
||||
|
||||
def test_login_with_conditional_authenticators(oidc_provider, app, settings, caplog):
|
||||
OIDCProvider.objects.create(
|
||||
id=2,
|
||||
ou=get_default_ou(),
|
||||
name='My IDP',
|
||||
slug='myidp',
|
||||
issuer='https://idp2.example.com/',
|
||||
authorization_endpoint='https://idp2.example.com/authorize',
|
||||
token_endpoint='https://idp2.example.com/token',
|
||||
end_session_endpoint='https://idp2.example.com/logout',
|
||||
userinfo_endpoint='https://idp*é.example.com/user_info',
|
||||
token_revocation_endpoint='https://idp2.example.com/revoke',
|
||||
max_auth_age=10,
|
||||
strategy=OIDCProvider.STRATEGY_CREATE,
|
||||
jwkset_json=None,
|
||||
idtoken_algo=OIDCProvider.ALGO_RSA,
|
||||
claims_parameter_supported=False
|
||||
)
|
||||
make_oidc_provider(name='My IDP', slug='myidp')
|
||||
response = app.get('/login/')
|
||||
assert 'My IDP' in response
|
||||
assert 'OIDIDP' in response
|
||||
assert 'Server' in response
|
||||
|
||||
settings.AUTH_FRONTENDS_KWARGS = {
|
||||
'oidc': {
|
||||
|
@ -404,31 +395,31 @@ def test_login_with_conditional_authenticators(oidc_provider, app, settings, cap
|
|||
}
|
||||
}
|
||||
response = app.get('/login/')
|
||||
assert 'OIDIDP' in response
|
||||
assert 'Server' in response
|
||||
assert 'My IDP' not in response
|
||||
|
||||
settings.AUTH_FRONTENDS_KWARGS = {
|
||||
'oidc': {
|
||||
'show_condition': {
|
||||
'myid': 'remote_addr==\'0.0.0.0\'',
|
||||
'oididp': 'remote_addr==\'127.0.0.1\''
|
||||
'server': 'remote_addr==\'127.0.0.1\''
|
||||
}
|
||||
}
|
||||
}
|
||||
response = app.get('/login/')
|
||||
assert 'OIDIDP' in response
|
||||
assert 'Server' in response
|
||||
assert 'My IDP' in response
|
||||
|
||||
settings.AUTH_FRONTENDS_KWARGS = {
|
||||
'oidc': {
|
||||
'show_condition': {
|
||||
'myidp': 'remote_addr==\'0.0.0.0\'',
|
||||
'oididp': 'remote_addr==\'127.0.0.1\''
|
||||
'server': 'remote_addr==\'127.0.0.1\''
|
||||
}
|
||||
}
|
||||
}
|
||||
response = app.get('/login/')
|
||||
assert 'OIDIDP' in response
|
||||
assert 'Server' in response
|
||||
assert 'My IDP' not in response
|
||||
|
||||
settings.AUTH_FRONTENDS_KWARGS = {
|
||||
|
@ -437,19 +428,19 @@ def test_login_with_conditional_authenticators(oidc_provider, app, settings, cap
|
|||
}
|
||||
}
|
||||
response = app.get('/login/')
|
||||
assert 'OIDIDP' in response
|
||||
assert 'Server' in response
|
||||
assert 'My IDP' in response
|
||||
|
||||
settings.AUTH_FRONTENDS_KWARGS = {
|
||||
'oidc': {
|
||||
'show_condition': {
|
||||
'myidp': 'remote_addr==\'127.0.0.1\' and \'backoffice\' not in login_hint',
|
||||
'oididp': '\'backoffice\' in login_hint',
|
||||
'server': '\'backoffice\' in login_hint',
|
||||
}
|
||||
}
|
||||
}
|
||||
response = app.get('/login/')
|
||||
assert 'OIDIDP' not in response
|
||||
assert 'Server' not in response
|
||||
assert 'My IDP' in response
|
||||
|
||||
# As we do not create a session on each access to the login page, we need
|
||||
|
@ -462,13 +453,13 @@ def test_login_with_conditional_authenticators(oidc_provider, app, settings, cap
|
|||
app.set_cookie(force_str(settings.SESSION_COOKIE_NAME), force_str(session.session_key))
|
||||
|
||||
response = app.get('/login/')
|
||||
assert 'OIDIDP' in response
|
||||
assert 'Server' in response
|
||||
assert 'My IDP' not in response
|
||||
|
||||
|
||||
def test_login_autorun(oidc_provider, app, settings):
|
||||
response = app.get('/login/')
|
||||
assert 'OIDIDP' in response
|
||||
assert 'Server' in response
|
||||
|
||||
# hide password block
|
||||
settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'remote_addr==\'0.0.0.0\''}}
|
||||
|
@ -504,7 +495,6 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
|
|||
assert claims['userinfo']['family_name']['essential']
|
||||
assert claims['userinfo']['ou'] is None
|
||||
|
||||
User = get_user_model()
|
||||
assert User.objects.count() == 0
|
||||
|
||||
with utils.check_log(caplog, 'failed to contact the token_endpoint'):
|
||||
|
@ -592,7 +582,7 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
|
|||
|
||||
def test_show_on_login_page(app, oidc_provider):
|
||||
response = app.get('/login/')
|
||||
assert 'oidc-a-oididp' in response.text
|
||||
assert 'oidc-a-server' in response.text
|
||||
|
||||
# do not show this provider on login page anymore
|
||||
oidc_provider.show = False
|
||||
|
@ -602,7 +592,7 @@ def test_show_on_login_page(app, oidc_provider):
|
|||
get_providers.cache.clear()
|
||||
has_providers.cache.clear()
|
||||
response = app.get('/login/')
|
||||
assert 'oidc-a-oididp' not in response.text
|
||||
assert 'oidc-a-server' not in response.text
|
||||
|
||||
|
||||
def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwkset, simple_user):
|
||||
|
@ -611,7 +601,6 @@ def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwks
|
|||
oidc_provider.strategy = oidc_provider.STRATEGY_FIND_UUID
|
||||
oidc_provider.save()
|
||||
|
||||
User = get_user_model()
|
||||
assert User.objects.count() == 1
|
||||
|
||||
response = app.get('/').maybe_follow()
|
||||
|
@ -654,7 +643,6 @@ def test_strategy_create(app, caplog, code, oidc_provider, oidc_provider_jwkset)
|
|||
oidc_provider.ou.email_is_unique = True
|
||||
oidc_provider.ou.save()
|
||||
|
||||
User = get_user_model()
|
||||
User.objects.all().delete()
|
||||
|
||||
response = app.get('/').maybe_follow()
|
||||
|
@ -729,7 +717,6 @@ def test_invalid_kid(app, caplog, code, oidc_provider_rsa,
|
|||
# no mapping please
|
||||
OIDCClaimMapping.objects.all().delete()
|
||||
|
||||
User = get_user_model()
|
||||
assert User.objects.count() == 1
|
||||
|
||||
response = app.get('/').maybe_follow()
|
||||
|
@ -802,7 +789,6 @@ def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider
|
|||
)
|
||||
oidc_provider.save()
|
||||
|
||||
User = get_user_model()
|
||||
assert User.objects.count() == 0
|
||||
|
||||
response = app.get('/').maybe_follow()
|
||||
|
@ -846,3 +832,16 @@ def test_lost_state(app, caplog, code, oidc_provider, oidc_provider_jwkset, hook
|
|||
assert '_auth_user_id' not in app.session
|
||||
# we are automatically redirected to our destination
|
||||
assert response.location == '/accounts/oidc/login/%s/?next=/whatever/' % oidc_provider.pk
|
||||
|
||||
|
||||
def test_multiple_accounts(db):
|
||||
user1 = User.objects.create()
|
||||
user2 = User.objects.create()
|
||||
provider1 = make_oidc_provider(name='Provider1')
|
||||
provider2 = make_oidc_provider(name='Provider2')
|
||||
OIDCAccount.objects.create(user=user1, provider=provider1, sub='1234')
|
||||
with pytest.raises(IntegrityError):
|
||||
with transaction.atomic():
|
||||
OIDCAccount.objects.create(user=user1, provider=provider2, sub='4567')
|
||||
OIDCAccount.objects.create(user=user2, provider=provider2, sub='1234')
|
||||
|
||||
|
|
Loading…
Reference in New Issue