add OpenID Connect IdP plugin (fixes #6982)

You must set a valid RSA JWK in a JWKSet in the setting key A2_IDP_OIDC_JWKSET
or only use HMAC signature for your clients.
This commit is contained in:
Benjamin Dauvergne 2016-11-02 16:43:52 +01:00
parent 59b9732551
commit 5009b6eb8a
13 changed files with 1398 additions and 0 deletions

View File

@ -22,6 +22,7 @@ recursive-include src/authentic2/auth2_auth/auth2_oath/templates *.html *.txt *.
recursive-include src/authentic2/manager/templates *.html
recursive-include src/authentic2_auth_saml/templates/authentic2_auth_saml *.html
recursive-include src/authentic2_auth_oidc/templates/authentic2_auth_oidc *.html
recursive-include src/authentic2_idp_oidc/templates/authentic2_idp_oidc *.html
recursive-include src/authentic2/vendor/totp_js/js *.js
recursive-include src/authentic2/saml/fixtures *.json
@ -41,6 +42,7 @@ recursive-include src/authentic2/a2_rbac/locale *.po *.mo
recursive-include src/django_rbac/locale *.po *.mo
recursive-include src/authentic2_auth_saml/locale *.po *.mo
recursive-include src/authentic2_auth_oidc/locale *.po *.mo
recursive-include src/authentic2_idp_oidc/locale *.po *.mo
recursive-include src/authentic2 README xrds.xml *.txt yadis.xrdf
recursive-include src/authentic2_provisionning_ldap/tests *.ldif

View File

@ -164,6 +164,7 @@ setup(name="authentic2",
'authentic2-idp-saml2 = authentic2.idp.saml:Plugin',
'authentic2-idp-openid = authentic2_idp_openid:Plugin',
'authentic2-idp-cas = authentic2_idp_cas:Plugin',
'authentic2-idp-oidc = authentic2_idp_oidc:Plugin',
'authentic2-provisionning-ldap = authentic2_provisionning_ldap:Plugin',
],
})

View File

@ -0,0 +1,22 @@
from django.utils.translation import ugettext_lazy as _
class Plugin(object):
def get_before_urls(self):
from . import urls
return urls.urlpatterns
def get_apps(self):
return [__name__]
def redirect_logout_list(self, request, next=None):
return []
def get_admin_modules(self):
from admin_tools.dashboard import modules
return [modules.ModelList(
_('OpenID Connect authentication'),
models=(
'authentic2_idp_oidc.models.*',
),
)]

View File

@ -0,0 +1,43 @@
from django.contrib import admin
from . import models
class OIDCClientAdmin(admin.ModelAdmin):
list_display = ['name', 'slug', 'client_id', 'ou', 'identifier_policy', 'created', 'modified']
list_filter = ['ou', 'identifier_policy']
date_hierarchy = 'modified'
readonly_fields = ['created', 'modified']
class OIDCAuthorizationAdmin(admin.ModelAdmin):
list_display = ['client', 'user', 'created', 'expired']
list_filter = ['client']
search_fields = ['user__first_name', 'user__last_name', 'user__email', 'user__username',
'client__name']
date_hierarchy = 'created'
readonly_fields = ['created', 'expired']
class OIDCCodeAdmin(admin.ModelAdmin):
list_display = ['client', 'user', 'uuid', 'created', 'expired']
list_filter = ['client']
search_fields = ['user__first_name', 'user__last_name', 'user__email', 'user__username',
'client__name']
date_hierarchy = 'created'
readonly_fields = ['uuid', 'created', 'expired', 'user', 'uuid', 'client', 'state', 'nonce']
class OIDCAccessTokenAdmin(admin.ModelAdmin):
list_display = ['client', 'user', 'uuid', 'created', 'expired']
list_filter = ['client']
search_fields = ['user__first_name', 'user__last_name', 'user__email', 'user__username',
'client__name']
date_hierarchy = 'created'
readonly_fields = ['uuid', 'created', 'expired']
admin.site.register(models.OIDCClient, OIDCClientAdmin)
admin.site.register(models.OIDCAuthorization, OIDCAuthorizationAdmin)
admin.site.register(models.OIDCCode, OIDCCodeAdmin)
admin.site.register(models.OIDCAccessToken, OIDCAccessTokenAdmin)

View File

@ -0,0 +1,29 @@
class AppSettings(object):
'''Thanks django-allauth'''
__SENTINEL = object()
def __init__(self, prefix):
self.prefix = prefix
def _setting(self, name, dflt=__SENTINEL):
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
v = getattr(settings, self.prefix + name, dflt)
if v is self.__SENTINEL:
raise ImproperlyConfigured('Missing setting %r' % (self.prefix + name))
return v
@property
def ENABLE(self):
return self._setting('ENABLE', True)
@property
def JWKSET(self):
return self._setting('JWKSET', [])
import sys
app_settings = AppSettings('A2_IDP_OIDC_')
app_settings.__name__ = __name__
sys.modules[__name__] = app_settings

