Making tests independant of 'passerelle' and 'vagrant', added HTTP Basic credentials

Tests:
    * moved 'tests' folder inside 'atreal_openads' module
    * removed useless settings.py file
    * removed useless imports
    * added test file 'resources'
    * fixed file path (no more vagrant)
    * updated the test case of 'upload2ForwardFile' to test for 'plan' type_fichier
    * added HTTP Basic credentials to 'atreal_openads' fixture (removed 'token')

Fixes:
    * stop guessing ForwardFile.type_fichier based on file path
This commit is contained in:
Michael Bideau 2019-07-11 17:07:41 +02:00
parent 88fa11457c
commit 1ac8c2f14e
9 changed files with 640 additions and 6 deletions

View File

@ -444,16 +444,14 @@ class AtrealOpenads(BaseResource):
}
def upload2ForwardFile(self, path, numero_dossier):
def upload2ForwardFile(self, path, numero_dossier, type_fichier='CERFA'):
"""Convert a file path to a ForwardFile."""
if path:
rand_id = base64.urlsafe_b64encode(os.urandom(6))
fwd_file = ForwardFile()
fwd_file.numero_demande = rand_id
fwd_file.numero_dossier = numero_dossier
fwd_file.type_fichier = 'CERFA' if path == TEST_FILE_CERFA_DIA else \
'plan' if path == TEST_FILE_PLAN_CADASTRAL else \
'unknown'
fwd_file.type_fichier = type_fichier
fwd_file.orig_filename = os.path.basename(path)
fwd_file.content_type = magic.from_file(path, mime=True)
with open(path, 'r') as fp:
@ -533,7 +531,7 @@ class AtrealOpenads(BaseResource):
})
def add_file_async(self, request, type_dossier, numero_dossier, *args, **kwargs):
f = TEST_FILE_CERFA_DIA
fwd_file = self.upload2ForwardFile(f, numero_dossier)
fwd_file = self.upload2ForwardFile(f, numero_dossier, 'CERFA')
fwd_file.save()
job = self.add_job('upload_user_files',
natural_id=numero_dossier,
@ -621,7 +619,7 @@ class AtrealOpenads(BaseResource):
})
def createForwardFile(self, request, numero_dossier, *args, **kwargs):
f = TEST_FILE_CERFA_DIA
fwd_file = self.upload2ForwardFile(f, numero_dossier)
fwd_file = self.upload2ForwardFile(f, numero_dossier, 'CERFA')
fwd_file.save()
return {'message': "ForwardFile '%s' created" % fwd_file.id}

View File

