Refactored, removed test scripts to an own package.

This commit is contained in:
Roland Hedberg 2012-01-25 10:18:27 +01:00
parent f4290dc434
commit 3fb4b7ed37
19 changed files with 3 additions and 1964 deletions

View File

@ -1 +0,0 @@
__author__ = 'rohe0002'

View File

@ -1 +0,0 @@
oauth2_client.py

View File

@ -1,54 +0,0 @@
__author__ = 'rohe0002'
from oic.oauth2.message import Base
from oic.oauth2.message import SINGLE_REQUIRED_STRING
from oic.oauth2.message import SINGLE_OPTIONAL_STRING
from oic.oauth2.message import SINGLE_OPTIONAL_INT
from oic.oauth2.message import OPTIONAL_LIST_OF_STRINGS
class AccessTokenRequest(Base):
c_attributes = Base.c_attributes.copy()
c_attributes["code"] = SINGLE_REQUIRED_STRING
c_attributes["redirect_uri"] = SINGLE_REQUIRED_STRING
c_attributes["client_id"] = SINGLE_REQUIRED_STRING
c_attributes["client_secret"] = SINGLE_REQUIRED_STRING
#c_attributes["grant_type"] = SINGLE_REQUIRED_STRING
def __init__(self,
code=None,
redirect_uri=None,
client_id=None,
client_secret=None,
**kwargs):
Base.__init__(self, **kwargs)
self.code = code
self.redirect_uri = redirect_uri
self.client_id = client_id
self.client_secret = client_secret
class AccessTokenResponse(Base):
c_attributes = Base.c_attributes.copy()
c_attributes["access_token"] = SINGLE_REQUIRED_STRING
#c_attributes["token_type"] = SINGLE_REQUIRED_STRING
c_attributes["expires"] = SINGLE_OPTIONAL_INT
c_attributes["refresh_token"] = SINGLE_OPTIONAL_STRING
c_attributes["scope"] = OPTIONAL_LIST_OF_STRINGS
# Only for implicit flow
c_attributes["state"] = SINGLE_OPTIONAL_STRING
def __init__(self,
access_token=None,
token_type=None,
expires_in=None,
refresh_token=None,
scope=None,
state=None,
**kwargs):
Base.__init__(self, **kwargs)
self.access_token = access_token
self.token_type = token_type
self.expires_in = expires_in
self.refresh_token = refresh_token
self.scope = scope or []
self.state = state

View File

@ -1,12 +0,0 @@
#!/usr/bin/env python
__author__ = 'rohe0002'
from oic.script import OAuth2
from oic.script import oauth2_operations
from oic.oauth2 import Client
from oic.oauth2 import message
cli = OAuth2(oauth2_operations, message, Client)
cli.run()

View File

@ -1 +0,0 @@
__author__ = 'rohe0002'

View File

@ -1 +0,0 @@
oic_client.py

View File

@ -1,30 +0,0 @@
#!/usr/bin/env python
import json
info = {
"client": {
"redirect_uri": ["https://smultron.catalogix.se/authz_cb"],
"contact": ["roland.hedberg@adm.umu.se"],
"application_type": "web",
"application_name": "OIC test tool",
"register":True,
},
"provider": {
"version": { "oauth": "2.0", "openid": "3.0"},
"conf_url": "https://connect.openid4.us",
},
#"basic-code-authn"
#"basic-code-idtoken",
#"basic-code-idtoken-userdata"
#"basic-code-idtoken-check_id"
"interaction": {
#"OpenIDRequest": {"request": {"response_type": "token"}},
"https://connect.openid4.us/abop/op.php/auth": ["login_form", None],
"https://connect.openid4.us/abop/op.php/login": ["select_form",
{"_form_pick_": ("control", "persona", "Default")}]
}
}
print json.dumps(info)

View File

@ -1,28 +0,0 @@
#!/usr/bin/env python
import json
info = {
"client": {
"redirect_uri": ["https://smultron.catalogix.se/authz_cb"],
"contact": ["roland.hedberg@adm.umu.se"],
"application_type": "web",
"application_name": "OIC test tool",
"register":True,
},
"provider": {
"version": { "oauth": "2.0", "openid": "3.0"},
"conf_url": "https://connect-op.heroku.com",
},
"interaction": {
#"OpenIDRequest": {"request": {"response_type": "token"}},
"https://connect-op.heroku.com/": ["select_form",
{"_form_pick_": {"action": "/connect/fake"}}],
"https://connect-op.heroku.com/authorizations/new": ["select_form",
{"_form_pick_": {"action": "/authorizations",
"class": "approve"}}]
}
}
print json.dumps(info)

View File

@ -1,13 +0,0 @@
#!/usr/bin/env python
__author__ = 'rohe0002'
from oic.script import OIC
from oic.script import oic_operations
from oic.oic import Client
from oic.oic.consumer import Consumer
from oic.oic import message
cli = OIC(oic_operations, message, Client, Consumer)
cli.run()

View File

@ -1,10 +0,0 @@
setup={"version":"3.0",
"issuer":BASE,
"authorization_endpoint":BASE+"authorization",
"token_endpoint":BASE+"token",
"user_info_endpoint":BASE+"userinfo",
"registration_endpoint":BASE+"register",
"scopes_supported":["openid","profile","email","address","PPID"],
"flows_supported":["code","token","code id_token","token id_token"],
"identifiers_supported":["public","ppid"]}

View File

@ -1,30 +0,0 @@
#!/usr/bin/env python
import json
import rp
info = {
"config": {
"redirect_uri": ["https://smultron.catalogix.se/authz_cb"],
"password":"hemligt",
"client_id": "client0",
"contact": ["roland.hedberg@adm.umu.se"],
"application_type": "web",
"application_name": "OIC test tool"
},
"provider_conf_url": "https://openidconnect.info",
"phases": {
"login":(["AUTHZREQ_CODE", "CHOSE", "APPROVE_FORM"], "AUTHZRESP"),
"access-token-request":("ACCESS_TOKEN_REQUEST_CLI_SECRET",
"ACCESS_TOKEN_RESPONSE"),
"user-info-request":("USER_INFO_REQUEST", "USER_INFO_RESPONSE"),
"check-id-request":("CHECK_ID_REQUEST", "CHECK_ID_RESPONSE")},
"flows": [
["login", "access-token-request", "check-id-request"]
],
"register":True
}
rp.make_sequence(info)
print json.dumps(info)

