1167 lines
46 KiB
Python
1167 lines
46 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import unicode_literals
|
|
|
|
from collections import OrderedDict
|
|
import contextlib
|
|
import datetime
|
|
import copy
|
|
import itertools
|
|
import os
|
|
import json
|
|
import hashlib
|
|
from .utils import Whatever
|
|
import psycopg2
|
|
import psycopg2.errorcodes
|
|
|
|
from cached_property import cached_property
|
|
from wcs_olap.wcs_api import WcsApiError
|
|
|
|
# Register psycopg2's UNICODE and UNICODEARRAY typecasters; no connection or
# cursor scope is passed to register_type().
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
|
|
|
|
|
|
def truncate_pg_identifier(identifier, hash_length=6, force_hash=False):
    """Shorten ``identifier`` so that it fits in 63 characters.

    Identifiers shorter than 64 characters are returned untouched unless
    ``force_hash`` is set. Otherwise the result is the head of the
    identifier, the first ``hash_length`` hex digits of its MD5 digest and
    the tail of the identifier; the digest sits in the middle to keep some
    readability.
    """
    if not force_hash and len(identifier) < 64:
        return identifier
    kept = 63 - hash_length
    head = identifier[:kept // 2]
    # unary minus binds tighter than //, so the division floors towards
    # -infinity: the tail gets the extra character when ``kept`` is odd
    tail = identifier[-kept // 2:]
    digest = hashlib.md5(identifier.encode('utf-8')).hexdigest()
    return head + digest[:hash_length] + tail
|
|
|
|
|
|
@contextlib.contextmanager
def ignore_undefined_object_or_table():
    """Context manager silencing "undefined table/object" SQL errors.

    A psycopg2.ProgrammingError raised by the managed block is swallowed
    when its ``pgcode`` is UNDEFINED_TABLE or UNDEFINED_OBJECT; any other
    ProgrammingError is re-raised unchanged.
    """
    try:
        yield
    except psycopg2.ProgrammingError as error:
        ignorable = (
            psycopg2.errorcodes.UNDEFINED_TABLE,
            psycopg2.errorcodes.UNDEFINED_OBJECT,
        )
        if error.pgcode in ignorable:
            return
        raise
|
|
|
|
|
|
def quote(name):
    """Return ``name`` as a double-quoted SQL identifier.

    Embedded double quotes are doubled, which is how they are escaped inside
    a quoted identifier, so a name containing ``"`` cannot end the
    identifier early.
    """
    return '"%s"' % name.replace('"', '""')
|
|
|
|
|
|
def slugify(s):
    """Return ``s`` with every hyphen and every space turned into an underscore."""
    return ''.join('_' if char in ('-', ' ') else char for char in s)
|
|
|
|
|
|
class Context(object):
    """Stack of dict layers that can be flattened into one mapping."""

    def __init__(self):
        # layers, oldest first
        self.stack = []

    def push(self, d):
        """Put ``d`` on top of the stack."""
        self.stack += [d]

    def pop(self):
        """Remove the topmost layer; an empty stack stays empty."""
        remaining = self.stack[:-1]
        self.stack = remaining

    def as_dict(self):
        """Return a new dict merging all layers, later layers winning."""
        merged = {}
        for layer in self.stack:
            merged.update(layer)
        return merged
|
|
|
|
|
|
class WcsOlapFeeder(object):
    """Feed a PostgreSQL schema and a cubes model from a w.c.s. API client."""

    # submission channels: [id, identifier, label]
    channels = [
        [1, 'web', 'web'],
        [2, 'mail', 'courrier'],
        [3, 'phone', 'téléphone'],
        [4, 'counter', 'guichet'],
        [5, 'backoffice', 'backoffice'],
        [6, 'email', 'email'],
        [7, 'fax', 'fax'],
        [8, 'social-network', 'réseau social'],
    ]
    channel_to_id = dict((c[1], c[0]) for c in channels)
    id_to_channel = dict((c[0], c[1]) for c in channels)

    # simplified statuses: [id, label]
    status = [
        [1, 'Nouveau'],
        [2, 'En cours'],
        [3, 'Terminé'],
    ]
    # built from ``status`` (they used to be built from ``channels`` by mistake)
    status_to_id = dict((c[1], c[0]) for c in status)
    id_to_status = dict((c[0], c[1]) for c in status)
|
|
|
|
def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False, slugs=None):
    """Prepare the naming context, the base cubes model and the connection.

    :param api: API client, stored as ``self.api``
    :param pg_dsn: DSN given to psycopg2.connect() and stored in the model
    :param schema: name of the target schema, at most 63 characters
    :param logger: logger-like object; a ``Whatever()`` instance when falsy
    :param config: optional settings dict; 'cubes_label' and 'cubes_slug'
        are read here
    :param do_feed: stored as ``self.do_feed``
    :param fake: stored as ``self.fake``
    :param slugs: stored as ``self.slugs``; used to filter the formdefs
    :raises ValueError: when ``schema`` is longer than 63 characters
    """
    self.api = api
    self.slugs = slugs
    self.fake = fake
    self.logger = logger or Whatever()
    if len(schema) > 63:
        raise ValueError('schema name length must < 64 characters: %r' % schema)
    self.schema = schema
    self.schema_temp = truncate_pg_identifier(schema + '_temp')
    self.do_feed = do_feed
    # values available as {placeholders} in the SQL templates
    self.ctx = Context()
    self.ctx.push({
        'schema': self.schema,
        'schema_temp': self.schema_temp,
        'role_table': 'role',
        'channel_table': 'channel',
        'category_table': 'category',
        'form_table': 'formdef',
        'generic_formdata_table': 'formdata',
        'generic_status_table': 'status',
        'generic_evolution_table': 'evolution',
        'year_table': 'year',
        'month_table': 'month',
        'day_table': 'day',
        'dow_table': 'dow',
        'hour_table': 'hour',
        'agent_table': 'agent',
    })
    self.config = config or {}
    self.model = {
        'label': self.config.get('cubes_label', schema),
        'name': schema,
        'search_path': [schema, 'public'],
        'pg_dsn': pg_dsn,
        'cubes': [],
    }
    if 'cubes_slug' in self.config:
        self.model['slug'] = self.config['cubes_slug']
    # cube covering all forms; it is also kept as ``self.base_cube``
    cube = {
        'name': 'all_formdata',
        'label': 'Tous les formulaires',
        'fact_table': 'formdata',
        'key': 'id',
        'joins': [
            {'name': 'receipt_time', 'table': 'dates', 'detail': 'date',
             'master': 'receipt_time', 'kind': 'right'},
            {'name': 'channel', 'table': 'channel', 'master': 'channel_id',
             'detail': 'id', 'kind': 'left'},
            {'name': 'formdef', 'table': 'formdef', 'master': 'formdef_id',
             'detail': 'id', 'kind': 'left'},
            {'name': 'category', 'table': 'category', 'master': 'formdef.category_id',
             'detail': 'id', 'kind': 'left'},
            {'name': 'hour', 'table': 'hour', 'master': 'hour_id',
             'detail': 'id', 'kind': 'right'},
            {'name': 'generic_status', 'table': 'status', 'master': 'generic_status_id',
             'detail': 'id', 'kind': 'left'},
            {'name': 'agent', 'table': 'agent', 'master': 'first_agent_id',
             'detail': 'id', 'kind': 'left'},
        ],
        'dimensions': [
            {
                'name': 'receipt_time',
                'label': 'date de la demande',
                'join': ['receipt_time'],
                'type': 'date',
                'value': 'receipt_time.date',
            },
            {
                'name': 'channel',
                'label': 'canal',
                'join': ['channel'],
                'type': 'integer',
                'value': 'channel.id',
                'value_label': 'channel.label',
            },
            {
                'name': 'category',
                'label': 'catégorie',
                'join': ['formdef', 'category'],
                'type': 'string',
                'value': 'category.ref',
                'value_label': 'category.label',
                'order_by': 'category.label',
            },
            {
                'name': 'formdef',
                'label': 'formulaire',
                'join': ['formdef'],
                'type': 'string',
                'value': 'formdef.ref',
                'value_label': 'formdef.label',
                'order_by': 'formdef.label',
            },
            {
                'name': 'generic_status',
                'label': 'statut simplifié',
                'join': ['generic_status'],
                'type': 'integer',
                'value': 'generic_status.id',
                'value_label': 'generic_status.label',
            },
            {
                'name': 'hour',
                'label': 'heure',
                'join': ['hour'],
                'type': 'integer',
                'value': 'hour.id',
                'filter': False,
            },
            {
                'name': 'agent',
                'label': 'premier agent traitant',
                'join': ['agent'],
                'type': 'integer',
                'value': 'agent.id',
                'value_label': 'agent.label',
                'order_by': 'agent.label',
            },
        ],
        'measures': [
            {
                'name': 'count',
                'label': 'nombre de demandes',
                'type': 'integer',
                'expression': 'count({fact_table}.id)',
            },
            {
                'name': 'avg_endpoint_delay',
                'label': 'délai de traitement moyen',
                'type': 'duration',
                'expression': 'avg(endpoint_delay)',
            },
            {
                'name': 'max_endpoint_delay',
                'label': 'délai de traitement maximum',
                'type': 'duration',
                'expression': 'max(endpoint_delay)',
            },
            {
                'name': 'min_endpoint_delay',
                'label': 'délai de traitement minimum',
                'type': 'duration',
                'expression': 'min(endpoint_delay)',
            },
            {
                'name': 'percent',
                'label': 'pourcentage des demandes',
                'type': 'percent',
                "expression": 'case (select count({fact_table}.id) from {table_expression} '
                              'where {where_conditions}) when 0 then null else '
                              'count({fact_table}.id) * 100. / (select '
                              'count({fact_table}.id) from {table_expression} where '
                              '{where_conditions}) end',
            },
            {
                'name': 'geolocation',
                'label': 'localisation géographique',
                'type': 'point',
                'expression': 'array_agg({fact_table}.geolocation_base) '
                              'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)',
            }
        ]
    }
    self.model['cubes'].append(cube)
    self.base_cube = self.model['cubes'][0]
    self.agents_mapping = {}
    self.formdata_json_index = []
    # keep at end of __init__ to prevent leak if __init__ raises
    self.connection = psycopg2.connect(dsn=pg_dsn)
    try:
        # everything done with the open connection is guarded, so the
        # connection is closed whichever of these steps fails
        self.connection.autocommit = True
        self.cur = self.connection.cursor()
        self.has_jsonb = self.detect_jsonb()
        if self.has_jsonb:
            cube['json_field'] = 'json_data'
    except Exception:
        self.connection.close()
        raise
|
|
|
|
@cached_property
def formdefs(self):
    """Formdefs of the API, limited to those listed in ``self.slugs`` if set."""
    selected = []
    for formdef in self.api.formdefs:
        if not self.slugs or formdef.slug in self.slugs:
            selected.append(formdef)
    return selected
|
|
|
|
@cached_property
def roles(self):
    """Roles as exposed by the API client (``self.api.roles``)."""
    return self.api.roles
|
|
|
|
@cached_property
def categories(self):
    """Categories as exposed by the API client (``self.api.categories``)."""
    return self.api.categories
|
|
|
|
def detect_jsonb(self):
    """Tell whether a type named ``jsonb`` is listed in ``pg_type``.

    The answer is the truth value of the cursor's row count after the lookup.
    """
    cursor = self.cur
    cursor.execute("SELECT 1 FROM pg_type WHERE typname = 'jsonb'")
    found = cursor.rowcount
    return bool(found)
|
|
|
|
def hash_table_name(self, table_name, hash_length=6, force_hash=False):
    """Expand the context placeholders of ``table_name`` then truncate it.

    The expanded name goes through truncate_pg_identifier() with the given
    ``hash_length`` and ``force_hash``.
    """
    expanded = table_name.format(**self.default_ctx)
    return truncate_pg_identifier(expanded, hash_length=hash_length, force_hash=force_hash)
|
|
|
|
@property
def default_ctx(self):
    """Flattened view of the context stack, rebuilt on each access."""
    return self.ctx.as_dict()
|
|
|
|
def ex(self, query, ctx=None, vars=None):
    """Expand the placeholders of ``query`` and execute it on ``self.cur``.

    :param query: SQL template; ``{name}`` placeholders are expanded with
        str.format()
    :param ctx: extra placeholder values for this query; entries of
        ``self.default_ctx`` take precedence over them
    :param vars: query parameters handed to the cursor's execute()

    The caller's ``ctx`` dict is not modified: values are merged in a copy.
    """
    values = dict(ctx or {})
    values.update(self.default_ctx)
    sql = query.format(**values)
    self.logger.debug('SQL: %s VARS: %s', sql, vars)
    self.cur.execute(sql, vars=vars)
|
|
|
|
def do_schema(self):
    """Recreate the temporary schema from scratch and switch to it.

    Any previous temporary schema is emptied table by table, dropped, then
    created again; the search path ends up as ``{schema_temp},public``.
    """
    self.ex('SET search_path = public')
    self.logger.debug('dropping schema %s', self.schema_temp)
    self.drop_tables_sequencially(self.schema_temp)
    self.ex('DROP SCHEMA IF EXISTS {schema_temp} CASCADE')
    # the schema created here is the temporary one, so that is the name logged
    self.logger.debug('creating schema %s', self.schema_temp)
    self.ex('CREATE SCHEMA {schema_temp}')
    self.ex('SET search_path = {schema_temp},public')
|
|
|
|
def drop_tables_sequencially(self, schema):
    """
    Drop the content of ``schema`` piece by piece (foreign keys, other
    constraints, indexes, then tables) in order to avoid reaching
    max_locks_per_transaction
    """
    quoted_schema = quote(schema)

    def drop_listed_constraints():
        # drop every (table, constraint) pair returned by the last query
        for table_name, constraint_name in self.cur.fetchall():
            # drop of PK constraints can have effects on FK constraint on other tables.
            with ignore_undefined_object_or_table():
                statement = 'ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE' % (
                    quoted_schema, quote(table_name), quote(constraint_name))
                self.ex(statement)

    # drop foreign key constraints first
    self.ex("SELECT table_name, constraint_name FROM information_schema.key_column_usage "
            "WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'",
            vars=[schema])
    drop_listed_constraints()

    # remove others
    self.ex("SELECT table_name, constraint_name FROM information_schema.key_column_usage "
            "WHERE table_schema = %s",
            vars=[schema])
    drop_listed_constraints()

    # then drop indexes
    self.ex("SELECT tablename, indexname FROM pg_indexes WHERE schemaname = %s", vars=[schema])
    for _table_name, index_name in self.cur.fetchall():
        with ignore_undefined_object_or_table():
            self.ex('DROP INDEX %s.%s CASCADE' % (quoted_schema, quote(index_name)))

    # finally drop tables, cascade will have no effect
    self.ex("SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC", vars=[schema])
    for row in self.cur.fetchall():
        qualified_name = '%s.%s' % (quoted_schema, quote(row[0]))
        self.ex('DROP TABLE IF EXISTS %s;' % qualified_name)
|
|
|
|
def do_dates_table(self):
    """Create ``public.dates`` and its index if needed, then add missing days.

    Days are generated from the earliest date already stored (2010-01-01
    when the table is empty) up to the 31st of December of the year reached
    150 days from today; only days absent from the table are inserted.
    """
    self.ex('CREATE TABLE IF NOT EXISTS public.dates (date date, day text, month text)')
    self.ex('CREATE INDEX IF NOT EXISTS dates_index ON public.dates (date)')
    self.ex('SELECT MIN(date) FROM public.dates')
    row = self.cur.fetchone()
    earliest_known = row[0]

    if earliest_known:
        first_date = earliest_known
    else:
        first_date = datetime.date(2010, 1, 1)
    horizon = datetime.date.today() + datetime.timedelta(days=150)
    last_date = horizon.replace(month=12, day=31)
    self.ex('''INSERT INTO public.dates SELECT
    the_date.the_date::date AS date,
    to_char(the_date.the_date, 'TMday') AS day,
    to_char(the_date.the_date, 'TMmonth') AS month
    FROM generate_series(%s::date, %s::date, '1 day'::interval)
    AS the_date(the_date)
    LEFT JOIN public.dates AS dates
    ON the_date.the_date = dates.date
    WHERE dates.date IS NULL''',
            vars=[first_date, last_date])
|
|
|
|
def create_table(self, name, columns, inherits=None, comment=None):
    """Create table ``name`` and optionally set its comment.

    :param name: table name, quoted here
    :param columns: iterable of (column name, SQL definition) pairs; only
        the column names are quoted
    :param inherits: optional parent table for an INHERITS clause
    :param comment: optional table comment, passed as a query parameter
    """
    quoted_name = quote(name)
    definitions = ['%s %s' % (quote(col_name), col_def) for col_name, col_def in columns]
    statement = 'CREATE TABLE %s(%s)' % (quoted_name, ', '.join(definitions))
    if inherits:
        statement = '%s INHERITS (%s)' % (statement, quote(inherits))
    self.ex(statement)
    if comment:
        self.ex('COMMENT ON TABLE %s IS %%s' % quoted_name, vars=(comment,))
|
|
|
|
def prev_table_exists(self, name):
    """Tell whether table ``name`` exists in the schema named by the context.

    The schema name (``schema`` entry of ``self.default_ctx``) and the table
    name are both passed as query parameters instead of the schema being
    pasted between quotes in the SQL text.

    :returns: the value of the EXISTS() expression
    """
    query = ('SELECT EXISTS (SELECT 1 FROM information_schema.tables '
             'WHERE table_schema = %s AND table_name = %s)')
    self.ex(query, vars=(self.default_ctx['schema'], name))
    return self.cur.fetchone()[0]
|
|
|
|
def update_table_sequence_number(self, name):
    """Set the sequence behind ``name``.id to the greatest of 1 and MAX(id)."""
    quoted_name = quote(name)
    self.ex(
        """SELECT setval(pg_get_serial_sequence('{name}', 'id'),
        (SELECT GREATEST(1, MAX(id)) FROM {name}))""",
        ctx={'name': quoted_name})
|
|
|
|
def create_labeled_table_serial(self, name, comment):
    """Create an (id serial, label) table, copying the previous one if any.

    When the table already exists in the current schema its rows are copied
    into the new table and the id sequence is realigned.
    """
    columns = [['id', 'serial primary key'], ['label', 'varchar']]
    self.create_table(name, columns, comment=comment)

    if not self.prev_table_exists(name):
        return
    # Insert data from previous table
    self.ex('INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
            ctx={'name': quote(name)})
    self.update_table_sequence_number(name)
|
|
|
|
def create_referenced_table(self, name, fields, comment):
    """Create a table made of ``id``, a unique ``ref`` and the given fields.

    :param fields: list of [column name, SQL definition] placed after the
        two leading columns
    """
    # add primary key and reference fields
    leading_columns = [['id', 'serial primary key'], ['ref', 'varchar UNIQUE']]
    self.create_table(name, leading_columns + fields, comment=comment)
|
|
|
|
|
|
def do_referenced_data(self, name, data, result_column, update_column='label'):
    """Insert the rows of ``data`` whose ``ref`` is not in table ``name`` yet.

    :param name: table name, quoted here
    :param data: iterable of tuples; the first element is the ``ref`` value
        and the following ones are inserted, in order, in the columns that
        come after ``id`` and ``ref``
    :param result_column: column whose values are the keys of the result
    :param update_column: column selected along with ``ref`` when looking
        for an existing row
    :returns: dict mapping each ``result_column`` value to the row id
    """
    quoted_name = quote(name)
    to_insert = []

    for item in data:
        self.ex(
            'SELECT ref, {column} FROM {name} WHERE ref = %s',
            ctx={'name': quoted_name, 'column': quote(update_column)},
            vars=(item[0],))
        # NOTE(review): the former code looped over a second fetchall() to
        # UPDATE existing rows, but that call ran on an exhausted cursor, so
        # the loop never executed (and it would only have written the stored
        # value back). Existing rows are still left untouched here; a real
        # update needs to know which element of ``item`` feeds
        # ``update_column`` -- to be confirmed with the callers.
        if not self.cur.fetchall():
            to_insert.append(item)
    if to_insert:
        # one "(DEFAULT, %s, ...)" group per row actually inserted, so that
        # the number of placeholders always matches the number of values
        tmpl = ', '.join(
            '(DEFAULT, %s)' % ', '.join(['%s'] * len(row)) for row in to_insert)
        query = 'INSERT INTO {name} VALUES %s' % tmpl
        self.ex(query, ctx={'name': quoted_name},
                vars=list(itertools.chain(*to_insert)))

    result = {}
    self.ex('SELECT id, {column} FROM {name}', ctx={'name': quoted_name,
                                                    'column': result_column})
    for _id, column in self.cur.fetchall():
        result[column] = _id
    return result
|
|
|
|
|
|
def create_labeled_table(self, name, labels, comment=None):
    """Create an (id, label) table and make sure ``labels`` are in it.

    :param name: table name
    :param labels: iterable of (id, label) pairs, possibly a generator
    :param comment: optional table comment
    :returns: dict mapping every label of the table to its id

    Without a previous table the pairs are inserted as given. Otherwise the
    previous rows are copied and only the labels not found are added, with
    ids continuing after the current MAX(id).
    """
    quoted_name = quote(name)
    self.create_table(
        name, [['id', 'integer primary key'], ['label', 'varchar']], comment=comment)

    if self.prev_table_exists(name):
        # Insert data from previous table
        self.ex('INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}',
                ctx={'name': quoted_name})
        # Find what is missing
        missing = []
        for _unused_id, wanted in labels:
            self.ex('SELECT * FROM {name} WHERE label = %s',
                    ctx={'name': quoted_name}, vars=(wanted,))
            if self.cur.fetchone() is None:
                missing.append(wanted)

        rows = []
        if missing:
            self.ex('SELECT MAX(id) FROM {name}', ctx={'name': quoted_name})
            first_free = (self.cur.fetchone()[0] or 0) + 1
            rows = list(zip(range(first_free, first_free + len(missing)), missing))
    elif labels is None:
        rows = []
    else:
        # turn generator objects (enumerate, zip...) into a proper list
        rows = list(labels)

    if rows:
        placeholders = ', '.join(['(%s, %s)'] * len(rows))
        self.ex('INSERT INTO {name} (id, label) VALUES %s' % placeholders,
                ctx={'name': quoted_name}, vars=list(itertools.chain(*rows)))

    self.ex('SELECT id, label FROM {name}', ctx={'name': quoted_name})
    return dict((label, id_) for id_, label in self.cur.fetchall())
|
|
|
|
def do_category_table(self):
    """Create and fill the category table.

    :returns: dict mapping the title of each category to its id in the table
    """
    table_name = self.default_ctx['category_table']
    self.create_referenced_table(table_name, [['label', 'varchar']], 'catégorie')
    rows = []
    for category in self.categories:
        rows.append((category.slug, category.title))
    ids_by_label = self.do_referenced_data(table_name, rows, 'label')
    self.update_table_sequence_number(table_name)
    # remap categories ids to ids in the table
    mapping = {}
    for category in self.categories:
        mapping[category.title] = ids_by_label[category.title]
    return mapping
|
|
|
|
def do_formdef_table(self):
    """Create and fill the formdef table, after the category table.

    Empty formdefs are left out. ``self.formdefs_mapping`` receives the
    ref -> id mapping returned by do_referenced_data().
    """
    categories_mapping = self.do_category_table()

    table_name = self.default_ctx['form_table']
    columns = [
        ['category_id', 'integer REFERENCES {category_table} (id)'],
        ['label', 'varchar'],
    ]
    self.create_referenced_table(table_name, columns, 'types de formulaire')

    rows = []
    for form in self.formdefs:
        if form.is_empty:
            continue
        rows.append((
            form.slug,
            categories_mapping.get(form.schema.category),
            form.schema.name,
        ))
    self.formdefs_mapping = self.do_referenced_data(table_name, rows, 'ref')
    self.update_table_sequence_number(table_name)
|
|
|
|
def do_base_table(self):
    """Create the shared dimension tables and the two generic tables.

    In order: channel, role, formdef (with category), hour, status and
    agent tables, then the generic formdata table with its column comments
    and the generic evolution table. Sets ``self.role_mapping``,
    ``self.columns`` and ``self.comments``.
    """
    # channels
    channel_labels = [[channel[0], channel[2]] for channel in self.channels]
    self.create_labeled_table('channel', channel_labels, comment='canal')

    # roles
    roles = {}
    for position, role in enumerate(self.roles):
        roles[position] = role.name
    tmp_role_map = self.create_labeled_table('role', roles.items(), comment='role')
    role_mapping = {}
    for role in self.roles:
        role_mapping[role.id] = tmp_role_map[role.name]
    self.role_mapping = role_mapping

    # forms
    self.do_formdef_table()

    hours = zip(range(0, 24), map(str, range(0, 24)))
    self.create_labeled_table('hour', hours, comment='heures')

    self.create_labeled_table('status', self.status, comment='statuts simplifiés')

    # agents
    self.create_labeled_table_serial('agent', comment='agents')

    formdata_columns = [
        ['id', 'serial primary key'],
        ['formdef_id', 'smallint REFERENCES {form_table} (id)'],
        ['receipt_time', 'date'],
        ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
        ['channel_id', 'smallint REFERENCES {channel_table} (id)'],
        ['backoffice', 'boolean'],
        ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
        ['endpoint_delay', 'interval'],
        ['first_agent_id', 'smallint REFERENCES {agent_table} (id)'],
        ['geolocation_base', 'POINT NULL'],
    ]
    if self.has_jsonb:
        formdata_columns.append(['json_data', 'JSONB NULL'])
    self.columns = formdata_columns
    self.comments = {
        'formdef_id': 'formulaire',
        'receipt_time': 'date de réception',
        'hour_id': 'heure',
        'channel_id': 'canal',
        'backoffice': 'soumission backoffce',
        'generic_status_id': 'statut simplifié',
        'endpoint_delay': 'délai de traitement',
        'geolocation_base': 'position géographique',
    }
    self.create_table('{generic_formdata_table}', self.columns)
    for column_name, column_comment in self.comments.items():
        self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % column_name,
                vars=(column_comment,))
    self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s', vars=('tous les formulaires',))

    # evolutions
    evolution_columns = [
        ['id', 'serial primary key'],
        ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
        # "REFERENCES {generic_formdata_table} (id)" is impossible because
        # FK constraints do not work on inherited tables
        ['formdata_id', 'integer'],
        ['time', 'timestamp'],
        ['date', 'date'],
        ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
    ]
    self.create_table('{generic_evolution_table}', evolution_columns)
    self.ex('COMMENT ON TABLE {generic_evolution_table} IS %s', vars=('evolution générique',))
