wcs-olap: build a star schema from data exported by the w.c.s. API
Given an .INI file such as:

    [https://demarches.triffouilly.fr/]
    orig = bi.triffouilly.fr
    key = 452b8964
    pg_dsn = dbname=publik-bi
    email = bi@entrouvert.com
    schema = triffouilly
    # slugs = recette-technique-ajout-d-un-enfant

it builds a schema named 'triffouilly' in the pre-existing database named
'publik-bi'. The schema will contain the following tables:

- channel (label varchar)
- role (label varchar)
- category (label varchar)
- form (category, label)
- formdata: parent table of all formdata tables
  (form, receipt_time, year, month, dow, hour, channel, backoffice,
  generic_status)
- status: generic statuses (new, in progress & closed), one label for each
  formdef

and, for each formdef, tables named:

- formdata_{formdef.slug}
- status_{formdef.slug}

for each anonymisable
This commit is contained in:
parent
e23c50f8ac
commit
ac3dc40a7a
|
@ -0,0 +1,23 @@
|
|||
BI for Publik
|
||||
=============
|
||||
|
||||
w.c.s. OLAP
|
||||
-----------
|
||||
|
||||
Tool to export w.c.s. data in a database with star schema for making an OLAP
|
||||
cube.::
|
||||
|
||||
usage: wcs-olap --url URL [-h] --orig ORIG --key KEY
|
||||
--pg-dsn PG_DSN
|
||||
|
||||
Export W.C.S. data as a star schema in a postgresql DB
|
||||
|
||||
optional arguments:
|
||||
--url URL url of the w.c.s. instance
|
||||
-h, --help show this help message and exit
|
||||
--orig ORIG origin of the request for signatures
|
||||
--key KEY HMAC key for signatures
|
||||
--pg-dsn PG_DSN Psycopg2 DB DSN
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
-- Crée une table de dates entre 2010 et 2020
|
||||
DROP TABLE IF EXISTS dates;
|
||||
CREATE TABLE dates AS (SELECT
|
||||
the_date.the_date::date AS date,
|
||||
to_char(the_date.the_date, 'TMday') AS day,
|
||||
to_char(the_date.the_date, 'TMmonth') AS month
|
||||
FROM
|
||||
generate_series('2010-01-01'::date, '2020-01-01'::date, '1 day'::interval)
|
||||
AS the_date(the_date));
|
|
@ -0,0 +1,61 @@
|
|||
#! /usr/bin/env python
|
||||
|
||||
import subprocess
|
||||
import os
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
from setuptools.command.sdist import sdist
|
||||
|
||||
|
||||
class eo_sdist(sdist):
|
||||
def run(self):
|
||||
print "creating VERSION file"
|
||||
if os.path.exists('VERSION'):
|
||||
os.remove('VERSION')
|
||||
version = get_version()
|
||||
version_file = open('VERSION', 'w')
|
||||
version_file.write(version)
|
||||
version_file.close()
|
||||
sdist.run(self)
|
||||
print "removing VERSION file"
|
||||
if os.path.exists('VERSION'):
|
||||
os.remove('VERSION')
|
||||
|
||||
|
||||
def get_version():
|
||||
'''Use the VERSION, if absent generates a version with git describe, if not
|
||||
tag exists, take 0.0.0- and add the length of the commit log.
|
||||
'''
|
||||
if os.path.exists('VERSION'):
|
||||
with open('VERSION', 'r') as v:
|
||||
return v.read()
|
||||
if os.path.exists('.git'):
|
||||
p = subprocess.Popen(['git', 'describe', '--dirty', '--match=v*'], stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
result = p.communicate()[0]
|
||||
if p.returncode == 0:
|
||||
result = result.split()[0][1:]
|
||||
else:
|
||||
result = '0.0.0-%s' % len(subprocess.check_output(
|
||||
['git', 'rev-list', 'HEAD']).splitlines())
|
||||
return result.replace('-', '.').replace('.g', '+g')
|
||||
return '0.0.0'
|
||||
|
||||
|
||||
setup(name="wcs-olap",
|
||||
version=get_version(),
|
||||
license="AGPLv3+",
|
||||
description="Export w.c.s. data to an OLAP cube",
|
||||
long_description=open('README.rst').read(),
|
||||
url="http://dev.entrouvert.org/projects/publik-bi/",
|
||||
author="Entr'ouvert",
|
||||
author_email="authentic@listes.entrouvert.com",
|
||||
maintainer="Benjamin Dauvergne",
|
||||
maintainer_email="bdauvergne@entrouvert.com",
|
||||
packages=find_packages(),
|
||||
include_package_data=True,
|
||||
install_requires=['requests','psycopg2', 'isodate'],
|
||||
entry_points={
|
||||
'console_scripts': ['wcs-olap=wcs_olap.cmd:main'],
|
||||
},
|
||||
cmdclass={'sdist': eo_sdist})
|
116
wcs_olap/cmd.py
116
wcs_olap/cmd.py
|
@ -1,59 +1,97 @@
|
|||
import argparse
|
||||
import ConfigParser
|
||||
import os
|
||||
import urlparse
|
||||
import logging
|
||||
import logging.config
|
||||
from . import wcs_api
|
||||
from .feeder import WcsOlapFeeder
|
||||
import locale
|
||||
|
||||
from . import tb
|
||||
|
||||
|
||||
def main():
|
||||
locale.setlocale(locale.LC_ALL, '')
|
||||
try:
|
||||
main2()
|
||||
except SystemExit:
|
||||
raise
|
||||
except:
|
||||
raise
|
||||
tb.print_tb()
|
||||
raise SystemExit(1)
|
||||
|
||||
|
||||
def get_config(path=None):
|
||||
config = ConfigParser.ConfigParser()
|
||||
global_config_file = '/etc/wcs_olap.ini'
|
||||
if os.path.exists(global_config_file):
|
||||
config.read(global_config_file)
|
||||
if config.has_section('loggers'):
|
||||
logging.config.fileConfig(global_config_file)
|
||||
user_config_file = os.path.expanduser('~/.wcs_olap.ini')
|
||||
if os.path.exists(user_config_file):
|
||||
config.read(user_config_file)
|
||||
if config.has_section('loggers'):
|
||||
logging.config.fileConfig(user_config_file)
|
||||
global_config_path = '/etc/wcs_olap.ini'
|
||||
user_config_path = os.path.expanduser('~/.wcs_olap.ini')
|
||||
if not path:
|
||||
if os.path.exists(user_config_path):
|
||||
path = user_config_path
|
||||
elif os.path.exists(global_config_path):
|
||||
path = global_config_path
|
||||
else:
|
||||
return config
|
||||
config.read(path)
|
||||
if config.has_section('loggers'):
|
||||
logging.config.fileConfig(path)
|
||||
return config
|
||||
|
||||
|
||||
def main2():
|
||||
locale.setlocale(locale.LC_ALL, '')
|
||||
parser = argparse.ArgumentParser(description='Export W.C.S. data as a star schema in a '
|
||||
'postgresql DB', add_help=False)
|
||||
parser.add_argument('config_path', default=None)
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true',
|
||||
default=False)
|
||||
group.add_argument('--url', help='url of the w.c.s. instance', required=False, default=None)
|
||||
args, rest = parser.parse_known_args()
|
||||
config = get_config(path=args.config_path)
|
||||
# list all known urls
|
||||
urls = [url for url in config.sections() if url.startswith('http://') or
|
||||
url.startswith('https://')]
|
||||
parser = argparse.ArgumentParser(description='Engine ES with W.C.S. data', add_help=False)
|
||||
parser.add_argument('--url', help='url of the w.c.s. instance', required=not urls,
|
||||
default=(urls or [None])[0])
|
||||
args, rest = parser.parse_known_args()
|
||||
defaults = {}
|
||||
if getattr(args, 'url') and config.has_section(args.url):
|
||||
defaults = dict(config.items(args.url))
|
||||
parser.add_argument("-h", "--help", action="help", help="show this help message and exit")
|
||||
parser.add_argument('--orig', help='origin of the request for signatures',
|
||||
required='orig' not in defaults)
|
||||
parser.add_argument('--key', help='HMAC key for signatures', required='key' not in defaults)
|
||||
group = parser.add_mutually_exclusive_group(
|
||||
required='email' not in defaults and 'name_id' not in defaults)
|
||||
group.add_argument('--email', help='email for authentication')
|
||||
group.add_argument('--name-id', help='NameID for authentication')
|
||||
if defaults:
|
||||
parser.set_defaults(**defaults)
|
||||
parser.add_argument('--pg-dsn', help='Psycopg2 DB DSN', required='pg-dsn' not in defaults)
|
||||
if not args.all:
|
||||
try:
|
||||
url = args.url or urls[0]
|
||||
except IndexError:
|
||||
print 'no url found'
|
||||
raise SystemExit(1)
|
||||
urls = [url]
|
||||
if config.has_section(args.url):
|
||||
defaults = dict(config.items(args.url))
|
||||
parser.add_argument("-h", "--help", action="help", help="show this help message and exit")
|
||||
parser.add_argument('--orig', help='origin of the request for signatures')
|
||||
parser.add_argument('--key', help='HMAC key for signatures')
|
||||
parser.add_argument('--pg-dsn', help='Psycopg2 DB DSN')
|
||||
parser.add_argument('--schema', help='schema name')
|
||||
args = parser.parse_args()
|
||||
for key in ('orig', 'key', 'pg_dsn', 'schema'):
|
||||
if getattr(args, key, None):
|
||||
defaults[key] = getattr(args, key)
|
||||
|
||||
args = parser.parse_args()
|
||||
api = wcs_api.WcsApi(url=args.url, orig=args.orig, key=args.key, email=args.email,
|
||||
name_id=args.name_id)
|
||||
domain = urlparse.urlparse(args.url).netloc.split(':')[0]
|
||||
schema = defaults.get('schema') or domain.replace('.', '_')
|
||||
logger = logging.getLogger('wcs-olap')
|
||||
logger.info('starting synchronizing w.c.s. at %r with ES at %s:%s', args.url, args.es_host,
|
||||
args.es_port)
|
||||
feeder = WcsOlapFeeder(api=api, schema=schema, db_url=args.db_url, logger=logger)
|
||||
feeder.feed()
|
||||
logger.info('finished')
|
||||
for url in urls:
|
||||
if config.has_section(url):
|
||||
defaults.update((config.items(url)))
|
||||
try:
|
||||
key = defaults['key']
|
||||
orig = defaults['orig']
|
||||
schema = defaults['schema']
|
||||
pg_dsn = defaults['pg_dsn']
|
||||
slugs = defaults.get('slugs', '').strip().split() or None
|
||||
except KeyError, e:
|
||||
logger.error('configuration in complete for %s: %s', url, e)
|
||||
else:
|
||||
api = wcs_api.WcsApi(url=url, orig=orig, key=key, slugs=slugs)
|
||||
logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url, pg_dsn)
|
||||
feeder = WcsOlapFeeder(api=api, schema=schema, pg_dsn=pg_dsn, logger=logger,
|
||||
config=defaults)
|
||||
feeder.feed()
|
||||
logger.info('finished')
|
||||
defaults = {}
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
|
|
@ -1,43 +1,383 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import copy
|
||||
import os
|
||||
import json
|
||||
import hashlib
|
||||
from utils import Whatever
|
||||
import psycopg2
|
||||
|
||||
from wcs_olap.wcs_api import WcsApiError
|
||||
|
||||
|
||||
def slugify(s):
|
||||
return s.replace('-', '_').replace(' ', '_')
|
||||
|
||||
|
||||
class Context(object):
|
||||
def __init__(self):
|
||||
self.stack = []
|
||||
|
||||
def __getitem__(self, key):
|
||||
if not self.stack:
|
||||
raise KeyError(key)
|
||||
for d in self.stack[::-1]:
|
||||
try:
|
||||
return d[key]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
raise KeyError(key)
|
||||
|
||||
def push(self, d):
|
||||
self.stack.append(d)
|
||||
|
||||
def pop(self):
|
||||
self.stack = self.stack[:-1]
|
||||
|
||||
def as_dict(self):
|
||||
r = {}
|
||||
for d in self.stack:
|
||||
r.update(d)
|
||||
return r
|
||||
|
||||
|
||||
class WcsOlapFeeder(object):
|
||||
def __init__(self, api, dsn, schema, logger=None):
|
||||
def __init__(self, api, pg_dsn, schema, logger=None, config=None):
|
||||
self.api = api
|
||||
self.logger = logger or Whatever()
|
||||
self.schema = schema
|
||||
self.connection = psycopg2.connect(dsn)
|
||||
self.connection = psycopg2.connect(dsn=pg_dsn)
|
||||
self.connection.autocommit = True
|
||||
self.cur = self.connection.cursor()
|
||||
self.formdefs = api.get_formdefs()
|
||||
self.formdefs = api.formdefs
|
||||
self.roles = api.roles
|
||||
|
||||
@property
|
||||
def default_ctx(self):
|
||||
return {
|
||||
self.categories = api.categories
|
||||
self.ctx = Context()
|
||||
self.ctx.push({
|
||||
'schema': self.schema,
|
||||
'role_table': 'role',
|
||||
'channel_table': 'channel',
|
||||
'category_table': 'category',
|
||||
'form_table': 'formdef',
|
||||
'generic_formdata_table': 'formdata',
|
||||
'generic_status_table': 'status',
|
||||
'year_table': 'year',
|
||||
'month_table': 'month',
|
||||
'day_table': 'day',
|
||||
'dow_table': 'dow',
|
||||
'hour_table': 'hour',
|
||||
})
|
||||
self.config = config or {}
|
||||
self.model = {
|
||||
'label': self.config.get('cubes_label', schema),
|
||||
'name': schema,
|
||||
'browser_options': {
|
||||
'schema': schema,
|
||||
},
|
||||
'joins': [
|
||||
{
|
||||
'name': 'receipt_time',
|
||||
'master': 'receipt_time',
|
||||
'detail': {
|
||||
'table': 'dates',
|
||||
'column': 'date',
|
||||
'schema': 'public',
|
||||
},
|
||||
'method': 'detail',
|
||||
'alias': 'dates',
|
||||
},
|
||||
{
|
||||
'name': 'channel',
|
||||
'master': 'channel_id',
|
||||
'detail': '{channel_table}.id',
|
||||
'method': 'detail',
|
||||
},
|
||||
{
|
||||
'name': 'role',
|
||||
'detail': '{role_table}.id',
|
||||
'method': 'detail',
|
||||
},
|
||||
{
|
||||
'name': 'formdef',
|
||||
'master': 'formdef_id',
|
||||
'detail': '{form_table}.id',
|
||||
'method': 'detail',
|
||||
},
|
||||
{
|
||||
'name': 'category',
|
||||
'master': '{form_table}.category_id',
|
||||
'detail': '{category_table}.id',
|
||||
},
|
||||
{
|
||||
'name': 'hour',
|
||||
'master': 'hour_id',
|
||||
'detail': '{hour_table}.id',
|
||||
'method': 'detail',
|
||||
},
|
||||
{
|
||||
'name': 'generic_status',
|
||||
'master': 'generic_status_id',
|
||||
'detail': '{generic_status_table}.id',
|
||||
'method': 'detail',
|
||||
},
|
||||
],
|
||||
'dimensions': [
|
||||
{
|
||||
'label': 'date de soumission',
|
||||
'name': 'receipt_time',
|
||||
'role': 'time',
|
||||
'levels': [
|
||||
{
|
||||
'name': 'year',
|
||||
'label': 'année',
|
||||
'role': 'year',
|
||||
'order_attribute': 'year',
|
||||
'order': 'asc',
|
||||
},
|
||||
{
|
||||
'name': 'quarter',
|
||||
'order_attribute': 'quarter',
|
||||
'label': 'trimestre',
|
||||
'role': 'quarter',
|
||||
},
|
||||
{
|
||||
'name': 'month',
|
||||
'label': 'mois',
|
||||
'role': 'month',
|
||||
'attributes': ['month', 'month_name'],
|
||||
'order_attribute': 'month',
|
||||
'label_attribute': 'month_name',
|
||||
'order': 'asc',
|
||||
},
|
||||
{
|
||||
'name': 'week',
|
||||
'label': 'semaine',
|
||||
'role': 'week',
|
||||
},
|
||||
{
|
||||
'name': 'day',
|
||||
'label': 'jour',
|
||||
'role': 'day',
|
||||
'order': 'asc',
|
||||
},
|
||||
{
|
||||
'name': 'dow',
|
||||
'label': 'jour de la semaine',
|
||||
'attributes': ['dow', 'dow_name'],
|
||||
'order_attribute': 'dow',
|
||||
'label_attribute': 'dow_name',
|
||||
'order': 'asc',
|
||||
},
|
||||
],
|
||||
'hierarchies': [
|
||||
{
|
||||
'name': 'default',
|
||||
'label': 'par défaut',
|
||||
'levels': ['year', 'month', 'day']
|
||||
},
|
||||
{
|
||||
'name': 'quarterly',
|
||||
'label': 'par trimestre',
|
||||
'levels': ['year', 'quarter']
|
||||
},
|
||||
{
|
||||
'name': 'weekly',
|
||||
'label': 'par semaine',
|
||||
'levels': ['year', 'week']
|
||||
},
|
||||
{
|
||||
'name': 'dowly',
|
||||
'label': 'par jour de la semaine',
|
||||
'levels': ['dow']
|
||||
},
|
||||
]
|
||||
},
|
||||
{
|
||||
'label': 'canaux',
|
||||
'name': 'channels',
|
||||
},
|
||||
{
|
||||
'label': 'catégories',
|
||||
'name': 'categories',
|
||||
},
|
||||
{
|
||||
'label': 'formulaire',
|
||||
'name': 'formdef',
|
||||
},
|
||||
{
|
||||
'label': 'statuts génériques',
|
||||
'name': 'generic_statuses',
|
||||
},
|
||||
{
|
||||
'label': 'heure',
|
||||
'name': 'hours',
|
||||
'levels': [
|
||||
{
|
||||
'name': 'hours',
|
||||
'attributes': ['hour_id', 'hour_label'],
|
||||
'order_attribute': 'hour_id',
|
||||
'label_attribute': 'hour_label',
|
||||
}
|
||||
]
|
||||
},
|
||||
],
|
||||
'mappings': {
|
||||
'receipt_time.year': {
|
||||
'table': 'dates',
|
||||
'column': 'date',
|
||||
'schema': 'public',
|
||||
'extract': 'year',
|
||||
},
|
||||
'receipt_time.month': {
|
||||
'table': 'dates',
|
||||
'column': 'date',
|
||||
'schema': 'public',
|
||||
'extract': 'month'
|
||||
},
|
||||
'receipt_time.month_name': {
|
||||
'table': 'dates',
|
||||
'schema': 'public',
|
||||
'column': 'month'
|
||||
},
|
||||
'receipt_time.week': {
|
||||
'table': 'dates',
|
||||
'column': 'date',
|
||||
'schema': 'public',
|
||||
'extract': 'week'
|
||||
},
|
||||
'receipt_time.day': {
|
||||
'table': 'dates',
|
||||
'column': 'date',
|
||||
'schema': 'public',
|
||||
'extract': 'day'
|
||||
},
|
||||
'receipt_time.dow': {
|
||||
'table': 'dates',
|
||||
'column': 'date',
|
||||
'schema': 'public',
|
||||
'extract': 'dow'
|
||||
},
|
||||
'receipt_time.dow_name': {
|
||||
'table': 'dates',
|
||||
'schema': 'public',
|
||||
'column': 'day',
|
||||
},
|
||||
'receipt_time.quarter': {
|
||||
'table': 'dates',
|
||||
'column': 'date',
|
||||
'schema': 'public',
|
||||
'extract': 'quarter'
|
||||
},
|
||||
'formdef': 'formdef.label',
|
||||
'channels': 'channel.label',
|
||||
'categories': 'category.label',
|
||||
'generic_statuses': 'status.label',
|
||||
'hours.hour_label': '{hour_table}.label',
|
||||
'hours.hour_id': '{hour_table}.id',
|
||||
},
|
||||
'cubes': [
|
||||
{
|
||||
'name': schema + '_formdata',
|
||||
'label': 'Toutes les demandes (%s)' % schema,
|
||||
'key': 'id',
|
||||
'fact': 'formdata',
|
||||
'dimensions': [
|
||||
'receipt_time',
|
||||
'hours',
|
||||
'channels',
|
||||
'categories',
|
||||
'formdef',
|
||||
'generic_statuses',
|
||||
],
|
||||
'joins': [
|
||||
{
|
||||
'name': 'receipt_time',
|
||||
},
|
||||
{
|
||||
'name': 'hour',
|
||||
},
|
||||
{
|
||||
'name': 'channel',
|
||||
},
|
||||
{
|
||||
'name': 'formdef',
|
||||
},
|
||||
{
|
||||
'name': 'category',
|
||||
},
|
||||
{
|
||||
'name': 'generic_status',
|
||||
},
|
||||
],
|
||||
'measures': [
|
||||
{
|
||||
'name': 'endpoint_delay',
|
||||
'label': 'délai de traitement',
|
||||
'nonadditive': 'all',
|
||||
},
|
||||
],
|
||||
'aggregates': [
|
||||
{
|
||||
'name': 'record_count',
|
||||
'label': 'nombre de demandes',
|
||||
'function': 'count'
|
||||
},
|
||||
{
|
||||
'name': 'endpoint_delay_max',
|
||||
'label': 'délai de traitement maximum',
|
||||
'measure': 'endpoint_delay',
|
||||
'function': 'max',
|
||||
},
|
||||
{
|
||||
'name': 'endpoint_delay_avg',
|
||||
'label': 'délai de traitement moyen',
|
||||
'measure': 'endpoint_delay',
|
||||
'function': 'avg',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}
|
||||
# apply table names
|
||||
self.model = self.tpl(self.model)
|
||||
self.base_cube = self.model['cubes'][0]
|
||||
|
||||
def hash_table_name(self, table_name):
|
||||
table_name = table_name.format(**self.default_ctx)
|
||||
if len(table_name) < 64:
|
||||
return table_name
|
||||
else:
|
||||
return table_name[:57] + hashlib.md5(table_name).hexdigest()[:6]
|
||||
|
||||
@property
|
||||
def default_ctx(self):
|
||||
return self.ctx.as_dict()
|
||||
|
||||
def ex(self, query, ctx=None, vars=None):
|
||||
ctx = ctx or {}
|
||||
ctx.update(self.default_ctx)
|
||||
self.cur.execute(query.format(**(ctx or {})), vars=vars)
|
||||
sql = query.format(**(ctx or {}))
|
||||
self.logger.debug('SQL: %s VARS: %s', sql, vars)
|
||||
self.cur.execute(sql, vars=vars)
|
||||
|
||||
def exmany(self, query, varslist, ctx=None):
|
||||
ctx = ctx or {}
|
||||
ctx.update(self.default_ctx)
|
||||
self.cur.executemany(query.format(**(ctx or {})), varslist)
|
||||
sql = query.format(**(ctx or {}))
|
||||
self.logger.debug('SQL: %s VARSLIST: %s', sql, varslist)
|
||||
self.cur.executemany(sql, varslist)
|
||||
|
||||
def do_schema(self):
|
||||
self.logger.debug('dropping schema %s', self.schema)
|
||||
self.ex('SET search_path = public')
|
||||
self.ex('DROP SCHEMA {schema} IF EXISTS')
|
||||
self.ex('CREATE SCHEMA {schema} IF EXISTS')
|
||||
self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
|
||||
self.logger.debug('creating schema %s', self.schema)
|
||||
self.ex('CREATE SCHEMA {schema}')
|
||||
self.ex('SET search_path = {schema},public')
|
||||
|
||||
channels = [
|
||||
[1, 'web', u'web']
|
||||
[1, 'web', u'web'],
|
||||
[2, 'mail', u'courrier'],
|
||||
[3, 'phone', u'téléphone'],
|
||||
[4, 'counter', u'guichet'],
|
||||
|
@ -45,28 +385,143 @@ class WcsOlapFeeder(object):
|
|||
channel_to_id = dict((c[1], c[0]) for c in channels)
|
||||
id_to_channel = dict((c[0], c[1]) for c in channels)
|
||||
|
||||
status = [
|
||||
[1, 'Nouveau'],
|
||||
[2, 'En cours'],
|
||||
[3, 'Terminé'],
|
||||
]
|
||||
status_to_id = dict((c[1], c[0]) for c in channels)
|
||||
id_to_status = dict((c[0], c[1]) for c in channels)
|
||||
|
||||
def create_table(self, name, columns, inherits=None, comment=None):
|
||||
sql = 'CREATE TABLE %s' % name
|
||||
sql += '(' + ', '.join('%s %s' % (n, t) for n, t in columns) + ')'
|
||||
if inherits:
|
||||
sql += ' INHERITS (%s)' % inherits
|
||||
self.ex(sql)
|
||||
if comment:
|
||||
self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,))
|
||||
|
||||
def create_labeled_table(self, name, labels, comment=None):
|
||||
self.create_table(name,
|
||||
[
|
||||
['id', 'smallint primary key'],
|
||||
['label', 'varchar']
|
||||
], comment=comment)
|
||||
values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels)
|
||||
if not values:
|
||||
return
|
||||
self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))
|
||||
|
||||
def tpl(self, o, ctx=None):
|
||||
ctx = ctx or {}
|
||||
ctx.update(self.default_ctx)
|
||||
|
||||
def helper(o):
|
||||
if isinstance(o, basestring):
|
||||
return o.format(**ctx)
|
||||
elif isinstance(o, dict):
|
||||
return dict((k, helper(v)) for k, v in o.iteritems())
|
||||
elif isinstance(o, list):
|
||||
return [helper(v) for v in o]
|
||||
elif isinstance(o, (bool, int, float)):
|
||||
return o
|
||||
else:
|
||||
assert False, '%s is not a valid value for JSON' % o
|
||||
return helper(o)
|
||||
|
||||
def add_dim(self, **kwargs):
|
||||
self.dimensions.append(self.tpl(kwargs))
|
||||
|
||||
def do_base_table(self):
|
||||
self.ex('CREATE TABLE {channel_table} (id serial PRIMARY KEY, label varchar)')
|
||||
self.exmany('INSERT INTO {channel_table} (id, label) VALUES (%s, %s)',
|
||||
[[c[0], c[2]] for c in self.channels])
|
||||
self.ex('CREATE TABLE {role_table} (id serial PRIMARY KEY, label varchar)')
|
||||
self.exmany('INSERT INTO {role_table} (label) VALUES (%s) RETURNING (id)',
|
||||
[[role.name] for role in self.roles])
|
||||
self.roles_mapping = []
|
||||
for row, role in zip(self.cur.fetchall(), self.roles):
|
||||
self.roles_mappin[role.id] = row[0]
|
||||
# channels
|
||||
self.create_labeled_table('{channel_table}', [[c[0], c[2]] for c in self.channels])
|
||||
|
||||
# roles
|
||||
roles = dict((i, role.name) for i, role in enumerate(self.roles))
|
||||
self.create_labeled_table('{role_table}', roles.items())
|
||||
self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles))
|
||||
|
||||
# categories
|
||||
self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories))
|
||||
self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories))
|
||||
|
||||
# years
|
||||
self.create_labeled_table('{year_table}', zip(range(2000, 2030), map(str, range(2000,
|
||||
2030))))
|
||||
|
||||
# month
|
||||
self.create_labeled_table('{month_table}', zip(range(1, 13), [u'janvier', u'février',
|
||||
u'mars', u'avril', u'mai',
|
||||
u'juin', u'juillet', u'août',
|
||||
u'septembre', u'octobre',
|
||||
u'novembre', u'décembre']))
|
||||
# years
|
||||
self.create_labeled_table('{day_table}', zip(range(1, 32), map(str, range(1, 32))))
|
||||
# day of week
|
||||
self.create_labeled_table('{dow_table}', enumerate([u'lundi', u'mardi', u'mercredi',
|
||||
u'jeudi', u'vendredi', u'samedi',
|
||||
u'dimanche']))
|
||||
|
||||
self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))))
|
||||
|
||||
self.create_labeled_table('{generic_status_table}', self.status)
|
||||
self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,'
|
||||
' category_id integer REFERENCES {category_table} (id),'
|
||||
' label varchar)')
|
||||
self.columns = [
|
||||
['id', 'serial primary key'],
|
||||
['formdef_id', 'smallint REFERENCES {form_table} (id)'],
|
||||
['receipt_time', 'date'],
|
||||
['year_id', 'smallint REFERENCES {year_table} (id)'],
|
||||
['month_id', 'smallint REFERENCES {month_table} (id)'],
|
||||
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
|
||||
['day_id', 'smallint REFERENCES {day_table} (id)'],
|
||||
['dow_id', 'smallint REFERENCES {dow_table} (id)'],
|
||||
['channel_id', 'smallint REFERENCES {channel_table} (id)'],
|
||||
['backoffice', 'boolean'],
|
||||
['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
|
||||
['endpoint_delay', 'real'],
|
||||
]
|
||||
self.comments = {
|
||||
'formdef_id': u'dim$formulaire',
|
||||
'receipt_time': u'time$date de réception',
|
||||
'year_id': u'dim$année',
|
||||
'month_id': u'dim$mois',
|
||||
'hour_id': u'dim$heure',
|
||||
'day_id': u'dim$jour',
|
||||
'dow_id': u'dim$jour de la semaine',
|
||||
'channel_id': u'dim$canal',
|
||||
'backoffice': u'dim$soumission backoffce',
|
||||
'generic_status_id': u'dim$statut générique',
|
||||
'endpoint_delay': u'measure$délai de traitement',
|
||||
}
|
||||
self.create_table('{generic_formdata_table}', self.columns)
|
||||
for at, comment in self.comments.iteritems():
|
||||
self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))
|
||||
|
||||
def feed(self):
|
||||
self.do_schema()
|
||||
self.do_base_table()
|
||||
for formdef in self.api.get_formdefs():
|
||||
self.feed_formdef(formdef)
|
||||
for formdef in self.formdefs:
|
||||
try:
|
||||
formdef_feeder = WcsFormdefFeeder(self, formdef)
|
||||
formdef_feeder.feed()
|
||||
except WcsApiError, e:
|
||||
# ignore authorization errors
|
||||
if (len(e.args) > 2 and hasattr(e.args[2], 'response')
|
||||
and e.args[2].response.status_code == 403):
|
||||
continue
|
||||
self.logger.error('failed to retrieve formdef %s', formdef.slug)
|
||||
|
||||
|
||||
class WcsFormdefFeeder(object):
|
||||
def __init__(self, olap_feeder, formdef):
|
||||
self.olap_feeder = olap_feeder
|
||||
self.formdef = formdef
|
||||
self.status_mapping = {}
|
||||
self.items_mappings = {}
|
||||
self.fields = []
|
||||
|
||||
@property
|
||||
def table_name(self):
|
||||
|
@ -74,45 +529,222 @@ class WcsFormdefFeeder(object):
|
|||
|
||||
@property
|
||||
def status_table_name(self):
|
||||
return 'formdata_status_%s' % self.formdef.slug.replace('-', '_')
|
||||
|
||||
@property
|
||||
def default_ctx(self):
|
||||
return {
|
||||
'table_name': self.table_name,
|
||||
'status_table_name': self.status_table_name,
|
||||
}
|
||||
return 'status_%s' % self.formdef.slug.replace('-', '_')
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.olap_feeder, name)
|
||||
|
||||
def ex(self, query, ctx=None, vars=None):
|
||||
ctx = ctx or {}
|
||||
ctx.update(self.default_ctx)
|
||||
self.olap_feeder.ex(query, ctx=ctx, vars=vars)
|
||||
|
||||
def formdef_exmany(self, formdef, statement, varslist, ctx=None):
|
||||
ctx = ctx or {}
|
||||
ctx.update(self.default_ctx)
|
||||
self.olap_feeder.exmany(statement, varslist, ctx=ctx)
|
||||
|
||||
def do_statuses(self):
|
||||
self.ex('CREATE TABLE {status_table_name} (id serial PRIMARY KEY, '
|
||||
'submission_backoffice label varchar)')
|
||||
statuses = self.formdef.schema.workflow['statuses'].items()
|
||||
labels = [[defn['name']] for status, defn in statuses]
|
||||
self.exmany('INSERT INTO {status_mapping} (label) VALUES (%s) RETURNING (id)',
|
||||
varslist=labels)
|
||||
for status_sql_id, (status_id, defn) in zip(self.cur.fetchall(), statuses):
|
||||
self.status_mapping['wf-%s' % status_id] = status_sql_id
|
||||
statuses = self.formdef.schema.workflow.statuses
|
||||
self.olap_feeder.create_labeled_table(self.status_table_name,
|
||||
enumerate([s.name for s in statuses]))
|
||||
self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses))
|
||||
|
||||
def do_data_table(self):
|
||||
self.ex('CREATE TABLE {table_name} (id serial PRIMARY KEY, '
|
||||
'receipt_date date, '
|
||||
'status integer REFERENCES {status_table_name} (id))')
|
||||
self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
|
||||
vars=[self.categories_mapping.get(self.formdef.schema.category_id),
|
||||
self.formdef.schema.name])
|
||||
self.formdef_sql_id = self.cur.fetchone()[0]
|
||||
|
||||
columns = [['status_id', 'smallint REFERENCES {status_table} (id)']]
|
||||
|
||||
comments = {}
|
||||
|
||||
# add item fields
|
||||
for field in self.formdef.schema.fields:
|
||||
if not field.items and not field.options:
|
||||
continue
|
||||
if not field.varname:
|
||||
continue
|
||||
self.fields.append(field)
|
||||
comment = (u'dim$%s$Valeur du champ « %s » du formulaire %s'
|
||||
% (field.label, field.label, self.formdef.schema.name))
|
||||
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
|
||||
# create table and mapping
|
||||
if field.items:
|
||||
self.create_labeled_table(table_name, enumerate(field.items),
|
||||
comment=comment)
|
||||
self.items_mappings[field.varname] = dict(
|
||||
(item, i) for i, item in enumerate(field.items))
|
||||
elif field.options:
|
||||
options = enumerate(field.options)
|
||||
for option in field.options:
|
||||
self.create_labeled_table(table_name,
|
||||
[(i, o['label']) for i, o in options],
|
||||
comment=comment)
|
||||
self.items_mappings[field.varname] = dict((o['value'], i) for i, o in options)
|
||||
|
||||
at = 'field_%s' % field.varname
|
||||
columns.append([at, 'smallint REFERENCES %s (id)' % table_name])
|
||||
comments[at] = u'dim$' + field.label
|
||||
|
||||
# add geolocation fields
|
||||
for geolocation, label in self.formdef.schema.geolocations:
|
||||
at = 'geolocation_%s' % geolocation
|
||||
columns.append([at, 'point'])
|
||||
comments[at] = u'dim$' + label
|
||||
|
||||
# add function fields
|
||||
for function, name in self.formdef.schema.workflow.functions.iteritems():
|
||||
at = 'function_%s' % slugify(function)
|
||||
columns.append([at, 'smallint REFERENCES {role_table} (id)'])
|
||||
comments[at] = u'dim$Fonction « %s »' % name
|
||||
|
||||
self.columns = ([name for name, _type in self.olap_feeder.columns]
|
||||
+ [name for name, _type in columns])
|
||||
self.create_table('{formdata_table}', columns, inherits='{generic_formdata_table}',
|
||||
comment=u'cube$%s' % self.formdef.schema.name)
|
||||
for at, comment in comments.iteritems():
|
||||
self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % at, vars=(comment,))
|
||||
# PostgreSQL does not propagate foreign key constraints to child tables
|
||||
# so we must recreate them manually
|
||||
for name, _type in self.olap_feeder.columns:
|
||||
if 'REFERENCES' not in _type:
|
||||
continue
|
||||
i = _type.index('REFERENCES')
|
||||
constraint = '%s_fk_constraint FOREIGN KEY (%s) %s' % (name, name, _type[i:])
|
||||
self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
|
||||
|
||||
def do_data(self):
|
||||
values = []
|
||||
for data in self.formdef.datas:
|
||||
|
||||
# ignore formdata without status
|
||||
if not data.workflow.status:
|
||||
continue
|
||||
|
||||
status = data.formdef.schema.workflow.statuses_map[data.workflow.status.id]
|
||||
if data.endpoint_delay:
|
||||
endpoint_delay = (data.endpoint_delay.days + float(data.endpoint_delay.seconds) /
|
||||
86400.)
|
||||
else:
|
||||
endpoint_delay = None
|
||||
row = {
|
||||
'formdef_id': self.formdef_sql_id,
|
||||
'receipt_time': data.receipt_time,
|
||||
'year_id': data.receipt_time.year,
|
||||
'month_id': data.receipt_time.month,
|
||||
'day_id': data.receipt_time.day,
|
||||
'hour_id': data.receipt_time.hour,
|
||||
'dow_id': data.receipt_time.weekday(),
|
||||
'channel_id': self.channel_to_id[data.submission.channel.lower()],
|
||||
'backoffice': data.submission.backoffice,
|
||||
# FIXME "En cours"/2 is never used
|
||||
'generic_status_id': 3 if status.endpoint else 1,
|
||||
'status_id': self.status_mapping[data.workflow.status.id],
|
||||
'endpoint_delay': endpoint_delay,
|
||||
}
|
||||
# add form fields value
|
||||
for field in self.fields:
|
||||
v = None
|
||||
if field.type == 'item':
|
||||
# map items to sql id
|
||||
v = self.items_mappings[field.varname].get(data.fields.get(field.varname))
|
||||
row['field_%s' % field.varname] = v
|
||||
# add geolocation fields value
|
||||
for geolocation, label in self.formdef.schema.geolocations:
|
||||
v = (data.geolocations or {}).get(geolocation)
|
||||
row['geolocation_%s' % geolocation] = v
|
||||
# add function fields value
|
||||
for function, name in self.formdef.schema.workflow.functions.iteritems():
|
||||
try:
|
||||
v = data.functions[function]
|
||||
except KeyError:
|
||||
v = None
|
||||
else:
|
||||
v = self.olap_feeder.role_mapping[v.id]
|
||||
at = 'function_%s' % slugify(function)
|
||||
row[at] = v
|
||||
|
||||
tpl = '(' + ', '.join(['%s'] * len(self.columns[1:])) + ')'
|
||||
value = self.cur.mogrify(tpl, [row[column] for column in self.columns[1:]])
|
||||
values.append(value)
|
||||
if not values:
|
||||
self.logger.warning('no data')
|
||||
return
|
||||
self.ex('INSERT INTO {formdata_table} (%s) VALUES %s' % (
|
||||
', '.join(self.columns[1:]), # skip the id column
|
||||
', '.join(values)))
|
||||
|
||||
def feed(self):
    """Build the star-schema tables and the cubes model for this formdef.

    Creates the status and formdata tables, then registers a cube with
    dimensions for status, each workflow function and each 'item' field,
    loads the formdata rows, and finally dumps the cubes model to JSON
    when 'cubes_model_dirs' is configured.
    """
    self.logger.info('feed formdef %s', self.formdef.slug)
    self.do_statuses()
    self.do_data_table()
    # make table names available to templated SQL ({formdata_table}, ...)
    self.olap_feeder.ctx.push({
        'formdata_table': self.table_name,
        'status_table': self.status_table_name,
    })

    # create cube
    self.cube = copy.deepcopy(self.base_cube)
    self.cube.update({
        'name': self.schema + '_' + self.table_name,
        'label': self.formdef.schema.name,
        'fact': self.table_name,
    })
    # add dimension for status
    self.cube['joins'].append({
        'master': 'status_id',
        'detail': '%s.id' % self.status_table_name,
        'method': 'detail',
    })
    dim_name = '%s_%s' % (self.table_name, 'status')
    self.model['dimensions'].append({
        'name': dim_name,
        'label': 'statut',
        'levels': [
            {
                'name': 'status',
                'attributes': ['status_id', 'status_label'],
                'order_attribute': 'status_id',
                'label_attribute': 'status_label',
            },
        ],
    })
    self.model['mappings']['%s.status_id' % dim_name] = '%s.id' % self.status_table_name
    self.model['mappings']['%s.status_label' % dim_name] = '%s.label' % self.status_table_name
    self.cube['dimensions'].append(dim_name)

    # add dimension for function
    for function, name in self.formdef.schema.workflow.functions.iteritems():
        at = 'function_%s' % slugify(function)
        dim_name = '%s_function_%s' % (self.table_name, slugify(function))
        self.cube['joins'].append({
            'master': at,
            'detail': self.tpl('{role_table}.id'),
            'alias': at,
        })
        self.model['dimensions'].append({
            'name': dim_name,
            'label': u'fonction %s' % name,
        })
        self.model['mappings'][dim_name] = '%s.label' % at
        self.cube['dimensions'].append(dim_name)

    # add dimensions for item fields
    for field in self.fields:
        if field.type != 'item':
            continue
        table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
        self.cube['joins'].append({
            'master': 'field_%s' % field.varname,
            'detail': '%s.id' % table_name,
            'method': 'detail',
        })
        # BUG FIX: original read `(self.table_name. field.varname)` — a dotted
        # attribute access on a str instead of a two-element tuple, raising
        # AttributeError at runtime; a comma was intended.
        dim_name = '%s_%s' % (self.table_name, field.varname)
        self.model['dimensions'].append({
            'name': dim_name,
            'label': field.label,
        })
        self.model['mappings'][dim_name] = '%s.label' % table_name
        self.cube['dimensions'].append(dim_name)

    self.model['cubes'].append(self.cube)
    try:
        # FIXME do_statuses()/do_data_table() were already executed at the
        # top of this method, before ctx.push(); the duplicate calls are
        # kept as-is pending confirmation that they are idempotent.
        self.logger.info('feed formdef %s', self.formdef.slug)
        self.do_statuses()
        self.do_data_table()
        self.do_data()
    finally:
        # always balance the ctx.push() above
        self.olap_feeder.ctx.pop()
    if 'cubes_model_dirs' in self.config:
        model_path = os.path.join(self.config['cubes_model_dirs'], '%s.json' % self.schema)
        with open(model_path, 'w') as f:
            json.dump(self.model, f, indent=2, sort_keys=True)
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
from StringIO import StringIO
|
||||
import sys
|
||||
import linecache
|
||||
|
||||
|
||||
def print_tb():
|
||||
exc_type, exc_value, tb = sys.exc_info()
|
||||
if exc_value:
|
||||
exc_value = unicode(str(exc_value), errors='ignore')
|
||||
error_file = StringIO()
|
||||
|
||||
limit = None
|
||||
if hasattr(sys, 'tracebacklimit'):
|
||||
limit = sys.tracebacklimit
|
||||
print >>error_file, "Exception:"
|
||||
print >>error_file, " type = '%s', value = '%s'" % (exc_type, exc_value)
|
||||
print >>error_file
|
||||
|
||||
# format the traceback
|
||||
print >>error_file, 'Stack trace (most recent call first):'
|
||||
n = 0
|
||||
while tb is not None and (limit is None or n < limit):
|
||||
frame = tb.tb_frame
|
||||
function = frame.f_code.co_name
|
||||
filename = frame.f_code.co_filename
|
||||
exclineno = frame.f_lineno
|
||||
locals = frame.f_locals.items()
|
||||
|
||||
print >>error_file, ' File "%s", line %s, in %s' % (filename, exclineno, function)
|
||||
linecache.checkcache(filename)
|
||||
for lineno in range(exclineno - 2, exclineno + 3):
|
||||
line = linecache.getline(filename, lineno, frame.f_globals)
|
||||
if line:
|
||||
if lineno == exclineno:
|
||||
print >>error_file, '>%5s %s' % (lineno, line.rstrip())
|
||||
else:
|
||||
print >>error_file, ' %5s %s' % (lineno, line.rstrip())
|
||||
print >>error_file
|
||||
if locals:
|
||||
print >>error_file, " locals: "
|
||||
for key, value in locals:
|
||||
print >>error_file, " %s =" % key,
|
||||
try:
|
||||
repr_value = repr(value)
|
||||
if len(repr_value) > 10000:
|
||||
repr_value = repr_value[:10000] + ' [...]'
|
||||
print >>error_file, repr_value,
|
||||
except:
|
||||
print >>error_file, "<ERROR WHILE PRINTING VALUE>",
|
||||
print >>error_file
|
||||
print >>error_file
|
||||
tb = tb.tb_next
|
||||
n = n + 1
|
||||
|
||||
print error_file.getvalue()
|
|
@ -0,0 +1,6 @@
|
|||
class Whatever(object):
    """Null object: absorbs any attribute access and any call.

    Attribute lookups return the instance itself, so lookups chain
    indefinitely (``w.a.b.c is w``); calling the instance (or anything
    reached through it) accepts any arguments and returns None.
    """

    def __call__(self, *args, **kwargs):
        # explicit `self` added (the original relied on the instance being
        # swallowed by *args); accept and ignore all arguments
        pass

    def __getattr__(self, name):
        # any missing attribute resolves to the object itself
        return self
