183 lines
8.1 KiB
Python
183 lines
8.1 KiB
Python
import base64
|
|
|
|
import pytest
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils.encoding import force_str
|
|
|
|
import tests.utils
|
|
from passerelle.apps.cryptor.models import CryptedFile, Cryptor
|
|
from passerelle.base.models import AccessRight, ApiUser
|
|
|
|
# PEM-encoded X.509 certificate handed to Cryptor as ``public_key`` by the
# ``cryptor`` fixture. Test-only material: the literal must stay a contiguous
# PEM block (header, 64-column base64 body, footer, trailing newline) with no
# foreign lines in between, otherwise the base64 body cannot be decoded.
PUBLIC_KEY = '''-----BEGIN CERTIFICATE-----
MIICyjCCAbKgAwIBAgIUQQzM2eFYF+LpUR3t2euAjZAwLCEwDQYJKoZIhvcNAQEL
BQAwDzENMAsGA1UEAwwEemVwbzAeFw0xOTA2MjIyMjMxMTVaFw0yOTA2MTkyMjMx
MTVaMA8xDTALBgNVBAMMBHplcG8wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQC8BM3xDylze0bOm76IjidyhmFqJlnRvcpbeZVTM7r3qYOHqXFG7/GZL4yd
2bW5eL6TCUT3gLEgegGYGPwCkGPd1cq9h+2M7zvolGToRCvrBpxH5Vu6iRkEYyWN
yPhc02EUqYlz1FBBYRgyYHQ4jy0vsPH55g536OKLI4rVykszjwD9p0Kh+T2I1D9Z
bHyA6s0C8goUFZG7kvasFRIXTTPgUBGSnEN/VPSD5vM94Oj5K4t6P9GHd32Jo2O6
E5jAHbPR7I4nRBCJuxJEbfpmsaOuMkGQ5rMulk6LXvRZiT9UDCDem1k6uF6tJkZR
g+Uh5V4ZLCzP7sSHcRN2ftWZqAr7AgMBAAGjHjAcMAkGA1UdEwQCMAAwDwYDVR0R
BAgwBoIEemVwbzANBgkqhkiG9w0BAQsFAAOCAQEANo54TMbOR5Isd4lix87EM0N+
8kxovCLin/szK4+fGfnr0fCUswkhoZ3y6xnmXFX4S2IGLU8bTl+eQVg04VM/7Gg2
LvBTtiBmGESbiSaC1fS+DbPBjp1NBpfwbiQFEuQfRMS6ejeF1YMS8Oy9PpeujHDT
4cX0kPH2GkqOGtpAdKoOD5XzT3yu5IHv7/AWpl8LZ5hr3f1RbzfIzJ19oC5NDXIR
d1XsjuCCHFbCyfnDmuZuQaGCTCm9f4Z8Ynum6hmSSzNvy3YJLRKXEYYLB3+l2V+t
SCzQiVzqDeKZnAChvJRditvcKdG36TPHREyCPgkpUTmi0gEjDZDPyBmXhHXHYA==
-----END CERTIFICATE-----
'''
|
|
# PEM-encoded private key handed to Cryptor as ``private_key`` by the
# ``cryptor`` fixture. Test-only material, never to be used outside this test
# suite. The literal must stay a contiguous PEM block (header, 64-column
# base64 body, footer, trailing newline) with no foreign lines in between,
# otherwise the base64 body cannot be decoded.
PRIVATE_KEY = '''-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC8BM3xDylze0bO
m76IjidyhmFqJlnRvcpbeZVTM7r3qYOHqXFG7/GZL4yd2bW5eL6TCUT3gLEgegGY
GPwCkGPd1cq9h+2M7zvolGToRCvrBpxH5Vu6iRkEYyWNyPhc02EUqYlz1FBBYRgy
YHQ4jy0vsPH55g536OKLI4rVykszjwD9p0Kh+T2I1D9ZbHyA6s0C8goUFZG7kvas
FRIXTTPgUBGSnEN/VPSD5vM94Oj5K4t6P9GHd32Jo2O6E5jAHbPR7I4nRBCJuxJE
bfpmsaOuMkGQ5rMulk6LXvRZiT9UDCDem1k6uF6tJkZRg+Uh5V4ZLCzP7sSHcRN2
ftWZqAr7AgMBAAECggEAUQsdHhA0BNQZdEtLuJ7VwBbOfKvlQXQ2enGQ/RkqOUC3
Mk3GRxZ8JFSLnyrNmxHBy61OLgUp1F7iuwXh8tT8Rw21YzbpHTutrhXw3PEtoRPr
X04s2N3pi6uU72W2MITorrhZSDU3FsdcX7KVxh9pEcqKsvYIPIWEyQbb/EVDXwhC
K4TAsmmhsGaU/BB0WHkbzU5KlZqYQHfnoj5pmTLoeYj81Z8D5T9fceImcyuWLl8t
OHscbAjNpkS1X2vYNwMLhCAM7YhE63RjFo2G5fxx64wdgs+mllR0PCc3Nli+JPWE
LQ/KmVMPY6JH506WbZkEVgb9Gfuj6yASpu5zJxf3MQKBgQDwktbzKmK3zwSNwLdx
zZy4AnQb2PNKuryYts9R+nKqWnE5vJwekBIV9vsASK0Aoh4UWP9cUjQV3gZH3HIR
9xP8nJwpOLU7c/wPP81HUl7nf68MVP5OzemHY/78fYR8T3Jf/uFdW3AOQrCBjGIV
Zbjb5SisqCS2ODc7DSuqzliBMwKBgQDIEz7nhXyMilTlgiWw7iRm9RKWykVSjEuR
gxHAqPOkg/HxacfDYs/O+qteSdW/V7zzZ2QHMyr52BaL5sEz0lC7W3O1nfTS94VM
YK+MakRBImC5uZvpea+30vJbVLODnKskhsWF8aCqLi5KJvkR5aE0OySKXBur7Czq
X2/gK6jfGQKBgAbU1J/BE16O1V1FHLBxm0KqZyunRHlZxiM8BbUZPIpT2SU/kttX
UfwnsEb4yVjcQahoQpAXkX0RefIuc1rJPlsNA240Owk+KOkx8Z1V3HYMbScXfsU0
Ga6Li2EWG14AT4okTbf98bel8ycqmlprMg2kezwz5h76h674l8XY6DB7AoGBAKJL
Aka5gBtchpsZJEvOENc3Spnof60DQrVJVZgrNF+p7BMA1FsIhzsFGQdF603n9MyY
fIpelijOgROA3g2UN4qTF1wmQhbzUzxuXVgQR0dyhHWDOxZ7b+8z/QXawjcrWaQq
coVBSCtjhIb/8B/1XftJUk2tg4DE9nYzbkOwBq7ZAoGAPF1KSY9TzY7g8gmQzYuY
+CCHM3mR9PjSHhJS5VWTwGfw3zZpLptwxzmJAoi1DyrJVhBP17ADVitYK4GquiSB
z5aZ2AnUBc/xueO2ixL3ROOXYAeakrRAQ38G13ibYe2dQpv6/CTsZJOttnCErn54
3k2Y/kDV+c1uNbzyPiK2qaM=
-----END PRIVATE KEY-----
'''
|
|
|
|
|
|
@pytest.fixture
def cryptor(db):
    """Create and return a saved ``Cryptor`` with slug ``test`` and both test keys.

    ``db`` is requested only for its side effect; it is never read here
    (presumably pytest-django's database-access fixture -- confirm).
    """
    connector_fields = {
        'slug': 'test',
        'private_key': PRIVATE_KEY,
        'public_key': PUBLIC_KEY,
    }
    return Cryptor.objects.create(**connector_fields)
