pricing: import/export pricing models (#64903)

This commit is contained in:
Lauréline Guérin 2022-05-09 17:06:24 +02:00
parent 1cd72eeebc
commit 11f5fa3506
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
10 changed files with 345 additions and 9 deletions

View File

@ -1428,11 +1428,13 @@ class AgendasExportForm(forms.Form):
pricing_categories = forms.BooleanField(
label=_('Pricing criteria categories'), required=False, initial=True
)
pricing_models = forms.BooleanField(label=_('Pricing models'), required=False, initial=True)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not settings.CHRONO_ENABLE_PRICING:
del self.fields['pricing_categories']
del self.fields['pricing_models']
class SharedCustodyRuleForm(forms.ModelForm):

View File

@ -30,7 +30,7 @@ from chrono.agendas.models import (
Resource,
UnavailabilityCalendar,
)
from chrono.pricing.models import CriteriaCategory
from chrono.pricing.models import CriteriaCategory, Pricing
def export_site(
@ -41,9 +41,12 @@ def export_site(
resources=True,
categories=True,
pricing_categories=True,
pricing_models=True,
):
'''Dump site objects to JSON-dumpable dictionnary'''
data = collections.OrderedDict()
if pricing_models:
data['pricing_models'] = [x.export_json() for x in Pricing.objects.all()]
if pricing_categories:
data['pricing_categories'] = [x.export_json() for x in CriteriaCategory.objects.all()]
if categories:
@ -73,6 +76,7 @@ def import_site(data, if_empty=False, clean=False, overwrite=False):
or Resource.objects.exists()
or Category.objects.exists()
or CriteriaCategory.objects.exists()
or Pricing.objects.exists()
):
return
@ -84,6 +88,7 @@ def import_site(data, if_empty=False, clean=False, overwrite=False):
Resource.objects.all().delete()
Category.objects.all().delete()
CriteriaCategory.objects.all().delete()
Pricing.objects.all().delete()
results = {
key: collections.defaultdict(list)
@ -95,6 +100,7 @@ def import_site(data, if_empty=False, clean=False, overwrite=False):
'resources',
'categories',
'pricing_categories',
'pricing_models',
]
}
@ -119,6 +125,7 @@ def import_site(data, if_empty=False, clean=False, overwrite=False):
(UnavailabilityCalendar, 'unavailability_calendars'),
(Agenda, 'agendas'),
(CriteriaCategory, 'pricing_categories'),
(Pricing, 'pricing_models'),
):
objs = data.get(key, [])
for obj in objs:

View File

