wcs-olap/wcs_olap/feeder.py

987 lines
38 KiB
Python

# -*- coding: utf-8 -*-
import six
import copy
import itertools
import os
import json
import hashlib
from utils import Whatever
import psycopg2
from cached_property import cached_property
from wcs_olap.wcs_api import WcsApiError
def slugify(s):
    """Return *s* with every dash and every space turned into an underscore."""
    without_dashes = s.replace('-', '_')
    return without_dashes.replace(' ', '_')
class Context(object):
    """Stack of dictionaries where the most recently pushed layer wins."""

    def __init__(self):
        self.stack = []

    def __getitem__(self, key):
        """Return the value of *key* from the newest layer defining it.

        Raises KeyError when no layer (or no layer at all) holds the key.
        """
        for layer in reversed(self.stack):
            try:
                return layer[key]
            except KeyError:
                continue
        raise KeyError(key)

    def push(self, d):
        """Add *d* as the new top layer."""
        self.stack.append(d)

    def pop(self):
        """Drop the top layer; popping an empty stack is a no-op."""
        self.stack = self.stack[:-1]

    def as_dict(self):
        """Merge all layers, oldest first, into a single new dictionary."""
        merged = {}
        for layer in self.stack:
            merged.update(layer)
        return merged
class WcsOlapFeeder(object):
    """Feeds a PostgreSQL schema and a cube model from a w.c.s. API."""

    # Each channel is [sql id, API name, label].
    channels = [
        [1, 'web', u'web'],
        [2, 'mail', u'courrier'],
        [3, 'phone', u'téléphone'],
        [4, 'counter', u'guichet'],
        [5, 'backoffice', u'backoffice'],
        [6, 'email', u'email'],
        [7, 'fax', u'fax'],
    ]
    channel_to_id = dict((c[1], c[0]) for c in channels)
    id_to_channel = dict((c[0], c[1]) for c in channels)

    # Each simplified status is [sql id, label].
    status = [
        [1, 'Nouveau'],
        [2, 'En cours'],
        [3, 'Terminé'],
    ]
    # Built from ``status``; these used to be built from ``channels`` by mistake.
    status_to_id = dict((c[1], c[0]) for c in status)
    id_to_status = dict((c[0], c[1]) for c in status)