|
|
|
|
def feed(self):
    """Run the feed and always close the cursor and the connection.

    When ``self.do_feed`` is set the temporary schema, the dates table and
    the base tables are (re)created first. Every non-empty formdef is then
    handled by a WcsFormdefFeeder; a WcsApiError on one formdef is logged
    and does not stop the others. On success, unless ``self.fake`` is set,
    the current schema is dropped and the temporary one renamed to it, and
    the model is dumped as JSON when 'cubes_model_dirs' is configured.
    """
    try:
        if self.do_feed:
            self.do_schema()
            self.do_dates_table()
            self.do_base_table()
        for formdef in self.formdefs:
            if formdef.is_empty:
                continue
            self.api.cache = {}
            try:
                formdef_feeder = WcsFormdefFeeder(self, formdef, do_feed=self.do_feed)
                formdef_feeder.feed()
            except WcsApiError as e:
                self.logger.error('failed to retrieve formdef %s, %s', formdef.slug, e)
    except Exception:
        # keep temporary schema alive for debugging
        raise
    else:
        if self.do_feed:
            if not self.fake:
                self.logger.debug('dropping schema %s', self.schema)
                self.drop_tables_sequencially(self.schema)
                self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
                # log the real temporary name: it is not always
                # schema + '_temp' since it may have been truncated and hashed
                self.logger.debug('renaming schema %s to %s', self.schema_temp, self.schema)
                self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')

        if 'cubes_model_dirs' in self.config:
            model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema)
            with open(model_path, 'w') as f:
                json.dump(self.model, f, indent=2, sort_keys=True)
    finally:
        # prevent connection from remaining open
        self.cur.close()
        self.connection.close()
