Fixed tests
parent
049e64c99b
commit
1f3a5c795e
|
@ -6,7 +6,7 @@ from oic.utils.webfinger import WebFinger
|
|||
|
||||
__author__ = 'rohe0002'
|
||||
|
||||
from oic.utils.sdb import SessionDB
|
||||
from oic.utils.sdb import SessionDB, AuthnEvent
|
||||
from oic.utils.time_util import utc_time_sans_frac
|
||||
|
||||
from oic.oic import Server
|
||||
|
@ -43,7 +43,7 @@ ENDPOINT = {
|
|||
class MyFakeOICServer(Server):
|
||||
def __init__(self, name=""):
|
||||
Server.__init__(self)
|
||||
self.sdb = SessionDB()
|
||||
self.sdb = SessionDB(name)
|
||||
self.name = name
|
||||
self.client = {}
|
||||
self.registration_expires_in = 3600
|
||||
|
@ -96,9 +96,10 @@ class MyFakeOICServer(Server):
|
|||
|
||||
def authorization_endpoint(self, query):
|
||||
req = self.parse_authorization_request(query=query)
|
||||
sid = self.sdb.create_authz_session(sub="user", areq=req)
|
||||
aevent = AuthnEvent("user", authn_info="acr")
|
||||
sid = self.sdb.create_authz_session(aevent, areq=req)
|
||||
sub = self.sdb.do_sub(sid)
|
||||
_info = self.sdb[sid]
|
||||
_info["sub"] = _info["local_sub"]
|
||||
|
||||
if "code" in req["response_type"]:
|
||||
if "token" in req["response_type"]:
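Review bot: `MyFakeOICServer.__init__` keeps `name=""` as its default and now hands it to `SessionDB(name)`, so a fake server built without arguments gets an empty base URL. The other files in this commit pass an issuer URL in that position, and the userinfo test compares `sub` with a value derived from `server.sdb.base_url`, so an empty value would presumably produce subject identifiers that are not scoped to any issuer. I would add a comment on the constructor such as `# name is also the SessionDB base URL; pass the issuer URL, e.g. "http://localhost:8088"`, or drop the default so the omission fails at construction. In `authorization_endpoint` the result of `sub = self.sdb.do_sub(sid)` is not used in the visible lines; the method body continues outside the hunk, so that may be deliberate.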
|
||||
|
|
|
@ -109,6 +109,7 @@ AUTHZ = AuthzHandling()
|
|||
SYMKEY = "symmetric key used to encrypt cookie info"
|
||||
USERINFO = UserInfo(USERDB)
|
||||
|
||||
provider_init = Provider("pyoicserv", SessionDB(), CDB, AUTHN_BROKER, USERINFO,
|
||||
provider_init = Provider("pyoicserv", SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, USERINFO,
|
||||
AUTHZ, verify_client, SYMKEY, urlmap=URLMAP,
|
||||
keyjar=KEYJAR)
|
||||
|
|
|
@ -89,21 +89,21 @@ def _eq(l1, l2):
|
|||
|
||||
|
||||
def test_provider_init():
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
verify_client)
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, AUTHZ, verify_client)
|
||||
|
||||
assert provider
|
||||
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
verify_client,
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, AUTHZ, verify_client,
|
||||
urlmap={"client1": ["https://example.com/authz"]})
|
||||
|
||||
assert provider.urlmap["client1"] == ["https://example.com/authz"]
|
||||
|
||||
|
||||
def test_provider_authorization_endpoint():
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
verify_client)
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, AUTHZ, verify_client)
|
||||
|
||||
bib = {"scope": ["openid"],
|
||||
"state": "id-6da9ca0cc23959f5f33e8becd9b08cae",
|
||||
|
@ -121,8 +121,8 @@ def test_provider_authorization_endpoint():
|
|||
|
||||
|
||||
def test_provider_authenticated():
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
verify_client, symkey=rndstr(16))
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, AUTHZ, verify_client, symkey=rndstr(16))
|
||||
_session_db = {}
|
||||
cons = Consumer(_session_db, client_config=CLIENT_CONFIG,
|
||||
server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
|
@ -154,8 +154,8 @@ def test_provider_authenticated():
|
|||
|
||||
|
||||
def test_provider_authenticated_token():
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
verify_client, symkey=rndstr(16))
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, AUTHZ, verify_client, symkey=rndstr(16))
|
||||
_session_db = {}
|
||||
cons = Consumer(_session_db, client_config=CLIENT_CONFIG,
|
||||
server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
|
@ -176,8 +176,8 @@ def test_provider_authenticated_token():
|
|||
|
||||
|
||||
def test_token_endpoint():
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
verify_client, symkey=rndstr(16))
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, AUTHZ, verify_client, symkey=rndstr(16))
|
||||
|
||||
authreq = AuthorizationRequest(state="state",
|
||||
redirect_uri="http://example.com/authz",
|
||||
|
@ -212,7 +212,8 @@ def test_token_endpoint():
|
|||
|
||||
|
||||
def test_token_endpoint_unauth():
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, AUTHZ,
|
||||
verify_client, symkey=rndstr(16))
|
||||
|
||||
authreq = AuthorizationRequest(state="state",
|
||||
|
|
|
@ -115,8 +115,8 @@ AUTHZ_ORG_URL = "http://example.org/authorization"
|
|||
|
||||
class TestOICConsumer():
|
||||
def setup_class(self):
|
||||
self.consumer = Consumer(SessionDB(), CONFIG, CLIENT_CONFIG,
|
||||
SERVER_INFO)
|
||||
self.consumer = Consumer(SessionDB(SERVER_INFO["issuer"]),
|
||||
CONFIG, CLIENT_CONFIG, SERVER_INFO)
|
||||
self.consumer.client_secret = CLIENT_SECRET
|
||||
|
||||
def test_init(self):
|
||||
|
@ -306,7 +306,8 @@ class TestOICConsumer():
|
|||
|
||||
|
||||
def test_complete_secret_auth():
|
||||
consumer = Consumer(SessionDB(), CONFIG, CLIENT_CONFIG, SERVER_INFO)
|
||||
consumer = Consumer(SessionDB(SERVER_INFO["issuer"]), CONFIG,
|
||||
CLIENT_CONFIG, SERVER_INFO)
|
||||
mfos = MyFakeOICServer("http://localhost:8088")
|
||||
mfos.keyjar = SRVKEYS
|
||||
consumer.http_request = mfos.http_request
|
||||
|
@ -343,7 +344,8 @@ def test_complete_secret_auth():
|
|||
|
||||
|
||||
def test_complete_auth_token():
|
||||
consumer = Consumer(SessionDB(), CONFIG, CLIENT_CONFIG, SERVER_INFO)
|
||||
consumer = Consumer(SessionDB(SERVER_INFO["issuer"]), CONFIG,
|
||||
CLIENT_CONFIG, SERVER_INFO)
|
||||
mfos = MyFakeOICServer("http://localhost:8088")
|
||||
mfos.keyjar = SRVKEYS
|
||||
consumer.http_request = mfos.http_request
|
||||
|
@ -386,7 +388,8 @@ def test_complete_auth_token():
|
|||
|
||||
|
||||
def test_complete_auth_token_idtoken():
|
||||
consumer = Consumer(SessionDB(), CONFIG, CLIENT_CONFIG, SERVER_INFO)
|
||||
consumer = Consumer(SessionDB(SERVER_INFO["issuer"]), CONFIG,
|
||||
CLIENT_CONFIG, SERVER_INFO)
|
||||
consumer.keyjar = CLIKEYS
|
||||
mfos = MyFakeOICServer("http://localhost:8088")
|
||||
mfos.keyjar = SRVKEYS
|
||||
|
@ -427,7 +430,8 @@ def test_complete_auth_token_idtoken():
|
|||
|
||||
|
||||
def test_userinfo():
|
||||
consumer = Consumer(SessionDB(), CONFIG, CLIENT_CONFIG, SERVER_INFO)
|
||||
consumer = Consumer(SessionDB(SERVER_INFO["issuer"]), CONFIG,
|
||||
CLIENT_CONFIG, SERVER_INFO)
|
||||
consumer.keyjar = CLIKEYS
|
||||
mfos = MyFakeOICServer("http://localhost:8088")
|
||||
mfos.keyjar = SRVKEYS
|
||||
|
|
|
@ -8,6 +8,7 @@ from oic.utils.authn.authn_context import AuthnBroker
|
|||
from oic.utils.authn.client import verify_client
|
||||
from oic.utils.authn.user import UserAuthnMethod
|
||||
from oic.utils.authz import AuthzHandling
|
||||
from oic.utils.http_util import Response
|
||||
from oic.utils.userinfo import UserInfo
|
||||
|
||||
from oic.exception import RedirectURIError
|
||||
|
@ -28,7 +29,7 @@ from oic.oic.message import CheckSessionRequest
|
|||
from oic.oic.message import RegistrationRequest
|
||||
from oic.oic.message import IdToken
|
||||
|
||||
from oic.utils.sdb import SessionDB
|
||||
from oic.utils.sdb import SessionDB, AuthnEvent
|
||||
from oic.oic import Client
|
||||
from oic.oic import make_openid_request
|
||||
|
||||
|
@ -105,8 +106,8 @@ CDB = {
|
|||
},
|
||||
CLIENT_ID: {
|
||||
"client_secret": CLIENT_SECRET,
|
||||
"redirect_uris": [("http://localhost:8087/authz", None)]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
USERDB = {
|
||||
|
@ -142,7 +143,11 @@ class DummyAuthn(UserAuthnMethod):
|
|||
self.user = user
|
||||
|
||||
def authenticated_as(self, cookie=None, **kwargs):
|
||||
return {"uid": self.user}
|
||||
if cookie == "FAIL":
|
||||
return None
|
||||
else:
|
||||
return {"uid": self.user}
|
||||
|
||||
|
||||
#AUTHN = UsernamePasswordMako(None, "login.mako", tl, PASSWD, "authenticated")
|
||||
AUTHN_BROKER = AuthnBroker()
|
||||
|
@ -153,7 +158,8 @@ AUTHZ = AuthzHandling()
|
|||
SYMKEY = rndstr(16) # symmetric key used to encrypt cookie info
|
||||
USERINFO = UserInfo(USERDB)
|
||||
|
||||
provider_init = Provider("pyoicserv", SessionDB(), CDB, AUTHN_BROKER, USERINFO,
|
||||
provider_init = Provider("pyoicserv", SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, USERINFO,
|
||||
AUTHZ, verify_client, SYMKEY, urlmap=URLMAP,
|
||||
keyjar=KEYJAR)
|
||||
|
||||
|
@ -230,10 +236,13 @@ def test_server_authorization_endpoint_id_token():
|
|||
redirect_uri="http://example.com/authz",
|
||||
scope=["openid"], state="state000")
|
||||
|
||||
sdb = SessionDB()
|
||||
sid = sdb.create_authz_session("userX", AREQ)
|
||||
|
||||
sdb = provider.sdb
|
||||
ae = AuthnEvent("userX")
|
||||
sid = sdb.create_authz_session(ae, AREQ)
|
||||
sdb.do_sub(sid)
|
||||
_info = sdb[sid]
|
||||
# All this is jut removed when the id_token is constructed
|
||||
# The proper information comes from the session information
|
||||
_user_info = IdToken(iss="https://foo.example.om", sub="foo",
|
||||
aud=bib["client_id"], exp=epoch_in_a_while(minutes=10),
|
||||
acr="2", nonce=bib["nonce"])
|
||||
|
@ -244,14 +253,23 @@ def test_server_authorization_endpoint_id_token():
|
|||
user_info=_user_info)
|
||||
|
||||
req["id_token"] = idt
|
||||
query_string = req.to_urlencoded()
|
||||
|
||||
QUERY_STRING = req.to_urlencoded()
|
||||
|
||||
resp = provider.authorization_endpoint(request=QUERY_STRING)
|
||||
# client_id not in id_token["aud"] so login required
|
||||
resp = provider.authorization_endpoint(request=query_string, cookie="FAIL")
|
||||
|
||||
print resp
|
||||
assert "error=login_required" in resp.message
|
||||
|
||||
req["client_id"] = "client_1"
|
||||
query_string = req.to_urlencoded()
|
||||
|
||||
# client_id is in id_token["aud"] so no login required
|
||||
resp = provider.authorization_endpoint(request=query_string, cookie="FAIL")
|
||||
|
||||
print resp.message
|
||||
assert resp.message.startswith("http://localhost:8087/authz")
|
||||
|
||||
|
||||
def test_server_authenticated():
|
||||
server = provider_init
|
||||
|
@ -409,17 +427,18 @@ def test_token_endpoint():
|
|||
_sdb = server.sdb
|
||||
sid = _sdb.token.key(user="sub", areq=authreq)
|
||||
access_grant = _sdb.token(sid=sid)
|
||||
ae = AuthnEvent("user")
|
||||
_sdb[sid] = {
|
||||
"oauth_state": "authz",
|
||||
"sub": "sub",
|
||||
"authn_event": ae,
|
||||
"authzreq": authreq.to_json(),
|
||||
"client_id": CLIENT_ID,
|
||||
"code": access_grant,
|
||||
"code_used": False,
|
||||
"scope": ["openid"],
|
||||
"redirect_uri": "http://example.com/authz",
|
||||
"auth_time": 1000000
|
||||
}
|
||||
_sdb.do_sub(sid)
|
||||
|
||||
# Construct Access token request
|
||||
areq = AccessTokenRequest(code=access_grant, client_id=CLIENT_ID,
|
||||
|
@ -446,9 +465,10 @@ def test_token_endpoint_unauth():
|
|||
_sdb = server.sdb
|
||||
sid = _sdb.token.key(user="sub", areq=authreq)
|
||||
access_grant = _sdb.token(sid=sid)
|
||||
ae = AuthnEvent("user")
|
||||
_sdb[sid] = {
|
||||
"authn_event": ae,
|
||||
"oauth_state": "authz",
|
||||
"sub": "sub",
|
||||
"authzreq": "",
|
||||
"client_id": "client_1",
|
||||
"code": access_grant,
|
||||
|
@ -456,6 +476,7 @@ def test_token_endpoint_unauth():
|
|||
"scope": ["openid"],
|
||||
"redirect_uri": "http://example.com/authz"
|
||||
}
|
||||
_sdb.do_sub(sid)
|
||||
|
||||
# Construct Access token request
|
||||
areq = AccessTokenRequest(code=access_grant,
|
||||
|
@ -495,7 +516,9 @@ def test_idtoken():
|
|||
redirect_uri="http://example.com/authz",
|
||||
scope=["openid"], state="state000")
|
||||
|
||||
sid = server.sdb.create_authz_session("sub", AREQ)
|
||||
ae = AuthnEvent("sub")
|
||||
sid = server.sdb.create_authz_session(ae, AREQ)
|
||||
server.sdb.do_sub(sid)
|
||||
session = server.sdb[sid]
|
||||
|
||||
id_token = server.id_token_as_signed_jwt(session)
|
||||
|
@ -531,7 +554,7 @@ def test_userinfo_endpoint():
|
|||
ident = OpenIDSchema().deserialize(resp3.message, "json")
|
||||
print ident.keys()
|
||||
assert _eq(ident.keys(), ['nickname', 'sub', 'name', 'email'])
|
||||
assert ident["sub"] == USERDB["username"]["sub"]
|
||||
assert ident["sub"] == hash(USERDB["username"]["sub"]+server.sdb.base_url)
|
||||
|
||||
|
||||
def test_check_session_endpoint():
|
||||
|
@ -577,8 +600,8 @@ def test_registration_endpoint():
|
|||
|
||||
|
||||
def test_provider_key_setup():
|
||||
provider = Provider("pyoicserv", SessionDB(), None, None, None, None, None,
|
||||
"")
|
||||
provider = Provider("pyoicserv", SessionDB(SERVER_INFO["issuer"]), None,
|
||||
None, None, None, None, "")
|
||||
provider.baseurl = "http://www.example.com/"
|
||||
provider.key_setup("static", sig={"format": "jwk", "alg": "RSA"})
|
||||
|
||||
|
@ -714,4 +737,4 @@ def test_key_rollover():
|
|||
assert len(provider2.keyjar.issuer_keys[""]) == 2
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_key_rollover()
|
||||
test_server_authorization_endpoint_id_token()
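Review bot: In `test_userinfo_endpoint` the assertion `ident["sub"] == hash(USERDB["username"]["sub"]+server.sdb.base_url)` pins the public subject identifier to what looks like Python's builtin `hash()`, unless that name is rebound outside the visible hunks. The builtin is not a cryptographic hash, its value differs between 32- and 64-bit builds and under hash randomisation (`-R` / `PYTHONHASHSEED`), and it returns an int where the claim is a string, so the identifier is neither stable across deployments nor collision-resistant. If `SessionDB.do_sub` (not shown on this page) really derives `sub` this way, a digest is the safer choice, with the test mirroring it: `assert ident["sub"] == hashlib.sha256(USERDB["username"]["sub"] + server.sdb.base_url).hexdigest()`.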
|
||||
|
|
|
@ -6,7 +6,7 @@ import time
|
|||
|
||||
from pytest import raises
|
||||
|
||||
from oic.utils.sdb import SessionDB
|
||||
from oic.utils.sdb import SessionDB, AuthnEvent
|
||||
from oic.utils.sdb import ExpiredToken
|
||||
from oic.oic.message import AuthorizationRequest
|
||||
from oic.oic.message import OpenIDRequest
|
||||
|
@ -31,23 +31,25 @@ OAUTH2_AREQ = AuthorizationRequest(response_type="code",
|
|||
redirect_uri="http://example.com/authz",
|
||||
scope=["openid"], state="state000")
|
||||
|
||||
BASE_URL = "https://exampl.com/"
|
||||
|
||||
|
||||
def _eq(l1, l2):
|
||||
return set(l1) == set(l2)
|
||||
|
||||
|
||||
def test_token():
|
||||
sdb = SessionDB()
|
||||
sdb = SessionDB(BASE_URL)
|
||||
sid = sdb.token.key(areq=AREQ)
|
||||
assert len(sid) == 56
|
||||
|
||||
sdb = SessionDB({"a": "b"})
|
||||
sdb = SessionDB(BASE_URL, {"a": "b"})
|
||||
sid = sdb.token.key(areq=AREQ)
|
||||
assert len(sid) == 56
|
||||
|
||||
|
||||
def test_new_token():
|
||||
sdb = SessionDB()
|
||||
sdb = SessionDB(BASE_URL)
|
||||
sid = sdb.token.key(areq=AREQ)
|
||||
assert len(sid) == 56
|
||||
|
||||
|
@ -63,7 +65,7 @@ def test_new_token():
|
|||
|
||||
|
||||
def test_type_and_key():
|
||||
sdb = SessionDB()
|
||||
sdb = SessionDB(BASE_URL)
|
||||
sid = sdb.token.key(areq=AREQ)
|
||||
code = sdb.token(sid=sid)
|
||||
print sid
|
||||
|
@ -74,7 +76,7 @@ def test_type_and_key():
|
|||
|
||||
|
||||
def test_setitem():
|
||||
sdb = SessionDB()
|
||||
sdb = SessionDB(BASE_URL)
|
||||
sid = sdb.token.key(areq=AREQ)
|
||||
code = sdb.token(sid=sid)
|
||||
|
||||
|
@ -90,7 +92,7 @@ def test_setitem():
|
|||
|
||||
|
||||
def test_update():
|
||||
sdb = SessionDB()
|
||||
sdb = SessionDB(BASE_URL)
|
||||
sid = sdb.token.key(areq=AREQ)
|
||||
code = sdb.token(sid=sid)
|
||||
|
||||
|
@ -111,33 +113,37 @@ def test_update():
|
|||
|
||||
|
||||
def test_create_authz_session():
|
||||
sdb = SessionDB()
|
||||
sid = sdb.create_authz_session("sub", AREQ)
|
||||
sdb = SessionDB(BASE_URL)
|
||||
ae = AuthnEvent("uid")
|
||||
sid = sdb.create_authz_session(ae, AREQ)
|
||||
sdb.do_sub(sid)
|
||||
|
||||
info = sdb[sid]
|
||||
print info
|
||||
assert info["oauth_state"] == "authz"
|
||||
|
||||
sdb = SessionDB()
|
||||
sdb = SessionDB(BASE_URL)
|
||||
ae = AuthnEvent("sub")
|
||||
# Missing nonce property
|
||||
sid = sdb.create_authz_session("sub", OAUTH2_AREQ)
|
||||
sid = sdb.create_authz_session(ae, OAUTH2_AREQ)
|
||||
info = sdb[sid]
|
||||
print info
|
||||
assert info["oauth_state"] == "authz"
|
||||
|
||||
sid2 = sdb.create_authz_session("sub", AREQN)
|
||||
ae = AuthnEvent("sub")
|
||||
sid2 = sdb.create_authz_session(ae, AREQN)
|
||||
|
||||
info = sdb[sid2]
|
||||
print info
|
||||
assert info["nonce"] == "something"
|
||||
|
||||
sid3 = sdb.create_authz_session("sub", AREQN, id_token="id_token")
|
||||
sid3 = sdb.create_authz_session(ae, AREQN, id_token="id_token")
|
||||
|
||||
info = sdb[sid3]
|
||||
print info
|
||||
assert info["id_token"] == "id_token"
|
||||
|
||||
sid4 = sdb.create_authz_session("sub", AREQN, oidreq=OIDR)
|
||||
sid4 = sdb.create_authz_session(ae, AREQN, oidreq=OIDR)
|
||||
|
||||
info = sdb[sid4]
|
||||
print info
|
||||
|
@ -146,10 +152,10 @@ def test_create_authz_session():
|
|||
|
||||
|
||||
def test_create_authz_session_with_sector_id():
|
||||
sdb = SessionDB(seed="foo")
|
||||
uid = "sub"
|
||||
sid5 = sdb.create_authz_session(uid, AREQN, oidreq=OIDR)
|
||||
sdb.do_userid(sid5, uid, "http://example.com/si.jwt", "pairwise")
|
||||
sdb = SessionDB(BASE_URL, seed="foo")
|
||||
ae = AuthnEvent("sub")
|
||||
sid5 = sdb.create_authz_session(ae, AREQN, oidreq=OIDR)
|
||||
sdb.do_sub(sid5, "http://example.com/si.jwt", "pairwise")
|
||||
|
||||
info_1 = sdb[sid5]
|
||||
print info_1
|
||||
|
@ -158,7 +164,7 @@ def test_create_authz_session_with_sector_id():
|
|||
assert info_1["sub"] != "sub"
|
||||
user_id1 = info_1["sub"]
|
||||
|
||||
sdb.do_userid(sid5, uid, "http://example.net/si.jwt", "pairwise")
|
||||
sdb.do_sub(sid5, "http://example.net/si.jwt", "pairwise")
|
||||
|
||||
info_2 = sdb[sid5]
|
||||
print info_2
|
||||
|
@ -167,34 +173,35 @@ def test_create_authz_session_with_sector_id():
|
|||
|
||||
|
||||
def test_upgrade_to_token():
|
||||
sdb = SessionDB()
|
||||
sid = sdb.create_authz_session("sub", AREQ)
|
||||
sdb = SessionDB(BASE_URL)
|
||||
ae1 = AuthnEvent("sub")
|
||||
sid = sdb.create_authz_session(ae1, AREQ)
|
||||
grant = sdb[sid]["code"]
|
||||
_dict = sdb.upgrade_to_token(grant)
|
||||
|
||||
print _dict.keys()
|
||||
assert _eq(_dict.keys(), ['code', 'authzreq', 'token_type', 'local_sub',
|
||||
'client_id', 'oauth_state', 'refresh_token',
|
||||
'revoked', 'sub', 'access_token',
|
||||
'token_expires_at', 'expires_in', 'state',
|
||||
'redirect_uri', 'code_used', 'scope',
|
||||
'access_token_scope'])
|
||||
assert _eq(_dict.keys(), ['authn_event', 'code', 'authzreq', 'revoked',
|
||||
'access_token', 'token_expires_at', 'expires_in',
|
||||
'token_type', 'state', 'redirect_uri',
|
||||
'code_used', 'client_id', 'scope', 'oauth_state',
|
||||
'refresh_token', 'access_token_scope'])
|
||||
|
||||
raises(Exception, 'sdb.upgrade_to_token(grant)')
|
||||
|
||||
raises(Exception, 'sdb.upgrade_to_token(_dict["access_token"]')
|
||||
|
||||
sdb = SessionDB()
|
||||
sid = sdb.create_authz_session("another_user_id", AREQ)
|
||||
sdb = SessionDB(BASE_URL)
|
||||
ae2 = AuthnEvent("another_user_id")
|
||||
sid = sdb.create_authz_session(ae2, AREQ)
|
||||
grant = sdb[sid]["code"]
|
||||
|
||||
_dict = sdb.upgrade_to_token(grant, id_token="id_token", oidreq=OIDR)
|
||||
print _dict.keys()
|
||||
assert _eq(_dict.keys(), ['code', 'authzreq', 'id_token', 'token_type',
|
||||
'local_sub', 'client_id', 'oauth_state',
|
||||
'refresh_token', 'revoked', 'sub', 'oidreq',
|
||||
'access_token', 'token_expires_at', 'expires_in',
|
||||
'state', 'redirect_uri', 'code_used', 'scope',
|
||||
assert _eq(_dict.keys(), ['authn_event', 'code', 'authzreq', 'revoked',
|
||||
'oidreq', 'access_token', 'id_token',
|
||||
'token_expires_at', 'expires_in', 'token_type',
|
||||
'state', 'redirect_uri', 'code_used', 'client_id',
|
||||
'scope', 'oauth_state', 'refresh_token',
|
||||
'access_token_scope'])
|
||||
|
||||
assert _dict["id_token"] == "id_token"
|
||||
|
@ -204,8 +211,9 @@ def test_upgrade_to_token():
|
|||
|
||||
|
||||
def test_refresh_token():
|
||||
sdb = SessionDB()
|
||||
sid = sdb.create_authz_session("sub", AREQ)
|
||||
sdb = SessionDB(BASE_URL)
|
||||
ae = AuthnEvent("sub")
|
||||
sid = sdb.create_authz_session(ae, AREQ)
|
||||
grant = sdb[sid]["code"]
|
||||
_dict = sdb.upgrade_to_token(grant)
|
||||
dict1 = _dict.copy()
|
||||
|
@ -222,8 +230,9 @@ def test_refresh_token():
|
|||
|
||||
|
||||
def test_is_valid():
|
||||
sdb = SessionDB()
|
||||
sid = sdb.create_authz_session("sub", AREQ)
|
||||
sdb = SessionDB(BASE_URL)
|
||||
ae1 = AuthnEvent("sub")
|
||||
sid = sdb.create_authz_session(ae1, AREQ)
|
||||
grant = sdb[sid]["code"]
|
||||
|
||||
assert sdb.is_valid(grant)
|
||||
|
@ -255,7 +264,8 @@ def test_is_valid():
|
|||
dict2["access_token"] = token1
|
||||
assert sdb.is_valid(token2) is False
|
||||
|
||||
sid = sdb.create_authz_session("another:user", AREQ)
|
||||
ae = AuthnEvent("another:user")
|
||||
sid = sdb.create_authz_session(ae, AREQ)
|
||||
grant = sdb[sid]["code"]
|
||||
|
||||
gdict = sdb[grant]
|
||||
|
@ -264,8 +274,9 @@ def test_is_valid():
|
|||
|
||||
|
||||
def test_revoke_token():
|
||||
sdb = SessionDB()
|
||||
sid = sdb.create_authz_session("sub", AREQ)
|
||||
sdb = SessionDB(BASE_URL)
|
||||
ae1 = AuthnEvent("sub")
|
||||
sid = sdb.create_authz_session(ae1, AREQ)
|
||||
|
||||
grant = sdb[sid]["code"]
|
||||
_dict = sdb.upgrade_to_token(grant)
|
||||
|
@ -291,9 +302,26 @@ def test_revoke_token():
|
|||
|
||||
# --- new token ----
|
||||
|
||||
sdb = SessionDB()
|
||||
sid = sdb.create_authz_session("sub", AREQ)
|
||||
sdb = SessionDB(BASE_URL)
|
||||
ae2 = AuthnEvent("sub")
|
||||
sid = sdb.create_authz_session(ae2, AREQ)
|
||||
|
||||
grant = sdb[sid]["code"]
|
||||
sdb.revoke_token(grant)
|
||||
assert sdb.is_valid(grant) is False
|
||||
|
||||
|
||||
def test_sub_to_authn_event():
|
||||
sdb = SessionDB(BASE_URL)
|
||||
ae2 = AuthnEvent("sub")
|
||||
sid = sdb.create_authz_session(ae2, AREQ)
|
||||
sub = sdb.do_sub(sid)
|
||||
|
||||
# given the sub find out weather the authn event is still valid
|
||||
|
||||
sids = sdb.sub2sid[sub]
|
||||
ae = sdb[sids[0]]["authn_event"]
|
||||
assert ae.valid()
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_sub_to_authn_event()
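Review bot: In `test_upgrade_to_token` the line `raises(Exception, 'sdb.upgrade_to_token(_dict["access_token"]')` passes a code string with an unbalanced parenthesis, so the exception it catches is most likely a `SyntaxError` from compiling the string rather than a rejection by `upgrade_to_token`. The check that an access token cannot be exchanged as if it were an authorization grant is therefore never exercised, and the test would still pass if that rejection were missing. The line is unchanged context in this hunk, but it sits in the test being edited; closing the call, `raises(Exception, 'sdb.upgrade_to_token(_dict["access_token"])')`, makes the assertion real. Separately, the new `BASE_URL = "https://exampl.com/"` reads like a typo for the reserved `example.com`.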
|
||||
|
|
|
@ -144,7 +144,8 @@ def test_srv2():
|
|||
req = cc.construct_UserClaimsRequest(
|
||||
request_args={"sub": "diana", "claims_names": ["gender", "birthdate"]})
|
||||
|
||||
srv = ClaimsServer("pyoicserv", SessionDB(), CDB, USERINFO, verify_client,
|
||||
srv = ClaimsServer("pyoicserv", SessionDB("https://example.com"), CDB,
|
||||
USERINFO, verify_client,
|
||||
keyjar=KEYJAR, dist_claims_mode=ClaimsMode(USER2MODE))
|
||||
|
||||
srv.keyjar[""] = keybundle_from_local_file("%s/rsa.key" % BASE_PATH, "rsa", ["ver", "sig"])
|
||||
|
|
|
@ -89,14 +89,16 @@ def _eq(l1, l2):
|
|||
|
||||
|
||||
def test_provider_init():
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, AUTHZ,
|
||||
verify_client, client_info_url="https://example.com/as")
|
||||
|
||||
assert provider
|
||||
|
||||
|
||||
def test_client_registration():
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, AUTHZ,
|
||||
verify_client,
|
||||
client_info_url="https://example.com/as/")
|
||||
|
||||
|
@ -125,8 +127,8 @@ def test_client_registration_uri_error():
|
|||
"jwks_uri": "https://client.example.org/my_public_keys.jwks"
|
||||
}
|
||||
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
verify_client,
|
||||
provider = Provider("pyoicserv", sdb.SessionDB("https://example.org/"),
|
||||
CDB, AUTHN_BROKER, AUTHZ, verify_client,
|
||||
client_info_url="https://example.com/as/")
|
||||
|
||||
request = RegistrationRequest(**args)
|
||||
|
@ -152,8 +154,8 @@ def test_client_registration_2():
|
|||
"scope": "read write dolphin",
|
||||
}
|
||||
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
verify_client,
|
||||
provider = Provider("pyoicserv", sdb.SessionDB("https://example.org/"),
|
||||
CDB, AUTHN_BROKER, AUTHZ, verify_client,
|
||||
client_info_url="https://example.com/as/",
|
||||
client_authn_methods={
|
||||
"client_secret_post": ClientSecretPost,
|
||||
|
@ -183,8 +185,8 @@ def test_client_user_info_get():
|
|||
"scope": "read write dolphin",
|
||||
}
|
||||
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
verify_client,
|
||||
provider = Provider("pyoicserv", sdb.SessionDB("https://example.org/"),
|
||||
CDB, AUTHN_BROKER, AUTHZ, verify_client,
|
||||
client_info_url="https://example.com/as/",
|
||||
client_authn_methods={
|
||||
"client_secret_post": ClientSecretPost,
|
||||
|
@ -225,8 +227,8 @@ def test_client_registration_update():
|
|||
"scope": "read write dolphin",
|
||||
}
|
||||
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
verify_client,
|
||||
provider = Provider("pyoicserv", sdb.SessionDB("https://example.org/"),
|
||||
CDB, AUTHN_BROKER, AUTHZ, verify_client,
|
||||
client_info_url="https://example.com/as/",
|
||||
client_authn_methods={
|
||||
"client_secret_post": ClientSecretPost,
|
||||
|
@ -283,7 +285,8 @@ def test_client_registration_delete():
|
|||
"scope": "read write dolphin",
|
||||
}
|
||||
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(SERVER_INFO["issuer"]), CDB,
|
||||
AUTHN_BROKER, AUTHZ,
|
||||
verify_client,
|
||||
client_info_url="https://example.com/as/",
|
||||
client_authn_methods={
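Review bot: The `SessionDB` base URL is not the same across the tests in this file: `test_provider_init`, `test_client_registration` and `test_client_registration_delete` use `SERVER_INFO["issuer"]`, while `test_client_registration_uri_error`, `test_client_registration_2`, `test_client_user_info_get` and `test_client_registration_update` pass the literal `"https://example.org/"` next to `client_info_url="https://example.com/as/"`. If the base URL feeds into subject identifiers, as the other test files in this commit suggest, sessions created under these providers are bound to different issuers for no visible reason. A single module-level value, for example `ISSUER = SERVER_INFO["issuer"]` used in every `sdb.SessionDB(ISSUER)` call, keeps the fixtures consistent. Several of these function bodies start or end outside their hunks.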
|
||||
|
|