View File

@ -1,395 +0,0 @@
#!/usr/bin/env python
from oic.script import httplib2cookie
from script.oic import operations
__author__ = 'rohe0002'
from importlib import import_module
from httplib2 import Http
from oic.oic import Client
from oic.oic import message
from oic.oic.consumer import Consumer
from oic.oauth2.message import ErrorResponse
QUERY2RESPONSE = {
"AuthorizationRequest": "AuthorizationResponse",
"AccessTokenRequest": "AccessTokenResponse",
"UserInfoRequest": "OpenIDSchema",
"RegistrationRequest": "RegistrationResponse"
}
def make_sequence(info):
sequences = []
for flows in info["flows"]:
sequence = []
for flow in flows:
(items, resp) = info["phases"][flow]
if isinstance(items, basestring):
seq = [getattr(operations, items.strip())]
else:
seq = [getattr(operations, item.strip()) for item in items]
resp = getattr(operations, resp.strip())
for _se in seq:
try:
_se["function"] = _se["function"].__name__
except KeyError:
pass
sequence.append((seq, resp))
sequences.append(sequence)
info["sequences"] = sequences
del info["flows"]
del info["phases"]
return info
class Trace(object):
def __init__(self):
self.trace = []
def request(self, msg):
self.trace.append("--> %s" % msg)
def reply(self, msg):
self.trace.append("<-- %s" % msg)
def info(self, msg):
self.trace.append("%s" % msg)
def error(self, msg):
self.trace.append("[ERROR] %s" % msg)
def warning(self, msg):
self.trace.append("[WARNING] %s" % msg)
def __str__(self):
return "\n". join([t.encode("utf-8") for t in self.trace])
def clear(self):
self.trace = []
def do_request(client, url, method, body="", headers=None, trace=False):
if headers is None:
headers = {}
if trace:
trace.request("URL: %s" % url)
trace.request("BODY: %s" % body)
response, content = client.http_request(url, method=method,
body=body, headers=headers,
trace=trace)
if trace:
trace.reply("RESPONSE: %s" % response)
trace.reply("CONTENT: %s" % unicode(content, encoding="utf-8"))
return response, content
#noinspection PyUnusedLocal
def do_operation(client, opdef, response=None, content=None, trace=None):
op = opdef
qresp = None
if "request" in op:
cls = getattr(message, op["request"])
try:
kwargs = op["args"]["kw"]
except KeyError:
kwargs = {}
try:
kwargs["request_args"] = op["args"]["request"]
_req = kwargs["request_args"]
except KeyError:
_req = {}
try:
kwargs["extra_args"] = op["args"]["extra"]
except KeyError:
pass
cis = getattr(client, "construct_%s" % cls.__name__)(cls, **kwargs)
ht_add = None
if "token_placement" in kwargs:
if kwargs["token_placement"] == "header":
ht_add = {"Authorization": "Bearer %s" % cis.access_token}
cis.access_token = None
url, body, ht_args, cis = client.uri_and_body(cls, cis,
method=op["method"],
request_args=_req)
if ht_add:
ht_args.update({"headers": ht_add})
if trace:
trace.request("URL: %s" % url)
trace.request("BODY: %s" % body)
response, content = client.http_request(url, method=op["method"],
body=body, trace=trace,
**ht_args)
if trace:
trace.reply("RESPONSE: %s" % response)
trace.reply("CONTENT: %s" % unicode(content, encoding="utf-8"))
elif "function" in op:
func = getattr(operations, op["function"])
try:
_args = op["args"]
except KeyError:
_args = {}
_args["trace"] = trace
if trace:
trace.request("FUNCTION: %s" % func.__name__)
trace.request("ARGS: %s" % _args)
response, content = func(client, response, content, **_args)
if trace:
trace.reply("RESPONSE: %s" % response)
trace.reply("CONTENT: %s" % unicode(content, encoding="utf-8"))
else:
try:
url = response.url
except AttributeError:
url = response["location"]
if op["method"] == "POST":
body = content
else:
body=None
if "Content-type" in response:
headers = {"Content-type": response["Content-type"]}
else:
headers = {}
if trace:
trace.request("URL: %s" % url)
trace.request("BODY: %s" % body)
response, content = client.http_request(url, method=op["method"],
body=body, headers=headers,
trace=trace)
if trace:
trace.reply("RESPONSE: %s" % response)
trace.reply("CONTENT: %s" % unicode(content, encoding="utf-8"))
return response, content
def discover(principal):
c = Consumer(None, None)
return c.discover(principal)
def provider_config(issuer):
c = Consumer(None, None)
return c.provider_config(issuer)
def register(endpoint, info):
c = Consumer(None, None)
return c.register(endpoint, **info)
if __name__ == "__main__":
import argparse
import json
parser = argparse.ArgumentParser()
parser.add_argument('-v', dest='verbose', action='store_true')
parser.add_argument('-d', dest='debug', action='store_true')
parser.add_argument('-c', dest='conf_file')
parser.add_argument('-s', dest='server_conf')
parser.add_argument('-p', dest="principal")
parser.add_argument('-C', dest="ca_certs")
parser.add_argument('-f', dest="flow")
parser.add_argument('-R', dest="register", action="store_true")
parser.add_argument('-i', dest="conf_issuer")
parser.add_argument('-J', dest="json_config_file")
parser.add_argument('-A', dest="function_args")
args = parser.parse_args()
trace = Trace()
if args.json_config_file:
json_config = json.loads(open(args.json_config_file).read())
else:
json_config = None
sconf = {}
if args.server_conf:
if args.server_conf.startswith("http"):
if args.ca_certs:
http = Http(ca_certs=args.ca_certs)
else:
http = Http(disable_ssl_certificate_validation=True)
response, content = http.request(args.server_conf)
sconf = json.loads(content)
else:
conf_mod = import_module(args.server_conf)
sconf = conf_mod.setup
elif args.conf_issuer:
sconf = provider_config(args.conf_issuer)
elif json_config:
if "server_conf" in json_config:
sconf = json_config["server_conf"]
elif "provider_conf_url" in json_config:
sconf = provider_config(json_config["provider_conf_url"]).dictionary()
trace.info("SERVER CONFIGURATION: %s" % sconf)
_htclass = httplib2cookie.CookiefulHttp
if args.ca_certs:
client = Client(ca_certs=args.ca_certs, httpclass=_htclass)
else:
try:
client = Client(ca_certs=json_config["ca_certs"],
httpclass=_htclass)
except KeyError:
client = Client(disable_ssl_certificate_validation=True,
httpclass=_htclass)
client.http_request = client.http.crequest
for key, val in sconf.items():
if key.endswith("_endpoint"):
setattr(client, key, val)
if args.conf_file:
mod = import_module(args.conf_file)
cconf = mod.CONFIG
elif "config" in json_config:
cconf = json_config["config"]
else:
raise Exception("Missing client configuration")
for prop in ["client_id", "redirect_uri", "password"]:
try:
setattr(client, prop, cconf[prop])
except KeyError:
pass
if args.register or "register" in json_config:
info = {}
for prop in ["contact", "redirect_uri", "application_name",
"application_type"]:
info[prop] = cconf[prop]
resp = register(sconf["registration_endpoint"], info)
for prop in ["client_id", "client_secret"]:
try:
setattr(client, prop, resp[prop])
except KeyError:
pass
trace.info("REGISTRATION INFORMATION: %s" % resp.dictionary())
#client.http = MyFakeOICServer()
client.state = "STATE0"
if args.flow:
sequences = [getattr(operations, oper.strip()) for oper in
args.flow.split(",")]
elif "flows" in json_config:
sequences = []
for flows in json_config["flows"]:
sequence = []
for flow in flows:
(items, resp) = json_config["phases"][flow]
if isinstance(items, basestring):
seq = [getattr(operations, items.strip())]
else:
seq = [getattr(operations, item.strip()) for item in items]
resp = getattr(operations, resp.strip())
sequence.append((seq, resp))
sequences.append(sequence)
elif "sequences" in json_config:
sequences = json_config["sequences"]
else:
sequences = []
response = None
content = None
for sequence in sequences:
for opers, resp in sequence:
err = None
for oper in opers:
if trace:
trace.info(70*"=")
try:
response, content = do_operation(client, oper, response,
content, trace)
#print content
except Exception, err:
trace.error("%s: %s" % (err.__class__.__name__, err))
break
while response.status == 302:
try:
url = response.url
except AttributeError:
url = response["location"]
# If back to me
for_me = False
for redirect_uri in client.redirect_uri:
if url.startswith(redirect_uri):
for_me=True
if for_me:
break
else:
if trace:
trace.info(70*"-")
response,content = do_request(client, url, "GET",
trace=trace)
if response.status >= 400:
if response["content-type"] == "application/json":
err = ErrorResponse.set_json(content)
if trace:
trace.error("%s: %s" % (response.status,
err.get_json()))
else:
err = content
break
if err is None:
if resp["where"] == "url":
info = response["location"]
else:
info = content
respcls = getattr(message, resp["response"])
qresp = client.parse_response(respcls, info,
resp["type"],
client.state, True)
if trace and qresp:
trace.info("[%s]: %s" % (qresp.__class__.__name__,
qresp.dictionary()))
print trace
trace.clear()
# for res in result:
# if res is not None:
# if isinstance(res, tuple):
# print 60*"-"
# print res[0]
# print res[1]
# if res[2]:
# print res[2].dictionary()
# print
# elif isinstance(res, basestring):
# print res