|
|
@ -1,7 +1,7 @@
|
|||
import requests
|
||||
import urlparse
|
||||
import urllib
|
||||
import datetime
|
||||
import isodate
|
||||
|
||||
from . import signature
|
||||
|
||||
|
@ -15,33 +15,107 @@ class BaseObject(object):
|
|||
self.__wcs_api = wcs_api
|
||||
self.__dict__.update(**kwargs)
|
||||
|
||||
def json(self):
    """Return the public attributes as a dict.

    Copies __dict__ and drops every key starting with an underscore
    (private/name-mangled attributes such as the wcs_api backref).
    """
    # Build the result in one pass instead of deleting from the dict while
    # iterating over its keys — the original idiom only works on Python 2,
    # where keys() returns a list.
    return dict((key, value) for key, value in self.__dict__.items()
                if not key.startswith('_'))
|
||||
|
||||
class FormDataWorkflow(BaseObject):
    """Workflow state carried by a formdata payload."""

    # default when the API payload omits the status
    status = None

    def __init__(self, wcs_api, **kwargs):
        super(FormDataWorkflow, self).__init__(wcs_api, **kwargs)
        # promote the raw status mapping to an API object for attribute access
        raw_status = self.status
        if raw_status is not None:
            self.status = BaseObject(wcs_api, **raw_status)