|
|
|
|
def insert_agent(self, name):
    """Return the id of the agent labelled ``name``, inserting it if needed."""
    self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
    row = self.cur.fetchone()
    if not row:
        self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=(name,))
        row = self.cur.fetchone()
    return row[0]
|
|
|
|
def get_agent(self, user):
    """Return the agent id for ``user``, creating the agent on first use.

    Ids are memoised in ``self.agents_mapping``, keyed by ``user.id``.
    ``user.name`` and ``user.id`` must both be truthy (asserted).
    """
    assert user.name
    assert user.id
    known_agents = self.agents_mapping
    if user.id in known_agents:
        return known_agents[user.id]
    known_agents[user.id] = self.insert_agent(user.name)
    return known_agents[user.id]
|
|
|
|
def create_formdata_json_index(self, table_name, varname):
    """Index ``json_data->>varname`` on the generic formdata table, once.

    Nothing is done when ``varname`` is already in
    ``self.formdata_json_index``. The index name is derived from
    ``table_name`` and ``varname`` and always carries an 8 character hash.
    """
    indexed_varnames = self.formdata_json_index
    if varname not in indexed_varnames:
        index_name = self.hash_table_name(
            '%s_%s_json_idx' % (table_name, varname), hash_length=8, force_hash=True)
        self.ex('CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))',
                ctx={'index_name': quote(index_name)}, vars=[varname])
        # prevent double creation
        indexed_varnames.append(varname)
