tarball import

This commit is contained in:
Frédéric Péters 2018-11-04 15:07:38 +01:00
commit 2352e866ce
22 changed files with 1354 additions and 0 deletions

0
CHANGELOG.md Normal file
View File

5
MANIFEST.in Normal file
View File

@ -0,0 +1,5 @@
include *.md
include *.txt
include setup.*
include LICENSE
recursive-include py_vapid *.py

115
PKG-INFO Normal file
View File

@ -0,0 +1,115 @@
Metadata-Version: 1.1
Name: py-vapid
Version: 1.4.0
Summary: Simple VAPID header generation library
Home-page: https://github.com/mozilla-services/vapid
Author: JR Conlin
Author-email: src+vapid@jrconlin.com
License: MPL2
Description: Easy VAPID generation
=====================
This minimal library contains the minimal set of functions you need to
generate a VAPID key set and get the headers you'll need to sign a
WebPush subscription update.
VAPID is a voluntary standard for WebPush subscription providers (sites
that send WebPush updates to remote customers) to self-identify to Push
Servers (the servers that convey the push notifications).
The VAPID "claims" are a set of JSON keys and values. There are two
required fields, one semi-optional and several optional additional
fields.
At a minimum a VAPID claim set should look like:
::
{"sub":"mailto:YourEmail@YourSite.com","aud":"https://PushServer","exp":"ExpirationTimestamp"}
A few notes:
***sub*** is the email address you wish to have on record for this
request, prefixed with "``mailto:``". If things go wrong, this is the
email that will be used to contact you (for instance). This can be a
general delivery address like "``mailto:push_operations@example.com``"
or a specific address like "``mailto:bob@example.com``".
***aud*** is the audience for the VAPID. This is the scheme and host you
use to send subscription endpoints and generally coincides with the
``endpoint`` specified in the Subscription Info block.
As example, if a WebPush subscription info contains:
``{"endpoint": "https://push.example.com:8012/v1/push/...", ...}``
then the ``aud`` would be "``https://push.example.com:8012``"
While some Push Services consider this an optional field, others may be
stricter.
***exp*** This is the UTC timestamp for when this VAPID request will
expire. The maximum period is 24 hours. Setting a shorter period can
prevent "replay" attacks. Setting a longer period allows you to reuse
headers for multiple sends (e.g. if you're sending hundreds of updates
within an hour or so.) If no ``exp`` is included, one that will expire
in 24 hours will be auto-generated for you.
Claims should be stored in a JSON compatible file. In the examples
below, we've stored the claims into a file named ``claims.json``.
py\_vapid can either be installed as a library or used as a stand along
app, ``bin/vapid``.
App Installation
----------------
You'll need ``python virtualenv`` Run that in the current directory.
Then run
::
bin/pip install -r requirements.txt
bin/python setup.py install
App Usage
---------
Run by itself, ``bin/vapid`` will check and optionally create the
public\_key.pem and private\_key.pem files.
``bin/vapid --gen`` can be used to generate a new set of public and
private key PEM files. These will overwrite the contents of
``private_key.pem`` and ``public_key.pem``.
``bin/vapid --sign claims.json`` will generate a set of HTTP headers
from a JSON formatted claims file. A sample ``claims.json`` is included
with this distribution.
``bin/vapid --sign claims.json --json`` will output the headers in JSON
format, which may be useful for other programs.
``bin/vapid --applicationServerKey`` will return the
``applicationServerKey`` value you can use to make a restricted
endpoint. See
https://developer.mozilla.org/en-US/docs/Web/API/PushManager/subscribe
for more details. Be aware that this value is tied to the generated
public/private key. If you remove or generate a new key, any restricted
URL you've previously generated will need to be reallocated. Please note
that some User Agents may require you `to decode this string into a
Uint8Array <https://github.com/GoogleChrome/push-notifications/blob/master/app/scripts/main.js>`__.
See ``bin/vapid -h`` for all options and commands.
Keywords: vapid push webpush
Platform: UNKNOWN
Classifier: Topic :: Internet :: WWW/HTTP
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5

90
README.md Normal file
View File

