passerelle/tests/test_family.py

571 lines
20 KiB
Python

# -*- coding: utf-8 -*-
import logging
import os
import shutil
import zipfile
from uuid import uuid4
import pytest
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.files import File
from django.core.files.storage import default_storage
from django.core.management import call_command
from django.core.management.base import CommandError
from django.urls import reverse
from django.utils import timezone
from django.utils.http import urlencode
from passerelle.apps.family.models import (
DATETIME_FORMAT,
Adult,
Child,
Family,
FamilyLink,
GenericFamily,
Invoice,
)
from passerelle.base.models import AccessRight, ApiUser
# Apply the django_db mark to every test of this module.
pytestmark = pytest.mark.django_db
# NameID sent by the tests when linking a user to a family.
nameid = 'foobarnameid'
# Key of the ApiUser created by the `resource` fixture; sent as the `apikey` parameter.
API_KEY = 'family'
@pytest.fixture
def resource(db):
    """Return a ``GenericFamily`` connector created from ``data/family_data.zip``.

    An ``ApiUser`` keyed with ``API_KEY`` is created as well and is granted the
    ``can_access`` right on the connector.
    """
    archive_path = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip')
    with open(archive_path, 'rb') as archive_fd:
        connector = GenericFamily.objects.create(
            title='test',
            slug='test',
            archive=File(archive_fd, 'family_data.zip'),
        )

    api_user = ApiUser.objects.create(username='family', keytype='API', key=API_KEY)
    AccessRight.objects.create(
        codename='can_access',
        apiuser=api_user,
        resource_type=ContentType.objects.get_for_model(GenericFamily),
        resource_pk=connector.pk,
    )
    return connector
def test_link_to_family(app, resource):
    """Link a NameID to a family and check that a ``FamilyLink`` row exists afterwards."""
    link_url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
    )
    credentials = {'NameID': nameid, 'login': '9407', 'password': 'gkh0UrrH', 'apikey': API_KEY}

    response = app.get(link_url, params=credentials)

    assert response.json['data']
    assert FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()
def test_unlink_from_family(app, resource):
    """Link then unlink a NameID; no ``FamilyLink`` row must remain for it."""

    def family_url(rest):
        # Both calls hit the 'family' endpoint and differ only by the trailing path.
        return reverse(
            'generic-endpoint',
            kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': rest},
        )

    link_response = app.get(
        family_url('link/'),
        params={'NameID': nameid, 'login': '1364', 'password': 'Li6LN1ID', 'apikey': API_KEY},
    )
    assert link_response.json['data']

    unlink_response = app.get(family_url('unlink/'), params={'NameID': nameid, 'apikey': API_KEY})
    assert unlink_response.json['data']

    assert not FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()
def test_family_infos(app, resource):
    """Link a NameID, then check the content returned by the family endpoint."""
    link_url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
    )
    link_response = app.get(
        link_url,
        params={'NameID': nameid, 'login': '9407', 'password': 'gkh0UrrH', 'apikey': API_KEY},
    )
    assert link_response.json['data']

    infos_url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family/'},
    )
    infos_response = app.get(infos_url, params={'NameID': nameid, 'apikey': API_KEY})
    family = infos_response.json['data']

    # These entries must be present and truthy.
    for key in ('id', 'adults', 'children'):
        assert family[key]
    # Address parts only have to be non-null.
    for key in ('street_number', 'street_name', 'zipcode', 'address_complement', 'city', 'address'):
        assert family[key] is not None
    assert family['quotient'] == '1370.50'
def test_family_members(app, resource):
    """Check the adults and children listings for a linked and for an unknown NameID."""

    def family_url(rest):
        # Every request of this test targets the 'family' endpoint.
        return reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'family',
                'slug': resource.slug,
                'endpoint': 'family',
                'rest': rest,
            },
        )

    link_response = app.get(
        family_url('link/'),
        params={'NameID': nameid, 'login': '23762', 'password': 's6HliUMX', 'apikey': API_KEY},
    )
    assert link_response.json['data']

    truthy_keys = ('id', 'first_name', 'last_name')
    non_null_keys = ('street_number', 'street_name', 'zipcode', 'address_complement', 'city', 'address')

    for typ in ('adults', 'children'):
        members_url = family_url('%s/' % typ)

        # Linked NameID: a non-empty list of members is expected.
        members = app.get(members_url, params={'NameID': nameid, 'apikey': API_KEY}).json['data']
        assert len(members)
        for member in members:
            for key in truthy_keys:
                assert member[key]
            for key in non_null_keys:
                assert member[key] is not None

        # Unknown NameID: the listing is empty.
        members = app.get(members_url, params={'NameID': 'unknown', 'apikey': API_KEY}).json['data']
        assert len(members) == 0
