399 lines
12 KiB
Python
399 lines
12 KiB
Python
# zoo - versatile objects management
|
|
# Copyright (C) 2016 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from operator import __add__, __or__
|
|
|
|
import functools
|
|
import logging
|
|
import importlib
|
|
import traceback
|
|
import datetime
|
|
|
|
from django.db import models, connection
|
|
from django.db.models import F, Value
|
|
from django.db.models.query import QuerySet, Q
|
|
from django.core.exceptions import ValidationError
|
|
from django.urls import reverse
|
|
from django.utils.encoding import force_text
|
|
from django.utils.six import python_2_unicode_compatible
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.utils.timezone import now
|
|
from django.contrib.postgres.fields import JSONField
|
|
from django.contrib.postgres.search import TrigramDistance
|
|
|
|
|
|
from .search import Unaccent, Lower, JSONTextRef
|
|
from zoo.zoo_meta.validators import schema_validator
|
|
|
|
|
|
@python_2_unicode_compatible
class Transaction(models.Model):
    """A numbered change set.

    Entity and Relation rows reference Transaction rows through their
    ``created``, ``modified`` and ``deleted`` foreign keys.
    """

    created = models.DateTimeField(
        auto_now_add=True,
        verbose_name=_('created'))
    meta = JSONField(
        verbose_name=_('meta'),
        blank=True,
        null=True)
    content = JSONField(
        verbose_name=_('content'),
        blank=True,
        null=True)

    def __str__(self):
        return force_text(self.id)

    @classmethod
    def get_transaction(cls):
        """Create and return a new transaction row while holding a table lock.

        ``LOCK TABLE`` without an explicit mode takes PostgreSQL's
        ACCESS EXCLUSIVE lock, held until the surrounding database
        transaction ends, so concurrent callers are serialized. PostgreSQL
        rejects ``LOCK TABLE`` outside a transaction block: callers must run
        this inside ``django.db.transaction.atomic()``.

        Returns:
            The newly created instance of ``cls``.
        """
        # the table name comes from model metadata, but quote it anyway so the
        # statement stays valid whatever db_table is configured to
        table = connection.ops.quote_name(cls._meta.db_table)
        with connection.cursor() as cursor:
            cursor.execute('LOCK TABLE %s' % table)
        return cls.objects.create()

    class Meta:
        ordering = ('id',)
        verbose_name = _('transaction')
        verbose_name_plural = _('transactions')
|
|
|
|
|
|
class EntityQuerySet(QuerySet):
    def content_search(self, schema, limit=0.3, **kwargs):
        """Fuzzy-search entities of ``schema`` on their JSON ``content``.

        Args:
            schema: the entity schema to restrict the search to.
            limit: pg_trgm similarity threshold passed to ``SET_LIMIT``; it
                governs the ``trigram_similar`` lookups used below.
            **kwargs: content path (keys joined with ``__``) -> searched value.

        Returns:
            A queryset of entities matching at least one criterion, annotated
            with ``similarity`` (the mean trigram distance over all criteria,
            lower is closer) and ordered by it, then by the searched fields.
            With no criteria, only the schema filter is applied.
        """
        qs = self.filter(schema=schema)
        if not kwargs:
            # nothing to rank on: reduce() over an empty list and the division
            # by len(kwargs) below would both fail
            return qs

        # SET_LIMIT runs immediately on the default connection; the threshold
        # is a session setting, so it also applies to later trigram queries
        # on that connection.
        with connection.cursor() as cursor:
            cursor.execute('SELECT SET_LIMIT(%s)', (limit,))

        filters = []
        distances = []
        ordering = []
        for key, value in kwargs.items():
            path = key.split('__')
            filters.append(Q(**{
                'content__' + key + '__unaccent__lower__trigram_similar':
                    Lower(Unaccent(Value(value))),
            }))
            ordering.append(Lower(Unaccent(JSONTextRef(F('content'), *path))))
            distances.append(TrigramDistance(
                Lower(Unaccent(JSONTextRef(F('content'), *path))),
                Lower(Unaccent(Value(value)))))

        qs = qs.filter(functools.reduce(__or__, filters))
        total_distance = functools.reduce(__add__, distances)
        qs = qs.annotate(similarity=total_distance / len(kwargs))
        # distance, not similarity: ascending order puts the closest first
        return qs.order_by('similarity', *ordering)