|
||||
|
||||
|
||||
class Evolution(BaseObject):
    """One entry of a formdata's history as returned by the API."""

    # defaults for keys the payload may omit
    user = None
    status = None
    parts = None

    def __init__(self, wcs_api, **kwargs):
        super(Evolution, self).__init__(wcs_api, **kwargs)
        # the timestamp arrives as an ISO-8601 string
        self.time = isodate.parse_datetime(self.time)
        if self.parts:
            wrapped_parts = [BaseObject(wcs_api, **raw_part) for raw_part in self.parts]
            self.parts = wrapped_parts
|
||||
|
||||
|
||||
class FormData(BaseObject):
    """A single form submission (formdata) returned by the w.c.s. API."""

    # defaults for keys the payload may omit
    geolocations = None
    evolution = None

    def json(self):
        """Return a flat dict of the formdata with derived receipt_time fields."""
        d = super(FormData, self).json()
        formdef = d.pop('formdef')
        # FIXME receipt_time is converted to a datetime in __init__(); parsing
        # it again with strptime only works on a raw string payload — confirm
        # the intended call path for json().
        receipt_time = datetime.datetime.strptime(d['receipt_time'], "%Y-%m-%dT%H:%M:%SZ")
        d['receipt_time__dow'] = receipt_time.strftime('%A')
        d['receipt_time__dow_int'] = int(receipt_time.strftime('%w'))
        d['receipt_time__month'] = receipt_time.strftime('%B')
        d['receipt_time__month_int'] = int(receipt_time.strftime('%m'))
        d['receipt_time__hour'] = int(receipt_time.strftime('%H'))
        d['formdef_slug'] = formdef.slug
        return d

    def __init__(self, wcs_api, **kwargs):
        super(FormData, self).__init__(wcs_api, **kwargs)
        self.receipt_time = isodate.parse_datetime(self.receipt_time)
        self.submission = BaseObject(wcs_api, **self.submission)
        self.workflow = FormDataWorkflow(wcs_api, **self.workflow)
        self.evolution = [Evolution(wcs_api, **evo) for evo in self.evolution or []]
        # split the 'roles' payload into concerned/action role lists and a
        # function -> first-role mapping
        self.functions = {}
        self.concerned_roles = []
        self.action_roles = []
        for function in self.roles:
            roles = [Role(wcs_api, **r) for r in self.roles[function]]
            if function == 'concerned':
                self.concerned_roles.extend(roles)
            elif function == 'actions':
                # BUG FIX: the original extended concerned_roles here as well,
                # leaving action_roles permanently empty.
                self.action_roles.extend(roles)
            else:
                try:
                    self.functions[function] = roles[0]
                except IndexError:
                    # function declared but no role assigned
                    self.functions[function] = None
        del self.roles

    def __repr__(self):
        return '<{klass} {display_id!r}>'.format(klass=self.__class__.__name__,
                                                 display_id=self.id)

    @property
    def endpoint_delay(self):
        '''Compute delay as the time when the last not endpoint status precedes an endpoint
        status.'''
        statuses_map = self.formdef.schema.workflow.statuses_map
        # walk the history from newest to oldest; `s` flags that an endpoint
        # status has been seen, `last` holds its delay from receipt_time
        s = 0
        for evo in self.evolution[::-1]:
            if evo.status:
                if statuses_map[evo.status].endpoint:
                    s = 1
                    last = evo.time - self.receipt_time
                else:
                    if s == 1:
                        return last
                    else:
                        # newest status is not an endpoint: still open
                        return
        # NOTE(review): returns None when every status in the history is an
        # endpoint (no non-endpoint predecessor found) — confirm intended.
