passerelle/tests/test_cryptor.py

183 lines
8.1 KiB
Python

import base64
import pytest
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.utils.encoding import force_str
import tests.utils
from passerelle.apps.cryptor.models import CryptedFile, Cryptor
from passerelle.base.models import AccessRight, ApiUser
PUBLIC_KEY = '''-----BEGIN CERTIFICATE-----
MIICyjCCAbKgAwIBAgIUQQzM2eFYF+LpUR3t2euAjZAwLCEwDQYJKoZIhvcNAQEL
BQAwDzENMAsGA1UEAwwEemVwbzAeFw0xOTA2MjIyMjMxMTVaFw0yOTA2MTkyMjMx
MTVaMA8xDTALBgNVBAMMBHplcG8wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQC8BM3xDylze0bOm76IjidyhmFqJlnRvcpbeZVTM7r3qYOHqXFG7/GZL4yd
2bW5eL6TCUT3gLEgegGYGPwCkGPd1cq9h+2M7zvolGToRCvrBpxH5Vu6iRkEYyWN
yPhc02EUqYlz1FBBYRgyYHQ4jy0vsPH55g536OKLI4rVykszjwD9p0Kh+T2I1D9Z
bHyA6s0C8goUFZG7kvasFRIXTTPgUBGSnEN/VPSD5vM94Oj5K4t6P9GHd32Jo2O6
E5jAHbPR7I4nRBCJuxJEbfpmsaOuMkGQ5rMulk6LXvRZiT9UDCDem1k6uF6tJkZR
g+Uh5V4ZLCzP7sSHcRN2ftWZqAr7AgMBAAGjHjAcMAkGA1UdEwQCMAAwDwYDVR0R
BAgwBoIEemVwbzANBgkqhkiG9w0BAQsFAAOCAQEANo54TMbOR5Isd4lix87EM0N+
8kxovCLin/szK4+fGfnr0fCUswkhoZ3y6xnmXFX4S2IGLU8bTl+eQVg04VM/7Gg2
LvBTtiBmGESbiSaC1fS+DbPBjp1NBpfwbiQFEuQfRMS6ejeF1YMS8Oy9PpeujHDT
4cX0kPH2GkqOGtpAdKoOD5XzT3yu5IHv7/AWpl8LZ5hr3f1RbzfIzJ19oC5NDXIR
d1XsjuCCHFbCyfnDmuZuQaGCTCm9f4Z8Ynum6hmSSzNvy3YJLRKXEYYLB3+l2V+t
SCzQiVzqDeKZnAChvJRditvcKdG36TPHREyCPgkpUTmi0gEjDZDPyBmXhHXHYA==
-----END CERTIFICATE-----
'''
PRIVATE_KEY = '''-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC8BM3xDylze0bO
m76IjidyhmFqJlnRvcpbeZVTM7r3qYOHqXFG7/GZL4yd2bW5eL6TCUT3gLEgegGY
GPwCkGPd1cq9h+2M7zvolGToRCvrBpxH5Vu6iRkEYyWNyPhc02EUqYlz1FBBYRgy
YHQ4jy0vsPH55g536OKLI4rVykszjwD9p0Kh+T2I1D9ZbHyA6s0C8goUFZG7kvas
FRIXTTPgUBGSnEN/VPSD5vM94Oj5K4t6P9GHd32Jo2O6E5jAHbPR7I4nRBCJuxJE
bfpmsaOuMkGQ5rMulk6LXvRZiT9UDCDem1k6uF6tJkZRg+Uh5V4ZLCzP7sSHcRN2
ftWZqAr7AgMBAAECggEAUQsdHhA0BNQZdEtLuJ7VwBbOfKvlQXQ2enGQ/RkqOUC3
Mk3GRxZ8JFSLnyrNmxHBy61OLgUp1F7iuwXh8tT8Rw21YzbpHTutrhXw3PEtoRPr
X04s2N3pi6uU72W2MITorrhZSDU3FsdcX7KVxh9pEcqKsvYIPIWEyQbb/EVDXwhC
K4TAsmmhsGaU/BB0WHkbzU5KlZqYQHfnoj5pmTLoeYj81Z8D5T9fceImcyuWLl8t
OHscbAjNpkS1X2vYNwMLhCAM7YhE63RjFo2G5fxx64wdgs+mllR0PCc3Nli+JPWE
LQ/KmVMPY6JH506WbZkEVgb9Gfuj6yASpu5zJxf3MQKBgQDwktbzKmK3zwSNwLdx
zZy4AnQb2PNKuryYts9R+nKqWnE5vJwekBIV9vsASK0Aoh4UWP9cUjQV3gZH3HIR
9xP8nJwpOLU7c/wPP81HUl7nf68MVP5OzemHY/78fYR8T3Jf/uFdW3AOQrCBjGIV
Zbjb5SisqCS2ODc7DSuqzliBMwKBgQDIEz7nhXyMilTlgiWw7iRm9RKWykVSjEuR
gxHAqPOkg/HxacfDYs/O+qteSdW/V7zzZ2QHMyr52BaL5sEz0lC7W3O1nfTS94VM
YK+MakRBImC5uZvpea+30vJbVLODnKskhsWF8aCqLi5KJvkR5aE0OySKXBur7Czq
X2/gK6jfGQKBgAbU1J/BE16O1V1FHLBxm0KqZyunRHlZxiM8BbUZPIpT2SU/kttX
UfwnsEb4yVjcQahoQpAXkX0RefIuc1rJPlsNA240Owk+KOkx8Z1V3HYMbScXfsU0
Ga6Li2EWG14AT4okTbf98bel8ycqmlprMg2kezwz5h76h674l8XY6DB7AoGBAKJL
Aka5gBtchpsZJEvOENc3Spnof60DQrVJVZgrNF+p7BMA1FsIhzsFGQdF603n9MyY
fIpelijOgROA3g2UN4qTF1wmQhbzUzxuXVgQR0dyhHWDOxZ7b+8z/QXawjcrWaQq
coVBSCtjhIb/8B/1XftJUk2tg4DE9nYzbkOwBq7ZAoGAPF1KSY9TzY7g8gmQzYuY
+CCHM3mR9PjSHhJS5VWTwGfw3zZpLptwxzmJAoi1DyrJVhBP17ADVitYK4GquiSB
z5aZ2AnUBc/xueO2ixL3ROOXYAeakrRAQ38G13ibYe2dQpv6/CTsZJOttnCErn54
3k2Y/kDV+c1uNbzyPiK2qaM=
-----END PRIVATE KEY-----
'''
@pytest.fixture
def cryptor(db):
return Cryptor.objects.create(slug='test', private_key=PRIVATE_KEY, public_key=PUBLIC_KEY)
def test_cryptor_bad_keys(db):
bad1 = Cryptor(slug='bad1', title='t', description='d', private_key='badkey')
with pytest.raises(ValidationError) as e1:
bad1.full_clean()
assert e1.value.messages == ['invalid RSA key (RSA key format is not supported)']
bad2 = Cryptor(slug='bad2', title='t', description='d', public_key='badkey')
with pytest.raises(ValidationError) as e2:
bad2.full_clean()
assert e2.value.messages == ['invalid RSA key (RSA key format is not supported)']
def test_cryptor_restricted_access(app, cryptor):
endpoint = tests.utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
assert endpoint == '/cryptor/test/file-encrypt'
resp = app.get(endpoint, status=405)
resp = app.post_json(endpoint, params={'foo': 'bar'}, status=403)
assert resp.json['err'] == 1
assert 'PermissionDenied' in resp.json['err_class']
endpoint = tests.utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug) + '/uuid'
assert endpoint == '/cryptor/test/file-decrypt/uuid'
resp = app.post_json(endpoint, params={'foo': 'bar'}, status=405)
resp = app.get(endpoint, status=403)
assert resp.json['err'] == 1
assert 'PermissionDenied' in resp.json['err_class']
def test_cryptor_bad_requests(app, cryptor):
# full opened access
api = ApiUser.objects.create(username='all', keytype='', key='')
obj_type = ContentType.objects.get_for_model(cryptor)
AccessRight.objects.create(
codename='can_encrypt', apiuser=api, resource_type=obj_type, resource_pk=cryptor.pk
)
AccessRight.objects.create(
codename='can_decrypt', apiuser=api, resource_type=obj_type, resource_pk=cryptor.pk
)
endpoint = tests.utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
for bad_payload in (
'error',
{'foo': 'bar'},
['not', 'a', 'dict'],
{'file': {'filename': 'f', 'content_type': 'ct'}},
{'file': {'filename': 'f', 'content_type': 'ct', 'content': None}},
{'file': {'filename': 'f', 'content_type': 'ct', 'content': 'NotBase64'}},
):
resp = app.post_json(endpoint, params=bad_payload, status=400)
assert resp.json['err'] == 1
endpoint = tests.utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug)
endpoint = endpoint + '/bad-uuid'
resp = app.get(endpoint, status=404)
def test_cryptor_encrypt_decrypt(app, cryptor):
api = ApiUser.objects.create(username='all', keytype='', key='')
obj_type = ContentType.objects.get_for_model(cryptor)
AccessRight.objects.create(
codename='can_encrypt', apiuser=api, resource_type=obj_type, resource_pk=cryptor.pk
)
AccessRight.objects.create(
codename='can_decrypt', apiuser=api, resource_type=obj_type, resource_pk=cryptor.pk
)
# encrypt
endpoint = tests.utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
content = force_str(base64.b64encode(b'this is foo and bar'))
payload = {'file': {'filename': 'foo.txt', 'content_type': 'text/plain', 'content': content}}
resp = app.post_json(endpoint, params=payload, status=200)
assert resp.json['err'] == 0
assert CryptedFile.objects.count() == 1
cfile = CryptedFile.objects.first()
assert resp.json['data']['redirect_url'].endswith(
'/cryptor/%s/file-decrypt/%s' % (cryptor.slug, cfile.uuid)
)
cfile.delete()
# encrypt with another redirect url
cryptor.redirect_url_base = 'https://foo/bar/'
cryptor.save()
resp = app.post_json(endpoint, params=payload, status=200)
assert resp.json['err'] == 0
assert CryptedFile.objects.count() == 1
cfile = CryptedFile.objects.first()
assert resp.json['data']['redirect_url'] == 'https://foo/bar/%s' % cfile.uuid
# remove public key = cannot encrypt
cryptor.public_key = ''
cryptor.save()
resp = app.post_json(endpoint, params=payload, status=200)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'missing public key'
# decrypt
endpoint = tests.utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug)
endpoint = endpoint + '/' + str(cfile.uuid)
resp = app.get(endpoint, status=200)
assert resp.content_type == 'text/plain'
assert resp.content == b'this is foo and bar'
assert resp.headers['Content-Disposition'] == 'inline; filename="foo.txt"'
# remove all files does not remove data+metadata(json) files: decrypt still works
CryptedFile.objects.all().delete()
assert CryptedFile.objects.count() == 0
resp = app.get(endpoint, status=200)
assert resp.content_type == 'text/plain'
assert resp.content == b'this is foo and bar'
assert resp.headers['Content-Disposition'] == 'inline; filename="foo.txt"'
# remove private key = cannot decrypt
cryptor.private_key = ''
cryptor.save()
resp = app.get(endpoint, status=200)
assert resp.json['err'] == 1
assert resp.json['err_desc'] == 'missing private key'