passerelle/tests/test_mdel_ddpacs.py

167 lines
6.0 KiB
Python

# Passerelle - uniform access to data and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import io
import logging
import os
import pytest
import xmlschema
import tests.utils
from passerelle.apps.mdel_ddpacs.models import Demand, Resource
from passerelle.utils import json, sftp
from passerelle.utils.zip import ZipTemplate, diff_zip
def build_response_zip(**kwargs):
    """Render the response manifest template into a zip archive.

    The keyword arguments are handed unchanged to ``ZipTemplate`` as its
    ``ctx``.

    Returns a ``(name, content)`` pair: the template's ``name`` attribute and
    the result of its ``render_to_bytes()`` call.
    """
    manifest_path = os.path.abspath('tests/data/mdel_ddpacs/response_manifest.json')
    template = ZipTemplate(manifest_path, ctx=kwargs)
    return template.name, template.render_to_bytes()
@pytest.fixture(autouse=True)
def resource(db):
    """Create the ``test`` connector instance used by the tests of this module.

    Declared ``autouse`` so the instance exists for every test, including the
    ones that do not request it by name. ``db`` is requested only for its
    side effect (presumably the database-access fixture; confirm against the
    test configuration).

    Returns whatever ``tests.utils.setup_access_rights`` returns for the
    freshly created ``Resource``.
    """
    connector = Resource.objects.create(
        slug='test',
        code_insee='66666',
        recipient_siret='999999',
        recipient_service='SERVICE',
        recipient_guichet='GUICHET',
    )
    return tests.utils.setup_access_rights(connector)
@pytest.fixture
def ddpacs_payload():
    """Return the sample PACS document as a flattened payload.

    ``tests/data/pacs-doc.xml`` is decoded with the connector's document
    schema, nested under a ``PACS`` key and passed through ``json.flatten``.
    """
    doc_schema = Resource.get_doc_xml_schema()
    pacs_document = xmlschema.to_dict('tests/data/pacs-doc.xml', schema=doc_schema.xml_schema)
    wrapped = {'PACS': pacs_document}
    return json.flatten(wrapped)
def test_create_demand(app, resource, ddpacs_payload, freezer, sftpserver, caplog):
    """Walk one demand through creation, SFTP delivery and response handling.

    Steps, in order:

    * POST a new demand and check that one ``Demand`` and one job exist;
    * check the status URL and the zip document URL returned by the endpoint;
    * run the jobs first without, then with, an outgoing SFTP and check that
      the zip shows up in the served ``output`` directory;
    * drop response archives in the served ``input`` directory and check how
      ``resource.hourly()`` deals with them.

    The delivered archive is compared in memory only; nothing is written to
    the local filesystem by this test.
    """
    expected_zip = 'tests/data/mdel_ddpacs_expected.zip'
    zip_name = 'A-1-1-depotDossierPACS-1.zip'

    # paramiko log socket errors when connection is closed :/
    caplog.set_level(logging.WARNING, 'paramiko.transport')
    freezer.move_to('2019-01-01')

    # Push new demand
    payload = {
        'display_id': '1-1',
    }
    payload.update(ddpacs_payload)
    assert Demand.objects.count() == 0
    assert resource.jobs_set().count() == 0
    resp = app.post_json('/mdel-ddpacs/test/create?raise=1', params=payload)
    assert resp.json['err'] == 0
    assert resp.json['status'] == 'pending'
    assert Demand.objects.count() == 1
    assert resource.jobs_set().count() == 1

    url = resp.json['url']
    zip_url = resp.json['zip_url']

    # Check demand status URL
    status = app.get(url)
    assert status.json['err'] == 0
    assert status.json == resp.json

    # Check demand document URL
    zip_document = app.get(zip_url)
    with io.BytesIO(zip_document.body) as fd:
        differences = diff_zip(expected_zip, fd)
        assert not differences, differences

    # Check job is skipped as no SFTP is configured
    assert resource.jobs_set().get().after_timestamp is None
    resource.jobs()
    assert resource.jobs_set().get().after_timestamp is not None
    assert resource.jobs_set().exclude(status='completed').count() == 1

    with sftpserver.serve_content({'input': {}, 'output': {}}):
        # Live view on the tree served over SFTP: the test reads deliveries
        # from it and plants responses into it.
        content = sftpserver.content_provider.content_object

        resource.outgoing_sftp = sftp.SFTP(
            'sftp://john:doe@{server.host}:{server.port}/output/'.format(server=sftpserver)
        )
        resource.save()
        resource.jobs()
        assert not content['output']

        # Jump over the 6 hour wait time for retry
        freezer.move_to('2019-01-02')
        resource.jobs()
        assert zip_name in content['output']

        # Check it's the same document than through the zip_url
        with io.BytesIO(content['output'][zip_name]) as fd:
            differences = diff_zip(expected_zip, fd)
            assert not differences, differences

        # Act as if zip was consumed
        content['output'] = {}

        # Jump over the 6 hour wait time for retry
        freezer.move_to('2019-01-03')
        resource.jobs()
        assert not content['output']
        assert resource.jobs_set().exclude(status='completed').count() == 0

        # Check response
        resource.hourly()

        resource.incoming_sftp = sftp.SFTP(
            'sftp://john:doe@{server.host}:{server.port}/input/'.format(server=sftpserver)
        )
        resource.save()

        # A response with etat=100 is expected to close the demand and to be
        # removed from the input directory.
        response_name, response_content = build_response_zip(
            reference='A-1-1',
            flow_type='depotDossierPACS',
            step=1,
            old_step=1,
            etat=100,
            commentaire='coucou',
        )
        content['input'][response_name] = response_content
        resource.hourly()
        assert resource.demand_set.get().status == 'closed'
        assert response_name not in content['input']

        # A second response carrying the same step must be reported as
        # unexpected in the logs.
        response_name, response_content = build_response_zip(
            reference='A-1-1', flow_type='depotDossierPACS', step=1, old_step=1, etat=1, commentaire='coucou'
        )
        content['input'][response_name] = response_content
        resource.hourly()
        assert 'unexpected file "A-1-1-depotDossierPACS-1.zip"' in caplog.messages[-1]
        assert 'step 1 is inferior' in caplog.messages[-1]

        resource.check_status()
def test_create_demand_double(app, resource, ddpacs_payload, freezer, sftpserver, caplog):
    """Create a demand whose ``aideMaterielMontant`` field is the string ``'None'``.

    The endpoint must still answer without error, report the demand as
    ``pending`` and leave exactly one ``Demand`` and one job behind.
    """
    # paramiko log socket errors when connection is closed :/
    caplog.set_level(logging.CRITICAL, 'paramiko.transport')
    freezer.move_to('2019-01-01')

    # Push new demand
    overrides = {
        'display_id': '1-1',
        'PACS/convention/conventionType/aideMaterielMontant': 'None',
    }
    ddpacs_payload.update(overrides)

    # Nothing exists before the call
    assert Demand.objects.count() == 0
    assert resource.jobs_set().count() == 0

    response = app.post_json('/mdel-ddpacs/test/create?raise=1', params=ddpacs_payload)

    assert response.json['err'] == 0
    assert response.json['status'] == 'pending'

    # One demand and its job were recorded
    assert Demand.objects.count() == 1
    assert resource.jobs_set().count() == 1