|
||||
|
||||
|
||||
class Workflow(BaseObject):
    """Workflow definition: its statuses and an id -> status lookup map."""

    status = None

    def __init__(self, wcs_api, **kwargs):
        super(Workflow, self).__init__(wcs_api, **kwargs)
        wrapped = [BaseObject(wcs_api, **raw) for raw in self.statuses]
        self.statuses = wrapped
        # index statuses by id for fast lookup (used by endpoint_delay)
        self.statuses_map = {status.id: status for status in wrapped}
|
||||
|
||||
|
||||
class Field(BaseObject):
    """A formdef field as exposed by the w.c.s. API schema."""

    # class-level defaults for keys the API payload may omit
    items = None
    options = None
    varname = None
    in_filters = False
|
||||
|
||||
|
||||
class Schema(BaseObject):
    """A formdef schema: fields, workflow, optional category and geolocations."""

    # defaults for keys the API payload may omit
    category_id = None
    category = None
    geolocations = None

    def __init__(self, wcs_api, **kwargs):
        super(Schema, self).__init__(wcs_api, **kwargs)
        # wrap raw mappings in their typed API objects
        self.workflow = Workflow(wcs_api, **self.workflow)
        self.fields = [Field(wcs_api, **field_dict) for field_dict in self.fields]
        # normalise to an ordered list of (key, label) pairs
        self.geolocations = sorted((self.geolocations or {}).items())
