idp_oidc: support oauth2 resource owner password credential grant (#35205)

This commit is contained in:
Paul Marillonnet 2019-11-18 17:09:54 +01:00
parent ba597c14d5
commit dda27fe488
8 changed files with 438 additions and 42 deletions

View File

@ -122,6 +122,7 @@ setup(name="authentic2",
'dnspython>=1.10',
'Django-Select2>5,<6',
'django-tables2>=1.0,<2.0',
'django-ratelimit',
'gadjo>=0.53',
'django-import-export>=0.2.7,<=0.4.5',
'djangorestframework>=3.3,<3.5',

View File

@ -53,6 +53,14 @@ class AppSettings(object):
def IDTOKEN_DURATION(self):
return self._setting('IDTOKEN_DURATION', 30)
@property
def ACCESS_TOKEN_DURATION(self):
return self._setting('ACCESS_TOKEN_DURATION', 3600 * 8)
@property
def PASSWORD_GRANT_RATELIMIT(self):
return self._setting('PASSWORD_GRANT_RATELIMIT', '100/m')
app_settings = AppSettings('A2_IDP_OIDC_')
app_settings.__name__ = __name__
sys.modules[__name__] = app_settings

View File

@ -20,7 +20,7 @@ class Migration(migrations.Migration):
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('uuid', models.CharField(default=authentic2_idp_oidc.models.generate_uuid, max_length=128, verbose_name='uuid')),
('scopes', models.TextField(verbose_name='scopes')),
('session_key', models.CharField(max_length=128, verbose_name='session key')),
('session_key', models.CharField(blank=True, max_length=128, verbose_name='session key')),
('created', models.DateTimeField(auto_now_add=True, verbose_name='created')),
('expired', models.DateTimeField(verbose_name='expire')),
],
@ -40,7 +40,7 @@ class Migration(migrations.Migration):
('service_ptr', models.OneToOneField(parent_link=True, auto_created=True, primary_key=True, serialize=False, to='authentic2.Service')),
('client_id', models.CharField(default=authentic2_idp_oidc.models.generate_uuid, unique=True, max_length=255, verbose_name='client id')),
('client_secret', models.CharField(default=authentic2_idp_oidc.models.generate_uuid, max_length=255, verbose_name='client secret')),
('authorization_flow', models.PositiveIntegerField(default=1, verbose_name='authorization flow', choices=[(1, 'authorization code'), (2, 'implicit/native')])),
('authorization_flow', models.PositiveIntegerField(choices=[(1, 'authorization code'), (2, 'implicit/native'), (3, 'resource owner password credentials')], default=1, verbose_name='authorization flow')),
('redirect_uris', models.TextField(verbose_name='redirect URIs', validators=[authentic2_idp_oidc.models.validate_https_url])),
('sector_identifier_uri', models.URLField(verbose_name='sector identifier URI', blank=True)),
('identifier_policy', models.PositiveIntegerField(default=2, verbose_name='identifier policy', choices=[(1, 'uuid'), (2, 'pairwise'), (3, 'email')])),

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.20 on 2020-01-22 21:58
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('authentic2_idp_oidc', '0011_auto_20180808_1546'),
]
operations = [
migrations.AddField(
model_name='oidcclient',
name='access_token_duration',
field=models.DurationField(blank=True, default=None, null=True, verbose_name='time during which the access token is valid'),
),
migrations.AddField(
model_name='oidcclient',
name='scope',
field=models.TextField(blank=True, default=b'', verbose_name='resource owner credentials grant scope'),
),
]

View File