def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False):
    """Build the base cube model and open the PostgreSQL connection.

    api: object whose ``formdefs``, ``roles`` and ``categories`` attributes
        are read by the properties of this class.
    pg_dsn: DSN handed to ``psycopg2.connect`` and stored in the model.
    schema: name of the target schema; ``schema + '_temp'`` is used as the
        working schema.
    logger: logging object; defaults to ``Whatever()``.
    config: optional dict; ``cubes_label`` is read here.
    do_feed, fake: flags stored for later use by ``feed()``.

    Any error raised after the connection is opened closes it before
    propagating.
    """
    self.api = api
    self.fake = fake
    self.logger = logger or Whatever()
    self.schema = schema
    self.do_feed = do_feed
    self.ctx = Context()
    self.ctx.push({
        'schema': self.schema,
        'schema_temp': self.schema + '_temp',
        'role_table': 'role',
        'channel_table': 'channel',
        'category_table': 'category',
        'form_table': 'formdef',
        'generic_formdata_table': 'formdata',
        'generic_status_table': 'status',
        'generic_evolution_table': 'evolution',
        'year_table': 'year',
        'month_table': 'month',
        'day_table': 'day',
        'dow_table': 'dow',
        'hour_table': 'hour',
        'agent_table': 'agent',
    })
    self.config = config or {}
    self.model = {
        'label': self.config.get('cubes_label', schema),
        'name': schema,
        'search_path': [schema, 'public'],
        'pg_dsn': pg_dsn,
        'cubes': [],
    }
    cube = {
        'name': 'all_formdata',
        'label': u'Tous les formulaires',
        'fact_table': 'formdata',
        'key': 'id',
        'joins': [
            {
                'name': 'receipt_time',
                'table': 'dates',
                'detail': 'date',
                'master': 'receipt_time',
            },
            {
                'name': 'channel',
                'table': 'channel',
                'master': 'channel_id',
                'detail': 'id',
            },
            {
                'name': 'formdef',
                'table': 'formdef',
                'master': 'formdef_id',
                'detail': 'id',
            },
            {
                'name': 'category',
                'table': 'category',
                'master': 'formdef.category_id',
                'detail': 'id',
                'kind': 'left',
            },
            {
                'name': 'hour',
                'table': 'hour',
                'master': 'hour_id',
                'detail': 'id',
            },
            {
                'name': 'generic_status',
                'table': 'status',
                'master': 'generic_status_id',
                'detail': 'id',
            },
            {
                'name': 'agent',
                'table': 'agent',
                'master': 'first_agent_id',
                'detail': 'id',
                'kind': 'inner',
            },
        ],
        'dimensions': [
            {
                'name': 'receipt_time',
                'label': 'date de la demande',
                'join': ['receipt_time'],
                'type': 'date',
                'value': 'receipt_time.date',
            },
            {
                'name': 'channel',
                'label': 'canal',
                'join': ['channel'],
                'type': 'integer',
                'value': 'channel.id',
                'value_label': 'channel.label',
            },
            {
                'name': 'category',
                'label': 'catégorie',
                'join': ['formdef', 'category'],
                'type': 'integer',
                'value': 'category.id',
                'value_label': 'category.label',
                'order_by': 'category.label',
            },
            {
                'name': 'formdef',
                'label': 'formulaire',
                'join': ['formdef'],
                'type': 'integer',
                'value': 'formdef.id',
                'value_label': 'formdef.label',
                'order_by': 'formdef.label',
            },
            {
                'name': 'generic_status',
                'label': 'statut simplifié',
                'join': ['generic_status'],
                'type': 'integer',
                'value': 'generic_status.id',
                'value_label': 'generic_status.label',
            },
            {
                'name': 'hour',
                'label': 'heure',
                'join': ['hour'],
                'type': 'integer',
                'value': 'hour.id',
                'filter': False,
            },
            {
                'name': 'agent',
                'label': 'premier agent traitant',
                'join': ['agent'],
                'type': 'integer',
                'value': 'agent.id',
                'value_label': 'agent.label',
                'order_by': 'agent.label',
            },
        ],
        'measures': [
            {
                'name': 'count',
                'label': 'nombre de demandes',
                'type': 'integer',
                'expression': 'count({fact_table}.id)',
            },
            {
                'name': 'avg_endpoint_delay',
                'label': 'délai de traitement moyen',
                'type': 'duration',
                'expression': 'avg(endpoint_delay)',
            },
            {
                'name': 'max_endpoint_delay',
                'label': 'délai de traitement maximum',
                'type': 'duration',
                'expression': 'max(endpoint_delay)',
            },
            {
                'name': 'min_endpoint_delay',
                'label': 'délai de traitement minimum',
                'type': 'duration',
                'expression': 'min(endpoint_delay)',
            },
            {
                'name': 'percent',
                'label': 'pourcentage des demandes',
                'type': 'percent',
                "expression": 'case (select count({fact_table}.id) from {table_expression} '
                              'where {where_conditions}) when 0 then null else '
                              'count({fact_table}.id) * 100. / (select '
                              'count({fact_table}.id) from {table_expression} where '
                              '{where_conditions}) end',
            },
            {
                'name': 'geolocation',
                'label': 'localisation géographique',
                'type': 'point',
                'expression': 'array_agg("{fact_table}".geolocation_base) FILTER (WHERE "{fact_table}".geolocation_base IS NOT NULL)',
            }
        ]
    }
    self.model['cubes'].append(cube)
    self.base_cube = self.model['cubes'][0]
    self.agents_mapping = {}
    self.formdata_json_index = []
    # keep at end of __init__ to prevent leak if __init__ raises
    self.connection = psycopg2.connect(dsn=pg_dsn)
    try:
        # Everything that touches the fresh connection is guarded, so that a
        # failure in any of these steps does not leave it open.
        self.connection.autocommit = True
        self.cur = self.connection.cursor()
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.cur)
        self.has_jsonb = self.detect_jsonb()
        if self.has_jsonb:
            cube['json_field'] = 'json_data'
    except Exception:
        self.connection.close()
        raise
@cached_property
def formdefs(self):
    """Form definitions read from the API (computed once per instance)."""
    api = self.api
    return api.formdefs
@cached_property
def roles(self):
    """Roles read from the API (computed once per instance)."""
    api = self.api
    return api.roles
@cached_property
def categories(self):
    """Categories read from the API (computed once per instance)."""
    api = self.api
    return api.categories
def detect_jsonb(self):
    """Tell whether ``pg_type`` lists a type named ``jsonb``."""
    cursor = self.cur
    cursor.execute("SELECT 1 FROM pg_type WHERE typname = 'jsonb'")
    if cursor.rowcount:
        return True
    return False
def hash_table_name(self, table_name):
    """Format *table_name* with the default context and cap its length.

    Names shorter than 64 characters are returned unchanged. Longer ones are
    cut to their first 57 characters followed by the first 6 hex digits of
    the MD5 of the full name, giving 63 characters (presumably to fit
    PostgreSQL's identifier length limit -- confirm).
    """
    table_name = table_name.format(**self.default_ctx)
    if len(table_name) < 64:
        return table_name
    # hashlib only accepts bytes: encode text first (Python 3 ``str`` and
    # Python 2 ``unicode``); Python 2 ``str`` is already bytes.
    digest_input = table_name
    if not isinstance(digest_input, bytes):
        digest_input = digest_input.encode('utf-8')
    return table_name[:57] + hashlib.md5(digest_input).hexdigest()[:6]
@property
def default_ctx(self):
    """Flattened copy of the context stack as a plain dictionary."""
    context = self.ctx
    return context.as_dict()
