pricing - import/export categories (#64746)

This commit is contained in:
Lauréline Guérin 2022-05-03 13:56:38 +02:00
parent a5c5f63c83
commit 1b82b01fea
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
12 changed files with 301 additions and 46 deletions

View File

@ -33,7 +33,7 @@ from django.conf import settings
from django.contrib.auth.models import Group
from django.contrib.humanize.templatetags.humanize import ordinal
from django.contrib.postgres.fields import ArrayField, JSONField
from django.core.exceptions import FieldDoesNotExist, ValidationError
from django.core.exceptions import ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import connection, models, transaction
from django.db.models import Count, Exists, ExpressionWrapper, F, Func, Max, OuterRef, Prefetch, Q, Value
@ -56,7 +56,7 @@ from django.utils.translation import ungettext
from chrono.interval import Interval, IntervalSet
from chrono.utils.date import get_weekday_index
from chrono.utils.db import ArraySubquery, SumCardinality
from chrono.utils.misc import generate_slug
from chrono.utils.misc import AgendaImportError, ICSError, clean_import_data, generate_slug
from chrono.utils.publik_urls import translate_from_publik_url
from chrono.utils.requests_wrapper import requests as requests_wrapper
@ -98,28 +98,6 @@ def is_midnight(dtime):
return dtime.hour == 0 and dtime.minute == 0
def clean_import_data(cls, data):
cleaned_data = copy.deepcopy(data)
for param in data:
try:
field = cls._meta.get_field(param)
except FieldDoesNotExist:
# remove unknown fields
cleaned_data.pop(param)
continue
if field.many_to_many:
# remove many to many fields, they have to be managed after update_or_create
cleaned_data.pop(param)
continue
if param == 'slug':
value = cleaned_data[param]
try:
field.run_validators(value)
except ValidationError:
raise AgendaImportError(_('Bad slug format "%s"') % value)
return cleaned_data
def validate_not_digit(value):
if value.isdigit():
raise ValidationError(_('This value cannot be a number.'))
@ -163,14 +141,6 @@ def booking_template_validator(value):
pass
class ICSError(Exception):
pass
class AgendaImportError(Exception):
pass
class Agenda(models.Model):
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)

View File

@ -1425,6 +1425,14 @@ class AgendasExportForm(forms.Form):
categories = forms.BooleanField(label=_('Categories'), required=False, initial=True)
check_type_groups = forms.BooleanField(label=_('Check type groups'), required=False, initial=True)
events_types = forms.BooleanField(label=_('Events types'), required=False, initial=True)
pricing_categories = forms.BooleanField(
label=_('Pricing criteria categories'), required=False, initial=True
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not settings.CHRONO_ENABLE_PRICING:
del self.fields['pricing_categories']
class SharedCustodyRuleForm(forms.ModelForm):

View File

@ -30,6 +30,7 @@ from chrono.agendas.models import (
Resource,
UnavailabilityCalendar,
)
from chrono.pricing.models import CriteriaCategory
def export_site(
@ -39,9 +40,12 @@ def export_site(
events_types=True,
resources=True,
categories=True,
pricing_categories=True,
):
'''Dump site objects to JSON-dumpable dictionnary'''
data = collections.OrderedDict()
if pricing_categories:
data['pricing_categories'] = [x.export_json() for x in CriteriaCategory.objects.all()]
if categories:
data['categories'] = [x.export_json() for x in Category.objects.all()]
if resources:
@ -68,6 +72,7 @@ def import_site(data, if_empty=False, clean=False, overwrite=False):
or EventsType.objects.exists()
or Resource.objects.exists()
or Category.objects.exists()
or CriteriaCategory.objects.exists()
):
return
@ -78,6 +83,7 @@ def import_site(data, if_empty=False, clean=False, overwrite=False):
EventsType.objects.all().delete()
Resource.objects.all().delete()
Category.objects.all().delete()
CriteriaCategory.objects.all().delete()
results = {
key: collections.defaultdict(list)
@ -88,6 +94,7 @@ def import_site(data, if_empty=False, clean=False, overwrite=False):
'events_types',
'resources',
'categories',
'pricing_categories',
]
}
@ -111,6 +118,7 @@ def import_site(data, if_empty=False, clean=False, overwrite=False):
(CheckTypeGroup, 'check_type_groups'),
(UnavailabilityCalendar, 'unavailability_calendars'),
(Agenda, 'agendas'),
(CriteriaCategory, 'pricing_categories'),
):
objs = data.get(key, [])
for obj in objs:

