This commit is contained in:
Roland Hedberg 2013-02-22 09:50:04 +01:00
parent fabca09179
commit 9c3aaa8795
14 changed files with 370 additions and 282 deletions

View File

@ -214,7 +214,7 @@ class Consumer(Client):
# Might be an error response
try:
aresp = self.parse_response(AuthorizationResponse,
info=_query, format="urlencoded")
info=_query, sformat="urlencoded")
except Exception, err:
logger.error("%s" % err)
raise
@ -233,7 +233,7 @@ class Consumer(Client):
return aresp
else: # implicit flow
atr = self.parse_response(AccessTokenResponse,
info=_query, format="urlencoded",
info=_query, sformat="urlencoded",
extended=True)
if isinstance(atr, Message):

View File

@ -371,7 +371,7 @@ class Consumer(Client):
_log_info("Expect Authorization Response")
aresp = self.parse_response(AuthorizationResponse,
info=_query,
format="urlencoded",
sformat="urlencoded",
keyjar=self.keyjar)
if aresp.type() == "ErrorResponse":
_log_info("ErrorResponse: %s" % aresp)
@ -407,7 +407,7 @@ class Consumer(Client):
else: # implicit flow
_log_info("Expect Access Token Response")
atr = self.parse_response(AccessTokenResponse, info=_query,
format="urlencoded",
sformat="urlencoded",
keyjar=self.keyjar)
if atr.type() == "ErrorResponse":
raise TokenError(atr["error"])

View File

