lingo/tests/pricing/test_import_export.py

460 lines
17 KiB
Python

import copy
import datetime
import json
import os
import shutil
import sys
import tempfile
from io import StringIO
import pytest
from django.core.management import call_command
from django.utils.encoding import force_bytes
from lingo.agendas.models import Agenda, CheckType, CheckTypeGroup
from lingo.pricing.models import AgendaPricing, Criteria, CriteriaCategory, Pricing, PricingCriteriaCategory
from lingo.pricing.utils import import_site
from lingo.utils.misc import AgendaImportError
# Module-level pytest marker: applies `django_db` to every test in this file.
pytestmark = pytest.mark.django_db
def get_output_of_command(command, *args, **kwargs):
    """Run a management command and return what it wrote to ``sys.stdout``.

    ``sys.stdout`` is temporarily replaced by an in-memory buffer while
    ``call_command(command, *args, **kwargs)`` runs.

    Args:
        command: name of the management command, forwarded to ``call_command``.
        *args: positional arguments forwarded to ``call_command``.
        **kwargs: keyword arguments forwarded to ``call_command``.

    Returns:
        The text captured from ``sys.stdout`` during the call.

    Raises:
        Whatever ``call_command`` raises; the exception propagates unchanged.
    """
    old_stdout = sys.stdout
    output = sys.stdout = StringIO()
    try:
        call_command(command, *args, **kwargs)
    finally:
        # Restore the real stream even when the command fails, otherwise
        # every later test would keep writing into this buffer.
        sys.stdout = old_stdout
    return output.getvalue()
def test_import_export(app):
    """Round-trip the pricing configuration through the export/import commands.

    Covers, in order: export to stdout, a cleaning import of an empty payload
    through ``import_site``, a cleaning import read from stdin, an import from
    a file, the ``if_empty`` option on a non-empty site, and an export written
    to a file path.
    """
    Agenda.objects.create(label='Foo Bar')
    pricing = Pricing.objects.create(label='Foo')
    AgendaPricing.objects.create(
        pricing=pricing,
        date_start=datetime.date(year=2021, month=9, day=1),
        date_end=datetime.date(year=2021, month=10, day=1),
    )
    CriteriaCategory.objects.create(label='Foo bar')

    output = get_output_of_command('export_pricing_config')
    exported = json.loads(output)
    assert len(exported['agendas']) == 1
    assert len(exported['pricings']) == 1
    assert len(exported['pricings'][0]['agendas']) == 0
    assert len(exported['pricing_models']) == 1
    assert len(exported['pricing_categories']) == 1

    # A cleaning import of an empty payload: the export afterwards still
    # lists the agenda but no pricing object.
    import_site(data={}, clean=True)
    empty_output = get_output_of_command('export_pricing_config')
    emptied = json.loads(empty_output)
    assert len(emptied['agendas']) == 1
    assert len(emptied['pricings']) == 0
    assert len(emptied['pricing_models']) == 0
    assert len(emptied['pricing_categories']) == 0

    # Recreate the objects, then clean them again through the command
    # reading its payload from stdin ('-').
    pricing = Pricing.objects.create(label='Foo')
    AgendaPricing.objects.create(
        pricing=pricing,
        date_start=datetime.date(year=2021, month=9, day=1),
        date_end=datetime.date(year=2021, month=10, day=1),
    )
    CriteriaCategory.objects.create(label='Foo bar')
    assert AgendaPricing.objects.count() == 1
    assert Pricing.objects.count() == 1
    assert CriteriaCategory.objects.count() == 1

    # Swap stdin exactly once, right before the guarded call, so that the
    # ``finally`` clause puts the real stream back.
    old_stdin = sys.stdin
    sys.stdin = StringIO(json.dumps({}))
    try:
        call_command('import_pricing_config', '-', clean=True)
    finally:
        sys.stdin = old_stdin
    assert AgendaPricing.objects.count() == 0
    assert Pricing.objects.count() == 0
    assert CriteriaCategory.objects.count() == 0

    # Import the first export back from a file on disk.
    with tempfile.NamedTemporaryFile() as f:
        f.write(force_bytes(output))
        f.flush()
        call_command('import_pricing_config', f.name)
    assert AgendaPricing.objects.count() == 1
    assert Pricing.objects.count() == 1
    assert CriteriaCategory.objects.count() == 1

    # With if_empty=True and existing objects, the counts are unchanged.
    import_site(data={}, if_empty=True)
    assert AgendaPricing.objects.count() == 1
    assert Pricing.objects.count() == 1
    assert CriteriaCategory.objects.count() == 1

    import_site(data={}, clean=True)

    # Export to a file; the context manager removes the directory even if
    # the command or the assertion fails.
    with tempfile.TemporaryDirectory(suffix='lingo-test') as tempdir:
        target = os.path.join(tempdir, 't.json')
        get_output_of_command('export_pricing_config', output=target)
        assert os.path.exists(target)
