This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
publik-bi/wcs_olap/feeder.py

610 lines
23 KiB
Python

# -*- coding: utf-8 -*-
import copy
import os
import json
import hashlib
from utils import Whatever
import psycopg2
from wcs_olap.wcs_api import WcsApiError
def slugify(s):
    """Return ``s`` with every dash and every space replaced by ``_``."""
    without_dashes = s.replace('-', '_')
    return without_dashes.replace(' ', '_')
class Context(object):
    """A stack of dictionaries looked up from the most recent frame down."""

    def __init__(self):
        # Frames are stored oldest first; the last one is the top of the stack.
        self.stack = []

    def __getitem__(self, key):
        """Return the value of ``key`` from the topmost frame defining it.

        Raises KeyError when no frame (or no frame at all) holds ``key``.
        """
        for frame in reversed(self.stack):
            try:
                value = frame[key]
            except KeyError:
                continue
            return value
        raise KeyError(key)

    def push(self, d):
        """Put the mapping ``d`` on top of the stack."""
        self.stack.append(d)

    def pop(self):
        """Drop the topmost frame; an empty stack is left empty."""
        remaining = self.stack[:-1]
        self.stack = remaining

    def as_dict(self):
        """Flatten the stack into one dict, upper frames overriding lower ones."""
        flattened = {}
        for frame in self.stack:
            flattened.update(frame)
        return flattened