View File

@ -21,16 +21,17 @@ __author__ = 'rohe0002'
setup(
name="oic",
version="0.0.2",
version="0.0.3",
description="Python implementation of OAuth2 and OpenID Connect",
author = "Roland Hedberg",
author_email = "roland.hedberg@adm.umu.se",
license="Apache 2.0",
packages=["oic", "oic/oauth2", "oic/oic", "oic/utils", "oic/script"],
packages=["oic", "oic/oauth2", "oic/oic", "oic/utils"],
package_dir = {"": "src"},
classifiers = ["Development Status :: 4 - Beta",
"License :: OSI Approved :: Apache Software License",
"Topic :: Software Development :: Libraries :: Python Modules"],
install_requires = ['httplib2', "PyJWT", "pycrypto"],
zip_safe=False,
)

View File

@ -1,262 +0,0 @@
#!/usr/bin/env python
__author__ = 'rohe0002'
import sys
import argparse
import json
from oic.script import httplib2cookie
from oic.script.base import *
QUERY2RESPONSE = {
"AuthorizationRequest": "AuthorizationResponse",
"OpenIDRequest": "OpenIDResponse",
"AccessTokenRequest": "AccessTokenResponse",
"UserInfoRequest": "OpenIDSchema",
"RegistrationRequest": "RegistrationResponse"
}
class OAuth2(object):
client_args = ["client_id", "redirect_uri", "password"]
def __init__(self, operations_mod, message_mod, client_class):
self.operations_mod = operations_mod
self.message_mod = message_mod
self.client_class = client_class
self.client = None
self.trace = Trace()
self._parser = argparse.ArgumentParser()
self._parser.add_argument('-v', dest='verbose', action='store_true')
self._parser.add_argument('-d', dest='debug', action='store_true')
self._parser.add_argument('-C', dest="ca_certs")
self._parser.add_argument('-J', dest="json_config_file")
self._parser.add_argument('-I', dest="interactions")
self._parser.add_argument("-l", dest="list", action="store_true")
self._parser.add_argument("flow")
self.args = None
self.pinfo = None
self.sequences = []
self.function_args = {}
def parse_args(self):
self.json_config= self.json_config_file()
self.pinfo = self.provider_info()
self.client_conf(self.client_args)
def json_config_file(self):
if self.args.json_config_file == "-":
return json.loads(sys.stdin.read())
else:
return json.loads(open(self.args.json_config_file).read())
def run(self):
self.args = self._parser.parse_args()
self.args.flow = self.args.flow.strip("'")
self.args.flow = self.args.flow.strip('"')
if self.args.list:
return self.operations()
else:
self.parse_args()
if self.args.verbose:
print "SERVER CONFIGURATION: %s" % self.pinfo
#client.http = MyFakeOICServer()
_seq = self.make_sequence()
interact = self.get_interactions()
tests = self.get_test()
self.client.state = "STATE0"
try:
run_sequence(self.client, _seq, self.trace, interact,
self.message_mod, self.args.verbose, tests)
except Exception, err:
print self.trace
print err
def operations(self):
lista = []
for key,val in self.operations_mod.FLOWS.items():
item = {"id": key,
"name": val["name"],
"descr": "".join(val["descr"])}
lista.append(item)
return json.dumps(lista)
def provider_info(self):
# Should provide a Metadata class
res = {}
_jc = self.json_config["provider"]
for key in ["version", "issuer", "endpoints", "scopes_supported",
"schema", "user_id_types_supported",
"userinfo_algs_supported",
"id_token_algs_supported",
"request_object_algs_supported",
"provider_trust"]:
if key == "endpoints":
try:
for endp, url in _jc[key].items():
res[endp] = url
except KeyError:
pass
else:
try:
res[key] = _jc[key]
except KeyError:
pass
return res
def client_conf(self, cprop):
_htclass = httplib2cookie.CookiefulHttp
if self.args.ca_certs:
self.client = self.client_class(ca_certs=self.args.ca_certs,
httpclass=_htclass)
else:
try:
self.client = self.client_class(
ca_certs=self.json_config["ca_certs"],
httpclass=_htclass)
except (KeyError, TypeError):
self.client = self.client_class(
disable_ssl_certificate_validation=True,
httpclass=_htclass)
self.client.http_request = self.client.http.crequest
# set the endpoints in the Client from the provider information
for key, val in self.pinfo.items():
if key.endswith("_endpoint"):
setattr(self.client, key, val)
# Client configuration
self.cconf = self.json_config["client"]
# set necessary information in the Client
for prop in cprop:
try:
setattr(self.client, prop, self.cconf[prop])
except KeyError:
pass
def make_sequence(self):
# Whatever is specified on the command line takes precedences
if self.args.flow:
sequence = flow2sequence(self.operations_mod, self.args.flow)
elif self.json_config and "flow" in self.json_config:
sequence = flow2sequence(self.operations_mod,
self.json_config["flow"])
else:
sequence = None
return sequence
def get_interactions(self):
interactions = {}
if self.json_config:
try:
interactions = self.json_config["interaction"]
except KeyError:
pass
if self.args.interactions:
_int = self.args.interactions.replace("\'", '"')
if interactions:
interactions.update(json.loads(_int))
else:
interactions = json.loads(_int)
for url, spec in interactions.items():
try:
func_name, args = spec
func = getattr(self.operations_mod, func_name)
interactions[url] = (func, args)
except ValueError:
interactions[url] = spec
return interactions
def get_test(self):
if self.args.flow:
flow = self.operations_mod.FLOWS[self.args.flow]
elif self.json_config and "flow" in self.json_config:
flow = self.operations_mod.FLOWS[self.json_config["flow"]]
else:
flow = None
try:
return [getattr(self.operations_mod, t) for t in flow["tests"]]
except KeyError:
return []
class OIC(OAuth2):
client_args = ["client_id", "redirect_uri", "password", "client_secret"]
def __init__(self, operations_mod, message_mod, client_class,
consumer_class):
OAuth2.__init__(self, operations_mod, message_mod, client_class)
self._parser.add_argument('-P', dest="provider_conf_url")
self._parser.add_argument('-p', dest="principal")
self._parser.add_argument('-R', dest="register", action="store_true")
self.consumer_class = consumer_class
def parse_args(self):
OAuth2.parse_args(self)
self.register()
def discover(self, principal):
c = self.consumer_class(None, None)
return c.discover(principal)
def provider_config(self, issuer):
c = self.consumer_class(None, None)
return c.provider_config(issuer)
def _register(self, endpoint, info):
c = self.consumer_class(None, None)
return c.register(endpoint, **info)
def provider_info(self):
if "conf_url" in self.json_config["provider"]:
_url = self.json_config["provider"]["conf_url"]
return self.provider_config(_url).dictionary()
else:
return OAuth2.provider_info(self)
def register(self):
# should I register the client ?
if self.args.register or "register" in self.json_config["client"]:
info = {}
for prop in self.message_mod.RegistrationRequest.c_attributes.keys():
try:
info[prop] = self.cconf[prop]
except KeyError:
pass
self.reg_resp = self._register(self.pinfo["registration_endpoint"],
info)
for prop in ["client_id", "client_secret"]:
try:
setattr(self.client, prop, self.reg_resp[prop])
except KeyError:
pass
if self.args.verbose:
print "REGISTRATION INFORMATION: %s" % self.reg_resp
if __name__ == "__main__":
from oic.script import OAuth2
from oic.script import oauth2_operations
from oic.oauth2 import Client
from oic.oauth2 import message
cli = OAuth2(oauth2_operations, message, Client)
cli.run()

