zoo/zoo/zoo_nanterre/synchronize_federations.py

310 lines
11 KiB
Python

# -*- coding: utf-8 -*-
#
# zoo - versatile objects management
# Copyright (C) 2018 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import csv
from django.core.files.storage import default_storage
from django.core.urlresolvers import reverse
from django.conf import settings
from django.db import DatabaseError
from django.db.transaction import atomic
from django.utils import six
from django.utils.encoding import force_bytes
from django.utils.six import StringIO
from zoo.zoo_meta.models import EntitySchema
from zoo.zoo_data.models import Job, Entity, Transaction, Log
from . import utils
class SynchronizeFederationsImport(object):
    """Compare the federation keys stored for one application with an
    uploaded list of keys, report the differences and optionally remove
    the keys that should no longer exist.

    ``action`` is the object driving the import; this class reads its
    ``app_id``, ``csv_filename``, ``csv_filepath`` and ``job`` attributes
    and writes the report attributes and ``counts`` back onto it.

    Every row of ``self.actions`` is a 7-tuple::

        (entity id, prenoms, nom_de_naissance, nom_d_usage,
         app_id, federation key, action)

    where action is one of:

    * ``KEEP``: the entity's key is present in the uploaded list;
    * ``NEW``: the key is numeric and greater than the largest numeric
      key of the uploaded list;
    * ``DELETE``: any other key held by an entity;
    * ``UNKNOWN``: a key of the uploaded list held by no entity (the
      four identity columns are then empty strings).
    """

    def __init__(self, action):
        self.action = action
        self.errors = []
        self._reset()

    def _reset(self):
        """Forget the result of any previous analysis."""
        self.federations = []
        self.actions = []
        self.keep_count = 0
        self.new_count = 0
        self.delete_count = 0
        self.unknown_count = 0

    @staticmethod
    def _parse_federations(lines):
        """Return ``(sorted keys, largest numeric key)`` for an iterable
        of lines.

        Each line is stripped and kept as one key, blank lines included.
        Lines that are not integers do not contribute to the maximum,
        which defaults to 0.
        """
        federations = []
        max_federation = 0
        for line in lines:
            line = line.strip()
            # Storage files are opened in binary mode: on Python 3 the
            # lines are bytes and would never compare equal to the text
            # keys stored on entities. On Python 2 bytes is str, so the
            # value is left untouched.
            if isinstance(line, bytes) and not isinstance(line, str):
                line = line.decode('utf-8', 'replace')
            federations.append(line)
            try:
                max_federation = max(max_federation, int(line))
            except ValueError:
                # non numeric key, irrelevant for the NEW threshold
                pass
        federations.sort()
        return federations, max_federation

    def analyze(self):
        """Rebuild ``self.federations``, ``self.actions`` and the four
        counters from the uploaded file and the entities holding a
        federation key for the application."""
        self._reset()
        # the context manager closes the storage handle once parsed
        with default_storage.open(self.action.csv_filepath) as csv_file:
            self.federations, max_federation = self._parse_federations(csv_file)

        app_id = self.action.app_id
        individus = Entity.objects.filter(
            schema__slug=utils.INDIVIDU_ENT,
            **{'content__cles_de_federation__%s__isnull' % app_id: False})
        individus = individus.order_by('id')

        federations_set = set(self.federations)
        seen = set()
        for individu in individus:
            cle = individu.content['cles_de_federation'][app_id]
            seen.add(cle)
            if cle in federations_set:
                action = 'KEEP'
                self.keep_count += 1
            elif cle.isdigit() and int(cle) > max_federation:
                action = 'NEW'
                self.new_count += 1
            else:
                action = 'DELETE'
                self.delete_count += 1
            self.actions.append((
                individu.id,
                individu.content['prenoms'],
                individu.content['nom_de_naissance'],
                individu.content['nom_d_usage'],
                app_id,
                cle,
                action))

        # keys of the uploaded list that no entity holds
        for federation in self.federations:
            if federation not in seen:
                self.unknown_count += 1
                self.actions.append(
                    ('', '', '', '', app_id, federation, 'UNKNOWN'))

    def analyze_and_report(self):
        """Run the analysis and store its CSV report on the action."""
        self.analyze()
        self.report('report')

    def report(self, target):
        """Write ``self.actions`` as CSV to the default storage.

        ``target`` is the attribute prefix used on the action:
        ``<target>_csv_filename`` and ``<target>_csv_filepath`` are set,
        and ``action.counts`` receives the four counters.
        """
        output_file = StringIO()
        writer = csv.writer(output_file)
        writer.writerow(['RSU ID', 'prenoms', 'nom de naissance',
                         'nom d\'usage', 'application', 'federation', 'action'])
        for action in self.actions:
            if six.PY3:
                row = list(action)
            else:
                # the Python 2 csv module only handles byte strings
                row = [force_bytes(v) for v in action]
            writer.writerow(row)
        setattr(self.action, target + '_csv_filename',
                self.action.csv_filename + '-report.csv')
        setattr(self.action, target + '_csv_filepath',
                default_storage.save(self.action.csv_filepath + '-report-.csv', output_file))
        self.action.counts = {
            'keep': self.keep_count,
            'new': self.new_count,
            'delete': self.delete_count,
            'unknown': self.unknown_count,
        }

    def apply(self):
        """Remove every federation key flagged ``DELETE`` by a fresh
        analysis, inside a single database transaction.

        Returns ``(True, None)`` on success, after storing the
        ``apply_report`` CSV on the action. Returns
        ``(False, 'atomic update failed')`` when a ValueError is raised
        inside the atomic block (notably when an entity no longer holds
        the analysed key), and ``(False, 'concurrent update')`` on a
        DatabaseError.
        """
        self.analyze()
        # Bound before the loop: when the analysis produced no row the
        # loop body never runs and app_id is still needed below.
        app_id = self.action.app_id
        deletes = {}
        for row in self.actions:
            entity_id, _, _, _, row_app_id, federation, action = row
            assert row_app_id == app_id
            if action != 'DELETE':
                continue
            deletes[entity_id] = federation

        entities = Entity.objects.filter(id__in=list(deletes))
        app_name = utils.get_application(app_id)['name']
        try:
            with atomic():
                transaction = Transaction.objects.create()
                transaction.content = {
                    'action': 'synchronize-federation',
                    'job_id': self.action.job.id,
                    'app_id': app_id,
                    'federations_deleted': [],
                }
                logs = []
                self.action.job.transaction = transaction
                for entity in entities:
                    federation = deletes[entity.id]
                    if entity.content.get('cles_de_federation', {}).get(app_id) != federation:
                        # the entity changed since the analysis, abort
                        # and roll back the whole batch
                        raise ValueError
                    del entity.content['cles_de_federation'][app_id]
                    transaction.content['federations_deleted'].append(
                        [entity.id, app_id, federation])
                    entity.modified = transaction
                    text = u'Suppression automatique de la fédération %s: %s' % (app_name, federation)
                    logs.append(
                        Log(transaction=transaction,
                            entity=entity,
                            content={
                                'text': text,
                            }))
                    entity.save()
                transaction.save()
                self.action.job.save()
                Log.objects.bulk_create(logs)
        except ValueError:
            return False, 'atomic update failed'
        except DatabaseError:
            return False, 'concurrent update'
        self.report('apply_report')
        return True, None
