Added a test for signed userinfo

This commit is contained in:
Roland Hedberg 2014-12-15 09:52:14 +01:00
parent 863b4a35e3
commit ec3af7261d
2 changed files with 80 additions and 20 deletions

View File

@ -1,18 +1,20 @@
#!/usr/bin/env python
from urlparse import parse_qs
from jwkest.jws import alg2keytype
from oic.oauth2.message import by_schema
from oic.utils.webfinger import WebFinger
__author__ = 'rohe0002'
from oic.oauth2 import rndstr
from oic.oauth2.message import by_schema
from oic.oic import Server
from oic.oic.message import *
from oic.utils.sdb import SessionDB, AuthnEvent
from oic.utils.time_util import utc_time_sans_frac
from oic.utils.webfinger import WebFinger
from oic.oic import Server
from oic.oic.message import *
from oic.oauth2 import rndstr
__author__ = 'rohe0002'
class Response():
def __init__(self, base=None):
@ -27,6 +29,7 @@ class Response():
def __getitem__(self, item):
return getattr(self, item)
ENDPOINT = {
"authorization_endpoint": "/authorization",
"token_endpoint": "/token",
@ -49,8 +52,9 @@ class MyFakeOICServer(Server):
self.registration_expires_in = 3600
self.host = ""
self.webfinger = WebFinger()
self.userinfo_signed_response_alg = ""
#noinspection PyUnusedLocal
# noinspection PyUnusedLocal
def http_request(self, path, method="GET", **kwargs):
part = urlparse(path)
path = part[2]
@ -98,7 +102,7 @@ class MyFakeOICServer(Server):
req = self.parse_authorization_request(query=query)
aevent = AuthnEvent("user", authn_info="acr")
sid = self.sdb.create_authz_session(aevent, areq=req)
sub = self.sdb.do_sub(sid)
_ = self.sdb.do_sub(sid)
_info = self.sdb[sid]
if "code" in req["response_type"]:
@ -109,7 +113,7 @@ class MyFakeOICServer(Server):
_dict = by_schema(AuthorizationResponse(), **_dict)
resp = AuthorizationResponse(**_dict)
#resp.code = grant
# resp.code = grant
else:
_state = req["state"]
resp = AuthorizationResponse(state=_state,
@ -121,7 +125,7 @@ class MyFakeOICServer(Server):
_dict = dict([(k, v) for k, v in
self.sdb.upgrade_to_token(grant).items() if k in
params])
params])
try:
del _dict["refresh_token"]
except KeyError:
@ -175,8 +179,15 @@ class MyFakeOICServer(Server):
resp = OpenIDSchema(**_info)
response = Response()
response.headers = {"content-type": "application/json"}
response.text = resp.to_json()
if self.userinfo_signed_response_alg:
alg = self.userinfo_signed_response_alg
response.headers = {"content-type": "application/jwt"}
key = self.keyjar.get_signing_key(alg2keytype(alg), "", alg=alg)
response.text = resp.to_jwt(key, alg)
else:
response.headers = {"content-type": "application/json"}
response.text = resp.to_json()
return response
@ -237,7 +248,7 @@ class MyFakeOICServer(Server):
response.headers = {"content-type": "application/json"}
return response
#noinspection PyUnusedLocal
# noinspection PyUnusedLocal
def refresh_session_endpoint(self, query):
try:
req = self.parse_refresh_session_request(query=query)
@ -269,8 +280,9 @@ class MyFakeOICServer(Server):
response.text = ""
return response
#noinspection PyUnusedLocal
def add_credentials(self, user, passwd):
# noinspection PyUnusedLocal
@staticmethod
def add_credentials(user, passwd):
return
def openid_conf(self):

View File

@ -200,7 +200,7 @@ class TestOICConsumer():
srv.keyjar = SRVKEYS
sid, location = self.consumer.begin("openid", "code",
path="http://localhost:8087")
path="http://localhost:8087")
print location
#vkeys = {".":srv.keyjar.get_verify_key()}
authreq = srv.parse_authorization_request(url=location)
@ -466,6 +466,54 @@ def test_userinfo():
assert _eq(result.keys(), ['name', 'email', 'verified', 'nickname', 'sub'])
def test_sign_userinfo():
consumer = Consumer(SessionDB(SERVER_INFO["issuer"]), CONFIG,
CLIENT_CONFIG, SERVER_INFO)
consumer.keyjar = CLIKEYS
mfos = MyFakeOICServer("http://localhost:8088")
mfos.keyjar = SRVKEYS
mfos.userinfo_signed_response_alg = "RS256"
consumer.http_request = mfos.http_request
consumer.redirect_uris = ["http://example.com/authz"]
_state = "state0"
consumer.nonce = rndstr()
consumer.secret_type = "basic"
consumer.set_client_secret("hemligt")
consumer.keyjar = CLIKEYS
consumer.client_prefs = {"userinfo_signed_response_alg": "RS256"}
consumer.provider_info = {"http://localhost:8088": {
"userinfo_endpoint": "http://localhost:8088/userinfo"
}}
del consumer.config["request_method"]
args = {
"client_id": consumer.client_id,
"response_type": "code",
"scope": ["openid"],
}
sid, location = consumer.begin("openid", "code")
print location
result = consumer.do_authorization_request(state=_state,
request_args=args)
assert result.status_code == 302
assert result.headers["location"].startswith(consumer.redirect_uris[0])
_, query = result.headers["location"].split("?")
consumer.parse_response(AuthorizationResponse, info=query,
sformat="urlencoded")
consumer.complete(_state)
result = consumer.get_user_info(_state)
print result
assert result.type() == "OpenIDSchema"
assert _eq(result.keys(), ['name', 'email', 'verified', 'nickname', 'sub'])
def real_test_discover():
c = Consumer(None, None)
@ -561,7 +609,7 @@ def test_client_register():
if __name__ == "__main__":
t = TestOICConsumer()
t.setup_class()
t.test_complete()
#test_provider_config()
# t = TestOICConsumer()
# t.setup_class()
# t.test_complete()
test_sign_userinfo()