View File

@ -1,345 +0,0 @@
#!/usr/bin/env python
__author__ = 'rohe0002'
from importlib import import_module
from oic.oauth2.message import ErrorResponse
class Trace(object):
def __init__(self):
self.trace = []
def request(self, msg):
self.trace.append("--> %s" % msg)
def reply(self, msg):
self.trace.append("<-- %s" % msg)
def info(self, msg):
self.trace.append("%s" % msg)
def error(self, msg):
self.trace.append("[ERROR] %s" % msg)
def warning(self, msg):
self.trace.append("[WARNING] %s" % msg)
def __str__(self):
return "\n". join([t.encode("utf-8") for t in self.trace])
def clear(self):
self.trace = []
def flow2sequence(operations, item):
flow = operations.FLOWS[item]
return [operations.PHASES[phase] for phase in flow["sequence"]]
def endpoint(client, base):
for _endp in client._endpoints:
if getattr(client, _endp) == base:
return True
return False
def do_request(client, url, method, body="", headers=None, trace=False):
if headers is None:
headers = {}
if trace:
trace.request("URL: %s" % url)
trace.request("BODY: %s" % body)
response, content = client.http_request(url, method=method,
body=body, headers=headers,
trace=trace)
if trace:
trace.reply("RESPONSE: %s" % response)
trace.reply("CONTENT: %s" % unicode(content, encoding="utf-8"))
return response, content
#noinspection PyUnusedLocal
def do_operation(client, opdef, message_mod, response=None, content=None,
trace=None, location=""):
op = opdef
qresp = None
if "request" in op:
if isinstance(op["request"], tuple):
(mod, klass) = op["request"]
imod = import_module(mod)
cls = getattr(imod, klass)
else:
cls = getattr(message_mod, op["request"])
try:
kwargs = op["args"]["kw"].copy()
except KeyError:
kwargs = {}
try:
kwargs["request_args"] = op["args"]["request"].copy()
_req = kwargs["request_args"]
except KeyError:
_req = {}
try:
kwargs["extra_args"] = op["args"]["extra"].copy()
except KeyError:
pass
cis = getattr(client, "construct_%s" % cls.__name__)(cls, **kwargs)
ht_add = None
if "authn_method" in kwargs:
(r_arg, h_arg) = client.init_authentication_method(**kwargs)
if r_arg:
for key,val in r_arg.items():
setattr(cis, key, val)
else:
h_arg = None
url, body, ht_args, cis = client.uri_and_body(cls, cis,
method=op["method"],
request_args=_req)
if h_arg:
ht_args.update(h_arg)
if ht_add:
ht_args.update({"headers": ht_add})
if trace:
trace.request("URL: %s" % url)
trace.request("BODY: %s" % body)
response, content = client.http_request(url, method=op["method"],
body=body, trace=trace,
**ht_args)
if trace:
trace.reply("RESPONSE: %s" % response)
trace.reply("CONTENT: %s" % unicode(content, encoding="utf-8"))
elif "function" in op:
func = op["function"]
try:
_args = op["args"].copy()
except (KeyError, AttributeError):
_args = {}
_args["_trace_"] = trace
_args["location"] = location
if trace:
trace.request("FUNCTION: %s" % func.__name__)
trace.request("ARGS: %s" % _args)
url, response, content = func(client, response, content, **_args)
else:
try:
url = response.url
except AttributeError:
url = response["location"]
if op["method"] == "POST":
body = content
else:
body=None
if "Content-type" in response:
headers = {"Content-type": response["Content-type"]}
else:
headers = {}
if trace:
trace.request("URL: %s" % url)
trace.request("BODY: %s" % body)
response, content = client.http_request(url, method=op["method"],
body=body, headers=headers,
trace=trace)
if trace:
trace.reply("RESPONSE: %s" % response)
trace.reply("CONTENT: %s" % unicode(content, encoding="utf-8"))
return url, response, content
def rec_update(dic0, dic1):
res = {}
for key, val in dic0.items():
if key not in dic1:
res[key] = val
else:
if isinstance(val, dict):
res[key] = rec_update(val, dic1[key])
else:
res[key] = dic1[key]
for key, val in dic1.items():
if key in dic0:
continue
else:
res[key] = val
return res
def run_sequence(client, sequence, trace, interaction, message_mod, verbose,
tests=None):
item = []
response = None
content = None
err = None
for req, resp in sequence:
err = None
if trace:
trace.info(70*"=")
try:
extra_args = interaction[req["request"]]
try:
req["args"] = rec_update(req["args"], extra_args)
except KeyError:
req["args"] = extra_args
except KeyError:
pass
try:
url, response, content = do_operation(client, req,
message_mod,
response, content,
trace)
except Exception, err:
trace.error("%s: %s" % (err.__class__.__name__, err))
break
done = False
while not done:
while response.status in [302, 301, 303]:
try:
url = response.url
except AttributeError:
url = response["location"]
# If back to me
for_me = False
for redirect_uri in client.redirect_uri:
if url.startswith(redirect_uri):
for_me=True
if for_me:
done = True
break
else:
if trace:
trace.info(70*".")
response, content = do_request(client, url, "GET",
trace=trace)
if done or err:
break
if response.status >= 400:
if response["content-type"] == "application/json":
err = ErrorResponse.set_json(content)
if trace:
trace.error("%s: %s" % (response.status,
err.get_json()))
else:
err = content
break
if trace:
trace.info(70*"=")
_base = url.split("?")[0]
try:
_spec = interaction[_base]
except KeyError:
if endpoint(client, _base):
break
trace.error("No interaction bound to '%s'" % _base)
raise
_op = {"function": _spec[0], "args": _spec[1]}
try:
url, response, content = do_operation(client, _op,
message_mod,
response, content,
trace, location=url)
#print content
except Exception, err:
trace.error("%s: %s" % (err.__class__.__name__, err))
raise
if err is None:
if resp["where"] == "url":
try:
info = response["location"]
except KeyError:
trace.error("Not a final redirect")
raise Exception
else:
info = content
if info:
if isinstance(resp["response"], tuple):
(mod, klass) = resp["response"]
imod = import_module(mod)
respcls = getattr(imod, klass)
else:
respcls = getattr(message_mod, resp["response"])
try:
qresp = client.parse_response(respcls, info,
resp["type"],
client.state, True,
key=client.client_secret,
client_id=client.client_id)
if trace and qresp:
trace.info("[%s]: %s" % (qresp.__class__.__name__,
qresp.dictionary()))
item.append(qresp)
except Exception, err:
trace.error("info: %s" % info)
trace.error("%s" % err)
raise
if err:
break
if err or verbose:
print trace
if not err and tests:
for test in tests:
assert test(client, item)
return err
def run_sequences(client, sequences, trace, interaction, message_mod,
verbose=False):
for sequence, endpoints, fid in sequences:
# clear cookie cache
client.grant.clear()
try:
client.http.cookiejar.clear()
except AttributeError:
pass
err = run_sequence(client, sequence, trace, interaction, message_mod,
verbose)
if err:
print "%s - FAIL" % fid
print
if not verbose:
print trace
else:
print "%s - OK" % fid
trace.clear()

