tests: augment coverage

This commit is contained in:
Benjamin Dauvergne 2019-08-23 17:08:55 +02:00
parent 970a1d13c0
commit 5628f42c5a
4 changed files with 31 additions and 13 deletions

View File

@ -70,13 +70,13 @@ class KerberosPasswordBackend(object):
principal = self.principal_from_user(user)
name = gb.import_name(force_bytes(principal), gb.NameType.kerberos_principal)
try:
name = gb.import_name(force_bytes(principal), gb.NameType.kerberos_principal)
if gb.acquire_cred_with_password(name, force_bytes(password)):
if not user.check_password(password):
user.set_password(password)
user.save()
return user
gb.acquire_cred_with_password(name, force_bytes(password))
except gssapi.exceptions.GSSError as e:
logger.debug('Kerberos: password check failed for principal %s: %s', principal, e)
return None
return None
if not user.check_password(password):
user.set_password(password)
user.save()
return user

View File

@ -52,7 +52,7 @@ def negotiate(request, name=None, store=None):
authstr = request.META['HTTP_AUTHORIZATION'][10:]
try:
in_token = base64.b64decode(authstr)
except ValueError:
except (TypeError, ValueError):
return None, None
server_ctx = gssapi.SecurityContext(creds=server_creds, usage='accept')
@ -60,6 +60,8 @@ def negotiate(request, name=None, store=None):
out_token = server_ctx.step(in_token)
except gssapi.exceptions.GSSError as e:
logging.debug('GSSAPI security context failure: %s', e)
return None, None
if not server_ctx.complete:
raise NegotiateContinue(out_token)

View File

@ -22,10 +22,16 @@ User = get_user_model()
def test_kerberos_password(k5env, db):
user = User.objects.create(username=k5env.user_princ)
k5env.run(['kdestroy'])
assert authenticate(username=k5env.user_princ, password='nogood') is None
user = User.objects.create(username=k5env.user_princ, is_active=False)
assert not user.check_password(k5env.password('user'))
assert authenticate(username=k5env.user_princ, password=k5env.password('user')) is None
user.is_active = True
user.save()
assert authenticate(username=k5env.user_princ, password=k5env.password('user')) == user
user.refresh_from_db()
assert user.check_password(k5env.password('user'))
assert authenticate(username=k5env.user_princ, password=k5env.password('user')) == user
assert not os.path.exists(k5env.ccache)

View File

@ -29,19 +29,29 @@ def test_login(k5env, client, caplog, db, settings):
response = client.get('/login/')
assert response.status_code == 401
response = client.get('/login/', HTTP_AUTHORIZATION=k5env.spnego())
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxx')
assert response.status_code == 401
assert '_auth_user_id' not in client.session
response = client.get('/login/', HTTP_AUTHORIZATION=k5env.spnego())
assert response.status_code == 401
assert '_auth_user_id' not in client.session
# create an user...
User.objects.create(username=k5env.user_princ)
user = User.objects.create(username=k5env.user_princ, is_active=False)
# still no good, user is inactive
response = client.get('/login/', HTTP_AUTHORIZATION=k5env.spnego())
assert response.status_code == 401
assert '_auth_user_id' not in client.session
user.is_active = True
user.save()
# and retry.
response = client.get('/login/', HTTP_AUTHORIZATION=k5env.spnego())
assert response.status_code == 302
assert client.session['_auth_user_id']
assert int(client.session['_auth_user_id']) == user.id
# break service name resolution
settings.GSSAPI_NAME = gssapi.Name('HTTP@localhost', gssapi.NameType.hostbased_service)