|
|
|
|
|
|
def has_digits_validation(field):
    """Tell whether ``field`` carries a validation of type 'digits'.

    A falsy ``field.validation`` is returned as is.
    """
    validation = field.validation
    if not validation:
        return validation
    return validation.get('type') == 'digits'
|
|
|
|
|
|
class WcsFormdefFeeder(object):
|
|
def __init__(self, olap_feeder, formdef, do_feed=True):
|
|
self.olap_feeder = olap_feeder
|
|
self.formdef = formdef
|
|
self.status_mapping = {}
|
|
self.items_mappings = {}
|
|
self.do_feed = do_feed
|
|
self.fields = []
|
|
self.item_id_mappings = {}
|
|
|
|
@property
def table_name(self):
    """Formdata table name: 'formdata_' + slug with hyphens as underscores, hashed if needed."""
    slug = self.formdef.slug.replace('-', '_')
    return self.hash_table_name('formdata_%s' % slug)
|
|
|
|
@property
def status_table_name(self):
    """Status table name: 'status_' + slug with hyphens as underscores, hashed if needed."""
    slug = self.formdef.slug.replace('-', '_')
    return self.hash_table_name('status_%s' % slug)
|
|
|
|
@property
def evolution_table_name(self):
    """Evolution table name: 'evolution_' + slug with hyphens as underscores, hashed if needed."""
    slug = self.formdef.slug.replace('-', '_')
    return self.hash_table_name('evolution_%s' % slug)
