Compare commits

...

6 Commits

Author SHA1 Message Date
Lauréline Guérin fc8872e559
misc: remove import/export commands, simplify code (#78125)
gitea/lingo/pipeline/head This commit looks good Details
2023-06-02 15:44:00 +02:00
Lauréline Guérin 2b3f25985b
invoicing: add a global export/import for invoicing config (#78125) 2023-06-02 14:37:27 +02:00
Lauréline Guérin 1813a9c415
misc: rename AgendaImportError to be more generic (#78125) 2023-06-02 11:28:51 +02:00
Lauréline Guérin 0eadd96ae8
pricing: add the edition of regie slug (#78125) 2023-06-02 11:28:41 +02:00
Lauréline Guérin 05c08d9098
invoicing: reorganize regie templates (#78125) 2023-06-02 11:28:30 +02:00
Lauréline Guérin dc8b89112a
invoicing: split views (#78125) 2023-06-02 11:28:17 +02:00
29 changed files with 1368 additions and 1297 deletions

View File

@ -21,7 +21,7 @@ from django.db import models
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from lingo.utils.misc import AgendaImportError, clean_import_data, generate_slug
from lingo.utils.misc import LingoImportError, clean_import_data, generate_slug
class Agenda(models.Model):
@ -66,17 +66,17 @@ class Agenda(models.Model):
}
@classmethod
def import_json(cls, data, overwrite=False):
def import_json(cls, data):
data = copy.deepcopy(data)
try:
agenda = Agenda.objects.get(slug=data['slug'])
except Agenda.DoesNotExist:
raise AgendaImportError(_('Missing "%s" agenda') % data['slug'])
raise LingoImportError(_('Missing "%s" agenda') % data['slug'])
if data.get('check_type_group'):
try:
data['check_type_group'] = CheckTypeGroup.objects.get(slug=data['check_type_group'])
except CheckTypeGroup.DoesNotExist:
raise AgendaImportError(_('Missing "%s" check type group') % data['check_type_group'])
raise LingoImportError(_('Missing "%s" check type group') % data['check_type_group'])
agenda.check_type_group = data.get('check_type_group')
agenda.save()
@ -111,14 +111,11 @@ class CheckTypeGroup(models.Model):
return slugify(self.label)
@classmethod
def import_json(cls, data, overwrite=False):
def import_json(cls, data):
check_types = data.pop('check_types', [])
data = clean_import_data(cls, data)
group, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
if overwrite:
CheckType.objects.filter(group=group).delete()
for check_type in check_types:
check_type['group'] = group
CheckType.import_json(check_type)

View File

@ -20,7 +20,29 @@ from django import forms
from django.utils.translation import gettext_lazy as _
from gadjo.forms.widgets import MultiSelectWidget
from lingo.invoicing.models import Campaign, DraftInvoice, DraftInvoiceLine, Invoice, InvoiceLine
from lingo.invoicing.models import Campaign, DraftInvoice, DraftInvoiceLine, Invoice, InvoiceLine, Regie
class ExportForm(forms.Form):
regies = forms.BooleanField(label=_('Regies'), required=False, initial=True)
class ImportForm(forms.Form):
config_json = forms.FileField(label=_('Export File'))
class RegieForm(forms.ModelForm):
class Meta:
model = Regie
fields = ['label', 'slug', 'description', 'cashier_role', 'counter_name', 'number_format']
def clean_slug(self):
slug = self.cleaned_data['slug']
if Regie.objects.filter(slug=slug).exclude(pk=self.instance.pk).exists():
raise forms.ValidationError(_('Another regie exists with the same identifier.'))
return slug
class CampaignForm(forms.ModelForm):

View File

@ -0,0 +1,22 @@
{% extends "lingo/invoicing/manager_home.html" %}
{% load i18n %}
{% block breadcrumb %}
{{ block.super }}
<a href="{% url 'lingo-manager-invoicing-config-export' %}">{% trans 'Export' %}</a>
{% endblock %}
{% block appbar %}
<h2>{% trans "Export" %}</h2>
{% endblock %}
{% block content %}
<form method="post" enctype="multipart/form-data">
{% csrf_token %}
{{ form.as_p }}
<div class="buttons">
<button class="submit-button">{% trans "Export" %}</button>
<a class="cancel" href="{% url 'lingo-manager-invoicing-home' %}">{% trans 'Cancel' %}</a>
</div>
</form>
{% endblock %}

View File

@ -1,13 +1,13 @@
{% extends "lingo/invoicing/manager_regie_list.html" %}
{% extends "lingo/invoicing/manager_home.html" %}
{% load i18n %}
{% block breadcrumb %}
{{ block.super }}
<a href="{% url 'lingo-manager-invoicing-regie-import' %}">{% trans 'Import' %}</a>
<a href="{% url 'lingo-manager-invoicing-config-import' %}">{% trans 'Import' %}</a>
{% endblock %}
{% block appbar %}
<h2>{% trans "Import Regies" %}</h2>
<h2>{% trans "Import" %}</h2>
{% endblock %}
{% block content %}
@ -16,7 +16,7 @@
{{ form.as_p }}
<div class="buttons">
<button class="submit-button">{% trans "Import" %}</button>
<a class="cancel" href="{% url 'lingo-manager-invoicing-regie-list' %}">{% trans 'Cancel' %}</a>
<a class="cancel" href="{% url 'lingo-manager-invoicing-home' %}">{% trans 'Cancel' %}</a>
</div>
</form>
{% endblock %}

View File

@ -8,6 +8,13 @@
{% block appbar %}
<h2>{% trans 'Invoicing' %}</h2>
<span class="actions">
<a class="extra-actions-menu-opener"></a>
<ul class="extra-actions-menu">
<li><a rel="popup" href="{% url 'lingo-manager-invoicing-config-import' %}">{% trans 'Import' %}</a></li>
<li><a rel="popup" href="{% url 'lingo-manager-invoicing-config-export' %}" data-autoclose-dialog="true">{% trans 'Export' %}</a></li>
</ul>
</span>
{% endblock %}
{% block content %}

View File

@ -1,12 +0,0 @@
{% extends "lingo/invoicing/manager_home.html" %}
{% load i18n %}
{% block appbar %}
<h2>{% if regie %}{% trans 'Regie' %} - {{regie}} {% else %}{% trans 'Regies' %}{% endif %}</h2>
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
<a href="{% url 'lingo-manager-invoicing-regie-list' %}">{% trans "Regies" %}</a>
{% endblock %}

View File

@ -1,4 +1,4 @@
{% extends "lingo/invoicing/manager_regie_common.html" %}
{% extends "lingo/invoicing/manager_regie_list.html" %}
{% load i18n gadjo %}
{% block breadcrumb %}
@ -7,12 +7,16 @@
{% endblock %}
{% block appbar %}
{{ block.super }}
<h2>{% trans 'Regie' %} - {{ regie }}</h2>
<span class="actions">
{% if not has_related_objects %}
<a href="{% url 'lingo-manager-invoicing-regie-delete' pk=regie.pk %}" rel="popup">{% trans "Delete" %}</a>
{% endif %}
<a href="{% url 'lingo-manager-invoicing-regie-edit' pk=regie.pk %}" rel="popup">{% trans "Edit" %}</a>
<a class="extra-actions-menu-opener"></a>
<ul class="extra-actions-menu">
<li><a href="{% url 'lingo-manager-invoicing-regie-edit' pk=regie.pk %}" rel="popup">{% trans "Edit" %}</a></li>
<li><a href="{% url 'lingo-manager-invoicing-regie-export' pk=regie.pk %}">{% trans 'Export' %}</a></li>
{% if not has_related_objects %}
<li><a href="{% url 'lingo-manager-invoicing-regie-delete' pk=regie.pk %}" rel="popup">{% trans "Delete" %}</a></li>
{% endif %}
</ul>
<a href="{% url 'lingo-manager-invoicing-non-invoiced-line-list' regie_pk=regie.pk %}">{% trans 'Non invoiced lines' %}</a>
<a href="{% url 'lingo-manager-invoicing-regie-invoice-list' regie_pk=regie.pk %}">{% trans 'Invoices' %}</a>
</span>

View File

@ -1,4 +1,4 @@
{% extends "lingo/invoicing/manager_regie_common.html" %}
{% extends "lingo/invoicing/manager_regie_list.html" %}
{% load i18n gadjo %}
{% block breadcrumb %}
@ -13,7 +13,7 @@
{% block appbar %}
{% if regie.pk %}
<h2>{% trans "Edit regie" %} - {{regie}}</h2>
<h2>{% trans "Edit regie" %} - {{ regie }}</h2>
{% else %}
<h2>{% trans "New regie" %}</h2>
{% endif %}
@ -25,7 +25,11 @@
{{ form|with_template }}
<div class="buttons">
<button>{% trans "Submit" %}</button>
<a class="cancel" href="{{cancel_url}}">{% trans 'Cancel' %}</a>
{% if object.pk %}
<a class="cancel" href="{% url 'lingo-manager-invoicing-regie-detail' regie.pk %}">{% trans 'Cancel' %}</a>
{% else %}
<a class="cancel" href="{% url 'lingo-manager-invoicing-regie-list' %}">{% trans 'Cancel' %}</a>
{% endif %}
</div>
</form>
{% endblock %}

View File

@ -1,18 +1,14 @@
{% extends "lingo/invoicing/manager_regie_common.html" %}
{% extends "lingo/invoicing/manager_home.html" %}
{% load i18n %}
{% block breadcrumb %}
{{ block.super }}
<a href="{% url 'lingo-manager-invoicing-regie-list' %}">{% trans "Regies" %}</a>
{% endblock %}
{% block appbar %}
<h2>{% trans 'Regies' %}</h2>
<span class="actions">
<a class="extra-actions-menu-opener"></a>
<ul class="extra-actions-menu">
<li>
<a href="{% url 'lingo-manager-invoicing-regie-import' %}">{% trans 'Import' %}</a>
</li>
<li>
<a href="{% url 'lingo-manager-invoicing-regie-export' %}">{% trans 'Export' %}</a>
</li>
</ul>
<a rel="popup" href="{% url 'lingo-manager-invoicing-regie-add' %}">{% trans 'New regie' %}</a>
</span>
{% endblock %}

View File

@ -16,126 +16,134 @@
from django.urls import path
from . import views
from .views import campaign as campaign_views
from .views import home as home_views
from .views import pool as pool_views
from .views import regie as regie_views
urlpatterns = [
path('', views.home, name='lingo-manager-invoicing-home'),
path('regies/', views.regies_list, name='lingo-manager-invoicing-regie-list'),
path('', home_views.home, name='lingo-manager-invoicing-home'),
path('import/', home_views.config_import, name='lingo-manager-invoicing-config-import'),
path('export/', home_views.config_export, name='lingo-manager-invoicing-config-export'),
path('regies/', regie_views.regies_list, name='lingo-manager-invoicing-regie-list'),
path(
'regie/add/',
views.regie_add,
regie_views.regie_add,
name='lingo-manager-invoicing-regie-add',
),
path(
'regie/<int:pk>/',
views.regie_detail,
regie_views.regie_detail,
name='lingo-manager-invoicing-regie-detail',
),
path(
'regie/<int:pk>/edit/',
views.regie_edit,
regie_views.regie_edit,
name='lingo-manager-invoicing-regie-edit',
),
path(
'regie/<int:pk>/delete/',
views.regie_delete,
regie_views.regie_delete,
name='lingo-manager-invoicing-regie-delete',
),
path('regies/import/', views.regies_import, name='lingo-manager-invoicing-regie-import'),
path('regies/export/', views.regies_export, name='lingo-manager-invoicing-regie-export'),
path(
'regie/<int:pk>/export/',
regie_views.regie_export,
name='lingo-manager-invoicing-regie-export',
),
path(
'regie/<int:regie_pk>/campaign/add/',
views.campaign_add,
campaign_views.campaign_add,
name='lingo-manager-invoicing-campaign-add',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/',
views.campaign_detail,
campaign_views.campaign_detail,
name='lingo-manager-invoicing-campaign-detail',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/edit/',
views.campaign_edit,
campaign_views.campaign_edit,
name='lingo-manager-invoicing-campaign-edit',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/dates/edit/',
views.campaign_dates_edit,
campaign_views.campaign_dates_edit,
name='lingo-manager-invoicing-campaign-dates-edit',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/delete/',
views.campaign_delete,
campaign_views.campaign_delete,
name='lingo-manager-invoicing-campaign-delete',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/unlock-check/',
views.campaign_unlock_check,
campaign_views.campaign_unlock_check,
name='lingo-manager-invoicing-campaign-unlock-check',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/finalize/',
views.campaign_finalize,
campaign_views.campaign_finalize,
name='lingo-manager-invoicing-campaign-finalize',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/pool/add/',
views.pool_add,
pool_views.pool_add,
name='lingo-manager-invoicing-pool-add',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/pool/<int:pool_pk>/',
views.pool_detail,
pool_views.pool_detail,
name='lingo-manager-invoicing-pool-detail',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/pool/<int:pool_pk>/journal/',
views.pool_journal,
pool_views.pool_journal,
name='lingo-manager-invoicing-pool-journal',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/pool/<int:pool_pk>/promote/',
views.pool_promote,
pool_views.pool_promote,
name='lingo-manager-invoicing-pool-promote',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/pool/<int:pool_pk>/delete/',
views.pool_delete,
pool_views.pool_delete,
name='lingo-manager-invoicing-pool-delete',
),
path(
'regie/<int:regie_pk>/campaign/<int:pk>/pool/<int:pool_pk>/invoice/<int:invoice_pk>/pdf/',
views.invoice_pdf,
pool_views.invoice_pdf,
name='lingo-manager-invoicing-invoice-pdf',
),
path(
'ajax/regie/<int:regie_pk>/campaign/<int:pk>/pool/<int:pool_pk>/invoice/<int:invoice_pk>/lines/',
views.invoice_line_list,
pool_views.invoice_line_list,
name='lingo-manager-invoicing-invoice-line-list',
),
path(
'ajax/regie/<int:regie_pk>/campaign/<int:pk>/pool/<int:pool_pk>/line/<int:line_pk>/<slug:status>/',
views.line_set_error_status,
pool_views.line_set_error_status,
name='lingo-manager-invoicing-line-set-error-status',
),
path(
'regie/<int:regie_pk>/non-invoiced-lines/',
views.non_invoiced_line_list,
regie_views.non_invoiced_line_list,
name='lingo-manager-invoicing-non-invoiced-line-list',
),
path(
'regie/<int:regie_pk>/invoices/',
views.regie_invoice_list,
regie_views.regie_invoice_list,
name='lingo-manager-invoicing-regie-invoice-list',
),
path(
'regie/<int:regie_pk>/invoice/<int:invoice_pk>/pdf/',
views.regie_invoice_pdf,
regie_views.regie_invoice_pdf,
name='lingo-manager-invoicing-regie-invoice-pdf',
),
path(
'ajax/regie/<int:regie_pk>/invoice/<int:invoice_pk>/lines/',
views.regie_invoice_line_list,
regie_views.regie_invoice_line_list,
name='lingo-manager-invoicing-regie-invoice-line-list',
),
]

View File

@ -17,12 +17,13 @@
import collections
import datetime
from django.db import transaction
from django.test.client import RequestFactory
from django.utils.translation import gettext_lazy as _
from lingo.agendas.chrono import get_check_status, get_subscriptions
from lingo.agendas.models import Agenda
from lingo.invoicing.models import DraftInvoice, DraftInvoiceLine, InjectedLine
from lingo.invoicing.models import DraftInvoice, DraftInvoiceLine, InjectedLine, Regie
from lingo.pricing.models import AgendaPricing, AgendaPricingNotFound, PricingError
@ -310,3 +311,34 @@ def generate_invoices_from_lines(all_lines, pool):
invoices.append(invoice)
return invoices
def export_site(
regies=True,
):
'''Dump site objects to JSON-dumpable dictionnary'''
data = {}
if regies:
data['regies'] = [x.export_json() for x in Regie.objects.all()]
return data
def import_site(data):
results = {
key: collections.defaultdict(list)
for key in [
'regies',
]
}
with transaction.atomic():
for cls, key in ((Regie, 'regies'),):
objs = data.get(key, [])
for obj in objs:
created, obj = cls.import_json(obj)
results[key]['all'].append(obj)
if created:
results[key]['created'].append(obj)
else:
results[key]['updated'].append(obj)
return results

View File

@ -1,929 +0,0 @@
# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import datetime
import json
from django.contrib import messages
from django.db import transaction
from django.db.models import CharField, Count, IntegerField, JSONField, OuterRef, Subquery, Value
from django.db.models.functions import Coalesce
from django.http import Http404, HttpResponse
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse, reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.utils.translation import ngettext
from django.views.generic import (
CreateView,
DeleteView,
DetailView,
FormView,
ListView,
TemplateView,
UpdateView,
)
from weasyprint import HTML
from lingo.agendas.chrono import ChronoError, mark_events_invoiced, unlock_events_check
from lingo.agendas.models import Agenda
from lingo.invoicing.forms import (
CampaignDatesForm,
CampaignForm,
DraftInvoiceFilterSet,
DraftInvoiceLineFilterSet,
InvoiceFilterSet,
InvoiceLineFilterSet,
RegieInvoiceFilterSet,
)
from lingo.invoicing.models import (
Campaign,
Counter,
DraftInvoice,
DraftInvoiceLine,
InjectedLine,
Invoice,
InvoiceLine,
InvoicePayment,
Pool,
Regie,
RegieImportError,
)
from lingo.pricing.forms import ImportForm
def is_ajax(request):
return request.headers.get('x-requested-with') == 'XMLHttpRequest'
def import_regies(data):
results = collections.defaultdict(list)
with transaction.atomic():
regies = data.get('regies', [])
for regie in regies:
created, regie_obj = Regie.import_json(regie)
if created:
results['created'].append(regie_obj)
else:
results['updated'].append(regie_obj)
return results
class HomeView(TemplateView):
template_name = 'lingo/invoicing/manager_home.html'
home = HomeView.as_view()
class RegiesListView(ListView):
template_name = 'lingo/invoicing/manager_regie_list.html'
model = Regie
regies_list = RegiesListView.as_view()
class RegieAddView(CreateView):
template_name = 'lingo/invoicing/manager_regie_form.html'
model = Regie
fields = ['label', 'description', 'cashier_role']
def get_success_url(self):
return reverse('lingo-manager-invoicing-regie-detail', args=[self.object.pk])
regie_add = RegieAddView.as_view()
class RegieDetailView(DetailView):
template_name = 'lingo/invoicing/manager_regie_detail.html'
model = Regie
def get_context_data(self, **kwargs):
kwargs['regie'] = self.object
kwargs['agendas'] = Agenda.objects.filter(regie=self.object)
kwargs['campaigns'] = self.object.campaign_set.all().order_by('-date_start')
has_related_objects = False
if kwargs['campaigns']:
has_related_objects = True
elif self.object.injectedline_set.exists():
has_related_objects = True
kwargs['has_related_objects'] = has_related_objects
return super().get_context_data(**kwargs)
regie_detail = RegieDetailView.as_view()
class RegieEditView(UpdateView):
template_name = 'lingo/invoicing/manager_regie_form.html'
model = Regie
fields = ['label', 'description', 'cashier_role', 'counter_name', 'number_format']
def get_success_url(self):
return reverse('lingo-manager-invoicing-regie-detail', args=[self.object.pk])
regie_edit = RegieEditView.as_view()
class RegieDeleteView(DeleteView):
template_name = 'lingo/manager_confirm_delete.html'
model = Regie
def get_queryset(self):
return super().get_queryset().filter(campaign__isnull=True, injectedline__isnull=True)
def delete(self, request, *args, **kwargs):
self.object = self.get_object()
Counter.objects.filter(regie=self.object).delete()
return super().delete(request, *args, **kwargs)
def get_success_url(self):
return reverse('lingo-manager-invoicing-regie-list')
regie_delete = RegieDeleteView.as_view()
class RegiesExportView(ListView):
model = Regie
def get(self, request, *args, **kwargs):
response = HttpResponse(content_type='application/json')
today = datetime.date.today()
attachment = 'attachment; filename="export_regies_{}.json"'.format(today.strftime('%Y%m%d'))
response['Content-Disposition'] = attachment
json.dump({'regies': [regie.export_json() for regie in self.get_queryset()]}, response, indent=2)
return response
regies_export = RegiesExportView.as_view()
class RegiesImportView(FormView):
form_class = ImportForm
template_name = 'lingo/invoicing/manager_import.html'
success_url = reverse_lazy('lingo-manager-invoicing-regie-list')
def form_valid(self, form):
try:
config_json = json.loads(self.request.FILES['config_json'].read())
except ValueError:
form.add_error('config_json', _('File is not in the expected JSON format.'))
return self.form_invalid(form)
try:
results = import_regies(config_json)
except RegieImportError as exc:
form.add_error('config_json', '%s' % exc)
return self.form_invalid(form)
import_messages = {
'create': lambda x: ngettext(
'A regie was created.',
'%(count)d regies were created.',
x,
),
'update': lambda x: ngettext(
'A regie was updated.',
'%(count)d regie were updated.',
x,
),
}
create_message = _('No regie created.')
update_message = _('No regie updated.')
created = len(results.get('created', []))
updated = len(results.get('updated', []))
if created:
create_message = import_messages.get('create')(created) % {'count': created}
if updated:
update_message = import_messages.get('update')(updated) % {'count': updated}
message = "%s %s" % (create_message, update_message)
messages.info(self.request, message)
return super().form_valid(form)
regies_import = RegiesImportView.as_view()
class CampaignAddView(CreateView):
template_name = 'lingo/invoicing/manager_campaign_form.html'
model = Campaign
form_class = CampaignForm
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
return super().get_context_data(**kwargs)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['instance'] = Campaign(regie=self.regie)
return kwargs
def get_success_url(self):
return reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
campaign_add = CampaignAddView.as_view()
class CampaignDetailView(DetailView):
template_name = 'lingo/invoicing/manager_campaign_detail.html'
model = Campaign
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return super().get_queryset().filter(regie=self.regie)
def get_context_data(self, **kwargs):
draft_lines = DraftInvoiceLine.objects.filter(pool=OuterRef('pk')).order_by().values('pool')
count_draft_error = draft_lines.filter(status='error').annotate(count=Count('pool')).values('count')
count_draft_warning = (
draft_lines.filter(status='warning').annotate(count=Count('pool')).values('count')
)
count_draft_success = (
draft_lines.filter(status='success').annotate(count=Count('pool')).values('count')
)
lines = InvoiceLine.objects.filter(pool=OuterRef('pk')).order_by().values('pool')
count_error = (
lines.filter(status='error', error_status='').annotate(count=Count('pool')).values('count')
)
count_warning = lines.filter(status='warning').annotate(count=Count('pool')).values('count')
count_success = lines.filter(status='success').annotate(count=Count('pool')).values('count')
kwargs['regie'] = self.regie
kwargs['pools'] = self.object.pool_set.annotate(
draft_error_count=Coalesce(Subquery(count_draft_error, output_field=IntegerField()), Value(0)),
draft_warning_count=Coalesce(
Subquery(count_draft_warning, output_field=IntegerField()), Value(0)
),
draft_success_count=Coalesce(
Subquery(count_draft_success, output_field=IntegerField()), Value(0)
),
error_count=Coalesce(Subquery(count_error, output_field=IntegerField()), Value(0)),
warning_count=Coalesce(Subquery(count_warning, output_field=IntegerField()), Value(0)),
success_count=Coalesce(Subquery(count_success, output_field=IntegerField()), Value(0)),
).order_by('-created_at')
kwargs['has_running_pool'] = any(p.status in ['registered', 'running'] for p in kwargs['pools'])
kwargs['has_real_pool'] = any(not p.draft for p in kwargs['pools'])
kwargs['has_real_completed_pool'] = any(
not p.draft and p.status == 'completed' for p in kwargs['pools']
)
if self.object.invalid:
messages.warning(self.request, _('The last pool is invalid, please start a new pool.'))
return super().get_context_data(**kwargs)
campaign_detail = CampaignDetailView.as_view()
class CampaignEditView(UpdateView):
template_name = 'lingo/invoicing/manager_campaign_form.html'
model = Campaign
form_class = CampaignForm
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(regie=self.regie, finalized=False)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running'])
)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
return super().get_context_data(**kwargs)
def get_success_url(self):
return reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
campaign_edit = CampaignEditView.as_view()
class CampaignDatesEditView(UpdateView):
template_name = 'lingo/invoicing/manager_campaign_dates_form.html'
model = Campaign
form_class = CampaignDatesForm
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(regie=self.regie, finalized=False)
.exclude(pool__status__in=['registered', 'running'])
)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
return super().get_context_data(**kwargs)
def get_success_url(self):
return '%s#open:dates' % reverse(
'lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk]
)
campaign_dates_edit = CampaignDatesEditView.as_view()
class CampaignDeleteView(DeleteView):
template_name = 'lingo/manager_confirm_delete.html'
model = Campaign
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(regie=self.regie, finalized=False)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running'])
)
def delete(self, request, *args, **kwargs):
self.object = self.get_object()
DraftInvoiceLine.objects.filter(pool__campaign=self.object).delete()
DraftInvoice.objects.filter(pool__campaign=self.object).delete()
Pool.objects.filter(campaign=self.object).delete()
return super().delete(request, *args, **kwargs)
def get_success_url(self):
return '%s#open:campaigns' % reverse('lingo-manager-invoicing-regie-detail', args=[self.regie.pk])
campaign_delete = CampaignDeleteView.as_view()
class CampaignUnlockCheckView(FormView):
template_name = 'lingo/invoicing/manager_campaign_unlock_check.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.object = get_object_or_404(
Campaign.objects.filter(regie=self.regie, invalid=False, finalized=False)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running']),
pk=kwargs['pk'],
)
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['form'] = None
kwargs['regie'] = self.regie
kwargs['object'] = self.object
return super().get_context_data(**kwargs)
def post(self, request, *args, **kwargs):
self.object.mark_as_invalid()
agendas = [a.slug for a in self.object.agendas.all()]
if agendas:
try:
unlock_events_check(
agenda_slugs=agendas,
date_start=self.object.date_start,
date_end=self.object.date_end,
)
except ChronoError as e:
messages.error(self.request, _('Fail to unlock events check: %s') % e)
return redirect(
'%s#open:pools'
% reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
)
campaign_unlock_check = CampaignUnlockCheckView.as_view()
class CampaignFinalizeView(FormView):
template_name = 'lingo/invoicing/manager_campaign_finalize.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.object = get_object_or_404(
Campaign.objects.filter(regie=self.regie, invalid=False, finalized=False).filter(
pk__in=Pool.objects.filter(draft=False, status='completed').values('campaign')
),
pk=kwargs['pk'],
)
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['form'] = None
kwargs['regie'] = self.regie
kwargs['object'] = self.object
return super().get_context_data(**kwargs)
def post(self, request, *args, **kwargs):
try:
agendas = [a.slug for a in self.object.agendas.all()]
if agendas:
try:
mark_events_invoiced(
agenda_slugs=agendas,
date_start=self.object.date_start,
date_end=self.object.date_end,
)
except ChronoError as e:
messages.error(self.request, _('Fail to mark events as invoiced: %s') % e)
raise
except ChronoError:
pass
else:
self.object.mark_as_finalized()
return redirect(
'%s#open:pools'
% reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
)
campaign_finalize = CampaignFinalizeView.as_view()
class PoolDetailView(ListView):
template_name = 'lingo/invoicing/manager_pool_detail.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.campaign = get_object_or_404(Campaign, pk=kwargs['pk'], regie=self.regie)
self.object = get_object_or_404(Pool, pk=kwargs['pool_pk'], campaign=self.campaign)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
invoice_model = Invoice
filter_model = InvoiceFilterSet
if self.object.draft:
invoice_model = DraftInvoice
filter_model = DraftInvoiceFilterSet
data = self.request.GET or None
self.filterset = filter_model(
data=data,
queryset=invoice_model.objects.filter(pool=self.object).order_by('created_at'),
pool=self.object,
)
return self.filterset.qs
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['object'] = self.campaign
kwargs['pool'] = self.object
kwargs['filterset'] = self.filterset
line_model = InvoiceLine
line_values = ['status', 'error_status']
if self.object.draft:
line_model = DraftInvoiceLine
line_values = ['status']
all_lines = line_model.objects.filter(pool=self.object).values(*line_values)
self.object.error_count = len(
[line for line in all_lines if line['status'] == 'error' and not line.get('error_status')]
)
self.object.warning_count = len([line for line in all_lines if line['status'] == 'warning'])
self.object.success_count = len([line for line in all_lines if line['status'] == 'success'])
kwargs['has_running_pool'] = any(
p.status in ['registered', 'running'] for p in self.campaign.pool_set.all()
)
return super().get_context_data(**kwargs)
pool_detail = PoolDetailView.as_view()
class PoolJournalView(ListView):
template_name = 'lingo/invoicing/manager_pool_journal.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.campaign = get_object_or_404(Campaign, pk=kwargs['pk'], regie=self.regie)
self.object = get_object_or_404(Pool, pk=kwargs['pool_pk'], campaign=self.campaign)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
line_model = InvoiceLine
filter_model = InvoiceLineFilterSet
if self.object.draft:
line_model = DraftInvoiceLine
filter_model = DraftInvoiceLineFilterSet
all_lines = line_model.objects.filter(pool=self.object).order_by('pk').select_related('invoice')
self.object.error_count = len(
[line for line in all_lines if line.status == 'error' and not getattr(line, 'error_status', '')]
)
self.object.warning_count = len([line for line in all_lines if line.status == 'warning'])
self.object.success_count = len([line for line in all_lines if line.status == 'success'])
data = self.request.GET or None
self.filterset = filter_model(data=data, queryset=all_lines, pool=self.object)
return self.filterset.qs if data and [v for v in data.values() if v] else all_lines
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['object'] = self.campaign
kwargs['pool'] = self.object
kwargs['filterset'] = self.filterset
return super().get_context_data(**kwargs)
pool_journal = PoolJournalView.as_view()
class PoolAddView(FormView):
template_name = 'lingo/invoicing/manager_pool_add.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.object = get_object_or_404(
Campaign.objects.filter(regie=self.regie, finalized=False)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running']),
pk=kwargs['pk'],
)
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['form'] = None
kwargs['regie'] = self.regie
kwargs['object'] = self.object
return super().get_context_data(**kwargs)
def post(self, request, *args, **kwargs):
self.object.mark_as_valid()
self.object.generate()
return redirect(
'%s#open:pools'
% reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
)
pool_add = PoolAddView.as_view()
class PoolPromoteView(FormView):
template_name = 'lingo/invoicing/manager_pool_promote.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.object = get_object_or_404(
Pool,
campaign__id=kwargs['pk'],
campaign__regie=self.regie,
campaign__invalid=False,
campaign__finalized=False,
pk=kwargs['pool_pk'],
draft=True,
status='completed',
)
if not self.object.is_last:
raise Http404
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['form'] = None
kwargs['regie'] = self.regie
kwargs['object'] = self.object.campaign
kwargs['pool'] = self.object
return super().get_context_data(**kwargs)
def post(self, request, *args, **kwargs):
self.object.promote()
return redirect(
'%s#open:pools'
% reverse(
'lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.campaign.pk]
)
)
pool_promote = PoolPromoteView.as_view()
class PoolDeleteView(DeleteView):
template_name = 'lingo/manager_confirm_delete.html'
model = Pool
pk_url_kwarg = 'pool_pk'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.campaign = get_object_or_404(
Campaign.objects.filter(regie=self.regie, finalized=False).exclude(
pool__status__in=['registered', 'running']
),
pk=kwargs['pk'],
)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return self.campaign.pool_set.all()
def delete(self, request, *args, **kwargs):
self.object = self.get_object()
if self.object.is_last and self.object.draft:
self.campaign.mark_as_invalid()
invoice_model = Invoice
line_model = InvoiceLine
if self.object.draft:
invoice_model = DraftInvoice
line_model = DraftInvoiceLine
line_model.objects.filter(pool=self.object).delete()
invoice_model.objects.filter(pool=self.object).delete()
return super().delete(request, *args, **kwargs)
def get_success_url(self):
return '%s#open:pools' % reverse(
'lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.campaign.pk]
)
pool_delete = PoolDeleteView.as_view()
class PDFMixin:
def get(self, request, *args, **kwargs):
self.object = self.get_object()
result = self.object.html()
if 'html' in request.GET:
return HttpResponse(result)
html = HTML(string=result)
pdf = html.write_pdf()
response = HttpResponse(pdf, content_type='application/pdf')
response['Content-Disposition'] = 'attachment; filename="%s.pdf"' % self.object.formatted_number
return response
class InvoicePDFView(PDFMixin, DetailView):
pk_url_kwarg = 'invoice_pk'
model = Invoice
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.pool = get_object_or_404(
Pool, pk=kwargs['pool_pk'], campaign=kwargs['pk'], campaign__regie=self.regie
)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
invoice_model = Invoice
if self.pool.draft:
invoice_model = DraftInvoice
return invoice_model.objects.filter(pool=self.pool)
invoice_pdf = InvoicePDFView.as_view()
class InvoiceLineListView(ListView):
template_name = 'lingo/invoicing/manager_invoice_lines.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.pool = get_object_or_404(
Pool, pk=kwargs['pool_pk'], campaign_id=kwargs['pk'], campaign__regie=self.regie
)
invoice_model = Invoice
if self.pool.draft:
invoice_model = DraftInvoice
self.invoice = get_object_or_404(invoice_model, pk=kwargs['invoice_pk'], pool=self.pool)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return self.invoice.lines.all().order_by('user_external_id', 'event_date', 'pk')
def get_context_data(self, **kwargs):
kwargs['regie'] = self.pool.campaign.regie
kwargs['object'] = self.pool.campaign
kwargs['pool'] = self.pool
if not self.pool.draft:
kwargs['invoice'] = self.invoice
kwargs['invoice_payments'] = (
InvoicePayment.objects.filter(invoice=self.invoice)
.select_related('payment')
.order_by('created_at')
)
return super().get_context_data(**kwargs)
invoice_line_list = InvoiceLineListView.as_view()
class LineSetErrorStatusView(DetailView):
model = InvoiceLine
pk_url_kwarg = 'line_pk'
template_name = 'lingo/invoicing/manager_line_detail_fragment.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(
status='error',
pool=self.kwargs['pool_pk'],
pool__campaign=self.kwargs['pk'],
pool__campaign__regie=self.regie,
)
)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['object'] = self.object.pool.campaign
kwargs['pool'] = self.object.pool
kwargs['line'] = self.object
return super().get_context_data(**kwargs)
def get(self, request, *args, **kwargs):
self.object = self.get_object()
error_status = kwargs['status']
if error_status == 'reset':
self.object.error_status = ''
elif error_status == 'ignore':
self.object.error_status = 'ignored'
elif error_status == 'fix':
self.object.error_status = 'fixed'
else:
raise Http404
self.object.save()
if is_ajax(self.request):
context = self.get_context_data(object=self.object)
return self.render_to_response(context)
return redirect(
reverse(
'lingo-manager-invoicing-pool-journal', args=[self.regie.pk, kwargs['pk'], kwargs['pool_pk']]
)
)
line_set_error_status = LineSetErrorStatusView.as_view()
class NonInvoicedLineListView(ListView):
template_name = 'lingo/invoicing/manager_non_invoiced_line_list.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
fields = [
'pk',
'event_date',
'slug',
'label',
'quantity',
'unit_amount',
'total_amount',
'user_external_id',
'payer_external_id',
'payer_first_name',
'payer_last_name',
'user_first_name',
'user_last_name',
'event',
'pricing_data',
'status',
'pool_id',
]
qs1 = InvoiceLine.objects.filter(
status='error', error_status='', pool__campaign__regie=self.regie
).values(*fields)
qs2 = (
InjectedLine.objects.filter(invoiceline__isnull=True, regie=self.regie)
.annotate(
user_first_name=Value('', output_field=CharField()),
user_last_name=Value('', output_field=CharField()),
event=Value({}, output_field=JSONField()),
pricing_data=Value({}, output_field=JSONField()),
status=Value('injected', output_field=CharField()),
pool_id=Value(0, output_field=IntegerField()),
)
.values(*fields)
)
qs = qs1.union(qs2).order_by('event_date', 'user_external_id', 'label', 'pk')
return qs
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
context = super().get_context_data(**kwargs)
pools = Pool.objects.filter(draft=False).in_bulk()
for line in context['object_list']:
if line['status'] == 'error':
line['user_name'] = InvoiceLine(
user_first_name=line['user_first_name'], user_last_name=line['user_last_name']
).user_name
line['payer_name'] = InvoiceLine(
payer_first_name=line['payer_first_name'], payer_last_name=line['payer_last_name']
).payer_name
line['error_display'] = InvoiceLine(
status=line['status'], pricing_data=line['pricing_data']
).get_error_display()
line['campaign_id'] = pools[line['pool_id']].campaign_id
line['chrono_event_url'] = InvoiceLine(event=line['event']).get_chrono_event_url()
return context
non_invoiced_line_list = NonInvoicedLineListView.as_view()
class RegieInvoiceListView(ListView):
template_name = 'lingo/invoicing/manager_invoice_list.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
self.filterset = RegieInvoiceFilterSet(
data=self.request.GET or None,
queryset=Invoice.objects.filter(regie=self.regie).prefetch_related('pool').order_by('created_at'),
)
return self.filterset.qs
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['filterset'] = self.filterset
return super().get_context_data(**kwargs)
regie_invoice_list = RegieInvoiceListView.as_view()
class RegieInvoicePDFView(PDFMixin, DetailView):
pk_url_kwarg = 'invoice_pk'
model = Invoice
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return super().get_queryset().filter(regie=self.regie)
regie_invoice_pdf = RegieInvoicePDFView.as_view()
class RegieInvoiceLineListView(ListView):
template_name = 'lingo/invoicing/manager_invoice_lines.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.invoice = get_object_or_404(Invoice, pk=kwargs['invoice_pk'], regie=self.regie)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return self.invoice.lines.all().order_by('user_external_id', 'event_date', 'pk')
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['invoice'] = self.invoice
kwargs['invoice_payments'] = (
InvoicePayment.objects.filter(invoice=self.invoice)
.select_related('payment')
.order_by('created_at')
)
return super().get_context_data(**kwargs)
regie_invoice_line_list = RegieInvoiceLineListView.as_view()