View File

@ -1019,13 +1019,13 @@ class AgendasImportView(FormView):
'check_type_groups': {
'create_noop': _('No check type group created.'),
'create': lambda x: ungettext(
'An check type group has been created.',
'A check type group has been created.',
'%(count)d check type groups have been created.',
x,
),
'update_noop': _('No check type group updated.'),
'update': lambda x: ungettext(
'An check type group has been updated.',
'A check type group has been updated.',
'%(count)d check type groups have been updated.',
x,
),
@ -1072,6 +1072,20 @@ class AgendasImportView(FormView):
x,
),
},
'pricing_categories': {
'create_noop': _('No pricing criteria category created.'),
'create': lambda x: ungettext(
'A pricing criteria category has been created.',
'%(count)d pricing criteria categories have been created.',
x,
),
'update_noop': _('No pricing criteria category updated.'),
'update': lambda x: ungettext(
'A pricing criteria category has been updated.',
'%(count)d pricing criteria categories have been updated.',
x,
),
},
}
global_noop = True
@ -1092,17 +1106,18 @@ class AgendasImportView(FormView):
obj_results['messages'] = "%s %s" % (message1, message2)
a_count, uc_count, arg_count = (
a_count, uc_count, arg_count, pc_count = (
len(results['agendas']['all']),
len(results['unavailability_calendars']['all']),
len(results['check_type_groups']['all']),
len(results['pricing_categories']['all']),
)
if (a_count, uc_count, arg_count) == (1, 0, 0):
if (a_count, uc_count, arg_count, pc_count) == (1, 0, 0, 0):
# only one agenda imported, redirect to settings page
return HttpResponseRedirect(
reverse('chrono-manager-agenda-settings', kwargs={'pk': results['agendas']['all'][0].pk})
)
if (a_count, uc_count, arg_count) == (0, 1, 0):
if (a_count, uc_count, arg_count, pc_count) == (0, 1, 0, 0):
# only one unavailability calendar imported, redirect to settings page
return HttpResponseRedirect(
reverse(
@ -1110,9 +1125,12 @@ class AgendasImportView(FormView):
kwargs={'pk': results['unavailability_calendars']['all'][0].pk},
)
)
if (a_count, uc_count, arg_count) == (0, 0, 1):
# only one check type group imported, redirect to group page
if (a_count, uc_count, arg_count, pc_count) == (0, 0, 1, 0):
# only one check type group imported, redirect to check type page
return HttpResponseRedirect(reverse('chrono-manager-check-type-list'))
if (a_count, uc_count, arg_count, pc_count) == (0, 0, 0, 1):
# only one criteria category imported, redirect to criteria page
return HttpResponseRedirect(reverse('chrono-manager-pricing-criteria-list'))
if global_noop:
messages.info(self.request, _('No data found.'))
@ -1123,6 +1141,7 @@ class AgendasImportView(FormView):
messages.info(self.request, results['events_types']['messages'])
messages.info(self.request, results['resources']['messages'])
messages.info(self.request, results['categories']['messages'])
messages.info(self.request, results['pricing_categories']['messages'])
return super().form_valid(form)

View File

@ -23,7 +23,7 @@ from django.utils.text import slugify
from django.utils.translation import ugettext_lazy as _
from chrono.agendas.models import Agenda, Booking, Subscription
from chrono.utils.misc import generate_slug
from chrono.utils.misc import clean_import_data, generate_slug
class PricingError(Exception):
@ -87,6 +87,28 @@ class CriteriaCategory(models.Model):
def base_slug(self):
return slugify(self.label)
@classmethod
def import_json(cls, data, overwrite=False):
criterias = data.pop('criterias', [])
data = clean_import_data(cls, data)
category, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
if overwrite:
Criteria.objects.filter(category=category).delete()
for criteria in criterias:
criteria['category'] = category
Criteria.import_json(criteria)
return created, category
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'criterias': [a.export_json() for a in self.criterias.all()],
}
class Criteria(models.Model):
category = models.ForeignKey(
@ -121,6 +143,19 @@ class Criteria(models.Model):
def base_slug(self):
return slugify(self.label)
@classmethod
def import_json(cls, data):
data = clean_import_data(cls, data)
cls.objects.update_or_create(slug=data['slug'], category=data['category'], defaults=data)
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'condition': self.condition,
'order': self.order,
}
def compute_condition(self, context):
try:
template = Template('{%% if %s %%}OK{%% endif %%}' % self.condition)