def ex(self, query, ctx=None, vars=None):
    """Format *query* and execute it on the feeder's cursor.

    query: SQL template; ``{name}`` placeholders are filled with
        ``str.format`` from *ctx* merged with the default context (the
        default context wins on key collisions).
    ctx: optional extra placeholders; the dict is left untouched.
    vars: query parameters passed through to ``cursor.execute``.
    """
    # Merge into a copy: the caller's dict must not be modified.
    params = dict(ctx or {})
    params.update(self.default_ctx)
    sql = query.format(**params)
    self.logger.debug('SQL: %s VARS: %s', sql, vars)
    self.cur.execute(sql, vars=vars)
def exmany(self, query, varslist, ctx=None):
    """Format *query* and run it once per entry of *varslist*.

    query: SQL template; ``{name}`` placeholders are filled with
        ``str.format`` from *ctx* merged with the default context (the
        default context wins on key collisions).
    varslist: sequence of parameter sets passed to ``cursor.executemany``.
    ctx: optional extra placeholders; the dict is left untouched.
    """
    # Merge into a copy: the caller's dict must not be modified.
    params = dict(ctx or {})
    params.update(self.default_ctx)
    sql = query.format(**params)
    self.logger.debug('SQL: %s VARSLIST: %s', sql, varslist)
    self.cur.executemany(sql, varslist)
def do_schema(self):
    """Recreate the temporary schema and make it the current search path."""
    temp_schema = self.schema + '_temp'
    self.ex('SET search_path = public')
    self.logger.debug('dropping schema %s', temp_schema)
    self.ex('DROP SCHEMA IF EXISTS {schema_temp} CASCADE')
    # The schema being created is the temporary one, so log that name.
    self.logger.debug('creating schema %s', temp_schema)
    self.ex('CREATE SCHEMA {schema_temp}')
    self.ex('SET search_path = {schema_temp},public')
def do_dates_table(self):
    """Create the ``public.dates`` table unless it is already there.

    NOTE(review): the generated series stops at 2020-01-01 and an existing
    table is never extended -- confirm this range is still wanted.
    """
    # look the table up in the catalog first
    self.ex("SELECT * FROM information_schema.tables WHERE table_name = 'dates' AND"
            " table_schema = 'public'")
    already_there = list(self.cur.fetchall())
    if already_there:
        return
    self.ex('''
CREATE TABLE public.dates AS (SELECT
the_date.the_date::date AS date,
to_char(the_date.the_date, 'TMday') AS day,
to_char(the_date.the_date, 'TMmonth') AS month
FROM
generate_series('2010-01-01'::date, '2020-01-01'::date, '1 day'::interval)
AS the_date(the_date));''')
def create_table(self, name, columns, inherits=None, comment=None):
    """Issue a CREATE TABLE for *name*, optionally inheriting and commented.

    columns: iterable of ``(column name, column definition)`` pairs.
    inherits: parent table name for an INHERITS clause, if any.
    comment: text for a COMMENT ON TABLE statement, if any.
    """
    column_defs = ', '.join(
        '%s %s' % (col_name, col_type) for col_name, col_type in columns)
    statement = 'CREATE TABLE %s(%s)' % (name, column_defs)
    if inherits:
        statement = '%s INHERITS (%s)' % (statement, inherits)
    self.ex(statement)
    if not comment:
        return
    self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,))
def prev_table_exists(self, name):
    """Return whether table *name* exists in the ``{schema}`` schema."""
    self.ex(
        """SELECT EXISTS (SELECT 1 FROM information_schema.tables
WHERE table_schema = '{schema}' AND table_name = %s)""",
        vars=(name,))
    row = self.cur.fetchone()
    return row[0]
def create_labeled_table_serial(self, name, comment):
    """Create an ``(id serial, label)`` table, seeded from the previous run.

    When the table already exists in ``{schema}``, its rows are copied into
    the new table and the id sequence is moved to the highest copied id.
    """
    column_defs = [['id', 'serial primary key'], ['label', 'varchar']]
    self.create_table(name, column_defs, comment=comment)
    if not self.prev_table_exists(name):
        return
    # carry over the rows of the previous table
    self.ex(
        'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
        ctx={'name': name}
    )
    # realign the sequence with the copied ids
    self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