View File

@ -0,0 +1,281 @@
# lingo - payment and billing system
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib import messages
from django.db.models import Count, IntegerField, OuterRef, Subquery, Value
from django.db.models.functions import Coalesce
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from django.views.generic import CreateView, DeleteView, DetailView, FormView, UpdateView
from lingo.agendas.chrono import ChronoError, mark_events_invoiced, unlock_events_check
from lingo.invoicing.forms import CampaignDatesForm, CampaignForm
from lingo.invoicing.models import Campaign, DraftInvoice, DraftInvoiceLine, InvoiceLine, Pool, Regie
class CampaignAddView(CreateView):
template_name = 'lingo/invoicing/manager_campaign_form.html'
model = Campaign
form_class = CampaignForm
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
return super().get_context_data(**kwargs)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['instance'] = Campaign(regie=self.regie)
return kwargs
def get_success_url(self):
return reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
campaign_add = CampaignAddView.as_view()
class CampaignDetailView(DetailView):
template_name = 'lingo/invoicing/manager_campaign_detail.html'
model = Campaign
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return super().get_queryset().filter(regie=self.regie)
def get_context_data(self, **kwargs):
draft_lines = DraftInvoiceLine.objects.filter(pool=OuterRef('pk')).order_by().values('pool')
count_draft_error = draft_lines.filter(status='error').annotate(count=Count('pool')).values('count')
count_draft_warning = (
draft_lines.filter(status='warning').annotate(count=Count('pool')).values('count')
)
count_draft_success = (
draft_lines.filter(status='success').annotate(count=Count('pool')).values('count')
)
lines = InvoiceLine.objects.filter(pool=OuterRef('pk')).order_by().values('pool')
count_error = (
lines.filter(status='error', error_status='').annotate(count=Count('pool')).values('count')
)
count_warning = lines.filter(status='warning').annotate(count=Count('pool')).values('count')
count_success = lines.filter(status='success').annotate(count=Count('pool')).values('count')
kwargs['regie'] = self.regie
kwargs['pools'] = self.object.pool_set.annotate(
draft_error_count=Coalesce(Subquery(count_draft_error, output_field=IntegerField()), Value(0)),
draft_warning_count=Coalesce(
Subquery(count_draft_warning, output_field=IntegerField()), Value(0)
),
draft_success_count=Coalesce(
Subquery(count_draft_success, output_field=IntegerField()), Value(0)
),
error_count=Coalesce(Subquery(count_error, output_field=IntegerField()), Value(0)),
warning_count=Coalesce(Subquery(count_warning, output_field=IntegerField()), Value(0)),
success_count=Coalesce(Subquery(count_success, output_field=IntegerField()), Value(0)),
).order_by('-created_at')
kwargs['has_running_pool'] = any(p.status in ['registered', 'running'] for p in kwargs['pools'])
kwargs['has_real_pool'] = any(not p.draft for p in kwargs['pools'])
kwargs['has_real_completed_pool'] = any(
not p.draft and p.status == 'completed' for p in kwargs['pools']
)
if self.object.invalid:
messages.warning(self.request, _('The last pool is invalid, please start a new pool.'))
return super().get_context_data(**kwargs)
campaign_detail = CampaignDetailView.as_view()
class CampaignEditView(UpdateView):
template_name = 'lingo/invoicing/manager_campaign_form.html'
model = Campaign
form_class = CampaignForm
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(regie=self.regie, finalized=False)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running'])
)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
return super().get_context_data(**kwargs)
def get_success_url(self):
return reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
campaign_edit = CampaignEditView.as_view()
class CampaignDatesEditView(UpdateView):
template_name = 'lingo/invoicing/manager_campaign_dates_form.html'
model = Campaign
form_class = CampaignDatesForm
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(regie=self.regie, finalized=False)
.exclude(pool__status__in=['registered', 'running'])
)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
return super().get_context_data(**kwargs)
def get_success_url(self):
return '%s#open:dates' % reverse(
'lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk]
)
campaign_dates_edit = CampaignDatesEditView.as_view()
class CampaignDeleteView(DeleteView):
template_name = 'lingo/manager_confirm_delete.html'
model = Campaign
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(regie=self.regie, finalized=False)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running'])
)
def delete(self, request, *args, **kwargs):
self.object = self.get_object()
DraftInvoiceLine.objects.filter(pool__campaign=self.object).delete()
DraftInvoice.objects.filter(pool__campaign=self.object).delete()
Pool.objects.filter(campaign=self.object).delete()
return super().delete(request, *args, **kwargs)
def get_success_url(self):
return '%s#open:campaigns' % reverse('lingo-manager-invoicing-regie-detail', args=[self.regie.pk])
campaign_delete = CampaignDeleteView.as_view()
class CampaignUnlockCheckView(FormView):
template_name = 'lingo/invoicing/manager_campaign_unlock_check.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.object = get_object_or_404(
Campaign.objects.filter(regie=self.regie, invalid=False, finalized=False)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running']),
pk=kwargs['pk'],
)
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['form'] = None
kwargs['regie'] = self.regie
kwargs['object'] = self.object
return super().get_context_data(**kwargs)
def post(self, request, *args, **kwargs):
self.object.mark_as_invalid()
agendas = [a.slug for a in self.object.agendas.all()]
if agendas:
try:
unlock_events_check(
agenda_slugs=agendas,
date_start=self.object.date_start,
date_end=self.object.date_end,
)
except ChronoError as e:
messages.error(self.request, _('Fail to unlock events check: %s') % e)
return redirect(
'%s#open:pools'
% reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
)
campaign_unlock_check = CampaignUnlockCheckView.as_view()
class CampaignFinalizeView(FormView):
template_name = 'lingo/invoicing/manager_campaign_finalize.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.object = get_object_or_404(
Campaign.objects.filter(regie=self.regie, invalid=False, finalized=False).filter(
pk__in=Pool.objects.filter(draft=False, status='completed').values('campaign')
),
pk=kwargs['pk'],
)
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['form'] = None
kwargs['regie'] = self.regie
kwargs['object'] = self.object
return super().get_context_data(**kwargs)
def post(self, request, *args, **kwargs):
try:
agendas = [a.slug for a in self.object.agendas.all()]
if agendas:
try:
mark_events_invoiced(
agenda_slugs=agendas,
date_start=self.object.date_start,
date_end=self.object.date_end,
)
except ChronoError as e:
messages.error(self.request, _('Fail to mark events as invoiced: %s') % e)
raise
except ChronoError:
pass
else:
self.object.mark_as_finalized()
return redirect(
'%s#open:pools'
% reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
)
campaign_finalize = CampaignFinalizeView.as_view()

