zoo/zoo/zoo_meta/models.py

184 lines
6.6 KiB
Python

# zoo - versatile objects management
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from hashlib import md5
from django.apps import apps
from django.db import models, connection
from django.utils.encoding import force_bytes, force_text
from django.utils.translation import ugettext_lazy as _
from django.contrib.postgres.fields import JSONField
from .validators import validate_schema
class GetBySlugManager(models.Manager):
def get_by_natural_key(self, slug):
return self.get(slug=slug)
class CommonSchema(models.Model):
name = models.CharField(
max_length=64,
unique=True,
verbose_name=_('name'))
slug = models.SlugField(
max_length=64,
unique=True,
verbose_name=_('slug'))
schema = JSONField(
verbose_name=_('schema'),
validators=[validate_schema])
caption_template = models.TextField(
verbose_name=_('caption template'),
blank=True)
def __str__(self):
return self.name
def natural_key(self):
return (self.slug,)
def paths(self, schema=None):
schema = schema or self.schema
paths = []
if isinstance(schema, dict) and schema.get('type') == 'object':
if 'properties' not in schema:
return []
for key in schema['properties']:
subschema = schema['properties'][key]
t = subschema.get('type')
if t == 'object':
paths.extend(([key] + subpath, _type, _format)
for subpath, _type, _format in self.paths(subschema))
elif t is not None:
paths.append(([key], t, subschema.get('format')))
return paths
def path_to_sql_expr(self, path):
return ''.join('->\'%s\'' % elt for elt in path[:-1]) + '->>\'%s\'' % path[-1]
def rebuild_string_index(self, cursor, table, path):
expr = 'immutable_normalize((content%s))' % self.path_to_sql_expr(path)
key = md5(force_bytes(expr)).hexdigest()[:8]
sql = ('CREATE INDEX zoo_entity_%s_gin_%s_dynamic_idx ON %s USING gin ((%s) '
' gin_trgm_ops) WHERE schema_id = %s' % (key, self.id, table, expr, self.id))
cursor.execute(sql)
sql = ('CREATE INDEX zoo_entity_%s_gist_%s_dynamic_idx ON %s USING gist ((%s) '
' gist_trgm_ops) WHERE schema_id = %s' % (key, self.id, table, expr, self.id))
cursor.execute(sql)
def rebuild_string_date_time_index(self, cursor, table, path):
expr = 'immutable_date(content%s)' % self.path_to_sql_expr(path)
key = md5(force_bytes(expr)).hexdigest()[:8]
sql = ('CREATE INDEX zoo_entity_%s_%s_dynamic_idx ON %s (%s) '
'WHERE schema_id = %s' % (key, self.id, table, expr, self.id))
cursor.execute(sql)
def create_trigram_index(self, expr):
from zoo.zoo_data.models import Entity, Relation
if isinstance(self, EntitySchema):
table = Entity._meta.db_table
elif isinstance(self, RelationSchema):
table = Relation._meta.db_table
else:
raise NotImplementedError(self)
key = md5(force_bytes(expr)).hexdigest()[:8]
gin_sql = ('CREATE INDEX zoo_entity_%s_gin_%s_dynamic_idx ON %s USING gin ((%s) '
'gin_trgm_ops) WHERE schema_id = %s' % (key, self.id, table, expr, self.id))
gist_sql = ('CREATE INDEX zoo_entity_%s_gist_%s_dynamic_idx ON %s USING gist ((%s)'
' gist_trgm_ops) WHERE schema_id = %s' % (key, self.id, table, expr, self.id))
with connection.cursor() as cursor:
cursor.execute(gin_sql)
cursor.execute(gist_sql)
def rebuild_indexes(self):
from zoo.zoo_data.models import Entity
paths = self.paths()
table = Entity._meta.db_table
with connection.cursor() as cursor:
constraints = connection.introspection.get_constraints(cursor, table)
# drop existing indexes
for key in constraints:
if not key.endswith('_%s_dynamic_idx' % self.pk):
continue
cursor.execute('DROP INDEX IF EXISTS %s' % key)
# rebuild them
for path, _type, _format in paths:
if _format:
_format = _format.replace('-', '_')
m = getattr(self, 'rebuild_%s_%s_index' % (_type, _format), None)
if m is not None:
m(cursor, table, path)
continue
m = getattr(self, 'rebuild_%s_index' % _type, None)
if m is not None:
m(cursor, table, path)
# delegate index building to custom applications
for app in apps.get_app_configs():
if hasattr(app, 'zoo_rebuild_indexes'):
app.zoo_rebuild_indexes(self)
objects = GetBySlugManager()
def make_caption(self, value):
if self.caption_template:
try:
return eval(self.caption_template, {}, value.content)
except Exception as e:
return force_text(e)
return force_text(value.id)
class Meta:
abstract = True
class EntitySchema(CommonSchema):
class Meta:
ordering = ('name',)
verbose_name = _('entity schema')
verbose_name_plural = _('entity schemas')
class RelationSchema(CommonSchema):
left = models.ForeignKey(
EntitySchema,
verbose_name=_('left schema'),
related_name='+',
on_delete=models.CASCADE)
right = models.ForeignKey(
EntitySchema,
verbose_name=_('right schema'),
related_name='+',
on_delete=models.CASCADE)
is_symmetric = models.BooleanField(
default=False,
blank=True,
verbose_name=_('is symmetric'))
class Meta:
ordering = ('name',)
verbose_name = _('relation schema')
verbose_name_plural = _('relation schemas')