Compare commits

...

No commits in common. "main" and "eo/buster" have entirely different histories.

16 changed files with 634 additions and 234 deletions

View File

@ -21,13 +21,15 @@ testlong: export JWCRYPTO_TESTS_ENABLE_MMA=True
testlong: export TOX_TESTENV_PASSENV=JWCRYPTO_TESTS_ENABLE_MMA testlong: export TOX_TESTENV_PASSENV=JWCRYPTO_TESTS_ENABLE_MMA
testlong: testlong:
rm -f .coverage rm -f .coverage
tox -e py35 tox -e py36
test: test:
rm -f .coverage rm -f .coverage
tox -e py27 tox -e py27
tox -e py34 --skip-missing-interpreter tox -e py34 --skip-missing-interpreter
tox -e py35 --skip-missing-interpreter tox -e py35 --skip-missing-interpreter
tox -e py36 --skip-missing-interpreter
tox -e py37 --skip-missing-interpreter
DOCS_DIR = docs DOCS_DIR = docs
.PHONY: docs .PHONY: docs

View File

@ -1,3 +1,5 @@
[![Build Status](https://travis-ci.org/latchset/jwcrypto.svg?branch=master)](https://travis-ci.org/latchset/jwcrypto)
JWCrypto JWCrypto
======== ========

25
debian/changelog vendored
View File

@ -1,9 +1,26 @@
python-jwcrypto (0.4.2-1.1.stretch) stretch-eobuilder; urgency=medium python-jwcrypto (0.6.0-2.1) buster; urgency=medium
* Non-maintainer upload. (backport for stretch) * keep .egg-info for authentic
* debian/rules: Keep .egg-info as it's required by authentic. * add target python2
* release buster backport
-- Frederic Peters <fpeters@debian.org> Fri, 07 Sep 2018 09:22:54 +0200 -- Christophe Siraut <csiraut@entrouvert.com> Thu, 14 May 2020 07:16:48 +0200
python-jwcrypto (0.6.0-2) unstable; urgency=medium
* compat, control: Bump compat to 12.
* Switch to python3 only. (Closes: #937862)
* control: Bump policy to 4.4.0.
-- Timo Aaltonen <tjaalton@debian.org> Thu, 12 Sep 2019 00:04:33 +0300
python-jwcrypto (0.6.0-1) unstable; urgency=medium
* New upstream release. (Closes: #925457)
* control: Update vcs urls.
* control: Drop X-Python-Version*.
-- Timo Aaltonen <tjaalton@debian.org> Tue, 02 Apr 2019 09:05:15 +0300
python-jwcrypto (0.4.2-1) unstable; urgency=medium python-jwcrypto (0.4.2-1) unstable; urgency=medium

2
debian/compat vendored
View File

@ -1 +1 @@
10 12

44
debian/control vendored
View File

@ -4,38 +4,36 @@ Uploaders: Timo Aaltonen <tjaalton@debian.org>
Section: python Section: python
Priority: optional Priority: optional
Build-Depends: Build-Depends:
debhelper (>= 10), debhelper (>= 12),
dh-python, dh-python,
python-all (>= 2.6.6-3), python-all,
python-cryptography (>= 1.7),
python-nose,
python-setuptools (>= 0.6b3),
python3-all, python3-all,
python3-cryptography (>= 1.7), python-cryptography,
python3-cryptography,
python-nose,
python3-nose, python3-nose,
python-setuptools,
python3-setuptools, python3-setuptools,
X-Python-Version: >= 2.7 Standards-Version: 4.4.0
X-Python3-Version: >= 3.3
Standards-Version: 4.1.2
Homepage: https://github.com/latchset/jwcrypto Homepage: https://github.com/latchset/jwcrypto
Vcs-Git: https://anonscm.debian.org/git/pkg-freeipa/python-jwcrypto.git Vcs-Git: https://salsa.debian.org/freeipa-team/python-jwcrypto.git
Vcs-Browser: https://anonscm.debian.org/cgit/pkg-freeipa/python-jwcrypto.git Vcs-Browser: https://salsa.debian.org/freeipa-team/python-jwcrypto
Package: python-jwcrypto Package: python-jwcrypto
Architecture: all Architecture: all
Depends: ${misc:Depends}, ${python:Depends},
python-cryptography (>= 1.7),
Description: Python implementation of JOSE Web standards
This python module implements JWK,JWS,JWE specifications using
python-cryptography.
.
This package includes the python2 version.
Package: python3-jwcrypto
Architecture: all
Depends: ${misc:Depends}, ${python3:Depends}, Depends: ${misc:Depends}, ${python3:Depends},
python3-cryptography (>= 1.7), python-cryptography,
Description: Python3 mplementation of JOSE Web standards Description: Python3 implementation of JOSE Web standards
This python module implements JWK,JWS,JWE specifications using
python-cryptography.
.
This package includes the python3 version.
Package: python3-jwcrypto
Architecture: all
Depends: ${misc:Depends}, ${python3:Depends},
python3-cryptography,
Description: Python3 implementation of JOSE Web standards
This python module implements JWK,JWS,JWE specifications using This python module implements JWK,JWS,JWE specifications using
python-cryptography. python-cryptography.
. .

View File

@ -46,16 +46,16 @@ master_doc = 'index'
# General information about the project. # General information about the project.
project = u'JWCrypto' project = u'JWCrypto'
copyright = u'2016-2017, JWCrypto Contributors' copyright = u'2016-2018, JWCrypto Contributors'
# The version info for the project you're documenting, acts as replacement for # The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the # |version| and |release|, also used in various other places throughout the
# built documents. # built documents.
# #
# The short X.Y version. # The short X.Y version.
version = '0.4' version = '0.6'
# The full version, including alpha/beta/rc tags. # The full version, including alpha/beta/rc tags.
release = '0.4.2' release = '0.6'
# The language for content autogenerated by Sphinx. Refer to documentation # The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages. # for a list of supported languages.

View File

@ -51,6 +51,9 @@ Registries
Examples Examples
-------- --------
Symmetric keys
~~~~~~~~~~~~~~
Encrypt a JWE token:: Encrypt a JWE token::
>>> from jwcrypto import jwk, jwe >>> from jwcrypto import jwk, jwe
>>> from jwcrypto.common import json_encode >>> from jwcrypto.common import json_encode
@ -67,3 +70,29 @@ Decrypt a JWE token::
>>> jwetoken.deserialize(enc) >>> jwetoken.deserialize(enc)
>>> jwetoken.decrypt(key) >>> jwetoken.decrypt(key)
>>> payload = jwetoken.payload >>> payload = jwetoken.payload
Asymmetric keys
~~~~~~~~~~~~~~~
Encrypt a JWE token::
>>> from jwcrypto import jwk, jwe
>>> from jwcrypto.common import json_encode, json_decode
>>> public_key = jwk.JWK()
>>> private_key = jwk.JWK.generate(kty='RSA', size=2048)
>>> public_key.import_key(**json_decode(private_key.export_public()))
>>> payload = "My Encrypted message"
>>> protected_header = {
"alg": "RSA-OAEP-256",
"enc": "A256CBC-HS512",
"typ": "JWE",
"kid": public_key.thumbprint(),
}
>>> jwetoken = jwe.JWE(payload.encode('utf-8'),
recipient=public_key,
protected=protected_header)
>>> enc = jwetoken.serialize()
Decrypt a JWE token::
>>> jwetoken = jwe.JWE()
>>> jwetoken.deserialize(enc, key=private_key)
>>> payload = jwetoken.payload

View File

@ -16,12 +16,12 @@ def base64url_encode(payload):
def base64url_decode(payload): def base64url_decode(payload):
l = len(payload) % 4 size = len(payload) % 4
if l == 2: if size == 2:
payload += '==' payload += '=='
elif l == 3: elif size == 3:
payload += '=' payload += '='
elif l != 0: elif size != 0:
raise ValueError('Invalid base64 string') raise ValueError('Invalid base64 string')
return urlsafe_b64decode(payload.encode('utf-8')) return urlsafe_b64decode(payload.encode('utf-8'))

View File

@ -14,6 +14,7 @@ from cryptography.hazmat.primitives.asymmetric import utils as ec_utils
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, aes_key_wrap
from cryptography.hazmat.primitives.padding import PKCS7 from cryptography.hazmat.primitives.padding import PKCS7
import six import six
@ -141,15 +142,15 @@ class _RawEC(_RawJWS):
def sign(self, key, payload): def sign(self, key, payload):
skey = key.get_op_key('sign', self._curve) skey = key.get_op_key('sign', self._curve)
signature = skey.sign(payload, ec.ECDSA(self.hashfn)) signature = skey.sign(payload, ec.ECDSA(self.hashfn))
r, s = ec_utils.decode_rfc6979_signature(signature) r, s = ec_utils.decode_dss_signature(signature)
l = key.get_curve(self._curve).key_size size = key.get_curve(self._curve).key_size
return _encode_int(r, l) + _encode_int(s, l) return _encode_int(r, size) + _encode_int(s, size)
def verify(self, key, payload, signature): def verify(self, key, payload, signature):
pkey = key.get_op_key('verify', self._curve) pkey = key.get_op_key('verify', self._curve)
r = signature[:len(signature) // 2] r = signature[:len(signature) // 2]
s = signature[len(signature) // 2:] s = signature[len(signature) // 2:]
enc_signature = ec_utils.encode_rfc6979_signature( enc_signature = ec_utils.encode_dss_signature(
int(hexlify(r), 16), int(hexlify(s), 16)) int(hexlify(r), 16), int(hexlify(s), 16))
pkey.verify(enc_signature, payload, ec.ECDSA(self.hashfn)) pkey.verify(enc_signature, payload, ec.ECDSA(self.hashfn))
@ -439,49 +440,14 @@ class _AesKw(_RawKeyMgmt):
if not cek: if not cek:
cek = _randombits(bitsize) cek = _randombits(bitsize)
# Implement RFC 3394 Key Unwrap - 2.2.2 ek = aes_key_wrap(rk, cek, default_backend())
# TODO: Use cryptography once issue #1733 is resolved
iv = 'a6a6a6a6a6a6a6a6'
a = unhexlify(iv)
r = [cek[i:i + 8] for i in range(0, len(cek), 8)]
n = len(r)
for j in range(0, 6):
for i in range(0, n):
e = Cipher(algorithms.AES(rk), modes.ECB(),
backend=self.backend).encryptor()
b = e.update(a + r[i]) + e.finalize()
a = _encode_int(_decode_int(b[:8]) ^ ((n * j) + i + 1), 64)
r[i] = b[-8:]
ek = a
for i in range(0, n):
ek += r[i]
return {'cek': cek, 'ek': ek} return {'cek': cek, 'ek': ek}
def unwrap(self, key, bitsize, ek, headers): def unwrap(self, key, bitsize, ek, headers):
rk = self._get_key(key, 'decrypt') rk = self._get_key(key, 'decrypt')
# Implement RFC 3394 Key Unwrap - 2.2.3 cek = aes_key_unwrap(rk, ek, default_backend())
# TODO: Use cryptography once issue #1733 is resolved
iv = 'a6a6a6a6a6a6a6a6'
aiv = unhexlify(iv)
r = [ek[i:i + 8] for i in range(0, len(ek), 8)]
a = r.pop(0)
n = len(r)
for j in range(5, -1, -1):
for i in range(n - 1, -1, -1):
da = _decode_int(a)
atr = _encode_int((da ^ ((n * j) + i + 1)), 64) + r[i]
d = Cipher(algorithms.AES(rk), modes.ECB(),
backend=self.backend).decryptor()
b = d.update(atr) + d.finalize()
a = b[:8]
r[i] = b[-8:]
if a != aiv:
raise RuntimeError('Decryption Failed')
cek = b''.join(r)
if _bitsize(cek) != bitsize: if _bitsize(cek) != bitsize:
raise InvalidJWEKeyLength(bitsize, _bitsize(cek)) raise InvalidJWEKeyLength(bitsize, _bitsize(cek))
return cek return cek
@ -761,23 +727,24 @@ class _EcdhEs(_RawKeyMgmt, JWAAlgorithm):
def wrap(self, key, bitsize, cek, headers): def wrap(self, key, bitsize, cek, headers):
self._check_key(key) self._check_key(key)
dk_size = self.keysize
if self.keysize is None: if self.keysize is None:
if cek is not None: if cek is not None:
raise InvalidJWEOperation('ECDH-ES cannot use an existing CEK') raise InvalidJWEOperation('ECDH-ES cannot use an existing CEK')
alg = headers['enc'] alg = headers['enc']
dk_size = bitsize
else: else:
bitsize = self.keysize
alg = headers['alg'] alg = headers['alg']
epk = JWK.generate(kty=key.key_type, crv=key.key_curve) epk = JWK.generate(kty=key.key_type, crv=key.key_curve)
dk = self._derive(epk.get_op_key('unwrapKey'), dk = self._derive(epk.get_op_key('unwrapKey'),
key.get_op_key('wrapKey'), key.get_op_key('wrapKey'),
alg, bitsize, headers) alg, dk_size, headers)
if self.keysize is None: if self.keysize is None:
ret = {'cek': dk} ret = {'cek': dk}
else: else:
aeskw = self.aeskwmap[bitsize]() aeskw = self.aeskwmap[self.keysize]()
kek = JWK(kty="oct", use="enc", k=base64url_encode(dk)) kek = JWK(kty="oct", use="enc", k=base64url_encode(dk))
ret = aeskw.wrap(kek, bitsize, cek, headers) ret = aeskw.wrap(kek, bitsize, cek, headers)
@ -788,20 +755,21 @@ class _EcdhEs(_RawKeyMgmt, JWAAlgorithm):
if 'epk' not in headers: if 'epk' not in headers:
raise ValueError('Invalid Header, missing "epk" parameter') raise ValueError('Invalid Header, missing "epk" parameter')
self._check_key(key) self._check_key(key)
dk_size = self.keysize
if self.keysize is None: if self.keysize is None:
alg = headers['enc'] alg = headers['enc']
dk_size = bitsize
else: else:
bitsize = self.keysize
alg = headers['alg'] alg = headers['alg']
epk = JWK(**headers['epk']) epk = JWK(**headers['epk'])
dk = self._derive(key.get_op_key('unwrapKey'), dk = self._derive(key.get_op_key('unwrapKey'),
epk.get_op_key('wrapKey'), epk.get_op_key('wrapKey'),
alg, bitsize, headers) alg, dk_size, headers)
if self.keysize is None: if self.keysize is None:
return dk return dk
else: else:
aeskw = self.aeskwmap[bitsize]() aeskw = self.aeskwmap[self.keysize]()
kek = JWK(kty="oct", use="enc", k=base64url_encode(dk)) kek = JWK(kty="oct", use="enc", k=base64url_encode(dk))
cek = aeskw.unwrap(kek, bitsize, ek, headers) cek = aeskw.unwrap(kek, bitsize, ek, headers)
return cek return cek
@ -828,7 +796,7 @@ class _EcdhEsAes192Kw(_EcdhEs):
class _EcdhEsAes256Kw(_EcdhEs): class _EcdhEsAes256Kw(_EcdhEs):
name = 'ECDH-ES+A256KW' name = 'ECDH-ES+A256KW'
description = 'ECDH-ES using Concat KDF and "A128KW" wrapping' description = 'ECDH-ES using Concat KDF and "A256KW" wrapping'
keysize = 256 keysize = 256
algorithm_usage_location = 'alg' algorithm_usage_location = 'alg'
algorithm_use = 'kex' algorithm_use = 'kex'

View File

@ -3,6 +3,7 @@
import zlib import zlib
from jwcrypto import common from jwcrypto import common
from jwcrypto.common import JWException
from jwcrypto.common import base64url_decode, base64url_encode from jwcrypto.common import base64url_decode, base64url_encode
from jwcrypto.common import json_decode, json_encode from jwcrypto.common import json_decode, json_encode
from jwcrypto.jwa import JWA from jwcrypto.jwa import JWA
@ -40,7 +41,7 @@ default_allowed_algs = [
"""Default allowed algorithms""" """Default allowed algorithms"""
class InvalidJWEData(Exception): class InvalidJWEData(JWException):
"""Invalid JWE Object. """Invalid JWE Object.
This exception is raised when the JWE Object is invalid and/or This exception is raised when the JWE Object is invalid and/or
@ -58,7 +59,7 @@ class InvalidJWEData(Exception):
super(InvalidJWEData, self).__init__(msg) super(InvalidJWEData, self).__init__(msg)
# These have been moved to jwcrypto.common, maintain here for bacwards compat # These have been moved to jwcrypto.common, maintain here for backwards compat
InvalidCEKeyLength = common.InvalidCEKeyLength InvalidCEKeyLength = common.InvalidCEKeyLength
InvalidJWEKeyLength = common.InvalidJWEKeyLength InvalidJWEKeyLength = common.InvalidJWEKeyLength
InvalidJWEKeyType = common.InvalidJWEKeyType InvalidJWEKeyType = common.InvalidJWEKeyType
@ -108,7 +109,7 @@ class JWE(object):
json_decode(unprotected) # check header encoding json_decode(unprotected) # check header encoding
self.objects['unprotected'] = unprotected self.objects['unprotected'] = unprotected
if algs: if algs:
self.allowed_algs = algs self._allowed_algs = algs
if recipient: if recipient:
self.add_recipient(recipient, header=header) self.add_recipient(recipient, header=header)
@ -269,7 +270,19 @@ class JWE(object):
if compact: if compact:
for invalid in 'aad', 'unprotected': for invalid in 'aad', 'unprotected':
if invalid in self.objects: if invalid in self.objects:
raise InvalidJWEOperation("Can't use compact encoding") raise InvalidJWEOperation(
"Can't use compact encoding when the '%s' parameter"
"is set" % invalid)
if 'protected' not in self.objects:
raise InvalidJWEOperation(
"Can't use compat encoding without protected headers")
else:
ph = json_decode(self.objects['protected'])
for required in 'alg', 'enc':
if required not in ph:
raise InvalidJWEOperation(
"Can't use compat encoding, '%s' must be in the "
"protected header" % required)
if 'recipients' in self.objects: if 'recipients' in self.objects:
if len(self.objects['recipients']) != 1: if len(self.objects['recipients']) != 1:
raise InvalidJWEOperation("Invalid number of recipients") raise InvalidJWEOperation("Invalid number of recipients")

View File

@ -1,8 +1,9 @@
# Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file # Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file
import os import os
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from collections import namedtuple
from enum import Enum
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
@ -12,6 +13,7 @@ from cryptography.hazmat.primitives.asymmetric import rsa
from six import iteritems from six import iteritems
from jwcrypto.common import JWException
from jwcrypto.common import base64url_decode, base64url_encode from jwcrypto.common import base64url_decode, base64url_encode
from jwcrypto.common import json_decode, json_encode from jwcrypto.common import json_decode, json_encode
@ -22,36 +24,59 @@ JWKTypesRegistry = {'EC': 'Elliptic Curve',
'oct': 'Octet sequence'} 'oct': 'Octet sequence'}
"""Registry of valid Key Types""" """Registry of valid Key Types"""
# RFC 7518 - 7.5 # RFC 7518 - 7.5
# It is part of the JWK Parameters Registry, but we want a more # It is part of the JWK Parameters Registry, but we want a more
# specific map for internal usage # specific map for internal usage
JWKValuesRegistry = {'EC': {'crv': ('Curve', 'Public', 'Required'), class ParmType(Enum):
'x': ('X Coordinate', 'Public', 'Required'), name = 'A string with a name'
'y': ('Y Coordinate', 'Public', 'Required'), b64 = 'Base64url Encoded'
'd': ('ECC Private Key', 'Private', None)}, b64U = 'Base64urlUint Encoded'
'RSA': {'n': ('Modulus', 'Public', 'Required'), unsupported = 'Unsupported Parameter'
'e': ('Exponent', 'Public', 'Required'),
'd': ('Private Exponent', 'Private', None),
'p': ('First Prime Factor', 'Private', None), JWKParameter = namedtuple('Parameter', 'description public required type')
'q': ('Second Prime Factor', 'Private', None), JWKValuesRegistry = {
'dp': ('First Factor CRT Exponent', 'Private', 'EC': {
None), 'crv': JWKParameter('Curve', True, True, ParmType.name),
'dq': ('Second Factor CRT Exponent', 'Private', 'x': JWKParameter('X Coordinate', True, True, ParmType.b64),
None), 'y': JWKParameter('Y Coordinate', True, True, ParmType.b64),
'qi': ('First CRT Coefficient', 'Private', None)}, 'd': JWKParameter('ECC Private Key', False, False, ParmType.b64),
'oct': {'k': ('Key Value', 'Private', 'Required')}} },
'RSA': {
'n': JWKParameter('Modulus', True, True, ParmType.b64),
'e': JWKParameter('Exponent', True, True, ParmType.b64U),
'd': JWKParameter('Private Exponent', False, False, ParmType.b64U),
'p': JWKParameter('First Prime Factor', False, False, ParmType.b64U),
'q': JWKParameter('Second Prime Factor', False, False, ParmType.b64U),
'dp': JWKParameter('First Factor CRT Exponent',
False, False, ParmType.b64U),
'dq': JWKParameter('Second Factor CRT Exponent',
False, False, ParmType.b64U),
'qi': JWKParameter('First CRT Coefficient',
False, False, ParmType.b64U),
'oth': JWKParameter('Other Primes Info',
False, False, ParmType.unsupported),
},
'oct': {
'k': JWKParameter('Key Value', False, True, ParmType.b64),
}
}
"""Registry of valid key values""" """Registry of valid key values"""
JWKParamsRegistry = {'kty': ('Key Type', 'Public', ), JWKParamsRegistry = {
'use': ('Public Key Use', 'Public'), 'kty': JWKParameter('Key Type', True, None, None),
'key_ops': ('Key Operations', 'Public'), 'use': JWKParameter('Public Key Use', True, None, None),
'alg': ('Algorithm', 'Public'), 'key_ops': JWKParameter('Key Operations', True, None, None),
'kid': ('Key ID', 'Public'), 'alg': JWKParameter('Algorithm', True, None, None),
'x5u': ('X.509 URL', 'Public'), 'kid': JWKParameter('Key ID', True, None, None),
'x5c': ('X.509 Certificate Chain', 'Public'), 'x5u': JWKParameter('X.509 URL', True, None, None),
'x5t': ('X.509 Certificate SHA-1 Thumbprint', 'Public'), 'x5c': JWKParameter('X.509 Certificate Chain', True, None, None),
'x5t#S256': ('X.509 Certificate SHA-256 Thumbprint', 'x5t': JWKParameter('X.509 Certificate SHA-1 Thumbprint',
'Public')} True, None, None),
'x5t#S256': JWKParameter('X.509 Certificate SHA-256 Thumbprint',
True, None, None)
}
"""Regstry of valid key parameters""" """Regstry of valid key parameters"""
# RFC 7518 - 7.6 # RFC 7518 - 7.6
@ -83,7 +108,7 @@ JWKpycaCurveMap = {'secp256r1': 'P-256',
'secp521r1': 'P-521'} 'secp521r1': 'P-521'}
class InvalidJWKType(Exception): class InvalidJWKType(JWException):
"""Invalid JWK Type Exception. """Invalid JWK Type Exception.
This exception is raised when an invalid parameter type is used. This exception is raised when an invalid parameter type is used.
@ -98,7 +123,7 @@ class InvalidJWKType(Exception):
self.value, list(JWKTypesRegistry.keys())) self.value, list(JWKTypesRegistry.keys()))
class InvalidJWKUsage(Exception): class InvalidJWKUsage(JWException):
"""Invalid JWK usage Exception. """Invalid JWK usage Exception.
This exception is raised when an invalid key usage is requested, This exception is raised when an invalid key usage is requested,
@ -123,7 +148,7 @@ class InvalidJWKUsage(Exception):
valid) valid)
class InvalidJWKOperation(Exception): class InvalidJWKOperation(JWException):
"""Invalid JWK Operation Exception. """Invalid JWK Operation Exception.
This exception is raised when an invalid key operation is requested, This exception is raised when an invalid key operation is requested,
@ -150,7 +175,7 @@ class InvalidJWKOperation(Exception):
valid) valid)
class InvalidJWKValue(Exception): class InvalidJWKValue(JWException):
"""Invalid JWK Value Exception. """Invalid JWK Value Exception.
This exception is raised when an invalid/unknown value is used in the This exception is raised when an invalid/unknown value is used in the
@ -210,6 +235,7 @@ class JWK(object):
@classmethod @classmethod
def generate(cls, **kwargs): def generate(cls, **kwargs):
obj = cls() obj = cls()
kty = None
try: try:
kty = kwargs['kty'] kty = kwargs['kty']
gen = getattr(obj, '_generate_%s' % kty) gen = getattr(obj, '_generate_%s' % kty)
@ -219,6 +245,7 @@ class JWK(object):
return obj return obj
def generate_key(self, **params): def generate_key(self, **params):
kty = None
try: try:
kty = params.pop('generate') kty = params.pop('generate')
gen = getattr(self, '_generate_%s' % kty) gen = getattr(self, '_generate_%s' % kty)
@ -346,8 +373,26 @@ class JWK(object):
names.remove(name) names.remove(name)
for name, val in iteritems(JWKValuesRegistry[kty]): for name, val in iteritems(JWKValuesRegistry[kty]):
if val[2] == 'Required' and name not in self._key: if val.required and name not in self._key:
raise InvalidJWKValue('Missing required value %s' % name) raise InvalidJWKValue('Missing required value %s' % name)
if val.type == ParmType.unsupported and name in self._key:
raise InvalidJWKValue('Unsupported parameter %s' % name)
if val.type == ParmType.b64 and name in self._key:
# Check that the value is base64url encoded
try:
base64url_decode(self._key[name])
except Exception: # pylint: disable=broad-except
raise InvalidJWKValue(
'"%s" is not base64url encoded' % name
)
if val[3] == ParmType.b64U and name in self._key:
# Check that the value is Base64urlUInt encoded
try:
self._decode_int(self._key[name])
except Exception: # pylint: disable=broad-except
raise InvalidJWKValue(
'"%s" is not Base64urlUInt encoded' % name
)
# Unknown key parameters are allowed # Unknown key parameters are allowed
# Let's just store them out of the way # Let's just store them out of the way
@ -385,6 +430,20 @@ class JWK(object):
' "key_ops" values specified at' ' "key_ops" values specified at'
' the same time') ' the same time')
@classmethod
def from_json(cls, key):
"""Creates a RFC 7517 JWK from the standard JSON format.
:param key: The RFC 7517 representation of a JWK.
"""
obj = cls()
try:
jkey = json_decode(key)
except Exception as e: # pylint: disable=broad-except
raise InvalidJWKValue(e)
obj.import_key(**jkey)
return obj
def export(self, private_key=True): def export(self, private_key=True):
"""Exports the key in the standard JSON format. """Exports the key in the standard JSON format.
Exports the key regardless of type, if private_key is False Exports the key regardless of type, if private_key is False
@ -405,19 +464,23 @@ class JWK(object):
It fails if one is not available like when this function It fails if one is not available like when this function
is called on a symmetric key. is called on a symmetric key.
""" """
pub = self._public_params()
return json_encode(pub)
def _public_params(self):
if not self.has_public: if not self.has_public:
raise InvalidJWKType("No public key available") raise InvalidJWKType("No public key available")
pub = {} pub = {}
preg = JWKParamsRegistry preg = JWKParamsRegistry
for name in preg: for name in preg:
if preg[name][1] == 'Public': if preg[name].public:
if name in self._params: if name in self._params:
pub[name] = self._params[name] pub[name] = self._params[name]
reg = JWKValuesRegistry[self._params['kty']] reg = JWKValuesRegistry[self._params['kty']]
for param in reg: for param in reg:
if reg[param][1] == 'Public': if reg[param].public:
pub[param] = self._key[param] pub[param] = self._key[param]
return json_encode(pub) return pub
def _export_all(self): def _export_all(self):
d = dict() d = dict()
@ -439,6 +502,10 @@ class JWK(object):
return self._export_all() return self._export_all()
raise InvalidJWKType("Not a symmetric key") raise InvalidJWKType("Not a symmetric key")
def public(self):
pub = self._public_params()
return JWK(**pub)
@property @property
def has_public(self): def has_public(self):
"""Whether this JWK has an asymmetric Public key.""" """Whether this JWK has an asymmetric Public key."""
@ -446,7 +513,7 @@ class JWK(object):
return False return False
reg = JWKValuesRegistry[self._params['kty']] reg = JWKValuesRegistry[self._params['kty']]
for value in reg: for value in reg:
if reg[value][1] == 'Public' and value in self._key: if reg[value].public and value in self._key:
return True return True
@property @property
@ -456,7 +523,7 @@ class JWK(object):
return False return False
reg = JWKValuesRegistry[self._params['kty']] reg = JWKValuesRegistry[self._params['kty']]
for value in reg: for value in reg:
if reg[value][1] == 'Private' and value in self._key: if not reg[value].public and value in self._key:
return True return True
return False return False
@ -700,7 +767,7 @@ class JWK(object):
t = {'kty': self._params['kty']} t = {'kty': self._params['kty']}
for name, val in iteritems(JWKValuesRegistry[t['kty']]): for name, val in iteritems(JWKValuesRegistry[t['kty']]):
if val[2] == 'Required': if val.required:
t[name] = self._key[name] t[name] = self._key[name]
digest = hashes.Hash(hashalg, backend=default_backend()) digest = hashes.Hash(hashalg, backend=default_backend())
digest.update(bytes(json_encode(t).encode('utf8'))) digest.update(bytes(json_encode(t).encode('utf8')))
@ -733,6 +800,12 @@ class JWKSet(dict):
super(JWKSet, self).__setitem__('keys', _JWKkeys()) super(JWKSet, self).__setitem__('keys', _JWKkeys())
self.update(*args, **kwargs) self.update(*args, **kwargs)
def __iter__(self):
return self['keys'].__iter__()
def __contains__(self, key):
return self['keys'].__contains__(key)
def __setitem__(self, key, val): def __setitem__(self, key, val):
if key == 'keys': if key == 'keys':
self['keys'].add(val) self['keys'].add(val)
@ -769,7 +842,7 @@ class JWKSet(dict):
""" """
try: try:
jwkset = json_decode(keyset) jwkset = json_decode(keyset)
except: except Exception: # pylint: disable=broad-except
raise InvalidJWKValue() raise InvalidJWKValue()
if 'keys' not in jwkset: if 'keys' not in jwkset:
@ -782,8 +855,6 @@ class JWKSet(dict):
else: else:
self[k] = v self[k] = v
return self
@classmethod @classmethod
def from_json(cls, keyset): def from_json(cls, keyset):
"""Creates a RFC 7517 keyset from the standard JSON format. """Creates a RFC 7517 keyset from the standard JSON format.
@ -791,7 +862,8 @@ class JWKSet(dict):
:param keyset: The RFC 7517 representation of a JOSE Keyset. :param keyset: The RFC 7517 representation of a JOSE Keyset.
""" """
obj = cls() obj = cls()
return obj.import_keyset(keyset) obj.import_keyset(keyset)
return obj
def get_key(self, kid): def get_key(self, kid):
"""Gets a key from the set. """Gets a key from the set.

