zoo/zoo/zoo_nanterre/utils.py

387 lines
13 KiB
Python

# -*- coding: utf-8 -*-
import copy
import datetime
import functools
import itertools
import operator
import re
import time

import isodate
from django.conf import settings
from django.contrib.postgres.search import TrigramDistance
from django.db import connection
from django.db.models import Q, F, Value
from django.db.models.functions import Least, Greatest, Coalesce, Concat

from zoo.zoo_meta.models import EntitySchema
from zoo.zoo_data.models import Entity
from zoo.zoo_data.search import Unaccent, Lower, JSONTextRef, Normalize
# Callable alias of datetime.date.today: call today() to get the current date.
today = datetime.date.today
def make_date(date_var):
    '''Extract a date from a datetime, a date, a struct_time or a string.

    Returns a datetime.date. Strings are parsed by isodate.parse_date, which
    raises on values it cannot parse.
    '''
    # datetime.datetime is a subclass of datetime.date: test it first so the
    # time part is dropped instead of being returned as-is
    if isinstance(date_var, datetime.datetime):
        return date_var.date()
    if isinstance(date_var, datetime.date):
        return date_var
    if isinstance(date_var, time.struct_time):
        # the first three fields are tm_year, tm_mon and tm_mday
        return datetime.date(*date_var[:3])
    return isodate.parse_date(date_var)
def date_delta(t1, t2):
    '''Return the timedelta `t1 - t2` between two date-like values.

    Both arguments are first converted with make_date().
    '''
    first = make_date(t1)
    second = make_date(t2)
    return first - second
def age_in_years_and_months(born, today=None):
    '''Return the age reached at `today` as a (years, months) tuple.

    `born` and `today` are converted with make_date(); when `today` is None
    the current date is used. Only whole elapsed months are counted.
    '''
    birth = make_date(born)
    if today is None:
        today = datetime.date.today()
    reference = make_date(today)
    # count whole calendar months between both dates...
    elapsed = (reference.year - birth.year) * 12 + (reference.month - birth.month)
    if reference.day < birth.day:
        # ...the last one does not count until the birth day is reached
        elapsed -= 1
    # floor division keeps the months part within 0..11
    return divmod(elapsed, 12)
def age_in_years(born, today=None):
    '''Return the number of whole years elapsed between `born` and `today`.

    Same arguments as age_in_years_and_months(); the months part is dropped.
    '''
    years, _months = age_in_years_and_months(born, today=today)
    return years