|
|
|
|
def __getattr__(self, name):
|
|
return getattr(self.olap_feeder, name)
|
|
|
|
def do_statuses(self):
    """Create the status table of the formdef and fill ``self.status_mapping``.

    The workflow statuses are numbered with enumerate() and stored through
    the olap feeder's create_labeled_table(); the mapping goes from the
    status id to the table id of the status name.
    """
    workflow_statuses = self.formdef.schema.workflow.statuses
    ids_by_name = self.olap_feeder.create_labeled_table(
        self.status_table_name,
        enumerate([status.name for status in workflow_statuses]),
        comment='statuts du formulaire « %s »' % self.formdef.schema.name)
    mapping = {}
    for status in workflow_statuses:
        mapping[status.id] = ids_by_name[status.name]
    self.status_mapping = mapping
|
|
|
|
def do_data_table(self):
    """Create the formdata and evolution tables of the formdef.

    The formdata table inherits from the generic formdata table and adds
    ``status_id``, one column per supported field (item, bool, string), one
    per geolocation and one per workflow function. Item fields also get
    their own label table. Sets ``self.columns`` (names of all the columns,
    inherited ones first) and extends ``self.fields`` with the fields kept.
    """
    columns = OrderedDict()
    columns['status_id'] = {'sql_col_name': 'status_id', 'sql_col_def': 'smallint REFERENCES {status_table} (id)'}

    # add item fields
    for field in self.good_fields.values():
        if field.type == 'item':
            comment = ('valeurs du champ « %s » du formulaire %s'
                       % (field.label, self.formdef.schema.name))
            table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
            # create table and mapping
            if field.items:
                self.items_mappings[field.varname] = self.create_labeled_table(
                    table_name, enumerate(field.items), comment=comment)
            else:
                # open item field, from data sources...
                self.create_labeled_table_serial(table_name, comment=comment)
            field_def = 'integer REFERENCES %s (id)' % quote(table_name)
        elif field.type == 'bool':
            field_def = 'boolean'
        elif field.type == 'string':
            if has_digits_validation(field):
                field_def = 'integer'
            else:
                field_def = 'varchar'
        else:
            # other field types get no column
            continue
        columns[field.varname] = {
            'field': field,
            'sql_col_name': truncate_pg_identifier('field_%s' % field.varname),
            'sql_col_def': field_def,
            'sql_comment': field.label,
        }

    # keep loaded fields around
    for key in columns:
        if columns[key].get('field') is not None:
            self.fields.append(columns[key].get('field'))

    # add geolocation fields
    for geolocation, label in self.formdef.schema.geolocations:
        at = 'geolocation_%s' % geolocation
        columns[at] = {
            'sql_col_name': at,
            'sql_col_def': 'point',
            # stored under 'sql_comment', the key read below when the
            # column comments are applied
            'sql_comment': 'géoloc « %s »' % label,
        }

    # add function fields
    for function, name in self.formdef.schema.workflow.functions.items():
        at = 'function_%s' % slugify(function)
        columns[at] = {
            'sql_col_name': at,
            'sql_col_def': 'smallint REFERENCES {role_table} (id)',
            'sql_comment': 'fonction « %s »' % name,
        }

    self.columns = [name for name, _type in self.olap_feeder.columns]
    for key in columns:
        self.columns.append(columns[key]['sql_col_name'])
    # removes the first occurrence only, i.e. the inherited column
    self.columns.remove('geolocation_base')

    self.create_table(self.table_name,
                      [(columns[key]['sql_col_name'], columns[key]['sql_col_def']) for key in columns],
                      inherits='{generic_formdata_table}',
                      comment='formulaire %s' % self.formdef.schema.name)
    for key in columns:
        column = columns[key]
        if column.get('sql_comment'):
            self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % quote(column['sql_col_name']),
                    vars=(column['sql_comment'],))

    # Create index for JSON fields
    if self.has_jsonb:
        for varname in self.good_fields:
            self.create_formdata_json_index(self.table_name, varname)

    # PostgreSQL does not propagate foreign key constraints to child tables
    # so we must recreate them manually
    for name, _type in self.olap_feeder.columns:
        if 'REFERENCES' not in _type:
            continue
        i = _type.index('REFERENCES')
        constraint = '%s_fk_constraint FOREIGN KEY (%s) %s' % (name, name, _type[i:])
        self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
    self.ex('ALTER TABLE {formdata_table} ADD PRIMARY KEY (id)')
    # evolutions table
    self.create_table(self.evolution_table_name, [
        ['id', 'serial primary key'],
        ['status_id', 'smallint REFERENCES {status_table} (id)'],
        ['formdata_id', 'integer REFERENCES {formdata_table} (id)'],
        ['time', 'timestamp'],
        ['date', 'date'],
        ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
    ])
    self.ex('COMMENT ON TABLE {evolution_table} IS %s',
            vars=('evolution des demandes %s' % self.formdef.schema.name,))
