feat: update to http-ece 0.6.4 (with draft-06 support)

use new "content_type" argument to specify either "aesgcm" (draft-01)
or "aes128gcm" (draft-04).

NOTE: Not all clients yet support Draft-04.

closes #33
This commit is contained in:
jrconlin 2017-02-14 13:36:19 -08:00
parent 0e3af3c0f4
commit ac3322f653
5 changed files with 76 additions and 32 deletions

View File

@ -1,3 +1,10 @@
## 0.7.0 (2017-02-14)
feat: update to http-ece 0.7.0 (with draft-06 support)
feat: Allow empty payloads for send()
feat: Add python3 classfiers & python3.6 travis tests
feat: Add README.rst
bug: change long to int to support python3
## 0.4.0 (2016-06-05)
feat: make python 2.7 / 3.5 polyglot

View File

@ -1,4 +1,7 @@
[![Build_Status](https://travis-ci.org/jrconlin/pywebpush.svg?branch=master)](https://travis-ci.org/jrconlin/pywebpush)
[![Requirements
Status](https://requires.io/github/web-push-libs/pywebpush/requirements.svg?branch=master)]
# Webpush Data encryption library for Python

View File

@ -82,6 +82,11 @@ class WebPusher:
"""
subscription_info = {}
valid_encodings = [
# "aesgcm128", # this is draft-0, but DO NOT USE.
"aesgcm", # draft-httpbis-encryption-encoding-01
"aes128gcm" # draft-httpbis-encryption-encoding-04
]
def __init__(self, subscription_info):
"""Initialize using the info provided by the client PushSubscription
@ -113,16 +118,28 @@ class WebPusher:
"""Add base64 padding to the end of a string, if required"""
return data + b"===="[:len(data) % 4]
def encode(self, data):
def encode(self, data, content_encoding="aesgcm"):
"""Encrypt the data.
:param data: A serialized block of byte data (String, JSON, bit array,
etc.) Make sure that whatever you send, your client knows how
to understand it.
:type data: str
:param content_encoding: The content_encoding type to use to encrypt
the data. Defaults to draft-01 "aesgcm". Latest draft-04 is
"aes128gcm", however not all clients may be able to use this
format.
:type content_encoding: enum("aesgcm", "aes128gcm")
"""
# Salt is a random 16 byte array.
salt = os.urandom(16)
salt = None
if content_encoding not in self.valid_encodings:
raise WebPushException("Invalid content encoding specified. "
"Select from " +
json.dumps(self.valid_encodings))
if (content_encoding == "aesgcm"):
salt = os.urandom(16)
# The server key is an ephemeral ECDH key used only for this
# transaction
server_key = pyelliptic.ECC(curve="prime256v1")
@ -133,26 +150,31 @@ class WebPusher:
if isinstance(data, six.string_types):
data = bytes(data.encode('utf8'))
key_id = server_key_id.decode('utf8')
# http_ece requires that these both be set BEFORE encrypt or
# decrypt is called if you specify the key as "dh".
http_ece.keys[server_key_id] = server_key
http_ece.labels[server_key_id] = "P-256"
http_ece.keys[key_id] = server_key
http_ece.labels[key_id] = "P-256"
encrypted = http_ece.encrypt(
data,
salt=salt,
keyid=server_key_id,
keyid=key_id,
dh=self.receiver_key,
authSecret=self.auth_key)
authSecret=self.auth_key,
version=content_encoding)
return CaseInsensitiveDict({
reply = CaseInsensitiveDict({
'crypto_key': base64.urlsafe_b64encode(
server_key.get_pubkey()).strip(b'='),
'salt': base64.urlsafe_b64encode(salt).strip(b'='),
'body': encrypted,
})
if salt:
reply['salt'] = base64.urlsafe_b64encode(salt).strip(b'=')
return reply
def send(self, data=None, headers=None, ttl=0, gcm_key=None, reg_id=None):
def send(self, data=None, headers=None, ttl=0, gcm_key=None, reg_id=None,
content_encoding="aesgcm"):
"""Encode and send the data to the Push Service.
:param data: A serialized block of data (see encode() ).

View File

@ -65,32 +65,44 @@ class WebpushTestCase(unittest.TestCase):
eq_(push.auth_key, b'\x93\xc2U\xea\xc8\xddn\x10"\xd6}\xff,0K\xbc')
def test_encode(self):
for content_encoding in ["aesgcm", "aes128gcm"]:
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
data = "Mary had a little lamb, with some nice mint jelly"
push = WebPusher(subscription_info)
encoded = push.encode(data, content_encoding=content_encoding)
keyid = base64.urlsafe_b64encode(recv_key.get_pubkey()[1:])
http_ece.keys[keyid] = recv_key
http_ece.labels[keyid] = 'P-256'
# Convert these b64 strings into their raw, binary form.
raw_salt = None
if 'salt' in encoded:
raw_salt = base64.urlsafe_b64decode(
push._repad(encoded['salt']))
raw_dh = base64.urlsafe_b64decode(
push._repad(encoded['crypto_key']))
raw_auth = base64.urlsafe_b64decode(
push._repad(subscription_info['keys']['auth']))
decoded = http_ece.decrypt(
encoded['body'],
salt=raw_salt,
dh=raw_dh,
keyid=keyid,
authSecret=raw_auth,
version=content_encoding
)
eq_(decoded.decode('utf8'), data)
def test_bad_content_encoding(self):
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
data = "Mary had a little lamb, with some nice mint jelly"
push = WebPusher(subscription_info)
encoded = push.encode(data)
keyid = base64.urlsafe_b64encode(recv_key.get_pubkey()[1:])
http_ece.keys[keyid] = recv_key
http_ece.labels[keyid] = 'P-256'
# Convert these b64 strings into their raw, binary form.
raw_salt = base64.urlsafe_b64decode(push._repad(encoded['salt']))
raw_dh = base64.urlsafe_b64decode(push._repad(encoded['crypto_key']))
raw_auth = base64.urlsafe_b64decode(
push._repad(subscription_info['keys']['auth']))
decoded = http_ece.decrypt(
encoded['body'],
salt=raw_salt,
dh=raw_dh,
keyid=keyid,
authSecret=raw_auth,
)
eq_(decoded.decode('utf8'), data)
self.assertRaises(WebPushException,
push.encode,
data,
content_encoding="aesgcm128")
@patch("requests.post")
def test_send(self, mock_post):

View File

@ -3,7 +3,7 @@ import os
from setuptools import find_packages, setup
__version__ = "0.6.3"
__version__ = "0.7.0"
def read_from(file):