Merge pull request #53 from web-push-libs/feat/49

feat: update to use Cryptography library
This commit is contained in:
JR Conlin 2017-05-10 16:34:00 -07:00 committed by GitHub
commit a09368e0b3
7 changed files with 80 additions and 75 deletions

View File

@ -1,6 +1,7 @@
[![Build_Status](https://travis-ci.org/web-push-libs/pywebpush.svg?branch=master)](https://travis-ci.org/web-push-libs/pywebpush)
[![Build
Status](https://travis-ci.org/web-push-libs/pywebpush.svg?branch=master)](https://travis-ci.org/web-push-libs/pywebpush)
[![Requirements
Status](https://requires.io/github/web-push-libs/pywebpush/requirements.svg?branch=feat%2F44)](https://requires.io/github/web-push-libs/pywebpush/requirements/?branch=master)
Status](https://requires.io/github/web-push-libs/pywebpush/requirements.svg?branch=master)](https://requires.io/github/web-push-libs/pywebpush/requirements/?branch=master)
# Webpush Data encryption library for Python
@ -61,8 +62,12 @@ in the `subscription_info` block.
*data* - can be any serial content (string, bit array, serialized JSON, etc), but be sure that your receiving
application is able to parse and understand it. (e.g. `data = "Mary had a little lamb."`)
*content_type* - specifies the form of Encryption to use, either `'aesgcm'` or the newer `'aes128gcm'`. NOTE that
not all User Agents can decrypt `'aes128gcm'`, so the library defaults to the older form.
*vapid_claims* - a `dict` containing the VAPID claims required for authorization (See
[py_vapid](https://github.com/web-push-libs/vapid/tree/master/python) for more details)
[py_vapid](https://github.com/web-push-libs/vapid/tree/master/python) for more details). If `aud` is not specified,
pywebpush will attempt to auto-fill from the `endpoint`.
*vapid_private_key* - Either a path to a VAPID EC2 private key PEM file, or a string containing the DER representation.
(See [py_vapid](https://github.com/web-push-libs/vapid/tree/master/python) for more details.) The `private_key` may be

View File

@ -1,4 +1,4 @@
|Build\_Status| |Requirements Status|
|Build Status| |Requirements Status|
Webpush Data encryption library for Python
==========================================
@ -65,10 +65,15 @@ above).
etc), but be sure that your receiving application is able to parse and
understand it. (e.g. ``data = "Mary had a little lamb."``)
*content\_type* - specifies the form of Encryption to use, either
``'aesgcm'`` or the newer ``'aes128gcm'``. NOTE that not all User Agents
can decrypt ``'aes128gcm'``, so the library defaults to the older form.
*vapid\_claims* - a ``dict`` containing the VAPID claims required for
authorization (See
`py\_vapid <https://github.com/web-push-libs/vapid/tree/master/python>`__
for more details)
for more details). If ``aud`` is not specified, pywebpush will attempt
to auto-fill from the ``endpoint``.
*vapid\_private\_key* - Either a path to a VAPID EC2 private key PEM
file, or a string containing the DER representation. (See
@ -170,7 +175,7 @@ Encode the ``data`` for future use. On error, returns a
encoded_data = WebPush(subscription_info).encode(data)
.. |Build\_Status| image:: https://travis-ci.org/web-push-libs/pywebpush.svg?branch=master
.. |Build Status| image:: https://travis-ci.org/web-push-libs/pywebpush.svg?branch=master
:target: https://travis-ci.org/web-push-libs/pywebpush
.. |Requirements Status| image:: https://requires.io/github/web-push-libs/pywebpush/requirements.svg?branch=feat%2F44
.. |Requirements Status| image:: https://requires.io/github/web-push-libs/pywebpush/requirements.svg?branch=master
:target: https://requires.io/github/web-push-libs/pywebpush/requirements/?branch=master

View File

@ -13,8 +13,9 @@ except ImportError: # pragma nocover
import six
import http_ece
import pyelliptic
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from py_vapid import Vapid
@ -112,7 +113,7 @@ class WebPusher:
keys = self.subscription_info['keys']
for k in ['p256dh', 'auth']:
if keys.get(k) is None:
raise WebPushException("Missing keys value: %s", k)
raise WebPushException("Missing keys value: {}".format(k))
if isinstance(keys[k], six.string_types):
keys[k] = bytes(keys[k].encode('utf8'))
receiver_raw = base64.urlsafe_b64decode(
@ -155,31 +156,25 @@ class WebPusher:
salt = os.urandom(16)
# The server key is an ephemeral ECDH key used only for this
# transaction
server_key = pyelliptic.ECC(curve="prime256v1")
# the ID is the base64 of the raw key, minus the leading "\x04"
# ID tag.
server_key_id = base64.urlsafe_b64encode(server_key.get_pubkey()[1:])
server_key = ec.generate_private_key(ec.SECP256R1, default_backend())
crypto_key = base64.urlsafe_b64encode(
server_key.public_key().public_numbers().encode_point()
).strip(b'=')
if isinstance(data, six.string_types):
data = bytes(data.encode('utf8'))
key_id = server_key_id.decode('utf8')
# http_ece requires that these both be set BEFORE encrypt or
# decrypt is called if you specify the key as "dh".
http_ece.keys[key_id] = server_key
http_ece.labels[key_id] = "P-256"
encrypted = http_ece.encrypt(
data,
salt=salt,
keyid=key_id,
keyid=crypto_key.decode(),
private_key=server_key,
dh=self.receiver_key,
authSecret=self.auth_key,
auth_secret=self.auth_key,
version=content_encoding)
reply = CaseInsensitiveDict({
'crypto_key': base64.urlsafe_b64encode(
server_key.get_pubkey()).strip(b'='),
'crypto_key': crypto_key,
'body': encrypted,
})
if salt:
@ -329,7 +324,7 @@ def webpush(subscription_info,
:type subscription_info: dict
:param data: Serialized data to send
:type data: str
:param vapid_private_key: Dath to vapid private key PEM or encoded str
:param vapid_private_key: Path to vapid private key PEM or encoded str
:type vapid_private_key: str
:param vapid_claims: Dictionary of claims ('sub' required)
:type vapid_claims: dict
@ -344,16 +339,17 @@ def webpush(subscription_info,
if vapid_claims:
if not vapid_claims.get('aud'):
url = urlparse(subscription_info.get('endpoint'))
aud = "{}://{}/".format(url.scheme, url.netloc)
aud = "{}://{}".format(url.scheme, url.netloc)
vapid_claims['aud'] = aud
if not vapid_private_key:
raise WebPushException("VAPID dict missing 'private_key'")
if os.path.isfile(vapid_private_key):
# Presume that key from file is handled correctly by
# py_vapid.
vv = Vapid(private_key_file=vapid_private_key) # pragma no cover
vv = Vapid.from_file(
private_key_file=vapid_private_key) # pragma no cover
else:
vv = Vapid(private_key=vapid_private_key)
vv = Vapid.from_raw(private_raw=vapid_private_key.encode())
vapid_headers = vv.sign(vapid_claims)
result = WebPusher(subscription_info).send(
data,
@ -362,6 +358,6 @@ def webpush(subscription_info,
curl=curl,
)
if not curl and result.status_code > 202:
raise WebPushException("Push failed: {}:".format(
raise WebPushException("Push failed: {}: {}".format(
result, result.text))
return result

View File

@ -6,7 +6,8 @@ import unittest
from mock import patch, Mock
from nose.tools import eq_, ok_, assert_raises
import http_ece
import pyelliptic
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
from pywebpush import WebPusher, WebPushException, CaseInsensitiveDict, webpush
@ -21,17 +22,24 @@ class WebpushTestCase(unittest.TestCase):
"M5xqEwuPM7VuQcyiLDhvovthPIXx+gsQRQ=="
)
def _gen_subscription_info(self, recv_key,
def _gen_subscription_info(self,
recv_key=None,
endpoint="https://example.com/"):
if not recv_key:
recv_key = ec.generate_private_key(ec.SECP256R1, default_backend())
return {
"endpoint": endpoint,
"keys": {
'auth': base64.urlsafe_b64encode(os.urandom(16)).strip(b'='),
'p256dh': base64.urlsafe_b64encode(
recv_key.get_pubkey()).strip(b'='),
'p256dh': self._get_pubkey_str(recv_key),
}
}
def _get_pubkey_str(self, priv_key):
return base64.urlsafe_b64encode(
priv_key.public_key().public_numbers().encode_point()
).strip(b'=')
def test_init(self):
# use static values so we know what to look for in the reply
subscription_info = {
@ -72,14 +80,17 @@ class WebpushTestCase(unittest.TestCase):
def test_encode(self):
for content_encoding in ["aesgcm", "aes128gcm"]:
recv_key = pyelliptic.ECC(curve="prime256v1")
recv_key = ec.generate_private_key(
ec.SECP256R1, default_backend())
subscription_info = self._gen_subscription_info(recv_key)
data = "Mary had a little lamb, with some nice mint jelly"
push = WebPusher(subscription_info)
encoded = push.encode(data, content_encoding=content_encoding)
keyid = base64.urlsafe_b64encode(recv_key.get_pubkey()[1:])
http_ece.keys[keyid] = recv_key
http_ece.labels[keyid] = 'P-256'
"""
crypto_key = base64.urlsafe_b64encode(
self._get_pubkey_str(recv_key)
).strip(b'=')
"""
# Convert these b64 strings into their raw, binary form.
raw_salt = None
if 'salt' in encoded:
@ -94,15 +105,14 @@ class WebpushTestCase(unittest.TestCase):
encoded['body'],
salt=raw_salt,
dh=raw_dh,
keyid=keyid,
authSecret=raw_auth,
private_key=recv_key,
auth_secret=raw_auth,
version=content_encoding
)
eq_(decoded.decode('utf8'), data)
def test_bad_content_encoding(self):
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb, with some nice mint jelly"
push = WebPusher(subscription_info)
self.assertRaises(WebPushException,
@ -112,8 +122,7 @@ class WebpushTestCase(unittest.TestCase):
@patch("requests.post")
def test_send(self, mock_post):
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
subscription_info = self._gen_subscription_info()
headers = {"Crypto-Key": "pre-existing",
"Authentication": "bearer vapid"}
data = "Mary had a little lamb"
@ -131,9 +140,7 @@ class WebpushTestCase(unittest.TestCase):
def test_send_vapid(self, mock_post):
mock_post.return_value = Mock()
mock_post.return_value.status_code = 200
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb"
webpush(
subscription_info=subscription_info,
@ -165,43 +172,40 @@ class WebpushTestCase(unittest.TestCase):
def test_send_bad_vapid_no_key(self, mock_post):
mock_post.return_value = Mock()
mock_post.return_value.status_code = 200
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb"
assert_raises(WebPushException,
webpush,
subscription_info=subscription_info,
data=data,
vapid_claims={
"aud": "https://example.com",
"sub": "mailto:ops@example.com"
}
"aud": "https://example.com",
"sub": "mailto:ops@example.com"
}
)
@patch("requests.post")
def test_send_bad_vapid_bad_return(self, mock_post):
mock_post.return_value = Mock()
mock_post.return_value.status_code = 410
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb"
assert_raises(WebPushException,
webpush,
subscription_info=subscription_info,
data=data,
vapid_claims={
"aud": "https://example.com",
"sub": "mailto:ops@example.com"
},
"aud": "https://example.com",
"sub": "mailto:ops@example.com"
},
vapid_private_key=self.vapid_key
)
@patch("requests.post")
def test_send_empty(self, mock_post):
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
subscription_info = self._gen_subscription_info()
headers = {"Crypto-Key": "pre-existing",
"Authentication": "bearer vapid"}
WebPusher(subscription_info).send('', headers)
@ -214,16 +218,14 @@ class WebpushTestCase(unittest.TestCase):
ok_('pre-existing' in ckey)
def test_encode_empty(self):
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
subscription_info = self._gen_subscription_info()
headers = {"Crypto-Key": "pre-existing",
"Authentication": "bearer vapid"}
encoded = WebPusher(subscription_info).encode('', headers)
eq_(encoded, None)
def test_encode_no_crypto(self):
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
subscription_info = self._gen_subscription_info()
del(subscription_info['keys'])
headers = {"Crypto-Key": "pre-existing",
"Authentication": "bearer vapid"}
@ -236,8 +238,7 @@ class WebpushTestCase(unittest.TestCase):
@patch("requests.post")
def test_send_no_headers(self, mock_post):
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
subscription_info = self._gen_subscription_info()
data = "Mary had a little lamb"
WebPusher(subscription_info).send(data)
eq_(subscription_info.get('endpoint'), mock_post.call_args[0][0])
@ -248,15 +249,14 @@ class WebpushTestCase(unittest.TestCase):
@patch("pywebpush.open")
def test_as_curl(self, opener):
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(recv_key)
subscription_info = self._gen_subscription_info()
result = webpush(
subscription_info,
data="Mary had a little lamb",
vapid_claims={
"aud": "https://example.com",
"sub": "mailto:ops@example.com"
},
"aud": "https://example.com",
"sub": "mailto:ops@example.com"
},
vapid_private_key=self.vapid_key,
curl=True
)
@ -281,9 +281,8 @@ class WebpushTestCase(unittest.TestCase):
@patch("requests.post")
def test_gcm(self, mock_post):
recv_key = pyelliptic.ECC(curve="prime256v1")
subscription_info = self._gen_subscription_info(
recv_key,
None,
endpoint="https://android.googleapis.com/gcm/send/regid123")
headers = {"Crypto-Key": "pre-existing",
"Authentication": "bearer vapid"}

View File

@ -1,4 +1,4 @@
http-ece==0.7.1
python-jose==1.3.2
cryptography==1.8.1
http-ece==1.0.1
requests==2.13.0
py-vapid==0.8.1
py-vapid==1.2.1

View File

@ -3,7 +3,7 @@ import os
from setuptools import find_packages, setup
__version__ = "0.8.0"
__version__ = "1.0.0"
def read_from(file):

View File

@ -1,5 +1,5 @@
-r requirements.txt
nose>=1.3.7
coverage>=4.3.4
coverage>=4.4
mock==2.0.0
flake8