def test_get_family_invoices(app, resource):
    """Listed invoices are unpaid; invoices from the history are paid and have a PDF."""

    def endpoint_url(endpoint, rest):
        return reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'family',
                'slug': resource.slug,
                'endpoint': endpoint,
                'rest': rest,
            },
        )

    # The response of the link call is not checked by this test.
    app.get(
        endpoint_url('family', 'link/'),
        params={'NameID': nameid, 'login': '19184', 'password': '8xUhrK6e', 'apikey': API_KEY},
    )
    user_params = {'NameID': nameid, 'apikey': API_KEY}

    pending = app.get(endpoint_url('regie', 'invoices/'), params=user_params).json['data']
    assert pending
    for invoice in pending:
        assert not invoice['paid']

    history = app.get(endpoint_url('regie', 'invoices/history/'), params=user_params).json['data']
    assert history
    for invoice in history:
        assert invoice['paid']
        assert invoice['has_pdf']
def test_get_family_invoice(app, resource):
    """Fetch one invoice by label and compare it with its entry in the invoices listing."""

    def endpoint_url(endpoint, rest):
        return reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'family',
                'slug': resource.slug,
                'endpoint': endpoint,
                'rest': rest,
            },
        )

    # The response of the link call is not checked by this test.
    app.get(
        endpoint_url('family', 'link/'),
        params={'NameID': nameid, 'login': '19184', 'password': '8xUhrK6e', 'apikey': API_KEY},
    )

    listing = app.get(
        endpoint_url('regie', 'invoices/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    assert listing.json['data']
    invoice = listing.json['data'][0]

    detail = app.get(
        endpoint_url('regie', 'invoice/%s/' % invoice['label']),
        params={'apikey': API_KEY},
    )
    assert detail.json['data']
    fetched = detail.json['data']
    for field in ('label', 'amount', 'paid', 'created', 'pay_limit_date'):
        assert fetched[field] == invoice[field]
    assert fetched['reference_id'] == fetched['label']
def test_get_invoice_pdf(app, resource):
    """Download the PDF of the first invoice of the history and check the response headers."""

    def endpoint_url(endpoint, rest):
        return reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'family',
                'slug': resource.slug,
                'endpoint': endpoint,
                'rest': rest,
            },
        )

    # The response of the link call is not checked by this test.
    app.get(
        endpoint_url('family', 'link/'),
        params={'NameID': nameid, 'login': '11959', 'password': '1WI6JOux', 'apikey': API_KEY},
    )

    history = app.get(
        endpoint_url('regie', 'invoices/history/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    invoice = history.json['data'][0]
    assert invoice

    pdf_response = app.get(
        endpoint_url('regie', 'invoice/%s/pdf/' % invoice['label']),
        params={'apikey': API_KEY},
    )
    headers = pdf_response.headers
    assert 'Content-Type' in headers
    assert 'Content-Disposition' in headers
    assert headers['Content-Type'] == 'application/pdf'
    assert headers['Content-Disposition'] == 'attachment; filename=%s.pdf' % invoice['label']
def test_pay_invoice(app, resource):
    """Pay the first listed invoice, then check it is reported paid with a zero amount."""

    def endpoint_url(endpoint, rest):
        return reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'family',
                'slug': resource.slug,
                'endpoint': endpoint,
                'rest': rest,
            },
        )

    # The response of the link call is not checked by this test.
    app.get(
        endpoint_url('family', 'link/'),
        params={'NameID': nameid, 'login': '19184', 'password': '8xUhrK6e', 'apikey': API_KEY},
    )

    # NOTE(review): unlike the other requests, this one carries no apikey -- confirm it is intended.
    listing = app.get(endpoint_url('regie', 'invoices/'), params={'NameID': nameid})
    invoice = listing.json['data'][0]

    # The API key goes in the query string; the transaction details go in the JSON body.
    payment_url = endpoint_url('regie', 'invoice/%s/pay/' % invoice['label'])
    payment = {
        'transaction_id': str(uuid4()),
        'transaction_date': timezone.now().strftime(DATETIME_FORMAT),
    }
    pay_response = app.post_json(payment_url + '?' + urlencode({'apikey': API_KEY}), params=payment)
    assert pay_response.json['data']

    detail = app.get(
        endpoint_url('regie', 'invoice/%s/' % invoice['label']),
        params={'apikey': API_KEY},
    )
    paid_invoice = detail.json['data']
    assert paid_invoice['paid']
    assert float(paid_invoice['amount']) == 0