@ -0,0 +1 @@
!coverage.py: This is a private format, don't read it directly!{"lines":{"/home/vagrant/passerelle-atreal-openads/atreal_openads/tests/test_atreal_openads.py":[9,10,11,12,13,14,15,17,18,19,20,22,23,24,26,27,28,29,30,31,33,34,36,37,38,39,40,43,45,46,47,51,53,54,55,56,57,58,62,63,64,66,67,70,71,72,73,74,75,76,77,78,79,80,81,82,83,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,105,106,107,108,109,110,111,112,113,114,115,116,117,118,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,139,140,141,142,143,144,145,146,147,148,149,150,151,152,155,156,157,158,159,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,179,180,181,183,184,185,186,187,190,191,192,193,194,195,196,197,198,200,201,202,203,204,205,206,207,208,209,211,212,213,214,215,218,219,220,225,226,227,230,231,232,233,236,237,238,239,240,241,242,243,244,245,246,247,249,250,251,252,253,254,257,258,259,260,261,262,264,265,266,267,268,269,270,272,273,274,275,276,277,278,279,280,281,282,284,285,286,287,288,289,291,292,293,294,295,296,297,298,299,300,302,303,305,306,307,310,311,312,314,315,316,317,318,319,320,321,322,323,325,326,327,328,329,330,331,332,333,334,337,338,339,340,341,342,343,345,346,347,348,349,350,351,352,353,354,355,356,357,358,360,361,362,363,364,367,368,369,370,371,372,373,374,375,376,377,380,381,382,383,384,385,386,387,388,389,390,391,394,395,396,397,398,400,401,402,403,404,405,406,407,408,410,411,412,413,414,415,416,417,420,421,422,423,424,425,426,427,428,429,431,432,434,435,438,439,440,441,443,444,445,447,448,449,451,452,453,454,455,456,457,458,460,461,462,463,464,465,466,467,469,470,471,472,473,474,475,476,477,479,480,481,482,483,484,485,486,487,490,491,492,493,494,495,496,498,499,500,501,502,503,504,505,506,507,510,511,512,513,514,515,518,519,520,521,522,523,524,525,526,527,528,529,532,533,534,535,536,537,538,540,541,542,543,544,545,546,547,548,549,551,552,553,554,555],"/home/vagrant/passerelle-atreal-openads/atreal_openads/tests/conftest.py":[1,2,35,4,6,8,41,10,13,15,16,17,18,19,22,30],"/home/vagrant/passerelle-atreal-openads/atreal_openads/models.py":[20,21,22,23,24,25,26,27,28,29,30,32,34,35,36,37,38,40,41,42,45,46,47,50,52,56,63,65,66,67,68,69,70,71,74,76,78,81,82,83,86,88,89,90,94,99,100,101,105,106,107,108,109,110,111,112,113,114,115,116,119,120,121,122,123,124,125,126,127,128,130,132,134,136,137,139,140,141,142,143,144,146,147,148,151,152,153,154,155,156,157,158,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,183,184,185,186,187,188,189,190,191,194,195,196,198,199,200,201,202,203,238,239,240,241,242,243,244,245,246,247,248,249,257,259,260,261,262,264,265,266,267,268,269,270,273,274,275,276,277,278,279,280,281,282,283,284,286,288,289,290,291,292,293,295,298,299,300,301,302,304,305,306,307,308,309,312,313,316,317,318,319,320,321,322,323,324,325,328,331,333,334,335,337,338,339,340,341,342,343,344,345,346,347,348,349,350,351,353,354,355,356,357,358,359,360,363,364,365,367,368,369,373,374,378,379,380,381,382,383,384,385,386,387,388,390,391,393,394,395,398,400,401,402,403,404,406,408,409,410,411,412,413,414,415,416,417,418,419,420,421,422,423,424,426,428,429,430,431,432,433,434,435,436,437,438,441,442,443,447,449,450,451,452,453,454,455,456,457,458,459,460,461,462,465,466,467,468,469,470,471,474,475,476,477,478,479,480,481,484,485,486,487,488,489,492,493,494,495,496,497,498,501,503,504,505,506,507,510,511,512,516,517,521,524,525,526,527,528,529,530,533,534,535,536,537,538,539,540,541,542,543,547,548,549,550,551,552,553,555,556,557,558,559,560,563,564,565,566,567,568,569,570,572,573,574,575,576,577,578,579,582,583,584,585,586,587,588,589,590,591,592,593,594,595,596,597,600,601,602,603,604,606,607,609,612,613,614,615,616,617,618,621,622,623,624,627,628,629,630,631,632,633,636,637,638,639,640,644,645,648,649,650,653,656,658,659,660,661,662,663,664,665,666,667,668,669,677,679,680,681,682,683,684,685,686,687,688,689,690,692,693,694,700,701,702,703,706,707,708,709,710,711,712,713,714,715,727,728,741,742,743,745,746,747,748,749,750,751,762,764,765,766,767,768,769,770],"/home/vagrant/passerelle-atreal-openads/atreal_openads/migrations/__init__.py":[1],"/home/vagrant/passerelle-atreal-openads/atreal_openads/migrations/0001_initial.py":[3,5,6,9,11,14,18,19,21,22,23,24,25,26,27,28,29,31,32,35,36,38,39,40,41,42,43,44,45,46,47,48],"/home/vagrant/passerelle-atreal-openads/atreal_openads/__init__.py":[1]}}

Binary file not shown.

View File