|
|
|
|
|
|
def test_cryptor_bad_keys(db):
    """``full_clean()`` must reject a malformed private key and a malformed public key."""
    expected_messages = ['invalid RSA key (RSA key format is not supported)']

    # Same two cases, in the same order: bad private key, then bad public key.
    for slug, key_field in (('bad1', 'private_key'), ('bad2', 'public_key')):
        connector = Cryptor(slug=slug, title='t', description='d', **{key_field: 'badkey'})
        with pytest.raises(ValidationError) as excinfo:
            connector.full_clean()
        assert excinfo.value.messages == expected_messages
|
|
|
|
|
|
def test_cryptor_restricted_access(app, cryptor):
    """With no access right created, both endpoints answer 403 / PermissionDenied.

    Each endpoint is also hit with the wrong HTTP verb, which is expected
    to be rejected with a 405.
    """

    def assert_denied(response):
        # Error payload expected alongside the 403 status.
        assert response.json['err'] == 1
        assert 'PermissionDenied' in response.json['err_class']

    # file-encrypt: GET is the wrong verb, POST is forbidden.
    encrypt_url = tests.utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
    assert encrypt_url == '/cryptor/test/file-encrypt'
    app.get(encrypt_url, status=405)
    assert_denied(app.post_json(encrypt_url, params={'foo': 'bar'}, status=403))

    # file-decrypt/<uuid>: POST is the wrong verb, GET is forbidden.
    decrypt_base = tests.utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug)
    decrypt_url = decrypt_base + '/uuid'
    assert decrypt_url == '/cryptor/test/file-decrypt/uuid'
    app.post_json(decrypt_url, params={'foo': 'bar'}, status=405)
    assert_denied(app.get(decrypt_url, status=403))