View File

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
from django.conf import settings
import authentic2_idp_oidc.models
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('authentic2', '0016_attribute_disabled'),
]
operations = [
migrations.CreateModel(
name='OIDCAccessToken',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('uuid', models.CharField(default=authentic2_idp_oidc.models.generate_uuid, max_length=128, verbose_name='uuid')),
('scopes', models.TextField(verbose_name='scopes')),
('session_key', models.CharField(max_length=128, verbose_name='session key')),
('created', models.DateTimeField(auto_now_add=True, verbose_name='created')),
('expired', models.DateTimeField(verbose_name='expire')),
],
),
migrations.CreateModel(
name='OIDCAuthorization',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('scopes', models.TextField(verbose_name='scopes')),
('created', models.DateTimeField(auto_now_add=True, verbose_name='created')),
('expired', models.DateTimeField(verbose_name='expire')),
],
),
migrations.CreateModel(
name='OIDCClient',
fields=[
('service_ptr', models.OneToOneField(parent_link=True, auto_created=True, primary_key=True, serialize=False, to='authentic2.Service')),
('client_id', models.CharField(default=authentic2_idp_oidc.models.generate_uuid, unique=True, max_length=255, verbose_name='client id')),
('client_secret', models.CharField(default=authentic2_idp_oidc.models.generate_uuid, max_length=255, verbose_name='client secret')),
('authorization_flow', models.PositiveIntegerField(default=1, verbose_name='authorization flow', choices=[(1, 'authorization code'), (2, 'implicit/native')])),
('redirect_uris', models.TextField(verbose_name='redirect URIs', validators=[authentic2_idp_oidc.models.validate_https_url])),
('sector_identifier_uri', models.URLField(verbose_name='sector identifier URI', blank=True)),
('identifier_policy', models.PositiveIntegerField(default=2, verbose_name='identifier policy', choices=[(1, 'uuid'), (2, 'pairwise'), (3, 'email')])),
('idtoken_algo', models.PositiveIntegerField(default=1, verbose_name='IDToken signature algorithm', choices=[(1, 'RSA'), (2, 'HMAC')])),
('created', models.DateTimeField(auto_now_add=True, verbose_name='created')),
('modified', models.DateTimeField(auto_now=True, verbose_name='modified')),
],
bases=('authentic2.service',),
),
migrations.CreateModel(
name='OIDCCode',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('uuid', models.CharField(default=authentic2_idp_oidc.models.generate_uuid, max_length=128, verbose_name='uuid')),
('scopes', models.TextField(verbose_name='scopes')),
('state', models.TextField(verbose_name='state')),
('nonce', models.TextField(verbose_name='nonce')),
('redirect_uri', models.URLField(verbose_name='redirect URI')),
('session_key', models.CharField(max_length=128, verbose_name='session key')),
('auth_time', models.DateTimeField(verbose_name='auth time')),
('created', models.DateTimeField(auto_now_add=True, verbose_name='created')),
('expired', models.DateTimeField(verbose_name='expire')),
('client', models.ForeignKey(verbose_name='client', to='authentic2_idp_oidc.OIDCClient')),
('user', models.ForeignKey(verbose_name='user', to=settings.AUTH_USER_MODEL)),
],
),
migrations.AddField(
model_name='oidcauthorization',
name='client',
field=models.ForeignKey(verbose_name='client', to='authentic2_idp_oidc.OIDCClient'),
),
migrations.AddField(
model_name='oidcauthorization',
name='user',
field=models.ForeignKey(verbose_name='user', to=settings.AUTH_USER_MODEL),
),
migrations.AddField(
model_name='oidcaccesstoken',
name='client',
field=models.ForeignKey(verbose_name='client', to='authentic2_idp_oidc.OIDCClient'),
),
migrations.AddField(
model_name='oidcaccesstoken',
name='user',
field=models.ForeignKey(verbose_name='user', to=settings.AUTH_USER_MODEL),
),
]

View File