View File

@ -1,5 +1,8 @@
# Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file # Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file
from collections import namedtuple
from jwcrypto.common import JWException
from jwcrypto.common import base64url_decode, base64url_encode from jwcrypto.common import base64url_decode, base64url_encode
from jwcrypto.common import json_decode, json_encode from jwcrypto.common import json_decode, json_encode
from jwcrypto.jwa import JWA from jwcrypto.jwa import JWA
@ -8,18 +11,24 @@ from jwcrypto.jwk import JWK
# RFC 7515 - 9.1 # RFC 7515 - 9.1
# name: (description, supported?) # name: (description, supported?)
JWSHeaderRegistry = {'alg': ('Algorithm', True), JWSHeaderParameter = namedtuple('Parameter',
'jku': ('JWK Set URL', False), 'description mustprotect supported')
'jwk': ('JSON Web Key', False), JWSHeaderRegistry = {
'kid': ('Key ID', True), 'alg': JWSHeaderParameter('Algorithm', False, True),
'x5u': ('X.509 URL', False), 'jku': JWSHeaderParameter('JWK Set URL', False, False),
'x5c': ('X.509 Certificate Chain', False), 'jwk': JWSHeaderParameter('JSON Web Key', False, False),
'x5t': ('X.509 Certificate SHA-1 Thumbprint', False), 'kid': JWSHeaderParameter('Key ID', False, True),
'x5t#S256': ('X.509 Certificate SHA-256 Thumbprint', 'x5u': JWSHeaderParameter('X.509 URL', False, False),
False), 'x5c': JWSHeaderParameter('X.509 Certificate Chain', False, False),
'typ': ('Type', True), 'x5t': JWSHeaderParameter(
'cty': ('Content Type', True), 'X.509 Certificate SHA-1 Thumbprint', False, False),
'crit': ('Critical', True)} 'x5t#S256': JWSHeaderParameter(
'X.509 Certificate SHA-256 Thumbprint', False, False),
'typ': JWSHeaderParameter('Type', False, True),
'cty': JWSHeaderParameter('Content Type', False, True),
'crit': JWSHeaderParameter('Critical', True, True),
'b64': JWSHeaderParameter('Base64url-Encode Payload', True, True)
}
"""Registry of valid header parameters""" """Registry of valid header parameters"""
default_allowed_algs = [ default_allowed_algs = [
@ -30,7 +39,7 @@ default_allowed_algs = [
"""Default allowed algorithms""" """Default allowed algorithms"""
class InvalidJWSSignature(Exception): class InvalidJWSSignature(JWException):
"""Invalid JWS Signature. """Invalid JWS Signature.
This exception is raised when a signature cannot be validated. This exception is raised when a signature cannot be validated.
@ -47,7 +56,7 @@ class InvalidJWSSignature(Exception):
super(InvalidJWSSignature, self).__init__(msg) super(InvalidJWSSignature, self).__init__(msg)
class InvalidJWSObject(Exception): class InvalidJWSObject(JWException):
"""Invalid JWS Object. """Invalid JWS Object.
This exception is raised when the JWS Object is invalid and/or This exception is raised when the JWS Object is invalid and/or
@ -63,7 +72,7 @@ class InvalidJWSObject(Exception):
super(InvalidJWSObject, self).__init__(msg) super(InvalidJWSObject, self).__init__(msg)
class InvalidJWSOperation(Exception): class InvalidJWSOperation(JWException):
"""Invalid JWS Object. """Invalid JWS Object.
This exception is raised when a requested operation cannot This exception is raised when a requested operation cannot
@ -113,11 +122,16 @@ class JWSCore(object):
if header is not None: if header is not None:
if isinstance(header, dict): if isinstance(header, dict):
self.header = header
header = json_encode(header) header = json_encode(header)
else:
self.header = json_decode(header)
self.protected = base64url_encode(header.encode('utf-8')) self.protected = base64url_encode(header.encode('utf-8'))
else: else:
self.header = dict()
self.protected = '' self.protected = ''
self.payload = base64url_encode(payload) self.payload = payload
def _jwa(self, name, allowed): def _jwa(self, name, allowed):
if allowed is None: if allowed is None:
@ -126,12 +140,22 @@ class JWSCore(object):
raise InvalidJWSOperation('Algorithm not allowed') raise InvalidJWSOperation('Algorithm not allowed')
return JWA.signing_alg(name) return JWA.signing_alg(name)
def _payload(self):
if self.header.get('b64', True):
return base64url_encode(self.payload).encode('utf-8')
else:
if isinstance(self.payload, bytes):
return self.payload
else:
return self.payload.encode('utf-8')
def sign(self): def sign(self):
"""Generates a signature""" """Generates a signature"""
sigin = ('.'.join([self.protected, self.payload])).encode('utf-8') payload = self._payload()
sigin = b'.'.join([self.protected.encode('utf-8'), payload])
signature = self.engine.sign(self.key, sigin) signature = self.engine.sign(self.key, sigin)
return {'protected': self.protected, return {'protected': self.protected,
'payload': self.payload, 'payload': payload,
'signature': base64url_encode(signature)} 'signature': base64url_encode(signature)}
def verify(self, signature): def verify(self, signature):
@ -140,7 +164,8 @@ class JWSCore(object):
:raises InvalidJWSSignature: if the verification fails. :raises InvalidJWSSignature: if the verification fails.
""" """
try: try:
sigin = ('.'.join([self.protected, self.payload])).encode('utf-8') payload = self._payload()
sigin = b'.'.join([self.protected.encode('utf-8'), payload])
self.engine.verify(self.key, sigin, signature) self.engine.verify(self.key, sigin, signature)
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except
raise InvalidJWSSignature('Verification failed', repr(e)) raise InvalidJWSSignature('Verification failed', repr(e))
@ -164,16 +189,6 @@ class JWS(object):
self.verifylog = None self.verifylog = None
self._allowed_algs = None self._allowed_algs = None
def _check_crit(self, crit):
for k in crit:
if k not in JWSHeaderRegistry:
raise InvalidJWSSignature('Unknown critical header: '
'"%s"' % k)
else:
if not JWSHeaderRegistry[k][1]:
raise InvalidJWSSignature('Unsupported critical '
'header: "%s"' % k)
@property @property
def allowed_algs(self): def allowed_algs(self):
"""Allowed algorithms. """Allowed algorithms.
@ -197,31 +212,61 @@ class JWS(object):
def is_valid(self): def is_valid(self):
return self.objects.get('valid', False) return self.objects.get('valid', False)
def _merge_headers(self, h1, h2): # TODO: allow caller to specify list of headers it understands
for k in list(h1.keys()): def _merge_check_headers(self, protected, *headers):
if k in h2: header = None
raise InvalidJWSObject('Duplicate header: "%s"' % k) crit = []
h1.update(h2) if protected is not None:
return h1 if 'crit' in protected:
crit = protected['crit']
# Check immediately if we support these critical headers
for k in crit:
if k not in JWSHeaderRegistry:
raise InvalidJWSObject(
'Unknown critical header: "%s"' % k)
else:
if not JWSHeaderRegistry[k][1]:
raise InvalidJWSObject(
'Unsupported critical header: "%s"' % k)
header = protected
if 'b64' in header:
if not isinstance(header['b64'], bool):
raise InvalidJWSObject('b64 header must be a boolean')
for hn in headers:
if hn is None:
continue
if header is None:
header = dict()
for h in list(hn.keys()):
if h in JWSHeaderRegistry:
if JWSHeaderRegistry[h].mustprotect:
raise InvalidJWSObject('"%s" must be protected' % h)
if h in header:
raise InvalidJWSObject('Duplicate header: "%s"' % h)
header.update(hn)
for k in crit:
if k not in header:
raise InvalidJWSObject('Missing critical header "%s"' % k)
return header
# TODO: support selecting key with 'kid' and passing in multiple keys # TODO: support selecting key with 'kid' and passing in multiple keys
def _verify(self, alg, key, payload, signature, protected, header=None): def _verify(self, alg, key, payload, signature, protected, header=None):
# verify it is a valid JSON object and keep a decode copy p = dict()
# verify it is a valid JSON object and decode
if protected is not None: if protected is not None:
p = json_decode(protected) p = json_decode(protected)
else: if not isinstance(p, dict):
p = dict() raise InvalidJWSSignature('Invalid Protected header')
if not isinstance(p, dict):
raise InvalidJWSSignature('Invalid Protected header')
# merge heders, and verify there are no duplicates # merge heders, and verify there are no duplicates
if header: if header:
if not isinstance(header, dict): if not isinstance(header, dict):
raise InvalidJWSSignature('Invalid Unprotected header') raise InvalidJWSSignature('Invalid Unprotected header')
p = self._merge_headers(p, header)
# verify critical headers # Merge and check (critical) headers
# TODO: allow caller to specify list of headers it understands self._merge_check_headers(p, header)
if 'crit' in p:
self._check_crit(p['crit'])
# check 'alg' is present # check 'alg' is present
if alg is None and 'alg' not in p: if alg is None and 'alg' not in p:
raise InvalidJWSSignature('No "alg" in headers') raise InvalidJWSSignature('No "alg" in headers')
@ -282,6 +327,33 @@ class JWS(object):
raise InvalidJWSSignature('Verification failed for all ' raise InvalidJWSSignature('Verification failed for all '
'signatures' + repr(self.verifylog)) 'signatures' + repr(self.verifylog))
def _deserialize_signature(self, s):
o = dict()
o['signature'] = base64url_decode(str(s['signature']))
if 'protected' in s:
p = base64url_decode(str(s['protected']))
o['protected'] = p.decode('utf-8')
if 'header' in s:
o['header'] = s['header']
return o
def _deserialize_b64(self, o, protected):
if protected is None:
b64n = None
else:
p = json_decode(protected)
b64n = p.get('b64')
if b64n is not None:
if not isinstance(b64n, bool):
raise InvalidJWSObject('b64 header must be boolean')
b64 = o.get('b64')
if b64 == b64n:
return
elif b64 is None:
o['b64'] = b64n
else:
raise InvalidJWSObject('conflicting b64 values')
def deserialize(self, raw_jws, key=None, alg=None): def deserialize(self, raw_jws, key=None, alg=None):
"""Deserialize a JWS token. """Deserialize a JWS token.
@ -304,25 +376,21 @@ class JWS(object):
try: try:
try: try:
djws = json_decode(raw_jws) djws = json_decode(raw_jws)
o['payload'] = base64url_decode(str(djws['payload']))
if 'signatures' in djws: if 'signatures' in djws:
o['signatures'] = list() o['signatures'] = list()
for s in djws['signatures']: for s in djws['signatures']:
os = dict() os = self._deserialize_signature(s)
os['signature'] = base64url_decode(str(s['signature']))
if 'protected' in s:
p = base64url_decode(str(s['protected']))
os['protected'] = p.decode('utf-8')
if 'header' in s:
os['header'] = s['header']
o['signatures'].append(os) o['signatures'].append(os)
self._deserialize_b64(o, os.get('protected'))
else: else:
o['signature'] = base64url_decode(str(djws['signature'])) o = self._deserialize_signature(djws)
if 'protected' in djws: self._deserialize_b64(o, o.get('protected'))
p = base64url_decode(str(djws['protected']))
o['protected'] = p.decode('utf-8') if 'payload' in djws:
if 'header' in djws: if o.get('b64', True):
o['header'] = djws['header'] o['payload'] = base64url_decode(str(djws['payload']))
else:
o['payload'] = djws['payload']
except ValueError: except ValueError:
c = raw_jws.split('.') c = raw_jws.split('.')
@ -331,6 +399,7 @@ class JWS(object):
p = base64url_decode(str(c[0])) p = base64url_decode(str(c[0]))
if len(p) > 0: if len(p) > 0:
o['protected'] = p.decode('utf-8') o['protected'] = p.decode('utf-8')
self._deserialize_b64(o, o['protected'])
o['payload'] = base64url_decode(str(c[1])) o['payload'] = base64url_decode(str(c[1]))
o['signature'] = base64url_decode(str(c[2])) o['signature'] = base64url_decode(str(c[2]))
@ -353,7 +422,8 @@ class JWS(object):
:param potected: The Protected Header (optional) :param potected: The Protected Header (optional)
:param header: The Unprotected Header (optional) :param header: The Unprotected Header (optional)
:raises InvalidJWSObject: if no payload has been set on the object. :raises InvalidJWSObject: if no payload has been set on the object,
or invalid headers are provided.
:raises ValueError: if the key is not a :class:`JWK` object. :raises ValueError: if the key is not a :class:`JWK` object.
:raises ValueError: if the algorithm is missing or is not provided :raises ValueError: if the algorithm is missing or is not provided
by one of the headers. by one of the headers.
@ -364,20 +434,36 @@ class JWS(object):
if not self.objects.get('payload', None): if not self.objects.get('payload', None):
raise InvalidJWSObject('Missing Payload') raise InvalidJWSObject('Missing Payload')
b64 = True
p = dict() p = dict()
if protected: if protected:
if isinstance(protected, dict): if isinstance(protected, dict):
protected = json_encode(protected) p = protected
p = json_decode(protected) protected = json_encode(p)
# TODO: allow caller to specify list of headers it understands else:
if 'crit' in p: p = json_decode(protected)
self._check_crit(p['crit'])
# If b64 is present we must enforce criticality
if 'b64' in list(p.keys()):
crit = p.get('crit', [])
if 'b64' not in crit:
raise InvalidJWSObject('b64 header must always be critical')
b64 = p['b64']
if 'b64' in self.objects:
if b64 != self.objects['b64']:
raise InvalidJWSObject('Mixed b64 headers on signatures')
h = None
if header: if header:
if isinstance(header, dict): if isinstance(header, dict):
h = header
header = json_encode(header) header = json_encode(header)
h = json_decode(header) else:
p = self._merge_headers(p, h) h = json_decode(header)
p = self._merge_check_headers(p, h)
if 'alg' in p: if 'alg' in p:
if alg is None: if alg is None:
@ -416,6 +502,7 @@ class JWS(object):
self.objects['signatures'].append(o) self.objects['signatures'].append(o)
else: else:
self.objects.update(o) self.objects.update(o)
self.objects['b64'] = b64
def serialize(self, compact=False): def serialize(self, compact=False):
"""Serializes the object into a JWS token. """Serializes the object into a JWS token.
@ -428,7 +515,6 @@ class JWS(object):
:raises InvalidJWSSignature: if no signature has been added :raises InvalidJWSSignature: if no signature has been added
to the object, or no valid signature can be found. to the object, or no valid signature can be found.
""" """
if compact: if compact:
if 'signatures' in self.objects: if 'signatures' in self.objects:
raise InvalidJWSOperation("Can't use compact encoding with " raise InvalidJWSOperation("Can't use compact encoding with "
@ -441,23 +527,40 @@ class JWS(object):
protected = base64url_encode(self.objects['protected']) protected = base64url_encode(self.objects['protected'])
else: else:
protected = '' protected = ''
return '.'.join([protected, if self.objects.get('payload', False):
base64url_encode(self.objects['payload']), if self.objects.get('b64', True):
payload = base64url_encode(self.objects['payload'])
else:
if isinstance(self.objects['payload'], bytes):
payload = self.objects['payload'].decode('utf-8')
else:
payload = self.objects['payload']
if '.' in payload:
raise InvalidJWSOperation(
"Can't use compact encoding with unencoded "
"payload that uses the . character")
else:
payload = ''
return '.'.join([protected, payload,
base64url_encode(self.objects['signature'])]) base64url_encode(self.objects['signature'])])
else: else:
obj = self.objects obj = self.objects
sig = dict()
if self.objects.get('payload', False):
if self.objects.get('b64', True):
sig['payload'] = base64url_encode(self.objects['payload'])
else:
sig['payload'] = self.objects['payload']
if 'signature' in obj: if 'signature' in obj:
if not obj.get('valid', False): if not obj.get('valid', False):
raise InvalidJWSSignature("No valid signature found") raise InvalidJWSSignature("No valid signature found")
sig = {'payload': base64url_encode(obj['payload']), sig['signature'] = base64url_encode(obj['signature'])
'signature': base64url_encode(obj['signature'])}
if 'protected' in obj: if 'protected' in obj:
sig['protected'] = base64url_encode(obj['protected']) sig['protected'] = base64url_encode(obj['protected'])
if 'header' in obj: if 'header' in obj:
sig['header'] = obj['header'] sig['header'] = obj['header']
elif 'signatures' in obj: elif 'signatures' in obj:
sig = {'payload': base64url_encode(obj['payload']), sig['signatures'] = list()
'signatures': list()}
for o in obj['signatures']: for o in obj['signatures']:
if not o.get('valid', False): if not o.get('valid', False):
continue continue
@ -481,24 +584,27 @@ class JWS(object):
raise InvalidJWSOperation("Payload not verified") raise InvalidJWSOperation("Payload not verified")
return self.objects['payload'] return self.objects['payload']
def detach_payload(self):
self.objects.pop('payload', None)
@property @property
def jose_header(self): def jose_header(self):
obj = self.objects obj = self.objects
if 'signature' in obj: if 'signature' in obj:
jh = dict()
if 'protected' in obj: if 'protected' in obj:
p = json_decode(obj['protected']) p = json_decode(obj['protected'])
jh = self._merge_headers(jh, p) else:
jh = self._merge_headers(jh, obj.get('header', dict())) p = None
return jh return self._merge_check_headers(p, obj.get('header', dict()))
elif 'signatures' in self.objects: elif 'signatures' in self.objects:
jhl = list() jhl = list()
for o in obj['signatures']: for o in obj['signatures']:
jh = dict() jh = dict()
if 'protected' in obj: if 'protected' in o:
p = json_decode(o['protected']) p = json_decode(o['protected'])
jh = self._merge_headers(jh, p) else:
jh = self._merge_headers(jh, o.get('header', dict())) p = None
jh = self._merge_check_headers(p, o.get('header', dict()))
jhl.append(jh) jhl.append(jh)
return jhl return jhl
else: else:

View File

@ -5,7 +5,7 @@ import uuid
from six import string_types from six import string_types
from jwcrypto.common import json_decode, json_encode from jwcrypto.common import JWException, json_decode, json_encode
from jwcrypto.jwe import JWE from jwcrypto.jwe import JWE
from jwcrypto.jwk import JWK, JWKSet from jwcrypto.jwk import JWK, JWKSet
from jwcrypto.jws import JWS from jwcrypto.jws import JWS
@ -22,7 +22,7 @@ JWTClaimsRegistry = {'iss': 'Issuer',
'jti': 'JWT ID'} 'jti': 'JWT ID'}
class JWTExpired(Exception): class JWTExpired(JWException):
"""Json Web Token is expired. """Json Web Token is expired.
This exception is raised when a token is expired accoring to its claims. This exception is raised when a token is expired accoring to its claims.
@ -39,7 +39,7 @@ class JWTExpired(Exception):
super(JWTExpired, self).__init__(msg) super(JWTExpired, self).__init__(msg)
class JWTNotYetValid(Exception): class JWTNotYetValid(JWException):
"""Json Web Token is not yet valid. """Json Web Token is not yet valid.
This exception is raised when a token is not valid yet according to its This exception is raised when a token is not valid yet according to its
@ -57,7 +57,7 @@ class JWTNotYetValid(Exception):
super(JWTNotYetValid, self).__init__(msg) super(JWTNotYetValid, self).__init__(msg)
class JWTMissingClaim(Exception): class JWTMissingClaim(JWException):
"""Json Web Token claim is invalid. """Json Web Token claim is invalid.
This exception is raised when a claim does not match the expected value. This exception is raised when a claim does not match the expected value.
@ -74,7 +74,7 @@ class JWTMissingClaim(Exception):
super(JWTMissingClaim, self).__init__(msg) super(JWTMissingClaim, self).__init__(msg)
class JWTInvalidClaimValue(Exception): class JWTInvalidClaimValue(JWException):
"""Json Web Token claim is invalid. """Json Web Token claim is invalid.
This exception is raised when a claim does not match the expected value. This exception is raised when a claim does not match the expected value.
@ -91,7 +91,7 @@ class JWTInvalidClaimValue(Exception):
super(JWTInvalidClaimValue, self).__init__(msg) super(JWTInvalidClaimValue, self).__init__(msg)
class JWTInvalidClaimFormat(Exception): class JWTInvalidClaimFormat(JWException):
"""Json Web Token claim format is invalid. """Json Web Token claim format is invalid.
This exception is raised when a claim is not in a valid format. This exception is raised when a claim is not in a valid format.
@ -108,7 +108,7 @@ class JWTInvalidClaimFormat(Exception):
super(JWTInvalidClaimFormat, self).__init__(msg) super(JWTInvalidClaimFormat, self).__init__(msg)
class JWTMissingKeyID(Exception): class JWTMissingKeyID(JWException):
"""Json Web Token is missing key id. """Json Web Token is missing key id.
This exception is raised when trying to decode a JWT with a key set This exception is raised when trying to decode a JWT with a key set
@ -126,7 +126,7 @@ class JWTMissingKeyID(Exception):
super(JWTMissingKeyID, self).__init__(msg) super(JWTMissingKeyID, self).__init__(msg)
class JWTMissingKey(Exception): class JWTMissingKey(JWException):
"""Json Web Token is using a key not in the key set. """Json Web Token is using a key not in the key set.
This exception is raised if the key that was used is not available This exception is raised if the key that was used is not available
@ -155,15 +155,15 @@ class JWT(object):
"""Creates a JWT object. """Creates a JWT object.
:param header: A dict or a JSON string with the JWT Header data. :param header: A dict or a JSON string with the JWT Header data.
:param claims: A dict or a string withthe JWT Claims data. :param claims: A dict or a string with the JWT Claims data.
:param jwt: a 'raw' JWT token :param jwt: a 'raw' JWT token
:param key: A (:class:`jwcrypto.jwk.JWK`) key to deserialize :param key: A (:class:`jwcrypto.jwk.JWK`) key to deserialize
the token. A (:class:`jwcrypt.jwk.JWKSet`) can also be used. the token. A (:class:`jwcrypto.jwk.JWKSet`) can also be used.
:param algs: An optional list of allowed algorithms :param algs: An optional list of allowed algorithms
:param default_claims: An optional dict with default values for :param default_claims: An optional dict with default values for
registred claims. A None value for NumericDate type claims registred claims. A None value for NumericDate type claims
will cause generation according to system time. Only the values will cause generation according to system time. Only the values
fro RFC 7519 - 4.1 are evaluated. from RFC 7519 - 4.1 are evaluated.
:param check_claims: An optional dict of claims that must be :param check_claims: An optional dict of claims that must be
present in the token, if the value is not None the claim must present in the token, if the value is not None the claim must
match exactly. match exactly.
@ -212,9 +212,15 @@ class JWT(object):
@header.setter @header.setter
def header(self, h): def header(self, h):
if isinstance(h, dict): if isinstance(h, dict):
self._header = json_encode(h) eh = json_encode(h)
else: else:
self._header = h eh = h
h = json_decode(eh)
if h.get('b64') is False:
raise ValueError("b64 header is invalid."
"JWTs cannot use unencoded payloads")
self._header = eh
@property @property
def claims(self): def claims(self):
@ -224,6 +230,10 @@ class JWT(object):
@claims.setter @claims.setter
def claims(self, c): def claims(self, c):
if self._reg_claims and not isinstance(c, dict):
# decode c so we can set default claims
c = json_decode(c)
if isinstance(c, dict): if isinstance(c, dict):
self._add_default_claims(c) self._add_default_claims(c)
self._claims = json_encode(c) self._claims = json_encode(c)
@ -276,7 +286,7 @@ class JWT(object):
def _add_jti_claim(self, claims): def _add_jti_claim(self, claims):
if 'jti' in claims or 'jti' not in self._reg_claims: if 'jti' in claims or 'jti' not in self._reg_claims:
return return
claims['jti'] = uuid.uuid4() claims['jti'] = str(uuid.uuid4())
def _add_default_claims(self, claims): def _add_default_claims(self, claims):
if self._reg_claims is None: if self._reg_claims is None:
@ -380,8 +390,8 @@ class JWT(object):
if value in claims[name]: if value in claims[name]:
continue continue
raise JWTInvalidClaimValue( raise JWTInvalidClaimValue(
"Invalid '%s' value. Expected '%s' in '%s'" % ( "Invalid '%s' value. Expected '%s' to be in '%s'" % (
name, value, claims[name])) name, claims[name], value))
elif name == 'exp': elif name == 'exp':
if value is not None: if value is not None:
@ -398,7 +408,7 @@ class JWT(object):
else: else:
if value is not None and value != claims[name]: if value is not None and value != claims[name]:
raise JWTInvalidClaimValue( raise JWTInvalidClaimValue(
"Invalid '%s' value. Expected '%d' got '%d'" % ( "Invalid '%s' value. Expected '%s' got '%s'" % (
name, value, claims[name])) name, value, claims[name]))
def make_signed_token(self, key): def make_signed_token(self, key):
@ -437,7 +447,7 @@ class JWT(object):
:param jwt: a 'raw' JWT token. :param jwt: a 'raw' JWT token.
:param key: A (:class:`jwcrypto.jwk.JWK`) verification or :param key: A (:class:`jwcrypto.jwk.JWK`) verification or
decryption key, or a (:class:`jwcrypt.jwk.JWKSet`) that decryption key, or a (:class:`jwcrypto.jwk.JWKSet`) that
contains a key indexed by the 'kid' header. contains a key indexed by the 'kid' header.
""" """
c = jwt.count('.') c = jwt.count('.')

View File

@ -3,7 +3,6 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import copy import copy
import unittest import unittest
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
@ -312,11 +311,17 @@ class TestJWK(unittest.TestCase):
self.assertRaises(jwk.InvalidJWKValue, self.assertRaises(jwk.InvalidJWKValue,
jwk.JWK.from_pyca, dict()) jwk.JWK.from_pyca, dict())
def test_jwk_from_json(self):
k = jwk.JWK.generate(kty='oct', size=256)
y = jwk.JWK.from_json(k.export())
self.assertEqual(k.export(), y.export())
def test_jwkset(self): def test_jwkset(self):
k = jwk.JWK(**RSAPrivateKey) k = jwk.JWK(**RSAPrivateKey)
ks = jwk.JWKSet() ks = jwk.JWKSet()
ks.add(k) ks.add(k)
ks2 = jwk.JWKSet().import_keyset(ks.export()) ks2 = jwk.JWKSet()
ks2.import_keyset(ks.export())
self.assertEqual(len(ks), len(ks2)) self.assertEqual(len(ks), len(ks2))
self.assertEqual(len(ks), 1) self.assertEqual(len(ks), 1)
k1 = ks.get_key(RSAPrivateKey['kid']) k1 = ks.get_key(RSAPrivateKey['kid'])
@ -329,6 +334,15 @@ class TestJWK(unittest.TestCase):
ks3 = jwk.JWKSet.from_json(ks.export()) ks3 = jwk.JWKSet.from_json(ks.export())
self.assertEqual(len(ks), len(ks3)) self.assertEqual(len(ks), len(ks3))
# Test Keyset with mutiple keys
ksm = jwk.JWKSet.from_json(json_encode(PrivateKeys))
num = 0
for item in ksm:
self.assertTrue(isinstance(item, jwk.JWK))
self.assertTrue(item in ksm)
num += 1
self.assertEqual(num, len(PrivateKeys['keys']))
def test_thumbprint(self): def test_thumbprint(self):
for i in range(0, len(PublicKeys['keys'])): for i in range(0, len(PublicKeys['keys'])):
k = jwk.JWK(**PublicKeys['keys'][i]) k = jwk.JWK(**PublicKeys['keys'][i])
@ -378,6 +392,25 @@ class TestJWK(unittest.TestCase):
self.assertFalse(pubkey.has_private) self.assertFalse(pubkey.has_private)
self.assertEqual(prikey.key_id, pubkey.key_id) self.assertEqual(prikey.key_id, pubkey.key_id)
def test_public(self):
key = jwk.JWK.from_pem(RSAPrivatePEM, password=RSAPrivatePassword)
self.assertTrue(key.has_public)
self.assertTrue(key.has_private)
pubkey = key.public()
self.assertTrue(pubkey.has_public)
self.assertFalse(pubkey.has_private)
# finally check public works
e = jwe.JWE('plaintext', '{"alg":"RSA-OAEP","enc":"A256GCM"}')
e.add_recipient(pubkey)
enc = e.serialize()
d = jwe.JWE()
d.deserialize(enc, key)
self.assertEqual(d.payload, b'plaintext')
def test_invalid_value(self):
with self.assertRaises(jwk.InvalidJWKValue):
jwk.JWK(kty='oct', k=b'\x01')
# RFC 7515 - A.1 # RFC 7515 - A.1
A1_protected = \ A1_protected = \
@ -556,7 +589,11 @@ A6_example = {
'key2': jwk.JWK(**A3_key), 'key2': jwk.JWK(**A3_key),
'protected2': bytes(bytearray(A3_protected)).decode('utf-8'), 'protected2': bytes(bytearray(A3_protected)).decode('utf-8'),
'header2': json_encode({"kid": "e9bc097a-ce51-4036-9562-d2ade882db0d"}), 'header2': json_encode({"kid": "e9bc097a-ce51-4036-9562-d2ade882db0d"}),
'serialized': A6_serialized} 'serialized': A6_serialized,
'jose_header': [{"kid": "2010-12-29",
"alg": "RS256"},
{"kid": "e9bc097a-ce51-4036-9562-d2ade882db0d",
"alg": "ES256"}]}
A7_example = \ A7_example = \
'{' + \ '{' + \
@ -630,6 +667,7 @@ class TestJWS(unittest.TestCase):
sig = s.serialize() sig = s.serialize()
s.deserialize(sig, A6_example['key1']) s.deserialize(sig, A6_example['key1'])
s.deserialize(A6_serialized, A6_example['key2']) s.deserialize(A6_serialized, A6_example['key2'])
self.assertEqual(A6_example['jose_header'], s.jose_header)
def test_A7(self): def test_A7(self):
s = jws.JWS(A6_example['payload']) s = jws.JWS(A6_example['payload'])
@ -801,6 +839,29 @@ E_A5_ex = \
'"ciphertext":"KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY",' \ '"ciphertext":"KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY",' \
'"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}' '"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}'
Issue_136_Protected_Header_no_epk = {
"alg": "ECDH-ES+A256KW",
"enc": "A256CBC-HS512"}
Issue_136_Contributed_JWE = \
"eyJhbGciOiJFQ0RILUVTK0ExMjhLVyIsImVuYyI6IkEyNTZDQkMtSFM1MTIiLCJr" \
"aWQiOiJrZXkxIiwiZXBrIjp7Imt0eSI6IkVDIiwiY3J2IjoiUC0yNTYiLCJ4Ijoi" \
"cDNpU241cEFSNUpYUE5aVF9SSEw2MTJMUGliWEI2WDhvTE9EOXFrN2NhTSIsInki" \
"OiI1Y04yQ2FqeXM3SVlDSXFEby1QUHF2bVQ1RzFvMEEtU0JicEQ5NFBOb3NNIn19" \
".wG51hYE_Vma8tvFKVyeZs4lsHhXiarEw3-59eWHPmhRflDAKrMvnBw1urezo_Bz" \
"ZyPJ76m42ORQPbhEu5NvbJk3vgdgcp03j" \
".lRttW8r6P6zM0uYDQt0EjQ.qnOnz7biCbqdLEdUH3acMamFm-cBRCSTFb83tNPrgDU" \
".vZnwYpYjzrTaYritwMzaguaAMsq9rQOWe8NUHICv2hg"
Issue_136_Contributed_Key = {
"alg": "ECDH-ES+A128KW",
"crv": "P-256",
"d": "F2PnliYin65AoIUxL1CwwzBPNeL2TyZPAKtkXOP50l8",
"kid": "key1",
"kty": "EC",
"x": "FPrb_xwxe8SBP3kO-e-WsofFp7n5-yc_tGgfAvqAP8g",
"y": "lM3HuyKMYUVsYdGqiWlkwTZbGO3Fh-hyadq8lfkTgBc"}
class TestJWE(unittest.TestCase): class TestJWE(unittest.TestCase):
def check_enc(self, plaintext, protected, key, vector): def check_enc(self, plaintext, protected, key, vector):
@ -843,6 +904,40 @@ class TestJWE(unittest.TestCase):
e = jwe.JWE(algs=['A256KW']) e = jwe.JWE(algs=['A256KW'])
e.deserialize(E_A5_ex, E_A4_ex['key2']) e.deserialize(E_A5_ex, E_A4_ex['key2'])
def test_compact_protected_header(self):
"""Compact representation requires a protected header"""
e = jwe.JWE(E_A1_ex['plaintext'])
e.add_recipient(E_A1_ex['key'], E_A1_ex['protected'])
with self.assertRaises(jwe.InvalidJWEOperation):
e.serialize(compact=True)
def test_compact_invalid_header(self):
with self.assertRaises(jwe.InvalidJWEOperation):
e = jwe.JWE(E_A1_ex['plaintext'], E_A1_ex['protected'],
aad='XYZ', recipient=E_A1_ex['key'])
e.serialize(compact=True)
with self.assertRaises(jwe.InvalidJWEOperation):
e = jwe.JWE(E_A1_ex['plaintext'], E_A1_ex['protected'],
unprotected='{"jku":"https://example.com/keys.jwks"}',
recipient=E_A1_ex['key'])
e.serialize(compact=True)
def test_JWE_Issue_136(self):
plaintext = "plain"
protected = json_encode(Issue_136_Protected_Header_no_epk)
key = jwk.JWK.generate(kty='EC', crv='P-521')
e = jwe.JWE(plaintext, protected)
e.add_recipient(key)
enc = e.serialize()
e.deserialize(enc, key)
self.assertEqual(e.payload, plaintext.encode('utf-8'))
e = jwe.JWE()
e.deserialize(Issue_136_Contributed_JWE,
jwk.JWK(**Issue_136_Contributed_Key))
MMA_vector_key = jwk.JWK(**E_A2_key) MMA_vector_key = jwk.JWK(**E_A2_key)
MMA_vector_ok_cek = \ MMA_vector_ok_cek = \
@ -1018,6 +1113,39 @@ class TestJWT(unittest.TestCase):
keyset.add(key) keyset.add(key)
jwt.JWT(jwt=token, key=keyset, check_claims={'exp': 1300819380}) jwt.JWT(jwt=token, key=keyset, check_claims={'exp': 1300819380})
def test_invalid_claim_type(self):
key = jwk.JWK(**E_A2_key)
claims = {"testclaim": "test"}
claims.update(A1_claims)
t = jwt.JWT(A1_header, claims)
t.make_encrypted_token(key)
token = t.serialize()
# Wrong string
self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token,
key=key, check_claims={"testclaim": "ijgi"})
# Wrong type
self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token,
key=key, check_claims={"testclaim": 123})
# Correct
jwt.JWT(jwt=token, key=key, check_claims={"testclaim": "test"})
def test_claim_params(self):
key = jwk.JWK(**E_A2_key)
default_claims = {"iss": "test", "exp": None}
string_claims = '{"string_claim":"test"}'
string_header = '{"alg":"RSA1_5","enc":"A128CBC-HS256"}'
t = jwt.JWT(string_header, string_claims,
default_claims=default_claims)
t.make_encrypted_token(key)
token = t.serialize()
# Check default_claims
jwt.JWT(jwt=token, key=key, check_claims={"iss": "test", "exp": None,
"string_claim": "test"})
class ConformanceTests(unittest.TestCase): class ConformanceTests(unittest.TestCase):
@ -1148,3 +1276,57 @@ class JWATests(unittest.TestCase):
self.assertEqual(inst.name, name) self.assertEqual(inst.name, name)
else: else:
self.fail((name, cls)) self.fail((name, cls))
# RFC 7797
rfc7797_e_header = '{"alg":"HS256"}'
rfc7797_u_header = '{"alg":"HS256","b64":false,"crit":["b64"]}'
rfc7797_payload = "$.02"
class TestUnencodedPayload(unittest.TestCase):
def test_regular(self):
result = \
'eyJhbGciOiJIUzI1NiJ9.JC4wMg.' + \
'5mvfOroL-g7HyqJoozehmsaqmvTYGEq5jTI1gVvoEoQ'
s = jws.JWS(rfc7797_payload)
s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]),
protected=rfc7797_e_header)
sig = s.serialize(compact=True)
self.assertEqual(sig, result)
def test_compat_unencoded(self):
result = \
'eyJhbGciOiJIUzI1NiIsImI2NCI6ZmFsc2UsImNyaXQiOlsiYjY0Il19..' + \
'A5dxf2s96_n5FLueVuW1Z_vh161FwXZC4YLPff6dmDY'
s = jws.JWS(rfc7797_payload)
s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]),
protected=rfc7797_u_header)
# check unencoded payload is in serialized form
sig = s.serialize()
self.assertEqual(json_decode(sig)['payload'], rfc7797_payload)
# check error raises if we try to get compact serialization
with self.assertRaises(jws.InvalidJWSOperation):
sig = s.serialize(compact=True)
# check compact serialization is allowed with detached payload
s.detach_payload()
sig = s.serialize(compact=True)
self.assertEqual(sig, result)
def test_misses_crit(self):
s = jws.JWS(rfc7797_payload)
with self.assertRaises(jws.InvalidJWSObject):
s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]),
protected={"alg": "HS256", "b64": False})
def test_mismatching_encoding(self):
s = jws.JWS(rfc7797_payload)
s.add_signature(jwk.JWK(**SymmetricKeys['keys'][0]),
protected=rfc7797_e_header)
with self.assertRaises(jws.InvalidJWSObject):
s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]),
protected=rfc7797_u_header)

View File

@ -6,7 +6,7 @@ from setuptools import setup
setup( setup(
name = 'jwcrypto', name = 'jwcrypto',
version = '0.4.2', version = '0.6.0',
license = 'LGPLv3+', license = 'LGPLv3+',
maintainer = 'JWCrypto Project Contributors', maintainer = 'JWCrypto Project Contributors',
maintainer_email = 'simo@redhat.com', maintainer_email = 'simo@redhat.com',
@ -18,6 +18,7 @@ setup(
'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'Topic :: Security', 'Topic :: Security',
'Topic :: Software Development :: Libraries :: Python Modules' 'Topic :: Software Development :: Libraries :: Python Modules'

View File

@ -1,5 +1,5 @@
[tox] [tox]
envlist = lint,py27,py34,py35,py36,pep8py2,pep8py3,doc,sphinx envlist = lint,py27,py34,py35,py36,py37,pep8py2,pep8py3,doc,sphinx
skip_missing_interpreters = true skip_missing_interpreters = true
[testenv] [testenv]