@ -1086,6 +1086,20 @@ class AgendasImportView(FormView):
x,
),
},
'pricing_models': {
'create_noop': _('No pricing model created.'),
'create': lambda x: ungettext(
'A pricing model has been created.',
'%(count)d pricing models have been created.',
x,
),
'update_noop': _('No pricing model updated.'),
'update': lambda x: ungettext(
'A pricing model has been updated.',
'%(count)d pricing models have been updated.',
x,
),
},
}
global_noop = True
@ -1106,18 +1120,19 @@ class AgendasImportView(FormView):
obj_results['messages'] = "%s %s" % (message1, message2)
a_count, uc_count, arg_count, pc_count = (
a_count, uc_count, arg_count, pc_count, pm_count = (
len(results['agendas']['all']),
len(results['unavailability_calendars']['all']),
len(results['check_type_groups']['all']),
len(results['pricing_categories']['all']),
len(results['pricing_models']['all']),
)
if (a_count, uc_count, arg_count, pc_count) == (1, 0, 0, 0):
if (a_count, uc_count, arg_count, pc_count, pm_count) == (1, 0, 0, 0, 0):
# only one agenda imported, redirect to settings page
return HttpResponseRedirect(
reverse('chrono-manager-agenda-settings', kwargs={'pk': results['agendas']['all'][0].pk})
)
if (a_count, uc_count, arg_count, pc_count) == (0, 1, 0, 0):
if (a_count, uc_count, arg_count, pc_count, pm_count) == (0, 1, 0, 0, 0):
# only one unavailability calendar imported, redirect to settings page
return HttpResponseRedirect(
reverse(
@ -1125,12 +1140,20 @@ class AgendasImportView(FormView):
kwargs={'pk': results['unavailability_calendars']['all'][0].pk},
)
)
if (a_count, uc_count, arg_count, pc_count) == (0, 0, 1, 0):
if (a_count, uc_count, arg_count, pc_count, pm_count) == (0, 0, 1, 0, 0):
# only one check type group imported, redirect to check type page
return HttpResponseRedirect(reverse('chrono-manager-check-type-list'))
if (a_count, uc_count, arg_count, pc_count) == (0, 0, 0, 1):
if (a_count, uc_count, arg_count, pc_count, pm_count) == (0, 0, 0, 1, 0):
# only one criteria category imported, redirect to criteria page
return HttpResponseRedirect(reverse('chrono-manager-pricing-criteria-list'))
if (a_count, uc_count, arg_count, pc_count, pm_count) == (0, 0, 0, 0, 1):
# only one pricing imported, redirect to pricing page
return HttpResponseRedirect(
reverse(
'chrono-manager-pricing-detail',
kwargs={'pk': results['pricing_models']['all'][0].pk},
)
)
if global_noop:
messages.info(self.request, _('No data found.'))
@ -1142,6 +1165,7 @@ class AgendasImportView(FormView):
messages.info(self.request, results['resources']['messages'])
messages.info(self.request, results['categories']['messages'])
messages.info(self.request, results['pricing_categories']['messages'])
messages.info(self.request, results['pricing_models']['messages'])
return super().form_valid(form)

View File

@ -23,7 +23,7 @@ from django.utils.text import slugify
from django.utils.translation import ugettext_lazy as _
from chrono.agendas.models import Agenda, Booking, Subscription
from chrono.utils.misc import clean_import_data, generate_slug
from chrono.utils.misc import AgendaImportError, clean_import_data, generate_slug
class PricingError(Exception):
@ -205,6 +205,48 @@ class Pricing(models.Model):
def get_extra_variables_keys(self):
return sorted((self.extra_variables or {}).keys())
@classmethod
def import_json(cls, data, overwrite=False):
data = data.copy()
categories = data.pop('categories', [])
categories_by_slug = {c.slug: c for c in CriteriaCategory.objects.all()}
criterias_by_categories_and_slug = {
(crit.category.slug, crit.slug): crit
for crit in Criteria.objects.select_related('category').all()
}
for category_data in categories:
category_slug = category_data['category']
if category_data['category'] not in categories_by_slug:
raise AgendaImportError(_('Missing "%s" pricing category') % category_data['category'])
for criteria_slug in category_data['criterias']:
if (category_slug, criteria_slug) not in criterias_by_categories_and_slug:
raise AgendaImportError(
_('Missing "%s" pricing criteria for "%s" category') % (criteria_slug, category_slug)
)
data = clean_import_data(cls, data)
pricing, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
PricingCriteriaCategory.objects.filter(pricing=pricing).delete()
criterias = []
for category_data in categories:
pricing.categories.add(
categories_by_slug[category_data['category']],
through_defaults={'order': category_data['order']},
)
for criteria_slug in category_data['criterias']:
criterias.append(criterias_by_categories_and_slug[(category_data['category'], criteria_slug)])
pricing.criterias.set(criterias)
return created, pricing
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'extra_variables': self.extra_variables,
'categories': [pcc.export_json() for pcc in PricingCriteriaCategory.objects.filter(pricing=self)],
}
class PricingCriteriaCategory(models.Model):
pricing = models.ForeignKey(Pricing, on_delete=models.CASCADE)
@ -226,6 +268,13 @@ class PricingCriteriaCategory(models.Model):
self.order = max_order + 1
super().save(*args, **kwargs)
def export_json(self):
return {
'category': self.category.slug,
'order': self.order,
'criterias': [c.slug for c in self.pricing.criterias.all() if c.category == self.category],
}
class AgendaPricing(models.Model):
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)