@ -0,0 +1,67 @@
import pytest
from httmock import urlmatch, HTTMock, response
import django_webtest
from django.core.cache import cache
@pytest.fixture(autouse=True)
def media(settings, tmpdir):
settings.MEDIA_ROOT = str(tmpdir.mkdir('media'))
@pytest.fixture
def app(request):
wtm = django_webtest.WebTestMixin()
wtm._patch_settings()
request.addfinalizer(wtm._unpatch_settings)
cache.clear()
return django_webtest.DjangoTestApp()
@pytest.fixture
def endpoint_dummy_cache(monkeypatch):
from django.core.cache import caches
import passerelle.views
monkeypatch.setattr(
passerelle.views, 'cache', caches['dummy'])
@urlmatch()
def internal_server_error(url, request):
return response(500, 'Internal server error')
@pytest.fixture
def mock_500():
with HTTMock(internal_server_error):
yield None
@pytest.fixture
def relax_openssl(tmpdir):
'''OpenSSL default configuration has been really strict for some years,
this fixture set a temporary really permisive ciphers list.'''
import os
openssl_cnf_path = tmpdir / 'openssl.cnf'
with openssl_cnf_path.open('w') as fd:
fd.write(u'''
[default_conf]
ssl_conf = ssl_sect
[ssl_sect]
system_default = system_default_sect
[system_default_sect]
CipherString = ALL''')
old_value = os.environ.get('OPENSSL_CONF', None)
try:
os.environ['OPENSSL_CONF'] = str(openssl_cnf_path)
yield
finally:
if old_value is None:
del os.environ['OPENSSL_CONF']
else:
os.environ['OPENSSL_CONF'] = old_value

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.6 KiB

View File

@ -0,0 +1,11 @@
import os
passerelle_root_dir = os.environ.get('PASSERELLE_ROOT_DIR')
if passerelle_root_dir and os.path.isdir(passerelle_root_dir):
passerelle_settings_file = os.path.join(passerelle_root_dir, 'tests/settings.py')
if os.path.exists(passerelle_settings_file):
execfile(passerelle_settings_file)
INSTALLED_APPS += ('passerelle.apps.atreal_openads',)

View File

