# -*- coding: utf-8 -*-

from __future__ import unicode_literals

from collections import OrderedDict
import contextlib
import copy
import datetime
import hashlib
import itertools
import json
import os

import psycopg2
import psycopg2.errorcodes
from cached_property import cached_property

from .utils import Whatever
from wcs_olap.wcs_api import WcsApiError

psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)


def truncate_pg_identifier(identifier, hash_length=6, force_hash=False):
    """Truncate an identifier to PostgreSQL's 63 character limit, keeping it
    recognizable by inserting a short hash in the middle."""
    if len(identifier) < 64 and not force_hash:
        return identifier
    else:
        # insert a hash in the middle, to keep some readability
        return (
            identifier[:(63 - hash_length) // 2]
            + hashlib.md5(identifier.encode('utf-8')).hexdigest()[:hash_length]
            + identifier[-(63 - hash_length) // 2:])


@contextlib.contextmanager
def ignore_undefined_object_or_table():
    try:
        yield
    except psycopg2.ProgrammingError as e:
        if e.pgcode not in [psycopg2.errorcodes.UNDEFINED_TABLE,
                            psycopg2.errorcodes.UNDEFINED_OBJECT]:
            raise


def quote(name):
    return '"%s"' % name


def slugify(s):
    return s.replace('-', '_').replace(' ', '_')


class Context(object):
    """Stack of dicts used to resolve identifiers in SQL templates."""

    def __init__(self):
        self.stack = []

    def push(self, d):
        self.stack.append(d)

    def pop(self):
        self.stack = self.stack[:-1]

    def as_dict(self):
        r = {}
        for d in self.stack:
            r.update(d)
        return r
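
# Illustration of how Context layers template variables (later pushes win on
# key conflicts); a sketch, not part of the feeding logic:
#
#   ctx = Context()
#   ctx.push({'schema': 'olap', 'role_table': 'role'})
#   ctx.push({'schema': 'olap_temp'})
#   ctx.as_dict()  # -> {'schema': 'olap_temp', 'role_table': 'role'}
#   ctx.pop()      # drops the last layer; 'schema' is 'olap' again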

class WcsOlapFeeder(object):
    channels = [
        [1, 'web', 'web'],
        [2, 'mail', 'courrier'],
        [3, 'phone', 'téléphone'],
        [4, 'counter', 'guichet'],
        [5, 'backoffice', 'backoffice'],
        [6, 'email', 'email'],
        [7, 'fax', 'fax'],
        [8, 'social-network', 'réseau social'],
    ]
    channel_to_id = dict((c[1], c[0]) for c in channels)
    id_to_channel = dict((c[0], c[1]) for c in channels)

    status = [
        [1, 'Nouveau'],
        [2, 'En cours'],
        [3, 'Terminé'],
    ]
    # build the status mappings from status, not channels
    status_to_id = dict((s[1], s[0]) for s in status)
    id_to_status = dict((s[0], s[1]) for s in status)

    def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False,
                 slugs=None):
        self.api = api
        self.slugs = slugs
        self.fake = fake
        self.logger = logger or Whatever()
        if len(schema) > 63:
            raise ValueError('schema name must be less than 64 characters: %r' % schema)
        self.schema = schema
        self.schema_temp = truncate_pg_identifier(schema + '_temp')
        self.do_feed = do_feed
        self.ctx = Context()
        self.ctx.push({
            'schema': self.schema,
            'schema_temp': self.schema_temp,
            'role_table': 'role',
            'channel_table': 'channel',
            'category_table': 'category',
            'form_table': 'formdef',
            'generic_formdata_table': 'formdata',
            'generic_status_table': 'status',
            'generic_evolution_table': 'evolution',
            'year_table': 'year',
            'month_table': 'month',
            'day_table': 'day',
            'dow_table': 'dow',
            'hour_table': 'hour',
            'agent_table': 'agent',
        })
        self.config = config or {}
        self.model = {
            'label': self.config.get('cubes_label', schema),
            'name': schema,
            'search_path': [schema, 'public'],
            'pg_dsn': pg_dsn,
            'cubes': [],
        }
        if 'cubes_slug' in self.config:
            self.model['slug'] = self.config['cubes_slug']
        cube = {
            'name': 'all_formdata',
            'label': 'Tous les formulaires',
            'fact_table': 'formdata',
            'key': 'id',
            'joins': [
                {
                    'name': 'receipt_time',
                    'table': 'dates',
                    'detail': 'date',
                    'master': 'receipt_time',
                    'kind': 'right',
                },
                {
                    'name': 'channel',
                    'table': 'channel',
                    'master': 'channel_id',
                    'detail': 'id',
                    'kind': 'left',
                },
                {
                    'name': 'formdef',
                    'table': 'formdef',
                    'master': 'formdef_id',
                    'detail': 'id',
                    'kind': 'left',
                },
                {
                    'name': 'category',
                    'table': 'category',
                    'master': 'formdef.category_id',
                    'detail': 'id',
                    'kind': 'left',
                },
                {
                    'name': 'hour',
                    'table': 'hour',
                    'master': 'hour_id',
                    'detail': 'id',
                    'kind': 'right',
                },
                {
                    'name': 'generic_status',
                    'table': 'status',
                    'master': 'generic_status_id',
                    'detail': 'id',
                    'kind': 'left',
                },
                {
                    'name': 'agent',
                    'table': 'agent',
                    'master': 'first_agent_id',
                    'detail': 'id',
                    'kind': 'left',
                },
            ],
            'dimensions': [
                {
                    'name': 'receipt_time',
                    'label': 'date de la demande',
                    'join': ['receipt_time'],
                    'type': 'date',
                    'value': 'receipt_time.date',
                },
                {
                    'name': 'channel',
                    'label': 'canal',
                    'join': ['channel'],
                    'type': 'integer',
                    'value': 'channel.id',
                    'value_label': 'channel.label',
                },
                {
                    'name': 'category',
                    'label': 'catégorie',
                    'join': ['formdef', 'category'],
                    'type': 'string',
                    'value': 'category.ref',
                    'value_label': 'category.label',
                    'order_by': 'category.label',
                },
                {
                    'name': 'formdef',
                    'label': 'formulaire',
                    'join': ['formdef'],
                    'type': 'string',
                    'value': 'formdef.ref',
                    'value_label': 'formdef.label',
                    'order_by': 'formdef.label',
                },
                {
                    'name': 'generic_status',
                    'label': 'statut simplifié',
                    'join': ['generic_status'],
                    'type': 'integer',
                    'value': 'generic_status.id',
                    'value_label': 'generic_status.label',
                },
                {
                    'name': 'hour',
                    'label': 'heure',
                    'join': ['hour'],
                    'type': 'integer',
                    'value': 'hour.id',
                    'filter': False,
                },
                {
                    'name': 'agent',
                    'label': 'premier agent traitant',
                    'join': ['agent'],
                    'type': 'integer',
                    'value': 'agent.id',
                    'value_label': 'agent.label',
                    'order_by': 'agent.label',
                },
            ],
            'measures': [
                {
                    'name': 'count',
                    'label': 'nombre de demandes',
                    'type': 'integer',
                    'expression': 'count({fact_table}.id)',
                },
                {
                    'name': 'avg_endpoint_delay',
                    'label': 'délai de traitement moyen',
                    'type': 'duration',
                    'expression': 'avg(endpoint_delay)',
                },
                {
                    'name': 'max_endpoint_delay',
                    'label': 'délai de traitement maximum',
                    'type': 'duration',
                    'expression': 'max(endpoint_delay)',
                },
                {
                    'name': 'min_endpoint_delay',
                    'label': 'délai de traitement minimum',
                    'type': 'duration',
                    'expression': 'min(endpoint_delay)',
                },
                {
                    'name': 'percent',
                    'label': 'pourcentage des demandes',
                    'type': 'percent',
                    'expression': 'case (select count({fact_table}.id) from {table_expression} '
                                  'where {where_conditions}) when 0 then null else '
                                  'count({fact_table}.id) * 100. / (select '
                                  'count({fact_table}.id) from {table_expression} where '
                                  '{where_conditions}) end',
                },
                {
                    'name': 'geolocation',
                    'label': 'localisation géographique',
                    'type': 'point',
                    'expression': 'array_agg({fact_table}.geolocation_base) '
                                  'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)',
                },
            ],
        }
        self.model['cubes'].append(cube)
        self.base_cube = self.model['cubes'][0]
        self.agents_mapping = {}
        self.formdata_json_index = []
        # keep at end of __init__ to prevent a connection leak if __init__ raises
        self.connection = psycopg2.connect(dsn=pg_dsn)
        self.connection.autocommit = True
        self.cur = self.connection.cursor()
        try:
            self.has_jsonb = self.detect_jsonb()
            if self.has_jsonb:
                cube['json_field'] = 'json_data'
        except Exception:
            self.connection.close()
            raise

    @cached_property
    def formdefs(self):
        return [formdef for formdef in self.api.formdefs
                if (not self.slugs or formdef.slug in self.slugs) and not formdef.is_empty]

    @cached_property
    def roles(self):
        return self.api.roles

    @cached_property
    def categories(self):
        return self.api.categories

    def detect_jsonb(self):
        self.cur.execute("SELECT 1 FROM pg_type WHERE typname = 'jsonb'")
        return bool(self.cur.rowcount)

    def hash_table_name(self, table_name, hash_length=6, force_hash=False):
        table_name = table_name.format(**self.default_ctx)
        return truncate_pg_identifier(table_name, hash_length=hash_length, force_hash=force_hash)

    @property
    def default_ctx(self):
        return self.ctx.as_dict()

    def ex(self, query, ctx=None, vars=None):
        ctx = ctx or {}
        ctx.update(self.default_ctx)
        sql = query.format(**ctx)
        self.logger.debug('SQL: %s VARS: %s', sql, vars)
        self.cur.execute(sql, vars=vars)
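
    # Sketch of the two-level templating done by ex(): identifiers are filled
    # from the context with str.format(), values go through psycopg2 parameter
    # binding. For example (illustrative):
    #
    #   self.ex('SELECT label FROM {channel_table} WHERE id = %s', vars=[1])
    #   # runs: SELECT label FROM channel WHERE id = 1, the value being bound
    #   # safely by the driver, never interpolated into the SQL string.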

    def do_schema(self):
        self.ex('SET search_path = public')
        self.logger.debug('dropping schema %s', self.schema_temp)
        self.drop_tables_sequencially(self.schema_temp)
        self.ex('DROP SCHEMA IF EXISTS {schema_temp} CASCADE')
        self.logger.debug('creating schema %s', self.schema_temp)
        self.ex('CREATE SCHEMA {schema_temp}')
        self.ex('SET search_path = {schema_temp},public')

    def drop_tables_sequencially(self, schema):
        """Drop tables one by one to avoid reaching max_locks_per_transaction."""
        # drop foreign key constraints first
        self.ex("SELECT table_name, constraint_name FROM "
                "information_schema.key_column_usage "
                "WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'", vars=[schema])
        for table_name, constraint_name in self.cur.fetchall():
            # dropping a PK constraint can affect FK constraints on other tables
            with ignore_undefined_object_or_table():
                self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
                        % (quote(schema), quote(table_name), quote(constraint_name)))
        # then remove the other constraints
        self.ex("SELECT table_name, constraint_name FROM "
                "information_schema.key_column_usage "
                "WHERE table_schema = %s", vars=[schema])
        for table_name, constraint_name in self.cur.fetchall():
            # dropping a PK constraint can affect FK constraints on other tables
            with ignore_undefined_object_or_table():
                self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
                        % (quote(schema), quote(table_name), quote(constraint_name)))
        # then drop indexes
        self.ex("SELECT tablename, indexname FROM pg_indexes WHERE schemaname = %s",
                vars=[schema])
        for table_name, index_name in self.cur.fetchall():
            with ignore_undefined_object_or_table():
                self.ex('DROP INDEX %s.%s CASCADE' % (quote(schema), quote(index_name)))
        # finally drop tables; at this point CASCADE has nothing left to do
        self.ex("SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC",
                vars=[schema])
        for table in self.cur.fetchall():
            tablename = '%s.%s' % (quote(schema), quote(table[0]))
            self.ex('DROP TABLE IF EXISTS %s;' % tablename)

    def do_dates_table(self):
        self.ex('CREATE TABLE IF NOT EXISTS public.dates (date date, day text, month text)')
        self.ex('CREATE INDEX IF NOT EXISTS dates_index ON public.dates (date)')
        self.ex('SELECT MIN(date) FROM public.dates')
        min_date = self.cur.fetchone()[0]
        first_date = min_date or datetime.date(2010, 1, 1)
        last_date = (datetime.date.today()
                     + datetime.timedelta(days=150)).replace(month=12, day=31)
        self.ex('''INSERT INTO public.dates
            SELECT
                the_date.the_date::date AS date,
                to_char(the_date.the_date, 'TMday') AS day,
                to_char(the_date.the_date, 'TMmonth') AS month
            FROM
                generate_series(%s::date, %s::date, '1 day'::interval) AS the_date(the_date)
            LEFT JOIN public.dates AS dates ON the_date.the_date = dates.date
            WHERE dates.date IS NULL''', vars=[first_date, last_date])
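
    # The dates table is locale sensitive: to_char(..., 'TMday'/'TMmonth')
    # returns translated names. Assuming lc_time is French, the row generated
    # for 2024-01-01 (a Monday) would look like (illustrative):
    #
    #   ('2024-01-01', 'lundi', 'janvier')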

    def create_table(self, name, columns, inherits=None, comment=None):
        sql = 'CREATE TABLE %s' % quote(name)
        sql += ' (' + ', '.join('%s %s' % (quote(n), t) for n, t in columns) + ')'
        if inherits:
            sql += ' INHERITS (%s)' % quote(inherits)
        self.ex(sql)
        if comment:
            self.ex('COMMENT ON TABLE %s IS %%s' % quote(name), vars=(comment,))

    def prev_table_exists(self, name):
        query = """SELECT EXISTS (SELECT 1 FROM information_schema.tables
                                  WHERE table_schema = '{schema}' AND table_name = %s)"""
        self.ex(query, vars=(name,))
        return self.cur.fetchone()[0]

    def update_table_sequence_number(self, name):
        self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
                          (SELECT GREATEST(1, MAX(id)) FROM {name}))""",
                ctx={'name': quote(name)})

    def create_labeled_table_serial(self, name, comment):
        self.create_table(name, [['id', 'serial primary key'], ['label', 'varchar']],
                          comment=comment)
        if self.prev_table_exists(name):
            # reuse data from the previous schema
            self.ex('INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
                    ctx={'name': quote(name)})
            self.update_table_sequence_number(name)

    def create_referenced_table(self, name, fields, comment):
        # add primary key and reference columns
        new_fields = [['id', 'serial primary key'], ['ref', 'varchar UNIQUE']] + fields
        self.create_table(name, new_fields, comment=comment)

    def do_referenced_data(self, name, data, result_column, update_column='label'):
        to_insert = []
        for item in data:
            ref = item[0]
            self.ex('SELECT ref, {column} FROM {name} WHERE ref = %s',
                    ctx={'name': quote(name), 'column': quote(update_column)}, vars=(ref,))
            if self.cur.fetchall():
                # the row already exists: refresh its update_column with the
                # new value coming from data
                self.ex('UPDATE {name} SET {column}=%s WHERE ref=%s',
                        ctx={'name': quote(name), 'column': quote(update_column)},
                        vars=[item[1], ref])
            else:
                to_insert.append(item)
        if to_insert:
            # one (DEFAULT, %s, ...) group per row actually missing
            columns_values = ', '.join(['%s'] * len(to_insert[0]))
            tmpl = ', '.join(['(DEFAULT, %s)' % columns_values] * len(to_insert))
            query = 'INSERT INTO {name} VALUES %s' % tmpl
            self.ex(query, ctx={'name': quote(name)}, vars=list(itertools.chain(*to_insert)))
        result = {}
        self.ex('SELECT id, {column} FROM {name}',
                ctx={'name': quote(name), 'column': result_column})
        for _id, column in self.cur.fetchall():
            result[column] = _id
        return result
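
    # Illustrative call: upsert rows keyed by their ref, then get back a
    # mapping from the requested result column to the row id (values assumed):
    #
    #   self.do_referenced_data('category', [('etat-civil', 'État civil')], 'label')
    #   # -> {'État civil': 1}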

    def create_labeled_table(self, name, labels, comment=None):
        self.create_table(
            name,
            [
                ['id', 'integer primary key'],
                ['label', 'varchar'],
            ], comment=comment)
        if self.prev_table_exists(name):
            # reuse data from the previous schema
            self.ex('INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
                    ctx={'name': quote(name)})
        # find which labels are missing
        to_insert = []
        for _id, _label in labels:
            self.ex('SELECT * FROM {name} WHERE label = %s', ctx={'name': quote(name)},
                    vars=(_label,))
            if self.cur.fetchone() is None:
                to_insert.append(_label)
        if to_insert:
            self.ex('SELECT MAX(id) FROM {name}', ctx={'name': quote(name)})
            next_id = (self.cur.fetchone()[0] or 0) + 1
            ids = range(next_id, next_id + len(to_insert))
            # turn the zip generator into a proper list
            new_labels = list(zip(ids, to_insert))
            tmpl = ', '.join(['(%s, %s)'] * len(new_labels))
            query_str = 'INSERT INTO {name} (id, label) VALUES %s' % tmpl
            self.ex(query_str, ctx={'name': quote(name)},
                    vars=list(itertools.chain(*new_labels)))
        res = {}
        self.ex('SELECT id, label FROM {name}', ctx={'name': quote(name)})
        for id_, label in self.cur.fetchall():
            res[label] = id_
        return res
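
    # Example (illustrative): ids stay stable across runs because rows are
    # first copied from the previous schema and only missing labels get ids
    # above the current MAX(id):
    #
    #   self.create_labeled_table('channel', [(1, 'web'), (2, 'courrier')])
    #   # -> {'web': 1, 'courrier': 2}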

    def do_category_table(self):
        fields = [['label', 'varchar']]
        table_name = self.default_ctx['category_table']
        self.create_referenced_table(table_name, fields, 'catégorie')
        categories_data = [(c.slug, c.title) for c in self.categories]
        tmp_cat_map = self.do_referenced_data(table_name, categories_data, 'label')
        self.update_table_sequence_number(table_name)
        # remap category titles to their ids in the table
        return dict((c.title, tmp_cat_map[c.title]) for c in self.categories)

    def do_formdef_table(self):
        categories_mapping = self.do_category_table()
        formdef_fields = [
            ['category_id', 'integer REFERENCES {category_table} (id)'],
            ['label', 'varchar'],
        ]
        table_name = self.default_ctx['form_table']
        self.create_referenced_table(table_name, formdef_fields, 'types de formulaire')
        formdefs = [(form.slug, categories_mapping.get(form.schema.category), form.schema.name)
                    for form in self.formdefs]
        self.formdefs_mapping = self.do_referenced_data(table_name, formdefs, 'ref')
        self.update_table_sequence_number(table_name)

    def do_base_table(self):
        # channels
        self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
                                  comment='canal')
        # roles
        roles = dict((i, role.name) for i, role in enumerate(self.roles))
        tmp_role_map = self.create_labeled_table('role', roles.items(), comment='role')
        self.role_mapping = dict((role.id, tmp_role_map[role.name]) for role in self.roles)
        # forms
        self.do_formdef_table()
        # hours
        self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
                                  comment='heures')
        # simplified statuses
        self.create_labeled_table('status', self.status, comment='statuts simplifiés')
        # agents
        self.create_labeled_table_serial('agent', comment='agents')

        self.columns = [
            ['id', 'serial primary key'],
            ['formdef_id', 'smallint REFERENCES {form_table} (id)'],
            ['receipt_time', 'date'],
            ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
            ['channel_id', 'smallint REFERENCES {channel_table} (id)'],
            ['backoffice', 'boolean'],
            ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
            ['endpoint_delay', 'interval'],
            ['first_agent_id', 'smallint REFERENCES {agent_table} (id)'],
            ['geolocation_base', 'POINT NULL'],
        ]
        if self.has_jsonb:
            self.columns.append(['json_data', 'JSONB NULL'])
        self.comments = {
            'formdef_id': 'formulaire',
            'receipt_time': 'date de réception',
            'hour_id': 'heure',
            'channel_id': 'canal',
            'backoffice': 'soumission backoffice',
            'generic_status_id': 'statut simplifié',
            'endpoint_delay': 'délai de traitement',
            'geolocation_base': 'position géographique',
        }
        self.create_table('{generic_formdata_table}', self.columns)
        for at, comment in self.comments.items():
            self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at,
                    vars=(comment,))
        self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s',
                vars=('tous les formulaires',))

        # evolutions
        self.create_table('{generic_evolution_table}', [
            ['id', 'serial primary key'],
            ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
            # "REFERENCES {generic_formdata_table} (id)" is impossible here because
            # FK constraints do not work on inherited tables
            ['formdata_id', 'integer'],
            ['time', 'timestamp'],
            ['date', 'date'],
            ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
        ])
        self.ex('COMMENT ON TABLE {generic_evolution_table} IS %s',
                vars=('evolution générique',))

    def feed(self):
        try:
            if self.do_feed:
                self.do_schema()
                self.do_dates_table()
                self.do_base_table()
            for formdef in self.formdefs:
                self.api.cache = {}
                try:
                    formdef_feeder = WcsFormdefFeeder(self, formdef, do_feed=self.do_feed)
                    formdef_feeder.feed()
                except WcsApiError as e:
                    self.logger.error('failed to retrieve formdef %s, %s', formdef.slug, e)
        except Exception:
            # keep the temporary schema alive for debugging
            raise
        else:
            if self.do_feed:
                if not self.fake:
                    self.logger.debug('dropping schema %s', self.schema)
                    self.drop_tables_sequencially(self.schema)
                    self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
                    self.logger.debug('renaming schema %s to %s',
                                      self.schema_temp, self.schema)
                    self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
            if 'cubes_model_dirs' in self.config:
                model_path = os.path.join(self.config['cubes_model_dirs'],
                                          '%s.model' % self.schema)
                with open(model_path, 'w') as f:
                    json.dump(self.model, f, indent=2, sort_keys=True)
        finally:
            # prevent the connection from remaining open
            self.cur.close()
            self.connection.close()

    def insert_agent(self, name):
        self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
        res = self.cur.fetchone()
        if res:
            return res[0]
        self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=(name,))
        return self.cur.fetchone()[0]

    def get_agent(self, user):
        assert user.name
        assert user.id
        if user.id not in self.agents_mapping:
            self.agents_mapping[user.id] = self.insert_agent(user.name)
        return self.agents_mapping[user.id]

    def create_formdata_json_index(self, table_name, varname):
        if varname in self.formdata_json_index:
            return
        index_name = self.hash_table_name('%s_%s_json_idx' % (table_name, varname),
                                          hash_length=8, force_hash=True)
        self.ex('CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))',
                ctx={'index_name': quote(index_name)}, vars=[varname])
        # prevent double creation
        self.formdata_json_index.append(varname)


def has_digits_validation(field):
    return field.validation and field.validation.get('type') == 'digits'
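
# Example (illustrative): only text fields carrying a 'digits' validation are
# stored as integer columns (and later get a SUM measure in their cube):
#
#   class DigitsField(object):
#       validation = {'type': 'digits'}
#
#   has_digits_validation(DigitsField())  # -> True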


class WcsFormdefFeeder(object):
    def __init__(self, olap_feeder, formdef, do_feed=True):
        self.olap_feeder = olap_feeder
        self.formdef = formdef
        self.status_mapping = {}
        self.items_mappings = {}
        self.do_feed = do_feed
        self.fields = []
        self.item_id_mappings = {}

    @property
    def table_name(self):
        return self.hash_table_name('formdata_%s' % self.formdef.slug.replace('-', '_'))

    @property
    def status_table_name(self):
        return self.hash_table_name('status_%s' % self.formdef.slug.replace('-', '_'))

    @property
    def evolution_table_name(self):
        return self.hash_table_name('evolution_%s' % self.formdef.slug.replace('-', '_'))

    def __getattr__(self, name):
        # delegate everything else to the parent feeder
        return getattr(self.olap_feeder, name)

    def do_statuses(self):
        statuses = self.formdef.schema.workflow.statuses
        tmp_status_map = self.olap_feeder.create_labeled_table(
            self.status_table_name, enumerate([s.name for s in statuses]),
            comment='statuts du formulaire « %s »' % self.formdef.schema.name)
        self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses)

    def do_data_table(self):
        columns = OrderedDict()
        columns['status_id'] = {
            'sql_col_name': 'status_id',
            'sql_col_def': 'smallint REFERENCES {status_table} (id)',
        }
        # add item fields
        for field in self.good_fields.values():
            if field.type == 'item':
                comment = ('valeurs du champ « %s » du formulaire %s'
                           % (field.label, self.formdef.schema.name))
                table_name = self.hash_table_name('%s_field_%s'
                                                  % (self.table_name, field.varname))
                # create the table and the label mapping
                if field.items:
                    self.items_mappings[field.varname] = self.create_labeled_table(
                        table_name, enumerate(field.items), comment=comment)
                else:
                    # open item field, values come from data sources
                    self.create_labeled_table_serial(table_name, comment=comment)
                field_def = 'integer REFERENCES %s (id)' % quote(table_name)
            elif field.type == 'bool':
                field_def = 'boolean'
            elif field.type == 'string':
                if has_digits_validation(field):
                    field_def = 'integer'
                else:
                    field_def = 'varchar'
            else:
                continue
            columns[field.varname] = {
                'field': field,
                'sql_col_name': truncate_pg_identifier('field_%s' % field.varname),
                'sql_col_def': field_def,
                'sql_comment': field.label,
            }
        # keep the loaded fields around
        for key in columns:
            if columns[key].get('field') is not None:
                self.fields.append(columns[key].get('field'))
        # add geolocation columns
        for geolocation, label in self.formdef.schema.geolocations:
            at = 'geolocation_%s' % geolocation
            columns[at] = {
                'sql_col_name': at,
                'sql_col_def': 'point',
                'sql_comment': 'géoloc « %s »' % label,
            }
        # add function columns
        for function, name in self.formdef.schema.workflow.functions.items():
            at = 'function_%s' % slugify(function)
            columns[at] = {
                'sql_col_name': at,
                'sql_col_def': 'smallint REFERENCES {role_table} (id)',
                'sql_comment': 'fonction « %s »' % name,
            }
        self.columns = [name for name, _type in self.olap_feeder.columns]
        for key in columns:
            self.columns.append(columns[key]['sql_col_name'])
        self.columns.remove('geolocation_base')
        self.create_table(self.table_name,
                          [(columns[key]['sql_col_name'], columns[key]['sql_col_def'])
                           for key in columns],
                          inherits='{generic_formdata_table}',
                          comment='formulaire %s' % self.formdef.schema.name)
        for key in columns:
            column = columns[key]
            if column.get('sql_comment'):
                self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s'
                        % quote(column['sql_col_name']), vars=(column['sql_comment'],))
        # create indexes for JSON fields
        if self.has_jsonb:
            for varname in self.good_fields:
                self.create_formdata_json_index(self.table_name, varname)
        # PostgreSQL does not propagate foreign key constraints to child tables,
        # so we must recreate them manually
        for name, _type in self.olap_feeder.columns:
            if 'REFERENCES' not in _type:
                continue
            i = _type.index('REFERENCES')
            constraint = '%s_fk_constraint FOREIGN KEY (%s) %s' % (name, name, _type[i:])
            self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
        self.ex('ALTER TABLE {formdata_table} ADD PRIMARY KEY (id)')
        # evolutions table
        self.create_table(self.evolution_table_name, [
            ['id', 'serial primary key'],
            ['status_id', 'smallint REFERENCES {status_table} (id)'],
            ['formdata_id', 'integer REFERENCES {formdata_table} (id)'],
            ['time', 'timestamp'],
            ['date', 'date'],
            ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
        ])
        self.ex('COMMENT ON TABLE {evolution_table} IS %s',
                vars=('evolution des demandes %s' % self.formdef.schema.name,))
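
    # Sketch of the per-formdef layout created above (table names are
    # illustrative; real ones go through hash_table_name()):
    #
    #   CREATE TABLE "formdata_demande" (
    #       "status_id" smallint REFERENCES "status_demande" (id),
    #       ...
    #   ) INHERITS ("formdata");
    #
    # followed by ALTER TABLE statements re-adding the parent's foreign keys
    # and primary key, since inheritance does not copy constraints.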

    def insert_item_value(self, field, value):
        table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
        self.ex('SELECT id FROM {item_table} WHERE label = %s',
                ctx={'item_table': quote(table_name)}, vars=(value,))
        res = self.cur.fetchone()
        if res:
            return res[0]
        self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)',
                ctx={'item_table': quote(table_name)}, vars=(value,))
        return self.cur.fetchone()[0]

    def get_item_id(self, field, value):
        assert field
        assert field.varname
        assert value
        mapping = self.item_id_mappings.setdefault(field.varname, {})
        if value not in mapping:
            mapping[value] = self.insert_item_value(field, value)
        return mapping[value]

    def generic_status(self, status):
        if status.endpoint:
            generic_status = 3
        elif status.startpoint:
            generic_status = 1
        else:
            generic_status = 2
        return generic_status
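
    # Illustrative mapping, using the ids declared in WcsOlapFeeder.status:
    #
    #   endpoint status    -> 3 ('Terminé')
    #   startpoint status  -> 1 ('Nouveau')
    #   any other status   -> 2 ('En cours')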

    def do_data(self):
        values = []
        generic_evolution_values = []
        evolution_values = []
        for data in self.formdef.formdatas.anonymized.full:
            json_data = {}
            # ignore formdata without a resolvable status
            if data.workflow.real_status:
                status_id = data.workflow.real_status.id
            elif data.workflow.status:
                status_id = data.workflow.status.id
            elif data.evolution:
                for evolution in reversed(data.evolution):
                    if evolution.status:
                        status_id = evolution.status
                        break
                else:
                    continue
            else:
                continue
            try:
                status = data.formdef.schema.workflow.statuses_map[status_id]
            except KeyError:
                self.logger.warning('%s.%s unknown status status_id %s',
                                    data.formdef.schema.name, data.id, status_id)
                continue
            channel = data.submission.channel.lower()
            if channel == 'web' and data.submission.backoffice:
                channel = 'backoffice'
            row = {
                'formdef_id': self.formdefs_mapping[self.formdef.slug],
                'receipt_time': data.receipt_time,
                'hour_id': data.receipt_time.hour,
                'channel_id': self.channel_to_id[channel],
                'backoffice': data.submission.backoffice,
                'generic_status_id': self.generic_status(status),
                'status_id': self.status_mapping[status_id],
                'endpoint_delay': data.endpoint_delay,
                'first_agent_id': self.get_first_agent_in_evolution(data),
            }
            # add form field values
            for field in self.fields:
                v = None
                if field.varname in data.fields:
                    raw = data.fields[field.varname]
                elif field.varname in data.workflow.fields:
                    raw = data.workflow.fields[field.varname]
                else:
                    raw = None
                if field.type == 'item':
                    # map items to their sql ids
                    if field.items or field.options:
                        v = self.items_mappings[field.varname].get(raw)
                    elif raw:
                        v = self.get_item_id(field, raw)
                    else:
                        v = None
                elif field.type == 'string':
                    if has_digits_validation(field):
                        if raw is not None and raw.isnumeric():
                            v = int(raw)
                    else:
                        v = raw
                elif field.type == 'bool':
                    v = raw
                # unstructured storage of field values
                if field.varname and raw is not None:
                    json_data[field.varname] = raw
                row[truncate_pg_identifier('field_%s' % field.varname)] = v
            if self.has_jsonb:
                row['json_data'] = json.dumps(json_data)
            # add geolocation field values
            for geolocation, label in self.formdef.schema.geolocations:
                v = (data.geolocations or {}).get(geolocation)
                if v:
                    v = '(%.6f, %.6f)' % (v.get('lon'), v.get('lat'))
                row['geolocation_%s' % geolocation] = v
            # add function field values
            for function, name in self.formdef.schema.workflow.functions.items():
                try:
                    v = data.functions[function]
                except KeyError:
                    v = None
                else:
                    if v and v.id in self.olap_feeder.role_mapping:
                        v = self.olap_feeder.role_mapping[v.id]
                    else:
                        v = None
                at = 'function_%s' % slugify(function)
                row[at] = v
            values.append(tuple(row[column] for column in self.columns[1:]))

            # build evolutions
            generic_evolution = []
            evolution = []
            last_status = None
            for evo in data.evolution:
                if not evo.status:
                    continue
                try:
                    status = data.formdef.schema.workflow.statuses_map[evo.status]
                except KeyError:
                    self.logger.warning('%s.%s unknown status in evolution %s',
                                        data.formdef.schema.name, data.id, evo.status)
                    continue
                status_id = self.status_mapping[status.id]
                generic_status_id = self.generic_status(status)
                evolution.append([0, status_id, evo.time, evo.time.date(), evo.time.hour])
                if generic_status_id == last_status:
                    continue
                generic_evolution.append(
                    [0, generic_status_id, evo.time, evo.time.date(), evo.time.hour])
                last_status = generic_status_id
            generic_evolution_values.append(generic_evolution)
            evolution_values.append(evolution)

        if not values:
            self.logger.warning('no data')
            return
        insert_columns = ', '.join([quote(column) for column in self.columns[1:]])
        self.ex('INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id',
                ctx=dict(columns=insert_columns, values=', '.join(['%s'] * len(values))),
                vars=values)

        # insert generic evolutions, in batches of 500 rows
        generic_evolutions = []
        ids = list(self.cur.fetchall())
        for evos, (formdata_id,) in zip(generic_evolution_values, ids):
            for row in evos:
                row[0] = formdata_id
                generic_evolutions.append(tuple(row))
                if len(generic_evolutions) == 500:
                    self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % (
                        ', '.join(['formdata_id', 'generic_status_id', 'time', 'date',
                                   'hour_id']),
                        ', '.join(['%s'] * len(generic_evolutions))),
                        vars=generic_evolutions)
                    generic_evolutions = []
        if generic_evolutions:
            self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % (
                ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
                ', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions)

        # insert evolutions, in batches of 500 rows
        evolutions = []
        for evos, (formdata_id,) in zip(evolution_values, ids):
            for row in evos:
                row[0] = formdata_id
                evolutions.append(tuple(row))
                if len(evolutions) == 500:
                    self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % (
                        ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
                        ', '.join(['%s'] * len(evolutions))), vars=evolutions)
                    evolutions = []
        if evolutions:
            self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % (
                ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
                ', '.join(['%s'] * len(evolutions))), vars=evolutions)

    def get_first_agent_in_evolution(self, formdata):
        for evo in formdata.evolution:
            if evo.who:
                return self.get_agent(evo.who)

    def feed(self):
        self.olap_feeder.ctx.push({
            'formdata_table': quote(self.table_name),
            'status_table': quote(self.status_table_name),
            'evolution_table': quote(self.evolution_table_name),
        })

        # create the cube for this formdef
        cube = self.cube = copy.deepcopy(self.base_cube)

        def add_warning(message):
            self.logger.warning('%s', message)
            cube.setdefault('warnings', []).append(message)

        # remove the json field from formdef cubes
        cube.pop('json_field', None)
        cube.update({
            'name': self.table_name,
            'label': self.formdef.schema.name,
            'fact_table': quote(self.table_name),
            'key': 'id',
        })
        cube['dimensions'] = [dimension for dimension in cube['dimensions']
                              if dimension['name'] not in ('category', 'formdef')]

        # add a dimension for the status
        cube['joins'].append({
            'name': 'status',
            'table': quote(self.status_table_name),
            'master': 'status_id',
            'detail': 'id',
            'kind': 'left',
        })
        cube['dimensions'].append({
            'name': 'status',
            'label': 'statut',
            'join': ['status'],
            'type': 'integer',
            'value': 'status.id',
            'value_label': 'status.label',
        })

        # add a dimension for each function
        for function, name in self.formdef.schema.workflow.functions.items():
            at = 'function_%s' % slugify(function)
            cube['joins'].append({
                'name': at,
                'table': 'role',
                'master': quote(at),
                'detail': 'id',
                'kind': 'left',
            })
            cube['dimensions'].append({
                'name': at,
                'label': 'fonction %s' % name.lower(),
                'join': [at],
                'type': 'integer',
                'value': '%s.id' % quote(at),
                'value_label': '%s.label' % quote(at),
                'filter': False,
            })

        # add dimensions for item fields
        # copy the list, to avoid mutating schema.fields in place
        fields = list(self.formdef.schema.fields)
        if self.formdef.schema.workflow:
            fields += self.formdef.schema.workflow.fields
        # filter out duplicates
        self.good_fields = good_fields = OrderedDict()
        bad_varnames = set()
        for field in fields:
            if field.type not in ('item', 'bool', 'string'):
                continue
            if field.anonymise is True:
                continue
            if not field.varname:
                add_warning('Le champ « %s » n\'a pas de nom de variable, il a été ignoré.'
                            % field.label)
                continue
            if field.varname in good_fields and good_fields[field.varname].type != field.type:
                # duplicate found, with an incoherent type
                add_warning(
                    'Le champ « %(label)s » a un nom de variable dupliqué « %(varname)s » '
                    'mais pas le même type que « %(label_good)s », tous les champs avec ce '
                    'nom seront ignorés (%(type)s != %(type_good)s).' % {
                        'label': field.label,
                        'varname': field.varname,
                        'type': field.type,
                        'label_good': good_fields[field.varname].label,
                        'type_good': good_fields[field.varname].type,
                    })
                del self.good_fields[field.varname]
                bad_varnames.add(field.varname)
                continue
            if field.varname in bad_varnames:
                add_warning('Le champ « %s » est un doublon d\'un champ de type différent, '
                            'il a été ignoré.' % field.label)
                continue
            self.good_fields[field.varname] = field

        for field in good_fields.values():
            join = None
            join_name = 'join_' + field.varname
            field_name = truncate_pg_identifier('field_%s' % field.varname)
            dimension_name = field.varname
            dimension_label = field.label.lower()
            if field.type == 'item':
                table_name = self.hash_table_name('%s_field_%s'
                                                  % (self.table_name, field.varname))
                join = {
                    'name': join_name,
                    'table': quote(table_name),
                    'master': quote(field_name),
                    'detail': 'id',
                    'kind': 'left',
                }
                dimension = {
                    'name': dimension_name,
                    'label': dimension_label,
                    'join': [join_name],
                    'type': 'integer',
                    'value': '%s.id' % quote(join_name),
                    'value_label': '%s.label' % quote(join_name),
                    'filter': True,
                }
            elif field.type == 'bool':
                dimension = {
                    'name': dimension_name,
                    'label': dimension_label,
                    'type': 'bool',
                    'value': quote(field_name),
                    'value_label': '(CASE WHEN %(field)s IS NULL THEN NULL'
                                   ' WHEN %(field)s THEN \'Oui\''
                                   ' ELSE \'Non\' END)' % {
                                       'field': quote(field_name),
                                   },
                    'filter': True,
                }
            elif field.type == 'string':
                if has_digits_validation(field):
                    # define a SUM measure instead of a dimension
                    cube['measures'].append({
                        'name': 'sum_' + dimension_name,
                        'label': 'total du champ « %s »' % dimension_label,
                        'type': 'integer',
                        'expression': 'SUM({fact_table}.%s)' % quote(field_name),
                    })
                    continue
                else:
                    dimension = {
                        'name': dimension_name,
                        'label': dimension_label,
                        'type': 'string',
                        'value': quote(field_name),
                        'filter': True,
                    }
            else:
                continue
            if join:
                cube['joins'].append(join)
            cube['dimensions'].append(dimension)

        self.model['cubes'].append(cube)

        if self.do_feed:
            try:
                self.logger.info('feed formdef %s', self.formdef.slug)
                self.do_statuses()
                self.do_data_table()
                self.do_data()
            finally:
                self.olap_feeder.ctx.pop()
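
# Minimal usage sketch. The wcs-olap command line normally wires this up from
# its configuration; the API client construction below is an assumption, not
# the documented entry point:
#
#   from wcs_olap import wcs_api
#   api = wcs_api.WcsApi(...)  # hypothetical arguments
#   feeder = WcsOlapFeeder(api, pg_dsn='dbname=olap', schema='demo',
#                          logger=logging.getLogger('wcs-olap'))
#   feeder.feed()  # builds schema 'demo_temp', then swaps it in as 'demo'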