# zoo - versatile objects management
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from operator import __add__, __or__
import functools
import logging
import importlib
import traceback
import datetime

from django.db import models, connection
from django.db.models import F, Value
from django.db.models.functions import Lower
from django.db.models.query import QuerySet, Q
from django.core.exceptions import ValidationError
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from django.utils.timezone import now
from django.contrib.postgres.fields import JSONField
from django.contrib.postgres.search import TrigramDistance

from .search import Unaccent, JSONTextRef
from zoo.zoo_meta.validators import schema_validator


class Transaction(models.Model):
    '''Row referenced by entities, relations, logs and jobs to record the
       operation that created, modified or deleted them.'''
    created = models.DateTimeField(
        auto_now_add=True,
        verbose_name=_('created'))
    meta = JSONField(
        verbose_name=_('meta'),
        blank=True,
        null=True)
    content = JSONField(
        verbose_name=_('content'),
        blank=True,
        null=True)

    def __str__(self):
        return str(self.id)

    @classmethod
    def get_transaction(cls):
        '''Issue a LOCK TABLE on the transaction table, then create and
           return a new Transaction.'''
        with connection.cursor() as cursor:
            # Table names cannot be passed as query parameters; the name
            # comes from the model metadata, not from user input.
            cursor.execute('LOCK TABLE %s' % Transaction._meta.db_table)
        return Transaction.objects.create()

    class Meta:
        ordering = ('id',)
        verbose_name = _('transaction')
        verbose_name_plural = _('transactions')


class EntityQuerySet(QuerySet):
    def content_search(self, schema, limit=0.3, **kwargs):
        '''Search entities of a schema by trigram matching on content values.

           schema -- value used to filter on the `schema` field
           limit -- value passed to the SQL function SET_LIMIT()
           kwargs -- maps a `__`-separated path inside `content` to the text
                     searched at that path; an entity is kept if any one of
                     the paths matches

           Returns a queryset annotated with `similarity`, which holds the
           mean TrigramDistance over all given paths, ordered by it and then
           by the lowered, unaccented value at each path.

           At least one keyword criterion is required: with none,
           functools.reduce() raises TypeError on the empty filter list.
        '''
        # SET_LIMIT is executed immediately on the current connection, when
        # the queryset is built, not when it is evaluated. The cursor is
        # closed as soon as the statement has run.
        with connection.cursor() as cursor:
            cursor.execute('SELECT SET_LIMIT(%s)', (limit,))

        def content_ref(key):
            # Build a new expression on every call so the ordering and the
            # distance computation never share an expression instance.
            return Lower(Unaccent(JSONTextRef(F('content'), *key.split('__'))))

        qs = self.filter(schema=schema)

        filters = [
            Q(**{
                'content__' + key + '__unaccent__lower__trigram_similar':
                    Unaccent(Lower(Value(value))),
            })
            for key, value in kwargs.items()
        ]
        qs = qs.filter(functools.reduce(__or__, filters))

        ordering = []
        expressions = []
        for key, value in kwargs.items():
            ordering.append(content_ref(key))
            expressions.append(TrigramDistance(
                content_ref(key),
                Lower(Unaccent(Value(value)))))
        expression = functools.reduce(__add__, expressions)

        # Despite its name the annotation is a distance, hence the ascending
        # ordering.
        qs = qs.annotate(similarity=expression / len(kwargs))
        qs = qs.order_by('similarity', *ordering)
        return qs


class CommonData(models.Model):
    '''Abstract base shared by Entity and Relation; concrete subclasses
       provide the `schema` and `content` fields used by clean().'''

    def clean(self):
        '''Validate `content` against the schema's JSON schema, when a schema
           is set; errors are re-raised attached to the `content` field.'''
        if self.schema:
            try:
                schema_validator(self.schema.schema)(self.content)
            except ValidationError as e:
                raise ValidationError({'content': e})

    def __str__(self):
        return str(self.id)

    class Meta:
        abstract = True


class Entity(CommonData):
    '''Object whose JSON content is described by an EntitySchema.'''
    schema = models.ForeignKey(
        'zoo_meta.EntitySchema',
        verbose_name=_('schema'),
        on_delete=models.CASCADE)
    created = models.ForeignKey(
        Transaction,
        blank=True,
        null=True,
        verbose_name=_('created'),
        related_name='created_entities',
        on_delete=models.CASCADE)
    modified = models.ForeignKey(
        Transaction,
        blank=True,
        null=True,
        verbose_name=_('modified'),
        related_name='modified_entities',
        on_delete=models.CASCADE)
    deleted = models.ForeignKey(
        Transaction,
        verbose_name=_('deleted'),
        blank=True,
        null=True,
        related_name='deleted_entities',
        on_delete=models.CASCADE)
    meta = JSONField(
        blank=True,
        null=True,
        verbose_name=_('meta'))
    content = JSONField(
        blank=True,
        null=False,
        verbose_name=_('content'))

    objects = EntityQuerySet.as_manager()

    class Meta:
        ordering = ('created',)
        verbose_name = _('entity')
        verbose_name_plural = _('entities')
        permissions = (
            ("action1_entity", _("Can do action1 on entities")),
            ("action2_entity", _("Can do action2 on entities")),
            ("action3_entity", _("Can do action3 on entities")),
            ("action4_entity", _("Can do action4 on entities")),
            ("action5_entity", _("Can do action5 on entities")),
            ("action6_entity", _("Can do action6 on entities")),
            ("action7_entity", _("Can do action7 on entities")),
        )