View File

@ -23,7 +23,7 @@
<h3>
<a rel="popup" href="{% url 'chrono-manager-pricing-criteria-category-edit' object.pk %}">{{ object }} [{{ object.slug }}]</a>
<span>
<a class="button" href="{# url 'chrono-manager-pricing-criteria-category-export' object.pk #}">{% trans "Export"%}</a>
<a class="button" href="{% url 'chrono-manager-pricing-criteria-category-export' object.pk %}">{% trans "Export"%}</a>
<a class="button" rel="popup" href="{% url 'chrono-manager-pricing-criteria-category-delete' object.pk %}">{% trans "Delete"%}</a>
</span>
</h3>

View File

@ -35,6 +35,11 @@ urlpatterns = [
views.criteria_category_delete,
name='chrono-manager-pricing-criteria-category-delete',
),
url(
r'^criteria/category/(?P<pk>\d+)/export/$',
views.criteria_category_export,
name='chrono-manager-pricing-criteria-category-export',
),
url(
r'^criteria/category/(?P<category_pk>\d+)/add/$',
views.criteria_add,

View File

@ -14,10 +14,13 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
from django.core.exceptions import PermissionDenied
from django.http import HttpResponse
from django.urls import reverse
from django.views.generic import CreateView, DeleteView, ListView, UpdateView
from django.views.generic import CreateView, DeleteView, DetailView, ListView, UpdateView
from chrono.pricing.forms import CriteriaForm, NewCriteriaForm
from chrono.pricing.models import Criteria, CriteriaCategory
@ -89,6 +92,28 @@ class CriteriaCategoryDeleteView(DeleteView):
criteria_category_delete = CriteriaCategoryDeleteView.as_view()
class CriteriaCategoryExport(DetailView):
model = CriteriaCategory
def dispatch(self, request, *args, **kwargs):
if not request.user.is_staff:
raise PermissionDenied()
return super().dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
response = HttpResponse(content_type='application/json')
today = datetime.date.today()
attachment = 'attachment; filename="export_pricing_category_{}_{}.json"'.format(
self.get_object().slug, today.strftime('%Y%m%d')
)
response['Content-Disposition'] = attachment
json.dump({'pricing_categories': [self.get_object().export_json()]}, response, indent=2)
return response
criteria_category_export = CriteriaCategoryExport.as_view()
class CriteriaAddView(CreateView):
template_name = 'chrono/pricing/manager_criteria_form.html'
model = Criteria

View File

@ -14,6 +14,19 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
from django.core.exceptions import FieldDoesNotExist, ValidationError
from django.utils.translation import ugettext_lazy as _
class ICSError(Exception):
pass
class AgendaImportError(Exception):
pass
def generate_slug(instance, seen_slugs=None, **query_filters):
base_slug = instance.base_slug
@ -38,3 +51,25 @@ def generate_slug(instance, seen_slugs=None, **query_filters):
i += 1
seen_slugs.add(slug)
return slug
def clean_import_data(cls, data):
cleaned_data = copy.deepcopy(data)
for param in data:
try:
field = cls._meta.get_field(param)
except FieldDoesNotExist:
# remove unknown fields
cleaned_data.pop(param)
continue
if field.many_to_many:
# remove many to many fields, they have to be managed after update_or_create
cleaned_data.pop(param)
continue
if param == 'slug':
value = cleaned_data[param]
try:
field.run_validators(value)
except ValidationError:
raise AgendaImportError(_('Bad slug format "%s"') % value)
return cleaned_data

View File

@ -22,7 +22,7 @@ from tests.utils import login
pytestmark = pytest.mark.django_db
def test_export_site(app, admin_user):
def test_export_site(settings, app, admin_user):
login(app)
resp = app.get('/manage/')
resp = resp.click('Export')
@ -40,6 +40,7 @@ def test_export_site(app, admin_user):
'events_types': [],
'resources': [],
'categories': [],
'pricing_categories': [],
}
agenda = Agenda.objects.create(label='Foo Bar', kind='events')
@ -55,6 +56,7 @@ def test_export_site(app, admin_user):
assert len(site_json['events_types']) == 0
assert len(site_json['resources']) == 0
assert len(site_json['categories']) == 0
assert len(site_json['pricing_categories']) == 0
resp = app.get('/manage/agendas/export/')
resp.form['agendas'] = False
@ -62,6 +64,7 @@ def test_export_site(app, admin_user):
resp.form['events_types'] = False
resp.form['resources'] = False
resp.form['categories'] = False
resp.form['pricing_categories'] = False
resp = resp.form.submit()
site_json = json.loads(resp.text)
@ -71,6 +74,11 @@ def test_export_site(app, admin_user):
assert 'events_types' not in site_json
assert 'resources' not in site_json
assert 'categories' not in site_json
assert 'pricing_categories' not in site_json
settings.CHRONO_ENABLE_PRICING = False
resp = app.get('/manage/agendas/export/')
assert 'pricing_categories' not in resp.context['form'].fields
def test_import_agenda_as_manager(app, manager_user):
@ -315,7 +323,7 @@ def test_import_check_type_group(app, admin_user):
resp = resp.form.submit()
assert resp.location.endswith('/manage/check-types/')
resp = resp.follow()
assert 'No check type group created. An check type group has been updated.' not in resp.text
assert 'No check type group created. A check type group has been updated.' not in resp.text
assert CheckTypeGroup.objects.count() == 1
assert CheckType.objects.count() == 2
@ -327,7 +335,7 @@ def test_import_check_type_group(app, admin_user):
resp = resp.form.submit()
assert resp.location.endswith('/manage/check-types/')
resp = resp.follow()
assert 'An check type group has been created. No check type group updated.' not in resp.text
assert 'A check type group has been created. No check type group updated.' not in resp.text
assert CheckTypeGroup.objects.count() == 1
assert CheckType.objects.count() == 2
@ -346,7 +354,7 @@ def test_import_check_type_group(app, admin_user):
resp = resp.form.submit()
assert resp.location.endswith('/manage/')
resp = resp.follow()
assert '2 check type groups have been created. An check type group has been updated.' in resp.text
assert '2 check type groups have been created. A check type group has been updated.' in resp.text
assert CheckTypeGroup.objects.count() == 3
assert CheckType.objects.count() == 6

