202 lines
7.4 KiB
Python
202 lines
7.4 KiB
Python
# tests/test_gesbac.py
|
|
# Copyright (C) 2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import pytest
|
|
from django.utils.encoding import force_bytes, force_str
|
|
from django.utils.timezone import now
|
|
|
|
from passerelle.apps.gesbac.models import Gesbac
|
|
from passerelle.utils import SFTP
|
|
from tests.utils import make_resource
|
|
|
|
|
|
@pytest.fixture
def resource(db, sftpserver):
    """Return a Gesbac resource built with make_resource and bound to the test SFTP server.

    The outgoing endpoint targets the server's ``/output/`` directory and the
    incoming endpoint its ``/input/`` directory; file prefixes are ``output-``
    and ``input-`` respectively.

    ``db`` is not used in the body; it is requested only as a fixture
    dependency (presumably to enable database access -- confirm).
    """
    # Build the outgoing endpoint first, then the incoming one.
    outgoing_endpoint = SFTP('sftp://foo:bar@{server.host}:{server.port}/output/'.format(server=sftpserver))
    incoming_endpoint = SFTP('sftp://foo:bar@{server.host}:{server.port}/input/'.format(server=sftpserver))

    settings = {
        'slug': 'test',
        'title': 'Gesbac',
        'description': 'gesbac',
        'outcoming_sftp': outgoing_endpoint,
        'incoming_sftp': incoming_endpoint,
        'output_files_prefix': 'output-',
        'input_files_prefix': 'input-',
    }
    return make_resource(Gesbac, **settings)
|
|
|
|
|
|
def test_check_status(app, resource, sftpserver):
    """Smoke test: check_status() completes without raising while the SFTP server serves content."""
    # One file in each of the two directories the resource is configured with.
    served_tree = {
        'input': {'test': 'content'},
        'output': {'file': 'content'},
    }
    with sftpserver.serve_content(served_tree):
        resource.check_status()
|
|
|
|
|
|
def test_create_demand(app, resource, freezer, sftpserver):
    """Post two demands for the same form and check naming, ids and status transitions.

    Each POST must leave exactly one form in status ``new`` carrying two rows
    of demand data, and return the expected CSV filename and gesbac id
    (``4204200`` then ``4204201``). After each ``resource.jobs()`` run the
    number of forms in status ``sent`` grows by one.
    """
    endpoint = '/gesbac/test/create-demand/'
    forms = resource.form_set
    assert forms.count() == 0

    timestamp = now()
    # Date/time part shared by both expected filenames.
    filename_stamp = timestamp.strftime('%y%m%d-%H%M%S')
    payload = dict(
        form_id='42-42',
        demand_date=timestamp.strftime('%Y%m%d'),
        demand_time=timestamp.strftime('%H%M%S'),
        producer_code=1,
        city_insee_code='75114',
        street_name='Château',
        street_rivoli_code='xxxx',
        producer_social_reason='SCOP',
        producer_last_name='Bar',
        producer_first_name='Foo',
        producer_email='foo@example.com',
        owner_last_name='Bar',
        owner_first_name='Foo',
        owner_email='foo@example.com',
        family_members_number=5,
        houses_number=1,
        card_type=1,
        card_subject=1,
        card_demand_reason=1,
        card_demand_purpose=1,
        cards_quantity=1,
    )

    # --- first demand ---
    first_reply = app.post_json(endpoint, params=payload)
    assert forms.filter(status='new').count() == 1
    pending = forms.get(status='new')
    assert len(pending.demand_data) == 2
    first_filename = '%s%s-%s.csv' % (resource.output_files_prefix, filename_stamp, pending.get_gesbac_id())
    first_data = first_reply.json['data']
    assert first_data['filename'] == first_filename
    assert first_data['gesbac_id'] == '4204200'
    assert forms.filter(status='new').count() == 1
    with sftpserver.serve_content({'output': {first_filename: 'content'}}):
        resource.jobs()
    assert forms.filter(status='sent').count() == 1

    # --- second demand, same form id, different owner email ---
    payload['owner_email'] = 'bar@example.com'
    second_reply = app.post_json(endpoint, params=payload)
    assert forms.filter(status='new').count() == 1
    pending = forms.get(status='new')
    assert len(pending.demand_data) == 2
    second_filename = '%s%s-%s.csv' % (resource.output_files_prefix, filename_stamp, pending.get_gesbac_id())
    second_data = second_reply.json['data']
    assert second_data['filename'] == second_filename
    assert second_data['gesbac_id'] == '4204201'
    with sftpserver.serve_content({'output': {second_data['filename']: 'content'}}):
        resource.jobs()
    assert forms.filter(status='sent').count() == 2