@ -24,10 +24,12 @@ from jwkest import jws
logger = logging.getLogger(__name__)
#noinspection PyUnusedLocal
def json_ser(val, sformat=None, lev=0):
return json.dumps(val)
#noinspection PyUnusedLocal
def json_deser(val, sformat=None, lev=0):
return json.loads(val)
@ -36,17 +38,20 @@ SINGLE_OPTIONAL_BOOLEAN = (bool, False, None, None)
SINGLE_OPTIONAL_JSON = (dict, False, json_ser, json_deser)
SINGLE_REQUIRED_INT = (int, True, None, None)
def idtoken_deser(val, sformat="urlencoded"):
# id_token are always serialized as a JWT
return IdToken().deserialize(val, "jwt")
def idtokenclaim_deser(val, sformat="urlencoded"):
if sformat in ["dict", "json"]:
if not isinstance(val, basestring):
val = json.dumps(val)
sformat="json"
sformat = "json"
return IDTokenClaim().deserialize(val, sformat)
def userinfo_deser(val, sformat="urlencoded"):
if sformat in ["dict", "json"]:
if not isinstance(val, basestring):
@ -54,6 +59,7 @@ def userinfo_deser(val, sformat="urlencoded"):
sformat = "json"
return UserInfoClaim().deserialize(val, sformat)
def address_deser(val, sformat="urlencoded"):
if sformat in ["dict", "json"]:
if not isinstance(val, basestring):
@ -61,6 +67,7 @@ def address_deser(val, sformat="urlencoded"):
sformat = "json"
return AddressClaim().deserialize(val, sformat)
def claims_deser(val, sformat="urlencoded"):
if sformat in ["dict", "json"]:
if not isinstance(val, basestring):
@ -68,6 +75,7 @@ def claims_deser(val, sformat="urlencoded"):
sformat = "json"
return Claims().deserialize(val, sformat)
def msg_ser(inst, sformat, lev=0):
if sformat in ["urlencoded", "json"]:
if isinstance(inst, dict) or isinstance(inst, Message):
@ -86,9 +94,11 @@ def msg_ser(inst, sformat, lev=0):
return res
def msg_list_ser(insts, sformat, lev=0):
return [msg_ser(inst, sformat, lev) for inst in insts]
def claims_ser(val, sformat="urlencoded", lev=0):
# everything in c_extension
if isinstance(val, basestring):
@ -99,7 +109,7 @@ def claims_ser(val, sformat="urlencoded", lev=0):
item = val
if isinstance(item, Message):
return item.serialize(method=sformat, lev=lev+1)
return item.serialize(method=sformat, lev=lev + 1)
if sformat == "urlencoded":
res = urllib.urlencode(item)
@ -118,6 +128,7 @@ def claims_ser(val, sformat="urlencoded", lev=0):
return res
def registration_request_deser(val, sformat="urlencoded"):
if sformat in ["dict", "json"]:
if not isinstance(val, basestring):
@ -134,14 +145,14 @@ SINGLE_OPTIONAL_ID_TOKEN_CLAIM = (Message, False, msg_ser, idtokenclaim_deser)
SINGLE_OPTIONAL_JWT = (basestring, False, msg_ser, None)
SINGLE_OPTIONAL_IDTOKEN = (basestring, False, msg_ser, None)
SINGLE_OPTIONAL_REGISTRATION_REQUEST= (Message, False, msg_ser,
registration_request_deser)
SINGLE_OPTIONAL_REGISTRATION_REQUEST = (Message, False, msg_ser,
registration_request_deser)
# ----------------------------------------------------------------------------
SCOPE_CHARSET = []
for char in ['\x21', ('\x23','\x5b'), ('\x5d','\x7E')]:
for char in ['\x21', ('\x23', '\x5b'), ('\x5d', '\x7E')]:
if isinstance(set, tuple):
c = char[0]
while c <= char[1]:
@ -150,6 +161,7 @@ for char in ['\x21', ('\x23','\x5b'), ('\x5d','\x7E')]:
else:
SCOPE_CHARSET.append(set)
def check_char_set(string, allowed):
for c in string:
if c not in allowed:
@ -236,9 +248,10 @@ class AuthorizationResponse(message.AuthorizationResponse,
raise MissingRequiredAttribute("Missing at_hash property")
try:
assert idt["at_hash"] == jws.left_hash(
self["access_token"], hfunc )
self["access_token"], hfunc)
except AssertionError:
raise VerificationError("Failed to verify access_token hash")
raise VerificationError(
"Failed to verify access_token hash")
if "code" in self:
try:
@ -274,12 +287,12 @@ class AuthorizationRequest(message.AuthorizationRequest):
"nonce": SINGLE_OPTIONAL_STRING,
"scope": REQUIRED_LIST_OF_SP_SEP_STRINGS,
"id_token": SINGLE_OPTIONAL_STRING
})
})
c_allowed_values = message.AuthorizationRequest.c_allowed_values.copy()
c_allowed_values = {
"display": ["page", "popup", "touch", "wap"],
"prompt": ["none", "login", "consent", "select_account"]
}
"display": ["page", "popup", "touch", "wap"],
"prompt": ["none", "login", "consent", "select_account"]
}
def verify(self, **kwargs):
"""Authorization Request parameters that are OPTIONAL in the OAuth 2.0
@ -411,14 +424,14 @@ class RegistrationRequest(Message):
"id_token_encrypted_response_enc": SINGLE_OPTIONAL_STRING,
"default_max_age": SINGLE_OPTIONAL_INT,
"require_auth_time": OPTIONAL_LOGICAL,
"default_acr":SINGLE_OPTIONAL_STRING,
"default_acr": SINGLE_OPTIONAL_STRING,
"initiate_login_uri": SINGLE_OPTIONAL_STRING,
"post_logout_redirect_url": SINGLE_OPTIONAL_STRING,
#"client_id": SINGLE_OPTIONAL_STRING,
#"client_secret": SINGLE_OPTIONAL_STRING,
}
c_default = {"application_type": "web"}
c_allowed_values = {"operation" : ["register", "client_update"],
c_allowed_values = {"operation": ["register", "client_update"],
"application_type": ["native", "web"],
"subject_type": ["public", "pairwise"]}
@ -470,9 +483,9 @@ class RegistrationResponse(Message):
class ClientRegistrationErrorResponse(message.ErrorResponse):
c_allowed_values= {"error":["invalid_operation", "invalid_client_id",
"invalid_redirect_uri",
"invalid_configuration_parameter"]}
c_allowed_values= {"error": ["invalid_operation", "invalid_client_id",
"invalid_redirect_uri",
"invalid_configuration_parameter"]}
class IdToken(OpenIDSchema):
@ -480,7 +493,7 @@ class IdToken(OpenIDSchema):
c_param.update({
"iss": SINGLE_REQUIRED_STRING,
"sub": SINGLE_REQUIRED_STRING,
"aud": REQUIRED_LIST_OF_STRINGS, # Array of strings or string
"aud": REQUIRED_LIST_OF_STRINGS, # Array of strings or string
"azp": SINGLE_OPTIONAL_STRING,
"exp": SINGLE_REQUIRED_INT,
"iat": SINGLE_REQUIRED_INT,
@ -550,55 +563,50 @@ class OpenIDRequest(message.AuthorizationRequest):
c_param.update({"userinfo": SINGLE_OPTIONAL_USERINFO_CLAIM,
"id_token": SINGLE_OPTIONAL_ID_TOKEN_CLAIM,
"iss": SINGLE_OPTIONAL_STRING,
"aud": OPTIONAL_LIST_OF_STRINGS, # Array of strings or string
"aud": OPTIONAL_LIST_OF_STRINGS,
#"nonce": SINGLE_OPTIONAL_STRING,
"registration": SINGLE_OPTIONAL_REGISTRATION_REQUEST})
class ProviderConfigurationResponse(Message):
c_param = {
"version": SINGLE_REQUIRED_STRING,
"issuer": SINGLE_REQUIRED_STRING,
"authorization_endpoint": SINGLE_OPTIONAL_STRING,
"token_endpoint": SINGLE_OPTIONAL_STRING,
"userinfo_endpoint": SINGLE_OPTIONAL_STRING,
"check_session_iframe": SINGLE_OPTIONAL_STRING,
"end_session_endpoint": SINGLE_OPTIONAL_STRING,
"jwk_url": SINGLE_OPTIONAL_STRING,
"jwk_encryption_url": SINGLE_OPTIONAL_STRING,
"x509_url": SINGLE_REQUIRED_STRING,
"x509_encryption_url": SINGLE_OPTIONAL_STRING,
"registration_endpoint": SINGLE_OPTIONAL_STRING,
"scopes_supported": OPTIONAL_LIST_OF_STRINGS,
"response_types_supported": REQUIRED_LIST_OF_STRINGS,
"acr_values_supported": OPTIONAL_LIST_OF_STRINGS,
"subject_types_supported": REQUIRED_LIST_OF_STRINGS,
"userinfo_signing_alg_values_supported": OPTIONAL_LIST_OF_STRINGS,
"userinfo_encryption_alg_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"userinfo_encryption_enc_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"id_token_signing_alg_values_supported": REQUIRED_LIST_OF_STRINGS,
"id_token_encryption_alg_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"id_token_encryption_enc_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"request_object_signing_alg_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"request_object_encryption_alg_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"request_object_encryption_enc_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"token_endpoint_auth_methods_supported": OPTIONAL_LIST_OF_STRINGS,
"token_endpoint_auth_signing_alg_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"display_values_supported": OPTIONAL_LIST_OF_STRINGS,
"claim_types_supported": OPTIONAL_LIST_OF_STRINGS,
"claims_supported": OPTIONAL_LIST_OF_STRINGS,
"service_documentation": SINGLE_OPTIONAL_STRING
}
"version": SINGLE_REQUIRED_STRING,
"issuer": SINGLE_REQUIRED_STRING,
"authorization_endpoint": SINGLE_OPTIONAL_STRING,
"token_endpoint": SINGLE_OPTIONAL_STRING,
"userinfo_endpoint": SINGLE_OPTIONAL_STRING,
"check_session_iframe": SINGLE_OPTIONAL_STRING,
"end_session_endpoint": SINGLE_OPTIONAL_STRING,
"jwk_url": SINGLE_OPTIONAL_STRING,
"jwk_encryption_url": SINGLE_OPTIONAL_STRING,
"x509_url": SINGLE_REQUIRED_STRING,
"x509_encryption_url": SINGLE_OPTIONAL_STRING,
"registration_endpoint": SINGLE_OPTIONAL_STRING,
"scopes_supported": OPTIONAL_LIST_OF_STRINGS,
"response_types_supported": REQUIRED_LIST_OF_STRINGS,
"acr_values_supported": OPTIONAL_LIST_OF_STRINGS,
"subject_types_supported": REQUIRED_LIST_OF_STRINGS,
"userinfo_signing_alg_values_supported": OPTIONAL_LIST_OF_STRINGS,
"userinfo_encryption_alg_values_supported": OPTIONAL_LIST_OF_STRINGS,
"userinfo_encryption_enc_values_supported": OPTIONAL_LIST_OF_STRINGS,
"id_token_signing_alg_values_supported": REQUIRED_LIST_OF_STRINGS,
"id_token_encryption_alg_values_supported": OPTIONAL_LIST_OF_STRINGS,
"id_token_encryption_enc_values_supported": OPTIONAL_LIST_OF_STRINGS,
"request_object_signing_alg_values_supported": OPTIONAL_LIST_OF_STRINGS,
"request_object_encryption_alg_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"request_object_encryption_enc_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"token_endpoint_auth_methods_supported": OPTIONAL_LIST_OF_STRINGS,
"token_endpoint_auth_signing_alg_values_supported":
OPTIONAL_LIST_OF_STRINGS,
"display_values_supported": OPTIONAL_LIST_OF_STRINGS,
"claim_types_supported": OPTIONAL_LIST_OF_STRINGS,
"claims_supported": OPTIONAL_LIST_OF_STRINGS,
"service_documentation": SINGLE_OPTIONAL_STRING
}
c_default = {"version": "3.0",
"token_endpoint_auth_methods_supported":"client_secret_basic"}
"token_endpoint_auth_methods_supported": "client_secret_basic"}
def verify(self, **kwargs):
if "scopes_supported" in self:
@ -622,7 +630,7 @@ class AuthnToken(Message):
"exp": SINGLE_REQUIRED_INT,
"iat": SINGLE_OPTIONAL_INT,
"azp": SINGLE_OPTIONAL_INT,
}
}
class UserInfoErrorResponse(message.ErrorResponse):
@ -655,18 +663,18 @@ SCOPE2CLAIMS = {
}
MSG = {
"RefreshAccessTokenRequest" : RefreshAccessTokenRequest,
"TokenErrorResponse" :TokenErrorResponse,
"RefreshAccessTokenRequest": RefreshAccessTokenRequest,
"TokenErrorResponse": TokenErrorResponse,
"AccessTokenResponse": AccessTokenResponse,
"UserInfoRequest": UserInfoRequest,
"AuthorizationResponse" : AuthorizationResponse,
"AuthorizationErrorResponse" : AuthorizationErrorResponse,
"AuthorizationResponse": AuthorizationResponse,
"AuthorizationErrorResponse": AuthorizationErrorResponse,
"AuthorizationRequest": AuthorizationRequest,
"AccessTokenRequest" : AccessTokenRequest,
"AccessTokenRequest": AccessTokenRequest,
"AddressClaim": AddressClaim,
"OpenIDSchema": OpenIDSchema,
"RegistrationRequest": RegistrationRequest,
"RegistrationResponse" : RegistrationResponse,
"RegistrationResponse": RegistrationResponse,
"ClientRegistrationErrorResponse": ClientRegistrationErrorResponse,
"IdToken": IdToken,
"RefreshSessionRequest": RefreshSessionRequest,

View File

@ -168,12 +168,12 @@ def getpath(environ):
quote(environ.get('PATH_INFO', ''))])
def _expiration(timeout, strformat=None):
def _expiration(timeout, time_format=None):
if timeout == "now":
return time_util.instant(strformat)
return time_util.instant(time_format)
else:
# validity time should match lifetime of assertions
return time_util.in_a_while(minutes=timeout, format=strformat)
return time_util.in_a_while(minutes=timeout, time_format=time_format)
def cookie_signature(seed, *parts):

View File

@ -291,7 +291,7 @@ class KeyBundle(object):
pass
for key, val in self._key.items():
if val is []:
if val == []:
del self._key[key]
def __str__(self):

View File

@ -76,7 +76,7 @@ class Token(object):
def key(self, user="", areq=None):
csum = hmac.new(self.secret, digestmod=hashlib.sha224)
csum.update("%s" % time.time())
csum.update("%s" % utc_time_sans_frac())
csum.update("%f" % random.random())
if user:
csum.update(user)
@ -105,7 +105,7 @@ class Token(object):
# first _sidlen bytes are the sid
_sid = plain[:self._sidlen]
_type = plain[self._sidlen]
_rnd = plain[self._sidlen+1:]
_rnd = plain[self._sidlen + 1:]
return _type, _sid, _rnd
def type_and_key(self, token):
@ -187,9 +187,9 @@ class SessionDB(object):
logger.debug("uid: %s, old: %s" % (uid, old))
self.uid2sid[uid] = sid
for id in old:
for old_id in old:
try:
del self.uid2sid[id]
del self.uid2sid[old_id]
except KeyError:
pass
@ -281,7 +281,7 @@ class SessionDB(object):
dic["token_type"] = "Bearer"
dic["expires_at"] = utc_time_sans_frac() + self.token_expires_in
dic["expires_in"] = self.token_expires_in
dic["issued"] = time.time()
dic["issued"] = utc_time_sans_frac()
if id_token:
dic["id_token"] = id_token
if oidreq:
@ -307,7 +307,8 @@ class SessionDB(object):
access_token = self.token("T", prev=rtoken)
dic["issued"] = time.time()
dic["expires_at"] = utc_time_sans_frac() + self.token_expires_in
dic["issued"] = utc_time_sans_frac()
dic["access_token"] = access_token
self._db[sid] = dic
#self._db[dic["xxxx"]] = dic
@ -316,8 +317,8 @@ class SessionDB(object):
raise WrongTokenType("Not a refresh token!")
def is_expired(self, sess):
if "issued" in sess:
if (sess["issued"] + sess["expires_in"]) < time.time():
if "expires_at" in sess:
if sess["expires_at"] < utc_time_sans_frac():
return True
return False

View File

@ -195,23 +195,23 @@ def time_a_while_ago(days=0, seconds=0, microseconds=0, milliseconds=0,
def in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0,
minutes=0, hours=0, weeks=0, format=TIME_FORMAT):
minutes=0, hours=0, weeks=0, time_format=TIME_FORMAT):
"""
format of timedelta:
timedelta([days[, seconds[, microseconds[, milliseconds[,
minutes[, hours[, weeks]]]]]]])
"""
if format is None:
format = TIME_FORMAT
if not time_format:
time_format = TIME_FORMAT
return time_in_a_while(days, seconds, microseconds, milliseconds,
minutes, hours, weeks).strftime(format)
minutes, hours, weeks).strftime(time_format)
def a_while_ago(days=0, seconds=0, microseconds=0, milliseconds=0,
minutes=0, hours=0, weeks=0, format=TIME_FORMAT):
minutes=0, hours=0, weeks=0, time_format=TIME_FORMAT):
return time_a_while_ago(days, seconds, microseconds, milliseconds,
minutes, hours, weeks).strftime(format)
minutes, hours, weeks).strftime(time_format)
# ---------------------------------------------------------------------------
@ -228,17 +228,17 @@ def shift_time(dtime, shift):
# ---------------------------------------------------------------------------
def str_to_time(timestr, format=TIME_FORMAT):
def str_to_time(timestr, time_format=TIME_FORMAT):
"""
:param timestr:
:param format:
:param time_format:
:return: UTC time
"""
if not timestr:
return 0
try:
then = time.strptime(timestr, format)
then = time.strptime(timestr, time_format)
except ValueError: # assume it's a format problem
try:
elem = TIME_FORMAT_WITH_FRAGMENT.match(timestr)
@ -250,8 +250,8 @@ def str_to_time(timestr, format=TIME_FORMAT):
return time.gmtime(calendar.timegm(then))
def instant(format=TIME_FORMAT):
return time.strftime(format, time.gmtime())
def instant(time_format=TIME_FORMAT):
return time.strftime(time_format, time.gmtime())
# ---------------------------------------------------------------------------
@ -306,3 +306,21 @@ def later_than(after, before):
before = time.gmtime(before)
return after >= before
def utc_time_sans_frac():
return int("%d" % time.mktime(time.gmtime()))
def time_sans_frac():
return int("%d" % time.time())
def epoch_in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0,
minutes=0, hours=0, weeks=0):
"""
Return a point in the future as number of seconds since the epoch
1970-01-01
"""
return int("%d" % time.mktime(time_in_a_while(days, seconds, microseconds,
milliseconds, minutes, hours,
weeks).timetuple()))

View File

@ -23,6 +23,7 @@ from oic.utils.sdb import Crypt
from pytest import raises
def _eq(l1, l2):
return set(l1) == set(l2)
@ -34,6 +35,7 @@ ACC_TOK_RESP = AccessTokenResponse(access_token="2YotnFZFEjr1zCsicMWpAA",
example_parameter="example_value",
scope=["inner", "outer"])
def test_grant():
grant = Grant()
assert grant
@ -42,6 +44,7 @@ def test_grant():
grant = Grant(60)
assert grant.exp_in == 60
def test_grant_from_code():
ar = AuthorizationResponse(code="code", state="state")
@ -50,6 +53,7 @@ def test_grant_from_code():
assert grant
assert grant.code == "code"
def test_grant_add_code():
ar = AuthorizationResponse(code="code", state="state")
@ -58,6 +62,7 @@ def test_grant_add_code():
assert grant
assert grant.code == "code"
def test_grant_update():
ar = AuthorizationResponse(code="code", state="state")
@ -67,6 +72,7 @@ def test_grant_update():
assert grant
assert grant.code == "code"
def test_grant_set():
ar = AuthorizationResponse(code="code", state="state")
@ -75,6 +81,7 @@ def test_grant_set():
assert grant
assert grant.code == "code"
def test_grant_add_token():
grant = Grant()
@ -87,6 +94,7 @@ def test_grant_add_token():
assert token.token_type == "example"
assert token.refresh_token == "tGzv3JOkF0XG5Qx2TlKWIA"
def test_grant_set_3():
err = ErrorResponse(error="invalid_request")
grant = Grant()
@ -105,7 +113,7 @@ class TestOAuthClient():
def test_areq_1(self):
ar = self.client.construct_AuthorizationRequest(
request_args={"response_type":["code"]})
request_args={"response_type": ["code"]})
assert ar["redirect_uri"] == "http://example.com/redirect"
assert ar["response_type"] == ["code"]
@ -138,7 +146,7 @@ class TestOAuthClient():
def test_parse_authz_resp_url(self):
url = "https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA&state=ghi"
aresp = self.client.parse_response(AuthorizationResponse,
info=url, format="urlencoded")
info=url, sformat="urlencoded")
assert aresp["code"] == "SplxlOBeZQQYbYS6WxSbIA"
assert aresp["state"] == "ghi"
@ -150,7 +158,7 @@ class TestOAuthClient():
def test_parse_authz_resp_query(self):
query = "code=SplxlOBeZQQYbYS6WxSbIA&state=hij"
aresp = self.client.parse_response(AuthorizationResponse,
info=query, format="urlencoded")
info=query, sformat="urlencoded")
assert aresp["code"] == "SplxlOBeZQQYbYS6WxSbIA"
assert aresp["state"] == "hij"
@ -163,7 +171,7 @@ class TestOAuthClient():
def test_parse_authz_resp_query_multi_scope(self):
query = "code=SplxlOBeZQQYbYS6WxAAAA&state=klm"
aresp = self.client.parse_response(AuthorizationResponse,
info=query, format="urlencoded")
info=query, sformat="urlencoded")
assert aresp["code"] == "SplxlOBeZQQYbYS6WxAAAA"
assert aresp["state"] == "klm"
@ -177,7 +185,7 @@ class TestOAuthClient():
def test_parse_authz_resp_query_unknown_parameter(self):
query = "code=SplxlOBeZQQYbYS6WxSbIA&state=xyz&foo=bar"
aresp = self.client.parse_response(AuthorizationResponse,
info=query, format="urlencoded")
info=query, sformat="urlencoded")
assert aresp["code"] == "SplxlOBeZQQYbYS6WxSbIA"
assert aresp["state"] == "xyz"
@ -261,7 +269,7 @@ class TestOAuthClient():
def test_get_access_token_refresh_1(self):
print self.client.grant
self.client.grant[""].grant_expiration_time = time.time()+60
self.client.grant[""].grant_expiration_time = time.time() + 60
self.client.grant[""].code = "access_code"
token = self.client.grant[""].tokens[0]
print token
@ -275,12 +283,13 @@ class TestOAuthClient():
def test_get_access_token_refresh_2(self):
self.client.grant["foo"] = Grant()
self.client.grant["foo"].grant_expiration_time = time_util.utc_time_sans_frac()+60
_get = time_util.utc_time_sans_frac() + 60
self.client.grant["foo"].grant_expiration_time = _get
self.client.grant["foo"].code = "access_code"
print self.client.grant["foo"]
resp = AccessTokenResponse(refresh_token = "refresh_with_me",
access_token = "access")
resp = AccessTokenResponse(refresh_token="refresh_with_me",
access_token="access")
self.client.grant["foo"].tokens.append(Token(resp))
# Uses refresh_token from previous response
@ -294,7 +303,7 @@ class TestOAuthClient():
ruri = "https://client.example.com/cb?error=access_denied&amp;state=xyz"
resp = self.client.parse_response(AuthorizationResponse,
info=ruri, format="urlencoded")
info=ruri, sformat="urlencoded")
print type(resp), resp
assert resp.type() == "AuthorizationErrorResponse"
@ -307,8 +316,8 @@ class TestOAuthClient():
def test_construct_request_with_extra_args(self):
print self.client.__dict__.items()
req = self.client.construct_AccessTokenRequest(state="foo",
extra_args={"foo":"bar"})
req = self.client.construct_AccessTokenRequest(
state="foo", extra_args={"foo": "bar"})
assert req
print req.keys()
@ -331,14 +340,14 @@ class TestOAuthClient():
# default == "POST"
assert uri == 'https://example.com/authz'
assert body == "redirect_uri=http%3A%2F%2Fclient.example.com%2Fauthz&response_type=code&client_id=1"
assert h_args == {'headers': {'content-type': 'application/x-www-form-urlencoded'}}
assert h_args == {'headers': {'content-type':
'application/x-www-form-urlencoded'}}
assert cis.type() == "AuthorizationRequest"
def test_request_info_simple_get(self):
#self.client.authorization_endpoint = "https://example.com/authz"
uri, body, h_args, cis = self.client.request_info(
AuthorizationRequest,
method="GET")
uri, body, h_args, cis = self.client.request_info(AuthorizationRequest,
method="GET")
assert uri == 'https://example.com/authz?redirect_uri=http%3A%2F%2Fclient.example.com%2Fauthz&response_type=code&client_id=1'
assert body is None
@ -348,9 +357,7 @@ class TestOAuthClient():
def test_request_info_simple_get_with_req_args(self):
#self.client.authorization_endpoint = "https://example.com/authz"
uri, body, h_args, cis = self.client.request_info(
AuthorizationRequest,
method="GET",
request_args={"state":"init"})
AuthorizationRequest, method="GET", request_args={"state": "init"})
print uri
assert uri == 'https://example.com/authz?state=init&redirect_uri=http%3A%2F%2Fclient.example.com%2Fauthz&response_type=code&client_id=1'
@ -361,9 +368,7 @@ class TestOAuthClient():
def test_request_info_simple_get_with_extra_args(self):
#self.client.authorization_endpoint = "https://example.com/authz"
uri, body, h_args, cis = self.client.request_info(
AuthorizationRequest,
method="GET",
extra_args={"rock":"little"})
AuthorizationRequest, method="GET", extra_args={"rock":"little"})
print uri
assert uri == 'https://example.com/authz?redirect_uri=http%3A%2F%2Fclient.example.com%2Fauthz&response_type=code&client_id=1&rock=little'
@ -385,6 +390,7 @@ class TestOAuthClient():
assert h_args == {}
assert cis.type() == "AuthorizationRequest"
def test_get_authorization_request():
client = Client()
client.redirect_uris = ["https://www.example.com/authz"]
@ -404,6 +410,7 @@ def test_get_authorization_request():
assert ar["redirect_uri"] == 'https://www.example.com/authz'
assert ar["response_type"] == ['code']
def test_get_access_token_request():
resp = AuthorizationResponse(code="code", state="state")
grant = Grant(1)
@ -417,6 +424,7 @@ def test_get_access_token_request():
except Exception, err:
assert err.__class__.__name__ == "GrantExpired"
def test_parse_access_token_response():
client = Client()
@ -435,13 +443,13 @@ def test_parse_access_token_response():
uec = at.to_urlencoded()
raises(ValueError, 'client.parse_response(ATR, info=uec)')
uatr = client.parse_response(ATR, info=uec, format="urlencoded")
uatr = client.parse_response(ATR, info=uec, sformat="urlencoded")
assert _eq(uatr.keys(), ['access_token', 'token_type', 'expire_in',
'refresh_token'])
huec = "%s?%s" % ("https://example.com/token", uec)
uatr = client.parse_response(ATR, info=huec, format="urlencoded")
uatr = client.parse_response(ATR, info=huec, sformat="urlencoded")
assert _eq(uatr.keys(), ['access_token', 'token_type', 'expire_in',
'refresh_token'])
@ -453,21 +461,20 @@ def test_parse_access_token_response():
uerr = err.to_urlencoded()
_ = client.parse_response(ATR, info=jerr)
_ = client.parse_response(ATR, info=uerr, format="urlencoded")
_ = client.parse_response(ATR, info=uerr, sformat="urlencoded")
raises(Exception,
'client.parse_response(ATR, info=jerr, format="urlencoded")')
'client.parse_response(ATR, info=jerr, sformat="urlencoded")')
raises(Exception, "client.parse_response(ATR, info=uerr)")
raises(Exception,
'client.parse_response(ATR, info=jerr, format="focus")')
'client.parse_response(ATR, info=jerr, sformat="focus")')
#noinspection PyUnusedLocal
def test_parse_access_token_response_missing_attribute():
at = AccessTokenResponse(access_token="SlAV32hkKG",
token_type="Bearer", refresh_token="8xLOxBtZp8",
expire_in=3600)
token_type="Bearer", refresh_token="8xLOxBtZp8",
expire_in=3600)
atdict = at.to_dict()
del atdict["access_token"]
@ -484,10 +491,11 @@ def test_parse_access_token_response_missing_attribute():
atuec = urllib.urlencode(atdict)
try:
client.parse_response(ATR, info=atuec, format='urlencoded')
client.parse_response(ATR, info=atuec, sformat='urlencoded')
except Exception, err:
assert err.__class__.__name__ == "MissingRequiredAttribute"
def test_crypt():
crypt = Crypt("4-amino-1H-pyrimidine-2-one")
ctext = crypt.encrypt("Cytosine")
@ -500,15 +508,16 @@ def test_crypt():
assert plain == 'cytidinetriphosp'
def test_crypt2():
db = {}
csum = hmac.new("secret", digestmod=hashlib.sha224)
csum.update("%s" % time.time())
csum.update("%f" % random.random())
txt = csum.digest() # 28 bytes long, 224 bits
txt = csum.digest() # 28 bytes long, 224 bits
print len(txt)
db[txt] = "foobar"
txt = "%saces" % txt # another 4 bytes
txt = "%saces" % txt # another 4 bytes
#print len(txt), txt
crypt = Crypt("4-amino-1H-pyrimidine-2-one")
ctext = crypt.encrypt(txt)
@ -528,6 +537,7 @@ def test_grant_init():
grant = Grant()
assert grant.grant_expiration_time == 0
def test_grant_resp():
resp = AuthorizationResponse(code="code", state="state")
grant = Grant()
@ -540,7 +550,7 @@ def test_grant_resp():
grant.add_code(resp)
time.sleep(2)
assert grant.is_valid() == False
assert grant.is_valid() is False
grant = Grant.from_code(resp)
assert grant.code == "code"
@ -553,9 +563,10 @@ def test_grant_access_token_1():
grant.add_code(resp)
atr = AccessTokenResponse(access_token="2YotnFZFEjr1zCsicMWpAA",
token_type="example", expires_in=1,
refresh_token="tGzv3JOkF0XG5Qx2TlKWIA",
example_parameter="example_value", xscope=["inner", "outer"])
token_type="example", expires_in=1,
refresh_token="tGzv3JOkF0XG5Qx2TlKWIA",
example_parameter="example_value",
xscope=["inner", "outer"])
token = Token(atr)
grant.tokens.append(token)
@ -579,6 +590,7 @@ def test_grant_access_token_1():
time.sleep(2)
assert token.is_valid() == False
def test_grant_access_token_2():
resp = AuthorizationResponse(code="code", state="state")
grant = Grant()
@ -595,10 +607,11 @@ def test_grant_access_token_2():
assert len(grant.tokens) == 1
time.sleep(2)
token = grant.tokens[0]
assert token.is_valid() == True
assert token.is_valid() is True
assert "%s" % grant != ""
def test_client_get_grant():
cli = Client()
@ -612,31 +625,33 @@ def test_client_get_grant():
assert gr1.code == "code"
def test_client_parse_args():
cli = Client()
args = {
"response_type":"",
"client_id":"client_id",
"redirect_uri":"http://example.com/authz",
"scope":"scope",
"state":"state",
}
"response_type": "",
"client_id": "client_id",
"redirect_uri": "http://example.com/authz",
"scope": "scope",
"state": "state",
}
ar_args = cli._parse_args(AuthorizationRequest, **args)
assert _eq(ar_args.keys(), ['scope', 'state', 'redirect_uri',
'response_type', 'client_id'])
def test_client_parse_extra_args():
cli = Client()
args = {
"response_type":"",
"client_id":"client_id",
"redirect_uri":"http://example.com/authz",
"scope":"scope",
"state":"state",
"response_type": "",
"client_id": "client_id",
"redirect_uri": "http://example.com/authz",
"scope": "scope",
"state": "state",
"extra_session": "home"
}
ar_args = cli._parse_args(AuthorizationRequest, **args)
@ -644,6 +659,7 @@ def test_client_parse_extra_args():
assert _eq(ar_args.keys(), ['state', 'redirect_uri', 'response_type',
'client_id', 'scope', 'extra_session'])
def test_client_endpoint():
cli = Client()
cli.authorization_endpoint = "https://example.org/oauth2/as"
@ -693,15 +709,16 @@ def test_server_parse_parse_authorization_request():
assert areq["redirect_uri"] == "http://foobar.example.com/oaclient"
assert areq["state"] == "cold"
def test_server_parse_jwt_request():
srv = Server()
ar = AuthorizationRequest(response_type=["code"],
client_id="foobar",
redirect_uri="http://foobar.example.com/oaclient",
state="cold")
client_id="foobar",
redirect_uri="http://foobar.example.com/oaclient",
state="cold")
srv.keyjar["foobar"] = KeyBundle({"hmac":"A1B2C3D4"}, usage=["ver", "sig"])
srv.keyjar[""] = KeyBundle({"hmac":"A1B2C3D4"}, usage=["ver", "sig"])
srv.keyjar["foobar"] = KeyBundle({"hmac": "A1B2C3D4"}, usage=["ver", "sig"])
srv.keyjar[""] = KeyBundle({"hmac": "A1B2C3D4"}, usage=["ver", "sig"])
keys = srv.keyjar.get_signing_key(owner="foobar")
_jwt = ar.to_jwt(key=keys, algorithm="HS256")
@ -714,10 +731,11 @@ def test_server_parse_jwt_request():
assert req["redirect_uri"] == "http://foobar.example.com/oaclient"
assert req["state"] == "cold"
def test_server_parse_token_request():
atr = AccessTokenRequest(grant_type="authorization_code",
code="SplxlOBeZQQYbYS6WxSbIA",
redirect_uri="https://client.example.com/cb", extra="foo")
atr = AccessTokenRequest(
grant_type="authorization_code", code="SplxlOBeZQQYbYS6WxSbIA",
redirect_uri="https://client.example.com/cb", extra="foo")
uenc = atr.to_urlencoded()
@ -739,9 +757,10 @@ def test_server_parse_token_request():
assert tr["extra"] == "foo"
def test_server_parse_refresh_token_request():
ratr = RefreshAccessTokenRequest(refresh_token="ababababab",
client_id="Client_id")
client_id="Client_id")
uenc = ratr.to_urlencoded()
@ -753,6 +772,7 @@ def test_server_parse_refresh_token_request():
assert tr["refresh_token"] == "ababababab"
assert tr["client_id"] == "Client_id"
def test_client_secret_post():
client = Client("A")
client.client_secret = "boarding pass"
@ -770,8 +790,8 @@ def test_client_secret_post():
cis = AccessTokenRequest(code="foo", redirect_uri="http://example.com")
request_args = {}
http_args = oauth2.client_secret_post(client, cis, request_args,
http_args={"client_secret": "another"})
http_args = oauth2.client_secret_post(
client, cis, request_args, http_args={"client_secret": "another"})
print cis
assert cis["client_id"] == "A"
@ -779,6 +799,7 @@ def test_client_secret_post():
print http_args
assert http_args == {}
def test_client_secret_basic():
client = Client("A")
client.client_secret = "boarding pass"
@ -789,6 +810,7 @@ def test_client_secret_basic():
assert http_args == {"auth": ("A", "boarding pass")}
def test_bearer_header():
client = Client("A")
client.client_secret = "boarding pass"
@ -801,7 +823,8 @@ def test_bearer_header():
print cis
print http_args
assert http_args == {"headers": {"Authorization":"Bearer Sesame"}}
assert http_args == {"headers": {"Authorization": "Bearer Sesame"}}
def test_bearer_header_with_http_args():
client = Client("A")
@ -817,14 +840,14 @@ def test_bearer_header_with_http_args():
print cis
print http_args
assert _eq(http_args.keys(), ["foo", "headers"])
assert http_args["headers"] == {"Authorization":"Bearer Sesame"}
assert http_args["headers"] == {"Authorization": "Bearer Sesame"}
# -----------------
request_args = {"access_token": "Sesame"}
http_args = oauth2.bearer_header(client, cis, request_args,
http_args={"headers": {"x-foo": "bar"}})
http_args={"headers": {"x-foo": "bar"}})
print cis
print http_args
@ -832,18 +855,20 @@ def test_bearer_header_with_http_args():
assert _eq(http_args["headers"].keys(), ["Authorization", "x-foo"])
assert http_args["headers"]["Authorization"] == "Bearer Sesame"
def test_bearer_header_2():
client = Client("A")
client.client_secret = "boarding pass"
cis = ResourceRequest(access_token= "Sesame")
cis = ResourceRequest(access_token="Sesame")
http_args = oauth2.bearer_header(client, cis)
print cis
assert "access_token" not in cis
print http_args
assert http_args == {"headers": {"Authorization":"Bearer Sesame"}}
assert http_args == {"headers": {"Authorization": "Bearer Sesame"}}
def test_bearer_header_3():
client = Client("A")
@ -902,6 +927,7 @@ def test_bearer_body():
print http_args
assert http_args is None
def test_bearer_body_get_token():
client = Client("A")
client.client_secret = "boarding pass"

View File

@ -113,7 +113,7 @@ class TestOICClient():
def test_parse_authz_resp_url(self):
url = "https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA&state=ghi"
aresp = self.client.parse_response(AuthorizationResponse,
info=url, format="urlencoded")
info=url, sformat="urlencoded")
assert aresp["code"] == "SplxlOBeZQQYbYS6WxSbIA"
assert aresp["state"] == "ghi"
@ -125,7 +125,7 @@ class TestOICClient():
def test_parse_authz_resp_query(self):
query = "code=SplxlOBeZQQYbYS6WxSbIA&state=hij"
aresp = self.client.parse_response(AuthorizationResponse,
info=query, format="urlencoded")
info=query, sformat="urlencoded")
assert aresp["code"] == "SplxlOBeZQQYbYS6WxSbIA"
assert aresp["state"] == "hij"
@ -138,7 +138,7 @@ class TestOICClient():
def test_parse_authz_resp_query_multi_scope(self):
query = "code=SplxlOBeZQQYbYS6WxAAAA&state=klm"
aresp = self.client.parse_response(AuthorizationResponse,
info=query, format="urlencoded")
info=query, sformat="urlencoded")
assert aresp["code"] == "SplxlOBeZQQYbYS6WxAAAA"
assert aresp["state"] == "klm"
@ -152,7 +152,7 @@ class TestOICClient():
def test_parse_authz_resp_query_unknown_parameter(self):
query = "code=SplxlOBeZQQYbYS6WxSbIA&state=xyz&foo=bar"
aresp = self.client.parse_response(AuthorizationResponse,
info=query, format="urlencoded")
info=query, sformat="urlencoded")
assert aresp["code"] == "SplxlOBeZQQYbYS6WxSbIA"
assert aresp["state"] == "xyz"
@ -271,7 +271,7 @@ class TestOICClient():
#print self.client.response2error[AuthorizationErrorResponse.__name__]
resp = self.client.parse_response(AuthorizationResponse,
info=ruri, format="urlencoded")
info=ruri, sformat="urlencoded")
print type(resp), resp
assert resp.type() == "AuthorizationErrorResponse"
@ -403,7 +403,7 @@ class TestOICClient():
_, query = _loc.split("?")
self.client.parse_response(AuthorizationResponse, info=query,
format="urlencoded")
sformat="urlencoded")
def test_access_token_request(self):
self.client.token_endpoint = "http://oic.example.org/token"
@ -586,13 +586,13 @@ def test_parse_access_token_response():
uec = at.to_urlencoded()
raises(ValueError, 'client.parse_response(ATR, info=uec)')
uatr = client.parse_response(ATR, info=uec, format="urlencoded")
uatr = client.parse_response(ATR, info=uec, sformat="urlencoded")
assert _eq(uatr.keys(), ['access_token', 'token_type', 'expires_in',
'refresh_token'])
huec = "%s?%s" % ("https://example.com/token", uec)
uatr = client.parse_response(ATR, info=huec, format="urlencoded")
uatr = client.parse_response(ATR, info=huec, sformat="urlencoded")
assert _eq(uatr.keys(), ['access_token', 'token_type', 'expires_in',
'refresh_token'])
@ -604,15 +604,15 @@ def test_parse_access_token_response():
uerr = err.to_urlencoded()
_ = client.parse_response(ATR, info=jerr)
_ = client.parse_response(ATR, info=uerr, format="urlencoded")
_ = client.parse_response(ATR, info=uerr, sformat="urlencoded")
raises(Exception,
'client.parse_response(ATR, info=jerr, format="urlencoded")')
'client.parse_response(ATR, info=jerr, sformat="urlencoded")')
raises(Exception, "client.parse_response(ATR, info=uerr)")
raises(Exception,
'client.parse_response(ATR, info=jerr, format="focus")')
'client.parse_response(ATR, info=jerr, sformat="focus")')
#noinspection PyUnusedLocal
def test_parse_access_token_response_missing_attribute():
@ -631,7 +631,7 @@ def test_parse_access_token_response_missing_attribute():
atuec = urllib.urlencode(atdict)
raises(MissingRequiredAttribute,
"client.parse_response(ATR, info=atuec, format='urlencoded')")
"client.parse_response(ATR, info=atuec, sformat='urlencoded')")
def test_client_get_grant():
@ -904,7 +904,7 @@ def test_userinfo_request():
info = ARESP.to_urlencoded()
cli.parse_response(AuthorizationResponse, info,
format="urlencoded", state="state0")
sformat="urlencoded", state="state0")
cli.parse_response(AccessTokenResponse, TRESP.to_json(), state="state0")
@ -976,7 +976,7 @@ CLAIM = Claims(name={"essential": True}, nickname=None,
email={"essential": True},
email_verified={"essential": True}, picture=None)
USRINFO = UserInfoClaim(claims=CLAIM, format="signed")
USRINFO = UserInfoClaim(claims=CLAIM, sformat="signed")
OIDREQ = OpenIDRequest(response_type=["code", "id_token"], client_id=CLIENT_ID,
redirect_uri="https://client.example.com/cb",

View File

@ -239,7 +239,7 @@ class TestOICConsumer():
#vkeys = {".": self.consumer.keyjar.get_verify_key()}
self.consumer.parse_response(AuthorizationResponse, info=query,
format="urlencoded")
sformat="urlencoded")
resp = self.consumer.complete()
print resp
@ -331,7 +331,7 @@ def test_complete_secret_auth():
_, query = result.headers["location"].split("?")
consumer.parse_response(AuthorizationResponse, info=query,
format="urlencoded")
sformat="urlencoded")
resp = consumer.complete()
print resp
@ -454,7 +454,7 @@ def test_userinfo():
_, query = result.headers["location"].split("?")
consumer.parse_response(AuthorizationResponse, info=query,
format="urlencoded")
sformat="urlencoded")
consumer.complete()

View File

@ -50,7 +50,7 @@ def test_ProviderConfigurationResponse():
def test_idtokenclaim_deser():
claims = Claims(weather={"acr": "2"})
pre = IDTokenClaim(claims=claims, max_age=3600)
idt = idtokenclaim_deser(pre.to_json(), format="json")
idt = idtokenclaim_deser(pre.to_json(), sformat="json")
assert _eq(idt.keys(), ['claims', "max_age"])
@ -61,7 +61,7 @@ def test_userinfo_deser():
pre_uic = UserInfoClaim(claims=CLAIM, format="signed")
uic = userinfo_deser(pre_uic.to_json(), format="json")
uic = userinfo_deser(pre_uic.to_json(), sformat="json")
assert _eq(uic.keys(), ["claims", "format"])
def test_claims_deser():
@ -69,11 +69,11 @@ def test_claims_deser():
email={"essential": True},
email_verified={"essential": True}, picture=None)
claims = claims_deser(pre.to_json(), format="json")
claims = claims_deser(pre.to_json(), sformat="json")
assert _eq(claims.keys(), ['name', 'nickname', 'email', 'email_verified',
'picture'])
claims = claims_deser(pre.to_dict(), format="dict")
claims = claims_deser(pre.to_dict(), sformat="dict")
assert _eq(claims.keys(), ['name', 'nickname', 'email', 'email_verified',
'picture'])
@ -81,10 +81,10 @@ def test_address_deser():
pre = AddressClaim(street_address="Kasamark 114", locality="Umea",
country="Sweden")
adc = address_deser(pre.to_json(), format="json")
adc = address_deser(pre.to_json(), sformat="json")
assert _eq(adc.keys(), ['street_address', 'locality', 'country'])
adc = address_deser(pre.to_dict(), format="json")
adc = address_deser(pre.to_dict(), sformat="json")
assert _eq(adc.keys(), ['street_address', 'locality', 'country'])
@ -130,12 +130,12 @@ CLAIMS = Claims(name={"essential": True}, nickname=None,
email_verified={"essential": True}, picture=None)
def test_claims_ser_json():
claims = claims_deser(claims_ser(CLAIMS, "json"), format="json")
claims = claims_deser(claims_ser(CLAIMS, "json"), sformat="json")
assert _eq(claims.keys(), ['name', 'nickname', 'email', 'email_verified',
'picture'])
def test_claims_ser_urlencoded():
claims = claims_deser(claims_ser(CLAIMS, "urlencoded"), format="urlencoded")
claims = claims_deser(claims_ser(CLAIMS, "urlencoded"), sformat="urlencoded")
assert _eq(claims.keys(), ['name', 'nickname', 'email', 'email_verified',
'picture'])

View File

@ -829,7 +829,8 @@ def test_registered_redirect_uri_without_query_component():
client_id=provider.cdb.keys()[0])
resp = provider._verify_redirect_uri(areq)
print resp
if resp:
print resp.message
assert resp is None
def test_registered_redirect_uri_with_query_component():

View File

@ -1,3 +1,5 @@
from oic.utils.time_util import utc_time_sans_frac
__author__ = 'rohe0002'
import time
@ -11,35 +13,39 @@ from oic.oic.message import OpenIDRequest
#from oic.oauth2 import message
AREQ = AuthorizationRequest(response_type="code",
client_id="client1", redirect_uri="http://example.com/authz",
scope=["openid"], state="state000")
AREQ = AuthorizationRequest(response_type="code", client_id="client1",
redirect_uri="http://example.com/authz",
scope=["openid"], state="state000")
AREQN = AuthorizationRequest(response_type="code",
client_id="client1", redirect_uri="http://example.com/authz",
scope=["openid"], state="state000", nonce="something")
AREQN = AuthorizationRequest(response_type="code", client_id="client1",
redirect_uri="http://example.com/authz",
scope=["openid"], state="state000",
nonce="something")
OIDR = OpenIDRequest(response_type="code", client_id="client1",
redirect_uri="http://example.com/authz", scope=["openid"],
state="state000")
redirect_uri="http://example.com/authz", scope=["openid"],
state="state000")
OAUTH2_AREQ = AuthorizationRequest(response_type="code",
client_id="client1",
redirect_uri="http://example.com/authz",
scope=["openid"], state="state000")
client_id="client1",
redirect_uri="http://example.com/authz",
scope=["openid"], state="state000")
def _eq(l1, l2):
return set(l1) == set(l2)
def test_token():
sdb = SessionDB()
sid = sdb.token.key(areq=AREQ)
assert len(sid) == 28
sdb = SessionDB({"a":"b"})
sdb = SessionDB({"a": "b"})
sid = sdb.token.key(areq=AREQ)
assert len(sid) == 28
def test_new_token():
sdb = SessionDB()
sid = sdb.token.key(areq=AREQ)
@ -48,13 +54,14 @@ def test_new_token():
code2 = sdb.token('T', sid=sid)
assert len(sid) == 28
code3 = sdb.token(type="", prev=code2)
code3 = sdb.token(ttype="", prev=code2)
assert code2 != code3
sid2 = sdb.token.key(areq=AREQ, user="jones")
assert len(sid2) == 28
assert sid != sid2
def test_type_and_key():
sdb = SessionDB()
sid = sdb.token.key(areq=AREQ)
@ -65,21 +72,23 @@ def test_type_and_key():
assert part[0] == "A"
assert part[1] == sid
def test_setitem():
sdb = SessionDB()
sid = sdb.token.key(areq=AREQ)
code = sdb.token(sid=sid)
sdb[sid] = {"indo":"china"}
sdb[sid] = {"indo": "china"}
info = sdb[sid]
assert info == {"indo":"china"}
assert info == {"indo": "china"}
info = sdb[code]
assert info == {"indo":"china"}
assert info == {"indo": "china"}
raises(KeyError, 'sdb["abcdefghijklmnop"]')
def test_update():
sdb = SessionDB()
sid = sdb.token.key(areq=AREQ)
@ -88,7 +97,7 @@ def test_update():
raises(KeyError, 'sdb.update(sid, "indo", "nebue")')
raises(KeyError, 'sdb.update(code, "indo", "nebue")')
sdb[sid] = {"indo":"china"}
sdb[sid] = {"indo": "china"}
sdb.update(sid, "indo", "nebue")
sdb.update(code, "indo", "second")
@ -100,6 +109,7 @@ def test_update():
raises(KeyError, 'sdb.update(sid2, "indo", "bar")')
def test_create_authz_session():
sdb = SessionDB()
sid = sdb.create_authz_session("user_id", AREQ)
@ -134,6 +144,7 @@ def test_create_authz_session():
assert "id_token" not in info
assert "oidreq" in info
def test_create_authz_session_with_sector_id():
sdb = SessionDB(seed="foo")
uid = "user_id"
@ -154,6 +165,7 @@ def test_create_authz_session_with_sector_id():
assert info_2["sub"] != "user_id"
assert info_2["sub"] != user_id1
def test_update_to_token():
sdb = SessionDB()
sid = sdb.create_authz_session("user_id", AREQ)
@ -161,12 +173,12 @@ def test_update_to_token():
_dict = sdb.update_to_token(grant)
print _dict.keys()
assert _eq(_dict.keys(), ['code', 'oauth_state', 'issued', 'expires_at',
'token_type', 'client_id', 'authzreq',
'refresh_token', 'local_sub', 'sub',
'access_token', 'expires_in', 'state',
'redirect_uri', 'code_used', 'scope',
'access_token_scope', 'revoked'])
assert _eq(_dict.keys(), ['code', 'authzreq', 'issued', 'expires_at',
'token_type', 'local_sub', 'client_id',
'issued_at', 'oauth_state', 'refresh_token',
'revoked', 'sub', 'access_token', 'expires_in',
'state', 'redirect_uri', 'code_used', 'scope',
'access_token_scope'])
raises(Exception, 'sdb.update_to_token(grant)')
@ -178,16 +190,17 @@ def test_update_to_token():
_dict = sdb.update_to_token(grant, id_token="id_token", oidreq=OIDR)
print _dict.keys()
assert _eq(_dict.keys(), ['code', 'oauth_state', 'issued', 'expires_at',
'token_type', 'client_id', 'authzreq',
'refresh_token', 'local_sub', 'sub',
'oidreq', 'access_token', 'expires_in', 'state',
'redirect_uri', 'code_used', 'id_token',
'scope', 'access_token_scope', 'revoked'])
assert _eq(_dict.keys(), ['code', 'authzreq', 'issued', 'expires_at',
'token_type', 'local_sub', 'client_id',
'issued_at', 'oauth_state', 'refresh_token',
'revoked', 'sub', 'oidreq', 'access_token',
'expires_in', 'state', 'redirect_uri',
'code_used', 'id_token', 'scope',
'access_token_scope'])
assert _dict["id_token"] == "id_token"
assert _dict["oidreq"].type() == "OpenIDRequest"
token = _dict["access_token"]
_ = _dict["access_token"]
raises(Exception, 'sdb.update_to_token(token)')
@ -217,7 +230,7 @@ def test_is_valid():
assert sdb.is_valid(grant)
_dict = sdb.update_to_token(grant)
assert sdb.is_valid(grant) == False
assert sdb.is_valid(grant) is False
token1 = _dict["access_token"]
assert sdb.is_valid(token1)
@ -231,24 +244,25 @@ def test_is_valid():
# replace refresh_token
dict2["refresh_token"] = token2
assert sdb.is_valid(rtoken) == False
assert sdb.is_valid(rtoken) is False
# mess with the time-line
dict2["issued"] = time.time() - 86400 # like yesterday
assert sdb.is_valid(token2) == False
dict2["expires_at"] = utc_time_sans_frac() - 86400 # like yesterday
assert sdb.is_valid(token2) is False
# replace access_token
dict2["access_token"] = token1
assert sdb.is_valid(token2) == False
assert sdb.is_valid(token2) is False
sid = sdb.create_authz_session("another:user", AREQ)
grant = sdb[sid]["code"]
gdict = sdb[grant]
gdict["issued"] = time.time() - 86400 # like yesterday
assert sdb.is_valid(grant) == False
gdict["expires_at"] = utc_time_sans_frac() - 86400 # like yesterday
assert sdb.is_valid(grant) is False
def test_revoke_token():
sdb = SessionDB()
@ -263,14 +277,14 @@ def test_revoke_token():
assert sdb.is_valid(token)
sdb.revoke_token(token)
assert sdb.is_valid(token) == False
assert sdb.is_valid(token) is False
dict2 = sdb.refresh_token(rtoken)
token = dict2["access_token"]
assert sdb.is_valid(token)
sdb.revoke_token(rtoken)
assert sdb.is_valid(rtoken) == False
assert sdb.is_valid(rtoken) is False
raises(ExpiredToken, 'sdb.refresh_token(rtoken)')
@ -283,4 +297,4 @@ def test_revoke_token():
grant = sdb[sid]["code"]
sdb.revoke_token(grant)
assert sdb.is_valid(grant) == False
assert sdb.is_valid(grant) is False

View File

@ -7,38 +7,43 @@ from oic.utils.time_util import *
from pytest import raises
def test_f_quotient():
assert f_quotient(-1,3) == -1
assert f_quotient(0,3) == 0
assert f_quotient(1,3) == 0
assert f_quotient(2,3) == 0
assert f_quotient(3,3) == 1
assert f_quotient(3.123,3) == 1
assert f_quotient(-1, 3) == -1
assert f_quotient(0, 3) == 0
assert f_quotient(1, 3) == 0
assert f_quotient(2, 3) == 0
assert f_quotient(3, 3) == 1
assert f_quotient(3.123, 3) == 1
def test_modulo():
assert modulo(-1,3) == 2
assert modulo(0,3) == 0
assert modulo(1,3) == 1
assert modulo(2,3) == 2
assert modulo(3,3) == 0
assert modulo(-1, 3) == 2
assert modulo(0, 3) == 0
assert modulo(1, 3) == 1
assert modulo(2, 3) == 2
assert modulo(3, 3) == 0
x = 3.123
assert modulo(3.123,3) == x - 3
assert modulo(3.123, 3) == x - 3
def test_f_quotient_2():
assert f_quotient(0, 1, 13) == -1
for i in range(1,13):
for i in range(1, 13):
assert f_quotient(i, 1, 13) == 0
assert f_quotient(13, 1, 13) == 1
assert f_quotient(13.123, 1, 13) == 1
def test_modulo_2():
assert modulo(0, 1, 13) == 12
for i in range(1,13):
for i in range(1, 13):
assert modulo(i, 1, 13) == i
assert modulo(13, 1, 13) == 1
#x = 0.123
#assert modulo(13+x, 1, 13) == 1+x
def test_parse_duration():
(sign, d) = parse_duration("P1Y3M5DT7H10M3.3S")
assert sign == "+"
@ -49,6 +54,7 @@ def test_parse_duration():
assert d['tm_year'] == 1
assert d['tm_min'] == 10
def test_add_duration_1():
#2000-01-12T12:13:14Z P1Y3M5DT7H10M3S 2001-04-17T19:23:17Z
t = add_duration(str_to_time("2000-01-12T12:13:14Z"), "P1Y3M5DT7H10M3S")
@ -59,9 +65,10 @@ def test_add_duration_1():
assert t.tm_min == 23
assert t.tm_sec == 17
def test_add_duration_2():
#2000-01-12 PT33H 2000-01-13
t = add_duration(str_to_time("2000-01-12T00:00:00Z"),"PT33H")
t = add_duration(str_to_time("2000-01-12T00:00:00Z"), "PT33H")
assert t.tm_year == 2000
assert t.tm_mon == 1
assert t.tm_mday == 14
@ -69,59 +76,59 @@ def test_add_duration_2():
assert t.tm_min == 0
assert t.tm_sec == 0
def test_str_to_time():
def test_str_to_time():
t = calendar.timegm(str_to_time("2000-01-12T00:00:00Z"))
#TODO: Find all instances of time.mktime(.....)
#t = time.mktime(str_to_time("2000-01-12T00:00:00Z"))
#assert t == 947631600.0
#TODO: add something to show how this time was arrived at
# do this as an external method in the
assert t == 947635200
def test_instant():
inst = str_to_time(instant())
now = time.gmtime()
assert now >= inst
def test_valid():
assert valid("2000-01-12T00:00:00Z") == False
assert valid("2000-01-12T00:00:00Z") is False
current_year = datetime.today().year
assert valid("%d-01-12T00:00:00Z" % (current_year + 1)) == True
assert valid("%d-01-12T00:00:00Z" % (current_year + 1)) is True
this_instance = instant()
time.sleep(1)
assert valid(this_instance) == False # unless on a very fast machine :-)
assert valid(this_instance) is False # unless on a very fast machine :-)
soon = in_a_while(seconds=10)
assert valid(soon) == True
assert valid(soon) is True
def test_timeout():
soon = in_a_while(seconds=1, format="")
soon = in_a_while(seconds=1, time_format="")
time.sleep(2)
assert valid(soon) == False
assert valid(soon) is False
def test_before():
current_year = datetime.today().year
assert before("%d-01-01T00:00:00Z" % current_year) == True
assert before("%d-01-01T00:00:00Z" % (current_year + 1)) == False
assert before("%d-01-01T00:00:00Z" % current_year) is False
assert before("%d-01-01T00:00:00Z" % (current_year + 1)) is True
def test_after():
current_year = datetime.today().year
assert after("%d-01-01T00:00:00Z" % (current_year + 1)) == True
assert after("%d-01-01T00:00:00Z" % current_year) == False
assert after("%d-01-01T00:00:00Z" % (current_year + 1)) is False
assert after("%d-01-01T00:00:00Z" % current_year) is True
def test_not_before():
current_year = datetime.today().year
assert not_before("%d-01-01T00:00:00Z" % (current_year + 1)) == True
assert not_before("%d-01-01T00:00:00Z" % current_year) == False
assert not_before("%d-01-01T00:00:00Z" % (current_year + 1)) is False
assert not_before("%d-01-01T00:00:00Z" % current_year) is True
def test_not_on_or_after():
current_year = datetime.today().year
assert not_on_or_after("%d-01-01T00:00:00Z" % (current_year + 1)) == True
assert not_on_or_after("%d-01-01T00:00:00Z" % current_year) == False
assert not_on_or_after("%d-01-01T00:00:00Z" % (current_year + 1)) is True
assert not_on_or_after("%d-01-01T00:00:00Z" % current_year) is False
def test_parse_duration_1():
(sign, d) = parse_duration("-P1Y3M5DT7H10M3.3S")
@ -133,6 +140,7 @@ def test_parse_duration_1():
assert d['tm_year'] == 1
assert d['tm_min'] == 10
def test_parse_duration_2():
raises(Exception, 'parse_duration("-P1Y-3M5DT7H10M3.3S")')
raises(Exception, 'parse_duration("-P1Y3M5DU7H10M3.3S")')
@ -141,71 +149,83 @@ def test_parse_duration_2():
raises(Exception, 'parse_duration("-P1Y3M5DT7H10MxS")')
raises(Exception, 'parse_duration("-P1Y4M4DT7H10.5M3S")')
def test_add_duration_2():
#2000-01-12 PT33H 2000-01-13
t = add_duration(str_to_time("2000-01-12T00:00:00Z"),"P32D")
assert t.tm_year==2000
assert t.tm_mon==2
assert t.tm_mday==12
assert t.tm_hour==0
assert t.tm_min==0
assert t.tm_sec==0
assert t.tm_wday==5
assert t.tm_wday==5
assert t.tm_yday==43
assert t.tm_isdst==0
t = add_duration(str_to_time("2000-01-12T00:00:00Z"), "P32D")
assert t.tm_year == 2000
assert t.tm_mon == 2
assert t.tm_mday == 12
assert t.tm_hour == 0
assert t.tm_min == 0
assert t.tm_sec == 0
assert t.tm_wday == 5
assert t.tm_wday == 5
assert t.tm_yday == 43
assert t.tm_isdst == 0
def test_add_duration_3():
#2000-01-12 PT33H 2000-01-13
t = add_duration(str_to_time("2000-01-12T00:00:00Z"),"-P32D")
t = add_duration(str_to_time("2000-01-12T00:00:00Z"), "-P32D")
assert t is None
def test_time_a_while_ago():
dt = datetime.utcnow()
t = time_a_while_ago(seconds=10)
delta = dt - t # slightly less than 10
delta = dt - t # slightly less than 10
assert delta.seconds == 9
assert delta.microseconds > 0
def test_a_while_ago():
dt = time.mktime(time.gmtime())
str = a_while_ago(seconds=10)
t = time.mktime(str_to_time(str))
delta = dt - t # slightly less than 10
then = a_while_ago(seconds=10)
t = time.mktime(str_to_time(then))
delta = dt - t # slightly less than 10
print delta
assert delta == 10
def test_shift_time():
dt = datetime.utcnow()
t = shift_time(dt, 10)
delta = t - dt # exactly 10
delta = t - dt # exactly 10
assert delta.seconds == 10
def test_str_to_time_str_error():
raises(Exception, 'str_to_time("2000-01-12T00:00:00ZABC")')
def test_str_to_time_1():
t = str_to_time("")
assert t == 0
def test_utc_now():
t1 = utc_now()
t2 = int("%d" % time.time())
assert t1 == t2
def test_before_0():
assert before("")
assert before(0)
def test_before_int():
n = int(time.time())
assert before(n-1)
assert before(n+1) is False
now_local = int(time.time())
assert before(now_local - 1) is False
assert before(now_local + 2)
def test_later_than_int():
t = int(time.time())
assert later_than(t, t-1)
assert later_than(t-1, t) is False
now_local = int(time.time())
assert later_than(now_local, now_local - 1)
assert later_than(now_local - 1, now_local) is False
def test_later_than_str():
a = in_a_while(seconds=10)