View File

@ -0,0 +1,134 @@
# lingo - payment and billing system
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
from django.contrib import messages
from django.http import HttpResponse, HttpResponseRedirect
from django.urls import reverse, reverse_lazy
from django.utils.encoding import force_str
from django.utils.translation import gettext_lazy as _
from django.utils.translation import ngettext
from django.views.generic import FormView, TemplateView
from lingo.invoicing.forms import ExportForm, ImportForm
from lingo.invoicing.utils import export_site, import_site
from lingo.utils.misc import LingoImportError
class HomeView(TemplateView):
template_name = 'lingo/invoicing/manager_home.html'
home = HomeView.as_view()
class ConfigExportView(FormView):
form_class = ExportForm
template_name = 'lingo/invoicing/export.html'
def form_valid(self, form):
response = HttpResponse(content_type='application/json')
today = datetime.date.today()
response['Content-Disposition'] = 'attachment; filename="export_invoicing_config_{}.json"'.format(
today.strftime('%Y%m%d')
)
json.dump(export_site(**form.cleaned_data), response, indent=2)
return response
config_export = ConfigExportView.as_view()
class ConfigImportView(FormView):
form_class = ImportForm
template_name = 'lingo/invoicing/import.html'
success_url = reverse_lazy('lingo-manager-invoicing-home')
def form_valid(self, form):
try:
config_json = json.loads(force_str(self.request.FILES['config_json'].read()))
except ValueError:
form.add_error('config_json', _('File is not in the expected JSON format.'))
return self.form_invalid(form)
try:
results = import_site(config_json)
except LingoImportError as exc:
form.add_error('config_json', '%s' % exc)
return self.form_invalid(form)
except KeyError as exc:
form.add_error('config_json', _('Key "%s" is missing.') % exc.args[0])
return self.form_invalid(form)
import_messages = {
'regies': {
'create_noop': _('No regie created.'),
'create': lambda x: ngettext(
'A regie has been created.',
'%(count)d regies have been created.',
x,
),
'update_noop': _('No regie updated.'),
'update': lambda x: ngettext(
'A regie has been updated.',
'%(count)d regies have been updated.',
x,
),
},
}
global_noop = True
for obj_name, obj_results in results.items():
if obj_results['all']:
global_noop = False
count = len(obj_results['created'])
if not count:
message1 = import_messages[obj_name].get('create_noop')
else:
message1 = import_messages[obj_name]['create'](count) % {'count': count}
count = len(obj_results['updated'])
if not count:
message2 = import_messages[obj_name]['update_noop']
else:
message2 = import_messages[obj_name]['update'](count) % {'count': count}
if message1:
obj_results['messages'] = "%s %s" % (message1, message2)
else:
obj_results['messages'] = message2
(r_count,) = (len(results['regies']['all']),)
if (r_count,) == (1,):
# only one regie imported, redirect to regie page
return HttpResponseRedirect(
reverse(
'lingo-manager-invoicing-regie-detail',
kwargs={'pk': results['regies']['all'][0].pk},
)
)
if global_noop:
messages.info(self.request, _('No data found.'))
else:
messages.info(self.request, results['regies']['messages'])
return super().form_valid(form)
config_import = ConfigImportView.as_view()