|
|
|
|
def insert_item_value(self, field, value):
    """Return the id of ``value`` in the item table of ``field``.

    The label is looked up first and inserted only when it is not found.
    """
    item_table = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
    self.ex('SELECT id FROM {item_table} WHERE label = %s',
            ctx={'item_table': quote(item_table)}, vars=(value,))
    row = self.cur.fetchone()
    if not row:
        self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,),
                ctx={'item_table': quote(item_table)})
        row = self.cur.fetchone()
    return row[0]
|
|
|
|
def get_item_id(self, field, value):
    """Return the id of ``value`` for the item ``field``, with memoisation.

    Ids are cached in ``self.item_id_mappings``, per field varname then per
    value; insert_item_value() is called on a cache miss. ``field``,
    ``field.varname`` and ``value`` must all be truthy (asserted).
    """
    assert field
    assert field.varname
    assert value

    known_ids = self.item_id_mappings.setdefault(field.varname, {})
    if value in known_ids:
        return known_ids[value]
    known_ids[value] = self.insert_item_value(field, value)
    return known_ids[value]
|
|
|
|
def generic_status(self, status):
    """Map a workflow status to a simplified status id.

    :returns: 3 for an endpoint status, else 1 for a startpoint status,
        else 2
    """
    if status.endpoint:
        return 3
    if status.startpoint:
        return 1
    return 2
