100% test coverage, dict strings are truncated when dumped, error messages homogeneized, and more
Features: * replaced the loging of json payload with the use of a DictDumper that lazy truncate strings too long * homogenize error messages Fixes: * changed the signature of 'check_file_dict()' to match other functions and fix a bug * changed the signature of 'upload2ForwardFile()' to force input for 'type_fichier' argument Refactoring: * removed dead code in 'upload_user_files()' * using 'basestring' instead of testing for 'str' and 'unicode' * renamed 'requerant' by 'demandeur' * removed useless and maybe problematic 'join()' (forgotten from the previous commit) * added more comments Tests: * 100% test coverage! * fixed failing tests * added tests for every function * completed missing tests (mostly for exceptions) * added more comments * added the lines of missing statement in the coverage report
This commit is contained in:
parent
571316fcf4
commit
98cd089254
|
@ -1,2 +1,5 @@
|
|||
[run]
|
||||
omit = */south_migrations/*
|
||||
|
||||
[report]
|
||||
show_missing = True
|
||||
|
|
|
@ -96,7 +96,7 @@ def get_file_data(path, b64=True):
|
|||
return f.read()
|
||||
|
||||
|
||||
def get_upload_path(instance, filename):
|
||||
def get_upload_path(instance, filename=None):
|
||||
"""Return a relative upload path for a file."""
|
||||
# be careful:
|
||||
# * openADS accept only filename less than 50 chars
|
||||
|
@ -107,70 +107,44 @@ def get_upload_path(instance, filename):
|
|||
)
|
||||
|
||||
|
||||
def dict_deref_multi(data, keys):
|
||||
"""Access a dict through a list of keys.
|
||||
Inspired by: https://stackoverflow.com/a/47969823
|
||||
def trunc_str_values(value, limit, visited=None, truncate_text=u'…'):
|
||||
"""Truncate a string value (not dict keys) and append a truncate text."""
|
||||
|
||||
if visited is None:
|
||||
visited = []
|
||||
if not value in visited:
|
||||
if isinstance(value, basestring) and len(value) > limit:
|
||||
value = value[:limit] + truncate_text
|
||||
elif isinstance(value, dict) or isinstance(value, list) or isinstance(value, tuple):
|
||||
visited.append(value)
|
||||
iterator = value.iteritems() if isinstance(value, dict) else enumerate(value)
|
||||
for k,v in iterator:
|
||||
value[k] = trunc_str_values(v, limit, visited, truncate_text)
|
||||
return value
|
||||
|
||||
|
||||
class DictDumper(object):
|
||||
"""Helper to dump a dictionary to a string representation with lazy processing.
|
||||
|
||||
Only applied when dict is converted to string (lazy processing):
|
||||
- long strings truncated (after the dict has been 'deep' copied)
|
||||
- (optionaly) dict converted with json.dumps instead of unicode().
|
||||
"""
|
||||
if keys:
|
||||
key = int(keys[0]) if isinstance(data, list) else keys[0]
|
||||
return dict_deref_multi(data[key], keys[1:]) if keys else data
|
||||
|
||||
|
||||
# TODO implement wildcard filtering
|
||||
def dict_replace_content_by_paths(dic, paths_to_replace=[], path_separator='/', replace_by='<file content>'):
|
||||
"""Replace a dict content, with a string, from a list of paths."""
|
||||
newdic = dic
|
||||
if newdic and (isinstance(newdic, dict) or isinstance(newdic, list)):
|
||||
for p in paths_to_replace:
|
||||
if p:
|
||||
if p.startswith(path_separator):
|
||||
p = p[1:]
|
||||
items = p.split(path_separator)
|
||||
if items:
|
||||
try:
|
||||
value = dict_deref_multi(newdic, items)
|
||||
except KeyError:
|
||||
continue
|
||||
except TypeError:
|
||||
continue
|
||||
if not isinstance(value, str) and not isinstance(value, unicode):
|
||||
raise ValueError(u"path '%s' is not a string ([%s] %s)" % (p, type(value), value))
|
||||
# newdic is still untouched
|
||||
if newdic is dic:
|
||||
# create a copy of it
|
||||
newdic = copy.deepcopy(dic)
|
||||
last_item = items.pop(-1)
|
||||
value = dict_deref_multi(newdic, items)
|
||||
value[last_item] = replace_by
|
||||
return newdic
|
||||
|
||||
|
||||
class LogJsonPayloadWithFileContent(object):
|
||||
""" Return a string representation of a JSON payload with its file content emptyed."""
|
||||
|
||||
def __init__(self, payload, paths_to_replace=[], path_separator='/', replace_by='<data>'):
|
||||
def __init__(self, dic, max_str_len=255, use_json_dumps=True):
|
||||
""" arguments:
|
||||
- payload string the JSON payload to dump
|
||||
- paths_to_replace array a list of strings either:
|
||||
* in 'path' format (i.e.: ['/fields/file/content',...])
|
||||
* in 'namespace' format (i.e.: ['fields.file.content',...])
|
||||
- path_separator string the path separator used in 'paths_to_replace' format:
|
||||
* '/' for 'path' format
|
||||
* '.' for 'namespace' format
|
||||
- replace_by string the new value of the content at the specified path
|
||||
- dic string the dict to dump
|
||||
- max_str_len integer the maximul length of string values
|
||||
- use_json_dumps boolean True to use json.dumps() else it uses unicode()
|
||||
"""
|
||||
self.payload = payload
|
||||
self.paths_to_replace = paths_to_replace
|
||||
self.path_separator = path_separator
|
||||
self.replace_by = replace_by
|
||||
self.dic = dic
|
||||
self.max_str_len = max_str_len
|
||||
self.use_json_dumps = use_json_dumps
|
||||
|
||||
def __str__(self):
|
||||
return json.dumps(dict_replace_content_by_paths(
|
||||
self.payload,
|
||||
paths_to_replace = self.paths_to_replace,
|
||||
path_separator = self.path_separator,
|
||||
replace_by = self.replace_by
|
||||
))
|
||||
dict_trunc = trunc_str_values(copy.deepcopy(self.dic), self.max_str_len)
|
||||
dict_ref = json.dumps(dict_trunc) if self.use_json_dumps else dict_trunc
|
||||
return unicode(dict_ref)
|
||||
|
||||
|
||||
class ForwardFile(models.Model):
|
||||
|
@ -206,18 +180,10 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
verbose_name = _('openADS')
|
||||
|
||||
|
||||
def log_json_payload(self, payload, title='payload', paths_to_replace=[]):
|
||||
def log_json_payload(self, payload, title='payload', max_str_len=100):
|
||||
"""Log a json paylod surrounded by dashes and with file content filtered."""
|
||||
self.logger.debug(u"----- %s (begining) -----", title)
|
||||
if paths_to_replace:
|
||||
self.logger.debug(u"%s", LogJsonPayloadWithFileContent(
|
||||
payload,
|
||||
paths_to_replace = paths_to_replace,
|
||||
path_separator = u'.',
|
||||
replace_by = u'<b64 content>'
|
||||
))
|
||||
else:
|
||||
self.logger.debug(u"%s", LogJsonPayloadWithFileContent(payload))
|
||||
self.logger.debug(u"%s", DictDumper(payload, max_str_len))
|
||||
self.logger.debug(u"----- %s (end) -----", title)
|
||||
|
||||
|
||||
|
@ -230,7 +196,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
raise APIError(u"Expecting '%s' key in JSON %s" %
|
||||
('files', title))
|
||||
|
||||
files = payload.get('files')
|
||||
files = payload['files']
|
||||
|
||||
if not isinstance(files, list):
|
||||
self.log_json_payload(payload, title)
|
||||
|
@ -244,15 +210,13 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
('files', title))
|
||||
|
||||
# log the response
|
||||
self.log_json_payload(
|
||||
payload, title
|
||||
,['files.%i.b64_content' % i for i in range(0, len(files))])
|
||||
self.log_json_payload(payload, title)
|
||||
|
||||
# return the files
|
||||
return files
|
||||
|
||||
|
||||
def check_file_dict(self, dict_file, b64=True, title='payload'):
|
||||
def check_file_dict(self, dict_file, title='payload', b64=True):
|
||||
"""Ensure a file dict has all its required items."""
|
||||
|
||||
# key to get the content
|
||||
|
@ -264,24 +228,20 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
|
||||
# check content existence
|
||||
if content_key not in dict_file:
|
||||
raise APIError(u"Expecting 'files.0.%s' key in JSON %s" % (content_key, title))
|
||||
raise APIError(u"Expecting 'file.%s' key in JSON %s" % (content_key, title))
|
||||
|
||||
# get its content
|
||||
file_content = dict_file[content_key]
|
||||
|
||||
if not isinstance(file_content, str) and not isinstance(file_content, unicode):
|
||||
if not isinstance(file_content, basestring):
|
||||
raise APIError(
|
||||
u"Expecting '%s' value in JSON %s to be a %s (not a %s)" %
|
||||
u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" %
|
||||
('file.%s' % content_key, title, 'string', type(file_content)))
|
||||
|
||||
# check filename
|
||||
if (
|
||||
'filename' in dict_file
|
||||
and not isinstance(dict_file['filename'], str)
|
||||
and not isinstance(dict_file['filename'], unicode)
|
||||
):
|
||||
if 'filename' in dict_file and not isinstance(dict_file['filename'], basestring):
|
||||
raise APIError(
|
||||
u"Expecting '%s' value in file dict to be a %s (not a %s)" %
|
||||
u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" %
|
||||
('file.filename', title, 'string', type(dict_file['filename'])))
|
||||
|
||||
|
||||
|
@ -296,7 +256,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
|
||||
# asked to check its content
|
||||
if ensure_content:
|
||||
self.check_file_dict(first, title, b64)
|
||||
self.check_file_dict(first, title=title, b64=b64)
|
||||
|
||||
# return the first file
|
||||
return first
|
||||
|
@ -346,23 +306,8 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
# loads the request body as JSON content
|
||||
json_data = json.loads(request.body)
|
||||
|
||||
# every field key that might contain a file content
|
||||
file_keys = ['cerfa'] + ['plan_cadastral_%s' % i for i in range(1,5)] + ['pouvoir_mandat']
|
||||
|
||||
# detect if there evolutions data to filter them out of logging
|
||||
# TODO replace when wildcard filtering is implement in 'dict_replace_content_by_paths()'
|
||||
evolution_to_filter = []
|
||||
if 'evolution' in json_data and isinstance(json_data['evolution'], list):
|
||||
for e, evol in enumerate(json_data['evolution']):
|
||||
if isinstance(evol, dict) and 'parts' in evol and isinstance(evol['parts'], list):
|
||||
for p, part in enumerate(evol['parts']):
|
||||
if isinstance(part, dict) and 'data' in part:
|
||||
evolution_to_filter.append('evolution.%s.parts.%s.data' % (e, p))
|
||||
|
||||
# log the request body (filtering the files content)
|
||||
self.log_json_payload(
|
||||
json_data, 'request',
|
||||
['fields.%s.content' % k for k in file_keys] + evolution_to_filter)
|
||||
self.log_json_payload(json_data, 'request')
|
||||
|
||||
# build the payload
|
||||
payload = { "collectivite": self.collectivite }
|
||||
|
@ -391,19 +336,19 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
"numero" : normalize(ref[2])
|
||||
})
|
||||
|
||||
# setup requerant variable prefix
|
||||
# setup demandeur variable prefix
|
||||
prefixes = {"demandeurs": ''}
|
||||
if normalize(json_data['fields']['proprietaire']) != 'Oui':
|
||||
prefixes["mandataires"] = 'mandataire_'
|
||||
|
||||
# for each type of requerant with associated prefix
|
||||
# for each type of demandeur with associated prefix
|
||||
for key,prefix in prefixes.items():
|
||||
|
||||
# "qualité" of the requerant
|
||||
# "qualité" of the demandeur
|
||||
qualite = normalize(json_data['fields']['%squalite' % prefix])
|
||||
|
||||
# get the requerant informations
|
||||
requerant = {
|
||||
# get the demandeur informations
|
||||
demandeur = {
|
||||
"type_personne": 'particulier' if qualite == 'Un particulier' else 'personne_morale',
|
||||
"typologie" : 'petitionnaire' if key == 'demandeurs' else 'delegataire',
|
||||
"nom" : normalize(json_data['fields']['%snom' % prefix]),
|
||||
|
@ -416,24 +361,27 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
}
|
||||
}
|
||||
|
||||
# add fields if the requerant is not an individual
|
||||
# add fields if the demandeur is not an individual
|
||||
if qualite != 'Un particulier':
|
||||
requerant["raison_sociale"] = normalize(json_data['fields']['%sraison_sociale' % prefix])
|
||||
requerant["denomination"] = normalize(json_data['fields']['%sdenomination' % prefix])
|
||||
self.logger.debug("%s %s => '%s', '%s'", requerant['prenom'], requerant['nom'], requerant['raison_sociale'], requerant['denomination'])
|
||||
demandeur["raison_sociale"] = normalize(json_data['fields']['%sraison_sociale' % prefix])
|
||||
demandeur["denomination"] = normalize(json_data['fields']['%sdenomination' % prefix])
|
||||
self.logger.debug("%s %s => '%s', '%s'", demandeur['prenom'], demandeur['nom'], demandeur['raison_sociale'], demandeur['denomination'])
|
||||
|
||||
# add optional lieu_dit field
|
||||
if '%slieu_dit' % prefix in json_data['fields'] and json_data['fields']['%slieu_dit' % prefix]:
|
||||
requerant["adresse"]["lieu_dit"] = normalize(json_data['fields']['%slieu_dit' % prefix])
|
||||
demandeur["adresse"]["lieu_dit"] = normalize(json_data['fields']['%slieu_dit' % prefix])
|
||||
|
||||
# add it to the payload
|
||||
payload[key] = [requerant]
|
||||
payload[key] = [demandeur]
|
||||
|
||||
self.logger.debug(u"Added '%s' to payload: %s %s", key, requerant['prenom'], requerant['nom'])
|
||||
self.logger.debug(u"Added '%s' to payload: %s %s", key, demandeur['prenom'], demandeur['nom'])
|
||||
|
||||
# log the payload
|
||||
self.log_json_payload(payload)
|
||||
|
||||
# every field key that might contain a file content
|
||||
file_keys = ['cerfa'] + ['plan_cadastral_%s' % i for i in range(1,5)] + ['pouvoir_mandat']
|
||||
|
||||
# prepare files that will be forwarded
|
||||
files = []
|
||||
for k in file_keys:
|
||||
|
@ -443,18 +391,37 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
and isinstance(json_data['fields'][k], dict)
|
||||
and 'content' in json_data['fields'][k]
|
||||
):
|
||||
# get the content decoded from base 64
|
||||
content = base64.b64decode(json_data['fields'][k]['content'])
|
||||
|
||||
# guess the mime type based on the begining of the content
|
||||
content_type = magic.from_buffer(content, mime=True)
|
||||
|
||||
# set it as an upload
|
||||
upload_file = ContentFile(content)
|
||||
|
||||
# build a hash from the upload
|
||||
file_hash = self.file_digest(upload_file)
|
||||
|
||||
# build a filename (less than 50 chars)
|
||||
filename = file_hash[45:] + '.pdf'
|
||||
|
||||
# get the content type if specified
|
||||
if 'content_type' in json_data['fields'][k]:
|
||||
content_type = json_data['fields'][k]['content_type']
|
||||
|
||||
# check the content type is PDF for file of type CERFA
|
||||
if k == 'cerfa' and content_type != 'application/pdf':
|
||||
self.logger.warning("CERFA content type is '%s' instead of '%s'", content_type, 'application/pdf')
|
||||
|
||||
# get the filename if specified
|
||||
if 'filename' in json_data['fields'][k]:
|
||||
filename = json_data['fields'][k]['filename']
|
||||
|
||||
# set the type fichier based on the key (less than 10 chars)
|
||||
type_fichier = re.sub(r'_.*$', '', k)[:10]
|
||||
|
||||
# append the file to the list
|
||||
files.append({
|
||||
'type_fichier' : type_fichier,
|
||||
'orig_filename': filename,
|
||||
|
@ -517,7 +484,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
|
||||
numero_dossier = result.get('numero_dossier')
|
||||
|
||||
if not isinstance(numero_dossier, str) and not isinstance(numero_dossier, unicode):
|
||||
if not isinstance(numero_dossier, basestring):
|
||||
raise APIError(
|
||||
u"Expecting '%s' value in JSON response to be a %s (not a %s)" %
|
||||
('numero_dossier', 'string', type(numero_dossier)))
|
||||
|
@ -608,7 +575,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
return response.json()
|
||||
|
||||
|
||||
def upload2ForwardFile(self, path, numero_dossier, type_fichier='CERFA'):
|
||||
def upload2ForwardFile(self, path, numero_dossier, type_fichier):
|
||||
"""Convert a file path to a ForwardFile."""
|
||||
if path:
|
||||
rand_id = base64.urlsafe_b64encode(os.urandom(6))
|
||||
|
@ -763,9 +730,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
raise APIError(u'No JSON content returned: %r' % response.content[:1000])
|
||||
|
||||
# log the response (filtering the file content)
|
||||
self.log_json_payload(
|
||||
result, 'response',
|
||||
['files.%i.b64_content' % i for i in range(0, len(result['files']))])
|
||||
self.log_json_payload(result, 'response')
|
||||
|
||||
# get the courrier
|
||||
courrier = self.get_first_file_from_json_payload(result, title='response')
|
||||
|
@ -774,7 +739,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
try:
|
||||
courrier_content = base64.b64decode(courrier['b64_content'])
|
||||
except TypeError:
|
||||
raise APIError('Invalid content for courrier')
|
||||
raise APIError('Failed to decode courrier content from base 64')
|
||||
|
||||
# return the 'courrier' file
|
||||
return {'courrier': courrier}
|
||||
|
@ -807,11 +772,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
|
||||
# TODO ask for openADS.API to *always* send JSON formatted errors, not HTML ones
|
||||
# return a string representing the HTTP error (filtering the HTML tags and multispaces)
|
||||
return u"HTTP error: %s, %s" % \
|
||||
(response.status_code,
|
||||
clean_spaces(strip_tags(response.content[:1000])) if response.content else '')
|
||||
detail = clean_spaces(strip_tags(response.content[:1000])) if response.content else ''
|
||||
return u"HTTP error: %s%s" % (response.status_code, ', ' + detail if detail else '')
|
||||
|
||||
|
||||
# @raise ForwareFile.DoesNotExist if not found
|
||||
def upload_user_files(self, type_dossier, numero_dossier, file_ids):
|
||||
"""A Job to forward user uploaded files to openADS."""
|
||||
|
||||
|
@ -849,19 +814,13 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
# append the forwarded file to the list
|
||||
fwd_files.append(fwd_file)
|
||||
|
||||
# not found the forwarded file specified: bug
|
||||
else:
|
||||
self.logger.warning(u"upload_user_files() failed to find ForwardFile file_id: %s", fid)
|
||||
|
||||
# if files need to be forwarded
|
||||
if payload:
|
||||
|
||||
self.logger.debug("upload_user_files() payload is not empty")
|
||||
|
||||
# log the payload
|
||||
self.log_json_payload(
|
||||
payload, 'payload',
|
||||
['%i.b64_content' % i for i in range(0, len(fwd_files))])
|
||||
self.log_json_payload(payload, 'payload')
|
||||
|
||||
# make the request to openADS.API (with a specific timeout)
|
||||
url = urlparse.urljoin(self.openADS_API_url, '/dossier/%s/%s/files' % (type_dossier, numero_dossier))
|
||||
|
@ -908,7 +867,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
self.logger.warning(
|
||||
u"upload_user_files() openADS response is not JSON valid for dossier '%s' and files '%s'",
|
||||
numero_dossier,
|
||||
','.join([f.id for f in fwd_files])
|
||||
fwd_files
|
||||
)
|
||||
|
||||
# response correctly loaded as JSON
|
||||
|
@ -940,7 +899,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
|
|||
self.logger.warning(
|
||||
u"upload_user_files() payload is empty for dossier '%s' and files '%s'",
|
||||
numero_dossier,
|
||||
','.join(file_ids)
|
||||
file_ids
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -22,7 +22,17 @@ from django.core.files import File
|
|||
from passerelle.utils.jsonresponse import APIError
|
||||
from passerelle.base.models import Job
|
||||
|
||||
from atreal_openads.models import AtrealOpenads, ForwardFile
|
||||
from atreal_openads.models import (
|
||||
strip_tags,
|
||||
clean_spaces,
|
||||
normalize,
|
||||
get_file_data,
|
||||
get_upload_path,
|
||||
trunc_str_values,
|
||||
DictDumper,
|
||||
AtrealOpenads,
|
||||
ForwardFile
|
||||
)
|
||||
|
||||
|
||||
CONNECTOR_NAME = 'atreal-openads'
|
||||
|
@ -41,14 +51,6 @@ TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
|
|||
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
|
||||
|
||||
|
||||
def get_file_data(path, b64=True):
|
||||
"""Return the content of a file as a string, in base64 if specified."""
|
||||
with open(path, 'r') as f:
|
||||
if b64:
|
||||
return base64.b64encode(f.read())
|
||||
return f.read()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def atreal_openads(db):
|
||||
return AtrealOpenads.objects.create(
|
||||
|
@ -60,6 +62,194 @@ def atreal_openads(db):
|
|||
)
|
||||
|
||||
|
||||
def test_strip_tags():
|
||||
s = 'aaa b cc '
|
||||
assert strip_tags(s) == s
|
||||
|
||||
ss = s + '<em>dd'
|
||||
assert strip_tags(ss) == s + 'dd'
|
||||
|
||||
ss = s + '<em>dd</em>'
|
||||
assert strip_tags(ss) == s + 'dd'
|
||||
|
||||
ss = s + '<em>dd</em>'
|
||||
assert strip_tags(ss) == s + 'dd'
|
||||
|
||||
ss = s + ' 1 < 3'
|
||||
assert strip_tags(ss) == s + ' 1 < 3'
|
||||
|
||||
|
||||
def test_clean_spaces():
|
||||
s = 'aaa b cc '
|
||||
assert clean_spaces(s) == 'aaa b cc'
|
||||
|
||||
s = 'a\ta b\nb c\rc d\\n\\r\\td'
|
||||
assert clean_spaces(s) == 'a a b b c c d d'
|
||||
|
||||
|
||||
def test_normalize():
|
||||
assert normalize(None) == ''
|
||||
|
||||
s = 'aaa b cc '
|
||||
assert normalize(s) == 'aaa b cc'
|
||||
|
||||
s = 'a\ta b\nb c\rc d\\n\\r\\td'
|
||||
assert normalize(s) == 'a a b b c c d d'
|
||||
|
||||
|
||||
def test_get_file_data():
|
||||
assert get_file_data(TEST_FILE_CERFA_DIA) == base64.b64encode(open(TEST_FILE_CERFA_DIA).read())
|
||||
assert get_file_data(TEST_FILE_CERFA_DIA, b64=False) == open(TEST_FILE_CERFA_DIA).read()
|
||||
|
||||
|
||||
def test_get_upload_path(app, atreal_openads):
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
assert re.search(
|
||||
r"^pass_openADS_up_%s_%s$" %
|
||||
('[0-9]{4}-[A-Z][a-z]{2}-[0-9]{2}_[0-9]{2}h[0-9]{2}m[0-9]{2}s[0-9]+', 'cc90'),
|
||||
get_upload_path(FF))
|
||||
|
||||
|
||||
def test_trunc_str_values():
|
||||
d = {}
|
||||
assert trunc_str_values(d, 10) == d
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 0) == {'a': u'…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 1) == {'a': u'1…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 2) == {'a': u'12…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 8) == {'a': u'12345678…'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 9) == {'a': u'123456789'}
|
||||
d = {'a': '123456789'}
|
||||
assert trunc_str_values(d, 10) == d
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321'}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…'}
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}}
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789']}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']}
|
||||
|
||||
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789', {'eeeeeeeeee':'132456789'}]}
|
||||
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]}
|
||||
|
||||
|
||||
def test_dict_dumper():
|
||||
d = {}
|
||||
dd = DictDumper(d, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(d) == unicode(dd)
|
||||
dd = DictDumper(d, 0, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(d) == unicode(dd)
|
||||
|
||||
d = {'a': '123456789'}
|
||||
dd = DictDumper(d, 10, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(d) == unicode(dd)
|
||||
dd = DictDumper(d, 5, use_json_dumps=False)
|
||||
assert d == dd.dic
|
||||
assert unicode(dd) == unicode({'a': u'12345…'})
|
||||
dd = DictDumper(d, 5, use_json_dumps=True)
|
||||
assert d == dd.dic
|
||||
assert unicode(dd) == u'{"a": "12345\\u2026"}'
|
||||
|
||||
|
||||
def test_openads_log_json_payload(app, atreal_openads):
|
||||
# TODO implement
|
||||
assert True
|
||||
# change the debug file path
|
||||
# check that the function log to it
|
||||
# check that what was is logged is correct
|
||||
|
||||
|
||||
def test_openads_get_files_from_json_payload(app, atreal_openads):
|
||||
title = 'payload'
|
||||
|
||||
assert atreal_openads.get_files_from_json_payload({'files':[{'a':'file'}]}) == [{'a':'file'}]
|
||||
|
||||
with pytest.raises(APIError) as e:
|
||||
atreal_openads.get_files_from_json_payload({})
|
||||
assert unicode(e.value) == u"Expecting '%s' key in JSON %s" % ('files', title)
|
||||
|
||||
with pytest.raises(APIError) as e:
|
||||
atreal_openads.get_files_from_json_payload({'files': 'invalid'})
|
||||
assert unicode(e.value) == u"Expecting '%s' value in JSON %s to be a %s (not a %s)" % (
|
||||
'files', title, 'list', type(''))
|
||||
|
||||
with pytest.raises(APIError) as e:
|
||||
atreal_openads.get_files_from_json_payload({'files': {'i':'invalid'}})
|
||||
assert unicode(e.value) == u"Expecting '%s' value in JSON %s to be a %s (not a %s)" % (
|
||||
'files', title, 'list', type({}))
|
||||
|
||||
with pytest.raises(APIError) as e:
|
||||
atreal_openads.get_files_from_json_payload({'files': []})
|
||||
assert unicode(e.value) == u"Expecting non-empty '%s' value in JSON %s" % ('files', title)
|
||||
|
||||
|
||||
def test_check_file_dict(app, atreal_openads):
|
||||
title = 'payload'
|
||||
|
||||
d = {
|
||||
'content': get_file_data(TEST_FILE_CERFA_DIA, b64=False),
|
||||
'filename': os.path.basename(TEST_FILE_CERFA_DIA),
|
||||
'content_type': 'application/pdf'
|
||||
}
|
||||
|
||||
d64 = {
|
||||
'b64_content': get_file_data(TEST_FILE_CERFA_DIA, b64=True),
|
||||
'filename': os.path.basename(TEST_FILE_CERFA_DIA),
|
||||
'content_type': 'application/pdf'
|
||||
}
|
||||
|
||||
assert atreal_openads.check_file_dict(d, b64=False) is None
|
||||
assert atreal_openads.check_file_dict(d64) is None
|
||||
|
||||
d['filename'] = {'a','filename'}
|
||||
with pytest.raises(APIError) as e:
|
||||
atreal_openads.check_file_dict(d, b64=False)
|
||||
assert unicode(e.value) == u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" % (
|
||||
'file.filename', title, 'string', type(d['filename']))
|
||||
|
||||
d['content'] = {'a','filename'}
|
||||
with pytest.raises(APIError) as e:
|
||||
atreal_openads.check_file_dict(d, b64=False)
|
||||
assert unicode(e.value) == u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" % (
|
||||
'file.content', title, 'string', type(d['content']))
|
||||
|
||||
del(d['content'])
|
||||
with pytest.raises(APIError) as e:
|
||||
atreal_openads.check_file_dict(d, b64=False)
|
||||
assert unicode(e.value) == u"Expecting 'file.%s' key in JSON %s" % ('content', title)
|
||||
|
||||
del(d64['b64_content'])
|
||||
with pytest.raises(APIError) as e:
|
||||
atreal_openads.check_file_dict(d, b64=True)
|
||||
assert unicode(e.value) == u"Expecting 'file.%s' key in JSON %s" % ('b64_content', title)
|
||||
|
||||
|
||||
def test_get_first_file_from_json_payload(app, atreal_openads):
|
||||
title = 'payload'
|
||||
|
||||
d = {
|
||||
'files': [{
|
||||
'content': get_file_data(TEST_FILE_CERFA_DIA, b64=False),
|
||||
'filename': os.path.basename(TEST_FILE_CERFA_DIA),
|
||||
'content_type': 'application/pdf'
|
||||
}]
|
||||
}
|
||||
|
||||
assert atreal_openads.get_first_file_from_json_payload(
|
||||
d, title, ensure_content=True, b64=False) == d['files'][0]
|
||||
|
||||
|
||||
def test_openads_check_status(app, atreal_openads):
|
||||
fake_resp_json = {
|
||||
'message': 'Service online'
|
||||
|
@ -74,47 +264,74 @@ def test_openads_check_status(app, atreal_openads):
|
|||
def test_openads_create_dossier(app, atreal_openads):
|
||||
fake_req_json = {
|
||||
"fields": {
|
||||
"autres_parcelles": True,
|
||||
"cerfa": {
|
||||
"content": get_file_data(TEST_FILE_CERFA_DIA),
|
||||
"content_type": "application/pdf",
|
||||
"field_id": "50",
|
||||
"filename": os.path.basename(TEST_FILE_CERFA_DIA)
|
||||
},
|
||||
"code_postal": "13004",
|
||||
"localite": "Marseille",
|
||||
"nom": "Loulou",
|
||||
"nom_voie": "Avenue de la Blaque",
|
||||
"numero_voie": "52",
|
||||
"plan_cadastral_1": {
|
||||
"content": get_file_data(TEST_FILE_PLAN_CADASTRAL),
|
||||
"content_type": "application/pdf",
|
||||
"field_id": "113",
|
||||
"filename": os.path.basename(TEST_FILE_PLAN_CADASTRAL)
|
||||
},
|
||||
|
||||
# proprietaire
|
||||
"proprietaire" : "Non",
|
||||
"proprietaire_raw": "Non",
|
||||
|
||||
# mandataire
|
||||
"mandataire_prenom" : "John",
|
||||
"mandataire_nom" : "Man",
|
||||
|
||||
"mandataire_qualite" : "Une personne morale",
|
||||
"mandataire_qualite_raw" : "Une personne morale",
|
||||
|
||||
"mandataire_denomination" : "SELARL",
|
||||
"mandataire_raison_sociale" : "Super Juriste",
|
||||
|
||||
"mandataire_numero_voie": "808",
|
||||
"mandataire_nom_voie" : "Avenue de l'argent",
|
||||
"mandataire_lieu_dit" : "geoisbour",
|
||||
"mandataire_code_postal": "13004",
|
||||
"mandataire_localite" : "Marseille",
|
||||
|
||||
# petitionnaire
|
||||
"prenom": "Toto",
|
||||
"proprietaire": "Oui",
|
||||
"proprietaire_qualite": "Un particulier",
|
||||
"proprietaire_qualite_raw": "Un particulier",
|
||||
"proprietaire_raw": "Oui",
|
||||
"reference_cadastrale": [
|
||||
[
|
||||
"999",
|
||||
"Z",
|
||||
"0010"
|
||||
]
|
||||
],
|
||||
"references_cadastrales": [
|
||||
[
|
||||
"123",
|
||||
"D",
|
||||
"9874"
|
||||
]
|
||||
],
|
||||
"nom" : "Loulou",
|
||||
|
||||
"qualite" : "Un particulier",
|
||||
"qualite_raw": "Un particulier",
|
||||
|
||||
"numero_voie": "52",
|
||||
"nom_voie" : "Avenue de la Blaque",
|
||||
"lieu_dit" : "tierquar",
|
||||
"code_postal": "13004",
|
||||
"localite" : "Marseille",
|
||||
|
||||
# terrain
|
||||
"terrain_numero_voie": "23",
|
||||
"terrain_nom_voie" : "Boulevard de la République",
|
||||
"terrain_lieu_dit" : "Leecorne",
|
||||
"terrain_code_postal": "13002",
|
||||
"terrain_localite": "Marseille",
|
||||
"terrain_nom_voie": "Boulevard de la Linule",
|
||||
"terrain_numero_voie": "23"
|
||||
"terrain_localite" : "Marseille",
|
||||
|
||||
# références cadastrales
|
||||
"reference_cadastrale" : [ ["999", "Z", "0010"] ],
|
||||
"autres_parcelles" : True,
|
||||
"references_cadastrales": [ ["123", "D", "9874"] ],
|
||||
|
||||
# user attached files
|
||||
"cerfa": {
|
||||
"content" : get_file_data(TEST_FILE_CERFA_DIA),
|
||||
"content_type": "invalid/content type",
|
||||
"field_id" : "50",
|
||||
"filename" : os.path.basename(TEST_FILE_CERFA_DIA)
|
||||
},
|
||||
"plan_cadastral_1": {
|
||||
"content" : get_file_data(TEST_FILE_PLAN_CADASTRAL),
|
||||
"content_type": "application/pdf",
|
||||
"filename" : os.path.basename(TEST_FILE_PLAN_CADASTRAL)
|
||||
},
|
||||
"plan_cadastral_2": {
|
||||
"content" : get_file_data(TEST_FILE_PLAN_CADASTRAL),
|
||||
"content_type": "application/pdf",
|
||||
"filename" : os.path.basename(TEST_FILE_PLAN_CADASTRAL)
|
||||
},
|
||||
"pouvoir_mandat": {
|
||||
"content" : get_file_data(TEST_FILE_CERFA_DIA),
|
||||
"content_type": "application/pdf",
|
||||
"filename" : 'mandat.pdf'
|
||||
}
|
||||
}
|
||||
}
|
||||
req = HttpRequest()
|
||||
|
@ -130,6 +347,16 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
req.META = {}
|
||||
req._read_started = False
|
||||
|
||||
fake_resp_bad = Response()
|
||||
fake_resp_bad.status_code = 502
|
||||
fake_resp_bad.reason = 'Bad gateway'
|
||||
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp_bad
|
||||
atreal_openads.create_dossier(req, 'DIA')
|
||||
assert unicode(e.value) == "HTTP error: 502"
|
||||
|
||||
fake_resp_json = {
|
||||
'numero_dossier' : FAKE_NUMERO_DOSSIER,
|
||||
'files': [{
|
||||
|
@ -153,6 +380,38 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
assert jresp['recepisse']['content_type'] == 'application/pdf'
|
||||
assert jresp['recepisse']['filename'] == fake_resp_json['files'][0]['filename']
|
||||
|
||||
fake_resp_json['numero_dossier'] = {'a':'invalid type'}
|
||||
fake_resp._content = json.dumps(fake_resp_json)
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.create_dossier(req, 'DIA')
|
||||
assert unicode(e.value) == u"Expecting '%s' value in JSON response to be a %s (not a %s)" % (
|
||||
'numero_dossier', 'string', type({}))
|
||||
|
||||
del(fake_resp_json['numero_dossier'])
|
||||
fake_resp._content = json.dumps(fake_resp_json)
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.create_dossier(req, 'DIA')
|
||||
assert unicode(e.value) == u"Expecting 'numero_dossier' key in JSON response"
|
||||
|
||||
fake_resp_json['files'][0]['b64_content'] = 'invalid_;{[content}'
|
||||
fake_resp._content = json.dumps(fake_resp_json)
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.create_dossier(req, 'DIA')
|
||||
assert unicode(e.value) == u'Failed to decode recepisse content from base 64'
|
||||
|
||||
fake_resp._content = 'df[{gfd;g#vfd'
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.create_dossier(req, 'DIA')
|
||||
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
|
||||
|
||||
job = Job.objects.filter(natural_id=FAKE_NUMERO_DOSSIER).last()
|
||||
assert job
|
||||
job_id = job.id
|
||||
|
@ -162,7 +421,7 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
assert job.parameters is not None
|
||||
assert len(job.parameters) == 3
|
||||
assert 'file_ids' in job.parameters
|
||||
assert len(job.parameters['file_ids']) == 2
|
||||
assert len(job.parameters['file_ids']) == 4
|
||||
file_ids = job.parameters['file_ids']
|
||||
|
||||
FFs = ForwardFile.objects.filter(id__in=file_ids)
|
||||
|
@ -192,6 +451,16 @@ def test_openads_create_dossier(app, atreal_openads):
|
|||
|
||||
|
||||
def test_openads_get_dossier(app, atreal_openads):
|
||||
fake_resp_bad = Response()
|
||||
fake_resp_bad.status_code = 502
|
||||
fake_resp_bad.reason = 'Bad gateway'
|
||||
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.get') as requests_get:
|
||||
requests_get.return_value = fake_resp_bad
|
||||
atreal_openads.get_dossier(None, 'DIA', FAKE_NUMERO_DOSSIER)
|
||||
assert unicode(e.value) == "HTTP error: 502"
|
||||
|
||||
fake_resp_json = {
|
||||
'etat' : u"Non préemption en cours",
|
||||
'date_depot' : "24/04/2019",
|
||||
|
@ -214,6 +483,13 @@ def test_openads_get_dossier(app, atreal_openads):
|
|||
assert jresp['decision'] == fake_resp_json['decision']
|
||||
assert jresp['date_limite_instruction'] == fake_resp_json['date_limite_instruction']
|
||||
|
||||
fake_resp._content = 'df[{gfd;g#vfd'
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.get') as requests_get:
|
||||
requests_get.return_value = fake_resp
|
||||
atreal_openads.get_dossier(None, 'DIA', FAKE_NUMERO_DOSSIER)
|
||||
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
|
||||
|
||||
fake_resp_json = {
|
||||
'errors' : [{
|
||||
'location' : 'path',
|
||||
|
@ -221,28 +497,25 @@ def test_openads_get_dossier(app, atreal_openads):
|
|||
'description' : '"invalid_type" is not one of DIA, PC, DP, AT, PD'
|
||||
}]
|
||||
}
|
||||
fake_resp = Response()
|
||||
fake_resp.status_code = 404
|
||||
fake_resp.headers = {'Content-Type': 'application/json'}
|
||||
fake_resp.encoding = 'utf-8'
|
||||
fake_resp.reason = 'Resource not found'
|
||||
fake_resp._content = json.dumps(fake_resp_json)
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.get') as requests_get:
|
||||
requests_get.return_value = fake_resp
|
||||
jresp = atreal_openads.get_dossier(None, 'invalid_type', FAKE_NUMERO_DOSSIER)
|
||||
assert re.search(r'^HTTP error: 404, \[path\] \(Invalid Type\) "invalid_type" is not one of DIA, PC, DP, AT, PD$', str(e.value))
|
||||
atreal_openads.get_dossier(None, 'invalid_type', FAKE_NUMERO_DOSSIER)
|
||||
assert unicode(e.value) == u"HTTP error: 404, [path] (Invalid Type) \"invalid_type\" is not one of DIA, PC, DP, AT, PD"
|
||||
|
||||
|
||||
def test_openads_upload2ForwardFile(app, atreal_openads):
|
||||
FF = atreal_openads.upload2ForwardFile(None, None)
|
||||
FF = atreal_openads.upload2ForwardFile(None, None, None)
|
||||
assert FF is None
|
||||
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
assert isinstance(FF, ForwardFile)
|
||||
assert len(FF.numero_demande) > 0
|
||||
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
|
||||
assert FF.type_fichier == 'CERFA'
|
||||
assert FF.type_fichier == 'cerfa'
|
||||
assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA)
|
||||
assert FF.content_type == 'application/pdf'
|
||||
assert len(FF.file_hash) > 0
|
||||
|
@ -262,15 +535,19 @@ def test_openads_upload2ForwardFile(app, atreal_openads):
|
|||
|
||||
|
||||
def test_openads_get_fwd_files(app, atreal_openads):
|
||||
with pytest.raises(APIError) as e:
|
||||
atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id='not an integer')
|
||||
assert unicode(e.value) == u"fichier_id must be an integer"
|
||||
|
||||
with pytest.raises(Http404) as e:
|
||||
atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id=18)
|
||||
assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value))
|
||||
assert unicode(e.value) == u"No file matches 'numero_dossier=%s' and 'id=%s'." % (FAKE_NUMERO_DOSSIER, 18)
|
||||
|
||||
resp404 = atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id=None)
|
||||
assert resp404 is not None
|
||||
assert len(resp404) == 0
|
||||
resp_empty = atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id=None)
|
||||
assert resp_empty is not None
|
||||
assert len(resp_empty) == 0
|
||||
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
FF.save()
|
||||
assert isinstance(FF, ForwardFile)
|
||||
|
||||
|
@ -298,7 +575,7 @@ def test_openads_get_fwd_files_status(app, atreal_openads):
|
|||
atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=18)
|
||||
assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value))
|
||||
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
FF.save()
|
||||
assert isinstance(FF, ForwardFile)
|
||||
|
||||
|
@ -324,6 +601,16 @@ def test_openads_get_fwd_files_status(app, atreal_openads):
|
|||
|
||||
|
||||
def test_openads_get_courrier(app, atreal_openads):
|
||||
fake_resp_bad = Response()
|
||||
fake_resp_bad.status_code = 502
|
||||
fake_resp_bad.reason = 'Bad gateway'
|
||||
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.get') as requests_get:
|
||||
requests_get.return_value = fake_resp_bad
|
||||
atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
|
||||
assert unicode(e.value) == "HTTP error: 502"
|
||||
|
||||
fake_resp_json = {
|
||||
'files': [{
|
||||
'filename' : "instruction_4.pdf",
|
||||
|
@ -344,22 +631,91 @@ def test_openads_get_courrier(app, atreal_openads):
|
|||
assert jresp['courrier']['content_type'] == fake_resp_json['files'][0]['content_type']
|
||||
assert jresp['courrier']['b64_content'] == fake_resp_json['files'][0]['b64_content']
|
||||
|
||||
fake_resp_json['files'][0]['b64_content'] = 'invalid_;{[content}'
|
||||
fake_resp._content = json.dumps(fake_resp_json)
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.get') as requests_get:
|
||||
requests_get.return_value = fake_resp
|
||||
atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
|
||||
assert unicode(e.value) == u'Failed to decode courrier content from base 64'
|
||||
|
||||
fake_resp._content = 'df[{gfd;g#vfd'
|
||||
with pytest.raises(APIError) as e:
|
||||
with mock.patch('passerelle.utils.Request.get') as requests_get:
|
||||
requests_get.return_value = fake_resp
|
||||
atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
|
||||
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
|
||||
|
||||
|
||||
def test_get_response_error(app, atreal_openads):
|
||||
fake_resp_json = {
|
||||
'errors': [
|
||||
{
|
||||
'location' : 'entity.name',
|
||||
'name' : 'constraint',
|
||||
'description': 'Must start with an uppercase letter'
|
||||
}
|
||||
]
|
||||
}
|
||||
fake_resp = Response()
|
||||
fake_resp.status_code = 404
|
||||
fake_resp.headers = {'Content-Type': 'application/json'}
|
||||
fake_resp.encoding = 'utf-8'
|
||||
fake_resp.reason = 'Not Found'
|
||||
fake_resp._content = json.dumps(fake_resp_json)
|
||||
|
||||
error_msg = atreal_openads.get_response_error(fake_resp)
|
||||
expected_msg = u'[%s] (%s) %s' % (
|
||||
fake_resp_json['errors'][0]['location'],
|
||||
fake_resp_json['errors'][0]['name'],
|
||||
fake_resp_json['errors'][0]['description']
|
||||
)
|
||||
assert error_msg == u"HTTP error: %s, %s" % (fake_resp.status_code, ','.join([expected_msg]))
|
||||
|
||||
fake_resp._content = 'invalid_;{[content}'
|
||||
error_msg = atreal_openads.get_response_error(fake_resp)
|
||||
assert error_msg == u"HTTP error: %s, %s" % (fake_resp.status_code, fake_resp._content)
|
||||
|
||||
|
||||
def test_openads_upload_user_files(app, atreal_openads):
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
|
||||
|
||||
# TODO check logs (because this doesn't do anything but log)
|
||||
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[])
|
||||
|
||||
with pytest.raises(ForwardFile.DoesNotExist) as e:
|
||||
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[999])
|
||||
assert unicode(e.value) == u"ForwardFile matching query does not exist."
|
||||
|
||||
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
|
||||
FF.save()
|
||||
assert isinstance(FF, ForwardFile)
|
||||
assert FF.upload_status == 'pending'
|
||||
file_id = FF.id
|
||||
assert file_id
|
||||
|
||||
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
|
||||
fake_resp_bad = Response()
|
||||
fake_resp_bad.status_code = 502
|
||||
fake_resp_bad.reason = 'Bad gateway'
|
||||
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp_bad
|
||||
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
|
||||
|
||||
FFup = ForwardFile.objects.get(id=file_id)
|
||||
assert isinstance(FFup, ForwardFile)
|
||||
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
|
||||
assert getattr(FFup, k) == getattr(FF, k)
|
||||
assert FFup.upload_attempt == 1
|
||||
assert FFup.upload_status == 'failed'
|
||||
assert FFup.upload_msg == "HTTP error: 502"
|
||||
|
||||
fake_resp = Response()
|
||||
fake_resp.status_code = 200
|
||||
fake_resp.headers = {'Content-Type': 'application/json'}
|
||||
fake_resp.encoding = 'utf-8'
|
||||
fake_resp.reason = 'OK'
|
||||
fake_resp._content = json.dumps(fake_resp_json)
|
||||
|
||||
fake_resp._content = 'invalid_;{[content}'
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
|
||||
|
@ -368,5 +724,21 @@ def test_openads_upload_user_files(app, atreal_openads):
|
|||
assert isinstance(FFup, ForwardFile)
|
||||
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
|
||||
assert getattr(FFup, k) == getattr(FF, k)
|
||||
assert FFup.upload_status == 'success'
|
||||
assert FFup.upload_attempt == 2
|
||||
assert FFup.upload_status == 'failed'
|
||||
assert FFup.upload_msg == u'No JSON content returned: %r' % fake_resp._content
|
||||
|
||||
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
|
||||
fake_resp._content = json.dumps(fake_resp_json)
|
||||
with mock.patch('passerelle.utils.Request.post') as requests_post:
|
||||
requests_post.return_value = fake_resp
|
||||
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
|
||||
|
||||
FFup = ForwardFile.objects.get(id=file_id)
|
||||
assert isinstance(FFup, ForwardFile)
|
||||
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
|
||||
assert getattr(FFup, k) == getattr(FF, k)
|
||||
assert FFup.upload_attempt == 3
|
||||
assert FFup.upload_status == 'success'
|
||||
assert FFup.upload_msg == 'uploaded successfuly'
|
||||
|
||||
|
|
Reference in New Issue