Replaced M2Crypto with PyCrypto

This commit is contained in:
Roland Hedberg 2013-12-02 14:42:36 +01:00
parent 6687be2de4
commit a261aa77a8
12 changed files with 164 additions and 111 deletions

View File

@ -414,7 +414,7 @@ class Provider(object):
pass
# Pick authentication method
_authn = self.pick_auth(areq)
_authn, acr = self.pick_auth(areq=areq)
try:
identity = _authn.authenticated_as(**a_args)

78
src/oic/utils/aes.py Normal file
View File

@ -0,0 +1,78 @@
#!/usr/bin/env python
import os
from Crypto import Random
from Crypto.Cipher import AES
from base64 import b64encode, b64decode
__author__ = 'rolandh'
POSTFIX_MODE = {
"cbc": AES.MODE_CBC,
"cfb": AES.MODE_CFB,
"ecb": AES.MODE_CFB,
}
def build_cipher(key, iv, alg="aes_128_cbc"):
"""
:param key: encryption key
:param iv: init vector
:param alg: cipher algorithm
:return: A Cipher instance
"""
typ, bits, cmode = alg.split("_")
if not iv:
iv = Random.new().read(AES.block_size)
else:
assert len(iv) == AES.block_size
if bits not in ["128", "192", "256"]:
raise Exception("Unsupported key length")
try:
assert len(key) == int(bits) >> 3
except AssertionError:
raise Exception("Wrong Key length")
try:
return AES.new(key, POSTFIX_MODE[cmode], iv), iv
except KeyError:
raise Exception("Unsupported chaining mode")
def encrypt(key, msg, iv=None, alg="aes_128_cbc"):
"""
:param key: The encryption key
:param iv: init vector
:param msg: Message to be encrypted
:return: The encrypted message base64 encoded
"""
cipher, iv = build_cipher(key, iv, alg)
return b64encode(iv + cipher.encrypt(msg))
def decrypt(key, msg, iv=None):
"""
:param key: The encryption key
:param iv: init vector
:param msg: Base64 encoded message to be decrypted
:return: The decrypted message
"""
data = b64decode(msg)
_iv = data[:AES.block_size]
if iv:
assert iv == _iv
cipher, iv = build_cipher(key, iv)
return cipher.decrypt(data)[AES.block_size:]
if __name__ == "__main__":
key_ = "1234523451234545" # 16 byte key
# Message has to be multiple of 16 in length
msg_ = "ToBeOrNotTobe WS01234567"
iv_ = os.urandom(16)
encrypted_msg = encrypt(key_, msg_, iv_)
print decrypt(key_, encrypted_msg, iv_)
encrypted_msg = encrypt(key_, msg_, 0)
print decrypt(key_, encrypted_msg, 0)

View File

@ -1,60 +0,0 @@
#!/usr/bin/env python
import os
__author__ = 'rolandh'
import M2Crypto
from base64 import b64encode, b64decode
def AES_build_cipher(key, iv, op=1, alg="aes_128_cbc"):
"""
:param key: encryption key
:param iv: init vector
:param op: key usage - 1 (encryption) or 0 (decryption)
:param alg: cipher algorithm
:return: A Cipher instance
"""
return M2Crypto.EVP.Cipher(alg=alg, key=key, iv=iv, op=op)
def AES_encrypt(key, msg, iv=None):
"""
:param key: The encryption key
:param iv: init vector
:param msg: Message to be encrypted
:return: The encrypted message base64 encoded
"""
if iv is None:
iv = '\0' * 16
cipher = AES_build_cipher(key, iv, 1)
v = cipher.update(msg)
v = v + cipher.final()
v = b64encode(v)
return v
def AES_decrypt(key, msg, iv=None):
"""
:param key: The encryption key
:param iv: init vector
:param msg: Base64 encoded message to be decrypted
:return: The decrypted message
"""
if iv is None:
iv = '\0' * 16
data = b64decode(msg)
cipher = AES_build_cipher(key, iv, 0)
v = cipher.update(data)
v = v + cipher.final()
return v
if __name__ == "__main__":
key = "123452345"
msg = "ToBeOrNotTobe W.S."
iv = os.urandom(16)
encrypted_msg = AES_encrypt(key, msg, iv)
print AES_decrypt(key, encrypted_msg, iv)

View File