View File

@ -1,158 +0,0 @@
# ========================================================================
# Copyright (c) 2007, Metaweb Technologies, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY METAWEB TECHNOLOGIES AND CONTRIBUTORS
# ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL METAWEB
# TECHNOLOGIES OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# ========================================================================
#
#
# httplib2cookie.py allows you to use python's standard
# CookieJar class with httplib2.
#
#
import re
import cookielib
from httplib2 import Http, RedirectLimit
import urllib
import urllib2
class DummyRequest(object):
"""Simulated urllib2.Request object for httplib2
implements only what's necessary for cookielib.CookieJar to work
"""
def __init__(self, url, headers=None):
self.url = url
self.headers = headers
self.origin_req_host = urllib2.request_host(self)
self.type, r = urllib.splittype(url)
self.host, r = urllib.splithost(r)
if self.host:
self.host = urllib.unquote(self.host)
def get_full_url(self):
return self.url
def get_origin_req_host(self):
# TODO to match urllib2 this should be different for redirects
return self.origin_req_host
def get_type(self):
return self.type
def get_host(self):
return self.host
def get_header(self, key, default=None):
return self.headers.get(key.lower(), default)
def has_header(self, key):
return key in self.headers
def add_unredirected_header(self, key, val):
# TODO this header should not be sent on redirect
self.headers[key.lower()] = val
def is_unverifiable(self):
# TODO to match urllib2, this should be set to True when the
# request is the result of a redirect
return False
class DummyResponse(object):
"""Simulated urllib2.Request object for httplib2
implements only what's necessary for cookielib.CookieJar to work
"""
def __init__(self, response):
self.response = response
def info(self):
return DummyMessage(self.response)
class DummyMessage(object):
"""Simulated mimetools.Message object for httplib2
implements only what's necessary for cookielib.CookieJar to work
"""
def __init__(self, response):
self.response = response
#noinspection PyUnusedLocal
def getheaders(self, k):
k = k.lower()
v = self.response.get(k.lower(), None)
if k not in self.response:
return []
#return self.response[k].split(re.compile(',\\s*'))
# httplib2 joins multiple values for the same header
# using ','. but the netscape cookie format uses ','
# as part of the expires= date format. so we have
# to split carefully here - header.split(',') won't do it.
HEADERVAL= re.compile(r'\s*(([^,]|(,\s*\d))+)')
return [h[0] for h in HEADERVAL.findall(self.response[k])]
class CookiefulHttp(Http):
"""Subclass of httplib2.Http that keeps cookie state
constructor takes an optional cookiejar=cookielib.CookieJar
currently this does not handle redirects completely correctly:
if the server redirects to a different host the original
cookies will still be sent to that host.
"""
def __init__(self, cookiejar=None, **kws):
# note that httplib2.Http is not a new-style-class
Http.__init__(self, **kws)
if cookiejar is None:
cookiejar = cookielib.CookieJar()
self.cookiejar = cookiejar
def crequest(self, uri, trace=None, **kws):
"""
"""
headers = kws.pop('headers', {})
req = DummyRequest(uri, headers)
self.cookiejar.add_cookie_header(req)
headers = req.headers
if trace:
trace.request("HEADERS: %s" % headers)
try:
(r, body) = Http.request(self, uri, headers=headers,
redirections=0, **kws)
except RedirectLimit, err:
r = err.response
body = err.content
resp = DummyResponse(r)
self.cookiejar.extract_cookies(resp, req)
return r, body