def test_import_export_agenda_pricing(app):
    """Import of agenda pricings: missing references, re-import and new entries."""
    first_day = datetime.date(2021, 9, 1)
    last_day = datetime.date(2021, 10, 1)

    pricing_model = Pricing.objects.create(label='Foo')
    foo_bar_agenda = Agenda.objects.create(label='Foo Bar')
    exported_pricing = AgendaPricing.objects.create(
        pricing=pricing_model,
        date_start=first_day,
        date_end=last_day,
        pricing_data={'foo': 'bar'},
    )
    exported_pricing.agendas.set([foo_bar_agenda])
    raw_export = get_output_of_command('export_pricing_config')

    import_site(data={}, clean=True)
    assert Pricing.objects.count() == 0
    payload = json.loads(raw_export)

    # No agenda at all: the referenced agenda is reported missing.
    Agenda.objects.all().delete()
    with pytest.raises(AgendaImportError) as err:
        import_site(payload, overwrite=True)
    assert str(err.value) == 'Missing "foo-bar" agenda'

    # An agenda with another slug does not satisfy the reference either.
    baz_agenda = Agenda.objects.create(label='Baz')
    with pytest.raises(AgendaImportError) as err:
        import_site(payload, overwrite=True)
    assert str(err.value) == 'Missing "foo-bar" agenda'

    # Agenda present, but the pricing model is neither in the payload nor in db.
    del payload['pricing_models']
    Pricing.objects.all().delete()
    foo_bar_agenda = Agenda.objects.create(label='Foo Bar')
    with pytest.raises(AgendaImportError) as err:
        import_site(payload, overwrite=True)
    assert str(err.value) == 'Missing "foo" pricing model'

    # Everything referenced now exists: the import goes through.
    pricing_model = Pricing.objects.create(label='Foo')
    import_site(payload, overwrite=True)
    imported = AgendaPricing.objects.latest('pk')
    assert list(imported.agendas.all()) == [foo_bar_agenda]
    assert imported.pricing == pricing_model
    assert imported.date_start == first_day
    assert imported.date_end == last_day
    assert imported.pricing_data == {'foo': 'bar'}

    # again
    import_site(payload)
    imported = AgendaPricing.objects.get(pk=imported.pk)
    assert list(imported.agendas.all()) == [foo_bar_agenda]
    assert imported.pricing == pricing_model

    # A second entry with other dates and two agendas.
    extra_entry = {
        'pricing': 'foo',
        'agendas': ['foo-bar', 'baz'],
        'date_start': '2022-09-01',
        'date_end': '2022-10-01',
        'pricing_data': {'foo': 'bar'},
    }
    payload['pricings'].append(extra_entry)
    import_site(payload)
    imported = AgendaPricing.objects.latest('pk')
    linked_agendas = imported.agendas.all().order_by('slug')
    assert list(linked_agendas) == [baz_agenda, foo_bar_agenda]
    assert imported.pricing == pricing_model
    assert imported.date_start == datetime.date(2022, 9, 1)
    assert imported.date_end == datetime.date(2022, 10, 1)
    assert imported.pricing_data == {'foo': 'bar'}
def test_import_export_agenda_with_check_types(app):
    """Import of an agenda whose check type group must already exist."""
    check_group = CheckTypeGroup.objects.create(label='foo')
    agenda = Agenda.objects.create(label='Foo Bar', check_type_group=check_group)
    raw_export = get_output_of_command('export_pricing_config')

    import_site(data={}, clean=True)
    assert CheckTypeGroup.objects.count() == 0

    # Drop the groups from the payload so the import has to find them in db.
    payload = json.loads(raw_export)
    del payload['check_type_groups']

    agenda.check_type_group = None
    agenda.save()

    with pytest.raises(AgendaImportError) as err:
        import_site(payload, overwrite=True)
    assert str(err.value) == 'Missing "foo" check type group'

    # A group with a different slug is not accepted as a match.
    CheckTypeGroup.objects.create(label='foobar')
    with pytest.raises(AgendaImportError) as err:
        import_site(payload, overwrite=True)
    assert str(err.value) == 'Missing "foo" check type group'

    # With the expected group back, the agenda gets linked to it.
    check_group = CheckTypeGroup.objects.create(label='foo')
    import_site(payload, overwrite=True)
    agenda.refresh_from_db()
    assert agenda.check_type_group == check_group
