Minor tweaks.
This commit is contained in:
parent
bfde2df3c5
commit
370972f114
|
@ -36,13 +36,15 @@ class OpenIDConnect(object):
|
|||
self.attribute_map = attribute_map
|
||||
self.authenticating_authority = authenticating_authority
|
||||
self.name = name
|
||||
self.client_id = ""
|
||||
self.client_secret = ""
|
||||
|
||||
for param in ["client_id", "client_secret"]:
|
||||
try:
|
||||
setattr(self, param, kwargs[param])
|
||||
del kwargs[param]
|
||||
except KeyError:
|
||||
setattr(self, param, "")
|
||||
pass
|
||||
|
||||
self.extra = kwargs
|
||||
try:
|
||||
|
@ -135,7 +137,7 @@ class OpenIDConnect(object):
|
|||
client = self.dynamic(server_env, callback, logoutCallback, session, key)
|
||||
else:
|
||||
client = self.static(server_env, callback, logoutCallback, key)
|
||||
client.state = session.getState()
|
||||
_state = session.getState()
|
||||
session.setClient(client)
|
||||
|
||||
acr_value = session.get_acr_value(client.authorization_endpoint)
|
||||
|
@ -151,7 +153,8 @@ class OpenIDConnect(object):
|
|||
return []
|
||||
elif acr_values is not None and len(acr_values) == 1:
|
||||
acr_value = acr_values[0]
|
||||
return self.create_authnrequest(environ, server_env, start_response, session, acr_value)
|
||||
return self.create_authnrequest(environ, server_env, start_response,
|
||||
session, acr_value, _state)
|
||||
except Exception:
|
||||
message = traceback.format_exception(*sys.exc_info())
|
||||
logger.error(message)
|
||||
|
@ -160,14 +163,15 @@ class OpenIDConnect(object):
|
|||
(False, "Cannot find the OP! Please view your configuration."))
|
||||
|
||||
#noinspection PyUnusedLocal
|
||||
def create_authnrequest(self, environ, server_env, start_response, session, acr_value):
|
||||
def create_authnrequest(self, environ, server_env, start_response, session,
|
||||
acr_value, state):
|
||||
try:
|
||||
client = session.getClient()
|
||||
session.set_acr_value(client.authorization_endpoint, acr_value)
|
||||
request_args = {
|
||||
"response_type": self.flow_type,
|
||||
"scope": server_env["SCOPE"],
|
||||
"state": client.state,
|
||||
"state": state,
|
||||
}
|
||||
|
||||
if acr_value is not None:
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
#!/usr/bin/env python
|
||||
import time
|
||||
|
||||
__author__ = 'rohe0002'
|
||||
|
||||
import requests
|
||||
|
@ -411,7 +413,7 @@ class Client(PBase):
|
|||
self.verify_ssl = verify_ssl
|
||||
#self.secret_type = "basic "
|
||||
|
||||
self.state = None
|
||||
#self.state = None
|
||||
self.nonce = None
|
||||
|
||||
self.grant = {}
|
||||
|
@ -451,7 +453,7 @@ class Client(PBase):
|
|||
client_secret = property(get_client_secret, set_client_secret)
|
||||
|
||||
def reset(self):
|
||||
self.state = None
|
||||
#self.state = None
|
||||
self.nonce = None
|
||||
|
||||
self.grant = {}
|
||||
|
@ -504,18 +506,18 @@ class Client(PBase):
|
|||
|
||||
return uri
|
||||
|
||||
def get_grant(self, **kwargs):
|
||||
try:
|
||||
_state = kwargs["state"]
|
||||
if not _state:
|
||||
_state = self.state
|
||||
except KeyError:
|
||||
_state = self.state
|
||||
def get_grant(self, state, **kwargs):
|
||||
# try:
|
||||
# _state = kwargs["state"]
|
||||
# if not _state:
|
||||
# _state = self.state
|
||||
# except KeyError:
|
||||
# _state = self.state
|
||||
|
||||
try:
|
||||
return self.grant[_state]
|
||||
return self.grant[state]
|
||||
except:
|
||||
raise Exception("No grant found for state:'%s'" % _state)
|
||||
raise Exception("No grant found for state:'%s'" % state)
|
||||
|
||||
def get_token(self, also_expired=False, **kwargs):
|
||||
try:
|
||||
|
@ -909,6 +911,9 @@ class Client(PBase):
|
|||
response_cls=AuthorizationResponse,
|
||||
**kwargs):
|
||||
|
||||
if state:
|
||||
request_args["state"] = state
|
||||
|
||||
url, body, ht_args, csi = self.request_info(request, method,
|
||||
request_args, extra_args,
|
||||
**kwargs)
|
||||
|
|
|
@ -158,7 +158,6 @@ class Consumer(Client):
|
|||
"""
|
||||
|
||||
res = {
|
||||
"state": self.state,
|
||||
"grant": self.grant,
|
||||
"seed": self.seed,
|
||||
"redirect_uris": self.redirect_uris,
|
||||
|
@ -191,7 +190,7 @@ class Consumer(Client):
|
|||
self.seed = rndstr()
|
||||
|
||||
sid = stateID(request, self.seed)
|
||||
self.state = sid
|
||||
#self.state = sid
|
||||
self.grant[sid] = Grant(seed=self.seed)
|
||||
self._backup(sid)
|
||||
self.sdb["seed:%s" % self.seed] = sid
|
||||
|
@ -208,7 +207,7 @@ class Consumer(Client):
|
|||
|
||||
LOG_DEBUG("Redirecting to: %s" % (location,))
|
||||
|
||||
return location
|
||||
return sid, location
|
||||
|
||||
#noinspection PyUnusedLocal
|
||||
def handle_authorization_response(self, query="", **kwargs):
|
||||
|
@ -258,20 +257,21 @@ class Consumer(Client):
|
|||
except KeyError:
|
||||
raise UnknownState(atr["state"])
|
||||
|
||||
self.seed = self.grant[self.state].seed
|
||||
self.seed = self.grant[atr["state"]].seed
|
||||
|
||||
return atr
|
||||
|
||||
def complete(self, query="", **kwargs):
|
||||
def complete(self, query, state, **kwargs):
|
||||
"""
|
||||
:param query: The query part of the request URL
|
||||
:param state:
|
||||
"""
|
||||
|
||||
resp = self.handle_authorization_response(query, **kwargs)
|
||||
|
||||
if resp.type() == "AuthorizationResponse":
|
||||
# Go get the access token
|
||||
resp = self.do_access_token_request(state=self.state)
|
||||
resp = self.do_access_token_request(state=state)
|
||||
|
||||
return resp
|
||||
|
||||
|
@ -294,13 +294,13 @@ class Consumer(Client):
|
|||
return request_args, http_args, extra_args
|
||||
|
||||
#noinspection PyUnusedLocal
|
||||
def get_access_token_request(self, **kwargs):
|
||||
def get_access_token_request(self, state, **kwargs):
|
||||
|
||||
request_args, http_args, extra_args = self.client_auth_info()
|
||||
|
||||
url, body, ht_args, csi = self.request_info(AccessTokenRequest,
|
||||
request_args=request_args,
|
||||
state=self.state,
|
||||
state=state,
|
||||
**extra_args)
|
||||
|
||||
if not http_args:
|
||||
|
|
|
@ -615,12 +615,12 @@ class Client(oauth2.Client):
|
|||
self.redirect_uris = reginfo["redirect_uris"]
|
||||
|
||||
def handle_registration_info(self, response):
|
||||
if response.status_code == 200:
|
||||
if response.status_code in [200, 201]:
|
||||
resp = ClientInfoResponse().deserialize(response.text, "json")
|
||||
self.store_registration_info(resp)
|
||||
else:
|
||||
err = ErrorResponse().deserialize(response.text, "json")
|
||||
raise PyoidcError("Registration failed: %s" % err.get_json())
|
||||
raise PyoidcError("Registration failed: %s" % err.to_json())
|
||||
|
||||
return resp
|
||||
|
||||
|
|
|
@ -391,11 +391,11 @@ class Message(object):
|
|||
else:
|
||||
self._dict[skey] = val
|
||||
|
||||
def to_json(self, lev=0):
|
||||
def to_json(self, lev=0, indent=None):
|
||||
if lev:
|
||||
return self.to_dict(lev + 1)
|
||||
else:
|
||||
return json.dumps(self.to_dict(1))
|
||||
return json.dumps(self.to_dict(1), indent=indent)
|
||||
|
||||
def from_json(self, txt, **kwargs):
|
||||
return self.from_dict(json.loads(txt))
|
||||
|
|
|
@ -200,7 +200,7 @@ class Consumer(Client):
|
|||
|
||||
#noinspection PyUnusedLocal,PyArgumentEqualDefault
|
||||
def begin(self, scope="", response_type="", use_nonce=False, path="",
|
||||
requrl="", **kwargs):
|
||||
**kwargs):
|
||||
""" Begin the OIDC flow
|
||||
|
||||
:param scope: Defines which user info claims is wanted
|
||||
|
@ -209,8 +209,8 @@ class Consumer(Client):
|
|||
:param use_nonce: If not implicit flow nonce is optional.
|
||||
This defines if it should be used anyway.
|
||||
:param path: The path part of the redirect URL
|
||||
:param requrl: Request URL
|
||||
:return: A URL to which the user should be redirected
|
||||
:return: A 2-tuple, session identifier and URL to which the user
|
||||
should be redirected
|
||||
"""
|
||||
_log_info = logger.info
|
||||
|
||||
|
@ -239,15 +239,11 @@ class Consumer(Client):
|
|||
response_type = self.config["response_type"]
|
||||
|
||||
sid = stateID(path, self.seed)
|
||||
self.state = sid
|
||||
self.grant[sid] = Grant(seed=self.seed)
|
||||
|
||||
self._backup(sid)
|
||||
self.sdb["seed:%s" % self.seed] = sid
|
||||
|
||||
# Store the request and the redirect uri used
|
||||
self._request = requrl
|
||||
|
||||
args = {
|
||||
"client_id": self.client_id,
|
||||
"state": sid,
|
||||
|
@ -312,7 +308,7 @@ class Consumer(Client):
|
|||
if self.debug:
|
||||
_log_info("Redirecting to: %s" % location)
|
||||
|
||||
return location
|
||||
return sid, location
|
||||
|
||||
#noinspection PyUnusedLocal
|
||||
def parse_authz(self, query="", **kwargs):
|
||||
|
@ -380,7 +376,7 @@ class Consumer(Client):
|
|||
idt = None
|
||||
return None, atr, idt
|
||||
|
||||
def complete(self):
|
||||
def complete(self, state):
|
||||
"""
|
||||
Do the access token request, the last step in a code flow.
|
||||
If Implicit flow was used then this method is never used.
|
||||
|
@ -398,7 +394,7 @@ class Consumer(Client):
|
|||
else:
|
||||
raise PyoidcError("Nothing to authenticate with")
|
||||
|
||||
resp = self.do_access_token_request(state=self.state,
|
||||
resp = self.do_access_token_request(state=state,
|
||||
request_args=args,
|
||||
http_args=http_args)
|
||||
|
||||
|
@ -408,7 +404,7 @@ class Consumer(Client):
|
|||
raise TokenError(resp.error, resp)
|
||||
|
||||
#self._backup(self.sdb["seed:%s" % _cli.seed])
|
||||
self._backup(self.state)
|
||||
self._backup(state)
|
||||
|
||||
return resp
|
||||
|
||||
|
@ -416,14 +412,14 @@ class Consumer(Client):
|
|||
pass
|
||||
|
||||
#noinspection PyUnusedLocal
|
||||
def get_user_info(self):
|
||||
uinfo = self.do_user_info_request(state=self.state, schema="openid")
|
||||
def get_user_info(self, state):
|
||||
uinfo = self.do_user_info_request(state=state, schema="openid")
|
||||
|
||||
if uinfo.type() == "ErrorResponse":
|
||||
raise TokenError(uinfo.error, uinfo)
|
||||
|
||||
self.user_info = uinfo
|
||||
self._backup(self.state)
|
||||
self._backup(state)
|
||||
|
||||
return uinfo
|
||||
|
||||
|
|
|
@ -192,15 +192,20 @@ class Provider(AProvider):
|
|||
self.hostname = hostname or socket.gethostname
|
||||
self.register_endpoint = "%s%s" % (self.baseurl, "register")
|
||||
|
||||
self.default_sign_alg = {"id_token": "RS256", "userinfo": "RS256"}
|
||||
|
||||
if capabilities:
|
||||
self.verify_capabilities(capabilities)
|
||||
self.capabilities = ProviderConfigurationResponse(**capabilities)
|
||||
else:
|
||||
self.capabilities = self.provider_features()
|
||||
|
||||
def id_token_as_signed_jwt(self, session, loa="2", alg="RS256", code=None,
|
||||
def id_token_as_signed_jwt(self, session, loa="2", alg="", code=None,
|
||||
access_token=None, user_info=None, auth_time=0):
|
||||
|
||||
if alg == "":
|
||||
alg = self.default_sign_alg["id_token"]
|
||||
|
||||
logger.debug("Signing alg: %s [%s]" % (alg, alg2keytype(alg)))
|
||||
_idt = self.server.make_id_token(session, loa, self.baseurl, alg, code,
|
||||
access_token, user_info, auth_time)
|
||||
|
@ -866,7 +871,12 @@ class Provider(AProvider):
|
|||
_cinfo = self.cdb[session["client_id"]]
|
||||
try:
|
||||
if "userinfo_signed_response_alg" in _cinfo:
|
||||
algo = _cinfo["userinfo_signed_response_alg"]
|
||||
|
||||
try:
|
||||
algo = _cinfo["userinfo_signed_response_alg"]
|
||||
except KeyError: # Fall back to default
|
||||
algo = self.default_sign_alg["userinfo"]
|
||||
|
||||
# Use my key for signing
|
||||
key = self.keyjar.get_signing_key(alg2keytype(algo), "")
|
||||
if not key:
|
||||
|
|
|
@ -169,13 +169,6 @@ class BearerHeader(ClientAuthnMethod):
|
|||
try:
|
||||
_acc_token = kwargs["access_token"]
|
||||
except KeyError:
|
||||
try:
|
||||
_ = kwargs["state"]
|
||||
except KeyError:
|
||||
if not self.cli.state:
|
||||
raise Exception("Missing state specification")
|
||||
kwargs["state"] = self.cli.state
|
||||
|
||||
_acc_token = self.cli.get_token(**kwargs).access_token
|
||||
else:
|
||||
try:
|
||||
|
|
|
@ -59,7 +59,7 @@ class Token(object):
|
|||
def __init__(self, secret, password):
|
||||
self.secret = secret
|
||||
self._rndlen = 19
|
||||
self._sidlen = 28
|
||||
self._sidlen = 56
|
||||
self.crypt = Crypt(password)
|
||||
|
||||
def __call__(self, ttype="A", prev="", sid=None):
|
||||
|
@ -102,7 +102,7 @@ class Token(object):
|
|||
except KeyError:
|
||||
pass
|
||||
|
||||
return csum.digest() # 28 bytes long, 224 bits
|
||||
return csum.hexdigest() # 56 bytes long, 224 bits
|
||||
|
||||
def _split_token(self, token):
|
||||
plain = self.crypt.decrypt(base64.b64decode(token))
|
||||
|
|
|
@ -110,7 +110,8 @@ class MyFakeOICServer(Server):
|
|||
resp = AuthorizationResponse(**_dict)
|
||||
#resp.code = grant
|
||||
else:
|
||||
resp = AuthorizationResponse(state=req["state"],
|
||||
_state = req["state"]
|
||||
resp = AuthorizationResponse(state=_state,
|
||||
code=_info["code"])
|
||||
|
||||
else: # "implicit" in req.response_type:
|
||||
|
|
|
@ -46,22 +46,25 @@ def test():
|
|||
"%s/authorization" % issuer),
|
||||
10, "http://%s" % socket.gethostname())
|
||||
|
||||
ac.add(PASSWORD,
|
||||
CasAuthnMethod(
|
||||
None, CAS_SERVER, SERVICE_URL,
|
||||
"%s/authorization" % issuer,
|
||||
UserLDAPMemberValidation(**LDAP_EXTRAVALIDATION)),
|
||||
20, "http://%s" % socket.gethostname())
|
||||
try:
|
||||
ac.add(PASSWORD,
|
||||
CasAuthnMethod(
|
||||
None, CAS_SERVER, SERVICE_URL,
|
||||
"%s/authorization" % issuer,
|
||||
UserLDAPMemberValidation(**LDAP_EXTRAVALIDATION)),
|
||||
20, "http://%s" % socket.gethostname())
|
||||
except Exception:
|
||||
assert len(ac) == 1
|
||||
else:
|
||||
assert len(ac) == 2
|
||||
|
||||
assert len(ac) == 2
|
||||
res = ac.pick(PASSWORD)
|
||||
|
||||
res = ac.pick(PASSWORD)
|
||||
|
||||
assert res
|
||||
# list of two 2-tuples
|
||||
assert len(res) == 2
|
||||
assert res[0][0].__class__.__name__ == "CasAuthnMethod"
|
||||
assert res[1][0].__class__.__name__ == "UsernamePasswordMako"
|
||||
assert res
|
||||
# list of two 2-tuples
|
||||
assert len(res) == 2
|
||||
assert res[0][0].__class__.__name__ == "CasAuthnMethod"
|
||||
assert res[1][0].__class__.__name__ == "UsernamePasswordMako"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -110,6 +110,7 @@ class TestOAuthClient():
|
|||
def setup_class(self):
|
||||
self.client = Client("1")
|
||||
self.client.redirect_uris = ["http://example.com/redirect"]
|
||||
self.client.response_type = "code"
|
||||
|
||||
def test_areq_1(self):
|
||||
ar = self.client.construct_AuthorizationRequest(
|
||||
|
@ -122,8 +123,8 @@ class TestOAuthClient():
|
|||
assert "scope" not in ar
|
||||
|
||||
def test_areq_2(self):
|
||||
self.client.state = "abc"
|
||||
req_args = {"response_type": ["code"], "scope": ["foo", "bar"]}
|
||||
req_args = {"response_type": ["code"], "scope": ["foo", "bar"],
|
||||
"state": "abc"}
|
||||
ar = self.client.construct_AuthorizationRequest(request_args=req_args)
|
||||
|
||||
assert ar["redirect_uri"] == "http://example.com/redirect"
|
||||
|
@ -133,8 +134,8 @@ class TestOAuthClient():
|
|||
assert ar["scope"] == ["foo", "bar"]
|
||||
|
||||
def test_areq_replace_default_state(self):
|
||||
self.client.state = "efg"
|
||||
req_args = {"response_type": ["code"], "scope": ["foo", "bar"]}
|
||||
req_args = {"response_type": ["code"], "scope": ["foo", "bar"],
|
||||
"state": "efg"}
|
||||
ar = self.client.construct_AuthorizationRequest(request_args=req_args)
|
||||
|
||||
assert ar["redirect_uri"] == "http://example.com/redirect"
|
||||
|
@ -335,17 +336,20 @@ class TestOAuthClient():
|
|||
|
||||
def test_request_info_simple(self):
|
||||
self.client.authorization_endpoint = "https://example.com/authz"
|
||||
uri, body, h_args, cis = self.client.request_info(AuthorizationRequest)
|
||||
req_args = {"state": "hmm", "response_type": "code"}
|
||||
uri, body, h_args, cis = self.client.request_info(AuthorizationRequest,
|
||||
request_args=req_args)
|
||||
|
||||
# default == "POST"
|
||||
assert uri == 'https://example.com/authz'
|
||||
assert body == "redirect_uri=http%3A%2F%2Fclient.example.com%2Fauthz&response_type=code&client_id=1"
|
||||
print body
|
||||
assert body == "state=hmm&redirect_uri=http%3A%2F%2Fclient.example.com%2Fauthz&response_type=code&client_id=1"
|
||||
assert h_args == {'headers': {'Content-type':
|
||||
'application/x-www-form-urlencoded'}}
|
||||
assert cis.type() == "AuthorizationRequest"
|
||||
|
||||
def test_request_info_simple_get(self):
|
||||
#self.client.authorization_endpoint = "https://example.com/authz"
|
||||
self.client.authorization_endpoint = "https://example.com/authz"
|
||||
uri, body, h_args, cis = self.client.request_info(AuthorizationRequest,
|
||||
method="GET")
|
||||
|
||||
|
@ -883,7 +887,6 @@ def test_bearer_header_2():
|
|||
def test_bearer_header_3():
|
||||
client = Client("A")
|
||||
client.client_secret = "boarding pass"
|
||||
client.state = "state"
|
||||
|
||||
resp1 = AuthorizationResponse(code="auth_grant", state="state")
|
||||
client.parse_response(AuthorizationResponse, resp1.to_urlencoded(),
|
||||
|
@ -896,7 +899,7 @@ def test_bearer_header_3():
|
|||
|
||||
cis = ResourceRequest()
|
||||
|
||||
http_args = BearerHeader(client).construct(cis)
|
||||
http_args = BearerHeader(client).construct(cis, state="state")
|
||||
|
||||
print cis
|
||||
assert "access_token" not in cis
|
||||
|
@ -941,7 +944,6 @@ def test_bearer_body():
|
|||
def test_bearer_body_get_token():
|
||||
client = Client("A")
|
||||
client.client_secret = "boarding pass"
|
||||
client.state = "state"
|
||||
|
||||
resp1 = AuthorizationResponse(code="auth_grant", state="state")
|
||||
client.parse_response(AuthorizationResponse, resp1.to_urlencoded(),
|
||||
|
@ -954,10 +956,14 @@ def test_bearer_body_get_token():
|
|||
|
||||
cis = ResourceRequest()
|
||||
|
||||
_ = BearerBody(client).construct(cis)
|
||||
_ = BearerBody(client).construct(cis, state="state")
|
||||
|
||||
assert "access_token" in cis
|
||||
assert cis["access_token"] == "token1"
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_client_secret_basic()
|
||||
# tc = TestOAuthClient()
|
||||
# tc.setup_class()
|
||||
# tc.test_request_info_simple_get()
|
||||
|
||||
test_bearer_body()
|
|
@ -99,11 +99,11 @@ def test_factory():
|
|||
server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
|
||||
sid = stateID("https://example.org/", cons.seed)
|
||||
cons.state = sid
|
||||
_state = sid
|
||||
cons._backup(sid)
|
||||
cons.sdb["seed:%s" % cons.seed] = sid
|
||||
|
||||
kaka = make_cookie(CLIENT_CONFIG["client_id"], cons.state, cons.seed,
|
||||
kaka = make_cookie(CLIENT_CONFIG["client_id"], _state, cons.seed,
|
||||
expire=360, path="/")
|
||||
|
||||
_oac = factory(kaka[1], _session_db, CLIENT_CONFIG["client_id"],
|
||||
|
@ -111,7 +111,7 @@ def test_factory():
|
|||
**CONSUMER_CONFIG)
|
||||
|
||||
assert _oac
|
||||
assert _oac.state == cons.state
|
||||
assert _oac.client_id == cons.client_id
|
||||
assert _oac.seed == cons.seed
|
||||
|
||||
|
||||
|
@ -120,12 +120,12 @@ def test_consumer_begin():
|
|||
cons = Consumer(_session_db, client_config=CLIENT_CONFIG,
|
||||
server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
|
||||
loc = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
sid, loc = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
|
||||
# state is dynamic
|
||||
params = {"scope": "openid",
|
||||
"state": cons.state,
|
||||
"state": sid,
|
||||
"redirect_uri": "http://localhost:8087/authz",
|
||||
"response_type": "code",
|
||||
"client_id": "number5"}
|
||||
|
@ -141,17 +141,17 @@ def test_consumer_handle_authorization_response():
|
|||
server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
cons.debug = True
|
||||
|
||||
_ = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
sid, loc = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
|
||||
atr = AuthorizationResponse(code="SplxlOBeZQQYbYS6WxSbIA",
|
||||
state=cons.state)
|
||||
state=sid)
|
||||
|
||||
res = cons.handle_authorization_response(query=atr.to_urlencoded())
|
||||
|
||||
assert res.type() == "AuthorizationResponse"
|
||||
print cons.grant[cons.state]
|
||||
grant = cons.grant[cons.state]
|
||||
print cons.grant[sid]
|
||||
grant = cons.grant[sid]
|
||||
assert grant.code == "SplxlOBeZQQYbYS6WxSbIA"
|
||||
|
||||
|
||||
|
@ -161,11 +161,11 @@ def test_consumer_parse_authz_exception():
|
|||
server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
cons.debug = True
|
||||
|
||||
_ = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
sid, loc = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
|
||||
atr = AuthorizationResponse(code="SplxlOBeZQQYbYS6WxSbIA",
|
||||
state=cons.state)
|
||||
state=sid)
|
||||
|
||||
adict = atr.to_dict()
|
||||
del adict["code"]
|
||||
|
@ -181,10 +181,10 @@ def test_consumer_parse_authz_error():
|
|||
server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
cons.debug = True
|
||||
|
||||
_ = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
sid, loc = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
|
||||
atr = AuthorizationErrorResponse(error="access_denied", state=cons.state)
|
||||
atr = AuthorizationErrorResponse(error="access_denied", state=sid)
|
||||
|
||||
QUERY_STRING = atr.to_urlencoded()
|
||||
|
||||
|
@ -201,20 +201,20 @@ def test_consumer_parse_access_token():
|
|||
environ = BASE_ENVIRON
|
||||
|
||||
cons.response_type = ["token"]
|
||||
_ = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
sid, loc = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
|
||||
atr = AccessTokenResponse(access_token="2YotnFZFEjr1zCsicMWpAA",
|
||||
token_type="example",
|
||||
refresh_token="tGzv3JOkF0XG5Qx2TlKWIA",
|
||||
example_parameter="example_value",
|
||||
state=cons.state)
|
||||
state=sid)
|
||||
|
||||
res = cons.handle_authorization_response(query=atr.to_urlencoded())
|
||||
|
||||
assert res.type() == "AccessTokenResponse"
|
||||
print cons.grant[cons.state]
|
||||
grant = cons.grant[cons.state]
|
||||
print cons.grant[sid]
|
||||
grant = cons.grant[sid]
|
||||
assert len(grant.tokens) == 1
|
||||
token = grant.tokens[0]
|
||||
assert token.access_token == "2YotnFZFEjr1zCsicMWpAA"
|
||||
|
@ -252,19 +252,19 @@ def test_consumer_client_get_access_token_reques():
|
|||
cons = Consumer(_session_db, client_config=CLIENT_CONFIG,
|
||||
server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
cons.client_secret = "secret0"
|
||||
cons.state = "state"
|
||||
_state = "state"
|
||||
cons.redirect_uris = ["https://www.example.com/oic/cb"]
|
||||
|
||||
resp1 = AuthorizationResponse(code="auth_grant", state="state")
|
||||
resp1 = AuthorizationResponse(code="auth_grant", state=_state)
|
||||
cons.parse_response(AuthorizationResponse, resp1.to_urlencoded(),
|
||||
"urlencoded")
|
||||
resp2 = AccessTokenResponse(access_token="token1",
|
||||
token_type="Bearer", expires_in=0,
|
||||
state="state")
|
||||
state=_state)
|
||||
cons.parse_response(AccessTokenResponse, resp2.to_urlencoded(),
|
||||
"urlencoded")
|
||||
|
||||
url, body, http_args = cons.get_access_token_request()
|
||||
url, body, http_args = cons.get_access_token_request(_state)
|
||||
assert url == "http://localhost:8088/token"
|
||||
print body
|
||||
assert body == ("code=auth_grant&client_secret=secret0&"
|
||||
|
@ -273,3 +273,5 @@ def test_consumer_client_get_access_token_reques():
|
|||
assert http_args == {'headers': {
|
||||
'Content-type': 'application/x-www-form-urlencoded'}}
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_consumer_parse_access_token()
|
|
@ -128,8 +128,8 @@ def test_provider_authenticated():
|
|||
server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
cons.debug = True
|
||||
|
||||
location = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
sid, location = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization")
|
||||
|
||||
query_string = location.split("?")[1]
|
||||
|
||||
|
@ -147,8 +147,8 @@ def test_provider_authenticated():
|
|||
assert aresp.type() == "AuthorizationResponse"
|
||||
assert _eq(aresp.keys(), ['state', 'code'])
|
||||
|
||||
print cons.grant[cons.state].keys()
|
||||
assert _eq(cons.grant[cons.state].keys(), ['tokens', 'code', 'exp_in',
|
||||
print cons.grant[sid].keys()
|
||||
assert _eq(cons.grant[sid].keys(), ['tokens', 'code', 'exp_in',
|
||||
'seed', 'id_token',
|
||||
'grant_expiration_time'])
|
||||
|
||||
|
@ -161,9 +161,9 @@ def test_provider_authenticated_token():
|
|||
server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
cons.debug = True
|
||||
|
||||
location = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization",
|
||||
"token")
|
||||
sid, location = cons.begin("http://localhost:8087",
|
||||
"http://localhost:8088/authorization",
|
||||
"token")
|
||||
|
||||
QUERY_STRING = location.split("?")[1]
|
||||
resp = provider.authorization_endpoint(QUERY_STRING)
|
||||
|
@ -174,30 +174,6 @@ def test_provider_authenticated_token():
|
|||
assert "token_type=Bearer" in txt
|
||||
|
||||
|
||||
# def test_provider_authenticated_none():
|
||||
# provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
# verify_client)
|
||||
# _session_db = {}
|
||||
# cons = Consumer(_session_db, client_config=CLIENT_CONFIG,
|
||||
# server_info=SERVER_INFO, **CONSUMER_CONFIG)
|
||||
# cons.debug = True
|
||||
#
|
||||
# location = cons.begin("http://localhost:8087",
|
||||
# "http://localhost:8088/authorization",
|
||||
# "none")
|
||||
#
|
||||
# QUERY_STRING = location.split("?")[1]
|
||||
#
|
||||
# resp2 = provider.authorization_endpoint(request=QUERY_STRING)
|
||||
#
|
||||
# location = resp2.message
|
||||
# print location
|
||||
#
|
||||
# assert location.startswith("http://localhost:8087/authz")
|
||||
# query = location.split("?")[1]
|
||||
# assert query.startswith("state=")
|
||||
# assert "&" not in query
|
||||
|
||||
|
||||
def test_token_endpoint():
|
||||
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
|
||||
|
@ -269,4 +245,4 @@ def test_token_endpoint_unauth():
|
|||
assert _eq(atr.keys(), ['error_description', 'error'])
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_provider_authenticated_token()
|
||||
test_provider_authenticated()
|
|
@ -76,6 +76,7 @@ class TestOICClient():
|
|||
self.client.redirect_uris = ["http://example.com/redirect"]
|
||||
self.client.client_secret = CLIENT_SECRET
|
||||
self.client.keyjar[""] = KC_RSA
|
||||
self._state = ""
|
||||
|
||||
def test_areq_1(self):
|
||||
ar = self.client.construct_AuthorizationRequest(
|
||||
|
@ -308,8 +309,8 @@ class TestOICClient():
|
|||
# default == "POST"
|
||||
assert uri == 'https://example.com/authz'
|
||||
areq = AuthorizationRequest().from_urlencoded(body)
|
||||
assert _eq(areq.keys(), ["nonce", "redirect_uri", "response_type",
|
||||
"client_id", "scope"])
|
||||
assert _eq(areq.keys(), ["nonce", "state", "redirect_uri",
|
||||
"response_type", "client_id", "scope"])
|
||||
assert h_args == {'headers': {
|
||||
'Content-type': 'application/x-www-form-urlencoded'}}
|
||||
assert cis.type() == "AuthorizationRequest"
|
||||
|
@ -322,7 +323,7 @@ class TestOICClient():
|
|||
(url, query) = uri.split("?")
|
||||
areq = AuthorizationRequest().from_urlencoded(query)
|
||||
assert _eq(areq.keys(), ["nonce", "redirect_uri", "response_type",
|
||||
"client_id", "scope"])
|
||||
"client_id", "scope", "state"])
|
||||
assert areq["redirect_uri"] == "http://client.example.com/authz"
|
||||
|
||||
assert body is None
|
||||
|
@ -357,7 +358,7 @@ class TestOICClient():
|
|||
(url, query) = uri.split("?")
|
||||
areq = AuthorizationRequest().from_urlencoded(query)
|
||||
assert _eq(areq.keys(), ["redirect_uri", "response_type",
|
||||
"client_id", "rock", "scope"])
|
||||
"client_id", "rock", "scope", "state"])
|
||||
assert body is None
|
||||
assert h_args == {}
|
||||
assert cis.type() == "AuthorizationRequest"
|
||||
|
@ -405,23 +406,25 @@ class TestOICClient():
|
|||
def test_access_token_request(self):
|
||||
self.client.token_endpoint = "http://oic.example.org/token"
|
||||
|
||||
self._state = "state0"
|
||||
print self.client.grant.keys()
|
||||
print self.client.state
|
||||
print self.client.grant[self.client.state]
|
||||
print self.client.grant[self._state]
|
||||
|
||||
resp = self.client.do_access_token_request(scope="openid")
|
||||
resp = self.client.do_access_token_request(scope="openid",
|
||||
state=self._state)
|
||||
print resp
|
||||
print resp.keys()
|
||||
self.client.grant[self.client.state].add_token(resp)
|
||||
self.client.grant[self._state].add_token(resp)
|
||||
assert resp.type() == "AccessTokenResponse"
|
||||
assert _eq(resp.keys(), ['token_type', 'state', 'access_token',
|
||||
'expires_in', 'refresh_token', 'scope'])
|
||||
|
||||
def test_do_user_info_request(self):
|
||||
self.client.userinfo_endpoint = "http://oic.example.org/userinfo"
|
||||
self._state = "state0"
|
||||
print self.client.grant.keys()
|
||||
print self.client.grant[self.client.state]
|
||||
resp = self.client.do_user_info_request(state=self.client.state)
|
||||
print self.client.grant[self._state]
|
||||
resp = self.client.do_user_info_request(state=self._state)
|
||||
assert resp.type() == "OpenIDSchema"
|
||||
assert _eq(resp.keys(),
|
||||
['name', 'email', 'verified', 'nickname', 'sub'])
|
||||
|
@ -429,8 +432,10 @@ class TestOICClient():
|
|||
|
||||
def test_do_access_token_refresh(self):
|
||||
#token = self.client.get_token(scope="openid")
|
||||
_state = "state0"
|
||||
|
||||
resp = self.client.do_access_token_refresh(scope="openid")
|
||||
resp = self.client.do_access_token_refresh(scope="openid",
|
||||
state=_state)
|
||||
print resp
|
||||
assert resp.type() == "AccessTokenResponse"
|
||||
assert _eq(resp.keys(), ['token_type', 'state', 'access_token',
|
||||
|
@ -1153,8 +1158,8 @@ def test_make_id_token():
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
#t = TestOICClient()
|
||||
#t.setup_class()
|
||||
#t.test_do_check_session_request()
|
||||
t = TestOICClient()
|
||||
t.setup_class()
|
||||
t.test_access_token_request()
|
||||
|
||||
test_userinfo_request()
|
||||
#test_userinfo_request()
|
|
@ -172,7 +172,7 @@ class TestOICConsumer():
|
|||
srv.keyjar = SRVKEYS
|
||||
print "redirect_uris", self.consumer.redirect_uris
|
||||
print "config", self.consumer.config
|
||||
location = self.consumer.begin("openid", "code")
|
||||
sid, location = self.consumer.begin("openid", "code")
|
||||
print location
|
||||
authreq = srv.parse_authorization_request(url=location)
|
||||
print authreq.keys()
|
||||
|
@ -180,7 +180,7 @@ class TestOICConsumer():
|
|||
'response_type', 'client_id', 'scope',
|
||||
'redirect_uri'])
|
||||
|
||||
assert authreq["state"] == self.consumer.state
|
||||
assert authreq["state"] == sid
|
||||
assert authreq["scope"] == self.consumer.config["scope"]
|
||||
assert authreq["client_id"] == self.consumer.client_id
|
||||
|
||||
|
@ -192,7 +192,7 @@ class TestOICConsumer():
|
|||
srv = Server()
|
||||
srv.keyjar = SRVKEYS
|
||||
|
||||
location = self.consumer.begin("openid", "code",
|
||||
sid, location = self.consumer.begin("openid", "code",
|
||||
path="http://localhost:8087")
|
||||
print location
|
||||
#vkeys = {".":srv.keyjar.get_verify_key()}
|
||||
|
@ -202,7 +202,7 @@ class TestOICConsumer():
|
|||
'response_type', 'client_id', 'scope',
|
||||
'claims', 'request_uri'])
|
||||
|
||||
assert authreq["state"] == self.consumer.state
|
||||
assert authreq["state"] == sid
|
||||
assert authreq["scope"] == self.consumer.config["scope"]
|
||||
assert authreq["client_id"] == self.consumer.client_id
|
||||
assert authreq["redirect_uri"].startswith("http://localhost:8087/authz")
|
||||
|
@ -212,7 +212,7 @@ class TestOICConsumer():
|
|||
mfos.keyjar = SRVKEYS
|
||||
|
||||
self.consumer.http_request = mfos.http_request
|
||||
self.consumer.state = "state0"
|
||||
_state = "state0"
|
||||
self.consumer.nonce = rndstr()
|
||||
self.consumer.redirect_uris = ["https://example.com/cb"]
|
||||
args = {
|
||||
|
@ -222,7 +222,7 @@ class TestOICConsumer():
|
|||
}
|
||||
|
||||
result = self.consumer.do_authorization_request(
|
||||
state=self.consumer.state, request_args=args)
|
||||
state=_state, request_args=args)
|
||||
assert result.status_code == 302
|
||||
print "redirect_uris", self.consumer.redirect_uris
|
||||
print result.headers["location"]
|
||||
|
@ -236,21 +236,21 @@ class TestOICConsumer():
|
|||
self.consumer.parse_response(AuthorizationResponse, info=query,
|
||||
sformat="urlencoded")
|
||||
|
||||
resp = self.consumer.complete()
|
||||
resp = self.consumer.complete(_state)
|
||||
print resp
|
||||
assert resp.type() == "AccessTokenResponse"
|
||||
print resp.keys()
|
||||
assert _eq(resp.keys(), ['token_type', 'state', 'access_token',
|
||||
'scope', 'expires_in', 'refresh_token'])
|
||||
|
||||
assert resp["state"] == self.consumer.state
|
||||
assert resp["state"] == _state
|
||||
|
||||
def test_parse_authz(self):
|
||||
mfos = MyFakeOICServer("http://localhost:8088")
|
||||
mfos.keyjar = SRVKEYS
|
||||
|
||||
self.consumer.http_request = mfos.http_request
|
||||
self.consumer.state = "state0"
|
||||
_state = "state0"
|
||||
self.consumer.nonce = rndstr()
|
||||
args = {
|
||||
"client_id": self.consumer.client_id,
|
||||
|
@ -259,7 +259,7 @@ class TestOICConsumer():
|
|||
}
|
||||
|
||||
result = self.consumer.do_authorization_request(
|
||||
state=self.consumer.state, request_args=args)
|
||||
state=_state, request_args=args)
|
||||
|
||||
print self.consumer.sdb.keys()
|
||||
print self.consumer.sdb["state0"].keys()
|
||||
|
@ -270,12 +270,12 @@ class TestOICConsumer():
|
|||
assert part[2] is None
|
||||
|
||||
assert atr.type() == "AuthorizationResponse"
|
||||
assert atr["state"] == "state0"
|
||||
assert atr["state"] == _state
|
||||
assert "code" in atr
|
||||
|
||||
def test_parse_authz_implicit(self):
|
||||
self.consumer.config["response_type"] = "implicit"
|
||||
|
||||
_state = "statxxx"
|
||||
args = {
|
||||
"client_id": self.consumer.client_id,
|
||||
"response_type": "implicit",
|
||||
|
@ -283,7 +283,7 @@ class TestOICConsumer():
|
|||
}
|
||||
|
||||
result = self.consumer.do_authorization_request(
|
||||
state=self.consumer.state, request_args=args)
|
||||
state=_state, request_args=args)
|
||||
|
||||
part = self.consumer.parse_authz(query=result.headers["location"])
|
||||
print part
|
||||
|
@ -292,7 +292,7 @@ class TestOICConsumer():
|
|||
assert part[2] is None
|
||||
|
||||
assert atr.type() == "AccessTokenResponse"
|
||||
assert atr["state"] == "state0"
|
||||
assert atr["state"] == _state
|
||||
assert "access_token" in atr
|
||||
|
||||
|
||||
|
@ -302,7 +302,7 @@ def test_complete_secret_auth():
|
|||
mfos.keyjar = SRVKEYS
|
||||
consumer.http_request = mfos.http_request
|
||||
consumer.redirect_uris = ["http://example.com/authz"]
|
||||
consumer.state = "state0"
|
||||
_state = "state0"
|
||||
consumer.nonce = rndstr()
|
||||
consumer.client_secret = "hemlig"
|
||||
consumer.secret_type = "basic"
|
||||
|
@ -314,7 +314,7 @@ def test_complete_secret_auth():
|
|||
"scope": ["openid"],
|
||||
}
|
||||
|
||||
result = consumer.do_authorization_request(state=consumer.state,
|
||||
result = consumer.do_authorization_request(state=_state,
|
||||
request_args=args)
|
||||
assert result.status_code == 302
|
||||
assert result.headers["location"].startswith(consumer.redirect_uris[0])
|
||||
|
@ -323,14 +323,14 @@ def test_complete_secret_auth():
|
|||
consumer.parse_response(AuthorizationResponse, info=query,
|
||||
sformat="urlencoded")
|
||||
|
||||
resp = consumer.complete()
|
||||
resp = consumer.complete(_state)
|
||||
print resp
|
||||
assert resp.type() == "AccessTokenResponse"
|
||||
print resp.keys()
|
||||
assert _eq(resp.keys(), ['token_type', 'state', 'access_token',
|
||||
'scope', 'expires_in', 'refresh_token'])
|
||||
|
||||
assert resp["state"] == consumer.state
|
||||
assert resp["state"] == _state
|
||||
|
||||
|
||||
def test_complete_auth_token():
|
||||
|
@ -339,7 +339,7 @@ def test_complete_auth_token():
|
|||
mfos.keyjar = SRVKEYS
|
||||
consumer.http_request = mfos.http_request
|
||||
consumer.redirect_uris = ["http://example.com/authz"]
|
||||
consumer.state = "state0"
|
||||
_state = "state0"
|
||||
consumer.nonce = rndstr()
|
||||
consumer.client_secret = "hemlig"
|
||||
consumer.secret_type = "basic"
|
||||
|
@ -351,7 +351,7 @@ def test_complete_auth_token():
|
|||
"scope": ["openid"],
|
||||
}
|
||||
|
||||
result = consumer.do_authorization_request(state=consumer.state,
|
||||
result = consumer.do_authorization_request(state=_state,
|
||||
request_args=args)
|
||||
consumer._backup("state0")
|
||||
|
||||
|
@ -383,7 +383,7 @@ def test_complete_auth_token_idtoken():
|
|||
mfos.keyjar = SRVKEYS
|
||||
consumer.http_request = mfos.http_request
|
||||
consumer.redirect_uris = ["http://example.com/authz"]
|
||||
consumer.state = "state0"
|
||||
_state = "state0"
|
||||
consumer.nonce = rndstr()
|
||||
consumer.client_secret = "hemlig"
|
||||
consumer.secret_type = "basic"
|
||||
|
@ -395,7 +395,7 @@ def test_complete_auth_token_idtoken():
|
|||
"scope": ["openid"],
|
||||
}
|
||||
|
||||
result = consumer.do_authorization_request(state=consumer.state,
|
||||
result = consumer.do_authorization_request(state=_state,
|
||||
request_args=args)
|
||||
consumer._backup("state0")
|
||||
|
||||
|
@ -424,7 +424,7 @@ def test_userinfo():
|
|||
mfos.keyjar = SRVKEYS
|
||||
consumer.http_request = mfos.http_request
|
||||
consumer.redirect_uris = ["http://example.com/authz"]
|
||||
consumer.state = "state0"
|
||||
_state = "state0"
|
||||
consumer.nonce = rndstr()
|
||||
consumer.secret_type = "basic"
|
||||
consumer.set_client_secret("hemligt")
|
||||
|
@ -436,7 +436,7 @@ def test_userinfo():
|
|||
"scope": ["openid"],
|
||||
}
|
||||
|
||||
result = consumer.do_authorization_request(state=consumer.state,
|
||||
result = consumer.do_authorization_request(state=_state,
|
||||
request_args=args)
|
||||
assert result.status_code == 302
|
||||
assert result.headers["location"].startswith(consumer.redirect_uris[0])
|
||||
|
@ -445,9 +445,9 @@ def test_userinfo():
|
|||
consumer.parse_response(AuthorizationResponse, info=query,
|
||||
sformat="urlencoded")
|
||||
|
||||
consumer.complete()
|
||||
consumer.complete(_state)
|
||||
|
||||
result = consumer.get_user_info()
|
||||
result = consumer.get_user_info(_state)
|
||||
print result
|
||||
assert result.type() == "OpenIDSchema"
|
||||
assert _eq(result.keys(), ['name', 'email', 'verified', 'nickname', 'sub'])
|
||||
|
@ -548,7 +548,7 @@ def test_client_register():
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
#t = TestOICConsumer()
|
||||
#t.setup_class()
|
||||
#t.test_begin()
|
||||
test_provider_config()
|
||||
t = TestOICConsumer()
|
||||
t.setup_class()
|
||||
t.test_complete()
|
||||
#test_provider_config()
|
|
@ -255,7 +255,8 @@ def test_server_authenticated():
|
|||
cons.debug = True
|
||||
cons.keyjar[""] = KC_RSA
|
||||
|
||||
location = cons.begin("openid", "code", path="http://localhost:8087")
|
||||
_state, location = cons.begin("openid", "code",
|
||||
path="http://localhost:8087")
|
||||
|
||||
QUERY_STRING = location.split("?")[1]
|
||||
print QUERY_STRING
|
||||
|
@ -280,9 +281,10 @@ def test_server_authenticated():
|
|||
assert _eq(aresp.keys(), ['request', 'state', 'redirect_uri',
|
||||
'response_type', 'client_id', 'claims', 'scope'])
|
||||
|
||||
print cons.grant[cons.state].keys()
|
||||
assert _eq(cons.grant[cons.state].keys(), ['tokens', 'id_token', 'exp_in',
|
||||
'seed', 'grant_expiration_time'])
|
||||
print cons.grant[_state].keys()
|
||||
assert _eq(cons.grant[_state].keys(),
|
||||
['code', 'tokens', 'id_token', 'exp_in', 'seed',
|
||||
'grant_expiration_time'])
|
||||
|
||||
|
||||
def test_server_authenticated_1():
|
||||
|
@ -293,7 +295,7 @@ def test_server_authenticated_1():
|
|||
cons.debug = True
|
||||
cons.keyjar[""] = KC_RSA
|
||||
|
||||
location = cons.begin("openid", "code", path="http://localhost:8087")
|
||||
state, location = cons.begin("openid", "code", path="http://localhost:8087")
|
||||
|
||||
resp = server.authorization_endpoint(request=location.split("?")[1])
|
||||
|
||||
|
@ -316,9 +318,9 @@ def test_server_authenticated_2():
|
|||
cons.debug = True
|
||||
cons.keyjar[""] = KC_RSA
|
||||
|
||||
location = cons.begin(scope="openid email claims_in_id_token",
|
||||
response_type="code id_token",
|
||||
path="http://localhost:8087")
|
||||
_state, location = cons.begin(scope="openid email claims_in_id_token",
|
||||
response_type="code id_token",
|
||||
path="http://localhost:8087")
|
||||
|
||||
print location
|
||||
resp = server.authorization_endpoint(request=location.split("?")[1])
|
||||
|
@ -339,15 +341,16 @@ def test_server_authenticated_2():
|
|||
assert aresp.type() == "AuthorizationResponse"
|
||||
assert _eq(aresp.keys(), ['scope', 'state', 'code', 'id_token'])
|
||||
|
||||
print cons.grant[cons.state].keys()
|
||||
assert _eq(cons.grant[cons.state].keys(), ['code', 'id_token', 'tokens',
|
||||
print cons.grant[_state].keys()
|
||||
assert _eq(cons.grant[_state].keys(), ['code', 'id_token', 'tokens',
|
||||
'exp_in',
|
||||
'grant_expiration_time', 'seed'])
|
||||
id_token = part[2]
|
||||
assert isinstance(id_token, IdToken)
|
||||
print id_token.keys()
|
||||
assert _eq(id_token.keys(), ['c_hash', 'sub', 'iss', 'acr', 'exp', 'iat',
|
||||
'aud', 'nonce'])
|
||||
assert _eq(id_token.keys(),
|
||||
['nonce', 'c_hash', 'sub', 'iss', 'acr', 'exp', 'auth_time',
|
||||
'iat', 'aud'])
|
||||
|
||||
|
||||
def test_server_authenticated_token():
|
||||
|
@ -359,8 +362,8 @@ def test_server_authenticated_token():
|
|||
cons.debug = True
|
||||
cons.keyjar[""] = KC_RSA
|
||||
|
||||
location = cons.begin("openid", response_type="token",
|
||||
path="http://localhost:8087")
|
||||
_state, location = cons.begin("openid", response_type="token",
|
||||
path="http://localhost:8087")
|
||||
|
||||
resp = server.authorization_endpoint(request=location.split("?")[1])
|
||||
|
||||
|
@ -377,8 +380,8 @@ def test_server_authenticated_none():
|
|||
cons.debug = True
|
||||
cons.keyjar[""] = KC_RSA
|
||||
|
||||
location = cons.begin("openid", response_type="none",
|
||||
path="http://localhost:8087")
|
||||
_state, location = cons.begin("openid", response_type="none",
|
||||
path="http://localhost:8087")
|
||||
|
||||
resp = server.authorization_endpoint(request=location.split("?")[1])
|
||||
|
||||
|
@ -506,7 +509,7 @@ def test_userinfo_endpoint():
|
|||
cons.config["request_method"] = "parameter"
|
||||
cons.keyjar[""] = KC_RSA
|
||||
|
||||
location = cons.begin("openid", "token", path="http://localhost:8087")
|
||||
state, location = cons.begin("openid", "token", path="http://localhost:8087")
|
||||
|
||||
resp = server.authorization_endpoint(request=location.split("?")[1])
|
||||
|
||||
|
@ -684,4 +687,4 @@ def test_registered_redirect_uri_with_query_component():
|
|||
assert resp is None
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_token_endpoint()
|
||||
test_registration_endpoint()
|
|
@ -39,26 +39,26 @@ def _eq(l1, l2):
|
|||
def test_token():
|
||||
sdb = SessionDB()
|
||||
sid = sdb.token.key(areq=AREQ)
|
||||
assert len(sid) == 28
|
||||
assert len(sid) == 56
|
||||
|
||||
sdb = SessionDB({"a": "b"})
|
||||
sid = sdb.token.key(areq=AREQ)
|
||||
assert len(sid) == 28
|
||||
assert len(sid) == 56
|
||||
|
||||
|
||||
def test_new_token():
|
||||
sdb = SessionDB()
|
||||
sid = sdb.token.key(areq=AREQ)
|
||||
assert len(sid) == 28
|
||||
assert len(sid) == 56
|
||||
|
||||
code2 = sdb.token('T', sid=sid)
|
||||
assert len(sid) == 28
|
||||
assert len(sid) == 56
|
||||
|
||||
code3 = sdb.token(ttype="", prev=code2)
|
||||
assert code2 != code3
|
||||
|
||||
sid2 = sdb.token.key(areq=AREQ, user="jones")
|
||||
assert len(sid2) == 28
|
||||
assert len(sid2) == 56
|
||||
assert sid != sid2
|
||||
|
||||
|
||||
|
|
Reference in New Issue