|
|
|
|
|
|
@python_2_unicode_compatible
class CommonData(models.Model):
    """Abstract base for records whose JSON ``content`` follows a schema.

    Concrete subclasses define a ``schema`` foreign key, whose target exposes
    the JSON schema as its ``schema`` attribute, and a ``content`` field.
    """

    def clean(self):
        """Validate ``content`` against the JSON schema of ``self.schema``.

        Nothing is checked when no schema is set.

        Raises:
            ValidationError: keyed on ``content`` when validation fails.
        """
        # Reading an unset non-null foreign key raises RelatedObjectDoesNotExist
        # (an AttributeError subclass) rather than returning None, and
        # full_clean() still calls clean() after clean_fields() has reported
        # the missing schema; getattr() with a default keeps that a validation
        # error instead of a crash.
        schema = getattr(self, 'schema', None)
        if schema:
            try:
                schema_validator(schema.schema)(self.content)
            except ValidationError as e:
                raise ValidationError({'content': e})

    def __str__(self):
        return force_text(self.id)

    class Meta:
        abstract = True
|
|
|
|
|
|
class Entity(CommonData):
    """A record described by an EntitySchema, carrying a JSON ``content``.

    ``content`` is validated against ``schema`` by ``CommonData.clean()``.
    The ``created``/``modified``/``deleted`` foreign keys presumably record the
    Transaction in which each change happened (confirm against the code that
    sets them).
    """

    schema = models.ForeignKey(
        'zoo_meta.EntitySchema',
        verbose_name=_('schema'),
        on_delete=models.CASCADE)
    # NOTE: all three Transaction links use CASCADE, so deleting a Transaction
    # deletes every entity referencing it through any of them.
    created = models.ForeignKey(
        Transaction,
        blank=True,
        null=True,
        verbose_name=_('created'),
        related_name='created_entities',
        on_delete=models.CASCADE)
    modified = models.ForeignKey(
        Transaction,
        blank=True,
        null=True,
        verbose_name=_('modified'),
        related_name='modified_entities',
        on_delete=models.CASCADE)
    # presumably a soft-delete marker (NULL while the entity is live) — confirm
    deleted = models.ForeignKey(
        Transaction,
        verbose_name=_('deleted'),
        blank=True,
        null=True,
        related_name='deleted_entities',
        on_delete=models.CASCADE)
    meta = JSONField(
        blank=True,
        null=True,
        verbose_name=_('meta'))
    # NOT NULL in the database (null=False), but blank values are accepted by
    # model/form validation (blank=True)
    content = JSONField(
        blank=True,
        null=False,
        verbose_name=_('content'))

    # manager exposing EntityQuerySet.content_search()
    objects = EntityQuerySet.as_manager()

    class Meta:
        # ordering on a foreign key follows Transaction's own ordering (its id)
        ordering = ('created',)
        verbose_name = _('entity')
        verbose_name_plural = _('entities')
        # generic placeholder permissions; what each action means is decided
        # wherever they are checked (not visible in this file)
        permissions = (
            ("action1_entity", _("Can do action1 on entities")),
            ("action2_entity", _("Can do action2 on entities")),
            ("action3_entity", _("Can do action3 on entities")),
            ("action4_entity", _("Can do action4 on entities")),
            ("action5_entity", _("Can do action5 on entities")),
            ("action6_entity", _("Can do action6 on entities")),
            ("action7_entity", _("Can do action7 on entities")),
        )
|
|
|
|
|
|
class Relation(CommonData):
    """A link from a ``left`` entity to a ``right`` entity, typed by a RelationSchema.

    ``content`` holds the relation's JSON payload and is validated against
    ``schema`` by ``CommonData.clean()``.
    """

    schema = models.ForeignKey(
        'zoo_meta.RelationSchema',
        verbose_name=_('schema'),
        on_delete=models.CASCADE)
    # The two linked entities; deleting either one deletes the relation
    # (CASCADE). Reverse accessors: Entity.left_relations / Entity.right_relations.
    left = models.ForeignKey(
        'Entity',
        verbose_name=_('left'),
        related_name='left_relations',
        on_delete=models.CASCADE)
    right = models.ForeignKey(
        'Entity',
        verbose_name=_('right'),
        related_name='right_relations',
        on_delete=models.CASCADE)
    # NOTE: the Transaction links use CASCADE, so deleting a Transaction deletes
    # every relation referencing it through any of them.
    created = models.ForeignKey(
        Transaction,
        blank=True,
        null=True,
        verbose_name=_('created'),
        related_name='created_relations',
        on_delete=models.CASCADE)
    modified = models.ForeignKey(
        Transaction,
        blank=True,
        null=True,
        verbose_name=_('modified'),
        related_name='modified_relations',
        on_delete=models.CASCADE)
    # presumably a soft-delete marker (NULL while the relation is live) — confirm
    deleted = models.ForeignKey(
        Transaction,
        verbose_name=_('deleted'),
        blank=True,
        null=True,
        related_name='deleted_relations',
        on_delete=models.CASCADE)
    meta = JSONField(
        blank=True,
        null=True,
        verbose_name=_('meta'))
    # NOT NULL in the database (null=False), but blank values are accepted by
    # model/form validation (blank=True)
    content = JSONField(
        blank=True,
        null=False,
        verbose_name=_('content'))

    class Meta:
        # ordering on a foreign key follows Transaction's own ordering (its id)
        ordering = ('created',)
        verbose_name = _('relation')
        verbose_name_plural = _('relations')
