100% test coverage, dict strings are truncated when dumped, error messages homogeneized, and more

Features:
    * replaced the loging of json payload with the use of a DictDumper that lazy truncate strings too long
    * homogenize error messages

Fixes:
    * changed the signature of 'check_file_dict()' to match other functions and fix a bug
    * changed the signature of 'upload2ForwardFile()' to force input for 'type_fichier' argument

Refactoring:
    * removed dead code in 'upload_user_files()'
    * using 'basestring' instead of testing for 'str' and 'unicode'
    * renamed 'requerant' by 'demandeur'
    * removed useless and maybe problematic 'join()' (forgotten from the previous commit)
    * added more comments

Tests:
    * 100% test coverage!
    * fixed failing tests
    * added tests for every function
    * completed missing tests (mostly for exceptions)
    * added more comments
    * added the lines of missing statement in the coverage report
This commit is contained in:
Michael Bideau 2019-07-25 14:29:06 +02:00
parent 571316fcf4
commit 98cd089254
3 changed files with 531 additions and 197 deletions

View File

@ -1,2 +1,5 @@
[run] [run]
omit = */south_migrations/* omit = */south_migrations/*
[report]
show_missing = True

View File

@ -96,7 +96,7 @@ def get_file_data(path, b64=True):
return f.read() return f.read()
def get_upload_path(instance, filename): def get_upload_path(instance, filename=None):
"""Return a relative upload path for a file.""" """Return a relative upload path for a file."""
# be careful: # be careful:
# * openADS accept only filename less than 50 chars # * openADS accept only filename less than 50 chars
@ -107,70 +107,44 @@ def get_upload_path(instance, filename):
) )
def dict_deref_multi(data, keys): def trunc_str_values(value, limit, visited=None, truncate_text=u''):
"""Access a dict through a list of keys. """Truncate a string value (not dict keys) and append a truncate text."""
Inspired by: https://stackoverflow.com/a/47969823
if visited is None:
visited = []
if not value in visited:
if isinstance(value, basestring) and len(value) > limit:
value = value[:limit] + truncate_text
elif isinstance(value, dict) or isinstance(value, list) or isinstance(value, tuple):
visited.append(value)
iterator = value.iteritems() if isinstance(value, dict) else enumerate(value)
for k,v in iterator:
value[k] = trunc_str_values(v, limit, visited, truncate_text)
return value
class DictDumper(object):
"""Helper to dump a dictionary to a string representation with lazy processing.
Only applied when dict is converted to string (lazy processing):
- long strings truncated (after the dict has been 'deep' copied)
- (optionaly) dict converted with json.dumps instead of unicode().
""" """
if keys:
key = int(keys[0]) if isinstance(data, list) else keys[0]
return dict_deref_multi(data[key], keys[1:]) if keys else data
def __init__(self, dic, max_str_len=255, use_json_dumps=True):
# TODO implement wildcard filtering
def dict_replace_content_by_paths(dic, paths_to_replace=[], path_separator='/', replace_by='<file content>'):
"""Replace a dict content, with a string, from a list of paths."""
newdic = dic
if newdic and (isinstance(newdic, dict) or isinstance(newdic, list)):
for p in paths_to_replace:
if p:
if p.startswith(path_separator):
p = p[1:]
items = p.split(path_separator)
if items:
try:
value = dict_deref_multi(newdic, items)
except KeyError:
continue
except TypeError:
continue
if not isinstance(value, str) and not isinstance(value, unicode):
raise ValueError(u"path '%s' is not a string ([%s] %s)" % (p, type(value), value))
# newdic is still untouched
if newdic is dic:
# create a copy of it
newdic = copy.deepcopy(dic)
last_item = items.pop(-1)
value = dict_deref_multi(newdic, items)
value[last_item] = replace_by
return newdic
class LogJsonPayloadWithFileContent(object):
""" Return a string representation of a JSON payload with its file content emptyed."""
def __init__(self, payload, paths_to_replace=[], path_separator='/', replace_by='<data>'):
""" arguments: """ arguments:
- payload string the JSON payload to dump - dic string the dict to dump
- paths_to_replace array a list of strings either: - max_str_len integer the maximul length of string values
* in 'path' format (i.e.: ['/fields/file/content',...]) - use_json_dumps boolean True to use json.dumps() else it uses unicode()
* in 'namespace' format (i.e.: ['fields.file.content',...])
- path_separator string the path separator used in 'paths_to_replace' format:
* '/' for 'path' format
* '.' for 'namespace' format
- replace_by string the new value of the content at the specified path
""" """
self.payload = payload self.dic = dic
self.paths_to_replace = paths_to_replace self.max_str_len = max_str_len
self.path_separator = path_separator self.use_json_dumps = use_json_dumps
self.replace_by = replace_by
def __str__(self): def __str__(self):
return json.dumps(dict_replace_content_by_paths( dict_trunc = trunc_str_values(copy.deepcopy(self.dic), self.max_str_len)
self.payload, dict_ref = json.dumps(dict_trunc) if self.use_json_dumps else dict_trunc
paths_to_replace = self.paths_to_replace, return unicode(dict_ref)
path_separator = self.path_separator,
replace_by = self.replace_by
))
class ForwardFile(models.Model): class ForwardFile(models.Model):
@ -206,18 +180,10 @@ class AtrealOpenads(BaseResource, HTTPResource):
verbose_name = _('openADS') verbose_name = _('openADS')
def log_json_payload(self, payload, title='payload', paths_to_replace=[]): def log_json_payload(self, payload, title='payload', max_str_len=100):
"""Log a json paylod surrounded by dashes and with file content filtered.""" """Log a json paylod surrounded by dashes and with file content filtered."""
self.logger.debug(u"----- %s (begining) -----", title) self.logger.debug(u"----- %s (begining) -----", title)
if paths_to_replace: self.logger.debug(u"%s", DictDumper(payload, max_str_len))
self.logger.debug(u"%s", LogJsonPayloadWithFileContent(
payload,
paths_to_replace = paths_to_replace,
path_separator = u'.',
replace_by = u'<b64 content>'
))
else:
self.logger.debug(u"%s", LogJsonPayloadWithFileContent(payload))
self.logger.debug(u"----- %s (end) -----", title) self.logger.debug(u"----- %s (end) -----", title)
@ -230,7 +196,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
raise APIError(u"Expecting '%s' key in JSON %s" % raise APIError(u"Expecting '%s' key in JSON %s" %
('files', title)) ('files', title))
files = payload.get('files') files = payload['files']
if not isinstance(files, list): if not isinstance(files, list):
self.log_json_payload(payload, title) self.log_json_payload(payload, title)
@ -244,15 +210,13 @@ class AtrealOpenads(BaseResource, HTTPResource):
('files', title)) ('files', title))
# log the response # log the response
self.log_json_payload( self.log_json_payload(payload, title)
payload, title
,['files.%i.b64_content' % i for i in range(0, len(files))])
# return the files # return the files
return files return files
def check_file_dict(self, dict_file, b64=True, title='payload'): def check_file_dict(self, dict_file, title='payload', b64=True):
"""Ensure a file dict has all its required items.""" """Ensure a file dict has all its required items."""
# key to get the content # key to get the content
@ -264,24 +228,20 @@ class AtrealOpenads(BaseResource, HTTPResource):
# check content existence # check content existence
if content_key not in dict_file: if content_key not in dict_file:
raise APIError(u"Expecting 'files.0.%s' key in JSON %s" % (content_key, title)) raise APIError(u"Expecting 'file.%s' key in JSON %s" % (content_key, title))
# get its content # get its content
file_content = dict_file[content_key] file_content = dict_file[content_key]
if not isinstance(file_content, str) and not isinstance(file_content, unicode): if not isinstance(file_content, basestring):
raise APIError( raise APIError(
u"Expecting '%s' value in JSON %s to be a %s (not a %s)" % u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" %
('file.%s' % content_key, title, 'string', type(file_content))) ('file.%s' % content_key, title, 'string', type(file_content)))
# check filename # check filename
if ( if 'filename' in dict_file and not isinstance(dict_file['filename'], basestring):
'filename' in dict_file
and not isinstance(dict_file['filename'], str)
and not isinstance(dict_file['filename'], unicode)
):
raise APIError( raise APIError(
u"Expecting '%s' value in file dict to be a %s (not a %s)" % u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" %
('file.filename', title, 'string', type(dict_file['filename']))) ('file.filename', title, 'string', type(dict_file['filename'])))
@ -296,7 +256,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
# asked to check its content # asked to check its content
if ensure_content: if ensure_content:
self.check_file_dict(first, title, b64) self.check_file_dict(first, title=title, b64=b64)
# return the first file # return the first file
return first return first
@ -346,23 +306,8 @@ class AtrealOpenads(BaseResource, HTTPResource):
# loads the request body as JSON content # loads the request body as JSON content
json_data = json.loads(request.body) json_data = json.loads(request.body)
# every field key that might contain a file content
file_keys = ['cerfa'] + ['plan_cadastral_%s' % i for i in range(1,5)] + ['pouvoir_mandat']
# detect if there evolutions data to filter them out of logging
# TODO replace when wildcard filtering is implement in 'dict_replace_content_by_paths()'
evolution_to_filter = []
if 'evolution' in json_data and isinstance(json_data['evolution'], list):
for e, evol in enumerate(json_data['evolution']):
if isinstance(evol, dict) and 'parts' in evol and isinstance(evol['parts'], list):
for p, part in enumerate(evol['parts']):
if isinstance(part, dict) and 'data' in part:
evolution_to_filter.append('evolution.%s.parts.%s.data' % (e, p))
# log the request body (filtering the files content) # log the request body (filtering the files content)
self.log_json_payload( self.log_json_payload(json_data, 'request')
json_data, 'request',
['fields.%s.content' % k for k in file_keys] + evolution_to_filter)
# build the payload # build the payload
payload = { "collectivite": self.collectivite } payload = { "collectivite": self.collectivite }
@ -391,19 +336,19 @@ class AtrealOpenads(BaseResource, HTTPResource):
"numero" : normalize(ref[2]) "numero" : normalize(ref[2])
}) })
# setup requerant variable prefix # setup demandeur variable prefix
prefixes = {"demandeurs": ''} prefixes = {"demandeurs": ''}
if normalize(json_data['fields']['proprietaire']) != 'Oui': if normalize(json_data['fields']['proprietaire']) != 'Oui':
prefixes["mandataires"] = 'mandataire_' prefixes["mandataires"] = 'mandataire_'
# for each type of requerant with associated prefix # for each type of demandeur with associated prefix
for key,prefix in prefixes.items(): for key,prefix in prefixes.items():
# "qualité" of the requerant # "qualité" of the demandeur
qualite = normalize(json_data['fields']['%squalite' % prefix]) qualite = normalize(json_data['fields']['%squalite' % prefix])
# get the requerant informations # get the demandeur informations
requerant = { demandeur = {
"type_personne": 'particulier' if qualite == 'Un particulier' else 'personne_morale', "type_personne": 'particulier' if qualite == 'Un particulier' else 'personne_morale',
"typologie" : 'petitionnaire' if key == 'demandeurs' else 'delegataire', "typologie" : 'petitionnaire' if key == 'demandeurs' else 'delegataire',
"nom" : normalize(json_data['fields']['%snom' % prefix]), "nom" : normalize(json_data['fields']['%snom' % prefix]),
@ -416,24 +361,27 @@ class AtrealOpenads(BaseResource, HTTPResource):
} }
} }
# add fields if the requerant is not an individual # add fields if the demandeur is not an individual
if qualite != 'Un particulier': if qualite != 'Un particulier':
requerant["raison_sociale"] = normalize(json_data['fields']['%sraison_sociale' % prefix]) demandeur["raison_sociale"] = normalize(json_data['fields']['%sraison_sociale' % prefix])
requerant["denomination"] = normalize(json_data['fields']['%sdenomination' % prefix]) demandeur["denomination"] = normalize(json_data['fields']['%sdenomination' % prefix])
self.logger.debug("%s %s => '%s', '%s'", requerant['prenom'], requerant['nom'], requerant['raison_sociale'], requerant['denomination']) self.logger.debug("%s %s => '%s', '%s'", demandeur['prenom'], demandeur['nom'], demandeur['raison_sociale'], demandeur['denomination'])
# add optional lieu_dit field # add optional lieu_dit field
if '%slieu_dit' % prefix in json_data['fields'] and json_data['fields']['%slieu_dit' % prefix]: if '%slieu_dit' % prefix in json_data['fields'] and json_data['fields']['%slieu_dit' % prefix]:
requerant["adresse"]["lieu_dit"] = normalize(json_data['fields']['%slieu_dit' % prefix]) demandeur["adresse"]["lieu_dit"] = normalize(json_data['fields']['%slieu_dit' % prefix])
# add it to the payload # add it to the payload
payload[key] = [requerant] payload[key] = [demandeur]
self.logger.debug(u"Added '%s' to payload: %s %s", key, requerant['prenom'], requerant['nom']) self.logger.debug(u"Added '%s' to payload: %s %s", key, demandeur['prenom'], demandeur['nom'])
# log the payload # log the payload
self.log_json_payload(payload) self.log_json_payload(payload)
# every field key that might contain a file content
file_keys = ['cerfa'] + ['plan_cadastral_%s' % i for i in range(1,5)] + ['pouvoir_mandat']
# prepare files that will be forwarded # prepare files that will be forwarded
files = [] files = []
for k in file_keys: for k in file_keys:
@ -443,18 +391,37 @@ class AtrealOpenads(BaseResource, HTTPResource):
and isinstance(json_data['fields'][k], dict) and isinstance(json_data['fields'][k], dict)
and 'content' in json_data['fields'][k] and 'content' in json_data['fields'][k]
): ):
# get the content decoded from base 64
content = base64.b64decode(json_data['fields'][k]['content']) content = base64.b64decode(json_data['fields'][k]['content'])
# guess the mime type based on the begining of the content
content_type = magic.from_buffer(content, mime=True) content_type = magic.from_buffer(content, mime=True)
# set it as an upload
upload_file = ContentFile(content) upload_file = ContentFile(content)
# build a hash from the upload
file_hash = self.file_digest(upload_file) file_hash = self.file_digest(upload_file)
# build a filename (less than 50 chars)
filename = file_hash[45:] + '.pdf' filename = file_hash[45:] + '.pdf'
# get the content type if specified
if 'content_type' in json_data['fields'][k]: if 'content_type' in json_data['fields'][k]:
content_type = json_data['fields'][k]['content_type'] content_type = json_data['fields'][k]['content_type']
# check the content type is PDF for file of type CERFA
if k == 'cerfa' and content_type != 'application/pdf': if k == 'cerfa' and content_type != 'application/pdf':
self.logger.warning("CERFA content type is '%s' instead of '%s'", content_type, 'application/pdf') self.logger.warning("CERFA content type is '%s' instead of '%s'", content_type, 'application/pdf')
# get the filename if specified
if 'filename' in json_data['fields'][k]: if 'filename' in json_data['fields'][k]:
filename = json_data['fields'][k]['filename'] filename = json_data['fields'][k]['filename']
# set the type fichier based on the key (less than 10 chars)
type_fichier = re.sub(r'_.*$', '', k)[:10] type_fichier = re.sub(r'_.*$', '', k)[:10]
# append the file to the list
files.append({ files.append({
'type_fichier' : type_fichier, 'type_fichier' : type_fichier,
'orig_filename': filename, 'orig_filename': filename,
@ -517,7 +484,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
numero_dossier = result.get('numero_dossier') numero_dossier = result.get('numero_dossier')
if not isinstance(numero_dossier, str) and not isinstance(numero_dossier, unicode): if not isinstance(numero_dossier, basestring):
raise APIError( raise APIError(
u"Expecting '%s' value in JSON response to be a %s (not a %s)" % u"Expecting '%s' value in JSON response to be a %s (not a %s)" %
('numero_dossier', 'string', type(numero_dossier))) ('numero_dossier', 'string', type(numero_dossier)))
@ -608,7 +575,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
return response.json() return response.json()
def upload2ForwardFile(self, path, numero_dossier, type_fichier='CERFA'): def upload2ForwardFile(self, path, numero_dossier, type_fichier):
"""Convert a file path to a ForwardFile.""" """Convert a file path to a ForwardFile."""
if path: if path:
rand_id = base64.urlsafe_b64encode(os.urandom(6)) rand_id = base64.urlsafe_b64encode(os.urandom(6))
@ -763,9 +730,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
raise APIError(u'No JSON content returned: %r' % response.content[:1000]) raise APIError(u'No JSON content returned: %r' % response.content[:1000])
# log the response (filtering the file content) # log the response (filtering the file content)
self.log_json_payload( self.log_json_payload(result, 'response')
result, 'response',
['files.%i.b64_content' % i for i in range(0, len(result['files']))])
# get the courrier # get the courrier
courrier = self.get_first_file_from_json_payload(result, title='response') courrier = self.get_first_file_from_json_payload(result, title='response')
@ -774,7 +739,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
try: try:
courrier_content = base64.b64decode(courrier['b64_content']) courrier_content = base64.b64decode(courrier['b64_content'])
except TypeError: except TypeError:
raise APIError('Invalid content for courrier') raise APIError('Failed to decode courrier content from base 64')
# return the 'courrier' file # return the 'courrier' file
return {'courrier': courrier} return {'courrier': courrier}
@ -807,11 +772,11 @@ class AtrealOpenads(BaseResource, HTTPResource):
# TODO ask for openADS.API to *always* send JSON formatted errors, not HTML ones # TODO ask for openADS.API to *always* send JSON formatted errors, not HTML ones
# return a string representing the HTTP error (filtering the HTML tags and multispaces) # return a string representing the HTTP error (filtering the HTML tags and multispaces)
return u"HTTP error: %s, %s" % \ detail = clean_spaces(strip_tags(response.content[:1000])) if response.content else ''
(response.status_code, return u"HTTP error: %s%s" % (response.status_code, ', ' + detail if detail else '')
clean_spaces(strip_tags(response.content[:1000])) if response.content else '')
# @raise ForwareFile.DoesNotExist if not found
def upload_user_files(self, type_dossier, numero_dossier, file_ids): def upload_user_files(self, type_dossier, numero_dossier, file_ids):
"""A Job to forward user uploaded files to openADS.""" """A Job to forward user uploaded files to openADS."""
@ -849,19 +814,13 @@ class AtrealOpenads(BaseResource, HTTPResource):
# append the forwarded file to the list # append the forwarded file to the list
fwd_files.append(fwd_file) fwd_files.append(fwd_file)
# not found the forwarded file specified: bug
else:
self.logger.warning(u"upload_user_files() failed to find ForwardFile file_id: %s", fid)
# if files need to be forwarded # if files need to be forwarded
if payload: if payload:
self.logger.debug("upload_user_files() payload is not empty") self.logger.debug("upload_user_files() payload is not empty")
# log the payload # log the payload
self.log_json_payload( self.log_json_payload(payload, 'payload')
payload, 'payload',
['%i.b64_content' % i for i in range(0, len(fwd_files))])
# make the request to openADS.API (with a specific timeout) # make the request to openADS.API (with a specific timeout)
url = urlparse.urljoin(self.openADS_API_url, '/dossier/%s/%s/files' % (type_dossier, numero_dossier)) url = urlparse.urljoin(self.openADS_API_url, '/dossier/%s/%s/files' % (type_dossier, numero_dossier))
@ -908,7 +867,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
self.logger.warning( self.logger.warning(
u"upload_user_files() openADS response is not JSON valid for dossier '%s' and files '%s'", u"upload_user_files() openADS response is not JSON valid for dossier '%s' and files '%s'",
numero_dossier, numero_dossier,
','.join([f.id for f in fwd_files]) fwd_files
) )
# response correctly loaded as JSON # response correctly loaded as JSON
@ -940,7 +899,7 @@ class AtrealOpenads(BaseResource, HTTPResource):
self.logger.warning( self.logger.warning(
u"upload_user_files() payload is empty for dossier '%s' and files '%s'", u"upload_user_files() payload is empty for dossier '%s' and files '%s'",
numero_dossier, numero_dossier,
','.join(file_ids) file_ids
) )

View File

@ -22,7 +22,17 @@ from django.core.files import File
from passerelle.utils.jsonresponse import APIError from passerelle.utils.jsonresponse import APIError
from passerelle.base.models import Job from passerelle.base.models import Job
from atreal_openads.models import AtrealOpenads, ForwardFile from atreal_openads.models import (
strip_tags,
clean_spaces,
normalize,
get_file_data,
get_upload_path,
trunc_str_values,
DictDumper,
AtrealOpenads,
ForwardFile
)
CONNECTOR_NAME = 'atreal-openads' CONNECTOR_NAME = 'atreal-openads'
@ -41,14 +51,6 @@ TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf') TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
def get_file_data(path, b64=True):
"""Return the content of a file as a string, in base64 if specified."""
with open(path, 'r') as f:
if b64:
return base64.b64encode(f.read())
return f.read()
@pytest.fixture @pytest.fixture
def atreal_openads(db): def atreal_openads(db):
return AtrealOpenads.objects.create( return AtrealOpenads.objects.create(
@ -60,6 +62,194 @@ def atreal_openads(db):
) )
def test_strip_tags():
s = 'aaa b cc '
assert strip_tags(s) == s
ss = s + '<em>dd'
assert strip_tags(ss) == s + 'dd'
ss = s + '<em>dd</em>'
assert strip_tags(ss) == s + 'dd'
ss = s + '<em>dd</em>'
assert strip_tags(ss) == s + 'dd'
ss = s + ' 1 < 3'
assert strip_tags(ss) == s + ' 1 < 3'
def test_clean_spaces():
s = 'aaa b cc '
assert clean_spaces(s) == 'aaa b cc'
s = 'a\ta b\nb c\rc d\\n\\r\\td'
assert clean_spaces(s) == 'a a b b c c d d'
def test_normalize():
assert normalize(None) == ''
s = 'aaa b cc '
assert normalize(s) == 'aaa b cc'
s = 'a\ta b\nb c\rc d\\n\\r\\td'
assert normalize(s) == 'a a b b c c d d'
def test_get_file_data():
assert get_file_data(TEST_FILE_CERFA_DIA) == base64.b64encode(open(TEST_FILE_CERFA_DIA).read())
assert get_file_data(TEST_FILE_CERFA_DIA, b64=False) == open(TEST_FILE_CERFA_DIA).read()
def test_get_upload_path(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
assert re.search(
r"^pass_openADS_up_%s_%s$" %
('[0-9]{4}-[A-Z][a-z]{2}-[0-9]{2}_[0-9]{2}h[0-9]{2}m[0-9]{2}s[0-9]+', 'cc90'),
get_upload_path(FF))
def test_trunc_str_values():
d = {}
assert trunc_str_values(d, 10) == d
d = {'a': '123456789'}
assert trunc_str_values(d, 0) == {'a': u''}
d = {'a': '123456789'}
assert trunc_str_values(d, 1) == {'a': u'1…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 2) == {'a': u'12…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 5) == {'a': u'12345…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 8) == {'a': u'12345678…'}
d = {'a': '123456789'}
assert trunc_str_values(d, 9) == {'a': u'123456789'}
d = {'a': '123456789'}
assert trunc_str_values(d, 10) == d
d = {'a': '123456789', 'b123456789': '987654321'}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…'}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789']}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…']}
d = {'a': '123456789', 'b123456789': '987654321', 'c': {'c1':'ABCDEFGHIJK'}, 'd': ['123456789', {'eeeeeeeeee':'132456789'}]}
assert trunc_str_values(d, 5) == {'a': u'12345…', 'b123456789': u'98765…', 'c': {'c1': u'ABCDE…'}, 'd': [u'12345…', {'eeeeeeeeee': u'13245…'}]}
def test_dict_dumper():
d = {}
dd = DictDumper(d, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
dd = DictDumper(d, 0, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
d = {'a': '123456789'}
dd = DictDumper(d, 10, use_json_dumps=False)
assert d == dd.dic
assert unicode(d) == unicode(dd)
dd = DictDumper(d, 5, use_json_dumps=False)
assert d == dd.dic
assert unicode(dd) == unicode({'a': u'12345…'})
dd = DictDumper(d, 5, use_json_dumps=True)
assert d == dd.dic
assert unicode(dd) == u'{"a": "12345\\u2026"}'
def test_openads_log_json_payload(app, atreal_openads):
# TODO implement
assert True
# change the debug file path
# check that the function log to it
# check that what was is logged is correct
def test_openads_get_files_from_json_payload(app, atreal_openads):
title = 'payload'
assert atreal_openads.get_files_from_json_payload({'files':[{'a':'file'}]}) == [{'a':'file'}]
with pytest.raises(APIError) as e:
atreal_openads.get_files_from_json_payload({})
assert unicode(e.value) == u"Expecting '%s' key in JSON %s" % ('files', title)
with pytest.raises(APIError) as e:
atreal_openads.get_files_from_json_payload({'files': 'invalid'})
assert unicode(e.value) == u"Expecting '%s' value in JSON %s to be a %s (not a %s)" % (
'files', title, 'list', type(''))
with pytest.raises(APIError) as e:
atreal_openads.get_files_from_json_payload({'files': {'i':'invalid'}})
assert unicode(e.value) == u"Expecting '%s' value in JSON %s to be a %s (not a %s)" % (
'files', title, 'list', type({}))
with pytest.raises(APIError) as e:
atreal_openads.get_files_from_json_payload({'files': []})
assert unicode(e.value) == u"Expecting non-empty '%s' value in JSON %s" % ('files', title)
def test_check_file_dict(app, atreal_openads):
title = 'payload'
d = {
'content': get_file_data(TEST_FILE_CERFA_DIA, b64=False),
'filename': os.path.basename(TEST_FILE_CERFA_DIA),
'content_type': 'application/pdf'
}
d64 = {
'b64_content': get_file_data(TEST_FILE_CERFA_DIA, b64=True),
'filename': os.path.basename(TEST_FILE_CERFA_DIA),
'content_type': 'application/pdf'
}
assert atreal_openads.check_file_dict(d, b64=False) is None
assert atreal_openads.check_file_dict(d64) is None
d['filename'] = {'a','filename'}
with pytest.raises(APIError) as e:
atreal_openads.check_file_dict(d, b64=False)
assert unicode(e.value) == u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" % (
'file.filename', title, 'string', type(d['filename']))
d['content'] = {'a','filename'}
with pytest.raises(APIError) as e:
atreal_openads.check_file_dict(d, b64=False)
assert unicode(e.value) == u"Expecting '%s' value in JSON %s in file dict to be a %s (not a %s)" % (
'file.content', title, 'string', type(d['content']))
del(d['content'])
with pytest.raises(APIError) as e:
atreal_openads.check_file_dict(d, b64=False)
assert unicode(e.value) == u"Expecting 'file.%s' key in JSON %s" % ('content', title)
del(d64['b64_content'])
with pytest.raises(APIError) as e:
atreal_openads.check_file_dict(d, b64=True)
assert unicode(e.value) == u"Expecting 'file.%s' key in JSON %s" % ('b64_content', title)
def test_get_first_file_from_json_payload(app, atreal_openads):
title = 'payload'
d = {
'files': [{
'content': get_file_data(TEST_FILE_CERFA_DIA, b64=False),
'filename': os.path.basename(TEST_FILE_CERFA_DIA),
'content_type': 'application/pdf'
}]
}
assert atreal_openads.get_first_file_from_json_payload(
d, title, ensure_content=True, b64=False) == d['files'][0]
def test_openads_check_status(app, atreal_openads): def test_openads_check_status(app, atreal_openads):
fake_resp_json = { fake_resp_json = {
'message': 'Service online' 'message': 'Service online'
@ -74,47 +264,74 @@ def test_openads_check_status(app, atreal_openads):
def test_openads_create_dossier(app, atreal_openads): def test_openads_create_dossier(app, atreal_openads):
fake_req_json = { fake_req_json = {
"fields": { "fields": {
"autres_parcelles": True,
"cerfa": { # proprietaire
"content": get_file_data(TEST_FILE_CERFA_DIA), "proprietaire" : "Non",
"content_type": "application/pdf", "proprietaire_raw": "Non",
"field_id": "50",
"filename": os.path.basename(TEST_FILE_CERFA_DIA) # mandataire
}, "mandataire_prenom" : "John",
"code_postal": "13004", "mandataire_nom" : "Man",
"localite": "Marseille",
"nom": "Loulou", "mandataire_qualite" : "Une personne morale",
"nom_voie": "Avenue de la Blaque", "mandataire_qualite_raw" : "Une personne morale",
"numero_voie": "52",
"plan_cadastral_1": { "mandataire_denomination" : "SELARL",
"content": get_file_data(TEST_FILE_PLAN_CADASTRAL), "mandataire_raison_sociale" : "Super Juriste",
"content_type": "application/pdf",
"field_id": "113", "mandataire_numero_voie": "808",
"filename": os.path.basename(TEST_FILE_PLAN_CADASTRAL) "mandataire_nom_voie" : "Avenue de l'argent",
}, "mandataire_lieu_dit" : "geoisbour",
"mandataire_code_postal": "13004",
"mandataire_localite" : "Marseille",
# petitionnaire
"prenom": "Toto", "prenom": "Toto",
"proprietaire": "Oui", "nom" : "Loulou",
"proprietaire_qualite": "Un particulier",
"proprietaire_qualite_raw": "Un particulier", "qualite" : "Un particulier",
"proprietaire_raw": "Oui", "qualite_raw": "Un particulier",
"reference_cadastrale": [
[ "numero_voie": "52",
"999", "nom_voie" : "Avenue de la Blaque",
"Z", "lieu_dit" : "tierquar",
"0010" "code_postal": "13004",
] "localite" : "Marseille",
],
"references_cadastrales": [ # terrain
[ "terrain_numero_voie": "23",
"123", "terrain_nom_voie" : "Boulevard de la République",
"D", "terrain_lieu_dit" : "Leecorne",
"9874"
]
],
"terrain_code_postal": "13002", "terrain_code_postal": "13002",
"terrain_localite": "Marseille", "terrain_localite" : "Marseille",
"terrain_nom_voie": "Boulevard de la Linule",
"terrain_numero_voie": "23" # références cadastrales
"reference_cadastrale" : [ ["999", "Z", "0010"] ],
"autres_parcelles" : True,
"references_cadastrales": [ ["123", "D", "9874"] ],
# user attached files
"cerfa": {
"content" : get_file_data(TEST_FILE_CERFA_DIA),
"content_type": "invalid/content type",
"field_id" : "50",
"filename" : os.path.basename(TEST_FILE_CERFA_DIA)
},
"plan_cadastral_1": {
"content" : get_file_data(TEST_FILE_PLAN_CADASTRAL),
"content_type": "application/pdf",
"filename" : os.path.basename(TEST_FILE_PLAN_CADASTRAL)
},
"plan_cadastral_2": {
"content" : get_file_data(TEST_FILE_PLAN_CADASTRAL),
"content_type": "application/pdf",
"filename" : os.path.basename(TEST_FILE_PLAN_CADASTRAL)
},
"pouvoir_mandat": {
"content" : get_file_data(TEST_FILE_CERFA_DIA),
"content_type": "application/pdf",
"filename" : 'mandat.pdf'
}
} }
} }
req = HttpRequest() req = HttpRequest()
@ -130,6 +347,16 @@ def test_openads_create_dossier(app, atreal_openads):
req.META = {} req.META = {}
req._read_started = False req._read_started = False
fake_resp_bad = Response()
fake_resp_bad.status_code = 502
fake_resp_bad.reason = 'Bad gateway'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp_bad
atreal_openads.create_dossier(req, 'DIA')
assert unicode(e.value) == "HTTP error: 502"
fake_resp_json = { fake_resp_json = {
'numero_dossier' : FAKE_NUMERO_DOSSIER, 'numero_dossier' : FAKE_NUMERO_DOSSIER,
'files': [{ 'files': [{
@ -153,6 +380,38 @@ def test_openads_create_dossier(app, atreal_openads):
assert jresp['recepisse']['content_type'] == 'application/pdf' assert jresp['recepisse']['content_type'] == 'application/pdf'
assert jresp['recepisse']['filename'] == fake_resp_json['files'][0]['filename'] assert jresp['recepisse']['filename'] == fake_resp_json['files'][0]['filename']
fake_resp_json['numero_dossier'] = {'a':'invalid type'}
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
assert unicode(e.value) == u"Expecting '%s' value in JSON response to be a %s (not a %s)" % (
'numero_dossier', 'string', type({}))
del(fake_resp_json['numero_dossier'])
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
assert unicode(e.value) == u"Expecting 'numero_dossier' key in JSON response"
fake_resp_json['files'][0]['b64_content'] = 'invalid_;{[content}'
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
assert unicode(e.value) == u'Failed to decode recepisse content from base 64'
fake_resp._content = 'df[{gfd;g#vfd'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.create_dossier(req, 'DIA')
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
job = Job.objects.filter(natural_id=FAKE_NUMERO_DOSSIER).last() job = Job.objects.filter(natural_id=FAKE_NUMERO_DOSSIER).last()
assert job assert job
job_id = job.id job_id = job.id
@ -162,7 +421,7 @@ def test_openads_create_dossier(app, atreal_openads):
assert job.parameters is not None assert job.parameters is not None
assert len(job.parameters) == 3 assert len(job.parameters) == 3
assert 'file_ids' in job.parameters assert 'file_ids' in job.parameters
assert len(job.parameters['file_ids']) == 2 assert len(job.parameters['file_ids']) == 4
file_ids = job.parameters['file_ids'] file_ids = job.parameters['file_ids']
FFs = ForwardFile.objects.filter(id__in=file_ids) FFs = ForwardFile.objects.filter(id__in=file_ids)
@ -192,6 +451,16 @@ def test_openads_create_dossier(app, atreal_openads):
def test_openads_get_dossier(app, atreal_openads): def test_openads_get_dossier(app, atreal_openads):
fake_resp_bad = Response()
fake_resp_bad.status_code = 502
fake_resp_bad.reason = 'Bad gateway'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp_bad
atreal_openads.get_dossier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == "HTTP error: 502"
fake_resp_json = { fake_resp_json = {
'etat' : u"Non préemption en cours", 'etat' : u"Non préemption en cours",
'date_depot' : "24/04/2019", 'date_depot' : "24/04/2019",
@ -214,6 +483,13 @@ def test_openads_get_dossier(app, atreal_openads):
assert jresp['decision'] == fake_resp_json['decision'] assert jresp['decision'] == fake_resp_json['decision']
assert jresp['date_limite_instruction'] == fake_resp_json['date_limite_instruction'] assert jresp['date_limite_instruction'] == fake_resp_json['date_limite_instruction']
fake_resp._content = 'df[{gfd;g#vfd'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
atreal_openads.get_dossier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
fake_resp_json = { fake_resp_json = {
'errors' : [{ 'errors' : [{
'location' : 'path', 'location' : 'path',
@ -221,28 +497,25 @@ def test_openads_get_dossier(app, atreal_openads):
'description' : '"invalid_type" is not one of DIA, PC, DP, AT, PD' 'description' : '"invalid_type" is not one of DIA, PC, DP, AT, PD'
}] }]
} }
fake_resp = Response()
fake_resp.status_code = 404 fake_resp.status_code = 404
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'Resource not found' fake_resp.reason = 'Resource not found'
fake_resp._content = json.dumps(fake_resp_json) fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e: with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get: with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp requests_get.return_value = fake_resp
jresp = atreal_openads.get_dossier(None, 'invalid_type', FAKE_NUMERO_DOSSIER) atreal_openads.get_dossier(None, 'invalid_type', FAKE_NUMERO_DOSSIER)
assert re.search(r'^HTTP error: 404, \[path\] \(Invalid Type\) "invalid_type" is not one of DIA, PC, DP, AT, PD$', str(e.value)) assert unicode(e.value) == u"HTTP error: 404, [path] (Invalid Type) \"invalid_type\" is not one of DIA, PC, DP, AT, PD"
def test_openads_upload2ForwardFile(app, atreal_openads): def test_openads_upload2ForwardFile(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(None, None) FF = atreal_openads.upload2ForwardFile(None, None, None)
assert FF is None assert FF is None
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER) FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
assert isinstance(FF, ForwardFile) assert isinstance(FF, ForwardFile)
assert len(FF.numero_demande) > 0 assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert FF.type_fichier == 'CERFA' assert FF.type_fichier == 'cerfa'
assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA) assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA)
assert FF.content_type == 'application/pdf' assert FF.content_type == 'application/pdf'
assert len(FF.file_hash) > 0 assert len(FF.file_hash) > 0
@ -262,15 +535,19 @@ def test_openads_upload2ForwardFile(app, atreal_openads):
def test_openads_get_fwd_files(app, atreal_openads): def test_openads_get_fwd_files(app, atreal_openads):
with pytest.raises(APIError) as e:
atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id='not an integer')
assert unicode(e.value) == u"fichier_id must be an integer"
with pytest.raises(Http404) as e: with pytest.raises(Http404) as e:
atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id=18) atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id=18)
assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value)) assert unicode(e.value) == u"No file matches 'numero_dossier=%s' and 'id=%s'." % (FAKE_NUMERO_DOSSIER, 18)
resp404 = atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id=None) resp_empty = atreal_openads.get_fwd_files(None, FAKE_NUMERO_DOSSIER, fichier_id=None)
assert resp404 is not None assert resp_empty is not None
assert len(resp404) == 0 assert len(resp_empty) == 0
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER) FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF.save() FF.save()
assert isinstance(FF, ForwardFile) assert isinstance(FF, ForwardFile)
@ -298,7 +575,7 @@ def test_openads_get_fwd_files_status(app, atreal_openads):
atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=18) atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=18)
assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value)) assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value))
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER) FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF.save() FF.save()
assert isinstance(FF, ForwardFile) assert isinstance(FF, ForwardFile)
@ -324,6 +601,16 @@ def test_openads_get_fwd_files_status(app, atreal_openads):
def test_openads_get_courrier(app, atreal_openads): def test_openads_get_courrier(app, atreal_openads):
fake_resp_bad = Response()
fake_resp_bad.status_code = 502
fake_resp_bad.reason = 'Bad gateway'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp_bad
atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == "HTTP error: 502"
fake_resp_json = { fake_resp_json = {
'files': [{ 'files': [{
'filename' : "instruction_4.pdf", 'filename' : "instruction_4.pdf",
@ -344,22 +631,91 @@ def test_openads_get_courrier(app, atreal_openads):
assert jresp['courrier']['content_type'] == fake_resp_json['files'][0]['content_type'] assert jresp['courrier']['content_type'] == fake_resp_json['files'][0]['content_type']
assert jresp['courrier']['b64_content'] == fake_resp_json['files'][0]['b64_content'] assert jresp['courrier']['b64_content'] == fake_resp_json['files'][0]['b64_content']
fake_resp_json['files'][0]['b64_content'] = 'invalid_;{[content}'
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == u'Failed to decode courrier content from base 64'
fake_resp._content = 'df[{gfd;g#vfd'
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert unicode(e.value) == u'No JSON content returned: %r' % fake_resp._content
def test_get_response_error(app, atreal_openads):
fake_resp_json = {
'errors': [
{
'location' : 'entity.name',
'name' : 'constraint',
'description': 'Must start with an uppercase letter'
}
]
}
fake_resp = Response()
fake_resp.status_code = 404
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'Not Found'
fake_resp._content = json.dumps(fake_resp_json)
error_msg = atreal_openads.get_response_error(fake_resp)
expected_msg = u'[%s] (%s) %s' % (
fake_resp_json['errors'][0]['location'],
fake_resp_json['errors'][0]['name'],
fake_resp_json['errors'][0]['description']
)
assert error_msg == u"HTTP error: %s, %s" % (fake_resp.status_code, ','.join([expected_msg]))
fake_resp._content = 'invalid_;{[content}'
error_msg = atreal_openads.get_response_error(fake_resp)
assert error_msg == u"HTTP error: %s, %s" % (fake_resp.status_code, fake_resp._content)
def test_openads_upload_user_files(app, atreal_openads): def test_openads_upload_user_files(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
# TODO check logs (because this doesn't do anything but log)
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[])
with pytest.raises(ForwardFile.DoesNotExist) as e:
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[999])
assert unicode(e.value) == u"ForwardFile matching query does not exist."
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER, 'cerfa')
FF.save() FF.save()
assert isinstance(FF, ForwardFile) assert isinstance(FF, ForwardFile)
assert FF.upload_status == 'pending' assert FF.upload_status == 'pending'
file_id = FF.id file_id = FF.id
assert file_id assert file_id
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER fake_resp_bad = Response()
fake_resp_bad.status_code = 502
fake_resp_bad.reason = 'Bad gateway'
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp_bad
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
FFup = ForwardFile.objects.get(id=file_id)
assert isinstance(FFup, ForwardFile)
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
assert getattr(FFup, k) == getattr(FF, k)
assert FFup.upload_attempt == 1
assert FFup.upload_status == 'failed'
assert FFup.upload_msg == "HTTP error: 502"
fake_resp = Response() fake_resp = Response()
fake_resp.status_code = 200 fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'} fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8' fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK' fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
fake_resp._content = 'invalid_;{[content}'
with mock.patch('passerelle.utils.Request.post') as requests_post: with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp requests_post.return_value = fake_resp
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id]) atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
@ -368,5 +724,21 @@ def test_openads_upload_user_files(app, atreal_openads):
assert isinstance(FFup, ForwardFile) assert isinstance(FFup, ForwardFile)
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']: for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
assert getattr(FFup, k) == getattr(FF, k) assert getattr(FFup, k) == getattr(FF, k)
assert FFup.upload_status == 'success' assert FFup.upload_attempt == 2
assert FFup.upload_status == 'failed'
assert FFup.upload_msg == u'No JSON content returned: %r' % fake_resp._content
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
FFup = ForwardFile.objects.get(id=file_id)
assert isinstance(FFup, ForwardFile)
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
assert getattr(FFup, k) == getattr(FF, k)
assert FFup.upload_attempt == 3
assert FFup.upload_status == 'success'
assert FFup.upload_msg == 'uploaded successfuly'