@ -1,7 +1,7 @@
from oic.oauth2 import rndstr
from oic.oauth2.exception import UnsupportedMethod
from oic.utils.aes_m2c import AES_encrypt
from oic.utils.aes_m2c import AES_decrypt
from oic.utils.aes import encrypt
from oic.utils.aes import decrypt
__author__ = 'rohe0002'
@ -330,6 +330,7 @@ class CookieDealer(object):
self.init_srv(srv)
# minutes before the interaction should be completed
self.cookie_ttl = ttl # N minutes
self.pad_chr = " "
def init_srv(self, srv):
if srv:
@ -353,9 +354,14 @@ class CookieDealer(object):
if cookie_name is None:
cookie_name = self.srv.cookie_name
timestamp = str(int(time.mktime(time.gmtime())))
info = AES_encrypt(self.srv.symkey,
"::".join([value, timestamp, typ]),
self.srv.iv)
_msg = "::".join([value, timestamp, typ])
if self.srv.symkey:
# Pad the message to be multiples of 16 bytes in length
lm = len(_msg)
_msg = _msg.ljust(lm + 16 - lm % 16, self.pad_chr)
info = encrypt(self.srv.symkey, _msg, self.srv.iv)
else:
info = _msg
cookie = make_cookie(cookie_name, info, self.srv.seed,
expire=ttl, domain="", path="")
return cookie
@ -377,8 +383,14 @@ class CookieDealer(object):
try:
info, timestamp = parse_cookie(cookie_name,
self.srv.seed, cookie)
value, _ts, typ = AES_decrypt(self.srv.symkey, info,
self.srv.iv).split("::")
if self.srv.symkey:
txt = decrypt(self.srv.symkey, info, self.srv.iv)
# strip spaces at the end
txt = txt.rstrip(self.pad_chr)
else:
txt = info
value, _ts, typ = txt.split("::")
if timestamp == _ts:
return value, _ts, typ
except TypeError:

View File

@ -12,9 +12,9 @@ import traceback
from requests import request
from jwkest.jwk import RSA_key, rsa_load
from jwkest.jwk import EC_key
from jwkest.jwk import SYM_key
from jwkest.jwk import RSAKey, rsa_load
#from jwkest.jwk import ECKey
from jwkest.jwk import SYMKey
from M2Crypto.util import no_passphrase_callback
KEYLOADERR = "Failed to load %s key from '%s' (%s)"
@ -29,9 +29,9 @@ class UnknownKeyType(Exception):
K2C = {
"RSA": RSA_key,
"EC": EC_key,
"oct": SYM_key,
"RSA": RSAKey,
#"EC": EC_key,
"oct": SYMKey,
# "pkix": PKIX_key
}
@ -230,12 +230,12 @@ class KeyBundle(object):
def keybundle_from_local_file(filename, typ, usage):
if typ.upper() == "RSA":
kb = KeyBundle()
k = RSA_key()
k = RSAKey()
k.load(filename)
k.use = usage[0]
kb.append(k)
for use in usage[1:]:
_k = RSA_key()
_k = RSAKey()
_k.use = use
_k.key = k.key
kb.append(_k)
@ -561,10 +561,10 @@ def key_setup(vault, **kwargs):
_key = create_and_store_rsa_key_pair(
path=vault_path)
kb.append(RSA_key(key=_key, use=usage, kid=kid))
kb.append(RSAKey(key=_key, use=usage, kid=kid))
kid += 1
if usage == "sig" and "enc" not in kwargs:
kb.append(RSA_key(key=_key, use="enc", kid=kid))
kb.append(RSAKey(key=_key, use="enc", kid=kid))
kid += 1
return kb

View File

@ -3,7 +3,7 @@ import os
import urllib
from urlparse import parse_qs
import jwkest
from jwkest.jwk import SYM_key
from jwkest.jwk import SYMKey
from jwkest.jws import JWS
from mako.lookup import TemplateLookup
from oic.oic import JWT_BEARER
@ -68,7 +68,7 @@ def test_2():
def test_3():
form = create_return_form_env("user", "hemligt", "query=foo")
srv = SRV()
srv.symkey = "symkey"
srv.symkey = rndstr(16)
srv.seed = rndstr()
srv.iv = os.urandom(16)
srv.cookie_name = "xyzxyz"
@ -114,7 +114,7 @@ def test_3():
def test_5():
form = create_return_form_env("user", "hemligt", "QUERY")
srv = SRV()
srv.symkey = "symkey"
srv.symkey = rndstr(16)
srv.seed = rndstr()
srv.iv = os.urandom(16)
srv.cookie_name = "xyzxyz"
@ -140,7 +140,7 @@ def test_5():
def test_6():
form = create_return_form_env("user", "secret", "QUERY")
srv = SRV()
srv.symkey = "symkey"
srv.symkey = rndstr(16)
srv.seed = rndstr()
srv.iv = os.urandom(16)
srv.cookie_name = "xyzxyz"
@ -171,7 +171,7 @@ def test_client_secret_jwt():
assert header == {'alg': 'HS256'}
_rj = JWS()
info = _rj.verify_compact(cas, [SYM_key(key=cli.client_secret)])
info = _rj.verify_compact(cas, [SYMKey(key=cli.client_secret)])
_dict = json.loads(info)
assert _eq(_dict.keys(), ["aud", "iss", "sub", "jti", "exp", "iat"])