(SELECT MAX(id) FROM {name}))""", ctx={'name': name})
def create_labeled_table(self, name, labels, comment=None):
    """Create an ``(id smallint, label)`` table and return ``{label: id}``.

    name: table name.
    labels: iterable of ``(id, label)`` pairs; any iterable is accepted,
        including one-shot iterators such as ``enumerate(...)``.
    comment: optional table comment.

    When the table already exists in ``{schema}``, its rows are copied and
    only the labels not found there are inserted, numbered after the highest
    existing id. The returned mapping is read back from the table.
    """
    # Materialize once: iterators are always truthy and single-use, which
    # previously produced an INSERT without any row for empty input.
    labels = list(labels)
    self.create_table(
        name,
        [
            ['id', 'smallint primary key'],
            ['label', 'varchar']
        ], comment=comment)
    if self.prev_table_exists(name):
        # Insert data from previous table
        self.ex(
            'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}',
            ctx={'name': name}
        )
        # Find what is missing
        missing = []
        for _id, _label in labels:
            self.ex(
                'SELECT * FROM {name} WHERE label = %s', ctx={'name': name}, vars=(_label,))
            if self.cur.fetchone() is None:
                missing.append((_id, _label))
        labels = missing
        if missing:
            self.ex('SELECT MAX(id) FROM {name}', ctx={'name': name})
            max_id = self.cur.fetchone()[0]
            # MAX(id) is NULL when the previous table was empty; in that case
            # the ids supplied by the caller are all free and are kept.
            if max_id is not None:
                labels = [(max_id + 1 + offset, _label)
                          for offset, (_id, _label) in enumerate(missing)]
    if labels:
        tmpl = ', '.join(['(%s, %s)'] * len(labels))
        query_str = 'INSERT INTO {name} (id, label) VALUES %s' % tmpl
        self.ex(query_str, ctx={'name': name}, vars=list(itertools.chain(*labels)))
    res = {}
    self.ex('SELECT id, label FROM {name}', ctx={'name': name})
    for id_, label in self.cur.fetchall():
        res[label] = id_
    return res
def tpl(self, o, ctx=None):
    """Return a copy of *o* with every string formatted against the context.

    o: a string, dict, list, bool, int or float; dicts and lists are
        processed recursively (dict values only, keys are kept as is).
    ctx: optional extra placeholders merged with the default context (the
        default context wins on key collisions); the dict is left untouched.

    Raises AssertionError for any other kind of value.
    """
    # Merge into a copy: the caller's dict must not be modified.
    params = dict(ctx or {})
    params.update(self.default_ctx)

    def helper(o):
        if isinstance(o, six.string_types):
            return o.format(**params)
        elif isinstance(o, dict):
            return dict((k, helper(v)) for k, v in o.items())
        elif isinstance(o, list):
            return [helper(v) for v in o]
        elif isinstance(o, (bool, int, float)):
            return o
        # Raised explicitly so that the check survives ``python -O``; the
        # value is wrapped in a tuple so that tuples format correctly.
        raise AssertionError('%s is not a valid value for JSON' % (o,))
    return helper(o)
def add_dim(self, **kwargs):
    """Append the templated keyword arguments to ``self.dimensions``.

    NOTE(review): no ``dimensions`` attribute is assigned in the code visible
    here -- confirm where it is expected to come from.
    """
    target = self.dimensions
    rendered = self.tpl(kwargs)
    target.append(rendered)
def do_base_table(self):
    """Create the shared tables: dimensions, form types, formdata, evolutions.

    Also fills ``role_mapping`` and ``categories_mapping`` (API id to SQL
    id) and records the generic formdata columns in ``self.columns`` and
    their comments in ``self.comments``.
    """
    # channels
    self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
                              comment=u'canal')
    # roles
    roles = dict((i, role.name) for i, role in enumerate(self.roles))
    tmp_role_map = self.create_labeled_table('role', roles.items(), comment=u'role')
    self.role_mapping = dict(
        (role.id, tmp_role_map[role.name]) for role in self.roles)
    # categories
    tmp_cat_map = self.create_labeled_table(
        'category', enumerate(c.name for c in self.categories), comment=u'catégorie')
    self.categories_mapping = dict((c.id, tmp_cat_map[c.name]) for c in self.categories)
    self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
                              comment=u'heures')
    self.create_labeled_table('status', self.status,
                              comment=u'statuts simplifiés')
    self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,'
            ' category_id integer REFERENCES {category_table} (id),'
            ' label varchar)')
    self.ex('COMMENT ON TABLE {form_table} IS %s', vars=(u'types de formulaire',))
    # agents
    self.create_labeled_table_serial('agent', comment=u'agents')
    self.columns = [
        ['id', 'serial primary key'],
        ['formdef_id', 'smallint REFERENCES {form_table} (id)'],
        ['receipt_time', 'date'],
        ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
        ['channel_id', 'smallint REFERENCES {channel_table} (id)'],
        ['backoffice', 'boolean'],
        ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
        ['endpoint_delay', 'interval'],
        ['first_agent_id', 'smallint REFERENCES {agent_table} (id)'],
        ['geolocation_base', 'POINT NULL'],
    ]
    if self.has_jsonb:
        self.columns.append(['json_data', 'JSONB NULL'])
    self.comments = {
        'formdef_id': u'formulaire',
        'receipt_time': u'date de réception',
        'hour_id': u'heure',
        'channel_id': u'canal',
        'backoffice': u'soumission backoffce',
        'generic_status_id': u'statut simplifié',
        'endpoint_delay': u'délai de traitement',
        'geolocation_base': u'position géographique',
    }
    self.create_table('{generic_formdata_table}', self.columns)
    # ``items()`` works on both Python 2 and 3, unlike ``iteritems()``
    for at, comment in self.comments.items():
        self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))
    self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s', vars=(u'tous les formulaires',))
    # evolutions
    self.create_table('{generic_evolution_table}', [
        ['id', 'serial primary key'],
        ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
        ['formdata_id', 'integer'],  # "REFERENCES {generic_formdata_table} (id)" is impossible because FK constraints do not work on inherited tables
        ['time', 'timestamp'],
        ['date', 'date'],
        ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
    ])
    self.ex('COMMENT ON TABLE {generic_evolution_table} IS %s', vars=(u'evolution générique',))
def feed(self):
    """Run the whole feed, then swap the temporary schema into place.

    When ``do_feed`` is set the base tables are created first. Every form
    definition is then processed by a ``WcsFormdefFeeder``; a ``WcsApiError``
    on one of them is logged and the loop goes on. The model is dumped as
    JSON when ``cubes_model_dirs`` is configured. If everything succeeded,
    and ``do_feed`` is set and ``fake`` is not, the current schema is dropped
    and the temporary one renamed to it. On any other error the temporary
    schema is left in place for debugging. The cursor and the connection are
    closed in all cases.
    """
    try:
        if self.do_feed:
            self.do_schema()
            self.do_dates_table()
            self.do_base_table()
        for formdef in self.formdefs:
            self.api.cache = {}
            try:
                formdef_feeder = WcsFormdefFeeder(self, formdef, do_feed=self.do_feed)
                formdef_feeder.feed()
            except WcsApiError as e:
                self.logger.error(u'failed to retrieve formdef %s, %s', formdef.slug, e)
        if 'cubes_model_dirs' in self.config:
            model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema)
            with open(model_path, 'w') as f:
                json.dump(self.model, f, indent=2, sort_keys=True)
        # Only reached when nothing above raised: an exception skips the swap
        # and keeps the temporary schema alive for debugging.
        if self.do_feed and not self.fake:
            self.logger.debug('dropping schema %s', self.schema)
            self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
            self.logger.debug('renaming schema %s to %s', self.schema + '_temp', self.schema)
            self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
    finally:
        # prevent connection from remaining open, even if closing the cursor fails
        try:
            self.cur.close()
        finally:
            self.connection.close()
def insert_agent(self, name):
    """Return the id of the agent labelled *name*, inserting it if needed."""
    self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
    existing = self.cur.fetchone()
    if not existing:
        self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=(name,))
        existing = self.cur.fetchone()
    return existing[0]
def get_agent(self, user):
    """Return the SQL id of *user*, inserting the agent on first sight.

    Results are remembered in ``agents_mapping`` keyed by ``user.id``.
    """
    assert user.name
    assert user.id
    known = self.agents_mapping
    user_id = user.id
    if user_id in known:
        return known[user_id]
    known[user_id] = self.insert_agent(user.name)
    return known[user_id]
def create_formdata_json_index(self, varname):
    """Index ``json_data ->> varname`` on the generic formdata table, once."""
    already_indexed = self.formdata_json_index
    if varname in already_indexed:
        return
    index_name = self.hash_table_name('{formdata_table}_%s_json_idx' % varname)
    self.ex('CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))',
            ctx={'index_name': index_name}, vars=[varname])
    # remember the variable so the index is not created twice
    already_indexed.append(varname)
class WcsFormdefFeeder(object):
    """Feeds the tables and the cube of a single form definition."""

    def __init__(self, olap_feeder, formdef, do_feed=True):
        """Keep the collaborators and start with empty mappings."""
        # collaborators and flag
        self.olap_feeder = olap_feeder
        self.formdef = formdef
        self.do_feed = do_feed
        # per-formdef state, filled while feeding
        self.fields = []
        self.status_mapping = {}
        self.items_mappings = {}
        self.item_id_mappings = {}
@property
def table_name(self):
    """Name of this form's formdata table (dashes in the slug become ``_``)."""
    shorten = self.hash_table_name
    slug = self.formdef.slug.replace('-', '_')
    return shorten('formdata_%s' % slug)