@ -0,0 +1,229 @@
import uuid
from importlib import import_module
from django.db import models
from django.core.validators import URLValidator
from django.core.exceptions import ValidationError
from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from django.utils.timezone import now
from authentic2.models import Service
from . import utils
def generate_uuid():
return unicode(uuid.uuid4())
def validate_https_url(data):
errors = []
for url in data.split():
try:
URLValidator(schemes=['https'])(url)
except ValidationError as e:
errors.append(e)
if errors:
raise ValidationError(errors)
def strip_words(data):
return u' '.join([url for url in data.split()])
class OIDCClient(Service):
POLICY_UUID = 1
POLICY_PAIRWISE = 2
POLICY_EMAIL = 3
IDENTIFIER_POLICIES = [
(POLICY_UUID, _('uuid')),
(POLICY_PAIRWISE, _('pairwise')),
(POLICY_EMAIL, _('email')),
]
ALGO_RSA = 1
ALGO_HMAC = 2
ALGO_CHOICES = [
(ALGO_RSA, _('RSA')),
(ALGO_HMAC, _('HMAC')),
]
FLOW_AUTHORIZATION_CODE = 1
FLOW_IMPLICIT = 2
FLOW_CHOICES = [
(FLOW_AUTHORIZATION_CODE, _('authorization code')),
(FLOW_IMPLICIT, _('implicit/native')),
]
client_id = models.CharField(
max_length=255,
verbose_name=_('client id'),
unique=True,
default=generate_uuid)
client_secret = models.CharField(
max_length=255,
verbose_name=_('client secret'),
default=generate_uuid)
authorization_flow = models.PositiveIntegerField(
verbose_name=_('authorization flow'),
default=FLOW_AUTHORIZATION_CODE,
choices=FLOW_CHOICES)
redirect_uris = models.TextField(
verbose_name=_('redirect URIs'),
validators=[validate_https_url])
sector_identifier_uri = models.URLField(
verbose_name=_('sector identifier URI'),
blank=True)
identifier_policy = models.PositiveIntegerField(
verbose_name=_('identifier policy'),
default=POLICY_PAIRWISE,
choices=IDENTIFIER_POLICIES)
idtoken_algo = models.PositiveIntegerField(
default=ALGO_RSA,
choices=ALGO_CHOICES,
verbose_name=_('IDToken signature algorithm'))
# metadata
created = models.DateTimeField(
verbose_name=_('created'),
auto_now_add=True)
modified = models.DateTimeField(
verbose_name=_('modified'),
auto_now=True)
def clean(self):
self.redirect_uris = strip_words(self.redirect_uris)
def __repr__(self):
return ('<OIDCClient name:%r client_id:%s identifier_policy:%s>' %
(self.name, self.client_id, self.get_identifier_policy_display()))
class OIDCAuthorization(models.Model):
client = models.ForeignKey(
to=OIDCClient,
verbose_name=_('client'))
user = models.ForeignKey(
to=settings.AUTH_USER_MODEL,
verbose_name=_('user'))
scopes = models.TextField(
blank=False,
verbose_name=_('scopes'))
# metadata
created = models.DateTimeField(
verbose_name=_('created'),
auto_now_add=True)
expired = models.DateTimeField(
verbose_name=_('expire'))
def scope_set(self):
return utils.scope_set(self.scopes)
def __repr__(self):
return '<OIDCAuthorization client:%r user:%r scopes:%r>' % (
self.client_id and unicode(self.client),
self.user_id and unicode(self.user),
self.scopes)
class OIDCCode(models.Model):
uuid = models.CharField(
max_length=128,
verbose_name=_('uuid'),
default=generate_uuid)
client = models.ForeignKey(
to=OIDCClient,
verbose_name=_('client'))
user = models.ForeignKey(
to=settings.AUTH_USER_MODEL,
verbose_name=_('user'))
scopes = models.TextField(
verbose_name=_('scopes'))
state = models.TextField(
verbose_name=_('state'))
nonce = models.TextField(
verbose_name=_('nonce'))
redirect_uri = models.URLField(
verbose_name=_('redirect URI'))
session_key = models.CharField(
verbose_name=_('session key'),
max_length=128)
auth_time = models.DateTimeField(
verbose_name=_('auth time'))
# metadata
created = models.DateTimeField(
verbose_name=_('created'),
auto_now_add=True)
expired = models.DateTimeField(
verbose_name=_('expire'))
@property
def session(self):
if not hasattr(self, '_session'):
engine = import_module(settings.SESSION_ENGINE)
session = engine.SessionStore(session_key=self.session_key)
if session._session_key == self.session_key:
self._session = session
return getattr(self, '_session', None)
def scope_set(self):
return utils.scope_set(self.scopes)
def __repr__(self):
return '<OIDCAccessToken uuid:%s client:%s user:%s expired:%s scopes:%s>' % (
self.uuid,
self.client_id and unicode(self.client),
self.user_id and unicode(self.user),
self.expired,
self.scopes)
class OIDCAccessToken(models.Model):
uuid = models.CharField(
max_length=128,
verbose_name=_('uuid'),
default=generate_uuid)
client = models.ForeignKey(
to=OIDCClient,
verbose_name=_('client'))
user = models.ForeignKey(
to=settings.AUTH_USER_MODEL,
verbose_name=_('user'))
scopes = models.TextField(
verbose_name=_('scopes'))
session_key = models.CharField(
verbose_name=_('session key'),
max_length=128)
# metadata
created = models.DateTimeField(
verbose_name=_('created'),
auto_now_add=True)
expired = models.DateTimeField(
verbose_name=_('expire'))
def scope_set(self):
return utils.scope_set(self.scopes)
@property
def session(self):
if not hasattr(self, '_session'):
engine = import_module(settings.SESSION_ENGINE)
session = engine.SessionStore(session_key=self.session_key)
if session._session_key == self.session_key:
self._session = session
return getattr(self, '_session', None)
def is_valid(self):
return self.expired >= now() and self.session is not None
def __repr__(self):
return '<OIDCAccessToken uuid:%s client:%s user:%s expired:%s scopes:%s>' % (
self.uuid,
self.client_id and unicode(self.client),
self.user_id and unicode(self.user),
self.expired,
self.scopes)

View File

@ -0,0 +1,24 @@
from django.conf.urls import patterns, url
from . import views
__patterns = [
url(r'^.well-known/openid-configuration$',
views.openid_configuration,
name='oidc-openid-configuration'),
url(r'^idp/oidc/certs/$',
views.certs,
name='oidc-certs'),
url(r'^idp/oidc/authorize/$',
views.authorize,
name='oidc-authorize'),
url(r'^idp/oidc/token/$',
views.token,
name='oidc-token'),
url(r'^idp/oidc/user_info/$',
views.user_info,
name='oidc-user-info'),
]
urlpatterns = patterns('', *__patterns)

View File

