362 lines
16 KiB
Python
362 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import pytest
|
|
import os
|
|
from uuid import uuid4
|
|
import zipfile
|
|
import logging
|
|
import shutil
|
|
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.files import File
|
|
from django.core.urlresolvers import reverse
|
|
from django.utils import timezone
|
|
from django.utils.http import urlencode
|
|
from django.core.files import File
|
|
from django.core.management import call_command
|
|
from django.core.management.base import CommandError
|
|
from django.core.files.storage import default_storage
|
|
|
|
from passerelle.apps.family.models import GenericFamily, Family, FamilyLink
|
|
from passerelle.apps.family.models import Invoice, Adult, Child, DATETIME_FORMAT
|
|
|
|
from django.core.files import File
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
|
from passerelle.base.models import ApiUser, AccessRight
|
|
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
nameid = 'foobarnameid'
|
|
API_KEY = 'family'
|
|
|
|
@pytest.fixture
|
|
def resource(db):
|
|
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip')
|
|
with open(filepath) as fd:
|
|
resource = GenericFamily.objects.create(title='test', slug='test', archive=File(fd, 'family_data.zip'))
|
|
api = ApiUser.objects.create(username='family', keytype='API', key=API_KEY)
|
|
obj_type = ContentType.objects.get_for_model(GenericFamily)
|
|
AccessRight.objects.create(codename='can_access', apiuser=api,
|
|
resource_type=obj_type, resource_pk=resource.pk)
|
|
return resource
|
|
|
|
def test_link_to_family(app, resource):
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
|
params={'NameID': nameid, 'login': '9407',
|
|
'password': 'gkh0UrrH', 'apikey': API_KEY})
|
|
assert r.json['data']
|
|
assert FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()
|
|
|
|
def test_unlink_from_family(app, resource):
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
|
params={'NameID': nameid, 'login': '1364',
|
|
'password': 'Li6LN1ID', 'apikey': API_KEY})
|
|
assert r.json['data']
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': 'unlink/'}),
|
|
params={'NameID': nameid, 'apikey': API_KEY})
|
|
assert r.json['data']
|
|
assert not FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()
|
|
|
|
def test_family_infos(app, resource):
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
|
params={'NameID': nameid, 'login': '9407',
|
|
'password': 'gkh0UrrH', 'apikey': API_KEY})
|
|
assert r.json['data']
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family/'}),
|
|
params={'NameID': nameid, 'apikey': API_KEY})
|
|
data = r.json['data']
|
|
assert data['id']
|
|
assert data['adults']
|
|
assert data['children']
|
|
assert data['street_number'] is not None
|
|
assert data['street_name'] is not None
|
|
assert data['zipcode'] is not None
|
|
assert data['address_complement'] is not None
|
|
assert data['city'] is not None
|
|
assert data['address'] is not None
|
|
assert data['quotient'] == '1370.50'
|
|
|
|
def test_family_members(app, resource):
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
|
params={'NameID': nameid, 'login': '23762',
|
|
'password': 's6HliUMX', 'apikey': API_KEY})
|
|
assert r.json['data']
|
|
for typ in ('adults', 'children'):
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': '%s/' % typ}),
|
|
params={'NameID': nameid, 'apikey': API_KEY})
|
|
data = r.json['data']
|
|
|
|
assert len(data)
|
|
for person in data:
|
|
assert person['id']
|
|
assert person['first_name']
|
|
assert person['last_name']
|
|
assert person['street_number'] is not None
|
|
assert person['street_name'] is not None
|
|
assert person['zipcode'] is not None
|
|
assert person['address_complement'] is not None
|
|
assert person['city'] is not None
|
|
assert person['address'] is not None
|
|
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': '%s/' % typ}),
|
|
params={'NameID': 'unknown', 'apikey': API_KEY})
|
|
data = r.json['data']
|
|
assert len(data) == 0
|
|
|
|
def test_get_family_invoices(app, resource):
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
|
params={'NameID': nameid, 'login': '19184',
|
|
'password': '8xUhrK6e', 'apikey': API_KEY})
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'regie', 'rest': 'invoices/'}),
|
|
params={'NameID': nameid, 'apikey': API_KEY})
|
|
assert r.json['data']
|
|
for i in r.json['data']:
|
|
assert not i['paid']
|
|
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'regie',
|
|
'rest': 'invoices/history/'}),
|
|
params={'NameID': nameid, 'apikey': API_KEY})
|
|
assert r.json['data']
|
|
for i in r.json['data']:
|
|
assert i['paid']
|
|
assert i['has_pdf']
|
|
|
|
def test_get_family_invoice(app, resource):
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
|
params={'NameID': nameid, 'login': '19184',
|
|
'password': '8xUhrK6e', 'apikey': API_KEY})
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'regie',
|
|
'rest': 'invoices/'}),
|
|
params={'NameID': nameid, 'apikey': API_KEY})
|
|
assert r.json['data']
|
|
invoice = r.json['data'][0]
|
|
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'regie',
|
|
'rest': 'invoice/%s/' % invoice['label']}),
|
|
params={'apikey': API_KEY})
|
|
assert r.json['data']
|
|
for field in ('label', 'amount', 'paid', 'created', 'pay_limit_date'):
|
|
assert r.json['data'][field] == invoice[field]
|
|
|
|
def test_get_invoice_pdf(app, resource):
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
|
params={'NameID': nameid, 'login': '11959',
|
|
'password': '1WI6JOux', 'apikey': API_KEY})
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'regie',
|
|
'rest': 'invoices/history/'}),
|
|
params={'NameID': nameid, 'apikey': API_KEY})
|
|
invoice = r.json['data'][0]
|
|
assert invoice
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'regie',
|
|
'rest': 'invoice/%s/pdf/' % invoice['label']}),
|
|
params={'apikey': API_KEY})
|
|
assert 'Content-Type' in r.headers
|
|
assert 'Content-Disposition' in r.headers
|
|
assert r.headers['Content-Type'] == 'application/pdf'
|
|
assert r.headers['Content-Disposition'] == 'attachment; filename=%s.pdf' % invoice['label']
|
|
|
|
def test_pay_invoice(app, resource):
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
|
params={'NameID': nameid, 'login': '19184',
|
|
'password': '8xUhrK6e', 'apikey': API_KEY})
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'regie',
|
|
'rest': 'invoices/'}),
|
|
params={'NameID': nameid})
|
|
invoice = r.json['data'][0]
|
|
payment_url = reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'regie',
|
|
'rest': 'invoice/%s/pay/' % invoice['label']})
|
|
r = app.post_json(payment_url + '?' + urlencode({'apikey': API_KEY}),
|
|
params={'transaction_id': str(uuid4()),
|
|
'transaction_date': timezone.now().strftime(DATETIME_FORMAT)}
|
|
)
|
|
assert r.json['data']
|
|
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
|
'slug': resource.slug, 'endpoint': 'regie',
|
|
'rest': 'invoice/%s/' % invoice['label']}),
|
|
params={'apikey': API_KEY})
|
|
assert r.json['data']['paid']
|
|
assert float(r.json['data']['amount']) == 0
|
|
|
|
def test_fondettes_concerto_loader():
|
|
Invoice.objects.all().delete()
|
|
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data_fondettes.zip')
|
|
with open(filepath) as fd:
|
|
resource = GenericFamily.objects.create(title='test fondettes',
|
|
slug='test-fondettes', archive=File(fd, 'family_data_fondettes.zip'),
|
|
file_format='concerto_fondettes')
|
|
assert Invoice.objects.filter(resource=resource).count() == 630
|
|
assert len([x for x in Invoice.objects.filter(resource=resource) if x.has_pdf]) == 4
|
|
assert Invoice.objects.filter(paid=True).count() == 314
|
|
assert Invoice.objects.filter(paid=False).count() == 316
|
|
|
|
def test_archive_validation():
|
|
filepath = os.path.join(os.path.dirname(__file__), 'data', 'iparapheur.wsdl')
|
|
with open(filepath) as fd:
|
|
resource = GenericFamily.objects.create(title='test', slug='test', archive=File(fd, 'iparapheur.wsdl'))
|
|
with pytest.raises(ValidationError):
|
|
resource.clean()
|
|
|
|
def test_fondettes_concerto_validation():
|
|
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data_fondettes.zip')
|
|
with open(filepath) as fd:
|
|
resource = GenericFamily.objects.create(title='test fondettes',
|
|
slug='test fondettes', archive=File(fd, 'family_data_fondettes.zip'), file_format='concerto_fondettes')
|
|
resource.clean()
|
|
|
|
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip')
|
|
resource.archive = File(open(filepath), 'family_data.zip')
|
|
with pytest.raises(ValidationError):
|
|
resource.clean()
|
|
|
|
def test_orleans_concerto_loader():
|
|
# all related objects will also be deleted
|
|
Family.objects.all().delete()
|
|
filepath = os.path.join(os.path.dirname(__file__), 'data', 'orleans',
|
|
'family_data_orleans.zip')
|
|
resource = GenericFamily(title='test orleans',
|
|
slug='test-orleans', archive=File(open(filepath), 'family_data_orleans.zip'), file_format='concerto_orleans')
|
|
from passerelle.apps.family.loaders.concerto_orleans import Loader
|
|
loader = Loader(resource)
|
|
loader.archive = zipfile.ZipFile(filepath)
|
|
|
|
families = loader.build_families()
|
|
assert len(families) == 18
|
|
for family in families.values():
|
|
assert family['external_id']
|
|
assert family['adults']
|
|
assert family['login']
|
|
assert family['password']
|
|
assert family['zipcode']
|
|
assert family['city']
|
|
assert len(family['adults']) > 0
|
|
|
|
for adult in family['adults']:
|
|
assert adult['first_name']
|
|
assert adult['last_name']
|
|
assert adult['sex']
|
|
assert adult['external_id']
|
|
assert adult['zipcode']
|
|
assert adult['city']
|
|
assert len(family['children']) >= 1
|
|
for child in family['children']:
|
|
assert child['external_id']
|
|
assert child['first_name']
|
|
assert child['last_name']
|
|
assert child['sex']
|
|
assert 'birthdate' in child
|
|
|
|
assert 'invoices' in family
|
|
if family['invoices']:
|
|
for invoice in family['invoices']:
|
|
assert invoice['external_id']
|
|
assert invoice['label']
|
|
assert invoice['issue_date']
|
|
assert invoice['online_payment']
|
|
assert 'autobilling' in invoice
|
|
assert 'amount' in invoice
|
|
assert 'total_amount' in invoice
|
|
|
|
# there are 4 families with invoices in test data
|
|
assert len([f for f in families.values() if f['invoices']]) == 4
|
|
# and 14 families with no invoices
|
|
assert len([f for f in families.values() if not f['invoices']]) == 14
|
|
|
|
resource.save()
|
|
|
|
assert Family.objects.filter(resource=resource).count() == 18
|
|
assert Adult.objects.all().count() == 31
|
|
assert Child.objects.all().count() == 35
|
|
assert Invoice.objects.filter(resource=resource).count() == 0
|
|
|
|
def test_orleans_data_import_command():
|
|
with pytest.raises(CommandError) as error:
|
|
call_command('import_orleans_data')
|
|
assert str(error.value) == 'File exports_prcit.zip does not exist.'
|
|
|
|
cur_dir = os.path.dirname(__file__)
|
|
data_dir = os.path.join(cur_dir, 'data', 'orleans')
|
|
|
|
with open(os.path.join(data_dir, 'family_data_orleans.zip')) as fd:
|
|
resource = GenericFamily.objects.create(title='test orleans',
|
|
archive=File(fd, 'family_data_orleans.zip'),
|
|
slug='test-orleans', file_format='concerto_orleans')
|
|
|
|
# cleanup data before launching import
|
|
Family.objects.filter(resource=resource).delete()
|
|
Invoice.objects.filter(resource=resource).delete()
|
|
|
|
call_command('import_orleans_data',
|
|
archive_file=os.path.join(data_dir, 'family_data_orleans.zip'))
|
|
assert Family.objects.filter(resource=resource).count() == 0
|
|
assert Invoice.objects.filter(resource=resource).count() == 0
|
|
|
|
resource_invoices_dir = default_storage.path('family-%s/invoices' % resource.id)
|
|
# cleanup previosly created invoices dir, if exists
|
|
if os.path.exists(resource_invoices_dir) and os.path.isdir(resource_invoices_dir):
|
|
shutil.rmtree(resource_invoices_dir)
|
|
os.symlink(os.path.join(data_dir, 'factures'), resource_invoices_dir)
|
|
|
|
call_command('import_orleans_data',
|
|
archive_file=os.path.join(data_dir, 'family_data_orleans.zip'),
|
|
connector='test-orleans')
|
|
os.unlink(resource_invoices_dir)
|
|
|
|
assert Family.objects.filter(resource=resource).count() == 18
|
|
assert Adult.objects.all().count() == 31
|
|
assert Child.objects.all().count() == 35
|
|
assert Invoice.objects.filter(resource=resource).count() == 7
|
|
|
|
def test_family_pending_invoices_by_nameid_with_no_links():
|
|
test_orleans_data_import_command()
|
|
resource = GenericFamily.objects.get()
|
|
links = resource.get_pending_invoices_by_nameid(None)
|
|
assert links['data'] == {}
|
|
|
|
def test_family_pending_invoices_by_nameid():
|
|
test_orleans_data_import_command()
|
|
resource = GenericFamily.objects.get()
|
|
family = Family.objects.get(external_id='22380')
|
|
link = FamilyLink.objects.create(resource=resource, family=family,
|
|
name_id='testnameid1')
|
|
family = Family.objects.get(external_id='1228')
|
|
link = FamilyLink.objects.create(resource=resource, family=family,
|
|
name_id='testnameid2')
|
|
links = resource.get_pending_invoices_by_nameid(None)
|
|
assert len(links['data']) == 2
|
|
for uuid, invoices in links['data'].iteritems():
|
|
assert uuid in ('testnameid1', 'testnameid2')
|
|
assert len(invoices) >= 1
|
|
|
|
def test_incorrect_orleans_data(caplog):
|
|
filepath = os.path.join(os.path.dirname(__file__), 'data',
|
|
'family_incorrect_data_orleans.zip')
|
|
resource = GenericFamily.objects.create(title='test orleans',
|
|
slug='test-orleans', archive=File(open(filepath), 'family_incorrect_data_orleans.zip'),
|
|
file_format='concerto_orleans')
|
|
for record in caplog.records:
|
|
assert 'Error occured while importing data:' in record.message
|
|
assert record.name == 'passerelle.resource.family.test-orleans'
|
|
assert record.levelno == logging.ERROR
|