authentic/src/authentic2/crypto.py

211 lines
6.6 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import hashlib
import hmac
import struct
from Cryptodome import Random
from Cryptodome.Cipher import AES
from Cryptodome.Hash import HMAC, SHA256
from Cryptodome.Protocol.KDF import PBKDF2
from django.utils.crypto import constant_time_compare
from django.utils.encoding import force_bytes
from django.utils.six import text_type
from authentic2.compat.misc import Base64Error
class DecryptionError(Exception):
pass
def base64url_decode(raw):
rem = len(raw) % 4
if rem > 0:
raw += b'=' * (4 - rem)
return base64.urlsafe_b64decode(raw)
def base64url_encode(raw):
return base64.urlsafe_b64encode(raw).rstrip(b'=')
def get_hashclass(name):
if name in ['md5', 'sha1', 'sha256', 'sha384', 'sha512']:
return getattr(hashlib, name)
return None
def aes_base64_encrypt(key, data):
"""Generate an AES key from any key material using PBKDF2, and encrypt data using CFB mode. A
new IV is generated each time, the IV is also used as salt for PBKDF2.
"""
iv = Random.get_random_bytes(16)
aes_key = PBKDF2(key, iv)
aes = AES.new(aes_key, AES.MODE_CFB, iv=iv)
crypted = aes.encrypt(data)
return b'%s$%s' % (base64.b64encode(iv), base64.b64encode(crypted))
def aes_base64_decrypt(key, payload, raise_on_error=True):
'''Decrypt data encrypted with aes_base64_encrypt'''
if not isinstance(payload, bytes):
try:
payload = payload.encode('ascii')
except Exception:
raise DecryptionError('payload is not an ASCII string')
try:
iv, crypted = payload.split(b'$')
except (ValueError, TypeError):
if raise_on_error:
raise DecryptionError('bad payload')
return None
try:
iv = base64.b64decode(iv)
crypted = base64.b64decode(crypted)
except Base64Error:
if raise_on_error:
raise DecryptionError('incorrect base64 encoding')
return None
aes_key = PBKDF2(key, iv)
aes = AES.new(aes_key, AES.MODE_CFB, iv=iv)
return aes.decrypt(crypted)
def add_padding(msg, block_size):
'''Pad message with zero bytes to match block_size'''
pad_length = block_size - (len(msg) + 2) % block_size
padded = struct.pack('<h%ds%ds' % (len(msg), pad_length), len(msg), msg, b'\0' * pad_length)
assert len(padded) % block_size == 0
return padded
def remove_padding(msg, block_size):
'''Ignore padded zero bytes'''
try:
(msg_length,) = struct.unpack('<h', msg[:2])
except struct.error:
raise DecryptionError('wrong padding')
if len(msg) % block_size != 0:
raise DecryptionError('message length is not a multiple of block size', len(msg), block_size)
unpadded = msg[2 : 2 + msg_length]
if msg_length > len(msg) - 2:
raise DecryptionError('wrong padding')
if len(msg[2 + msg_length :].strip(force_bytes('\0'))):
raise DecryptionError('padding is not all zero')
if len(unpadded) != msg_length:
raise DecryptionError('wrong padding')
return unpadded
def aes_base64url_deterministic_encrypt(key, data, salt, hash_name='sha256', count=1):
"""Encrypt using AES-128 and sign using HMAC-SHA256 shortened to 64 bits.
Count and algorithm are encoded in the final string for future evolution.
"""
mode = 1 # AES128-SHA256
hashmod = SHA256
key_size = 16
hmac_size = key_size
if isinstance(salt, text_type):
salt = force_bytes(salt)
iv = hashmod.new(salt).digest()
def prf(secret, salt):
return HMAC.new(secret, salt, hashmod).digest()
aes_key = PBKDF2(key, iv, dkLen=key_size, count=count, prf=prf)
key_size = len(aes_key)
aes = AES.new(aes_key, AES.MODE_CBC, iv[:key_size])
crypted = aes.encrypt(add_padding(data, key_size))
hmac = prf(key, crypted)[:hmac_size]
raw = struct.pack('<2sBH', b'a2', mode, count) + crypted + hmac
return base64url_encode(raw)
def aes_base64url_deterministic_decrypt(key, urlencoded, salt, raise_on_error=True, max_count=1):
mode = 1 # AES128-SHA256
hashmod = SHA256
key_size = 16
hmac_size = key_size
def prf(secret, salt):
return HMAC.new(secret, salt, hashmod).digest()
try:
try:
raw = base64url_decode(urlencoded)
except Exception as e:
raise DecryptionError('base64 decoding failed', e)
try:
magic, mode, count = struct.unpack('<2sBH', raw[:5])
except struct.error as e:
raise DecryptionError('invalid packing', e)
if magic != b'a2':
raise DecryptionError('invalid magic string', magic)
if mode != 1:
raise DecryptionError('mode is not AES128-SHA256', mode)
if count > max_count:
raise DecryptionError('count is too big', count)
crypted, hmac = raw[5:-hmac_size], raw[-hmac_size:]
if not crypted or not hmac or prf(key, crypted)[:hmac_size] != hmac:
raise DecryptionError('invalid HMAC')
if isinstance(salt, text_type):
salt = force_bytes(salt)
iv = hashmod.new(salt).digest()
aes_key = PBKDF2(key, iv, dkLen=key_size, count=count, prf=prf)
aes = AES.new(aes_key, AES.MODE_CBC, iv[:key_size])
data = remove_padding(aes.decrypt(crypted), key_size)
return data
except DecryptionError:
if not raise_on_error:
return None
raise
def hmac_url(key, url):
if hasattr(key, 'isnumeric'):
key = key.encode('utf-8')
if hasattr(url, 'isnumeric'):
url = url.encode('utf-8', 'replace')
return (
base64.b32encode(hmac.HMAC(key=key, msg=url, digestmod=hashlib.sha256).digest())
.decode('ascii')
.strip('=')
)
def check_hmac_url(key, url, signature):
if hasattr(signature, 'isnumeric'):
signature = signature.encode('utf-8', 'replace')
return constant_time_compare(signature, hmac_url(key, url).encode('ascii'))