@ -0,0 +1,96 @@
import json
import hashlib
import urlparse
import base64
from jwcrypto.jwk import JWK, JWKSet, InvalidJWKValue
from jwcrypto.jwt import JWT
from django.core.exceptions import ImproperlyConfigured
from django.conf import settings
from . import app_settings
def get_jwkset():
try:
jwkset = json.dumps(app_settings.JWKSET)
except Exception as e:
raise ImproperlyConfigured('invalid setting A2_IDP_OIDC_JWKSET: %s' % e)
try:
jwkset = JWKSet.from_json(jwkset)
except InvalidJWKValue as e:
raise ImproperlyConfigured('invalid setting A2_IDP_OIDC_JWKSET: %s' % e)
if len(jwkset['keys']) < 1:
raise ImproperlyConfigured('empty A2_IDP_OIDC_JWKSET')
return jwkset
def get_first_rsa_sig_key():
for key in get_jwkset()['keys']:
if key._params['kty'] != 'RSA':
continue
use = key._params.get('use')
if use is None or use == 'sig':
return key
return None
def make_idtoken(client, claims):
'''Make a serialized JWT targeted for this client'''
if client.idtoken_algo == client.ALGO_HMAC:
header = {'alg': 'HS256'}
jwk = JWK(kty='oct', k=client.client_secret)
elif client.idtoken_algo == client.ALGO_RSA:
header = {'alg': 'RS256'}
jwk = get_first_rsa_sig_key()
header['kid'] = jwk.key_id
if jwk is None:
raise ImproperlyConfigured('no RSA key for signature operation in A2_IDP_OIDC_JWKSET')
else:
raise NotImplementedError
jwt = JWT(header=header, claims=claims)
jwt.make_signed_token(jwk)
return jwt.serialize()
def scope_set(data):
'''Convert a scope string into a set of scopes'''
return set([scope.strip() for scope in data.split()])
def clean_words(data):
'''Clean and order a list of words'''
return u' '.join(sorted(map(unicode.strip, data.split())))
def url_domain(url):
return urlparse.urlparse(url).netloc.split(':')[0]
def make_sub(client, user):
if client.identifier_policy == client.POLICY_PAIRWISE:
return make_pairwise_sub(client, user)
elif client.identifier_policy == client.POLICY_UUID:
return unicode(user.uuid)
elif client.identifier_policy == client.POLICY_EMAIL:
return user.email
else:
raise NotImplementedError
def make_pairwise_sub(client, user):
'''Make a pairwise sub'''
sector_identifier = None
if client.sector_identifier_uri:
sector_identifier = url_domain(client.sector_identifier_uri)
else:
for redirect_uri in client.redirect_uris.split():
hostname = urlparse.urlparse(redirect_uri).netloc.split(':')[0]
if sector_identifier is None:
sector_identifier = hostname
elif sector_identifier != hostname:
raise ImproperlyConfigured('all redirect_uri do not have the same hostname')
sub = sector_identifier + str(user.uuid) + settings.SECRET_KEY
sub = base64.b64encode(hashlib.sha256(sub).digest())
return sub

View File

