567 lines
20 KiB
Python
567 lines
20 KiB
Python
import logging
|
|
import os
|
|
import shutil
|
|
import zipfile
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.files import File
|
|
from django.core.files.storage import default_storage
|
|
from django.core.management import call_command
|
|
from django.core.management.base import CommandError
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django.utils.http import urlencode
|
|
|
|
from passerelle.apps.family.models import (
|
|
DATETIME_FORMAT,
|
|
Adult,
|
|
Child,
|
|
Family,
|
|
FamilyLink,
|
|
GenericFamily,
|
|
Invoice,
|
|
)
|
|
from passerelle.base.models import AccessRight, ApiUser
|
|
|
|
# Module-wide pytest mark: every test in this file runs with the django_db mark.
pytestmark = pytest.mark.django_db
|
|
|
|
# Value sent as the 'NameID' query parameter by the endpoint tests below.
nameid = 'foobarnameid'
|
|
# Key of the ApiUser created by the `resource` fixture; sent as the 'apikey' parameter.
API_KEY = 'family'
|
|
|
|
|
|
@pytest.fixture
def resource(db):
    """Return a GenericFamily connector created from ``data/family_data.zip``.

    An ApiUser keyed with ``API_KEY`` is created as well and granted the
    ``can_access`` right on the connector.
    """
    archive_path = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip')
    with open(archive_path, 'rb') as archive_fd:
        connector = GenericFamily.objects.create(
            title='test',
            slug='test',
            archive=File(archive_fd, 'family_data.zip'),
        )

    api_user = ApiUser.objects.create(username='family', keytype='API', key=API_KEY)
    connector_type = ContentType.objects.get_for_model(GenericFamily)
    AccessRight.objects.create(
        codename='can_access',
        apiuser=api_user,
        resource_type=connector_type,
        resource_pk=connector.pk,
    )
    return connector
|
|
|
|
|
|
def test_link_to_family(app, resource):
    """Call the ``link/`` endpoint and check a FamilyLink exists for the NameID."""
    link_url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
    )
    query = {'NameID': nameid, 'login': '9407', 'password': 'gkh0UrrH', 'apikey': API_KEY}

    response = app.get(link_url, params=query)

    assert response.json['data']
    assert FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()
|
|
|
|
|
|
def test_unlink_from_family(app, resource):
    """Link a NameID, unlink it, then check that no FamilyLink is left for it."""

    def family_url(rest):
        # URL of a 'family' endpoint of the connector under test.
        return reverse(
            'generic-endpoint',
            kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': rest},
        )

    link_response = app.get(
        family_url('link/'),
        params={'NameID': nameid, 'login': '1364', 'password': 'Li6LN1ID', 'apikey': API_KEY},
    )
    assert link_response.json['data']

    unlink_response = app.get(
        family_url('unlink/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    assert unlink_response.json['data']
    assert not FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()
|
|
|
|
|
|
def test_family_infos(app, resource):
    """Link a NameID and check the content returned by the ``family/`` endpoint."""
    link_url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'},
    )
    infos_url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'family', 'slug': resource.slug, 'endpoint': 'family/'},
    )

    link_response = app.get(
        link_url,
        params={'NameID': nameid, 'login': '9407', 'password': 'gkh0UrrH', 'apikey': API_KEY},
    )
    assert link_response.json['data']

    infos_response = app.get(infos_url, params={'NameID': nameid, 'apikey': API_KEY})
    family = infos_response.json['data']

    # These entries must be truthy.
    for key in ('id', 'adults', 'children'):
        assert family[key]
    # Address parts only have to be present and not None.
    for key in ('street_number', 'street_name', 'zipcode', 'address_complement', 'city', 'address'):
        assert family[key] is not None
    assert family['quotient'] == '1370.50'
|
|
|
|
|
|
def test_family_members(app, resource):
    """Check the ``adults/`` and ``children/`` listings for a linked and an unknown NameID."""

    def family_url(rest):
        # URL of a 'family' endpoint of the connector under test.
        return reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'family',
                'slug': resource.slug,
                'endpoint': 'family',
                'rest': rest,
            },
        )

    link_response = app.get(
        family_url('link/'),
        params={'NameID': nameid, 'login': '23762', 'password': 's6HliUMX', 'apikey': API_KEY},
    )
    assert link_response.json['data']

    truthy_keys = ('id', 'first_name', 'last_name')
    not_none_keys = ('street_number', 'street_name', 'zipcode', 'address_complement', 'city', 'address')

    for member_type in ('adults', 'children'):
        members_url = family_url('%s/' % member_type)

        # The linked NameID gets a non-empty list of fully described persons.
        members_response = app.get(members_url, params={'NameID': nameid, 'apikey': API_KEY})
        members = members_response.json['data']

        assert len(members)
        for member in members:
            for key in truthy_keys:
                assert member[key]
            for key in not_none_keys:
                assert member[key] is not None

        # A NameID that was never linked gets an empty list.
        unknown_response = app.get(members_url, params={'NameID': 'unknown', 'apikey': API_KEY})
        unknown_members = unknown_response.json['data']
        assert len(unknown_members) == 0
