tests: augment coverage
This commit is contained in:
parent
970a1d13c0
commit
5628f42c5a
|
@ -70,13 +70,13 @@ class KerberosPasswordBackend(object):
|
|||
|
||||
principal = self.principal_from_user(user)
|
||||
|
||||
name = gb.import_name(force_bytes(principal), gb.NameType.kerberos_principal)
|
||||
try:
|
||||
name = gb.import_name(force_bytes(principal), gb.NameType.kerberos_principal)
|
||||
if gb.acquire_cred_with_password(name, force_bytes(password)):
|
||||
if not user.check_password(password):
|
||||
user.set_password(password)
|
||||
user.save()
|
||||
return user
|
||||
gb.acquire_cred_with_password(name, force_bytes(password))
|
||||
except gssapi.exceptions.GSSError as e:
|
||||
logger.debug('Kerberos: password check failed for principal %s: %s', principal, e)
|
||||
return None
|
||||
return None
|
||||
if not user.check_password(password):
|
||||
user.set_password(password)
|
||||
user.save()
|
||||
return user
|
||||
|
|
|
@ -52,7 +52,7 @@ def negotiate(request, name=None, store=None):
|
|||
authstr = request.META['HTTP_AUTHORIZATION'][10:]
|
||||
try:
|
||||
in_token = base64.b64decode(authstr)
|
||||
except ValueError:
|
||||
except (TypeError, ValueError):
|
||||
return None, None
|
||||
|
||||
server_ctx = gssapi.SecurityContext(creds=server_creds, usage='accept')
|
||||
|
@ -60,6 +60,8 @@ def negotiate(request, name=None, store=None):
|
|||
out_token = server_ctx.step(in_token)
|
||||
except gssapi.exceptions.GSSError as e:
|
||||
logging.debug('GSSAPI security context failure: %s', e)
|
||||
return None, None
|
||||
|
||||
if not server_ctx.complete:
|
||||
raise NegotiateContinue(out_token)
|
||||
|
||||
|
|
|
@ -22,10 +22,16 @@ User = get_user_model()
|
|||
|
||||
|
||||
def test_kerberos_password(k5env, db):
|
||||
user = User.objects.create(username=k5env.user_princ)
|
||||
|
||||
k5env.run(['kdestroy'])
|
||||
|
||||
assert authenticate(username=k5env.user_princ, password='nogood') is None
|
||||
user = User.objects.create(username=k5env.user_princ, is_active=False)
|
||||
assert not user.check_password(k5env.password('user'))
|
||||
assert authenticate(username=k5env.user_princ, password=k5env.password('user')) is None
|
||||
user.is_active = True
|
||||
user.save()
|
||||
assert authenticate(username=k5env.user_princ, password=k5env.password('user')) == user
|
||||
user.refresh_from_db()
|
||||
assert user.check_password(k5env.password('user'))
|
||||
assert authenticate(username=k5env.user_princ, password=k5env.password('user')) == user
|
||||
assert not os.path.exists(k5env.ccache)
|
||||
|
|
|
@ -29,19 +29,29 @@ def test_login(k5env, client, caplog, db, settings):
|
|||
response = client.get('/login/')
|
||||
assert response.status_code == 401
|
||||
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION=k5env.spnego())
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION='Negotiate xxx')
|
||||
assert response.status_code == 401
|
||||
assert '_auth_user_id' not in client.session
|
||||
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION=k5env.spnego())
|
||||
assert response.status_code == 401
|
||||
assert '_auth_user_id' not in client.session
|
||||
|
||||
# create an user...
|
||||
User.objects.create(username=k5env.user_princ)
|
||||
user = User.objects.create(username=k5env.user_princ, is_active=False)
|
||||
|
||||
# still no good, user is inactive
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION=k5env.spnego())
|
||||
assert response.status_code == 401
|
||||
assert '_auth_user_id' not in client.session
|
||||
|
||||
user.is_active = True
|
||||
user.save()
|
||||
# and retry.
|
||||
response = client.get('/login/', HTTP_AUTHORIZATION=k5env.spnego())
|
||||
|
||||
assert response.status_code == 302
|
||||
assert client.session['_auth_user_id']
|
||||
assert int(client.session['_auth_user_id']) == user.id
|
||||
|
||||
# break service name resolution
|
||||
settings.GSSAPI_NAME = gssapi.Name('HTTP@localhost', gssapi.NameType.hostbased_service)
|
||||
|
|
Loading…
Reference in New Issue