class WcsOlapFeeder(object):
    """Rebuild a PostgreSQL schema and a cubes model from a w.c.s. API object.

    The feeder drops and recreates ``schema``, fills the shared dimension
    tables (channels, roles, categories, hours, simplified statuses), then
    delegates each formdef to a ``WcsFormdefFeeder``.  The resulting model
    description is optionally dumped as JSON.
    """

    # Submission channels as [sql id, API identifier, label].
    channels = [
        [1, 'web', u'web'],
        [2, 'mail', u'courrier'],
        [3, 'phone', u'téléphone'],
        [4, 'counter', u'guichet'],
        [5, 'backoffice', u'backoffice'],
    ]
    channel_to_id = dict((c[1], c[0]) for c in channels)
    id_to_channel = dict((c[0], c[1]) for c in channels)

    # Simplified statuses as [sql id, label].
    status = [
        [1, 'Nouveau'],
        [2, 'En cours'],
        [3, 'Terminé'],
    ]
    # These two mappings must be derived from ``status``; they used to be
    # built from ``channels`` by mistake.
    status_to_id = dict((s[1], s[0]) for s in status)
    id_to_status = dict((s[0], s[1]) for s in status)

    def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True):
        """Connect to PostgreSQL and prepare the base cube description.

        :param api: object exposing ``formdefs``, ``roles`` and ``categories``
        :param pg_dsn: DSN handed to ``psycopg2.connect``
        :param schema: name of the PostgreSQL schema to (re)create
        :param logger: logger-like object; a ``Whatever()`` is used if omitted
        :param config: optional dict; ``cubes_label`` and ``cubes_model_dirs``
            are the keys read by this class
        :param do_feed: when false, no SQL is issued by ``feed`` and only the
            model is built
        """
        self.api = api
        self.logger = logger or Whatever()
        self.schema = schema
        self.connection = psycopg2.connect(dsn=pg_dsn)
        self.connection.autocommit = True
        self.cur = self.connection.cursor()
        self.formdefs = api.formdefs
        self.roles = api.roles
        self.categories = api.categories
        self.do_feed = do_feed
        # Names substituted into the ``{...}`` placeholders of SQL templates.
        self.ctx = Context()
        self.ctx.push({
            'schema': self.schema,
            'role_table': 'role',
            'channel_table': 'channel',
            'category_table': 'category',
            'form_table': 'formdef',
            'generic_formdata_table': 'formdata',
            'generic_status_table': 'status',
            'year_table': 'year',
            'month_table': 'month',
            'day_table': 'day',
            'dow_table': 'dow',
            'hour_table': 'hour',
        })
        self.config = config or {}
        self.model = {
            'label': self.config.get('cubes_label', schema),
            'name': schema,
            'search_path': [schema, 'public'],
            'pg_dsn': pg_dsn,
            'cubes': [],
        }
        # Cube covering every formdata; per-formdef cubes are deep copies of it.
        cube = {
            'name': 'all_formdata',
            'label': u'Tous les formulaires',
            'fact_table': 'formdata',
            'key': 'id',
            'joins': [
                {
                    'name': 'receipt_time',
                    'table': 'dates',
                    'detail': 'date',
                    'master': 'receipt_time',
                },
                {
                    'name': 'channel',
                    'table': 'channel',
                    'master': 'channel_id',
                    'detail': 'id',
                },
                {
                    'name': 'formdef',
                    'table': 'formdef',
                    'master': 'formdef_id',
                    'detail': 'id',
                },
                {
                    'name': 'category',
                    'table': 'category',
                    'master': 'formdef.category_id',
                    'detail': 'id',
                    'kind': 'left',
                },
                {
                    'name': 'hour',
                    'table': 'hour',
                    'master': 'hour_id',
                    'detail': 'id',
                },
                {
                    'name': 'generic_status',
                    'table': 'status',
                    'master': 'generic_status_id',
                    'detail': 'id',
                },
            ],
            'dimensions': [
                {
                    'name': 'receipt_time',
                    'label': 'date de la demande',
                    'join': ['receipt_time'],
                    'type': 'date',
                    'value': 'receipt_time.date',
                },
                {
                    'name': 'channel',
                    'label': 'canal',
                    'join': ['channel'],
                    'type': 'integer',
                    'value': 'channel.id',
                    'value_label': 'channel.label',
                },
                {
                    'name': 'category',
                    'label': 'catégorie',
                    'join': ['formdef', 'category'],
                    'type': 'integer',
                    'value': 'category.id',
                    'value_label': 'category.label',
                },
                {
                    'name': 'formdef',
                    'label': 'formulaire',
                    'join': ['formdef'],
                    'type': 'integer',
                    'value': 'formdef.id',
                    'value_label': 'formdef.label',
                },
                {
                    'name': 'generic_status',
                    'label': 'statut simplifié',
                    'join': ['generic_status'],
                    'type': 'integer',
                    'value': 'generic_status.id',
                    'value_label': 'generic_status.label',
                },
                {
                    'name': 'hour',
                    'label': 'heure',
                    'join': ['hour'],
                    'type': 'integer',
                    'value': 'hour.id',
                    'filter': False,
                },
            ],
            'measures': [
                {
                    'name': 'count',
                    'label': 'nombre de demandes',
                    'type': 'integer',
                    'expression': 'count({fact_table}.id)',
                },
                {
                    'name': 'avg_endpoint_delay',
                    'label': 'délai de traitement moyen',
                    'type': 'duration',
                    'expression': 'avg(endpoint_delay)',
                },
                {
                    'name': 'max_endpoint_delay',
                    'label': 'délai de traitement maximum',
                    'type': 'duration',
                    'expression': 'max(endpoint_delay)',
                },
                {
                    'name': 'min_endpoint_delay',
                    'label': 'délai de traitement minimum',
                    'type': 'duration',
                    'expression': 'min(endpoint_delay)',
                },
                {
                    'name': 'percent',
                    'label': 'pourcentage des demandes',
                    'type': 'percent',
                    'expression': 'count({fact_table}.id) * 100. '
                                  '/ (select count({fact_table}.id) from {table_expression} '
                                  'where {where_conditions})',
                }
            ]
        }
        self.model['cubes'].append(cube)
        self.base_cube = self.model['cubes'][0]

    def hash_table_name(self, table_name):
        """Expand ``table_name`` and shorten it when it is 64 characters or more.

        The template is formatted with the current context.  A result of 64
        characters or more is cut to its first 57 characters followed by the
        first 6 hexadecimal digits of the MD5 of the full name, giving a
        63-character name.
        """
        table_name = table_name.format(**self.default_ctx)
        if len(table_name) < 64:
            return table_name
        # hashlib needs bytes: encode text explicitly so that non-ASCII names
        # do not fail on an implicit ASCII conversion.
        digest_source = table_name
        if not isinstance(digest_source, bytes):
            digest_source = digest_source.encode('utf-8')
        return table_name[:57] + hashlib.md5(digest_source).hexdigest()[:6]

    @property
    def default_ctx(self):
        """Flattened copy of the context stack, as a plain dict."""
        return self.ctx.as_dict()

    def _merged_ctx(self, ctx=None):
        """Return a new dict made of ``ctx`` overlaid with the feeder context.

        The caller's dict is never modified.  Feeder-level values take
        precedence over the ones in ``ctx``, as they always did.
        NOTE(review): the opposite precedence may have been intended - confirm.
        """
        merged = dict(ctx or {})
        merged.update(self.default_ctx)
        return merged

    def ex(self, query, ctx=None, vars=None):
        """Format ``query`` with the context and execute it.

        :param query: SQL template using ``{name}`` placeholders
        :param ctx: extra placeholder values (left untouched)
        :param vars: parameters passed to ``cursor.execute``
        """
        sql = query.format(**self._merged_ctx(ctx))
        self.logger.debug('SQL: %s VARS: %s', sql, vars)
        self.cur.execute(sql, vars=vars)

    def exmany(self, query, varslist, ctx=None):
        """Format ``query`` with the context and run it through ``executemany``."""
        sql = query.format(**self._merged_ctx(ctx))
        self.logger.debug('SQL: %s VARSLIST: %s', sql, varslist)
        self.cur.executemany(sql, varslist)

    def do_schema(self):
        """Drop the target schema if present, recreate it and select it."""
        self.logger.debug('dropping schema %s', self.schema)
        self.ex('SET search_path = public')
        self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
        self.logger.debug('creating schema %s', self.schema)
        self.ex('CREATE SCHEMA {schema}')
        self.ex('SET search_path = {schema},public')

    def create_table(self, name, columns, inherits=None, comment=None):
        """Create table ``name`` from ``columns``, a list of (name, type) pairs.

        ``name``, the column definitions and ``inherits`` are inserted into
        the statement as raw SQL and may contain ``{...}`` context
        placeholders; ``comment`` is passed as a query parameter.
        """
        sql = 'CREATE TABLE %s' % name
        sql += '(' + ', '.join('%s %s' % (n, t) for n, t in columns) + ')'
        if inherits:
            sql += ' INHERITS (%s)' % inherits
        self.ex(sql)
        if comment:
            self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,))

    def create_labeled_table(self, name, labels, comment=None):
        """Create an (id, label) table and insert ``labels`` into it.

        :param name: table name, possibly containing context placeholders
        :param labels: iterable of (id, label) pairs; consumed once
        :param comment: optional table comment
        """
        self.create_table(name,
                          [
                              ['id', 'smallint primary key'],
                              ['label', 'varchar']
                          ], comment=comment)
        rows = [(_id, _label) for _id, _label in labels]
        if not rows:
            return
        # Values travel as query parameters: embedding them in the SQL text
        # would expose them to the str.format() call made by ex(), which
        # breaks on any label containing a brace.
        placeholders = ', '.join(['(%s, %s)'] * len(rows))
        params = [value for row in rows for value in row]
        self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), placeholders),
                vars=params)

    def tpl(self, o, ctx=None):
        """Return a copy of ``o`` with every string formatted with the context.

        ``o`` may be built from strings, dicts, lists, booleans, integers and
        floats; any other value raises AssertionError.  ``ctx`` is not
        modified.
        """
        ctx = self._merged_ctx(ctx)
        try:
            string_types = basestring  # Python 2
        except NameError:
            string_types = str

        def helper(o):
            if isinstance(o, string_types):
                return o.format(**ctx)
            elif isinstance(o, dict):
                return dict((k, helper(v)) for k, v in o.items())
            elif isinstance(o, list):
                return [helper(v) for v in o]
            elif isinstance(o, (bool, int, float)):
                return o
            # Raised explicitly so that the check survives ``python -O``.
            raise AssertionError('%s is not a valid value for JSON' % (o,))
        return helper(o)

    def add_dim(self, **kwargs):
        """Append the templated ``kwargs`` to ``self.dimensions``.

        NOTE(review): nothing in this class defines ``self.dimensions``;
        confirm where it is expected to come from before relying on this.
        """
        self.dimensions.append(self.tpl(kwargs))

    def do_base_table(self):
        """Create the shared dimension tables and the generic formdata table."""
        # channels
        self.create_labeled_table('{channel_table}', [[c[0], c[2]] for c in self.channels],
                                  comment=u'canal')
        # roles
        roles = dict((i, role.name) for i, role in enumerate(self.roles))
        self.create_labeled_table('{role_table}', roles.items(), comment=u'role')
        # API role id -> sql id
        self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles))
        # categories
        self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories),
                                  comment=u'catégorie')
        # API category id -> sql id
        self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories))
        self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))),
                                  comment=u'heures')
        self.create_labeled_table('{generic_status_table}', self.status,
                                  comment=u'statuts simplifiés')
        self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,'
                ' category_id integer REFERENCES {category_table} (id),'
                ' label varchar)')
        self.ex('COMMENT ON TABLE {form_table} IS %s', vars=(u'types de formulaire',))
        # Columns common to every formdata table; per-formdef tables inherit
        # from the generic one.
        self.columns = [
            ['id', 'serial primary key'],
            ['formdef_id', 'smallint REFERENCES {form_table} (id)'],
            ['receipt_time', 'date'],
            ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
            ['channel_id', 'smallint REFERENCES {channel_table} (id)'],
            ['backoffice', 'boolean'],
            ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
            ['endpoint_delay', 'interval'],
        ]
        self.comments = {
            'formdef_id': u'formulaire',
            'receipt_time': u'date de réception',
            'hour_id': u'heure',
            'channel_id': u'canal',
            'backoffice': u'soumission backoffce',
            'generic_status_id': u'statut simplifié',
            'endpoint_delay': u'délai de traitement',
        }
        self.create_table('{generic_formdata_table}', self.columns)
        for at, comment in self.comments.items():
            self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))
        self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s', vars=(u'tous les formulaires',))

    def feed(self):
        """Rebuild the schema (when feeding), process every formdef, dump the model.

        A ``WcsApiError`` raised for a formdef is logged and the formdef is
        skipped; errors carrying an HTTP 403 response are skipped silently.
        """
        if self.do_feed:
            self.do_schema()
            self.do_base_table()
        for formdef in self.formdefs:
            try:
                formdef_feeder = WcsFormdefFeeder(self, formdef, do_feed=self.do_feed)
                formdef_feeder.feed()
            except WcsApiError as e:
                # ignore authorization errors
                if (len(e.args) > 2 and hasattr(e.args[2], 'response')
                        and e.args[2].response.status_code == 403):
                    continue
                self.logger.error('failed to retrieve formdef %s', formdef.slug)
        if 'cubes_model_dirs' in self.config:
            model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema)
            # NOTE(review): the dumped model contains ``pg_dsn`` verbatim.
            with open(model_path, 'w') as f:
                json.dump(self.model, f, indent=2, sort_keys=True)
