remove eau potable and assainissement from global stats

This commit is contained in:
Benjamin Dauvergne 2016-04-28 14:51:55 +02:00
parent 3301318cef
commit fef4726e92
5 changed files with 140 additions and 82 deletions

View File

@ -153,17 +153,13 @@ CREATE VIEW bi_espaces_verts AS (SELECT
CREATE VIEW bi_all_forms AS (
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'voirie' as form, receipt_time, status, pole_commune_ref FROM bi_voirie
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'reseaux_eau_potable' as form, receipt_time, status, pole_commune_ref FROM bi_reseaux_eau_potable
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'proprete' as form, receipt_time, status, pole_commune_ref FROM bi_proprete
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'mobilier_urbain' as form, receipt_time, status, pole_commune_ref FROM bi_mobilier_urbain
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'eclairage_public' as form, receipt_time, status, pole_commune_ref FROM bi_eclairage_public
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'assainissement' as form, receipt_time, status, pole_commune_ref FROM bi_assainissement);
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts;
CREATE VIEW bi_form AS (SELECT distinct(form) from bi_all_forms);

View File

@ -153,17 +153,13 @@ CREATE VIEW bi_espaces_verts AS (SELECT
CREATE VIEW bi_all_forms AS (
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'voirie' as form, receipt_time, status, pole_commune_ref FROM bi_voirie
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'reseaux_eau_potable' as form, receipt_time, status, pole_commune_ref FROM bi_reseaux_eau_potable
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'proprete' as form, receipt_time, status, pole_commune_ref FROM bi_proprete
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'mobilier_urbain' as form, receipt_time, status, pole_commune_ref FROM bi_mobilier_urbain
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'eclairage_public' as form, receipt_time, status, pole_commune_ref FROM bi_eclairage_public
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts
UNION ALL
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'assainissement' as form, receipt_time, status, pole_commune_ref FROM bi_assainissement);
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts;
CREATE VIEW bi_form AS (SELECT distinct(form) from bi_all_forms);

View File

@ -1,23 +1,27 @@
import argparse
import ConfigParser
import os
import elasticsearch
import urlparse
import logging
import logging.config
from . import wcs_api
from .feeder import WcsEsFeeder
from .feeder import WcsOlapFeeder
import locale
def main():
locale.setlocale(locale.LC_ALL, '')
config = ConfigParser.ConfigParser()
config_file = os.path.expanduser('~/.wcs_es.ini')
if os.path.exists(config_file):
config.read(config_file)
if config.has_section('loggers'):
logging.config.fileConfig(config_file)
global_config_file = '/etc/wcs_olap.ini'
if os.path.exists(global_config_file):
config.read(global_config_file)
if config.has_section('loggers'):
logging.config.fileConfig(global_config_file)
user_config_file = os.path.expanduser('~/.wcs_olap.ini')
if os.path.exists(user_config_file):
config.read(user_config_file)
if config.has_section('loggers'):
logging.config.fileConfig(user_config_file)
urls = [url for url in config.sections() if url.startswith('http://') or
url.startswith('https://')]
parser = argparse.ArgumentParser(description='Engine ES with W.C.S. data', add_help=False)
@ -35,23 +39,19 @@ def main():
required='email' not in defaults and 'name_id' not in defaults)
group.add_argument('--email', help='email for authentication')
group.add_argument('--name-id', help='NameID for authentication')
parser.add_argument('--es-host', help='ElasticSearch hostname', default='localhost')
parser.add_argument('--es-port', help='ElasticSearch port', type=int, default=9200)
parser.add_argument('--es-no-recreate', help='do not recreate all indexes, juste update them', dest='es_recreate',
default=True, action='store_false')
if defaults:
parser.set_defaults(**defaults)
parser.add_argument('--pg-dsn', help='Psycopg2 DB DSN', required='pg-dsn' not in defaults)
args = parser.parse_args()
api = wcs_api.WcsApi(url=args.url, orig=args.orig, key=args.key, email=args.email,
name_id=args.name_id)
base_index_name = urlparse.urlparse(args.url).netloc.split(':')[0].replace('.', '_')
es_hosts = [{'host': args.es_host, 'port': args.es_port, 'use_ssl': False}]
es = elasticsearch.Elasticsearch(es_hosts)
logger = logging.getLogger('wcs-es')
domain = urlparse.urlparse(args.url).netloc.split(':')[0]
schema = defaults.get('schema') or domain.replace('.', '_')
logger = logging.getLogger('wcs-olap')
logger.info('starting synchronizing w.c.s. at %r with ES at %s:%s', args.url, args.es_host,
args.es_port)
feeder = WcsEsFeeder(api, es, base_index_name, recreate=args.es_recreate, logger=logger)
feeder = WcsOlapFeeder(api=api, schema=schema, db_url=args.db_url, logger=logger)
feeder.feed()
logger.info('finished')

View File

@ -1,64 +1,118 @@
import logging
from . import wcs_api
from utils import Whatever
import psycopg2
from sqlalchemy import create_engine, MetaData, Table, Column, UnicodeText, DATETIME, Index, INTEGER
from sqlalchemy.dialects.postgresql import HSTORE
class WcsOlapFeeder(object):
def __init__(self, api, db_url, base_index_name, recreate=False, logger=None):
def __init__(self, api, dsn, schema, logger=None):
self.api = api
self.engine = create_engine(db_url)
self.meta = MetaData()
self.recreate = recreate
self.logger = logger or logging.getLogger(__name__)
self.initialize_base_table()
self.logger = logger or Whatever()
self.schema = schema
self.connection = psycopg2.connect(dsn)
self.cur = self.connection.cursor()
self.formdefs = api.get_formdefs()
self.roles = api.roles
def initialize_base_table(self):
self.base_table = Table('forms', self.meta,
Column('id', INTEGER, nullable=False),
Column('formdef', UnicodeText, nullable=False),
Column('receipt_time', DATETIME, nullable=False),
Column('fields', HSTORE), nullable=True))
Index('base_index', self.base_table.c.formdef, self.base_table.c.receipt_time)
Index('filter_index', self.base_table.c.formdef, self.base_table.c.receipt_time,
self.base_table.c.fields, postgresql_using='gin')
self.base_table.create(self.engine, check_first=True)
@property
def default_ctx(self):
return {
'schema': self.schema,
'role_table': 'role',
'channel_table': 'channel',
}
def ex(self, query, ctx=None, vars=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
self.cur.execute(query.format(**(ctx or {})), vars=vars)
def exmany(self, query, varslist, ctx=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
self.cur.executemany(query.format(**(ctx or {})), varslist)
def do_schema(self):
self.ex('SET search_path = public')
self.ex('DROP SCHEMA {schema} IF EXISTS')
self.ex('CREATE SCHEMA {schema} IF EXISTS')
self.ex('SET search_path = {schema},public')
channels = [
[1, 'web', u'web']
[2, 'mail', u'courrier'],
[3, 'phone', u'téléphone'],
[4, 'counter', u'guichet'],
]
channel_to_id = dict((c[1], c[0]) for c in channels)
id_to_channel = dict((c[0], c[1]) for c in channels)
def do_base_table(self):
self.ex('CREATE TABLE {channel_table} (id serial PRIMARY KEY, label varchar)')
self.exmany('INSERT INTO {channel_table} (id, label) VALUES (%s, %s)',
[[c[0], c[2]] for c in self.channels])
self.ex('CREATE TABLE {role_table} (id serial PRIMARY KEY, label varchar)')
self.exmany('INSERT INTO {role_table} (label) VALUES (%s) RETURNING (id)',
[[role.name] for role in self.roles])
self.roles_mapping = []
for row, role in zip(self.cur.fetchall(), self.roles):
self.roles_mappin[role.id] = row[0]
def feed(self):
self.do_schema()
self.do_base_table()
for formdef in self.api.get_formdefs():
self.logger.info('created index %r', self.formdef_index)
self.feed_formdef(formdef)
def feed_formdef(self, formdef):
self.logger.info('start loading data for formdef %r', formdef.slug)
conn = self.engine.connect()
# Indef formdatas
if self.recreate:
conn.execute(self.base_table.delete().where(self.base_table.formdef == formdef.slug)
try:
datas = formdef.datas
except wcs_api.WcsApiError, e:
logging.error('unable to get formdatas for formdef %r: %s', formdef.slug, e)
else:
for data in datas:
class WcsFormdefFeeder(object):
def __init__(self, olap_feeder, formdef):
self.olap_feeder = olap_feeder
self.formdef = formdef
self.status_mapping = {}
def configure_formdef_mapping(self, index, doc_type, formdef):
self.configure_field(index, doc_type, ['display_id'], {'type': 'long'})
for field in formdef.schema.fields:
if field['type'] == 'map' and field.get('varname'):
self.configure_field(index, doc_type, ['fields', field['varname']],
{'type': 'geo_point'})
if field['type'] in ('item', 'string') and field.get('varname'):
self.configure_field(index, doc_type, ['fields', field['varname']],
{'type': 'string', 'index': 'not_analyzed'})
@property
def table_name(self):
return 'formdata_%s' % self.formdef.slug.replace('-', '_')
def configure_field(self, index, doc_type, field_path, defn):
assert field_path
@property
def status_table_name(self):
return 'formdata_status_%s' % self.formdef.slug.replace('-', '_')
@property
def default_ctx(self):
return {
'table_name': self.table_name,
'status_table_name': self.status_table_name,
}
def __getattr__(self, name):
return getattr(self.olap_feeder, name)
def ex(self, query, ctx=None, vars=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
self.olap_feeder.ex(query, ctx=ctx, vars=vars)
def formdef_exmany(self, formdef, statement, varslist, ctx=None):
ctx = ctx or {}
ctx.update(self.default_ctx)
self.olap_feeder.exmany(statement, varslist, ctx=ctx)
def do_statuses(self):
self.ex('CREATE TABLE {status_table_name} (id serial PRIMARY KEY, '
'submission_backoffice label varchar)')
statuses = self.formdef.schema.workflow['statuses'].items()
labels = [[defn['name']] for status, defn in statuses]
self.exmany('INSERT INTO {status_mapping} (label) VALUES (%s) RETURNING (id)',
varslist=labels)
for status_sql_id, (status_id, defn) in zip(self.cur.fetchall(), statuses):
self.status_mapping['wf-%s' % status_id] = status_sql_id
def do_data_table(self):
self.ex('CREATE TABLE {table_name} (id serial PRIMARY KEY, '
'receipt_date date, '
'status integer REFERENCES {status_table_name} (id))')
def feed(self):
self.logger.info('feed formdef %s', self.formdef.slug)
self.do_statuses()
self.do_data_table()
body = {}
cursor = body
for part in field_path:
cursor['properties'] = {}
cursor['properties'][part] = cursor = {}
cursor.update(defn)
self.es.indices.put_mapping(index=index, doc_type=doc_type, body=body)

View File

@ -64,6 +64,10 @@ class FormDef(BaseObject):
return '<{klass} {slug!r}>'.format(klass=self.__class__.__name__, slug=self.slug)
class Role(BaseObject):
pass
class WcsApi(object):
def __init__(self, url, orig, key, name_id=None, email=None, verify=False):
self.url = url
@ -81,6 +85,10 @@ class WcsApi(object):
def forms_url(self):
return urlparse.urljoin(self.url, 'api/forms/')
@property
def roles_url(self):
return urlparse.urljoin(self.url, 'api/roles')
def get_json(self, *url_parts):
url = reduce(lambda x, y: urlparse.urljoin(x, y), url_parts)
params = {'orig': self.orig}
@ -102,12 +110,16 @@ class WcsApi(object):
except ValueError, e:
raise WcsApiError('Invalid JSON content', signed_url, e)
def get_formdefs(self):
return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)]
@property
def roles(self):
return [Role(wcs_api=self, **d) for d in self.get_json(self.roles_url)['data']]
@property
def formdefs(self):
return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)]
def get_formdata(self, slug):
return [FormData(wcs_api=self, **d) for d in self.get_json(self.forms_url, slug + '/',
'list?full=on')]
for d in self.get_json(self.forms_url, slug + '/'):
yield FormData(wcs_api=self, **d)
def get_schema(self, slug):
return BaseObject(wcs_api=self, **self.get_json(self.formdefs_url, slug+'/', 'schema'))
return BaseObject(wcs_api=self, **self.get_json(self.formdefs_url, slug + '/', 'schema'))