class Relation(CommonData):
    '''Link between a `left` and a `right` Entity whose JSON content is
       described by a RelationSchema.'''
    schema = models.ForeignKey(
        'zoo_meta.RelationSchema',
        verbose_name=_('schema'),
        on_delete=models.CASCADE)
    left = models.ForeignKey(
        'Entity',
        verbose_name=_('left'),
        related_name='left_relations',
        on_delete=models.CASCADE)
    right = models.ForeignKey(
        'Entity',
        verbose_name=_('right'),
        related_name='right_relations',
        on_delete=models.CASCADE)
    created = models.ForeignKey(
        Transaction,
        blank=True,
        null=True,
        verbose_name=_('created'),
        related_name='created_relations',
        on_delete=models.CASCADE)
    modified = models.ForeignKey(
        Transaction,
        blank=True,
        null=True,
        verbose_name=_('modified'),
        related_name='modified_relations',
        on_delete=models.CASCADE)
    deleted = models.ForeignKey(
        Transaction,
        verbose_name=_('deleted'),
        blank=True,
        null=True,
        related_name='deleted_relations',
        on_delete=models.CASCADE)
    meta = JSONField(
        blank=True,
        null=True,
        verbose_name=_('meta'))
    content = JSONField(
        blank=True,
        null=False,
        verbose_name=_('content'))

    class Meta:
        ordering = ('created',)
        verbose_name = _('relation')
        verbose_name_plural = _('relations')


class Log(models.Model):
    '''Timestamped JSON entry attached to an entity and, optionally, to a
       transaction.'''
    entity = models.ForeignKey(
        'Entity',
        verbose_name=_('entity'),
        on_delete=models.CASCADE)
    transaction = models.ForeignKey(
        'Transaction',
        null=True,
        verbose_name=_('transaction'),
        on_delete=models.CASCADE)
    timestamp = models.DateTimeField(
        auto_now_add=True,
        db_index=True,
        verbose_name=_('timestamp'))
    content = JSONField(
        blank=True,
        null=True,
        verbose_name=_('content'))

    class Meta:
        ordering = ('-timestamp', 'id')
        verbose_name = _('log')
        verbose_name_plural = _('logs')


class JobQuerySet(QuerySet):
    def todo(self):
        '''Jobs in state TODO or ERROR, i.e. those Job.redo() runs again.'''
        return self.filter(state__in=[self.model.STATE_TODO, self.model.STATE_ERROR])

    def error(self):
        '''Jobs in state ERROR or UNRECOVERABLE_ERROR.'''
        return self.filter(state__in=[self.model.STATE_ERROR,
                                      self.model.STATE_UNRECOVERABLE_ERROR])

    def set_todo(self):
        '''Bulk-update the selected jobs to state TODO.'''
        return self.update(state=self.model.STATE_TODO)

    def set_unrecoverable_error(self):
        '''Bulk-update the selected jobs to state UNRECOVERABLE_ERROR.'''
        return self.update(state=self.model.STATE_UNRECOVERABLE_ERROR)

    def by_action(self, action_klass):
        '''Jobs whose stored `$classpath` is the one of `action_klass`
           (a class or an instance).'''
        class_path = self.model.get_classpath(action_klass)
        return self.filter(**{'content__$classpath': class_path})