View File

@ -0,0 +1,349 @@
# lingo - payment and billing system
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.http import Http404
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.views.generic import DeleteView, DetailView, FormView, ListView
from lingo.invoicing.forms import (
DraftInvoiceFilterSet,
DraftInvoiceLineFilterSet,
InvoiceFilterSet,
InvoiceLineFilterSet,
)
from lingo.invoicing.models import (
Campaign,
DraftInvoice,
DraftInvoiceLine,
Invoice,
InvoiceLine,
InvoicePayment,
Pool,
Regie,
)
from lingo.invoicing.views.utils import PDFMixin
def is_ajax(request):
return request.headers.get('x-requested-with') == 'XMLHttpRequest'
class PoolDetailView(ListView):
template_name = 'lingo/invoicing/manager_pool_detail.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.campaign = get_object_or_404(Campaign, pk=kwargs['pk'], regie=self.regie)
self.object = get_object_or_404(Pool, pk=kwargs['pool_pk'], campaign=self.campaign)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
invoice_model = Invoice
filter_model = InvoiceFilterSet
if self.object.draft:
invoice_model = DraftInvoice
filter_model = DraftInvoiceFilterSet
data = self.request.GET or None
self.filterset = filter_model(
data=data,
queryset=invoice_model.objects.filter(pool=self.object).order_by('created_at'),
pool=self.object,
)
return self.filterset.qs
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['object'] = self.campaign
kwargs['pool'] = self.object
kwargs['filterset'] = self.filterset
line_model = InvoiceLine
line_values = ['status', 'error_status']
if self.object.draft:
line_model = DraftInvoiceLine
line_values = ['status']
all_lines = line_model.objects.filter(pool=self.object).values(*line_values)
self.object.error_count = len(
[line for line in all_lines if line['status'] == 'error' and not line.get('error_status')]
)
self.object.warning_count = len([line for line in all_lines if line['status'] == 'warning'])
self.object.success_count = len([line for line in all_lines if line['status'] == 'success'])
kwargs['has_running_pool'] = any(
p.status in ['registered', 'running'] for p in self.campaign.pool_set.all()
)
return super().get_context_data(**kwargs)
pool_detail = PoolDetailView.as_view()
class PoolJournalView(ListView):
template_name = 'lingo/invoicing/manager_pool_journal.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.campaign = get_object_or_404(Campaign, pk=kwargs['pk'], regie=self.regie)
self.object = get_object_or_404(Pool, pk=kwargs['pool_pk'], campaign=self.campaign)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
line_model = InvoiceLine
filter_model = InvoiceLineFilterSet
if self.object.draft:
line_model = DraftInvoiceLine
filter_model = DraftInvoiceLineFilterSet
all_lines = line_model.objects.filter(pool=self.object).order_by('pk').select_related('invoice')
self.object.error_count = len(
[line for line in all_lines if line.status == 'error' and not getattr(line, 'error_status', '')]
)
self.object.warning_count = len([line for line in all_lines if line.status == 'warning'])
self.object.success_count = len([line for line in all_lines if line.status == 'success'])
data = self.request.GET or None
self.filterset = filter_model(data=data, queryset=all_lines, pool=self.object)
return self.filterset.qs if data and [v for v in data.values() if v] else all_lines
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['object'] = self.campaign
kwargs['pool'] = self.object
kwargs['filterset'] = self.filterset
return super().get_context_data(**kwargs)
pool_journal = PoolJournalView.as_view()
class PoolAddView(FormView):
template_name = 'lingo/invoicing/manager_pool_add.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.object = get_object_or_404(
Campaign.objects.filter(regie=self.regie, finalized=False)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running']),
pk=kwargs['pk'],
)
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['form'] = None
kwargs['regie'] = self.regie
kwargs['object'] = self.object
return super().get_context_data(**kwargs)
def post(self, request, *args, **kwargs):
self.object.mark_as_valid()
self.object.generate()
return redirect(
'%s#open:pools'
% reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
)
pool_add = PoolAddView.as_view()
class PoolPromoteView(FormView):
template_name = 'lingo/invoicing/manager_pool_promote.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.object = get_object_or_404(
Pool,
campaign__id=kwargs['pk'],
campaign__regie=self.regie,
campaign__invalid=False,
campaign__finalized=False,
pk=kwargs['pool_pk'],
draft=True,
status='completed',
)
if not self.object.is_last:
raise Http404
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['form'] = None
kwargs['regie'] = self.regie
kwargs['object'] = self.object.campaign
kwargs['pool'] = self.object
return super().get_context_data(**kwargs)
def post(self, request, *args, **kwargs):
self.object.promote()
return redirect(
'%s#open:pools'
% reverse(
'lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.campaign.pk]
)
)
pool_promote = PoolPromoteView.as_view()
class PoolDeleteView(DeleteView):
template_name = 'lingo/manager_confirm_delete.html'
model = Pool
pk_url_kwarg = 'pool_pk'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.campaign = get_object_or_404(
Campaign.objects.filter(regie=self.regie, finalized=False).exclude(
pool__status__in=['registered', 'running']
),
pk=kwargs['pk'],
)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return self.campaign.pool_set.all()
def delete(self, request, *args, **kwargs):
self.object = self.get_object()
if self.object.is_last and self.object.draft:
self.campaign.mark_as_invalid()
invoice_model = Invoice
line_model = InvoiceLine
if self.object.draft:
invoice_model = DraftInvoice
line_model = DraftInvoiceLine
line_model.objects.filter(pool=self.object).delete()
invoice_model.objects.filter(pool=self.object).delete()
return super().delete(request, *args, **kwargs)
def get_success_url(self):
return '%s#open:pools' % reverse(
'lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.campaign.pk]
)
pool_delete = PoolDeleteView.as_view()
class InvoicePDFView(PDFMixin, DetailView):
pk_url_kwarg = 'invoice_pk'
model = Invoice
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.pool = get_object_or_404(
Pool, pk=kwargs['pool_pk'], campaign=kwargs['pk'], campaign__regie=self.regie
)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
invoice_model = Invoice
if self.pool.draft:
invoice_model = DraftInvoice
return invoice_model.objects.filter(pool=self.pool)
invoice_pdf = InvoicePDFView.as_view()
class InvoiceLineListView(ListView):
template_name = 'lingo/invoicing/manager_invoice_lines.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.pool = get_object_or_404(
Pool, pk=kwargs['pool_pk'], campaign_id=kwargs['pk'], campaign__regie=self.regie
)
invoice_model = Invoice
if self.pool.draft:
invoice_model = DraftInvoice
self.invoice = get_object_or_404(invoice_model, pk=kwargs['invoice_pk'], pool=self.pool)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return self.invoice.lines.all().order_by('user_external_id', 'event_date', 'pk')
def get_context_data(self, **kwargs):
kwargs['regie'] = self.pool.campaign.regie
kwargs['object'] = self.pool.campaign
kwargs['pool'] = self.pool
if not self.pool.draft:
kwargs['invoice'] = self.invoice
kwargs['invoice_payments'] = (
InvoicePayment.objects.filter(invoice=self.invoice)
.select_related('payment')
.order_by('created_at')
)
return super().get_context_data(**kwargs)
invoice_line_list = InvoiceLineListView.as_view()
class LineSetErrorStatusView(DetailView):
model = InvoiceLine
pk_url_kwarg = 'line_pk'
template_name = 'lingo/invoicing/manager_line_detail_fragment.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(
status='error',
pool=self.kwargs['pool_pk'],
pool__campaign=self.kwargs['pk'],
pool__campaign__regie=self.regie,
)
)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['object'] = self.object.pool.campaign
kwargs['pool'] = self.object.pool
kwargs['line'] = self.object
return super().get_context_data(**kwargs)
def get(self, request, *args, **kwargs):
self.object = self.get_object()
error_status = kwargs['status']
if error_status == 'reset':
self.object.error_status = ''
elif error_status == 'ignore':
self.object.error_status = 'ignored'
elif error_status == 'fix':
self.object.error_status = 'fixed'
else:
raise Http404
self.object.save()
if is_ajax(self.request):
context = self.get_context_data(object=self.object)
return self.render_to_response(context)
return redirect(
reverse(
'lingo-manager-invoicing-pool-journal', args=[self.regie.pk, kwargs['pk'], kwargs['pool_pk']]
)
)
line_set_error_status = LineSetErrorStatusView.as_view()