@ -78,9 +78,11 @@ class OIDCClient(Service):
]
FLOW_AUTHORIZATION_CODE = 1
FLOW_IMPLICIT = 2
FLOW_RESOURCE_OWNER_CRED = 3
FLOW_CHOICES = [
(FLOW_AUTHORIZATION_CODE, _('authorization code')),
(FLOW_IMPLICIT, _('implicit/native')),
(FLOW_RESOURCE_OWNER_CRED, _('resource owner password credentials')),
]
AUTHORIZATION_MODE_BY_SERVICE = 1
@ -106,6 +108,11 @@ class OIDCClient(Service):
blank=True,
null=True,
default=None)
access_token_duration = models.DurationField(
verbose_name=_('time during which the access token is valid'),
blank=True,
null=True,
default=None)
authorization_mode = models.PositiveIntegerField(
default=AUTHORIZATION_MODE_BY_SERVICE,
choices=AUTHORIZATION_MODES,
@ -129,6 +136,11 @@ class OIDCClient(Service):
verbose_name=_('identifier policy'),
default=POLICY_PAIRWISE,
choices=IDENTIFIER_POLICIES)
scope = models.TextField(
verbose_name=_('resource owner credentials grant scope'),
help_text=_('Permitted or default scopes (for credentials grant)'),
default='',
blank=True)
@to_iter
def get_idtoken_algorithms():
@ -198,6 +210,9 @@ class OIDCClient(Service):
return True
return False
def scope_set(self):
return utils.scope_set(self.scope)
def __repr__(self):
return ('<OIDCClient name:%r client_id:%r identifier_policy:%r>' %
(self.name, self.client_id, self.get_identifier_policy_display()))
@ -312,7 +327,8 @@ class OIDCAccessToken(models.Model):
verbose_name=_('scopes'))
session_key = models.CharField(
verbose_name=_('session key'),
max_length=128)
max_length=128,
blank=True)
# metadata
created = models.DateTimeField(

View File

@ -179,10 +179,11 @@ def normalize_claim_values(values):
def create_user_info(request, client, user, scope_set, id_token=False):
'''Create user info dictionnary'''
'''Create user info dictionary'''
user_info = {
'sub': make_sub(client, user)
}
if 'openid' in scope_set:
user_info['sub'] = make_sub(client, user)
attributes = get_attributes({
'user': user,
'request': request,

View File

@ -15,12 +15,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import math
import datetime
import json
import base64
import time
from django.http import HttpResponse, HttpResponseBadRequest, HttpResponseNotAllowed
from django.http import (HttpResponse, HttpResponseBadRequest,
HttpResponseNotAllowed, JsonResponse)
from django.utils import six
from django.utils.timezone import now, utc
from django.utils.http import urlencode
@ -28,10 +30,14 @@ from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt
from django.core.urlresolvers import reverse
from django.contrib import messages
from django.contrib.auth import authenticate
from django.conf import settings
from django.utils.translation import ugettext as _
from ratelimit.utils import is_ratelimited
from authentic2 import app_settings as a2_app_settings
from authentic2.decorators import setting_enabled
from authentic2.exponential_retry_timeout import ExponentialRetryTimeout
from authentic2.utils import (login_require, redirect, timestamp_from_datetime,
last_authentication_event, make_url)
from authentic2.views import logout as a2_logout
@ -60,7 +66,7 @@ def openid_configuration(request, *args, **kwargs):
'frontchannel_logout_supported': True,
'frontchannel_logout_session_supported': True,
}
return HttpResponse(json.dumps(metadata), content_type='application/json')
return JsonResponse(metadata)
@setting_enabled('ENABLE', settings=app_settings)
@ -90,9 +96,19 @@ def authorization_error(request, redirect_uri, error, error_description=None, er
def idtoken_duration(client):
if client.idtoken_duration:
return client.idtoken_duration
return datetime.timedelta(seconds=app_settings.IDTOKEN_DURATION)
return client.idtoken_duration or datetime.timedelta(seconds=app_settings.IDTOKEN_DURATION)
def access_token_duration(client):
return client.access_token_duration or datetime.timedelta(seconds=app_settings.IDTOKEN_DURATION)
def allowed_scopes(client):
return client.scope_set() or app_settings.SCOPES or ['openid', 'email', 'profile']
def is_scopes_allowed(scopes, client):
return scopes <= set(allowed_scopes(client))
@setting_enabled('ENABLE', settings=app_settings)
@ -115,6 +131,13 @@ def authorize(request, *args, **kwargs):
redirect_uri, client_id)
return redirect(request, 'auth_homepage')
if client.authorization_flow == client.FLOW_RESOURCE_OWNER_CRED:
messages.warning(request, _('Client is configured for resource owner password crendetial grant type'))
return authorization_error(request, 'auth_homepage',
'unauthorized_client',
error_description='authz endpoint is configured '
'for resource owner password credential grant type')
if not client.is_valid_redirect_uri(redirect_uri):
messages.warning(request, _('Authorization request is invalid'))
logger.warning(u'idp_oidc: authorization request error, unknown redirect_uri redirect_uri=%r client_id=%r',
@ -171,10 +194,10 @@ def authorize(request, *args, **kwargs):
error_description='openid scope is missing',
state=state,
fragment=fragment)
allowed_scopes = app_settings.SCOPES or ['openid', 'email', 'profile']
if not (scopes <= set(allowed_scopes)):
if not is_scopes_allowed(scopes, client):
message = 'only "%s" scope(s) are supported, but "%s" requested' % (
', '.join(allowed_scopes), ', '.join(scopes))
', '.join(allowed_scopes(client)), ', '.join(scopes))
return authorization_error(request, redirect_uri, 'invalid_scope',
error_description=message,
state=state,
@ -290,14 +313,14 @@ def authorize(request, *args, **kwargs):
else:
# FIXME: we should probably factorize this part with the token endpoint similar code
need_access_token = 'token' in response_type.split()
expires_in = 3600 * 8
expires_in = access_token_duration(client)
if need_access_token:
access_token = models.OIDCAccessToken.objects.create(
client=client,
user=request.user,
scopes=u' '.join(scopes),
session_key=request.session.session_key,
expired=start + datetime.timedelta(seconds=expires_in))
expired=start + expires_in)
acr = '0'
if nonce is not None and last_auth.get('nonce') == nonce:
acr = '1'
@ -326,7 +349,7 @@ def authorize(request, *args, **kwargs):
params.update({
'access_token': access_token.uuid,
'token_type': 'Bearer',
'expires_in': expires_in,
'expires_in': expires_in.total_seconds(),
})
# query is transfered through the hashtag
response = redirect(request, redirect_uri + '#%s' % urlencode(params), resolve=False)
@ -365,32 +388,139 @@ def authenticate_client(request, client=None):
return client
def invalid_request(desc=None):
def error_response(error, error_description=None, status=400):
content = {
'error': 'invalid_request',
'error': error,
}
if desc:
content['desc'] = desc
return HttpResponseBadRequest(json.dumps(content), content_type='application/json')
if error_description:
content['error_description'] = error_description
return JsonResponse(content, status=status)
@setting_enabled('ENABLE', settings=app_settings)
@csrf_exempt
def token(request, *args, **kwargs):
if request.method != 'POST':
return HttpResponseNotAllowed(['POST'])
grant_type = request.POST.get('grant_type')
if grant_type != 'authorization_code':
return invalid_request('grant_type is not authorization_code')
def invalid_request_response(error_description=None):
return error_response('invalid_request', error_description=error_description)
def access_denied_response(error_description=None):
return error_response('access_denied', error_description=error_description)
def unauthorized_client_response(error_description=None):
return error_response('unauthorized_client', error_description=error_description)
def invalid_client_response(error_description=None):
return error_response('invalid_client', error_description=error_description)
def credential_grant_ratelimit_key(group, request):
client = authenticate_client(request, client=None)
if client:
return client.client_id
# return remote address when no valid client credentials have been provided
return request.META['REMOTE_ADDR']
def idtoken_from_user_credential(request):
if request.META.get('CONTENT_TYPE') != 'application/x-www-form-urlencoded':
return invalid_request_response(
'wrong content type. request content type must be \'application/x-www-form-urlencoded\'')
username = request.POST.get('username')
scope = request.POST.get('scope')
# scope is ignored, we used the configured scope
if not all((username, request.POST.get('password'))):
return invalid_request_response(
'request must bear both username and password as '
'parameters using the "application/x-www-form-urlencoded" '
'media type')
if is_ratelimited(
request, group='ro-cred-grant', increment=True,
key=credential_grant_ratelimit_key,
rate=app_settings.PASSWORD_GRANT_RATELIMIT):
return invalid_request_response(
'reached rate limitation, too many erroneous requests')
client = authenticate_client(request, client=None)
if not client:
return invalid_client_response('client authentication failed')
if client.authorization_flow != models.OIDCClient.FLOW_RESOURCE_OWNER_CRED:
return unauthorized_client_response(
'client is not configured for resource owner password '
'credential grant')
exponential_backoff = ExponentialRetryTimeout(
key_prefix='idp-oidc-ro-cred-grant',
duration=a2_app_settings.A2_LOGIN_EXPONENTIAL_RETRY_TIMEOUT_DURATION,
factor=a2_app_settings.A2_LOGIN_EXPONENTIAL_RETRY_TIMEOUT_FACTOR)
backoff_keys = (username, client.client_id)
seconds_to_wait = exponential_backoff.seconds_to_wait(*backoff_keys)
if seconds_to_wait > a2_app_settings.A2_LOGIN_EXPONENTIAL_RETRY_TIMEOUT_MAX_DURATION:
seconds_to_wait = a2_app_settings.A2_LOGIN_EXPONENTIAL_RETRY_TIMEOUT_MAX_DURATION
if seconds_to_wait:
return invalid_request_response(
'too many attempts with erroneous RO password, you must wait '
'%s seconds to try again.' % int(math.ceil(seconds_to_wait)))
user = authenticate(request, username=username, password=request.POST.get('password'))
if not user:
exponential_backoff.failure(*backoff_keys)
return access_denied_response('invalid resource owner credentials')
# limit requested scopes
if scope is not None:
scopes = utils.scope_set(scope) & client.scope_set()
else:
scopes = client.scope_set()
exponential_backoff.success(*backoff_keys)
start = now()
# make access_token
expires_in = access_token_duration(client)
access_token = models.OIDCAccessToken.objects.create(
client=client,
user=user,
scopes=' '.join(scopes),
session_key='',
expired=start + expires_in)
# make id_token
id_token = utils.create_user_info(
request,
client,
user,
scopes,
id_token=True)
id_token.update({
'iss': utils.get_issuer(request),
'aud': client.client_id,
'exp': timestamp_from_datetime(start + idtoken_duration(client)),
'iat': timestamp_from_datetime(start),
'auth_time': timestamp_from_datetime(start),
'acr': '0',
})
return JsonResponse({
'access_token': six.text_type(access_token.uuid),
'token_type': 'Bearer',
'expires_in': expires_in.total_seconds(),
'id_token': utils.make_idtoken(client, id_token),
})
def tokens_from_authz_code(request):
code = request.POST.get('code')
if code is None:
return invalid_request('missing code')
return invalid_request_response('missing code')
try:
oidc_code = models.OIDCCode.objects.select_related().get(uuid=code)
except models.OIDCCode.DoesNotExist:
return invalid_request('invalid code')
return invalid_request_response('invalid code')
if not oidc_code.is_valid():
return invalid_request('code has expired or user is disconnected')
return invalid_request_response('code has expired or user is disconnected')
client = authenticate_client(request, client=oidc_code.client)
if client is None:
return HttpResponse('unauthenticated', status=401)
@ -398,14 +528,14 @@ def token(request, *args, **kwargs):
models.OIDCCode.objects.filter(uuid=code).delete()
redirect_uri = request.POST.get('redirect_uri')
if oidc_code.redirect_uri != redirect_uri:
return invalid_request('invalid redirect_uri')
expires_in = 3600 * 8
return invalid_request_response('invalid redirect_uri')
expires_in = access_token_duration(client)
access_token = models.OIDCAccessToken.objects.create(
client=client,
user=oidc_code.user,
scopes=oidc_code.scopes,
session_key=oidc_code.session_key,
expired=oidc_code.created + datetime.timedelta(seconds=expires_in))
expired=oidc_code.created + expires_in)
start = now()
acr = '0'
if (oidc_code.nonce is not None
@ -429,12 +559,26 @@ def token(request, *args, **kwargs):
})
if oidc_code.nonce is not None:
id_token['nonce'] = oidc_code.nonce
response = HttpResponse(json.dumps({
return JsonResponse({
'access_token': six.text_type(access_token.uuid),
'token_type': 'Bearer',
'expires_in': expires_in,
'expires_in': expires_in.total_seconds(),
'id_token': utils.make_idtoken(client, id_token),
}), content_type='application/json')
})
@setting_enabled('ENABLE', settings=app_settings)
@csrf_exempt
def token(request, *args, **kwargs):
if request.method != 'POST':
return HttpResponseNotAllowed(['POST'])
grant_type = request.POST.get('grant_type')
if grant_type == 'password':
response = idtoken_from_user_credential(request)
elif grant_type == 'authorization_code':
response = tokens_from_authz_code(request)
else:
return invalid_request_response('grant_type must be either authorization_code or password')
response['Cache-Control'] = 'no-store'
response['Pragma'] = 'no-cache'
return response
@ -465,7 +609,7 @@ def user_info(request, *args, **kwargs):
access_token.client,
access_token.user,
access_token.scope_set())
return HttpResponse(json.dumps(user_info), content_type='application/json')
return JsonResponse(user_info)
@setting_enabled('ENABLE', settings=app_settings)