|
|
|
|
|
|
def test_get_family_invoices(app, resource):
    """Check the ``paid`` / ``has_pdf`` flags of the pending and history invoice lists."""

    def endpoint_url(endpoint, rest):
        # URL of an endpoint of the connector under test.
        return reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'family',
                'slug': resource.slug,
                'endpoint': endpoint,
                'rest': rest,
            },
        )

    # The response of the link request is not checked here.
    app.get(
        endpoint_url('family', 'link/'),
        params={'NameID': nameid, 'login': '19184', 'password': '8xUhrK6e', 'apikey': API_KEY},
    )

    pending_response = app.get(
        endpoint_url('regie', 'invoices/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    assert pending_response.json['data']
    for pending_invoice in pending_response.json['data']:
        assert not pending_invoice['paid']

    history_response = app.get(
        endpoint_url('regie', 'invoices/history/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    assert history_response.json['data']
    for past_invoice in history_response.json['data']:
        assert past_invoice['paid']
        assert past_invoice['has_pdf']
|
|
|
|
|
|
def test_get_family_invoice(app, resource):
    """Fetch one invoice by label and compare it with its entry in the invoice list."""

    def endpoint_url(endpoint, rest):
        # URL of an endpoint of the connector under test.
        return reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'family',
                'slug': resource.slug,
                'endpoint': endpoint,
                'rest': rest,
            },
        )

    # The response of the link request is not checked here.
    app.get(
        endpoint_url('family', 'link/'),
        params={'NameID': nameid, 'login': '19184', 'password': '8xUhrK6e', 'apikey': API_KEY},
    )

    list_response = app.get(
        endpoint_url('regie', 'invoices/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    assert list_response.json['data']
    invoice = list_response.json['data'][0]

    detail_response = app.get(
        endpoint_url('regie', 'invoice/%s/' % invoice['label']),
        params={'apikey': API_KEY},
    )
    detail = detail_response.json['data']
    assert detail
    for field in ('label', 'amount', 'paid', 'created', 'pay_limit_date'):
        assert detail[field] == invoice[field]
    assert detail['reference_id'] == detail['label']
|
|
|
|
|
|
def test_get_invoice_pdf(app, resource):
    """Request the PDF of the first history invoice and check the response headers."""

    def endpoint_url(endpoint, rest):
        # URL of an endpoint of the connector under test.
        return reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'family',
                'slug': resource.slug,
                'endpoint': endpoint,
                'rest': rest,
            },
        )

    # The response of the link request is not checked here.
    app.get(
        endpoint_url('family', 'link/'),
        params={'NameID': nameid, 'login': '11959', 'password': '1WI6JOux', 'apikey': API_KEY},
    )

    history_response = app.get(
        endpoint_url('regie', 'invoices/history/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    invoice = history_response.json['data'][0]
    assert invoice

    pdf_response = app.get(
        endpoint_url('regie', 'invoice/%s/pdf/' % invoice['label']),
        params={'apikey': API_KEY},
    )
    headers = pdf_response.headers
    assert 'Content-Type' in headers
    assert 'Content-Disposition' in headers
    assert headers['Content-Type'] == 'application/pdf'
    assert headers['Content-Disposition'] == 'attachment; filename=%s.pdf' % invoice['label']
|
|
|
|
|
|
def test_pay_invoice(app, resource):
    """Pay the first pending invoice and check that it is then reported as paid.

    After the POST on ``invoice/<label>/pay/``, the invoice detail must have a
    truthy ``paid`` flag and an ``amount`` equal to zero.
    """

    def endpoint_url(endpoint, rest):
        # URL of an endpoint of the connector under test.
        return reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'family',
                'slug': resource.slug,
                'endpoint': endpoint,
                'rest': rest,
            },
        )

    # The response of the link request is not checked here.
    app.get(
        endpoint_url('family', 'link/'),
        params={'NameID': nameid, 'login': '19184', 'password': '8xUhrK6e', 'apikey': API_KEY},
    )

    # Send the API key like every other request in this module does; the
    # original call was the only one without it.
    list_response = app.get(
        endpoint_url('regie', 'invoices/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    invoice = list_response.json['data'][0]

    # The payment is a JSON POST, so the API key travels in the query string.
    payment_url = endpoint_url('regie', 'invoice/%s/pay/' % invoice['label'])
    payment_response = app.post_json(
        payment_url + '?' + urlencode({'apikey': API_KEY}),
        params={'transaction_id': str(uuid4()), 'transaction_date': timezone.now().strftime(DATETIME_FORMAT)},
    )
    assert payment_response.json['data']

    detail_response = app.get(
        endpoint_url('regie', 'invoice/%s/' % invoice['label']),
        params={'apikey': API_KEY},
    )
    assert detail_response.json['data']['paid']
    assert float(detail_response.json['data']['amount']) == 0
|
|
|
|
|
|
def test_fondettes_concerto_loader():
    """Create a ``concerto_fondettes`` connector and check the invoice counts."""
    Invoice.objects.all().delete()

    archive_name = 'family_data_fondettes.zip'
    archive_path = os.path.join(os.path.dirname(__file__), 'data', archive_name)
    with open(archive_path, 'rb') as archive_fd:
        connector = GenericFamily.objects.create(
            title='test fondettes',
            slug='test-fondettes',
            archive=File(archive_fd, 'family_data_fondettes.zip'),
            file_format='concerto_fondettes',
        )

    assert Invoice.objects.filter(resource=connector).count() == 630
    invoices_with_pdf = [
        invoice for invoice in Invoice.objects.filter(resource=connector) if invoice.has_pdf
    ]
    assert len(invoices_with_pdf) == 4
    assert Invoice.objects.filter(paid=True).count() == 312
    assert Invoice.objects.filter(paid=False).count() == 318
    assert Invoice.objects.filter(online_payment=False).count() == 2
|
|
|
|
|
|
def test_fondettes_opus_loader():
    """Create an ``opus_fondettes`` connector and check the invoice counts."""
    Invoice.objects.all().delete()

    archive_name = 'family_data_opus_fondettes.zip'
    archive_path = os.path.join(os.path.dirname(__file__), 'data', archive_name)
    with open(archive_path, 'rb') as archive_fd:
        connector = GenericFamily.objects.create(
            title='test fondettes',
            slug='test-fondettes',
            archive=File(archive_fd, 'family_data_opus_fondettes.zip'),
            file_format='opus_fondettes',
        )

    assert Invoice.objects.filter(resource=connector).count() == 630
    invoices_with_pdf = [
        invoice for invoice in Invoice.objects.filter(resource=connector) if invoice.has_pdf
    ]
    assert len(invoices_with_pdf) == 4
    assert Invoice.objects.filter(paid=True).count() == 314
    assert Invoice.objects.filter(paid=False).count() == 316
|
|
|
|
|
|
def test_archive_validation():
    """Check that ``clean()`` rejects an archive that is not a zip file."""
    filepath = os.path.join(os.path.dirname(__file__), 'data', 'soap.wsdl')
    # Open in binary mode like every other archive in this module: an upload
    # is raw bytes, and a text-mode handle makes reading depend on the locale
    # encoding instead of on the file's actual content.
    with open(filepath, 'rb') as fd:
        resource = GenericFamily.objects.create(title='test', slug='test', archive=File(fd, 'soap.wsdl'))
        # Validate while the file handle is still open.
        with pytest.raises(ValidationError):
            resource.clean()
|
|
|
|
|
|
def test_fondettes_concerto_validation():
    """Check ``clean()`` for the ``concerto_fondettes`` file format.

    The fondettes archive must pass validation; ``family_data.zip`` must be
    rejected with a ValidationError.
    """
    filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data_fondettes.zip')
    with open(filepath, 'rb') as fd:
        resource = GenericFamily.objects.create(
            title='test fondettes',
            slug='test fondettes',
            archive=File(fd, 'family_data_fondettes.zip'),
            file_format='concerto_fondettes',
        )
        # Must not raise; validate while the file handle is still open.
        resource.clean()

    filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip')
    # Binary mode is required here: a zip reader cannot seek from the end of a
    # text-mode stream, so with a text handle the archive would be rejected as
    # "not a zip" whatever it contains and the check below could never fail.
    # NOTE(review): presumably clean() then rejects the archive because of its
    # content -- confirm against the concerto_fondettes loader.
    with open(filepath, 'rb') as fd:
        resource.archive = File(fd, 'family_data.zip')
        with pytest.raises(ValidationError):
            resource.clean()
|
|
|
|
|
|
def test_orleans_concerto_loader():
    """Check the families built by the ``concerto_orleans`` loader, then save the connector.

    The first part inspects the dictionaries returned by
    ``Loader.build_families()``; the second part checks the object counts
    once the connector has been saved.
    """
    from passerelle.apps.family.loaders.concerto_orleans import Loader

    # all related objects will also be deleted
    Family.objects.all().delete()

    archive_path = os.path.join(os.path.dirname(__file__), 'data', 'orleans', 'family_data_orleans.zip')
    with open(archive_path, 'rb') as archive_fd:
        # Built but not saved yet: the loader is exercised by hand first.
        connector = GenericFamily(
            title='test orleans',
            slug='test-orleans',
            archive=File(archive_fd, 'family_data_orleans.zip'),
            file_format='concerto_orleans',
        )

        loader = Loader(connector)
        with zipfile.ZipFile(archive_path) as archive:
            loader.archive = archive
            families = loader.build_families()

        assert len(families) == 18
        for family in families.values():
            for key in ('external_id', 'adults', 'login', 'password', 'zipcode', 'city'):
                assert family[key]
            assert len(family['adults']) > 0

            for adult in family['adults']:
                for key in ('first_name', 'last_name', 'sex', 'external_id', 'zipcode', 'city'):
                    assert adult[key]

            assert len(family['children']) >= 1
            for child in family['children']:
                for key in ('external_id', 'first_name', 'last_name', 'sex'):
                    assert child[key]
                assert 'birthdate' in child

            assert 'invoices' in family
            if family['invoices']:
                for invoice in family['invoices']:
                    for key in ('external_id', 'label', 'issue_date', 'online_payment'):
                        assert invoice[key]
                    for key in ('autobilling', 'amount', 'total_amount'):
                        assert key in invoice

        # there are 4 families with invoices in test data
        families_with_invoices = [family for family in families.values() if family['invoices']]
        assert len(families_with_invoices) == 4
        # and 14 families with no invoices
        families_without_invoices = [family for family in families.values() if not family['invoices']]
        assert len(families_without_invoices) == 14

        connector.save()

    assert Family.objects.filter(resource=connector).count() == 18
    assert Adult.objects.all().count() == 31
    assert Child.objects.all().count() == 35
    assert Invoice.objects.filter(resource=connector).count() == 0
|
|
|
|
|
|
def test_orleans_data_import_command():
    """Exercise the ``import_orleans_data`` management command.

    Three calls are made: without arguments (a CommandError is expected),
    with only ``archive_file`` (the counts must stay at zero), and with both
    ``archive_file`` and ``connector`` (families and invoices must be imported).

    For the last call the connector's invoices directory is temporarily
    replaced by a symlink to ``data/orleans/factures``.

    This function is also called directly by other tests of this module to
    populate the database.
    """
    with pytest.raises(CommandError) as error:
        call_command('import_orleans_data')
    assert str(error.value) == 'File exports_prcit.zip does not exist.'

    cur_dir = os.path.dirname(__file__)
    data_dir = os.path.join(cur_dir, 'data', 'orleans')
    archive_path = os.path.join(data_dir, 'family_data_orleans.zip')

    with open(archive_path, 'rb') as fd:
        resource = GenericFamily.objects.create(
            title='test orleans',
            archive=File(fd, 'family_data_orleans.zip'),
            slug='test-orleans',
            file_format='concerto_orleans',
        )

    # cleanup data before launching import
    Family.objects.filter(resource=resource).delete()
    Invoice.objects.filter(resource=resource).delete()

    # No connector is given to this call; the counts must stay at zero.
    call_command('import_orleans_data', archive_file=archive_path)
    assert Family.objects.filter(resource=resource).count() == 0
    assert Invoice.objects.filter(resource=resource).count() == 0

    resource_invoices_dir = default_storage.path('family-%s/invoices' % resource.id)
    # cleanup previously created invoices dir, if exists
    if os.path.islink(resource_invoices_dir):
        # A symlink left behind by an interrupted run: shutil.rmtree() refuses
        # symlinks, and a dangling one would make os.symlink() fail below.
        os.unlink(resource_invoices_dir)
    elif os.path.isdir(resource_invoices_dir):
        shutil.rmtree(resource_invoices_dir)
    os.symlink(os.path.join(data_dir, 'factures'), resource_invoices_dir)

    try:
        call_command(
            'import_orleans_data',
            archive_file=archive_path,
            connector='test-orleans',
        )
    finally:
        # Always remove the symlink, even when the command fails, so that the
        # next run starts from a clean storage directory.
        os.unlink(resource_invoices_dir)

    assert Family.objects.filter(resource=resource).count() == 18
    assert Adult.objects.all().count() == 31
    assert Child.objects.all().count() == 35
    assert Invoice.objects.filter(resource=resource).count() == 7
|
|
|
|
|
|
def test_family_pending_invoices_by_nameid_with_no_links():
    """With no FamilyLink created, the pending invoices mapping must be empty."""
    # Populate the database by running the import test's steps.
    test_orleans_data_import_command()

    connector = GenericFamily.objects.get()
    result = connector.get_pending_invoices_by_nameid(None)

    assert result['data'] == {}
|
|
|
|
|
|
def test_family_pending_invoices_by_nameid():
    """Link two families and check that each name id is returned with at least one invoice."""
    # Populate the database by running the import test's steps.
    test_orleans_data_import_command()
    connector = GenericFamily.objects.get()

    first_family = Family.objects.get(external_id='22380')
    FamilyLink.objects.create(resource=connector, family=first_family, name_id='testnameid1')
    second_family = Family.objects.get(external_id='1228')
    FamilyLink.objects.create(resource=connector, family=second_family, name_id='testnameid2')

    result = connector.get_pending_invoices_by_nameid(None)
    pending_by_nameid = result['data']

    assert len(pending_by_nameid) == 2
    for name_id, pending_invoices in pending_by_nameid.items():
        assert name_id in ('testnameid1', 'testnameid2')
        assert len(pending_invoices) >= 1
|
|
|
|
|
|
def test_incorrect_orleans_data(caplog):
    """Create a connector from an incorrect orleans archive and check the captured log records."""
    archive_name = 'family_incorrect_data_orleans.zip'
    archive_path = os.path.join(os.path.dirname(__file__), 'data', archive_name)
    with open(archive_path, 'rb') as archive_fd:
        GenericFamily.objects.create(
            title='test orleans',
            slug='test-orleans',
            archive=File(archive_fd, 'family_incorrect_data_orleans.zip'),
            file_format='concerto_orleans',
        )

    # NOTE(review): if nothing was captured this loop does not run and the
    # test passes without checking anything.
    expected_logger = 'passerelle.resource.family.test-orleans'
    for log_record in caplog.records:
        assert 'Error occured while importing data:' in log_record.message
        assert log_record.name == expected_logger
        assert log_record.levelno == logging.ERROR
|
|
|
|
|
|
def test_egee_thonon_loader():
    """Check the ``egee_thonon`` import, a re-import after local changes, and archive validation."""
    Invoice.objects.all().delete()

    data_dir = os.path.join(os.path.dirname(__file__), 'data')
    with open(os.path.join(data_dir, 'family_data_egee_thonon.zip'), 'rb') as archive_fd:
        connector = GenericFamily.objects.create(
            title='test-egee-thonon',
            slug='test-egee-thonon',
            archive=File(archive_fd, 'family_data_egee_thonon.zip'),
            file_format='egee_thonon',
        )

    assert Invoice.objects.filter(resource=connector).count() == 4
    for imported in Invoice.objects.all():
        assert len(imported.external_id) == 13
        assert imported.label == imported.external_id
        assert imported.online_payment is True
        assert imported.no_online_payment_reason is None
        assert imported.paid is False
        delay = imported.pay_limit_date - imported.issue_date
        assert delay.days == 62

    # modify invoice list, and reimport
    for _ in range(2):
        Invoice.objects.first().delete()
    paid_invoice = Invoice.objects.first()
    paid_invoice.paid = True
    paid_invoice.save()
    Invoice.objects.create(resource=connector, external_id='1234', amount='1234')

    call_command('update_families_from_zip')

    assert Invoice.objects.filter(resource=connector).count() == 4
    for reimported in Invoice.objects.all():
        assert len(reimported.external_id) == 13
        # already paid, not updated
        was_marked_paid = reimported.id == paid_invoice.id
        assert reimported.paid is was_marked_paid

    # invalid zip file
    with open(os.path.join(data_dir, 'family_data.zip'), 'rb') as archive_fd:
        connector.archive = File(archive_fd, 'family_data.zip')
        with pytest.raises(ValidationError, match=r'Missing factures.xml file in zip'):
            connector.clean()
|