add cryptor connector (#39431)
This commit is contained in:
parent
359daf8c05
commit
e137d66def
|
@ -35,6 +35,7 @@ Depends: ${python:Depends},
|
|||
python-pyexcel-ods,
|
||||
python-pyexcel-xls,
|
||||
python-crypto,
|
||||
python-pycryptodome,
|
||||
python-feedparser,
|
||||
python-pdfrw,
|
||||
python-httplib2,
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Generated by Django 1.11.18 on 2020-03-04 10:25
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
import passerelle.apps.cryptor.models
|
||||
import uuid
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('base', '0016_auto_20191002_1443'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='CryptedFile',
|
||||
fields=[
|
||||
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
|
||||
('filename', models.CharField(max_length=512)),
|
||||
('content_type', models.CharField(max_length=128)),
|
||||
('creation_timestamp', models.DateTimeField(auto_now_add=True)),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Cryptor',
|
||||
fields=[
|
||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('title', models.CharField(max_length=50, verbose_name='Title')),
|
||||
('slug', models.SlugField(unique=True, verbose_name='Identifier')),
|
||||
('description', models.TextField(verbose_name='Description')),
|
||||
('public_key', models.TextField(blank=True, validators=[passerelle.apps.cryptor.models.validate_rsa_key], verbose_name='Encryption RSA public key (PEM format)')),
|
||||
('private_key', models.TextField(blank=True, validators=[passerelle.apps.cryptor.models.validate_rsa_key], verbose_name='Decryption RSA private key (PEM format)')),
|
||||
('redirect_url_base', models.URLField(blank=True, help_text='Base URL for redirect, empty for local', max_length=256, verbose_name='Base URL of decrypt system')),
|
||||
('users', models.ManyToManyField(blank=True, related_name='_cryptor_users_+', related_query_name='+', to='base.ApiUser')),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Encryption / Decryption',
|
||||
},
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='cryptedfile',
|
||||
name='resource',
|
||||
field=models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='cryptor.Cryptor'),
|
||||
),
|
||||
]
|
|
@ -0,0 +1,226 @@
|
|||
# passerelle - uniform access to multiple data sources and services
|
||||
# Copyright (C) 2020 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import json
|
||||
import os
|
||||
from uuid import uuid4
|
||||
|
||||
from Cryptodome.PublicKey import RSA
|
||||
from Cryptodome.Random import get_random_bytes
|
||||
from Cryptodome.Cipher import AES, PKCS1_OAEP
|
||||
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.files.storage import default_storage
|
||||
from django.db import models
|
||||
from django.http import HttpResponse
|
||||
from django.utils.six.moves.urllib_parse import urljoin
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from passerelle.base.models import BaseResource
|
||||
from passerelle.utils.api import endpoint
|
||||
from passerelle.utils.files import atomic_write
|
||||
from passerelle.utils.jsonresponse import APIError
|
||||
|
||||
|
||||
FILE_SCHEMA = {
|
||||
"$schema": "http://json-schema.org/draft-04/schema#",
|
||||
"title": "File to encrypt",
|
||||
"description": "",
|
||||
"type": "object",
|
||||
"required": ["file"],
|
||||
"properties": {
|
||||
"file": {
|
||||
"type": "object",
|
||||
"required": ["filename", "content_type", "content"],
|
||||
"properties": {
|
||||
"filename": {"type": "string"},
|
||||
"content_type": {"type": "string"},
|
||||
"content": {"type": "string"},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# encrypt and decrypt are borrowed from
|
||||
# https://www.pycryptodome.org/en/latest/src/examples.html#encrypt-data-with-rsa
|
||||
|
||||
def write_encrypt(out_file, data, key_pem):
|
||||
public_key = RSA.import_key(key_pem)
|
||||
session_key = get_random_bytes(16)
|
||||
# Encrypt the session key with the public RSA key
|
||||
cipher_rsa = PKCS1_OAEP.new(public_key)
|
||||
enc_session_key = cipher_rsa.encrypt(session_key)
|
||||
# Encrypt the data with the AES session key
|
||||
cipher_aes = AES.new(session_key, AES.MODE_EAX)
|
||||
ciphertext, tag = cipher_aes.encrypt_and_digest(data)
|
||||
# Store in out_file
|
||||
out_file.write(enc_session_key)
|
||||
out_file.write(cipher_aes.nonce)
|
||||
out_file.write(tag)
|
||||
out_file.write(ciphertext)
|
||||
|
||||
|
||||
def read_decrypt(in_file, key_pem):
|
||||
private_key = RSA.import_key(key_pem)
|
||||
# Get crypt elements from in_file
|
||||
enc_session_key = in_file.read(private_key.size_in_bytes())
|
||||
nonce = in_file.read(16)
|
||||
tag = in_file.read(16)
|
||||
ciphertext = in_file.read()
|
||||
# Decrypt the session key with the private RSA key
|
||||
cipher_rsa = PKCS1_OAEP.new(private_key)
|
||||
session_key = cipher_rsa.decrypt(enc_session_key)
|
||||
# Decrypt the data with the AES session key
|
||||
cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce)
|
||||
decrypted = cipher_aes.decrypt_and_verify(ciphertext, tag)
|
||||
return decrypted
|
||||
|
||||
|
||||
def makedir(dir_name):
|
||||
if not os.path.exists(dir_name):
|
||||
if default_storage.directory_permissions_mode:
|
||||
d_umask = os.umask(0)
|
||||
try:
|
||||
os.makedirs(dir_name, mode=default_storage.directory_permissions_mode)
|
||||
except OSError:
|
||||
pass
|
||||
finally:
|
||||
os.umask(d_umask)
|
||||
else:
|
||||
os.makedirs(dir_name)
|
||||
|
||||
|
||||
def validate_rsa_key(key):
|
||||
try:
|
||||
RSA.import_key(key)
|
||||
except ValueError as ex:
|
||||
raise ValidationError(_('invalid RSA key (%s)') % ex)
|
||||
|
||||
|
||||
class Cryptor(BaseResource):
|
||||
public_key = models.TextField(blank=True,
|
||||
verbose_name=_('Encryption RSA public key (PEM format)'),
|
||||
validators=[validate_rsa_key])
|
||||
private_key = models.TextField(blank=True,
|
||||
verbose_name=_('Decryption RSA private key (PEM format)'),
|
||||
validators=[validate_rsa_key])
|
||||
redirect_url_base = models.URLField(max_length=256, blank=True,
|
||||
verbose_name=_('Base URL of decrypt system'),
|
||||
help_text=_('Base URL for redirect, empty for local'))
|
||||
|
||||
category = _('Misc')
|
||||
|
||||
class Meta:
|
||||
verbose_name = _('Encryption / Decryption')
|
||||
|
||||
def get_redirect_url_base_display(self):
|
||||
if self.redirect_url_base:
|
||||
return _('defined') # don't show it, can be sensitive
|
||||
return _('this file-decrypt endpoint')
|
||||
|
||||
def get_filename(self, uuid, create=False):
|
||||
dirname = os.path.join(default_storage.path(self.get_connector_slug()),
|
||||
self.slug, uuid[0:2], uuid[2:4])
|
||||
if create:
|
||||
makedir(dirname)
|
||||
filename = os.path.join(dirname, uuid)
|
||||
return filename
|
||||
|
||||
|
||||
@endpoint(name='file-encrypt', perm='can_encrypt',
|
||||
description=_('Encrypt a file'),
|
||||
post={
|
||||
'description': _('File to encrypt'),
|
||||
'request_body': {'schema': {'application/json': FILE_SCHEMA}}
|
||||
})
|
||||
def file_encrypt(self, request, post_data):
|
||||
if not self.public_key:
|
||||
raise APIError('missing public key')
|
||||
try:
|
||||
data = base64.b64decode(post_data['file']['content'])
|
||||
except (TypeError, binascii.Error):
|
||||
raise APIError('invalid base64 string', http_status=400)
|
||||
|
||||
filename = post_data['file']['filename']
|
||||
content_type = post_data['file']['content_type']
|
||||
cfile = CryptedFile(resource=self, filename=filename, content_type=content_type)
|
||||
cfile.save()
|
||||
|
||||
uuid = str(cfile.uuid) # get string representation of UUID object
|
||||
|
||||
if self.redirect_url_base:
|
||||
redirect_url_base = self.redirect_url_base
|
||||
else:
|
||||
redirect_url_base = request.build_absolute_uri('%sfile-decrypt/' % (
|
||||
self.get_absolute_url(),))
|
||||
redirect_url = urljoin(redirect_url_base, uuid)
|
||||
|
||||
content_filename = self.get_filename(uuid, create=True)
|
||||
metadata_filename = '%s.json' % content_filename
|
||||
metadata = {
|
||||
'filename': cfile.filename,
|
||||
'content_type': cfile.content_type,
|
||||
'creation_timestamp': cfile.creation_timestamp.isoformat(),
|
||||
'redirect_url': redirect_url,
|
||||
}
|
||||
|
||||
tmp_dir = os.path.join(default_storage.path(self.get_connector_slug()), self.slug, 'tmp')
|
||||
with atomic_write(content_filename, dir=tmp_dir) as fd:
|
||||
write_encrypt(fd, data, self.public_key)
|
||||
with atomic_write(metadata_filename, dir=tmp_dir, mode='w') as fd:
|
||||
json.dump(metadata, fd, indent=2)
|
||||
|
||||
return {'data': metadata}
|
||||
|
||||
@endpoint(name='file-decrypt', perm='can_decrypt',
|
||||
description=_('Decrypt a file'),
|
||||
pattern=r'(?P<uuid>[\w-]+)$',
|
||||
example_pattern='{uuid}/',
|
||||
parameters={
|
||||
'uuid': {
|
||||
'description': _('File identifier'),
|
||||
'example_value': '12345678-abcd-4321-abcd-123456789012',
|
||||
},
|
||||
})
|
||||
def file_decrypt(self, request, uuid):
|
||||
if not self.private_key:
|
||||
raise APIError('missing private key')
|
||||
content_filename = self.get_filename(uuid, create=False)
|
||||
metadata_filename = '%s.json' % content_filename
|
||||
if not os.path.exists(metadata_filename):
|
||||
raise APIError('unknown uuid', http_status=404)
|
||||
|
||||
content = read_decrypt(open(content_filename, 'rb'), self.private_key)
|
||||
|
||||
metadata = json.load(open(metadata_filename, 'r'))
|
||||
filename = metadata.get('filename')
|
||||
content_type = metadata.get('content_type')
|
||||
|
||||
response = HttpResponse(content_type=content_type)
|
||||
response['Content-Disposition'] = 'inline; filename="%s"' % filename
|
||||
response.write(content)
|
||||
return response
|
||||
|
||||
|
||||
class CryptedFile(models.Model):
|
||||
resource = models.ForeignKey(Cryptor, on_delete=models.PROTECT)
|
||||
uuid = models.UUIDField(primary_key=True, default=uuid4, editable=False)
|
||||
filename = models.CharField(max_length=512, blank=False)
|
||||
content_type = models.CharField(max_length=128)
|
||||
creation_timestamp = models.DateTimeField(auto_now_add=True)
|
|
@ -134,6 +134,7 @@ INSTALLED_APPS = (
|
|||
'passerelle.apps.cityweb',
|
||||
'passerelle.apps.clicrdv',
|
||||
'passerelle.apps.cmis',
|
||||
'passerelle.apps.cryptor',
|
||||
'passerelle.apps.csvdatasource',
|
||||
'passerelle.apps.family',
|
||||
'passerelle.apps.feeds',
|
||||
|
|
|
@ -177,6 +177,10 @@ li.connector.dpark a::before {
|
|||
content: "\f1b9"; /* car */
|
||||
}
|
||||
|
||||
li.connector.cryptor a::before {
|
||||
content: "\f023"; /* lock */
|
||||
}
|
||||
|
||||
li.connector.status-down span.connector-name::after {
|
||||
font-family: FontAwesome;
|
||||
content: "\f00d"; /* times */
|
||||
|
|
1
setup.py
1
setup.py
|
@ -104,6 +104,7 @@ setup(name='passerelle',
|
|||
'jsonschema < 3.1',
|
||||
'zeep >= 3.2',
|
||||
'pycrypto',
|
||||
'pycryptodomex',
|
||||
'unidecode',
|
||||
'paramiko',
|
||||
'pdfrw',
|
||||
|
|
|
@ -0,0 +1,182 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import base64
|
||||
import pytest
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.utils.encoding import force_text
|
||||
|
||||
from passerelle.apps.cryptor.models import Cryptor, CryptedFile
|
||||
from passerelle.base.models import ApiUser, AccessRight
|
||||
|
||||
import utils
|
||||
|
||||
PUBLIC_KEY = '''-----BEGIN CERTIFICATE-----
|
||||
MIICyjCCAbKgAwIBAgIUQQzM2eFYF+LpUR3t2euAjZAwLCEwDQYJKoZIhvcNAQEL
|
||||
BQAwDzENMAsGA1UEAwwEemVwbzAeFw0xOTA2MjIyMjMxMTVaFw0yOTA2MTkyMjMx
|
||||
MTVaMA8xDTALBgNVBAMMBHplcG8wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
|
||||
AoIBAQC8BM3xDylze0bOm76IjidyhmFqJlnRvcpbeZVTM7r3qYOHqXFG7/GZL4yd
|
||||
2bW5eL6TCUT3gLEgegGYGPwCkGPd1cq9h+2M7zvolGToRCvrBpxH5Vu6iRkEYyWN
|
||||
yPhc02EUqYlz1FBBYRgyYHQ4jy0vsPH55g536OKLI4rVykszjwD9p0Kh+T2I1D9Z
|
||||
bHyA6s0C8goUFZG7kvasFRIXTTPgUBGSnEN/VPSD5vM94Oj5K4t6P9GHd32Jo2O6
|
||||
E5jAHbPR7I4nRBCJuxJEbfpmsaOuMkGQ5rMulk6LXvRZiT9UDCDem1k6uF6tJkZR
|
||||
g+Uh5V4ZLCzP7sSHcRN2ftWZqAr7AgMBAAGjHjAcMAkGA1UdEwQCMAAwDwYDVR0R
|
||||
BAgwBoIEemVwbzANBgkqhkiG9w0BAQsFAAOCAQEANo54TMbOR5Isd4lix87EM0N+
|
||||
8kxovCLin/szK4+fGfnr0fCUswkhoZ3y6xnmXFX4S2IGLU8bTl+eQVg04VM/7Gg2
|
||||
LvBTtiBmGESbiSaC1fS+DbPBjp1NBpfwbiQFEuQfRMS6ejeF1YMS8Oy9PpeujHDT
|
||||
4cX0kPH2GkqOGtpAdKoOD5XzT3yu5IHv7/AWpl8LZ5hr3f1RbzfIzJ19oC5NDXIR
|
||||
d1XsjuCCHFbCyfnDmuZuQaGCTCm9f4Z8Ynum6hmSSzNvy3YJLRKXEYYLB3+l2V+t
|
||||
SCzQiVzqDeKZnAChvJRditvcKdG36TPHREyCPgkpUTmi0gEjDZDPyBmXhHXHYA==
|
||||
-----END CERTIFICATE-----
|
||||
'''
|
||||
PRIVATE_KEY = '''-----BEGIN PRIVATE KEY-----
|
||||
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC8BM3xDylze0bO
|
||||
m76IjidyhmFqJlnRvcpbeZVTM7r3qYOHqXFG7/GZL4yd2bW5eL6TCUT3gLEgegGY
|
||||
GPwCkGPd1cq9h+2M7zvolGToRCvrBpxH5Vu6iRkEYyWNyPhc02EUqYlz1FBBYRgy
|
||||
YHQ4jy0vsPH55g536OKLI4rVykszjwD9p0Kh+T2I1D9ZbHyA6s0C8goUFZG7kvas
|
||||
FRIXTTPgUBGSnEN/VPSD5vM94Oj5K4t6P9GHd32Jo2O6E5jAHbPR7I4nRBCJuxJE
|
||||
bfpmsaOuMkGQ5rMulk6LXvRZiT9UDCDem1k6uF6tJkZRg+Uh5V4ZLCzP7sSHcRN2
|
||||
ftWZqAr7AgMBAAECggEAUQsdHhA0BNQZdEtLuJ7VwBbOfKvlQXQ2enGQ/RkqOUC3
|
||||
Mk3GRxZ8JFSLnyrNmxHBy61OLgUp1F7iuwXh8tT8Rw21YzbpHTutrhXw3PEtoRPr
|
||||
X04s2N3pi6uU72W2MITorrhZSDU3FsdcX7KVxh9pEcqKsvYIPIWEyQbb/EVDXwhC
|
||||
K4TAsmmhsGaU/BB0WHkbzU5KlZqYQHfnoj5pmTLoeYj81Z8D5T9fceImcyuWLl8t
|
||||
OHscbAjNpkS1X2vYNwMLhCAM7YhE63RjFo2G5fxx64wdgs+mllR0PCc3Nli+JPWE
|
||||
LQ/KmVMPY6JH506WbZkEVgb9Gfuj6yASpu5zJxf3MQKBgQDwktbzKmK3zwSNwLdx
|
||||
zZy4AnQb2PNKuryYts9R+nKqWnE5vJwekBIV9vsASK0Aoh4UWP9cUjQV3gZH3HIR
|
||||
9xP8nJwpOLU7c/wPP81HUl7nf68MVP5OzemHY/78fYR8T3Jf/uFdW3AOQrCBjGIV
|
||||
Zbjb5SisqCS2ODc7DSuqzliBMwKBgQDIEz7nhXyMilTlgiWw7iRm9RKWykVSjEuR
|
||||
gxHAqPOkg/HxacfDYs/O+qteSdW/V7zzZ2QHMyr52BaL5sEz0lC7W3O1nfTS94VM
|
||||
YK+MakRBImC5uZvpea+30vJbVLODnKskhsWF8aCqLi5KJvkR5aE0OySKXBur7Czq
|
||||
X2/gK6jfGQKBgAbU1J/BE16O1V1FHLBxm0KqZyunRHlZxiM8BbUZPIpT2SU/kttX
|
||||
UfwnsEb4yVjcQahoQpAXkX0RefIuc1rJPlsNA240Owk+KOkx8Z1V3HYMbScXfsU0
|
||||
Ga6Li2EWG14AT4okTbf98bel8ycqmlprMg2kezwz5h76h674l8XY6DB7AoGBAKJL
|
||||
Aka5gBtchpsZJEvOENc3Spnof60DQrVJVZgrNF+p7BMA1FsIhzsFGQdF603n9MyY
|
||||
fIpelijOgROA3g2UN4qTF1wmQhbzUzxuXVgQR0dyhHWDOxZ7b+8z/QXawjcrWaQq
|
||||
coVBSCtjhIb/8B/1XftJUk2tg4DE9nYzbkOwBq7ZAoGAPF1KSY9TzY7g8gmQzYuY
|
||||
+CCHM3mR9PjSHhJS5VWTwGfw3zZpLptwxzmJAoi1DyrJVhBP17ADVitYK4GquiSB
|
||||
z5aZ2AnUBc/xueO2ixL3ROOXYAeakrRAQ38G13ibYe2dQpv6/CTsZJOttnCErn54
|
||||
3k2Y/kDV+c1uNbzyPiK2qaM=
|
||||
-----END PRIVATE KEY-----
|
||||
'''
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cryptor(db):
|
||||
return Cryptor.objects.create(slug='test',
|
||||
private_key=PRIVATE_KEY,
|
||||
public_key=PUBLIC_KEY)
|
||||
|
||||
|
||||
def test_cryptor_bad_keys(db):
|
||||
bad1 = Cryptor(slug='bad1', title='t', description='d', private_key='badkey')
|
||||
with pytest.raises(ValidationError) as e1:
|
||||
bad1.full_clean()
|
||||
assert e1.value.messages == ['invalid RSA key (RSA key format is not supported)']
|
||||
bad2 = Cryptor(slug='bad2', title='t', description='d', public_key='badkey')
|
||||
with pytest.raises(ValidationError) as e2:
|
||||
bad2.full_clean()
|
||||
assert e2.value.messages == ['invalid RSA key (RSA key format is not supported)']
|
||||
|
||||
|
||||
def test_cryptor_restricted_access(app, cryptor):
|
||||
endpoint = utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
|
||||
assert endpoint == '/cryptor/test/file-encrypt'
|
||||
resp = app.get(endpoint, status=405)
|
||||
resp = app.post_json(endpoint, params={"foo": "bar"}, status=403)
|
||||
assert resp.json['err'] == 1
|
||||
assert 'PermissionDenied' in resp.json['err_class']
|
||||
|
||||
endpoint = utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug) + '/uuid'
|
||||
assert endpoint == '/cryptor/test/file-decrypt/uuid'
|
||||
resp = app.post_json(endpoint, params={"foo": "bar"}, status=405)
|
||||
resp = app.get(endpoint, status=403)
|
||||
assert resp.json['err'] == 1
|
||||
assert 'PermissionDenied' in resp.json['err_class']
|
||||
|
||||
|
||||
def test_cryptor_bad_requests(app, cryptor):
|
||||
# full opened access
|
||||
api = ApiUser.objects.create(username='all', keytype='', key='')
|
||||
obj_type = ContentType.objects.get_for_model(cryptor)
|
||||
AccessRight.objects.create(codename='can_encrypt', apiuser=api, resource_type=obj_type,
|
||||
resource_pk=cryptor.pk)
|
||||
AccessRight.objects.create(codename='can_decrypt', apiuser=api, resource_type=obj_type,
|
||||
resource_pk=cryptor.pk)
|
||||
|
||||
endpoint = utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
|
||||
for bad_payload in ('error',
|
||||
{"foo": "bar"},
|
||||
["not", "a", "dict"],
|
||||
{"file": {"filename": "f", "content_type": "ct"}},
|
||||
{"file": {"filename": "f", "content_type": "ct", "content": None}},
|
||||
{"file": {"filename": "f", "content_type": "ct", "content": "NotBase64"}},
|
||||
):
|
||||
resp = app.post_json(endpoint, params=bad_payload, status=400)
|
||||
assert resp.json['err'] == 1
|
||||
|
||||
endpoint = utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug)
|
||||
endpoint = endpoint + '/bad-uuid'
|
||||
resp = app.get(endpoint, status=404)
|
||||
|
||||
|
||||
def test_cryptor_encrypt_decrypt(app, cryptor):
|
||||
api = ApiUser.objects.create(username='all', keytype='', key='')
|
||||
obj_type = ContentType.objects.get_for_model(cryptor)
|
||||
AccessRight.objects.create(codename='can_encrypt', apiuser=api, resource_type=obj_type,
|
||||
resource_pk=cryptor.pk)
|
||||
AccessRight.objects.create(codename='can_decrypt', apiuser=api, resource_type=obj_type,
|
||||
resource_pk=cryptor.pk)
|
||||
|
||||
# encrypt
|
||||
endpoint = utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
|
||||
content = force_text(base64.b64encode(b'this is foo and bar'))
|
||||
payload = {"file": {"filename": "foo.txt", "content_type": "text/plain", "content": content}}
|
||||
|
||||
resp = app.post_json(endpoint, params=payload, status=200)
|
||||
assert resp.json['err'] == 0
|
||||
assert CryptedFile.objects.count() == 1
|
||||
cfile = CryptedFile.objects.first()
|
||||
assert resp.json['data']['redirect_url'].endswith(
|
||||
'/cryptor/%s/file-decrypt/%s' % (cryptor.slug, cfile.uuid))
|
||||
cfile.delete()
|
||||
|
||||
# encrypt with another redirect url
|
||||
cryptor.redirect_url_base = 'https://foo/bar/'
|
||||
cryptor.save()
|
||||
resp = app.post_json(endpoint, params=payload, status=200)
|
||||
assert resp.json['err'] == 0
|
||||
assert CryptedFile.objects.count() == 1
|
||||
cfile = CryptedFile.objects.first()
|
||||
assert resp.json['data']['redirect_url'] == 'https://foo/bar/%s' % cfile.uuid
|
||||
|
||||
# remove public key = cannot encrypt
|
||||
cryptor.public_key = ''
|
||||
cryptor.save()
|
||||
resp = app.post_json(endpoint, params=payload, status=200)
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_desc'] == 'missing public key'
|
||||
|
||||
# decrypt
|
||||
endpoint = utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug)
|
||||
endpoint = endpoint + '/' + str(cfile.uuid)
|
||||
resp = app.get(endpoint, status=200)
|
||||
assert resp.content_type == 'text/plain'
|
||||
assert resp.content == b'this is foo and bar'
|
||||
assert resp.headers['Content-Disposition'] == 'inline; filename="foo.txt"'
|
||||
|
||||
# remove all files does not remove data+metadata(json) files: decrypt still works
|
||||
CryptedFile.objects.all().delete()
|
||||
assert CryptedFile.objects.count() == 0
|
||||
resp = app.get(endpoint, status=200)
|
||||
assert resp.content_type == 'text/plain'
|
||||
assert resp.content == b'this is foo and bar'
|
||||
assert resp.headers['Content-Disposition'] == 'inline; filename="foo.txt"'
|
||||
|
||||
# remove private key = cannot decrypt
|
||||
cryptor.private_key = ''
|
||||
cryptor.save()
|
||||
endpoint = endpoint + '/' + str(cfile.uuid)
|
||||
resp = app.get(endpoint, status=200)
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_desc'] == 'missing private key'
|
Loading…
Reference in New Issue