class Job(models.Model):
    '''Store synchronization messages sent to applications'''
    # Age in seconds a job must reach (based on `modified`) before redo()
    # picks it up by default.
    SCHEDULER_STEP = 60 * 5  # 5 minutes

    STATE_TODO = 1
    STATE_SUCCESS = 2
    STATE_ERROR = 3
    STATE_UNRECOVERABLE_ERROR = 4

    STATES = [
        (STATE_TODO, _('todo')),
        (STATE_SUCCESS, _('success')),
        (STATE_ERROR, _('error')),
        (STATE_UNRECOVERABLE_ERROR, _('unrecoverable error')),
    ]

    objects = JobQuerySet.as_manager()

    created = models.DateTimeField(
        verbose_name=_('created'),
        auto_now_add=True,
        db_index=True)
    modified = models.DateTimeField(
        verbose_name=_('sent'),
        auto_now=True,
        db_index=True)
    transaction = models.ForeignKey(
        'Transaction',
        verbose_name='transaction',
        null=True,
        blank=True,
        on_delete=models.CASCADE)
    # NOTE(review): the default 0 is not one of STATES; create() overwrites
    # it before saving, but rows created by other means keep it -- confirm
    # this is intended before changing it (it affects migrations).
    state = models.PositiveIntegerField(
        verbose_name=_('state'),
        choices=STATES,
        default=0)
    content = JSONField(
        verbose_name=_('content'))

    @classmethod
    def get_logger(cls):
        return logging.getLogger(__name__)

    @classmethod
    def get_classpath(cls, class_or_instance):
        '''Return the dotted `module.ClassName` path of a class, or of the
           class of an instance.'''
        if isinstance(class_or_instance, type):
            klass = class_or_instance
        else:
            klass = class_or_instance.__class__
        module = klass.__module__
        class_name = klass.__name__
        return '%s.%s' % (module, class_name)

    @classmethod
    def create(cls, job_action, do_later=False, transaction=None):
        '''Persist a job for `job_action` and, unless `do_later` is true, run
           the action immediately.

           job_action -- object providing to_json() and do(job=...); its
                         `job` attribute is set to the new job
           do_later -- when true the job is only stored, in state TODO
           transaction -- optional Transaction linked to the job

           Returns the saved Job. An exception raised while running the
           action is logged and recorded under content['error'], and the job
           is put in state UNRECOVERABLE_ERROR.
        '''
        # conserve class path of the job_action to recreate it later
        class_path = cls.get_classpath(job_action)
        content = job_action.to_json()
        assert isinstance(content, dict), 'action must serialize to a dict'
        content['$classpath'] = class_path
        # The row is created first so that the job has an id while the
        # action runs (admin_url below relies on it).
        job = Job.objects.create(content=content, transaction=transaction)
        assert isinstance(job.content, dict), 'action must serialize to json'
        job.state = cls.STATE_TODO
        job_action.job = job
        try:
            if not do_later:
                job.state = job_action.do(job=job)
                # to_json() returns a new serialization, the class path has
                # to be added again.
                job.content = job_action.to_json()
                job.content['$classpath'] = class_path
        except Exception as e:
            # action failed dramatically, do not retry
            job.state = cls.STATE_UNRECOVERABLE_ERROR
            error = job.content.setdefault('error', {})
            error['code'] = 'internal-server-error'
            error['exc_detail'] = str(e)
            error['exc_tb'] = traceback.format_exc()
            job.get_logger().exception('exception during job %s', job.admin_url)
        job.save()
        return job

    @property
    def action(self):
        '''Rebuild the action object from the stored content.

           The class is resolved from content['$classpath'] and instantiated
           with its from_json() on the remaining content; the returned
           action has its `job` attribute set to this job.

           Raises Exception if the module or class cannot be loaded.
        '''
        content = self.content.copy()
        classpath = content.pop('$classpath')
        module_name, klass_name = classpath.rsplit('.', 1)
        try:
            module = importlib.import_module(module_name)
            klass = getattr(module, klass_name)
        except Exception as e:
            # Not a bare except: KeyboardInterrupt and SystemExit must
            # propagate untouched; the original cause is kept for debugging.
            raise Exception('classpath %s unknown' % classpath) from e
        action = klass.from_json(content)
        action.job = self
        return action

    def do(self, log=True):
        '''Run the action of this job and save the resulting state and
           content.

           log -- when false, an exception raised by the action is recorded
                  in the content but not sent to the logger
        '''
        action = self.action
        try:
            self.state = action.do(job=self)
            content = action.to_json()
            assert isinstance(content, dict), 'action must serialize to a dict'
            self.content = content
        except Exception as e:
            if log:
                url = self.admin_url
                self.get_logger().exception('exception during job %s', url)
            self.state = self.STATE_UNRECOVERABLE_ERROR
            self.content['$exc_detail'] = str(e)
            self.content['$exc_tb'] = traceback.format_exc()
        # On success the content was replaced by a fresh serialization which
        # does not carry the class path, so it is always set again here.
        self.content['$classpath'] = self.get_classpath(action)
        self.save()

    @property
    def admin_url(self):
        return reverse('admin:zoo_data_job_change', args=[self.id])

    @classmethod
    def redo(cls, timestamp=None):
        '''Run again the todo() jobs last modified before `timestamp`,
           which defaults to SCHEDULER_STEP seconds ago.'''
        timestamp = timestamp or (now() - datetime.timedelta(seconds=cls.SCHEDULER_STEP))
        for job in cls.objects.todo().filter(modified__lt=timestamp):
            # we do not log on retries, to prevent a storm of errors
            job.do(log=job.state == cls.STATE_TODO)

    def __str__(self):
        return '%s %s' % (self.content['$classpath'], self.id)

    @property
    def description(self):
        '''Description provided by the action, or an empty string if the
           action has none.'''
        return getattr(self.action, 'description', u'')

    class Meta:
        verbose_name = _('job')
        verbose_name_plural = _('jobs')
        ordering = ('-created', '-modified')