passerelle/tests/test_family.py

567 lines
20 KiB
Python

import logging
import os
import shutil
import zipfile
from uuid import uuid4
import pytest
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.files import File
from django.core.files.storage import default_storage
from django.core.management import call_command
from django.core.management.base import CommandError
from django.urls import reverse
from django.utils import timezone
from django.utils.http import urlencode
from passerelle.apps.family.models import (
DATETIME_FORMAT,
Adult,
Child,
Family,
FamilyLink,
GenericFamily,
Invoice,
)
from passerelle.base.models import AccessRight, ApiUser
pytestmark = pytest.mark.django_db
nameid = 'foobarnameid'
API_KEY = 'family'
@pytest.fixture
def resource(db):
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip')
with open(filepath, 'rb') as fd:
resource = GenericFamily.objects.create(
title='test', slug='test', archive=File(fd, 'family_data.zip')
)
api = ApiUser.objects.create(username='family', keytype='API', key=API_KEY)
obj_type = ContentType.objects.get_for_model(GenericFamily)
AccessRight.objects.create(
codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=resource.pk
)
return resource
def test_link_to_family(app, resource):
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
),
params={'NameID': nameid, 'login': '9407', 'password': 'gkh0UrrH', 'apikey': API_KEY},
)
assert r.json['data']
assert FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()
def test_unlink_from_family(app, resource):
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
),
params={'NameID': nameid, 'login': '1364', 'password': 'Li6LN1ID', 'apikey': API_KEY},
)
assert r.json['data']
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'unlink/'},
),
params={'NameID': nameid, 'apikey': API_KEY},
)
assert r.json['data']
assert not FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()
def test_family_infos(app, resource):
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
),
params={'NameID': nameid, 'login': '9407', 'password': 'gkh0UrrH', 'apikey': API_KEY},
)
assert r.json['data']
r = app.get(
reverse(
'generic-endpoint', kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family/'}
),
params={'NameID': nameid, 'apikey': API_KEY},
)
data = r.json['data']
assert data['id']
assert data['adults']
assert data['children']
assert data['street_number'] is not None
assert data['street_name'] is not None
assert data['zipcode'] is not None
assert data['address_complement'] is not None
assert data['city'] is not None
assert data['address'] is not None
assert data['quotient'] == '1370.50'
def test_family_members(app, resource):
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
),
params={'NameID': nameid, 'login': '23762', 'password': 's6HliUMX', 'apikey': API_KEY},
)
assert r.json['data']
for typ in ('adults', 'children'):
r = app.get(
reverse(
'generic-endpoint',
kwargs={
'connector': 'family',
'slug': resource.slug,
'endpoint': 'family',
'rest': '%s/' % typ,
},
),
params={'NameID': nameid, 'apikey': API_KEY},
)
data = r.json['data']
assert len(data)
for person in data:
assert person['id']
assert person['first_name']
assert person['last_name']
assert person['street_number'] is not None
assert person['street_name'] is not None
assert person['zipcode'] is not None
assert person['address_complement'] is not None
assert person['city'] is not None
assert person['address'] is not None
r = app.get(
reverse(
'generic-endpoint',
kwargs={
'connector': 'family',
'slug': resource.slug,
'endpoint': 'family',
'rest': '%s/' % typ,
},
),
params={'NameID': 'unknown', 'apikey': API_KEY},
)
data = r.json['data']
assert len(data) == 0
def test_get_family_invoices(app, resource):
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
),
params={'NameID': nameid, 'login': '19184', 'password': '8xUhrK6e', 'apikey': API_KEY},
)
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'regie', 'rest': 'invoices/'},
),
params={'NameID': nameid, 'apikey': API_KEY},
)
assert r.json['data']
for i in r.json['data']:
assert not i['paid']
r = app.get(
reverse(
'generic-endpoint',
kwargs={
'connector': 'family',
'slug': resource.slug,
'endpoint': 'regie',
'rest': 'invoices/history/',
},
),
params={'NameID': nameid, 'apikey': API_KEY},
)
assert r.json['data']
for i in r.json['data']:
assert i['paid']
assert i['has_pdf']
def test_get_family_invoice(app, resource):
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
),
params={'NameID': nameid, 'login': '19184', 'password': '8xUhrK6e', 'apikey': API_KEY},
)
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'regie', 'rest': 'invoices/'},
),
params={'NameID': nameid, 'apikey': API_KEY},
)
assert r.json['data']
invoice = r.json['data'][0]
r = app.get(
reverse(
'generic-endpoint',
kwargs={
'connector': 'family',
'slug': resource.slug,
'endpoint': 'regie',
'rest': 'invoice/%s/' % invoice['label'],
},
),
params={'apikey': API_KEY},
)
assert r.json['data']
for field in ('label', 'amount', 'paid', 'created', 'pay_limit_date'):
assert r.json['data'][field] == invoice[field]
assert r.json['data']['reference_id'] == r.json['data']['label']
def test_get_invoice_pdf(app, resource):
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
),
params={'NameID': nameid, 'login': '11959', 'password': '1WI6JOux', 'apikey': API_KEY},
)
r = app.get(
reverse(
'generic-endpoint',
kwargs={
'connector': 'family',
'slug': resource.slug,
'endpoint': 'regie',
'rest': 'invoices/history/',
},
),
params={'NameID': nameid, 'apikey': API_KEY},
)
invoice = r.json['data'][0]
assert invoice
r = app.get(
reverse(
'generic-endpoint',
kwargs={
'connector': 'family',
'slug': resource.slug,
'endpoint': 'regie',
'rest': 'invoice/%s/pdf/' % invoice['label'],
},
),
params={'apikey': API_KEY},
)
assert 'Content-Type' in r.headers
assert 'Content-Disposition' in r.headers
assert r.headers['Content-Type'] == 'application/pdf'
assert r.headers['Content-Disposition'] == 'attachment; filename=%s.pdf' % invoice['label']
def test_pay_invoice(app, resource):
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
),
params={'NameID': nameid, 'login': '19184', 'password': '8xUhrK6e', 'apikey': API_KEY},
)
r = app.get(
reverse(
'generic-endpoint',
kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'regie', 'rest': 'invoices/'},
),
params={'NameID': nameid},
)
invoice = r.json['data'][0]
payment_url = reverse(
'generic-endpoint',
kwargs={
'connector': 'family',
'slug': resource.slug,
'endpoint': 'regie',
'rest': 'invoice/%s/pay/' % invoice['label'],
},
)
r = app.post_json(
payment_url + '?' + urlencode({'apikey': API_KEY}),
params={'transaction_id': str(uuid4()), 'transaction_date': timezone.now().strftime(DATETIME_FORMAT)},
)
assert r.json['data']
r = app.get(
reverse(
'generic-endpoint',
kwargs={
'connector': 'family',
'slug': resource.slug,
'endpoint': 'regie',
'rest': 'invoice/%s/' % invoice['label'],
},
),
params={'apikey': API_KEY},
)
assert r.json['data']['paid']
assert float(r.json['data']['amount']) == 0
def test_fondettes_concerto_loader():
Invoice.objects.all().delete()
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data_fondettes.zip')
with open(filepath, 'rb') as fd:
resource = GenericFamily.objects.create(
title='test fondettes',
slug='test-fondettes',
archive=File(fd, 'family_data_fondettes.zip'),
file_format='concerto_fondettes',
)
assert Invoice.objects.filter(resource=resource).count() == 630
assert len([x for x in Invoice.objects.filter(resource=resource) if x.has_pdf]) == 4
assert Invoice.objects.filter(paid=True).count() == 312
assert Invoice.objects.filter(paid=False).count() == 318
assert Invoice.objects.filter(online_payment=False).count() == 2
def test_fondettes_opus_loader():
Invoice.objects.all().delete()
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data_opus_fondettes.zip')
with open(filepath, 'rb') as fd:
resource = GenericFamily.objects.create(
title='test fondettes',
slug='test-fondettes',
archive=File(fd, 'family_data_opus_fondettes.zip'),
file_format='opus_fondettes',
)
assert Invoice.objects.filter(resource=resource).count() == 630
assert len([x for x in Invoice.objects.filter(resource=resource) if x.has_pdf]) == 4
assert Invoice.objects.filter(paid=True).count() == 314
assert Invoice.objects.filter(paid=False).count() == 316
def test_archive_validation():
filepath = os.path.join(os.path.dirname(__file__), 'data', 'soap.wsdl')
with open(filepath) as fd:
resource = GenericFamily.objects.create(title='test', slug='test', archive=File(fd, 'soap.wsdl'))
with pytest.raises(ValidationError):
resource.clean()
def test_fondettes_concerto_validation():
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data_fondettes.zip')
with open(filepath, 'rb') as fd:
resource = GenericFamily.objects.create(
title='test fondettes',
slug='test fondettes',
archive=File(fd, 'family_data_fondettes.zip'),
file_format='concerto_fondettes',
)
resource.clean()
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip')
with open(filepath) as fd:
resource.archive = File(fd, 'family_data.zip')
with pytest.raises(ValidationError):
resource.clean()
def test_orleans_concerto_loader():
# all related objects will also be deleted
Family.objects.all().delete()
filepath = os.path.join(os.path.dirname(__file__), 'data', 'orleans', 'family_data_orleans.zip')
with open(filepath, 'rb') as fd:
resource = GenericFamily(
title='test orleans',
slug='test-orleans',
archive=File(fd, 'family_data_orleans.zip'),
file_format='concerto_orleans',
)
from passerelle.apps.family.loaders.concerto_orleans import Loader
loader = Loader(resource)
with zipfile.ZipFile(filepath) as z:
loader.archive = z
families = loader.build_families()
assert len(families) == 18
for family in families.values():
assert family['external_id']
assert family['adults']
assert family['login']
assert family['password']
assert family['zipcode']
assert family['city']
assert len(family['adults']) > 0
for adult in family['adults']:
assert adult['first_name']
assert adult['last_name']
assert adult['sex']
assert adult['external_id']
assert adult['zipcode']
assert adult['city']
assert len(family['children']) >= 1
for child in family['children']:
assert child['external_id']
assert child['first_name']
assert child['last_name']
assert child['sex']
assert 'birthdate' in child
assert 'invoices' in family
if family['invoices']:
for invoice in family['invoices']:
assert invoice['external_id']
assert invoice['label']
assert invoice['issue_date']
assert invoice['online_payment']
assert 'autobilling' in invoice
assert 'amount' in invoice
assert 'total_amount' in invoice
# there are 4 families with invoices in test data
assert len([f for f in families.values() if f['invoices']]) == 4
# and 14 families with no invoices
assert len([f for f in families.values() if not f['invoices']]) == 14
resource.save()
assert Family.objects.filter(resource=resource).count() == 18
assert Adult.objects.all().count() == 31
assert Child.objects.all().count() == 35
assert Invoice.objects.filter(resource=resource).count() == 0
def test_orleans_data_import_command():
with pytest.raises(CommandError) as error:
call_command('import_orleans_data')
assert str(error.value) == 'File exports_prcit.zip does not exist.'
cur_dir = os.path.dirname(__file__)
data_dir = os.path.join(cur_dir, 'data', 'orleans')
with open(os.path.join(data_dir, 'family_data_orleans.zip'), 'rb') as fd:
resource = GenericFamily.objects.create(
title='test orleans',
archive=File(fd, 'family_data_orleans.zip'),
slug='test-orleans',
file_format='concerto_orleans',
)
# cleanup data before launching import
Family.objects.filter(resource=resource).delete()
Invoice.objects.filter(resource=resource).delete()
call_command('import_orleans_data', archive_file=os.path.join(data_dir, 'family_data_orleans.zip'))
assert Family.objects.filter(resource=resource).count() == 0
assert Invoice.objects.filter(resource=resource).count() == 0
resource_invoices_dir = default_storage.path('family-%s/invoices' % resource.id)
# cleanup previosly created invoices dir, if exists
if os.path.exists(resource_invoices_dir) and os.path.isdir(resource_invoices_dir):
shutil.rmtree(resource_invoices_dir)
os.symlink(os.path.join(data_dir, 'factures'), resource_invoices_dir)
call_command(
'import_orleans_data',
archive_file=os.path.join(data_dir, 'family_data_orleans.zip'),
connector='test-orleans',
)
os.unlink(resource_invoices_dir)
assert Family.objects.filter(resource=resource).count() == 18
assert Adult.objects.all().count() == 31
assert Child.objects.all().count() == 35
assert Invoice.objects.filter(resource=resource).count() == 7
def test_family_pending_invoices_by_nameid_with_no_links():
test_orleans_data_import_command()
resource = GenericFamily.objects.get()
links = resource.get_pending_invoices_by_nameid(None)
assert links['data'] == {}
def test_family_pending_invoices_by_nameid():
test_orleans_data_import_command()
resource = GenericFamily.objects.get()
family = Family.objects.get(external_id='22380')
FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid1')
family = Family.objects.get(external_id='1228')
FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid2')
links = resource.get_pending_invoices_by_nameid(None)
assert len(links['data']) == 2
for uuid, invoices in links['data'].items():
assert uuid in ('testnameid1', 'testnameid2')
assert len(invoices) >= 1
def test_incorrect_orleans_data(caplog):
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_incorrect_data_orleans.zip')
with open(filepath, 'rb') as fd:
GenericFamily.objects.create(
title='test orleans',
slug='test-orleans',
archive=File(fd, 'family_incorrect_data_orleans.zip'),
file_format='concerto_orleans',
)
for record in caplog.records:
assert 'Error occured while importing data:' in record.message
assert record.name == 'passerelle.resource.family.test-orleans'
assert record.levelno == logging.ERROR
def test_egee_thonon_loader():
Invoice.objects.all().delete()
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data_egee_thonon.zip')
with open(filepath, 'rb') as fd:
resource = GenericFamily.objects.create(
title='test-egee-thonon',
slug='test-egee-thonon',
archive=File(fd, 'family_data_egee_thonon.zip'),
file_format='egee_thonon',
)
assert Invoice.objects.filter(resource=resource).count() == 4
for invoice in Invoice.objects.all():
assert len(invoice.external_id) == 13
assert invoice.label == invoice.external_id
assert invoice.online_payment is True
assert invoice.no_online_payment_reason is None
assert invoice.paid is False
assert (invoice.pay_limit_date - invoice.issue_date).days == 62
# modify invoice list, and reimport
Invoice.objects.first().delete()
Invoice.objects.first().delete()
paid_invoice = Invoice.objects.first()
paid_invoice.paid = True
paid_invoice.save()
Invoice.objects.create(resource=resource, external_id='1234', amount='1234')
call_command('update_families_from_zip')
assert Invoice.objects.filter(resource=resource).count() == 4
for invoice in Invoice.objects.all():
assert len(invoice.external_id) == 13
if invoice.id == paid_invoice.id:
assert invoice.paid is True # already paid, not updated
else:
assert invoice.paid is False
# invalid zip file
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip')
with open(filepath, 'rb') as fd:
resource.archive = File(fd, 'family_data.zip')
with pytest.raises(ValidationError, match=r'Missing factures.xml file in zip'):
resource.clean()