trivial: apply pre-commit

Frédéric Péters 2022-07-16 08:36:18 +02:00
parent c27d346d92
commit 461254e986
12 changed files with 540 additions and 396 deletions
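
All hunks below are mechanical reformatting, with no behaviour change. The rewrites match what black (line wrapping with trailing commas, apparently at a 110-column limit and with string normalization off), isort (grouped, alphabetized imports) and pyupgrade (dropping Python 2 compatibility spellings) produce; the repository's .pre-commit-config.yaml is not part of this diff, so the exact hook set is an inference. The pyupgrade-style families, in miniature (names and data are illustrative):

pairs = [('a', 1), ('b', 2)]  # sample data, illustrative only

class Example(object):  # rewritten to: class Example:
    def __init__(self):
        super(Example, self).__init__()  # rewritten to: super().__init__()

mapping = dict((k, v) for k, v in pairs)  # rewritten to: {k: v for k, v in pairs}
keys = set(['id', 'text'])                # rewritten to: {'id', 'text'}
label = u'literal'                        # rewritten to: 'literal'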

debian/control

@@ -2,7 +2,10 @@ Source: wcs-olap
Section: python
Priority: optional
Maintainer: Benjamin Dauvergne <bdauvergne@entrouvert.com>
Build-Depends: python3-setuptools, python3-all, debhelper-compat (= 12), dh-python
Build-Depends: debhelper-compat (= 12),
dh-python,
python3-all,
python3-setuptools,
Standards-Version: 3.9.6
Homepage: http://dev.entrouvert.org/projects/wcs-olap/


@@ -1,9 +1,9 @@
#! /usr/bin/env python
import subprocess
import os
import subprocess
from setuptools import setup, find_packages
from setuptools import find_packages, setup
from setuptools.command.sdist import sdist
@@ -21,15 +21,17 @@ class eo_sdist(sdist):
def get_version():
'''Use the VERSION, if absent generates a version with git describe, if not
tag exists, take 0.0- and add the length of the commit log.
tag exists, take 0.0- and add the length of the commit log.
'''
if os.path.exists('VERSION'):
with open('VERSION', 'r') as v:
with open('VERSION') as v:
return v.read()
if os.path.exists('.git'):
p = subprocess.Popen(
['git', 'describe', '--dirty=.dirty', '--match=v*'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
result = p.communicate()[0]
if p.returncode == 0:
result = result.decode('ascii').strip()[1:] # strip spaces/newlines and initial v
@@ -44,26 +46,22 @@ def get_version():
return '0.0'
setup(name="wcs-olap",
version=get_version(),
license="AGPLv3+",
description="Export w.c.s. data to an OLAP cube",
long_description=open('README.rst').read(),
url="http://dev.entrouvert.org/projects/publik-bi/",
author="Entr'ouvert",
author_email="authentic@listes.entrouvert.com",
maintainer="Benjamin Dauvergne",
maintainer_email="bdauvergne@entrouvert.com",
packages=find_packages(),
include_package_data=True,
install_requires=[
'requests',
'psycopg2',
'isodate',
'six',
'cached-property'
],
entry_points={
'console_scripts': ['wcs-olap=wcs_olap.cmd:main'],
},
cmdclass={'sdist': eo_sdist})
setup(
name="wcs-olap",
version=get_version(),
license="AGPLv3+",
description="Export w.c.s. data to an OLAP cube",
long_description=open('README.rst').read(),
url="http://dev.entrouvert.org/projects/publik-bi/",
author="Entr'ouvert",
author_email="authentic@listes.entrouvert.com",
maintainer="Benjamin Dauvergne",
maintainer_email="bdauvergne@entrouvert.com",
packages=find_packages(),
include_package_data=True,
install_requires=['requests', 'psycopg2', 'isodate', 'six', 'cached-property'],
entry_points={
'console_scripts': ['wcs-olap=wcs_olap.cmd:main'],
},
cmdclass={'sdist': eo_sdist},
)
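
One concrete illustration of the get_version() fallback chain above: a VERSION file wins outright; otherwise the output of git describe is used after stripping whitespace and the initial v; otherwise 0.0. The sample value here is hypothetical:

result = 'v1.17-3-gc27d346.dirty'  # hypothetical `git describe --dirty=.dirty --match=v*` output
version = result.strip()[1:]       # '1.17-3-gc27d346.dirty' -- spaces/newlines and initial v stripped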


@@ -1,25 +1,22 @@
# -*- coding: utf-8 -*-
import configparser
import os
import random
import shutil
import socket
import sys
import time
import os
import shutil
import random
import socket
from unittest import mock
from contextlib import closing, contextmanager, ExitStack
from collections import namedtuple
from contextlib import ExitStack, closing, contextmanager
from unittest import mock
import psycopg2
import pytest
import utils
Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid'])
class Database(object):
class Database:
def __init__(self):
self.db_name = 'db%s' % random.getrandbits(20)
self.dsn = 'dbname=%s' % self.db_name
@@ -60,21 +57,21 @@ def wcs_db():
WCS_SCRIPTS = {
'setup-auth': u"""
'setup-auth': """
from quixote import get_publisher
get_publisher().cfg['identification'] = {'methods': ['password']}
get_publisher().cfg['debug'] = {'display_exceptions': 'text'}
get_publisher().write_cfg()
""",
'setup-storage': u"""
'setup-storage': """
from quixote import get_publisher
get_publisher().cfg['postgresql'] = {'database': %(dbname)r, 'port': %(port)r, 'host': %(host)r, 'user': %(user)r, 'password': %(password)r}
get_publisher().write_cfg()
get_publisher().initialize_sql()
""",
'create-user': u"""
'create-user': """
from quixote import get_publisher
from qommon.ident.password_accounts import PasswordAccount
@@ -87,7 +84,7 @@ account.set_password('user')
account.user_id = user.id
account.store()
""",
'create-data': u"""
'create-data': """
import datetime
import random
from quixote import get_publisher
@@ -197,32 +194,44 @@ def wcs(tmp_path_factory, wcs_dir, wcs_db):
tenant_dir.mkdir()
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth')
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-storage'] % {
utils.run_wcs_script(
wcs_dir,
WCS_SCRIPTS['setup-storage']
% {
'dbname': wcs_db.db_name,
'port': os.environ.get('PGPORT'),
'host': os.environ.get('PGHOST'),
'user': os.environ.get('PGUSER'),
'password': os.environ.get('PGPASSWORD'),
},
'setup-storage')
'setup-storage',
)
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user')
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data')
with (tenant_dir / 'site-options.cfg').open('w') as fd:
fd.write(u'''[api-secrets]
fd.write(
'''[api-secrets]
olap = olap
''')
'''
)
with (wcs_dir / 'wcs.cfg').open('w') as fd:
fd.write(u'''[main]
app_dir = %s\n''' % (str(wcs_dir).replace('%', '%%')))
fd.write(
'''[main]
app_dir = %s\n'''
% (str(wcs_dir).replace('%', '%%'))
)
with (wcs_dir / 'local_settings.py').open('w') as fd:
fd.write(u'''
fd.write(
'''
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg'
THEMES_DIRECTORY = '/'
ALLOWED_HOSTS = ['%s']
''' % (wcs_dir, utils.HOSTNAME))
'''
% (wcs_dir, utils.HOSTNAME)
)
# launch a Django worker for running w.c.s.
WCS_PID = os.fork()
@@ -230,7 +239,9 @@ ALLOWED_HOSTS = ['%s']
os.chdir(os.path.dirname(utils.WCS_MANAGE))
os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py')
os.execvp('python', [sys.executable, 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)])
os.execvp(
'python', [sys.executable, 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)]
)
sys.exit(0)
# verify w.c.s. is launched
@@ -272,7 +283,8 @@ def olap_cmd(wcs, tmpdir, postgres_db):
model_dir = tmpdir / 'model_dir'
model_dir.mkdir()
with config_ini.open('w') as fd:
fd.write(u'''
fd.write(
'''
[wcs-olap]
cubes_model_dirs = {model_dir}
pg_dsn = {dsn}
@@ -282,10 +294,14 @@ orig = olap
key = olap
schema = olap
cubes_slug = olap-slug
'''.format(wcs=wcs, model_dir=str(model_dir).replace('%', '%%'), dsn=postgres_db.dsn))
'''.format(
wcs=wcs, model_dir=str(model_dir).replace('%', '%%'), dsn=postgres_db.dsn
)
)
import sys
from wcs_olap import cmd
import sys
def f(no_log_errors=True):
old_argv = sys.argv
@@ -327,8 +343,11 @@ def mock_cursor_execute():
mocked_cur = mock.Mock(wraps=cur)
mocked_cur.execute = mock.Mock(wraps=cur.execute, **execute_mock_kwargs)
return mocked_cur
mocked_conn.cursor = cursor
return mocked_conn
stack.enter_context(mock.patch.object(psycopg2, 'connect', connect))
yield None
yield do


@@ -28,7 +28,8 @@ def test_post_sync_commands(mock_cursor_execute, wcs, postgres_db, olap_cmd):
config.set(
'wcs-olap',
'post-sync-commands',
"NOTIFY coucou, '{schema}';\nSELECT * FROM information_schema.tables")
"NOTIFY coucou, '{schema}';\nSELECT * FROM information_schema.tables",
)
queries = []


@@ -17,15 +17,17 @@ def capture_event_mock():
return capture_event_mock(*args, **kwargs)
with contextlib.ExitStack() as stack:
def new_init(*args, **kwargs):
old_init(*args, **kwargs)
if sentry_sdk.Hub.current.client:
stack.enter_context(
mock.patch.object(
sentry_sdk.Hub.current.client.transport, 'capture_event', my_capture_event))
sentry_sdk.Hub.current.client.transport, 'capture_event', my_capture_event
)
)
stack.enter_context(
mock.patch.object(sentry_sdk, 'init', new_init))
stack.enter_context(mock.patch.object(sentry_sdk, 'init', new_init))
yield capture_event_mock


@@ -16,17 +16,14 @@
import datetime
import json
import pytest
import pathlib
from urllib.parse import urlparse, parse_qs
import unittest.mock as mock
from urllib.parse import parse_qs, urlparse
import requests
import httmock
import pytest
import requests
import utils
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
@@ -49,8 +46,9 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
with mock.patch('requests.Session', MockSession):
olap_cmd()
call_args_list = MockSession.mocks[-1].call_args_list
url_with_limits = [call_args[0][0] for call_args in MockSession.mocks[-1].call_args_list
if 'limit' in call_args[0][0]]
url_with_limits = [
call_args[0][0] for call_args in MockSession.mocks[-1].call_args_list if 'limit' in call_args[0][0]
]
assert url_with_limits, call_args_list
for url_with_limit in url_with_limits:
parsed_qs = parse_qs(urlparse(url_with_limit).query)
@@ -60,10 +58,12 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
assert (
'Le champ « 7th field bad duplicate » a un nom de variable dupliqué '
'« duplicate » mais pas le même type que « 6th field bad duplicate », '
'tous les champs avec ce nom seront ignorés (string != bool).') in caplog.text
'tous les champs avec ce nom seront ignorés (string != bool).'
) in caplog.text
assert (
'Le champ « 11th field third bad duplicate » est un doublon d\'un '
'champ de type différent, il a été ignoré.') in caplog.text
'champ de type différent, il a été ignoré.'
) in caplog.text
expected_schema = [
('agent', 'id'),
@@ -131,15 +131,17 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
('status', 'id'),
('status', 'label'),
('status_demande', 'id'),
('status_demande', 'label')
('status_demande', 'label'),
]
# verify SQL schema
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('SELECT table_name, column_name '
'FROM information_schema.columns '
'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position')
c.execute(
'SELECT table_name, column_name '
'FROM information_schema.columns '
'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position'
)
assert list(c.fetchall()) == expected_schema
@@ -165,15 +167,18 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
c.execute('SELECT MIN(date), MAX(date) FROM public.dates')
last_date = (datetime.date.today() + datetime.timedelta(days=150)).replace(month=12, day=31)
assert c.fetchone()[0:2] == (datetime.date(2011, 1, 1), last_date)
c.execute('''SELECT COUNT(*) FROM
c.execute(
'''SELECT COUNT(*) FROM
GENERATE_SERIES(%s::date, %s::date, '1 day'::interval) AS series(date)
FULL OUTER JOIN public.dates AS dates ON series.date = dates.date WHERE dates.date IS NULL OR series.date IS NULL''',
vars=[datetime.date(2011, 1, 1), last_date])
vars=[datetime.date(2011, 1, 1), last_date],
)
assert c.fetchone()[0] == 0
# verify JSON schema
with (olap_cmd.model_dir / 'olap.model').open() as fd, \
(pathlib.Path(__file__).parent / 'olap.model').open() as fd2:
with (olap_cmd.model_dir / 'olap.model').open() as fd, (
pathlib.Path(__file__).parent / 'olap.model'
).open() as fd2:
json_schema = json.load(fd)
expected_json_schema = json.load(fd2)
expected_json_schema['pg_dsn'] = postgres_db.dsn
@@ -183,10 +188,12 @@ FULL OUTER JOIN public.dates AS dates ON series.date = dates.da
with postgres_db.conn() as conn:
with conn.cursor() as c:
c.execute('SET search_path = olap')
c.execute('SELECT field_good_duplicate, count(id) '
'FROM formdata_demande '
'GROUP BY field_good_duplicate '
'ORDER BY field_good_duplicate')
c.execute(
'SELECT field_good_duplicate, count(id) '
'FROM formdata_demande '
'GROUP BY field_good_duplicate '
'ORDER BY field_good_duplicate'
)
assert dict(c.fetchall()) == {
'a': 37,
'b': 13,
@@ -238,7 +245,7 @@ def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog
assert len(open_refs) == 3
# Change an item of the field
script = u"""
script = """
import datetime
import random
from quixote import get_publisher
@@ -292,8 +299,10 @@ formdata.store()
assert new_open_refs[-1][1] == 'open_new_value'
open_new_id = new_open_refs[-1][0]
c.execute('''SELECT field_item, "field_itemOpen"
FROM formdata_demande ORDER BY id''')
c.execute(
'''SELECT field_item, "field_itemOpen"
FROM formdata_demande ORDER BY id'''
)
formdata = c.fetchone()
assert formdata[0] == bazouka_id
assert formdata[1] == open_new_id


@@ -2,7 +2,6 @@ import os
import subprocess
import sys
HOSTNAME = '127.0.0.1'
WCS_MANAGE = os.environ.get('WCS_MANAGE')
@@ -14,5 +13,15 @@ def run_wcs_script(wcs_dir, script, script_name):
fd.write(script)
subprocess.check_call(
[sys.executable, WCS_MANAGE, 'runscript', '--app-dir', str(wcs_dir), '--vhost', HOSTNAME, str(script_path)],
env={'DJANGO_SETTINGS_MODULE': 'wcs.settings'})
[
sys.executable,
WCS_MANAGE,
'runscript',
'--app-dir',
str(wcs_dir),
'--vhost',
HOSTNAME,
str(script_path),
],
env={'DJANGO_SETTINGS_MODULE': 'wcs.settings'},
)


@@ -14,7 +14,7 @@ except ImportError:
else:
from sentry_sdk.integrations.logging import LoggingIntegration
from . import wcs_api, feeder
from . import feeder, wcs_api
def main():
@@ -63,15 +63,14 @@ def configure_sentry(config):
# get DEBUG level logs as breadcrumbs
logger.setLevel(logging.DEBUG)
sentry_logging = LoggingIntegration(
level=logging.DEBUG,
event_level=logging.ERROR)
sentry_logging = LoggingIntegration(level=logging.DEBUG, event_level=logging.ERROR)
sentry_sdk.init(
dsn=config.get('sentry', 'dsn'),
environment=config.get('sentry', 'environment', fallback=None),
attach_stacktrace=True,
integrations=[sentry_logging])
integrations=[sentry_logging],
)
scopes.append(sentry_sdk.push_scope)
@@ -82,17 +81,17 @@ def main2():
except locale.Error:
# current locale does not exist
pass
parser = argparse.ArgumentParser(description='Export W.C.S. data as a star schema in a '
'postgresql DB', add_help=False)
parser = argparse.ArgumentParser(
description='Export W.C.S. data as a star schema in a ' 'postgresql DB', add_help=False
)
parser.add_argument('config_path', nargs='?', default=None)
group = parser.add_mutually_exclusive_group()
parser.add_argument('--no-feed', dest='feed', help='only produce the model',
action='store_false', default=True)
parser.add_argument('--no-log-errors', dest='no_log_errors',
action='store_true', default=False)
parser.add_argument(
'--no-feed', dest='feed', help='only produce the model', action='store_false', default=True
)
parser.add_argument('--no-log-errors', dest='no_log_errors', action='store_true', default=False)
parser.add_argument('--fake', action='store_true', default=False)
group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true',
default=False)
group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true', default=False)
group.add_argument('--url', help='url of the w.c.s. instance', required=False, default=None)
args, rest = parser.parse_known_args()
feed = args.feed
@@ -100,8 +99,7 @@ def main2():
config = get_config(path=args.config_path)
configure_sentry(config)
# list all known urls
urls = [url for url in config.sections() if url.startswith('http://')
or url.startswith('https://')]
urls = [url for url in config.sections() if url.startswith('http://') or url.startswith('https://')]
defaults = {}
if not args.all:
try:
@@ -129,7 +127,7 @@ def main2():
if config.has_section('wcs-olap'):
defaults.update(config.items('wcs-olap'))
if config.has_section(url):
defaults.update((config.items(url)))
defaults.update(config.items(url))
try:
key = defaults['key']
orig = defaults['orig']
@@ -142,14 +140,25 @@ def main2():
logger.error('configuration incomplete for %s: %s', url, e)
else:
try:
api = wcs_api.WcsApi(url=url, orig=orig, key=key,
batch_size=batch_size,
verify=(defaults.get('verify', 'True') == 'True'))
logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url,
pg_dsn)
api = wcs_api.WcsApi(
url=url,
orig=orig,
key=key,
batch_size=batch_size,
verify=(defaults.get('verify', 'True') == 'True'),
)
logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url, pg_dsn)
olap_feeder = feeder.WcsOlapFeeder(
api=api, schema=schema, pg_dsn=pg_dsn, logger=logger,
config=defaults, do_feed=feed, fake=fake, slugs=slugs, scope=scope)
api=api,
schema=schema,
pg_dsn=pg_dsn,
logger=logger,
config=defaults,
do_feed=feed,
fake=fake,
slugs=slugs,
scope=scope,
)
olap_feeder.feed()
logger.info('finished')
feed_result = False
@@ -163,5 +172,6 @@ def main2():
if failure:
sys.exit(1)
if __name__ == '__main__':
main()


@@ -1,24 +1,22 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from collections import OrderedDict
import contextlib
import datetime
import copy
import itertools
import os
import json
import datetime
import hashlib
import itertools
import json
import os
import reprlib
from .utils import Whatever
import time
from collections import OrderedDict
import psycopg2
import psycopg2.errorcodes
import time
from cached_property import cached_property
from wcs_olap.wcs_api import WcsApiError
from .utils import Whatever
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
@@ -29,9 +27,10 @@ def truncate_pg_identifier(identifier, hash_length=6, force_hash=False):
else:
# insert hash in the middle, to keep some readability
return (
identifier[:(63 - hash_length) // 2]
identifier[: (63 - hash_length) // 2]
+ hashlib.md5(identifier.encode('utf-8')).hexdigest()[:hash_length]
+ identifier[-(63 - hash_length) // 2:])
+ identifier[-(63 - hash_length) // 2 :]
)
@contextlib.contextmanager
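
For context on the slice arithmetic reformatted in the hunk above: truncate_pg_identifier keeps oversized names within PostgreSQL's 63-byte identifier limit by preserving both readable ends and wedging a short hash in the middle. A standalone sketch; the under-63 fast path is an assumption, since the hunk only shows the else branch:

import hashlib

def truncate_pg_identifier(identifier, hash_length=6, force_hash=False):
    # assumed fast path: identifiers already short enough pass through
    if len(identifier) <= 63 and not force_hash:
        return identifier
    # keep (63 - hash_length) // 2 characters at each end, hash in between
    return (
        identifier[: (63 - hash_length) // 2]
        + hashlib.md5(identifier.encode('utf-8')).hexdigest()[:hash_length]
        + identifier[-(63 - hash_length) // 2 :]
    )

assert len(truncate_pg_identifier('formdata_' + 'x' * 80)) == 63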
@@ -51,7 +50,7 @@ def slugify(s):
return s.replace('-', '_').replace(' ', '_')
class Context(object):
class Context:
def __init__(self):
self.stack = []
@@ -68,7 +67,7 @@ class Context(object):
return r
class WcsOlapFeeder(object):
class WcsOlapFeeder:
channels = [
[1, 'web', 'web'],
@@ -80,18 +79,20 @@ class WcsOlapFeeder(object):
[7, 'fax', 'fax'],
[8, 'social-network', 'réseau social'],
]
channel_to_id = dict((c[1], c[0]) for c in channels)
id_to_channel = dict((c[0], c[1]) for c in channels)
channel_to_id = {c[1]: c[0] for c in channels}
id_to_channel = {c[0]: c[1] for c in channels}
status = [
[1, 'Nouveau'],
[2, 'En cours'],
[3, 'Terminé'],
]
status_to_id = dict((c[1], c[0]) for c in channels)
id_to_status = dict((c[0], c[1]) for c in channels)
status_to_id = {c[1]: c[0] for c in channels}
id_to_status = {c[0]: c[1] for c in channels}
def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False, slugs=None, scope=None):
def __init__(
self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False, slugs=None, scope=None
):
self.api = api
self.slugs = slugs
self.fake = fake
@@ -103,23 +104,25 @@ class WcsOlapFeeder(object):
self.schema_temp = truncate_pg_identifier(schema + '_temp')
self.do_feed = do_feed
self.ctx = Context()
self.ctx.push({
'schema': self.schema,
'schema_temp': self.schema_temp,
'role_table': 'role',
'channel_table': 'channel',
'category_table': 'category',
'form_table': 'formdef',
'generic_formdata_table': 'formdata',
'generic_status_table': 'status',
'generic_evolution_table': 'evolution',
'year_table': 'year',
'month_table': 'month',
'day_table': 'day',
'dow_table': 'dow',
'hour_table': 'hour',
'agent_table': 'agent',
})
self.ctx.push(
{
'schema': self.schema,
'schema_temp': self.schema_temp,
'role_table': 'role',
'channel_table': 'channel',
'category_table': 'category',
'form_table': 'formdef',
'generic_formdata_table': 'formdata',
'generic_status_table': 'status',
'generic_evolution_table': 'evolution',
'year_table': 'year',
'month_table': 'month',
'day_table': 'day',
'dow_table': 'dow',
'hour_table': 'hour',
'agent_table': 'agent',
}
)
self.config = config or {}
self.model = {
'label': self.config.get('cubes_label', schema),
@@ -276,19 +279,19 @@ class WcsOlapFeeder(object):
'label': 'pourcentage des demandes',
'type': 'percent',
"expression": 'case (select count({fact_table}.id) from {table_expression} '
'where {where_conditions}) when 0 then null else '
'count({fact_table}.id) * 100. / (select '
'count({fact_table}.id) from {table_expression} where '
'{where_conditions}) end',
'where {where_conditions}) when 0 then null else '
'count({fact_table}.id) * 100. / (select '
'count({fact_table}.id) from {table_expression} where '
'{where_conditions}) end',
},
{
'name': 'geolocation',
'label': 'localisation géographique',
'type': 'point',
'expression': 'array_agg({fact_table}.geolocation_base) '
'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)',
}
]
'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)',
},
],
}
self.model['cubes'].append(cube)
self.base_cube = self.model['cubes'][0]
@@ -309,7 +312,11 @@ class WcsOlapFeeder(object):
@cached_property
def formdefs(self):
return [formdef for formdef in self.api.formdefs if (not self.slugs or formdef.slug in self.slugs) and not formdef.is_empty]
return [
formdef
for formdef in self.api.formdefs
if (not self.slugs or formdef.slug in self.slugs) and not formdef.is_empty
]
@cached_property
def roles(self):
@@ -343,7 +350,9 @@ class WcsOlapFeeder(object):
try:
self.cur.execute(sql, vars=vars)
except Exception as e:
self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
self.logger.warning(
'Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e
)
raise
@contextlib.contextmanager
@@ -381,23 +390,33 @@ class WcsOlapFeeder(object):
Drop tables one by one in order to avoid reaching max_locks_per_transaction
"""
# drop foreign key constraints first
self.ex("SELECT table_name, constraint_name FROM "
"information_schema.key_column_usage "
"WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'", vars=[schema])
self.ex(
"SELECT table_name, constraint_name FROM "
"information_schema.key_column_usage "
"WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'",
vars=[schema],
)
for table_name, constraint_name in self.cur.fetchall():
# drop of PK constraints can have effects on FK constraint on other tables.
with ignore_undefined_object_or_table():
self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
% (quote(schema), quote(table_name), quote(constraint_name)))
self.ex(
'ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
% (quote(schema), quote(table_name), quote(constraint_name))
)
# remove others
self.ex("SELECT table_name, constraint_name FROM "
"information_schema.key_column_usage "
"WHERE table_schema = %s", vars=[schema])
self.ex(
"SELECT table_name, constraint_name FROM "
"information_schema.key_column_usage "
"WHERE table_schema = %s",
vars=[schema],
)
for table_name, constraint_name in self.cur.fetchall():
# drop of PK constraints can have effects on FK constraint on other tables.
with ignore_undefined_object_or_table():
self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
% (quote(schema), quote(table_name), quote(constraint_name)))
self.ex(
'ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
% (quote(schema), quote(table_name), quote(constraint_name))
)
# then drop indexes
self.ex("SELECT tablename, indexname FROM pg_indexes WHERE schemaname = %s", vars=[schema])
for table_name, index_name in self.cur.fetchall():
@@ -405,7 +424,9 @@ class WcsOlapFeeder(object):
self.ex('DROP INDEX %s.%s CASCADE' % (quote(schema), quote(index_name)))
# finally drop tables, cascade will have no effect
self.ex("SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC", vars=[schema])
self.ex(
"SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC", vars=[schema]
)
for table in self.cur.fetchall():
tablename = '%s.%s' % (quote(schema), quote(table[0]))
self.ex('DROP TABLE IF EXISTS %s;' % tablename)
@@ -418,7 +439,8 @@ class WcsOlapFeeder(object):
first_date = max_date or datetime.date(2010, 1, 1)
last_date = (datetime.date.today() + datetime.timedelta(days=150)).replace(month=12, day=31)
self.ex('''INSERT INTO public.dates SELECT
self.ex(
'''INSERT INTO public.dates SELECT
the_date.the_date::date AS date,
to_char(the_date.the_date, 'TMday') AS day,
to_char(the_date.the_date, 'TMmonth') AS month
@ -427,7 +449,8 @@ class WcsOlapFeeder(object):
LEFT JOIN public.dates AS dates
ON the_date.the_date = dates.date
WHERE dates.date IS NULL''',
vars=[first_date, last_date])
vars=[first_date, last_date],
)
def create_table(self, name, columns, inherits=None, comment=None, unlogged=True):
if unlogged:
@@ -448,18 +471,21 @@ class WcsOlapFeeder(object):
return self.cur.fetchone()[0]
def update_table_sequence_number(self, name):
self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
(SELECT GREATEST(1, MAX(id)) FROM {name}))""", ctx={'name': quote(name)})
self.ex(
"""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
(SELECT GREATEST(1, MAX(id)) FROM {name}))""",
ctx={'name': quote(name)},
)
def create_labeled_table_serial(self, name, comment):
self.create_table(
name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment, unlogged=False)
name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment, unlogged=False
)
if self.prev_table_exists(name):
# Insert data from previous table
self.ex(
'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
ctx={'name': quote(name)}
'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}', ctx={'name': quote(name)}
)
self.update_table_sequence_number(name)
@@ -476,12 +502,15 @@ class WcsOlapFeeder(object):
self.ex(
'SELECT ref, {column} FROM {name} WHERE ref = %s',
ctx={'name': quote(name), 'column': quote(update_column)},
vars=(ref,))
vars=(ref,),
)
if self.cur.fetchall():
for item in self.cur.fetchall():
self.ex('UPDATE {name} SET {column}=%s WHERE ref=%s',
ctx={'name': quote(name), 'column': quote(update_column)},
vars=[item[1], ref])
self.ex(
'UPDATE {name} SET {column}=%s WHERE ref=%s',
ctx={'name': quote(name), 'column': quote(update_column)},
vars=[item[1], ref],
)
else:
to_insert.append(item)
if to_insert:
@@ -489,35 +518,32 @@ class WcsOlapFeeder(object):
tmpl = ', '.join(['(DEFAULT, %s)' % columns_values] * len(data))
query = 'INSERT INTO {name} VALUES %s' % tmpl
self.ex(query, ctx={'name': quote(name)}, # 'column': quote(update_column)},
vars=list(itertools.chain(*to_insert)))
self.ex(
query,
ctx={'name': quote(name)}, # 'column': quote(update_column)},
vars=list(itertools.chain(*to_insert)),
)
result = {}
self.ex('SELECT id, {column} FROM {name}', ctx={'name': quote(name),
'column': result_column})
for _id, column in self.cur.fetchall():
self.ex('SELECT id, {column} FROM {name}', ctx={'name': quote(name), 'column': result_column})
for _id, column in self.cur.fetchall():
result[column] = _id
return result
def create_labeled_table(self, name, labels, comment=None):
self.create_table(
name,
[
['id', 'integer primary key'],
['label', 'varchar']
], comment=comment, unlogged=False)
name, [['id', 'integer primary key'], ['label', 'varchar']], comment=comment, unlogged=False
)
if self.prev_table_exists(name):
# Insert data from previous table
self.ex(
'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}',
ctx={'name': quote(name)}
'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}', ctx={'name': quote(name)}
)
# Find what is missing
to_insert = []
for _id, _label in labels:
self.ex(
'SELECT * FROM {name} WHERE label = %s', ctx={'name': quote(name)}, vars=(_label,))
self.ex('SELECT * FROM {name} WHERE label = %s', ctx={'name': quote(name)}, vars=(_label,))
if self.cur.fetchone() is None:
to_insert.append(_label)
@@ -551,19 +577,19 @@ class WcsOlapFeeder(object):
tmp_cat_map = self.do_referenced_data(table_name, categories_data, 'label')
self.update_table_sequence_number(table_name)
# remap categories ids to ids in the table
return dict((c.title, tmp_cat_map[c.title]) for c in self.categories)
return {c.title: tmp_cat_map[c.title] for c in self.categories}
def do_formdef_table(self):
categories_mapping = self.do_category_table()
formdef_fields = [['category_id', 'integer REFERENCES {category_table} (id)'],
['label', 'varchar']
]
formdef_fields = [['category_id', 'integer REFERENCES {category_table} (id)'], ['label', 'varchar']]
table_name = self.default_ctx['form_table']
self.create_referenced_table(table_name, formdef_fields, 'types de formulaire')
formdefs = [(formdef.slug, categories_mapping.get(formdef.schema.category),
formdef.schema.name) for formdef in self.formdefs]
formdefs = [
(formdef.slug, categories_mapping.get(formdef.schema.category), formdef.schema.name)
for formdef in self.formdefs
]
self.formdefs_mapping = self.do_referenced_data(table_name, formdefs, 'ref')
self.formdefs_unique_slug = {}
seen = set()
@@ -580,23 +606,19 @@ class WcsOlapFeeder(object):
def do_base_table(self):
# channels
self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
comment='canal')
self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels], comment='canal')
# roles
roles = dict((i, role.name) for i, role in enumerate(self.roles))
roles = {i: role.name for i, role in enumerate(self.roles)}
tmp_role_map = self.create_labeled_table('role', roles.items(), comment='role')
self.role_mapping = dict(
(role.id, tmp_role_map[role.name]) for role in self.roles)
self.role_mapping = {role.id: tmp_role_map[role.name] for role in self.roles}
# forms
self.do_formdef_table()
self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
comment='heures')
self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))), comment='heures')
self.create_labeled_table('status', self.status,
comment='statuts simplifiés')
self.create_labeled_table('status', self.status, comment='statuts simplifiés')
# agents
self.create_labeled_table_serial('agent', comment='agents')
@@ -630,14 +652,20 @@ class WcsOlapFeeder(object):
self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))
self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s', vars=('tous les formulaires',))
# evolutions
self.create_table('{generic_evolution_table}', [
['id', 'serial primary key'],
['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
['formdata_id', 'integer'], # "REFERENCES {generic_formdata_table} (id)" is impossible because FK constraints do not work on inherited tables
['time', 'timestamp'],
['date', 'date'],
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
])
self.create_table(
'{generic_evolution_table}',
[
['id', 'serial primary key'],
['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
[
'formdata_id',
'integer',
], # "REFERENCES {generic_formdata_table} (id)" is impossible because FK constraints do not work on inherited tables
['time', 'timestamp'],
['date', 'date'],
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
],
)
self.ex('COMMENT ON TABLE {generic_evolution_table} IS %s', vars=('evolution générique',))
def feed(self):
@ -678,37 +706,38 @@ class WcsOlapFeeder(object):
def switch_schemas(self):
self.old_schema = truncate_pg_identifier(
self.schema
+ '_old_'
+ datetime.datetime.now().strftime('%Y%m%d%H%M'))
self.schema + '_old_' + datetime.datetime.now().strftime('%Y%m%d%H%M')
)
self.ctx.push({'old_schema': self.old_schema})
try:
def switch():
with self.atomic():
self.logger.info('renaming schema %s to %s and schema %s to %s',
self.schema, self.old_schema,
self.schema_temp, self.schema)
self.logger.info(
'renaming schema %s to %s and schema %s to %s',
self.schema,
self.old_schema,
self.schema_temp,
self.schema,
)
if self.schema_exists(self.schema):
self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}')
self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
self.retry_on_db_error(
switch,
times=33,
sleep_time=1)
self.retry_on_db_error(switch, times=33, sleep_time=1)
except Exception:
self.logger.warning('failed to switch schemas 3-times in 33 seconds')
raise
try:
def drop_old_schema():
self.logger.info('dropping schema %s', self.old_schema)
self.drop_tables_sequencially(self.old_schema)
self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE')
self.retry_on_db_error(
drop_old_schema,
times=33,
sleep_time=1)
self.retry_on_db_error(drop_old_schema, times=33, sleep_time=1)
except Exception:
self.logger.exception('could not drop schema %s', self.old_schema)
@@ -730,10 +759,14 @@ class WcsOlapFeeder(object):
def create_formdata_json_index(self, table_name, varname):
if varname in self.formdata_json_index:
return
index_name = self.hash_table_name('%s_%s_json_idx' % (table_name, varname), hash_length=8,
force_hash=True)
self.ex('CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))',
ctx={'index_name': quote(index_name)}, vars=[varname])
index_name = self.hash_table_name(
'%s_%s_json_idx' % (table_name, varname), hash_length=8, force_hash=True
)
self.ex(
'CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))',
ctx={'index_name': quote(index_name)},
vars=[varname],
)
# prevent double creation
self.formdata_json_index.append(varname)
@@ -742,7 +775,7 @@ def has_digits_validation(field):
return field.validation and field.validation.get('type') == 'digits'
class WcsFormdefFeeder(object):
class WcsFormdefFeeder:
def __init__(self, olap_feeder, formdef, do_feed=True):
self.olap_feeder = olap_feeder
self.formdef = formdef
@@ -771,26 +804,31 @@ class WcsFormdefFeeder(object):
def do_statuses(self):
statuses = self.formdef.schema.workflow.statuses
tmp_status_map = self.olap_feeder.create_labeled_table(
self.status_table_name, enumerate([s.name for s in statuses]),
comment='statuts du formulaire « %s »' % self.formdef.schema.name)
self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses)
self.status_table_name,
enumerate([s.name for s in statuses]),
comment='statuts du formulaire « %s »' % self.formdef.schema.name,
)
self.status_mapping = {s.id: tmp_status_map[s.name] for s in statuses}
def do_data_table(self):
columns = OrderedDict()
columns['status_id'] = {'sql_col_name': 'status_id', 'sql_col_def': 'smallint REFERENCES {status_table} (id)'}
columns['status_id'] = {
'sql_col_name': 'status_id',
'sql_col_def': 'smallint REFERENCES {status_table} (id)',
}
# add item fields
for field in self.good_fields.values():
if field.type == 'item':
comment = ('valeurs du champ « %s » du formulaire %s'
% (field.label, self.formdef.schema.name))
comment = 'valeurs du champ « %s » du formulaire %s' % (field.label, self.formdef.schema.name)
table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
# create table and mapping
if field.items:
# filter non string items
items = [item for item in field.items if isinstance(item, str)]
self.items_mappings[field.varname] = self.create_labeled_table(
table_name, enumerate(items), comment=comment)
table_name, enumerate(items), comment=comment
)
else:
# open item field, from data sources...
self.create_labeled_table_serial(table_name, comment=comment)
@@ -839,15 +877,19 @@ class WcsFormdefFeeder(object):
self.columns.append(columns[key]['sql_col_name'])
self.columns.remove('geolocation_base')
self.create_table(self.table_name,
[(columns[key]['sql_col_name'], columns[key]['sql_col_def']) for key in columns],
inherits='{generic_formdata_table}',
comment='formulaire %s' % self.formdef.schema.name)
self.create_table(
self.table_name,
[(columns[key]['sql_col_name'], columns[key]['sql_col_def']) for key in columns],
inherits='{generic_formdata_table}',
comment='formulaire %s' % self.formdef.schema.name,
)
for key in columns:
column = columns[key]
if column.get('sql_comment'):
self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % quote(column['sql_col_name']),
vars=(column['sql_comment'],))
self.ex(
'COMMENT ON COLUMN {formdata_table}.%s IS %%s' % quote(column['sql_col_name']),
vars=(column['sql_comment'],),
)
# Create index for JSON fields
if self.has_jsonb:
@@ -864,26 +906,37 @@ class WcsFormdefFeeder(object):
self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
self.ex('ALTER TABLE {formdata_table} ADD PRIMARY KEY (id)')
# table des evolutions
self.create_table(self.evolution_table_name, [
['id', 'serial primary key'],
['status_id', 'smallint REFERENCES {status_table} (id)'],
['formdata_id', 'integer REFERENCES {formdata_table} (id)'],
['time', 'timestamp'],
['date', 'date'],
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
])
self.ex('COMMENT ON TABLE {evolution_table} IS %s',
vars=('evolution des demandes %s' % self.formdef.schema.name,))
self.create_table(
self.evolution_table_name,
[
['id', 'serial primary key'],
['status_id', 'smallint REFERENCES {status_table} (id)'],
['formdata_id', 'integer REFERENCES {formdata_table} (id)'],
['time', 'timestamp'],
['date', 'date'],
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
],
)
self.ex(
'COMMENT ON TABLE {evolution_table} IS %s',
vars=('evolution des demandes %s' % self.formdef.schema.name,),
)
def insert_item_value(self, field, value):
table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
self.ex('SELECT id FROM {item_table} WHERE label = %s',
ctx={'item_table': quote(table_name)}, vars=(value,))
self.ex(
'SELECT id FROM {item_table} WHERE label = %s',
ctx={'item_table': quote(table_name)},
vars=(value,),
)
res = self.cur.fetchone()
if res:
return res[0]
self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,),
ctx={'item_table': quote(table_name)})
self.ex(
'INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)',
vars=(value,),
ctx={'item_table': quote(table_name)},
)
return self.cur.fetchone()[0]
def get_item_id(self, field, value):
@ -930,8 +983,9 @@ class WcsFormdefFeeder(object):
try:
status = data.formdef.schema.workflow.statuses_map[status_id]
except KeyError:
self.logger.warning('%s.%s unknown status status_id %s',
data.formdef.schema.name, data.id, status_id)
self.logger.warning(
'%s.%s unknown status status_id %s', data.formdef.schema.name, data.id, status_id
)
continue
channel = data.submission.channel.lower()
@ -1022,17 +1076,16 @@ class WcsFormdefFeeder(object):
try:
status = data.formdef.schema.workflow.statuses_map[evo.status]
except KeyError:
self.logger.warning('%s.%s unknown status in evolution %s',
data.formdef.schema.name, data.id, evo.status)
self.logger.warning(
'%s.%s unknown status in evolution %s', data.formdef.schema.name, data.id, evo.status
)
continue
status_id = self.status_mapping[status.id]
generic_status_id = self.generic_status(status)
evolution.append(
[0, status_id, evo.time, evo.time.date(), evo.time.hour])
evolution.append([0, status_id, evo.time, evo.time.date(), evo.time.hour])
if generic_status_id == last_status:
continue
generic_evolution.append(
[0, generic_status_id, evo.time, evo.time.date(), evo.time.hour])
generic_evolution.append([0, generic_status_id, evo.time, evo.time.date(), evo.time.hour])
last_status = generic_status_id
generic_evolution_values.append(generic_evolution)
evolution_values.append(evolution)
@ -1041,12 +1094,11 @@ class WcsFormdefFeeder(object):
return
insert_columns = ['%s' % quote(column) for column in self.columns[1:]]
insert_columns = ', '.join(insert_columns)
self.ex('INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id',
ctx=dict(
columns=insert_columns,
values=', '.join(['%s'] * len(values))
),
vars=values)
self.ex(
'INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id',
ctx=dict(columns=insert_columns, values=', '.join(['%s'] * len(values))),
vars=values,
)
# insert generic evolutions
generic_evolutions = []
@ -1056,14 +1108,24 @@ class WcsFormdefFeeder(object):
row[0] = formdata_id
generic_evolutions.append(tuple(row))
if len(generic_evolutions) == 500:
self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % (
', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions)
self.ex(
'INSERT INTO {generic_evolution_table} (%s) VALUES %s'
% (
', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
', '.join(['%s'] * len(generic_evolutions)),
),
vars=generic_evolutions,
)
generic_evolutions = []
if generic_evolutions:
self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % (
', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions)
self.ex(
'INSERT INTO {generic_evolution_table} (%s) VALUES %s'
% (
', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
', '.join(['%s'] * len(generic_evolutions)),
),
vars=generic_evolutions,
)
# insert evolutions
evolutions = []
@ -1072,14 +1134,24 @@ class WcsFormdefFeeder(object):
row[0] = formdata_id
evolutions.append(tuple(row))
if len(evolutions) == 500:
self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % (
', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
', '.join(['%s'] * len(evolutions))), vars=evolutions)
self.ex(
'INSERT INTO {evolution_table} (%s) VALUES %s'
% (
', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
', '.join(['%s'] * len(evolutions)),
),
vars=evolutions,
)
evolutions = []
if evolutions:
self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % (
', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
', '.join(['%s'] * len(evolutions))), vars=evolutions)
self.ex(
'INSERT INTO {evolution_table} (%s) VALUES %s'
% (
', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
', '.join(['%s'] * len(evolutions)),
),
vars=evolutions,
)
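
The two insert loops above flush every 500 rows through a single multi-row INSERT; psycopg2 adapts each tuple in vars to a parenthesized SQL row, so one %s placeholder per row suffices. The skeleton of the pattern, with an illustrative table and column list:

def insert_batched(cur, rows, batch_size=500):
    # rows: iterable of (formdata_id, status_id, ...) tuples
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            flush(cur, batch)
            batch = []
    flush(cur, batch)  # remainder

def flush(cur, batch):
    if not batch:
        return
    placeholders = ', '.join(['%s'] * len(batch))  # one %s per row tuple
    cur.execute('INSERT INTO evolution (formdata_id, status_id) VALUES %s' % placeholders, batch)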
def get_first_agent_in_evolution(self, formdata):
for evo in formdata.evolution:
@ -1087,11 +1159,13 @@ class WcsFormdefFeeder(object):
return self.get_agent(evo.who)
def feed(self):
self.olap_feeder.ctx.push({
'formdata_table': quote(self.table_name),
'status_table': quote(self.status_table_name),
'evolution_table': quote(self.evolution_table_name),
})
self.olap_feeder.ctx.push(
{
'formdata_table': quote(self.table_name),
'status_table': quote(self.status_table_name),
'evolution_table': quote(self.evolution_table_name),
}
)
# create cube
cube = self.cube = copy.deepcopy(self.base_cube)
@ -1102,51 +1176,62 @@ class WcsFormdefFeeder(object):
# remove json field from formdef cubes
cube.pop('json_field', None)
cube.update({
'name': self.table_name,
'label': self.formdef.schema.name,
'fact_table': quote(self.table_name),
'key': 'id',
})
cube['dimensions'] = [dimension for dimension in cube['dimensions']
if dimension['name'] not in ('category', 'formdef')]
cube.update(
{
'name': self.table_name,
'label': self.formdef.schema.name,
'fact_table': quote(self.table_name),
'key': 'id',
}
)
cube['dimensions'] = [
dimension for dimension in cube['dimensions'] if dimension['name'] not in ('category', 'formdef')
]
# add dimension for status
cube['joins'].append({
'name': 'status',
'table': quote(self.status_table_name),
'master': 'status_id',
'detail': 'id',
'kind': 'left',
})
cube['dimensions'].append({
'name': 'status',
'label': 'statut',
'join': ['status'],
'type': 'integer',
'value': 'status.id',
'value_label': 'status.label',
})
cube['joins'].append(
{
'name': 'status',
'table': quote(self.status_table_name),
'master': 'status_id',
'detail': 'id',
'kind': 'left',
}
)
cube['dimensions'].append(
{
'name': 'status',
'label': 'statut',
'join': ['status'],
'type': 'integer',
'value': 'status.id',
'value_label': 'status.label',
}
)
# add dimension for function
for function, name in self.formdef.schema.workflow.functions.items():
at = 'function_%s' % slugify(function)
cube['joins'].append({
'name': at,
'table': 'role',
'master': quote(at),
'detail': 'id',
'kind': 'left',
})
cube['dimensions'].append({
'name': at,
'label': 'fonction %s' % name.lower(),
'join': [at],
'type': 'integer',
'value': '%s.id' % quote(at),
'value_label': '%s.label' % quote(at),
'filter': False,
})
cube['joins'].append(
{
'name': at,
'table': 'role',
'master': quote(at),
'detail': 'id',
'kind': 'left',
}
)
cube['dimensions'].append(
{
'name': at,
'label': 'fonction %s' % name.lower(),
'join': [at],
'type': 'integer',
'value': '%s.id' % quote(at),
'value_label': '%s.label' % quote(at),
'filter': False,
}
)
# add dimensions for item fields
fields = self.formdef.schema.fields
@ -1171,7 +1256,8 @@ class WcsFormdefFeeder(object):
add_warning(
'Le champ « %(label)s » a un nom de variable dupliqué « %(varname)s » '
'mais pas le même type que « %(label_good)s », tous les champs avec ce nom seront ignorés '
'(%(type)s != %(type_good)s).' % {
'(%(type)s != %(type_good)s).'
% {
'label': field.label,
'varname': field.varname,
'type': field.type,
@ -1183,8 +1269,10 @@ class WcsFormdefFeeder(object):
bad_varnames.add(field.varname)
continue
if field.varname in bad_varnames:
add_warning('Le champ « %s » est un doublon d\'un champ de type différent, il a été ignoré.'
% field.label)
add_warning(
'Le champ « %s » est un doublon d\'un champ de type différent, il a été ignoré.'
% field.label
)
continue
self.good_fields[field.varname] = field
@ -1221,8 +1309,9 @@ class WcsFormdefFeeder(object):
'type': 'bool',
'value': quote(field_name),
'value_label': '(CASE WHEN %(field)s IS NULL THEN NULL'
' WHEN %(field)s THEN \'Oui\''
' ELSE \'Non\' END)' % {
' WHEN %(field)s THEN \'Oui\''
' ELSE \'Non\' END)'
% {
'field': quote(field_name),
},
'filter': True,
@ -1230,12 +1319,14 @@ class WcsFormdefFeeder(object):
elif field.type == 'string':
if has_digits_validation(field):
# we will define a SUM measure instead
cube['measures'].append({
'name': 'sum_' + dimension_name,
'label': 'total du champ « %s »' % dimension_label,
'type': 'integer',
'expression': 'SUM({fact_table}.%s)' % quote(field_name),
})
cube['measures'].append(
{
'name': 'sum_' + dimension_name,
'label': 'total du champ « %s »' % dimension_label,
'type': 'integer',
'expression': 'SUM({fact_table}.%s)' % quote(field_name),
}
)
continue
else:
dimension = {


@ -1,9 +1,9 @@
import urllib.parse as urlparse
import datetime
import base64
import hmac
import datetime
import hashlib
import hmac
import random
import urllib.parse as urlparse
'''Simple signature scheme for query strings'''
# from http://repos.entrouvert.org/portail-citoyen.git/tree/portail_citoyen/apps/data_source_plugin/signature.py
@ -24,10 +24,7 @@ def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
new_query = query
if new_query:
new_query += '&'
new_query += urlparse.urlencode((
('algo', algo),
('timestamp', timestamp),
('nonce', nonce)))
new_query += urlparse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&signature=' + urlparse.quote(signature)
return new_query
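
The signature hunks here only touch formatting. For orientation, sign_query above and check_query just below are symmetric; a usage sketch, assuming the module is importable as wcs_olap.signature (the diff does not name the file) and with illustrative key and query values:

from wcs_olap import signature

signed = signature.sign_query('orig=olap', key='olap')  # appends algo, timestamp, nonce and signature
assert signature.check_query(signed, key='olap')        # verifies signature, nonce and timestamp window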
@@ -50,8 +47,7 @@ def check_url(url, key, known_nonce=None, timedelta=30):
def check_query(query, key, known_nonce=None, timedelta=30):
parsed = urlparse.parse_qs(query)
if not ('signature' in parsed and 'algo' in parsed
and 'timestamp' in parsed and 'nonce' in parsed):
if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed and 'nonce' in parsed):
return False
unsigned_query, signature_content = query.split('&signature=', 1)
if '&' in signature_content:


@@ -1,4 +1,4 @@
class Whatever(object):
class Whatever:
def __call__(*args, **kwargs):
pass


@@ -14,19 +14,18 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import base64
import copy
import logging
import datetime
import collections
import contextlib
import copy
import datetime
import json
import requests
import isodate
import logging
import urllib.parse as urlparse
import isodate
import requests
from . import signature
@@ -34,7 +33,7 @@ class WcsApiError(Exception):
pass
class JSONFile(object):
class JSONFile:
def __init__(self, d):
self.d = d
@@ -62,7 +61,7 @@ def to_dict(o):
return o
class BaseObject(object):
class BaseObject:
def __init__(self, wcs_api, **kwargs):
self._wcs_api = wcs_api
self.__dict__.update(**kwargs)
@@ -82,7 +81,7 @@ class FormDataWorkflow(BaseObject):
fields = None
def __init__(self, wcs_api, **kwargs):
super(FormDataWorkflow, self).__init__(wcs_api, **kwargs)
super().__init__(wcs_api, **kwargs)
self.status = BaseObject(wcs_api, **self.status) if self.status else None
self.real_status = BaseObject(wcs_api, **self.real_status) if self.real_status else None
self.fields = self.fields or {}
@@ -101,7 +100,7 @@ class Evolution(BaseObject):
parts = None
def __init__(self, wcs_api, **kwargs):
super(Evolution, self).__init__(wcs_api, **kwargs)
super().__init__(wcs_api, **kwargs)
self.time = isodate.parse_datetime(self.time)
self.parts = [BaseObject(wcs_api, **part) for part in self.parts or []]
self.who = EvolutionUser(wcs_api, **self.who) if self.who else None
@@ -117,7 +116,7 @@ class FormData(BaseObject):
def __init__(self, wcs_api, forms, **kwargs):
self.forms = forms
super(FormData, self).__init__(wcs_api, **kwargs)
super().__init__(wcs_api, **kwargs)
self.receipt_time = isodate.parse_datetime(self.receipt_time)
self.submission = BaseObject(wcs_api, **self.submission or {})
self.workflow = FormDataWorkflow(wcs_api, **self.workflow or {})
@@ -157,7 +156,7 @@ class FormData(BaseObject):
@property
def endpoint_delay(self):
'''Compute delay as the time when the last not endpoint status precedes an endpoint
status.'''
status.'''
statuses_map = self.formdef.schema.workflow.statuses_map
s = 0
for evo in self.evolution[::-1]:
@@ -188,13 +187,13 @@ class Workflow(BaseObject):
fields = None
def __init__(self, wcs_api, **kwargs):
super(Workflow, self).__init__(wcs_api, **kwargs)
super().__init__(wcs_api, **kwargs)
self.statuses = [BaseObject(wcs_api, **v) for v in (self.statuses or [])]
assert not hasattr(self.statuses[0], 'startpoint'), 'startpoint is exported by w.c.s. FIXME'
for status in self.statuses:
status.startpoint = False
self.statuses[0].startpoint = True
self.statuses_map = dict((s.id, s) for s in self.statuses)
self.statuses_map = {s.id: s for s in self.statuses}
self.fields = [Field(wcs_api, **field) for field in (self.fields or [])]
@@ -214,13 +213,13 @@ class Schema(BaseObject):
geolocations = None
def __init__(self, wcs_api, **kwargs):
super(Schema, self).__init__(wcs_api, **kwargs)
super().__init__(wcs_api, **kwargs)
self.workflow = Workflow(wcs_api, **self.workflow)
self.fields = [Field(wcs_api, **f) for f in self.fields or []]
self.geolocations = sorted((k, v) for k, v in (self.geolocations or {}).items())
class FormDatas(object):
class FormDatas:
def __init__(self, wcs_api, formdef, full=False, anonymize=False):
self.wcs_api = wcs_api
self.formdef = formdef
@@ -231,13 +230,14 @@ class FormDatas(object):
def __getitem__(self, slice_or_id):
# get batch of forms
if isinstance(slice_or_id, slice):
def helper():
if slice_or_id.stop <= slice_or_id.start or slice_or_id.step:
raise ValueError('invalid slice %s' % slice_or_id)
offset = slice_or_id.start
limit = slice_or_id.stop - slice_or_id.start
url_parts = ['api/forms/{self.formdef.slug}/list'.format(self=self)]
url_parts = [f'api/forms/{self.formdef.slug}/list']
query = {}
query['full'] = 'on' if self._full else 'off'
if offset:
@@ -252,10 +252,11 @@ class FormDatas(object):
if not d.get('receipt_time'):
continue
yield FormData(wcs_api=self.wcs_api, forms=self, formdef=self.formdef, **d)
return helper()
# or get one form
else:
url_parts = ['api/forms/{formdef.slug}/{id}/'.format(formdef=self.formdef, id=slice_or_id)]
url_parts = [f'api/forms/{self.formdef.slug}/{slice_or_id}/']
if self.anonymize:
url_parts.append('?anonymise=true')
d = self.wcs_api.get_json(*url_parts)
@@ -282,7 +283,7 @@ class FormDatas(object):
start = 0
while True:
empty = True
for formdef in self[start:start + self.batch_size]:
for formdef in self[start : start + self.batch_size]:
empty = False
yield formdef
if empty:
@@ -290,14 +291,14 @@ class FormDatas(object):
start += self.batch_size
def __len__(self):
return len(list((o for o in self)))
return len(list(o for o in self))
class CancelSubmitError(Exception):
pass
class FormDefSubmit(object):
class FormDefSubmit:
formdef = None
data = None
user_email = None
@@ -346,16 +347,13 @@ class FormDefSubmit(object):
if value is None or value == {} or value == []:
self.data.pop(varname, None)
elif hasattr(self, '_set_type_%s' % field.type):
getattr(self, '_set_type_%s' % field.type)(
varname=varname,
field=field,
value=value, **kwargs)
getattr(self, '_set_type_%s' % field.type)(varname=varname, field=field, value=value, **kwargs)
else:
self.data[varname] = value
def _set_type_item(self, varname, field, value, **kwargs):
if isinstance(value, dict):
if not set(value).issuperset(set(['id', 'text'])):
if not set(value).issuperset({'id', 'text'}):
raise ValueError('item field value must have id and text value')
# clean previous values
self.data.pop(varname, None)
@@ -378,7 +376,7 @@ class FormDefSubmit(object):
has_dict = False
for choice in value:
if isinstance(value, dict):
if not set(value).issuperset(set(['id', 'text'])):
if not set(value).issuperset({'id', 'text'}):
raise ValueError('items field values must have id and text value')
has_dict = True
if has_dict:
@@ -407,7 +405,7 @@ class FormDefSubmit(object):
elif isinstance(value, bytes):
content = base64.b64encode(value).decode('ascii')
elif isinstance(value, dict):
if not set(value).issuperset(set(['filename', 'content'])):
if not set(value).issuperset({'filename', 'content'}):
raise ValueError('file field needs a dict value with filename and content')
content = value['content']
filename = value['filename']
@@ -432,7 +430,7 @@ class FormDefSubmit(object):
def _set_type_map(self, varname, field, value):
if not isinstance(value, dict):
raise TypeError('value must be a dict for a map field')
if set(value) != set(['lat', 'lon']):
if set(value) != {'lat', 'lon'}:
raise ValueError('map field expect keys lat and lon')
self.data[varname] = value
@@ -470,22 +468,19 @@ class FormDef(BaseObject):
@property
def schema(self):
if not hasattr(self, '_schema'):
d = self._wcs_api.get_json('api/formdefs/{self.slug}/schema'.format(self=self))
d = self._wcs_api.get_json(f'api/formdefs/{self.slug}/schema')
self._schema = Schema(self._wcs_api, **d)
return self._schema
@contextlib.contextmanager
def submit(self, **kwargs):
submitter = FormDefSubmit(
wcs_api=self._wcs_api,
formdef=self,
**kwargs)
submitter = FormDefSubmit(wcs_api=self._wcs_api, formdef=self, **kwargs)
try:
yield submitter
except CancelSubmitError:
return
payload = submitter.payload()
d = self._wcs_api.post_json(payload, 'api/formdefs/{self.slug}/submit'.format(self=self))
d = self._wcs_api.post_json(payload, f'api/formdefs/{self.slug}/submit')
if d['err'] != 0:
raise WcsApiError('submited returned an error: %s' % d)
submitter.result = BaseObject(self._wcs_api, **d['data'])
@@ -499,7 +494,7 @@ class Category(BaseObject):
pass
class WcsObjects(object):
class WcsObjects:
url = None
object_class = None
@@ -519,7 +514,7 @@ class WcsObjects(object):
yield self.object_class(wcs_api=self.wcs_api, **d)
def __len__(self):
return len(list((o for o in self)))
return len(list(o for o in self))
class Roles(WcsObjects):
@@ -538,9 +533,19 @@ class Categories(WcsObjects):
object_class = Category
class WcsApi(object):
def __init__(self, url, email=None, name_id=None, batch_size=1000,
session=None, logger=None, orig=None, key=None, verify=True):
class WcsApi:
def __init__(
self,
url,
email=None,
name_id=None,
batch_size=1000,
session=None,
logger=None,
orig=None,
key=None,
verify=True,
):
self.url = url
self.email = email
self.name_id = name_id
@@ -602,7 +607,8 @@ class WcsApi(object):
final_url,
data=json.dumps(data),
headers={'content-type': 'application/json'},
verify=self.verify)
verify=self.verify,
)
response.raise_for_status()
except requests.RequestException as e:
content = getattr(getattr(e, 'response', None), 'content', None)