From fef4726e92fb0c38cd897873febc63177149581b Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 28 Apr 2016 14:51:55 +0200 Subject: [PATCH] remove eau potable and assainissement from global stats --- 3m/3m_prod.sql | 6 +- 3m/3m_test.sql | 6 +- wcs_olap/cmd.py | 32 ++++----- wcs_olap/feeder.py | 156 +++++++++++++++++++++++++++++--------------- wcs_olap/wcs_api.py | 22 +++++-- 5 files changed, 140 insertions(+), 82 deletions(-) diff --git a/3m/3m_prod.sql b/3m/3m_prod.sql index 38d3e60..e54ac67 100644 --- a/3m/3m_prod.sql +++ b/3m/3m_prod.sql @@ -153,17 +153,13 @@ CREATE VIEW bi_espaces_verts AS (SELECT CREATE VIEW bi_all_forms AS ( SELECT formdef_id || '-' || id AS id, backoffice_submission, 'voirie' as form, receipt_time, status, pole_commune_ref FROM bi_voirie UNION ALL - SELECT formdef_id || '-' || id AS id, backoffice_submission, 'reseaux_eau_potable' as form, receipt_time, status, pole_commune_ref FROM bi_reseaux_eau_potable - UNION ALL SELECT formdef_id || '-' || id AS id, backoffice_submission, 'proprete' as form, receipt_time, status, pole_commune_ref FROM bi_proprete UNION ALL SELECT formdef_id || '-' || id AS id, backoffice_submission, 'mobilier_urbain' as form, receipt_time, status, pole_commune_ref FROM bi_mobilier_urbain UNION ALL SELECT formdef_id || '-' || id AS id, backoffice_submission, 'eclairage_public' as form, receipt_time, status, pole_commune_ref FROM bi_eclairage_public UNION ALL - SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts - UNION ALL - SELECT formdef_id || '-' || id AS id, backoffice_submission, 'assainissement' as form, receipt_time, status, pole_commune_ref FROM bi_assainissement); + SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts; CREATE VIEW bi_form AS (SELECT distinct(form) from bi_all_forms); diff --git a/3m/3m_test.sql b/3m/3m_test.sql index 3366108..2724350 100644 --- a/3m/3m_test.sql +++ b/3m/3m_test.sql @@ -153,17 +153,13 @@ CREATE VIEW bi_espaces_verts AS (SELECT CREATE VIEW bi_all_forms AS ( SELECT formdef_id || '-' || id AS id, backoffice_submission, 'voirie' as form, receipt_time, status, pole_commune_ref FROM bi_voirie UNION ALL - SELECT formdef_id || '-' || id AS id, backoffice_submission, 'reseaux_eau_potable' as form, receipt_time, status, pole_commune_ref FROM bi_reseaux_eau_potable - UNION ALL SELECT formdef_id || '-' || id AS id, backoffice_submission, 'proprete' as form, receipt_time, status, pole_commune_ref FROM bi_proprete UNION ALL SELECT formdef_id || '-' || id AS id, backoffice_submission, 'mobilier_urbain' as form, receipt_time, status, pole_commune_ref FROM bi_mobilier_urbain UNION ALL SELECT formdef_id || '-' || id AS id, backoffice_submission, 'eclairage_public' as form, receipt_time, status, pole_commune_ref FROM bi_eclairage_public UNION ALL - SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts - UNION ALL - SELECT formdef_id || '-' || id AS id, backoffice_submission, 'assainissement' as form, receipt_time, status, pole_commune_ref FROM bi_assainissement); + SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts; CREATE VIEW bi_form AS (SELECT distinct(form) from bi_all_forms); diff --git a/wcs_olap/cmd.py b/wcs_olap/cmd.py index 6256b84..eb7491d 100644 --- a/wcs_olap/cmd.py +++ b/wcs_olap/cmd.py @@ -1,23 +1,27 @@ import argparse import ConfigParser import os -import elasticsearch import urlparse import logging import logging.config from . import wcs_api -from .feeder import WcsEsFeeder +from .feeder import WcsOlapFeeder import locale def main(): locale.setlocale(locale.LC_ALL, '') config = ConfigParser.ConfigParser() - config_file = os.path.expanduser('~/.wcs_es.ini') - if os.path.exists(config_file): - config.read(config_file) - if config.has_section('loggers'): - logging.config.fileConfig(config_file) + global_config_file = '/etc/wcs_olap.ini' + if os.path.exists(global_config_file): + config.read(global_config_file) + if config.has_section('loggers'): + logging.config.fileConfig(global_config_file) + user_config_file = os.path.expanduser('~/.wcs_olap.ini') + if os.path.exists(user_config_file): + config.read(user_config_file) + if config.has_section('loggers'): + logging.config.fileConfig(user_config_file) urls = [url for url in config.sections() if url.startswith('http://') or url.startswith('https://')] parser = argparse.ArgumentParser(description='Engine ES with W.C.S. data', add_help=False) @@ -35,23 +39,19 @@ def main(): required='email' not in defaults and 'name_id' not in defaults) group.add_argument('--email', help='email for authentication') group.add_argument('--name-id', help='NameID for authentication') - parser.add_argument('--es-host', help='ElasticSearch hostname', default='localhost') - parser.add_argument('--es-port', help='ElasticSearch port', type=int, default=9200) - parser.add_argument('--es-no-recreate', help='do not recreate all indexes, juste update them', dest='es_recreate', - default=True, action='store_false') if defaults: parser.set_defaults(**defaults) + parser.add_argument('--pg-dsn', help='Psycopg2 DB DSN', required='pg-dsn' not in defaults) args = parser.parse_args() api = wcs_api.WcsApi(url=args.url, orig=args.orig, key=args.key, email=args.email, name_id=args.name_id) - base_index_name = urlparse.urlparse(args.url).netloc.split(':')[0].replace('.', '_') - es_hosts = [{'host': args.es_host, 'port': args.es_port, 'use_ssl': False}] - es = elasticsearch.Elasticsearch(es_hosts) - logger = logging.getLogger('wcs-es') + domain = urlparse.urlparse(args.url).netloc.split(':')[0] + schema = defaults.get('schema') or domain.replace('.', '_') + logger = logging.getLogger('wcs-olap') logger.info('starting synchronizing w.c.s. at %r with ES at %s:%s', args.url, args.es_host, args.es_port) - feeder = WcsEsFeeder(api, es, base_index_name, recreate=args.es_recreate, logger=logger) + feeder = WcsOlapFeeder(api=api, schema=schema, db_url=args.db_url, logger=logger) feeder.feed() logger.info('finished') diff --git a/wcs_olap/feeder.py b/wcs_olap/feeder.py index 2c408a7..64da4ea 100644 --- a/wcs_olap/feeder.py +++ b/wcs_olap/feeder.py @@ -1,64 +1,118 @@ -import logging -from . import wcs_api +from utils import Whatever +import psycopg2 -from sqlalchemy import create_engine, MetaData, Table, Column, UnicodeText, DATETIME, Index, INTEGER -from sqlalchemy.dialects.postgresql import HSTORE class WcsOlapFeeder(object): - def __init__(self, api, db_url, base_index_name, recreate=False, logger=None): + def __init__(self, api, dsn, schema, logger=None): self.api = api - self.engine = create_engine(db_url) - self.meta = MetaData() - self.recreate = recreate - self.logger = logger or logging.getLogger(__name__) - self.initialize_base_table() + self.logger = logger or Whatever() + self.schema = schema + self.connection = psycopg2.connect(dsn) + self.cur = self.connection.cursor() + self.formdefs = api.get_formdefs() + self.roles = api.roles - def initialize_base_table(self): - self.base_table = Table('forms', self.meta, - Column('id', INTEGER, nullable=False), - Column('formdef', UnicodeText, nullable=False), - Column('receipt_time', DATETIME, nullable=False), - Column('fields', HSTORE), nullable=True)) - Index('base_index', self.base_table.c.formdef, self.base_table.c.receipt_time) - Index('filter_index', self.base_table.c.formdef, self.base_table.c.receipt_time, - self.base_table.c.fields, postgresql_using='gin') - self.base_table.create(self.engine, check_first=True) + @property + def default_ctx(self): + return { + 'schema': self.schema, + 'role_table': 'role', + 'channel_table': 'channel', + } + + def ex(self, query, ctx=None, vars=None): + ctx = ctx or {} + ctx.update(self.default_ctx) + self.cur.execute(query.format(**(ctx or {})), vars=vars) + + def exmany(self, query, varslist, ctx=None): + ctx = ctx or {} + ctx.update(self.default_ctx) + self.cur.executemany(query.format(**(ctx or {})), varslist) + + def do_schema(self): + self.ex('SET search_path = public') + self.ex('DROP SCHEMA {schema} IF EXISTS') + self.ex('CREATE SCHEMA {schema} IF EXISTS') + self.ex('SET search_path = {schema},public') + + channels = [ + [1, 'web', u'web'] + [2, 'mail', u'courrier'], + [3, 'phone', u'téléphone'], + [4, 'counter', u'guichet'], + ] + channel_to_id = dict((c[1], c[0]) for c in channels) + id_to_channel = dict((c[0], c[1]) for c in channels) + + def do_base_table(self): + self.ex('CREATE TABLE {channel_table} (id serial PRIMARY KEY, label varchar)') + self.exmany('INSERT INTO {channel_table} (id, label) VALUES (%s, %s)', + [[c[0], c[2]] for c in self.channels]) + self.ex('CREATE TABLE {role_table} (id serial PRIMARY KEY, label varchar)') + self.exmany('INSERT INTO {role_table} (label) VALUES (%s) RETURNING (id)', + [[role.name] for role in self.roles]) + self.roles_mapping = [] + for row, role in zip(self.cur.fetchall(), self.roles): + self.roles_mappin[role.id] = row[0] def feed(self): + self.do_schema() + self.do_base_table() for formdef in self.api.get_formdefs(): - self.logger.info('created index %r', self.formdef_index) self.feed_formdef(formdef) - def feed_formdef(self, formdef): - self.logger.info('start loading data for formdef %r', formdef.slug) - conn = self.engine.connect() - # Indef formdatas - if self.recreate: - conn.execute(self.base_table.delete().where(self.base_table.formdef == formdef.slug) - try: - datas = formdef.datas - except wcs_api.WcsApiError, e: - logging.error('unable to get formdatas for formdef %r: %s', formdef.slug, e) - else: - for data in datas: +class WcsFormdefFeeder(object): + def __init__(self, olap_feeder, formdef): + self.olap_feeder = olap_feeder + self.formdef = formdef + self.status_mapping = {} - def configure_formdef_mapping(self, index, doc_type, formdef): - self.configure_field(index, doc_type, ['display_id'], {'type': 'long'}) - for field in formdef.schema.fields: - if field['type'] == 'map' and field.get('varname'): - self.configure_field(index, doc_type, ['fields', field['varname']], - {'type': 'geo_point'}) - if field['type'] in ('item', 'string') and field.get('varname'): - self.configure_field(index, doc_type, ['fields', field['varname']], - {'type': 'string', 'index': 'not_analyzed'}) + @property + def table_name(self): + return 'formdata_%s' % self.formdef.slug.replace('-', '_') - def configure_field(self, index, doc_type, field_path, defn): - assert field_path + @property + def status_table_name(self): + return 'formdata_status_%s' % self.formdef.slug.replace('-', '_') + + @property + def default_ctx(self): + return { + 'table_name': self.table_name, + 'status_table_name': self.status_table_name, + } + + def __getattr__(self, name): + return getattr(self.olap_feeder, name) + + def ex(self, query, ctx=None, vars=None): + ctx = ctx or {} + ctx.update(self.default_ctx) + self.olap_feeder.ex(query, ctx=ctx, vars=vars) + + def formdef_exmany(self, formdef, statement, varslist, ctx=None): + ctx = ctx or {} + ctx.update(self.default_ctx) + self.olap_feeder.exmany(statement, varslist, ctx=ctx) + + def do_statuses(self): + self.ex('CREATE TABLE {status_table_name} (id serial PRIMARY KEY, ' + 'submission_backoffice label varchar)') + statuses = self.formdef.schema.workflow['statuses'].items() + labels = [[defn['name']] for status, defn in statuses] + self.exmany('INSERT INTO {status_mapping} (label) VALUES (%s) RETURNING (id)', + varslist=labels) + for status_sql_id, (status_id, defn) in zip(self.cur.fetchall(), statuses): + self.status_mapping['wf-%s' % status_id] = status_sql_id + + def do_data_table(self): + self.ex('CREATE TABLE {table_name} (id serial PRIMARY KEY, ' + 'receipt_date date, ' + 'status integer REFERENCES {status_table_name} (id))') + + def feed(self): + self.logger.info('feed formdef %s', self.formdef.slug) + self.do_statuses() + self.do_data_table() - body = {} - cursor = body - for part in field_path: - cursor['properties'] = {} - cursor['properties'][part] = cursor = {} - cursor.update(defn) - self.es.indices.put_mapping(index=index, doc_type=doc_type, body=body) diff --git a/wcs_olap/wcs_api.py b/wcs_olap/wcs_api.py index 08621c9..d051e47 100644 --- a/wcs_olap/wcs_api.py +++ b/wcs_olap/wcs_api.py @@ -64,6 +64,10 @@ class FormDef(BaseObject): return '<{klass} {slug!r}>'.format(klass=self.__class__.__name__, slug=self.slug) +class Role(BaseObject): + pass + + class WcsApi(object): def __init__(self, url, orig, key, name_id=None, email=None, verify=False): self.url = url @@ -81,6 +85,10 @@ class WcsApi(object): def forms_url(self): return urlparse.urljoin(self.url, 'api/forms/') + @property + def roles_url(self): + return urlparse.urljoin(self.url, 'api/roles') + def get_json(self, *url_parts): url = reduce(lambda x, y: urlparse.urljoin(x, y), url_parts) params = {'orig': self.orig} @@ -102,12 +110,16 @@ class WcsApi(object): except ValueError, e: raise WcsApiError('Invalid JSON content', signed_url, e) - def get_formdefs(self): - return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)] + @property + def roles(self): + return [Role(wcs_api=self, **d) for d in self.get_json(self.roles_url)['data']] + @property + def formdefs(self): + return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)] def get_formdata(self, slug): - return [FormData(wcs_api=self, **d) for d in self.get_json(self.forms_url, slug + '/', - 'list?full=on')] + for d in self.get_json(self.forms_url, slug + '/'): + yield FormData(wcs_api=self, **d) def get_schema(self, slug): - return BaseObject(wcs_api=self, **self.get_json(self.formdefs_url, slug+'/', 'schema')) + return BaseObject(wcs_api=self, **self.get_json(self.formdefs_url, slug + '/', 'schema'))