View File

@ -36,8 +36,8 @@ def test():
}
LDAP_EXTRAVALIDATION = {
"verifyAttr": "eduPersonAffiliation",
"verifyAttrValid": ['employee', 'staff', 'student']
"verify_attr": "eduPersonAffiliation",
"verify_attr_valid": ['employee', 'staff', 'student']
}
LDAP_EXTRAVALIDATION.update(LDAP)
@ -58,9 +58,10 @@ def test():
res = ac.pick(PASSWORD)
assert res
# list of two 2-tuples
assert len(res) == 2
assert res[0].__class__.__name__ == "CasAuthnMethod"
assert res[1].__class__.__name__ == "UsernamePasswordMako"
assert res[0][0].__class__.__name__ == "CasAuthnMethod"
assert res[1][0].__class__.__name__ == "UsernamePasswordMako"
if __name__ == "__main__":

20
tests/test_http_util.py Normal file
View File

@ -0,0 +1,20 @@
from oic.utils.http_util import CookieDealer
__author__ = 'roland'
class DummyServer():
def __init__(self):
self.symkey = "0123456789012345"
def test_cookie_dealer_1():
cd = CookieDealer(DummyServer())
kaka = cd.create_cookie("Something to pass along", "sso", "Foobar")
#print kaka
value, _ts, typ = cd.get_cookie_value(kaka[1], "Foobar")
assert value == "Something to pass along"
assert typ == "sso"
if __name__ == "__main__":
test_cookie_dealer_1()

View File

@ -4,7 +4,7 @@ from oic.utils.keyio import key_export
from oic.utils.keyio import KeyJar
from oic.utils.keyio import KeyBundle
from oic.utils.keyio import keybundle_from_local_file
from oic.utils.keyio import RSA_key
from oic.utils.keyio import RSAKey
from jwkest.jws import JWS, NoSuitableSigningKeys, WrongTypeOfKey
@ -38,7 +38,7 @@ def test_chain_2():
assert len(kc.get("RSA")) == 2
key = kc.get("RSA")[0]
assert isinstance(key, RSA_key)
assert isinstance(key, RSAKey)
kc.update()
assert kc.remote is False
@ -46,7 +46,7 @@ def test_chain_2():
assert len(kc.get("RSA")) == 2
key = kc.get("RSA")[0]
assert isinstance(key, RSA_key)
assert isinstance(key, RSAKey)
# remote testing is tricky
@ -119,13 +119,14 @@ def test_local_jwk_file():
keys = kj.get_signing_key()
assert len(keys) == 1
key = keys[0]
assert isinstance(key, RSA_key)
assert isinstance(key, RSAKey)
assert key.kid == "abc"
def test_signing():
kb = keybundle_from_local_file("file://jwk.json", "jwk", ["ver", "sig"])
assert len(kb) == 1
# Signing is only possible if key is a private RSA key
kb = keybundle_from_local_file("rsa.key", "rsa", ["ver", "sig"])
assert len(kb) == 2
kj = KeyJar()
kj.issuer_keys[""] = [kb]
keys = kj.get_signing_key()
@ -133,9 +134,9 @@ def test_signing():
_jws = JWS(payload, alg="RS512")
try:
_jwt = _jws.sign_compact(keys)
assert False
except (NoSuitableSigningKeys, WrongTypeOfKey):
assert True
except (NoSuitableSigningKeys, WrongTypeOfKey):
assert False
def test_kid_usage():
@ -149,4 +150,4 @@ def test_kid_usage():
if __name__ == "__main__":
test_kid_usage()
test_signing()

View File

@ -1,8 +1,7 @@
from jwkest.jwk import SYM_key
__author__ = 'rohe0002'
from oic.oauth2.message import *
from jwkest.jwk import SYMKey
from pytest import raises
@ -479,7 +478,7 @@ def test_to_from_jwt():
req_str_list=["spike", "lee"],
opt_json='{"ford": "green"}')
keys = [SYM_key(key="A1B2C3D4")]
keys = [SYMKey(key="A1B2C3D4")]
jws = item.to_jwt(keys, "HS256")
print jws

