# -*- coding: utf-8 -*- # # zoo - versatile objects management # Copyright (C) 2016 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import csv import io import itertools import time from django.template.response import TemplateResponse from django.views.generic import TemplateView from django.shortcuts import redirect, get_object_or_404 from django.http import Http404, FileResponse, HttpResponseRedirect from django.db.transaction import non_atomic_requests, atomic from django.db import connection from django.conf import settings from django.core.cache import cache from django.utils.timezone import now, six from django.contrib.auth.decorators import permission_required from django.contrib import messages from zoo.zoo_data.models import Entity from . import forms from .synchronize_federations import SynchronizeFederationsAction from .inactivity import Inactivity from . import utils class Demo(TemplateView): template_name = 'zoo_nanterre/demo.html' demo = Demo.as_view() class Search(TemplateView): template_name = 'zoo_nanterre/search.html' def get_context_data(self, **kwargs): ctx = super(Search, self).get_context_data(**kwargs) form = forms.SearchForm(data=self.request.GET or None) ctx['form'] = form ctx['results'] = list(form.results()) if form.is_valid() else [] return ctx class ImportControl(TemplateView): template_name = 'zoo_nanterre/import_control.html' def get_context_data(self, **kwargs): ctx = super(ImportControl, self).get_context_data(**kwargs) c = connection.cursor() c.execute("SELECT count(e.id) from zoo_data_entity as e , zoo_meta_entityschema as s where " " e.schema_id = s.id and s.slug = 'individu'") ctx['count'] = c.fetchone()[0] c.execute("SELECT count(e.id) from zoo_data_entity as e , zoo_meta_entityschema as s where " " e.schema_id = s.id and s.slug = 'individu' and e.content->>'statut_legal' = 'majeur'") ctx['majeurs'] = c.fetchone()[0] c.execute("SELECT count(e.id) from zoo_data_entity as e , zoo_meta_entityschema as s where " " e.schema_id = s.id and s.slug = 'individu' and e.content->>'statut_legal' = 'mineur'") ctx['mineurs'] = c.fetchone()[0] c.execute("SELECT count(e.content->'cles_de_federation'->'technocarte') from " "zoo_data_entity as e") ctx['federations_technocarte'] = c.fetchone()[0] c.execute("SELECT count(e.content->'cles_de_federation'->'infor') from " "zoo_data_entity as e") ctx['federations_infor'] = c.fetchone()[0] c.execute("SELECT count(e.content->'cles_de_federation'->'implicit') from " "zoo_data_entity as e") ctx['federations_implicit'] = c.fetchone()[0] c.execute("SELECT count(e.content->'cles_de_federation'->'saga') from " "zoo_data_entity as e") ctx['federations_saga'] = c.fetchone()[0] c.execute("SELECT count(e.content->'cles_de_federation'->'saga_tiers') from " "zoo_data_entity as e") ctx['federations_saga_tiers'] = c.fetchone()[0] return ctx search = non_atomic_requests(Search.as_view()) import_control = ImportControl.as_view() @permission_required('zoo_data.action1_entity') def synchronize_federations(request, model_admin, *args, **kwargs): jobs = SynchronizeFederationsAction.get_jobs() context = dict( model_admin.admin_site.each_context(request), title='Synchronises les applications', jobs=jobs, ) return TemplateResponse(request, "admin/zoo_data/entity/synchronize_federations.html", context) @permission_required('zoo_data.action1_entity') def synchronize_federations_report(request, job_id, model_admin, *args, **kwargs): jobs = SynchronizeFederationsAction.get_jobs() job = get_object_or_404(jobs, id=job_id) report = job.action.report if not report: raise Http404('no report') text_report = report if six.PY3: text_report = io.TextIOWrapper(text_report, encoding='utf-8') reader = csv.reader(text_report) next(reader) actions = [row for row in reader if row[6] != 'KEEP'] context = dict( model_admin.admin_site.each_context(request), title=job.created, job=job, csv=actions, csv_url=job.action.download_report_url, csv_filesize=report.size, ) return TemplateResponse(request, "admin/zoo_data/entity/synchronize_federations_report.html", context) @permission_required('zoo_data.action1_entity') def synchronize_federations_download_report(request, job_id, model_admin, *args, **kwargs): jobs = SynchronizeFederationsAction.get_jobs() job = get_object_or_404(jobs, id=job_id) report = job.action.report if not report: raise Http404('no report') return FileResponse(report, content_type='text/csv') @permission_required('zoo_data.action1_entity') def synchronize_federations_apply_report(request, job_id, model_admin, *args, **kwargs): jobs = SynchronizeFederationsAction.get_jobs() job = get_object_or_404(jobs, id=job_id) report = job.action.apply_report if not report: raise Http404('no report') text_report = report if six.PY3: text_report = io.TextIOWrapper(text_report, encoding='utf-8') reader = csv.reader(text_report) next(reader) actions = [row for row in reader if row[6] != 'KEEP'] context = dict( model_admin.admin_site.each_context(request), title=u'Application - %s' % job.created, job=job, csv=actions, csv_url=job.action.download_apply_report_url, csv_filesize=report.size, ) return TemplateResponse(request, "admin/zoo_data/entity/synchronize_federations_report.html", context) @permission_required('zoo_data.action1_entity') def synchronize_federations_download_apply_report(request, job_id, model_admin, *args, **kwargs): jobs = SynchronizeFederationsAction.get_jobs() job = get_object_or_404(jobs, id=job_id) report = job.action.apply_report if not report: raise Http404('no report') return FileResponse(report, content_type='text/csv') @permission_required('zoo_data.action1_entity') def synchronize_federations_add(request, model_admin, *args, **kwargs): if request.method == 'POST': form = forms.SynchronizeFederationsForm(request.POST, request.FILES) if form.is_valid(): form.save() return redirect('admin:synchronize-federations') else: form = forms.SynchronizeFederationsForm() context = dict( model_admin.admin_site.each_context(request), form=form, ) return TemplateResponse(request, "admin/zoo_data/entity/synchronize_federations_add.html", context) @permission_required('zoo_data.action1_entity') def synchronize_federations_apply(request, job_id, model_admin, **kwargs): if request.method == 'POST': jobs = SynchronizeFederationsAction.get_jobs() job = get_object_or_404(jobs, id=job_id) apply_report = job.action.apply_report if apply_report: raise Http404 job.action.set_apply(job) return redirect('admin:synchronize-federations') @permission_required('zoo_data.action1_entity') def synchronize_federations_delete(request, job_id, model_admin, *args, **kwargs): if request.method == 'POST': jobs = SynchronizeFederationsAction.get_jobs() job = get_object_or_404(jobs, id=job_id) job.action.delete() job.delete() return redirect('admin:synchronize-federations') def fiches_inactives(): inactivity = Inactivity( child_delay=getattr(settings, 'ZOO_NANTERRE_INACTIVITY_CHILD_DELAY', 365), # default 12 months adult_delay=getattr(settings, 'ZOO_NANTERRE_INACTIVITY_ADULT_DELAY', 182), # default 6 months ) fiches = [] for child in itertools.chain(inactivity.deletable_children, inactivity.deletable_adults): utils.PersonSearch.add_age(child) fiches.append({ 'id': child.id, 'prenoms': child.content['prenoms'], 'nom_de_naissance': child.content['nom_de_naissance'], 'nom_d_usage': child.content['nom_d_usage'], 'date_de_naissance': child.content['date_de_naissance'], 'statut_legal': child.content['statut_legal'], 'age': child.age_label, }) fiches.sort(key=lambda f: f['id']) return fiches @permission_required('zoo_data.action1_entity') @atomic def inactive_index(request, model_admin, *args, **kwargs): try: timestamp, fiches = cache.get('fiches-inactives') if 'recompute' in request.GET: cache.delete('fiches-inactives') return HttpResponseRedirect(request.path) # delete operation if 'delete' in request.GET and request.method == 'POST': new_timestamp = now() new_fiches = fiches_inactives() if fiches != new_fiches: messages.warning(request, u'Les fiches à supprimer ont changées.') cache.set('fiches-inactives', (new_timestamp, new_fiches), 365 * 24 * 3600) else: Entity.objects.filter(id__in=[fiche['id'] for fiche in fiches]).delete() messages.info(request, u'%d fiches ont été supprimées.' % len(fiches)) cache.delete('fiches-inactives') return HttpResponseRedirect(request.path) duration = None queries = None except (TypeError, ValueError): try: connection.force_debug_cursor = True start = time.time() fiches = fiches_inactives() queries = len(connection.queries_log) duration = time.time() - start timestamp = now() cache.set('fiches-inactives', (timestamp, fiches), 365 * 24 * 3600) finally: connection.force_debug_cursor = False # download csv export if 'csv' in request.GET: header = ['id', 'prenoms', 'nom_d_usage', 'nom_de_naissance', 'date_de_naissance', 'statut_legal', 'age'] def rows(): yield header for fiche in fiches: yield [fiche[key] for key in header] return utils.csv_export_response(rows(), 'fiches-inactives-%s.csv' % timestamp) context = dict( model_admin.admin_site.each_context(request), title='Fiches inactives à supprimer', fiches=fiches, timestamp=timestamp, duration=duration, queries=queries, child_delay=getattr(settings, 'ZOO_NANTERRE_INACTIVITY_CHILD_DELAY', 365), adult_delay=getattr(settings, 'ZOO_NANTERRE_INACTIVITY_ADULT_DELAY', 365), ) return TemplateResponse(request, "admin/zoo_data/entity/inactive_index.html", context)