trivial: apply pre-commit

Frédéric Péters 2022-07-16 08:36:18 +02:00
parent c27d346d92
commit 461254e986
12 changed files with 540 additions and 396 deletions
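All changes in this commit are mechanical rewrites produced by running pre-commit over the tree; no behaviour change is intended. The hook configuration itself is not part of the diff, so the following .pre-commit-config.yaml is only a hypothetical sketch of hooks that would produce rewrites of this shape — the pinned revs and argument lists are illustrative assumptions, and the debian/control normalization would come from a separate Debian-specific hook not shown here:

    # hypothetical .pre-commit-config.yaml (not part of this commit)
    repos:
      - repo: https://github.com/pre-commit/pre-commit-hooks
        rev: v4.3.0
        hooks:
          - id: fix-encoding-pragma  # drops the '# -*- coding: utf-8 -*-' lines removed below
            args: ['--remove']
      - repo: https://github.com/asottile/pyupgrade
        rev: v2.34.0
        hooks:
          - id: pyupgrade  # u'' literals, dict((k, v) ...) -> {k: v ...}, class Foo(object) -> class Foo:
            args: ['--keep-percent-format', '--py3-plus']
      - repo: https://github.com/PyCQA/isort
        rev: 5.10.1
        hooks:
          - id: isort  # regroups and alphabetizes the import blocks
      - repo: https://github.com/psf/black
        rev: 22.3.0
        hooks:
          - id: black  # 110-column lines, quotes left alone, magic trailing commas
            args: ['--line-length', '110', '--skip-string-normalization']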

debian/control

@@ -2,7 +2,10 @@ Source: wcs-olap
 Section: python
 Priority: optional
 Maintainer: Benjamin Dauvergne <bdauvergne@entrouvert.com>
-Build-Depends: python3-setuptools, python3-all, debhelper-compat (= 12), dh-python
+Build-Depends: debhelper-compat (= 12),
+ dh-python,
+ python3-all,
+ python3-setuptools,
 Standards-Version: 3.9.6
 Homepage: http://dev.entrouvert.org/projects/wcs-olap/

setup.py

@@ -1,9 +1,9 @@
 #! /usr/bin/env python
-import subprocess
 import os
+import subprocess
 
-from setuptools import setup, find_packages
+from setuptools import find_packages, setup
 from setuptools.command.sdist import sdist
@@ -21,15 +21,17 @@ class eo_sdist(sdist):
 def get_version():
     '''Use the VERSION, if absent generates a version with git describe, if not
     tag exists, take 0.0- and add the length of the commit log.
     '''
     if os.path.exists('VERSION'):
-        with open('VERSION', 'r') as v:
+        with open('VERSION') as v:
             return v.read()
     if os.path.exists('.git'):
         p = subprocess.Popen(
             ['git', 'describe', '--dirty=.dirty', '--match=v*'],
-            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+        )
         result = p.communicate()[0]
         if p.returncode == 0:
             result = result.decode('ascii').strip()[1:]  # strip spaces/newlines and initial v
@@ -44,26 +46,22 @@ def get_version():
     return '0.0'
 
-setup(name="wcs-olap",
-      version=get_version(),
-      license="AGPLv3+",
-      description="Export w.c.s. data to an OLAP cube",
-      long_description=open('README.rst').read(),
-      url="http://dev.entrouvert.org/projects/publik-bi/",
-      author="Entr'ouvert",
-      author_email="authentic@listes.entrouvert.com",
-      maintainer="Benjamin Dauvergne",
-      maintainer_email="bdauvergne@entrouvert.com",
-      packages=find_packages(),
-      include_package_data=True,
-      install_requires=[
-          'requests',
-          'psycopg2',
-          'isodate',
-          'six',
-          'cached-property'
-      ],
-      entry_points={
-          'console_scripts': ['wcs-olap=wcs_olap.cmd:main'],
-      },
-      cmdclass={'sdist': eo_sdist})
+setup(
+    name="wcs-olap",
+    version=get_version(),
+    license="AGPLv3+",
+    description="Export w.c.s. data to an OLAP cube",
+    long_description=open('README.rst').read(),
+    url="http://dev.entrouvert.org/projects/publik-bi/",
+    author="Entr'ouvert",
+    author_email="authentic@listes.entrouvert.com",
+    maintainer="Benjamin Dauvergne",
+    maintainer_email="bdauvergne@entrouvert.com",
+    packages=find_packages(),
+    include_package_data=True,
+    install_requires=['requests', 'psycopg2', 'isodate', 'six', 'cached-property'],
+    entry_points={
+        'console_scripts': ['wcs-olap=wcs_olap.cmd:main'],
+    },
+    cmdclass={'sdist': eo_sdist},
+)

tests/conftest.py

@@ -1,25 +1,22 @@
-# -*- coding: utf-8 -*-
 import configparser
+import os
+import random
+import shutil
+import socket
 import sys
 import time
-import os
-import shutil
-import random
-import socket
-from unittest import mock
-from contextlib import closing, contextmanager, ExitStack
 from collections import namedtuple
+from contextlib import ExitStack, closing, contextmanager
+from unittest import mock
 
 import psycopg2
 import pytest
 import utils
 
 Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid'])
 
 
-class Database(object):
+class Database:
     def __init__(self):
         self.db_name = 'db%s' % random.getrandbits(20)
         self.dsn = 'dbname=%s' % self.db_name
@@ -60,21 +57,21 @@ def wcs_db():
 WCS_SCRIPTS = {
-    'setup-auth': u"""
+    'setup-auth': """
 from quixote import get_publisher
 get_publisher().cfg['identification'] = {'methods': ['password']}
 get_publisher().cfg['debug'] = {'display_exceptions': 'text'}
 get_publisher().write_cfg()
 """,
-    'setup-storage': u"""
+    'setup-storage': """
 from quixote import get_publisher
 get_publisher().cfg['postgresql'] = {'database': %(dbname)r, 'port': %(port)r, 'host': %(host)r, 'user': %(user)r, 'password': %(password)r}
 get_publisher().write_cfg()
 get_publisher().initialize_sql()
 """,
-    'create-user': u"""
+    'create-user': """
 from quixote import get_publisher
 from qommon.ident.password_accounts import PasswordAccount
@@ -87,7 +84,7 @@ account.set_password('user')
 account.user_id = user.id
 account.store()
 """,
-    'create-data': u"""
+    'create-data': """
 import datetime
 import random
 from quixote import get_publisher
@@ -197,32 +194,44 @@ def wcs(tmp_path_factory, wcs_dir, wcs_db):
     tenant_dir.mkdir()
 
     utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth')
-    utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-storage'] % {
-        'dbname': wcs_db.db_name,
-        'port': os.environ.get('PGPORT'),
-        'host': os.environ.get('PGHOST'),
-        'user': os.environ.get('PGUSER'),
-        'password': os.environ.get('PGPASSWORD'),
-    },
-        'setup-storage')
+    utils.run_wcs_script(
+        wcs_dir,
+        WCS_SCRIPTS['setup-storage']
+        % {
+            'dbname': wcs_db.db_name,
+            'port': os.environ.get('PGPORT'),
+            'host': os.environ.get('PGHOST'),
+            'user': os.environ.get('PGUSER'),
+            'password': os.environ.get('PGPASSWORD'),
+        },
+        'setup-storage',
+    )
     utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user')
     utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data')
 
     with (tenant_dir / 'site-options.cfg').open('w') as fd:
-        fd.write(u'''[api-secrets]
+        fd.write(
+            '''[api-secrets]
 olap = olap
-''')
+'''
+        )
 
     with (wcs_dir / 'wcs.cfg').open('w') as fd:
-        fd.write(u'''[main]
-app_dir = %s\n''' % (str(wcs_dir).replace('%', '%%')))
+        fd.write(
+            '''[main]
+app_dir = %s\n'''
+            % (str(wcs_dir).replace('%', '%%'))
+        )
 
     with (wcs_dir / 'local_settings.py').open('w') as fd:
-        fd.write(u'''
+        fd.write(
+            '''
 WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg'
 THEMES_DIRECTORY = '/'
 ALLOWED_HOSTS = ['%s']
-''' % (wcs_dir, utils.HOSTNAME))
+'''
+            % (wcs_dir, utils.HOSTNAME)
+        )
 
     # launch a Django worker for running w.c.s.
     WCS_PID = os.fork()
@@ -230,7 +239,9 @@ ALLOWED_HOSTS = ['%s']
         os.chdir(os.path.dirname(utils.WCS_MANAGE))
         os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
         os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py')
-        os.execvp('python', [sys.executable, 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)])
+        os.execvp(
+            'python', [sys.executable, 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)]
+        )
         sys.exit(0)
 
     # verify w.c.s. is launched
@@ -272,7 +283,8 @@ def olap_cmd(wcs, tmpdir, postgres_db):
     model_dir = tmpdir / 'model_dir'
     model_dir.mkdir()
     with config_ini.open('w') as fd:
-        fd.write(u'''
+        fd.write(
+            '''
 [wcs-olap]
 cubes_model_dirs = {model_dir}
 pg_dsn = {dsn}
@@ -282,10 +294,14 @@ orig = olap
 key = olap
 schema = olap
 cubes_slug = olap-slug
-'''.format(wcs=wcs, model_dir=str(model_dir).replace('%', '%%'), dsn=postgres_db.dsn))
+'''.format(
+                wcs=wcs, model_dir=str(model_dir).replace('%', '%%'), dsn=postgres_db.dsn
+            )
+        )
 
+    import sys
+
     from wcs_olap import cmd
-    import sys
 
     def f(no_log_errors=True):
         old_argv = sys.argv
@@ -327,8 +343,11 @@ def mock_cursor_execute():
             mocked_cur = mock.Mock(wraps=cur)
             mocked_cur.execute = mock.Mock(wraps=cur.execute, **execute_mock_kwargs)
             return mocked_cur
+
         mocked_conn.cursor = cursor
         return mocked_conn
+
     stack.enter_context(mock.patch.object(psycopg2, 'connect', connect))
     yield None
+
     yield do

@@ -28,7 +28,8 @@ def test_post_sync_commands(mock_cursor_execute, wcs, postgres_db, olap_cmd):
     config.set(
         'wcs-olap',
         'post-sync-commands',
-        "NOTIFY coucou, '{schema}';\nSELECT * FROM information_schema.tables")
+        "NOTIFY coucou, '{schema}';\nSELECT * FROM information_schema.tables",
+    )
 
     queries = []

@@ -17,15 +17,17 @@ def capture_event_mock():
         return capture_event_mock(*args, **kwargs)
 
     with contextlib.ExitStack() as stack:
+
         def new_init(*args, **kwargs):
             old_init(*args, **kwargs)
             if sentry_sdk.Hub.current.client:
                 stack.enter_context(
                     mock.patch.object(
-                        sentry_sdk.Hub.current.client.transport, 'capture_event', my_capture_event))
+                        sentry_sdk.Hub.current.client.transport, 'capture_event', my_capture_event
+                    )
+                )
 
-        stack.enter_context(
-            mock.patch.object(sentry_sdk, 'init', new_init))
+        stack.enter_context(mock.patch.object(sentry_sdk, 'init', new_init))
         yield capture_event_mock

@@ -16,17 +16,14 @@
 import datetime
 import json
-import pytest
 import pathlib
-from urllib.parse import urlparse, parse_qs
 import unittest.mock as mock
+from urllib.parse import parse_qs, urlparse
 
-import requests
 import httmock
+import pytest
+import requests
 import utils
-
 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
@@ -49,8 +46,9 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
     with mock.patch('requests.Session', MockSession):
         olap_cmd()
     call_args_list = MockSession.mocks[-1].call_args_list
-    url_with_limits = [call_args[0][0] for call_args in MockSession.mocks[-1].call_args_list
-                       if 'limit' in call_args[0][0]]
+    url_with_limits = [
+        call_args[0][0] for call_args in MockSession.mocks[-1].call_args_list if 'limit' in call_args[0][0]
+    ]
     assert url_with_limits, call_args_list
     for url_with_limit in url_with_limits:
         parsed_qs = parse_qs(urlparse(url_with_limit).query)
@@ -60,10 +58,12 @@
     assert (
         'Le champ « 7th field bad duplicate » a un nom de variable dupliqué '
         '« duplicate » mais pas le même type que « 6th field bad duplicate », '
-        'tous les champs avec ce nom seront ignorés (string != bool).') in caplog.text
+        'tous les champs avec ce nom seront ignorés (string != bool).'
+    ) in caplog.text
     assert (
         'Le champ « 11th field third bad duplicate » est un doublon d\'un '
-        'champ de type différent, il a été ignoré.') in caplog.text
+        'champ de type différent, il a été ignoré.'
+    ) in caplog.text
 
     expected_schema = [
         ('agent', 'id'),
@@ -131,15 +131,17 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
         ('status', 'id'),
         ('status', 'label'),
         ('status_demande', 'id'),
-        ('status_demande', 'label')
+        ('status_demande', 'label'),
     ]
 
     # verify SQL schema
     with postgres_db.conn() as conn:
         with conn.cursor() as c:
-            c.execute('SELECT table_name, column_name '
-                      'FROM information_schema.columns '
-                      'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position')
+            c.execute(
+                'SELECT table_name, column_name '
+                'FROM information_schema.columns '
+                'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position'
+            )
             assert list(c.fetchall()) == expected_schema
@@ -165,15 +167,18 @@
             c.execute('SELECT MIN(date), MAX(date) FROM public.dates')
             last_date = (datetime.date.today() + datetime.timedelta(days=150)).replace(month=12, day=31)
             assert c.fetchone()[0:2] == (datetime.date(2011, 1, 1), last_date)
-            c.execute('''SELECT COUNT(*) FROM
+            c.execute(
+                '''SELECT COUNT(*) FROM
 GENERATE_SERIES(%s::date, %s::date, '1 day'::interval) AS series(date)
 FULL OUTER JOIN public.dates AS dates ON series.date = dates.date WHERE dates.date IS NULL OR series.date IS NULL''',
-                      vars=[datetime.date(2011, 1, 1), last_date])
+                vars=[datetime.date(2011, 1, 1), last_date],
+            )
             assert c.fetchone()[0] == 0
 
     # verify JSON schema
-    with (olap_cmd.model_dir / 'olap.model').open() as fd, \
-            (pathlib.Path(__file__).parent / 'olap.model').open() as fd2:
+    with (olap_cmd.model_dir / 'olap.model').open() as fd, (
+        pathlib.Path(__file__).parent / 'olap.model'
+    ).open() as fd2:
         json_schema = json.load(fd)
         expected_json_schema = json.load(fd2)
         expected_json_schema['pg_dsn'] = postgres_db.dsn
@@ -183,10 +188,12 @@ FULL OUTER JOIN public.dates AS dates ON series.date = dates.da
     with postgres_db.conn() as conn:
         with conn.cursor() as c:
             c.execute('SET search_path = olap')
-            c.execute('SELECT field_good_duplicate, count(id) '
-                      'FROM formdata_demande '
-                      'GROUP BY field_good_duplicate '
-                      'ORDER BY field_good_duplicate')
+            c.execute(
+                'SELECT field_good_duplicate, count(id) '
+                'FROM formdata_demande '
+                'GROUP BY field_good_duplicate '
+                'ORDER BY field_good_duplicate'
+            )
             assert dict(c.fetchall()) == {
                 'a': 37,
                 'b': 13,
@@ -238,7 +245,7 @@ def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog
     assert len(open_refs) == 3
 
     # Change an item of the field
-    script = u"""
+    script = """
 import datetime
 import random
 from quixote import get_publisher
@@ -292,8 +299,10 @@ formdata.store()
             assert new_open_refs[-1][1] == 'open_new_value'
             open_new_id = new_open_refs[-1][0]
 
-            c.execute('''SELECT field_item, "field_itemOpen"
-                      FROM formdata_demande ORDER BY id''')
+            c.execute(
+                '''SELECT field_item, "field_itemOpen"
+                FROM formdata_demande ORDER BY id'''
+            )
             formdata = c.fetchone()
             assert formdata[0] == bazouka_id
             assert formdata[1] == open_new_id

tests/utils.py

@@ -2,7 +2,6 @@ import os
 import subprocess
 import sys
 
-
 HOSTNAME = '127.0.0.1'
 WCS_MANAGE = os.environ.get('WCS_MANAGE')
@@ -14,5 +13,15 @@ def run_wcs_script(wcs_dir, script, script_name):
         fd.write(script)
     subprocess.check_call(
-        [sys.executable, WCS_MANAGE, 'runscript', '--app-dir', str(wcs_dir), '--vhost', HOSTNAME, str(script_path)],
-        env={'DJANGO_SETTINGS_MODULE': 'wcs.settings'})
+        [
+            sys.executable,
+            WCS_MANAGE,
+            'runscript',
+            '--app-dir',
+            str(wcs_dir),
+            '--vhost',
+            HOSTNAME,
+            str(script_path),
+        ],
+        env={'DJANGO_SETTINGS_MODULE': 'wcs.settings'},
+    )

wcs_olap/cmd.py

@@ -14,7 +14,7 @@ except ImportError:
 else:
     from sentry_sdk.integrations.logging import LoggingIntegration
 
-from . import wcs_api, feeder
+from . import feeder, wcs_api
 
 
 def main():
@@ -63,15 +63,14 @@ def configure_sentry(config):
     # get DEBUG level logs as breadcrumbs
     logger.setLevel(logging.DEBUG)
 
-    sentry_logging = LoggingIntegration(
-        level=logging.DEBUG,
-        event_level=logging.ERROR)
+    sentry_logging = LoggingIntegration(level=logging.DEBUG, event_level=logging.ERROR)
 
     sentry_sdk.init(
         dsn=config.get('sentry', 'dsn'),
         environment=config.get('sentry', 'environment', fallback=None),
         attach_stacktrace=True,
-        integrations=[sentry_logging])
+        integrations=[sentry_logging],
+    )
 
     scopes.append(sentry_sdk.push_scope)
@@ -82,17 +81,17 @@ def main2():
     except locale.Error:
         # current locale does not exist
         pass
-    parser = argparse.ArgumentParser(description='Export W.C.S. data as a star schema in a '
-                                     'postgresql DB', add_help=False)
+    parser = argparse.ArgumentParser(
+        description='Export W.C.S. data as a star schema in a ' 'postgresql DB', add_help=False
+    )
     parser.add_argument('config_path', nargs='?', default=None)
     group = parser.add_mutually_exclusive_group()
-    parser.add_argument('--no-feed', dest='feed', help='only produce the model',
-                        action='store_false', default=True)
-    parser.add_argument('--no-log-errors', dest='no_log_errors',
-                        action='store_true', default=False)
+    parser.add_argument(
+        '--no-feed', dest='feed', help='only produce the model', action='store_false', default=True
+    )
+    parser.add_argument('--no-log-errors', dest='no_log_errors', action='store_true', default=False)
     parser.add_argument('--fake', action='store_true', default=False)
-    group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true',
-                       default=False)
+    group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true', default=False)
     group.add_argument('--url', help='url of the w.c.s. instance', required=False, default=None)
     args, rest = parser.parse_known_args()
     feed = args.feed
@@ -100,8 +99,7 @@ def main2():
     config = get_config(path=args.config_path)
     configure_sentry(config)
     # list all known urls
-    urls = [url for url in config.sections() if url.startswith('http://')
-            or url.startswith('https://')]
+    urls = [url for url in config.sections() if url.startswith('http://') or url.startswith('https://')]
     defaults = {}
     if not args.all:
         try:
@@ -129,7 +127,7 @@ def main2():
         if config.has_section('wcs-olap'):
             defaults.update(config.items('wcs-olap'))
         if config.has_section(url):
-            defaults.update((config.items(url)))
+            defaults.update(config.items(url))
         try:
             key = defaults['key']
             orig = defaults['orig']
@@ -142,14 +140,25 @@ def main2():
             logger.error('configuration incomplete for %s: %s', url, e)
         else:
             try:
-                api = wcs_api.WcsApi(url=url, orig=orig, key=key,
-                                     batch_size=batch_size,
-                                     verify=(defaults.get('verify', 'True') == 'True'))
-                logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url,
-                            pg_dsn)
+                api = wcs_api.WcsApi(
+                    url=url,
+                    orig=orig,
+                    key=key,
+                    batch_size=batch_size,
+                    verify=(defaults.get('verify', 'True') == 'True'),
+                )
+                logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url, pg_dsn)
                 olap_feeder = feeder.WcsOlapFeeder(
-                    api=api, schema=schema, pg_dsn=pg_dsn, logger=logger,
-                    config=defaults, do_feed=feed, fake=fake, slugs=slugs, scope=scope)
+                    api=api,
+                    schema=schema,
+                    pg_dsn=pg_dsn,
+                    logger=logger,
+                    config=defaults,
+                    do_feed=feed,
+                    fake=fake,
+                    slugs=slugs,
+                    scope=scope,
+                )
                 olap_feeder.feed()
                 logger.info('finished')
                 feed_result = False
@@ -163,5 +172,6 @@ def main2():
     if failure:
         sys.exit(1)
 
+
 if __name__ == '__main__':
     main()

wcs_olap/feeder.py

@@ -1,24 +1,22 @@
-# -*- coding: utf-8 -*-
-from __future__ import unicode_literals
-from collections import OrderedDict
 import contextlib
-import datetime
 import copy
-import itertools
-import os
-import json
+import datetime
 import hashlib
+import itertools
+import json
+import os
 import reprlib
-from .utils import Whatever
+import time
+from collections import OrderedDict
 
 import psycopg2
 import psycopg2.errorcodes
-import time
 from cached_property import cached_property
 from wcs_olap.wcs_api import WcsApiError
 
+from .utils import Whatever
+
 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
@@ -29,9 +27,10 @@ def truncate_pg_identifier(identifier, hash_length=6, force_hash=False):
     else:
         # insert hash in the middle, to keep some readability
         return (
-            identifier[:(63 - hash_length) // 2]
+            identifier[: (63 - hash_length) // 2]
             + hashlib.md5(identifier.encode('utf-8')).hexdigest()[:hash_length]
-            + identifier[-(63 - hash_length) // 2:])
+            + identifier[-(63 - hash_length) // 2 :]
+        )
 
 
 @contextlib.contextmanager
@@ -51,7 +50,7 @@ def slugify(s):
     return s.replace('-', '_').replace(' ', '_')
 
 
-class Context(object):
+class Context:
     def __init__(self):
         self.stack = []
@@ -68,7 +67,7 @@ class Context(object):
         return r
 
 
-class WcsOlapFeeder(object):
+class WcsOlapFeeder:
     channels = [
         [1, 'web', 'web'],
@@ -80,18 +79,20 @@ class WcsOlapFeeder(object):
         [7, 'fax', 'fax'],
         [8, 'social-network', 'réseau social'],
     ]
-    channel_to_id = dict((c[1], c[0]) for c in channels)
-    id_to_channel = dict((c[0], c[1]) for c in channels)
+    channel_to_id = {c[1]: c[0] for c in channels}
+    id_to_channel = {c[0]: c[1] for c in channels}
     status = [
         [1, 'Nouveau'],
         [2, 'En cours'],
        [3, 'Terminé'],
     ]
-    status_to_id = dict((c[1], c[0]) for c in channels)
-    id_to_status = dict((c[0], c[1]) for c in channels)
+    status_to_id = {c[1]: c[0] for c in channels}
+    id_to_status = {c[0]: c[1] for c in channels}
 
-    def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False, slugs=None, scope=None):
+    def __init__(
+        self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False, slugs=None, scope=None
+    ):
         self.api = api
         self.slugs = slugs
         self.fake = fake
@@ -103,23 +104,25 @@ class WcsOlapFeeder(object):
         self.schema_temp = truncate_pg_identifier(schema + '_temp')
         self.do_feed = do_feed
         self.ctx = Context()
-        self.ctx.push({
-            'schema': self.schema,
-            'schema_temp': self.schema_temp,
-            'role_table': 'role',
-            'channel_table': 'channel',
-            'category_table': 'category',
-            'form_table': 'formdef',
-            'generic_formdata_table': 'formdata',
-            'generic_status_table': 'status',
-            'generic_evolution_table': 'evolution',
-            'year_table': 'year',
-            'month_table': 'month',
-            'day_table': 'day',
-            'dow_table': 'dow',
-            'hour_table': 'hour',
-            'agent_table': 'agent',
-        })
+        self.ctx.push(
+            {
+                'schema': self.schema,
+                'schema_temp': self.schema_temp,
+                'role_table': 'role',
+                'channel_table': 'channel',
+                'category_table': 'category',
+                'form_table': 'formdef',
+                'generic_formdata_table': 'formdata',
+                'generic_status_table': 'status',
+                'generic_evolution_table': 'evolution',
+                'year_table': 'year',
+                'month_table': 'month',
+                'day_table': 'day',
+                'dow_table': 'dow',
+                'hour_table': 'hour',
+                'agent_table': 'agent',
+            }
+        )
         self.config = config or {}
         self.model = {
             'label': self.config.get('cubes_label', schema),
@@ -276,19 +279,19 @@ class WcsOlapFeeder(object):
                     'label': 'pourcentage des demandes',
                     'type': 'percent',
                     "expression": 'case (select count({fact_table}.id) from {table_expression} '
-                                  'where {where_conditions}) when 0 then null else '
-                                  'count({fact_table}.id) * 100. / (select '
-                                  'count({fact_table}.id) from {table_expression} where '
-                                  '{where_conditions}) end',
+                    'where {where_conditions}) when 0 then null else '
+                    'count({fact_table}.id) * 100. / (select '
+                    'count({fact_table}.id) from {table_expression} where '
+                    '{where_conditions}) end',
                 },
                 {
                     'name': 'geolocation',
                     'label': 'localisation géographique',
                     'type': 'point',
                     'expression': 'array_agg({fact_table}.geolocation_base) '
-                                  'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)',
-                }
-            ]
+                    'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)',
+                },
+            ],
         }
         self.model['cubes'].append(cube)
         self.base_cube = self.model['cubes'][0]
@@ -309,7 +312,11 @@ class WcsOlapFeeder(object):
     @cached_property
     def formdefs(self):
-        return [formdef for formdef in self.api.formdefs if (not self.slugs or formdef.slug in self.slugs) and not formdef.is_empty]
+        return [
+            formdef
+            for formdef in self.api.formdefs
+            if (not self.slugs or formdef.slug in self.slugs) and not formdef.is_empty
+        ]
 
     @cached_property
     def roles(self):
@@ -343,7 +350,9 @@ class WcsOlapFeeder(object):
         try:
             self.cur.execute(sql, vars=vars)
         except Exception as e:
-            self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
+            self.logger.warning(
+                'Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e
+            )
             raise
 
     @contextlib.contextmanager
@@ -381,23 +390,33 @@ class WcsOlapFeeder(object):
         Drop tables one by one in order to avoid reaching max_locks_per_transaction
         """
         # drop foreign key constraints first
-        self.ex("SELECT table_name, constraint_name FROM "
-                "information_schema.key_column_usage "
-                "WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'", vars=[schema])
+        self.ex(
+            "SELECT table_name, constraint_name FROM "
+            "information_schema.key_column_usage "
+            "WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'",
+            vars=[schema],
+        )
         for table_name, constraint_name in self.cur.fetchall():
             # drop of PK constraints can have effects on FK constraint on other tables.
             with ignore_undefined_object_or_table():
-                self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
-                        % (quote(schema), quote(table_name), quote(constraint_name)))
+                self.ex(
+                    'ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
+                    % (quote(schema), quote(table_name), quote(constraint_name))
+                )
 
         # remove others
-        self.ex("SELECT table_name, constraint_name FROM "
-                "information_schema.key_column_usage "
-                "WHERE table_schema = %s", vars=[schema])
+        self.ex(
+            "SELECT table_name, constraint_name FROM "
+            "information_schema.key_column_usage "
+            "WHERE table_schema = %s",
+            vars=[schema],
+        )
         for table_name, constraint_name in self.cur.fetchall():
             # drop of PK constraints can have effects on FK constraint on other tables.
             with ignore_undefined_object_or_table():
-                self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
-                        % (quote(schema), quote(table_name), quote(constraint_name)))
+                self.ex(
+                    'ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
+                    % (quote(schema), quote(table_name), quote(constraint_name))
+                )
 
         # then drop indexes
         self.ex("SELECT tablename, indexname FROM pg_indexes WHERE schemaname = %s", vars=[schema])
         for table_name, index_name in self.cur.fetchall():
@@ -405,7 +424,9 @@
                 self.ex('DROP INDEX %s.%s CASCADE' % (quote(schema), quote(index_name)))
 
         # finally drop tables, cascade will have no effect
-        self.ex("SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC", vars=[schema])
+        self.ex(
+            "SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC", vars=[schema]
+        )
         for table in self.cur.fetchall():
             tablename = '%s.%s' % (quote(schema), quote(table[0]))
             self.ex('DROP TABLE IF EXISTS %s;' % tablename)
@@ -418,7 +439,8 @@
         first_date = max_date or datetime.date(2010, 1, 1)
         last_date = (datetime.date.today() + datetime.timedelta(days=150)).replace(month=12, day=31)
 
-        self.ex('''INSERT INTO public.dates SELECT
+        self.ex(
+            '''INSERT INTO public.dates SELECT
             the_date.the_date::date AS date,
             to_char(the_date.the_date, 'TMday') AS day,
             to_char(the_date.the_date, 'TMmonth') AS month
@@ -427,7 +449,8 @@
             LEFT JOIN public.dates AS dates
             ON the_date.the_date = dates.date
             WHERE dates.date IS NULL''',
-            vars=[first_date, last_date])
+            vars=[first_date, last_date],
+        )
 
     def create_table(self, name, columns, inherits=None, comment=None, unlogged=True):
         if unlogged:
@@ -448,18 +471,21 @@
         return self.cur.fetchone()[0]
 
     def update_table_sequence_number(self, name):
-        self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
-                   (SELECT GREATEST(1, MAX(id)) FROM {name}))""", ctx={'name': quote(name)})
+        self.ex(
+            """SELECT setval(pg_get_serial_sequence('{name}', 'id'),
+            (SELECT GREATEST(1, MAX(id)) FROM {name}))""",
+            ctx={'name': quote(name)},
+        )
 
     def create_labeled_table_serial(self, name, comment):
         self.create_table(
-            name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment, unlogged=False)
+            name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment, unlogged=False
+        )
 
         if self.prev_table_exists(name):
             # Insert data from previous table
             self.ex(
-                'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
-                ctx={'name': quote(name)}
+                'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}', ctx={'name': quote(name)}
             )
             self.update_table_sequence_number(name)
@@ -476,12 +502,15 @@
             self.ex(
                 'SELECT ref, {column} FROM {name} WHERE ref = %s',
                 ctx={'name': quote(name), 'column': quote(update_column)},
-                vars=(ref,))
+                vars=(ref,),
+            )
             if self.cur.fetchall():
                 for item in self.cur.fetchall():
-                    self.ex('UPDATE {name} SET {column}=%s WHERE ref=%s',
-                            ctx={'name': quote(name), 'column': quote(update_column)},
-                            vars=[item[1], ref])
+                    self.ex(
+                        'UPDATE {name} SET {column}=%s WHERE ref=%s',
+                        ctx={'name': quote(name), 'column': quote(update_column)},
+                        vars=[item[1], ref],
+                    )
             else:
                 to_insert.append(item)
         if to_insert:
@@ -489,35 +518,32 @@
             tmpl = ', '.join(['(DEFAULT, %s)' % columns_values] * len(data))
             query = 'INSERT INTO {name} VALUES %s' % tmpl
-            self.ex(query, ctx={'name': quote(name)},  # 'column': quote(update_column)},
-                    vars=list(itertools.chain(*to_insert)))
+            self.ex(
+                query,
+                ctx={'name': quote(name)},  # 'column': quote(update_column)},
+                vars=list(itertools.chain(*to_insert)),
+            )
 
         result = {}
-        self.ex('SELECT id, {column} FROM {name}', ctx={'name': quote(name),
-                                                        'column': result_column})
+        self.ex('SELECT id, {column} FROM {name}', ctx={'name': quote(name), 'column': result_column})
         for _id, column in self.cur.fetchall():
             result[column] = _id
         return result
 
     def create_labeled_table(self, name, labels, comment=None):
         self.create_table(
-            name,
-            [
-                ['id', 'integer primary key'],
-                ['label', 'varchar']
-            ], comment=comment, unlogged=False)
+            name, [['id', 'integer primary key'], ['label', 'varchar']], comment=comment, unlogged=False
+        )
 
         if self.prev_table_exists(name):
             # Insert data from previous table
             self.ex(
-                'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}',
-                ctx={'name': quote(name)}
+                'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}', ctx={'name': quote(name)}
            )
 
         # Find what is missing
         to_insert = []
         for _id, _label in labels:
-            self.ex(
-                'SELECT * FROM {name} WHERE label = %s', ctx={'name': quote(name)}, vars=(_label,))
+            self.ex('SELECT * FROM {name} WHERE label = %s', ctx={'name': quote(name)}, vars=(_label,))
             if self.cur.fetchone() is None:
                 to_insert.append(_label)
@@ -551,19 +577,19 @@
         tmp_cat_map = self.do_referenced_data(table_name, categories_data, 'label')
         self.update_table_sequence_number(table_name)
         # remap categories ids to ids in the table
-        return dict((c.title, tmp_cat_map[c.title]) for c in self.categories)
+        return {c.title: tmp_cat_map[c.title] for c in self.categories}
 
     def do_formdef_table(self):
         categories_mapping = self.do_category_table()
 
-        formdef_fields = [['category_id', 'integer REFERENCES {category_table} (id)'],
-                          ['label', 'varchar']
-                          ]
+        formdef_fields = [['category_id', 'integer REFERENCES {category_table} (id)'], ['label', 'varchar']]
 
         table_name = self.default_ctx['form_table']
         self.create_referenced_table(table_name, formdef_fields, 'types de formulaire')
 
-        formdefs = [(formdef.slug, categories_mapping.get(formdef.schema.category),
-                     formdef.schema.name) for formdef in self.formdefs]
+        formdefs = [
+            (formdef.slug, categories_mapping.get(formdef.schema.category), formdef.schema.name)
+            for formdef in self.formdefs
+        ]
         self.formdefs_mapping = self.do_referenced_data(table_name, formdefs, 'ref')
         self.formdefs_unique_slug = {}
         seen = set()
@@ -580,23 +606,19 @@
     def do_base_table(self):
         # channels
-        self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
-                                  comment='canal')
+        self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels], comment='canal')
 
         # roles
-        roles = dict((i, role.name) for i, role in enumerate(self.roles))
+        roles = {i: role.name for i, role in enumerate(self.roles)}
         tmp_role_map = self.create_labeled_table('role', roles.items(), comment='role')
-        self.role_mapping = dict(
-            (role.id, tmp_role_map[role.name]) for role in self.roles)
+        self.role_mapping = {role.id: tmp_role_map[role.name] for role in self.roles}
 
         # forms
         self.do_formdef_table()
 
-        self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
-                                  comment='heures')
+        self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))), comment='heures')
 
-        self.create_labeled_table('status', self.status,
-                                  comment='statuts simplifiés')
+        self.create_labeled_table('status', self.status, comment='statuts simplifiés')
 
         # agents
         self.create_labeled_table_serial('agent', comment='agents')
@@ -630,14 +652,20 @@
             self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))
         self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s', vars=('tous les formulaires',))
 
         # evolutions
-        self.create_table('{generic_evolution_table}', [
-            ['id', 'serial primary key'],
-            ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
-            ['formdata_id', 'integer'],  # "REFERENCES {generic_formdata_table} (id)" is impossible because FK constraints do not work on inherited tables
-            ['time', 'timestamp'],
-            ['date', 'date'],
-            ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
-        ])
+        self.create_table(
+            '{generic_evolution_table}',
+            [
+                ['id', 'serial primary key'],
+                ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
+                [
+                    'formdata_id',
+                    'integer',
+                ],  # "REFERENCES {generic_formdata_table} (id)" is impossible because FK constraints do not work on inherited tables
+                ['time', 'timestamp'],
+                ['date', 'date'],
+                ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
+            ],
+        )
         self.ex('COMMENT ON TABLE {generic_evolution_table} IS %s', vars=('evolution générique',))
 
     def feed(self):
@@ -678,37 +706,38 @@
     def switch_schemas(self):
         self.old_schema = truncate_pg_identifier(
-            self.schema
-            + '_old_'
-            + datetime.datetime.now().strftime('%Y%m%d%H%M'))
+            self.schema + '_old_' + datetime.datetime.now().strftime('%Y%m%d%H%M')
+        )
         self.ctx.push({'old_schema': self.old_schema})
 
         try:
+
             def switch():
                 with self.atomic():
-                    self.logger.info('renaming schema %s to %s and schema %s to %s',
-                                     self.schema, self.old_schema,
-                                     self.schema_temp, self.schema)
+                    self.logger.info(
+                        'renaming schema %s to %s and schema %s to %s',
+                        self.schema,
+                        self.old_schema,
+                        self.schema_temp,
+                        self.schema,
+                    )
                     if self.schema_exists(self.schema):
                         self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}')
                     self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
-            self.retry_on_db_error(
-                switch,
-                times=33,
-                sleep_time=1)
+
+            self.retry_on_db_error(switch, times=33, sleep_time=1)
         except Exception:
             self.logger.warning('failed to switch schemas 3-times in 33 seconds')
             raise
 
         try:
+
             def drop_old_schema():
                 self.logger.info('dropping schema %s', self.old_schema)
                 self.drop_tables_sequencially(self.old_schema)
                 self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE')
-            self.retry_on_db_error(
-                drop_old_schema,
-                times=33,
-                sleep_time=1)
+
+            self.retry_on_db_error(drop_old_schema, times=33, sleep_time=1)
         except Exception:
             self.logger.exception('could not drop schema %s', self.old_schema)
@@ -730,10 +759,14 @@
     def create_formdata_json_index(self, table_name, varname):
         if varname in self.formdata_json_index:
             return
-        index_name = self.hash_table_name('%s_%s_json_idx' % (table_name, varname), hash_length=8,
-                                          force_hash=True)
-        self.ex('CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))',
-                ctx={'index_name': quote(index_name)}, vars=[varname])
+        index_name = self.hash_table_name(
+            '%s_%s_json_idx' % (table_name, varname), hash_length=8, force_hash=True
+        )
+        self.ex(
+            'CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))',
+            ctx={'index_name': quote(index_name)},
+            vars=[varname],
+        )
         # prevent double creation
         self.formdata_json_index.append(varname)
@@ -742,7 +775,7 @@ def has_digits_validation(field):
     return field.validation and field.validation.get('type') == 'digits'
 
 
-class WcsFormdefFeeder(object):
+class WcsFormdefFeeder:
     def __init__(self, olap_feeder, formdef, do_feed=True):
         self.olap_feeder = olap_feeder
         self.formdef = formdef
@@ -771,26 +804,31 @@ class WcsFormdefFeeder(object):
     def do_statuses(self):
         statuses = self.formdef.schema.workflow.statuses
         tmp_status_map = self.olap_feeder.create_labeled_table(
-            self.status_table_name, enumerate([s.name for s in statuses]),
-            comment='statuts du formulaire « %s »' % self.formdef.schema.name)
-        self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses)
+            self.status_table_name,
+            enumerate([s.name for s in statuses]),
+            comment='statuts du formulaire « %s »' % self.formdef.schema.name,
+        )
+        self.status_mapping = {s.id: tmp_status_map[s.name] for s in statuses}
 
     def do_data_table(self):
         columns = OrderedDict()
-        columns['status_id'] = {'sql_col_name': 'status_id', 'sql_col_def': 'smallint REFERENCES {status_table} (id)'}
+        columns['status_id'] = {
+            'sql_col_name': 'status_id',
+            'sql_col_def': 'smallint REFERENCES {status_table} (id)',
+        }
 
         # add item fields
         for field in self.good_fields.values():
             if field.type == 'item':
-                comment = ('valeurs du champ « %s » du formulaire %s'
-                           % (field.label, self.formdef.schema.name))
+                comment = 'valeurs du champ « %s » du formulaire %s' % (field.label, self.formdef.schema.name)
                 table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
                 # create table and mapping
                 if field.items:
                     # filter non string items
                     items = [item for item in field.items if isinstance(item, str)]
                     self.items_mappings[field.varname] = self.create_labeled_table(
-                        table_name, enumerate(items), comment=comment)
+                        table_name, enumerate(items), comment=comment
+                    )
                 else:
                     # open item field, from data sources...
                     self.create_labeled_table_serial(table_name, comment=comment)
@@ -839,15 +877,19 @@
                 self.columns.append(columns[key]['sql_col_name'])
         self.columns.remove('geolocation_base')
 
-        self.create_table(self.table_name,
-                          [(columns[key]['sql_col_name'], columns[key]['sql_col_def']) for key in columns],
-                          inherits='{generic_formdata_table}',
-                          comment='formulaire %s' % self.formdef.schema.name)
+        self.create_table(
+            self.table_name,
+            [(columns[key]['sql_col_name'], columns[key]['sql_col_def']) for key in columns],
+            inherits='{generic_formdata_table}',
+            comment='formulaire %s' % self.formdef.schema.name,
+        )
         for key in columns:
             column = columns[key]
             if column.get('sql_comment'):
-                self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % quote(column['sql_col_name']),
-                        vars=(column['sql_comment'],))
+                self.ex(
+                    'COMMENT ON COLUMN {formdata_table}.%s IS %%s' % quote(column['sql_col_name']),
+                    vars=(column['sql_comment'],),
+                )
 
         # Creat index for JSON fields
         if self.has_jsonb:
@@ -864,26 +906,37 @@
             self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
         self.ex('ALTER TABLE {formdata_table} ADD PRIMARY KEY (id)')
 
         # table des evolutions
-        self.create_table(self.evolution_table_name, [
-            ['id', 'serial primary key'],
-            ['status_id', 'smallint REFERENCES {status_table} (id)'],
-            ['formdata_id', 'integer REFERENCES {formdata_table} (id)'],
-            ['time', 'timestamp'],
-            ['date', 'date'],
-            ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
-        ])
-        self.ex('COMMENT ON TABLE {evolution_table} IS %s',
-                vars=('evolution des demandes %s' % self.formdef.schema.name,))
+        self.create_table(
+            self.evolution_table_name,
+            [
+                ['id', 'serial primary key'],
+                ['status_id', 'smallint REFERENCES {status_table} (id)'],
+                ['formdata_id', 'integer REFERENCES {formdata_table} (id)'],
+                ['time', 'timestamp'],
+                ['date', 'date'],
+                ['hour_id', 'smallint REFERENCES {hour_table} (id)'],
+            ],
+        )
+        self.ex(
+            'COMMENT ON TABLE {evolution_table} IS %s',
+            vars=('evolution des demandes %s' % self.formdef.schema.name,),
+        )
 
     def insert_item_value(self, field, value):
         table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
-        self.ex('SELECT id FROM {item_table} WHERE label = %s',
-                ctx={'item_table': quote(table_name)}, vars=(value,))
+        self.ex(
+            'SELECT id FROM {item_table} WHERE label = %s',
+            ctx={'item_table': quote(table_name)},
+            vars=(value,),
+        )
         res = self.cur.fetchone()
         if res:
             return res[0]
-        self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,),
-                ctx={'item_table': quote(table_name)})
+        self.ex(
+            'INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)',
+            vars=(value,),
+            ctx={'item_table': quote(table_name)},
+        )
         return self.cur.fetchone()[0]
 
     def get_item_id(self, field, value):
@@ -930,8 +983,9 @@
             try:
                 status = data.formdef.schema.workflow.statuses_map[status_id]
             except KeyError:
-                self.logger.warning('%s.%s unknown status status_id %s',
-                                    data.formdef.schema.name, data.id, status_id)
+                self.logger.warning(
+                    '%s.%s unknown status status_id %s', data.formdef.schema.name, data.id, status_id
+                )
                 continue
 
             channel = data.submission.channel.lower()
@@ -1022,17 +1076,16 @@
                 try:
                     status = data.formdef.schema.workflow.statuses_map[evo.status]
                 except KeyError:
-                    self.logger.warning('%s.%s unknown status in evolution %s',
-                                        data.formdef.schema.name, data.id, evo.status)
+                    self.logger.warning(
+                        '%s.%s unknown status in evolution %s', data.formdef.schema.name, data.id, evo.status
+                    )
                     continue
                 status_id = self.status_mapping[status.id]
                 generic_status_id = self.generic_status(status)
-                evolution.append(
-                    [0, status_id, evo.time, evo.time.date(), evo.time.hour])
+                evolution.append([0, status_id, evo.time, evo.time.date(), evo.time.hour])
                 if generic_status_id == last_status:
                     continue
-                generic_evolution.append(
-                    [0, generic_status_id, evo.time, evo.time.date(), evo.time.hour])
+                generic_evolution.append([0, generic_status_id, evo.time, evo.time.date(), evo.time.hour])
                 last_status = generic_status_id
             generic_evolution_values.append(generic_evolution)
             evolution_values.append(evolution)
@@ -1041,12 +1094,11 @@
             return
         insert_columns = ['%s' % quote(column) for column in self.columns[1:]]
         insert_columns = ', '.join(insert_columns)
-        self.ex('INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id',
-                ctx=dict(
-                    columns=insert_columns,
-                    values=', '.join(['%s'] * len(values))
-                ),
-                vars=values)
+        self.ex(
+            'INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id',
+            ctx=dict(columns=insert_columns, values=', '.join(['%s'] * len(values))),
+            vars=values,
+        )
 
         # insert generic evolutions
         generic_evolutions = []
@@ -1056,14 +1108,24 @@
                 row[0] = formdata_id
                 generic_evolutions.append(tuple(row))
                 if len(generic_evolutions) == 500:
-                    self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % (
-                        ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
-                        ', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions)
+                    self.ex(
+                        'INSERT INTO {generic_evolution_table} (%s) VALUES %s'
+                        % (
+                            ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
+                            ', '.join(['%s'] * len(generic_evolutions)),
+                        ),
+                        vars=generic_evolutions,
+                    )
                     generic_evolutions = []
 
             if generic_evolutions:
-                self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % (
-                    ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
-                    ', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions)
+                self.ex(
+                    'INSERT INTO {generic_evolution_table} (%s) VALUES %s'
+                    % (
+                        ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
+                        ', '.join(['%s'] * len(generic_evolutions)),
+                    ),
+                    vars=generic_evolutions,
+                )
 
             # insert evolutions
             evolutions = []
@@ -1072,14 +1134,24 @@
                 row[0] = formdata_id
                 evolutions.append(tuple(row))
                 if len(evolutions) == 500:
-                    self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % (
-                        ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
-                        ', '.join(['%s'] * len(evolutions))), vars=evolutions)
+                    self.ex(
+                        'INSERT INTO {evolution_table} (%s) VALUES %s'
+                        % (
+                            ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
+                            ', '.join(['%s'] * len(evolutions)),
+                        ),
+                        vars=evolutions,
+                    )
                     evolutions = []
 
             if evolutions:
-                self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % (
-                    ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
-                    ', '.join(['%s'] * len(evolutions))), vars=evolutions)
+                self.ex(
+                    'INSERT INTO {evolution_table} (%s) VALUES %s'
+                    % (
+                        ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
+                        ', '.join(['%s'] * len(evolutions)),
+                    ),
+                    vars=evolutions,
+                )
 
     def get_first_agent_in_evolution(self, formdata):
         for evo in formdata.evolution:
@ -1087,11 +1159,13 @@ class WcsFormdefFeeder(object):
return self.get_agent(evo.who) return self.get_agent(evo.who)
def feed(self): def feed(self):
self.olap_feeder.ctx.push({ self.olap_feeder.ctx.push(
'formdata_table': quote(self.table_name), {
'status_table': quote(self.status_table_name), 'formdata_table': quote(self.table_name),
'evolution_table': quote(self.evolution_table_name), 'status_table': quote(self.status_table_name),
}) 'evolution_table': quote(self.evolution_table_name),
}
)
# create cube # create cube
cube = self.cube = copy.deepcopy(self.base_cube) cube = self.cube = copy.deepcopy(self.base_cube)
@ -1102,51 +1176,62 @@ class WcsFormdefFeeder(object):
         # remove json field from formdef cubes
         cube.pop('json_field', None)

-        cube.update({
-            'name': self.table_name,
-            'label': self.formdef.schema.name,
-            'fact_table': quote(self.table_name),
-            'key': 'id',
-        })
-        cube['dimensions'] = [dimension for dimension in cube['dimensions']
-                              if dimension['name'] not in ('category', 'formdef')]
+        cube.update(
+            {
+                'name': self.table_name,
+                'label': self.formdef.schema.name,
+                'fact_table': quote(self.table_name),
+                'key': 'id',
+            }
+        )
+        cube['dimensions'] = [
+            dimension for dimension in cube['dimensions'] if dimension['name'] not in ('category', 'formdef')
+        ]

         # add dimension for status
-        cube['joins'].append({
-            'name': 'status',
-            'table': quote(self.status_table_name),
-            'master': 'status_id',
-            'detail': 'id',
-            'kind': 'left',
-        })
-        cube['dimensions'].append({
-            'name': 'status',
-            'label': 'statut',
-            'join': ['status'],
-            'type': 'integer',
-            'value': 'status.id',
-            'value_label': 'status.label',
-        })
+        cube['joins'].append(
+            {
+                'name': 'status',
+                'table': quote(self.status_table_name),
+                'master': 'status_id',
+                'detail': 'id',
+                'kind': 'left',
+            }
+        )
+        cube['dimensions'].append(
+            {
+                'name': 'status',
+                'label': 'statut',
+                'join': ['status'],
+                'type': 'integer',
+                'value': 'status.id',
+                'value_label': 'status.label',
+            }
+        )

         # add dimension for function
         for function, name in self.formdef.schema.workflow.functions.items():
             at = 'function_%s' % slugify(function)
-            cube['joins'].append({
-                'name': at,
-                'table': 'role',
-                'master': quote(at),
-                'detail': 'id',
-                'kind': 'left',
-            })
-            cube['dimensions'].append({
-                'name': at,
-                'label': 'fonction %s' % name.lower(),
-                'join': [at],
-                'type': 'integer',
-                'value': '%s.id' % quote(at),
-                'value_label': '%s.label' % quote(at),
-                'filter': False,
-            })
+            cube['joins'].append(
+                {
+                    'name': at,
+                    'table': 'role',
+                    'master': quote(at),
+                    'detail': 'id',
+                    'kind': 'left',
+                }
+            )
+            cube['dimensions'].append(
+                {
+                    'name': at,
+                    'label': 'fonction %s' % name.lower(),
+                    'join': [at],
+                    'type': 'integer',
+                    'value': '%s.id' % quote(at),
+                    'value_label': '%s.label' % quote(at),
+                    'filter': False,
+                }
+            )

         # add dimensions for item fields
         fields = self.formdef.schema.fields
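
Each join above declares how the fact table reaches a dimension table, and each dimension then names that join for its value and label columns. As a hedged illustration, the model fragment assembled for one formdef could come out like this (formdef name and all values invented for the example):

    cube_fragment = {
        'name': 'formdata_demande',
        'label': 'Demande',
        'fact_table': '"formdata_demande"',
        'key': 'id',
        'joins': [
            # left join resolving the status_id foreign key to its label
            {'name': 'status', 'table': '"status_demande"', 'master': 'status_id', 'detail': 'id', 'kind': 'left'},
        ],
        'dimensions': [
            # the dimension points at the join for both its value and its display label
            {'name': 'status', 'label': 'statut', 'join': ['status'], 'type': 'integer',
             'value': 'status.id', 'value_label': 'status.label'},
        ],
    }
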
@ -1171,7 +1256,8 @@ class WcsFormdefFeeder(object):
                 add_warning(
                     'Le champ « %(label)s » a un nom de variable dupliqué « %(varname)s » '
                     'mais pas le même type que « %(label_good)s », tous les champs avec ce nom seront ignorés '
-                    '(%(type)s != %(type_good)s).' % {
+                    '(%(type)s != %(type_good)s).'
+                    % {
                         'label': field.label,
                         'varname': field.varname,
                         'type': field.type,
@ -1183,8 +1269,10 @@ class WcsFormdefFeeder(object):
                     bad_varnames.add(field.varname)
                     continue
                 if field.varname in bad_varnames:
-                    add_warning('Le champ « %s » est un doublon d\'un champ de type différent, il a été ignoré.'
-                                % field.label)
+                    add_warning(
+                        'Le champ « %s » est un doublon d\'un champ de type différent, il a été ignoré.'
+                        % field.label
+                    )
                     continue
                 self.good_fields[field.varname] = field
@ -1221,8 +1309,9 @@ class WcsFormdefFeeder(object):
                 'type': 'bool',
                 'value': quote(field_name),
                 'value_label': '(CASE WHEN %(field)s IS NULL THEN NULL'
-                               ' WHEN %(field)s THEN \'Oui\''
-                               ' ELSE \'Non\' END)' % {
+                ' WHEN %(field)s THEN \'Oui\''
+                ' ELSE \'Non\' END)'
+                % {
                     'field': quote(field_name),
                 },
                 'filter': True,
@ -1230,12 +1319,14 @@ class WcsFormdefFeeder(object):
             elif field.type == 'string':
                 if has_digits_validation(field):
                     # we will define a SUM measure instead
-                    cube['measures'].append({
-                        'name': 'sum_' + dimension_name,
-                        'label': 'total du champ « %s »' % dimension_label,
-                        'type': 'integer',
-                        'expression': 'SUM({fact_table}.%s)' % quote(field_name),
-                    })
+                    cube['measures'].append(
+                        {
+                            'name': 'sum_' + dimension_name,
+                            'label': 'total du champ « %s »' % dimension_label,
+                            'type': 'integer',
+                            'expression': 'SUM({fact_table}.%s)' % quote(field_name),
+                        }
+                    )
                     continue
                 else:
                     dimension = {
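
For digits-validated string fields the feeder thus emits a SUM measure rather than a dimension; the {fact_table} placeholder in the expression is filled from the feeder context. An illustrative expansion, with table and field names invented:

    measure = {
        'name': 'sum_montant',
        'label': 'total du champ « montant »',
        'type': 'integer',
        'expression': 'SUM({fact_table}."field_montant")',
    }
    # once {fact_table} is substituted, the query engine would effectively run:
    #   SELECT SUM("formdata_demande"."field_montant") FROM "formdata_demande"
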

View File

@ -1,9 +1,9 @@
-import urllib.parse as urlparse
-import datetime
 import base64
-import hmac
+import datetime
 import hashlib
+import hmac
 import random
+import urllib.parse as urlparse
 '''Simple signature scheme for query strings'''
 # from http://repos.entrouvert.org/portail-citoyen.git/tree/portail_citoyen/apps/data_source_plugin/signature.py
@ -24,10 +24,7 @@ def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
     new_query = query
     if new_query:
         new_query += '&'
-    new_query += urlparse.urlencode((
-        ('algo', algo),
-        ('timestamp', timestamp),
-        ('nonce', nonce)))
+    new_query += urlparse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
     signature = base64.b64encode(sign_string(new_query, key, algo=algo))
     new_query += '&signature=' + urlparse.quote(signature)
     return new_query
@ -50,8 +47,7 @@ def check_url(url, key, known_nonce=None, timedelta=30):
 def check_query(query, key, known_nonce=None, timedelta=30):
     parsed = urlparse.parse_qs(query)
-    if not ('signature' in parsed and 'algo' in parsed
-            and 'timestamp' in parsed and 'nonce' in parsed):
+    if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed and 'nonce' in parsed):
         return False
     unsigned_query, signature_content = query.split('&signature=', 1)
     if '&' in signature_content:
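
For reference, a round trip with this module, under the assumption that it is importable as wcs_olap.signature (the key and query are example values):

    from wcs_olap import signature  # import path assumed from the package layout

    signed = signature.sign_query('email=admin@example.com', key='secret')
    # 'signed' now carries algo=, timestamp=, nonce= and signature= parameters
    assert signature.check_query(signed, key='secret')
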

View File

@ -1,4 +1,4 @@
-class Whatever(object):
+class Whatever:
     def __call__(*args, **kwargs):
         pass

View File

@ -14,19 +14,18 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.

-import collections
 import base64
-import copy
-import logging
-import datetime
+import collections
 import contextlib
+import copy
+import datetime
 import json
+import logging
-import requests
-import isodate
 import urllib.parse as urlparse
+
+import isodate
+import requests

 from . import signature
@ -34,7 +33,7 @@ class WcsApiError(Exception):
     pass

-class JSONFile(object):
+class JSONFile:
     def __init__(self, d):
         self.d = d
@ -62,7 +61,7 @@ def to_dict(o):
     return o

-class BaseObject(object):
+class BaseObject:
     def __init__(self, wcs_api, **kwargs):
         self._wcs_api = wcs_api
         self.__dict__.update(**kwargs)
@ -82,7 +81,7 @@ class FormDataWorkflow(BaseObject):
     fields = None

     def __init__(self, wcs_api, **kwargs):
-        super(FormDataWorkflow, self).__init__(wcs_api, **kwargs)
+        super().__init__(wcs_api, **kwargs)
         self.status = BaseObject(wcs_api, **self.status) if self.status else None
         self.real_status = BaseObject(wcs_api, **self.real_status) if self.real_status else None
         self.fields = self.fields or {}
@ -101,7 +100,7 @@ class Evolution(BaseObject):
     parts = None

     def __init__(self, wcs_api, **kwargs):
-        super(Evolution, self).__init__(wcs_api, **kwargs)
+        super().__init__(wcs_api, **kwargs)
         self.time = isodate.parse_datetime(self.time)
         self.parts = [BaseObject(wcs_api, **part) for part in self.parts or []]
         self.who = EvolutionUser(wcs_api, **self.who) if self.who else None
@ -117,7 +116,7 @@ class FormData(BaseObject):
     def __init__(self, wcs_api, forms, **kwargs):
         self.forms = forms
-        super(FormData, self).__init__(wcs_api, **kwargs)
+        super().__init__(wcs_api, **kwargs)
         self.receipt_time = isodate.parse_datetime(self.receipt_time)
         self.submission = BaseObject(wcs_api, **self.submission or {})
         self.workflow = FormDataWorkflow(wcs_api, **self.workflow or {})
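
The super(...) rewrites above are the Python-3-only zero-argument form; inside a method both spellings resolve identically, and this is the characteristic pyupgrade-style change applied by pre-commit. A small self-contained sketch of the equivalence (class names are illustrative):

    class Base:
        def __init__(self, **kwargs):
            self.kwargs = kwargs

    class Child(Base):
        def __init__(self, **kwargs):
            # equivalent to the old spelling: super(Child, self).__init__(**kwargs)
            super().__init__(**kwargs)

    assert Child(a=1).kwargs == {'a': 1}
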
@ -157,7 +156,7 @@ class FormData(BaseObject):
     @property
     def endpoint_delay(self):
         '''Compute delay as the time when the last not endpoint status precedes an endpoint
         status.'''
         statuses_map = self.formdef.schema.workflow.statuses_map
         s = 0
         for evo in self.evolution[::-1]:
@ -188,13 +187,13 @@ class Workflow(BaseObject):
     fields = None

     def __init__(self, wcs_api, **kwargs):
-        super(Workflow, self).__init__(wcs_api, **kwargs)
+        super().__init__(wcs_api, **kwargs)
         self.statuses = [BaseObject(wcs_api, **v) for v in (self.statuses or [])]
         assert not hasattr(self.statuses[0], 'startpoint'), 'startpoint is exported by w.c.s. FIXME'
         for status in self.statuses:
             status.startpoint = False
         self.statuses[0].startpoint = True
-        self.statuses_map = dict((s.id, s) for s in self.statuses)
+        self.statuses_map = {s.id: s for s in self.statuses}
         self.fields = [Field(wcs_api, **field) for field in (self.fields or [])]
@ -214,13 +213,13 @@ class Schema(BaseObject):
     geolocations = None

     def __init__(self, wcs_api, **kwargs):
-        super(Schema, self).__init__(wcs_api, **kwargs)
+        super().__init__(wcs_api, **kwargs)
         self.workflow = Workflow(wcs_api, **self.workflow)
         self.fields = [Field(wcs_api, **f) for f in self.fields or []]
         self.geolocations = sorted((k, v) for k, v in (self.geolocations or {}).items())

-class FormDatas(object):
+class FormDatas:
     def __init__(self, wcs_api, formdef, full=False, anonymize=False):
         self.wcs_api = wcs_api
         self.formdef = formdef
@ -231,13 +230,14 @@ class FormDatas(object):
     def __getitem__(self, slice_or_id):
         # get batch of forms
         if isinstance(slice_or_id, slice):
+
             def helper():
                 if slice_or_id.stop <= slice_or_id.start or slice_or_id.step:
                     raise ValueError('invalid slice %s' % slice_or_id)
                 offset = slice_or_id.start
                 limit = slice_or_id.stop - slice_or_id.start
-                url_parts = ['api/forms/{self.formdef.slug}/list'.format(self=self)]
+                url_parts = [f'api/forms/{self.formdef.slug}/list']
                 query = {}
                 query['full'] = 'on' if self._full else 'off'
                 if offset:
@ -252,10 +252,11 @@ class FormDatas(object):
                 if not d.get('receipt_time'):
                     continue
                 yield FormData(wcs_api=self.wcs_api, forms=self, formdef=self.formdef, **d)
+
             return helper()
         # or get one form
         else:
-            url_parts = ['api/forms/{formdef.slug}/{id}/'.format(formdef=self.formdef, id=slice_or_id)]
+            url_parts = [f'api/forms/{self.formdef.slug}/{slice_or_id}/']
             if self.anonymize:
                 url_parts.append('?anonymise=true')
             d = self.wcs_api.get_json(*url_parts)
@ -282,7 +283,7 @@ class FormDatas(object):
         start = 0
         while True:
             empty = True
-            for formdef in self[start:start + self.batch_size]:
+            for formdef in self[start : start + self.batch_size]:
                 empty = False
                 yield formdef
             if empty:
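
Iteration pages through the API with fixed-size slices, each slice translating to one list call with offset and limit. A hedged usage sketch (api and formdef are assumed to be built elsewhere):

    formdatas = FormDatas(wcs_api=api, formdef=formdef)
    page = list(formdatas[0:100])  # one HTTP call: offset=0, limit=100
    for formdata in formdatas:     # __iter__ pages transparently, batch_size rows at a time
        print(formdata.receipt_time)
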
@ -290,14 +291,14 @@ class FormDatas(object):
             start += self.batch_size

     def __len__(self):
-        return len(list((o for o in self)))
+        return len(list(o for o in self))

 class CancelSubmitError(Exception):
     pass

-class FormDefSubmit(object):
+class FormDefSubmit:
     formdef = None
     data = None
     user_email = None
@ -346,16 +347,13 @@ class FormDefSubmit(object):
         if value is None or value == {} or value == []:
             self.data.pop(varname, None)
         elif hasattr(self, '_set_type_%s' % field.type):
-            getattr(self, '_set_type_%s' % field.type)(
-                varname=varname,
-                field=field,
-                value=value, **kwargs)
+            getattr(self, '_set_type_%s' % field.type)(varname=varname, field=field, value=value, **kwargs)
         else:
             self.data[varname] = value

     def _set_type_item(self, varname, field, value, **kwargs):
         if isinstance(value, dict):
-            if not set(value).issuperset(set(['id', 'text'])):
+            if not set(value).issuperset({'id', 'text'}):
                 raise ValueError('item field value must have id and text value')
             # clean previous values
             self.data.pop(varname, None)
@ -378,7 +376,7 @@ class FormDefSubmit(object):
         has_dict = False
         for choice in value:
             if isinstance(value, dict):
-                if not set(value).issuperset(set(['id', 'text'])):
+                if not set(value).issuperset({'id', 'text'}):
                     raise ValueError('items field values must have id and text value')
                 has_dict = True
         if has_dict:
@ -407,7 +405,7 @@ class FormDefSubmit(object):
         elif isinstance(value, bytes):
             content = base64.b64encode(value).decode('ascii')
         elif isinstance(value, dict):
-            if not set(value).issuperset(set(['filename', 'content'])):
+            if not set(value).issuperset({'filename', 'content'}):
                 raise ValueError('file field needs a dict value with filename and content')
             content = value['content']
             filename = value['filename']
@ -432,7 +430,7 @@ class FormDefSubmit(object):
     def _set_type_map(self, varname, field, value):
         if not isinstance(value, dict):
             raise TypeError('value must be a dict for a map field')
-        if set(value) != set(['lat', 'lon']):
+        if set(value) != {'lat', 'lon'}:
             raise ValueError('map field expect keys lat and lon')
         self.data[varname] = value
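
The set-literal rewrites are behavior-preserving: {'lat', 'lon'} builds the same set as set(['lat', 'lon']) without the throwaway intermediate list. For instance:

    value = {'lat': 48.85, 'lon': 2.35}
    assert set(value) == set(['lat', 'lon']) == {'lat', 'lon'}
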
@ -470,22 +468,19 @@ class FormDef(BaseObject):
     @property
     def schema(self):
         if not hasattr(self, '_schema'):
-            d = self._wcs_api.get_json('api/formdefs/{self.slug}/schema'.format(self=self))
+            d = self._wcs_api.get_json(f'api/formdefs/{self.slug}/schema')
             self._schema = Schema(self._wcs_api, **d)
         return self._schema

     @contextlib.contextmanager
     def submit(self, **kwargs):
-        submitter = FormDefSubmit(
-            wcs_api=self._wcs_api,
-            formdef=self,
-            **kwargs)
+        submitter = FormDefSubmit(wcs_api=self._wcs_api, formdef=self, **kwargs)
         try:
             yield submitter
         except CancelSubmitError:
             return
         payload = submitter.payload()
-        d = self._wcs_api.post_json(payload, 'api/formdefs/{self.slug}/submit'.format(self=self))
+        d = self._wcs_api.post_json(payload, f'api/formdefs/{self.slug}/submit')
         if d['err'] != 0:
             raise WcsApiError('submited returned an error: %s' % d)
         submitter.result = BaseObject(self._wcs_api, **d['data'])
@ -499,7 +494,7 @@ class Category(BaseObject):
     pass

-class WcsObjects(object):
+class WcsObjects:
     url = None
     object_class = None
@ -519,7 +514,7 @@ class WcsObjects(object):
             yield self.object_class(wcs_api=self.wcs_api, **d)

     def __len__(self):
-        return len(list((o for o in self)))
+        return len(list(o for o in self))

 class Roles(WcsObjects):
@ -538,9 +533,19 @@ class Categories(WcsObjects):
     object_class = Category

-class WcsApi(object):
-    def __init__(self, url, email=None, name_id=None, batch_size=1000,
-                 session=None, logger=None, orig=None, key=None, verify=True):
+class WcsApi:
+    def __init__(
+        self,
+        url,
+        email=None,
+        name_id=None,
+        batch_size=1000,
+        session=None,
+        logger=None,
+        orig=None,
+        key=None,
+        verify=True,
+    ):
         self.url = url
         self.email = email
         self.name_id = name_id
@ -602,7 +607,8 @@ class WcsApi(object):
                 final_url,
                 data=json.dumps(data),
                 headers={'content-type': 'application/json'},
-                verify=self.verify)
+                verify=self.verify,
+            )
             response.raise_for_status()
         except requests.RequestException as e:
             content = getattr(getattr(e, 'response', None), 'content', None)
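
With the exploded signature above, constructing the client reads naturally with keyword arguments; a hedged example (values are illustrative, not project defaults):

    api = WcsApi(
        url='https://forms.example.com/',
        email='admin@example.com',
        batch_size=500,  # page size consumed by FormDatas iteration
        key='secret',    # key used to sign API requests
        verify=True,     # forwarded to requests for TLS verification
    )
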