def test_fondettes_concerto_loader():
    """Create a ``concerto_fondettes`` connector and check the resulting invoice counts."""
    Invoice.objects.all().delete()

    archive_name = 'family_data_fondettes.zip'
    archive_path = os.path.join(os.path.dirname(__file__), 'data', archive_name)
    with open(archive_path, 'rb') as archive_fd:
        connector = GenericFamily.objects.create(
            title='test fondettes',
            slug='test-fondettes',
            archive=File(archive_fd, 'family_data_fondettes.zip'),
            file_format='concerto_fondettes',
        )

    connector_invoices = Invoice.objects.filter(resource=connector)
    assert connector_invoices.count() == 630
    # has_pdf is tested on each instance, not through a queryset filter.
    assert sum(1 for invoice in Invoice.objects.filter(resource=connector) if invoice.has_pdf) == 4
    assert Invoice.objects.filter(paid=True).count() == 312
    assert Invoice.objects.filter(paid=False).count() == 318
    assert Invoice.objects.filter(online_payment=False).count() == 2
def test_fondettes_opus_loader():
    """Create an ``opus_fondettes`` connector and check the resulting invoice counts."""
    Invoice.objects.all().delete()

    archive_name = 'family_data_opus_fondettes.zip'
    archive_path = os.path.join(os.path.dirname(__file__), 'data', archive_name)
    with open(archive_path, 'rb') as archive_fd:
        connector = GenericFamily.objects.create(
            title='test fondettes',
            slug='test-fondettes',
            archive=File(archive_fd, 'family_data_opus_fondettes.zip'),
            file_format='opus_fondettes',
        )

    connector_invoices = Invoice.objects.filter(resource=connector)
    assert connector_invoices.count() == 630
    # has_pdf is tested on each instance, not through a queryset filter.
    assert sum(1 for invoice in Invoice.objects.filter(resource=connector) if invoice.has_pdf) == 4
    assert Invoice.objects.filter(paid=True).count() == 314
    assert Invoice.objects.filter(paid=False).count() == 316
def test_archive_validation():
    """``clean()`` must raise ``ValidationError`` when the archive is not a zip file.

    A WSDL file shipped with the test data is used as the invalid archive.
    """
    filepath = os.path.join(os.path.dirname(__file__), 'data', 'iparapheur.wsdl')
    # Archives are binary payloads: open in binary mode so that validation looks
    # at the real bytes and does not depend on the locale encoding or on the
    # limited seek support of text-mode files.
    with open(filepath, 'rb') as fd:
        resource = GenericFamily.objects.create(
            title='test', slug='test', archive=File(fd, 'iparapheur.wsdl')
        )
        with pytest.raises(ValidationError):
            resource.clean()
def test_fondettes_concerto_validation():
    """Check ``clean()`` on a ``concerto_fondettes`` connector.

    The matching archive must validate; swapping in ``family_data.zip`` must
    raise ``ValidationError``.
    """
    data_dir = os.path.join(os.path.dirname(__file__), 'data')

    filepath = os.path.join(data_dir, 'family_data_fondettes.zip')
    with open(filepath, 'rb') as fd:
        # NOTE(review): this slug contains a space, unlike 'test-fondettes' used
        # by the loader tests -- left untouched, confirm whether it is intended.
        resource = GenericFamily.objects.create(
            title='test fondettes',
            slug='test fondettes',
            archive=File(fd, 'family_data_fondettes.zip'),
            file_format='concerto_fondettes',
        )
        resource.clean()

    filepath = os.path.join(data_dir, 'family_data.zip')
    # Open the zip in binary mode: in text mode the archive cannot be read as a
    # zip at all, so the rejection would not come from its content.
    with open(filepath, 'rb') as fd:
        resource.archive = File(fd, 'family_data.zip')
        with pytest.raises(ValidationError):
            resource.clean()
