import logging
import os
import shutil
import zipfile
from uuid import uuid4

import pytest
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.files import File
from django.core.files.storage import default_storage
from django.core.management import call_command
from django.core.management.base import CommandError
from django.urls import reverse
from django.utils import timezone
from django.utils.http import urlencode

from passerelle.apps.family.models import (
    DATETIME_FORMAT,
    Adult,
    Child,
    Family,
    FamilyLink,
    GenericFamily,
    Invoice,
)
from passerelle.base.models import AccessRight, ApiUser

pytestmark = pytest.mark.django_db

nameid = 'foobarnameid'
API_KEY = 'family'


def _data_path(*parts):
    """Return the path of a file located under this module's ``data`` directory."""
    return os.path.join(os.path.dirname(__file__), 'data', *parts)


def _endpoint_url(resource, endpoint, rest=None):
    """Build the ``generic-endpoint`` URL of the family connector *resource*.

    ``rest`` is only added to the URL kwargs when given, so that calls made
    without a sub-path resolve exactly as a plain ``reverse()`` would.
    """
    kwargs = {'connector': 'family', 'slug': resource.slug, 'endpoint': endpoint}
    if rest is not None:
        kwargs['rest'] = rest
    return reverse('generic-endpoint', kwargs=kwargs)


def _link_family(app, resource, login, password):
    """Call ``family/link/`` for the module-level ``nameid`` and return the response."""
    return app.get(
        _endpoint_url(resource, 'family', 'link/'),
        params={'NameID': nameid, 'login': login, 'password': password, 'apikey': API_KEY},
    )


@pytest.fixture
def resource(db):
    """Create a ``GenericFamily`` connector from ``family_data.zip``.

    An API user whose key is ``API_KEY`` is also created and granted the
    ``can_access`` right on the connector.
    """
    with open(_data_path('family_data.zip'), 'rb') as fd:
        connector = GenericFamily.objects.create(
            title='test', slug='test', archive=File(fd, 'family_data.zip')
        )
    api = ApiUser.objects.create(username='family', keytype='API', key=API_KEY)
    obj_type = ContentType.objects.get_for_model(GenericFamily)
    AccessRight.objects.create(
        codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=connector.pk
    )
    return connector


def test_link_to_family(app, resource):
    """A successful call to ``link/`` creates a ``FamilyLink`` for the NameID."""
    r = _link_family(app, resource, '9407', 'gkh0UrrH')
    assert r.json['data']
    assert FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()