|
|
|
|
|
|
class Log(models.Model):
    """A JSON log entry attached to an entity and, optionally, a transaction."""

    # deleting the entity deletes its log entries (CASCADE)
    entity = models.ForeignKey(
        'Entity',
        verbose_name=_('entity'),
        on_delete=models.CASCADE)
    # optional at the database level (null=True); no blank=True, so model
    # validation still requires it
    transaction = models.ForeignKey(
        'Transaction',
        null=True,
        verbose_name=_('transaction'),
        on_delete=models.CASCADE)
    # set once on insert and indexed
    timestamp = models.DateTimeField(
        auto_now_add=True,
        db_index=True,
        verbose_name=_('timestamp'))
    content = JSONField(
        blank=True,
        null=True,
        verbose_name=_('content'))

    class Meta:
        # newest first; id breaks ties between identical timestamps
        ordering = ('-timestamp', 'id')
        verbose_name = _('log')
        verbose_name_plural = _('logs')
|
|
|
|
|
|
class JobQuerySet(QuerySet):
    """Filtering and bulk state changes for Job querysets."""

    def todo(self):
        """Jobs still to run: pending ones and those that failed retryably."""
        job_model = self.model
        runnable = (job_model.STATE_TODO, job_model.STATE_ERROR)
        return self.filter(state__in=runnable)

    def error(self):
        """Jobs that failed, whether retryable or not."""
        job_model = self.model
        failed = (job_model.STATE_ERROR, job_model.STATE_UNRECOVERABLE_ERROR)
        return self.filter(state__in=failed)

    def set_todo(self):
        """Put every selected job back in TODO; return the updated row count."""
        target_state = self.model.STATE_TODO
        return self.update(state=target_state)

    def set_unrecoverable_error(self):
        """Mark every selected job as unrecoverable; return the updated row count."""
        target_state = self.model.STATE_UNRECOVERABLE_ERROR
        return self.update(state=target_state)

    def by_action(self, action_klass):
        """Jobs whose stored ``$classpath`` designates ``action_klass``."""
        lookup = {'content__$classpath': self.model.get_classpath(action_klass)}
        return self.filter(**lookup)