class SynchronizeFederationsAction(object):
    """Serialisable description of one federation synchronisation.

    It holds the application id, the name and storage path of the
    uploaded CSV, the names and paths of the analysis and apply reports,
    the counters of the last run, the ``apply`` flag and the last error.
    ``to_json`` / ``from_json`` convert it to and from a plain dict.
    """
    report_csv_filename = None
    report_csv_filepath = None
    apply_report_csv_filename = None
    apply_report_csv_filepath = None
    apply = False
    errors = None

    def __init__(self, app_id, csv_filename, csv_filepath,
                 report_csv_filename=None, report_csv_filepath=None,
                 errors=None, counts=None, apply_report_csv_filename=None,
                 apply_report_csv_filepath=None, apply=False, **kwargs):
        # unknown keys are accepted and ignored through **kwargs
        self.app_id = app_id
        self.csv_filename = csv_filename
        self.csv_filepath = csv_filepath
        self.report_csv_filename = report_csv_filename
        self.report_csv_filepath = report_csv_filepath
        # previously accepted but silently dropped
        self.errors = errors
        self.counts = counts
        self.apply_report_csv_filename = apply_report_csv_filename
        self.apply_report_csv_filepath = apply_report_csv_filepath
        self.apply = apply

    def to_json(self):
        """Return the state as a dict accepted by ``from_json``."""
        return {
            'app_id': self.app_id,
            'csv_filename': self.csv_filename,
            'csv_filepath': self.csv_filepath,
            'report_csv_filename': self.report_csv_filename,
            'report_csv_filepath': self.report_csv_filepath,
            'errors': self.errors,
            'counts': self.counts,
            'apply_report_csv_filename': self.apply_report_csv_filename,
            'apply_report_csv_filepath': self.apply_report_csv_filepath,
            'apply': self.apply,
        }

    @classmethod
    def from_json(cls, d):
        """Build an action from a dict produced by ``to_json``."""
        return cls(**d)

    def do(self, job=None, **kwargs):
        """Run the analysis, or the deletion when ``apply`` is set.

        Returns ``Job.STATE_ERROR`` when applying fails (the reason is
        kept in ``self.errors``), ``Job.STATE_SUCCESS`` otherwise.
        """
        if job is not None:
            # make_url() and the importer's apply() read self.job;
            # NOTE(review): the Job model may already set it - confirm.
            self.job = job
        # do not report the failure of a previous run
        self.errors = None
        synchronize_federations_import = SynchronizeFederationsImport(self)
        if self.apply:
            result, errors = synchronize_federations_import.apply()
            if not result:
                self.errors = errors
                return Job.STATE_ERROR
        else:
            synchronize_federations_import.analyze_and_report()
        return Job.STATE_SUCCESS

    @classmethod
    def synchronize(cls, app_id, csv_uploaded):
        """Store the uploaded file and create the job analysing it."""
        csv_filepath = default_storage.save(
            csv_uploaded.name, csv_uploaded)
        self = cls(app_id, csv_uploaded.name, csv_filepath)
        Job.create(self, do_later=getattr(settings, 'NANTERRE_SYNCHRONIZE_FEDERATIONS_DO_LATER', True))

    @classmethod
    def get_jobs(cls):
        """Return the jobs whose action is this class."""
        return Job.objects.by_action(cls)

    @staticmethod
    def _open(path):
        """Open ``path`` from the default storage; return None when the
        path is empty or the file cannot be opened."""
        if not path:
            return None
        try:
            return default_storage.open(path)
        except IOError:
            return None

    @property
    def report(self):
        """Opened analysis report, or None."""
        return self._open(self.report_csv_filepath)

    def make_url(self, action, prefix):
        """Return the admin URL ``synchronize-federations-<action>-<prefix>``
        for a report, or None when that report cannot be opened.

        ``prefix`` is ``'report'`` or ``'apply_report'``.
        """
        stream = getattr(self, prefix)
        if not stream:
            return None
        # the file was only opened to check it is available
        stream.close()
        url_name = 'admin:synchronize-federations-%s-%s' % (action, prefix.replace('_', '-'))
        filename = getattr(self, prefix + '_csv_filename')
        return reverse(url_name, kwargs={
            'job_id': self.job.id,
            'filename': filename,
        })

    @staticmethod
    def __delete(path):
        """Best-effort removal of ``path`` from the default storage."""
        if not path:
            return
        if default_storage.exists(path):
            try:
                default_storage.delete(path)
            except IOError:
                # deliberately ignored, deletion is best-effort
                pass

    def delete(self):
        """Remove the uploaded file and both reports from the storage."""
        self.__delete(self.csv_filepath)
        self.__delete(self.report_csv_filepath)
        self.__delete(self.apply_report_csv_filepath)

    @property
    def download_report_url(self):
        return self.make_url('download', 'report')

    @property
    def report_url(self):
        return self.make_url('show', 'report')

    @property
    def download_apply_report_url(self):
        return self.make_url('download', 'apply_report')

    @property
    def apply_report_url(self):
        return self.make_url('show', 'apply_report')

    @property
    def apply_report(self):
        """Opened apply report, or None."""
        return self._open(self.apply_report_csv_filepath)

    def set_apply(self, job):
        """Flag ``job`` for the apply phase and put it back in the TODO
        state; it is run immediately unless the
        NANTERRE_SYNCHRONIZE_FEDERATIONS_DO_LATER setting is true
        (its default)."""
        job.content['apply'] = True
        job.state = job.STATE_TODO
        job.save()
        if not getattr(settings, 'NANTERRE_SYNCHRONIZE_FEDERATIONS_DO_LATER', True):
            job.do()