class WcsFormdefFeeder(object):
    """Build the cube, tables and rows of a single formdef.

    Attributes not found on the instance are looked up on the parent
    ``olap_feeder`` (see ``__getattr__``), which is how ``ex``, ``cur``,
    ``logger``, ``model``... are reached from here.
    """

    def __init__(self, olap_feeder, formdef, do_feed=True):
        """Remember the parent feeder and the formdef to process.

        :param olap_feeder: parent feeder providing SQL helpers and context
        :param formdef: formdef object from the API
        :param do_feed: when false, ``feed`` only registers the cube
        """
        self.olap_feeder = olap_feeder
        self.formdef = formdef
        # workflow status id -> sql id, filled by do_statuses()
        self.status_mapping = {}
        # field varname -> {value: sql id}, filled by do_data_table()
        self.items_mappings = {}
        self.do_feed = do_feed
        # fields which got a column in the formdata table
        self.fields = []

    @property
    def table_name(self):
        """Name of the formdata table of this formdef."""
        return self.hash_table_name('formdata_%s' % self.formdef.slug.replace('-', '_'))

    @property
    def status_table_name(self):
        """Name of the status table of this formdef."""
        return self.hash_table_name('status_%s' % self.formdef.slug.replace('-', '_'))

    def __getattr__(self, name):
        """Delegate unknown attributes to the parent feeder."""
        # Without this guard, looking up anything on an instance which has
        # no ``olap_feeder`` yet would recurse without end.
        if name == 'olap_feeder':
            raise AttributeError(name)
        return getattr(self.olap_feeder, name)

    def do_statuses(self):
        """Create the status table and the status id -> sql id mapping."""
        statuses = self.formdef.schema.workflow.statuses
        self.olap_feeder.create_labeled_table(self.status_table_name,
                                              enumerate([s.name for s in statuses]),
                                              comment=u'statuts du formulaire « %s »' %
                                              self.formdef.schema.name)
        self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses))

    def do_data_table(self):
        """Register the formdef and create its formdata table.

        One labelled table is created for each field having items or options
        and a varname; the formdata table inherits from the generic one and
        gets extra columns for the status, those fields, the geolocations and
        the workflow functions.
        """
        self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
                vars=[self.categories_mapping.get(self.formdef.schema.category_id),
                      self.formdef.schema.name])
        self.formdef_sql_id = self.cur.fetchone()[0]

        columns = [['status_id', 'smallint REFERENCES {status_table} (id)']]
        comments = {}

        # add item fields
        for field in self.formdef.schema.fields:
            if not field.items and not field.options:
                continue
            if not field.varname:
                continue
            self.fields.append(field)
            comment = (u'valeurs du champ « %s » du formulaire %s'
                       % (field.label, self.formdef.schema.name))
            table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
            # create table and mapping
            if field.items:
                labels = list(enumerate(field.items))
                self.create_labeled_table(table_name, labels, comment=comment)
                self.items_mappings[field.varname] = dict((item, i) for i, item in labels)
            else:
                # The numbered options are materialized because they are
                # walked twice; the table is created exactly once.
                options = list(enumerate(field.options))
                self.create_labeled_table(table_name,
                                          [(i, o['label']) for i, o in options],
                                          comment=comment)
                self.items_mappings[field.varname] = dict(
                    (o['value'], i) for i, o in options)

            at = 'field_%s' % field.varname
            columns.append([at, 'smallint REFERENCES %s (id)' % table_name])
            comments[at] = field.label

        # add geolocation fields
        for geolocation, label in self.formdef.schema.geolocations:
            at = 'geolocation_%s' % geolocation
            columns.append([at, 'point'])
            comments[at] = label

        # add function fields
        for function, name in self.formdef.schema.workflow.functions.items():
            at = 'function_%s' % slugify(function)
            columns.append([at, 'smallint REFERENCES {role_table} (id)'])
            comments[at] = u'fonction « %s »' % name

        # Every column name, generic ones first; do_data() relies on this order.
        self.columns = ([name for name, _type in self.olap_feeder.columns]
                        + [name for name, _type in columns])
        self.create_table('{formdata_table}', columns, inherits='{generic_formdata_table}',
                          comment=u'formulaire %s' % self.formdef.schema.name)
        for at, comment in comments.items():
            self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % at, vars=(comment,))

        # PostgreSQL does not propagate foreign key constraints to child tables
        # so we must recreate them manually
        for name, _type in self.olap_feeder.columns:
            if 'REFERENCES' not in _type:
                continue
            i = _type.index('REFERENCES')
            constraint = '%s_fk_constraint FOREIGN KEY (%s) %s' % (name, name, _type[i:])
            self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)

    def do_data(self):
        """Insert every formdata having a status into the formdata table.

        All rows go into a single INSERT statement whose values are passed as
        query parameters.
        """
        insert_columns = self.columns[1:]  # skip the id column
        row_tpl = '(' + ', '.join(['%s'] * len(insert_columns)) + ')'
        params = []
        row_count = 0
        for data in self.formdef.datas:
            # ignore formdata without status
            if not data.workflow.status:
                continue

            status = data.formdef.schema.workflow.statuses_map[data.workflow.status.id]
            channel = data.submission.channel.lower()
            if channel == 'web' and data.submission.backoffice:
                channel = 'backoffice'

            # Simplify status
            if status.endpoint:
                generic_status = 3
            elif status.startpoint:
                generic_status = 1
            else:
                generic_status = 2

            row = {
                'formdef_id': self.formdef_sql_id,
                'receipt_time': data.receipt_time,
                'hour_id': data.receipt_time.hour,
                'channel_id': self.channel_to_id[channel],
                'backoffice': data.submission.backoffice,
                # use the simplified status computed above, so that
                # "En cours"/2 is stored for intermediate statuses
                'generic_status_id': generic_status,
                'status_id': self.status_mapping[data.workflow.status.id],
                'endpoint_delay': data.endpoint_delay,
            }
            # add form fields value
            for field in self.fields:
                v = None
                if field.type == 'item':
                    # map items to sql id
                    v = self.items_mappings[field.varname].get(data.fields.get(field.varname))
                row['field_%s' % field.varname] = v
            # add geolocation fields value
            for geolocation, label in self.formdef.schema.geolocations:
                v = (data.geolocations or {}).get(geolocation)
                row['geolocation_%s' % geolocation] = v
            # add function fields value
            for function, name in self.formdef.schema.workflow.functions.items():
                try:
                    v = data.functions[function]
                except KeyError:
                    v = None
                else:
                    v = v and self.olap_feeder.role_mapping[v.id]
                at = 'function_%s' % slugify(function)
                row[at] = v

            params.extend(row[column] for column in insert_columns)
            row_count += 1

        if not row_count:
            self.logger.warning('no data')
            return
        # Values are sent as parameters rather than mogrified into the SQL
        # text, which ex() would run through str.format() and which would
        # therefore break on any value containing a brace.
        self.ex('INSERT INTO {formdata_table} (%s) VALUES %s' % (
            ', '.join(insert_columns),
            ', '.join([row_tpl] * row_count)), vars=params)

    def feed(self):
        """Register the cube of this formdef and, when feeding, load its data.

        A context frame naming the formdata and status tables is pushed for
        the duration of the call and always popped on exit, whatever
        ``do_feed`` is and whether or not an error occurs.
        """
        self.olap_feeder.ctx.push({
            'formdata_table': self.table_name,
            'status_table': self.status_table_name,
        })
        try:
            # create cube
            cube = self.cube = copy.deepcopy(self.base_cube)
            cube.update({
                'name': self.table_name,
                'label': self.formdef.schema.name,
                'fact_table': self.table_name,
                'key': 'id',
            })
            cube['dimensions'] = [dimension for dimension in cube['dimensions']
                                  if dimension['name'] not in ('category', 'formdef')]

            # add dimension for status
            cube['joins'].append({
                'name': 'status',
                'table': self.status_table_name,
                'master': 'status_id',
                'detail': 'id',
            })
            cube['dimensions'].append({
                'name': 'status',
                'label': 'statut',
                'join': ['status'],
                'type': 'integer',
                'value': 'status.id',
                'value_label': 'status.label',
            })

            # add dimension for function
            for function, name in self.formdef.schema.workflow.functions.items():
                at = 'function_%s' % slugify(function)
                cube['joins'].append({
                    'name': at,
                    'table': 'role',
                    'master': at,
                    'detail': 'id',
                })
                cube['dimensions'].append({
                    'name': at,
                    'label': u'fonction %s' % name.lower(),
                    'join': [at],
                    'type': 'integer',
                    'value': '%s.id' % at,
                    'value_label': '%s.label' % at,
                    'filter': False,
                })

            # add dimensions for item fields
            for field in self.formdef.schema.fields:
                if not field.items and not field.options:
                    continue
                if not field.varname:
                    continue
                table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
                cube['joins'].append({
                    'name': field.varname,
                    'table': table_name,
                    'master': 'field_%s' % field.varname,
                    'detail': 'id',
                })
                cube['dimensions'].append({
                    'name': field.varname,
                    'label': field.label.lower(),
                    'join': [field.varname],
                    'type': 'integer',
                    'value': '%s.id' % field.varname,
                    'value_label': '%s.label' % field.varname,
                    'filter': field.in_filters,
                })

            self.model['cubes'].append(cube)
            if self.do_feed:
                self.logger.info('feed formdef %s', self.formdef.slug)
                self.do_statuses()
                self.do_data_table()
                self.do_data()
        finally:
            self.olap_feeder.ctx.pop()