class PersonSearch(object):
    '''Build and run fuzzy searches over entities of the `individu` schema.

    The search_* methods accumulate filters and similarity expressions; most
    of them work on a deep copy of the search and return it, so calls can be
    chained. Iterating over or indexing a search runs the query and decorates
    each result with computed attributes (age, addresses, federations,
    children, parents, union).
    '''

    # Both sides of the "@" may be empty, so "@example.org" matches too.
    # Digits are accepted: without them "jean2@example.org" was handled as a
    # name instead of an email.
    EMAIL_RE = re.compile(
        r'^[a-zA-Z0-9.+_-]*@[a-zA-Z0-9.+_-]*$')
    # year[-month[-day]], the year having 2 or 4 digits
    DATE_RE1 = re.compile(
        r'^(?:(?P<year>\d\d|\d\d\d\d)(?:-(?P<month>\d{1,2})(?:-(?P<day>\d{1,2}))?)?)$')
    # [[day/]month/]year, the year having 2 or 4 digits
    DATE_RE2 = re.compile(
        r'^(?:(?:(?:(?P<day>\d{1,2})/)?(?P<month>\d{1,2})/)?(?P<year>\d\d|\d\d\d\d))$')

    @classmethod
    def match_birthdate(cls, birthdate):
        '''Return a match with `year`, `month` and `day` groups, or None.'''
        return cls.DATE_RE1.match(birthdate) or cls.DATE_RE2.match(birthdate)

    @classmethod
    def lu(cls, x):
        '''Wrap the expression `x` with Normalize.'''
        return Normalize(x)

    @classmethod
    def luv(cls, x):
        '''Wrap the literal `x` as a Value, then with Normalize.'''
        return cls.lu(Value(x))

    @classmethod
    def applications(cls):
        '''Iterate over the (key, name) pairs of settings.ZOO_NANTERRE_APPLICATIONS.

        Yields nothing when the setting is not defined.
        '''
        # items() exists on Python 2 and 3, iteritems() only on Python 2;
        # iter() keeps returning an iterator as before
        return iter(getattr(settings, 'ZOO_NANTERRE_APPLICATIONS', {}).items())

    def __init__(self, limit=0.5):
        '''Create an empty search.

        limit -- minimal similarity a result must reach when similarity
                 expressions are present (see queryset()).

        The `individu` schema is loaded from the database here.
        '''
        self.birthdates_filters = []
        self.name_filters = []
        self.name_similarities = []
        self.email_similarities = []
        self.key_filters = []
        self.email_filters = []
        self.schema = EntitySchema.objects.get(slug='individu')
        self.limit = limit

    def search_query(self, query):
        '''Take a one line query and try to build a search filter from it.

        Each whitespace separated token is classified as an email, an
        identifier (prefixed by "#"), a birthdate, or else a part of a name.
        Returns the resulting search; the receiver is not modified.
        '''
        emails = []
        identifiers = []
        birthdates = []
        names = []
        # split() without argument never yields empty or padded tokens
        for part in query.split():
            if self.EMAIL_RE.match(part):
                emails.append(part)
                continue
            if part.startswith('#'):
                # a lone "#" carries no identifier and is ignored
                if part[1:]:
                    identifiers.append(part[1:])
                continue
            birthdate_match = self.match_birthdate(part)
            if birthdate_match:
                birthdates.append(birthdate_match.groupdict())
            else:
                names.append(part)
        search = self
        for email in emails:
            search = search.search_email(email)
        for identifier in identifiers:
            search = search.search_identifier(identifier)
        for birthdate in birthdates:
            search = search.search_birthdate(birthdate)
        return search.search_names(names)

    def search_email(self, email):
        '''Return a copy of the search also matching on the `email` field.'''
        search = copy.deepcopy(self)
        search.email_filters.append(search.q_normalize('email', email))
        search.email_similarities.append(Value(1.0) - search.distance('email', email))
        return search

    def search_identifier(self, identifier):
        '''Return a copy of the search also matching `identifier`.

        The identifier is compared to the entity id and to every federation
        key listed by applications().
        '''
        search = copy.deepcopy(self)
        q = Q(id=identifier)
        for key, _name in search.applications():
            q |= Q(**{'content__cles_de_federation__%s' % key: identifier})
        search.key_filters.append(q)
        return search

    def search_birthdate(self, birthdate):
        '''Return a copy of the search also filtering on a birthdate.

        birthdate -- a dict with `year`, `month` and `day` keys, an object
                     with `year`, `month` and `day` attributes (date or
                     datetime), or a string accepted by match_birthdate().

        Empty month and day are not filtered on. Two-digit years are
        expanded around the current year.
        '''
        search = copy.deepcopy(self)
        if hasattr(birthdate, 'keys'):
            # case of dict
            pass
        elif hasattr(birthdate, 'year'):
            # case of date or datetime
            birthdate = {
                'year': birthdate.year,
                'month': birthdate.month,
                'day': birthdate.day,
            }
        else:
            # case of strings
            birthdate = search.match_birthdate(birthdate).groupdict()
        this_year = datetime.date.today().year % 100
        year = int(birthdate['year'])
        if year < 100:
            # two-digit years after the current one belong to the last century
            if year > this_year:
                year += 1900
            else:
                year += 2000
        q = Q(content__date_de_naissance__timestamp__year=year)
        if birthdate['month']:
            q &= Q(content__date_de_naissance__timestamp__month=birthdate['month'])
        if birthdate['day']:
            q &= Q(content__date_de_naissance__timestamp__day=birthdate['day'])
        search.birthdates_filters.append(q)
        return search

    @classmethod
    def distance(cls, field, value):
        '''Return the trigram distance between a content field and `value`.

        Both sides are normalized before being compared.
        '''
        return TrigramDistance(cls.lu(JSONTextRef(F('content'), field)), cls.luv(value))

    @classmethod
    def q_normalize(cls, field, value):
        '''Return a Q matching a normalized content field similar to `value`.'''
        return Q(**{'content__%s__normalize__trigram_similar' % field: cls.luv(value)})

    def search_name(self, first_name=None, last_name=None, factor=1.0, first_name_weight=1.0,
                    last_name_weight=1.0):
        '''Add a filter and a similarity expression for a first/last name pair.

        The last name is compared to `nom_d_usage` and `nom_de_naissance`,
        the first name to `prenoms`. `factor` scales the similarity and is
        reduced when one of the names is missing. When both names are given
        their distances are averaged using the given weights.

        Unlike the other search_* methods this one modifies the search in
        place; it returns it to allow chaining.
        '''
        if not first_name or not last_name:
            factor *= 0.8
        q = Q()
        if last_name:
            q &= (self.q_normalize('nom_d_usage', last_name)
                  | self.q_normalize('nom_de_naissance', last_name))
        if first_name:
            q &= self.q_normalize('prenoms', first_name)
        self.name_filters.append(q)

        # build only the distance expressions the selected branch needs
        if first_name and not last_name:
            distance = self.distance('prenoms', first_name)
        else:
            # keep the closest of the two last names
            distance = Least(self.distance('nom_d_usage', last_name),
                             self.distance('nom_de_naissance', last_name))
            if first_name:
                distance = Value(last_name_weight) * distance
                distance = Value(first_name_weight) * self.distance('prenoms', first_name) + distance
                distance = distance / Value(first_name_weight + last_name_weight)
        self.name_similarities.append((Value(1.0) - distance) * Value(factor))
        return self

    def search_names(self, names):
        '''Return a copy of the search matching a list of name tokens.

        Every split of the tokens into a first name followed by a last name
        is tried; with more than one token the reversed order is tried too,
        with a lower factor. Returns the search itself when `names` is empty.
        '''
        if not names:
            return self
        search = copy.deepcopy(self)
        for i in range(0, len(names) + 1):
            first_name, last_name = ' '.join(names[:i]), ' '.join(names[i:])
            search = search.search_name(first_name, last_name)
            if len(names) > 1:
                search = search.search_name(last_name, first_name, factor=0.8)
        return search

    @classmethod
    def or_filters(cls, filters):
        '''Combine Q objects with OR; an empty list gives an empty Q.'''
        # functools.reduce exists on Python 2 and 3, the builtin only on Python 2
        return functools.reduce(operator.or_, filters, Q())

    @classmethod
    def add_age(cls, individu):
        '''Set `age_label` on individu from its `date_de_naissance`.

        `individu.age` is also set to a (years, months) tuple, but only when
        the birthdate is before today.
        '''
        birthdate = make_date(individu.content['date_de_naissance'])
        current_date = datetime.date.today()
        # NOTE(review): a birthdate equal to today is labelled as not born
        # yet -- confirm this is intended
        if birthdate >= current_date:
            age = u'à naître'
        else:
            individu.age = years, months = age_in_years_and_months(
                birthdate, today=current_date)
            if (months, years) == (0, 0):
                age = u'moins d\'un mois'
            elif years < 1:
                age = u'%s mois' % months
            elif years < 2:
                # between one and two years the age is still given in months
                age = u'%s mois' % (months + 12)
            else:
                age = u'%s ans' % years
        individu.age_label = age

    @classmethod
    def add_adresses(cls, individu):
        '''Set `individu.adresses` from its `habite` relations.

        Each address is the content of the related entity completed by the
        content of the relation; addresses with a true `facturation` value
        come first.
        '''
        individu.adresses = adresses = []
        for relation in individu.left_relations.all():
            if relation.schema.slug != 'habite':
                continue
            # work on a copy: updating relation.right.content in place would
            # leak the fields of the relation into the address entity itself
            adresse = dict(relation.right.content)
            adresse.update(relation.content)
            adresses.append(adresse)
        adresses.sort(key=lambda x: int(not x['facturation']))

    @classmethod
    def _legal_relatives(cls, relations, other_side):
        '''Return the decorated entities found on `other_side` of the
        `responsabilite-legale` relations among `relations`.'''
        relatives = []
        for relation in relations.all():
            if relation.schema.slug != 'responsabilite-legale':
                continue
            relative = getattr(relation, other_side)
            cls.add_age(relative)
            cls.add_federations(relative)
            relative.responsabilite_legale = relation.content['statut']
            relatives.append(relative)
        return relatives

    @classmethod
    def add_enfants(cls, individu):
        '''Set `individu.enfants` when individu has legal responsibility
        relations toward other entities; the attribute is left unset otherwise.'''
        enfants = cls._legal_relatives(individu.left_relations, 'right')
        if enfants:
            individu.enfants = enfants

    @classmethod
    def add_parents(cls, individu):
        '''Set `individu.parents` when other entities have legal responsibility
        relations toward individu; the attribute is left unset otherwise.'''
        parents = cls._legal_relatives(individu.right_relations, 'left')
        if parents:
            individu.parents = parents

    @classmethod
    def add_union(cls, individu):
        '''Set `individu.union` and `individu.union_statut` from the first
        `union` relation found, looking at left relations then right ones.'''
        sides = (
            (individu.left_relations, 'right'),
            (individu.right_relations, 'left'),
        )
        for relations, other_side in sides:
            for relation in relations.all():
                if relation.schema.slug != 'union':
                    continue
                conjoint = getattr(relation, other_side)
                if conjoint:
                    cls.add_age(conjoint)
                    cls.add_federations(conjoint)
                    individu.union = conjoint
                    individu.union_statut = relation.content['statut']
                # only the first union relation is considered
                return

    @classmethod
    def add_federations(cls, individu):
        '''Set `individu.federations` to the names of the applications for
        which individu has a non-empty federation key.'''
        cles_de_federation = individu.content.get('cles_de_federation', {})
        individu.federations = [
            federation_name
            for federation_key, federation_name in cls.applications()
            if cles_de_federation.get(federation_key)]

    def queryset(self, prefetch=True):
        '''Return the queryset of entities matching the accumulated filters.

        Filters of the same kind are ORed together, the different kinds are
        ANDed. When similarity expressions exist, results below `self.limit`
        are dropped and results are ordered by decreasing similarity, then by
        full name; otherwise they are ordered by full name only.

        prefetch -- also prefetch the relations read by the add_* decorators.
        '''
        # set the trigram similarity threshold on the current connection;
        # the context manager closes the cursor, which was left open before
        with connection.cursor() as cursor:
            cursor.execute('SELECT SET_LIMIT(0.3)')
        qs = Entity.objects.filter(schema=self.schema)
        for filters in (self.birthdates_filters, self.key_filters,
                        self.email_filters, self.name_filters):
            qs = qs.filter(self.or_filters(filters))
        qs = qs.annotate(
            fullname=Concat(
                Coalesce(
                    JSONTextRef(F('content'), 'nom_d_usage'),
                    JSONTextRef(F('content'), 'nom_de_naissance'),
                    Value(' ')
                ),
                Value(' '),
                JSONTextRef(F('content'), 'prenoms'))
        )
        # order by similarities or fullname
        similarities = []
        for candidates in (self.name_similarities, self.email_similarities):
            if not candidates:
                continue
            # Greatest is only used with several expressions
            if len(candidates) > 1:
                similarities.append(Greatest(*candidates))
            else:
                similarities.append(candidates[0])
        if similarities:
            # mean of the best name similarity and the best email similarity
            qs = qs.annotate(similarity=functools.reduce(operator.add, similarities) /
                             Value(len(similarities)))
            qs = qs.filter(similarity__gte=self.limit)
            qs = qs.order_by('-similarity', 'fullname')
        else:
            qs = qs.order_by('fullname')
        if prefetch:
            qs = qs.prefetch_related(
                'left_relations__schema', 'left_relations__right',
                'right_relations__schema', 'right_relations__left',
            )
        return qs

    def __getitem__(self, item):
        '''Return the decorated result at an index, or an iterator over the
        decorated results of a slice (the step of the slice is ignored).'''
        if isinstance(item, slice):
            return self.decorate_iter(self.queryset()[item.start:item.stop])
        # decorate_individu() returns the entity; it used to return None,
        # which made integer indexing always give None
        return self.decorate_individu(self.queryset()[item])

    @classmethod
    def decorate_individu(cls, individu):
        '''Add all computed attributes to individu and return it.'''
        cls.add_age(individu)
        cls.add_adresses(individu)
        cls.add_federations(individu)
        cls.add_enfants(individu)
        cls.add_parents(individu)
        cls.add_union(individu)
        return individu

    @classmethod
    def decorate_iter(cls, qs):
        '''Lazily yield every entity of `qs` after decorating it.'''
        for individu in qs:
            yield cls.decorate_individu(individu)

    def __iter__(self):
        '''Iterate over the decorated results of the search.'''
        return self.decorate_iter(self.queryset())