View File

@ -1,4 +1,8 @@
import copy
import json
import pytest
from webtest import Upload
from chrono.agendas.models import Agenda
from chrono.pricing.models import Criteria, CriteriaCategory
@ -188,3 +192,83 @@ def test_delete_criteria_as_manager(app, manager_user):
app = login(app, username='manager', password='manager')
app.get('/manage/pricing/criteria/category/%s/%s/delete/' % (category.pk, criteria.pk), status=403)
@pytest.mark.freeze_time('2021-07-08')
def test_import_criteria_category(app, admin_user):
category = CriteriaCategory.objects.create(label='Foo bar')
Criteria.objects.create(label='Foo', category=category)
Criteria.objects.create(label='Baz', category=category)
app = login(app)
resp = app.get('/manage/pricing/criteria/category/%s/export/' % category.id)
assert resp.headers['content-type'] == 'application/json'
assert (
resp.headers['content-disposition']
== 'attachment; filename="export_pricing_category_foo-bar_20210708.json"'
)
category_export = resp.text
# existing category
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload('export.json', category_export.encode('utf-8'), 'application/json')
resp = resp.form.submit()
assert resp.location.endswith('/manage/pricing/criterias/')
resp = resp.follow()
assert (
'No pricing criteria category created. A pricing criteria category has been updated.' not in resp.text
)
assert CriteriaCategory.objects.count() == 1
assert Criteria.objects.count() == 2
# new category
CriteriaCategory.objects.all().delete()
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload('export.json', category_export.encode('utf-8'), 'application/json')
resp = resp.form.submit()
assert resp.location.endswith('/manage/pricing/criterias/')
resp = resp.follow()
assert (
'A pricing criteria category has been created. No pricing criteria category updated.' not in resp.text
)
assert CriteriaCategory.objects.count() == 1
assert Criteria.objects.count() == 2
# multiple categories
categories = json.loads(category_export)
categories['pricing_categories'].append(copy.copy(categories['pricing_categories'][0]))
categories['pricing_categories'].append(copy.copy(categories['pricing_categories'][0]))
categories['pricing_categories'][1]['label'] = 'Foo bar 2'
categories['pricing_categories'][1]['slug'] = 'foo-bar-2'
categories['pricing_categories'][2]['label'] = 'Foo bar 3'
categories['pricing_categories'][2]['slug'] = 'foo-bar-3'
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload(
'export.json', json.dumps(categories).encode('utf-8'), 'application/json'
)
resp = resp.form.submit()
assert resp.location.endswith('/manage/')
resp = resp.follow()
assert (
'2 pricing criteria categories have been created. A pricing criteria category has been updated.'
in resp.text
)
assert CriteriaCategory.objects.count() == 3
assert Criteria.objects.count() == 6
CriteriaCategory.objects.all().delete()
resp = app.get('/manage/', status=200)
resp = resp.click('Import')
resp.form['agendas_json'] = Upload(
'export.json', json.dumps(categories).encode('utf-8'), 'application/json'
)
resp = resp.form.submit().follow()
assert (
'3 pricing criteria categories have been created. No pricing criteria category updated.' in resp.text
)
assert CriteriaCategory.objects.count() == 3
assert Criteria.objects.count() == 6

