100% test coverage, dict strings truncated when dumped, error messages homogenized, and more

Features:
    * replaced the logging of JSON payloads with a DictDumper that lazily truncates overly long strings (see the usage sketch after this list)
    * homogenized error messages
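
    A rough usage sketch of the lazy dumping (Python 2; DictDumper comes from
    atreal_openads.models as imported in the tests below, the logger name and
    payload are only illustrative):

        import logging

        from atreal_openads.models import DictDumper

        logger = logging.getLogger('atreal_openads')
        payload = {'fields': {'cerfa': {'content': 'A' * 100000}}}

        # nothing is copied or truncated here: DictDumper only keeps a reference
        dumper = DictDumper(payload, max_str_len=100)

        # the deep copy, truncation and json.dumps() only run when the record is
        # actually formatted, because logging stringifies its arguments lazily
        logger.debug(u"%s", dumper)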

Fixes:
    * changed the signature of 'check_file_dict()' to match other functions and fix a bug
    * changed the signature of 'upload2ForwardFile()' to make the 'type_fichier' argument mandatory

Refactoring:
    * removed dead code in 'upload_user_files()'
    * switched to 'basestring' instead of testing for both 'str' and 'unicode' (illustrated after this list)
    * renamed 'requerant' to 'demandeur'
    * removed a useless and potentially problematic 'join()' (leftover from the previous commit)
    * added more comments
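
    The 'basestring' switch in short (Python 2 only; 'value' and the error
    message are placeholders, the real checks raise the longer APIError
    messages visible in the diff below):

        from passerelle.utils.jsonresponse import APIError

        value = u'some value taken from the JSON payload'

        # before: two isinstance() checks were needed
        if not isinstance(value, str) and not isinstance(value, unicode):
            raise APIError(u'expected a string')

        # after: 'basestring' is the common base class of 'str' and 'unicode'
        if not isinstance(value, basestring):
            raise APIError(u'expected a string')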

Tests:
    * 100% test coverage!
    * fixed failing tests
    * added tests for every function
    * completed missing tests (mostly for exceptions)
    * added more comments
    * enabled showing the lines of missing statements in the coverage report
Michael Bideau 2019-07-25 14:29:06 +02:00
parent 571316fcf4
commit 98cd089254
3 changed files with 531 additions and 197 deletions

View File

@ -1,2 +1,5 @@
[run]
omit = */south_migrations/*
[report]
show_missing = True

View File

@ -96,7 +96,7 @@ def get_file_data(path, b64=True):
return f.read()
def get_upload_path(instance, filename):
def get_upload_path(instance, filename=None):
"""Return a relative upload path for a file."""
# be careful:
# * openADS accept only filename less than 50 chars
@ -107,70 +107,44 @@ def get_upload_path(instance, filename):
)
def dict_deref_multi(data, keys):
"""Access a dict through a list of keys.
Inspired by: https://stackoverflow.com/a/47969823
def trunc_str_values(value, limit, visited=None, truncate_text=u'…'):
"""Truncate a string value (not dict keys) and append a truncate text."""
if visited is None:
visited = []
if not value in visited:
if isinstance(value, basestring) and len(value) > limit:
value = value[:limit] + truncate_text
elif isinstance(value, dict) or isinstance(value, list) or isinstance(value, tuple):
visited.append(value)
iterator = value.iteritems() if isinstance(value, dict) else enumerate(value)
for k,v in iterator:
value[k] = trunc_str_values(v, limit, visited, truncate_text)
return value
class DictDumper(object):
"""Helper to dump a dictionary to a string representation with lazy processing.
Only applied when dict is converted to string (lazy processing):
- long strings truncated (after the dict has been 'deep' copied)
- (optionally) dict converted with json.dumps instead of unicode().
"""
if keys:
key = int(keys[0]) if isinstance(data, list) else keys[0]
return dict_deref_multi(data[key], keys[1:]) if keys else data
# TODO implement wildcard filtering
def dict_replace_content_by_paths(dic, paths_to_replace=[], path_separator='/', replace_by='<file content>'):
"""Replace a dict content, with a string, from a list of paths."""
newdic = dic
if newdic and (isinstance(newdic, dict) or isinstance(newdic, list)):
for p in paths_to_replace:
if p:
if p.startswith(path_separator):
p = p[1:]
items = p.split(path_separator)
if items:
try:
value = dict_deref_multi(newdic, items)
except KeyError:
continue
except TypeError:
continue
if not isinstance(value, str) and not isinstance(value, unicode):
raise ValueError(u"path '%s' is not a string ([%s] %s)" % (p, type(value), value))
# newdic is still untouched
if newdic is dic:
# create a copy of it
newdic = copy.deepcopy(dic)
last_item = items.pop(-1)
value = dict_deref_multi(newdic, items)
value[last_item] = replace_by
return newdic
class LogJsonPayloadWithFileContent(object):
""" Return a string representation of a JSON payload with its file content emptyed."""
def __init__(self, payload, paths_to_replace=[], path_separator='/', replace_by='<data>'):
def __init__(self, dic, max_str_len=255, use_json_dumps=True):
""" arguments:
- payload string the JSON payload to dump
- paths_to_replace array a list of strings either:
* in 'path' format (i.e.: ['/fields/file/content',...])
* in 'namespace' format (i.e.: ['fields.file.content',...])
- path_separator string the path separator used in 'paths_to_replace' format:
* '/' for 'path' format
* '.' for 'namespace' format
- replace_by string the new value of the content at the specified path
- dic dict the dict to dump
- max_str_len integer the maximum length of string values
- use_json_dumps boolean True to use json.dumps(), otherwise unicode() is used
"""
self.payload = payload
self.paths_to_replace = paths_to_replace
self.path_separator = path_separator
self.replace_by = replace_by
self.dic = dic
self.max_str_len = max_str_len
self.use_json_dumps = use_json_dumps
def __str__(self):
return json.dumps(dict_replace_content_by_paths(
self.payload,
paths_to_replace = self.paths_to_replace,
path_separator = self.path_separator,
replace_by = self.replace_by
))
dict_trunc = trunc_str_values(copy.deepcopy(self.dic), self.max_str_len)
dict_ref = json.dumps(dict_trunc) if self.use_json_dumps else dict_trunc
return unicode(dict_ref)
class ForwardFile(models.Model):
@ -206,18 +180,10 @@ class AtrealOpenads(BaseResource, HTTPResource):
verbose_name = _('openADS')
def log_json_payload(self, payload, title='payload', paths_to_replace=[]):
def log_json_payload(self, payload, title='payload', max_str_len=100):
"""Log a json paylod surrounded by dashes and with file content filtered."""
self.logger.debug(u"----- %s (begining) -----", title)
if paths_to_replace:
self.logger.debug(u"%s", LogJsonPayloadWithFileContent(
payload,
paths_to_replace = paths_to_replace,
path_separator = u'.',
replace_by = u'<b64 content>'
))
else:
self.logger.debug(u"%s", LogJsonPayloadWithFileContent(payload))
self.logger.debug(u"%s", DictDumper(payload, max_str_len))
self.logger.debug(u"----- %s (end) -----", title)
@ -230,7 +196,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
raise APIError(u"Expecting '%s' key in JSON %s" %
('files', title))
files = payload.get('files')
files = payload['files']
if not isinstance(files, list):
self.log_json_payload(payload, title)
@ -244,15 +210,13 @@ class AtrealOpenads(BaseResource, HTTPResource):
('files', title))
# log the response
self.log_json_payload(
payload, title
,['files.%i.b64_content' % i for i in range(0, len(files))])
self.log_json_payload(payload, title)
# return the files
return files
def check_file_dict(self, dict_file, b64=True, title='payload'):
def check_file_dict(self, dict_file, title='payload', b64=True):
"""Ensure a file dict has all its required items."""
# key to get the content
@ -264,24 +228,20 @@ class AtrealOpenads(BaseResource, HTTPResource):
# check content existence
if content_key not in dict_file:
raise APIError(u"Expecting 'files.0.%s' key in JSON %s" % (content_key, title))
raise APIError(u"Expecting 'file.%s' key in JSON %s" % (content_key, title))
# get its content
file_content = dict_file[content_key]
if not isinstance(file_content, str) and not isinstance(file_content, unicode):
if not isinstance(file_content, basestring):
raise APIError(
u"Expecting '%s' value in JSON %s to be a %s (not a %s)" %
u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" %
('file.%s' % content_key, title, 'string', type(file_content)))
# check filename
if (
'filename' in dict_file
and not isinstance(dict_file['filename'], str)
and not isinstance(dict_file['filename'], unicode)
):
if 'filename' in dict_file and not isinstance(dict_file['filename'], basestring):
raise APIError(
u"Expecting '%s' value in file dict to be a %s (not a %s)" %
u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" %
('file.filename', title, 'string', type(dict_file['filename'])))
@ -296,7 +256,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
# asked to check its content
if ensure_content:
self.check_file_dict(first, title, b64)
self.check_file_dict(first, title=title, b64=b64)
# return the first file
return first
@ -346,23 +306,8 @@ class AtrealOpenads(BaseResource, HTTPResource):
# loads the request body as JSON content
json_data = json.loads(request.body)
# every field key that might contain a file content
file_keys = ['cerfa'] + ['plan_cadastral_%s' % i for i in range(1,5)] + ['pouvoir_mandat']
# detect if there evolutions data to filter them out of logging
# TODO replace when wildcard filtering is implement in 'dict_replace_content_by_paths()'
evolution_to_filter = []
if 'evolution' in json_data and isinstance(json_data['evolution'], list):
for e, evol in enumerate(json_data['evolution']):
if isinstance(evol, dict) and 'parts' in evol and isinstance(evol['parts'], list):
for p, part in enumerate(evol['parts']):
if isinstance(part, dict) and 'data' in part:
evolution_to_filter.append('evolution.%s.parts.%s.data' % (e, p))
# log the request body (filtering the files content)
self.log_json_payload(
json_data, 'request',
['fields.%s.content' % k for k in file_keys] + evolution_to_filter)
self.log_json_payload(json_data, 'request')
# build the payload
payload = { "collectivite": self.collectivite }
@ -391,19 +336,19 @@ class AtrealOpenads(BaseResource, HTTPResource):
"numero" : normalize(ref[2])
})
# setup requerant variable prefix
# setup demandeur variable prefix
prefixes = {"demandeurs": ''}
if normalize(json_data['fields']['proprietaire']) != 'Oui':
prefixes["mandataires"] = 'mandataire_'
# for each type of requerant with associated prefix
# for each type of demandeur with associated prefix
for key,prefix in prefixes.items():
# "qualité" of the requerant
# "qualité" of the demandeur
qualite = normalize(json_data['fields']['%squalite' % prefix])
# get the requerant informations
requerant = {
# get the demandeur informations
demandeur = {
"type_personne": 'particulier' if qualite == 'Un particulier' else 'personne_morale',
"typologie" : 'petitionnaire' if key == 'demandeurs' else 'delegataire',
"nom" : normalize(json_data['fields']['%snom' % prefix]),
@ -416,24 +361,27 @@ class AtrealOpenads(BaseResource, HTTPResource):
}
}
# add fields if the requerant is not an individual
# add fields if the demandeur is not an individual
if qualite != 'Un particulier':
requerant["raison_sociale"] = normalize(json_data['fields']['%sraison_sociale' % prefix])
requerant["denomination"] = normalize(json_data['fields']['%sdenomination' % prefix])
self.logger.debug("%s %s => '%s', '%s'", requerant['prenom'], requerant['nom'], requerant['raison_sociale'], requerant['denomination'])
demandeur["raison_sociale"] = normalize(json_data['fields']['%sraison_sociale' % prefix])
demandeur["denomination"] = normalize(json_data['fields']['%sdenomination' % prefix])
self.logger.debug("%s %s => '%s', '%s'", demandeur['prenom'], demandeur['nom'], demandeur['raison_sociale'], demandeur['denomination'])
# add optional lieu_dit field
if '%slieu_dit' % prefix in json_data['fields'] and json_data['fields']['%slieu_dit' % prefix]:
requerant["adresse"]["lieu_dit"] = normalize(json_data['fields']['%slieu_dit' % prefix])
demandeur["adresse"]["lieu_dit"] = normalize(json_data['fields']['%slieu_dit' % prefix])
# add it to the payload
payload[key] = [requerant]
payload[key] = [demandeur]
self.logger.debug(u"Added '%s' to payload: %s %s", key, requerant['prenom'], requerant['nom'])
self.logger.debug(u"Added '%s' to payload: %s %s", key, demandeur['prenom'], demandeur['nom'])
# log the payload
self.log_json_payload(payload)
# every field key that might contain a file content
file_keys = ['cerfa'] + ['plan_cadastral_%s' % i for i in range(1,5)] + ['pouvoir_mandat']
# prepare files that will be forwarded
files = []
for k in file_keys:
@ -443,18 +391,37 @@ class AtrealOpenads(BaseResource, HTTPResource):
and isinstance(json_data['fields'][k], dict)
and 'content' in json_data['fields'][k]
):
# get the content decoded from base 64
content = base64.b64decode(json_data['fields'][k]['content'])
# guess the mime type based on the beginning of the content
content_type = magic.from_buffer(content, mime=True)
# set it as an upload
upload_file = ContentFile(content)
# build a hash from the upload
file_hash = self.file_digest(upload_file)
# build a filename (less than 50 chars)
filename = file_hash[45:] + '.pdf'
# get the content type if specified
if 'content_type' in json_data['fields'][k]:
content_type = json_data['fields'][k]['content_type']
# check the content type is PDF for file of type CERFA
if k == 'cerfa' and content_type != 'application/pdf':
self.logger.warning("CERFA content type is '%s' instead of '%s'", content_type, 'application/pdf')
# get the filename if specified
if 'filename' in json_data['fields'][k]:
filename = json_data['fields'][k]['filename']
# set the type fichier based on the key (less than 10 chars)
type_fichier = re.sub(r'_.*$', '', k)[:10]
# append the file to the list
files.append({
'type_fichier' : type_fichier,
'orig_filename': filename,
@ -517,7 +484,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
numero_dossier = result.get('numero_dossier')
if not isinstance(numero_dossier, str) and not isinstance(numero_dossier, unicode):
if not isinstance(numero_dossier, basestring):
raise APIError(
u"Expecting '%s' value in JSON response to be a %s (not a %s)" %
('numero_dossier', 'string', type(numero_dossier)))
@ -608,7 +575,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
return response.json()
def upload2ForwardFile(self, path, numero_dossier, type_fichier='CERFA'):
def upload2ForwardFile(self, path, numero_dossier, type_fichier):
"""Convert a file path to a ForwardFile."""
if path:
rand_id = base64.urlsafe_b64encode(os.urandom(6))
@ -763,9 +730,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
raise APIError(u'No JSON content returned: %r' % response.content[:1000])
# log the response (filtering the file content)
self.log_json_payload(
result, 'response',
['files.%i.b64_content' % i for i in range(0, len(result['files']))])
self.log_json_payload(result, 'response')
# get the courrier
courrier = self.get_first_file_from_json_payload(result, title='response')
@ -774,7 +739,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
try:
courrier_content = base64.b64decode(courrier['b64_content'])
except TypeError:
raise APIError('Invalid content for courrier')
raise APIError('Failed to decode courrier content from base 64')
# return the 'courrier' file
return {'courrier': courrier}
@ -807,11 +772,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
# TODO ask for openADS.API to *always* send JSON formatted errors, not HTML ones
# return a string representing the HTTP error (filtering the HTML tags and multispaces)
return u"HTTP error: %s, %s" % \
(response.status_code,
clean_spaces(strip_tags(response.content[:1000])) if response.content else '')
detail = clean_spaces(strip_tags(response.content[:1000])) if response.content else ''
return u"HTTP error: %s%s" % (response.status_code, ', ' + detail if detail else '')
# @raise ForwardFile.DoesNotExist if not found
def upload_user_files(self, type_dossier, numero_dossier, file_ids):
"""A Job to forward user uploaded files to openADS."""
@ -849,19 +814,13 @@ class AtrealOpenads(BaseResource, HTTPResource):
# append the forwarded file to the list
fwd_files.append(fwd_file)
# the specified forwarded file was not found: bug
else:
self.logger.warning(u"upload_user_files() failed to find ForwardFile file_id: %s", fid)
# if files need to be forwarded
if payload:
self.logger.debug("upload_user_files() payload is not empty")
# log the payload
self.log_json_payload(
payload, 'payload',
['%i.b64_content' % i for i in range(0, len(fwd_files))])
self.log_json_payload(payload, 'payload')
# make the request to openADS.API (with a specific timeout)
url = urlparse.urljoin(self.openADS_API_url, '/dossier/%s/%s/files' % (type_dossier, numero_dossier))
@ -908,7 +867,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
self.logger.warning(
u"upload_user_files() openADS response is not JSON valid for dossier '%s' and files '%s'",
numero_dossier,
','.join([f.id for f in fwd_files])
fwd_files
)
# response correctly loaded as JSON
@ -940,7 +899,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
self.logger.warning(
u"upload_user_files() payload is empty for dossier '%s' and files '%s'",
numero_dossier,
','.join(file_ids)
file_ids
)

View File

@ -22,7 +22,17 @@ from django.core.files import File
from passerelle.utils.jsonresponse import APIError
from passerelle.base.models import Job
from atreal_openads.models import AtrealOpenads, ForwardFile
from atreal_openads.models import (
strip_tags,
clean_spaces,
normalize,
get_file_data,
get_upload_path,
trunc_str_values,
DictDumper,
AtrealOpenads,
ForwardFile
)
CONNECTOR_NAME = 'atreal-openads'
@ -41,14 +51,6 @@ TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
def get_file_data(path, b64=True):
"""Return the content of a file as a string, in base64 if specified."""
with open(path, 'r') as f:
if b64:
return base64.b64encode(f.read())
return f.read()
@pytest.fixture
def atreal_openads(db):
return AtrealOpenads.objects.create(
@ -60,6 +62,194 @@ def atreal_openads(db):
)
def test_strip_tags():
s = 'aaa b cc '
assert strip_tags(s) == s
ss = s + '<em>dd'
assert strip_tags(ss) == s + 'dd'
ss = s + '<em>dd</em>'
assert strip_tags(ss) == s + 'dd'
ss = s + '<em>dd</em>'
assert strip_tags(ss) == s + 'dd'
ss = s + ' 1 < 3'
assert strip_tags(ss) == s + ' 1 < 3'
def test_clean_spaces():
s = 'aaa b cc '
assert clean_spaces(s) == 'aaa b cc'
s = 'a\ta b\nb c\rc d\\n\\r\\td'
assert clean_spaces(s) == 'a a b b c c d d'
def test_normalize():
assert normalize(None) == ''
s = 'aaa b cc '
assert normalize(s) == 'aaa b cc'
s = 'a\ta b\nb c\rc d\\n\\r\\td'
assert normalize(s) == 'a a b b c c d d'
def test_get_file_data():
assert get_file_data(TEST_FILE_CERFA_DIA) == base64.b64encode(open(TEST_FILE_CERFA_DIA).read())
assert get_file_data(TEST_FILE_CERFA_DIA, b64=False) == open(TEST_FILE_CERFA_DIA).read()
def test_get_upload_path(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
assert re.search(
r"^pass_openADS_up_%s_%s$" %
('[0-9]{4}-[A-Z][a-z]{2}-[0-9]{2}_[0-9]{2}h[0-9]{2}m[0-9]{2}s[0-9]+', 'cc90'),
get_upload_path(FF))
def test_trunc_str_values():
d = {}
assert trunc_str_values(d, 10) == d
d = {'a': '123456789'}
assert trunc_str_values(d, 0) == {'a': u'…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 1) == {'a': u'1…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 2) == {'a': u'12…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 5) == {'a': u'12345…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 8) == {'a': u'12345678…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 9) == {'a': u'123456789'}
d = {'a': '123456789'}
assert trunc_str_values(d, 10) == d
d = {'a': '123456789', 'b123456789': '987654321'}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…'}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789']}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789', {'eeeeeeeeee':'132456789'}]}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]}
def test_dict_dumper():
d = {}
dd = DictDumper(d, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
dd = DictDumper(d, 0, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
d = {'a': '123456789'}
dd = DictDumper(d, 10, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
dd = DictDumper(d, 5, use_json_dumps=False)
assert d == dd.dic
assert unicode(dd) == unicode({'a': u'12345…'})
dd = DictDumper(d, 5, use_json_dumps=True)
assert d == dd.dic
assert unicode(dd) == u'{"a": "12345\\u2026"}'
def test_openads_log_json_payload(app, atreal_openads):
# TODO implement
assert True
# change the debug file path
# check that the function logs to it
# check that what is logged is correct
def test_openads_get_files_from_json_payload(app, atreal_openads):
title = 'payload'
assert atreal_openads.get_files_from_json_payload({'files':[{'a':'file'}]}) == [{'a':'file'}]
with pytest.raises(APIError) as e:
atreal_openads.get_files_from_json_payload({})
assert unicode(e.value) == u"Expecting '%s' key in JSON %s" % ('files', title)
with pytest.raises(APIError) as e:
atreal_openads.get_files_from_json_payload({'files': 'invalid'})
assert unicode(e.value) == u"Expecting '%s' value in JSON %s to be a %s (not a %s)" % (
'files', title, 'list', type(''))
with pytest.raises(APIError) as e:
atreal_openads.get_files_from_json_payload({'files': {'i':'invalid'}})
assert unicode(e.value) == u"Expecting '%s' value in JSON %s to be a %s (not a %s)" % (
'files', title, 'list', type({}))
with pytest.raises(APIError) as e:
atreal_openads.get_files_from_json_payload({'files': []})
assert unicode(e.value) == u"Expecting non-empty '%s' value in JSON %s" % ('files', title)
def test_check_file_dict(app, atreal_openads):
title = 'payload'
d = {
'content': get_file_data(TEST_FILE_CERFA_DIA, b64=False),
'filename': os.path.basename(TEST_FILE_CERFA_DIA),
'content_type': 'application/pdf'
}
d64 = {
'b64_content': get_file_data(TEST_FILE_CERFA_DIA, b64=True),
'filename': os.path.basename(TEST_FILE_CERFA_DIA),
'content_type': 'application/pdf'
}
assert atreal_openads.check_file_dict(d, b64=False) is None
assert atreal_openads.check_file_dict(d64) is None
d['filename'] = {'a','filename'}
with pytest.raises(APIError) as e:
atreal_openads.check_file_dict(d, b64=False)
assert unicode(e.value) == u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" % (
'file.filename', title, 'string', type(d['filename']))
d['content'] = {'a','filename'}
with pytest.raises(APIError) as e:
atreal_openads.check_file_dict(d, b64=False)
assert unicode(e.value) == u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" % (
'file.content', title, 'string', type(d['content']))
del(d['content'])
with pytest.raises(APIError) as e:
atreal_openads.check_file_dict(d, b64=False)
assert unicode(e.value) == u"Expecting 'file.%s' key in JSON %s" % ('content', title)
del(d64['b64_content'])
with pytest.raises(APIError) as e:
atreal_openads.check_file_dict(d, b64=True)
assert unicode(e.value) == u"Expecting 'file.%s' key in JSON %s" % ('b64_content', title)
def test_get_first_file_from_json_payload(app, atreal_openads):
title = 'payload'
d = {
'files': [{
'content': get_file_data(TEST_FILE_CERFA_DIA, b64=False),
'filename': os.path.basename(TEST_FILE_CERFA_DIA),
'content_type': 'application/pdf'
}]
}
assert atreal_openads.get_first_file_from_json_payload(
d, title, ensure_content=True, b64=False) == d['files'][0]
def test_openads_check_status(app, atreal_openads):
fake_resp_json = {
'message': 'Service online'
@ -74,47 +264,74 @@ def test_openads_check_status(app, atreal_openads):
def test_openads_create_dossier(app, atreal_openads):
fake_req_json = {
"fields": {
"autres_parcelles": True,
"cerfa": {
"content": get_file_data(TEST_FILE_CERFA_DIA),
"content_type": "application/pdf",
"field_id": "50",
"filename": os.path.basename(TEST_FILE_CERFA_DIA)
},
"code_postal": "13004",
"localite": "Marseille",
"nom": "Loulou",
"nom_voie": "Avenue de la Blaque",
"numero_voie": "52",
"plan_cadastral_1": {
"content": get_file_data(TEST_FILE_PLAN_CADASTRAL),
"content_type": "application/pdf",
"field_id": "113",
"filename": os.path.basename(TEST_FILE_PLAN_CADASTRAL)
},
# proprietaire
"proprietaire" : "Non",
"proprietaire_raw": "Non",
# mandataire
"mandataire_prenom" : "John",
"mandataire_nom" : "Man",
"mandataire_qualite" : "Une personne morale",
"mandataire_qualite_raw" : "Une personne morale",
"mandataire_denomination" : "SELARL",
"mandataire_raison_sociale" : "Super Juriste",
"mandataire_numero_voie": "808",
"mandataire_nom_voie" : "Avenue de l'argent",
"mandataire_lieu_dit" : "geoisbour",
"mandataire_code_postal": "13004",
"mandataire_localite" : "Marseille",
# petitionnaire
"prenom": "Toto",
"proprietaire": "Oui",
"proprietaire_qualite": "Un particulier",
"proprietaire_qualite_raw": "Un particulier",
"proprietaire_raw": "Oui",
"reference_cadastrale": [
[
"999",
"Z",
"0010"
]
],
"references_cadastrales": [
[
"123",
"D",
"9874"
]
],
"nom" : "Loulou",
"qualite" : "Un particulier",
"qualite_raw": "Un particulier",
"numero_voie": "52",
"nom_voie" : "Avenue de la Blaque",
"lieu_dit" : "tierquar",
"code_postal": "13004",
"localite" : "Marseille",
# terrain
"terrain_numero_voie": "23",
"terrain_nom_voie" : "Boulevard de la République",
"terrain_lieu_dit" : "Leecorne",
"terrain_code_postal": "13002",
"terrain_localite": "Marseille",
"terrain_nom_voie": "Boulevard de la Linule",
"terrain_numero_voie": "23"
"terrain_localite" : "Marseille",
# références cadastrales
"reference_cadastrale" : [ ["999", "Z", "0010"] ],
"autres_parcelles" : True,
"references_cadastrales": [ ["123", "D", "9874"] ],
# user attached files
"cerfa": {
"content" : get_file_data(TEST_FILE_CERFA_DIA),
"content_type": "invalid/content type",
"field_id" : "50",
"filename" : os.path.basename(TEST_FILE_CERFA_DIA)
},
"plan_cadastral_1": {
"content" : get_file_data(TEST_FILE_PLAN_CADASTRAL),
"content_type": "application/pdf",
"filename" : os.path.basename(TEST_FILE_PLAN_CADASTRAL)
},
"plan_cadastral_2": {
"content" : get_file_data(TEST_FILE_PLAN_CADASTRAL),
"content_type": "application/pdf",
"filename" : os.path.basename(TEST_FILE_PLAN_CADASTRAL)
},
"pouvoir_mandat": {
"content" : get_file_data(TEST_FILE_CERFA_DIA),
"content_type": "application/pdf",
"filename" : 'mandat.pdf'
}
}
}
req = HttpRequest()
@ -130,6 +347,16 @@ def test_openads_create_dossier(app, atreal_openads):
req.META = {}
req._read_started = False
fake_resp_bad = Response()
fake_resp_bad.status_code = 502
fake_resp_bad.reason = 'Bad gateway'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp_bad
atreal_openads.create_dossier(req, 'DIA')
assert unicode(e.value) == "HTTP error: 502"
fake_resp_json = {
'numero_dossier' : FAKE_NUMERO_DOSSIER,
'files': [{
@ -153,6 +380,38 @@ def test_openads_create_dossier(app, atreal_openads):
assert jresp['recepisse']['content_type'] == 'application/pdf'
assert jresp['recepisse']['filename'] == fake_resp_json['files'][0]['filename']
fake_resp_json['numero_dossier'] = {'a':'invalid type'}
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
assert unicode(e.value) == u"Expecting '%s' value in JSON response to be a %s (not a %s)" % (
'numero_dossier', 'string', type({}))
del(fake_resp_json['numero_dossier'])
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
assert unicode(e.value) == u"Expecting 'numero_dossier' key in JSON response"
fake_resp_json['files'][0]['b64_content'] = 'invalid_;{[content}'
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
assert unicode(e.value) == u'Failed to decode recepisse content from base 64'
fake_resp._content = 'df[{gfd;g#vfd'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
job = Job.objects.filter(natural_id=FAKE_NUMERO_DOSSIER).last()
assert job
job_id = job.id
@ -162,7 +421,7 @@ def test_openads_create_dossier(app, atreal_openads):
assert job.parameters is not None
assert len(job.parameters) == 3
assert 'file_ids' in job.parameters
assert len(job.parameters['file_ids']) == 2
assert len(job.parameters['file_ids']) == 4
file_ids = job.parameters['file_ids']
FFs = ForwardFile.objects.filter(id__in=file_ids)
@ -192,6 +451,16 @@ def test_openads_create_dossier(app, atreal_openads):
def test_openads_get_dossier(app, atreal_openads):
fake_resp_bad = Response()
fake_resp_bad.status_code = 502
fake_resp_bad.reason = 'Bad gateway'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp_bad
atreal_openads.get_dossier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == "HTTP error: 502"
fake_resp_json = {
'etat' : u"Non préemption en cours",
'date_depot' : "24/04/2019",
@ -214,6 +483,13 @@ def test_openads_get_dossier(app, atreal_openads):
assert jresp['decision'] == fake_resp_json['decision']
assert jresp['date_limite_instruction'] == fake_resp_json['date_limite_instruction']
fake_resp._content = 'df[{gfd;g#vfd'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
atreal_openads.get_dossier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
fake_resp_json = {
'errors' : [{
'location' : 'path',
@ -221,28 +497,25 @@ def test_openads_get_dossier(app, atreal_openads):
'description' : '"invalid_type" is not one of DIA, PC, DP, AT, PD'
}]
}
fake_resp = Response()
fake_resp.status_code = 404
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'Resource not found'
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
jresp = atreal_openads.get_dossier(None, 'invalid_type', FAKE_NUMERO_DOSSIER)
assert re.search(r'^HTTP error: 404, \[path\] \(Invalid Type\) "invalid_type" is not one of DIA, PC, DP, AT, PD$', str(e.value))
atreal_openads.get_dossier(None, 'invalid_type', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == u"HTTP error: 404, [path] (Invalid Type) \"invalid_type\" is not one of DIA, PC, DP, AT, PD"
def test_openads_upload2ForwardFile(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(None, None)
FF = atreal_openads.upload2ForwardFile(None, None, None)
assert FF is None
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
assert isinstance(FF, ForwardFile)
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert FF.type_fichier == 'CERFA'
assert FF.type_fichier == 'cerfa'
assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA)
assert FF.content_type == 'application/pdf'
assert len(FF.file_hash) > 0
@ -262,15 +535,19 @@ def test_openads_upload2ForwardFile(app, atreal_openads):
def test_openads_get_fwd_files(app, atreal_openads):
with pytest.raises(APIError) as e:
atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id='not an integer')
assert unicode(e.value) == u"fichier_id must be an integer"
with pytest.raises(Http404) as e:
atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id=18)
assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value))
assert unicode(e.value) == u"No file matches 'numero_dossier=%s' and 'id=%s'." % (FAKE_NUMERO_DOSSIER, 18)
resp404 = atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id=None)
assert resp404 is not None
assert len(resp404) == 0
resp_empty = atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id=None)
assert resp_empty is not None
assert len(resp_empty) == 0
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF.save()
assert isinstance(FF, ForwardFile)
@ -298,7 +575,7 @@ def test_openads_get_fwd_files_status(app, atreal_openads):
atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=18)
assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value))
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF.save()
assert isinstance(FF, ForwardFile)
@ -324,6 +601,16 @@ def test_openads_get_fwd_files_status(app, atreal_openads):
def test_openads_get_courrier(app, atreal_openads):
fake_resp_bad = Response()
fake_resp_bad.status_code = 502
fake_resp_bad.reason = 'Bad gateway'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp_bad
atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == "HTTP error: 502"
fake_resp_json = {
'files': [{
'filename' : "instruction_4.pdf",
@ -344,22 +631,91 @@ def test_openads_get_courrier(app, atreal_openads):
assert jresp['courrier']['content_type'] == fake_resp_json['files'][0]['content_type']
assert jresp['courrier']['b64_content'] == fake_resp_json['files'][0]['b64_content']
fake_resp_json['files'][0]['b64_content'] = 'invalid_;{[content}'
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == u'Failed to decode courrier content from base 64'
fake_resp._content = 'df[{gfd;g#vfd'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
def test_get_response_error(app, atreal_openads):
fake_resp_json = {
'errors': [
{
'location' : 'entity.name',
'name' : 'constraint',
'description': 'Must start with an uppercase letter'
}
]
}
fake_resp = Response()
fake_resp.status_code = 404
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'Not Found'
fake_resp._content = json.dumps(fake_resp_json)
error_msg = atreal_openads.get_response_error(fake_resp)
expected_msg = u'[%s] (%s) %s' % (
fake_resp_json['errors'][0]['location'],
fake_resp_json['errors'][0]['name'],
fake_resp_json['errors'][0]['description']
)
assert error_msg == u"HTTP error: %s, %s" % (fake_resp.status_code, ','.join([expected_msg]))
fake_resp._content = 'invalid_;{[content}'
error_msg = atreal_openads.get_response_error(fake_resp)
assert error_msg == u"HTTP error: %s, %s" % (fake_resp.status_code, fake_resp._content)
def test_openads_upload_user_files(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
# TODO check logs (because this doesn't do anything but log)
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[])
with pytest.raises(ForwardFile.DoesNotExist) as e:
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[999])
assert unicode(e.value) == u"ForwardFile matching query does not exist."
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF.save()
assert isinstance(FF, ForwardFile)
assert FF.upload_status == 'pending'
file_id = FF.id
assert file_id
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
fake_resp_bad = Response()
fake_resp_bad.status_code = 502
fake_resp_bad.reason = 'Bad gateway'
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp_bad
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
FFup = ForwardFile.objects.get(id=file_id)
assert isinstance(FFup, ForwardFile)
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
assert getattr(FFup, k) == getattr(FF, k)
assert FFup.upload_attempt == 1
assert FFup.upload_status == 'failed'
assert FFup.upload_msg == "HTTP error: 502"
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
fake_resp._content = 'invalid_;{[content}'
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
@ -368,5 +724,21 @@ def test_openads_upload_user_files(app, atreal_openads):
assert isinstance(FFup, ForwardFile)
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
assert getattr(FFup, k) == getattr(FF, k)
assert FFup.upload_status == 'success'
assert FFup.upload_attempt == 2
assert FFup.upload_status == 'failed'
assert FFup.upload_msg == u'No JSON content returned: %r' % fake_resp._content
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
FFup = ForwardFile.objects.get(id=file_id)
assert isinstance(FFup, ForwardFile)
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
assert getattr(FFup, k) == getattr(FF, k)
assert FFup.upload_attempt == 3
assert FFup.upload_status == 'success'
assert FFup.upload_msg == 'uploaded successfuly'