# publik-bi/wcs_olap/feeder.py
# -*- coding: utf-8 -*-
import copy
import os
import json
import hashlib
from utils import Whatever
import psycopg2
from wcs_olap.wcs_api import WcsApiError
def slugify(s):
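    """Turn a label into an identifier-friendly string: dashes and spaces
    become underscores."""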
return s.replace('-', '_').replace(' ', '_')
class Context(object):
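    """Stack of dicts used to resolve {placeholder} names in SQL snippets and
    in the OLAP model; lookups walk the stack from the most recently pushed
    dict to the oldest, so per-formdef entries can temporarily shadow the
    feeder-wide ones:

        ctx = Context()
        ctx.push({'schema': 'demarches'})
        ctx.push({'formdata_table': 'formdata_foo'})
        ctx['schema']        # -> 'demarches'
        ctx.pop()            # drops the formdata_foo entry
    """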
def __init__(self):
self.stack = []
def __getitem__(self, key):
if not self.stack:
raise KeyError(key)
for d in self.stack[::-1]:
try:
return d[key]
except KeyError:
pass
else:
raise KeyError(key)
def push(self, d):
self.stack.append(d)
def pop(self):
self.stack = self.stack[:-1]
def as_dict(self):
r = {}
for d in self.stack:
r.update(d)
return r
class WcsOlapFeeder(object):
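    """Dump w.c.s. form data into a PostgreSQL star schema (one schema per
    instance, one fact table per formdef plus a generic parent table) and
    build the matching OLAP model dict (dimensions, joins, mappings, cubes)
    that can be written out as JSON for a Cubes-style server.
    """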
def __init__(self, api, pg_dsn, schema, logger=None, config=None):
self.api = api
self.logger = logger or Whatever()
self.schema = schema
self.connection = psycopg2.connect(dsn=pg_dsn)
self.connection.autocommit = True
self.cur = self.connection.cursor()
self.formdefs = api.formdefs
self.roles = api.roles
self.categories = api.categories
self.ctx = Context()
self.ctx.push({
'schema': self.schema,
'role_table': 'role',
'channel_table': 'channel',
'category_table': 'category',
'form_table': 'formdef',
'generic_formdata_table': 'formdata',
'generic_status_table': 'status',
'year_table': 'year',
'month_table': 'month',
'day_table': 'day',
'dow_table': 'dow',
'hour_table': 'hour',
})
self.config = config or {}
self.model = {
'label': self.config.get('cubes_label', schema),
'name': schema,
'browser_options': {
'schema': schema,
},
'joins': [
{
'name': 'receipt_time',
'master': 'receipt_time',
'detail': {
'table': 'dates',
'column': 'date',
'schema': 'public',
},
'method': 'detail',
'alias': 'dates',
},
{
'name': 'channel',
'master': 'channel_id',
'detail': '{channel_table}.id',
'method': 'detail',
},
{
'name': 'role',
'detail': '{role_table}.id',
'method': 'detail',
},
{
'name': 'formdef',
'master': 'formdef_id',
'detail': '{form_table}.id',
'method': 'detail',
},
{
'name': 'category',
'master': '{form_table}.category_id',
'detail': '{category_table}.id',
},
{
'name': 'hour',
'master': 'hour_id',
'detail': '{hour_table}.id',
'method': 'detail',
},
{
'name': 'generic_status',
'master': 'generic_status_id',
'detail': '{generic_status_table}.id',
'method': 'detail',
},
],
'dimensions': [
{
'label': 'date de soumission',
'name': 'receipt_time',
'role': 'time',
'levels': [
{
'name': 'year',
'label': 'année',
'role': 'year',
'order_attribute': 'year',
'order': 'asc',
},
{
'name': 'quarter',
'order_attribute': 'quarter',
'label': 'trimestre',
'role': 'quarter',
},
{
'name': 'month',
'label': 'mois',
'role': 'month',
'attributes': ['month', 'month_name'],
'order_attribute': 'month',
'label_attribute': 'month_name',
'order': 'asc',
},
{
'name': 'week',
'label': 'semaine',
'role': 'week',
},
{
'name': 'day',
'label': 'jour',
'role': 'day',
'order': 'asc',
},
{
'name': 'dow',
'label': 'jour de la semaine',
'attributes': ['dow', 'dow_name'],
'order_attribute': 'dow',
'label_attribute': 'dow_name',
'order': 'asc',
},
],
'hierarchies': [
{
'name': 'default',
'label': 'par défaut',
'levels': ['year', 'month', 'day']
},
{
'name': 'quarterly',
'label': 'par trimestre',
'levels': ['year', 'quarter']
},
{
'name': 'weekly',
'label': 'par semaine',
'levels': ['year', 'week']
},
{
'name': 'dowly',
'label': 'par jour de la semaine',
'levels': ['dow']
},
]
},
{
'label': 'canaux',
'name': 'channels',
},
{
'label': 'catégories',
'name': 'categories',
},
{
'label': 'formulaire',
'name': 'formdef',
},
{
'label': 'statuts génériques',
'name': 'generic_statuses',
},
{
'label': 'heure',
'name': 'hours',
'levels': [
{
'name': 'hours',
'attributes': ['hour_id', 'hour_label'],
'order_attribute': 'hour_id',
'label_attribute': 'hour_label',
}
]
},
],
'mappings': {
'receipt_time.year': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'year',
},
'receipt_time.month': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'month'
},
'receipt_time.month_name': {
'table': 'dates',
'schema': 'public',
'column': 'month'
},
'receipt_time.week': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'week'
},
'receipt_time.day': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'day'
},
'receipt_time.dow': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'dow'
},
'receipt_time.dow_name': {
'table': 'dates',
'schema': 'public',
'column': 'day',
},
'receipt_time.quarter': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'quarter'
},
'formdef': 'formdef.label',
'channels': 'channel.label',
'categories': 'category.label',
'generic_statuses': 'status.label',
'hours.hour_label': '{hour_table}.label',
'hours.hour_id': '{hour_table}.id',
},
'cubes': [
{
'name': schema + '_formdata',
'label': 'Toutes les demandes (%s)' % schema,
'key': 'id',
'fact': 'formdata',
'dimensions': [
'receipt_time',
'hours',
'channels',
'categories',
'formdef',
'generic_statuses',
],
'joins': [
{
'name': 'receipt_time',
},
{
'name': 'hour',
},
{
'name': 'channel',
},
{
'name': 'formdef',
},
{
'name': 'category',
},
{
'name': 'generic_status',
},
],
'measures': [
{
'name': 'endpoint_delay',
'label': 'délai de traitement',
'nonadditive': 'all',
},
],
'aggregates': [
{
'name': 'record_count',
'label': 'nombre de demandes',
'function': 'count'
},
{
'name': 'endpoint_delay_max',
'label': 'délai de traitement maximum',
'measure': 'endpoint_delay',
'function': 'max',
},
{
'name': 'endpoint_delay_avg',
'label': 'délai de traitement moyen',
'measure': 'endpoint_delay',
'function': 'avg',
},
],
},
],
}
# apply table names
self.model = self.tpl(self.model)
self.base_cube = self.model['cubes'][0]
def hash_table_name(self, table_name):
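        """Expand table-name placeholders and keep identifiers under
        PostgreSQL's 63-character limit, truncating long names and appending
        a short md5 suffix to keep them unique.
        """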
table_name = table_name.format(**self.default_ctx)
if len(table_name) < 64:
return table_name
else:
return table_name[:57] + hashlib.md5(table_name).hexdigest()[:6]
@property
def default_ctx(self):
return self.ctx.as_dict()
def ex(self, query, ctx=None, vars=None):
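        """Format the query with the current context (schema and table
        names), log it at debug level and execute it.
        """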
ctx = ctx or {}
ctx.update(self.default_ctx)
        sql = query.format(**ctx)
self.logger.debug('SQL: %s VARS: %s', sql, vars)
self.cur.execute(sql, vars=vars)
def exmany(self, query, varslist, ctx=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
        sql = query.format(**ctx)
self.logger.debug('SQL: %s VARSLIST: %s', sql, varslist)
self.cur.executemany(sql, varslist)
def do_schema(self):
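        """Drop and recreate the target schema, then put it first on the
        search_path.
        """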
self.logger.debug('dropping schema %s', self.schema)
self.ex('SET search_path = public')
self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
self.logger.debug('creating schema %s', self.schema)
self.ex('CREATE SCHEMA {schema}')
self.ex('SET search_path = {schema},public')
channels = [
[1, 'web', u'web'],
[2, 'mail', u'courrier'],
[3, 'phone', u'téléphone'],
[4, 'counter', u'guichet'],
]
channel_to_id = dict((c[1], c[0]) for c in channels)
id_to_channel = dict((c[0], c[1]) for c in channels)
status = [
[1, 'Nouveau'],
[2, 'En cours'],
[3, 'Terminé'],
]
    status_to_id = dict((s[1], s[0]) for s in status)
    id_to_status = dict((s[0], s[1]) for s in status)
def create_table(self, name, columns, inherits=None, comment=None):
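        """Create a table from (name, type) column pairs, optionally
        inheriting another table, and attach an SQL COMMENT if given.
        """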
sql = 'CREATE TABLE %s' % name
sql += '(' + ', '.join('%s %s' % (n, t) for n, t in columns) + ')'
if inherits:
sql += ' INHERITS (%s)' % inherits
self.ex(sql)
if comment:
self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,))
def create_labeled_table(self, name, labels, comment=None):
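        """Create a small (id smallint, label varchar) dimension table and
        fill it with the given (id, label) pairs.
        """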
self.create_table(name,
[
['id', 'smallint primary key'],
['label', 'varchar']
], comment=comment)
values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels)
if not values:
return
self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))
def tpl(self, o, ctx=None):
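        """Recursively apply str.format with the current context to every
        string found in a JSON-like structure (dicts, lists and scalars).
        """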
ctx = ctx or {}
ctx.update(self.default_ctx)
def helper(o):
if isinstance(o, basestring):
return o.format(**ctx)
elif isinstance(o, dict):
return dict((k, helper(v)) for k, v in o.iteritems())
elif isinstance(o, list):
return [helper(v) for v in o]
elif isinstance(o, (bool, int, float)):
return o
else:
assert False, '%s is not a valid value for JSON' % o
return helper(o)
def add_dim(self, **kwargs):
        self.model['dimensions'].append(self.tpl(kwargs))
def do_base_table(self):
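        """Create the shared dimension tables (channel, role, category, year,
        month, day, day-of-week, hour, generic status), the formdef table and
        the generic formdata parent table, with dim$/time$/measure$ column
        comments describing each column's role.
        """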
# channels
self.create_labeled_table('{channel_table}', [[c[0], c[2]] for c in self.channels])
# roles
roles = dict((i, role.name) for i, role in enumerate(self.roles))
self.create_labeled_table('{role_table}', roles.items())
self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles))
# categories
self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories))
self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories))
# years
self.create_labeled_table('{year_table}', zip(range(2000, 2030), map(str, range(2000,
2030))))
# month
self.create_labeled_table('{month_table}', zip(range(1, 13), [u'janvier', u'février',
u'mars', u'avril', u'mai',
u'juin', u'juillet', u'août',
u'septembre', u'octobre',
u'novembre', u'décembre']))
        # days
self.create_labeled_table('{day_table}', zip(range(1, 32), map(str, range(1, 32))))
# day of week
self.create_labeled_table('{dow_table}', enumerate([u'lundi', u'mardi', u'mercredi',
u'jeudi', u'vendredi', u'samedi',
u'dimanche']))
self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))))
self.create_labeled_table('{generic_status_table}', self.status)
self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,'
' category_id integer REFERENCES {category_table} (id),'
' label varchar)')
self.columns = [
['id', 'serial primary key'],
['formdef_id', 'smallint REFERENCES {form_table} (id)'],
['receipt_time', 'date'],
['year_id', 'smallint REFERENCES {year_table} (id)'],
['month_id', 'smallint REFERENCES {month_table} (id)'],
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
['day_id', 'smallint REFERENCES {day_table} (id)'],
['dow_id', 'smallint REFERENCES {dow_table} (id)'],
['channel_id', 'smallint REFERENCES {channel_table} (id)'],
['backoffice', 'boolean'],
['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
['endpoint_delay', 'real'],
]
self.comments = {
'formdef_id': u'dim$formulaire',
'receipt_time': u'time$date de réception',
'year_id': u'dim$année',
'month_id': u'dim$mois',
'hour_id': u'dim$heure',
'day_id': u'dim$jour',
'dow_id': u'dim$jour de la semaine',
'channel_id': u'dim$canal',
'backoffice': u'dim$soumission backoffce',
'generic_status_id': u'dim$statut générique',
'endpoint_delay': u'measure$délai de traitement',
}
self.create_table('{generic_formdata_table}', self.columns)
for at, comment in self.comments.iteritems():
self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))
def feed(self):
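        """Rebuild the whole schema and base tables, then feed every formdef;
        formdefs the API refuses with HTTP 403 are skipped, other API errors
        are logged and the loop continues with the next formdef.
        """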
self.do_schema()
self.do_base_table()
for formdef in self.formdefs:
try:
formdef_feeder = WcsFormdefFeeder(self, formdef)
formdef_feeder.feed()
            except WcsApiError as e:
# ignore authorization errors
if (len(e.args) > 2 and hasattr(e.args[2], 'response')
and e.args[2].response.status_code == 403):
continue
self.logger.error('failed to retrieve formdef %s', formdef.slug)
class WcsFormdefFeeder(object):
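    """Feed a single formdef: create its status dimension table and its fact
    table (inheriting the generic formdata table), push the matching cube,
    dimensions and mappings into the shared model, then insert one row per
    formdata.
    """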
def __init__(self, olap_feeder, formdef):
self.olap_feeder = olap_feeder
self.formdef = formdef
self.status_mapping = {}
self.items_mappings = {}
self.fields = []
@property
def table_name(self):
return self.hash_table_name('formdata_%s' % self.formdef.slug.replace('-', '_'))
@property
def status_table_name(self):
return self.hash_table_name('status_%s' % self.formdef.slug.replace('-', '_'))
def __getattr__(self, name):
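        """Delegate unknown attributes (cursor, logger, mappings, helpers...)
        to the parent WcsOlapFeeder."""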
return getattr(self.olap_feeder, name)
def do_statuses(self):
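        """Create the per-form status dimension table from the workflow
        statuses and keep the w.c.s. status id -> row id mapping.
        """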
statuses = self.formdef.schema.workflow.statuses
self.olap_feeder.create_labeled_table(self.status_table_name,
enumerate([s.name for s in statuses]))
self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses))
def do_data_table(self):
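        """Register the formdef in {form_table}, create one dimension table
        per item/option field, then create the per-form fact table inheriting
        {generic_formdata_table} with extra field, geolocation, workflow
        function and status columns; foreign keys are re-created explicitly
        because PostgreSQL inheritance does not propagate them.
        """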
self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
vars=[self.categories_mapping.get(self.formdef.schema.category_id),
self.formdef.schema.name])
self.formdef_sql_id = self.cur.fetchone()[0]
columns = [['status_id', 'smallint REFERENCES {status_table} (id)']]
comments = {}
# add item fields
for field in self.formdef.schema.fields:
if not field.items and not field.options:
continue
if not field.varname:
continue
self.fields.append(field)
comment = (u'dim$%s$Valeur du champ « %s » du formulaire %s'
% (field.label, field.label, self.formdef.schema.name))
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
# create table and mapping
if field.items:
self.create_labeled_table(table_name, enumerate(field.items),
comment=comment)
self.items_mappings[field.varname] = dict(
(item, i) for i, item in enumerate(field.items))
            elif field.options:
                options = list(enumerate(field.options))
                self.create_labeled_table(table_name,
                                          [(i, o['label']) for i, o in options],
                                          comment=comment)
                self.items_mappings[field.varname] = dict((o['value'], i) for i, o in options)
at = 'field_%s' % field.varname
columns.append([at, 'smallint REFERENCES %s (id)' % table_name])
comments[at] = u'dim$' + field.label
# add geolocation fields
for geolocation, label in self.formdef.schema.geolocations:
at = 'geolocation_%s' % geolocation
columns.append([at, 'point'])
comments[at] = u'dim$' + label
# add function fields
for function, name in self.formdef.schema.workflow.functions.iteritems():
at = 'function_%s' % slugify(function)
columns.append([at, 'smallint REFERENCES {role_table} (id)'])
comments[at] = u'dim$Fonction « %s »' % name
self.columns = ([name for name, _type in self.olap_feeder.columns]
+ [name for name, _type in columns])
self.create_table('{formdata_table}', columns, inherits='{generic_formdata_table}',
comment=u'cube$%s' % self.formdef.schema.name)
for at, comment in comments.iteritems():
self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % at, vars=(comment,))
# PostgreSQL does not propagate foreign key constraints to child tables
# so we must recreate them manually
for name, _type in self.olap_feeder.columns:
if 'REFERENCES' not in _type:
continue
i = _type.index('REFERENCES')
constraint = '%s_fk_constraint FOREIGN KEY (%s) %s' % (name, name, _type[i:])
self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
def do_data(self):
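        """Build one fact row per formdata (dates, channel, statuses, field,
        geolocation and workflow-function dimensions, endpoint delay) and
        bulk-insert them into the per-form fact table.
        """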
values = []
for data in self.formdef.datas:
# ignore formdata without status
if not data.workflow.status:
continue
status = data.formdef.schema.workflow.statuses_map[data.workflow.status.id]
if data.endpoint_delay:
endpoint_delay = (data.endpoint_delay.days + float(data.endpoint_delay.seconds) /
86400.)
else:
endpoint_delay = None
row = {
'formdef_id': self.formdef_sql_id,
'receipt_time': data.receipt_time,
'year_id': data.receipt_time.year,
'month_id': data.receipt_time.month,
'day_id': data.receipt_time.day,
'hour_id': data.receipt_time.hour,
'dow_id': data.receipt_time.weekday(),
'channel_id': self.channel_to_id[data.submission.channel.lower()],
'backoffice': data.submission.backoffice,
# FIXME "En cours"/2 is never used
'generic_status_id': 3 if status.endpoint else 1,
'status_id': self.status_mapping[data.workflow.status.id],
'endpoint_delay': endpoint_delay,
}
# add form fields value
for field in self.fields:
v = None
if field.type == 'item':
# map items to sql id
v = self.items_mappings[field.varname].get(data.fields.get(field.varname))
row['field_%s' % field.varname] = v
# add geolocation fields value
for geolocation, label in self.formdef.schema.geolocations:
v = (data.geolocations or {}).get(geolocation)
row['geolocation_%s' % geolocation] = v
# add function fields value
for function, name in self.formdef.schema.workflow.functions.iteritems():
try:
v = data.functions[function]
except KeyError:
v = None
else:
v = v and self.olap_feeder.role_mapping[v.id]
at = 'function_%s' % slugify(function)
row[at] = v
tpl = '(' + ', '.join(['%s'] * len(self.columns[1:])) + ')'
value = self.cur.mogrify(tpl, [row[column] for column in self.columns[1:]])
values.append(value)
if not values:
self.logger.warning('no data')
return
self.ex('INSERT INTO {formdata_table} (%s) VALUES %s' % (
', '.join(self.columns[1:]), # skip the id column
', '.join(values)))
def feed(self):
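        """Push the per-form table names on the context, register the form's
        cube, dimensions and mappings in the shared model, then create the
        status table, the fact table and its data.
        """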
self.olap_feeder.ctx.push({
'formdata_table': self.table_name,
'status_table': self.status_table_name,
})
# create cube
self.cube = copy.deepcopy(self.base_cube)
self.cube.update({
'name': self.schema + '_' + self.table_name,
'label': self.formdef.schema.name,
'fact': self.table_name,
})
# add dimension for status
self.cube['joins'].append({
'master': 'status_id',
'detail': '%s.id' % self.status_table_name,
'method': 'detail',
})
dim_name = '%s_%s' % (self.table_name, 'status')
self.model['dimensions'].append({
'name': dim_name,
'label': 'statut',
'levels': [
{
'name': 'status',
'attributes': ['status_id', 'status_label'],
'order_attribute': 'status_id',
'label_attribute': 'status_label',
},
],
})
self.model['mappings']['%s.status_id' % dim_name] = '%s.id' % self.status_table_name
self.model['mappings']['%s.status_label' % dim_name] = '%s.label' % self.status_table_name
self.cube['dimensions'].append(dim_name)
# add dimension for function
for function, name in self.formdef.schema.workflow.functions.iteritems():
at = 'function_%s' % slugify(function)
dim_name = '%s_function_%s' % (self.table_name, slugify(function))
self.cube['joins'].append({
'master': at,
'detail': self.tpl('{role_table}.id'),
'alias': at,
})
self.model['dimensions'].append({
'name': dim_name,
'label': u'fonction %s' % name,
})
self.model['mappings'][dim_name] = '%s.label' % at
self.cube['dimensions'].append(dim_name)
# add dimensions for item fields
for field in self.fields:
if field.type != 'item':
continue
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
self.cube['joins'].append({
'master': 'field_%s' % field.varname,
'detail': '%s.id' % table_name,
'method': 'detail',
})
            dim_name = '%s_%s' % (self.table_name, field.varname)
self.model['dimensions'].append({
'name': dim_name,
'label': field.label,
})
self.model['mappings'][dim_name] = '%s.label' % table_name
self.cube['dimensions'].append(dim_name)
self.model['cubes'].append(self.cube)
try:
self.logger.info('feed formdef %s', self.formdef.slug)
self.do_statuses()
self.do_data_table()
self.do_data()
finally:
self.olap_feeder.ctx.pop()
if 'cubes_model_dirs' in self.config:
model_path = os.path.join(self.config['cubes_model_dirs'], '%s.json' % self.schema)
with open(model_path, 'w') as f:
json.dump(self.model, f, indent=2, sort_keys=True)
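
# Example usage (a minimal sketch: the WcsApi name and its constructor
# arguments are assumptions, only WcsOlapFeeder's own signature comes from
# this module):
#
#   from wcs_olap.wcs_api import WcsApi
#
#   api = WcsApi(...)  # hypothetical arguments: w.c.s. base URL, API credentials
#   feeder = WcsOlapFeeder(api, pg_dsn='dbname=wcs_olap', schema='demarches',
#                          config={'cubes_model_dirs': '/path/to/models'})
#   feeder.feed()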