View File

@ -0,0 +1,265 @@
# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import datetime
import json
from django.db import transaction
from django.db.models import CharField, IntegerField, JSONField, Value
from django.http import HttpResponse
from django.shortcuts import get_object_or_404
from django.urls import reverse
from django.views.generic import CreateView, DeleteView, DetailView, ListView, UpdateView
from lingo.agendas.models import Agenda
from lingo.invoicing.forms import RegieForm, RegieInvoiceFilterSet
from lingo.invoicing.models import Counter, InjectedLine, Invoice, InvoiceLine, InvoicePayment, Pool, Regie
from lingo.invoicing.views.utils import PDFMixin
def import_regies(data):
results = collections.defaultdict(list)
with transaction.atomic():
regies = data.get('regies', [])
for regie in regies:
created, regie_obj = Regie.import_json(regie)
if created:
results['created'].append(regie_obj)
else:
results['updated'].append(regie_obj)
return results
class RegiesListView(ListView):
template_name = 'lingo/invoicing/manager_regie_list.html'
model = Regie
regies_list = RegiesListView.as_view()
class RegieAddView(CreateView):
template_name = 'lingo/invoicing/manager_regie_form.html'
model = Regie
fields = ['label', 'description', 'cashier_role']
def get_success_url(self):
return reverse('lingo-manager-invoicing-regie-detail', args=[self.object.pk])
regie_add = RegieAddView.as_view()
class RegieDetailView(DetailView):
template_name = 'lingo/invoicing/manager_regie_detail.html'
model = Regie
def get_context_data(self, **kwargs):
kwargs['regie'] = self.object
kwargs['agendas'] = Agenda.objects.filter(regie=self.object)
kwargs['campaigns'] = self.object.campaign_set.all().order_by('-date_start')
has_related_objects = False
if kwargs['campaigns']:
has_related_objects = True
elif self.object.injectedline_set.exists():
has_related_objects = True
kwargs['has_related_objects'] = has_related_objects
return super().get_context_data(**kwargs)
regie_detail = RegieDetailView.as_view()
class RegieEditView(UpdateView):
template_name = 'lingo/invoicing/manager_regie_form.html'
model = Regie
form_class = RegieForm
def get_success_url(self):
return reverse('lingo-manager-invoicing-regie-detail', args=[self.object.pk])
regie_edit = RegieEditView.as_view()
class RegieDeleteView(DeleteView):
template_name = 'lingo/manager_confirm_delete.html'
model = Regie
def get_queryset(self):
return super().get_queryset().filter(campaign__isnull=True, injectedline__isnull=True)
def delete(self, request, *args, **kwargs):
self.object = self.get_object()
Counter.objects.filter(regie=self.object).delete()
return super().delete(request, *args, **kwargs)
def get_success_url(self):
return reverse('lingo-manager-invoicing-regie-list')
regie_delete = RegieDeleteView.as_view()
class RegieExport(DetailView):
model = Regie
def get(self, request, *args, **kwargs):
response = HttpResponse(content_type='application/json')
today = datetime.date.today()
attachment = 'attachment; filename="export_regie_{}_{}.json"'.format(
self.get_object().slug, today.strftime('%Y%m%d')
)
response['Content-Disposition'] = attachment
json.dump({'regies': [self.get_object().export_json()]}, response, indent=2)
return response
regie_export = RegieExport.as_view()
class NonInvoicedLineListView(ListView):
template_name = 'lingo/invoicing/manager_non_invoiced_line_list.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
fields = [
'pk',
'event_date',
'slug',
'label',
'quantity',
'unit_amount',
'total_amount',
'user_external_id',
'payer_external_id',
'payer_first_name',
'payer_last_name',
'user_first_name',
'user_last_name',
'event',
'pricing_data',
'status',
'pool_id',
]
qs1 = InvoiceLine.objects.filter(
status='error', error_status='', pool__campaign__regie=self.regie
).values(*fields)
qs2 = (
InjectedLine.objects.filter(invoiceline__isnull=True, regie=self.regie)
.annotate(
user_first_name=Value('', output_field=CharField()),
user_last_name=Value('', output_field=CharField()),
event=Value({}, output_field=JSONField()),
pricing_data=Value({}, output_field=JSONField()),
status=Value('injected', output_field=CharField()),
pool_id=Value(0, output_field=IntegerField()),
)
.values(*fields)
)
qs = qs1.union(qs2).order_by('event_date', 'user_external_id', 'label', 'pk')
return qs
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
context = super().get_context_data(**kwargs)
pools = Pool.objects.filter(draft=False).in_bulk()
for line in context['object_list']:
if line['status'] == 'error':
line['user_name'] = InvoiceLine(
user_first_name=line['user_first_name'], user_last_name=line['user_last_name']
).user_name
line['payer_name'] = InvoiceLine(
payer_first_name=line['payer_first_name'], payer_last_name=line['payer_last_name']
).payer_name
line['error_display'] = InvoiceLine(
status=line['status'], pricing_data=line['pricing_data']
).get_error_display()
line['campaign_id'] = pools[line['pool_id']].campaign_id
line['chrono_event_url'] = InvoiceLine(event=line['event']).get_chrono_event_url()
return context
non_invoiced_line_list = NonInvoicedLineListView.as_view()
class RegieInvoiceListView(ListView):
template_name = 'lingo/invoicing/manager_invoice_list.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
self.filterset = RegieInvoiceFilterSet(
data=self.request.GET or None,
queryset=Invoice.objects.filter(regie=self.regie).prefetch_related('pool').order_by('created_at'),
)
return self.filterset.qs
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['filterset'] = self.filterset
return super().get_context_data(**kwargs)
regie_invoice_list = RegieInvoiceListView.as_view()
class RegieInvoicePDFView(PDFMixin, DetailView):
pk_url_kwarg = 'invoice_pk'
model = Invoice
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return super().get_queryset().filter(regie=self.regie)
regie_invoice_pdf = RegieInvoicePDFView.as_view()
class RegieInvoiceLineListView(ListView):
template_name = 'lingo/invoicing/manager_invoice_lines.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.invoice = get_object_or_404(Invoice, pk=kwargs['invoice_pk'], regie=self.regie)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return self.invoice.lines.all().order_by('user_external_id', 'event_date', 'pk')
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['invoice'] = self.invoice
kwargs['invoice_payments'] = (
InvoicePayment.objects.filter(invoice=self.invoice)
.select_related('payment')
.order_by('created_at')
)
return super().get_context_data(**kwargs)
regie_invoice_line_list = RegieInvoiceLineListView.as_view()