|
|
|
|
|
|
def test_cryptor_bad_requests(app, cryptor):
    """Malformed encrypt payloads give 400 with ``err == 1``; an unknown decrypt id gives 404."""
    # full opened access
    open_user = ApiUser.objects.create(username='all', keytype='', key='')
    cryptor_type = ContentType.objects.get_for_model(cryptor)
    for codename in ('can_encrypt', 'can_decrypt'):
        AccessRight.objects.create(
            codename=codename, apiuser=open_user, resource_type=cryptor_type, resource_pk=cryptor.pk
        )

    encrypt_url = tests.utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
    invalid_payloads = [
        'error',
        {'foo': 'bar'},
        ['not', 'a', 'dict'],
        {'file': {'filename': 'f', 'content_type': 'ct'}},
        {'file': {'filename': 'f', 'content_type': 'ct', 'content': None}},
        {'file': {'filename': 'f', 'content_type': 'ct', 'content': 'NotBase64'}},
    ]
    for payload in invalid_payloads:
        response = app.post_json(encrypt_url, params=payload, status=400)
        assert response.json['err'] == 1

    decrypt_base = tests.utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug)
    # Only the status code is checked for the unknown identifier.
    app.get(decrypt_base + '/bad-uuid', status=404)
|
|
|
|
|
|
def test_cryptor_encrypt_decrypt(app, cryptor):
    """Exercise the encrypt/decrypt round trip and the missing-key error paths.

    Steps, in order: encrypt with the default redirect URL, encrypt with a
    custom ``redirect_url_base``, fail to encrypt without a public key,
    decrypt, decrypt again after the ``CryptedFile`` rows are deleted, and
    finally fail to decrypt without a private key.
    """
    plaintext = b'this is foo and bar'

    def assert_decrypted(response):
        # The decrypt endpoint must give back the original file and its metadata.
        assert response.content_type == 'text/plain'
        assert response.content == plaintext
        assert response.headers['Content-Disposition'] == 'inline; filename="foo.txt"'

    open_user = ApiUser.objects.create(username='all', keytype='', key='')
    cryptor_type = ContentType.objects.get_for_model(cryptor)
    for codename in ('can_encrypt', 'can_decrypt'):
        AccessRight.objects.create(
            codename=codename, apiuser=open_user, resource_type=cryptor_type, resource_pk=cryptor.pk
        )

    # encrypt
    encrypt_url = tests.utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
    payload = {
        'file': {
            'filename': 'foo.txt',
            'content_type': 'text/plain',
            'content': force_str(base64.b64encode(plaintext)),
        }
    }

    response = app.post_json(encrypt_url, params=payload, status=200)
    assert response.json['err'] == 0
    assert CryptedFile.objects.count() == 1
    stored = CryptedFile.objects.first()
    default_suffix = '/cryptor/%s/file-decrypt/%s' % (cryptor.slug, stored.uuid)
    assert response.json['data']['redirect_url'].endswith(default_suffix)
    stored.delete()

    # encrypt with another redirect url
    cryptor.redirect_url_base = 'https://foo/bar/'
    cryptor.save()
    response = app.post_json(encrypt_url, params=payload, status=200)
    assert response.json['err'] == 0
    assert CryptedFile.objects.count() == 1
    stored = CryptedFile.objects.first()
    custom_url = 'https://foo/bar/%s' % stored.uuid
    assert response.json['data']['redirect_url'] == custom_url

    # remove public key = cannot encrypt
    cryptor.public_key = ''
    cryptor.save()
    response = app.post_json(encrypt_url, params=payload, status=200)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'missing public key'

    # decrypt (the file stored by the second encryption above)
    decrypt_base = tests.utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug)
    decrypt_url = decrypt_base + '/' + str(stored.uuid)
    assert_decrypted(app.get(decrypt_url, status=200))

    # remove all files does not remove data+metadata(json) files: decrypt still works
    CryptedFile.objects.all().delete()
    assert CryptedFile.objects.count() == 0
    assert_decrypted(app.get(decrypt_url, status=200))

    # remove private key = cannot decrypt
    cryptor.private_key = ''
    cryptor.save()
    response = app.get(decrypt_url, status=200)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'missing private key'
|