def test_import_export_pricing_criteria_category(app):
    """Export then import criteria categories: create, update, insert, overwrite."""

    def check_as_exported(cat):
        # State of the category and its two criterias as they were exported.
        assert cat.label == 'Foo bar'
        assert cat.slug == 'foo-bar'
        assert cat.criterias.count() == 2
        assert Criteria.objects.get(category=cat, label='Foo reason', slug='foo-reason')
        assert Criteria.objects.get(category=cat, label='Baz', slug='baz')

    exported = json.loads(get_output_of_command('export_pricing_config'))
    assert len(exported['pricing_categories']) == 0

    category = CriteriaCategory.objects.create(label='Foo bar')
    Criteria.objects.create(label='Foo reason', category=category)
    Criteria.objects.create(label='Baz', category=category)
    exported = json.loads(get_output_of_command('export_pricing_config'))
    assert len(exported['pricing_categories']) == 1

    category.delete()
    assert not CriteriaCategory.objects.exists()
    assert not Criteria.objects.exists()

    # create
    import_site(copy.deepcopy(exported))
    assert CriteriaCategory.objects.count() == 1
    category = CriteriaCategory.objects.first()
    check_as_exported(category)

    # update
    relabelled = copy.deepcopy(exported)
    relabelled['pricing_categories'][0]['label'] = 'Foo bar Updated'
    import_site(relabelled)
    category.refresh_from_db()
    assert category.label == 'Foo bar Updated'

    # insert another category
    category.slug = 'foo-bar-updated'
    category.save()
    import_site(copy.deepcopy(exported))
    assert CriteriaCategory.objects.count() == 2
    category = CriteriaCategory.objects.latest('pk')
    check_as_exported(category)

    # with overwrite
    Criteria.objects.create(category=category, label='Baz2')
    import_site(copy.deepcopy(exported), overwrite=True)
    assert CriteriaCategory.objects.count() == 2
    category = CriteriaCategory.objects.latest('pk')
    check_as_exported(category)
def test_import_export_pricing(app):
    """Export then import pricing models: create, update and insert."""

    def check_as_exported(model):
        # Field values of the pricing model as they were exported.
        assert model.label == 'Foo bar'
        assert model.slug == 'foo-bar'
        assert model.extra_variables == {'foo': 'bar'}

    exported = json.loads(get_output_of_command('export_pricing_config'))
    assert len(exported['pricing_models']) == 0

    pricing = Pricing.objects.create(label='Foo bar', extra_variables={'foo': 'bar'})
    exported = json.loads(get_output_of_command('export_pricing_config'))
    assert len(exported['pricing_models']) == 1

    pricing.delete()
    assert not Pricing.objects.exists()

    # create
    import_site(copy.deepcopy(exported))
    assert Pricing.objects.count() == 1
    pricing = Pricing.objects.first()
    check_as_exported(pricing)

    # update
    relabelled = copy.deepcopy(exported)
    relabelled['pricing_models'][0]['label'] = 'Foo bar Updated'
    import_site(relabelled)
    pricing.refresh_from_db()
    assert pricing.label == 'Foo bar Updated'

    # insert another pricing
    pricing.slug = 'foo-bar-updated'
    pricing.save()
    import_site(copy.deepcopy(exported))
    assert Pricing.objects.count() == 2
    pricing = Pricing.objects.latest('pk')
    check_as_exported(pricing)