|
|
|
|
|
|
def test_demand_creation_limit(app, resource, freezer):
    """Twenty identical demands are accepted (``err == 0``); the twenty-first is refused (``err == 1``)."""
    endpoint = '/gesbac/test/create-demand/'
    accepted_attempts = 20

    timestamp = now()
    payload = dict(
        form_id='42-44',
        demand_date=timestamp.strftime('%Y%m%d'),
        demand_time=timestamp.strftime('%H%M%S'),
        producer_code=1,
        city_insee_code='75114',
        street_name='Château',
        street_rivoli_code='xxxx',
        producer_social_reason='SCOP',
        producer_last_name='Bar',
        producer_first_name='Foo',
        producer_email='foo@example.com',
        owner_last_name='Bar',
        owner_first_name='Foo',
        owner_email='foo@example.com',
        family_members_number=5,
        houses_number=1,
        card_type=1,
        card_subject=1,
        card_demand_reason=1,
        card_demand_purpose=1,
        cards_quantity=1,
    )

    # Check each reply immediately so a failure stops further posting.
    for _ in range(accepted_attempts):
        reply = app.post_json(endpoint, params=payload)
        assert reply.json['err'] == 0

    # One more submission of the same payload must be reported as an error.
    overflow_reply = app.post_json(endpoint, params=payload)
    assert overflow_reply.json['err'] == 1
|
|
|
|
|
|
def test_get_demand_response(app, resource, freezer, sftpserver):
    """Follow a demand from creation to a closed form with card data.

    Steps checked:
    * asking for an unknown gesbac id answers 404;
    * after ``resource.jobs()`` the form is ``sent`` and none is ``closed``;
    * after ``resource.hourly()`` with a latin-1 response file served in the
      input directory, the form is ``closed`` and its ``card_data`` holds the
      semicolon-separated fields of the file;
    * the get-response endpoint returns those same fields.
    """
    response_endpoint = '/gesbac/test/get-response/'
    forms = resource.form_set

    # Nothing has been created yet, so this id cannot be found.
    app.get(response_endpoint, params={'gesbac_id': '42043'}, status=404)

    timestamp = now()
    payload = dict(
        form_id='42-43',
        demand_date=timestamp.strftime('%Y%m%d'),
        demand_time=timestamp.strftime('%H%M%S'),
        producer_code=1,
        city_insee_code='75114',
        street_name='Château',
        street_rivoli_code='xxxx',
        producer_social_reason='SCOP',
        producer_last_name='Bar',
        producer_first_name='Foo',
        producer_email='foo@example.com',
        owner_last_name='Bar',
        owner_first_name='Foo',
        owner_email='foo@example.com',
        family_members_number=5,
        houses_number=1,
        card_type=1,
        card_subject=1,
        card_demand_reason=1,
        card_demand_purpose=1,
        cards_quantity=1,
    )
    creation_reply = app.post_json('/gesbac/test/create-demand/', params=payload)
    created = creation_reply.json['data']
    with sftpserver.serve_content({'output': {created['filename']: 'content'}, 'input': {}}):
        resource.jobs()
    assert forms.filter(status='closed').count() == 0
    assert forms.filter(status='sent').count() == 1

    sent_form = forms.get(status='sent')
    gesbac_id = sent_form.get_gesbac_id()
    response_filename = '%s91001-090300-%s.csv' % (resource.input_files_prefix, gesbac_id)
    not_yet_closed = forms.filter(form_id='42-43', filename=response_filename, status='closed')
    assert not_yet_closed.count() == 0

    # files are encoded in latin-1
    comment = force_str('propriétaire')
    csv_line = force_str('CARTE;%s;3;2;1234;;;;;;;;%s' % (gesbac_id, comment))
    content = force_bytes(csv_line, 'latin-1')
    with sftpserver.serve_content({'input': {response_filename: content}}):
        resource.hourly()

    # Fields of the served line: five leading values, seven empty ones, then the comment.
    expected_fields = ['CARTE', gesbac_id, '3', '2', '1234'] + [''] * 7 + [comment]

    assert forms.filter(form_id='42-43', status='closed').count() == 1
    closed_form = forms.get(status='closed')
    assert closed_form.card_data == expected_fields

    api_reply = app.get(response_endpoint, params={'gesbac_id': gesbac_id})
    assert api_reply.json['err'] == 0
    assert api_reply.json['data'] == expected_fields
|