@ -0,0 +1,384 @@
import logging
import datetime
import json
import base64
import time
from django.http import HttpResponse, HttpResponseBadRequest, HttpResponseNotAllowed
from django.utils.timezone import now, UTC
from django.utils.http import urlencode
from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt
from django.core.urlresolvers import reverse
from authentic2.decorators import setting_enabled
from authentic2.utils import (login_require, redirect, timestamp_from_datetime,
last_authentication_event)
from . import app_settings, models, utils
@setting_enabled('ENABLE', settings=app_settings)
def openid_configuration(request, *args, **kwargs):
metadata = {
'issuer': request.build_absolute_uri(''),
'authorization_endpoint': request.build_absolute_uri(reverse('oidc-authorize')),
'token_endpoint': request.build_absolute_uri(reverse('oidc-token')),
'jwks_uri': request.build_absolute_uri(reverse('oidc-certs')),
'response_types_supported': ['code'],
'subject_types_supported': ['public', 'pairwise'],
'id_token_signing_alg_values_supported': [
'RS256', 'HS256',
],
'userinfo_endpoint': request.build_absolute_uri(reverse('oidc-user-info')),
}
return HttpResponse(json.dumps(metadata), content_type='application/json')
@setting_enabled('ENABLE', settings=app_settings)
def certs(request, *args, **kwargs):
return HttpResponse(utils.get_jwkset().export(private_keys=False),
content_type='application/json')
def authorization_error(request, redirect_uri, error, error_description=None, error_uri=None,
state=None, fragment=False):
params = {
'error': error,
}
if error_description:
params['error_description'] = error_description
if error_uri:
params['error_uri'] = error_uri
if state:
params['state'] = state
if fragment:
return redirect(request, redirect_uri + '#%s' % urlencode(params), resolve=False)
else:
return redirect(request, redirect_uri, params=params, resolve=False)
@setting_enabled('ENABLE', settings=app_settings)
def authorize(request, *args, **kwargs):
logger = logging.getLogger(__name__)
start = now()
try:
client_id = request.GET['client_id']
redirect_uri = request.GET['redirect_uri']
except KeyError as k:
return HttpResponseBadRequest('invalid request: missing parameter %s' % k.args[0])
try:
client = models.OIDCClient.objects.get(client_id=client_id)
except models.OIDCClient.DoesNotExist:
return HttpResponseBadRequest('invalid request: unknown client_id')
fragment = client.authorization_flow == client.FLOW_IMPLICIT
state = request.GET.get('state', '')
try:
response_type = request.GET['response_type']
scope = request.GET['scope']
except KeyError as k:
return authorization_error(request, redirect_uri, 'invalid_request',
state=state,
error_description='missing parameter %s' % k.args[0],
fragment=fragment)
prompt = set(filter(None, request.GET.get('prompt', '').split()))
nonce = request.GET.get('nonce', '')
scopes = utils.scope_set(scope)
max_age = request.GET.get('max_age')
if max_age:
try:
max_age = int(max_age)
if max_age < 0:
raise ValueError
except ValueError:
return authorization_error(request, redirect_uri, 'invalid_request',
error_description='max_age is not a positive integer',
state=state,
fragment=fragment)
if redirect_uri not in client.redirect_uris.split():
return authorization_error(request, redirect_uri, 'invalid_request',
error_description='unauthorized redirect_uri',
state=state,
fragment=fragment)
if client.authorization_flow == client.FLOW_AUTHORIZATION_CODE:
if response_type != 'code':
return authorization_error(request, redirect_uri, 'unsupported_response_type',
error_description='only code is supported',
state=state,
fragment=fragment)
elif client.authorization_flow == client.FLOW_IMPLICIT:
if not set(filter(None, response_type.split())) in (set(['id_token', 'token']),
set(['id_token'])):
return authorization_error(request, redirect_uri, 'unsupported_response_type',
error_description='only "id_token token" or "id_token" '
'are supported',
state=state,
fragment=fragment)
else:
raise NotImplementedError
if 'openid' not in scopes:
return authorization_error(request, redirect_uri, 'invalid_request',
error_description='openid scope is missing',
state=state,
fragment=fragment)
if not (scopes <= set(['openid', 'profile', 'email'])):
return authorization_error(request, redirect_uri, 'invalid_scope',
error_description='only openid, profile and email scopes are '
'supported',
state=state,
fragment=fragment)
# authentication canceled by user
if 'cancel' in request.GET:
logger.info(u'authentication canceled for service %s', client.name)
return authorization_error(request, redirect_uri, 'access_denied',
error_description='user did not authenticate',
state=state,
fragment=fragment)
if not request.user.is_authenticated() or 'login' in prompt:
if 'none' in prompt:
return authorization_error(request, redirect_uri, 'login_required',
error_description='login is required but prompt is none',
state=state,
fragment=fragment)
return login_require(request, params={'nonce': nonce})
last_auth = last_authentication_event(request.session)
if max_age is not None and time.time() - last_auth['when'] >= max_age:
if 'none' in prompt:
return authorization_error(request, redirect_uri, 'login_required',
error_description='login is required but prompt is none',
state=state,
fragment=fragment)
return login_require(request, params={'nonce': nonce})
qs = models.OIDCAuthorization.objects.filter(client=client, user=request.user)
if 'consent' in prompt:
# if consent is asked we delete existing authorizations
# it seems to be the safer option
qs.delete()
qs = models.OIDCAuthorization.objects.none()
else:
qs = qs.filter(expired__gte=start)
authorized_scopes = set()
for authorization in qs:
authorized_scopes |= authorization.scope_set()
if (authorized_scopes & scopes) < scopes:
if 'none' in prompt:
return authorization_error(request, redirect_uri, 'consent_required',
error_description='consent is required but prompt is none',
state=state,
fragment=fragment)
if request.method == 'POST':
if 'accept' in request.POST:
pk_to_deletes = []
for authorization in qs:
# clean obsolete authorizations
if authorization.scope_set() <= scopes:
pk_to_deletes.append(authorization.pk)
models.OIDCAuthorization.objects.create(
client=client, user=request.user, scopes=u' '.join(sorted(scopes)),
expired=start + datetime.timedelta(days=365))
if pk_to_deletes:
models.OIDCAuthorization.objects.filter(pk__in=pk_to_deletes).delete()
logger.info(u'authorized scopes %s for service %s', ' '.join(scopes),
client.name)
else:
logger.info(u'refused scopes %s for service %s', ' '.join(scopes),
client.name)
return authorization_error(request, redirect_uri, 'access_denied',
error_description='user denied access',
state=state,
fragment=fragment)
else:
return render(request, 'authentic2_idp_oidc/authorization.html',
{
'client': client,
'scopes': scopes,
})
if response_type == 'code':
code = models.OIDCCode.objects.create(
client=client, user=request.user, scopes=u' '.join(scopes),
state=state, nonce=nonce, redirect_uri=redirect_uri,
expired=start + datetime.timedelta(seconds=30),
auth_time=datetime.datetime.fromtimestamp(last_auth['when'], UTC()),
session_key=request.session.session_key)
logger.info(u'sending code %s for scopes %s for service %s',
code.uuid, ' '.join(scopes),
client.name)
return redirect(request, redirect_uri, params={
'code': unicode(code.uuid),
'state': state
}, resolve=False)
else:
# FIXME: we should probably factorize this part with the token endpoint similar code
need_access_token = 'token' in response_type.split()
expires_in = 3600 * 8
if need_access_token:
access_token = models.OIDCAccessToken.objects.create(
client=client,
user=request.user,
scopes=u' '.join(scopes),
session_key=request.session.session_key,
expired=start + datetime.timedelta(seconds=expires_in))
acr = 0
if nonce and last_auth.get('nonce') == nonce:
acr = 1
id_token = {
'iss': request.build_absolute_uri(''),
'sub': utils.make_sub(client, request.user),
'aud': client.client_id,
'exp': timestamp_from_datetime(
start + datetime.timedelta(seconds=30)),
'iat': timestamp_from_datetime(start),
'auth_time': last_auth['when'],
'acr': acr,
}
if nonce:
id_token['nonce'] = nonce
params = {
'id_token': utils.make_idtoken(client, id_token),
}
if state:
params['state'] = state
if need_access_token:
params.update({
'access_token': access_token.uuid,
'token_type': 'Bearer',
'expires_in': expires_in,
})
# query is transfered through the hashtag
return redirect(request, redirect_uri + '#%s' % urlencode(params), resolve=False)
def authenticate_client(request, client=None):
if 'HTTP_AUTHORIZATION' not in request.META:
return None
authorization = request.META['HTTP_AUTHORIZATION'].split()
if authorization[0] != 'Basic' or len(authorization) != 2:
return None
try:
decoded = base64.b64decode(authorization[1])
except TypeError:
return None
parts = decoded.split(':')
if len(parts) != 2:
return None
if not client:
try:
client = models.OIDCClient.objects.get(client_id=parts[0])
except models.OIDCClient.DoesNotExist:
return None
if client.client_secret != parts[1]:
return None
return client
def invalid_request(desc=None):
content = {
'error': 'invalid_request',
}
if desc:
content['desc'] = desc
return HttpResponseBadRequest(json.dumps(content))
@setting_enabled('ENABLE', settings=app_settings)
@csrf_exempt
def token(request, *args, **kwargs):
if request.method != 'POST':
return HttpResponseNotAllowed(['POST'])
grant_type = request.POST.get('grant_type')
if grant_type != 'authorization_code':
return invalid_request('grant_type is not authorization_code')
code = request.POST.get('code')
if code is None:
return invalid_request('missing code')
try:
oidc_code = models.OIDCCode.objects.select_related().get(uuid=code)
except models.OIDCCode.DoesNotExist:
return invalid_request('invalid code')
client = authenticate_client(request, client=oidc_code.client)
if client is None:
return HttpResponse('unauthenticated', status=401)
# delete immediately
models.OIDCCode.objects.filter(uuid=code).delete()
redirect_uri = request.POST.get('redirect_uri')
if oidc_code.redirect_uri != redirect_uri:
return invalid_request('invalid redirect_uri')
expires_in = 3600 * 8
access_token = models.OIDCAccessToken.objects.create(
client=client,
user=oidc_code.user,
scopes=oidc_code.scopes,
session_key=oidc_code.session_key,
expired=oidc_code.created + datetime.timedelta(seconds=expires_in))
start = now()
acr = 0
if (oidc_code.nonce and last_authentication_event(oidc_code.session).get('nonce') ==
oidc_code.nonce):
acr = 1
id_token = {
'iss': request.build_absolute_uri(''),
'sub': utils.make_sub(client, oidc_code.user),
'aud': client.client_id,
'exp': timestamp_from_datetime(
start + datetime.timedelta(seconds=30)),
'iat': timestamp_from_datetime(start),
'auth_time': timestamp_from_datetime(oidc_code.auth_time),
'acr': acr,
}
if oidc_code.nonce:
id_token['nonce'] = oidc_code.nonce
response = HttpResponse(json.dumps({
'access_token': unicode(access_token.uuid),
'token_type': 'Bearer',
'expires_in': expires_in,
'id_token': utils.make_idtoken(client, id_token),
}), content_type='application/json')
response['Cache-Control'] = 'no-store'
response['Pragma'] = 'no-cache'
return response
def authenticate_access_token(request):
if 'HTTP_AUTHORIZATION' not in request.META:
return None
authorization = request.META['HTTP_AUTHORIZATION'].split()
if authorization[0] != 'Bearer' or len(authorization) != 2:
return None
try:
access_token = models.OIDCAccessToken.objects.select_related().get(uuid=authorization[1])
except models.OIDCAccessToken.DoesNotExist:
return None
if not access_token.is_valid():
return None
return access_token
@setting_enabled('ENABLE', settings=app_settings)
def user_info(request, *args, **kwargs):
access_token = authenticate_access_token(request)
if access_token is None:
return HttpResponse('unauthenticated', status=401)
scope_set = access_token.scope_set()
user = access_token.user
user_info = {
'sub': utils.make_sub(access_token.client, access_token.user)
}
if 'profile' in scope_set:
user_info['family_name'] = user.last_name
user_info['given_name'] = user.first_name
if user.username:
user_info['preferred_username'] = user.username
if 'email' in scope_set:
user_info['email'] = user.email
user_info['email_verified'] = True
return HttpResponse(json.dumps(user_info), content_type='application/json')