@property
def status_table_name(self):
    """Name of this form's status table (dashes in the slug become ``_``)."""
    shorten = self.hash_table_name
    slug = self.formdef.slug.replace('-', '_')
    return shorten('status_%s' % slug)
@property
def evolution_table_name(self):
    """Name of this form's evolution table (dashes in the slug become ``_``)."""
    shorten = self.hash_table_name
    slug = self.formdef.slug.replace('-', '_')
    return shorten('evolution_%s' % slug)
def __getattr__(self, name):
    """Delegate attributes not found on this object to ``olap_feeder``.

    Raises AttributeError for ``olap_feeder`` itself: ``__getattr__`` is only
    called when normal lookup failed, so reaching here for that name means it
    is not set yet (e.g. on an instance built without ``__init__``), and
    delegating would recurse without end.
    """
    if name == 'olap_feeder':
        raise AttributeError(name)
    return getattr(self.olap_feeder, name)
def do_statuses(self):
    """Create the form's status table and map workflow status ids to SQL ids."""
    schema = self.formdef.schema
    statuses = schema.workflow.statuses
    status_names = [status.name for status in statuses]
    label_to_sql_id = self.olap_feeder.create_labeled_table(
        self.status_table_name, enumerate(status_names),
        comment=u'statuts du formulaire « %s »' % schema.name)
    mapping = {}
    for status in statuses:
        mapping[status.id] = label_to_sql_id[status.name]
    self.status_mapping = mapping