def test_import_export_pricing_with_categories(app):
    """Import of pricing models linked to criteria categories and criterias."""

    def category_links(foo_bar_3_criterias, foo_bar_criterias):
        # Payload for the model's categories; only the criteria lists vary.
        return [
            {
                'category': 'foo-bar-3',
                'order': 1,
                'criterias': foo_bar_3_criterias,
            },
            {
                'category': 'foo-bar',
                'order': 35,
                'criterias': foo_bar_criterias,
            },
        ]

    def check_links():
        # Categories attached to the model, and the through rows for it.
        assert list(pricing.categories.all()) == [category, category3]
        through_rows = PricingCriteriaCategory.objects.filter(pricing=pricing)
        assert list(through_rows.values_list('category', flat=True)) == [category3.pk, category.pk]
        through_rows = PricingCriteriaCategory.objects.filter(pricing=pricing)
        assert list(through_rows.values_list('order', flat=True)) == [1, 35]

    original_pricing = Pricing.objects.create(label='Foo bar')
    category = CriteriaCategory.objects.create(label='Foo bar')
    original_pricing.categories.add(category, through_defaults={'order': 42})
    raw_export = get_output_of_command('export_pricing_config')

    import_site(data={}, clean=True)
    assert Pricing.objects.count() == 0
    assert CriteriaCategory.objects.count() == 0

    # Categories are removed from the payload: they must be found in db.
    payload = json.loads(raw_export)
    del payload['pricing_categories']

    with pytest.raises(AgendaImportError) as err:
        import_site(payload, overwrite=True)
    assert str(err.value) == 'Missing "foo-bar" pricing category'

    # A category with another slug is not accepted as a match.
    CriteriaCategory.objects.create(label='Foobar')
    with pytest.raises(AgendaImportError) as err:
        import_site(payload, overwrite=True)
    assert str(err.value) == 'Missing "foo-bar" pricing category'

    category = CriteriaCategory.objects.create(label='Foo bar')
    import_site(payload, overwrite=True)
    pricing = Pricing.objects.get(slug=original_pricing.slug)
    assert list(pricing.categories.all()) == [category]
    assert PricingCriteriaCategory.objects.first().order == 42

    category2 = CriteriaCategory.objects.create(label='Foo bar 2')
    category3 = CriteriaCategory.objects.create(label='Foo bar 3')
    pricing.categories.add(category2, through_defaults={'order': 1})
    raw_export = get_output_of_command('export_pricing_config')
    payload = json.loads(raw_export)
    del payload['pricing_categories']

    # Replace the exported links with a different set of categories/orders.
    payload['pricing_models'][0]['categories'] = category_links([], [])
    import_site(payload, overwrite=True)
    check_links()
    assert list(pricing.criterias.all()) == []

    criteria1 = Criteria.objects.create(label='Crit 1', category=category)
    Criteria.objects.create(label='Crit 2', category=category)
    criteria3 = Criteria.objects.create(label='Crit 3', category=category)

    # unknown criteria
    payload['pricing_models'][0]['categories'] = category_links(['unknown'], [])
    with pytest.raises(AgendaImportError) as err:
        import_site(payload, overwrite=True)
    assert str(err.value) == 'Missing "unknown" pricing criteria for "foo-bar-3" category'

    # wrong criteria (from another category)
    payload['pricing_models'][0]['categories'] = category_links(['crit-1'], [])
    with pytest.raises(AgendaImportError) as err:
        import_site(payload, overwrite=True)
    assert str(err.value) == 'Missing "crit-1" pricing criteria for "foo-bar-3" category'

    # criterias listed under the category they belong to
    payload['pricing_models'][0]['categories'] = category_links([], ['crit-1', 'crit-3'])
    import_site(payload, overwrite=True)
    check_links()
    assert set(pricing.criterias.all()) == {criteria1, criteria3}
def test_import_export_check_type_group(app):
    """Export then import check type groups: create, update, insert, overwrite."""

    def check_as_exported(grp):
        # State of the group and its two check types as they were exported.
        assert grp.label == 'Foo bar'
        assert grp.slug == 'foo-bar'
        assert grp.check_types.count() == 2
        assert CheckType.objects.get(group=grp, label='Foo reason', slug='foo-reason')
        assert CheckType.objects.get(group=grp, label='Baz', slug='baz')

    exported = json.loads(get_output_of_command('export_pricing_config'))
    assert len(exported['check_type_groups']) == 0

    group = CheckTypeGroup.objects.create(label='Foo bar')
    CheckType.objects.create(label='Foo reason', group=group)
    CheckType.objects.create(label='Baz', group=group)
    exported = json.loads(get_output_of_command('export_pricing_config'))
    assert len(exported['check_type_groups']) == 1

    group.delete()
    assert not CheckTypeGroup.objects.exists()
    assert not CheckType.objects.exists()

    # create
    import_site(copy.deepcopy(exported))
    assert CheckTypeGroup.objects.count() == 1
    group = CheckTypeGroup.objects.first()
    check_as_exported(group)

    # update
    relabelled = copy.deepcopy(exported)
    relabelled['check_type_groups'][0]['label'] = 'Foo bar Updated'
    import_site(relabelled)
    group.refresh_from_db()
    assert group.label == 'Foo bar Updated'

    # insert another group
    group.slug = 'foo-bar-updated'
    group.save()
    import_site(copy.deepcopy(exported))
    assert CheckTypeGroup.objects.count() == 2
    group = CheckTypeGroup.objects.latest('pk')
    check_as_exported(group)

    # with overwrite
    CheckType.objects.create(group=group, label='Baz2')
    import_site(copy.deepcopy(exported), overwrite=True)
    assert CheckTypeGroup.objects.count() == 2
    group = CheckTypeGroup.objects.latest('pk')
    check_as_exported(group)