def test_orleans_concerto_loader():
    """Build families with the ``concerto_orleans`` loader, then save the connector.

    The dictionaries returned by ``build_families()`` are checked first, then the
    database rows present once the connector has been saved.
    """
    from passerelle.apps.family.loaders.concerto_orleans import Loader

    # all related objects will also be deleted
    Family.objects.all().delete()

    archive_path = os.path.join(os.path.dirname(__file__), 'data', 'orleans', 'family_data_orleans.zip')
    with open(archive_path, 'rb') as archive_fd:
        # The connector is only instantiated here; it is saved at the end of this block.
        resource = GenericFamily(
            title='test orleans',
            slug='test-orleans',
            archive=File(archive_fd, 'family_data_orleans.zip'),
            file_format='concerto_orleans',
        )

        loader = Loader(resource)
        with zipfile.ZipFile(archive_path) as zip_archive:
            loader.archive = zip_archive
            families = loader.build_families()

        assert len(families) == 18

        for family in families.values():
            for key in ('external_id', 'adults', 'login', 'password', 'zipcode', 'city'):
                assert family[key]

            assert len(family['adults']) > 0
            for adult in family['adults']:
                for key in ('first_name', 'last_name', 'sex', 'external_id', 'zipcode', 'city'):
                    assert adult[key]

            assert len(family['children']) >= 1
            for child in family['children']:
                for key in ('external_id', 'first_name', 'last_name', 'sex'):
                    assert child[key]
                assert 'birthdate' in child

            assert 'invoices' in family
            for invoice in family['invoices'] or ():
                for key in ('external_id', 'label', 'issue_date', 'online_payment'):
                    assert invoice[key]
                for key in ('autobilling', 'amount', 'total_amount'):
                    assert key in invoice

        with_invoices = [f for f in families.values() if f['invoices']]
        without_invoices = [f for f in families.values() if not f['invoices']]
        # there are 4 families with invoices in test data
        assert len(with_invoices) == 4
        # and 14 families with no invoices
        assert len(without_invoices) == 14

        resource.save()

    assert Family.objects.filter(resource=resource).count() == 18
    assert Adult.objects.all().count() == 31
    assert Child.objects.all().count() == 35
    assert Invoice.objects.filter(resource=resource).count() == 0
def test_orleans_data_import_command():
    """Exercise the ``import_orleans_data`` management command.

    Three invocations are checked: without arguments (a ``CommandError`` is
    raised), with an archive only (nothing is imported for the connector), and
    with an archive plus the connector slug (families and invoices are imported).

    Other tests of this module call this function directly to populate the
    database.
    """
    with pytest.raises(CommandError) as error:
        call_command('import_orleans_data')
    assert str(error.value) == 'File exports_prcit.zip does not exist.'

    cur_dir = os.path.dirname(__file__)
    data_dir = os.path.join(cur_dir, 'data', 'orleans')
    archive_path = os.path.join(data_dir, 'family_data_orleans.zip')
    with open(archive_path, 'rb') as fd:
        resource = GenericFamily.objects.create(
            title='test orleans',
            archive=File(fd, 'family_data_orleans.zip'),
            slug='test-orleans',
            file_format='concerto_orleans',
        )

    # cleanup data before launching import
    Family.objects.filter(resource=resource).delete()
    Invoice.objects.filter(resource=resource).delete()

    call_command('import_orleans_data', archive_file=archive_path)
    assert Family.objects.filter(resource=resource).count() == 0
    assert Invoice.objects.filter(resource=resource).count() == 0

    resource_invoices_dir = default_storage.path('family-%s/invoices' % resource.id)
    # cleanup previously created invoices dir, if exists; a symlink left by an
    # interrupted run must be unlinked, since shutil.rmtree() refuses symlinks
    if os.path.islink(resource_invoices_dir):
        os.unlink(resource_invoices_dir)
    elif os.path.isdir(resource_invoices_dir):
        shutil.rmtree(resource_invoices_dir)
    os.symlink(os.path.join(data_dir, 'factures'), resource_invoices_dir)

    try:
        call_command(
            'import_orleans_data',
            archive_file=archive_path,
            connector='test-orleans',
        )
    finally:
        # always drop the symlink, even when the import fails, so that the
        # storage directory is left clean for the next run
        os.unlink(resource_invoices_dir)

    assert Family.objects.filter(resource=resource).count() == 18
    assert Adult.objects.all().count() == 31
    assert Child.objects.all().count() == 35
    assert Invoice.objects.filter(resource=resource).count() == 7