|
|
|
|
def do_data(self):
    """Insert the formdatas of the formdef along with their evolutions.

    Formdatas without a status, or whose status is not in the workflow's
    ``statuses_map``, are skipped. All the rows are inserted with a single
    INSERT ... RETURNING id; the returned ids are then used to insert the
    generic and the per-formdef evolutions, 500 rows at a time.
    """
    values = []
    generic_evolution_values = []
    evolution_values = []
    for data in self.formdef.formdatas.anonymized.full:
        json_data = {}

        # ignore formdata without status
        if data.workflow.status:
            status_id = data.workflow.status.id
        elif data.evolution:
            # fall back on the most recent evolution that has a status
            for evolution in reversed(data.evolution):
                if evolution.status:
                    status_id = evolution.status
                    break
            else:
                continue
        else:
            continue

        try:
            status = data.formdef.schema.workflow.statuses_map[status_id]
        except KeyError:
            self.logger.warning('%s.%s unknown status status_id %s',
                                data.formdef.schema.name, data.id, status_id)
            continue

        channel = data.submission.channel.lower()
        if channel == 'web' and data.submission.backoffice:
            channel = 'backoffice'
        row = {
            'formdef_id': self.formdefs_mapping[self.formdef.slug],
            'receipt_time': data.receipt_time,
            'hour_id': data.receipt_time.hour,
            'channel_id': self.channel_to_id[channel],
            'backoffice': data.submission.backoffice,
            'generic_status_id': self.generic_status(status),
            'status_id': self.status_mapping[status_id],
            'endpoint_delay': data.endpoint_delay,
            'first_agent_id': self.get_first_agent_in_evolution(data),
        }
        # add form fields value
        for field in self.fields:
            v = None
            if field.varname in data.fields:
                raw = data.fields[field.varname]
            elif field.varname in data.workflow.fields:
                raw = data.workflow.fields[field.varname]
            else:
                raw = None
            if field.type == 'item':
                # map items to sql id
                if field.items or field.options:
                    # NOTE(review): items_mappings is only filled for fields
                    # with ``items``; presumably a field with ``options``
                    # always has ``items`` too -- to be confirmed
                    v = self.items_mappings[field.varname].get(raw)
                elif raw:
                    v = self.get_item_id(field, raw)
                else:
                    v = None
            elif field.type == 'string':
                if has_digits_validation(field):
                    # isdecimal() matches what int() can parse; isnumeric()
                    # also accepts characters such as superscripts or
                    # fractions, on which int() raises ValueError
                    if raw is not None and raw.isdecimal():
                        v = int(raw)
                else:
                    v = raw
            elif field.type == 'bool':
                v = raw

            # unstructured storage of field values
            if field.varname and raw is not None:
                json_data[field.varname] = raw

            row[truncate_pg_identifier('field_%s' % field.varname)] = v
        if self.has_jsonb:
            row['json_data'] = json.dumps(json_data)
        # add geolocation fields value
        for geolocation, label in self.formdef.schema.geolocations:
            v = (data.geolocations or {}).get(geolocation)
            if v:
                v = '(%.6f, %.6f)' % (v.get('lon'), v.get('lat'))
            row['geolocation_%s' % geolocation] = v
        # add function fields value
        for function, name in self.formdef.schema.workflow.functions.items():
            try:
                v = data.functions[function]
            except KeyError:
                v = None
            else:
                if v and v.id in self.olap_feeder.role_mapping:
                    v = self.olap_feeder.role_mapping[v.id]
                else:
                    v = None
            at = 'function_%s' % slugify(function)
            row[at] = v

        # the first column is the id, generated by the database
        values.append(tuple(row[column] for column in self.columns[1:]))
        # collect evolutions; slot 0 receives the formdata id after insertion
        generic_evolution = []
        evolution = []
        last_status = None
        for evo in data.evolution:
            if not evo.status:
                continue
            try:
                status = data.formdef.schema.workflow.statuses_map[evo.status]
            except KeyError:
                self.logger.warning('%s.%s unknown status in evolution %s',
                                    data.formdef.schema.name, data.id, evo.status)
                continue
            status_id = self.status_mapping[status.id]
            generic_status_id = self.generic_status(status)
            evolution.append(
                [0, status_id, evo.time, evo.time.date(), evo.time.hour])
            # consecutive evolutions with the same generic status are merged
            if generic_status_id == last_status:
                continue
            generic_evolution.append(
                [0, generic_status_id, evo.time, evo.time.date(), evo.time.hour])
            last_status = generic_status_id
        generic_evolution_values.append(generic_evolution)
        evolution_values.append(evolution)
    if not values:
        self.logger.warning('no data')
        return
    insert_columns = ['%s' % quote(column) for column in self.columns[1:]]
    insert_columns = ', '.join(insert_columns)
    self.ex('INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id',
            ctx=dict(
                columns=insert_columns,
                values=', '.join(['%s'] * len(values))
            ),
            vars=values)
    ids = list(self.cur.fetchall())

    def insert_evolutions(query_tmpl, status_column, rows_per_formdata):
        # give each row the id of its formdata, then insert in batches of 500
        column_names = ', '.join(['formdata_id', status_column, 'time', 'date', 'hour_id'])

        def flush(rows):
            self.ex(query_tmpl % (column_names, ', '.join(['%s'] * len(rows))), vars=rows)

        pending = []
        for evos, (formdata_id,) in zip(rows_per_formdata, ids):
            for evo_row in evos:
                evo_row[0] = formdata_id
                pending.append(tuple(evo_row))
                if len(pending) == 500:
                    flush(pending)
                    pending = []
        if pending:
            flush(pending)

    # insert generic evolutions
    insert_evolutions('INSERT INTO {generic_evolution_table} (%s) VALUES %s',
                      'generic_status_id', generic_evolution_values)

    # insert evolutions
    insert_evolutions('INSERT INTO {evolution_table} (%s) VALUES %s',
                      'status_id', evolution_values)
|
|
|
|
def get_first_agent_in_evolution(self, formdata):
    """Return the agent of the earliest evolution step that has an author.

    The steps of ``formdata.evolution`` are scanned in order; the first
    truthy ``who`` found is resolved through ``self.get_agent`` and that
    result is returned.  When no step carries a ``who`` (or there are no
    steps at all) the method returns ``None``.
    """
    # Lazy scan: stops at the first step that has an author.
    authors = (step.who for step in formdata.evolution if step.who)
    first_author = next(authors, None)
    if first_author is None:
        return None
    return self.get_agent(first_author)