def do_data_table(self):
    """Register the form type and create its formdata and evolution tables.

    Inserts the form into ``{form_table}``, creates one labeled table per
    ``item`` field, then the formdata table (inheriting the generic one)
    with a column per kept field, geolocation and workflow function. Fields
    that are anonymised, have no varname, have a dash in their varname, or
    repeat an already seen varname are skipped. The kept fields are stored in
    ``self.fields`` and the insertable column names in ``self.columns``.
    """
    self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
            vars=[self.categories_mapping.get(self.formdef.schema.category_id),
                  self.formdef.schema.name])
    self.formdef_sql_id = self.cur.fetchone()[0]
    columns = [['status_id', 'smallint REFERENCES {status_table} (id)']]
    comments = {}
    # compute list of fields; a new list is built so that the schema's own
    # field list is not extended in place with the workflow fields
    fields = list(self.formdef.schema.fields)
    if self.formdef.schema.workflow:
        fields += list(self.formdef.schema.workflow.fields)
    # add item fields
    already_seen_varnames = set()
    for field in fields:
        if field.anonymise is True:
            continue
        if not field.varname or '-' in field.varname:
            continue
        if field.varname in already_seen_varnames:
            continue
        already_seen_varnames.add(field.varname)
        if field.type == 'item':
            comment = (u'valeurs du champ « %s » du formulaire %s'
                       % (field.label, self.formdef.schema.name))
            table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
            # create table and mapping
            if field.items:
                self.items_mappings[field.varname] = self.create_labeled_table(
                    table_name, enumerate(field.items), comment=comment)
            else:
                # open item field, from data sources...
                self.create_labeled_table_serial(table_name, comment=comment)
            field_def = 'smallint REFERENCES %s (id)' % table_name
        elif field.type == 'bool':
            field_def = 'boolean'
        elif field.type == 'string':
            field_def = 'varchar'
        else:
            continue
        self.fields.append(field)
        at = 'field_%s' % field.varname
        columns.append([at, field_def])
        comments[at] = field.label
    # add geolocation fields
    for geolocation, label in self.formdef.schema.geolocations:
        at = 'geolocation_%s' % geolocation
        columns.append([at, 'point'])
        comments[at] = label
    # add function fields
    for function, name in self.formdef.schema.workflow.functions.items():
        at = 'function_%s' % slugify(function)
        columns.append([at, 'smallint REFERENCES {role_table} (id)'])
        comments[at] = u'fonction « %s »' % name
    self.columns = ([name for name, _type in self.olap_feeder.columns] + [
        name for name, _type in columns])
    self.columns.remove('geolocation_base')
    self.create_table('{formdata_table}', columns, inherits='{generic_formdata_table}',
                      comment=u'formulaire %s' % self.formdef.schema.name)
    for at, comment in comments.items():
        self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % at, vars=(comment,))
    # Create index for JSON fields
    if self.has_jsonb:
        for field in fields:
            if field.varname and '-' not in field.varname:
                self.create_formdata_json_index(field.varname)
    # PostgreSQL does not propagate foreign key constraints to child tables
    # so we must recreate them manually
    for name, _type in self.olap_feeder.columns:
        if 'REFERENCES' not in _type:
            continue
        i = _type.index('REFERENCES')
        constraint = '%s_fk_constraint FOREIGN KEY (%s) %s' % (name, name, _type[i:])
        self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
    self.ex('ALTER TABLE {formdata_table} ADD PRIMARY KEY (id)')
    # evolutions table
    self.create_table('{evolution_table}', [
        ['id', 'serial primary key'],
        ['status_id', 'smallint REFERENCES {status_table} (id)'],
        ['formdata_id', 'integer REFERENCES {formdata_table} (id)'],
        ['time', 'timestamp'],
        ['date', 'date'],
        ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
    ])
    self.ex('COMMENT ON TABLE {evolution_table} IS %s',
            vars=(u'evolution des demandes %s' % self.formdef.schema.name,))
def insert_item_value(self, field, value):
    """Return the id of label *value* in the field's table, inserting if needed."""
    item_table = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
    self.ex("SELECT id FROM {item_table} WHERE label = %s",
            ctx={'item_table': item_table}, vars=(value,))
    found = self.cur.fetchone()
    if not found:
        self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,),
                ctx={'item_table': item_table})
        found = self.cur.fetchone()
    return found[0]
def get_item_id(self, field, value):
    """Return the SQL id of *value* for *field*, cached per field varname."""
    assert field
    assert field.varname
    assert value
    known = self.item_id_mappings.setdefault(field.varname, {})
    if value in known:
        return known[value]
    known[value] = self.insert_item_value(field, value)
    return known[value]
