lingo/lingo/invoicing/views.py

728 lines
25 KiB
Python

# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import datetime
import json
from django.contrib import messages
from django.db import transaction
from django.db.models import CharField, Count, IntegerField, JSONField, OuterRef, Subquery, Value
from django.db.models.functions import Coalesce
from django.http import Http404, HttpResponse
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse, reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.utils.translation import ngettext
from django.views.generic import (
CreateView,
DeleteView,
DetailView,
FormView,
ListView,
TemplateView,
UpdateView,
)
from lingo.agendas.models import Agenda
from lingo.invoicing.forms import (
CampaignForm,
DraftInvoiceFilterSet,
DraftInvoiceLineFilterSet,
InvoiceFilterSet,
InvoiceLineFilterSet,
RegieInvoiceFilterSet,
)
from lingo.invoicing.models import (
Campaign,
Counter,
DraftInvoice,
DraftInvoiceLine,
InjectedLine,
Invoice,
InvoiceLine,
Pool,
Regie,
RegieImportError,
)
from lingo.pricing.forms import ImportForm
def is_ajax(request):
return request.headers.get('x-requested-with') == 'XMLHttpRequest'
def import_regies(data):
results = collections.defaultdict(list)
with transaction.atomic():
regies = data.get('regies', [])
for regie in regies:
created, regie_obj = Regie.import_json(regie)
if created:
results['created'].append(regie_obj)
else:
results['updated'].append(regie_obj)
return results
class HomeView(TemplateView):
template_name = 'lingo/invoicing/manager_home.html'
home = HomeView.as_view()
class RegiesListView(ListView):
template_name = 'lingo/invoicing/manager_regie_list.html'
model = Regie
regies_list = RegiesListView.as_view()
class RegieAddView(CreateView):
template_name = 'lingo/invoicing/manager_regie_form.html'
model = Regie
fields = ['label', 'description', 'cashier_role']
def get_success_url(self):
return reverse('lingo-manager-invoicing-regie-detail', args=[self.object.pk])
regie_add = RegieAddView.as_view()
class RegieDetailView(DetailView):
template_name = 'lingo/invoicing/manager_regie_detail.html'
model = Regie
def get_context_data(self, **kwargs):
kwargs['regie'] = self.object
kwargs['agendas'] = Agenda.objects.filter(regie=self.object)
kwargs['campaigns'] = self.object.campaign_set.all().order_by('-date_start')
has_related_objects = False
if kwargs['campaigns']:
has_related_objects = True
elif self.object.injectedline_set.exists():
has_related_objects = True
kwargs['has_related_objects'] = has_related_objects
return super().get_context_data(**kwargs)
regie_detail = RegieDetailView.as_view()
class RegieEditView(UpdateView):
template_name = 'lingo/invoicing/manager_regie_form.html'
model = Regie
fields = ['label', 'description', 'cashier_role', 'counter_name', 'number_format']
def get_success_url(self):
return reverse('lingo-manager-invoicing-regie-detail', args=[self.object.pk])
regie_edit = RegieEditView.as_view()
class RegieDeleteView(DeleteView):
template_name = 'lingo/manager_confirm_delete.html'
model = Regie
def get_queryset(self):
return super().get_queryset().filter(campaign__isnull=True, injectedline__isnull=True)
def delete(self, request, *args, **kwargs):
self.object = self.get_object()
Counter.objects.filter(regie=self.object).delete()
return super().delete(request, *args, **kwargs)
def get_success_url(self):
return reverse('lingo-manager-invoicing-regie-list')
regie_delete = RegieDeleteView.as_view()
class RegiesExportView(ListView):
model = Regie
def get(self, request, *args, **kwargs):
response = HttpResponse(content_type='application/json')
today = datetime.date.today()
attachment = 'attachment; filename="export_regies_{}.json"'.format(today.strftime('%Y%m%d'))
response['Content-Disposition'] = attachment
json.dump({'regies': [regie.export_json() for regie in self.get_queryset()]}, response, indent=2)
return response
regies_export = RegiesExportView.as_view()
class RegiesImportView(FormView):
form_class = ImportForm
template_name = 'lingo/invoicing/manager_import.html'
success_url = reverse_lazy('lingo-manager-invoicing-regie-list')
def form_valid(self, form):
try:
config_json = json.loads(self.request.FILES['config_json'].read())
except ValueError:
form.add_error('config_json', _('File is not in the expected JSON format.'))
return self.form_invalid(form)
try:
results = import_regies(config_json)
except RegieImportError as exc:
form.add_error('config_json', '%s' % exc)
return self.form_invalid(form)
import_messages = {
'create': lambda x: ngettext(
'A regie was created.',
'%(count)d regies were created.',
x,
),
'update': lambda x: ngettext(
'A regie was updated.',
'%(count)d regie were updated.',
x,
),
}
create_message = _('No regie created.')
update_message = _('No regie updated.')
created = len(results.get('created', []))
updated = len(results.get('updated', []))
if created:
create_message = import_messages.get('create')(created) % {'count': created}
if updated:
update_message = import_messages.get('update')(updated) % {'count': updated}
message = "%s %s" % (create_message, update_message)
messages.info(self.request, message)
return super().form_valid(form)
regies_import = RegiesImportView.as_view()
class CampaignAddView(CreateView):
template_name = 'lingo/invoicing/manager_campaign_form.html'
model = Campaign
form_class = CampaignForm
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
return super().get_context_data(**kwargs)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['instance'] = Campaign(regie=self.regie)
return kwargs
def get_success_url(self):
return reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
campaign_add = CampaignAddView.as_view()
class CampaignDetailView(DetailView):
template_name = 'lingo/invoicing/manager_campaign_detail.html'
model = Campaign
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return super().get_queryset().filter(regie=self.regie)
def get_context_data(self, **kwargs):
draft_lines = DraftInvoiceLine.objects.filter(pool=OuterRef('pk')).order_by().values('pool')
count_draft_error = draft_lines.filter(status='error').annotate(count=Count('pool')).values('count')
count_draft_warning = (
draft_lines.filter(status='warning').annotate(count=Count('pool')).values('count')
)
count_draft_success = (
draft_lines.filter(status='success').annotate(count=Count('pool')).values('count')
)
lines = InvoiceLine.objects.filter(pool=OuterRef('pk')).order_by().values('pool')
count_error = (
lines.filter(status='error', error_status='').annotate(count=Count('pool')).values('count')
)
count_warning = lines.filter(status='warning').annotate(count=Count('pool')).values('count')
count_success = lines.filter(status='success').annotate(count=Count('pool')).values('count')
kwargs['regie'] = self.regie
kwargs['pools'] = self.object.pool_set.annotate(
draft_error_count=Coalesce(Subquery(count_draft_error, output_field=IntegerField()), Value(0)),
draft_warning_count=Coalesce(
Subquery(count_draft_warning, output_field=IntegerField()), Value(0)
),
draft_success_count=Coalesce(
Subquery(count_draft_success, output_field=IntegerField()), Value(0)
),
error_count=Coalesce(Subquery(count_error, output_field=IntegerField()), Value(0)),
warning_count=Coalesce(Subquery(count_warning, output_field=IntegerField()), Value(0)),
success_count=Coalesce(Subquery(count_success, output_field=IntegerField()), Value(0)),
).order_by('-created_at')
kwargs['has_running_pool'] = any(p.status in ['registered', 'running'] for p in kwargs['pools'])
kwargs['has_real_pool'] = any(not p.draft for p in kwargs['pools'])
return super().get_context_data(**kwargs)
campaign_detail = CampaignDetailView.as_view()
class CampaignEditView(UpdateView):
template_name = 'lingo/invoicing/manager_campaign_form.html'
model = Campaign
form_class = CampaignForm
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(regie=self.regie)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running'])
)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
return super().get_context_data(**kwargs)
def get_success_url(self):
return reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
campaign_edit = CampaignEditView.as_view()
class CampaignDeleteView(DeleteView):
template_name = 'lingo/manager_confirm_delete.html'
model = Campaign
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(regie=self.regie)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running'])
)
def delete(self, request, *args, **kwargs):
self.object = self.get_object()
DraftInvoiceLine.objects.filter(pool__campaign=self.object).delete()
DraftInvoice.objects.filter(pool__campaign=self.object).delete()
Pool.objects.filter(campaign=self.object).delete()
return super().delete(request, *args, **kwargs)
def get_success_url(self):
return '%s#open:campaigns' % reverse('lingo-manager-invoicing-regie-detail', args=[self.regie.pk])
campaign_delete = CampaignDeleteView.as_view()
class PoolDetailView(ListView):
template_name = 'lingo/invoicing/manager_pool_detail.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.campaign = get_object_or_404(Campaign, pk=kwargs['pk'], regie=self.regie)
self.object = get_object_or_404(Pool, pk=kwargs['pool_pk'], campaign=self.campaign)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
invoice_model = Invoice
filter_model = InvoiceFilterSet
if self.object.draft:
invoice_model = DraftInvoice
filter_model = DraftInvoiceFilterSet
data = self.request.GET or None
self.filterset = filter_model(
data=data,
queryset=invoice_model.objects.filter(pool=self.object).order_by('created_at'),
pool=self.object,
)
return self.filterset.qs
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['object'] = self.campaign
kwargs['pool'] = self.object
kwargs['filterset'] = self.filterset
line_model = InvoiceLine
line_values = ['status', 'error_status']
if self.object.draft:
line_model = DraftInvoiceLine
line_values = ['status']
all_lines = line_model.objects.filter(pool=self.object).values(*line_values)
self.object.error_count = len(
[line for line in all_lines if line['status'] == 'error' and not line.get('error_status')]
)
self.object.warning_count = len([line for line in all_lines if line['status'] == 'warning'])
self.object.success_count = len([line for line in all_lines if line['status'] == 'success'])
return super().get_context_data(**kwargs)
pool_detail = PoolDetailView.as_view()
class PoolJournalView(ListView):
template_name = 'lingo/invoicing/manager_pool_journal.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.campaign = get_object_or_404(Campaign, pk=kwargs['pk'], regie=self.regie)
self.object = get_object_or_404(Pool, pk=kwargs['pool_pk'], campaign=self.campaign)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
line_model = InvoiceLine
filter_model = InvoiceLineFilterSet
if self.object.draft:
line_model = DraftInvoiceLine
filter_model = DraftInvoiceLineFilterSet
all_lines = line_model.objects.filter(pool=self.object).order_by('pk').select_related('invoice')
self.object.error_count = len(
[line for line in all_lines if line.status == 'error' and not getattr(line, 'error_status', '')]
)
self.object.warning_count = len([line for line in all_lines if line.status == 'warning'])
self.object.success_count = len([line for line in all_lines if line.status == 'success'])
data = self.request.GET or None
self.filterset = filter_model(data=data, queryset=all_lines, pool=self.object)
return self.filterset.qs if data and [v for v in data.values() if v] else all_lines
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['object'] = self.campaign
kwargs['pool'] = self.object
kwargs['filterset'] = self.filterset
return super().get_context_data(**kwargs)
pool_journal = PoolJournalView.as_view()
class PoolAddView(FormView):
template_name = 'lingo/invoicing/manager_pool_add.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.object = get_object_or_404(
Campaign.objects.filter(regie=self.regie)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running']),
pk=kwargs['pk'],
)
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['form'] = None
kwargs['regie'] = self.regie
kwargs['object'] = self.object
return super().get_context_data(**kwargs)
def post(self, request, *args, **kwargs):
self.object.generate()
return redirect(
'%s#open:pools'
% reverse('lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.pk])
)
pool_add = PoolAddView.as_view()
class PoolPromoteView(FormView):
template_name = 'lingo/invoicing/manager_pool_promote.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.object = get_object_or_404(
Pool,
campaign__id=kwargs['pk'],
campaign__regie=self.regie,
pk=kwargs['pool_pk'],
draft=True,
status='completed',
)
if not self.object.is_last:
raise Http404
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
kwargs['form'] = None
kwargs['regie'] = self.regie
kwargs['object'] = self.object.campaign
kwargs['pool'] = self.object
return super().get_context_data(**kwargs)
def post(self, request, *args, **kwargs):
self.object.promote()
return redirect(
'%s#open:pools'
% reverse(
'lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.object.campaign.pk]
)
)
pool_promote = PoolPromoteView.as_view()
class PoolDeleteView(DeleteView):
template_name = 'lingo/manager_confirm_delete.html'
model = Pool
pk_url_kwarg = 'pool_pk'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.campaign = get_object_or_404(
Campaign.objects.filter(regie=self.regie)
.exclude(pool__draft=False)
.exclude(pool__status__in=['registered', 'running']),
pk=kwargs['pk'],
)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return self.campaign.pool_set.filter(draft=True)
def delete(self, request, *args, **kwargs):
self.object = self.get_object()
DraftInvoiceLine.objects.filter(pool=self.object).delete()
DraftInvoice.objects.filter(pool=self.object).delete()
return super().delete(request, *args, **kwargs)
def get_success_url(self):
return '%s#open:pools' % reverse(
'lingo-manager-invoicing-campaign-detail', args=[self.regie.pk, self.campaign.pk]
)
pool_delete = PoolDeleteView.as_view()
class InvoiceLineListView(ListView):
template_name = 'lingo/invoicing/manager_invoice_lines.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.pool = get_object_or_404(
Pool, pk=kwargs['pool_pk'], campaign_id=kwargs['pk'], campaign__regie=self.regie
)
invoice_model = Invoice
if self.pool.draft:
invoice_model = DraftInvoice
self.invoice = get_object_or_404(invoice_model, pk=kwargs['invoice_pk'], pool=self.pool)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return self.invoice.lines.all().order_by('user_external_id', 'event_date', 'pk')
def get_context_data(self, **kwargs):
kwargs['regie'] = self.pool.campaign.regie
kwargs['object'] = self.pool.campaign
kwargs['pool'] = self.pool
return super().get_context_data(**kwargs)
invoice_line_list = InvoiceLineListView.as_view()
class LineSetErrorStatusView(DetailView):
model = InvoiceLine
pk_url_kwarg = 'line_pk'
template_name = 'lingo/invoicing/manager_line_detail_fragment.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return (
super()
.get_queryset()
.filter(
status='error',
pool=self.kwargs['pool_pk'],
pool__campaign=self.kwargs['pk'],
pool__campaign__regie=self.regie,
)
)
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['object'] = self.object.pool.campaign
kwargs['pool'] = self.object.pool
kwargs['line'] = self.object
return super().get_context_data(**kwargs)
def get(self, request, *args, **kwargs):
self.object = self.get_object()
error_status = kwargs['status']
if error_status == 'reset':
self.object.error_status = ''
elif error_status == 'ignore':
self.object.error_status = 'ignored'
elif error_status == 'fix':
self.object.error_status = 'fixed'
else:
raise Http404
self.object.save()
if is_ajax(self.request):
context = self.get_context_data(object=self.object)
return self.render_to_response(context)
return redirect(
reverse(
'lingo-manager-invoicing-pool-journal', args=[self.regie.pk, kwargs['pk'], kwargs['pool_pk']]
)
)
line_set_error_status = LineSetErrorStatusView.as_view()
class NonInvoicedLineListView(ListView):
template_name = 'lingo/invoicing/manager_non_invoiced_line_list.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
fields = [
'pk',
'event_date',
'slug',
'label',
'quantity',
'unit_amount',
'total_amount',
'user_external_id',
'payer_external_id',
'payer_first_name',
'payer_last_name',
'user_first_name',
'user_last_name',
'event',
'pricing_data',
'status',
'pool_id',
]
qs1 = InvoiceLine.objects.filter(
status='error', error_status='', pool__campaign__regie=self.regie
).values(*fields)
qs2 = (
InjectedLine.objects.filter(invoiceline__isnull=True, regie=self.regie)
.annotate(
user_first_name=Value('', output_field=CharField()),
user_last_name=Value('', output_field=CharField()),
event=Value({}, output_field=JSONField()),
pricing_data=Value({}, output_field=JSONField()),
status=Value('injected', output_field=CharField()),
pool_id=Value(0, output_field=IntegerField()),
)
.values(*fields)
)
qs = qs1.union(qs2).order_by('event_date', 'user_external_id', 'label', 'pk')
return qs
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
context = super().get_context_data(**kwargs)
pools = Pool.objects.filter(draft=False).in_bulk()
for line in context['object_list']:
if line['status'] == 'error':
line['user_name'] = InvoiceLine(
user_first_name=line['user_first_name'], user_last_name=line['user_last_name']
).user_name
line['payer_name'] = InvoiceLine(
payer_first_name=line['payer_first_name'], payer_last_name=line['payer_last_name']
).payer_name
line['error_display'] = InvoiceLine(
status=line['status'], pricing_data=line['pricing_data']
).get_error_display()
line['campaign_id'] = pools[line['pool_id']].campaign_id
return context
non_invoiced_line_list = NonInvoicedLineListView.as_view()
class RegieInvoiceListView(ListView):
template_name = 'lingo/invoicing/manager_invoice_list.html'
paginate_by = 100
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
self.filterset = RegieInvoiceFilterSet(
data=self.request.GET or None,
queryset=Invoice.objects.filter(regie=self.regie).order_by('created_at'),
)
return self.filterset.qs
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
kwargs['filterset'] = self.filterset
return super().get_context_data(**kwargs)
regie_invoice_list = RegieInvoiceListView.as_view()
class RegieInvoiceLineListView(ListView):
template_name = 'lingo/invoicing/manager_invoice_lines.html'
def dispatch(self, request, *args, **kwargs):
self.regie = get_object_or_404(Regie, pk=kwargs['regie_pk'])
self.invoice = get_object_or_404(Invoice, pk=kwargs['invoice_pk'], regie=self.regie)
return super().dispatch(request, *args, **kwargs)
def get_queryset(self):
return self.invoice.lines.all().order_by('user_external_id', 'event_date', 'pk')
def get_context_data(self, **kwargs):
kwargs['regie'] = self.regie
return super().get_context_data(**kwargs)
regie_invoice_line_list = RegieInvoiceLineListView.as_view()