def test_family_pending_invoices_by_nameid_with_no_links():
    """When no ``FamilyLink`` has been created, the pending invoices mapping is empty."""
    # Reuse the import test to populate the database.
    test_orleans_data_import_command()

    connector = GenericFamily.objects.get()
    pending = connector.get_pending_invoices_by_nameid(None)

    assert pending['data'] == {}
def test_family_pending_invoices_by_nameid():
    """Two linked NameIDs each get at least one pending invoice."""
    # Reuse the import test to populate the database.
    test_orleans_data_import_command()
    connector = GenericFamily.objects.get()

    for external_id, name_id in (('22380', 'testnameid1'), ('1228', 'testnameid2')):
        linked_family = Family.objects.get(external_id=external_id)
        FamilyLink.objects.create(resource=connector, family=linked_family, name_id=name_id)

    pending = connector.get_pending_invoices_by_nameid(None)

    assert len(pending['data']) == 2
    for name_id, invoices in pending['data'].items():
        assert name_id in ('testnameid1', 'testnameid2')
        assert len(invoices) >= 1
def test_incorrect_orleans_data(caplog):
    """Creating a connector from an incorrect Orleans archive must log import errors.

    Every captured record has to be an ERROR emitted by the connector logger.
    """
    filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_incorrect_data_orleans.zip')
    with open(filepath, 'rb') as fd:
        GenericFamily.objects.create(
            title='test orleans',
            slug='test-orleans',
            archive=File(fd, 'family_incorrect_data_orleans.zip'),
            file_format='concerto_orleans',
        )

    # Without this guard the loop below passes vacuously when nothing is logged.
    assert caplog.records
    for record in caplog.records:
        assert 'Error occured while importing data:' in record.message
        assert record.name == 'passerelle.resource.family.test-orleans'
        assert record.levelno == logging.ERROR
def test_egee_thonon_loader():
    """Load an ``egee_thonon`` archive, alter the invoices, re-import, then check validation."""
    data_dir = os.path.join(os.path.dirname(__file__), 'data')

    Invoice.objects.all().delete()
    with open(os.path.join(data_dir, 'family_data_egee_thonon.zip'), 'rb') as archive_fd:
        resource = GenericFamily.objects.create(
            title='test-egee-thonon',
            slug='test-egee-thonon',
            archive=File(archive_fd, 'family_data_egee_thonon.zip'),
            file_format='egee_thonon',
        )

    assert Invoice.objects.filter(resource=resource).count() == 4
    for loaded in Invoice.objects.all():
        assert len(loaded.external_id) == 13
        assert loaded.label == loaded.external_id
        assert loaded.online_payment is True
        assert loaded.no_online_payment_reason is None
        assert loaded.paid is False
        delay = loaded.pay_limit_date - loaded.issue_date
        assert delay.days == 62

    # modify invoice list, and reimport
    for _ in range(2):
        Invoice.objects.first().delete()
    paid_invoice = Invoice.objects.first()
    paid_invoice.paid = True
    paid_invoice.save()
    Invoice.objects.create(resource=resource, external_id='1234', amount='1234')

    call_command('update_families_from_zip')

    assert Invoice.objects.filter(resource=resource).count() == 4
    for reloaded in Invoice.objects.all():
        assert len(reloaded.external_id) == 13
        # already paid, not updated; every other invoice is unpaid
        expected_paid = reloaded.id == paid_invoice.id
        assert reloaded.paid is expected_paid

    # invalid zip file
    with open(os.path.join(data_dir, 'family_data.zip'), 'rb') as archive_fd:
        resource.archive = File(archive_fd, 'family_data.zip')
        with pytest.raises(ValidationError, match=r'Missing factures.xml file in zip'):
            resource.clean()