View File

@ -1,119 +0,0 @@
from oic.script.opfunc import *
#from opfunc import *
RESPOND = {
"method": "POST",
}
AUTHZREQ_CODE = {
"request": "AuthorizationRequest",
"method": "GET",
"args": {
"request": {"response_type": "code"},
}
}
AUTHZRESP = {
"response": "AuthorizationResponse",
"where": "url",
"type": "urlencoded",
}
ACCESS_TOKEN_RESPONSE = {
"response": "AccessTokenResponse",
"where": "body",
"type": "json"
}
USER_INFO_RESPONSE = {
"response": "OpenIDSchema",
"where": "body",
"type": "json"
}
ACCESS_TOKEN_REQUEST_PASSWD = {
"request":"AccessTokenRequest",
"method": "POST",
"args": {
"kw": {"authn_method": "client_secret_basic"}
},
}
ACCESS_TOKEN_REQUEST_CLI_SECRET_POST = {
"request":"AccessTokenRequest",
"method": "POST",
"args": {
"kw": {"authn_method": "client_secret_post"}
},
}
ACCESS_TOKEN_REQUEST_CLI_SECRET_GET = {
"request":"AccessTokenRequest",
"method": "GET",
"args": {
"kw": {"authn_method": "client_secret_post"}
},
}
ACCESS_TOKEN_REQUEST_FACEBOOK = {
"request":("facebook","AccessTokenRequest"),
"method": "GET",
"args": {
"kw": {"authn_method": "client_secret_post"}
},
}
ACCESS_TOKEN_RESPONSE_FACEBOOK = {
"response": ("facebook", "AccessTokenResponse"),
"where": "body",
"type": "urlencoded"
}
PHASES= {
"login": (AUTHZREQ_CODE, AUTHZRESP),
"access-token-request-post":(ACCESS_TOKEN_REQUEST_CLI_SECRET_POST,
ACCESS_TOKEN_RESPONSE),
"access-token-request-get":(ACCESS_TOKEN_REQUEST_CLI_SECRET_GET,
ACCESS_TOKEN_RESPONSE),
"facebook-access-token-request-get":(ACCESS_TOKEN_REQUEST_FACEBOOK,
ACCESS_TOKEN_RESPONSE_FACEBOOK),
# "coip-login": ([AUTHZREQ_CODE, CHOSE, SELECT_FORM, LOGIN_FORM, POST_FORM],
# AUTHZRESP),
}
FLOWS = {
'basic-code-authn': {
"name": 'Basic Code flow with authentication',
"descr": ('Very basic test of a Provider using the authorization code ',
'flow. The test tool acting as a consumer is very relaxed',
'and tries to obtain an ID Token.'),
"sequence": ["login"],
"endpoints": ["authorization_endpoint"]
},
'basic-code-idtoken-post': {
"name": 'Basic Code flow with ID Token',
"descr": ('Very basic test of a Provider using the authorization code ',
'flow. The test tool acting as a consumer is very relaxed',
'and tries to obtain an ID Token.'),
"depends": ["basic-code-authn"],
"sequence": ["login", "access-token-request-post"],
"endpoints": ["authorization_endpoint", "token_endpoint"]
},
'basic-code-idtoken-get': {
"name": 'Basic Code flow with ID Token',
"descr": ('Very basic test of a Provider using the authorization code ',
'flow. The test tool acting as a consumer is very relaxed',
'and tries to obtain an ID Token.'),
"depends": ["basic-code-authn"],
"sequence": ["login", "access-token-request-get"],
"endpoints": ["authorization_endpoint", "token_endpoint"]
},
'facebook-idtoken-get': {
"name": 'Facebook flow with ID Token',
"descr": ('Facebook specific flow'),
"depends": ["basic-code-authn"],
"sequence": ["login", "facebook-access-token-request-get"],
"endpoints": ["authorization_endpoint", "token_endpoint"]
},
}

View File