|
||||
|
||||
|
||||
class FormDef(BaseObject):
|
||||
geolocations = None
|
||||
|
||||
def __init__(self, wcs_api, **kwargs):
    # NOTE(review): duplicates BaseObject.__init__ instead of calling
    # super(); presumably deliberate to avoid subclass-side wrapping —
    # confirm against the rest of the class (continues past this hunk).
    self.__wcs_api = wcs_api
    self.__dict__.update(**kwargs)
|
||||
|
@ -54,7 +128,7 @@ class FormDef(BaseObject):
|
|||
datas = self.__wcs_api.get_formdata(self.slug)
|
||||
for data in datas:
|
||||
data.formdef = self
|
||||
return datas
|
||||
yield data
|
||||
|
||||
@property
|
||||
def schema(self):
|
||||
|
@ -68,14 +142,18 @@ class Role(BaseObject):
|
|||
pass
|
||||
|
||||
|
||||
class Category(BaseObject):
    # marker class: a w.c.s. form category, no behaviour beyond BaseObject
    pass
|
||||
|
||||
|
||||
class WcsApi(object):
|
||||
def __init__(self, url, orig, key, name_id=None, email=None, verify=False):
|
||||
def __init__(self, url, orig, key, verify=True, slugs=None):
|
||||
self.url = url
|
||||
self.orig = orig
|
||||
self.key = key
|
||||
self.email = email
|
||||
self.name_id = name_id
|
||||
self.verify = verify
|
||||
self.cache = {}
|
||||
self.slugs = slugs or []
|
||||
|
||||
@property
|
||||
def formdefs_url(self):
|
||||
|
@ -92,21 +170,21 @@ class WcsApi(object):
|
|||
def get_json(self, *url_parts):
|
||||
url = reduce(lambda x, y: urlparse.urljoin(x, y), url_parts)
|
||||
params = {'orig': self.orig}
|
||||
if self.email:
|
||||
params['email'] = self.email
|
||||
if self.name_id:
|
||||
params['NameID'] = self.name_id
|
||||
query_string = urllib.urlencode(params)
|
||||
presigned_url = url + ('&' if '?' in url else '?') + query_string
|
||||
if presigned_url in self.cache:
|
||||
return self.cache[presigned_url]
|
||||
signed_url = signature.sign_url(presigned_url, self.key)
|
||||
try:
|
||||
response = requests.get(signed_url, verify=False)
|
||||
response = requests.get(signed_url, verify=self.verify)
|
||||
response.raise_for_status()
|
||||
except requests.RequestException, e:
|
||||
raise WcsApiError('GET request failed', signed_url, e)
|
||||
else:
|
||||
try:
|
||||
return response.json()
|
||||
content = response.json()
|
||||
self.cache[presigned_url] = content
|
||||
return content
|
||||
except ValueError, e:
|
||||
raise WcsApiError('Invalid JSON content', signed_url, e)
|
||||
|
||||
|
@ -116,10 +194,21 @@ class WcsApi(object):
|
|||
|
||||
@property
|
||||
def formdefs(self):
|
||||
return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)]
|
||||
return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)
|
||||
if not self.slugs or d['slug'] in self.slugs]
|
||||
|
||||
@property
def categories(self):
    """Collect the distinct categories referenced by the known formdefs."""
    seen = {}
    for formdef in self.formdefs:
        # a schema may carry no category at all; keep only those that do
        if hasattr(formdef.schema, 'category'):
            seen[formdef.schema.category_id] = formdef.schema.category
    return [Category(wcs_api=self, id=category_id, name=category_name)
            for category_id, category_name in seen.items()]
|
||||
|
||||
def get_formdata(self, slug):
|
||||
for d in self.get_json(self.forms_url, slug + '/'):
|
||||
for d in self.get_json(self.forms_url, slug + '/list?anonymise&full=on'):
|
||||
yield FormData(wcs_api=self, **d)
|
||||
|
||||
def get_schema(self, slug):
|
||||
return BaseObject(wcs_api=self, **self.get_json(self.formdefs_url, slug + '/', 'schema'))
|
||||
json_schema = self.get_json(self.formdefs_url, slug + '/', 'schema?anonymise')
|
||||
return Schema(wcs_api=self, **json_schema)
|
||||
|
|
Loading…
Reference in New Issue