|
|
|
|
|
|
@python_2_unicode_compatible
class Job(models.Model):
    '''Store synchronization messages sent to applications

    A job stores a serialized "action" in ``content``: the result of the
    action's ``to_json()`` plus its import path under ``$classpath``, so that
    ``action`` can rebuild it with ``from_json()`` and ``do()``/``redo()``
    can run it again. An action's ``do(job=...)`` returns the job's new state.
    '''

    SCHEDULER_STEP = 60 * 5  # 5 minutes

    STATE_TODO = 1
    STATE_SUCCESS = 2
    STATE_ERROR = 3
    STATE_UNRECOVERABLE_ERROR = 4

    STATES = [
        (STATE_TODO, _('todo')),
        (STATE_SUCCESS, _('success')),
        (STATE_ERROR, _('error')),
        (STATE_UNRECOVERABLE_ERROR, _('unrecoverable error')),
    ]

    objects = JobQuerySet.as_manager()

    created = models.DateTimeField(
        verbose_name=_('created'),
        auto_now_add=True,
        db_index=True)

    # NOTE(review): verbose name says 'sent' although the field tracks the
    # last save (auto_now); changing it would require a migration.
    modified = models.DateTimeField(
        verbose_name=_('sent'),
        auto_now=True,
        db_index=True)

    transaction = models.ForeignKey(
        'Transaction',
        verbose_name='transaction',
        null=True,
        blank=True,
        on_delete=models.CASCADE)

    # the default (0) is not one of STATES; create() always sets an explicit
    # state instead of relying on it
    state = models.PositiveIntegerField(
        verbose_name=_('state'),
        choices=STATES,
        default=0)

    content = JSONField(
        verbose_name=_('content'))

    @classmethod
    def get_logger(cls):
        return logging.getLogger(__name__)

    @classmethod
    def get_classpath(cls, class_or_instance):
        """Return ``'module.ClassName'`` for a class or an instance of it."""
        if isinstance(class_or_instance, type):
            klass = class_or_instance
        else:
            klass = class_or_instance.__class__
        module = klass.__module__
        class_name = klass.__name__
        return '%s.%s' % (module, class_name)

    @classmethod
    def create(cls, job_action, do_later=False, transaction=None):
        """Persist ``job_action`` as a new job and, unless ``do_later``, run it.

        Args:
            job_action: the action to store; its ``to_json()`` must return a
                dict.
            do_later: when true, only store the job in STATE_TODO; ``redo()``
                picks up such jobs later.
            transaction: optional Transaction the job is attached to.

        Returns:
            The saved job. If running the action raises, the exception is
            logged, not propagated: the job is saved in
            STATE_UNRECOVERABLE_ERROR with details under ``content['error']``.
        """
        # conserve class path of the job_action to recreate it later
        class_path = cls.get_classpath(job_action)
        content = job_action.to_json()
        assert isinstance(content, dict), 'action must serialize to a dict'
        content['$classpath'] = class_path
        # insert directly in TODO so the row never holds the invalid default 0
        job = cls.objects.create(content=content, transaction=transaction,
                                 state=cls.STATE_TODO)
        job_action.job = job
        try:
            if not do_later:
                job.state = job_action.do(job=job)
                new_content = job_action.to_json()
                assert isinstance(new_content, dict), 'action must serialize to a dict'
                new_content['$classpath'] = class_path
                # only replace the stored content once it is known to be a
                # dict, so the error handler below can still annotate it
                job.content = new_content
        except Exception as e:
            # action failed dramatically, do not retry
            job.state = cls.STATE_UNRECOVERABLE_ERROR
            error = job.content.setdefault('error', {})
            error['code'] = 'internal-server-error'
            error['exc_detail'] = force_text(e)
            error['exc_tb'] = traceback.format_exc()
            job.get_logger().exception('exception during job %s', job.admin_url)
        job.save()
        return job

    @property
    def action(self):
        """Rebuild the action object stored in ``content``, bound to this job.

        Raises:
            KeyError: if ``content`` has no ``$classpath``.
            ValueError: if ``$classpath`` contains no dot.
            Exception: if the module or class named by ``$classpath`` cannot
                be loaded.
        """
        content = self.content.copy()
        classpath = content.pop('$classpath')
        module_name, klass_name = classpath.rsplit('.', 1)
        try:
            module = importlib.import_module(module_name)
            klass = getattr(module, klass_name)
        except Exception:
            # not a bare except: KeyboardInterrupt/SystemExit must propagate
            raise Exception('classpath %s unknown' % classpath)
        action = klass.from_json(content)
        action.job = self
        return action

    def do(self, log=True):
        """Run the stored action once and save the outcome.

        On success the state becomes what the action's ``do()`` returns and
        ``content`` its re-serialization. Any exception, including failing to
        rebuild the action from ``content``, marks the job
        STATE_UNRECOVERABLE_ERROR and adds ``$exc_detail``/``$exc_tb`` to the
        stored content. The exception is logged when ``log`` is true and is
        not re-raised.
        """
        try:
            # resolved inside the try: a job whose class path no longer
            # imports is marked failed instead of aborting redo()'s loop
            action = self.action
            self.state = action.do(job=self)
            content = action.to_json()
            assert isinstance(content, dict), 'action must serialize to a dict'
            content['$classpath'] = self.get_classpath(action)
            self.content = content
        except Exception as e:
            if log:
                url = self.admin_url
                self.get_logger().exception('exception during job %s', url)
            self.state = self.STATE_UNRECOVERABLE_ERROR
            # self.content was not replaced (that is the try's last
            # statement), so it still carries the stored '$classpath'
            self.content['$exc_detail'] = force_text(e)
            self.content['$exc_tb'] = traceback.format_exc()
        self.save()

    @property
    def admin_url(self):
        return reverse('admin:zoo_data_job_change', args=[self.id])

    @classmethod
    def redo(cls, timestamp=None):
        """Retry ``todo()`` jobs last modified before ``timestamp``.

        ``timestamp`` defaults to SCHEDULER_STEP seconds ago.
        """
        timestamp = timestamp or (now() - datetime.timedelta(seconds=cls.SCHEDULER_STEP))
        for job in cls.objects.todo().filter(modified__lt=timestamp):
            # we do not log on retries, to prevent a storm of errors
            job.do(log=job.state == cls.STATE_TODO)

    def __str__(self):
        return '%s %s' % (self.content['$classpath'], self.id)

    @property
    def description(self):
        """The action's ``description`` attribute, or an empty string."""
        action = self.action
        if hasattr(action, 'description'):
            return action.description
        return u''

    class Meta:
        verbose_name = _('job')
        verbose_name_plural = _('jobs')
        ordering = ('-created', '-modified')
|