View File

@ -35,6 +35,7 @@ from chrono.agendas.models import (
VirtualMember,
)
from chrono.manager.utils import import_site
from chrono.pricing.models import Criteria, CriteriaCategory
pytestmark = pytest.mark.django_db
@ -1100,6 +1101,63 @@ def test_import_export_resource(app):
assert resource.slug == 'foo-bar'
def test_import_export_pricing_criteria_category(app):
output = get_output_of_command('export_site')
payload = json.loads(output)
assert len(payload['check_type_groups']) == 0
category = CriteriaCategory.objects.create(label='Foo bar')
Criteria.objects.create(label='Foo reason', category=category)
Criteria.objects.create(label='Baz', category=category)
output = get_output_of_command('export_site')
payload = json.loads(output)
assert len(payload['pricing_categories']) == 1
category.delete()
assert not CriteriaCategory.objects.exists()
assert not Criteria.objects.exists()
import_site(copy.deepcopy(payload))
assert CriteriaCategory.objects.count() == 1
category = CriteriaCategory.objects.first()
assert category.label == 'Foo bar'
assert category.slug == 'foo-bar'
assert category.criterias.count() == 2
assert Criteria.objects.get(category=category, label='Foo reason', slug='foo-reason')
assert Criteria.objects.get(category=category, label='Baz', slug='baz')
# update
update_payload = copy.deepcopy(payload)
update_payload['pricing_categories'][0]['label'] = 'Foo bar Updated'
import_site(update_payload)
category.refresh_from_db()
assert category.label == 'Foo bar Updated'
# insert another category
category.slug = 'foo-bar-updated'
category.save()
import_site(copy.deepcopy(payload))
assert CriteriaCategory.objects.count() == 2
category = CriteriaCategory.objects.latest('pk')
assert category.label == 'Foo bar'
assert category.slug == 'foo-bar'
assert category.criterias.count() == 2
assert Criteria.objects.get(category=category, label='Foo reason', slug='foo-reason')
assert Criteria.objects.get(category=category, label='Baz', slug='baz')
# with overwrite
Criteria.objects.create(category=category, label='Baz2')
import_site(copy.deepcopy(payload), overwrite=True)
assert CriteriaCategory.objects.count() == 2
category = CriteriaCategory.objects.latest('pk')
assert category.label == 'Foo bar'
assert category.slug == 'foo-bar'
assert category.criterias.count() == 2
assert Criteria.objects.get(category=category, label='Foo reason', slug='foo-reason')
assert Criteria.objects.get(category=category, label='Baz', slug='baz')
@mock.patch('chrono.agendas.models.Agenda.is_available_for_simple_management')
def test_import_export_desk_simple_management(available_mock):
agenda = Agenda.objects.create(label='Foo bar', kind='meetings', desk_simple_management=True)