diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..32ad7a9
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,23 @@
+BI for Publik
+=============
+
+w.c.s. OLAP
+-----------
+
+Tool to export w.c.s. data to a PostgreSQL database laid out as a star
+schema, suitable for building an OLAP cube::
+
+    usage: wcs-olap --url URL [-h] --orig ORIG --key KEY
+                    --pg-dsn PG_DSN
+
+    Export W.C.S. data as a star schema in a postgresql DB
+
+    optional arguments:
+      --url URL        url of the w.c.s. instance
+      -h, --help       show this help message and exit
+      --orig ORIG      origin of the request for signatures
+      --key KEY        HMAC key for signatures
+      --pg-dsn PG_DSN  Psycopg2 DB DSN
+
+
diff --git a/create_dates.sql b/create_dates.sql
new file mode 100644
index 0000000..dd13b2d
--- /dev/null
+++ b/create_dates.sql
@@ -0,0 +1,9 @@
+-- Create a table of dates between 2010 and 2020
+DROP TABLE IF EXISTS dates;
+CREATE TABLE dates AS (SELECT
+    the_date.the_date::date AS date,
+    to_char(the_date.the_date, 'TMday') AS day,
+    to_char(the_date.the_date, 'TMmonth') AS month
+  FROM
+    generate_series('2010-01-01'::date, '2020-01-01'::date, '1 day'::interval)
+    AS the_date(the_date));
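The cube model defined in wcs_olap/feeder.py joins facts against a shared
``public.dates`` table, so this script is meant to be run once against the
target database before feeding, e.g. (hypothetical database name)::

    psql -f create_dates.sql wcs_olap
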
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..3229d20
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,61 @@
+#! /usr/bin/env python
+
+import subprocess
+import os
+
+from setuptools import setup, find_packages
+from setuptools.command.sdist import sdist
+
+
+class eo_sdist(sdist):
+    def run(self):
+        print "creating VERSION file"
+        if os.path.exists('VERSION'):
+            os.remove('VERSION')
+        version = get_version()
+        version_file = open('VERSION', 'w')
+        version_file.write(version)
+        version_file.close()
+        sdist.run(self)
+        print "removing VERSION file"
+        if os.path.exists('VERSION'):
+            os.remove('VERSION')
+
+
+def get_version():
+    '''Use the VERSION file if it exists, otherwise generate a version with
+    "git describe"; if no tag exists, use 0.0.0 plus the number of commits.
+    '''
+    if os.path.exists('VERSION'):
+        with open('VERSION', 'r') as v:
+            return v.read()
+    if os.path.exists('.git'):
+        p = subprocess.Popen(['git', 'describe', '--dirty', '--match=v*'], stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
+        result = p.communicate()[0]
+        if p.returncode == 0:
+            result = result.split()[0][1:]
+        else:
+            result = '0.0.0-%s' % len(subprocess.check_output(
+                ['git', 'rev-list', 'HEAD']).splitlines())
+        return result.replace('-', '.').replace('.g', '+g')
+    return '0.0.0'
+
+
+setup(name="wcs-olap",
+      version=get_version(),
+      license="AGPLv3+",
+      description="Export w.c.s. data to an OLAP cube",
+      long_description=open('README.rst').read(),
+      url="http://dev.entrouvert.org/projects/publik-bi/",
+      author="Entr'ouvert",
+      author_email="authentic@listes.entrouvert.com",
+      maintainer="Benjamin Dauvergne",
+      maintainer_email="bdauvergne@entrouvert.com",
+      packages=find_packages(),
+      include_package_data=True,
+      install_requires=['requests', 'psycopg2', 'isodate'],
+      entry_points={
+          'console_scripts': ['wcs-olap=wcs_olap.cmd:main'],
+      },
+      cmdclass={'sdist': eo_sdist})
diff --git a/wcs_olap/cmd.py b/wcs_olap/cmd.py
index eb7491d..794dc0c 100644
--- a/wcs_olap/cmd.py
+++ b/wcs_olap/cmd.py
@@ -1,59 +1,97 @@
 import argparse
 import ConfigParser
 import os
-import urlparse
 import logging
 import logging.config
 from . import wcs_api
 from .feeder import WcsOlapFeeder
 import locale
+from . import tb


 def main():
-    locale.setlocale(locale.LC_ALL, '')
+    try:
+        main2()
+    except SystemExit:
+        raise
+    except:
+        tb.print_tb()
+        raise SystemExit(1)
+
+
+def get_config(path=None):
     config = ConfigParser.ConfigParser()
-    global_config_file = '/etc/wcs_olap.ini'
-    if os.path.exists(global_config_file):
-        config.read(global_config_file)
-        if config.has_section('loggers'):
-            logging.config.fileConfig(global_config_file)
-    user_config_file = os.path.expanduser('~/.wcs_olap.ini')
-    if os.path.exists(user_config_file):
-        config.read(user_config_file)
-        if config.has_section('loggers'):
-            logging.config.fileConfig(user_config_file)
+    global_config_path = '/etc/wcs_olap.ini'
+    user_config_path = os.path.expanduser('~/.wcs_olap.ini')
+    if not path:
+        if os.path.exists(user_config_path):
+            path = user_config_path
+        elif os.path.exists(global_config_path):
+            path = global_config_path
+        else:
+            return config
+    config.read(path)
+    if config.has_section('loggers'):
+        logging.config.fileConfig(path)
+    return config
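+
+
+# A sample ~/.wcs_olap.ini (a sketch; the section name is the w.c.s. base URL;
+# orig, key, pg_dsn and schema are required, slugs, cubes_label and
+# cubes_model_dirs are optional):
+#
+#   [https://forms.example.com/]
+#   orig = bi.example.com
+#   key = secret
+#   pg_dsn = dbname=wcs_olap
+#   schema = forms_example
+#   slugs = slug-1 slug-2
+#   cubes_model_dirs = /var/lib/cubes/models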
+
+
+def main2():
+    locale.setlocale(locale.LC_ALL, '')
+    parser = argparse.ArgumentParser(description='Export W.C.S. data as a star schema in a '
+                                     'postgresql DB', add_help=False)
+    parser.add_argument('config_path', default=None)
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true',
+                       default=False)
+    group.add_argument('--url', help='url of the w.c.s. instance', required=False, default=None)
+    args, rest = parser.parse_known_args()
+    config = get_config(path=args.config_path)
+    # list all known urls
     urls = [url for url in config.sections()
             if url.startswith('http://') or url.startswith('https://')]
-    parser = argparse.ArgumentParser(description='Engine ES with W.C.S. data', add_help=False)
-    parser.add_argument('--url', help='url of the w.c.s. instance', required=not urls,
-                        default=(urls or [None])[0])
-    args, rest = parser.parse_known_args()
     defaults = {}
-    if getattr(args, 'url') and config.has_section(args.url):
-        defaults = dict(config.items(args.url))
-    parser.add_argument("-h", "--help", action="help", help="show this help message and exit")
-    parser.add_argument('--orig', help='origin of the request for signatures',
-                        required='orig' not in defaults)
-    parser.add_argument('--key', help='HMAC key for signatures', required='key' not in defaults)
-    group = parser.add_mutually_exclusive_group(
-        required='email' not in defaults and 'name_id' not in defaults)
-    group.add_argument('--email', help='email for authentication')
-    group.add_argument('--name-id', help='NameID for authentication')
-    if defaults:
-        parser.set_defaults(**defaults)
-    parser.add_argument('--pg-dsn', help='Psycopg2 DB DSN', required='pg-dsn' not in defaults)
+    if not args.all:
+        try:
+            url = args.url or urls[0]
+        except IndexError:
+            print 'no url found'
+            raise SystemExit(1)
+        urls = [url]
+        if config.has_section(url):
+            defaults = dict(config.items(url))
+    parser.add_argument("-h", "--help", action="help", help="show this help message and exit")
+    parser.add_argument('--orig', help='origin of the request for signatures')
+    parser.add_argument('--key', help='HMAC key for signatures')
+    parser.add_argument('--pg-dsn', help='Psycopg2 DB DSN')
+    parser.add_argument('--schema', help='schema name')
+    args = parser.parse_args()
+    for key in ('orig', 'key', 'pg_dsn', 'schema'):
+        if getattr(args, key, None):
+            defaults[key] = getattr(args, key)
-    args = parser.parse_args()
-    api = wcs_api.WcsApi(url=args.url, orig=args.orig, key=args.key, email=args.email,
-                         name_id=args.name_id)
-    domain = urlparse.urlparse(args.url).netloc.split(':')[0]
-    schema = defaults.get('schema') or domain.replace('.', '_')
     logger = logging.getLogger('wcs-olap')
-    logger.info('starting synchronizing w.c.s. at %r with ES at %s:%s', args.url, args.es_host,
-                args.es_port)
-    feeder = WcsOlapFeeder(api=api, schema=schema, db_url=args.db_url, logger=logger)
-    feeder.feed()
-    logger.info('finished')
+    for url in urls:
+        if config.has_section(url):
+            defaults.update(config.items(url))
+        try:
+            key = defaults['key']
+            orig = defaults['orig']
+            schema = defaults['schema']
+            pg_dsn = defaults['pg_dsn']
+            slugs = defaults.get('slugs', '').strip().split() or None
+        except KeyError, e:
+            logger.error('configuration incomplete for %s: %s', url, e)
+        else:
+            api = wcs_api.WcsApi(url=url, orig=orig, key=key, slugs=slugs)
+            logger.info('starting synchronization of w.c.s. at %r with PostgreSQL at %s',
+                        url, pg_dsn)
+            feeder = WcsOlapFeeder(api=api, schema=schema, pg_dsn=pg_dsn, logger=logger,
+                                   config=defaults)
+            feeder.feed()
+            logger.info('finished')
+        defaults = {}


 if __name__ == '__main__':
     main()
diff --git a/wcs_olap/feeder.py b/wcs_olap/feeder.py
index 64da4ea..27260bf 100644
--- a/wcs_olap/feeder.py
+++ b/wcs_olap/feeder.py
@@ -1,43 +1,383 @@
+# -*- coding: utf-8 -*-
+
+import copy
+import os
+import json
+import hashlib
 from utils import Whatever
 import psycopg2
+from wcs_olap.wcs_api import WcsApiError
+
+
+def slugify(s):
+    return s.replace('-', '_').replace(' ', '_')
+
+
+class Context(object):
+    def __init__(self):
+        self.stack = []
+
+    def __getitem__(self, key):
+        if not self.stack:
+            raise KeyError(key)
+        for d in self.stack[::-1]:
+            try:
+                return d[key]
+            except KeyError:
+                pass
+        else:
+            raise KeyError(key)
+
+    def push(self, d):
+        self.stack.append(d)
+
+    def pop(self):
+        self.stack = self.stack[:-1]
+
+    def as_dict(self):
+        r = {}
+        for d in self.stack:
+            r.update(d)
+        return r


 class WcsOlapFeeder(object):
-    def __init__(self, api, dsn, schema, logger=None):
+    def __init__(self, api, pg_dsn, schema, logger=None, config=None):
         self.api = api
         self.logger = logger or Whatever()
         self.schema = schema
-        self.connection = psycopg2.connect(dsn)
+        self.connection = psycopg2.connect(dsn=pg_dsn)
+        self.connection.autocommit = True
         self.cur = self.connection.cursor()
-        self.formdefs = api.get_formdefs()
+        self.formdefs = api.formdefs
         self.roles = api.roles
-
-    @property
-    def default_ctx(self):
-        return {
+        self.categories = api.categories
+        self.ctx = Context()
+        self.ctx.push({
             'schema': self.schema,
             'role_table': 'role',
             'channel_table': 'channel',
+            'category_table': 'category',
+            'form_table': 'formdef',
+            'generic_formdata_table': 'formdata',
+            'generic_status_table': 'status',
+            'year_table': 'year',
+            'month_table': 'month',
+            'day_table': 'day',
+            'dow_table': 'dow',
+            'hour_table': 'hour',
+        })
+        self.config = config or {}
+        self.model = {
+            'label': self.config.get('cubes_label', schema),
+            'name': schema,
+            'browser_options': {
+                'schema': schema,
+            },
+            'joins': [
+                {
+                    'name': 'receipt_time',
+                    'master': 'receipt_time',
+                    'detail': {
+                        'table': 'dates',
+                        'column': 'date',
+                        'schema': 'public',
+                    },
+                    'method': 'detail',
+                    'alias': 'dates',
+                },
+                {
+                    'name': 'channel',
+                    'master': 'channel_id',
+                    'detail': '{channel_table}.id',
+                    'method': 'detail',
+                },
+                {
+                    'name': 'role',
+                    'detail': '{role_table}.id',
+                    'method': 'detail',
+                },
+                {
+                    'name': 'formdef',
+                    'master': 'formdef_id',
+                    'detail': '{form_table}.id',
+                    'method': 'detail',
+                },
+                {
+                    'name': 'category',
+                    'master': '{form_table}.category_id',
+                    'detail': '{category_table}.id',
+                },
+                {
+                    'name': 'hour',
+                    'master': 'hour_id',
+                    'detail': '{hour_table}.id',
+                    'method': 'detail',
+                },
+                {
+                    'name': 'generic_status',
+                    'master': 'generic_status_id',
+                    'detail': '{generic_status_table}.id',
+                    'method': 'detail',
+                },
+            ],
+            'dimensions': [
+                {
+                    'label': 'date de soumission',
+                    'name': 'receipt_time',
+                    'role': 'time',
+                    'levels': [
+                        {
+                            'name': 'year',
+                            'label': 'année',
+                            'role': 'year',
+                            'order_attribute': 'year',
+                            'order': 'asc',
+                        },
+                        {
+                            'name': 'quarter',
+                            'order_attribute': 'quarter',
+                            'label': 'trimestre',
+                            'role': 'quarter',
+                        },
+                        {
+                            'name': 'month',
+                            'label': 'mois',
+                            'role': 'month',
+                            'attributes': ['month', 'month_name'],
+                            'order_attribute': 'month',
+                            'label_attribute': 'month_name',
+                            'order': 'asc',
+                        },
+                        {
+                            'name': 'week',
+                            'label': 'semaine',
+                            'role': 'week',
+                        },
+                        {
+                            'name': 'day',
+                            'label': 'jour',
+                            'role': 'day',
+                            'order': 'asc',
+                        },
+                        {
+                            'name': 'dow',
+                            'label': 'jour de la semaine',
+                            'attributes': ['dow', 'dow_name'],
+                            'order_attribute': 'dow',
+                            'label_attribute': 'dow_name',
+                            'order': 'asc',
+                        },
+                    ],
+                    'hierarchies': [
+                        {
+                            'name': 'default',
+                            'label': 'par défaut',
+                            'levels': ['year', 'month', 'day']
+                        },
+                        {
+                            'name': 'quarterly',
+                            'label': 'par trimestre',
+                            'levels': ['year', 'quarter']
+                        },
+                        {
+                            'name': 'weekly',
+                            'label': 'par semaine',
+                            'levels': ['year', 'week']
+                        },
+                        {
+                            'name': 'dowly',
+                            'label': 'par jour de la semaine',
+                            'levels': ['dow']
+                        },
+                    ]
+                },
+                {
+                    'label': 'canaux',
+                    'name': 'channels',
+                },
+                {
+                    'label': 'catégories',
+                    'name': 'categories',
+                },
+                {
+                    'label': 'formulaire',
+                    'name': 'formdef',
+                },
+                {
+                    'label': 'statuts génériques',
+                    'name': 'generic_statuses',
+                },
+                {
+                    'label': 'heure',
+                    'name': 'hours',
+                    'levels': [
+                        {
+                            'name': 'hours',
+                            'attributes': ['hour_id', 'hour_label'],
+                            'order_attribute': 'hour_id',
+                            'label_attribute': 'hour_label',
+                        }
+                    ]
+                },
+            ],
+            'mappings': {
+                'receipt_time.year': {
+                    'table': 'dates',
+                    'column': 'date',
+                    'schema': 'public',
+                    'extract': 'year',
+                },
+                'receipt_time.month': {
+                    'table': 'dates',
+                    'column': 'date',
+                    'schema': 'public',
+                    'extract': 'month'
+                },
+                'receipt_time.month_name': {
+                    'table': 'dates',
+                    'schema': 'public',
+                    'column': 'month'
+                },
+                'receipt_time.week': {
+                    'table': 'dates',
+                    'column': 'date',
+                    'schema': 'public',
+                    'extract': 'week'
+                },
+                'receipt_time.day': {
+                    'table': 'dates',
+                    'column': 'date',
+                    'schema': 'public',
+                    'extract': 'day'
+                },
+                'receipt_time.dow': {
+                    'table': 'dates',
+                    'column': 'date',
+                    'schema': 'public',
+                    'extract': 'dow'
+                },
+                'receipt_time.dow_name': {
+                    'table': 'dates',
+                    'schema': 'public',
+                    'column': 'day',
+                },
+                'receipt_time.quarter': {
+                    'table': 'dates',
+                    'column': 'date',
+                    'schema': 'public',
+                    'extract': 'quarter'
+                },
+                'formdef': 'formdef.label',
+                'channels': 'channel.label',
+                'categories': 'category.label',
+                'generic_statuses': 'status.label',
+                'hours.hour_label': '{hour_table}.label',
+                'hours.hour_id': '{hour_table}.id',
+            },
+            'cubes': [
+                {
+                    'name': schema + '_formdata',
+                    'label': 'Toutes les demandes (%s)' % schema,
+                    'key': 'id',
+                    'fact': 'formdata',
+                    'dimensions': [
+                        'receipt_time',
+                        'hours',
+                        'channels',
+                        'categories',
+                        'formdef',
+                        'generic_statuses',
+                    ],
+                    'joins': [
+                        {
+                            'name': 'receipt_time',
+                        },
+                        {
+                            'name': 'hour',
+                        },
+                        {
+                            'name': 'channel',
+                        },
+                        {
+                            'name': 'formdef',
+                        },
+                        {
+                            'name': 'category',
+                        },
+                        {
+                            'name': 'generic_status',
+                        },
+                    ],
+                    'measures': [
+                        {
+                            'name': 'endpoint_delay',
+                            'label': 'délai de traitement',
+                            'nonadditive': 'all',
+                        },
+                    ],
+                    'aggregates': [
+                        {
+                            'name': 'record_count',
+                            'label': 'nombre de demandes',
+                            'function': 'count'
+                        },
+                        {
+                            'name': 'endpoint_delay_max',
+                            'label': 'délai de traitement maximum',
+                            'measure': 'endpoint_delay',
+                            'function': 'max',
+                        },
+                        {
+                            'name': 'endpoint_delay_avg',
+                            'label': 'délai de traitement moyen',
+                            'measure': 'endpoint_delay',
+                            'function': 'avg',
+                        },
+                    ],
+                },
+            ],
+        }
+        # apply table names
+        self.model = self.tpl(self.model)
+        self.base_cube = self.model['cubes'][0]
+
+    def hash_table_name(self, table_name):
+        table_name = table_name.format(**self.default_ctx)
+        if len(table_name) < 64:
+            return table_name
+        else:
+            return table_name[:57] + hashlib.md5(table_name).hexdigest()[:6]

     @property
     def default_ctx(self):
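+        # flatten the context stack into a single dict; later pushes override earlier ones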
+        return self.ctx.as_dict()

     def ex(self, query, ctx=None, vars=None):
         ctx = ctx or {}
         ctx.update(self.default_ctx)
-        self.cur.execute(query.format(**(ctx or {})), vars=vars)
+        sql = query.format(**(ctx or {}))
+        self.logger.debug('SQL: %s VARS: %s', sql, vars)
+        self.cur.execute(sql, vars=vars)

     def exmany(self, query, varslist, ctx=None):
         ctx = ctx or {}
         ctx.update(self.default_ctx)
-        self.cur.executemany(query.format(**(ctx or {})), varslist)
+        sql = query.format(**(ctx or {}))
+        self.logger.debug('SQL: %s VARSLIST: %s', sql, varslist)
+        self.cur.executemany(sql, varslist)

     def do_schema(self):
+        self.logger.debug('dropping schema %s', self.schema)
         self.ex('SET search_path = public')
-        self.ex('DROP SCHEMA {schema} IF EXISTS')
-        self.ex('CREATE SCHEMA {schema} IF EXISTS')
+        self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
+        self.logger.debug('creating schema %s', self.schema)
+        self.ex('CREATE SCHEMA {schema}')
         self.ex('SET search_path = {schema},public')

     channels = [
-        [1, 'web', u'web']
+        [1, 'web', u'web'],
         [2, 'mail', u'courrier'],
         [3, 'phone', u'téléphone'],
         [4, 'counter', u'guichet'],
@@ -45,28 +385,143 @@ class WcsOlapFeeder(object):
     channel_to_id = dict((c[1], c[0]) for c in channels)
     id_to_channel = dict((c[0], c[1]) for c in channels)

+    status = [
+        [1, 'Nouveau'],
+        [2, 'En cours'],
+        [3, 'Terminé'],
+    ]
+    status_to_id = dict((c[1], c[0]) for c in status)
+    id_to_status = dict((c[0], c[1]) for c in status)
+
+    def create_table(self, name, columns, inherits=None, comment=None):
+        sql = 'CREATE TABLE %s' % name
+        sql += '(' + ', '.join('%s %s' % (n, t) for n, t in columns) + ')'
+        if inherits:
+            sql += ' INHERITS (%s)' % inherits
+        self.ex(sql)
+        if comment:
+            self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,))
+
+    def create_labeled_table(self, name, labels, comment=None):
+        self.create_table(name,
+                          [
+                              ['id', 'smallint primary key'],
+                              ['label', 'varchar']
+                          ], comment=comment)
+        values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels)
+        if not values:
+            return
+        self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))
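+
+    # For example, create_labeled_table('{channel_table}', [(1, u'web'), (2, u'courrier')])
+    # roughly executes:
+    #   CREATE TABLE channel(id smallint primary key, label varchar)
+    #   INSERT INTO channel (id, label) VALUES (1, 'web'), (2, 'courrier')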
+
+    def tpl(self, o, ctx=None):
+        ctx = ctx or {}
+        ctx.update(self.default_ctx)
+
+        def helper(o):
+            if isinstance(o, basestring):
+                return o.format(**ctx)
+            elif isinstance(o, dict):
+                return dict((k, helper(v)) for k, v in o.iteritems())
+            elif isinstance(o, list):
+                return [helper(v) for v in o]
+            elif isinstance(o, (bool, int, float)):
+                return o
+            else:
+                assert False, '%s is not a valid value for JSON' % o
+        return helper(o)
+
+    def add_dim(self, **kwargs):
+        self.dimensions.append(self.tpl(kwargs))
+
     def do_base_table(self):
-        self.ex('CREATE TABLE {channel_table} (id serial PRIMARY KEY, label varchar)')
-        self.exmany('INSERT INTO {channel_table} (id, label) VALUES (%s, %s)',
-                    [[c[0], c[2]] for c in self.channels])
-        self.ex('CREATE TABLE {role_table} (id serial PRIMARY KEY, label varchar)')
-        self.exmany('INSERT INTO {role_table} (label) VALUES (%s) RETURNING (id)',
-                    [[role.name] for role in self.roles])
-        self.roles_mapping = []
-        for row, role in zip(self.cur.fetchall(), self.roles):
-            self.roles_mappin[role.id] = row[0]
+        # channels
+        self.create_labeled_table('{channel_table}', [[c[0], c[2]] for c in self.channels])
+
+        # roles
+        roles = dict((i, role.name) for i, role in enumerate(self.roles))
+        self.create_labeled_table('{role_table}', roles.items())
+        self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles))
+
+        # categories
+        self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories))
+        self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories))
+
+        # years
+        self.create_labeled_table('{year_table}', zip(range(2000, 2030), map(str, range(2000,
+                                                                                        2030))))
+
+        # months
+        self.create_labeled_table('{month_table}', zip(range(1, 13), [u'janvier', u'février',
+                                                                      u'mars', u'avril', u'mai',
+                                                                      u'juin', u'juillet', u'août',
+                                                                      u'septembre', u'octobre',
+                                                                      u'novembre', u'décembre']))
+        # days
+        self.create_labeled_table('{day_table}', zip(range(1, 32), map(str, range(1, 32))))
+        # day of week
+        self.create_labeled_table('{dow_table}', enumerate([u'lundi', u'mardi', u'mercredi',
+                                                            u'jeudi', u'vendredi', u'samedi',
+                                                            u'dimanche']))
+
+        self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))))
+
+        self.create_labeled_table('{generic_status_table}', self.status)
+
+        self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,'
+                ' category_id integer REFERENCES {category_table} (id),'
+                ' label varchar)')
+        self.columns = [
+            ['id', 'serial primary key'],
+            ['formdef_id', 'smallint REFERENCES {form_table} (id)'],
+            ['receipt_time', 'date'],
+            ['year_id', 'smallint REFERENCES {year_table} (id)'],
+            ['month_id', 'smallint REFERENCES {month_table} (id)'],
+            ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
+            ['day_id', 'smallint REFERENCES {day_table} (id)'],
+            ['dow_id', 'smallint REFERENCES {dow_table} (id)'],
+            ['channel_id', 'smallint REFERENCES {channel_table} (id)'],
+            ['backoffice', 'boolean'],
+            ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
+            ['endpoint_delay', 'real'],
+        ]
+        self.comments = {
+            'formdef_id': u'dim$formulaire',
+            'receipt_time': u'time$date de réception',
+            'year_id': u'dim$année',
+            'month_id': u'dim$mois',
+            'hour_id': u'dim$heure',
+            'day_id': u'dim$jour',
+            'dow_id': u'dim$jour de la semaine',
+            'channel_id': u'dim$canal',
+            'backoffice': u'dim$soumission backoffice',
+            'generic_status_id': u'dim$statut générique',
+            'endpoint_delay': u'measure$délai de traitement',
+        }
+        self.create_table('{generic_formdata_table}', self.columns)
+        for at, comment in self.comments.iteritems():
+            self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))

     def feed(self):
         self.do_schema()
         self.do_base_table()
-        for formdef in self.api.get_formdefs():
-            self.feed_formdef(formdef)
+        for formdef in self.formdefs:
+            try:
+                formdef_feeder = WcsFormdefFeeder(self, formdef)
+                formdef_feeder.feed()
+            except WcsApiError, e:
+                # ignore authorization errors
+                if (len(e.args) > 2 and hasattr(e.args[2], 'response')
+                        and e.args[2].response.status_code == 403):
+                    continue
+                self.logger.error('failed to retrieve formdef %s', formdef.slug)
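+
+    # Typical driving code (a sketch mirroring wcs_olap/cmd.py):
+    #   api = wcs_api.WcsApi(url=url, orig=orig, key=key, slugs=slugs)
+    #   feeder = WcsOlapFeeder(api=api, schema=schema, pg_dsn=pg_dsn, logger=logger,
+    #                          config=defaults)
+    #   feeder.feed()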


 class WcsFormdefFeeder(object):
     def __init__(self, olap_feeder, formdef):
         self.olap_feeder = olap_feeder
         self.formdef = formdef
         self.status_mapping = {}
+        self.items_mappings = {}
+        self.fields = []

     @property
     def table_name(self):
@@ -74,45 +529,222 @@ class WcsFormdefFeeder(object):

     @property
     def status_table_name(self):
-        return 'formdata_status_%s' % self.formdef.slug.replace('-', '_')
-
-    @property
-    def default_ctx(self):
-        return {
-            'table_name': self.table_name,
-            'status_table_name': self.status_table_name,
-        }
+        return 'status_%s' % self.formdef.slug.replace('-', '_')

     def __getattr__(self, name):
         return getattr(self.olap_feeder, name)

-    def ex(self, query, ctx=None, vars=None):
-        ctx = ctx or {}
-        ctx.update(self.default_ctx)
-        self.olap_feeder.ex(query, ctx=ctx, vars=vars)
-
-    def formdef_exmany(self, formdef, statement, varslist, ctx=None):
-        ctx = ctx or {}
-        ctx.update(self.default_ctx)
-        self.olap_feeder.exmany(statement, varslist, ctx=ctx)
-
     def do_statuses(self):
-        self.ex('CREATE TABLE {status_table_name} (id serial PRIMARY KEY, '
-                'submission_backoffice label varchar)')
-        statuses = self.formdef.schema.workflow['statuses'].items()
-        labels = [[defn['name']] for status, defn in statuses]
-        self.exmany('INSERT INTO {status_mapping} (label) VALUES (%s) RETURNING (id)',
-                    varslist=labels)
-        for status_sql_id, (status_id, defn) in zip(self.cur.fetchall(), statuses):
-            self.status_mapping['wf-%s' % status_id] = status_sql_id
+        statuses = self.formdef.schema.workflow.statuses
+        self.olap_feeder.create_labeled_table(self.status_table_name,
+                                              enumerate([s.name for s in statuses]))
+        self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses))

     def do_data_table(self):
-        self.ex('CREATE TABLE {table_name} (id serial PRIMARY KEY, '
-                'receipt_date date, '
-                'status integer REFERENCES {status_table_name} (id))')
+        self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
+                vars=[self.categories_mapping.get(self.formdef.schema.category_id),
+                      self.formdef.schema.name])
+        self.formdef_sql_id = self.cur.fetchone()[0]
+
+        columns = [['status_id', 'smallint REFERENCES {status_table} (id)']]
+
+        comments = {}
+
+        # add item fields
+        for field in self.formdef.schema.fields:
+            if not field.items and not field.options:
+                continue
+            if not field.varname:
+                continue
+            self.fields.append(field)
+            comment = (u'dim$%s$Valeur du champ « %s » du formulaire %s'
+                       % (field.label, field.label, self.formdef.schema.name))
+            table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
+            # create table and mapping
+            if field.items:
+                self.create_labeled_table(table_name, enumerate(field.items),
+                                          comment=comment)
+                self.items_mappings[field.varname] = dict(
+                    (item, i) for i, item in enumerate(field.items))
+            elif field.options:
+                options = list(enumerate(field.options))
+                self.create_labeled_table(table_name,
+                                          [(i, o['label']) for i, o in options],
+                                          comment=comment)
+                self.items_mappings[field.varname] = dict((o['value'], i) for i, o in options)
+
+            at = 'field_%s' % field.varname
+            columns.append([at, 'smallint REFERENCES %s (id)' % table_name])
+            comments[at] = u'dim$' + field.label
+
+        # add geolocation fields
+        for geolocation, label in self.formdef.schema.geolocations:
+            at = 'geolocation_%s' % geolocation
+            columns.append([at, 'point'])
+            comments[at] = u'dim$' + label
+
+        # add function fields
+        for function, name in self.formdef.schema.workflow.functions.iteritems():
+            at = 'function_%s' % slugify(function)
+            columns.append([at, 'smallint REFERENCES {role_table} (id)'])
+            comments[at] = u'dim$Fonction « %s »' % name
+
+        self.columns = ([name for name, _type in self.olap_feeder.columns]
+                        + [name for name, _type in columns])
+        self.create_table('{formdata_table}', columns, inherits='{generic_formdata_table}',
+                          comment=u'cube$%s' % self.formdef.schema.name)
+        for at, comment in comments.iteritems():
+            self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % at, vars=(comment,))
+        # PostgreSQL does not propagate foreign key constraints to child tables
+        # so we must recreate them manually
+        for name, _type in self.olap_feeder.columns:
+            if 'REFERENCES' not in _type:
+                continue
+            i = _type.index('REFERENCES')
+            constraint = '%s_fk_constraint FOREIGN KEY (%s) %s' % (name, name, _type[i:])
+            self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
+
+    def do_data(self):
+        values = []
+        for data in self.formdef.datas:
+
+            # ignore formdata without status
+            if not data.workflow.status:
+                continue
+
+            status = data.formdef.schema.workflow.statuses_map[data.workflow.status.id]
+            if data.endpoint_delay:
+                endpoint_delay = (data.endpoint_delay.days + float(data.endpoint_delay.seconds) /
+                                  86400.)
+            else:
+                endpoint_delay = None
+            row = {
+                'formdef_id': self.formdef_sql_id,
+                'receipt_time': data.receipt_time,
+                'year_id': data.receipt_time.year,
+                'month_id': data.receipt_time.month,
+                'day_id': data.receipt_time.day,
+                'hour_id': data.receipt_time.hour,
+                'dow_id': data.receipt_time.weekday(),
+                'channel_id': self.channel_to_id[data.submission.channel.lower()],
+                'backoffice': data.submission.backoffice,
+                # FIXME "En cours"/2 is never used
+                'generic_status_id': 3 if status.endpoint else 1,
+                'status_id': self.status_mapping[data.workflow.status.id],
+                'endpoint_delay': endpoint_delay,
+            }
+            # add form fields value
+            for field in self.fields:
+                v = None
+                if field.type == 'item':
+                    # map items to sql id
+                    v = self.items_mappings[field.varname].get(data.fields.get(field.varname))
+                row['field_%s' % field.varname] = v
+            # add geolocation fields value
+            for geolocation, label in self.formdef.schema.geolocations:
+                v = (data.geolocations or {}).get(geolocation)
+                row['geolocation_%s' % geolocation] = v
+            # add function fields value
+            for function, name in self.formdef.schema.workflow.functions.iteritems():
+                try:
+                    v = data.functions[function]
+                except KeyError:
+                    v = None
+                else:
+                    v = self.olap_feeder.role_mapping[v.id]
+                at = 'function_%s' % slugify(function)
+                row[at] = v
+
+            tpl = '(' + ', '.join(['%s'] * len(self.columns[1:])) + ')'
+            value = self.cur.mogrify(tpl, [row[column] for column in self.columns[1:]])
+            values.append(value)
+        if not values:
+            self.logger.warning('no data')
+            return
+        self.ex('INSERT INTO {formdata_table} (%s) VALUES %s' % (
+            ', '.join(self.columns[1:]),  # skip the id column
+            ', '.join(values)))

     def feed(self):
-        self.logger.info('feed formdef %s', self.formdef.slug)
-        self.do_statuses()
-        self.do_data_table()
+        self.olap_feeder.ctx.push({
+            'formdata_table': self.table_name,
+            'status_table': self.status_table_name,
+        })
+        # create cube
+        self.cube = copy.deepcopy(self.base_cube)
+        self.cube.update({
+            'name': self.schema + '_' + self.table_name,
+            'label': self.formdef.schema.name,
+            'fact': self.table_name,
+        })
+        # add dimension for status
+        self.cube['joins'].append({
+            'master': 'status_id',
+            'detail': '%s.id' % self.status_table_name,
+            'method': 'detail',
+        })
+        dim_name = '%s_%s' % (self.table_name, 'status')
+        self.model['dimensions'].append({
+            'name': dim_name,
+            'label': 'statut',
+            'levels': [
+                {
+                    'name': 'status',
+                    'attributes': ['status_id', 'status_label'],
+                    'order_attribute': 'status_id',
+                    'label_attribute': 'status_label',
+                },
+            ],
+        })
+        self.model['mappings']['%s.status_id' % dim_name] = '%s.id' % self.status_table_name
+        self.model['mappings']['%s.status_label' % dim_name] = '%s.label' % self.status_table_name
+        self.cube['dimensions'].append(dim_name)
+
+        # add dimension for function
+        for function, name in self.formdef.schema.workflow.functions.iteritems():
+            at = 'function_%s' % slugify(function)
+            dim_name = '%s_function_%s' % (self.table_name, slugify(function))
+            self.cube['joins'].append({
+                'master': at,
+                'detail': self.tpl('{role_table}.id'),
+                'alias': at,
+            })
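+            # the per-function alias above lets each function dimension join the
+            # same role table independently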
+            self.model['dimensions'].append({
+                'name': dim_name,
+                'label': u'fonction %s' % name,
+            })
+            self.model['mappings'][dim_name] = '%s.label' % at
+            self.cube['dimensions'].append(dim_name)
+
+        # add dimensions for item fields
+        for field in self.fields:
+            if field.type != 'item':
+                continue
+            table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
+            self.cube['joins'].append({
+                'master': 'field_%s' % field.varname,
+                'detail': '%s.id' % table_name,
+                'method': 'detail',
+            })
+            dim_name = '%s_%s' % (self.table_name, field.varname)
+            self.model['dimensions'].append({
+                'name': dim_name,
+                'label': field.label,
+            })
+            self.model['mappings'][dim_name] = '%s.label' % table_name
+            self.cube['dimensions'].append(dim_name)
+
+        self.model['cubes'].append(self.cube)
+
+        try:
+            self.logger.info('feed formdef %s', self.formdef.slug)
+            self.do_statuses()
+            self.do_data_table()
+            self.do_data()
+        finally:
+            self.olap_feeder.ctx.pop()
+
+        if 'cubes_model_dirs' in self.config:
+            model_path = os.path.join(self.config['cubes_model_dirs'], '%s.json' % self.schema)
+            with open(model_path, 'w') as f:
+                json.dump(self.model, f, indent=2, sort_keys=True)
diff --git a/wcs_olap/tb.py b/wcs_olap/tb.py
new file mode 100644
index 0000000..8a9ec1d
--- /dev/null
+++ b/wcs_olap/tb.py
@@ -0,0 +1,55 @@
+from StringIO import StringIO
+import sys
+import linecache
+
+
+def print_tb():
+    exc_type, exc_value, tb = sys.exc_info()
+    if exc_value:
+        exc_value = unicode(str(exc_value), errors='ignore')
+    error_file = StringIO()
+
+    limit = None
+    if hasattr(sys, 'tracebacklimit'):
+        limit = sys.tracebacklimit
+    print >>error_file, "Exception:"
+    print >>error_file, "  type = '%s', value = '%s'" % (exc_type, exc_value)
+    print >>error_file
+
+    # format the traceback
+    print >>error_file, 'Stack trace (most recent call first):'
+    n = 0
+    while tb is not None and (limit is None or n < limit):
+        frame = tb.tb_frame
+        function = frame.f_code.co_name
+        filename = frame.f_code.co_filename
+        exclineno = frame.f_lineno
+        locals = frame.f_locals.items()
+
+        print >>error_file, '  File "%s", line %s, in %s' % (filename, exclineno, function)
+        linecache.checkcache(filename)
+        for lineno in range(exclineno - 2, exclineno + 3):
+            line = linecache.getline(filename, lineno, frame.f_globals)
+            if line:
+                if lineno == exclineno:
+                    print >>error_file, '>%5s %s' % (lineno, line.rstrip())
+                else:
+                    print >>error_file, ' %5s %s' % (lineno, line.rstrip())
+        print >>error_file
+        if locals:
+            print >>error_file, "  locals: "
+            for key, value in locals:
+                print >>error_file, "    %s =" % key,
+                try:
+                    repr_value = repr(value)
+                    if len(repr_value) > 10000:
+                        repr_value = repr_value[:10000] + ' [...]'
+                    print >>error_file, repr_value,
+                except:
+                    print >>error_file, "",
+                print >>error_file
+        print >>error_file
+        tb = tb.tb_next
+        n = n + 1
+
+    print error_file.getvalue()
diff --git a/wcs_olap/utils.py b/wcs_olap/utils.py
new file mode 100644
index 0000000..3a3f54f
--- /dev/null
+++ b/wcs_olap/utils.py
@@ -0,0 +1,6 @@
+class Whatever(object):
+    def __call__(*args, **kwargs):
+        pass
+
+    def __getattr__(self, name):
+        return self
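``Whatever`` is a null object used as a fallback when no logger is passed to
the feeder: every attribute access returns the instance and calling it does
nothing, e.g.::

    log = Whatever()
    log.info('ignored %s', 42)  # no-op, never raises
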
diff --git a/wcs_olap/wcs_api.py b/wcs_olap/wcs_api.py
index d051e47..88b76dc 100644
--- a/wcs_olap/wcs_api.py
+++ b/wcs_olap/wcs_api.py
@@ -1,7 +1,7 @@
 import requests
 import urlparse
 import urllib
-import datetime
+import isodate
 from . import signature
@@ -15,33 +15,107 @@ class BaseObject(object):
         self.__wcs_api = wcs_api
         self.__dict__.update(**kwargs)

-    def json(self):
-        d = self.__dict__.copy()
-        for key in d.keys():
-            if key.startswith('_'):
-                del d[key]
-        return d
+
+class FormDataWorkflow(BaseObject):
+    status = None
+
+    def __init__(self, wcs_api, **kwargs):
+        super(FormDataWorkflow, self).__init__(wcs_api, **kwargs)
+        if self.status is not None:
+            self.status = BaseObject(wcs_api, **self.status)
+
+
+class Evolution(BaseObject):
+    user = None
+    status = None
+    parts = None
+
+    def __init__(self, wcs_api, **kwargs):
+        super(Evolution, self).__init__(wcs_api, **kwargs)
+        self.time = isodate.parse_datetime(self.time)
+        if self.parts:
+            self.parts = [BaseObject(wcs_api, **part) for part in self.parts]


 class FormData(BaseObject):
-    def json(self):
-        d = super(FormData, self).json()
-        formdef = d.pop('formdef')
-        receipt_time = datetime.datetime.strptime(d['receipt_time'], "%Y-%m-%dT%H:%M:%SZ")
-        d['receipt_time__dow'] = receipt_time.strftime('%A')
-        d['receipt_time__dow_int'] = int(receipt_time.strftime('%w'))
-        d['receipt_time__month'] = receipt_time.strftime('%B')
-        d['receipt_time__month_int'] = int(receipt_time.strftime('%m'))
-        d['receipt_time__hour'] = int(receipt_time.strftime('%H'))
-        d['formdef_slug'] = formdef.slug
-        return d
+    geolocations = None
+    evolution = None
+
+    def __init__(self, wcs_api, **kwargs):
+        super(FormData, self).__init__(wcs_api, **kwargs)
+        self.receipt_time = isodate.parse_datetime(self.receipt_time)
+        self.submission = BaseObject(wcs_api, **self.submission)
+        self.workflow = FormDataWorkflow(wcs_api, **self.workflow)
+        self.evolution = [Evolution(wcs_api, **evo) for evo in self.evolution or []]
+        self.functions = {}
+        self.concerned_roles = []
+        self.action_roles = []
+        for function in self.roles:
+            roles = [Role(wcs_api, **r) for r in self.roles[function]]
+            if function == 'concerned':
+                self.concerned_roles.extend(roles)
+            elif function == 'actions':
+                self.action_roles.extend(roles)
+            else:
+                try:
+                    self.functions[function] = roles[0]
+                except IndexError:
+                    self.functions[function] = None
+        del self.roles

     def __repr__(self):
         return '<{klass} {display_id!r}>'.format(klass=self.__class__.__name__,
                                                  display_id=self.id)

+    @property
+    def endpoint_delay(self):
+        '''Delay between the receipt of the form and its last transition into
+        an endpoint status, i.e. the start of the trailing run of endpoint
+        statuses.'''
+        statuses_map = self.formdef.schema.workflow.statuses_map
+        s = 0
+        for evo in self.evolution[::-1]:
+            if evo.status:
+                if statuses_map[evo.status].endpoint:
+                    s = 1
+                    last = evo.time - self.receipt_time
+                else:
+                    if s == 1:
+                        return last
+                    else:
+                        return
+        if s == 1:
+            return last
+
+
+class Workflow(BaseObject):
+    status = None
+
+    def __init__(self, wcs_api, **kwargs):
+        super(Workflow, self).__init__(wcs_api, **kwargs)
+        self.statuses = [BaseObject(wcs_api, **v) for v in self.statuses]
+        self.statuses_map = dict((s.id, s) for s in self.statuses)
+
+
+class Field(BaseObject):
+    items = None
+    options = None
+    varname = None
+    in_filters = False
+
+
+class Schema(BaseObject):
+    category_id = None
+    category = None
+    geolocations = None
+
+    def __init__(self, wcs_api, **kwargs):
+        super(Schema, self).__init__(wcs_api, **kwargs)
+        self.workflow = Workflow(wcs_api, **self.workflow)
+        self.fields = [Field(wcs_api, **f) for f in self.fields]
+        self.geolocations = sorted((k, v) for k, v in (self.geolocations or {}).items())
+

 class FormDef(BaseObject):
+    geolocations = None
+
     def __init__(self, wcs_api, **kwargs):
         self.__wcs_api = wcs_api
         self.__dict__.update(**kwargs)
self.__dict__.update(**kwargs) @@ -54,7 +128,7 @@ class FormDef(BaseObject): datas = self.__wcs_api.get_formdata(self.slug) for data in datas: data.formdef = self - return datas + yield data @property def schema(self): @@ -68,14 +142,18 @@ class Role(BaseObject): pass +class Category(BaseObject): + pass + + class WcsApi(object): - def __init__(self, url, orig, key, name_id=None, email=None, verify=False): + def __init__(self, url, orig, key, verify=True, slugs=None): self.url = url self.orig = orig self.key = key - self.email = email - self.name_id = name_id self.verify = verify + self.cache = {} + self.slugs = slugs or [] @property def formdefs_url(self): @@ -92,21 +170,21 @@ class WcsApi(object): def get_json(self, *url_parts): url = reduce(lambda x, y: urlparse.urljoin(x, y), url_parts) params = {'orig': self.orig} - if self.email: - params['email'] = self.email - if self.name_id: - params['NameID'] = self.name_id query_string = urllib.urlencode(params) presigned_url = url + ('&' if '?' in url else '?') + query_string + if presigned_url in self.cache: + return self.cache[presigned_url] signed_url = signature.sign_url(presigned_url, self.key) try: - response = requests.get(signed_url, verify=False) + response = requests.get(signed_url, verify=self.verify) response.raise_for_status() except requests.RequestException, e: raise WcsApiError('GET request failed', signed_url, e) else: try: - return response.json() + content = response.json() + self.cache[presigned_url] = content + return content except ValueError, e: raise WcsApiError('Invalid JSON content', signed_url, e) @@ -116,10 +194,21 @@ class WcsApi(object): @property def formdefs(self): - return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)] + return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url) + if not self.slugs or d['slug'] in self.slugs] + + @property + def categories(self): + d = {} + for f in self.formdefs: + if hasattr(f.schema, 'category'): + d[f.schema.category_id] = f.schema.category + return [Category(wcs_api=self, id=k, name=v) for k, v in d.items()] + def get_formdata(self, slug): - for d in self.get_json(self.forms_url, slug + '/'): + for d in self.get_json(self.forms_url, slug + '/list?anonymise&full=on'): yield FormData(wcs_api=self, **d) def get_schema(self, slug): - return BaseObject(wcs_api=self, **self.get_json(self.formdefs_url, slug + '/', 'schema')) + json_schema = self.get_json(self.formdefs_url, slug + '/', 'schema?anonymise') + return Schema(wcs_api=self, **json_schema)