View File

@ -15,6 +15,7 @@
<a class="extra-actions-menu-opener"></a>
<ul class="extra-actions-menu">
<li><a rel="popup" href="{% url 'chrono-manager-pricing-edit' pk=object.pk %}">{% trans 'Options' %}</a></li>
<li><a href="{% url 'chrono-manager-pricing-export' pk=object.pk %}">{% trans 'Export' %}</a></li>
<li><a rel="popup" href="{% url 'chrono-manager-pricing-delete' pk=object.pk %}">{% trans 'Delete' %}</a></li>
</ul>
</span>

View File

@ -40,6 +40,11 @@ urlpatterns = [
views.pricing_delete,
name='chrono-manager-pricing-delete',
),
url(
r'^(?P<pk>\d+)/export/$',
views.pricing_export,
name='chrono-manager-pricing-export',
),
url(
r'^(?P<pk>\d+)/variable/$',
views.pricing_variable_edit,

View File

@ -138,6 +138,28 @@ class PricingDeleteView(DeleteView):
pricing_delete = PricingDeleteView.as_view()
class PricingExport(DetailView):
model = Pricing
def dispatch(self, request, *args, **kwargs):
if not request.user.is_staff:
raise PermissionDenied()
return super().dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
response = HttpResponse(content_type='application/json')
today = datetime.date.today()
attachment = 'attachment; filename="export_pricing_{}_{}.json"'.format(
self.get_object().slug, today.strftime('%Y%m%d')
)
response['Content-Disposition'] = attachment
json.dump({'pricing_models': [self.get_object().export_json()]}, response, indent=2)
return response
pricing_export = PricingExport.as_view()
class PricingVariableEdit(FormView):
template_name = 'chrono/pricing/manager_pricing_variable_form.html'
model = Pricing

View File

@ -41,6 +41,7 @@ def test_export_site(settings, app, admin_user):
'resources': [],
'categories': [],
'pricing_categories': [],
'pricing_models': [],
}
agenda = Agenda.objects.create(label='Foo Bar', kind='events')
@ -57,6 +58,7 @@ def test_export_site(settings, app, admin_user):
assert len(site_json['resources']) == 0
assert len(site_json['categories']) == 0
assert len(site_json['pricing_categories']) == 0
assert len(site_json['pricing_models']) == 0
resp = app.get('/manage/agendas/export/')
resp.form['agendas'] = False
@ -65,6 +67,7 @@ def test_export_site(settings, app, admin_user):
resp.form['resources'] = False
resp.form['categories'] = False
resp.form['pricing_categories'] = False
resp.form['pricing_models'] = False
resp = resp.form.submit()
site_json = json.loads(resp.text)
@ -75,10 +78,12 @@ def test_export_site(settings, app, admin_user):
assert 'resources' not in site_json
assert 'categories' not in site_json
assert 'pricing_categories' not in site_json
assert 'pricing_models' not in site_json
settings.CHRONO_ENABLE_PRICING = False
resp = app.get('/manage/agendas/export/')
assert 'pricing_categories' not in resp.context['form'].fields
assert 'pricing_models' not in resp.context['form'].fields
def test_import_agenda_as_manager(app, manager_user):

View File