def generic_status(self, status):
    """Map a workflow status to a simplified status id.

    Returns 3 for an endpoint, otherwise 1 for a startpoint, otherwise 2.
    """
    if status.endpoint:
        return 3
    if status.startpoint:
        return 1
    return 2
def do_data(self):
    """Insert the form's formdata rows and their status evolutions.

    One row is built per formdata having a known status; formdata without
    status, or with a status missing from the workflow, are skipped (the
    latter with a warning). All rows are inserted with a single statement,
    then the returned ids are used to insert the generic and the per-form
    evolutions in batches of 500 rows.
    """
    values = []
    generic_evolution_values = []
    evolution_values = []
    for data in self.formdef.datas:
        json_data = {}
        # ignore formdata without status
        if data.workflow.status:
            status_id = data.workflow.status.id
        elif data.evolution:
            for past in reversed(data.evolution):
                if past.status:
                    status_id = past.status
                    break
            else:
                continue
        else:
            continue
        try:
            status = data.formdef.schema.workflow.statuses_map[status_id]
        except KeyError:
            self.logger.warning('%s.%s unknown status status_id %s',
                                data.formdef.schema.name, data.id, status_id)
            continue
        channel = data.submission.channel.lower()
        if channel == 'web' and data.submission.backoffice:
            channel = 'backoffice'
        row = {
            'formdef_id': self.formdef_sql_id,
            'receipt_time': data.receipt_time,
            'hour_id': data.receipt_time.hour,
            'channel_id': self.channel_to_id[channel],
            'backoffice': data.submission.backoffice,
            'generic_status_id': self.generic_status(status),
            'status_id': self.status_mapping[status_id],
            'endpoint_delay': data.endpoint_delay,
            'first_agent_id': self.get_first_agent_in_evolution(data),
        }
        # add form fields value
        for field in self.fields:
            v = None
            if field.varname in data.fields:
                raw = data.fields[field.varname]
            elif field.varname in data.workflow.fields:
                raw = data.workflow.fields[field.varname]
            else:
                raw = None
            if field.type == 'item':
                # map items to sql id; a mapping exists exactly for the fields
                # whose table was filled from a fixed list of items, all other
                # item fields use a table fed on the fly
                if field.varname in self.items_mappings:
                    v = self.items_mappings[field.varname].get(raw)
                else:
                    v = raw and self.get_item_id(field, raw)
            elif field.type in ('string', 'bool'):
                v = raw
            # unstructured storage of field values
            if field.varname and raw is not None:
                json_data[field.varname] = raw
            row['field_%s' % field.varname] = v
        if self.has_jsonb:
            row['json_data'] = json.dumps(json_data)
        # add geolocation fields value
        for geolocation, label in self.formdef.schema.geolocations:
            v = (data.geolocations or {}).get(geolocation)
            if v:
                v = '(%.6f, %.6f)' % (v.get('lon'), v.get('lat'))
            row['geolocation_%s' % geolocation] = v
        # add function fields value
        for function, name in self.formdef.schema.workflow.functions.items():
            try:
                v = data.functions[function]
            except KeyError:
                v = None
            else:
                v = v and self.olap_feeder.role_mapping[v.id]
            at = 'function_%s' % slugify(function)
            row[at] = v
        # the first column is the serial id, filled by the database; each row
        # is kept as a tuple and adapted by the driver at execution time
        values.append(tuple(row[column] for column in self.columns[1:]))
        # collect evolutions
        generic_evolution = []
        evolution = []
        last_status = None
        for evo in data.evolution:
            if not evo.status:
                continue
            try:
                status = data.formdef.schema.workflow.statuses_map[evo.status]
            except KeyError:
                self.logger.warning('%s.%s unknown status in evolution %s',
                                    data.formdef.schema.name, data.id, evo.status)
                continue
            status_id = self.status_mapping[status.id]
            generic_status_id = self.generic_status(status)
            # slot 0 is a placeholder for the formdata id, known after insert
            evolution.append(
                [0, status_id, evo.time, evo.time.date(), evo.time.hour])
            if generic_status_id == last_status:
                continue
            generic_evolution.append(
                [0, generic_status_id, evo.time, evo.time.date(), evo.time.hour])
            last_status = generic_status_id
        generic_evolution_values.append(generic_evolution)
        evolution_values.append(evolution)
    if not values:
        self.logger.warning('no data')
        return
    self.ex('INSERT INTO {formdata_table} ({columns}) VALUES {placeholders} RETURNING id',
            ctx=dict(columns=', '.join(self.columns[1:]),
                     placeholders=', '.join(['%s'] * len(values))),
            vars=values)
    ids = list(self.cur.fetchall())

    def insert_evolutions(template, status_column, rows_by_formdata):
        # Set the formdata id on every row and insert them 500 at a time.
        column_list = ', '.join(['formdata_id', status_column, 'time', 'date', 'hour_id'])
        pending = []
        for evos, (formdata_id,) in zip(rows_by_formdata, ids):
            for evo_row in evos:
                evo_row[0] = formdata_id
                pending.append(tuple(evo_row))
                if len(pending) == 500:
                    self.ex(template % (column_list, ', '.join(['%s'] * len(pending))),
                            vars=pending)
                    pending = []
        if pending:
            self.ex(template % (column_list, ', '.join(['%s'] * len(pending))),
                    vars=pending)

    # insert generic evolutions
    insert_evolutions('INSERT INTO {generic_evolution_table} (%s) VALUES %s',
                      'generic_status_id', generic_evolution_values)
    # insert evolutions
    insert_evolutions('INSERT INTO {evolution_table} (%s) VALUES %s',
                      'status_id', evolution_values)