@ -0,0 +1,90 @@
# Easy VAPID generation
This minimal library contains the minimal set of functions you need to
generate a VAPID key set and get the headers you'll need to sign a
WebPush subscription update.
VAPID is a voluntary standard for WebPush subscription providers
(sites that send WebPush updates to remote customers) to self-identify
to Push Servers (the servers that convey the push notifications).
The VAPID "claims" are a set of JSON keys and values. There are two
required fields, one semi-optional and several optional additional
fields.
At a minimum a VAPID claim set should look like:
```
{"sub":"mailto:YourEmail@YourSite.com","aud":"https://PushServer","exp":"ExpirationTimestamp"}
```
A few notes:
***sub*** is the email address you wish to have on record for this
request, prefixed with "`mailto:`". If things go wrong, this is the
email that will be used to contact you (for instance). This can be a
general delivery address like "`mailto:push_operations@example.com`" or a
specific address like "`mailto:bob@example.com`".
***aud*** is the audience for the VAPID. This is the scheme and host
you use to send subscription endpoints and generally coincides with
the `endpoint` specified in the Subscription Info block.
As example, if a WebPush subscription info contains:
`{"endpoint": "https://push.example.com:8012/v1/push/...", ...}`
then the `aud` would be "`https://push.example.com:8012`"
While some Push Services consider this an optional field, others may
be stricter.
***exp*** This is the UTC timestamp for when this VAPID request will
expire. The maximum period is 24 hours. Setting a shorter period can
prevent "replay" attacks. Setting a longer period allows you to reuse
headers for multiple sends (e.g. if you're sending hundreds of updates
within an hour or so.) If no `exp` is included, one that will expire
in 24 hours will be auto-generated for you.
Claims should be stored in a JSON compatible file. In the examples
below, we've stored the claims into a file named `claims.json`.
py_vapid can either be installed as a library or used as a stand along
app, `bin/vapid`.
## App Installation
You'll need `python virtualenv` Run that in the current directory.
Then run
```
bin/pip install -r requirements.txt
bin/python setup.py install
```
## App Usage
Run by itself, `bin/vapid` will check and optionally create the
public_key.pem and private_key.pem files.
`bin/vapid --gen` can be used to generate a new set of public and
private key PEM files. These will overwrite the contents of
`private_key.pem` and `public_key.pem`.
`bin/vapid --sign claims.json` will generate a set of HTTP headers
from a JSON formatted claims file. A sample `claims.json` is included
with this distribution.
`bin/vapid --sign claims.json --json` will output the headers in
JSON format, which may be useful for other programs.
`bin/vapid --applicationServerKey` will return the
`applicationServerKey` value you can use to make a restricted
endpoint. See
https://developer.mozilla.org/en-US/docs/Web/API/PushManager/subscribe
for more details. Be aware that this value is tied to the generated
public/private key. If you remove or generate a new key, any
restricted URL you've previously generated will need to be
reallocated. Please note that some User Agents may require you [to
decode this string into a Uint8Array](https://github.com/GoogleChrome/push-notifications/blob/master/app/scripts/main.js).
See `bin/vapid -h` for all options and commands.

95
README.rst Normal file
View File

@ -0,0 +1,95 @@
Easy VAPID generation
=====================
This minimal library contains the minimal set of functions you need to
generate a VAPID key set and get the headers you'll need to sign a
WebPush subscription update.
VAPID is a voluntary standard for WebPush subscription providers (sites
that send WebPush updates to remote customers) to self-identify to Push
Servers (the servers that convey the push notifications).
The VAPID "claims" are a set of JSON keys and values. There are two
required fields, one semi-optional and several optional additional
fields.
At a minimum a VAPID claim set should look like:
::
{"sub":"mailto:YourEmail@YourSite.com","aud":"https://PushServer","exp":"ExpirationTimestamp"}
A few notes:
***sub*** is the email address you wish to have on record for this
request, prefixed with "``mailto:``". If things go wrong, this is the
email that will be used to contact you (for instance). This can be a
general delivery address like "``mailto:push_operations@example.com``"
or a specific address like "``mailto:bob@example.com``".
***aud*** is the audience for the VAPID. This is the scheme and host you
use to send subscription endpoints and generally coincides with the
``endpoint`` specified in the Subscription Info block.
As example, if a WebPush subscription info contains:
``{"endpoint": "https://push.example.com:8012/v1/push/...", ...}``
then the ``aud`` would be "``https://push.example.com:8012``"
While some Push Services consider this an optional field, others may be
stricter.
***exp*** This is the UTC timestamp for when this VAPID request will
expire. The maximum period is 24 hours. Setting a shorter period can
prevent "replay" attacks. Setting a longer period allows you to reuse
headers for multiple sends (e.g. if you're sending hundreds of updates
within an hour or so.) If no ``exp`` is included, one that will expire
in 24 hours will be auto-generated for you.
Claims should be stored in a JSON compatible file. In the examples
below, we've stored the claims into a file named ``claims.json``.
py\_vapid can either be installed as a library or used as a stand along
app, ``bin/vapid``.
App Installation
----------------
You'll need ``python virtualenv`` Run that in the current directory.
Then run
::
bin/pip install -r requirements.txt
bin/python setup.py install
App Usage
---------
Run by itself, ``bin/vapid`` will check and optionally create the
public\_key.pem and private\_key.pem files.
``bin/vapid --gen`` can be used to generate a new set of public and
private key PEM files. These will overwrite the contents of
``private_key.pem`` and ``public_key.pem``.
``bin/vapid --sign claims.json`` will generate a set of HTTP headers
from a JSON formatted claims file. A sample ``claims.json`` is included
with this distribution.
``bin/vapid --sign claims.json --json`` will output the headers in JSON
format, which may be useful for other programs.
``bin/vapid --applicationServerKey`` will return the
``applicationServerKey`` value you can use to make a restricted
endpoint. See
https://developer.mozilla.org/en-US/docs/Web/API/PushManager/subscribe
for more details. Be aware that this value is tied to the generated
public/private key. If you remove or generate a new key, any restricted
URL you've previously generated will need to be reallocated. Please note
that some User Agents may require you `to decode this string into a
Uint8Array <https://github.com/GoogleChrome/push-notifications/blob/master/app/scripts/main.js>`__.
See ``bin/vapid -h`` for all options and commands.

115
py_vapid.egg-info/PKG-INFO Normal file
View File

@ -0,0 +1,115 @@
Metadata-Version: 1.1
Name: py-vapid
Version: 1.4.0
Summary: Simple VAPID header generation library
Home-page: https://github.com/mozilla-services/vapid
Author: JR Conlin
Author-email: src+vapid@jrconlin.com
License: MPL2
Description: Easy VAPID generation
=====================
This minimal library contains the minimal set of functions you need to
generate a VAPID key set and get the headers you'll need to sign a
WebPush subscription update.
VAPID is a voluntary standard for WebPush subscription providers (sites
that send WebPush updates to remote customers) to self-identify to Push
Servers (the servers that convey the push notifications).
The VAPID "claims" are a set of JSON keys and values. There are two
required fields, one semi-optional and several optional additional
fields.
At a minimum a VAPID claim set should look like:
::
{"sub":"mailto:YourEmail@YourSite.com","aud":"https://PushServer","exp":"ExpirationTimestamp"}
A few notes:
***sub*** is the email address you wish to have on record for this
request, prefixed with "``mailto:``". If things go wrong, this is the
email that will be used to contact you (for instance). This can be a
general delivery address like "``mailto:push_operations@example.com``"
or a specific address like "``mailto:bob@example.com``".
***aud*** is the audience for the VAPID. This is the scheme and host you
use to send subscription endpoints and generally coincides with the
``endpoint`` specified in the Subscription Info block.
As example, if a WebPush subscription info contains:
``{"endpoint": "https://push.example.com:8012/v1/push/...", ...}``
then the ``aud`` would be "``https://push.example.com:8012``"
While some Push Services consider this an optional field, others may be
stricter.
***exp*** This is the UTC timestamp for when this VAPID request will
expire. The maximum period is 24 hours. Setting a shorter period can
prevent "replay" attacks. Setting a longer period allows you to reuse
headers for multiple sends (e.g. if you're sending hundreds of updates
within an hour or so.) If no ``exp`` is included, one that will expire
in 24 hours will be auto-generated for you.
Claims should be stored in a JSON compatible file. In the examples
below, we've stored the claims into a file named ``claims.json``.
py\_vapid can either be installed as a library or used as a stand along
app, ``bin/vapid``.
App Installation
----------------
You'll need ``python virtualenv`` Run that in the current directory.
Then run
::
bin/pip install -r requirements.txt
bin/python setup.py install
App Usage
---------
Run by itself, ``bin/vapid`` will check and optionally create the
public\_key.pem and private\_key.pem files.
``bin/vapid --gen`` can be used to generate a new set of public and
private key PEM files. These will overwrite the contents of
``private_key.pem`` and ``public_key.pem``.
``bin/vapid --sign claims.json`` will generate a set of HTTP headers
from a JSON formatted claims file. A sample ``claims.json`` is included
with this distribution.
``bin/vapid --sign claims.json --json`` will output the headers in JSON
format, which may be useful for other programs.
``bin/vapid --applicationServerKey`` will return the
``applicationServerKey`` value you can use to make a restricted
endpoint. See
https://developer.mozilla.org/en-US/docs/Web/API/PushManager/subscribe
for more details. Be aware that this value is tied to the generated
public/private key. If you remove or generate a new key, any restricted
URL you've previously generated will need to be reallocated. Please note
that some User Agents may require you `to decode this string into a
Uint8Array <https://github.com/GoogleChrome/push-notifications/blob/master/app/scripts/main.js>`__.
See ``bin/vapid -h`` for all options and commands.
Keywords: vapid push webpush
Platform: UNKNOWN
Classifier: Topic :: Internet :: WWW/HTTP
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5

View File

@ -0,0 +1,21 @@
CHANGELOG.md
MANIFEST.in
README.md
README.rst
reqs.txt
requirements.txt
setup.cfg
setup.py
test-requirements.txt
py_vapid/__init__.py
py_vapid/jwt.py
py_vapid/main.py
py_vapid/utils.py
py_vapid.egg-info/PKG-INFO
py_vapid.egg-info/SOURCES.txt
py_vapid.egg-info/dependency_links.txt
py_vapid.egg-info/entry_points.txt
py_vapid.egg-info/not-zip-safe
py_vapid.egg-info/requires.txt
py_vapid.egg-info/top_level.txt
py_vapid/tests/test_vapid.py

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,4 @@
[console_scripts]
vapid = py_vapid.main:main

View File

@ -0,0 +1 @@

View File

@ -0,0 +1 @@
cryptography>=1.8.2

View File

@ -0,0 +1 @@
py_vapid

342
py_vapid/__init__.py Normal file
View File

@ -0,0 +1,342 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import logging
import binascii
import time
import re
import copy
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec, utils as ecutils
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature
from py_vapid.utils import b64urldecode, b64urlencode
from py_vapid.jwt import sign
# Show compliance version. For earlier versions see previously tagged releases.
VERSION = "VAPID-DRAFT-02/ECE-DRAFT-07"
class VapidException(Exception):
"""An exception wrapper for Vapid."""
pass
class Vapid01(object):
"""Minimal VAPID Draft 01 signature generation library.
https://tools.ietf.org/html/draft-ietf-webpush-vapid-01
"""
_private_key = None
_public_key = None
_schema = "WebPush"
def __init__(self, private_key=None):
"""Initialize VAPID with an optional private key.
:param private_key: A private key object
:type private_key: ec.EllipticCurvePrivateKey
"""
self.private_key = private_key
if private_key:
self._public_key = self.private_key.public_key()
@classmethod
def from_raw(cls, private_raw):
"""Initialize VAPID using a private key point in "raw" or
"uncompressed" form. Raw keys consist of a single, 32 octet
encoded integer.
:param private_raw: A private key point in uncompressed form.
:type private_raw: bytes
"""
key = ec.derive_private_key(
int(binascii.hexlify(b64urldecode(private_raw)), 16),
curve=ec.SECP256R1(),
backend=default_backend())
return cls(key)
@classmethod
def from_raw_public(cls, public_raw):
key = ec.EllipticCurvePublicNumbers.from_encoded_point(
curve=ec.SECP256R1(),
data=b64urldecode(public_raw)
).public_key(default_backend())
ss = cls()
ss._public_key = key
return ss
@classmethod
def from_pem(cls, private_key):
"""Initialize VAPID using a private key in PEM format.
:param private_key: A private key in PEM format.
:type private_key: bytes
"""
# not sure why, but load_pem_private_key fails to deserialize
return cls.from_der(
b''.join(private_key.splitlines()[1:-1]))
@classmethod
def from_der(cls, private_key):
"""Initialize VAPID using a private key in DER format.
:param private_key: A private key in DER format and Base64-encoded.
:type private_key: bytes
"""
key = serialization.load_der_private_key(b64urldecode(private_key),
password=None,
backend=default_backend())
return cls(key)
@classmethod
def from_file(cls, private_key_file=None):
"""Initialize VAPID using a file containing a private key in PEM or
DER format.
:param private_key_file: Name of the file containing the private key
:type private_key_file: str
"""
if not os.path.isfile(private_key_file):
vapid = cls()
vapid.generate_keys()
vapid.save_key(private_key_file)
return vapid
with open(private_key_file, 'r') as file:
private_key = file.read()
try:
if "-----BEGIN" in private_key:
vapid = cls.from_pem(private_key.encode('utf8'))
else:
vapid = cls.from_der(private_key.encode('utf8'))
return vapid
except Exception as exc:
logging.error("Could not open private key file: %s", repr(exc))
raise VapidException(exc)
@classmethod
def from_string(cls, private_key):
"""Initialize VAPID using a string containing the private key. This
will try to determine if the key is in RAW or DER format.
:param private_key: String containing the key info
:type private_key: str
"""
pkey = private_key.encode().replace(b"\n", b"")
key = b64urldecode(pkey)
if len(key) == 32:
return cls.from_raw(pkey)
return cls.from_der(pkey)
@classmethod
def verify(cls, key, auth):
"""Verify a VAPID authorization token.
:param key: base64 serialized public key
:type key: str
:param auth: authorization token
type key: str
"""
tokens = auth.rsplit(' ', 1)[1].rsplit('.', 1)
kp = cls().from_raw_public(key.encode())
return kp.verify_token(
validation_token=tokens[0].encode(),
verification_token=tokens[1]
)
@property
def private_key(self):
"""The VAPID private ECDSA key"""
if not self._private_key:
raise VapidException("No private key. Call generate_keys()")
return self._private_key
@private_key.setter
def private_key(self, value):
"""Set the VAPID private ECDSA key
:param value: the byte array containing the private ECDSA key data
:type value: ec.EllipticCurvePrivateKey
"""
self._private_key = value
if value:
self._public_key = self.private_key.public_key()
@property
def public_key(self):
"""The VAPID public ECDSA key
The public key is currently read only. Set it via the `.private_key`
method. This will autogenerate a public and private key if no value
has been set.
:returns ec.EllipticCurvePublicKey
"""
return self._public_key
def generate_keys(self):
"""Generate a valid ECDSA Key Pair."""
self.private_key = ec.generate_private_key(ec.SECP256R1,
default_backend())
def private_pem(self):
return self.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
def public_pem(self):
return self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
def save_key(self, key_file):
"""Save the private key to a PEM file.
:param key_file: The file path to save the private key data
:type key_file: str
"""
with open(key_file, "wb") as file:
file.write(self.private_pem())
file.close()
def save_public_key(self, key_file):
"""Save the public key to a PEM file.
:param key_file: The name of the file to save the public key
:type key_file: str
"""
with open(key_file, "wb") as file:
file.write(self.public_pem())
file.close()
def verify_token(self, validation_token, verification_token):
"""Internally used to verify the verification token is correct.
:param validation_token: Provided validation token string
:type validation_token: str
:param verification_token: Generated verification token
:type verification_token: str
:returns: Boolean indicating if verifictation token is valid.
:rtype: boolean
"""
hsig = b64urldecode(verification_token.encode('utf8'))
r = int(binascii.hexlify(hsig[:32]), 16)
s = int(binascii.hexlify(hsig[32:]), 16)
try:
self.public_key.verify(
ecutils.encode_dss_signature(r, s),
validation_token,
signature_algorithm=ec.ECDSA(hashes.SHA256())
)
return True
except InvalidSignature:
return False
def _base_sign(self, claims):
cclaims = copy.deepcopy(claims)
if not cclaims.get('exp'):
cclaims['exp'] = str(int(time.time()) + 86400)
if not re.match("mailto:.+@.+\..+",
cclaims.get('sub', ''),
re.IGNORECASE):
raise VapidException(
"Missing 'sub' from claims. "
"'sub' is your admin email as a mailto: link.")
if not re.match("^https?:\/\/[^\/\.:]+\.[^\/:]+(:\d+)?$",
cclaims.get("aud", ""),
re.IGNORECASE):
raise VapidException(
"Missing 'aud' from claims. "
"'aud' is the scheme, host and optional port for this "
"transaction e.g. https://example.com:8080")
return cclaims
def sign(self, claims, crypto_key=None):
"""Sign a set of claims.
:param claims: JSON object containing the JWT claims to use.
:type claims: dict
:param crypto_key: Optional existing crypto_key header content. The
vapid public key will be appended to this data.
:type crypto_key: str
:returns: a hash containing the header fields to use in
the subscription update.
:rtype: dict
"""
sig = sign(self._base_sign(claims), self.private_key)
pkey = 'p256ecdsa='
pkey += b64urlencode(
self.public_key.public_numbers().encode_point())
if crypto_key:
crypto_key = crypto_key + ';' + pkey
else:
crypto_key = pkey
return {"Authorization": "{} {}".format(self._schema, sig.strip('=')),
"Crypto-Key": crypto_key}
class Vapid02(Vapid01):
"""Minimal Vapid 02 signature generation library
https://tools.ietf.org/html/draft-ietf-webpush-vapid-02
"""
_schema = "vapid"
def sign(self, claims, crypto_key=None):
sig = sign(self._base_sign(claims), self.private_key)
pkey = self.public_key.public_numbers().encode_point()
return{
"Authorization": "{schema} t={t},k={k}".format(
schema=self._schema,
t=sig,
k=b64urlencode(pkey)
)
}
@classmethod
def verify(cls, auth):
pref_tok = auth.rsplit(' ', 1)
assert pref_tok[0].lower() == cls._schema, (
"Incorrect schema specified")
parts = {}
for tok in pref_tok[1].split(','):
kv = tok.split('=', 1)
parts[kv[0]] = kv[1]
assert 'k' in parts.keys(), (
"Auth missing public key 'k' value")
assert 't' in parts.keys(), (
"Auth missing token set 't' value")
kp = cls().from_raw_public(parts['k'].encode())
tokens = parts['t'].rsplit('.', 1)
return kp.verify_token(
validation_token=tokens[0].encode(),
verification_token=tokens[1]
)
Vapid = Vapid01

88
py_vapid/jwt.py Normal file
View File

@ -0,0 +1,88 @@
import binascii
import json
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives import hashes
from py_vapid.utils import b64urldecode, b64urlencode, num_to_bytes
def extract_signature(auth):
"""Extracts the payload and signature from a JWT, converting from RFC7518
to RFC 3279
:param auth: A JWT Authorization Token.
:type auth: str
:return tuple containing the signature material and signature
"""
payload, asig = auth.encode('utf8').rsplit(b'.', 1)
sig = b64urldecode(asig)
if len(sig) != 64:
raise InvalidSignature()
encoded = utils.encode_dss_signature(
s=int(binascii.hexlify(sig[32:]), 16),
r=int(binascii.hexlify(sig[:32]), 16)
)
return payload, encoded
def decode(token, key):
"""Decode a web token into an assertion dictionary
:param token: VAPID auth token
:type token: str
:param key: bitarray containing the public key
:type key: str
:return dict of the VAPID claims
:raise InvalidSignature
"""
try:
sig_material, signature = extract_signature(token)
dkey = b64urldecode(key.encode('utf8'))
pkey = ec.EllipticCurvePublicNumbers.from_encoded_point(
ec.SECP256R1(),
dkey,
).public_key(default_backend())
pkey.verify(
signature,
sig_material,
ec.ECDSA(hashes.SHA256())
)
return json.loads(
b64urldecode(sig_material.split(b'.')[1]).decode('utf8')
)
except InvalidSignature:
raise
except(ValueError, TypeError, binascii.Error):
raise InvalidSignature()
def sign(claims, key):
"""Sign the claims
:param claims: list of JWS claims
:type claims: dict
:param key: Private key for signing
:type key: ec.EllipticCurvePrivateKey
:param algorithm: JWT "alg" descriptor
:type algorithm: str
"""
header = b64urlencode(b"""{"typ":"JWT","alg":"ES256"}""")
# Unfortunately, chrome seems to require the claims to be sorted.
claims = b64urlencode(json.dumps(claims,
separators=(',', ':'),
sort_keys=True).encode('utf8'))
token = "{}.{}".format(header, claims)
rsig = key.sign(token.encode('utf8'), ec.ECDSA(hashes.SHA256()))
(r, s) = utils.decode_dss_signature(rsig)
sig = b64urlencode(num_to_bytes(r) + num_to_bytes(s))
return "{}.{}".format(token, sig)

106
py_vapid/main.py Normal file
View File

@ -0,0 +1,106 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import argparse
import os
import json
from py_vapid import Vapid01, Vapid02, b64urlencode
def prompt(prompt):
# Not sure why, but python3 throws and exception if you try to
# monkeypatch for this. It's ugly, but this seems to play nicer.
try:
return input(prompt)
except NameError:
return raw_input(prompt) # noqa: F821
def main():
parser = argparse.ArgumentParser(description="VAPID tool")
parser.add_argument('--sign', '-s', help='claims file to sign')
parser.add_argument('--gen', '-g', help='generate new key pairs',
default=False, action="store_true")
parser.add_argument('--version2', '-2', help="use VAPID spec Draft-02",
default=False, action="store_true")
parser.add_argument('--version1', '-1', help="use VAPID spec Draft-01",
default=True, action="store_true")
parser.add_argument('--json', help="dump as json",
default=False, action="store_true")
parser.add_argument('--applicationServerKey',
help="show applicationServerKey value",
default=False, action="store_true")
args = parser.parse_args()
# Added to solve 2.7 => 3.* incompatibility
Vapid = Vapid01
if args.version2:
Vapid = Vapid02
if args.gen or not os.path.exists('private_key.pem'):
if not args.gen:
print("No private_key.pem file found.")
answer = None
while answer not in ['y', 'n']:
answer = prompt("Do you want me to create one for you? (Y/n)")
if not answer:
answer = 'y'
answer = answer.lower()[0]
if answer == 'n':
print("Sorry, can't do much for you then.")
exit(1)
vapid = Vapid()
vapid.generate_keys()
print("Generating private_key.pem")
vapid.save_key('private_key.pem')
print("Generating public_key.pem")
vapid.save_public_key('public_key.pem')
vapid = Vapid.from_file('private_key.pem')
claim_file = args.sign
result = dict()
if args.applicationServerKey:
raw_pub = vapid.public_key.public_numbers().encode_point()
print("Application Server Key = {}\n\n".format(
b64urlencode(raw_pub)))
if claim_file:
if not os.path.exists(claim_file):
print("No {} file found.".format(claim_file))
print("""
The claims file should be a JSON formatted file that holds the
information that describes you. There are three elements in the claims
file you'll need:
"sub" This is your site's admin email address
(e.g. "mailto:admin@example.com")
"exp" This is the expiration time for the claim in seconds. If you don't
have one, I'll add one that expires in 24 hours.
You're also welcome to add additional fields to the claims which could be
helpful for the Push Service operations team to pass along to your operations
team (e.g. "ami-id": "e-123456", "cust-id": "a3sfa10987"). Remember to keep
these values short to prevent some servers from rejecting the transaction due
to overly large headers. See https://jwt.io/introduction/ for details.
For example, a claims.json file could contain:
{"sub": "mailto:admin@example.com"}
""")
exit(1)
try:
claims = json.loads(open(claim_file).read())
result.update(vapid.sign(claims))
except Exception as exc:
print("Crap, something went wrong: {}".format(repr(exc)))
raise exc
if args.json:
print(json.dumps(result))
return
print("Include the following headers in your request:\n")
for key, value in result.items():
print("{}: {}\n".format(key, value))
print("\n")
if __name__ == '__main__':
main()

View File

@ -0,0 +1,235 @@
import binascii
import base64
import copy
import os
import json
import unittest
from nose.tools import eq_, ok_
from mock import patch, Mock
from py_vapid import Vapid01, Vapid02, VapidException
from py_vapid.jwt import decode
# This is a private key in DER form.
T_DER = """
MHcCAQEEIPeN1iAipHbt8+/KZ2NIF8NeN24jqAmnMLFZEMocY8RboAoGCCqGSM49
AwEHoUQDQgAEEJwJZq/GN8jJbo1GGpyU70hmP2hbWAUpQFKDByKB81yldJ9GTklB
M5xqEwuPM7VuQcyiLDhvovthPIXx+gsQRQ==
"""
key = dict(
d=111971876876285331364078054667935803036831194031221090723024134705696601261147, # noqa
x=7512698603580564493364310058109115206932767156853859985379597995200661812060, # noqa
y=74837673548863147047276043384733294240255217876718360423043754089982135570501 # noqa
)
# This is the same private key, in PEM form.
T_PRIVATE = ("-----BEGIN PRIVATE KEY-----{}"
"-----END PRIVATE KEY-----\n").format(T_DER)
# This is the same private key, as a point in uncompressed form. This should
# be Base64url-encoded without padding.
T_RAW = """
943WICKkdu3z78pnY0gXw143biOoCacwsVkQyhxjxFs
""".strip().encode('utf8')
# This is a public key in PEM form.
T_PUBLIC = """-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEEJwJZq/GN8jJbo1GGpyU70hmP2hb
WAUpQFKDByKB81yldJ9GTklBM5xqEwuPM7VuQcyiLDhvovthPIXx+gsQRQ==
-----END PUBLIC KEY-----
"""
# this is a public key in uncompressed form ('\x04' + 2 * 32 octets)
# Remember, this should have any padding stripped.
T_PUBLIC_RAW = (
"BBCcCWavxjfIyW6NRhqclO9IZj9oW1gFKUBSgwcigfNc"
"pXSfRk5JQTOcahMLjzO1bkHMoiw4b6L7YTyF8foLEEU"
).strip('=').encode('utf8')
def setUp(self):
with open('/tmp/private', 'w') as ff:
ff.write(T_PRIVATE)
with open('/tmp/public', 'w') as ff:
ff.write(T_PUBLIC)
with open('/tmp/private.der', 'w') as ff:
ff.write(T_DER)
def tearDown(self):
os.unlink('/tmp/private')
os.unlink('/tmp/public')
class VapidTestCase(unittest.TestCase):
def check_keys(self, v):
eq_(v.private_key.private_numbers().private_value, key.get('d'))
eq_(v.public_key.public_numbers().x, key.get('x'))
eq_(v.public_key.public_numbers().y, key.get('y'))
def test_init(self):
v1 = Vapid01.from_file("/tmp/private")
self.check_keys(v1)
v2 = Vapid01.from_pem(T_PRIVATE.encode())
self.check_keys(v2)
v3 = Vapid01.from_der(T_DER.encode())
self.check_keys(v3)
v4 = Vapid01.from_file("/tmp/private.der")
self.check_keys(v4)
no_exist = '/tmp/not_exist'
Vapid01.from_file(no_exist)
ok_(os.path.isfile(no_exist))
os.unlink(no_exist)
def repad(self, data):
return data + "===="[len(data) % 4:]
@patch("py_vapid.Vapid01.from_pem", side_effect=Exception)
def test_init_bad_read(self, mm):
self.assertRaises(Exception,
Vapid01.from_file,
private_key_file="/tmp/private")
def test_gen_key(self):
v = Vapid01()
v.generate_keys()
ok_(v.public_key)
ok_(v.private_key)
def test_private_key(self):
v = Vapid01()
self.assertRaises(VapidException,
lambda: v.private_key)
def test_public_key(self):
v = Vapid01()
eq_(v._private_key, None)
eq_(v._public_key, None)
def test_save_key(self):
v = Vapid01()
v.generate_keys()
v.save_key("/tmp/p2")
os.unlink("/tmp/p2")
def test_same_public_key(self):
v = Vapid01()
v.generate_keys()
v.save_public_key("/tmp/p2")
os.unlink("/tmp/p2")
def test_from_raw(self):
v = Vapid01.from_raw(T_RAW)
self.check_keys(v)
def test_from_string(self):
v1 = Vapid01.from_string(T_DER)
v2 = Vapid01.from_string(T_RAW.decode())
self.check_keys(v1)
self.check_keys(v2)
def test_sign_01(self):
v = Vapid01.from_string(T_DER)
claims = {"aud": "https://example.com",
"sub": "mailto:admin@example.com"}
result = v.sign(claims, "id=previous")
eq_(result['Crypto-Key'],
'id=previous;p256ecdsa=' + T_PUBLIC_RAW.decode('utf8'))
pkey = binascii.b2a_base64(
v.public_key.public_numbers().encode_point()
).decode('utf8').replace('+', '-').replace('/', '_').strip()
items = decode(result['Authorization'].split(' ')[1], pkey)
for k in claims:
eq_(items[k], claims[k])
result = v.sign(claims)
eq_(result['Crypto-Key'],
'p256ecdsa=' + T_PUBLIC_RAW.decode('utf8'))
# Verify using the same function as Integration
# this should ensure that the r,s sign values are correctly formed
ok_(Vapid01.verify(
key=result['Crypto-Key'].split('=')[1],
auth=result['Authorization']
))
def test_sign_02(self):
v = Vapid02.from_file("/tmp/private")
claims = {"aud": "https://example.com",
"sub": "mailto:admin@example.com",
"foo": "extra value"}
claim_check = copy.deepcopy(claims)
result = v.sign(claims, "id=previous")
auth = result['Authorization']
eq_(auth[:6], 'vapid ')
ok_(' t=' in auth)
ok_(',k=' in auth)
parts = auth[6:].split(',')
eq_(len(parts), 2)
t_val = json.loads(base64.urlsafe_b64decode(
self.repad(parts[0][2:].split('.')[1])
).decode('utf8'))
k_val = binascii.a2b_base64(self.repad(parts[1][2:]))
eq_(binascii.hexlify(k_val)[:2], b'04')
eq_(len(k_val), 65)
eq_(claims, claim_check)
for k in claims:
eq_(t_val[k], claims[k])
def test_integration(self):
# These values were taken from a test page. DO NOT ALTER!
key = ("BDd3_hVL9fZi9Ybo2UUzA284WG5FZR30_95YeZJsiApwXKpNcF1rRPF3foI"
"iBHXRdJI2Qhumhf6_LFTeZaNndIo")
auth = ("eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJhdWQiOiJod"
"HRwczovL3VwZGF0ZXMucHVzaC5zZXJ2aWNlcy5tb3ppbGxhLmNvbSIsImV"
"4cCI6MTQ5NDY3MTQ3MCwic3ViIjoibWFpbHRvOnNpbXBsZS1wdXNoLWRlb"
"W9AZ2F1bnRmYWNlLmNvLnVrIn0.LqPi86T-HJ71TXHAYFptZEHD7Wlfjcc"
"4u5jYZ17WpqOlqDcW-5Wtx3x1OgYX19alhJ9oLumlS2VzEvNioZolQA")
ok_(Vapid01.verify(key=key, auth="webpush {}".format(auth)))
ok_(Vapid02.verify(auth="vapid t={},k={}".format(auth, key)))
def test_bad_integration(self):
# These values were taken from a test page. DO NOT ALTER!
key = ("BDd3_hVL9fZi9Ybo2UUzA284WG5FZR30_95YeZJsiApwXKpNcF1rRPF3foI"
"iBHXRdJI2Qhumhf6_LFTeZaNndIo")
auth = ("WebPush eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJhdWQiOiJod"
"HRwczovL3VwZGF0ZXMucHVzaC5zZXJ2aWNlcy5tb3ppbGxhLmNvbSIsImV"
"4cCI6MTQ5NDY3MTQ3MCwic3ViIjoibWFpbHRvOnNpbXBsZS1wdXNoLWRlb"
"W9AZ2F1bnRmYWNlLmNvLnVrIn0.LqPi86T-HJ71TXHAYFptZEHD7Wlfjcc"
"4u5jYZ17WpqOlqDcW-5Wtx3x1OgYX19alhJ9oLumlS2VzEvNioZ_BAD")
eq_(Vapid01.verify(key=key, auth=auth), False)
def test_bad_sign(self):
v = Vapid01.from_file("/tmp/private")
self.assertRaises(VapidException,
v.sign,
{})
self.assertRaises(VapidException,
v.sign,
{'sub': 'foo',
'aud': "p.example.com"})
self.assertRaises(VapidException,
v.sign,
{'sub': 'mailto:foo@bar.com',
'aud': "p.example.com"})
self.assertRaises(VapidException,
v.sign,
{'sub': 'mailto:foo@bar.com',
'aud': "https://p.example.com:8080/"})
@patch('cryptography.hazmat.primitives.asymmetric'
'.ec.EllipticCurvePublicNumbers')
def test_invalid_sig(self, mm):
from cryptography.exceptions import InvalidSignature
ve = Mock()
ve.verify.side_effect = InvalidSignature
pk = Mock()
pk.public_key.return_value = ve
mm.from_encoded_point.return_value = pk
self.assertRaises(InvalidSignature,
decode,
'foo.bar.blat',
'aaaa')
self.assertRaises(InvalidSignature,
decode,
'foo.bar.a',
'aaaa')

36
py_vapid/utils.py Normal file
View File

@ -0,0 +1,36 @@
import base64
import binascii
def b64urldecode(data):
"""Decodes an unpadded Base64url-encoded string.
:param data: data bytes to decode
:type data: bytes
:returns bytes
"""
return base64.urlsafe_b64decode(data + b"===="[len(data) % 4:])
def b64urlencode(data):
"""Encode a byte string into a Base64url-encoded string without padding
:param data: data bytes to encode
:type data: bytes
:returns str
"""
return base64.urlsafe_b64encode(data).replace(b'=', b'').decode('utf8')
def num_to_bytes(n):
"""Returns the byte representation of an integer, in big-endian order.
:param n: The integer to encode.
:type n: int
:returns bytes
"""
h = '%x' % n
return binascii.unhexlify('0' * (len(h) % 2) + h)

21
reqs.txt Normal file
View File

@ -0,0 +1,21 @@
asn1crypto==0.24.0
cffi==1.11.5
configparser==3.5.0
coverage==4.5.1
cryptography==2.0.3
ecdsa==0.13
enum34==1.1.6
flake8==3.5.0
funcsigs==1.0.2
idna==2.6
ipaddress==1.0.19
mccabe==0.6.1
mock==2.0.0
nose==1.3.7
pbr==3.1.1
pkg-resources==0.0.0
-e git+git@github.com:web-push-libs/vapid.git@26d2465cc5e2f97fb6e891f14feea4f4459d8f5f#egg=py_vapid&subdirectory=python
pycodestyle==2.3.1
pycparser==2.18
pyflakes==1.6.0
six==1.11.0

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
cryptography>=1.8.2

13
setup.cfg Normal file
View File

@ -0,0 +1,13 @@
[nosetests]
verbose = True
verbosity = 1
cover-tests = True
cover-erase = True
with-coverage = True
detailed-errors = True
cover-package = py_vapid
[egg_info]
tag_build =
tag_date = 0

59
setup.py Normal file
View File

@ -0,0 +1,59 @@
import io
import os
from setuptools import setup, find_packages
__version__ = "1.4.0"
def read_from(file):
reply = []
with io.open(os.path.join(here, file), encoding='utf8') as f:
for l in f:
l = l.strip()
if not l:
break
if l[:2] == '-r':
reply += read_from(l.split(' ')[1])
continue
if l[0] != '#' or l[:2] != '//':
reply.append(l)
return reply
here = os.path.abspath(os.path.dirname(__file__))
with io.open(os.path.join(here, 'README.rst'), encoding='utf8') as f:
README = f.read()
with io.open(os.path.join(here, 'CHANGELOG.md'), encoding='utf8') as f:
CHANGES = f.read()
setup(name="py-vapid",
version=__version__,
description='Simple VAPID header generation library',
long_description=README + '\n\n' + CHANGES,
classifiers=["Topic :: Internet :: WWW/HTTP",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
],
keywords='vapid push webpush',
author="JR Conlin",
author_email="src+vapid@jrconlin.com",
url='https://github.com/mozilla-services/vapid',
license="MPL2",
test_suite="nose.collector",
include_package_data=True,
zip_safe=False,
packages=find_packages(),
package_data={'': ['README.md', 'CHANGELOG.md',
'requirements.txt', 'test-requirements.txt']},
install_requires=read_from('requirements.txt'),
tests_require=read_from('test-requirements.txt'),
entry_points="""
[console_scripts]
vapid = py_vapid.main:main
""",
)

4
test-requirements.txt Normal file
View File

@ -0,0 +1,4 @@
nose
coverage
mock>=1.0.1
flake8