View File

@ -1,5 +1,6 @@
from mako.lookup import TemplateLookup
from mako.runtime import UNDEFINED
from oic.oauth2 import rndstr
from oic.utils.authn.authn_context import AuthnBroker
from oic.utils.authn.client import verify_client
from oic.utils.authn.user import UserAuthnMethod
@ -71,7 +72,7 @@ class DummyAuthn(UserAuthnMethod):
return {"uid": self.user}
AUTHN_BROKER = AuthnBroker()
AUTHN_BROKER.add(UNDEFINED, DummyAuthn(None, "username"))
AUTHN_BROKER.add("UNDEFINED", DummyAuthn(None, "username"))
# dealing with authorization
AUTHZ = Implicit()
@ -122,7 +123,7 @@ def test_provider_authorization_endpoint():
def test_provider_authenticated():
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
verify_client)
verify_client, symkey=rndstr(16))
_session_db = {}
cons = Consumer(_session_db, client_config=CLIENT_CONFIG,
server_info=SERVER_INFO, **CONSUMER_CONFIG)
@ -131,9 +132,9 @@ def test_provider_authenticated():
location = cons.begin("http://localhost:8087",
"http://localhost:8088/authorization")
QUERY_STRING = location.split("?")[1]
query_string = location.split("?")[1]
resp = provider.authorization_endpoint(QUERY_STRING)
resp = provider.authorization_endpoint(query_string)
assert resp.status == "302 Found"
print resp.headers
print resp.message
@ -155,7 +156,7 @@ def test_provider_authenticated():
def test_provider_authenticated_token():
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
verify_client)
verify_client, symkey=rndstr(16))
_session_db = {}
cons = Consumer(_session_db, client_config=CLIENT_CONFIG,
server_info=SERVER_INFO, **CONSUMER_CONFIG)
@ -201,7 +202,7 @@ def test_provider_authenticated_token():
def test_token_endpoint():
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
verify_client)
verify_client, symkey=rndstr(16))
authreq = AuthorizationRequest(state="state",
redirect_uri="http://example.com/authz",
@ -237,7 +238,7 @@ def test_token_endpoint():
def test_token_endpoint_unauth():
provider = Provider("pyoicserv", sdb.SessionDB(), CDB, AUTHN_BROKER, AUTHZ,
verify_client)
verify_client, symkey=rndstr(16))
authreq = AuthorizationRequest(state="state",
redirect_uri="http://example.com/authz",
@ -269,4 +270,4 @@ def test_token_endpoint_unauth():
assert _eq(atr.keys(), ['error_description', 'error'])
if __name__ == "__main__":
test_provider_authenticated_none()
test_provider_authenticated()

View File

@ -1,12 +1,11 @@
from mako.lookup import TemplateLookup
from oic.oauth2 import rndstr
from oic.utils.authn.authn_context import AuthnBroker
from oic.utils.authn.client import verify_client
from oic.utils.authn.user import UserAuthnMethod
from oic.utils.authz import AuthzHandling
from oic.utils.userinfo import UserInfo
__author__ = 'rohe0002'
from oic.oauth2.exception import RedirectURIError
from oic.utils.keyio import KeyBundle
@ -34,6 +33,8 @@ from oic.oic.provider import Provider
from oic.utils.time_util import epoch_in_a_while
__author__ = 'rohe0002'
CLIENT_CONFIG = {
"client_id": "number5",
"ca_certs": "/usr/local/etc/oic/ca_certs.txt",
@ -139,11 +140,11 @@ class DummyAuthn(UserAuthnMethod):
#AUTHN = UsernamePasswordMako(None, "login.mako", tl, PASSWD, "authenticated")
AUTHN_BROKER = AuthnBroker()
AUTHN_BROKER.add(UNDEFINED, DummyAuthn(None, "username"))
AUTHN_BROKER.add("UNDEFINED", DummyAuthn(None, "username"))
# dealing with authorization
AUTHZ = AuthzHandling()
SYMKEY = "symmetric key used to encrypt cookie info"
SYMKEY = rndstr(16) # symmetric key used to encrypt cookie info
USERINFO = UserInfo(USERDB)
provider_init = Provider("pyoicserv", SessionDB(), CDB, AUTHN_BROKER, USERINFO,