Compare commits
3 Commits
main
...
wip/10211-
Author | SHA1 | Date |
---|---|---|
Benjamin Dauvergne | 349180a799 | |
Benjamin Dauvergne | 92ea191ea0 | |
Benjamin Dauvergne | e69dfb5a98 |
|
@ -1,4 +1,4 @@
|
|||
@Library('eo-jenkins-lib@master') import eo.Utils
|
||||
@Library('eo-jenkins-lib@main') import eo.Utils
|
||||
|
||||
pipeline {
|
||||
agent any
|
||||
|
|
2
setup.py
2
setup.py
|
@ -63,7 +63,7 @@ setup(name="django-kerberos",
|
|||
install_requires=[
|
||||
'six',
|
||||
'django>1.8',
|
||||
'pykerberos',
|
||||
'gssapi',
|
||||
],
|
||||
package_dir={
|
||||
'': 'src',
|
||||
|
|
|
@ -14,120 +14,56 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import re
|
||||
import logging
|
||||
import warnings
|
||||
|
||||
import six
|
||||
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.utils.encoding import force_bytes
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.backends import ModelBackend
|
||||
|
||||
import kerberos
|
||||
from .utils import check_password
|
||||
|
||||
from . import app_settings
|
||||
|
||||
logger = logging.getLogger('django_kerberos')
|
||||
|
||||
UserModel = get_user_model()
|
||||
|
||||
|
||||
class KerberosBackend(ModelBackend):
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
def user_for_principal(self, request, principal):
|
||||
try:
|
||||
warnings.warn('example backend do not use in production!')
|
||||
return UserModel.objects.get(username=str(principal))
|
||||
except UserModel.DoesNotExist:
|
||||
logger.warning('no user found for principal %s', principal)
|
||||
return None
|
||||
|
||||
def username_from_principal(self, principal):
|
||||
'''Make a username from a principal name'''
|
||||
username, domain = principal.rsplit('@', 1)
|
||||
return u'{0}@{1}'.format(username, domain.lower())
|
||||
|
||||
def authorize_principal(self, principal):
|
||||
'''Is this principal authorized to login ?'''
|
||||
return True
|
||||
|
||||
def provision_user(self, principal, user):
|
||||
'''Modify user based on information we can retrieve on this principal'''
|
||||
if app_settings.BACKEND_ADMIN_REGEXP:
|
||||
if re.match(app_settings.BACKEND_ADMIN_REGEXP, principal):
|
||||
if not user.is_staff or not user.is_superuser:
|
||||
self.logger.info('giving superuser power to principal %r', principal)
|
||||
user.is_staff = True
|
||||
user.is_superuser = True
|
||||
user.save()
|
||||
|
||||
def should_create_user(self):
|
||||
'''Should we create users for new principals ?'''
|
||||
return app_settings.BACKEND_CREATE
|
||||
|
||||
def lookup_user(self, principal):
|
||||
'''Find the user model linked to this principal'''
|
||||
User = get_user_model()
|
||||
username_field = getattr(User, 'USERNAME_FIELD', 'username')
|
||||
username = self.username_from_principal(principal)
|
||||
kwargs = {username_field: username}
|
||||
if self.should_create_user():
|
||||
user, created = User.objects.get_or_create(**kwargs)
|
||||
if created:
|
||||
user.set_unusable_password()
|
||||
user.save()
|
||||
else:
|
||||
try:
|
||||
user = User.objects.get(**kwargs)
|
||||
except User.DoesNotExist:
|
||||
return
|
||||
self.provision_user(principal, user)
|
||||
return user
|
||||
|
||||
def authenticate(self, request=None, principal=None):
|
||||
if principal and self.authorize_principal(principal):
|
||||
return self.lookup_user(principal)
|
||||
def authenticate(self, request, principal=None):
|
||||
return self.user_for_principal(request, principal)
|
||||
|
||||
|
||||
class KerberosPasswordBackend(KerberosBackend):
|
||||
def default_realm(self):
|
||||
'''Default realm for usernames without a realm'''
|
||||
return app_settings.DEFAULT_REALM
|
||||
|
||||
def principal_from_username(self, username):
|
||||
realm = self.default_realm()
|
||||
if '@' not in username and realm:
|
||||
username = u'%s@%s' % (username, realm)
|
||||
def principal_for_user(self, user, username):
|
||||
return username
|
||||
|
||||
def keep_password(self):
|
||||
'''Do we save a password hash ?'''
|
||||
return app_settings.KEEP_PASSWORD
|
||||
|
||||
def service_principal(self):
|
||||
'''Service principal for checking password'''
|
||||
if not app_settings.SERVICE_PRINCIPAL:
|
||||
raise ImproperlyConfigured('Kerberos password backend needs the '
|
||||
'setting KERBEROS_SERVICE_PRINCIPAL to be'
|
||||
' set')
|
||||
return app_settings.SERVICE_PRINCIPAL
|
||||
|
||||
def authenticate(self, request=None, username=None, password=None):
|
||||
def authenticate(self, request, username=None, password=None, **kwargs):
|
||||
'''Verify username and password using Kerberos'''
|
||||
if not username:
|
||||
if username is None:
|
||||
username = kwargs.get(UserModel.USERNAME_FIELD)
|
||||
if username is None or password is None:
|
||||
return
|
||||
|
||||
kerb_principal = principal = self.principal_from_username(username)
|
||||
kerb_password = password
|
||||
|
||||
if six.PY2:
|
||||
kerb_principal = force_bytes(kerb_principal)
|
||||
kerb_password = force_bytes(kerb_principal)
|
||||
|
||||
try:
|
||||
if not kerberos.checkPassword(kerb_principal, kerb_password,
|
||||
self.service_principal(),
|
||||
self.default_realm()):
|
||||
return
|
||||
except kerberos.KrbError as e:
|
||||
logging.getLogger(__name__).error(
|
||||
'password validation for principal %r failed %s', principal, e)
|
||||
user = UserModel.objects.get(username=username)
|
||||
except UserModel.DoesNotExist:
|
||||
return
|
||||
|
||||
principal = self.principal_for_user(user, username)
|
||||
|
||||
if check_password(principal, password):
|
||||
if not user.check_password(password):
|
||||
user.set_password(password)
|
||||
user.save()
|
||||
return user
|
||||
else:
|
||||
if principal and self.authorize_principal(principal):
|
||||
user = self.lookup_user(principal)
|
||||
if self.keep_password():
|
||||
user.set_password(password)
|
||||
return user
|
||||
return None
|
||||
|
|
|
@ -1,58 +0,0 @@
|
|||
# django-kerberos - SPNEGO/Kerberos authentication for Django applications
|
||||
# Copyright (C) 2014-2019 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import logging
|
||||
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.utils.encoding import force_bytes
|
||||
|
||||
from django.contrib.auth.hashers import BasePasswordHasher
|
||||
|
||||
import kerberos
|
||||
|
||||
from . import app_settings
|
||||
|
||||
|
||||
class KerberosHasher(BasePasswordHasher):
|
||||
'''A pseudo hasher which just validate that the given password
|
||||
match a given Kerberos identity'''
|
||||
algorithm = 'kerberos'
|
||||
|
||||
def default_realm(self):
|
||||
'''Default realm for usernames without a realm'''
|
||||
return app_settings.DEFAULT_REALM
|
||||
|
||||
def service_principal(self):
|
||||
if not app_settings.SERVICE_PRINCIPAL:
|
||||
raise ImproperlyConfigured(
|
||||
'Kerberos pseudo password hasher needs the setting '
|
||||
'KERBEROS_SERVICE_PRINCIPAL to be set')
|
||||
return app_settings.SERVICE_PRINCIPAL
|
||||
|
||||
def verify(self, password, encoded):
|
||||
algorithm, principal = encoded.split('$', 2)
|
||||
assert algorithm == self.algorithm
|
||||
principal = force_bytes(principal)
|
||||
password = force_bytes(password)
|
||||
try:
|
||||
return kerberos.checkPassword(
|
||||
principal, password,
|
||||
self.service_principal(),
|
||||
self.default_realm())
|
||||
except kerberos.KrbError as e:
|
||||
logging.getLogger(__name__).error(
|
||||
'password validation for principal %r failed %s', principal, e)
|
||||
return False
|
|
@ -0,0 +1,33 @@
|
|||
# django-kerberos - SPNEGO/Kerberos authentication for Django applications
|
||||
# Copyright (C) 2014-2019 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import logging
|
||||
|
||||
import gssapi
|
||||
import gssapi.exceptions
|
||||
import gssapi.raw as gb
|
||||
|
||||
logger = logging.getLogger('django_kerberos')
|
||||
|
||||
|
||||
def check_password(principal, password):
|
||||
try:
|
||||
name = gb.import_name(principal.encode(), gb.NameType.kerberos_principal)
|
||||
if gb.acquire_cred_with_password(name, password.encode('utf-8')):
|
||||
return True
|
||||
except gssapi.exceptions.GSSError as e:
|
||||
logger.warning('kerberos password check failed %s', e)
|
||||
return False
|
|
@ -14,109 +14,117 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import base64
|
||||
import logging
|
||||
|
||||
import kerberos
|
||||
import gssapi
|
||||
import gssapi.exceptions
|
||||
|
||||
from django import http
|
||||
from django.template.response import TemplateResponse
|
||||
from django.conf import settings
|
||||
from django.views.generic.base import View
|
||||
from django.contrib import messages
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.utils.http import is_safe_url
|
||||
from django.views.generic.base import View
|
||||
|
||||
from django.contrib.auth import authenticate, login as auth_login
|
||||
from django.contrib.auth import authenticate, login as auth_login, REDIRECT_FIELD_NAME
|
||||
|
||||
from . import app_settings
|
||||
logger = logging.getLogger('django_kerberos')
|
||||
|
||||
|
||||
class NegotiateFailed(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NegotiateView(View):
|
||||
NEXT_URL_FIELD = 'next'
|
||||
unauthorized_template_name = 'django_kerberos/unauthorized.html'
|
||||
error_template_name = 'django_kerberos/error.html'
|
||||
redirect_field_name = REDIRECT_FIELD_NAME
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
super(NegotiateView, self).__init__(*args, **kwargs)
|
||||
def get_success_url_allowed_hosts(self):
|
||||
return {self.request.get_host()}
|
||||
|
||||
def challenge(self, request, *args, **kwargs):
|
||||
def get_redirect_url(self):
|
||||
"""Return the user-originating redirect URL if it's safe."""
|
||||
redirect_to = self.request.POST.get(
|
||||
self.redirect_field_name,
|
||||
self.request.GET.get(self.redirect_field_name, '')
|
||||
)
|
||||
url_is_safe = is_safe_url(
|
||||
url=redirect_to,
|
||||
allowed_hosts=self.get_success_url_allowed_hosts(),
|
||||
require_https=self.request.is_secure(),
|
||||
)
|
||||
return redirect_to if url_is_safe else settings.LOGIN_REDIRECT_URL
|
||||
|
||||
def service_name(self):
|
||||
# force a service name, ex. :
|
||||
# server_name = 'HTTP@%s' % self.request.get_host()
|
||||
# return gssapi.Name(server_name, name_type=gssapi.NameType.hostbased_service)
|
||||
# without one, any service name in keytab will do
|
||||
return None
|
||||
|
||||
def _add_www_authenticate(self, response, token=None):
|
||||
encoded_token = base64.b64encode(token).decode('ascii') if token else None
|
||||
logger.debug('sending challenge WWW-Authenticate: Negotiate %s', encoded_token)
|
||||
response['WWW-Authenticate'] = 'Negotiate%s' % (
|
||||
' ' + encoded_token if token else '')
|
||||
|
||||
def challenge(self):
|
||||
'''Send negotiate challenge'''
|
||||
response = TemplateResponse(request, self.unauthorized_template_name, status=401)
|
||||
response['WWW-Authenticate'] = 'Negotiate'
|
||||
return response
|
||||
return http.HttpResponse(status=401)
|
||||
|
||||
def host(self, request):
|
||||
return app_settings.HOSTNAME or request.get_host().split(':')[0]
|
||||
def success(self, request, *args, user=None, **kwargs):
|
||||
'''Do something with the user we found'''
|
||||
auth_login(self.request, user)
|
||||
return self.redirect(request, *args, **kwargs)
|
||||
|
||||
def principal_valid(self, request, *args, **kwargs):
|
||||
'''Do something with the principal we received'''
|
||||
self.logger.info(u'got ticket for principal %s', self.principal)
|
||||
user = authenticate(principal=self.principal)
|
||||
next_url = request.POST.get(self.NEXT_URL_FIELD) or request.GET.get(self.NEXT_URL_FIELD) or settings.LOGIN_REDIRECT_URL
|
||||
if user:
|
||||
self.login_user(request, user)
|
||||
if request.is_ajax():
|
||||
return http.HttpResponse('true' if user else 'false', content_type='application/json')
|
||||
else:
|
||||
if not user:
|
||||
self.logger.warning(u'principal %s has no local user', self.principal)
|
||||
messages.warning(request, _('Principal %s could not be authenticated') %
|
||||
self.principal)
|
||||
return http.HttpResponseRedirect(next_url)
|
||||
|
||||
def login_user(self, request, user):
|
||||
auth_login(request, user)
|
||||
def redirect(self, request, *args, **kwargs):
|
||||
return http.HttpResponseRedirect(self.get_redirect_url(), status=307)
|
||||
|
||||
def negotiate(self, request, *args, **kwargs):
|
||||
'''Try to authenticate the user using SPNEGO and Kerberos'''
|
||||
|
||||
if 'HTTP_AUTHORIZATION' in request.META:
|
||||
kind, authstr = request.META['HTTP_AUTHORIZATION'].split(' ', 1)
|
||||
if kind == 'Negotiate':
|
||||
service = 'HTTP@%s' % self.host(request)
|
||||
self.logger.debug(u'using service name %s', service)
|
||||
self.logger.debug(u'Negotiate authstr %r', authstr)
|
||||
gss_name = self.service_name()
|
||||
if gss_name:
|
||||
logger.debug('using service principal name %s', gss_name)
|
||||
|
||||
out_token = None
|
||||
response = None
|
||||
try:
|
||||
server_creds = gssapi.Credentials(usage='accept', name=gss_name)
|
||||
except gssapi.exceptions.GSSError as e:
|
||||
return self.error(request, message='gssapi credentials failure %s' % e, *args, **kwargs)
|
||||
else:
|
||||
in_token = None
|
||||
if request.META.get('HTTP_AUTHORIZATION', '').startswith('Negotiate '):
|
||||
authstr = request.META['HTTP_AUTHORIZATION'][10:]
|
||||
try:
|
||||
result, context = kerberos.authGSSServerInit(service)
|
||||
except kerberos.KrbError as e:
|
||||
self.logger.warning(u'exception during authGSSServerInit: %s, certainly a '
|
||||
u'keytab problem', e)
|
||||
details = (u'exception during authGSSServerInit: %s, certainly a '
|
||||
u'keytab problem' % e)
|
||||
return TemplateResponse(request, self.error_template_name,
|
||||
context={'details': details}, status=500)
|
||||
# ensure context is finalized
|
||||
in_token = base64.b64decode(authstr)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if in_token is not None:
|
||||
server_ctx = gssapi.SecurityContext(creds=server_creds, usage='accept')
|
||||
try:
|
||||
if result != 1:
|
||||
self.logger.warning(u'authGSSServerInit result is non-one: %s', result)
|
||||
details = u'authGSSServerInit result is non-one: %s' % result
|
||||
return TemplateResponse(request, self.error_template_name,
|
||||
context={'details': details}, status=500)
|
||||
try:
|
||||
r = kerberos.authGSSServerStep(context, authstr)
|
||||
except kerberos.KrbError as e:
|
||||
self.logger.warning(u'exception during authGSSServerStep: %s', e)
|
||||
details = u'exception during authGSSServerStep: %s' % e
|
||||
return TemplateResponse(request, self.error_template_name,
|
||||
context={'details': details}, status=500)
|
||||
if r == 1:
|
||||
gssstring = kerberos.authGSSServerResponse(context)
|
||||
else:
|
||||
return self.challenge(request, *args, **kwargs)
|
||||
try:
|
||||
self.principal = kerberos.authGSSServerUserName(context)
|
||||
except kerberos.KrbError as e:
|
||||
self.logger.warning(u'exception during authGSSServerUserName: %s', e)
|
||||
details = u'exception during authGSSServerUserName: %s' % e
|
||||
return TemplateResponse(request, self.error_template_name,
|
||||
context={'details': details}, status=500)
|
||||
finally:
|
||||
kerberos.authGSSServerClean(context)
|
||||
response = self.principal_valid(request, *args, **kwargs)
|
||||
if response:
|
||||
response['WWW-Authenticate'] = 'Negotiate %s' % gssstring
|
||||
return response
|
||||
return self.challenge(request, *args, **kwargs)
|
||||
out_token = server_ctx.step(in_token)
|
||||
except gssapi.exceptions.GSSError as e:
|
||||
return self.error(request, message='gssapi security context failure %s' % e, *args, **kwargs)
|
||||
else:
|
||||
if server_ctx.complete:
|
||||
principal = server_ctx.initiator_name
|
||||
user = authenticate(principal=principal)
|
||||
if user:
|
||||
logger.debug('found user %s for principal %s', user, principal)
|
||||
response = self.success(request, user=user)
|
||||
if response is None:
|
||||
response = self.challenge()
|
||||
self._add_www_authenticate(response, out_token)
|
||||
return response
|
||||
|
||||
def error(self, request, message, *args, **kwargs):
|
||||
logger.warning(message)
|
||||
messages.error(request, message)
|
||||
return self.redirect(request, *args, **kwargs)
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
return self.negotiate(request, *args, **kwargs)
|
||||
|
|
|
@ -1,175 +1,103 @@
|
|||
import logging
|
||||
import pytest
|
||||
import json
|
||||
# django-kerberos - SPNEGO/Kerberos authentication for Django applications
|
||||
# Copyright (C) 2014-2019 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import kerberos
|
||||
from django.contrib.auth.models import User
|
||||
import base64
|
||||
import logging
|
||||
import os
|
||||
import pytest
|
||||
|
||||
import gssapi
|
||||
import k5test
|
||||
import k5test._utils
|
||||
|
||||
from django.contrib.auth import get_user_model, authenticate
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def kerberos_mock(request, mocker):
|
||||
a = {}
|
||||
d = (
|
||||
'kerberos.authGSSServerInit',
|
||||
'kerberos.authGSSServerStep',
|
||||
'kerberos.authGSSServerResponse',
|
||||
'kerberos.authGSSServerUserName',
|
||||
'kerberos.authGSSServerClean',
|
||||
'kerberos.checkPassword'
|
||||
)
|
||||
for name in d:
|
||||
if hasattr(request, 'param') and name in request.param:
|
||||
continue
|
||||
a[name] = mocker.patch(name)
|
||||
return a
|
||||
def k5env():
|
||||
k5realm = k5test.K5Realm(krb5_config={
|
||||
'libdefaults': {
|
||||
'dns_canonicalize_hostname': False,
|
||||
'rdns': False,
|
||||
}
|
||||
})
|
||||
old_environ = os.environ.copy()
|
||||
try:
|
||||
os.environ.update(k5realm.env)
|
||||
k5realm.http_princ = 'HTTP/testserver@%s' % k5realm.realm
|
||||
k5realm.addprinc(k5realm.http_princ)
|
||||
k5realm.extract_keytab(k5realm.http_princ, k5realm.keytab)
|
||||
yield k5realm
|
||||
finally:
|
||||
os.environ.clear()
|
||||
os.environ.update(old_environ)
|
||||
k5realm.stop()
|
||||
|
||||
|
||||
def test_login_no_header(client, settings, kerberos_mock):
|
||||
client.get('/login/')
|
||||
for mock in kerberos_mock.values():
|
||||
assert mock.call_count == 0
|
||||
def test_login(k5env, client, caplog, db, monkeypatch):
|
||||
caplog.set_level(logging.DEBUG)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('kerberos_mock', ['kerberos.authGSSServerInit'], indirect=True)
|
||||
def test_login_missing_keytab(client, settings, kerberos_mock, caplog):
|
||||
resp = client.get('/login/', HTTP_AUTHORIZATION='Negotiate coin')
|
||||
for key, mock in kerberos_mock.items():
|
||||
assert mock.call_count == 0
|
||||
assert b'keytab problem' in resp.content
|
||||
assert 'keytab problem' in caplog.text
|
||||
|
||||
|
||||
def test_login(client, db, settings, kerberos_mock, caplog):
|
||||
caplog.set_level(logging.INFO)
|
||||
kerberos_mock['kerberos.authGSSServerInit'].side_effect = kerberos.KrbError('coin')
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxxx')
|
||||
assert response.status_code == 500
|
||||
assert b'exception during authGSSServerInit' in response.content
|
||||
assert 'exception during authGSSServerInit' in caplog.text
|
||||
assert b'coin' in response.content
|
||||
kerberos_mock['kerberos.authGSSServerInit'].side_effect = None
|
||||
caplog.clear()
|
||||
|
||||
kerberos_mock['kerberos.authGSSServerInit'].return_value = 0, None
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxxx')
|
||||
assert response.status_code == 500
|
||||
assert b'authGSSServerInit result is non-one' in response.content
|
||||
assert 'authGSSServerInit result is non-one' in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
kerberos_mock['kerberos.authGSSServerInit'].return_value = 1, None
|
||||
kerberos_mock['kerberos.authGSSServerStep'].side_effect = kerberos.KrbError('coin')
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxxx')
|
||||
assert response.status_code == 500
|
||||
assert b'exception during authGSSServerStep' in response.content
|
||||
assert 'exception during authGSSServerStep' in caplog.text
|
||||
assert b'coin' in response.content
|
||||
kerberos_mock['kerberos.authGSSServerStep'].side_effect = None
|
||||
caplog.clear()
|
||||
|
||||
kerberos_mock['kerberos.authGSSServerStep'].return_value = 0
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxxx')
|
||||
response = client.get('/login/')
|
||||
assert response.status_code == 401
|
||||
|
||||
kerberos_mock['kerberos.authGSSServerStep'].return_value = 1
|
||||
kerberos_mock['kerberos.authGSSServerUserName'].side_effect = kerberos.KrbError('coin')
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxxx')
|
||||
assert response.status_code == 500
|
||||
assert b'exception during authGSSServerUserName' in response.content
|
||||
assert 'exception during authGSSServerUserName' in caplog.text
|
||||
assert b'coin' in response.content
|
||||
kerberos_mock['kerberos.authGSSServerUserName'].side_effect = None
|
||||
caplog.clear()
|
||||
service_name = gssapi.Name('HTTP@testserver', gssapi.NameType.hostbased_service)
|
||||
service_name.canonicalize(gssapi.MechType.kerberos)
|
||||
|
||||
kerberos_mock['kerberos.authGSSServerUserName'].return_value = 'john.doe@EXAMPLE.COM'
|
||||
kerberos_mock['kerberos.authGSSServerResponse'].return_value = 'yyyy'
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxxx')
|
||||
assert response.status_code == 302
|
||||
assert 'principal john.doe@EXAMPLE.COM has no local user' in caplog.text
|
||||
caplog.clear()
|
||||
# first attempt
|
||||
ctx = gssapi.SecurityContext(usage='initiate', name=service_name)
|
||||
token = ctx.step()
|
||||
response = client.get('/login/',
|
||||
HTTP_AUTHORIZATION='Negotiate %s' % base64.b64encode(token).decode('ascii'))
|
||||
|
||||
user = User.objects.create(username='john.doe@example.com')
|
||||
assert response.status_code == 401
|
||||
assert '_auth_user_id' not in client.session
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxxx')
|
||||
assert response.status_code == 302
|
||||
assert response['WWW-Authenticate'] == 'Negotiate yyyy'
|
||||
assert 'principal john.doe@EXAMPLE.COM has no local user' not in caplog.text
|
||||
assert client.session['_auth_user_id'] == str(user.id)
|
||||
client.logout()
|
||||
user.delete()
|
||||
assert User.objects.count() == 0
|
||||
caplog.clear()
|
||||
|
||||
settings.KERBEROS_BACKEND_CREATE = True
|
||||
assert '_auth_user_id' not in client.session
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxxx')
|
||||
assert response.status_code == 302
|
||||
assert 'principal john.doe@EXAMPLE.COM has no local user' not in caplog.text
|
||||
assert User.objects.count() == 1
|
||||
user = User.objects.get()
|
||||
assert not user.is_staff
|
||||
assert not user.is_superuser
|
||||
assert client.session['_auth_user_id'] == str(user.id)
|
||||
assert 'got ticket for principal john.doe@EXAMPLE.COM' in caplog.text
|
||||
client.logout()
|
||||
caplog.clear()
|
||||
# create an user...
|
||||
User.objects.create(username=k5env.user_princ)
|
||||
|
||||
settings.KERBEROS_BACKEND_ADMIN_REGEXP = 'john.doe'
|
||||
assert '_auth_user_id' not in client.session
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxxx')
|
||||
assert response.status_code == 302
|
||||
assert 'principal john.doe@EXAMPLE.COM has no local user' not in caplog.text
|
||||
assert User.objects.count() == 1
|
||||
user = User.objects.get()
|
||||
assert user.is_staff
|
||||
assert user.is_superuser
|
||||
assert client.session['_auth_user_id'] == str(user.id)
|
||||
assert 'got ticket for principal john.doe@EXAMPLE.COM' in caplog.text
|
||||
assert 'giving superuser power to principal \'john.doe@EXAMPLE.COM\'' in caplog.text
|
||||
client.logout()
|
||||
caplog.clear()
|
||||
# and retry.
|
||||
ctx = gssapi.SecurityContext(usage='initiate', name=service_name)
|
||||
token = ctx.step()
|
||||
response = client.get('/login/',
|
||||
HTTP_AUTHORIZATION='Negotiate %s' % base64.b64encode(token).decode('ascii'))
|
||||
|
||||
assert '_auth_user_id' not in client.session
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxxx', HTTP_X_REQUESTED_WITH='XMLHttpRequest')
|
||||
assert response.status_code == 200
|
||||
assert json.loads(response.content.decode('ascii')) is True
|
||||
assert response.status_code == 307
|
||||
assert client.session['_auth_user_id']
|
||||
|
||||
# break service name resolution
|
||||
monkeypatch.setattr('django_kerberos.views.NegotiateView.service_name',
|
||||
lambda self: gssapi.Name('HTTP@whatever.invalid', gssapi.NameType.hostbased_service))
|
||||
ctx = gssapi.SecurityContext(usage='initiate', name=service_name)
|
||||
token = ctx.step()
|
||||
response = client.get('/login/',
|
||||
HTTP_AUTHORIZATION='Negotiate %s' % base64.b64encode(token).decode('ascii'),
|
||||
HTTP_X_REQUESTED_WITH='XMLHttpRequest')
|
||||
assert response.status_code == 307
|
||||
cookies = response.cookies.output()
|
||||
assert 'gssapi security context failure' in caplog.text or 'gssapi credentials failure' in caplog.text
|
||||
assert 'gssapi security context failure' in cookies or 'gssapi credentials failure' in cookies
|
||||
|
||||
|
||||
def test_password_backend(db, settings, kerberos_mock, caplog):
|
||||
from django.contrib.auth import authenticate
|
||||
def test_password(k5env, db):
|
||||
user = User.objects.create(username=k5env.user_princ)
|
||||
|
||||
settings.KERBEROS_DEFAULT_REALM = 'EXAMPLE.COM'
|
||||
settings.KERBEROS_SERVICE_PRINCIPAL = 'HTTP/SERVICE.EXAMPLE.COM@EXAMPLE.COM'
|
||||
k5env.run(['kdestroy'])
|
||||
|
||||
m = kerberos_mock['kerberos.checkPassword']
|
||||
m.return_value = False
|
||||
assert authenticate(username='john.doe', password='password') is None
|
||||
assert User.objects.count() == 0
|
||||
|
||||
m.return_value = True
|
||||
assert authenticate(username='john.doe', password='password') is None
|
||||
assert User.objects.count() == 0
|
||||
|
||||
user = User.objects.create(username='john.doe@example.com')
|
||||
assert authenticate(username='john.doe', password='password') == user
|
||||
user.delete()
|
||||
|
||||
assert User.objects.count() == 0
|
||||
settings.KERBEROS_BACKEND_CREATE = True
|
||||
new_user = authenticate(username='john.doe', password='password')
|
||||
assert new_user
|
||||
assert User.objects.count() == 1
|
||||
assert new_user.username == 'john.doe@example.com'
|
||||
assert not new_user.has_usable_password()
|
||||
|
||||
settings.KERBEROS_KEEP_PASSWORD = True
|
||||
new_user = authenticate(username='john.doe', password='password')
|
||||
assert User.objects.count() == 1
|
||||
assert new_user.username == 'john.doe@example.com'
|
||||
assert new_user.has_usable_password()
|
||||
assert new_user.check_password('password')
|
||||
|
||||
caplog.clear()
|
||||
m.side_effect = kerberos.KrbError('coin')
|
||||
assert authenticate(username='john.doe', password='password') is None
|
||||
assert 'password validation for principal %r failed coin' % u'john.doe@EXAMPLE.COM' in caplog.text
|
||||
assert authenticate(username=k5env.user_princ, password='nogood') is None
|
||||
assert authenticate(username=k5env.user_princ, password=k5env.password('user')) == user
|
||||
assert not os.path.exists(k5env.ccache)
|
||||
|
|
37
tox.ini
37
tox.ini
|
@ -5,37 +5,40 @@
|
|||
|
||||
[tox]
|
||||
toxworkdir = {env:TMPDIR:/tmp}/tox-{env:USER}/django-kerberos/{env:BRANCH_NAME:}
|
||||
envlist = py27-coverage-{dj18,dj111}-{pg,sqlite},py3-coverage-{dj18,dj111,dj20,djlast}-{pg,sqlite},pylint
|
||||
envlist =
|
||||
py3-dj111
|
||||
py3-dj22-gssapi{141,}
|
||||
|
||||
[testenv]
|
||||
whitelist_externals =
|
||||
/bin/mv
|
||||
/bin/rm
|
||||
setenv =
|
||||
DJANGO_SETTINGS_MODULE=settings
|
||||
PYTHONPATH=tests
|
||||
SETUPTOOLS_USE_DISTUTILS=stdlib
|
||||
PYTHONPATH=.
|
||||
DJANGO_SETTINGS_MODULE=tests.settings
|
||||
coverage: COVERAGE=--cov-branch --cov-append --cov=src/ --cov-report=html --cov-report=xml --cov-config .coveragerc
|
||||
sqlite: DB_ENGINE=django.db.backends.sqlite3
|
||||
pg: DB_ENGINE=django.db.backends.postgresql_psycopg2
|
||||
DB_ENGINE=django.db.backends.postgresql_psycopg2
|
||||
JUNIT={tty::-o junit_suite_name={envname} --junit-xml=junit-{envname}.xml}
|
||||
COVERAGE={tty::--cov --cov-append --cov-report xml --cov-report html --cov-config=tox.ini}
|
||||
SETUPTOOLS_USE_DISTUTILS=stdlib
|
||||
!nosw: SW={tty:--sw:}
|
||||
KRB5_KTNAME=/tmp/coin
|
||||
KRB5_CLIENT_KTNAME=/tmp/coin
|
||||
KRB5CCNAME=/tmp/coin
|
||||
usedevelop = true
|
||||
deps =
|
||||
dj18: django>1.8,<1.9
|
||||
dj18: django-tables2<1.1
|
||||
dj111: django<2.0
|
||||
dj20: django<2.1
|
||||
djlast: django
|
||||
pg: psycopg2-binary
|
||||
dj22: django<2.3
|
||||
gssapi141: gssapi==1.4.1
|
||||
psycopg2-binary
|
||||
pytest
|
||||
pytest-mock
|
||||
pytest-django
|
||||
pytest-cov
|
||||
k5test
|
||||
commands =
|
||||
py.test {env:COVERAGE:} -o junit_suite_name={envname} --junit-xml=junit-{envname}.xml {posargs:tests}
|
||||
py.test {env:COVERAGE:} {env:JUNIT:} {env:SW:} {posargs:tests/}
|
||||
|
||||
[testenv:pylint]
|
||||
deps =
|
||||
pylint<1.8
|
||||
pylint-django<0.8.1
|
||||
pylint
|
||||
pylint-django
|
||||
commands =
|
||||
pylint: ./pylint.sh src/django_kerberos/
|
||||
|
|
Reference in New Issue