remove eau potable and assainissement from global stats
This commit is contained in:
parent
3301318cef
commit
fef4726e92
|
@ -153,17 +153,13 @@ CREATE VIEW bi_espaces_verts AS (SELECT
|
|||
CREATE VIEW bi_all_forms AS (
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'voirie' as form, receipt_time, status, pole_commune_ref FROM bi_voirie
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'reseaux_eau_potable' as form, receipt_time, status, pole_commune_ref FROM bi_reseaux_eau_potable
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'proprete' as form, receipt_time, status, pole_commune_ref FROM bi_proprete
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'mobilier_urbain' as form, receipt_time, status, pole_commune_ref FROM bi_mobilier_urbain
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'eclairage_public' as form, receipt_time, status, pole_commune_ref FROM bi_eclairage_public
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'assainissement' as form, receipt_time, status, pole_commune_ref FROM bi_assainissement);
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts;
|
||||
|
||||
CREATE VIEW bi_form AS (SELECT distinct(form) from bi_all_forms);
|
||||
|
||||
|
|
|
@ -153,17 +153,13 @@ CREATE VIEW bi_espaces_verts AS (SELECT
|
|||
CREATE VIEW bi_all_forms AS (
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'voirie' as form, receipt_time, status, pole_commune_ref FROM bi_voirie
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'reseaux_eau_potable' as form, receipt_time, status, pole_commune_ref FROM bi_reseaux_eau_potable
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'proprete' as form, receipt_time, status, pole_commune_ref FROM bi_proprete
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'mobilier_urbain' as form, receipt_time, status, pole_commune_ref FROM bi_mobilier_urbain
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'eclairage_public' as form, receipt_time, status, pole_commune_ref FROM bi_eclairage_public
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts
|
||||
UNION ALL
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'assainissement' as form, receipt_time, status, pole_commune_ref FROM bi_assainissement);
|
||||
SELECT formdef_id || '-' || id AS id, backoffice_submission, 'espaces_verts' as form, receipt_time, status, pole_commune_ref FROM bi_espaces_verts;
|
||||
|
||||
CREATE VIEW bi_form AS (SELECT distinct(form) from bi_all_forms);
|
||||
|
||||
|
|
|
@ -1,23 +1,27 @@
|
|||
import argparse
|
||||
import ConfigParser
|
||||
import os
|
||||
import elasticsearch
|
||||
import urlparse
|
||||
import logging
|
||||
import logging.config
|
||||
from . import wcs_api
|
||||
from .feeder import WcsEsFeeder
|
||||
from .feeder import WcsOlapFeeder
|
||||
import locale
|
||||
|
||||
|
||||
def main():
|
||||
locale.setlocale(locale.LC_ALL, '')
|
||||
config = ConfigParser.ConfigParser()
|
||||
config_file = os.path.expanduser('~/.wcs_es.ini')
|
||||
if os.path.exists(config_file):
|
||||
config.read(config_file)
|
||||
if config.has_section('loggers'):
|
||||
logging.config.fileConfig(config_file)
|
||||
global_config_file = '/etc/wcs_olap.ini'
|
||||
if os.path.exists(global_config_file):
|
||||
config.read(global_config_file)
|
||||
if config.has_section('loggers'):
|
||||
logging.config.fileConfig(global_config_file)
|
||||
user_config_file = os.path.expanduser('~/.wcs_olap.ini')
|
||||
if os.path.exists(user_config_file):
|
||||
config.read(user_config_file)
|
||||
if config.has_section('loggers'):
|
||||
logging.config.fileConfig(user_config_file)
|
||||
urls = [url for url in config.sections() if url.startswith('http://') or
|
||||
url.startswith('https://')]
|
||||
parser = argparse.ArgumentParser(description='Engine ES with W.C.S. data', add_help=False)
|
||||
|
@ -35,23 +39,19 @@ def main():
|
|||
required='email' not in defaults and 'name_id' not in defaults)
|
||||
group.add_argument('--email', help='email for authentication')
|
||||
group.add_argument('--name-id', help='NameID for authentication')
|
||||
parser.add_argument('--es-host', help='ElasticSearch hostname', default='localhost')
|
||||
parser.add_argument('--es-port', help='ElasticSearch port', type=int, default=9200)
|
||||
parser.add_argument('--es-no-recreate', help='do not recreate all indexes, juste update them', dest='es_recreate',
|
||||
default=True, action='store_false')
|
||||
if defaults:
|
||||
parser.set_defaults(**defaults)
|
||||
parser.add_argument('--pg-dsn', help='Psycopg2 DB DSN', required='pg-dsn' not in defaults)
|
||||
|
||||
args = parser.parse_args()
|
||||
api = wcs_api.WcsApi(url=args.url, orig=args.orig, key=args.key, email=args.email,
|
||||
name_id=args.name_id)
|
||||
base_index_name = urlparse.urlparse(args.url).netloc.split(':')[0].replace('.', '_')
|
||||
es_hosts = [{'host': args.es_host, 'port': args.es_port, 'use_ssl': False}]
|
||||
es = elasticsearch.Elasticsearch(es_hosts)
|
||||
logger = logging.getLogger('wcs-es')
|
||||
domain = urlparse.urlparse(args.url).netloc.split(':')[0]
|
||||
schema = defaults.get('schema') or domain.replace('.', '_')
|
||||
logger = logging.getLogger('wcs-olap')
|
||||
logger.info('starting synchronizing w.c.s. at %r with ES at %s:%s', args.url, args.es_host,
|
||||
args.es_port)
|
||||
feeder = WcsEsFeeder(api, es, base_index_name, recreate=args.es_recreate, logger=logger)
|
||||
feeder = WcsOlapFeeder(api=api, schema=schema, db_url=args.db_url, logger=logger)
|
||||
feeder.feed()
|
||||
logger.info('finished')
|
||||
|
||||
|
|
|
@ -1,64 +1,118 @@
|
|||
import logging
|
||||
from . import wcs_api
|
||||
from utils import Whatever
|
||||
import psycopg2
|
||||
|
||||
from sqlalchemy import create_engine, MetaData, Table, Column, UnicodeText, DATETIME, Index, INTEGER
|
||||
from sqlalchemy.dialects.postgresql import HSTORE
|
||||
|
||||
class WcsOlapFeeder(object):
|
||||
def __init__(self, api, db_url, base_index_name, recreate=False, logger=None):
|
||||
def __init__(self, api, dsn, schema, logger=None):
|
||||
self.api = api
|
||||
self.engine = create_engine(db_url)
|
||||
self.meta = MetaData()
|
||||
self.recreate = recreate
|
||||
self.logger = logger or logging.getLogger(__name__)
|
||||
self.initialize_base_table()
|
||||
self.logger = logger or Whatever()
|
||||
self.schema = schema
|
||||
self.connection = psycopg2.connect(dsn)
|
||||
self.cur = self.connection.cursor()
|
||||
self.formdefs = api.get_formdefs()
|
||||
self.roles = api.roles
|
||||
|
||||
def initialize_base_table(self):
|
||||
self.base_table = Table('forms', self.meta,
|
||||
Column('id', INTEGER, nullable=False),
|
||||
Column('formdef', UnicodeText, nullable=False),
|
||||
Column('receipt_time', DATETIME, nullable=False),
|
||||
Column('fields', HSTORE), nullable=True))
|
||||
Index('base_index', self.base_table.c.formdef, self.base_table.c.receipt_time)
|
||||
Index('filter_index', self.base_table.c.formdef, self.base_table.c.receipt_time,
|
||||
self.base_table.c.fields, postgresql_using='gin')
|
||||
self.base_table.create(self.engine, check_first=True)
|
||||
@property
|
||||
def default_ctx(self):
|
||||
return {
|
||||
'schema': self.schema,
|
||||
'role_table': 'role',
|
||||
'channel_table': 'channel',
|
||||
}
|
||||
|
||||
def ex(self, query, ctx=None, vars=None):
|
||||
ctx = ctx or {}
|
||||
ctx.update(self.default_ctx)
|
||||
self.cur.execute(query.format(**(ctx or {})), vars=vars)
|
||||
|
||||
def exmany(self, query, varslist, ctx=None):
|
||||
ctx = ctx or {}
|
||||
ctx.update(self.default_ctx)
|
||||
self.cur.executemany(query.format(**(ctx or {})), varslist)
|
||||
|
||||
def do_schema(self):
|
||||
self.ex('SET search_path = public')
|
||||
self.ex('DROP SCHEMA {schema} IF EXISTS')
|
||||
self.ex('CREATE SCHEMA {schema} IF EXISTS')
|
||||
self.ex('SET search_path = {schema},public')
|
||||
|
||||
channels = [
|
||||
[1, 'web', u'web']
|
||||
[2, 'mail', u'courrier'],
|
||||
[3, 'phone', u'téléphone'],
|
||||
[4, 'counter', u'guichet'],
|
||||
]
|
||||
channel_to_id = dict((c[1], c[0]) for c in channels)
|
||||
id_to_channel = dict((c[0], c[1]) for c in channels)
|
||||
|
||||
def do_base_table(self):
|
||||
self.ex('CREATE TABLE {channel_table} (id serial PRIMARY KEY, label varchar)')
|
||||
self.exmany('INSERT INTO {channel_table} (id, label) VALUES (%s, %s)',
|
||||
[[c[0], c[2]] for c in self.channels])
|
||||
self.ex('CREATE TABLE {role_table} (id serial PRIMARY KEY, label varchar)')
|
||||
self.exmany('INSERT INTO {role_table} (label) VALUES (%s) RETURNING (id)',
|
||||
[[role.name] for role in self.roles])
|
||||
self.roles_mapping = []
|
||||
for row, role in zip(self.cur.fetchall(), self.roles):
|
||||
self.roles_mappin[role.id] = row[0]
|
||||
|
||||
def feed(self):
|
||||
self.do_schema()
|
||||
self.do_base_table()
|
||||
for formdef in self.api.get_formdefs():
|
||||
self.logger.info('created index %r', self.formdef_index)
|
||||
self.feed_formdef(formdef)
|
||||
|
||||
def feed_formdef(self, formdef):
|
||||
self.logger.info('start loading data for formdef %r', formdef.slug)
|
||||
conn = self.engine.connect()
|
||||
# Indef formdatas
|
||||
if self.recreate:
|
||||
conn.execute(self.base_table.delete().where(self.base_table.formdef == formdef.slug)
|
||||
try:
|
||||
datas = formdef.datas
|
||||
except wcs_api.WcsApiError, e:
|
||||
logging.error('unable to get formdatas for formdef %r: %s', formdef.slug, e)
|
||||
else:
|
||||
for data in datas:
|
||||
class WcsFormdefFeeder(object):
|
||||
def __init__(self, olap_feeder, formdef):
|
||||
self.olap_feeder = olap_feeder
|
||||
self.formdef = formdef
|
||||
self.status_mapping = {}
|
||||
|
||||
def configure_formdef_mapping(self, index, doc_type, formdef):
|
||||
self.configure_field(index, doc_type, ['display_id'], {'type': 'long'})
|
||||
for field in formdef.schema.fields:
|
||||
if field['type'] == 'map' and field.get('varname'):
|
||||
self.configure_field(index, doc_type, ['fields', field['varname']],
|
||||
{'type': 'geo_point'})
|
||||
if field['type'] in ('item', 'string') and field.get('varname'):
|
||||
self.configure_field(index, doc_type, ['fields', field['varname']],
|
||||
{'type': 'string', 'index': 'not_analyzed'})
|
||||
@property
|
||||
def table_name(self):
|
||||
return 'formdata_%s' % self.formdef.slug.replace('-', '_')
|
||||
|
||||
def configure_field(self, index, doc_type, field_path, defn):
|
||||
assert field_path
|
||||
@property
|
||||
def status_table_name(self):
|
||||
return 'formdata_status_%s' % self.formdef.slug.replace('-', '_')
|
||||
|
||||
@property
|
||||
def default_ctx(self):
|
||||
return {
|
||||
'table_name': self.table_name,
|
||||
'status_table_name': self.status_table_name,
|
||||
}
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.olap_feeder, name)
|
||||
|
||||
def ex(self, query, ctx=None, vars=None):
|
||||
ctx = ctx or {}
|
||||
ctx.update(self.default_ctx)
|
||||
self.olap_feeder.ex(query, ctx=ctx, vars=vars)
|
||||
|
||||
def formdef_exmany(self, formdef, statement, varslist, ctx=None):
|
||||
ctx = ctx or {}
|
||||
ctx.update(self.default_ctx)
|
||||
self.olap_feeder.exmany(statement, varslist, ctx=ctx)
|
||||
|
||||
def do_statuses(self):
|
||||
self.ex('CREATE TABLE {status_table_name} (id serial PRIMARY KEY, '
|
||||
'submission_backoffice label varchar)')
|
||||
statuses = self.formdef.schema.workflow['statuses'].items()
|
||||
labels = [[defn['name']] for status, defn in statuses]
|
||||
self.exmany('INSERT INTO {status_mapping} (label) VALUES (%s) RETURNING (id)',
|
||||
varslist=labels)
|
||||
for status_sql_id, (status_id, defn) in zip(self.cur.fetchall(), statuses):
|
||||
self.status_mapping['wf-%s' % status_id] = status_sql_id
|
||||
|
||||
def do_data_table(self):
|
||||
self.ex('CREATE TABLE {table_name} (id serial PRIMARY KEY, '
|
||||
'receipt_date date, '
|
||||
'status integer REFERENCES {status_table_name} (id))')
|
||||
|
||||
def feed(self):
|
||||
self.logger.info('feed formdef %s', self.formdef.slug)
|
||||
self.do_statuses()
|
||||
self.do_data_table()
|
||||
|
||||
body = {}
|
||||
cursor = body
|
||||
for part in field_path:
|
||||
cursor['properties'] = {}
|
||||
cursor['properties'][part] = cursor = {}
|
||||
cursor.update(defn)
|
||||
self.es.indices.put_mapping(index=index, doc_type=doc_type, body=body)
|
||||
|
|
|
@ -64,6 +64,10 @@ class FormDef(BaseObject):
|
|||
return '<{klass} {slug!r}>'.format(klass=self.__class__.__name__, slug=self.slug)
|
||||
|
||||
|
||||
class Role(BaseObject):
|
||||
pass
|
||||
|
||||
|
||||
class WcsApi(object):
|
||||
def __init__(self, url, orig, key, name_id=None, email=None, verify=False):
|
||||
self.url = url
|
||||
|
@ -81,6 +85,10 @@ class WcsApi(object):
|
|||
def forms_url(self):
|
||||
return urlparse.urljoin(self.url, 'api/forms/')
|
||||
|
||||
@property
|
||||
def roles_url(self):
|
||||
return urlparse.urljoin(self.url, 'api/roles')
|
||||
|
||||
def get_json(self, *url_parts):
|
||||
url = reduce(lambda x, y: urlparse.urljoin(x, y), url_parts)
|
||||
params = {'orig': self.orig}
|
||||
|
@ -102,12 +110,16 @@ class WcsApi(object):
|
|||
except ValueError, e:
|
||||
raise WcsApiError('Invalid JSON content', signed_url, e)
|
||||
|
||||
def get_formdefs(self):
|
||||
return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)]
|
||||
@property
|
||||
def roles(self):
|
||||
return [Role(wcs_api=self, **d) for d in self.get_json(self.roles_url)['data']]
|
||||
|
||||
@property
|
||||
def formdefs(self):
|
||||
return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)]
|
||||
def get_formdata(self, slug):
|
||||
return [FormData(wcs_api=self, **d) for d in self.get_json(self.forms_url, slug + '/',
|
||||
'list?full=on')]
|
||||
for d in self.get_json(self.forms_url, slug + '/'):
|
||||
yield FormData(wcs_api=self, **d)
|
||||
|
||||
def get_schema(self, slug):
|
||||
return BaseObject(wcs_api=self, **self.get_json(self.formdefs_url, slug+'/', 'schema'))
|
||||
return BaseObject(wcs_api=self, **self.get_json(self.formdefs_url, slug + '/', 'schema'))
|
||||
|
|
Reference in New Issue