def get_first_agent_in_evolution(self, formdata):
    """Return the agent id of the first evolution having a ``who``, else None."""
    for step in formdata.evolution:
        if not step.who:
            continue
        return self.get_agent(step.who)
    return None
def feed(self):
    """Build the form's cube and, when ``do_feed`` is set, feed its tables.

    The form's table names are pushed on the shared context for the duration
    of the call and always popped afterwards, whether or not feeding is
    enabled and whether or not an error occurs. The cube is a deep copy of
    the base cube without the ``category`` and ``formdef`` dimensions, plus a
    status dimension, one dimension per workflow function and one per
    ``item``, ``bool`` or ``string`` field that is not anonymised and has a
    dash-free varname.
    """
    self.olap_feeder.ctx.push({
        'formdata_table': self.table_name,
        'status_table': self.status_table_name,
        'evolution_table': self.evolution_table_name
    })
    try:
        # create cube
        cube = self.cube = copy.deepcopy(self.base_cube)
        # remove json field from formdef cubes
        cube.pop('json_field', None)
        cube.update({
            'name': self.table_name,
            'label': self.formdef.schema.name,
            'fact_table': self.table_name,
            'key': 'id',
        })
        cube['dimensions'] = [dimension for dimension in cube['dimensions']
                              if dimension['name'] not in ('category', 'formdef')]
        # add dimension for status
        cube['joins'].append({
            'name': 'status',
            'table': self.status_table_name,
            'master': 'status_id',
            'detail': 'id',
        })
        cube['dimensions'].append({
            'name': 'status',
            'label': 'statut',
            'join': ['status'],
            'type': 'integer',
            'value': 'status.id',
            'value_label': 'status.label',
        })
        # add dimension for function
        for function, name in self.formdef.schema.workflow.functions.items():
            at = 'function_%s' % slugify(function)
            cube['joins'].append({
                'name': at,
                'table': 'role',
                'master': at,
                'detail': 'id',
            })
            cube['dimensions'].append({
                'name': at,
                'label': u'fonction %s' % name.lower(),
                'join': [at],
                'type': 'integer',
                'value': '"%s".id' % at,
                'value_label': '"%s".label' % at,
                'filter': False,
            })
        # add dimensions for item fields; a new list is built so that the
        # schema's own field list is not extended in place
        fields = list(self.formdef.schema.fields)
        if self.formdef.schema.workflow:
            fields += list(self.formdef.schema.workflow.fields)
        for field in fields:
            if field.anonymise is True:
                continue
            if not field.varname:
                continue
            if '-' in field.varname:
                continue
            join = None
            if field.type == 'item':
                table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
                join = {
                    'name': field.varname,
                    'table': table_name,
                    'master': 'field_%s' % field.varname,
                    'detail': 'id',
                }
                if not field.required:
                    join['kind'] = 'full'
                dimension = {
                    'name': field.varname,
                    'label': field.label.lower(),
                    'join': [field.varname],
                    'type': 'integer',
                    'value': '"%s".id' % field.varname,
                    'value_label': '"%s".label' % field.varname,
                    'filter': True,
                }
            elif field.type == 'bool':
                dimension = {
                    'name': field.varname,
                    'label': field.label.lower(),
                    'type': 'bool',
                    'value': '"field_%s"' % field.varname,
                    'value_label': '(CASE WHEN "field_%(varname)s" IS NULL THEN NULL'
                                   ' WHEN "field_%(varname)s" THEN \'Oui\''
                                   ' ELSE \'Non\' END)' % {
                                       'varname': field.varname,
                                   },
                    'filter': True,
                }
            elif field.type == 'string':
                dimension = {
                    'name': field.varname,
                    'label': field.label.lower(),
                    'type': 'string',
                    'value': '"field_%s"' % field.varname,
                    'filter': True,
                }
            else:
                continue
            if join:
                cube['joins'].append(join)
            cube['dimensions'].append(dimension)
        self.model['cubes'].append(cube)
        if self.do_feed:
            self.logger.info('feed formdef %s', self.formdef.slug)
            self.do_statuses()
            self.do_data_table()
            self.do_data()
    finally:
        # always drop the layer pushed above, so that it cannot pile up on
        # the shared context
        self.olap_feeder.ctx.pop()