@ -111,6 +111,69 @@ def test_delete_pricing_as_manager(app, manager_user):
app.get('/manage/pricing/%s/delete/' % pricing.pk, status=403)
@pytest.mark.freeze_time('2021-07-08')
def test_import_pricing(app, admin_user):
pricing = Pricing.objects.create(label='Model')
app = login(app)
resp = app.get('/manage/pricing/%s/export/' % pricing.id)
assert resp.headers['content-type'] == 'application/json'
assert resp.headers['content-disposition'] == 'attachment; filename="export_pricing_model_20210708.json"'
pricing_export = resp.text
# existing pricing
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload('export.json', pricing_export.encode('utf-8'), 'application/json')
resp = resp.form.submit()
assert resp.location.endswith('/manage/pricing/%s/' % pricing.pk)
resp = resp.follow()
assert 'No pricing model created. A pricing model has been updated.' not in resp.text
assert Pricing.objects.count() == 1
# new pricing
Pricing.objects.all().delete()
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload('export.json', pricing_export.encode('utf-8'), 'application/json')
resp = resp.form.submit()
pricing = Pricing.objects.latest('pk')
assert resp.location.endswith('/manage/pricing/%s/' % pricing.pk)
resp = resp.follow()
assert 'A pricing model has been created. No pricing model updated.' not in resp.text
assert Pricing.objects.count() == 1
# multiple pricing
pricings = json.loads(pricing_export)
pricings['pricing_models'].append(copy.copy(pricings['pricing_models'][0]))
pricings['pricing_models'].append(copy.copy(pricings['pricing_models'][0]))
pricings['pricing_models'][1]['label'] = 'Foo bar 2'
pricings['pricing_models'][1]['slug'] = 'foo-bar-2'
pricings['pricing_models'][2]['label'] = 'Foo bar 3'
pricings['pricing_models'][2]['slug'] = 'foo-bar-3'
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload(
'export.json', json.dumps(pricings).encode('utf-8'), 'application/json'
)
resp = resp.form.submit()
assert resp.location.endswith('/manage/')
resp = resp.follow()
assert '2 pricing models have been created. A pricing model has been updated.' in resp.text
assert Pricing.objects.count() == 3
Pricing.objects.all().delete()
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload(
'export.json', json.dumps(pricings).encode('utf-8'), 'application/json'
)
resp = resp.form.submit().follow()
assert '3 pricing models have been created. No pricing model updated.' in resp.text
assert Pricing.objects.count() == 3
def test_pricing_edit_extra_variables(app, admin_user):
pricing = Pricing.objects.create(label='Model')
assert pricing.extra_variables == {}

View File

