This repository has been archived on 2023-02-22. You can view files and clone it, but cannot push or open issues or pull requests.
passerelle-atreal-openads/tests/test_atreal_openads.py

363 lines
14 KiB
Python

# -*- coding: utf-8 -*-
# to run it use the following command in the 'tests' directory:
# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv
#
# and with 'coverage':
# ~> DJANGO_SETTINGS_MODULE=passerelle.settings pytest -W ignore::django.utils.deprecation.RemovedInDjango20Warning test_atreal_openads.py -vv --cov=~/src/passerelle/passerelle/apps/atreal_openads
import pytest
import mock
import json
import os
import base64
import re
from requests import Response
from django.http import Http404
from django.http.request import HttpRequest, QueryDict
from django.http.response import JsonResponse
from django.core.files import File
from passerelle.utils.jsonresponse import APIError
from passerelle.base.models import Job
from atreal_openads.models import AtrealOpenads, ForwardFile
CONNECTOR_NAME = 'atreal-openads'
CONNECTOR_SLUG = 'atreal'
COLLECTIVITE = 3
OPENADS_API_LOGIN = 'publik-passerelle'
OPENADS_API_PASSWORD = base64.urlsafe_b64encode(os.urandom(20))
OPENADS_API_URL = 'http://openads.api/'
FAKE_COOKIE_CRSF = base64.urlsafe_b64encode(os.urandom(20))
FAKE_NUMERO_DOSSIER = base64.urlsafe_b64encode(os.urandom(10))
TESTS_DIR = os.path.dirname(__file__)
RESOURCES_DIR = os.path.join(TESTS_DIR, 'resources')
TEST_FILE_CERFA_DIA = os.path.join(RESOURCES_DIR, 'cerfa_10072-02.pdf')
TEST_FILE_PLAN_CADASTRAL = os.path.join(RESOURCES_DIR, 'plancadastral.pdf')
def get_file_data(path, b64=True):
"""Return the content of a file as a string, in base64 if specified."""
with open(path, 'r') as f:
if b64:
return base64.b64encode(f.read())
return f.read()
@pytest.fixture
def atreal_openads(db):
return AtrealOpenads.objects.create(
slug = CONNECTOR_SLUG,
collectivite = COLLECTIVITE,
openADS_API_login = OPENADS_API_LOGIN,
openADS_API_password = OPENADS_API_PASSWORD,
openADS_API_url = OPENADS_API_URL
)
def test_openads_check_status(app, atreal_openads):
fake_resp_json = {
'message': 'Service online'
}
fake_resp = JsonResponse(fake_resp_json)
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = mock.Mock(content=fake_resp, status_code=200)
jresp = atreal_openads.check_status()
assert jresp['response'] == 200
def test_openads_create_dossier(app, atreal_openads):
fake_req_json = {
"fields": {
"autres_parcelles": True,
"cerfa": {
"content": get_file_data(TEST_FILE_CERFA_DIA),
"content_type": "application/pdf",
"field_id": "50",
"filename": os.path.basename(TEST_FILE_CERFA_DIA)
},
"code_postal": "13004",
"localite": "Marseille",
"nom": "Loulou",
"nom_voie": "Avenue de la Blaque",
"numero_voie": "52",
"plan_cadastral_1": {
"content": get_file_data(TEST_FILE_PLAN_CADASTRAL),
"content_type": "application/pdf",
"field_id": "113",
"filename": os.path.basename(TEST_FILE_PLAN_CADASTRAL)
},
"prenom": "Toto",
"proprietaire": "Oui",
"proprietaire_qualite": "Un particulier",
"proprietaire_qualite_raw": "Un particulier",
"proprietaire_raw": "Oui",
"reference_cadastrale": [
[
"999",
"Z",
"0010"
]
],
"references_cadastrales": [
[
"123",
"D",
"9874"
]
],
"terrain_code_postal": "13002",
"terrain_localite": "Marseille",
"terrain_nom_voie": "Boulevard de la Linule",
"terrain_numero_voie": "23"
}
}
req = HttpRequest()
req._body = json.dumps(fake_req_json)
req.path = '/test'
req.method = 'POST'
req.encoding = 'utf-8'
req.GET = QueryDict(mutable=True) # required because of encoding setter
req.POST = QueryDict(mutable=True) # required because of encoding setter
req.content_type = 'application/json'
req.content_params = None
req.COOKIES = {}
req.META = {}
req._read_started = False
fake_resp_json = {
'numero_dossier' : FAKE_NUMERO_DOSSIER,
'files': [{
'b64_content' : get_file_data(TEST_FILE_CERFA_DIA),
'content_type' : 'text/plain',
'filename' : 'recepisse_depot_%s.pdf' % FAKE_NUMERO_DOSSIER,
}]
}
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
jresp = atreal_openads.create_dossier(req, 'DIA')
assert jresp['numero_dossier'] == fake_resp_json['numero_dossier']
assert jresp['recepisse']['b64_content'] == fake_resp_json['files'][0]['b64_content']
assert jresp['recepisse']['content_type'] == 'application/pdf'
assert jresp['recepisse']['filename'] == fake_resp_json['files'][0]['filename']
job = Job.objects.filter(natural_id=FAKE_NUMERO_DOSSIER).last()
assert job
job_id = job.id
assert job.status == 'registered'
assert job.method_name == 'upload_user_files'
assert job.natural_id == FAKE_NUMERO_DOSSIER
assert job.parameters is not None
assert len(job.parameters) == 3
assert 'file_ids' in job.parameters
assert len(job.parameters['file_ids']) == 2
file_ids = job.parameters['file_ids']
FFs = ForwardFile.objects.filter(id__in=file_ids)
for FF in FFs:
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert len(FF.file_hash) > 0
assert FF.upload_status == 'pending'
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.jobs()
job = Job.objects.get(id=job_id)
assert job.status == 'completed'
FFs = ForwardFile.objects.filter(id__in=file_ids)
for FF in FFs:
assert FF.upload_status == 'success'
def test_openads_get_dossier(app, atreal_openads):
fake_resp_json = {
'etat' : u"Non préemption en cours",
'date_depot' : "24/04/2019",
'date_decision' : "",
'decision' : "",
'date_limite_instruction': "24/06/2019"
}
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
jresp = atreal_openads.get_dossier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert jresp['etat'] == fake_resp_json['etat']
assert jresp['date_depot'] == fake_resp_json['date_depot']
assert jresp['date_decision'] == fake_resp_json['date_decision']
assert jresp['decision'] == fake_resp_json['decision']
assert jresp['date_limite_instruction'] == fake_resp_json['date_limite_instruction']
fake_resp_json = {
'errors' : [{
'location' : 'path',
'name' : 'Invalid Type',
'description' : '"invalid_type" is not one of DIA, PC, DP, AT, PD'
}]
}
fake_resp = Response()
fake_resp.status_code = 404
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'Resource not found'
fake_resp._content = json.dumps(fake_resp_json)
with pytest.raises(APIError) as e:
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
jresp = atreal_openads.get_dossier(None, 'invalid_type', FAKE_NUMERO_DOSSIER)
assert re.search(r'^HTTP error: 404, \[path\] \(Invalid Type\) "invalid_type" is not one of DIA, PC, DP, AT, PD$', str(e.value))
def test_openads_upload2ForwardFile(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(None, None)
assert FF is None
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
assert isinstance(FF, ForwardFile)
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert FF.type_fichier == 'CERFA'
assert FF.orig_filename == os.path.basename(TEST_FILE_CERFA_DIA)
assert FF.content_type == 'application/pdf'
assert len(FF.file_hash) > 0
assert isinstance(FF.upload_file, File)
assert FF.upload_status == 'pending'
FF = atreal_openads.upload2ForwardFile(TEST_FILE_PLAN_CADASTRAL, FAKE_NUMERO_DOSSIER, 'plan')
assert isinstance(FF, ForwardFile)
assert len(FF.numero_demande) > 0
assert FF.numero_dossier == FAKE_NUMERO_DOSSIER
assert FF.type_fichier == 'plan'
assert FF.orig_filename == os.path.basename(TEST_FILE_PLAN_CADASTRAL)
assert FF.content_type == 'application/pdf'
assert len(FF.file_hash) > 0
assert isinstance(FF.upload_file, File)
assert FF.upload_status == 'pending'
def test_openads_get_fwd_files_status(app, atreal_openads):
with pytest.raises(Http404) as e:
resp404 = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=18, summary=None)
assert re.search(r"^No file matches 'numero_dossier=[^']+' and 'id=[^']+'.$", str(e.value))
resp404 = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=None, summary=None)
assert resp404 is not None
assert len(resp404) == 0
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
FF.save()
assert isinstance(FF, ForwardFile)
jresp = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=None, summary=None)
assert jresp is not None
assert len(jresp) == 1
assert jresp[0]['id'] == FF.id
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type', 'upload_status', 'upload_msg']:
assert jresp[0][k] == getattr(FF, k)
assert jresp[0]['b64_content'] == get_file_data(FF.upload_file.path)
assert jresp[0]['last_update_datetime'] == FF.last_update_datetime
jresp = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=FF.id, summary=None)
assert jresp is not None
assert len(jresp) == 1
assert jresp[0]['id'] == FF.id
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type', 'upload_status', 'upload_msg']:
assert jresp[0][k] == getattr(FF, k)
assert jresp[0]['b64_content'] == get_file_data(FF.upload_file.path)
assert jresp[0]['last_update_datetime'] == FF.last_update_datetime
jresp = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=None, summary='1')
assert jresp is not None
assert jresp['all_forwarded'] == False
status_msg = '[%s] %s => %s' % (FF.id, FF.orig_filename, FF.upload_msg)
assert len(jresp['pending']) == 1
assert jresp['pending'][0] == status_msg
assert len(jresp['uploading']) == 0
assert len(jresp['success']) == 0
assert len(jresp['failed']) == 0
jresp = atreal_openads.get_fwd_files_status(None, FAKE_NUMERO_DOSSIER, fichier_id=FF.id, summary='1')
assert jresp is not None
assert jresp['all_forwarded'] == False
status_msg = '[%s] %s => %s' % (FF.id, FF.orig_filename, FF.upload_msg)
assert len(jresp['pending']) == 1
assert jresp['pending'][0] == status_msg
assert len(jresp['uploading']) == 0
assert len(jresp['success']) == 0
assert len(jresp['failed']) == 0
def test_openads_get_courrier(app, atreal_openads):
fake_resp_json = {
'files': [{
'filename' : "instruction_4.pdf",
'content_type' : "text/plain",
'b64_content' : get_file_data(TEST_FILE_CERFA_DIA)
}]
}
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.get') as requests_get:
requests_get.return_value = fake_resp
jresp = atreal_openads.get_courrier(None, 'DIA', FAKE_NUMERO_DOSSIER)
assert jresp['courrier']['filename'] == fake_resp_json['files'][0]['filename']
assert jresp['courrier']['content_type'] == fake_resp_json['files'][0]['content_type']
assert jresp['courrier']['b64_content'] == fake_resp_json['files'][0]['b64_content']
def test_openads_upload_user_files(app, atreal_openads):
FF = atreal_openads.upload2ForwardFile(TEST_FILE_CERFA_DIA, FAKE_NUMERO_DOSSIER)
FF.save()
assert isinstance(FF, ForwardFile)
assert FF.upload_status == 'pending'
file_id = FF.id
assert file_id
fake_resp_json = "You want add some files on %s " % FAKE_NUMERO_DOSSIER
fake_resp = Response()
fake_resp.status_code = 200
fake_resp.headers = {'Content-Type': 'application/json'}
fake_resp.encoding = 'utf-8'
fake_resp.reason = 'OK'
fake_resp._content = json.dumps(fake_resp_json)
with mock.patch('passerelle.utils.Request.post') as requests_post:
requests_post.return_value = fake_resp
atreal_openads.upload_user_files('DIA', FAKE_NUMERO_DOSSIER, file_ids=[file_id])
FFup = ForwardFile.objects.get(id=file_id)
assert isinstance(FFup, ForwardFile)
for k in ['numero_dossier', 'type_fichier', 'file_hash', 'orig_filename', 'content_type']:
assert getattr(FFup, k) == getattr(FF, k)
assert FFup.upload_status == 'success'