@ -0,0 +1,557 @@
# -*- coding: utf-8 -*-
# to run it use the following command in the 'tests' directory:
# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv
#
# and with 'coverage':
# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv --cov=~/src/passerelle/passerelle/apps/atreal_openads
import pytest
import mock
import json
import os
import base64
import re
from requests import Response
from django.http import Http404
from django.http.request import HttpRequest, QueryDict
from django.http.response import JsonResponse
from django.core.files import File
from passerelle.apps.atreal_openads.models import AtrealOpenads, ForwardFile
from passerelle.utils.jsonresponse import APIError
from passerelle.base.models import Job
CONNECTOR_NAME = 'atreal-openads'
CONNECTOR_SLUG = 'atreal'
COLLECTIVITE = 3
OPENADS_API_LOGIN = 'publik-passerelle'
OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20))
OPENADS_API_URL = 'http://openads.api/'
FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20))
FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10))
TESTS_DIR = os.path.dirname(__file__)
RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources')
TEST_FILE_TRAC_ICO = os.path.join(RESOURCES_DIR, 'trac.ico')
TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
def get_file_data(path, b64=True):
"""Return the content of a file as a string, in base64 if specified."""
with open(path, 'r') as f:
if b64:
return base64.b64encode(f.read())
return f.read()
@pytest.fixture
def atreal_openads(db):
return AtrealOpenads.objects.create(
slug = CONNECTOR_SLUG,
collectivite = COLLECTIVITE,
openADS_API_login = OPENADS_API_LOGIN,
openADS_API_password = OPENADS_API_PASSWORD,
openADS_API_url = OPENADS_API_URL
)
def test_openads_hello(app, atreal_openads):
resp = atreal_openads.hello(None)
assert resp['hello'] == 'world'
resp = atreal_openads.hello(None, 'toto')
assert resp['hello'] == 'toto'
def test_openads_echo(app, atreal_openads):
meta = {
'Host' : 'passerelle.dev.publik.love',
'User-Agent' : 'Mozilla Firefox',
'Accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'fr,fr-FR;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate, br',
'Referer' : 'https://passerelle.dev.publik.love/%s/%s/' % (CONNECTOR_NAME, CONNECTOR_SLUG),
'Cookie' : 'publik_portal_agent_url=https%3A%2F%2Fagent-combo.dev.publik.love%2F; '
'publik_portal_agent_title=Portail%20Agent; csrftoken-2baa71=' + FAKE_COOKIE_CRSF,
'DNT' : '1',
'Connection' : 'keep-alive',
'Pragma' : 'no-cache',
'Cache-Control' : 'no-cache'
}
req = HttpRequest()
req._body = ""
req.path = '/test'
req.method = 'GET'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_type = 'text/plain'
req.content_params = None
req.COOKIES = {}
req.META = meta
req._read_started = False
resp = atreal_openads.echo(req, body=False, cookies=False, meta=False, empty=False)
jresp = json.loads(resp['received'])
assert jresp['scheme'] == 'http'
assert jresp['method'] == req.method
assert jresp['content_type'] == req.content_type
assert jresp['encoding'] == req.encoding
resp = atreal_openads.echo(req, body=True, cookies=True, meta=True, empty=True)
jresp = json.loads(resp['received'])
assert jresp['body'] == req._body
assert jresp['FILES'] == []
assert jresp['content_params'] == None
assert jresp['encoding'] == req.encoding
assert jresp['GET'] == {}
assert jresp['COOKIES'] == {}
assert jresp['content_type'] == req.content_type
assert jresp['POST'] == {}
assert jresp['scheme'] == 'http'
assert jresp['method'] == req.method
for k,v in req.META.items():
jresp['META'][k] == v
req = HttpRequest()
req._body = '{"data": "test"}'
req.path = '/test'
req.method = 'POST'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_type = 'application/json; charset=UTF-8'
req.content_params = None
req.COOKIES = {}
req.META = meta
req._read_started = False
resp = atreal_openads.echo(req, body=False, cookies=False, meta=False, empty=False)
jresp = json.loads(resp['received'])
assert jresp['scheme'] == 'http'
assert jresp['method'] == req.method
assert jresp['content_type'] == req.content_type
assert jresp['encoding'] == req.encoding
resp = atreal_openads.echo(req, body=True, cookies=True, meta=True, empty=True)
jresp = json.loads(resp['received'])
assert jresp['body'] == req._body
assert jresp['FILES'] == []
assert jresp['content_params'] == None
assert jresp['encoding'] == req.encoding
assert jresp['GET'] == {}
assert jresp['COOKIES'] == {}
assert jresp['content_type'] == req.content_type
assert jresp['POST'] == {}
assert jresp['scheme'] == 'http'
assert jresp['method'] == req.method
for k,v in req.META.items():
jresp['META'][k] == v
def test_openads_echofile(app, atreal_openads):
test_file_json = {
'filename' : os.path.basename(TEST_FILE_TRAC_ICO),
'content_type' : 'image/x-icon',
'b64_content' : 'ccc'
}
req = HttpRequest()
req._body = json.dumps(test_file_json)
req.path = '/test'
req.method = 'POST'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_type = 'application/json'
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
jresp = atreal_openads.echofile(req)
assert jresp['filename'] == test_file_json['filename']
assert jresp['content_type'] == test_file_json['content_type']
assert jresp['content'] == test_file_json['b64_content']
def test_openads_check_status(app, atreal_openads):
fake_resp_json = {
'message': 'Service online'
}
fake_resp = JsonResponse(fake_resp_json)
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = mock.Mock(content=fake_resp, status_code=200)
jresp = atreal_openads.check_status()
assert jresp['response'] == 200
def test_openads_create_dossier(app, atreal_openads):
fake_req_json = {
"fields": {
"autres_parcelles": True,
"cerfa": {
"content": get_file_data(TEST_FILE_CERFA_DIA),
"content_type": "application/pdf",
"field_id": "50",
"filename": os.path.basename(TEST_FILE_CERFA_DIA)
},
"code_postal": "13004",
"localite": "Marseille",
"nom": "Loulou",
"nom_voie": "Avenue de la Blaque",
"numero_voie": "52",
"plan_cadastral_1": {
"content": get_file_data(TEST_FILE_PLAN_CADASTRAL),
"content_type": "application/pdf",
"field_id": "113",
"filename": os.path.basename(TEST_FILE_PLAN_CADASTRAL)
},
"prenom": "Toto",
"proprietaire": "Oui",
"proprietaire_qualite": "Un particulier",
"proprietaire_qualite_raw": "Un particulier",
"proprietaire_raw": "Oui",
"reference_cadastrale": [
[
"999",
"Z",
"0010"
]
],
"references_cadastrales": [
[
"123",
"D",
"9874"
]
],
"terrain_code_postal": "13002",
"terrain_localite": "Marseille",
"terrain_nom_voie": "Boulevard de la Linule",
"terrain_numero_voie": "23"
}
}
req = HttpRequest()
req._body = json.dumps(fake_req_json)
req.path = '/test'
req.method = 'POST'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_type = 'application/json'
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
fake_resp_json = {
'numero_dossier' : FAKE_NUMERO_DOSSIER,
'files': [{
'b64_content' : get_file_data(TEST_FILE_CERFA_DIA),
'content_type' : 'text/plain',
'filename' : 'recepisse_depot_%s.pdf' % FAKE_NUMERO_DOSSIER,
}]
}
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
jresp = atreal_openads.create_dossier(req, 'DIA')
assert jresp['numero_dossier'] == fake_resp_json['numero_dossier']
assert jresp['recepisse']['b64_content'] == fake_resp_json['files'][0]['b64_content']
assert jresp['recepisse']['content_type'] == 'application/pdf'
assert jresp['recepisse']['filename'] == fake_resp_json['files'][0]['filename']
job = Job.objects.filter(natural_id=FAKE_NUMERO_DOSSIER).last()
assert job
job_id = job.id
assert job.status == 'registered'
assert job.method_name == 'upload_user_files'
assert job.natural_id == FAKE_NUMERO_DOSSIER
assert job.parameters is not None
assert len(job.parameters) == 3
assert 'file_ids' in job.parameters
assert len(job.parameters['file_ids']) == 2
file_ids = job.parameters['file_ids']
FFs = ForwardFile.objects.filter(id__in=file_ids)
for FF in FFs:
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert len(FF.file_hash) > 0
assert FF.upload_status == 'pending'
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.jobs()
job = Job.objects.get(id=job_id)
assert job.status == 'completed'
FFs = ForwardFile.objects.filter(id__in=file_ids)
for FF in FFs:
assert FF.upload_status == 'success'
def test_openads_upload2ForwardFile(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(None, None)
assert FF is None
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
assert isinstance(FF, ForwardFile)
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert FF.type_fichier == 'CERFA'
assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA)
assert FF.content_type == 'application/pdf'
assert len(FF.file_hash) > 0
assert isinstance(FF.upload_file, File)
assert FF.upload_status == 'pending'
FF = atreal_openads.upload2ForwardFile(TEST_FILE_PLAN_CADASTRAL, FAKE_NUMERO_DOSSIER, 'plan')
assert isinstance(FF, ForwardFile)
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert FF.type_fichier == 'plan'
assert FF.orig_filename == os.path.basename(TEST_FILE_PLAN_CADASTRAL)
assert FF.content_type == 'application/pdf'
assert len(FF.file_hash) > 0
assert isinstance(FF.upload_file, File)
assert FF.upload_status == 'pending'
def test_openads_get_dossier(app, atreal_openads):
fake_resp_json = {
'etat' : u"Non préemption en cours",
'date_depot' : "24/04/2019",
'date_decision' : "",
'decision' : "",
'date_limite_instruction': "24/06/2019"
}
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
jresp = atreal_openads.get_dossier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert jresp['etat'] == fake_resp_json['etat']
assert jresp['date_depot'] == fake_resp_json['date_depot']
assert jresp['date_decision'] == fake_resp_json['date_decision']
assert jresp['decision'] == fake_resp_json['decision']
assert jresp['date_limite_instruction'] == fake_resp_json['date_limite_instruction']
fake_resp_json = {
'errors' : [{
'location' : 'path',
'name' : 'Invalid Type',
'description' : '"invalid_type" is not one of DIA, PC, DP, AT, PD'
}]
}
fake_resp = Response()
fake_resp.status_code = 404
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'Resource not found'
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
jresp = atreal_openads.get_dossier(None, 'invalid_type', FAKE_NUMERO_DOSSIER)
assert re.search(r'^HTTP error: 404, \[path\] \(Invalid Type\) "invalid_type" is not one of DIA, PC, DP, AT, PD$', str(e.value))
def test_openads_add_file(app, atreal_openads):
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
jresp = atreal_openads.add_file(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert jresp == fake_resp_json
def test_openads_add_file_async(app, atreal_openads):
jresp = atreal_openads.add_file_async(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert jresp['message'] == 'upload is pending (async)'
assert jresp['job_id'] is not None
job_id = jresp['job_id']
job = Job.objects.get(id=job_id)
assert job.status == 'registered'
assert job.method_name == 'upload_user_files'
assert job.natural_id == FAKE_NUMERO_DOSSIER
assert job.parameters is not None
assert len(job.parameters) == 3
assert 'file_ids' in job.parameters
assert len(job.parameters['file_ids']) == 1
file_id = job.parameters['file_ids'][0]
FF = ForwardFile.objects.get(id=file_id)
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert FF.type_fichier == 'CERFA'
assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA)
assert FF.content_type == 'application/pdf'
assert len(FF.file_hash) > 0
assert FF.upload_status == 'pending'
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.jobs()
job = Job.objects.get(id=job_id)
assert job.status == 'completed'
FF = ForwardFile.objects.get(id=file_id)
assert FF.upload_status == 'success'
def test_openads_get_fwd_files_status(app, atreal_openads):
with pytest.raises(Http404) as e:
resp404 = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=18, summary=None)
assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value))
resp404 = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=None, summary=None)
assert resp404 is not None
assert len(resp404) == 0
FF = atreal_openads.upload2ForwardFile(TEST_FILE_TRAC_ICO, FAKE_NUMERO_DOSSIER)
FF.save()
assert isinstance(FF, ForwardFile)
jresp = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=None, summary=None)
assert jresp is not None
assert len(jresp) == 1
assert jresp[0]['id'] == FF.id
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type', 'upload_status', 'upload_msg']:
assert jresp[0][k] == getattr(FF, k)
assert jresp[0]['b64_content'] == get_file_data(FF.upload_file.path)
assert jresp[0]['last_update_datetime'] == FF.last_update_datetime
jresp = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=FF.id, summary=None)
assert jresp is not None
assert len(jresp) == 1
assert jresp[0]['id'] == FF.id
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type', 'upload_status', 'upload_msg']:
assert jresp[0][k] == getattr(FF, k)
assert jresp[0]['b64_content'] == get_file_data(FF.upload_file.path)
assert jresp[0]['last_update_datetime'] == FF.last_update_datetime
jresp = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=None, summary='1')
assert jresp is not None
assert jresp['all_forwarded'] == False
status_msg = '[%s] %s => %s' % (FF.id, FF.orig_filename, FF.upload_msg)
assert len(jresp['pending']) == 1
assert jresp['pending'][0] == status_msg
assert len(jresp['uploading']) == 0
assert len(jresp['success']) == 0
assert len(jresp['failed']) == 0
jresp = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=FF.id, summary='1')
assert jresp is not None
assert jresp['all_forwarded'] == False
status_msg = '[%s] %s => %s' % (FF.id, FF.orig_filename, FF.upload_msg)
assert len(jresp['pending']) == 1
assert jresp['pending'][0] == status_msg
assert len(jresp['uploading']) == 0
assert len(jresp['success']) == 0
assert len(jresp['failed']) == 0
def test_openads_createForwardFile(app, atreal_openads):
jresp = atreal_openads.createForwardFile(None, FAKE_NUMERO_DOSSIER)
assert jresp is not None
m = re.search(r"^ForwardFile '(\d+)' created$", jresp['message'])
assert m
assert int(m.group(1)) > 0
file_id = int(m.group(1))
FF = ForwardFile.objects.get(id=file_id)
assert isinstance(FF, ForwardFile)
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert FF.type_fichier == 'CERFA'
assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA)
assert FF.content_type == 'application/pdf'
assert len(FF.file_hash) > 0
assert isinstance(FF.upload_file, File)
assert FF.upload_status == 'pending'
def test_openads_get_courrier(app, atreal_openads):
fake_resp_json = {
'files': [{
'filename' : "instruction_4.pdf",
'content_type' : "text/plain",
'b64_content' : get_file_data(TEST_FILE_CERFA_DIA)
}]
}
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
jresp = atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert jresp['courrier']['filename'] == fake_resp_json['files'][0]['filename']
assert jresp['courrier']['content_type'] == fake_resp_json['files'][0]['content_type']
assert jresp['courrier']['b64_content'] == fake_resp_json['files'][0]['b64_content']
def test_openads_upload_user_files(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(TEST_FILE_TRAC_ICO, FAKE_NUMERO_DOSSIER)
FF.save()
assert isinstance(FF, ForwardFile)
assert FF.upload_status == 'pending'
file_id = FF.id
assert file_id
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
FFup = ForwardFile.objects.get(id=file_id)
assert isinstance(FFup, ForwardFile)
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
assert getattr(FFup, k) == getattr(FF, k)
assert FFup.upload_status == 'success'