@ -35,7 +35,7 @@ from chrono.agendas.models import (
VirtualMember,
)
from chrono.manager.utils import import_site
from chrono.pricing.models import Criteria, CriteriaCategory
from chrono.pricing.models import Criteria, CriteriaCategory, Pricing, PricingCriteriaCategory
pytestmark = pytest.mark.django_db
@ -1104,7 +1104,7 @@ def test_import_export_resource(app):
def test_import_export_pricing_criteria_category(app):
output = get_output_of_command('export_site')
payload = json.loads(output)
assert len(payload['check_type_groups']) == 0
assert len(payload['pricing_categories']) == 0
category = CriteriaCategory.objects.create(label='Foo bar')
Criteria.objects.create(label='Foo reason', category=category)
@ -1158,6 +1158,164 @@ def test_import_export_pricing_criteria_category(app):
assert Criteria.objects.get(category=category, label='Baz', slug='baz')
def test_import_export_pricing(app):
output = get_output_of_command('export_site')
payload = json.loads(output)
assert len(payload['pricing_models']) == 0
pricing = Pricing.objects.create(label='Foo bar', extra_variables={'foo': 'bar'})
output = get_output_of_command('export_site')
payload = json.loads(output)
assert len(payload['pricing_models']) == 1
pricing.delete()
assert not Pricing.objects.exists()
import_site(copy.deepcopy(payload))
assert Pricing.objects.count() == 1
pricing = Pricing.objects.first()
assert pricing.label == 'Foo bar'
assert pricing.slug == 'foo-bar'
assert pricing.extra_variables == {'foo': 'bar'}
# update
update_payload = copy.deepcopy(payload)
update_payload['pricing_models'][0]['label'] = 'Foo bar Updated'
import_site(update_payload)
pricing.refresh_from_db()
assert pricing.label == 'Foo bar Updated'
# insert another pricing
pricing.slug = 'foo-bar-updated'
pricing.save()
import_site(copy.deepcopy(payload))
assert Pricing.objects.count() == 2
pricing = Pricing.objects.latest('pk')
assert pricing.label == 'Foo bar'
assert pricing.slug == 'foo-bar'
assert pricing.extra_variables == {'foo': 'bar'}
def test_import_export_pricing_with_categories(app):
pricing = Pricing.objects.create(label='Foo bar')
category = CriteriaCategory.objects.create(label='Foo bar')
pricing.categories.add(category, through_defaults={'order': 42})
output = get_output_of_command('export_site')
import_site(data={}, clean=True)
assert Pricing.objects.count() == 0
assert CriteriaCategory.objects.count() == 0
data = json.loads(output)
del data['pricing_categories']
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
assert str(excinfo.value) == 'Missing "foo-bar" pricing category'
CriteriaCategory.objects.create(label='Foobar')
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
assert str(excinfo.value) == 'Missing "foo-bar" pricing category'
category = CriteriaCategory.objects.create(label='Foo bar')
import_site(data, overwrite=True)
pricing = Pricing.objects.get(slug=pricing.slug)
assert list(pricing.categories.all()) == [category]
assert PricingCriteriaCategory.objects.first().order == 42
category2 = CriteriaCategory.objects.create(label='Foo bar 2')
category3 = CriteriaCategory.objects.create(label='Foo bar 3')
pricing.categories.add(category2, through_defaults={'order': 1})
output = get_output_of_command('export_site')
data = json.loads(output)
del data['pricing_categories']
data['pricing_models'][0]['categories'] = [
{
'category': 'foo-bar-3',
'order': 1,
'criterias': [],
},
{
'category': 'foo-bar',
'order': 35,
'criterias': [],
},
]
import_site(data, overwrite=True)
assert list(pricing.categories.all()) == [category, category3]
assert list(
PricingCriteriaCategory.objects.filter(pricing=pricing).values_list('category', flat=True)
) == [category3.pk, category.pk]
assert list(PricingCriteriaCategory.objects.filter(pricing=pricing).values_list('order', flat=True)) == [
1,
35,
]
assert list(pricing.criterias.all()) == []
criteria1 = Criteria.objects.create(label='Crit 1', category=category)
Criteria.objects.create(label='Crit 2', category=category)
criteria3 = Criteria.objects.create(label='Crit 3', category=category)
# unknown criteria
data['pricing_models'][0]['categories'] = [
{
'category': 'foo-bar-3',
'order': 1,
'criterias': ['unknown'],
},
{
'category': 'foo-bar',
'order': 35,
'criterias': [],
},
]
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
assert str(excinfo.value) == 'Missing "unknown" pricing criteria for "foo-bar-3" category'
# wrong criteria (from another category)
data['pricing_models'][0]['categories'] = [
{
'category': 'foo-bar-3',
'order': 1,
'criterias': ['crit-1'],
},
{
'category': 'foo-bar',
'order': 35,
'criterias': [],
},
]
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
assert str(excinfo.value) == 'Missing "crit-1" pricing criteria for "foo-bar-3" category'
data['pricing_models'][0]['categories'] = [
{
'category': 'foo-bar-3',
'order': 1,
'criterias': [],
},
{
'category': 'foo-bar',
'order': 35,
'criterias': ['crit-1', 'crit-3'],
},
]
import_site(data, overwrite=True)
assert list(pricing.categories.all()) == [category, category3]
assert list(
PricingCriteriaCategory.objects.filter(pricing=pricing).values_list('category', flat=True)
) == [category3.pk, category.pk]
assert list(PricingCriteriaCategory.objects.filter(pricing=pricing).values_list('order', flat=True)) == [
1,
35,
]
assert set(pricing.criterias.all()) == {criteria1, criteria3}
@mock.patch('chrono.agendas.models.Agenda.is_available_for_simple_management')
def test_import_export_desk_simple_management(available_mock):
agenda = Agenda.objects.create(label='Foo bar', kind='meetings', desk_simple_management=True)