wcs-olap: build a star schema from data exported by the w.c.s. API

Given an .ini file such as:

	[https://demarches.triffouilly.fr/]
	orig = bi.triffouilly.fr
	key = 452b8964
	pg_dsn = dbname=publik-bi
	email = bi@entrouvert.com
	schema = triffouilly
	# slugs = recette-technique-ajout-d-un-enfant
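
Each section whose name is an URL configures one w.c.s. instance. As a
sketch (not part of this commit), such a file can be read with the standard
ConfigParser module, mirroring what wcs-olap does:

    import ConfigParser

    config = ConfigParser.ConfigParser()
    config.read('wcs_olap.ini')
    for url in config.sections():
        if not url.startswith('http://') and not url.startswith('https://'):
            continue
        options = dict(config.items(url))  # orig, key, pg_dsn, schema...
        # optional restriction of the export to some formdefs
        slugs = options.get('slugs', '').strip().split() or None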

It builds a schema named 'triffouilly' in the pre-existing database named 'publik-bi'; the schema will contain tables named:

    channel (label varchar)
    role (label varchar)
    category (label varchar)
    form (category, label)
    formdata (parent table of all formdata tables)
        (form, receipt_time, year, month, dow, hour, channel, backoffice, generic_status)
    status (generic statuses: new, in progress & closed)
        label

    for each formdef, tables named:

       formdata_{formdef.slug}
       status_{formdef.slug}

       for each anonymisable
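
Once fed, the star schema can be queried with plain SQL; for example, a
hypothetical count of requests per channel (table and column names as
described above, channel_id being the foreign key in formdata):

    import psycopg2

    conn = psycopg2.connect('dbname=publik-bi')
    cur = conn.cursor()
    cur.execute('SET search_path = triffouilly, public')
    cur.execute('SELECT channel.label, count(formdata.id) '
                'FROM formdata JOIN channel ON formdata.channel_id = channel.id '
                'GROUP BY channel.label')
    for label, count in cur.fetchall():
        print label, count
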
Benjamin Dauvergne 2016-05-10 16:53:17 +02:00
parent c8209c9c94
commit 9c44bb9711
8 changed files with 1038 additions and 125 deletions

README.rst (new file)

@@ -0,0 +1,23 @@
BI for Publik
=============
w.c.s. OLAP
-----------
Tool to export w.c.s. data to a database star schema, for use as an OLAP
cube::
usage: wcs-olap --url URL [-h] --orig ORIG --key KEY
--pg-dsn PG_DSN
Export W.C.S. data as a star schema in a postgresql DB
optional arguments:
--url URL url of the w.c.s. instance
-h, --help show this help message and exit
--orig ORIG origin of the request for signatures
--key KEY HMAC key for signatures
--pg-dsn PG_DSN Psycopg2 DB DSN

create_dates.sql (new file)

@@ -0,0 +1,9 @@
-- Create a table of dates between 2010 and 2020
DROP TABLE IF EXISTS dates;
CREATE TABLE dates AS (SELECT
the_date.the_date::date AS date,
to_char(the_date.the_date, 'TMday') AS day,
to_char(the_date.the_date, 'TMmonth') AS month
FROM
generate_series('2010-01-01'::date, '2020-01-01'::date, '1 day'::interval)
AS the_date(the_date));

setup.py (new file)

@@ -0,0 +1,61 @@
#! /usr/bin/env python
import subprocess
import os
from setuptools import setup, find_packages
from setuptools.command.sdist import sdist
class eo_sdist(sdist):
def run(self):
print "creating VERSION file"
if os.path.exists('VERSION'):
os.remove('VERSION')
version = get_version()
version_file = open('VERSION', 'w')
version_file.write(version)
version_file.close()
sdist.run(self)
print "removing VERSION file"
if os.path.exists('VERSION'):
os.remove('VERSION')
def get_version():
'''Use the VERSION file if it exists; otherwise generate a version with
git describe; if no tag exists, use 0.0.0 and add the length of the
commit log.
'''
if os.path.exists('VERSION'):
with open('VERSION', 'r') as v:
return v.read()
if os.path.exists('.git'):
p = subprocess.Popen(['git', 'describe', '--dirty', '--match=v*'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
result = p.communicate()[0]
if p.returncode == 0:
result = result.split()[0][1:]
else:
result = '0.0.0-%s' % len(subprocess.check_output(
['git', 'rev-list', 'HEAD']).splitlines())
return result.replace('-', '.').replace('.g', '+g')
return '0.0.0'
setup(name="wcs-olap",
version=get_version(),
license="AGPLv3+",
description="Export w.c.s. data to an OLAP cube",
long_description=open('README.rst').read(),
url="http://dev.entrouvert.org/projects/publik-bi/",
author="Entr'ouvert",
author_email="authentic@listes.entrouvert.com",
maintainer="Benjamin Dauvergne",
maintainer_email="bdauvergne@entrouvert.com",
packages=find_packages(),
include_package_data=True,
install_requires=['requests','psycopg2', 'isodate'],
entry_points={
'console_scripts': ['wcs-olap=wcs_olap.cmd:main'],
},
cmdclass={'sdist': eo_sdist})

wcs_olap/cmd.py

@@ -1,59 +1,97 @@
import argparse
import ConfigParser
import os
import urlparse
import logging
import logging.config
from . import wcs_api
from .feeder import WcsOlapFeeder
import locale
from . import tb
def main():
locale.setlocale(locale.LC_ALL, '')
try:
main2()
except SystemExit:
raise
except:
tb.print_tb()
raise SystemExit(1)
def get_config(path=None):
config = ConfigParser.ConfigParser()
global_config_path = '/etc/wcs_olap.ini'
user_config_path = os.path.expanduser('~/.wcs_olap.ini')
if not path:
if os.path.exists(user_config_path):
path = user_config_path
elif os.path.exists(global_config_path):
path = global_config_path
else:
return config
config.read(path)
if config.has_section('loggers'):
logging.config.fileConfig(path)
return config
def main2():
parser = argparse.ArgumentParser(description='Export W.C.S. data as a star schema in a '
'postgresql DB', add_help=False)
parser.add_argument('config_path', default=None)
group = parser.add_mutually_exclusive_group()
group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true',
default=False)
group.add_argument('--url', help='url of the w.c.s. instance', required=False, default=None)
args, rest = parser.parse_known_args()
config = get_config(path=args.config_path)
# list all known urls
urls = [url for url in config.sections() if url.startswith('http://') or
url.startswith('https://')]
defaults = {}
if not args.all:
try:
url = args.url or urls[0]
except IndexError:
print 'no url found'
raise SystemExit(1)
urls = [url]
if config.has_section(args.url):
defaults = dict(config.items(args.url))
parser.add_argument("-h", "--help", action="help", help="show this help message and exit")
parser.add_argument('--orig', help='origin of the request for signatures')
parser.add_argument('--key', help='HMAC key for signatures')
parser.add_argument('--pg-dsn', help='Psycopg2 DB DSN')
parser.add_argument('--schema', help='schema name')
args = parser.parse_args()
for key in ('orig', 'key', 'pg_dsn', 'schema'):
if getattr(args, key, None):
defaults[key] = getattr(args, key)
logger = logging.getLogger('wcs-olap')
for url in urls:
if config.has_section(url):
defaults.update((config.items(url)))
try:
key = defaults['key']
orig = defaults['orig']
schema = defaults['schema']
pg_dsn = defaults['pg_dsn']
slugs = defaults.get('slugs', '').strip().split() or None
except KeyError, e:
logger.error('configuration incomplete for %s: %s', url, e)
else:
api = wcs_api.WcsApi(url=url, orig=orig, key=key, slugs=slugs)
logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url, pg_dsn)
feeder = WcsOlapFeeder(api=api, schema=schema, pg_dsn=pg_dsn, logger=logger,
config=defaults)
feeder.feed()
logger.info('finished')
defaults = {}
if __name__ == '__main__':
main()

wcs_olap/feeder.py

@@ -1,43 +1,383 @@
# -*- coding: utf-8 -*-
import copy
import os
import json
import hashlib
from utils import Whatever
import psycopg2
from wcs_olap.wcs_api import WcsApiError
def slugify(s):
return s.replace('-', '_').replace(' ', '_')
class Context(object):
def __init__(self):
self.stack = []
def __getitem__(self, key):
if not self.stack:
raise KeyError(key)
for d in self.stack[::-1]:
try:
return d[key]
except KeyError:
pass
else:
raise KeyError(key)
def push(self, d):
self.stack.append(d)
def pop(self):
self.stack = self.stack[:-1]
def as_dict(self):
r = {}
for d in self.stack:
r.update(d)
return r
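# Usage sketch (not part of the original file): Context is a chained-scope
# mapping, lookups walk the stack from the most recently pushed dict to the
# oldest:
#   ctx = Context()
#   ctx.push({'schema': 'triffouilly', 'role_table': 'role'})
#   ctx.push({'role_table': 'role_tmp'})
#   ctx['role_table']  # -> 'role_tmp', the innermost dict wins
#   ctx['schema']      # -> 'triffouilly', found deeper in the stack
#   ctx.pop()
#   ctx['role_table']  # -> 'role' again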
class WcsOlapFeeder(object):
def __init__(self, api, pg_dsn, schema, logger=None, config=None):
self.api = api
self.logger = logger or Whatever()
self.schema = schema
self.connection = psycopg2.connect(dsn=pg_dsn)
self.connection.autocommit = True
self.cur = self.connection.cursor()
self.formdefs = api.formdefs
self.roles = api.roles
self.categories = api.categories
self.ctx = Context()
self.ctx.push({
'schema': self.schema,
'role_table': 'role',
'channel_table': 'channel',
'category_table': 'category',
'form_table': 'formdef',
'generic_formdata_table': 'formdata',
'generic_status_table': 'status',
'year_table': 'year',
'month_table': 'month',
'day_table': 'day',
'dow_table': 'dow',
'hour_table': 'hour',
})
self.config = config or {}
self.model = {
'label': self.config.get('cubes_label', schema),
'name': schema,
'browser_options': {
'schema': schema,
},
'joins': [
{
'name': 'receipt_time',
'master': 'receipt_time',
'detail': {
'table': 'dates',
'column': 'date',
'schema': 'public',
},
'method': 'detail',
'alias': 'dates',
},
{
'name': 'channel',
'master': 'channel_id',
'detail': '{channel_table}.id',
'method': 'detail',
},
{
'name': 'role',
'detail': '{role_table}.id',
'method': 'detail',
},
{
'name': 'formdef',
'master': 'formdef_id',
'detail': '{form_table}.id',
'method': 'detail',
},
{
'name': 'category',
'master': '{form_table}.category_id',
'detail': '{category_table}.id',
},
{
'name': 'hour',
'master': 'hour_id',
'detail': '{hour_table}.id',
'method': 'detail',
},
{
'name': 'generic_status',
'master': 'generic_status_id',
'detail': '{generic_status_table}.id',
'method': 'detail',
},
],
'dimensions': [
{
'label': 'date de soumission',
'name': 'receipt_time',
'role': 'time',
'levels': [
{
'name': 'year',
'label': 'année',
'role': 'year',
'order_attribute': 'year',
'order': 'asc',
},
{
'name': 'quarter',
'order_attribute': 'quarter',
'label': 'trimestre',
'role': 'quarter',
},
{
'name': 'month',
'label': 'mois',
'role': 'month',
'attributes': ['month', 'month_name'],
'order_attribute': 'month',
'label_attribute': 'month_name',
'order': 'asc',
},
{
'name': 'week',
'label': 'semaine',
'role': 'week',
},
{
'name': 'day',
'label': 'jour',
'role': 'day',
'order': 'asc',
},
{
'name': 'dow',
'label': 'jour de la semaine',
'attributes': ['dow', 'dow_name'],
'order_attribute': 'dow',
'label_attribute': 'dow_name',
'order': 'asc',
},
],
'hierarchies': [
{
'name': 'default',
'label': 'par défaut',
'levels': ['year', 'month', 'day']
},
{
'name': 'quarterly',
'label': 'par trimestre',
'levels': ['year', 'quarter']
},
{
'name': 'weekly',
'label': 'par semaine',
'levels': ['year', 'week']
},
{
'name': 'dowly',
'label': 'par jour de la semaine',
'levels': ['dow']
},
]
},
{
'label': 'canaux',
'name': 'channels',
},
{
'label': 'catégories',
'name': 'categories',
},
{
'label': 'formulaire',
'name': 'formdef',
},
{
'label': 'statuts génériques',
'name': 'generic_statuses',
},
{
'label': 'heure',
'name': 'hours',
'levels': [
{
'name': 'hours',
'attributes': ['hour_id', 'hour_label'],
'order_attribute': 'hour_id',
'label_attribute': 'hour_label',
}
]
},
],
'mappings': {
'receipt_time.year': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'year',
},
'receipt_time.month': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'month'
},
'receipt_time.month_name': {
'table': 'dates',
'schema': 'public',
'column': 'month'
},
'receipt_time.week': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'week'
},
'receipt_time.day': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'day'
},
'receipt_time.dow': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'dow'
},
'receipt_time.dow_name': {
'table': 'dates',
'schema': 'public',
'column': 'day',
},
'receipt_time.quarter': {
'table': 'dates',
'column': 'date',
'schema': 'public',
'extract': 'quarter'
},
'formdef': 'formdef.label',
'channels': 'channel.label',
'categories': 'category.label',
'generic_statuses': 'status.label',
'hours.hour_label': '{hour_table}.label',
'hours.hour_id': '{hour_table}.id',
},
'cubes': [
{
'name': schema + '_formdata',
'label': 'Toutes les demandes (%s)' % schema,
'key': 'id',
'fact': 'formdata',
'dimensions': [
'receipt_time',
'hours',
'channels',
'categories',
'formdef',
'generic_statuses',
],
'joins': [
{
'name': 'receipt_time',
},
{
'name': 'hour',
},
{
'name': 'channel',
},
{
'name': 'formdef',
},
{
'name': 'category',
},
{
'name': 'generic_status',
},
],
'measures': [
{
'name': 'endpoint_delay',
'label': 'délai de traitement',
'nonadditive': 'all',
},
],
'aggregates': [
{
'name': 'record_count',
'label': 'nombre de demandes',
'function': 'count'
},
{
'name': 'endpoint_delay_max',
'label': 'délai de traitement maximum',
'measure': 'endpoint_delay',
'function': 'max',
},
{
'name': 'endpoint_delay_avg',
'label': 'délai de traitement moyen',
'measure': 'endpoint_delay',
'function': 'avg',
},
],
},
],
}
# apply table names
self.model = self.tpl(self.model)
self.base_cube = self.model['cubes'][0]
def hash_table_name(self, table_name):
table_name = table_name.format(**self.default_ctx)
if len(table_name) < 64:
return table_name
else:
return table_name[:57] + hashlib.md5(table_name).hexdigest()[:6]
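# PostgreSQL truncates identifiers to 63 characters, hence the scheme above:
# names under 64 characters are kept as-is, longer ones become a 57-character
# prefix plus 6 hex digits of the md5 of the full name (63 characters total),
# keeping generated names unique and stable across runs. For instance, with a
# hypothetical over-long field table name:
#   name = 'formdata_demande_de_subvention_field_' + 'x' * 60
#   short = feeder.hash_table_name(name)
#   assert len(short) == 63 and short.startswith(name[:57])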
@property
def default_ctx(self):
return self.ctx.as_dict()
def ex(self, query, ctx=None, vars=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
sql = query.format(**(ctx or {}))
self.logger.debug('SQL: %s VARS: %s', sql, vars)
self.cur.execute(sql, vars=vars)
def exmany(self, query, varslist, ctx=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
sql = query.format(**(ctx or {}))
self.logger.debug('SQL: %s VARSLIST: %s', sql, varslist)
self.cur.executemany(sql, varslist)
def do_schema(self):
self.logger.debug('dropping schema %s', self.schema)
self.ex('SET search_path = public')
self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
self.logger.debug('creating schema %s', self.schema)
self.ex('CREATE SCHEMA {schema}')
self.ex('SET search_path = {schema},public')
channels = [
[1, 'web', u'web'],
[2, 'mail', u'courrier'],
[3, 'phone', u'téléphone'],
[4, 'counter', u'guichet'],
@@ -45,28 +385,143 @@ class WcsOlapFeeder(object):
channel_to_id = dict((c[1], c[0]) for c in channels)
id_to_channel = dict((c[0], c[1]) for c in channels)
status = [
[1, 'Nouveau'],
[2, 'En cours'],
[3, 'Terminé'],
]
status_to_id = dict((c[1], c[0]) for c in status)
id_to_status = dict((c[0], c[1]) for c in status)
def create_table(self, name, columns, inherits=None, comment=None):
sql = 'CREATE TABLE %s' % name
sql += '(' + ', '.join('%s %s' % (n, t) for n, t in columns) + ')'
if inherits:
sql += ' INHERITS (%s)' % inherits
self.ex(sql)
if comment:
self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,))
def create_labeled_table(self, name, labels, comment=None):
self.create_table(name,
[
['id', 'smallint primary key'],
['label', 'varchar']
], comment=comment)
values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels)
if not values:
return
self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))
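# Usage sketch: create_labeled_table() builds an (id, label) dimension table
# and fills it in one round-trip; the channel dimension below could equally
# be written as (labels as in do_schema, comment hypothetical):
#   self.create_labeled_table('{channel_table}',
#                             [(1, u'web'), (2, u'courrier'),
#                              (3, u'téléphone'), (4, u'guichet')],
#                             comment=u'canal de soumission')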
def tpl(self, o, ctx=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
def helper(o):
if isinstance(o, basestring):
return o.format(**ctx)
elif isinstance(o, dict):
return dict((k, helper(v)) for k, v in o.iteritems())
elif isinstance(o, list):
return [helper(v) for v in o]
elif isinstance(o, (bool, int, float)):
return o
else:
assert False, '%s is not a valid value for JSON' % o
return helper(o)
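# Usage sketch: tpl() recursively applies str.format() with the current
# context to every string of a JSON-like structure; this is how the
# '{channel_table}'-style placeholders of self.model are resolved:
#   self.tpl({'detail': '{channel_table}.id', 'method': 'detail'})
#   # -> {'detail': 'channel.id', 'method': 'detail'}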
def add_dim(self, **kwargs):
self.dimensions.append(self.tpl(kwargs))
def do_base_table(self):
# channels
self.create_labeled_table('{channel_table}', [[c[0], c[2]] for c in self.channels])
# roles
roles = dict((i, role.name) for i, role in enumerate(self.roles))
self.create_labeled_table('{role_table}', roles.items())
self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles))
# categories
self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories))
self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories))
# years
self.create_labeled_table('{year_table}', zip(range(2000, 2030), map(str, range(2000,
2030))))
# month
self.create_labeled_table('{month_table}', zip(range(1, 13), [u'janvier', u'février',
u'mars', u'avril', u'mai',
u'juin', u'juillet', u'août',
u'septembre', u'octobre',
u'novembre', u'décembre']))
# days
self.create_labeled_table('{day_table}', zip(range(1, 32), map(str, range(1, 32))))
# day of week
self.create_labeled_table('{dow_table}', enumerate([u'lundi', u'mardi', u'mercredi',
u'jeudi', u'vendredi', u'samedi',
u'dimanche']))
self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))))
self.create_labeled_table('{generic_status_table}', self.status)
self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,'
' category_id integer REFERENCES {category_table} (id),'
' label varchar)')
self.columns = [
['id', 'serial primary key'],
['formdef_id', 'smallint REFERENCES {form_table} (id)'],
['receipt_time', 'date'],
['year_id', 'smallint REFERENCES {year_table} (id)'],
['month_id', 'smallint REFERENCES {month_table} (id)'],
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
['day_id', 'smallint REFERENCES {day_table} (id)'],
['dow_id', 'smallint REFERENCES {dow_table} (id)'],
['channel_id', 'smallint REFERENCES {channel_table} (id)'],
['backoffice', 'boolean'],
['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
['endpoint_delay', 'real'],
]
self.comments = {
'formdef_id': u'dim$formulaire',
'receipt_time': u'time$date de réception',
'year_id': u'dim$année',
'month_id': u'dim$mois',
'hour_id': u'dim$heure',
'day_id': u'dim$jour',
'dow_id': u'dim$jour de la semaine',
'channel_id': u'dim$canal',
'backoffice': u'dim$soumission backoffice',
'generic_status_id': u'dim$statut générique',
'endpoint_delay': u'measure$délai de traitement',
}
self.create_table('{generic_formdata_table}', self.columns)
for at, comment in self.comments.iteritems():
self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))
def feed(self):
self.do_schema()
self.do_base_table()
for formdef in self.formdefs:
try:
formdef_feeder = WcsFormdefFeeder(self, formdef)
formdef_feeder.feed()
except WcsApiError, e:
# ignore authorization errors
if (len(e.args) > 2 and hasattr(e.args[2], 'response')
and e.args[2].response.status_code == 403):
continue
self.logger.error('failed to retrieve formdef %s', formdef.slug)
class WcsFormdefFeeder(object):
def __init__(self, olap_feeder, formdef):
self.olap_feeder = olap_feeder
self.formdef = formdef
self.status_mapping = {}
self.items_mappings = {}
self.fields = []
@property
def table_name(self):
@@ -74,45 +529,222 @@ class WcsFormdefFeeder(object):
@property
def status_table_name(self):
return 'status_%s' % self.formdef.slug.replace('-', '_')
def __getattr__(self, name):
return getattr(self.olap_feeder, name)
def ex(self, query, ctx=None, vars=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
self.olap_feeder.ex(query, ctx=ctx, vars=vars)
def exmany(self, statement, varslist, ctx=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
self.olap_feeder.exmany(statement, varslist, ctx=ctx)
def do_statuses(self):
statuses = self.formdef.schema.workflow.statuses
self.olap_feeder.create_labeled_table(self.status_table_name,
enumerate([s.name for s in statuses]))
self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses))
def do_data_table(self):
self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
vars=[self.categories_mapping.get(self.formdef.schema.category_id),
self.formdef.schema.name])
self.formdef_sql_id = self.cur.fetchone()[0]
columns = [['status_id', 'smallint REFERENCES {status_table} (id)']]
comments = {}
# add item fields
for field in self.formdef.schema.fields:
if not field.items and not field.options:
continue
if not field.varname:
continue
self.fields.append(field)
comment = (u'dim$%s$Valeur du champ « %s » du formulaire %s'
% (field.label, field.label, self.formdef.schema.name))
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
# create table and mapping
if field.items:
self.create_labeled_table(table_name, enumerate(field.items),
comment=comment)
self.items_mappings[field.varname] = dict(
(item, i) for i, item in enumerate(field.items))
elif field.options:
options = list(enumerate(field.options))
self.create_labeled_table(table_name,
[(i, o['label']) for i, o in options],
comment=comment)
self.items_mappings[field.varname] = dict((o['value'], i) for i, o in options)
at = 'field_%s' % field.varname
columns.append([at, 'smallint REFERENCES %s (id)' % table_name])
comments[at] = u'dim$' + field.label
# add geolocation fields
for geolocation, label in self.formdef.schema.geolocations:
at = 'geolocation_%s' % geolocation
columns.append([at, 'point'])
comments[at] = u'dim$' + label
# add function fields
for function, name in self.formdef.schema.workflow.functions.iteritems():
at = 'function_%s' % slugify(function)
columns.append([at, 'smallint REFERENCES {role_table} (id)'])
comments[at] = u'dim$Fonction « %s »' % name
self.columns = ([name for name, _type in self.olap_feeder.columns]
+ [name for name, _type in columns])
self.create_table('{formdata_table}', columns, inherits='{generic_formdata_table}',
comment=u'cube$%s' % self.formdef.schema.name)
for at, comment in comments.iteritems():
self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % at, vars=(comment,))
# PostgreSQL does not propagate foreign key constraints to child tables
# so we must recreate them manually
for name, _type in self.olap_feeder.columns:
if 'REFERENCES' not in _type:
continue
i = _type.index('REFERENCES')
constraint = '%s_fk_constraint FOREIGN KEY (%s) %s' % (name, name, _type[i:])
self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
def do_data(self):
values = []
for data in self.formdef.datas:
# ignore formdata without status
if not data.workflow.status:
continue
status = data.formdef.schema.workflow.statuses_map[data.workflow.status.id]
if data.endpoint_delay:
endpoint_delay = (data.endpoint_delay.days + float(data.endpoint_delay.seconds) /
86400.)
else:
endpoint_delay = None
row = {
'formdef_id': self.formdef_sql_id,
'receipt_time': data.receipt_time,
'year_id': data.receipt_time.year,
'month_id': data.receipt_time.month,
'day_id': data.receipt_time.day,
'hour_id': data.receipt_time.hour,
'dow_id': data.receipt_time.weekday(),
'channel_id': self.channel_to_id[data.submission.channel.lower()],
'backoffice': data.submission.backoffice,
# FIXME "En cours"/2 is never used
'generic_status_id': 3 if status.endpoint else 1,
'status_id': self.status_mapping[data.workflow.status.id],
'endpoint_delay': endpoint_delay,
}
# add form fields value
for field in self.fields:
v = None
if field.type == 'item':
# map items to sql id
v = self.items_mappings[field.varname].get(data.fields.get(field.varname))
row['field_%s' % field.varname] = v
# add geolocation fields value
for geolocation, label in self.formdef.schema.geolocations:
v = (data.geolocations or {}).get(geolocation)
row['geolocation_%s' % geolocation] = v
# add function fields value
for function, name in self.formdef.schema.workflow.functions.iteritems():
try:
v = data.functions[function]
except KeyError:
v = None
else:
v = self.olap_feeder.role_mapping[v.id]
at = 'function_%s' % slugify(function)
row[at] = v
tpl = '(' + ', '.join(['%s'] * len(self.columns[1:])) + ')'
value = self.cur.mogrify(tpl, [row[column] for column in self.columns[1:]])
values.append(value)
if not values:
self.logger.warning('no data')
return
self.ex('INSERT INTO {formdata_table} (%s) VALUES %s' % (
', '.join(self.columns[1:]), # skip the id column
', '.join(values)))
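# The pattern above, in isolation: mogrify() renders one '(...)' values tuple
# per row, and a single multi-row INSERT avoids one round-trip per formdata.
# A minimal standalone sketch (hypothetical rows):
#   rows = [(1, u'web'), (2, u'courrier')]
#   values = ', '.join(cur.mogrify('(%s, %s)', row) for row in rows)
#   cur.execute('INSERT INTO channel (id, label) VALUES ' + values)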
def feed(self):
self.olap_feeder.ctx.push({
'formdata_table': self.table_name,
'status_table': self.status_table_name,
})
# create cube
self.cube = copy.deepcopy(self.base_cube)
self.cube.update({
'name': self.schema + '_' + self.table_name,
'label': self.formdef.schema.name,
'fact': self.table_name,
})
# add dimension for status
self.cube['joins'].append({
'master': 'status_id',
'detail': '%s.id' % self.status_table_name,
'method': 'detail',
})
dim_name = '%s_%s' % (self.table_name, 'status')
self.model['dimensions'].append({
'name': dim_name,
'label': 'statut',
'levels': [
{
'name': 'status',
'attributes': ['status_id', 'status_label'],
'order_attribute': 'status_id',
'label_attribute': 'status_label',
},
],
})
self.model['mappings']['%s.status_id' % dim_name] = '%s.id' % self.status_table_name
self.model['mappings']['%s.status_label' % dim_name] = '%s.label' % self.status_table_name
self.cube['dimensions'].append(dim_name)
# add dimension for function
for function, name in self.formdef.schema.workflow.functions.iteritems():
at = 'function_%s' % slugify(function)
dim_name = '%s_function_%s' % (self.table_name, slugify(function))
self.cube['joins'].append({
'master': at,
'detail': self.tpl('{role_table}.id'),
'alias': at,
})
self.model['dimensions'].append({
'name': dim_name,
'label': u'fonction %s' % name,
})
self.model['mappings'][dim_name] = '%s.label' % at
self.cube['dimensions'].append(dim_name)
# add dimensions for item fields
for field in self.fields:
if field.type != 'item':
continue
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
self.cube['joins'].append({
'master': 'field_%s' % field.varname,
'detail': '%s.id' % table_name,
'method': 'detail',
})
dim_name = '%s_%s' % (self.table_name, field.varname)
self.model['dimensions'].append({
'name': dim_name,
'label': field.label,
})
self.model['mappings'][dim_name] = '%s.label' % table_name
self.cube['dimensions'].append(dim_name)
self.model['cubes'].append(self.cube)
try:
self.logger.info('feed formdef %s', self.formdef.slug)
self.do_statuses()
self.do_data_table()
self.do_data()
finally:
self.olap_feeder.ctx.pop()
if 'cubes_model_dirs' in self.config:
model_path = os.path.join(self.config['cubes_model_dirs'], '%s.json' % self.schema)
with open(model_path, 'w') as f:
json.dump(self.model, f, indent=2, sort_keys=True)
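
The dumped file follows the model format of the cubes (Databrewery) OLAP
library. Assuming that library is installed and pointed at the same
database, the model could be used along these lines (a sketch, not part of
this commit):

    from cubes import Workspace

    workspace = Workspace()
    workspace.register_default_store('sql', url='postgresql:///publik-bi')
    workspace.import_model('triffouilly.json')
    browser = workspace.browser('triffouilly_formdata')
    result = browser.aggregate(drilldown=['channels'])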

wcs_olap/tb.py (new file)

@@ -0,0 +1,55 @@
from StringIO import StringIO
import sys
import linecache
def print_tb():
exc_type, exc_value, tb = sys.exc_info()
if exc_value:
exc_value = unicode(str(exc_value), errors='ignore')
error_file = StringIO()
limit = None
if hasattr(sys, 'tracebacklimit'):
limit = sys.tracebacklimit
print >>error_file, "Exception:"
print >>error_file, " type = '%s', value = '%s'" % (exc_type, exc_value)
print >>error_file
# format the traceback
print >>error_file, 'Stack trace (most recent call first):'
n = 0
while tb is not None and (limit is None or n < limit):
frame = tb.tb_frame
function = frame.f_code.co_name
filename = frame.f_code.co_filename
exclineno = frame.f_lineno
locals = frame.f_locals.items()
print >>error_file, ' File "%s", line %s, in %s' % (filename, exclineno, function)
linecache.checkcache(filename)
for lineno in range(exclineno - 2, exclineno + 3):
line = linecache.getline(filename, lineno, frame.f_globals)
if line:
if lineno == exclineno:
print >>error_file, '>%5s %s' % (lineno, line.rstrip())
else:
print >>error_file, ' %5s %s' % (lineno, line.rstrip())
print >>error_file
if locals:
print >>error_file, " locals: "
for key, value in locals:
print >>error_file, " %s =" % key,
try:
repr_value = repr(value)
if len(repr_value) > 10000:
repr_value = repr_value[:10000] + ' [...]'
print >>error_file, repr_value,
except:
print >>error_file, "<ERROR WHILE PRINTING VALUE>",
print >>error_file
print >>error_file
tb = tb.tb_next
n = n + 1
print error_file.getvalue()
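
print_tb() is meant to be called from a bare except clause, as cmd.main()
does; a minimal usage sketch:

    from wcs_olap import tb

    try:
        risky_operation()  # hypothetical
    except SystemExit:
        raise
    except:
        tb.print_tb()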

wcs_olap/utils.py (new file)

@@ -0,0 +1,6 @@
class Whatever(object):
def __call__(*args, **kwargs):
pass
def __getattr__(self, name):
return self
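
Whatever is a do-nothing stand-in: calling it does nothing and any attribute
access returns the object itself, so it can replace a real logger when none
is given (see WcsOlapFeeder.__init__). For example:

    logger = Whatever()
    logger.info('this %s is silently discarded', 'message')
    logger.debug('so is this')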

wcs_olap/wcs_api.py

@@ -1,7 +1,7 @@
import requests
import urlparse
import urllib
import datetime
import isodate
from . import signature
@@ -15,33 +15,107 @@ class BaseObject(object):
self.__wcs_api = wcs_api
self.__dict__.update(**kwargs)
def json(self):
d = self.__dict__.copy()
for key in d.keys():
if key.startswith('_'):
del d[key]
return d
class FormDataWorkflow(BaseObject):
status = None
def __init__(self, wcs_api, **kwargs):
super(FormDataWorkflow, self).__init__(wcs_api, **kwargs)
if self.status is not None:
self.status = BaseObject(wcs_api, **self.status)
class Evolution(BaseObject):
user = None
status = None
parts = None
def __init__(self, wcs_api, **kwargs):
super(Evolution, self).__init__(wcs_api, **kwargs)
self.time = isodate.parse_datetime(self.time)
if self.parts:
self.parts = [BaseObject(wcs_api, **part) for part in self.parts]
class FormData(BaseObject):
geolocations = None
evolution = None
def __init__(self, wcs_api, **kwargs):
super(FormData, self).__init__(wcs_api, **kwargs)
self.receipt_time = isodate.parse_datetime(self.receipt_time)
self.submission = BaseObject(wcs_api, **self.submission)
self.workflow = FormDataWorkflow(wcs_api, **self.workflow)
self.evolution = [Evolution(wcs_api, **evo) for evo in self.evolution or []]
self.functions = {}
self.concerned_roles = []
self.action_roles = []
for function in self.roles:
roles = [Role(wcs_api, **r) for r in self.roles[function]]
if function == 'concerned':
self.concerned_roles.extend(roles)
elif function == 'actions':
self.action_roles.extend(roles)
else:
try:
self.functions[function] = roles[0]
except IndexError:
self.functions[function] = None
del self.roles
def __repr__(self):
return '<{klass} {display_id!r}>'.format(klass=self.__class__.__name__,
display_id=self.id)
@property
def endpoint_delay(self):
'''Compute the delay between the receipt of the formdata and the moment it
entered its final run of endpoint statuses.'''
statuses_map = self.formdef.schema.workflow.statuses_map
s = 0
for evo in self.evolution[::-1]:
if evo.status:
if statuses_map[evo.status].endpoint:
s = 1
last = evo.time - self.receipt_time
else:
if s == 1:
return last
else:
return
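# Worked example: for a formdata received at t0 whose evolution goes
# 'new' (t1, not endpoint) -> 'accepted' (t2, endpoint) -> 'archived' (t3,
# endpoint), the scan above returns t2 - t0, the delay until the final run
# of endpoint statuses began; a formdata that never reaches an endpoint
# status yields None. (Status names hypothetical.)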
class Workflow(BaseObject):
status = None
def __init__(self, wcs_api, **kwargs):
super(Workflow, self).__init__(wcs_api, **kwargs)
self.statuses = [BaseObject(wcs_api, **v) for v in self.statuses]
self.statuses_map = dict((s.id, s) for s in self.statuses)
class Field(BaseObject):
items = None
options = None
varname = None
in_filters = False
class Schema(BaseObject):
category_id = None
category = None
geolocations = None
def __init__(self, wcs_api, **kwargs):
super(Schema, self).__init__(wcs_api, **kwargs)
self.workflow = Workflow(wcs_api, **self.workflow)
self.fields = [Field(wcs_api, **f) for f in self.fields]
self.geolocations = sorted((k, v) for k, v in (self.geolocations or {}).items())
class FormDef(BaseObject):
geolocations = None
def __init__(self, wcs_api, **kwargs):
self.__wcs_api = wcs_api
self.__dict__.update(**kwargs)
@@ -54,7 +128,7 @@ class FormDef(BaseObject):
datas = self.__wcs_api.get_formdata(self.slug)
for data in datas:
data.formdef = self
yield data
@property
def schema(self):
@@ -68,14 +142,18 @@ class Role(BaseObject):
pass
class Category(BaseObject):
pass
class WcsApi(object):
def __init__(self, url, orig, key, verify=True, slugs=None):
self.url = url
self.orig = orig
self.key = key
self.verify = verify
self.cache = {}
self.slugs = slugs or []
@property
def formdefs_url(self):
@@ -92,21 +170,21 @@ class WcsApi(object):
def get_json(self, *url_parts):
url = reduce(lambda x, y: urlparse.urljoin(x, y), url_parts)
params = {'orig': self.orig}
query_string = urllib.urlencode(params)
presigned_url = url + ('&' if '?' in url else '?') + query_string
if presigned_url in self.cache:
return self.cache[presigned_url]
signed_url = signature.sign_url(presigned_url, self.key)
try:
response = requests.get(signed_url, verify=self.verify)
response.raise_for_status()
except requests.RequestException, e:
raise WcsApiError('GET request failed', signed_url, e)
else:
try:
content = response.json()
self.cache[presigned_url] = content
return content
except ValueError, e:
raise WcsApiError('Invalid JSON content', signed_url, e)
@@ -116,10 +194,21 @@ class WcsApi(object):
@property
def formdefs(self):
return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)
if not self.slugs or d['slug'] in self.slugs]
@property
def categories(self):
d = {}
for f in self.formdefs:
if hasattr(f.schema, 'category'):
d[f.schema.category_id] = f.schema.category
return [Category(wcs_api=self, id=k, name=v) for k, v in d.items()]
def get_formdata(self, slug):
for d in self.get_json(self.forms_url, slug + '/list?anonymise&full=on'):
yield FormData(wcs_api=self, **d)
def get_schema(self, slug):
json_schema = self.get_json(self.formdefs_url, slug + '/', 'schema?anonymise')
return Schema(wcs_api=self, **json_schema)
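
Putting it together, the feeder drives this client roughly as follows (a
sketch mirroring cmd.py, with values from the .ini example above):

    api = WcsApi(url='https://demarches.triffouilly.fr/',
                 orig='bi.triffouilly.fr', key='452b8964')
    for formdef in api.formdefs:
        print formdef.slug
        for formdata in formdef.datas:
            print formdata.receipt_time, formdata.endpoint_delay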