477
tests/test_idp_oidc.py Normal file
View File

@ -0,0 +1,477 @@
import urlparse
import base64
import json
import datetime
import pytest
from jwcrypto.jwt import JWT
from jwcrypto.jwk import JWKSet, JWK
import utils
from django.core.urlresolvers import reverse
from django.utils.timezone import now
from authentic2_idp_oidc.models import OIDCClient, OIDCAuthorization, OIDCCode, OIDCAccessToken
from authentic2_idp_oidc.utils import make_sub
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.utils import make_url
from authentic2_auth_oidc.utils import parse_timestamp
JWKSET = {
"keys": [
{
"qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4",
"kty": "RSA",
"d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ",
"q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE",
"dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU",
"dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE",
"n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q",
"e": "AQAB",
"p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0"
}
]
}
@pytest.fixture
def oidc_settings(settings):
settings.A2_IDP_OIDC_JWKSET = JWKSET
return settings
def test_get_jwkset(oidc_settings):
from authentic2_idp_oidc.utils import get_jwkset
get_jwkset()
OIDC_CLIENT_PARAMS = [
{
'authorization_flow': OIDCClient.FLOW_IMPLICIT,
},
{},
{
'identifier_policy': OIDCClient.POLICY_UUID,
},
{
'identifier_policy': OIDCClient.POLICY_EMAIL,
},
{
'idtoken_algo': OIDCClient.ALGO_HMAC,
},
]
@pytest.fixture(params=OIDC_CLIENT_PARAMS)
def oidc_client(request, superuser, app):
url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
assert OIDCClient.objects.count() == 0
response = utils.login(app, superuser, path=url)
response.form.set('name', 'oidcclient')
response.form.set('slug', 'oidcclient')
response.form.set('ou', get_default_ou().pk)
response.form.set('redirect_uris', 'https://example.com/callback')
for key, value in request.param.iteritems():
response.form.set(key, value)
response = response.form.submit().follow()
assert OIDCClient.objects.count() == 1
client = OIDCClient.objects.get()
utils.logout(app)
return client
def client_authentication_headers(oidc_client):
token = base64.b64encode('%s:%s' % (oidc_client.client_id, oidc_client.client_secret))
return {'Authorization': 'Basic %s' % token}
def bearer_authentication_headers(access_token):
return {'Authorization': 'Bearer %s' % str(access_token)}
@pytest.mark.parametrize('login_first', [(True,), (False,)])
def test_authorization_code_sso(login_first, oidc_settings, oidc_client, simple_user, app):
redirect_uri = oidc_client.redirect_uris.split()[0]
params = {
'client_id': oidc_client.client_id,
'scope': 'openid profile email',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
}
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
params['response_type'] = 'code'
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
params['response_type'] = 'token id_token'
authorize_url = make_url('oidc-authorize', params=params)
if login_first:
utils.login(app, simple_user)
response = app.get(authorize_url)
if not login_first:
response = response.follow()
assert response.request.path == reverse('auth_login')
response.form.set('username', simple_user.username)
response.form.set('password', simple_user.username)
response = response.form.submit(name='login-password-submit')
response = response.follow()
assert response.request.path == reverse('oidc-authorize')
assert 'a2-oidc-authorization-form' in response.content
assert OIDCAuthorization.objects.count() == 0
assert OIDCCode.objects.count() == 0
assert OIDCAccessToken.objects.count() == 0
response = response.form.submit('accept')
assert OIDCAuthorization.objects.count() == 1
authz = OIDCAuthorization.objects.get()
assert authz.client == oidc_client
assert authz.user == simple_user
assert authz.scope_set() == set('openid profile email'.split())
assert authz.expired >= now()
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert OIDCCode.objects.count() == 1
code = OIDCCode.objects.get()
assert code.client == oidc_client
assert code.user == simple_user
assert code.scope_set() == set('openid profile email'.split())
assert code.state == 'xxx'
assert code.nonce == 'yyy'
assert code.redirect_uri == redirect_uri
assert code.session_key == app.session.session_key
assert code.auth_time <= now()
assert code.expired >= now()
assert response['Location'].startswith(redirect_uri)
location = urlparse.urlparse(response['Location'])
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
query = urlparse.parse_qs(location.query)
assert set(query.keys()) == set(['code', 'state'])
assert query['code'] == [code.uuid]
code = query['code'][0]
assert query['state'] == ['xxx']
token_url = make_url('oidc-token')
response = app.post(token_url, {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
}, headers=client_authentication_headers(oidc_client))
assert 'error' not in response.json
assert 'access_token' in response.json
assert 'expires_in' in response.json
assert 'id_token' in response.json
assert response.json['token_type'] == 'Bearer'
access_token = response.json['access_token']
id_token = response.json['id_token']
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
assert location.fragment
query = urlparse.parse_qs(location.fragment)
assert OIDCAccessToken.objects.count() == 1
access_token = OIDCAccessToken.objects.get()
assert set(query.keys()) == set(['access_token', 'token_type', 'expires_in', 'id_token',
'state'])
assert query['access_token'] == [access_token.uuid]
assert query['token_type'] == ['Bearer']
assert query['state'] == ['xxx']
access_token = query['access_token'][0]
id_token = query['id_token'][0]
if oidc_client.idtoken_algo == oidc_client.ALGO_RSA:
key = JWKSet.from_json(app.get(reverse('oidc-certs')).content)
elif oidc_client.idtoken_algo == oidc_client.ALGO_HMAC:
key = JWK(kty='oct', k=oidc_client.client_secret)
else:
raise NotImplementedError
jwt = JWT(jwt=id_token, key=key)
claims = json.loads(jwt.claims)
assert set(claims) >= set(['iss', 'sub', 'aud', 'exp', 'iat', 'nonce', 'auth_time', 'acr'])
assert claims['nonce'] == 'yyy'
assert response.request.url.startswith(claims['iss'])
assert claims['aud'] == oidc_client.client_id
assert parse_timestamp(claims['iat']) <= now()
assert parse_timestamp(claims['auth_time']) <= now()
assert parse_timestamp(claims['exp']) >= now()
if login_first:
assert claims['acr'] == 0
else:
assert claims['acr'] == 1
user_info_url = make_url('oidc-user-info')
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
assert response.json['sub'] == make_sub(oidc_client, simple_user)
assert response.json['preferred_username'] == simple_user.username
assert response.json['given_name'] == simple_user.first_name
assert response.json['family_name'] == simple_user.last_name
assert response.json['email'] == simple_user.email
assert response.json['email_verified'] is True
def assert_oidc_error(response, error, error_description=None, fragment=False):
location = urlparse.urlparse(response['Location'])
query = location.fragment if fragment else location.query
query = urlparse.parse_qs(query)
assert query['error'] == [error]
if error_description:
assert len(query['error_description']) == 1
assert error_description in query['error_description'][0]
def assert_authorization_response(response, fragment=False, **kwargs):
location = urlparse.urlparse(response['Location'])
query = location.fragment if fragment else location.query
query = urlparse.parse_qs(query)
for key, value in kwargs.iteritems():
if value is None:
assert key in query
elif isinstance(value, list):
assert query[key] == value
else:
assert value in query[key][0]
def test_invalid_request(oidc_settings, oidc_client, simple_user, app):
redirect_uri = oidc_client.redirect_uris.split()[0]
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
fragment = False
response_type = 'code'
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
fragment = True
response_type = 'id_token token'
else:
raise NotImplementedError
# client_id
authorize_url = make_url('oidc-authorize', params={})
response = app.get(authorize_url, status=400)
assert 'missing parameter \'client_id\'' in response.content
# redirect_uri
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
})
response = app.get(authorize_url, status=400)
assert 'missing parameter \'redirect_uri\'' in response.content
# invalid client_id
authorize_url = make_url('oidc-authorize', params={
'client_id': 'xxx',
'redirect_uri': redirect_uri,
})
response = app.get(authorize_url, status=400)
assert 'unknown client_id' in response.content
# missing response_type
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_request', 'missing parameter \'response_type\'',
fragment=fragment)
# missing scope
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_request', 'missing parameter \'scope\'', fragment=fragment)
# invalid max_age
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
'scope': 'openid',
'max_age': 'xxx',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_request', 'max_age is not', fragment=fragment)
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
'scope': 'openid',
'max_age': '-1',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_request', 'max_age is not', fragment=fragment)
# invalid redirect_uri
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': 'xxx',
'response_type': 'code',
'scope': 'openid',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_request', 'unauthorized redirect_uri', fragment=fragment)
# unsupported response_type
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': 'xxx',
'scope': 'openid',
})
response = app.get(authorize_url)
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert_oidc_error(response, 'unsupported_response_type', 'only code is supported')
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
assert_oidc_error(response, 'unsupported_response_type',
'only "id_token token" or "id_token" are supported', fragment=fragment)
# openid scope is missing
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'profile',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_request', 'openid scope is missing', fragment=fragment)
# use of an unknown scope
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile zob',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_scope', fragment=fragment)
# cancel
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'cancel': '1',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'access_denied', error_description='user did not authenticate',
fragment=fragment)
# prompt=none
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'none',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'login_required', error_description='prompt is none',
fragment=fragment)
utils.login(app, simple_user)
# prompt=none max_age=0
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'max_age': '0',
'prompt': 'none',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'login_required', error_description='prompt is none',
fragment=fragment)
# max_age=0
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'max_age': '0',
})
response = app.get(authorize_url)
assert urlparse.urlparse(response['Location']).path == reverse('auth_login')
# prompt=login
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'login',
})
response = app.get(authorize_url)
assert urlparse.urlparse(response['Location']).path == reverse('auth_login')
# user refuse authorization
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'none',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'consent_required', error_description='prompt is none',
fragment=fragment)
# user refuse authorization
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
})
response = app.get(authorize_url)
response = response.form.submit('refuse')
assert_oidc_error(response, 'access_denied', error_description='user denied access',
fragment=fragment)
# authorization exists
authorize = OIDCAuthorization.objects.create(
client=oidc_client, user=simple_user, scopes='openid profile email',
expired=now() + datetime.timedelta(days=2))
response = app.get(authorize_url)
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert_authorization_response(response, code=None, fragment=fragment)
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
assert_authorization_response(response, access_token=None, id_token=None, expires_in=None,
token_type=None, fragment=fragment)
# client ask for explicit authorization
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'consent',
})
response = app.get(authorize_url)
assert 'a2-oidc-authorization-form' in response.content
# check all authorization have been deleted, it's our policy
assert OIDCAuthorization.objects.count() == 0
# authorization has expired
authorize.expired = now() - datetime.timedelta(days=2)
authorize.save()
response = app.get(authorize_url)
assert 'a2-oidc-authorization-form' in response.content
authorize.expired = now() + datetime.timedelta(days=2)
authorize.scopes = 'openid profile'
authorize.save()
assert OIDCAuthorization.objects.count() == 1
response = response.form.submit('accept')
assert OIDCAuthorization.objects.count() == 1
# old authorizations have been deleted
assert OIDCAuthorization.objects.get().pk != authorize.pk

View File

@ -41,6 +41,7 @@ deps =
pyquery
httmock
pytest-capturelog
pytz
commands =
./getlasso.sh
authentic: py.test {env:FAST:} {env:COVERAGE:} {posargs:tests/}