@ -1,222 +0,0 @@
__author__ = 'rohe0002'
from oic.script.opfunc import *
# ========================================================================
RESPOND = {
"method": "POST",
}
AUTHZREQ_CODE = {
"request": "AuthorizationRequest",
"method": "GET",
"args": {
"request": {"response_type": "code",
"scope": ["openid"],
},
}
}
AUTHZRESP = {
"response": "AuthorizationResponse",
"where": "url",
"type": "urlencoded",
}
OPENID_REQUEST_CODE = {
"request": "OpenIDRequest",
"method": "GET",
"args": {"request": {"response_type": "code", "scope": ["openid"]}}
}
OPENID_REQUEST_TOKEN = {
"request": "OpenIDRequest",
"method": "GET",
"args": {"request": {"response_type": "token", "scope": ["openid"]}}
}
# {"OpenIDRequest": {"request", {"response_type":["code","token"]}}}
OPENID_REQUEST_CODE_TOKEN = {
"request": "OpenIDRequest",
"method": "GET",
"args": {"request": {"response_type": ["code","token"],
"scope": ["openid"]}}
}
OPENID_REQUEST_CODE_IDTOKEN = {
"request": "OpenIDRequest",
"method": "GET",
"args": {"request": {"response_type": ["code","id_token"],
"scope": ["openid"]}}
}
OPENID_REQUEST_TOKEN_IDTOKEN = {
"request": "OpenIDRequest",
"method": "GET",
"args": {"request": {"response_type": ["token","id_token"],
"scope": ["openid"]}}
}
OPENID_REQUEST_CODE_TOKEN_IDTOKEN = {
"request": "OpenIDRequest",
"method": "GET",
"args": {"request": {"response_type": ["code", "token", "id_token"],
"scope": ["openid"]}}
}
# 2.1.2.1.2
# The User Identifier for which an ID Token is being requested.
# If the specified user is not currently authenticated to the Authorization
# Server, they may be prompted for authenticated, unless the prompt parameter
# in the Authorization Request is set to none. The Claim Value in the request
# is an object containing the single element value.
#"user_id": {"value":"248289761001"}
#OPENID_REQUEST_CODE_21212 = {
# "request": "AuthorizationRequest",
# "method": "GET",
# "args": {
# "request": {"response_type": "code",
# "scope": ["openid"],
# "prompt": "none"
# },
# "kw": {
# "idtoken_claims": {
# "claims": {"user_id": {"value":"248289761001"}}
# }
# }
# }
#}
ACCESS_TOKEN_RESPONSE = {
"response": "AccessTokenResponse",
"where": "body",
"type": "json"
}
USER_INFO_RESPONSE = {
"response": "OpenIDSchema",
"where": "body",
"type": "json"
}
ACCESS_TOKEN_REQUEST_PASSWD = {
"request":"AccessTokenRequest",
"method": "POST",
"args": {
"kw": {"authn_method": "client_secret_basic"}
},
}
ACCESS_TOKEN_REQUEST_CLI_SECRET = {
"request":"AccessTokenRequest",
"method": "POST",
"args": {
"kw": {"authn_method": "client_secret_post"}
},
}
USER_INFO_REQUEST = {
"request":"UserInfoRequest",
"method": "POST",
"args": {
"kw": {"authn_method": "bearer_header"}
},
}
USER_INFO_REQUEST_BODY = {
"request":"UserInfoRequest",
"method": "POST",
"args": {
"kw": {"authn_method": "bearer_body"}
},
}
USER_INFO_REQUEST_BODY_GET = {
"request":"UserInfoRequest",
"method": "GET",
"args": {
"kw": {"authn_method": "bearer_body"}
},
}
PROVIDER_CONFIGURATION = {
"request": "ProviderConfigurationRequest"
}
CHECK_ID_REQUEST = {
"request": "CheckIDRequest",
"method": "POST"
}
CHECK_ID_RESPONSE = {
"response": "IdToken",
"where": "body",
"type": "json"
}
PHASES= {
"login": (AUTHZREQ_CODE, AUTHZRESP),
"oic-login": (OPENID_REQUEST_CODE, AUTHZRESP),
"oic-login-token": (OPENID_REQUEST_TOKEN, AUTHZRESP),
"oic-login-idtoken-token": (OPENID_REQUEST_TOKEN_IDTOKEN, AUTHZRESP),
# "login-form": ([AUTHZREQ_CODE, LOGIN_FORM], AUTHZRESP),
# "login-form-approve": ([AUTHZREQ_CODE, LOGIN_FORM, APPROVE_FORM],
# AUTHZRESP),
"access-token-request":(ACCESS_TOKEN_REQUEST_CLI_SECRET,
ACCESS_TOKEN_RESPONSE),
"check-id-request":(CHECK_ID_REQUEST, CHECK_ID_RESPONSE),
"user-info-request":(USER_INFO_REQUEST_BODY, USER_INFO_RESPONSE)
}
FLOWS = {
'openid-code': {
"name": 'Basic OpenID Connect Code flow with authentication',
"descr": ('Very basic test of a Provider using the authorization code ',
'flow. The test tool acting as a consumer is very relaxed',
'and tries to obtain an access code.'),
"sequence": ["login"],
"endpoints": ["authorization_endpoint"]
},
'openid-token': {
"name": 'Basic OpenID Connect Token flow with authentication',
"descr": ('Very basic test of a OIC Provider using the token ',
'flow. The test tool acting as a consumer is very relaxed',
'and tries to obtain an access token.'),
"sequence": ["oic-login-token"],
"endpoints": ["authorization_endpoint"]
},
'openid-code-userdata': {
"name": 'Basic Code flow with ID Token and User data',
"descr": ('Very basic test of a Provider using the authorization code'
' flow, but in addition to retrieve an ID Token,',
' this test flow also tried to obtain user data.'),
"depends": ['openid-token'],
"sequence": ["oic-login", "access-token-request", "user-info-request"],
"endpoints": ["authorization_endpoint", "token_endpoint",
"userinfo_endpoint"],
},
'openid-token-idtoken-userdata': {
"name": 'Token flow with ID Token and User data',
"descr": ('Very basic test of a Provider using the authorization code'
' flow, but in addition to retrieve an ID Token,',
' this test flow also tried to obtain user data.'),
"depends": ['openid-token'],
"sequence": ["oic-login-idtoken-token", "user-info-request"],
"endpoints": ["authorization_endpoint", "userinfo_endpoint"],
},
'openid-token-idtoken-check_id': {
"name": 'OpenID Connect Token flow with check id',
"descr": ('Very basic test of a OIC Provider using the token ',
'flow. The test tool acting as a consumer is very relaxed',
'and tries to obtain an ID Token.'),
"depends": ["openid-token"],
"sequence": ["oic-login-idtoken-token", "check-id-request"],
"endpoints": ["authorization_endpoint", "check_id_endpoint"],
"tests": ["cmp_idtoken"]
},
}