def test_unlink_from_family(app, resource):
    """Calling ``unlink/`` after ``link/`` removes the ``FamilyLink`` of the NameID."""
    r = _link_family(app, resource, '1364', 'Li6LN1ID')
    assert r.json['data']
    r = app.get(
        _endpoint_url(resource, 'family', 'unlink/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    assert r.json['data']
    assert not FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()


def test_family_infos(app, resource):
    """The ``family/`` endpoint returns members, address fields and quotient."""
    r = _link_family(app, resource, '9407', 'gkh0UrrH')
    assert r.json['data']
    r = app.get(
        _endpoint_url(resource, 'family/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    data = r.json['data']
    assert data['id']
    assert data['adults']
    assert data['children']
    # address parts may be empty strings but must never be None
    for key in ('street_number', 'street_name', 'zipcode', 'address_complement', 'city', 'address'):
        assert data[key] is not None
    assert data['quotient'] == '1370.50'


def test_family_members(app, resource):
    """``adults/`` and ``children/`` list members for a linked NameID only."""
    r = _link_family(app, resource, '23762', 's6HliUMX')
    assert r.json['data']
    for typ in ('adults', 'children'):
        members_url = _endpoint_url(resource, 'family', '%s/' % typ)
        r = app.get(members_url, params={'NameID': nameid, 'apikey': API_KEY})
        data = r.json['data']
        assert len(data)
        for person in data:
            assert person['id']
            assert person['first_name']
            assert person['last_name']
            assert person['street_number'] is not None
            assert person['street_name'] is not None
            assert person['zipcode'] is not None
            assert person['address_complement'] is not None
            assert person['city'] is not None
            assert person['address'] is not None

        # a NameID without any link gets an empty list
        r = app.get(members_url, params={'NameID': 'unknown', 'apikey': API_KEY})
        data = r.json['data']
        assert len(data) == 0


def test_get_family_invoices(app, resource):
    """``invoices/`` lists unpaid invoices, ``invoices/history/`` paid ones with a PDF."""
    _link_family(app, resource, '19184', '8xUhrK6e')
    r = app.get(
        _endpoint_url(resource, 'regie', 'invoices/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    assert r.json['data']
    for i in r.json['data']:
        assert not i['paid']

    r = app.get(
        _endpoint_url(resource, 'regie', 'invoices/history/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    assert r.json['data']
    for i in r.json['data']:
        assert i['paid']
        assert i['has_pdf']


def test_get_family_invoice(app, resource):
    """``invoice/<label>/`` returns the same fields as the matching list entry."""
    _link_family(app, resource, '19184', '8xUhrK6e')
    r = app.get(
        _endpoint_url(resource, 'regie', 'invoices/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    assert r.json['data']
    invoice = r.json['data'][0]
    r = app.get(
        _endpoint_url(resource, 'regie', 'invoice/%s/' % invoice['label']),
        params={'apikey': API_KEY},
    )
    assert r.json['data']
    for field in ('label', 'amount', 'paid', 'created', 'pay_limit_date'):
        assert r.json['data'][field] == invoice[field]
    assert r.json['data']['reference_id'] == r.json['data']['label']


def test_get_invoice_pdf(app, resource):
    """``invoice/<label>/pdf/`` is served as a PDF attachment named after the label."""
    _link_family(app, resource, '11959', '1WI6JOux')
    r = app.get(
        _endpoint_url(resource, 'regie', 'invoices/history/'),
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    invoice = r.json['data'][0]
    assert invoice
    r = app.get(
        _endpoint_url(resource, 'regie', 'invoice/%s/pdf/' % invoice['label']),
        params={'apikey': API_KEY},
    )
    assert 'Content-Type' in r.headers
    assert 'Content-Disposition' in r.headers
    assert r.headers['Content-Type'] == 'application/pdf'
    assert r.headers['Content-Disposition'] == 'attachment; filename=%s.pdf' % invoice['label']


def test_pay_invoice(app, resource):
    """Posting to ``invoice/<label>/pay/`` marks the invoice paid with a zero amount."""
    _link_family(app, resource, '19184', '8xUhrK6e')
    r = app.get(
        _endpoint_url(resource, 'regie', 'invoices/'),
        # pass the API key like every other call to this endpoint in this module
        params={'NameID': nameid, 'apikey': API_KEY},
    )
    invoice = r.json['data'][0]
    payment_url = _endpoint_url(resource, 'regie', 'invoice/%s/pay/' % invoice['label'])
    # the API key goes in the query string because ``params`` is the JSON body here
    r = app.post_json(
        payment_url + '?' + urlencode({'apikey': API_KEY}),
        params={'transaction_id': str(uuid4()), 'transaction_date': timezone.now().strftime(DATETIME_FORMAT)},
    )
    assert r.json['data']
    r = app.get(
        _endpoint_url(resource, 'regie', 'invoice/%s/' % invoice['label']),
        params={'apikey': API_KEY},
    )
    assert r.json['data']['paid']
    assert float(r.json['data']['amount']) == 0


def test_fondettes_concerto_loader():
    """Creating a ``concerto_fondettes`` connector imports the archive's invoices."""
    Invoice.objects.all().delete()
    with open(_data_path('family_data_fondettes.zip'), 'rb') as fd:
        resource = GenericFamily.objects.create(
            title='test fondettes',
            slug='test-fondettes',
            archive=File(fd, 'family_data_fondettes.zip'),
            file_format='concerto_fondettes',
        )
    assert Invoice.objects.filter(resource=resource).count() == 630
    assert sum(1 for invoice in Invoice.objects.filter(resource=resource) if invoice.has_pdf) == 4
    assert Invoice.objects.filter(paid=True).count() == 312
    assert Invoice.objects.filter(paid=False).count() == 318
    assert Invoice.objects.filter(online_payment=False).count() == 2


def test_fondettes_opus_loader():
    """Creating an ``opus_fondettes`` connector imports the archive's invoices."""
    Invoice.objects.all().delete()
    with open(_data_path('family_data_opus_fondettes.zip'), 'rb') as fd:
        resource = GenericFamily.objects.create(
            title='test fondettes',
            slug='test-fondettes',
            archive=File(fd, 'family_data_opus_fondettes.zip'),
            file_format='opus_fondettes',
        )
    assert Invoice.objects.filter(resource=resource).count() == 630
    assert sum(1 for invoice in Invoice.objects.filter(resource=resource) if invoice.has_pdf) == 4
    assert Invoice.objects.filter(paid=True).count() == 314
    assert Invoice.objects.filter(paid=False).count() == 316


def test_archive_validation():
    """``clean()`` raises ``ValidationError`` when the archive is a WSDL file."""
    with open(_data_path('soap.wsdl')) as fd:
        resource = GenericFamily.objects.create(title='test', slug='test', archive=File(fd, 'soap.wsdl'))
        # validated while the file is still open, as the archive wraps ``fd``
        with pytest.raises(ValidationError):
            resource.clean()


def test_fondettes_concerto_validation():
    """``clean()`` accepts a fondettes archive and rejects ``family_data.zip``."""
    with open(_data_path('family_data_fondettes.zip'), 'rb') as fd:
        resource = GenericFamily.objects.create(
            title='test fondettes',
            slug='test fondettes',
            archive=File(fd, 'family_data_fondettes.zip'),
            file_format='concerto_fondettes',
        )
        resource.clean()

    # NOTE(review): this zip is opened in text mode, unlike the other archives
    # of this module -- confirm that this is intentional.
    with open(_data_path('family_data.zip')) as fd:
        resource.archive = File(fd, 'family_data.zip')
        with pytest.raises(ValidationError):
            resource.clean()


def test_orleans_concerto_loader():
    """Check what the ``concerto_orleans`` loader builds, then what saving imports."""
    # all related objects will also be deleted
    Family.objects.all().delete()
    filepath = _data_path('orleans', 'family_data_orleans.zip')
    with open(filepath, 'rb') as fd:
        # the connector is only instantiated here; it is saved at the end
        resource = GenericFamily(
            title='test orleans',
            slug='test-orleans',
            archive=File(fd, 'family_data_orleans.zip'),
            file_format='concerto_orleans',
        )
        from passerelle.apps.family.loaders.concerto_orleans import Loader

        loader = Loader(resource)
        with zipfile.ZipFile(filepath) as z:
            loader.archive = z
            families = loader.build_families()

        assert len(families) == 18
        for family in families.values():
            assert family['external_id']
            assert family['adults']
            assert family['login']
            assert family['password']
            assert family['zipcode']
            assert family['city']
            assert len(family['adults']) > 0

            for adult in family['adults']:
                assert adult['first_name']
                assert adult['last_name']
                assert adult['sex']
                assert adult['external_id']
                assert adult['zipcode']
                assert adult['city']

            assert len(family['children']) >= 1
            for child in family['children']:
                assert child['external_id']
                assert child['first_name']
                assert child['last_name']
                assert child['sex']
                assert 'birthdate' in child

            assert 'invoices' in family
            if family['invoices']:
                for invoice in family['invoices']:
                    assert invoice['external_id']
                    assert invoice['label']
                    assert invoice['issue_date']
                    assert invoice['online_payment']
                    assert 'autobilling' in invoice
                    assert 'amount' in invoice
                    assert 'total_amount' in invoice

        # there are 4 families with invoices in test data
        assert len([f for f in families.values() if f['invoices']]) == 4
        # and 14 families with no invoices
        assert len([f for f in families.values() if not f['invoices']]) == 14

        # saved while ``fd`` is still open, as the archive wraps it
        resource.save()
        assert Family.objects.filter(resource=resource).count() == 18
        assert Adult.objects.all().count() == 31
        assert Child.objects.all().count() == 35
        assert Invoice.objects.filter(resource=resource).count() == 0


def test_orleans_data_import_command():
    """Exercise the ``import_orleans_data`` management command.

    The command raises ``CommandError`` when called without an archive,
    leaves the connector empty when no connector is given, and imports
    families and invoices when ``connector='test-orleans'`` is passed.
    """
    with pytest.raises(CommandError) as error:
        call_command('import_orleans_data')
    assert str(error.value) == 'File exports_prcit.zip does not exist.'

    archive_path = _data_path('orleans', 'family_data_orleans.zip')
    with open(archive_path, 'rb') as fd:
        resource = GenericFamily.objects.create(
            title='test orleans',
            archive=File(fd, 'family_data_orleans.zip'),
            slug='test-orleans',
            file_format='concerto_orleans',
        )

    # cleanup data before launching import
    Family.objects.filter(resource=resource).delete()
    Invoice.objects.filter(resource=resource).delete()

    call_command('import_orleans_data', archive_file=archive_path)
    assert Family.objects.filter(resource=resource).count() == 0
    assert Invoice.objects.filter(resource=resource).count() == 0

    resource_invoices_dir = default_storage.path('family-%s/invoices' % resource.id)
    # cleanup previously created invoices dir, if exists; a symlink left by an
    # interrupted run must be unlinked, as shutil.rmtree() refuses symlinks
    if os.path.islink(resource_invoices_dir):
        os.unlink(resource_invoices_dir)
    elif os.path.isdir(resource_invoices_dir):
        shutil.rmtree(resource_invoices_dir)

    os.symlink(_data_path('orleans', 'factures'), resource_invoices_dir)
    try:
        call_command(
            'import_orleans_data',
            archive_file=archive_path,
            connector='test-orleans',
        )
    finally:
        # always remove the symlink so a failed import cannot break later runs
        os.unlink(resource_invoices_dir)

    assert Family.objects.filter(resource=resource).count() == 18
    assert Adult.objects.all().count() == 31
    assert Child.objects.all().count() == 35
    assert Invoice.objects.filter(resource=resource).count() == 7


def test_family_pending_invoices_by_nameid_with_no_links():
    """Without any ``FamilyLink``, pending invoices by NameID is an empty mapping."""
    # reuse the import test to populate the database
    test_orleans_data_import_command()
    resource = GenericFamily.objects.get()
    links = resource.get_pending_invoices_by_nameid(None)
    assert links['data'] == {}


def test_family_pending_invoices_by_nameid():
    """Pending invoices are grouped under the NameID of each linked family."""
    # reuse the import test to populate the database
    test_orleans_data_import_command()
    resource = GenericFamily.objects.get()
    family = Family.objects.get(external_id='22380')
    FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid1')
    family = Family.objects.get(external_id='1228')
    FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid2')
    links = resource.get_pending_invoices_by_nameid(None)
    assert len(links['data']) == 2
    for name_id, invoices in links['data'].items():
        assert name_id in ('testnameid1', 'testnameid2')
        assert len(invoices) >= 1


def test_incorrect_orleans_data(caplog):
    """Every record logged while importing an incorrect archive is an import error."""
    with open(_data_path('family_incorrect_data_orleans.zip'), 'rb') as fd:
        GenericFamily.objects.create(
            title='test orleans',
            slug='test-orleans',
            archive=File(fd, 'family_incorrect_data_orleans.zip'),
            file_format='concerto_orleans',
        )
    # NOTE(review): this loop passes vacuously when nothing was logged --
    # consider also asserting that ``caplog.records`` is not empty.
    for record in caplog.records:
        assert 'Error occured while importing data:' in record.message
        assert record.name == 'passerelle.resource.family.test-orleans'
        assert record.levelno == logging.ERROR


def test_egee_thonon_loader():
    """Check the ``egee_thonon`` import, a re-import, and archive validation."""
    Invoice.objects.all().delete()
    with open(_data_path('family_data_egee_thonon.zip'), 'rb') as fd:
        resource = GenericFamily.objects.create(
            title='test-egee-thonon',
            slug='test-egee-thonon',
            archive=File(fd, 'family_data_egee_thonon.zip'),
            file_format='egee_thonon',
        )
    assert Invoice.objects.filter(resource=resource).count() == 4
    for invoice in Invoice.objects.all():
        assert len(invoice.external_id) == 13
        assert invoice.label == invoice.external_id
        assert invoice.online_payment is True
        assert invoice.no_online_payment_reason is None
        assert invoice.paid is False
        assert (invoice.pay_limit_date - invoice.issue_date).days == 62

    # modify invoice list, and reimport
    Invoice.objects.first().delete()
    Invoice.objects.first().delete()
    paid_invoice = Invoice.objects.first()
    paid_invoice.paid = True
    paid_invoice.save()
    Invoice.objects.create(resource=resource, external_id='1234', amount='1234')

    call_command('update_families_from_zip')
    assert Invoice.objects.filter(resource=resource).count() == 4
    for invoice in Invoice.objects.all():
        assert len(invoice.external_id) == 13
        if invoice.id == paid_invoice.id:
            assert invoice.paid is True  # already paid, not updated
        else:
            assert invoice.paid is False

    # invalid zip file
    with open(_data_path('family_data.zip'), 'rb') as fd:
        resource.archive = File(fd, 'family_data.zip')
        with pytest.raises(ValidationError, match=r'Missing factures.xml file in zip'):
            resource.clean()