trivial: apply pre-commit
This commit is contained in:
parent
c27d346d92
commit
461254e986
|
@ -2,7 +2,10 @@ Source: wcs-olap
|
|||
Section: python
|
||||
Priority: optional
|
||||
Maintainer: Benjamin Dauvergne <bdauvergne@entrouvert.com>
|
||||
Build-Depends: python3-setuptools, python3-all, debhelper-compat (= 12), dh-python
|
||||
Build-Depends: debhelper-compat (= 12),
|
||||
dh-python,
|
||||
python3-all,
|
||||
python3-setuptools,
|
||||
Standards-Version: 3.9.6
|
||||
Homepage: http://dev.entrouvert.org/projects/wcs-olap/
|
||||
|
||||
|
|
54
setup.py
54
setup.py
|
@ -1,9 +1,9 @@
|
|||
#! /usr/bin/env python
|
||||
|
||||
import subprocess
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
from setuptools import find_packages, setup
|
||||
from setuptools.command.sdist import sdist
|
||||
|
||||
|
||||
|
@ -21,15 +21,17 @@ class eo_sdist(sdist):
|
|||
|
||||
def get_version():
|
||||
'''Use the VERSION, if absent generates a version with git describe, if not
|
||||
tag exists, take 0.0- and add the length of the commit log.
|
||||
tag exists, take 0.0- and add the length of the commit log.
|
||||
'''
|
||||
if os.path.exists('VERSION'):
|
||||
with open('VERSION', 'r') as v:
|
||||
with open('VERSION') as v:
|
||||
return v.read()
|
||||
if os.path.exists('.git'):
|
||||
p = subprocess.Popen(
|
||||
['git', 'describe', '--dirty=.dirty', '--match=v*'],
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
result = p.communicate()[0]
|
||||
if p.returncode == 0:
|
||||
result = result.decode('ascii').strip()[1:] # strip spaces/newlines and initial v
|
||||
|
@ -44,26 +46,22 @@ def get_version():
|
|||
return '0.0'
|
||||
|
||||
|
||||
setup(name="wcs-olap",
|
||||
version=get_version(),
|
||||
license="AGPLv3+",
|
||||
description="Export w.c.s. data to an OLAP cube",
|
||||
long_description=open('README.rst').read(),
|
||||
url="http://dev.entrouvert.org/projects/publik-bi/",
|
||||
author="Entr'ouvert",
|
||||
author_email="authentic@listes.entrouvert.com",
|
||||
maintainer="Benjamin Dauvergne",
|
||||
maintainer_email="bdauvergne@entrouvert.com",
|
||||
packages=find_packages(),
|
||||
include_package_data=True,
|
||||
install_requires=[
|
||||
'requests',
|
||||
'psycopg2',
|
||||
'isodate',
|
||||
'six',
|
||||
'cached-property'
|
||||
],
|
||||
entry_points={
|
||||
'console_scripts': ['wcs-olap=wcs_olap.cmd:main'],
|
||||
},
|
||||
cmdclass={'sdist': eo_sdist})
|
||||
setup(
|
||||
name="wcs-olap",
|
||||
version=get_version(),
|
||||
license="AGPLv3+",
|
||||
description="Export w.c.s. data to an OLAP cube",
|
||||
long_description=open('README.rst').read(),
|
||||
url="http://dev.entrouvert.org/projects/publik-bi/",
|
||||
author="Entr'ouvert",
|
||||
author_email="authentic@listes.entrouvert.com",
|
||||
maintainer="Benjamin Dauvergne",
|
||||
maintainer_email="bdauvergne@entrouvert.com",
|
||||
packages=find_packages(),
|
||||
include_package_data=True,
|
||||
install_requires=['requests', 'psycopg2', 'isodate', 'six', 'cached-property'],
|
||||
entry_points={
|
||||
'console_scripts': ['wcs-olap=wcs_olap.cmd:main'],
|
||||
},
|
||||
cmdclass={'sdist': eo_sdist},
|
||||
)
|
||||
|
|
|
@ -1,25 +1,22 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import configparser
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
import shutil
|
||||
import random
|
||||
import socket
|
||||
from unittest import mock
|
||||
from contextlib import closing, contextmanager, ExitStack
|
||||
from collections import namedtuple
|
||||
from contextlib import ExitStack, closing, contextmanager
|
||||
from unittest import mock
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
|
||||
import utils
|
||||
|
||||
Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid'])
|
||||
|
||||
|
||||
class Database(object):
|
||||
class Database:
|
||||
def __init__(self):
|
||||
self.db_name = 'db%s' % random.getrandbits(20)
|
||||
self.dsn = 'dbname=%s' % self.db_name
|
||||
|
@ -60,21 +57,21 @@ def wcs_db():
|
|||
|
||||
|
||||
WCS_SCRIPTS = {
|
||||
'setup-auth': u"""
|
||||
'setup-auth': """
|
||||
from quixote import get_publisher
|
||||
|
||||
get_publisher().cfg['identification'] = {'methods': ['password']}
|
||||
get_publisher().cfg['debug'] = {'display_exceptions': 'text'}
|
||||
get_publisher().write_cfg()
|
||||
""",
|
||||
'setup-storage': u"""
|
||||
'setup-storage': """
|
||||
from quixote import get_publisher
|
||||
|
||||
get_publisher().cfg['postgresql'] = {'database': %(dbname)r, 'port': %(port)r, 'host': %(host)r, 'user': %(user)r, 'password': %(password)r}
|
||||
get_publisher().write_cfg()
|
||||
get_publisher().initialize_sql()
|
||||
""",
|
||||
'create-user': u"""
|
||||
'create-user': """
|
||||
from quixote import get_publisher
|
||||
from qommon.ident.password_accounts import PasswordAccount
|
||||
|
||||
|
@ -87,7 +84,7 @@ account.set_password('user')
|
|||
account.user_id = user.id
|
||||
account.store()
|
||||
""",
|
||||
'create-data': u"""
|
||||
'create-data': """
|
||||
import datetime
|
||||
import random
|
||||
from quixote import get_publisher
|
||||
|
@ -197,32 +194,44 @@ def wcs(tmp_path_factory, wcs_dir, wcs_db):
|
|||
tenant_dir.mkdir()
|
||||
|
||||
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth')
|
||||
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-storage'] % {
|
||||
utils.run_wcs_script(
|
||||
wcs_dir,
|
||||
WCS_SCRIPTS['setup-storage']
|
||||
% {
|
||||
'dbname': wcs_db.db_name,
|
||||
'port': os.environ.get('PGPORT'),
|
||||
'host': os.environ.get('PGHOST'),
|
||||
'user': os.environ.get('PGUSER'),
|
||||
'password': os.environ.get('PGPASSWORD'),
|
||||
},
|
||||
'setup-storage')
|
||||
'setup-storage',
|
||||
)
|
||||
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user')
|
||||
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data')
|
||||
|
||||
with (tenant_dir / 'site-options.cfg').open('w') as fd:
|
||||
fd.write(u'''[api-secrets]
|
||||
fd.write(
|
||||
'''[api-secrets]
|
||||
olap = olap
|
||||
''')
|
||||
'''
|
||||
)
|
||||
|
||||
with (wcs_dir / 'wcs.cfg').open('w') as fd:
|
||||
fd.write(u'''[main]
|
||||
app_dir = %s\n''' % (str(wcs_dir).replace('%', '%%')))
|
||||
fd.write(
|
||||
'''[main]
|
||||
app_dir = %s\n'''
|
||||
% (str(wcs_dir).replace('%', '%%'))
|
||||
)
|
||||
|
||||
with (wcs_dir / 'local_settings.py').open('w') as fd:
|
||||
fd.write(u'''
|
||||
fd.write(
|
||||
'''
|
||||
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg'
|
||||
THEMES_DIRECTORY = '/'
|
||||
ALLOWED_HOSTS = ['%s']
|
||||
''' % (wcs_dir, utils.HOSTNAME))
|
||||
'''
|
||||
% (wcs_dir, utils.HOSTNAME)
|
||||
)
|
||||
|
||||
# launch a Django worker for running w.c.s.
|
||||
WCS_PID = os.fork()
|
||||
|
@ -230,7 +239,9 @@ ALLOWED_HOSTS = ['%s']
|
|||
os.chdir(os.path.dirname(utils.WCS_MANAGE))
|
||||
os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
|
||||
os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py')
|
||||
os.execvp('python', [sys.executable, 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)])
|
||||
os.execvp(
|
||||
'python', [sys.executable, 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)]
|
||||
)
|
||||
sys.exit(0)
|
||||
|
||||
# verify w.c.s. is launched
|
||||
|
@ -272,7 +283,8 @@ def olap_cmd(wcs, tmpdir, postgres_db):
|
|||
model_dir = tmpdir / 'model_dir'
|
||||
model_dir.mkdir()
|
||||
with config_ini.open('w') as fd:
|
||||
fd.write(u'''
|
||||
fd.write(
|
||||
'''
|
||||
[wcs-olap]
|
||||
cubes_model_dirs = {model_dir}
|
||||
pg_dsn = {dsn}
|
||||
|
@ -282,10 +294,14 @@ orig = olap
|
|||
key = olap
|
||||
schema = olap
|
||||
cubes_slug = olap-slug
|
||||
'''.format(wcs=wcs, model_dir=str(model_dir).replace('%', '%%'), dsn=postgres_db.dsn))
|
||||
'''.format(
|
||||
wcs=wcs, model_dir=str(model_dir).replace('%', '%%'), dsn=postgres_db.dsn
|
||||
)
|
||||
)
|
||||
|
||||
import sys
|
||||
|
||||
from wcs_olap import cmd
|
||||
import sys
|
||||
|
||||
def f(no_log_errors=True):
|
||||
old_argv = sys.argv
|
||||
|
@ -327,8 +343,11 @@ def mock_cursor_execute():
|
|||
mocked_cur = mock.Mock(wraps=cur)
|
||||
mocked_cur.execute = mock.Mock(wraps=cur.execute, **execute_mock_kwargs)
|
||||
return mocked_cur
|
||||
|
||||
mocked_conn.cursor = cursor
|
||||
return mocked_conn
|
||||
|
||||
stack.enter_context(mock.patch.object(psycopg2, 'connect', connect))
|
||||
yield None
|
||||
|
||||
yield do
|
||||
|
|
|
@ -28,7 +28,8 @@ def test_post_sync_commands(mock_cursor_execute, wcs, postgres_db, olap_cmd):
|
|||
config.set(
|
||||
'wcs-olap',
|
||||
'post-sync-commands',
|
||||
"NOTIFY coucou, '{schema}';\nSELECT * FROM information_schema.tables")
|
||||
"NOTIFY coucou, '{schema}';\nSELECT * FROM information_schema.tables",
|
||||
)
|
||||
|
||||
queries = []
|
||||
|
||||
|
|
|
@ -17,15 +17,17 @@ def capture_event_mock():
|
|||
return capture_event_mock(*args, **kwargs)
|
||||
|
||||
with contextlib.ExitStack() as stack:
|
||||
|
||||
def new_init(*args, **kwargs):
|
||||
old_init(*args, **kwargs)
|
||||
if sentry_sdk.Hub.current.client:
|
||||
stack.enter_context(
|
||||
mock.patch.object(
|
||||
sentry_sdk.Hub.current.client.transport, 'capture_event', my_capture_event))
|
||||
sentry_sdk.Hub.current.client.transport, 'capture_event', my_capture_event
|
||||
)
|
||||
)
|
||||
|
||||
stack.enter_context(
|
||||
mock.patch.object(sentry_sdk, 'init', new_init))
|
||||
stack.enter_context(mock.patch.object(sentry_sdk, 'init', new_init))
|
||||
yield capture_event_mock
|
||||
|
||||
|
||||
|
|
|
@ -16,17 +16,14 @@
|
|||
|
||||
import datetime
|
||||
import json
|
||||
import pytest
|
||||
import pathlib
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
import unittest.mock as mock
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
import requests
|
||||
import httmock
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
import utils
|
||||
|
||||
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
||||
|
||||
|
||||
|
@ -49,8 +46,9 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
|
|||
with mock.patch('requests.Session', MockSession):
|
||||
olap_cmd()
|
||||
call_args_list = MockSession.mocks[-1].call_args_list
|
||||
url_with_limits = [call_args[0][0] for call_args in MockSession.mocks[-1].call_args_list
|
||||
if 'limit' in call_args[0][0]]
|
||||
url_with_limits = [
|
||||
call_args[0][0] for call_args in MockSession.mocks[-1].call_args_list if 'limit' in call_args[0][0]
|
||||
]
|
||||
assert url_with_limits, call_args_list
|
||||
for url_with_limit in url_with_limits:
|
||||
parsed_qs = parse_qs(urlparse(url_with_limit).query)
|
||||
|
@ -60,10 +58,12 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
|
|||
assert (
|
||||
'Le champ « 7th field bad duplicate » a un nom de variable dupliqué '
|
||||
'« duplicate » mais pas le même type que « 6th field bad duplicate », '
|
||||
'tous les champs avec ce nom seront ignorés (string != bool).') in caplog.text
|
||||
'tous les champs avec ce nom seront ignorés (string != bool).'
|
||||
) in caplog.text
|
||||
assert (
|
||||
'Le champ « 11th field third bad duplicate » est un doublon d\'un '
|
||||
'champ de type différent, il a été ignoré.') in caplog.text
|
||||
'champ de type différent, il a été ignoré.'
|
||||
) in caplog.text
|
||||
|
||||
expected_schema = [
|
||||
('agent', 'id'),
|
||||
|
@ -131,15 +131,17 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
|
|||
('status', 'id'),
|
||||
('status', 'label'),
|
||||
('status_demande', 'id'),
|
||||
('status_demande', 'label')
|
||||
('status_demande', 'label'),
|
||||
]
|
||||
|
||||
# verify SQL schema
|
||||
with postgres_db.conn() as conn:
|
||||
with conn.cursor() as c:
|
||||
c.execute('SELECT table_name, column_name '
|
||||
'FROM information_schema.columns '
|
||||
'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position')
|
||||
c.execute(
|
||||
'SELECT table_name, column_name '
|
||||
'FROM information_schema.columns '
|
||||
'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position'
|
||||
)
|
||||
|
||||
assert list(c.fetchall()) == expected_schema
|
||||
|
||||
|
@ -165,15 +167,18 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
|
|||
c.execute('SELECT MIN(date), MAX(date) FROM public.dates')
|
||||
last_date = (datetime.date.today() + datetime.timedelta(days=150)).replace(month=12, day=31)
|
||||
assert c.fetchone()[0:2] == (datetime.date(2011, 1, 1), last_date)
|
||||
c.execute('''SELECT COUNT(*) FROM
|
||||
c.execute(
|
||||
'''SELECT COUNT(*) FROM
|
||||
GENERATE_SERIES(%s::date, %s::date, '1 day'::interval) AS series(date)
|
||||
FULL OUTER JOIN public.dates AS dates ON series.date = dates.date WHERE dates.date IS NULL OR series.date IS NULL''',
|
||||
vars=[datetime.date(2011, 1, 1), last_date])
|
||||
vars=[datetime.date(2011, 1, 1), last_date],
|
||||
)
|
||||
assert c.fetchone()[0] == 0
|
||||
|
||||
# verify JSON schema
|
||||
with (olap_cmd.model_dir / 'olap.model').open() as fd, \
|
||||
(pathlib.Path(__file__).parent / 'olap.model').open() as fd2:
|
||||
with (olap_cmd.model_dir / 'olap.model').open() as fd, (
|
||||
pathlib.Path(__file__).parent / 'olap.model'
|
||||
).open() as fd2:
|
||||
json_schema = json.load(fd)
|
||||
expected_json_schema = json.load(fd2)
|
||||
expected_json_schema['pg_dsn'] = postgres_db.dsn
|
||||
|
@ -183,10 +188,12 @@ FULL OUTER JOIN public.dates AS dates ON series.date = dates.date WHERE dates.da
|
|||
with postgres_db.conn() as conn:
|
||||
with conn.cursor() as c:
|
||||
c.execute('SET search_path = olap')
|
||||
c.execute('SELECT field_good_duplicate, count(id) '
|
||||
'FROM formdata_demande '
|
||||
'GROUP BY field_good_duplicate '
|
||||
'ORDER BY field_good_duplicate')
|
||||
c.execute(
|
||||
'SELECT field_good_duplicate, count(id) '
|
||||
'FROM formdata_demande '
|
||||
'GROUP BY field_good_duplicate '
|
||||
'ORDER BY field_good_duplicate'
|
||||
)
|
||||
assert dict(c.fetchall()) == {
|
||||
'a': 37,
|
||||
'b': 13,
|
||||
|
@ -238,7 +245,7 @@ def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog
|
|||
assert len(open_refs) == 3
|
||||
|
||||
# Change an item of the field
|
||||
script = u"""
|
||||
script = """
|
||||
import datetime
|
||||
import random
|
||||
from quixote import get_publisher
|
||||
|
@ -292,8 +299,10 @@ formdata.store()
|
|||
assert new_open_refs[-1][1] == 'open_new_value'
|
||||
open_new_id = new_open_refs[-1][0]
|
||||
|
||||
c.execute('''SELECT field_item, "field_itemOpen"
|
||||
FROM formdata_demande ORDER BY id''')
|
||||
c.execute(
|
||||
'''SELECT field_item, "field_itemOpen"
|
||||
FROM formdata_demande ORDER BY id'''
|
||||
)
|
||||
formdata = c.fetchone()
|
||||
assert formdata[0] == bazouka_id
|
||||
assert formdata[1] == open_new_id
|
||||
|
|
|
@ -2,7 +2,6 @@ import os
|
|||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
HOSTNAME = '127.0.0.1'
|
||||
WCS_MANAGE = os.environ.get('WCS_MANAGE')
|
||||
|
||||
|
@ -14,5 +13,15 @@ def run_wcs_script(wcs_dir, script, script_name):
|
|||
fd.write(script)
|
||||
|
||||
subprocess.check_call(
|
||||
[sys.executable, WCS_MANAGE, 'runscript', '--app-dir', str(wcs_dir), '--vhost', HOSTNAME, str(script_path)],
|
||||
env={'DJANGO_SETTINGS_MODULE': 'wcs.settings'})
|
||||
[
|
||||
sys.executable,
|
||||
WCS_MANAGE,
|
||||
'runscript',
|
||||
'--app-dir',
|
||||
str(wcs_dir),
|
||||
'--vhost',
|
||||
HOSTNAME,
|
||||
str(script_path),
|
||||
],
|
||||
env={'DJANGO_SETTINGS_MODULE': 'wcs.settings'},
|
||||
)
|
||||
|
|
|
@ -14,7 +14,7 @@ except ImportError:
|
|||
else:
|
||||
from sentry_sdk.integrations.logging import LoggingIntegration
|
||||
|
||||
from . import wcs_api, feeder
|
||||
from . import feeder, wcs_api
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -63,15 +63,14 @@ def configure_sentry(config):
|
|||
# get DEBUG level logs as breadcrumbs
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
sentry_logging = LoggingIntegration(
|
||||
level=logging.DEBUG,
|
||||
event_level=logging.ERROR)
|
||||
sentry_logging = LoggingIntegration(level=logging.DEBUG, event_level=logging.ERROR)
|
||||
|
||||
sentry_sdk.init(
|
||||
dsn=config.get('sentry', 'dsn'),
|
||||
environment=config.get('sentry', 'environment', fallback=None),
|
||||
attach_stacktrace=True,
|
||||
integrations=[sentry_logging])
|
||||
integrations=[sentry_logging],
|
||||
)
|
||||
|
||||
scopes.append(sentry_sdk.push_scope)
|
||||
|
||||
|
@ -82,17 +81,17 @@ def main2():
|
|||
except locale.Error:
|
||||
# current locale does not exist
|
||||
pass
|
||||
parser = argparse.ArgumentParser(description='Export W.C.S. data as a star schema in a '
|
||||
'postgresql DB', add_help=False)
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Export W.C.S. data as a star schema in a ' 'postgresql DB', add_help=False
|
||||
)
|
||||
parser.add_argument('config_path', nargs='?', default=None)
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
parser.add_argument('--no-feed', dest='feed', help='only produce the model',
|
||||
action='store_false', default=True)
|
||||
parser.add_argument('--no-log-errors', dest='no_log_errors',
|
||||
action='store_true', default=False)
|
||||
parser.add_argument(
|
||||
'--no-feed', dest='feed', help='only produce the model', action='store_false', default=True
|
||||
)
|
||||
parser.add_argument('--no-log-errors', dest='no_log_errors', action='store_true', default=False)
|
||||
parser.add_argument('--fake', action='store_true', default=False)
|
||||
group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true',
|
||||
default=False)
|
||||
group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true', default=False)
|
||||
group.add_argument('--url', help='url of the w.c.s. instance', required=False, default=None)
|
||||
args, rest = parser.parse_known_args()
|
||||
feed = args.feed
|
||||
|
@ -100,8 +99,7 @@ def main2():
|
|||
config = get_config(path=args.config_path)
|
||||
configure_sentry(config)
|
||||
# list all known urls
|
||||
urls = [url for url in config.sections() if url.startswith('http://')
|
||||
or url.startswith('https://')]
|
||||
urls = [url for url in config.sections() if url.startswith('http://') or url.startswith('https://')]
|
||||
defaults = {}
|
||||
if not args.all:
|
||||
try:
|
||||
|
@ -129,7 +127,7 @@ def main2():
|
|||
if config.has_section('wcs-olap'):
|
||||
defaults.update(config.items('wcs-olap'))
|
||||
if config.has_section(url):
|
||||
defaults.update((config.items(url)))
|
||||
defaults.update(config.items(url))
|
||||
try:
|
||||
key = defaults['key']
|
||||
orig = defaults['orig']
|
||||
|
@ -142,14 +140,25 @@ def main2():
|
|||
logger.error('configuration incomplete for %s: %s', url, e)
|
||||
else:
|
||||
try:
|
||||
api = wcs_api.WcsApi(url=url, orig=orig, key=key,
|
||||
batch_size=batch_size,
|
||||
verify=(defaults.get('verify', 'True') == 'True'))
|
||||
logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url,
|
||||
pg_dsn)
|
||||
api = wcs_api.WcsApi(
|
||||
url=url,
|
||||
orig=orig,
|
||||
key=key,
|
||||
batch_size=batch_size,
|
||||
verify=(defaults.get('verify', 'True') == 'True'),
|
||||
)
|
||||
logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url, pg_dsn)
|
||||
olap_feeder = feeder.WcsOlapFeeder(
|
||||
api=api, schema=schema, pg_dsn=pg_dsn, logger=logger,
|
||||
config=defaults, do_feed=feed, fake=fake, slugs=slugs, scope=scope)
|
||||
api=api,
|
||||
schema=schema,
|
||||
pg_dsn=pg_dsn,
|
||||
logger=logger,
|
||||
config=defaults,
|
||||
do_feed=feed,
|
||||
fake=fake,
|
||||
slugs=slugs,
|
||||
scope=scope,
|
||||
)
|
||||
olap_feeder.feed()
|
||||
logger.info('finished')
|
||||
feed_result = False
|
||||
|
@ -163,5 +172,6 @@ def main2():
|
|||
if failure:
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
|
|
@ -1,24 +1,22 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from collections import OrderedDict
|
||||
import contextlib
|
||||
import datetime
|
||||
import copy
|
||||
import itertools
|
||||
import os
|
||||
import json
|
||||
import datetime
|
||||
import hashlib
|
||||
import itertools
|
||||
import json
|
||||
import os
|
||||
import reprlib
|
||||
from .utils import Whatever
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.errorcodes
|
||||
import time
|
||||
|
||||
from cached_property import cached_property
|
||||
|
||||
from wcs_olap.wcs_api import WcsApiError
|
||||
|
||||
from .utils import Whatever
|
||||
|
||||
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
|
||||
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
|
||||
|
||||
|
@ -29,9 +27,10 @@ def truncate_pg_identifier(identifier, hash_length=6, force_hash=False):
|
|||
else:
|
||||
# insert hash in the middle, to keep some readability
|
||||
return (
|
||||
identifier[:(63 - hash_length) // 2]
|
||||
identifier[: (63 - hash_length) // 2]
|
||||
+ hashlib.md5(identifier.encode('utf-8')).hexdigest()[:hash_length]
|
||||
+ identifier[-(63 - hash_length) // 2:])
|
||||
+ identifier[-(63 - hash_length) // 2 :]
|
||||
)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
@ -51,7 +50,7 @@ def slugify(s):
|
|||
return s.replace('-', '_').replace(' ', '_')
|
||||
|
||||
|
||||
class Context(object):
|
||||
class Context:
|
||||
def __init__(self):
|
||||
self.stack = []
|
||||
|
||||
|
@ -68,7 +67,7 @@ class Context(object):
|
|||
return r
|
||||
|
||||
|
||||
class WcsOlapFeeder(object):
|
||||
class WcsOlapFeeder:
|
||||
|
||||
channels = [
|
||||
[1, 'web', 'web'],
|
||||
|
@ -80,18 +79,20 @@ class WcsOlapFeeder(object):
|
|||
[7, 'fax', 'fax'],
|
||||
[8, 'social-network', 'réseau social'],
|
||||
]
|
||||
channel_to_id = dict((c[1], c[0]) for c in channels)
|
||||
id_to_channel = dict((c[0], c[1]) for c in channels)
|
||||
channel_to_id = {c[1]: c[0] for c in channels}
|
||||
id_to_channel = {c[0]: c[1] for c in channels}
|
||||
|
||||
status = [
|
||||
[1, 'Nouveau'],
|
||||
[2, 'En cours'],
|
||||
[3, 'Terminé'],
|
||||
]
|
||||
status_to_id = dict((c[1], c[0]) for c in channels)
|
||||
id_to_status = dict((c[0], c[1]) for c in channels)
|
||||
status_to_id = {c[1]: c[0] for c in channels}
|
||||
id_to_status = {c[0]: c[1] for c in channels}
|
||||
|
||||
def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False, slugs=None, scope=None):
|
||||
def __init__(
|
||||
self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False, slugs=None, scope=None
|
||||
):
|
||||
self.api = api
|
||||
self.slugs = slugs
|
||||
self.fake = fake
|
||||
|
@ -103,23 +104,25 @@ class WcsOlapFeeder(object):
|
|||
self.schema_temp = truncate_pg_identifier(schema + '_temp')
|
||||
self.do_feed = do_feed
|
||||
self.ctx = Context()
|
||||
self.ctx.push({
|
||||
'schema': self.schema,
|
||||
'schema_temp': self.schema_temp,
|
||||
'role_table': 'role',
|
||||
'channel_table': 'channel',
|
||||
'category_table': 'category',
|
||||
'form_table': 'formdef',
|
||||
'generic_formdata_table': 'formdata',
|
||||
'generic_status_table': 'status',
|
||||
'generic_evolution_table': 'evolution',
|
||||
'year_table': 'year',
|
||||
'month_table': 'month',
|
||||
'day_table': 'day',
|
||||
'dow_table': 'dow',
|
||||
'hour_table': 'hour',
|
||||
'agent_table': 'agent',
|
||||
})
|
||||
self.ctx.push(
|
||||
{
|
||||
'schema': self.schema,
|
||||
'schema_temp': self.schema_temp,
|
||||
'role_table': 'role',
|
||||
'channel_table': 'channel',
|
||||
'category_table': 'category',
|
||||
'form_table': 'formdef',
|
||||
'generic_formdata_table': 'formdata',
|
||||
'generic_status_table': 'status',
|
||||
'generic_evolution_table': 'evolution',
|
||||
'year_table': 'year',
|
||||
'month_table': 'month',
|
||||
'day_table': 'day',
|
||||
'dow_table': 'dow',
|
||||
'hour_table': 'hour',
|
||||
'agent_table': 'agent',
|
||||
}
|
||||
)
|
||||
self.config = config or {}
|
||||
self.model = {
|
||||
'label': self.config.get('cubes_label', schema),
|
||||
|
@ -276,19 +279,19 @@ class WcsOlapFeeder(object):
|
|||
'label': 'pourcentage des demandes',
|
||||
'type': 'percent',
|
||||
"expression": 'case (select count({fact_table}.id) from {table_expression} '
|
||||
'where {where_conditions}) when 0 then null else '
|
||||
'count({fact_table}.id) * 100. / (select '
|
||||
'count({fact_table}.id) from {table_expression} where '
|
||||
'{where_conditions}) end',
|
||||
'where {where_conditions}) when 0 then null else '
|
||||
'count({fact_table}.id) * 100. / (select '
|
||||
'count({fact_table}.id) from {table_expression} where '
|
||||
'{where_conditions}) end',
|
||||
},
|
||||
{
|
||||
'name': 'geolocation',
|
||||
'label': 'localisation géographique',
|
||||
'type': 'point',
|
||||
'expression': 'array_agg({fact_table}.geolocation_base) '
|
||||
'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)',
|
||||
}
|
||||
]
|
||||
'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)',
|
||||
},
|
||||
],
|
||||
}
|
||||
self.model['cubes'].append(cube)
|
||||
self.base_cube = self.model['cubes'][0]
|
||||
|
@ -309,7 +312,11 @@ class WcsOlapFeeder(object):
|
|||
|
||||
@cached_property
|
||||
def formdefs(self):
|
||||
return [formdef for formdef in self.api.formdefs if (not self.slugs or formdef.slug in self.slugs) and not formdef.is_empty]
|
||||
return [
|
||||
formdef
|
||||
for formdef in self.api.formdefs
|
||||
if (not self.slugs or formdef.slug in self.slugs) and not formdef.is_empty
|
||||
]
|
||||
|
||||
@cached_property
|
||||
def roles(self):
|
||||
|
@ -343,7 +350,9 @@ class WcsOlapFeeder(object):
|
|||
try:
|
||||
self.cur.execute(sql, vars=vars)
|
||||
except Exception as e:
|
||||
self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
|
||||
self.logger.warning(
|
||||
'Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e
|
||||
)
|
||||
raise
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
@ -381,23 +390,33 @@ class WcsOlapFeeder(object):
|
|||
Drop tables one by one in order to avoid reaching max_locks_per_transaction
|
||||
"""
|
||||
# drop foreign key constraints first
|
||||
self.ex("SELECT table_name, constraint_name FROM "
|
||||
"information_schema.key_column_usage "
|
||||
"WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'", vars=[schema])
|
||||
self.ex(
|
||||
"SELECT table_name, constraint_name FROM "
|
||||
"information_schema.key_column_usage "
|
||||
"WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'",
|
||||
vars=[schema],
|
||||
)
|
||||
for table_name, constraint_name in self.cur.fetchall():
|
||||
# drop of PK constraints can have effects on FK constraint on other tables.
|
||||
with ignore_undefined_object_or_table():
|
||||
self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
|
||||
% (quote(schema), quote(table_name), quote(constraint_name)))
|
||||
self.ex(
|
||||
'ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
|
||||
% (quote(schema), quote(table_name), quote(constraint_name))
|
||||
)
|
||||
# remove others
|
||||
self.ex("SELECT table_name, constraint_name FROM "
|
||||
"information_schema.key_column_usage "
|
||||
"WHERE table_schema = %s", vars=[schema])
|
||||
self.ex(
|
||||
"SELECT table_name, constraint_name FROM "
|
||||
"information_schema.key_column_usage "
|
||||
"WHERE table_schema = %s",
|
||||
vars=[schema],
|
||||
)
|
||||
for table_name, constraint_name in self.cur.fetchall():
|
||||
# drop of PK constraints can have effects on FK constraint on other tables.
|
||||
with ignore_undefined_object_or_table():
|
||||
self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
|
||||
% (quote(schema), quote(table_name), quote(constraint_name)))
|
||||
self.ex(
|
||||
'ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE'
|
||||
% (quote(schema), quote(table_name), quote(constraint_name))
|
||||
)
|
||||
# then drop indexes
|
||||
self.ex("SELECT tablename, indexname FROM pg_indexes WHERE schemaname = %s", vars=[schema])
|
||||
for table_name, index_name in self.cur.fetchall():
|
||||
|
@ -405,7 +424,9 @@ class WcsOlapFeeder(object):
|
|||
self.ex('DROP INDEX %s.%s CASCADE' % (quote(schema), quote(index_name)))
|
||||
|
||||
# finally drop tables, cascade will have no effect
|
||||
self.ex("SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC", vars=[schema])
|
||||
self.ex(
|
||||
"SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC", vars=[schema]
|
||||
)
|
||||
for table in self.cur.fetchall():
|
||||
tablename = '%s.%s' % (quote(schema), quote(table[0]))
|
||||
self.ex('DROP TABLE IF EXISTS %s;' % tablename)
|
||||
|
@ -418,7 +439,8 @@ class WcsOlapFeeder(object):
|
|||
|
||||
first_date = max_date or datetime.date(2010, 1, 1)
|
||||
last_date = (datetime.date.today() + datetime.timedelta(days=150)).replace(month=12, day=31)
|
||||
self.ex('''INSERT INTO public.dates SELECT
|
||||
self.ex(
|
||||
'''INSERT INTO public.dates SELECT
|
||||
the_date.the_date::date AS date,
|
||||
to_char(the_date.the_date, 'TMday') AS day,
|
||||
to_char(the_date.the_date, 'TMmonth') AS month
|
||||
|
@ -427,7 +449,8 @@ class WcsOlapFeeder(object):
|
|||
LEFT JOIN public.dates AS dates
|
||||
ON the_date.the_date = dates.date
|
||||
WHERE dates.date IS NULL''',
|
||||
vars=[first_date, last_date])
|
||||
vars=[first_date, last_date],
|
||||
)
|
||||
|
||||
def create_table(self, name, columns, inherits=None, comment=None, unlogged=True):
|
||||
if unlogged:
|
||||
|
@ -448,18 +471,21 @@ class WcsOlapFeeder(object):
|
|||
return self.cur.fetchone()[0]
|
||||
|
||||
def update_table_sequence_number(self, name):
|
||||
self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
|
||||
(SELECT GREATEST(1, MAX(id)) FROM {name}))""", ctx={'name': quote(name)})
|
||||
self.ex(
|
||||
"""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
|
||||
(SELECT GREATEST(1, MAX(id)) FROM {name}))""",
|
||||
ctx={'name': quote(name)},
|
||||
)
|
||||
|
||||
def create_labeled_table_serial(self, name, comment):
|
||||
self.create_table(
|
||||
name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment, unlogged=False)
|
||||
name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment, unlogged=False
|
||||
)
|
||||
|
||||
if self.prev_table_exists(name):
|
||||
# Insert data from previous table
|
||||
self.ex(
|
||||
'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
|
||||
ctx={'name': quote(name)}
|
||||
'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}', ctx={'name': quote(name)}
|
||||
)
|
||||
self.update_table_sequence_number(name)
|
||||
|
||||
|
@ -476,12 +502,15 @@ class WcsOlapFeeder(object):
|
|||
self.ex(
|
||||
'SELECT ref, {column} FROM {name} WHERE ref = %s',
|
||||
ctx={'name': quote(name), 'column': quote(update_column)},
|
||||
vars=(ref,))
|
||||
vars=(ref,),
|
||||
)
|
||||
if self.cur.fetchall():
|
||||
for item in self.cur.fetchall():
|
||||
self.ex('UPDATE {name} SET {column}=%s WHERE ref=%s',
|
||||
ctx={'name': quote(name), 'column': quote(update_column)},
|
||||
vars=[item[1], ref])
|
||||
self.ex(
|
||||
'UPDATE {name} SET {column}=%s WHERE ref=%s',
|
||||
ctx={'name': quote(name), 'column': quote(update_column)},
|
||||
vars=[item[1], ref],
|
||||
)
|
||||
else:
|
||||
to_insert.append(item)
|
||||
if to_insert:
|
||||
|
@ -489,35 +518,32 @@ class WcsOlapFeeder(object):
|
|||
tmpl = ', '.join(['(DEFAULT, %s)' % columns_values] * len(data))
|
||||
|
||||
query = 'INSERT INTO {name} VALUES %s' % tmpl
|
||||
self.ex(query, ctx={'name': quote(name)}, # 'column': quote(update_column)},
|
||||
vars=list(itertools.chain(*to_insert)))
|
||||
self.ex(
|
||||
query,
|
||||
ctx={'name': quote(name)}, # 'column': quote(update_column)},
|
||||
vars=list(itertools.chain(*to_insert)),
|
||||
)
|
||||
|
||||
result = {}
|
||||
self.ex('SELECT id, {column} FROM {name}', ctx={'name': quote(name),
|
||||
'column': result_column})
|
||||
for _id, column in self.cur.fetchall():
|
||||
self.ex('SELECT id, {column} FROM {name}', ctx={'name': quote(name), 'column': result_column})
|
||||
for _id, column in self.cur.fetchall():
|
||||
result[column] = _id
|
||||
return result
|
||||
|
||||
def create_labeled_table(self, name, labels, comment=None):
|
||||
self.create_table(
|
||||
name,
|
||||
[
|
||||
['id', 'integer primary key'],
|
||||
['label', 'varchar']
|
||||
], comment=comment, unlogged=False)
|
||||
name, [['id', 'integer primary key'], ['label', 'varchar']], comment=comment, unlogged=False
|
||||
)
|
||||
|
||||
if self.prev_table_exists(name):
|
||||
# Insert data from previous table
|
||||
self.ex(
|
||||
'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}',
|
||||
ctx={'name': quote(name)}
|
||||
'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}', ctx={'name': quote(name)}
|
||||
)
|
||||
# Find what is missing
|
||||
to_insert = []
|
||||
for _id, _label in labels:
|
||||
self.ex(
|
||||
'SELECT * FROM {name} WHERE label = %s', ctx={'name': quote(name)}, vars=(_label,))
|
||||
self.ex('SELECT * FROM {name} WHERE label = %s', ctx={'name': quote(name)}, vars=(_label,))
|
||||
if self.cur.fetchone() is None:
|
||||
to_insert.append(_label)
|
||||
|
||||
|
@ -551,19 +577,19 @@ class WcsOlapFeeder(object):
|
|||
tmp_cat_map = self.do_referenced_data(table_name, categories_data, 'label')
|
||||
self.update_table_sequence_number(table_name)
|
||||
# remap categories ids to ids in the table
|
||||
return dict((c.title, tmp_cat_map[c.title]) for c in self.categories)
|
||||
return {c.title: tmp_cat_map[c.title] for c in self.categories}
|
||||
|
||||
def do_formdef_table(self):
|
||||
categories_mapping = self.do_category_table()
|
||||
|
||||
formdef_fields = [['category_id', 'integer REFERENCES {category_table} (id)'],
|
||||
['label', 'varchar']
|
||||
]
|
||||
formdef_fields = [['category_id', 'integer REFERENCES {category_table} (id)'], ['label', 'varchar']]
|
||||
table_name = self.default_ctx['form_table']
|
||||
self.create_referenced_table(table_name, formdef_fields, 'types de formulaire')
|
||||
|
||||
formdefs = [(formdef.slug, categories_mapping.get(formdef.schema.category),
|
||||
formdef.schema.name) for formdef in self.formdefs]
|
||||
formdefs = [
|
||||
(formdef.slug, categories_mapping.get(formdef.schema.category), formdef.schema.name)
|
||||
for formdef in self.formdefs
|
||||
]
|
||||
self.formdefs_mapping = self.do_referenced_data(table_name, formdefs, 'ref')
|
||||
self.formdefs_unique_slug = {}
|
||||
seen = set()
|
||||
|
@ -580,23 +606,19 @@ class WcsOlapFeeder(object):
|
|||
|
||||
def do_base_table(self):
|
||||
# channels
|
||||
self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
|
||||
comment='canal')
|
||||
self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels], comment='canal')
|
||||
|
||||
# roles
|
||||
roles = dict((i, role.name) for i, role in enumerate(self.roles))
|
||||
roles = {i: role.name for i, role in enumerate(self.roles)}
|
||||
tmp_role_map = self.create_labeled_table('role', roles.items(), comment='role')
|
||||
self.role_mapping = dict(
|
||||
(role.id, tmp_role_map[role.name]) for role in self.roles)
|
||||
self.role_mapping = {role.id: tmp_role_map[role.name] for role in self.roles}
|
||||
|
||||
# forms
|
||||
self.do_formdef_table()
|
||||
|
||||
self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
|
||||
comment='heures')
|
||||
self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))), comment='heures')
|
||||
|
||||
self.create_labeled_table('status', self.status,
|
||||
comment='statuts simplifiés')
|
||||
self.create_labeled_table('status', self.status, comment='statuts simplifiés')
|
||||
|
||||
# agents
|
||||
self.create_labeled_table_serial('agent', comment='agents')
|
||||
|
@ -630,14 +652,20 @@ class WcsOlapFeeder(object):
|
|||
self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))
|
||||
self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s', vars=('tous les formulaires',))
|
||||
# evolutions
|
||||
self.create_table('{generic_evolution_table}', [
|
||||
['id', 'serial primary key'],
|
||||
['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
|
||||
['formdata_id', 'integer'], # "REFERENCES {generic_formdata_table} (id)" is impossible because FK constraints do not work on inherited tables
|
||||
['time', 'timestamp'],
|
||||
['date', 'date'],
|
||||
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
|
||||
])
|
||||
self.create_table(
|
||||
'{generic_evolution_table}',
|
||||
[
|
||||
['id', 'serial primary key'],
|
||||
['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'],
|
||||
[
|
||||
'formdata_id',
|
||||
'integer',
|
||||
], # "REFERENCES {generic_formdata_table} (id)" is impossible because FK constraints do not work on inherited tables
|
||||
['time', 'timestamp'],
|
||||
['date', 'date'],
|
||||
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
|
||||
],
|
||||
)
|
||||
self.ex('COMMENT ON TABLE {generic_evolution_table} IS %s', vars=('evolution générique',))
|
||||
|
||||
def feed(self):
|
||||
|
@ -678,37 +706,38 @@ class WcsOlapFeeder(object):
|
|||
|
||||
def switch_schemas(self):
|
||||
self.old_schema = truncate_pg_identifier(
|
||||
self.schema
|
||||
+ '_old_'
|
||||
+ datetime.datetime.now().strftime('%Y%m%d%H%M'))
|
||||
self.schema + '_old_' + datetime.datetime.now().strftime('%Y%m%d%H%M')
|
||||
)
|
||||
self.ctx.push({'old_schema': self.old_schema})
|
||||
|
||||
try:
|
||||
|
||||
def switch():
|
||||
with self.atomic():
|
||||
self.logger.info('renaming schema %s to %s and schema %s to %s',
|
||||
self.schema, self.old_schema,
|
||||
self.schema_temp, self.schema)
|
||||
self.logger.info(
|
||||
'renaming schema %s to %s and schema %s to %s',
|
||||
self.schema,
|
||||
self.old_schema,
|
||||
self.schema_temp,
|
||||
self.schema,
|
||||
)
|
||||
if self.schema_exists(self.schema):
|
||||
self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}')
|
||||
self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
|
||||
self.retry_on_db_error(
|
||||
switch,
|
||||
times=33,
|
||||
sleep_time=1)
|
||||
|
||||
self.retry_on_db_error(switch, times=33, sleep_time=1)
|
||||
except Exception:
|
||||
self.logger.warning('failed to switch schemas 3-times in 33 seconds')
|
||||
raise
|
||||
|
||||
try:
|
||||
|
||||
def drop_old_schema():
|
||||
self.logger.info('dropping schema %s', self.old_schema)
|
||||
self.drop_tables_sequencially(self.old_schema)
|
||||
self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE')
|
||||
self.retry_on_db_error(
|
||||
drop_old_schema,
|
||||
times=33,
|
||||
sleep_time=1)
|
||||
|
||||
self.retry_on_db_error(drop_old_schema, times=33, sleep_time=1)
|
||||
except Exception:
|
||||
self.logger.exception('could not drop schema %s', self.old_schema)
|
||||
|
||||
|
@ -730,10 +759,14 @@ class WcsOlapFeeder(object):
|
|||
def create_formdata_json_index(self, table_name, varname):
|
||||
if varname in self.formdata_json_index:
|
||||
return
|
||||
index_name = self.hash_table_name('%s_%s_json_idx' % (table_name, varname), hash_length=8,
|
||||
force_hash=True)
|
||||
self.ex('CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))',
|
||||
ctx={'index_name': quote(index_name)}, vars=[varname])
|
||||
index_name = self.hash_table_name(
|
||||
'%s_%s_json_idx' % (table_name, varname), hash_length=8, force_hash=True
|
||||
)
|
||||
self.ex(
|
||||
'CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))',
|
||||
ctx={'index_name': quote(index_name)},
|
||||
vars=[varname],
|
||||
)
|
||||
# prevent double creation
|
||||
self.formdata_json_index.append(varname)
|
||||
|
||||
|
@ -742,7 +775,7 @@ def has_digits_validation(field):
|
|||
return field.validation and field.validation.get('type') == 'digits'
|
||||
|
||||
|
||||
class WcsFormdefFeeder(object):
|
||||
class WcsFormdefFeeder:
|
||||
def __init__(self, olap_feeder, formdef, do_feed=True):
|
||||
self.olap_feeder = olap_feeder
|
||||
self.formdef = formdef
|
||||
|
@ -771,26 +804,31 @@ class WcsFormdefFeeder(object):
|
|||
def do_statuses(self):
|
||||
statuses = self.formdef.schema.workflow.statuses
|
||||
tmp_status_map = self.olap_feeder.create_labeled_table(
|
||||
self.status_table_name, enumerate([s.name for s in statuses]),
|
||||
comment='statuts du formulaire « %s »' % self.formdef.schema.name)
|
||||
self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses)
|
||||
self.status_table_name,
|
||||
enumerate([s.name for s in statuses]),
|
||||
comment='statuts du formulaire « %s »' % self.formdef.schema.name,
|
||||
)
|
||||
self.status_mapping = {s.id: tmp_status_map[s.name] for s in statuses}
|
||||
|
||||
def do_data_table(self):
|
||||
columns = OrderedDict()
|
||||
columns['status_id'] = {'sql_col_name': 'status_id', 'sql_col_def': 'smallint REFERENCES {status_table} (id)'}
|
||||
columns['status_id'] = {
|
||||
'sql_col_name': 'status_id',
|
||||
'sql_col_def': 'smallint REFERENCES {status_table} (id)',
|
||||
}
|
||||
|
||||
# add item fields
|
||||
for field in self.good_fields.values():
|
||||
if field.type == 'item':
|
||||
comment = ('valeurs du champ « %s » du formulaire %s'
|
||||
% (field.label, self.formdef.schema.name))
|
||||
comment = 'valeurs du champ « %s » du formulaire %s' % (field.label, self.formdef.schema.name)
|
||||
table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
|
||||
# create table and mapping
|
||||
if field.items:
|
||||
# filter non string items
|
||||
items = [item for item in field.items if isinstance(item, str)]
|
||||
self.items_mappings[field.varname] = self.create_labeled_table(
|
||||
table_name, enumerate(items), comment=comment)
|
||||
table_name, enumerate(items), comment=comment
|
||||
)
|
||||
else:
|
||||
# open item field, from data sources...
|
||||
self.create_labeled_table_serial(table_name, comment=comment)
|
||||
|
@ -839,15 +877,19 @@ class WcsFormdefFeeder(object):
|
|||
self.columns.append(columns[key]['sql_col_name'])
|
||||
self.columns.remove('geolocation_base')
|
||||
|
||||
self.create_table(self.table_name,
|
||||
[(columns[key]['sql_col_name'], columns[key]['sql_col_def']) for key in columns],
|
||||
inherits='{generic_formdata_table}',
|
||||
comment='formulaire %s' % self.formdef.schema.name)
|
||||
self.create_table(
|
||||
self.table_name,
|
||||
[(columns[key]['sql_col_name'], columns[key]['sql_col_def']) for key in columns],
|
||||
inherits='{generic_formdata_table}',
|
||||
comment='formulaire %s' % self.formdef.schema.name,
|
||||
)
|
||||
for key in columns:
|
||||
column = columns[key]
|
||||
if column.get('sql_comment'):
|
||||
self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % quote(column['sql_col_name']),
|
||||
vars=(column['sql_comment'],))
|
||||
self.ex(
|
||||
'COMMENT ON COLUMN {formdata_table}.%s IS %%s' % quote(column['sql_col_name']),
|
||||
vars=(column['sql_comment'],),
|
||||
)
|
||||
|
||||
# Creat index for JSON fields
|
||||
if self.has_jsonb:
|
||||
|
@ -864,26 +906,37 @@ class WcsFormdefFeeder(object):
|
|||
self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint)
|
||||
self.ex('ALTER TABLE {formdata_table} ADD PRIMARY KEY (id)')
|
||||
# table des evolutions
|
||||
self.create_table(self.evolution_table_name, [
|
||||
['id', 'serial primary key'],
|
||||
['status_id', 'smallint REFERENCES {status_table} (id)'],
|
||||
['formdata_id', 'integer REFERENCES {formdata_table} (id)'],
|
||||
['time', 'timestamp'],
|
||||
['date', 'date'],
|
||||
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
|
||||
])
|
||||
self.ex('COMMENT ON TABLE {evolution_table} IS %s',
|
||||
vars=('evolution des demandes %s' % self.formdef.schema.name,))
|
||||
self.create_table(
|
||||
self.evolution_table_name,
|
||||
[
|
||||
['id', 'serial primary key'],
|
||||
['status_id', 'smallint REFERENCES {status_table} (id)'],
|
||||
['formdata_id', 'integer REFERENCES {formdata_table} (id)'],
|
||||
['time', 'timestamp'],
|
||||
['date', 'date'],
|
||||
['hour_id', 'smallint REFERENCES {hour_table} (id)'],
|
||||
],
|
||||
)
|
||||
self.ex(
|
||||
'COMMENT ON TABLE {evolution_table} IS %s',
|
||||
vars=('evolution des demandes %s' % self.formdef.schema.name,),
|
||||
)
|
||||
|
||||
def insert_item_value(self, field, value):
|
||||
table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname))
|
||||
self.ex('SELECT id FROM {item_table} WHERE label = %s',
|
||||
ctx={'item_table': quote(table_name)}, vars=(value,))
|
||||
self.ex(
|
||||
'SELECT id FROM {item_table} WHERE label = %s',
|
||||
ctx={'item_table': quote(table_name)},
|
||||
vars=(value,),
|
||||
)
|
||||
res = self.cur.fetchone()
|
||||
if res:
|
||||
return res[0]
|
||||
self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,),
|
||||
ctx={'item_table': quote(table_name)})
|
||||
self.ex(
|
||||
'INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)',
|
||||
vars=(value,),
|
||||
ctx={'item_table': quote(table_name)},
|
||||
)
|
||||
return self.cur.fetchone()[0]
|
||||
|
||||
def get_item_id(self, field, value):
|
||||
|
@ -930,8 +983,9 @@ class WcsFormdefFeeder(object):
|
|||
try:
|
||||
status = data.formdef.schema.workflow.statuses_map[status_id]
|
||||
except KeyError:
|
||||
self.logger.warning('%s.%s unknown status status_id %s',
|
||||
data.formdef.schema.name, data.id, status_id)
|
||||
self.logger.warning(
|
||||
'%s.%s unknown status status_id %s', data.formdef.schema.name, data.id, status_id
|
||||
)
|
||||
continue
|
||||
|
||||
channel = data.submission.channel.lower()
|
||||
|
@ -1022,17 +1076,16 @@ class WcsFormdefFeeder(object):
|
|||
try:
|
||||
status = data.formdef.schema.workflow.statuses_map[evo.status]
|
||||
except KeyError:
|
||||
self.logger.warning('%s.%s unknown status in evolution %s',
|
||||
data.formdef.schema.name, data.id, evo.status)
|
||||
self.logger.warning(
|
||||
'%s.%s unknown status in evolution %s', data.formdef.schema.name, data.id, evo.status
|
||||
)
|
||||
continue
|
||||
status_id = self.status_mapping[status.id]
|
||||
generic_status_id = self.generic_status(status)
|
||||
evolution.append(
|
||||
[0, status_id, evo.time, evo.time.date(), evo.time.hour])
|
||||
evolution.append([0, status_id, evo.time, evo.time.date(), evo.time.hour])
|
||||
if generic_status_id == last_status:
|
||||
continue
|
||||
generic_evolution.append(
|
||||
[0, generic_status_id, evo.time, evo.time.date(), evo.time.hour])
|
||||
generic_evolution.append([0, generic_status_id, evo.time, evo.time.date(), evo.time.hour])
|
||||
last_status = generic_status_id
|
||||
generic_evolution_values.append(generic_evolution)
|
||||
evolution_values.append(evolution)
|
||||
|
@ -1041,12 +1094,11 @@ class WcsFormdefFeeder(object):
|
|||
return
|
||||
insert_columns = ['%s' % quote(column) for column in self.columns[1:]]
|
||||
insert_columns = ', '.join(insert_columns)
|
||||
self.ex('INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id',
|
||||
ctx=dict(
|
||||
columns=insert_columns,
|
||||
values=', '.join(['%s'] * len(values))
|
||||
),
|
||||
vars=values)
|
||||
self.ex(
|
||||
'INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id',
|
||||
ctx=dict(columns=insert_columns, values=', '.join(['%s'] * len(values))),
|
||||
vars=values,
|
||||
)
|
||||
|
||||
# insert generic evolutions
|
||||
generic_evolutions = []
|
||||
|
@ -1056,14 +1108,24 @@ class WcsFormdefFeeder(object):
|
|||
row[0] = formdata_id
|
||||
generic_evolutions.append(tuple(row))
|
||||
if len(generic_evolutions) == 500:
|
||||
self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % (
|
||||
', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
|
||||
', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions)
|
||||
self.ex(
|
||||
'INSERT INTO {generic_evolution_table} (%s) VALUES %s'
|
||||
% (
|
||||
', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
|
||||
', '.join(['%s'] * len(generic_evolutions)),
|
||||
),
|
||||
vars=generic_evolutions,
|
||||
)
|
||||
generic_evolutions = []
|
||||
if generic_evolutions:
|
||||
self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % (
|
||||
', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
|
||||
', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions)
|
||||
self.ex(
|
||||
'INSERT INTO {generic_evolution_table} (%s) VALUES %s'
|
||||
% (
|
||||
', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']),
|
||||
', '.join(['%s'] * len(generic_evolutions)),
|
||||
),
|
||||
vars=generic_evolutions,
|
||||
)
|
||||
|
||||
# insert evolutions
|
||||
evolutions = []
|
||||
|
@ -1072,14 +1134,24 @@ class WcsFormdefFeeder(object):
|
|||
row[0] = formdata_id
|
||||
evolutions.append(tuple(row))
|
||||
if len(evolutions) == 500:
|
||||
self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % (
|
||||
', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
|
||||
', '.join(['%s'] * len(evolutions))), vars=evolutions)
|
||||
self.ex(
|
||||
'INSERT INTO {evolution_table} (%s) VALUES %s'
|
||||
% (
|
||||
', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
|
||||
', '.join(['%s'] * len(evolutions)),
|
||||
),
|
||||
vars=evolutions,
|
||||
)
|
||||
evolutions = []
|
||||
if evolutions:
|
||||
self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % (
|
||||
', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
|
||||
', '.join(['%s'] * len(evolutions))), vars=evolutions)
|
||||
self.ex(
|
||||
'INSERT INTO {evolution_table} (%s) VALUES %s'
|
||||
% (
|
||||
', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']),
|
||||
', '.join(['%s'] * len(evolutions)),
|
||||
),
|
||||
vars=evolutions,
|
||||
)
|
||||
|
||||
def get_first_agent_in_evolution(self, formdata):
|
||||
for evo in formdata.evolution:
|
||||
|
@ -1087,11 +1159,13 @@ class WcsFormdefFeeder(object):
|
|||
return self.get_agent(evo.who)
|
||||
|
||||
def feed(self):
|
||||
self.olap_feeder.ctx.push({
|
||||
'formdata_table': quote(self.table_name),
|
||||
'status_table': quote(self.status_table_name),
|
||||
'evolution_table': quote(self.evolution_table_name),
|
||||
})
|
||||
self.olap_feeder.ctx.push(
|
||||
{
|
||||
'formdata_table': quote(self.table_name),
|
||||
'status_table': quote(self.status_table_name),
|
||||
'evolution_table': quote(self.evolution_table_name),
|
||||
}
|
||||
)
|
||||
|
||||
# create cube
|
||||
cube = self.cube = copy.deepcopy(self.base_cube)
|
||||
|
@ -1102,51 +1176,62 @@ class WcsFormdefFeeder(object):
|
|||
|
||||
# remove json field from formdef cubes
|
||||
cube.pop('json_field', None)
|
||||
cube.update({
|
||||
'name': self.table_name,
|
||||
'label': self.formdef.schema.name,
|
||||
'fact_table': quote(self.table_name),
|
||||
'key': 'id',
|
||||
})
|
||||
cube['dimensions'] = [dimension for dimension in cube['dimensions']
|
||||
if dimension['name'] not in ('category', 'formdef')]
|
||||
cube.update(
|
||||
{
|
||||
'name': self.table_name,
|
||||
'label': self.formdef.schema.name,
|
||||
'fact_table': quote(self.table_name),
|
||||
'key': 'id',
|
||||
}
|
||||
)
|
||||
cube['dimensions'] = [
|
||||
dimension for dimension in cube['dimensions'] if dimension['name'] not in ('category', 'formdef')
|
||||
]
|
||||
|
||||
# add dimension for status
|
||||
cube['joins'].append({
|
||||
'name': 'status',
|
||||
'table': quote(self.status_table_name),
|
||||
'master': 'status_id',
|
||||
'detail': 'id',
|
||||
'kind': 'left',
|
||||
})
|
||||
cube['dimensions'].append({
|
||||
'name': 'status',
|
||||
'label': 'statut',
|
||||
'join': ['status'],
|
||||
'type': 'integer',
|
||||
'value': 'status.id',
|
||||
'value_label': 'status.label',
|
||||
})
|
||||
cube['joins'].append(
|
||||
{
|
||||
'name': 'status',
|
||||
'table': quote(self.status_table_name),
|
||||
'master': 'status_id',
|
||||
'detail': 'id',
|
||||
'kind': 'left',
|
||||
}
|
||||
)
|
||||
cube['dimensions'].append(
|
||||
{
|
||||
'name': 'status',
|
||||
'label': 'statut',
|
||||
'join': ['status'],
|
||||
'type': 'integer',
|
||||
'value': 'status.id',
|
||||
'value_label': 'status.label',
|
||||
}
|
||||
)
|
||||
|
||||
# add dimension for function
|
||||
for function, name in self.formdef.schema.workflow.functions.items():
|
||||
at = 'function_%s' % slugify(function)
|
||||
cube['joins'].append({
|
||||
'name': at,
|
||||
'table': 'role',
|
||||
'master': quote(at),
|
||||
'detail': 'id',
|
||||
'kind': 'left',
|
||||
})
|
||||
cube['dimensions'].append({
|
||||
'name': at,
|
||||
'label': 'fonction %s' % name.lower(),
|
||||
'join': [at],
|
||||
'type': 'integer',
|
||||
'value': '%s.id' % quote(at),
|
||||
'value_label': '%s.label' % quote(at),
|
||||
'filter': False,
|
||||
})
|
||||
cube['joins'].append(
|
||||
{
|
||||
'name': at,
|
||||
'table': 'role',
|
||||
'master': quote(at),
|
||||
'detail': 'id',
|
||||
'kind': 'left',
|
||||
}
|
||||
)
|
||||
cube['dimensions'].append(
|
||||
{
|
||||
'name': at,
|
||||
'label': 'fonction %s' % name.lower(),
|
||||
'join': [at],
|
||||
'type': 'integer',
|
||||
'value': '%s.id' % quote(at),
|
||||
'value_label': '%s.label' % quote(at),
|
||||
'filter': False,
|
||||
}
|
||||
)
|
||||
|
||||
# add dimensions for item fields
|
||||
fields = self.formdef.schema.fields
|
||||
|
@ -1171,7 +1256,8 @@ class WcsFormdefFeeder(object):
|
|||
add_warning(
|
||||
'Le champ « %(label)s » a un nom de variable dupliqué « %(varname)s » '
|
||||
'mais pas le même type que « %(label_good)s », tous les champs avec ce nom seront ignorés '
|
||||
'(%(type)s != %(type_good)s).' % {
|
||||
'(%(type)s != %(type_good)s).'
|
||||
% {
|
||||
'label': field.label,
|
||||
'varname': field.varname,
|
||||
'type': field.type,
|
||||
|
@ -1183,8 +1269,10 @@ class WcsFormdefFeeder(object):
|
|||
bad_varnames.add(field.varname)
|
||||
continue
|
||||
if field.varname in bad_varnames:
|
||||
add_warning('Le champ « %s » est un doublon d\'un champ de type différent, il a été ignoré.'
|
||||
% field.label)
|
||||
add_warning(
|
||||
'Le champ « %s » est un doublon d\'un champ de type différent, il a été ignoré.'
|
||||
% field.label
|
||||
)
|
||||
continue
|
||||
self.good_fields[field.varname] = field
|
||||
|
||||
|
@ -1221,8 +1309,9 @@ class WcsFormdefFeeder(object):
|
|||
'type': 'bool',
|
||||
'value': quote(field_name),
|
||||
'value_label': '(CASE WHEN %(field)s IS NULL THEN NULL'
|
||||
' WHEN %(field)s THEN \'Oui\''
|
||||
' ELSE \'Non\' END)' % {
|
||||
' WHEN %(field)s THEN \'Oui\''
|
||||
' ELSE \'Non\' END)'
|
||||
% {
|
||||
'field': quote(field_name),
|
||||
},
|
||||
'filter': True,
|
||||
|
@ -1230,12 +1319,14 @@ class WcsFormdefFeeder(object):
|
|||
elif field.type == 'string':
|
||||
if has_digits_validation(field):
|
||||
# we will define a SUM measure instead
|
||||
cube['measures'].append({
|
||||
'name': 'sum_' + dimension_name,
|
||||
'label': 'total du champ « %s »' % dimension_label,
|
||||
'type': 'integer',
|
||||
'expression': 'SUM({fact_table}.%s)' % quote(field_name),
|
||||
})
|
||||
cube['measures'].append(
|
||||
{
|
||||
'name': 'sum_' + dimension_name,
|
||||
'label': 'total du champ « %s »' % dimension_label,
|
||||
'type': 'integer',
|
||||
'expression': 'SUM({fact_table}.%s)' % quote(field_name),
|
||||
}
|
||||
)
|
||||
continue
|
||||
else:
|
||||
dimension = {
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
import urllib.parse as urlparse
|
||||
import datetime
|
||||
import base64
|
||||
import hmac
|
||||
import datetime
|
||||
import hashlib
|
||||
import hmac
|
||||
import random
|
||||
import urllib.parse as urlparse
|
||||
|
||||
'''Simple signature scheme for query strings'''
|
||||
# from http://repos.entrouvert.org/portail-citoyen.git/tree/portail_citoyen/apps/data_source_plugin/signature.py
|
||||
|
@ -24,10 +24,7 @@ def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
|
|||
new_query = query
|
||||
if new_query:
|
||||
new_query += '&'
|
||||
new_query += urlparse.urlencode((
|
||||
('algo', algo),
|
||||
('timestamp', timestamp),
|
||||
('nonce', nonce)))
|
||||
new_query += urlparse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
|
||||
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
|
||||
new_query += '&signature=' + urlparse.quote(signature)
|
||||
return new_query
|
||||
|
@ -50,8 +47,7 @@ def check_url(url, key, known_nonce=None, timedelta=30):
|
|||
|
||||
def check_query(query, key, known_nonce=None, timedelta=30):
|
||||
parsed = urlparse.parse_qs(query)
|
||||
if not ('signature' in parsed and 'algo' in parsed
|
||||
and 'timestamp' in parsed and 'nonce' in parsed):
|
||||
if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed and 'nonce' in parsed):
|
||||
return False
|
||||
unsigned_query, signature_content = query.split('&signature=', 1)
|
||||
if '&' in signature_content:
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
class Whatever(object):
|
||||
class Whatever:
|
||||
def __call__(*args, **kwargs):
|
||||
pass
|
||||
|
||||
|
|
|
@ -14,19 +14,18 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import collections
|
||||
import base64
|
||||
import copy
|
||||
import logging
|
||||
import datetime
|
||||
import collections
|
||||
import contextlib
|
||||
import copy
|
||||
import datetime
|
||||
import json
|
||||
|
||||
import requests
|
||||
import isodate
|
||||
|
||||
import logging
|
||||
import urllib.parse as urlparse
|
||||
|
||||
import isodate
|
||||
import requests
|
||||
|
||||
from . import signature
|
||||
|
||||
|
||||
|
@ -34,7 +33,7 @@ class WcsApiError(Exception):
|
|||
pass
|
||||
|
||||
|
||||
class JSONFile(object):
|
||||
class JSONFile:
|
||||
def __init__(self, d):
|
||||
self.d = d
|
||||
|
||||
|
@ -62,7 +61,7 @@ def to_dict(o):
|
|||
return o
|
||||
|
||||
|
||||
class BaseObject(object):
|
||||
class BaseObject:
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
self._wcs_api = wcs_api
|
||||
self.__dict__.update(**kwargs)
|
||||
|
@ -82,7 +81,7 @@ class FormDataWorkflow(BaseObject):
|
|||
fields = None
|
||||
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
super(FormDataWorkflow, self).__init__(wcs_api, **kwargs)
|
||||
super().__init__(wcs_api, **kwargs)
|
||||
self.status = BaseObject(wcs_api, **self.status) if self.status else None
|
||||
self.real_status = BaseObject(wcs_api, **self.real_status) if self.real_status else None
|
||||
self.fields = self.fields or {}
|
||||
|
@ -101,7 +100,7 @@ class Evolution(BaseObject):
|
|||
parts = None
|
||||
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
super(Evolution, self).__init__(wcs_api, **kwargs)
|
||||
super().__init__(wcs_api, **kwargs)
|
||||
self.time = isodate.parse_datetime(self.time)
|
||||
self.parts = [BaseObject(wcs_api, **part) for part in self.parts or []]
|
||||
self.who = EvolutionUser(wcs_api, **self.who) if self.who else None
|
||||
|
@ -117,7 +116,7 @@ class FormData(BaseObject):
|
|||
|
||||
def __init__(self, wcs_api, forms, **kwargs):
|
||||
self.forms = forms
|
||||
super(FormData, self).__init__(wcs_api, **kwargs)
|
||||
super().__init__(wcs_api, **kwargs)
|
||||
self.receipt_time = isodate.parse_datetime(self.receipt_time)
|
||||
self.submission = BaseObject(wcs_api, **self.submission or {})
|
||||
self.workflow = FormDataWorkflow(wcs_api, **self.workflow or {})
|
||||
|
@ -157,7 +156,7 @@ class FormData(BaseObject):
|
|||
@property
|
||||
def endpoint_delay(self):
|
||||
'''Compute delay as the time when the last not endpoint status precedes an endpoint
|
||||
status.'''
|
||||
status.'''
|
||||
statuses_map = self.formdef.schema.workflow.statuses_map
|
||||
s = 0
|
||||
for evo in self.evolution[::-1]:
|
||||
|
@ -188,13 +187,13 @@ class Workflow(BaseObject):
|
|||
fields = None
|
||||
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
super(Workflow, self).__init__(wcs_api, **kwargs)
|
||||
super().__init__(wcs_api, **kwargs)
|
||||
self.statuses = [BaseObject(wcs_api, **v) for v in (self.statuses or [])]
|
||||
assert not hasattr(self.statuses[0], 'startpoint'), 'startpoint is exported by w.c.s. FIXME'
|
||||
for status in self.statuses:
|
||||
status.startpoint = False
|
||||
self.statuses[0].startpoint = True
|
||||
self.statuses_map = dict((s.id, s) for s in self.statuses)
|
||||
self.statuses_map = {s.id: s for s in self.statuses}
|
||||
self.fields = [Field(wcs_api, **field) for field in (self.fields or [])]
|
||||
|
||||
|
||||
|
@ -214,13 +213,13 @@ class Schema(BaseObject):
|
|||
geolocations = None
|
||||
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
super(Schema, self).__init__(wcs_api, **kwargs)
|
||||
super().__init__(wcs_api, **kwargs)
|
||||
self.workflow = Workflow(wcs_api, **self.workflow)
|
||||
self.fields = [Field(wcs_api, **f) for f in self.fields or []]
|
||||
self.geolocations = sorted((k, v) for k, v in (self.geolocations or {}).items())
|
||||
|
||||
|
||||
class FormDatas(object):
|
||||
class FormDatas:
|
||||
def __init__(self, wcs_api, formdef, full=False, anonymize=False):
|
||||
self.wcs_api = wcs_api
|
||||
self.formdef = formdef
|
||||
|
@ -231,13 +230,14 @@ class FormDatas(object):
|
|||
def __getitem__(self, slice_or_id):
|
||||
# get batch of forms
|
||||
if isinstance(slice_or_id, slice):
|
||||
|
||||
def helper():
|
||||
if slice_or_id.stop <= slice_or_id.start or slice_or_id.step:
|
||||
raise ValueError('invalid slice %s' % slice_or_id)
|
||||
offset = slice_or_id.start
|
||||
limit = slice_or_id.stop - slice_or_id.start
|
||||
|
||||
url_parts = ['api/forms/{self.formdef.slug}/list'.format(self=self)]
|
||||
url_parts = [f'api/forms/{self.formdef.slug}/list']
|
||||
query = {}
|
||||
query['full'] = 'on' if self._full else 'off'
|
||||
if offset:
|
||||
|
@ -252,10 +252,11 @@ class FormDatas(object):
|
|||
if not d.get('receipt_time'):
|
||||
continue
|
||||
yield FormData(wcs_api=self.wcs_api, forms=self, formdef=self.formdef, **d)
|
||||
|
||||
return helper()
|
||||
# or get one form
|
||||
else:
|
||||
url_parts = ['api/forms/{formdef.slug}/{id}/'.format(formdef=self.formdef, id=slice_or_id)]
|
||||
url_parts = [f'api/forms/{self.formdef.slug}/{slice_or_id}/']
|
||||
if self.anonymize:
|
||||
url_parts.append('?anonymise=true')
|
||||
d = self.wcs_api.get_json(*url_parts)
|
||||
|
@ -282,7 +283,7 @@ class FormDatas(object):
|
|||
start = 0
|
||||
while True:
|
||||
empty = True
|
||||
for formdef in self[start:start + self.batch_size]:
|
||||
for formdef in self[start : start + self.batch_size]:
|
||||
empty = False
|
||||
yield formdef
|
||||
if empty:
|
||||
|
@ -290,14 +291,14 @@ class FormDatas(object):
|
|||
start += self.batch_size
|
||||
|
||||
def __len__(self):
|
||||
return len(list((o for o in self)))
|
||||
return len(list(o for o in self))
|
||||
|
||||
|
||||
class CancelSubmitError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FormDefSubmit(object):
|
||||
class FormDefSubmit:
|
||||
formdef = None
|
||||
data = None
|
||||
user_email = None
|
||||
|
@ -346,16 +347,13 @@ class FormDefSubmit(object):
|
|||
if value is None or value == {} or value == []:
|
||||
self.data.pop(varname, None)
|
||||
elif hasattr(self, '_set_type_%s' % field.type):
|
||||
getattr(self, '_set_type_%s' % field.type)(
|
||||
varname=varname,
|
||||
field=field,
|
||||
value=value, **kwargs)
|
||||
getattr(self, '_set_type_%s' % field.type)(varname=varname, field=field, value=value, **kwargs)
|
||||
else:
|
||||
self.data[varname] = value
|
||||
|
||||
def _set_type_item(self, varname, field, value, **kwargs):
|
||||
if isinstance(value, dict):
|
||||
if not set(value).issuperset(set(['id', 'text'])):
|
||||
if not set(value).issuperset({'id', 'text'}):
|
||||
raise ValueError('item field value must have id and text value')
|
||||
# clean previous values
|
||||
self.data.pop(varname, None)
|
||||
|
@ -378,7 +376,7 @@ class FormDefSubmit(object):
|
|||
has_dict = False
|
||||
for choice in value:
|
||||
if isinstance(value, dict):
|
||||
if not set(value).issuperset(set(['id', 'text'])):
|
||||
if not set(value).issuperset({'id', 'text'}):
|
||||
raise ValueError('items field values must have id and text value')
|
||||
has_dict = True
|
||||
if has_dict:
|
||||
|
@ -407,7 +405,7 @@ class FormDefSubmit(object):
|
|||
elif isinstance(value, bytes):
|
||||
content = base64.b64encode(value).decode('ascii')
|
||||
elif isinstance(value, dict):
|
||||
if not set(value).issuperset(set(['filename', 'content'])):
|
||||
if not set(value).issuperset({'filename', 'content'}):
|
||||
raise ValueError('file field needs a dict value with filename and content')
|
||||
content = value['content']
|
||||
filename = value['filename']
|
||||
|
@ -432,7 +430,7 @@ class FormDefSubmit(object):
|
|||
def _set_type_map(self, varname, field, value):
|
||||
if not isinstance(value, dict):
|
||||
raise TypeError('value must be a dict for a map field')
|
||||
if set(value) != set(['lat', 'lon']):
|
||||
if set(value) != {'lat', 'lon'}:
|
||||
raise ValueError('map field expect keys lat and lon')
|
||||
self.data[varname] = value
|
||||
|
||||
|
@ -470,22 +468,19 @@ class FormDef(BaseObject):
|
|||
@property
|
||||
def schema(self):
|
||||
if not hasattr(self, '_schema'):
|
||||
d = self._wcs_api.get_json('api/formdefs/{self.slug}/schema'.format(self=self))
|
||||
d = self._wcs_api.get_json(f'api/formdefs/{self.slug}/schema')
|
||||
self._schema = Schema(self._wcs_api, **d)
|
||||
return self._schema
|
||||
|
||||
@contextlib.contextmanager
|
||||
def submit(self, **kwargs):
|
||||
submitter = FormDefSubmit(
|
||||
wcs_api=self._wcs_api,
|
||||
formdef=self,
|
||||
**kwargs)
|
||||
submitter = FormDefSubmit(wcs_api=self._wcs_api, formdef=self, **kwargs)
|
||||
try:
|
||||
yield submitter
|
||||
except CancelSubmitError:
|
||||
return
|
||||
payload = submitter.payload()
|
||||
d = self._wcs_api.post_json(payload, 'api/formdefs/{self.slug}/submit'.format(self=self))
|
||||
d = self._wcs_api.post_json(payload, f'api/formdefs/{self.slug}/submit')
|
||||
if d['err'] != 0:
|
||||
raise WcsApiError('submited returned an error: %s' % d)
|
||||
submitter.result = BaseObject(self._wcs_api, **d['data'])
|
||||
|
@ -499,7 +494,7 @@ class Category(BaseObject):
|
|||
pass
|
||||
|
||||
|
||||
class WcsObjects(object):
|
||||
class WcsObjects:
|
||||
url = None
|
||||
object_class = None
|
||||
|
||||
|
@ -519,7 +514,7 @@ class WcsObjects(object):
|
|||
yield self.object_class(wcs_api=self.wcs_api, **d)
|
||||
|
||||
def __len__(self):
|
||||
return len(list((o for o in self)))
|
||||
return len(list(o for o in self))
|
||||
|
||||
|
||||
class Roles(WcsObjects):
|
||||
|
@ -538,9 +533,19 @@ class Categories(WcsObjects):
|
|||
object_class = Category
|
||||
|
||||
|
||||
class WcsApi(object):
|
||||
def __init__(self, url, email=None, name_id=None, batch_size=1000,
|
||||
session=None, logger=None, orig=None, key=None, verify=True):
|
||||
class WcsApi:
|
||||
def __init__(
|
||||
self,
|
||||
url,
|
||||
email=None,
|
||||
name_id=None,
|
||||
batch_size=1000,
|
||||
session=None,
|
||||
logger=None,
|
||||
orig=None,
|
||||
key=None,
|
||||
verify=True,
|
||||
):
|
||||
self.url = url
|
||||
self.email = email
|
||||
self.name_id = name_id
|
||||
|
@ -602,7 +607,8 @@ class WcsApi(object):
|
|||
final_url,
|
||||
data=json.dumps(data),
|
||||
headers={'content-type': 'application/json'},
|
||||
verify=self.verify)
|
||||
verify=self.verify,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except requests.RequestException as e:
|
||||
content = getattr(getattr(e, 'response', None), 'content', None)
|
||||
|
|
Loading…
Reference in New Issue