View File

@ -1,5 +1,5 @@
# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
@ -14,26 +14,18 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import sys
from django.core.management.base import BaseCommand
from lingo.pricing.utils import export_site
from django.http import HttpResponse
from weasyprint import HTML
class Command(BaseCommand):
help = 'Export the site'
def add_arguments(self, parser):
parser.add_argument(
'--output', metavar='FILE', default=None, help='name of a file to write output to'
)
def handle(self, *args, **options):
if options['output']:
with open(options['output'], 'w') as output:
json.dump(export_site(), output, indent=4)
else:
output = sys.stdout
json.dump(export_site(), output, indent=4)
class PDFMixin:
def get(self, request, *args, **kwargs):
self.object = self.get_object()
result = self.object.html()
if 'html' in request.GET:
return HttpResponse(result)
html = HTML(string=result)
pdf = html.write_pdf()
response = HttpResponse(pdf, content_type='application/pdf')
response['Content-Disposition'] = 'attachment; filename="%s.pdf"' % self.object.formatted_number
return response

View File

@ -1,54 +0,0 @@
# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import sys
from django.core.management.base import BaseCommand, CommandError
from lingo.pricing.utils import import_site
from lingo.utils.misc import AgendaImportError
class Command(BaseCommand):
help = 'Import an exported site'
def add_arguments(self, parser):
parser.add_argument('filename', metavar='FILENAME', type=str, help='name of file to import')
parser.add_argument('--clean', action='store_true', default=False, help='Clean site before importing')
parser.add_argument(
'--if-empty', action='store_true', default=False, help='Import only if site is empty'
)
parser.add_argument('--overwrite', action='store_true', default=False, help='Overwrite existing data')
def handle(self, filename, **options):
def do_import(fd):
try:
import_site(
json.load(fd),
if_empty=options['if_empty'],
clean=options['clean'],
overwrite=options['overwrite'],
)
except AgendaImportError as exc:
raise CommandError('%s' % exc)
if filename == '-':
fd = sys.stdin
do_import(fd)
else:
with open(filename) as fd:
do_import(fd)

View File

@ -25,7 +25,7 @@ from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from lingo.agendas.models import Agenda, CheckType
from lingo.utils.misc import AgendaImportError, clean_import_data, generate_slug
from lingo.utils.misc import LingoImportError, clean_import_data, generate_slug
class PricingError(Exception):
@ -102,14 +102,11 @@ class CriteriaCategory(models.Model):
return slugify(self.label)
@classmethod
def import_json(cls, data, overwrite=False):
def import_json(cls, data):
criterias = data.pop('criterias', [])
data = clean_import_data(cls, data)
category, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
if overwrite:
Criteria.objects.filter(category=category).delete()
for criteria in criterias:
criteria['category'] = category
Criteria.import_json(criteria)
@ -297,7 +294,7 @@ class Pricing(models.Model):
return result
@classmethod
def import_json(cls, data, overwrite=False):
def import_json(cls, data):
data = data.copy()
categories = data.pop('categories', [])
categories_by_slug = {c.slug: c for c in CriteriaCategory.objects.all()}
@ -308,10 +305,10 @@ class Pricing(models.Model):
for category_data in categories:
category_slug = category_data['category']
if category_data['category'] not in categories_by_slug:
raise AgendaImportError(_('Missing "%s" pricing category') % category_data['category'])
raise LingoImportError(_('Missing "%s" pricing category') % category_data['category'])
for criteria_slug in category_data['criterias']:
if (category_slug, criteria_slug) not in criterias_by_categories_and_slug:
raise AgendaImportError(
raise LingoImportError(
_('Missing "%s" pricing criteria for "%s" category') % (criteria_slug, category_slug)
)
data = clean_import_data(cls, data)
@ -446,7 +443,7 @@ class AgendaPricing(models.Model):
}
@classmethod
def import_json(cls, data, overwrite=False):
def import_json(cls, data):
data = copy.deepcopy(data)
agenda_slugs = data.pop('agendas', None) or []
billing_dates = data.pop('billing_dates', None) or []
@ -456,19 +453,15 @@ class AgendaPricing(models.Model):
try:
agendas.append(Agenda.objects.get(slug=agenda_slug))
except Agenda.DoesNotExist:
raise AgendaImportError(_('Missing "%s" agenda') % agenda_slug)
raise LingoImportError(_('Missing "%s" agenda') % agenda_slug)
try:
data['pricing'] = Pricing.objects.get(slug=data['pricing'])
except Pricing.DoesNotExist:
raise AgendaImportError(_('Missing "%s" pricing model') % data['pricing'])
raise LingoImportError(_('Missing "%s" pricing model') % data['pricing'])
agenda_pricing, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
if overwrite and not created:
agenda_pricing.agendas.clear()
agenda_pricing.agendas.add(*agendas)
if overwrite and not created:
agenda_pricing.billingdates.all().delete()
for billing_date in billing_dates:
billing_date['agenda_pricing'] = agenda_pricing
BillingDate.import_json(billing_date)

View File

@ -1,4 +1,4 @@
{% extends "lingo/pricing/manager_pricing_list.html" %}
{% extends "lingo/pricing/manager_home.html" %}
{% load i18n %}
{% block breadcrumb %}
@ -16,7 +16,7 @@
{{ form.as_p }}
<div class="buttons">
<button class="submit-button">{% trans "Export" %}</button>
<a class="cancel" href="{% url 'lingo-manager-pricing-list' %}">{% trans 'Cancel' %}</a>
<a class="cancel" href="{% url 'lingo-manager-pricing-home' %}">{% trans 'Cancel' %}</a>
</div>
</form>
{% endblock %}

View File