View File

@ -1,280 +0,0 @@
__author__ = 'rohe0002'
import json
#import jwt
from urlparse import urlparse
from mechanize import ParseResponse
from mechanize._form import ControlNotFoundError
#from httplib2 import Response
class FlowException(Exception):
def __init__(self, function="", content="", url=""):
Exception.__init__(self)
self.function = function
self.content = content
self.url = url
def __str__(self):
return json.dumps(self.__dict__)
class DResponse():
def __init__(self, **kwargs):
self.status = 200
self.index = 0
self._message = ""
self.url = ""
if kwargs:
for key, val in kwargs.items():
if val:
self.__setitem__(key, val)
def __setitem__(self, key, value):
setattr(self, key, value)
def __getitem__(self, item):
if item == "content-location":
return self.url
elif item == "content-length":
return len(self._message)
else:
return getattr(self, item)
def geturl(self):
return self.url
def read(self, size=0):
if size:
if self._len < size:
return self._message
else:
if self._len == self.index:
part = None
elif self._len - self.index < size:
part = self._message[self.index:]
self.index = self._len
else:
part = self._message[self.index:self.index+size]
self.index += size
return part
else:
return self._message
def write(self, message):
self._message = message
self._len = len(message)
def do_request(client, url, method, body="", headers=None, trace=False):
if headers is None:
headers = {}
if trace:
trace.request("URL: %s" % url)
trace.request("BODY: %s" % body)
response, content = client.http_request(url, method=method,
body=body, headers=headers,
trace=trace)
if trace:
trace.reply("RESPONSE: %s" % response)
trace.reply("CONTENT: %s" % unicode(content, encoding="utf-8"))
return url, response, content
def pick_form(response, content, url=None, **kwargs):
forms = ParseResponse(response)
if not forms:
raise FlowException(content=content, url=url)
_form = None
if len(forms) == 1:
_form = forms[0]
else:
_dict = kwargs["_form_pick_"]
for form in forms:
if _form:
break
_keys = form.attrs.keys()
for key,val in _dict.items():
if key in _keys:
if val == form.attrs[key]:
_form = form
elif key == "control":
try:
orig_val = form[key]
if isinstance(orig_val, basestring):
if orig_val == val:
_form = form
elif val in orig_val:
_form = form
except KeyError:
pass
else:
_form = None
if not _form:
break
return _form
def do_click(client, form, **kwargs):
request = form.click()
headers = {}
for key, val in request.unredirected_hdrs.items():
headers[key] = val
url = request._Request__original
try:
_trace = kwargs["_trace_"]
except KeyError:
_trace = False
if form.method == "POST":
return do_request(client, url, "POST", request.data, headers, _trace)
else:
return do_request(client, url, "GET", headers=headers, trace=_trace)
#noinspection PyUnusedLocal
def login_form(client, orig_response, content, **kwargs):
try:
_url = orig_response["content-location"]
except KeyError:
_url = kwargs["location"]
# content is a form to be filled in and returned
response = DResponse(status=orig_response["status"], url=_url)
response.write(content)
form = pick_form(response, content, _url, **kwargs)
try:
form[kwargs["user_label"]] = kwargs["user"]
except KeyError:
pass
try:
form[kwargs["password_label"]] = kwargs["password"]
except KeyError:
pass
return do_click(client, form, **kwargs)
#noinspection PyUnusedLocal
def approve_form(client, orig_response, content, **kwargs):
# content is a form to be filled in and returned
response = DResponse(status=orig_response["status"])
if orig_response["status"] == 302:
response.url = orig_response["content-location"]
else:
response.url = client.authorization_endpoint
response.write(content)
form = pick_form(response, content, **kwargs)
# do something with args
return do_click(client, form, **kwargs)
def select_form(client, orig_response, content, **kwargs):
try:
_url = orig_response["content-location"]
except KeyError:
_url = kwargs["location"]
# content is a form to be filled in and returned
response = DResponse(status=orig_response["status"], url=_url)
response.write(content)
form = pick_form(response, content, _url, **kwargs)
for key, val in kwargs.items():
if key.startswith("_"):
continue
try:
form[key] = val
except ControlNotFoundError:
pass
return do_click(client, form, **kwargs)
#noinspection PyUnusedLocal
def chose(client, orig_response, content, **kwargs):
try:
_url = orig_response["content-location"]
except KeyError:
_url = kwargs["location"]
part = urlparse(_url)
#resp = Response({"status":"302"})
try:
_trace = kwargs["trace"]
except KeyError:
_trace = False
url = "%s://%s%s" % (part[0], part[1], kwargs["path"])
return do_request(client, url, "GET", trace=_trace)
#return resp, ""
def post_form(client, orig_response, content, **kwargs):
_url = orig_response["content-location"]
# content is a form to be filled in and returned
response = DResponse(status=orig_response["status"], url=_url)
response.write(content)
form = pick_form(response, content, _url, **kwargs)
return do_click(client, form, **kwargs)
# ========================================================================
#LOGIN_FORM = {
# "function": login_form,
# "args": {
# "user_label": "login",
# "password_label": "password",
# "user": "username",
# "password": "hemligt"
# }
#}
LOGIN_FORM = {
"id": "login_form",
"function": login_form,
}
APPROVE_FORM = {
"id": "approve_form",
"function": approve_form,
}
CHOSE = {
"id": "chose",
"function": chose,
"args": { "path": "/account/fake"}
}
SELECT_FORM = {
"id": "select_form",
"function": select_form,
"args": { }
}
POST_FORM = {
"id": "post_form",
"function": post_form,
}
# ========================================================================
from oic.oic.message import IdToken
def cmp_idtoken(client, item):
idt = IdToken.from_jwt(item[0].id_token, key=client.client_secret)
return idt.dictionary() == item[1].dictionary()
# ========================================================================