|
|
|
|
def feed(self):
|
|
self.olap_feeder.ctx.push({
|
|
'formdata_table': quote(self.table_name),
|
|
'status_table': quote(self.status_table_name),
|
|
'evolution_table': quote(self.evolution_table_name),
|
|
})
|
|
|
|
# create cube
|
|
cube = self.cube = copy.deepcopy(self.base_cube)
|
|
|
|
def add_warning(message):
|
|
self.logger.warning('%s', message)
|
|
cube.setdefault('warnings', []).append(message)
|
|
|
|
# remove json field from formdef cubes
|
|
cube.pop('json_field', None)
|
|
cube.update({
|
|
'name': self.table_name,
|
|
'label': self.formdef.schema.name,
|
|
'fact_table': quote(self.table_name),
|
|
'key': 'id',
|
|
})
|
|
cube['dimensions'] = [dimension for dimension in cube['dimensions']
|
|
if dimension['name'] not in ('category', 'formdef')]
|
|
|
|
# add dimension for status
|
|
cube['joins'].append({
|
|
'name': 'status',
|
|
'table': quote(self.status_table_name),
|
|
'master': 'status_id',
|
|
'detail': 'id',
|
|
'kind': 'left',
|
|
})
|
|
cube['dimensions'].append({
|
|
'name': 'status',
|
|
'label': 'statut',
|
|
'join': ['status'],
|
|
'type': 'integer',
|
|
'value': 'status.id',
|
|
'value_label': 'status.label',
|
|
})
|
|
|
|
# add dimension for function
|
|
for function, name in self.formdef.schema.workflow.functions.items():
|
|
at = 'function_%s' % slugify(function)
|
|
cube['joins'].append({
|
|
'name': at,
|
|
'table': 'role',
|
|
'master': quote(at),
|
|
'detail': 'id',
|
|
'kind': 'left',
|
|
})
|
|
cube['dimensions'].append({
|
|
'name': at,
|
|
'label': 'fonction %s' % name.lower(),
|
|
'join': [at],
|
|
'type': 'integer',
|
|
'value': '%s.id' % quote(at),
|
|
'value_label': '%s.label' % quote(at),
|
|
'filter': False,
|
|
})
|
|
|
|
# add dimensions for item fields
|
|
fields = self.formdef.schema.fields
|
|
if self.formdef.schema.workflow:
|
|
fields += self.formdef.schema.workflow.fields
|
|
|
|
# filter duplicates
|
|
self.good_fields = good_fields = OrderedDict()
|
|
bad_varnames = set()
|
|
for field in fields:
|
|
if field.type not in ('item', 'bool', 'string'):
|
|
continue
|
|
if field.anonymise is True:
|
|
continue
|
|
if not field.varname:
|
|
add_warning('Le champ « %s » n\' a pas de nom de variable, il a été ignoré.' % field.label)
|
|
continue
|
|
if field.varname in good_fields and good_fields[field.varname].type != field.type:
|
|
# duplicate found, but type is not coherent
|
|
add_warning(
|
|
'Le champ « %(label)s » a un nom de variable dupliqué « %(varname)s » '
|
|
'mais pas le même type que « %(label_good)s », tous les champs avec ce nom seront ignorés '
|
|
'(%(type)s != %(type_good)s).' % {
|
|
'label': field.label,
|
|
'varname': field.varname,
|
|
'type': field.type,
|
|
'label_good': good_fields[field.varname].label,
|
|
'type_good': good_fields[field.varname].type,
|
|
}
|
|
)
|
|
del self.good_fields[field.varname]
|
|
bad_varnames.add(field.varname)
|
|
continue
|
|
if field.varname in bad_varnames:
|
|
add_warning('Le champ « %s » est un doublon d\'un champ de type différent, il a été ignoré.'
|
|
% field.label)
|
|
continue
|
|
self.good_fields[field.varname] = field
|
|
|
|
for field in good_fields.values():
|
|
join = None
|
|
|
|
join_name = 'join_' + field.varname
|
|
field_name = truncate_pg_identifier('field_%s' % field.varname)
|
|
dimension_name = field.varname
|
|
dimension_label = field.label.lower()
|
|
|
|
if field.type == 'item':
|
|
table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
|
|
join = {
|
|
'name': join_name,
|
|
'table': quote(table_name),
|
|
'master': quote(field_name),
|
|
'detail': 'id',
|
|
'kind': 'left',
|
|
}
|
|
dimension = {
|
|
'name': dimension_name,
|
|
'label': dimension_label,
|
|
'join': [join_name],
|
|
'type': 'integer',
|
|
'value': '%s.id' % quote(join_name),
|
|
'value_label': '%s.label' % quote(join_name),
|
|
'filter': True,
|
|
}
|
|
elif field.type == 'bool':
|
|
dimension = {
|
|
'name': dimension_name,
|
|
'label': dimension_label,
|
|
'type': 'bool',
|
|
'value': quote(field_name),
|
|
'value_label': '(CASE WHEN %(field)s IS NULL THEN NULL'
|
|
' WHEN %(field)s THEN \'Oui\''
|
|
' ELSE \'Non\' END)' % {
|
|
'field': quote(field_name),
|
|
},
|
|
'filter': True,
|
|
}
|
|
elif field.type == 'string':
|
|
if has_digits_validation(field):
|
|
# we will define a SUM measure instead
|
|
cube['measures'].append({
|
|
'name': 'sum_' + dimension_name,
|
|
'label': 'total du champ « %s »' % dimension_label,
|
|
'type': 'integer',
|
|
'expression': 'SUM({fact_table}.%s)' % quote(field_name),
|
|
})
|
|
continue
|
|
else:
|
|
dimension = {
|
|
'name': dimension_name,
|
|
'label': dimension_label,
|
|
'type': 'string',
|
|
'value': quote(field_name),
|
|
'filter': True,
|
|
}
|
|
else:
|
|
continue
|
|
if join:
|
|
cube['joins'].append(join)
|
|
cube['dimensions'].append(dimension)
|
|
|
|
self.model['cubes'].append(cube)
|
|
if self.do_feed:
|
|
try:
|
|
self.logger.info('feed formdef %s', self.formdef.slug)
|
|
self.do_statuses()
|
|
self.do_data_table()
|
|
self.do_data()
|
|
finally:
|
|
self.olap_feeder.ctx.pop()
|