wcs-olap/wcs_olap/feeder.py

# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from collections import OrderedDict
import contextlib
import datetime
import copy
import itertools
import os
import json
import hashlib
import reprlib
from .utils import Whatever
import psycopg2
import psycopg2.errorcodes
from cached_property import cached_property
from wcs_olap.wcs_api import WcsApiError
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)


def truncate_pg_identifier(identifier, hash_length=6, force_hash=False):
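    """Keep *identifier* under PostgreSQL's 63-character limit on names by
    replacing its middle with a short md5 digest when it is too long (or
    when *force_hash* is set)."""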
    if len(identifier) < 64 and not force_hash:
        return identifier
    else:
        # insert hash in the middle, to keep some readability
        return (
            identifier[:(63 - hash_length) // 2]
            + hashlib.md5(identifier.encode('utf-8')).hexdigest()[:hash_length]
            + identifier[-(63 - hash_length) // 2:])


@contextlib.contextmanager
def ignore_undefined_object_or_table():
    try:
        yield
    except psycopg2.ProgrammingError as e:
        if e.pgcode not in [psycopg2.errorcodes.UNDEFINED_TABLE,
                            psycopg2.errorcodes.UNDEFINED_OBJECT]:
            raise


def quote(name):
    return '"%s"' % name


def slugify(s):
    return s.replace('-', '_').replace(' ', '_')


class Context(object):
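    """Stack of dicts merged into the formatting context used by
    WcsOlapFeeder.ex()."""
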
    def __init__(self):
        self.stack = []

    def push(self, d):
        self.stack.append(d)

    def pop(self):
        self.stack = self.stack[:-1]

    def as_dict(self):
        r = {}
        for d in self.stack:
            r.update(d)
        return r


class WcsOlapFeeder(object):
    channels = [
        [1, 'web', 'web'],
        [2, 'mail', 'courrier'],
        [3, 'phone', 'téléphone'],
        [4, 'counter', 'guichet'],
        [5, 'backoffice', 'backoffice'],
        [6, 'email', 'email'],
        [7, 'fax', 'fax'],
        [8, 'social-network', 'réseau social'],
    ]
    channel_to_id = dict((c[1], c[0]) for c in channels)
    id_to_channel = dict((c[0], c[1]) for c in channels)

    status = [
        [1, 'Nouveau'],
        [2, 'En cours'],
        [3, 'Terminé'],
    ]
    # build the mappings from status, not channels, so labels really
    # resolve to status ids
    status_to_id = dict((s[1], s[0]) for s in status)
    id_to_status = dict((s[0], s[1]) for s in status)
    def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False,
                 slugs=None):
        self.api = api
        self.slugs = slugs
        self.fake = fake
        self.logger = logger or Whatever()
        if len(schema) > 63:
            raise ValueError('schema name must be shorter than 64 characters: %r' % schema)
        self.schema = schema
        self.schema_temp = truncate_pg_identifier(schema + '_temp')
        self.do_feed = do_feed
        self.ctx = Context()
        self.ctx.push({
            'schema': self.schema,
            'schema_temp': self.schema_temp,
            'role_table': 'role',
            'channel_table': 'channel',
            'category_table': 'category',
            'form_table': 'formdef',
            'generic_formdata_table': 'formdata',
            'generic_status_table': 'status',
            'generic_evolution_table': 'evolution',
            'year_table': 'year',
            'month_table': 'month',
            'day_table': 'day',
            'dow_table': 'dow',
            'hour_table': 'hour',
            'agent_table': 'agent',
        })
        self.config = config or {}
        self.model = {
            'label': self.config.get('cubes_label', schema),
            'name': schema,
            'search_path': [schema, 'public'],
            'pg_dsn': pg_dsn,
            'cubes': [],
        }
        if 'cubes_slug' in self.config:
            self.model['slug'] = self.config['cubes_slug']
        cube = {
            'name': 'all_formdata',
            'label': 'Tous les formulaires',
            'fact_table': 'formdata',
            'key': 'id',
            'joins': [
                {
                    'name': 'receipt_time',
                    'table': 'dates',
                    'detail': 'date',
                    'master': 'receipt_time',
                    'kind': 'right',
                },
                {
                    'name': 'channel',
                    'table': 'channel',
                    'master': 'channel_id',
                    'detail': 'id',
                    'kind': 'left',
                },
                {
                    'name': 'formdef',
                    'table': 'formdef',
                    'master': 'formdef_id',
                    'detail': 'id',
                    'kind': 'left',
                },
                {
                    'name': 'category',
                    'table': 'category',
                    'master': 'formdef.category_id',
                    'detail': 'id',
                    'kind': 'left',
                },
                {
                    'name': 'hour',
                    'table': 'hour',
                    'master': 'hour_id',
                    'detail': 'id',
                    'kind': 'right',
                },
                {
                    'name': 'generic_status',
                    'table': 'status',
                    'master': 'generic_status_id',
                    'detail': 'id',
                    'kind': 'left',
                },
                {
                    'name': 'agent',
                    'table': 'agent',
                    'master': 'first_agent_id',
                    'detail': 'id',
                    'kind': 'left',
                },
            ],
            'dimensions': [
                {
                    'name': 'receipt_time',
                    'label': 'date de la demande',
                    'join': ['receipt_time'],
                    'type': 'date',
                    'value': 'receipt_time.date',
                },
                {
                    'name': 'channel',
                    'label': 'canal',
                    'join': ['channel'],
                    'type': 'integer',
                    'value': 'channel.id',
                    'value_label': 'channel.label',
                },
                {
                    'name': 'category',
                    'label': 'catégorie',
                    'join': ['formdef', 'category'],
                    'type': 'string',
                    'value': 'category.ref',
                    'value_label': 'category.label',
                    'order_by': 'category.label',
                },
                {
                    'name': 'formdef',
                    'label': 'formulaire',
                    'join': ['formdef'],
                    'type': 'string',
                    'value': 'formdef.ref',
                    'value_label': 'formdef.label',
                    'order_by': 'formdef.label',
                },
                {
                    'name': 'generic_status',
                    'label': 'statut simplifié',
                    'join': ['generic_status'],
                    'type': 'integer',
                    'value': 'generic_status.id',
                    'value_label': 'generic_status.label',
                },
                {
                    'name': 'hour',
                    'label': 'heure',
                    'join': ['hour'],
                    'type': 'integer',
                    'value': 'hour.id',
                    'filter': False,
                },
                {
                    'name': 'agent',
                    'label': 'premier agent traitant',
                    'join': ['agent'],
                    'type': 'integer',
                    'value': 'agent.id',
                    'value_label': 'agent.label',
                    'order_by': 'agent.label',
                },
            ],
            'measures': [
                {
                    'name': 'count',
                    'label': 'nombre de demandes',
                    'type': 'integer',
                    'expression': 'count({fact_table}.id)',
                },
                {
                    'name': 'avg_endpoint_delay',
                    'label': 'délai de traitement moyen',
                    'type': 'duration',
                    'expression': 'avg(endpoint_delay)',
                },
                {
                    'name': 'max_endpoint_delay',
                    'label': 'délai de traitement maximum',
                    'type': 'duration',
                    'expression': 'max(endpoint_delay)',
                },
                {
                    'name': 'min_endpoint_delay',
                    'label': 'délai de traitement minimum',
                    'type': 'duration',
                    'expression': 'min(endpoint_delay)',
                },
                {
                    'name': 'percent',
                    'label': 'pourcentage des demandes',
                    'type': 'percent',
                    'expression': 'case (select count({fact_table}.id) from {table_expression} '
                                  'where {where_conditions}) when 0 then null else '
                                  'count({fact_table}.id) * 100. / (select '
                                  'count({fact_table}.id) from {table_expression} where '
                                  '{where_conditions}) end',
                },
                {
                    'name': 'geolocation',
                    'label': 'localisation géographique',
                    'type': 'point',
                    'expression': 'array_agg({fact_table}.geolocation_base) '
                                  'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)',
                },
            ],
        }
        self.model['cubes'].append(cube)
        self.base_cube = self.model['cubes'][0]
        self.agents_mapping = {}
        self.formdata_json_index = []
        # keep at end of __init__ to prevent leak if __init__ raises
        self.connection = psycopg2.connect(dsn=pg_dsn)
        self.connection.autocommit = True
        self.cur = self.connection.cursor()
        try:
            self.has_jsonb = self.detect_jsonb()
            if self.has_jsonb:
                cube['json_field'] = 'json_data'
        except Exception:
            self.connection.close()
            raise

    @cached_property
    def formdefs(self):
        return [
            formdef for formdef in self.api.formdefs
            if (not self.slugs or formdef.slug in self.slugs) and not formdef.is_empty
        ]

    @cached_property
    def roles(self):
        return self.api.roles

    @cached_property
    def categories(self):
        return self.api.categories

    def detect_jsonb(self):
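        """Return True when the server knows the jsonb type (PostgreSQL 9.4 and later)."""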
self.cur.execute("SELECT 1 FROM pg_type WHERE typname = 'jsonb'")
return bool(self.cur.rowcount)

    def hash_table_name(self, table_name, hash_length=6, force_hash=False):
        table_name = table_name.format(**self.default_ctx)
        return truncate_pg_identifier(table_name, hash_length=hash_length, force_hash=force_hash)

    @property
    def default_ctx(self):
        return self.ctx.as_dict()

    def ex(self, query, ctx=None, vars=None):
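        """Format *query* with the stacked context (context entries override
        same-named *ctx* entries), then execute it with psycopg2 parameters
        *vars*, logging the statement and any failure."""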
        ctx = ctx or {}
        ctx.update(self.default_ctx)
        sql = query.format(**ctx)
        self.logger.debug('SQL: %s VARS: %s', sql, vars)
        try:
            self.cur.execute(sql, vars=vars)
        except Exception as e:
            self.logger.error('Failed to execute %r with vars %s, raised %s',
                              sql, reprlib.repr(vars or []), e)
            raise

    def do_schema(self):
        self.ex('SET search_path = public')
        self.logger.debug('dropping schema %s', self.schema_temp)
        self.drop_tables_sequencially(self.schema_temp)
        self.ex('DROP SCHEMA IF EXISTS {schema_temp} CASCADE')
        self.logger.debug('creating schema %s', self.schema_temp)
        self.ex('CREATE SCHEMA {schema_temp}')
        self.ex('SET search_path = {schema_temp},public')

    def drop_tables_sequencially(self, schema):
        """Drop tables one by one in order to avoid reaching max_locks_per_transaction."""
        # drop foreign key constraints first
        self.ex("SELECT table_name, constraint_name FROM "
                "information_schema.key_column_usage "
                "WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'", vars=[schema])
        for table_name, constraint_name in self.cur.fetchall():
            # dropping a PK constraint can cascade to FK constraints on other
            # tables, hence the guard
            with ignore_undefined_object_or_table():
                self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
                        % (quote(schema), quote(table_name), quote(constraint_name)))
        # then remove the other constraints
        self.ex("SELECT table_name, constraint_name FROM "
                "information_schema.key_column_usage "
                "WHERE table_schema = %s", vars=[schema])
        for table_name, constraint_name in self.cur.fetchall():
            with ignore_undefined_object_or_table():
                self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
                        % (quote(schema), quote(table_name), quote(constraint_name)))
        # then drop indexes
        self.ex("SELECT tablename, indexname FROM pg_indexes WHERE schemaname = %s", vars=[schema])
        for table_name, index_name in self.cur.fetchall():
            with ignore_undefined_object_or_table():
                self.ex('DROP INDEX %s.%s CASCADE' % (quote(schema), quote(index_name)))
        # finally drop tables, cascade will have no effect
        self.ex("SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC",
                vars=[schema])
        for table in self.cur.fetchall():
            tablename = '%s.%s' % (quote(schema), quote(table[0]))
            self.ex('DROP TABLE IF EXISTS %s;' % tablename)

    def do_dates_table(self):
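        """Maintain the shared public.dates dimension table: create it if
        missing, then add every missing day between the earliest known date
        (2010-01-01 for an empty table) and December 31 of the year reached
        150 days from now."""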
        self.ex('CREATE TABLE IF NOT EXISTS public.dates (date date, day text, month text)')
        self.ex('CREATE INDEX IF NOT EXISTS dates_index ON public.dates (date)')
        self.ex('SELECT MIN(date) FROM public.dates')
        min_date = self.cur.fetchone()[0]
        first_date = min_date or datetime.date(2010, 1, 1)
        last_date = (datetime.date.today() + datetime.timedelta(days=150)).replace(month=12, day=31)
        self.ex('''INSERT INTO public.dates SELECT
                       the_date.the_date::date AS date,
                       to_char(the_date.the_date, 'TMday') AS day,
                       to_char(the_date.the_date, 'TMmonth') AS month
                   FROM generate_series(%s::date, %s::date, '1 day'::interval)
                       AS the_date(the_date)
                   LEFT JOIN public.dates AS dates
                       ON the_date.the_date = dates.date
                   WHERE dates.date IS NULL''',
                vars=[first_date, last_date])

    def create_table(self, name, columns, inherits=None, comment=None):
        sql = 'CREATE TABLE %s' % quote(name)
        sql += '(' + ', '.join('%s %s' % (quote(n), t) for n, t in columns) + ')'
        if inherits:
            sql += ' INHERITS (%s)' % quote(inherits)
        self.ex(sql)
        if comment:
            self.ex('COMMENT ON TABLE %s IS %%s' % quote(name), vars=(comment,))

    def prev_table_exists(self, name):
        query = """SELECT EXISTS (SELECT 1 FROM information_schema.tables
                   WHERE table_schema = '{schema}' AND table_name = %s)"""
        self.ex(query, vars=(name,))
        return self.cur.fetchone()[0]

    def update_table_sequence_number(self, name):
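        """Realign the id sequence of *name* after rows were copied over with
        explicit ids from the previous schema."""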
self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
(SELECT GREATEST(1, MAX(id)) FROM {name}))""", ctx={'name': quote(name)})

    def create_labeled_table_serial(self, name, comment):
        self.create_table(
            name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment)
        if self.prev_table_exists(name):
            # insert data from previous table
            self.ex(
                'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
                ctx={'name': quote(name)}
            )
            self.update_table_sequence_number(name)

    def create_referenced_table(self, name, fields, comment):
        # add primary key and reference fields
        new_fields = [['id', 'serial primary key'], ['ref', 'varchar UNIQUE']] + fields
        self.create_table(name, new_fields, comment=comment)

    def do_referenced_data(self, name, data, result_column, update_column='label'):
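        """Synchronize table *name* with *data*, a list of tuples whose first
        element is the natural key ``ref`` and whose last element is the
        label: refresh *update_column* for existing rows, insert the missing
        rows, and return a mapping from *result_column* to row id."""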
        to_insert = []
        for item in data:
            ref = item[0]
            self.ex(
                'SELECT ref, {column} FROM {name} WHERE ref = %s',
                ctx={'name': quote(name), 'column': quote(update_column)},
                vars=(ref,))
            if self.cur.fetchall():
                # the row already exists, refresh its updatable column; the
                # new value is the last element of the data tuple
                self.ex('UPDATE {name} SET {column}=%s WHERE ref=%s',
                        ctx={'name': quote(name), 'column': quote(update_column)},
                        vars=[item[-1], ref])
            else:
                to_insert.append(item)
        if to_insert:
            columns_values = ', '.join(['%s'] * len(to_insert[0]))
            tmpl = ', '.join(['(DEFAULT, %s)' % columns_values] * len(to_insert))
            query = 'INSERT INTO {name} VALUES %s' % tmpl
            self.ex(query, ctx={'name': quote(name)},
                    vars=list(itertools.chain(*to_insert)))
        result = {}
        self.ex('SELECT id, {column} FROM {name}',
                ctx={'name': quote(name), 'column': result_column})
        for _id, column in self.cur.fetchall():
            result[column] = _id
        return result

    def create_labeled_table(self, name, labels, comment=None):
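        """Create an (id, label) table named *name*, copy the rows of the
        previous schema when it exists, insert any label still missing, and
        return a mapping from label to id."""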
        self.create_table(
            name,
            [
                ['id', 'integer primary key'],
                ['label', 'varchar'],
            ], comment=comment)
        if self.prev_table_exists(name):
            # insert data from previous table
            self.ex(
                'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
                ctx={'name': quote(name)}
            )
        # find what is missing
        to_insert = []
        for _id, _label in labels:
            self.ex(
                'SELECT * FROM {name} WHERE label = %s', ctx={'name': quote(name)},
                vars=(_label,))
            if self.cur.fetchone() is None:
                to_insert.append(_label)
        labels = None
        if to_insert:
            self.ex('SELECT MAX(id) FROM {name}', ctx={'name': quote(name)})
            next_id = (self.cur.fetchone()[0] or 0) + 1
            ids = range(next_id, next_id + len(to_insert))
            labels = zip(ids, to_insert)
        if labels is not None:
            # turn the zip object into a proper list
            labels = list(labels)
        if labels:
            tmpl = ', '.join(['(%s, %s)'] * len(labels))
            query_str = 'INSERT INTO {name} (id, label) VALUES %s' % tmpl
            self.ex(query_str, ctx={'name': quote(name)}, vars=list(itertools.chain(*labels)))
        res = {}
        self.ex('SELECT id, label FROM {name}', ctx={'name': quote(name)})
        for id_, label in self.cur.fetchall():
            res[label] = id_
        return res

    def do_category_table(self):
        fields = [['label', 'varchar']]
        table_name = self.default_ctx['category_table']
        self.create_referenced_table(table_name, fields, 'catégorie')
        categories_data = [(c.slug, c.title) for c in self.categories]
        tmp_cat_map = self.do_referenced_data(table_name, categories_data, 'label')
        self.update_table_sequence_number(table_name)
        # remap categories ids to ids in the table
        return dict((c.title, tmp_cat_map[c.title]) for c in self.categories)

    def do_formdef_table(self):
        categories_mapping = self.do_category_table()
        formdef_fields = [
            ['category_id', 'integer REFERENCES {category_table} (id)'],
            ['label', 'varchar'],
        ]
        table_name = self.default_ctx['form_table']
        self.create_referenced_table(table_name, formdef_fields, 'types de formulaire')
        formdefs = [(form.slug, categories_mapping.get(form.schema.category),
                     form.schema.name) for form in self.formdefs]
        self.formdefs_mapping = self.do_referenced_data(table_name, formdefs, 'ref')
        self.update_table_sequence_number(table_name)

    def do_base_table(self):
        # channels
        self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
                                  comment='canal')
        # roles
        roles = dict((i, role.name) for i, role in enumerate(self.roles))
        tmp_role_map = self.create_labeled_table('role', roles.items(), comment='role')
        self.role_mapping = dict(
            (role.id, tmp_role_map[role.name]) for role in self.roles)
        # forms
        self.do_formdef_table()
        self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
                                  comment='heures')
        self.create_labeled_table('status', self.status,
                                  comment='statuts simplifiés')
        # agents
        self.create_labeled_table_serial('agent', comment='agents')
        self.columns = [
            ['id', 'serial primary key'],
            ['formdef_id', 'smallint REFERENCES {form_table} (id)'],
            ['receipt_time', 'date'],
            ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
            ['channel_id', 'smallint REFERENCES {channel_table} (id)'],
            ['backoffice', 'boolean'],
            ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
            ['endpoint_delay', 'interval'],
            ['first_agent_id', 'smallint REFERENCES {agent_table} (id)'],
            ['geolocation_base', 'POINT NULL'],
        ]
        if self.has_jsonb:
            self.columns.append(['json_data', 'JSONB NULL'])
        self.comments = {
            'formdef_id': 'formulaire',
            'receipt_time': 'date de réception',
            'hour_id': 'heure',
            'channel_id': 'canal',
            'backoffice': 'soumission backoffice',
            'generic_status_id': 'statut simplifié',
            'endpoint_delay': 'délai de traitement',
            'geolocation_base': 'position géographique',
        }
        self.create_table('{generic_formdata_table}', self.columns)
        for at, comment in self.comments.items():
            self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))
        self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s', vars=('tous les formulaires',))
        # evolutions
        self.create_table('{generic_evolution_table}', [
            ['id', 'serial primary key'],
            ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
            # "REFERENCES {generic_formdata_table} (id)" is impossible because
            # FK constraints do not work on inherited tables
            ['formdata_id', 'integer'],
            ['time', 'timestamp'],
            ['date', 'date'],
            ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
        ])
        self.ex('COMMENT ON TABLE {generic_evolution_table} IS %s', vars=('evolution générique',))

    def feed(self):
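        """Rebuild the OLAP schema from scratch: feed everything into the
        temporary schema, swap it with {schema} on success, and dump the
        cubes model file when cubes_model_dirs is configured."""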
        try:
            if self.do_feed:
                self.do_schema()
                self.do_dates_table()
                self.do_base_table()
            for formdef in self.formdefs:
                self.api.cache = {}
                try:
                    formdef_feeder = WcsFormdefFeeder(self, formdef, do_feed=self.do_feed)
                    formdef_feeder.feed()
                except WcsApiError as e:
                    self.logger.error('failed to retrieve formdef %s, %s', formdef.slug, e)
        except Exception:
            # keep temporary schema alive for debugging
            raise
        else:
            if self.do_feed:
                if not self.fake:
                    self.logger.debug('dropping schema %s', self.schema)
                    self.drop_tables_sequencially(self.schema)
                    self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
                    self.logger.debug('renaming schema %s to %s', self.schema_temp, self.schema)
                    self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
            if 'cubes_model_dirs' in self.config:
                model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema)
                with open(model_path, 'w') as f:
                    json.dump(self.model, f, indent=2, sort_keys=True)
        finally:
            # prevent connection from remaining open
            self.cur.close()
            self.connection.close()

    def insert_agent(self, name):
        self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
        res = self.cur.fetchone()
        if res:
            return res[0]
        self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=(name,))
        return self.cur.fetchone()[0]

    def get_agent(self, user):
        assert user.name
        assert user.id
        if user.id not in self.agents_mapping:
            self.agents_mapping[user.id] = self.insert_agent(user.name)
        return self.agents_mapping[user.id]

    def create_formdata_json_index(self, table_name, varname):
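        """Create an expression index on json_data->>varname of the generic
        formdata table, only once per varname; the index name, derived from
        *table_name* and *varname*, is hashed to fit PostgreSQL's identifier
        length limit."""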
        if varname in self.formdata_json_index:
            return
        index_name = self.hash_table_name('%s_%s_json_idx' % (table_name, varname), hash_length=8,
                                          force_hash=True)
        self.ex('CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))',
                ctx={'index_name': quote(index_name)}, vars=[varname])
        # prevent double creation
        self.formdata_json_index.append(varname)


def has_digits_validation(field):
    return field.validation and field.validation.get('type') == 'digits'


class WcsFormdefFeeder(object):
    def __init__(self, olap_feeder, formdef, do_feed=True):
        self.olap_feeder = olap_feeder
        self.formdef = formdef
        self.status_mapping = {}
        self.items_mappings = {}
        self.do_feed = do_feed
        self.fields = []
        self.item_id_mappings = {}

    @property
    def table_name(self):
        return self.hash_table_name('formdata_%s' % self.formdef.slug.replace('-', '_'))

    @property
    def status_table_name(self):
        return self.hash_table_name('status_%s' % self.formdef.slug.replace('-', '_'))

    @property
    def evolution_table_name(self):
        return self.hash_table_name('evolution_%s' % self.formdef.slug.replace('-', '_'))

    def __getattr__(self, name):
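        # anything not defined here is delegated to the parent feeder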
        return getattr(self.olap_feeder, name)

    def do_statuses(self):
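        """Create the per-formdef status table and map wcs status ids to the
        ids of the inserted rows."""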
        statuses = self.formdef.schema.workflow.statuses
        tmp_status_map = self.olap_feeder.create_labeled_table(
            self.status_table_name, enumerate([s.name for s in statuses]),
            comment='statuts du formulaire « %s »' % self.formdef.schema.name)
        self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses)

    def do_data_table(self):
        columns = OrderedDict()
        columns['status_id'] = {
            'sql_col_name': 'status_id',
            'sql_col_def': 'smallint REFERENCES {status_table} (id)',
        }
        # add item fields
        for field in self.good_fields.values():
            if field.type == 'item':
                comment = ('valeurs du champ « %s » du formulaire %s'
                           % (field.label, self.formdef.schema.name))
                table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
                # create table and mapping
                if field.items:
                    self.items_mappings[field.varname] = self.create_labeled_table(
                        table_name, enumerate(field.items), comment=comment)
                else:
                    # open item field, from data sources...
                    self.create_labeled_table_serial(table_name, comment=comment)
                field_def = 'integer REFERENCES %s (id)' % quote(table_name)
            elif field.type == 'bool':
                field_def = 'boolean'
            elif field.type == 'string':
                if has_digits_validation(field):
                    field_def = 'integer'
                else:
                    field_def = 'varchar'
            else:
                continue
            columns[field.varname] = {
                'field': field,
                'sql_col_name': truncate_pg_identifier('field_%s' % field.varname),
                'sql_col_def': field_def,
                'sql_comment': field.label,
            }
        # keep loaded fields around
        for key in columns:
            if columns[key].get('field') is not None:
                self.fields.append(columns[key].get('field'))
        # add geolocation fields
        for geolocation, label in self.formdef.schema.geolocations:
            at = 'geolocation_%s' % geolocation
            columns[at] = {
                'sql_col_name': at,
                'sql_col_def': 'point',
                'comment': 'géoloc « %s »' % label,
            }
        # add function fields
        for function, name in self.formdef.schema.workflow.functions.items():
            at = 'function_%s' % slugify(function)
            columns[at] = {
                'sql_col_name': at,
                'sql_col_def': 'smallint REFERENCES {role_table} (id)',
                'comment': 'fonction « %s »' % name,
            }
        self.columns = [name for name, _type in self.olap_feeder.columns]
        for key in columns:
            self.columns.append(columns[key]['sql_col_name'])
        self.columns.remove('geolocation_base')
        self.create_table(
            self.table_name,
            [(columns[key]['sql_col_name'], columns[key]['sql_col_def']) for key in columns],
            inherits='{generic_formdata_table}',
            comment='formulaire %s' % self.formdef.schema.name)
        for key in columns:
            column = columns[key]
            if column.get('sql_comment'):
                self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s'
                        % quote(column['sql_col_name']),
                        vars=(column['sql_comment'],))
        # create indexes for JSON fields
        if self.has_jsonb:
            for varname in self.good_fields:
                self.create_formdata_json_index(self.table_name, varname)
        # PostgreSQL does not propagate foreign key constraints to child tables
        # so we must recreate them manually
        for name, _type in self.olap_feeder.columns:
            if 'REFERENCES' not in _type:
                continue
            i = _type.index('REFERENCES')
            constraint = '%s_fk_constraint FOREIGN KEY (%s) %s' % (name, name, _type[i:])
            self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
        self.ex('ALTER TABLE {formdata_table} ADD PRIMARY KEY (id)')
        # evolutions table
        self.create_table(self.evolution_table_name, [
            ['id', 'serial primary key'],
            ['status_id', 'smallint REFERENCES {status_table} (id)'],
            ['formdata_id', 'integer REFERENCES {formdata_table} (id)'],
            ['time', 'timestamp'],
            ['date', 'date'],
            ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
        ])
        self.ex('COMMENT ON TABLE {evolution_table} IS %s',
                vars=('evolution des demandes %s' % self.formdef.schema.name,))

    def insert_item_value(self, field, value):
        table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
        self.ex('SELECT id FROM {item_table} WHERE label = %s',
                ctx={'item_table': quote(table_name)}, vars=(value,))
        res = self.cur.fetchone()
        if res:
            return res[0]
        self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,),
                ctx={'item_table': quote(table_name)})
        return self.cur.fetchone()[0]

    def get_item_id(self, field, value):
        assert field
        assert field.varname
        assert value
        mapping = self.item_id_mappings.setdefault(field.varname, {})
        if value not in mapping:
            mapping[value] = self.insert_item_value(field, value)
        return mapping[value]

    def generic_status(self, status):
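        """Collapse a workflow status onto the simplified scale: 3 (Terminé)
        for endpoints, 1 (Nouveau) for the start point, 2 (En cours)
        otherwise."""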
        if status.endpoint:
            generic_status = 3
        elif status.startpoint:
            generic_status = 1
        else:
            generic_status = 2
        return generic_status

    def do_data(self):
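        """Insert one row per formdata into the fact table, then insert the
        status evolutions into the per-formdef and generic evolution tables
        in batches of 500 rows."""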
        values = []
        generic_evolution_values = []
        evolution_values = []
        for data in self.formdef.formdatas.anonymized.full:
            json_data = {}
            # ignore formdata without status
            if data.workflow.real_status:
                status_id = data.workflow.real_status.id
            elif data.workflow.status:
                status_id = data.workflow.status.id
            elif data.evolution:
                for evolution in reversed(data.evolution):
                    if evolution.status:
                        status_id = evolution.status
                        break
                else:
                    continue
            else:
                continue
            try:
                status = data.formdef.schema.workflow.statuses_map[status_id]
            except KeyError:
                self.logger.warning('%s.%s unknown status status_id %s',
                                    data.formdef.schema.name, data.id, status_id)
                continue
            channel = data.submission.channel.lower()
            if channel == 'web' and data.submission.backoffice:
                channel = 'backoffice'
            row = {
                'formdef_id': self.formdefs_mapping[self.formdef.slug],
                'receipt_time': data.receipt_time,
                'hour_id': data.receipt_time.hour,
                'channel_id': self.channel_to_id[channel],
                'backoffice': data.submission.backoffice,
                'generic_status_id': self.generic_status(status),
                'status_id': self.status_mapping[status_id],
                'endpoint_delay': data.endpoint_delay,
                'first_agent_id': self.get_first_agent_in_evolution(data),
            }
            # add form fields value
            for field in self.fields:
                v = None
                raw = None
                if field.varname in data.fields:
                    raw = data.fields[field.varname]
                elif field.varname in data.workflow.fields:
                    raw = data.workflow.fields[field.varname]
                else:
                    raw = None
                if field.type == 'item':
                    # map items to sql id
                    if field.items or field.options:
                        v = self.items_mappings[field.varname].get(raw)
                    elif raw:
                        v = self.get_item_id(field, raw)
                    else:
                        v = None
                elif field.type == 'string':
                    if has_digits_validation(field):
                        if raw is not None and raw.isnumeric():
                            v = int(raw)
                    else:
                        v = raw
                elif field.type == 'bool':
                    v = raw
                # unstructured storage of field values
                if field.varname and raw is not None:
                    json_data[field.varname] = raw
                row[truncate_pg_identifier('field_%s' % field.varname)] = v
            if self.has_jsonb:
                row['json_data'] = json.dumps(json_data)
            # add geolocation fields value
            for geolocation, label in self.formdef.schema.geolocations:
                v = (data.geolocations or {}).get(geolocation)
                if v:
                    v = '(%.6f, %.6f)' % (v.get('lon'), v.get('lat'))
                row['geolocation_%s' % geolocation] = v
            # add function fields value
            for function, name in self.formdef.schema.workflow.functions.items():
                try:
                    v = data.functions[function]
                except KeyError:
                    v = None
                else:
                    if v and v.id in self.olap_feeder.role_mapping:
                        v = self.olap_feeder.role_mapping[v.id]
                    else:
                        v = None
                at = 'function_%s' % slugify(function)
                row[at] = v
            values.append(tuple(row[column] for column in self.columns[1:]))
            # insert evolutions
            generic_evolution = []
            evolution = []
            last_status = None
            for evo in data.evolution:
                if not evo.status:
                    continue
                try:
                    status = data.formdef.schema.workflow.statuses_map[evo.status]
                except KeyError:
                    self.logger.warning('%s.%s unknown status in evolution %s',
                                        data.formdef.schema.name, data.id, evo.status)
                    continue
                status_id = self.status_mapping[status.id]
                generic_status_id = self.generic_status(status)
                evolution.append(
                    [0, status_id, evo.time, evo.time.date(), evo.time.hour])
                if generic_status_id == last_status:
                    continue
                generic_evolution.append(
                    [0, generic_status_id, evo.time, evo.time.date(), evo.time.hour])
                last_status = generic_status_id
            generic_evolution_values.append(generic_evolution)
            evolution_values.append(evolution)
        if not values:
            self.logger.warning('no data')
            return
        insert_columns = ['%s' % quote(column) for column in self.columns[1:]]
        insert_columns = ', '.join(insert_columns)
        self.ex('INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id',
                ctx=dict(
                    columns=insert_columns,
                    values=', '.join(['%s'] * len(values))
                ),
                vars=values)
        # insert generic evolutions
        generic_evolutions = []
        ids = list(self.cur.fetchall())
        for evos, (formdata_id,) in zip(generic_evolution_values, ids):
            for row in evos:
                row[0] = formdata_id
                generic_evolutions.append(tuple(row))
                if len(generic_evolutions) == 500:
                    self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % (
                        ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
                        ', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions)
                    generic_evolutions = []
        if generic_evolutions:
            self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % (
                ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
                ', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions)
        # insert evolutions
        evolutions = []
        for evos, (formdata_id,) in zip(evolution_values, ids):
            for row in evos:
                row[0] = formdata_id
                evolutions.append(tuple(row))
                if len(evolutions) == 500:
                    self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % (
                        ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
                        ', '.join(['%s'] * len(evolutions))), vars=evolutions)
                    evolutions = []
        if evolutions:
            self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % (
                ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
                ', '.join(['%s'] * len(evolutions))), vars=evolutions)

    def get_first_agent_in_evolution(self, formdata):
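        """Return the agent id of the first evolution step bearing a user, if any."""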
        for evo in formdata.evolution:
            if evo.who:
                return self.get_agent(evo.who)

    def feed(self):
        self.olap_feeder.ctx.push({
            'formdata_table': quote(self.table_name),
            'status_table': quote(self.status_table_name),
            'evolution_table': quote(self.evolution_table_name),
        })

        # create cube
        cube = self.cube = copy.deepcopy(self.base_cube)

        def add_warning(message):
            self.logger.warning('%s', message)
            cube.setdefault('warnings', []).append(message)

        # remove json field from formdef cubes
        cube.pop('json_field', None)
        cube.update({
            'name': self.table_name,
            'label': self.formdef.schema.name,
            'fact_table': quote(self.table_name),
            'key': 'id',
        })
        cube['dimensions'] = [dimension for dimension in cube['dimensions']
                              if dimension['name'] not in ('category', 'formdef')]
        # add dimension for status
        cube['joins'].append({
            'name': 'status',
            'table': quote(self.status_table_name),
            'master': 'status_id',
            'detail': 'id',
            'kind': 'left',
        })
        cube['dimensions'].append({
            'name': 'status',
            'label': 'statut',
            'join': ['status'],
            'type': 'integer',
            'value': 'status.id',
            'value_label': 'status.label',
        })
        # add dimension for function
        for function, name in self.formdef.schema.workflow.functions.items():
            at = 'function_%s' % slugify(function)
            cube['joins'].append({
                'name': at,
                'table': 'role',
                'master': quote(at),
                'detail': 'id',
                'kind': 'left',
            })
            cube['dimensions'].append({
                'name': at,
                'label': 'fonction %s' % name.lower(),
                'join': [at],
                'type': 'integer',
                'value': '%s.id' % quote(at),
                'value_label': '%s.label' % quote(at),
                'filter': False,
            })
        # add dimensions for item fields
        fields = self.formdef.schema.fields
        if self.formdef.schema.workflow:
            fields += self.formdef.schema.workflow.fields
        # filter duplicates
        self.good_fields = good_fields = OrderedDict()
        bad_varnames = set()
        for field in fields:
            if field.type not in ('item', 'bool', 'string'):
                continue
            if field.anonymise is True:
                continue
            if not field.varname:
                add_warning('Le champ « %s » n\'a pas de nom de variable, il a été ignoré.'
                            % field.label)
                continue
            if field.varname in good_fields and good_fields[field.varname].type != field.type:
                # duplicate found, but type is not coherent
                add_warning(
                    'Le champ « %(label)s » a un nom de variable dupliqué « %(varname)s » '
                    'mais pas le même type que « %(label_good)s », tous les champs avec ce nom '
                    'seront ignorés (%(type)s != %(type_good)s).' % {
                        'label': field.label,
                        'varname': field.varname,
                        'type': field.type,
                        'label_good': good_fields[field.varname].label,
                        'type_good': good_fields[field.varname].type,
                    }
                )
                del self.good_fields[field.varname]
                bad_varnames.add(field.varname)
                continue
            if field.varname in bad_varnames:
                add_warning('Le champ « %s » est un doublon d\'un champ de type différent, '
                            'il a été ignoré.' % field.label)
                continue
            self.good_fields[field.varname] = field
        for field in good_fields.values():
            join = None
            join_name = 'join_' + field.varname
            field_name = truncate_pg_identifier('field_%s' % field.varname)
            dimension_name = field.varname
            dimension_label = field.label.lower()
            if field.type == 'item':
                table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
                join = {
                    'name': join_name,
                    'table': quote(table_name),
                    'master': quote(field_name),
                    'detail': 'id',
                    'kind': 'left',
                }
                dimension = {
                    'name': dimension_name,
                    'label': dimension_label,
                    'join': [join_name],
                    'type': 'integer',
                    'value': '%s.id' % quote(join_name),
                    'value_label': '%s.label' % quote(join_name),
                    'filter': True,
                }
            elif field.type == 'bool':
                dimension = {
                    'name': dimension_name,
                    'label': dimension_label,
                    'type': 'bool',
                    'value': quote(field_name),
                    'value_label': '(CASE WHEN %(field)s IS NULL THEN NULL'
                                   ' WHEN %(field)s THEN \'Oui\''
                                   ' ELSE \'Non\' END)' % {
                                       'field': quote(field_name),
                                   },
                    'filter': True,
                }
            elif field.type == 'string':
                if has_digits_validation(field):
                    # we will define a SUM measure instead
                    cube['measures'].append({
                        'name': 'sum_' + dimension_name,
                        'label': 'total du champ « %s »' % dimension_label,
                        'type': 'integer',
                        'expression': 'SUM({fact_table}.%s)' % quote(field_name),
                    })
                    continue
                else:
                    dimension = {
                        'name': dimension_name,
                        'label': dimension_label,
                        'type': 'string',
                        'value': quote(field_name),
                        'filter': True,
                    }
            else:
                continue
            if join:
                cube['joins'].append(join)
            cube['dimensions'].append(dimension)

        self.model['cubes'].append(cube)

        if self.do_feed:
            try:
                self.logger.info('feed formdef %s', self.formdef.slug)
                self.do_statuses()
                self.do_data_table()
                self.do_data()
            finally:
                self.olap_feeder.ctx.pop()