View File

@ -25,20 +25,24 @@ from jwcrypto.jwk import JWKSet, JWK
import utils
from django.core.cache import cache
from django.core.urlresolvers import reverse
from django.core.files import File
from django.db import connection
from django.db.migrations.executor import MigrationExecutor
from django.utils.timezone import now
from django.test.client import RequestFactory
from django.contrib.auth import get_user_model
from django.utils.six.moves.urllib import parse as urlparse
from ratelimit.utils import is_ratelimited
User = get_user_model()
from authentic2.models import Attribute, AuthorizedRole
from authentic2_idp_oidc.models import OIDCClient, OIDCAuthorization, OIDCCode, OIDCAccessToken, OIDCClaim
from authentic2_idp_oidc.utils import make_sub
from authentic2_idp_oidc.utils import (make_sub, get_first_rsa_sig_key,
base64url)
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.utils import make_url
from authentic2_auth_oidc.utils import parse_timestamp
@ -66,6 +70,7 @@ JWKSET = {
@pytest.fixture
def oidc_settings(settings):
settings.A2_IDP_OIDC_JWKSET = JWKSET
settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m'
return settings
@ -667,7 +672,7 @@ def test_invalid_request(caplog, oidc_settings, oidc_client, simple_user, app):
}, headers=client_authentication_headers(oidc_client), status=400)
assert 'error' in response.json
assert response.json['error'] == 'invalid_request'
assert response.json['desc'] == 'code has expired or user is disconnected'
assert response.json['error_description'] == 'code has expired or user is disconnected'
# invalid logout
logout_url = make_url('oidc-logout', params={
@ -693,7 +698,7 @@ def test_invalid_request(caplog, oidc_settings, oidc_client, simple_user, app):
}, headers=client_authentication_headers(oidc_client), status=400)
assert 'error' in response.json
assert response.json['error'] == 'invalid_request'
assert response.json['desc'] == 'code has expired or user is disconnected'
assert response.json['error_description'] == 'code has expired or user is disconnected'
def test_expired_manager(db, simple_user):
@ -1161,3 +1166,199 @@ def test_filter_api_users(app, oidc_client, admin, simple_user, role_random):
response = app.get('/api/users/')
assert len(response.json['results']) == count
def test_credentials_grant(app, oidc_client, admin, simple_user):
cache.clear()
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.scope = 'openid'
oidc_client.save()
token_url = make_url('oidc-token')
if oidc_client.idtoken_algo == OIDCClient.ALGO_HMAC:
jwk = JWK(kty='oct', k=base64url(oidc_client.client_secret.encode('utf-8')))
elif oidc_client.idtoken_algo == OIDCClient.ALGO_RSA:
jwk = get_first_rsa_sig_key()
# 1. test in-request client credentials
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
response = app.post(token_url, params=params)
assert 'id_token' in response.json
token = response.json['id_token']
header, payload, signature = token.split('.')
jwt = JWT()
jwt.deserialize(token, key=jwk)
claims = json.loads(jwt.claims)
# xxx already verified by jwcrypto deserialization?
assert set(claims) == set(['acr', 'aud', 'auth_time', 'exp', 'iat', 'iss', 'sub'])
assert all(claims.values())
# 2. test basic authz
params.pop('client_id')
params.pop('client_secret')
response = app.post(token_url, params=params, headers=client_authentication_headers(oidc_client))
assert 'id_token' in response.json
token = response.json['id_token']
header, payload, signature = token.split('.')
jwt = JWT()
jwt.deserialize(token, key=jwk)
claims = json.loads(jwt.claims)
# xxx already verified by jwcrypto deserialization?
assert set(claims) == set(['acr', 'aud', 'auth_time', 'exp', 'iat', 'iss', 'sub'])
assert all(claims.values())
def test_credentials_grant_ratelimitation_invalid_client(
app, oidc_client, admin, simple_user, oidc_settings, freezer):
freezer.move_to('2020-01-01')
cache.clear()
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
token_url = make_url('oidc-token')
params = {
'client_id': oidc_client.client_id,
'client_secret': 'notgood',
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
for i in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])):
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_client'
assert 'client authentication failed' in response.json['error_description']
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_request'
assert 'reached rate limitation' in response.json['error_description']
def test_credentials_grant_ratelimitation_valid_client(
app, oidc_client, admin, simple_user, oidc_settings, freezer):
freezer.move_to('2020-01-01')
cache.clear()
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
token_url = make_url('oidc-token')
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
for i in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])):
app.post(token_url, params=params)
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_request'
assert 'reached rate limitation' in response.json['error_description']
def test_credentials_grant_retrytimout(
app, oidc_client, admin, simple_user, settings, freezer):
freezer.move_to('2020-01-01')
cache.clear()
settings.A2_LOGIN_EXPONENTIAL_RETRY_TIMEOUT_DURATION = 2
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
token_url = make_url('oidc-token')
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': u'SurelyNotTheRightPassword',
}
attempts = 0
while attempts < 100:
response = app.post(token_url, params=params, status=400)
attempts += 1
if attempts >= 10:
assert response.json['error'] == 'invalid_request'
assert 'too many attempts with erroneous RO password' in response.json['error_description']
# freeze some time after backoff delay expiration
freezer.move_to(datetime.timedelta(days=2))
# obtain a successful login
params['password'] = simple_user.username
response = app.post(token_url, params=params, status=200)
assert 'id_token' in response.json
def test_credentials_grant_invalid_flow(
app, oidc_client, admin, simple_user, settings):
cache.clear()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': u'SurelyNotTheRightPassword',
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'unauthorized_client'
assert 'is not configured' in response.json['error_description']
def test_credentials_grant_invalid_client(
app, oidc_client, admin, simple_user, settings):
cache.clear()
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': 'tryingthis', # Nope, wrong secret
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_client'
assert response.json['error_description'] == 'client authentication failed'
def test_credentials_grant_unauthz_client(
app, oidc_client, admin, simple_user, settings):
cache.clear()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'unauthorized_client'
assert 'client is not configured for resource owner'in response.json['error_description']
def test_credentials_grant_invalid_content_type(
app, oidc_client, admin, simple_user, settings):
cache.clear()
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
token_url = make_url('oidc-token')
response = app.post(
token_url, params=params,
content_type='multipart/form-data',
status=400)
assert response.json['error'] == 'invalid_request'
assert 'wrong content type' in response.json['error_description']