@ -1,4 +1,4 @@
{% extends "lingo/pricing/manager_pricing_list.html" %}
{% extends "lingo/pricing/manager_home.html" %}
{% load i18n %}
{% block breadcrumb %}
@ -16,7 +16,7 @@
{{ form.as_p }}
<div class="buttons">
<button class="submit-button">{% trans "Import" %}</button>
<a class="cancel" href="{% url 'lingo-manager-pricing-list' %}">{% trans 'Cancel' %}</a>
<a class="cancel" href="{% url 'lingo-manager-pricing-home' %}">{% trans 'Cancel' %}</a>
</div>
</form>
{% endblock %}

View File

@ -30,7 +30,7 @@ def export_site(
pricings=True,
):
'''Dump site objects to JSON-dumpable dictionnary'''
data = collections.OrderedDict()
data = {}
if pricings:
data['pricings'] = [x.export_json() for x in AgendaPricing.objects.all()]
if pricing_models:
@ -44,21 +44,7 @@ def export_site(
return data
def import_site(data, if_empty=False, clean=False, overwrite=False):
if if_empty and (
AgendaPricing.objects.exists()
or CheckTypeGroup.objects.exists()
or CriteriaCategory.objects.exists()
or Pricing.objects.exists()
):
return
if clean:
AgendaPricing.objects.all().delete()
CriteriaCategory.objects.all().delete()
Pricing.objects.all().delete()
CheckTypeGroup.objects.all().delete()
def import_site(data):
results = {
key: collections.defaultdict(list)
for key in [
@ -80,7 +66,7 @@ def import_site(data, if_empty=False, clean=False, overwrite=False):
):
objs = data.get(key, [])
for obj in objs:
created, obj = cls.import_json(obj, overwrite=overwrite)
created, obj = cls.import_json(obj)
results[key]['all'].append(obj)
if created:
results[key]['created'].append(obj)

View File

@ -71,7 +71,7 @@ from lingo.pricing.models import (
PricingCriteriaCategory,
)
from lingo.pricing.utils import export_site, import_site
from lingo.utils.misc import AgendaImportError
from lingo.utils.misc import LingoImportError
class HomeView(TemplateView):
@ -111,8 +111,8 @@ class ConfigImportView(FormView):
return self.form_invalid(form)
try:
results = import_site(config_json, overwrite=False)
except AgendaImportError as exc:
results = import_site(config_json)
except LingoImportError as exc:
form.add_error('config_json', '%s' % exc)
return self.form_invalid(form)
except KeyError as exc:

View File

@ -22,7 +22,7 @@ from django.core.exceptions import FieldDoesNotExist, ValidationError
from django.utils.translation import gettext_lazy as _
class AgendaImportError(Exception):
class LingoImportError(Exception):
pass
@ -69,7 +69,7 @@ def clean_import_data(cls, data):
try:
field.run_validators(value)
except ValidationError:
raise AgendaImportError(_('Bad slug format "%s"') % value)
raise LingoImportError(_('Bad slug format "%s"') % value)
return cleaned_data

View File

@ -824,7 +824,7 @@ def test_add_pool(app, admin_user):
app.get('/manage/invoicing/regie/%s/campaign/%s/pool/add/' % (regie.pk, campaign.pk), status=404)
@mock.patch('lingo.invoicing.views.unlock_events_check')
@mock.patch('lingo.invoicing.views.campaign.unlock_events_check')
def test_unlock_check(mock_unlock, app, admin_user):
regie = Regie.objects.create(label='Foo')
agenda = Agenda.objects.create(label='Foo bar', regie=regie)
@ -919,7 +919,7 @@ def test_unlock_check(mock_unlock, app, admin_user):
app.get('/manage/invoicing/regie/%s/campaign/%s/unlock-check/' % (regie.pk, campaign.pk), status=404)
@mock.patch('lingo.invoicing.views.mark_events_invoiced')
@mock.patch('lingo.invoicing.views.campaign.mark_events_invoiced')
def test_finalize(mock_invoiced, app, admin_user):
regie = Regie.objects.create(label='Foo')
agenda = Agenda.objects.create(label='Foo bar', regie=regie)

View File

@ -0,0 +1,95 @@
import copy
import json
import pytest
from webtest import Upload
from lingo.invoicing.models import Regie
from tests.utils import login
pytestmark = pytest.mark.django_db
def test_export_site(freezer, app, admin_user):
freezer.move_to('2020-06-15')
login(app)
resp = app.get('/manage/invoicing/')
resp = resp.click('Export')
resp = resp.form.submit()
assert resp.headers['content-type'] == 'application/json'
assert (
resp.headers['content-disposition'] == 'attachment; filename="export_invoicing_config_20200615.json"'
)
site_json = json.loads(resp.text)
assert site_json == {
'regies': [],
}
Regie.objects.create(label='Foo Bar')
resp = app.get('/manage/invoicing/export/')
resp = resp.form.submit()
site_json = json.loads(resp.text)
assert len(site_json['regies']) == 1
@pytest.mark.freeze_time('2023-06-02')
def test_import_regie(app, admin_user):
regie = Regie.objects.create(label='Foo bar')
app = login(app)
resp = app.get('/manage/invoicing/regie/%s/' % regie.pk)
resp = resp.click(href='/manage/invoicing/regie/%s/export/' % regie.pk)
assert resp.headers['content-type'] == 'application/json'
assert resp.headers['content-disposition'] == 'attachment; filename="export_regie_foo-bar_20230602.json"'
regie_export = resp.text
# existing regie
resp = app.get('/manage/invoicing/', status=200)
resp = resp.click('Import')
resp.form['config_json'] = Upload('export.json', regie_export.encode('utf-8'), 'application/json')
resp = resp.form.submit()
assert resp.location.endswith('/manage/invoicing/regie/%s/' % regie.pk)
resp = resp.follow()
assert 'No regie created. A regie has been updated.' not in resp.text
assert Regie.objects.count() == 1
# new regie
Regie.objects.all().delete()
resp = app.get('/manage/invoicing/', status=200)
resp = resp.click('Import')
resp.form['config_json'] = Upload('export.json', regie_export.encode('utf-8'), 'application/json')
resp = resp.form.submit()
regie = Regie.objects.latest('pk')
assert resp.location.endswith('/manage/invoicing/regie/%s/' % regie.pk)
resp = resp.follow()
assert 'A regie has been created. No regie updated.' not in resp.text
assert Regie.objects.count() == 1
# multiple regies
regies = json.loads(regie_export)
regies['regies'].append(copy.copy(regies['regies'][0]))
regies['regies'].append(copy.copy(regies['regies'][0]))
regies['regies'][1]['label'] = 'Foo bar 2'
regies['regies'][1]['slug'] = 'foo-bar-2'
regies['regies'][2]['label'] = 'Foo bar 3'
regies['regies'][2]['slug'] = 'foo-bar-3'
resp = app.get('/manage/invoicing/', status=200)
resp = resp.click('Import')
resp.form['config_json'] = Upload('export.json', json.dumps(regies).encode('utf-8'), 'application/json')
resp = resp.form.submit()
assert resp.location.endswith('/manage/invoicing/')
resp = resp.follow()
assert '2 regies have been created. A regie has been updated.' in resp.text
assert Regie.objects.count() == 3
Regie.objects.all().delete()
resp = app.get('/manage/invoicing/', status=200)
resp = resp.click('Import')
resp.form['config_json'] = Upload('export.json', json.dumps(regies).encode('utf-8'), 'application/json')
resp = resp.form.submit().follow()
assert '3 regies have been created. No regie updated.' in resp.text
assert Regie.objects.count() == 3

View File

@ -1,5 +1,4 @@
import datetime
import json
from urllib.parse import urlparse
import pytest
@ -7,7 +6,6 @@ from django.contrib.auth.models import Group
from django.urls import reverse
from django.utils.formats import date_format
from django.utils.timezone import localtime
from webtest import Upload
from lingo.agendas.models import Agenda
from lingo.invoicing.models import (
@ -153,19 +151,27 @@ def test_manager_invoicing_regie_edit(app, admin_user):
assert h2.text() == 'Edit regie - Foo'
form = resp.form
form.set('label', 'Foo bar')
form.set('slug', 'foo-bar')
form.set('description', 'foo new description')
form.set('cashier_role', group_bar.id)
response = form.submit().follow()
assert Regie.objects.count() == 1
regie = Regie.objects.first()
assert regie.label == 'Foo bar'
assert regie.slug == 'foo'
assert regie.slug == 'foo-bar'
assert regie.description == 'foo new description'
assert regie.cashier_role == group_bar
assert urlparse(response.request.url).path == reverse(
'lingo-manager-invoicing-regie-detail', kwargs={'pk': regie.pk}
)
Regie.objects.create(label='Foo', description='foo description', cashier_role=group_foo)
resp = app.get(reverse('lingo-manager-invoicing-regie-edit', kwargs={'pk': regie.pk}))
form = resp.form
form.set('slug', 'foo')
response = form.submit()
assert response.context['form'].errors['slug'] == ['Another regie exists with the same identifier.']
def test_manager_invoicing_regie_delete(app, admin_user):
app = login(app)
@ -217,29 +223,6 @@ def test_manager_invoicing_regie_delete(app, admin_user):
assert Counter.objects.count() == 0
def test_manager_invoicing_regie_import_export(app, admin_user, freezer):
freezer.move_to('2020-06-15')
app = login(app)
group = Group.objects.create(name='role-foo')
regie1 = Regie.objects.create(label='Foo', description='foo description', cashier_role=group)
Regie.objects.create(label='Bar', description='bar description', cashier_role=group)
response = app.get(reverse('lingo-manager-invoicing-regie-export'))
assert response.headers['content-type'] == 'application/json'
assert response.headers['content-disposition'] == 'attachment; filename="export_regies_20200615.json"'
regies_export = response.text
regies_json = json.loads(regies_export)
assert len(regies_json['regies']) == 2
regie1.delete()
assert Regie.objects.count() == 1
response = app.get(reverse('lingo-manager-invoicing-regie-import'))
response.form['config_json'] = Upload('export.json', regies_export.encode('utf-8'), 'application/json')
response = response.form.submit().follow()
assert urlparse(response.request.url).path == reverse('lingo-manager-invoicing-regie-list')
assert 'A regie was created. A regie was updated.' in response.text
assert Regie.objects.count() == 2
def test_non_invoiced_line_list(app, admin_user):
regie = Regie.objects.create(label='Regie')
other_regie = Regie.objects.create(label='Other Regie')

View File

@ -1,15 +1,7 @@
import copy
import datetime
import json
import os
import shutil
import sys
import tempfile
from io import StringIO
import pytest
from django.core.management import call_command
from django.utils.encoding import force_bytes
from lingo.agendas.models import Agenda, CheckType, CheckTypeGroup
from lingo.pricing.models import (
@ -20,20 +12,12 @@ from lingo.pricing.models import (
Pricing,
PricingCriteriaCategory,
)
from lingo.pricing.utils import import_site
from lingo.utils.misc import AgendaImportError
from lingo.pricing.utils import export_site, import_site
from lingo.utils.misc import LingoImportError
pytestmark = pytest.mark.django_db
def get_output_of_command(command, *args, **kwargs):
old_stdout = sys.stdout
output = sys.stdout = StringIO()
call_command(command, *args, **kwargs)
sys.stdout = old_stdout
return output.getvalue()
def test_import_export(app):
Agenda.objects.create(label='Foo Bar')
pricing = Pricing.objects.create(label='Foo')
@ -44,59 +28,16 @@ def test_import_export(app):
)
CriteriaCategory.objects.create(label='Foo bar')
output = get_output_of_command('export_pricing_config')
assert len(json.loads(output)['agendas']) == 1
assert len(json.loads(output)['pricings']) == 1
assert len(json.loads(output)['pricings'][0]['agendas']) == 0
assert len(json.loads(output)['pricing_models']) == 1
assert len(json.loads(output)['pricing_categories']) == 1
import_site(data={}, clean=True)
empty_output = get_output_of_command('export_pricing_config')
assert len(json.loads(empty_output)['agendas']) == 1
assert len(json.loads(empty_output)['pricings']) == 0
assert len(json.loads(empty_output)['pricing_models']) == 0
assert len(json.loads(empty_output)['pricing_categories']) == 0
old_stdin = sys.stdin
sys.stdin = StringIO(json.dumps({}))
pricing = Pricing.objects.create(label='Foo')
AgendaPricing.objects.create(
pricing=pricing,
date_start=datetime.date(year=2021, month=9, day=1),
date_end=datetime.date(year=2021, month=10, day=1),
)
CriteriaCategory.objects.create(label='Foo bar')
old_stdin = sys.stdin
sys.stdin = StringIO(json.dumps({}))
data = export_site()
assert len(data['agendas']) == 1
assert len(data['pricings']) == 1
assert len(data['pricings'][0]['agendas']) == 0
assert len(data['pricing_models']) == 1
assert len(data['pricing_categories']) == 1
import_site(data={})
assert AgendaPricing.objects.count() == 1
assert Pricing.objects.count() == 1
assert CriteriaCategory.objects.count() == 1
try:
call_command('import_pricing_config', '-', clean=True)
finally:
sys.stdin = old_stdin
assert AgendaPricing.objects.count() == 0
assert Pricing.objects.count() == 0
assert CriteriaCategory.objects.count() == 0
with tempfile.NamedTemporaryFile() as f:
f.write(force_bytes(output))
f.flush()
call_command('import_pricing_config', f.name)
assert AgendaPricing.objects.count() == 1
assert Pricing.objects.count() == 1
assert CriteriaCategory.objects.count() == 1
import_site(data={}, if_empty=True)
assert AgendaPricing.objects.count() == 1
assert Pricing.objects.count() == 1
assert CriteriaCategory.objects.count() == 1
import_site(data={}, clean=True)
tempdir = tempfile.mkdtemp('lingo-test')
empty_output = get_output_of_command('export_pricing_config', output=os.path.join(tempdir, 't.json'))
assert os.path.exists(os.path.join(tempdir, 't.json'))
shutil.rmtree(tempdir)
def test_import_export_agenda_pricing(app):
@ -112,31 +53,27 @@ def test_import_export_agenda_pricing(app):
},
)
agenda_pricing.agendas.set([agenda])
output = get_output_of_command('export_pricing_config')
import_site(data={}, clean=True)
assert Pricing.objects.count() == 0
data = json.loads(output)
data = export_site()
Agenda.objects.all().delete()
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
with pytest.raises(LingoImportError) as excinfo:
import_site(data)
assert str(excinfo.value) == 'Missing "foo-bar" agenda'
agenda2 = Agenda.objects.create(label='Baz')
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
with pytest.raises(LingoImportError) as excinfo:
import_site(data)
assert str(excinfo.value) == 'Missing "foo-bar" agenda'
del data['pricing_models']
Pricing.objects.all().delete()
agenda = Agenda.objects.create(label='Foo Bar')
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
with pytest.raises(LingoImportError) as excinfo:
import_site(data)
assert str(excinfo.value) == 'Missing "foo" pricing model'
pricing = Pricing.objects.create(label='Foo')
import_site(data, overwrite=True)
import_site(data)
agenda_pricing = AgendaPricing.objects.latest('pk')
assert list(agenda_pricing.agendas.all()) == [agenda]
assert agenda_pricing.pricing == pricing
@ -189,15 +126,9 @@ def test_import_export_agenda_pricing_with_billing_dates(app):
label='Period 2',
)
output = get_output_of_command('export_pricing_config')
data = export_site()
import_site(data={}, clean=True)
assert Pricing.objects.count() == 0
assert AgendaPricing.objects.count() == 0
assert BillingDate.objects.count() == 0
data = json.loads(output)
import_site(data, overwrite=True)
import_site(data)
agenda_pricing = AgendaPricing.objects.latest('pk')
assert agenda_pricing.billingdates.count() == 2
billing_date1 = agenda_pricing.billingdates.all()[0]
@ -211,41 +142,37 @@ def test_import_export_agenda_pricing_with_billing_dates(app):
def test_import_export_agenda_with_check_types(app):
group = CheckTypeGroup.objects.create(label='foo')
agenda = Agenda.objects.create(label='Foo Bar', check_type_group=group)
output = get_output_of_command('export_pricing_config')
data = export_site()
import_site(data={}, clean=True)
assert CheckTypeGroup.objects.count() == 0
data = json.loads(output)
group.delete()
del data['check_type_groups']
agenda.check_type_group = None
agenda.save()
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
with pytest.raises(LingoImportError) as excinfo:
import_site(data)
assert str(excinfo.value) == 'Missing "foo" check type group'
CheckTypeGroup.objects.create(label='foobar')
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
with pytest.raises(LingoImportError) as excinfo:
import_site(data)
assert str(excinfo.value) == 'Missing "foo" check type group'
group = CheckTypeGroup.objects.create(label='foo')
import_site(data, overwrite=True)
import_site(data)
agenda.refresh_from_db()
assert agenda.check_type_group == group
def test_import_export_pricing_criteria_category(app):
output = get_output_of_command('export_pricing_config')
payload = json.loads(output)
payload = export_site()
assert len(payload['pricing_categories']) == 0
category = CriteriaCategory.objects.create(label='Foo bar')
Criteria.objects.create(label='Foo reason', category=category)
Criteria.objects.create(label='Baz', category=category)
output = get_output_of_command('export_pricing_config')
payload = json.loads(output)
payload = export_site()
assert len(payload['pricing_categories']) == 1
category.delete()
@ -280,27 +207,14 @@ def test_import_export_pricing_criteria_category(app):
assert Criteria.objects.get(category=category, label='Foo reason', slug='foo-reason')
assert Criteria.objects.get(category=category, label='Baz', slug='baz')
# with overwrite
Criteria.objects.create(category=category, label='Baz2')
import_site(copy.deepcopy(payload), overwrite=True)
assert CriteriaCategory.objects.count() == 2
category = CriteriaCategory.objects.latest('pk')
assert category.label == 'Foo bar'
assert category.slug == 'foo-bar'
assert category.criterias.count() == 2
assert Criteria.objects.get(category=category, label='Foo reason', slug='foo-reason')
assert Criteria.objects.get(category=category, label='Baz', slug='baz')
def test_import_export_pricing(app):
output = get_output_of_command('export_pricing_config')
payload = json.loads(output)
payload = export_site()
assert len(payload['pricing_models']) == 0
pricing = Pricing.objects.create(label='Foo bar', extra_variables={'foo': 'bar'})
output = get_output_of_command('export_pricing_config')
payload = json.loads(output)
payload = export_site()
assert len(payload['pricing_models']) == 1
pricing.delete()
@ -336,25 +250,21 @@ def test_import_export_pricing_with_categories(app):
category = CriteriaCategory.objects.create(label='Foo bar')
pricing.categories.add(category, through_defaults={'order': 42})
output = get_output_of_command('export_pricing_config')
import_site(data={}, clean=True)
assert Pricing.objects.count() == 0
assert CriteriaCategory.objects.count() == 0
data = json.loads(output)
data = export_site()
category.delete()
del data['pricing_categories']
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
with pytest.raises(LingoImportError) as excinfo:
import_site(data)
assert str(excinfo.value) == 'Missing "foo-bar" pricing category'
CriteriaCategory.objects.create(label='Foobar')
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
with pytest.raises(LingoImportError) as excinfo:
import_site(data)
assert str(excinfo.value) == 'Missing "foo-bar" pricing category'
category = CriteriaCategory.objects.create(label='Foo bar')
import_site(data, overwrite=True)
import_site(data)
pricing = Pricing.objects.get(slug=pricing.slug)
assert list(pricing.categories.all()) == [category]
assert PricingCriteriaCategory.objects.first().order == 42
@ -362,8 +272,7 @@ def test_import_export_pricing_with_categories(app):
category2 = CriteriaCategory.objects.create(label='Foo bar 2')
category3 = CriteriaCategory.objects.create(label='Foo bar 3')
pricing.categories.add(category2, through_defaults={'order': 1})
output = get_output_of_command('export_pricing_config')
data = json.loads(output)
data = export_site()
del data['pricing_categories']
data['pricing_models'][0]['categories'] = [
{
@ -377,7 +286,7 @@ def test_import_export_pricing_with_categories(app):
'criterias': [],
},
]
import_site(data, overwrite=True)
import_site(data)
assert list(pricing.categories.all()) == [category, category3]
assert list(
PricingCriteriaCategory.objects.filter(pricing=pricing).values_list('category', flat=True)
@ -405,8 +314,8 @@ def test_import_export_pricing_with_categories(app):
'criterias': [],
},
]
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
with pytest.raises(LingoImportError) as excinfo:
import_site(data)
assert str(excinfo.value) == 'Missing "unknown" pricing criteria for "foo-bar-3" category'
# wrong criteria (from another category)
@ -422,8 +331,8 @@ def test_import_export_pricing_with_categories(app):
'criterias': [],
},
]
with pytest.raises(AgendaImportError) as excinfo:
import_site(data, overwrite=True)
with pytest.raises(LingoImportError) as excinfo:
import_site(data)
assert str(excinfo.value) == 'Missing "crit-1" pricing criteria for "foo-bar-3" category'
data['pricing_models'][0]['categories'] = [
@ -438,7 +347,7 @@ def test_import_export_pricing_with_categories(app):
'criterias': ['crit-1', 'crit-3'],
},
]
import_site(data, overwrite=True)
import_site(data)
assert list(pricing.categories.all()) == [category, category3]
assert list(
PricingCriteriaCategory.objects.filter(pricing=pricing).values_list('category', flat=True)
@ -451,16 +360,14 @@ def test_import_export_pricing_with_categories(app):
def test_import_export_check_type_group(app):
output = get_output_of_command('export_pricing_config')
payload = json.loads(output)
payload = export_site()
assert len(payload['check_type_groups']) == 0
group = CheckTypeGroup.objects.create(label='Foo bar')
CheckType.objects.create(label='Foo reason', group=group)
CheckType.objects.create(label='Baz', group=group)
output = get_output_of_command('export_pricing_config')
payload = json.loads(output)
payload = export_site()
assert len(payload['check_type_groups']) == 1
group.delete()
@ -494,14 +401,3 @@ def test_import_export_check_type_group(app):
assert group.check_types.count() == 2
assert CheckType.objects.get(group=group, label='Foo reason', slug='foo-reason')
assert CheckType.objects.get(group=group, label='Baz', slug='baz')
# with overwrite
CheckType.objects.create(group=group, label='Baz2')
import_site(copy.deepcopy(payload), overwrite=True)
assert CheckTypeGroup.objects.count() == 2
group = CheckTypeGroup.objects.latest('pk')
assert group.label == 'Foo bar'
assert group.slug == 'foo-bar'
assert group.check_types.count() == 2
assert CheckType.objects.get(group=group, label='Foo reason', slug='foo-reason')
assert CheckType.objects.get(group=group, label='Baz', slug='baz')