diff --git a/debian/control b/debian/control index a409293..c1c6d7b 100644 --- a/debian/control +++ b/debian/control @@ -2,7 +2,10 @@ Source: wcs-olap Section: python Priority: optional Maintainer: Benjamin Dauvergne -Build-Depends: python3-setuptools, python3-all, debhelper-compat (= 12), dh-python +Build-Depends: debhelper-compat (= 12), + dh-python, + python3-all, + python3-setuptools, Standards-Version: 3.9.6 Homepage: http://dev.entrouvert.org/projects/wcs-olap/ diff --git a/setup.py b/setup.py index 3c764a3..0332ed4 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,9 @@ #! /usr/bin/env python -import subprocess import os +import subprocess -from setuptools import setup, find_packages +from setuptools import find_packages, setup from setuptools.command.sdist import sdist @@ -21,15 +21,17 @@ class eo_sdist(sdist): def get_version(): '''Use the VERSION, if absent generates a version with git describe, if not - tag exists, take 0.0- and add the length of the commit log. + tag exists, take 0.0- and add the length of the commit log. ''' if os.path.exists('VERSION'): - with open('VERSION', 'r') as v: + with open('VERSION') as v: return v.read() if os.path.exists('.git'): p = subprocess.Popen( ['git', 'describe', '--dirty=.dirty', '--match=v*'], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) result = p.communicate()[0] if p.returncode == 0: result = result.decode('ascii').strip()[1:] # strip spaces/newlines and initial v @@ -44,26 +46,22 @@ def get_version(): return '0.0' -setup(name="wcs-olap", - version=get_version(), - license="AGPLv3+", - description="Export w.c.s. data to an OLAP cube", - long_description=open('README.rst').read(), - url="http://dev.entrouvert.org/projects/publik-bi/", - author="Entr'ouvert", - author_email="authentic@listes.entrouvert.com", - maintainer="Benjamin Dauvergne", - maintainer_email="bdauvergne@entrouvert.com", - packages=find_packages(), - include_package_data=True, - install_requires=[ - 'requests', - 'psycopg2', - 'isodate', - 'six', - 'cached-property' - ], - entry_points={ - 'console_scripts': ['wcs-olap=wcs_olap.cmd:main'], - }, - cmdclass={'sdist': eo_sdist}) +setup( + name="wcs-olap", + version=get_version(), + license="AGPLv3+", + description="Export w.c.s. 
data to an OLAP cube", + long_description=open('README.rst').read(), + url="http://dev.entrouvert.org/projects/publik-bi/", + author="Entr'ouvert", + author_email="authentic@listes.entrouvert.com", + maintainer="Benjamin Dauvergne", + maintainer_email="bdauvergne@entrouvert.com", + packages=find_packages(), + include_package_data=True, + install_requires=['requests', 'psycopg2', 'isodate', 'six', 'cached-property'], + entry_points={ + 'console_scripts': ['wcs-olap=wcs_olap.cmd:main'], + }, + cmdclass={'sdist': eo_sdist}, +) diff --git a/tests/conftest.py b/tests/conftest.py index 3a64399..5c7315f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,25 +1,22 @@ -# -*- coding: utf-8 -*- - import configparser +import os +import random +import shutil +import socket import sys import time -import os -import shutil -import random -import socket -from unittest import mock -from contextlib import closing, contextmanager, ExitStack from collections import namedtuple +from contextlib import ExitStack, closing, contextmanager +from unittest import mock import psycopg2 import pytest - import utils Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid']) -class Database(object): +class Database: def __init__(self): self.db_name = 'db%s' % random.getrandbits(20) self.dsn = 'dbname=%s' % self.db_name @@ -60,21 +57,21 @@ def wcs_db(): WCS_SCRIPTS = { - 'setup-auth': u""" + 'setup-auth': """ from quixote import get_publisher get_publisher().cfg['identification'] = {'methods': ['password']} get_publisher().cfg['debug'] = {'display_exceptions': 'text'} get_publisher().write_cfg() """, - 'setup-storage': u""" + 'setup-storage': """ from quixote import get_publisher get_publisher().cfg['postgresql'] = {'database': %(dbname)r, 'port': %(port)r, 'host': %(host)r, 'user': %(user)r, 'password': %(password)r} get_publisher().write_cfg() get_publisher().initialize_sql() """, - 'create-user': u""" + 'create-user': """ from quixote import get_publisher from qommon.ident.password_accounts import PasswordAccount @@ -87,7 +84,7 @@ account.set_password('user') account.user_id = user.id account.store() """, - 'create-data': u""" + 'create-data': """ import datetime import random from quixote import get_publisher @@ -197,32 +194,44 @@ def wcs(tmp_path_factory, wcs_dir, wcs_db): tenant_dir.mkdir() utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth') - utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-storage'] % { + utils.run_wcs_script( + wcs_dir, + WCS_SCRIPTS['setup-storage'] + % { 'dbname': wcs_db.db_name, 'port': os.environ.get('PGPORT'), 'host': os.environ.get('PGHOST'), 'user': os.environ.get('PGUSER'), 'password': os.environ.get('PGPASSWORD'), }, - 'setup-storage') + 'setup-storage', + ) utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user') utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data') with (tenant_dir / 'site-options.cfg').open('w') as fd: - fd.write(u'''[api-secrets] + fd.write( + '''[api-secrets] olap = olap -''') +''' + ) with (wcs_dir / 'wcs.cfg').open('w') as fd: - fd.write(u'''[main] -app_dir = %s\n''' % (str(wcs_dir).replace('%', '%%'))) + fd.write( + '''[main] +app_dir = %s\n''' + % (str(wcs_dir).replace('%', '%%')) + ) with (wcs_dir / 'local_settings.py').open('w') as fd: - fd.write(u''' + fd.write( + ''' WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg' THEMES_DIRECTORY = '/' ALLOWED_HOSTS = ['%s'] -''' % (wcs_dir, utils.HOSTNAME)) +''' + % (wcs_dir, utils.HOSTNAME) + ) # launch a Django worker for running w.c.s. 
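    # [editor's note, not part of the patch] fork() duplicates the test process;
    # the child replaces itself with `manage.py runserver` via os.execvp() below,
    # while the parent keeps WCS_PID so the fixture can terminate the server at
    # teardown.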
WCS_PID = os.fork() @@ -230,7 +239,9 @@ ALLOWED_HOSTS = ['%s'] os.chdir(os.path.dirname(utils.WCS_MANAGE)) os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings' os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py') - os.execvp('python', [sys.executable, 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)]) + os.execvp( + 'python', [sys.executable, 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)] + ) sys.exit(0) # verify w.c.s. is launched @@ -272,7 +283,8 @@ def olap_cmd(wcs, tmpdir, postgres_db): model_dir = tmpdir / 'model_dir' model_dir.mkdir() with config_ini.open('w') as fd: - fd.write(u''' + fd.write( + ''' [wcs-olap] cubes_model_dirs = {model_dir} pg_dsn = {dsn} @@ -282,10 +294,14 @@ orig = olap key = olap schema = olap cubes_slug = olap-slug -'''.format(wcs=wcs, model_dir=str(model_dir).replace('%', '%%'), dsn=postgres_db.dsn)) +'''.format( + wcs=wcs, model_dir=str(model_dir).replace('%', '%%'), dsn=postgres_db.dsn + ) + ) + + import sys from wcs_olap import cmd - import sys def f(no_log_errors=True): old_argv = sys.argv @@ -327,8 +343,11 @@ def mock_cursor_execute(): mocked_cur = mock.Mock(wraps=cur) mocked_cur.execute = mock.Mock(wraps=cur.execute, **execute_mock_kwargs) return mocked_cur + mocked_conn.cursor = cursor return mocked_conn + stack.enter_context(mock.patch.object(psycopg2, 'connect', connect)) yield None + yield do diff --git a/tests/test_feeder.py b/tests/test_feeder.py index 47265f9..7ccbaef 100644 --- a/tests/test_feeder.py +++ b/tests/test_feeder.py @@ -28,7 +28,8 @@ def test_post_sync_commands(mock_cursor_execute, wcs, postgres_db, olap_cmd): config.set( 'wcs-olap', 'post-sync-commands', - "NOTIFY coucou, '{schema}';\nSELECT * FROM information_schema.tables") + "NOTIFY coucou, '{schema}';\nSELECT * FROM information_schema.tables", + ) queries = [] diff --git a/tests/test_sentry.py b/tests/test_sentry.py index 7a41942..8eae381 100644 --- a/tests/test_sentry.py +++ b/tests/test_sentry.py @@ -17,15 +17,17 @@ def capture_event_mock(): return capture_event_mock(*args, **kwargs) with contextlib.ExitStack() as stack: + def new_init(*args, **kwargs): old_init(*args, **kwargs) if sentry_sdk.Hub.current.client: stack.enter_context( mock.patch.object( - sentry_sdk.Hub.current.client.transport, 'capture_event', my_capture_event)) + sentry_sdk.Hub.current.client.transport, 'capture_event', my_capture_event + ) + ) - stack.enter_context( - mock.patch.object(sentry_sdk, 'init', new_init)) + stack.enter_context(mock.patch.object(sentry_sdk, 'init', new_init)) yield capture_event_mock diff --git a/tests/test_wcs.py b/tests/test_wcs.py index 64e3f54..2eade6a 100644 --- a/tests/test_wcs.py +++ b/tests/test_wcs.py @@ -16,17 +16,14 @@ import datetime import json -import pytest import pathlib -from urllib.parse import urlparse, parse_qs - import unittest.mock as mock +from urllib.parse import parse_qs, urlparse -import requests import httmock - +import pytest +import requests import utils - from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT @@ -49,8 +46,9 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog): with mock.patch('requests.Session', MockSession): olap_cmd() call_args_list = MockSession.mocks[-1].call_args_list - url_with_limits = [call_args[0][0] for call_args in MockSession.mocks[-1].call_args_list - if 'limit' in call_args[0][0]] + url_with_limits = [ + call_args[0][0] for call_args in MockSession.mocks[-1].call_args_list if 'limit' in call_args[0][0] + ] assert url_with_limits, call_args_list for 
url_with_limit in url_with_limits: parsed_qs = parse_qs(urlparse(url_with_limit).query) @@ -60,10 +58,12 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog): assert ( 'Le champ « 7th field bad duplicate » a un nom de variable dupliqué ' '« duplicate » mais pas le même type que « 6th field bad duplicate », ' - 'tous les champs avec ce nom seront ignorés (string != bool).') in caplog.text + 'tous les champs avec ce nom seront ignorés (string != bool).' + ) in caplog.text assert ( 'Le champ « 11th field third bad duplicate » est un doublon d\'un ' - 'champ de type différent, il a été ignoré.') in caplog.text + 'champ de type différent, il a été ignoré.' + ) in caplog.text expected_schema = [ ('agent', 'id'), @@ -131,15 +131,17 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog): ('status', 'id'), ('status', 'label'), ('status_demande', 'id'), - ('status_demande', 'label') + ('status_demande', 'label'), ] # verify SQL schema with postgres_db.conn() as conn: with conn.cursor() as c: - c.execute('SELECT table_name, column_name ' - 'FROM information_schema.columns ' - 'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position') + c.execute( + 'SELECT table_name, column_name ' + 'FROM information_schema.columns ' + 'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position' + ) assert list(c.fetchall()) == expected_schema @@ -165,15 +167,18 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog): c.execute('SELECT MIN(date), MAX(date) FROM public.dates') last_date = (datetime.date.today() + datetime.timedelta(days=150)).replace(month=12, day=31) assert c.fetchone()[0:2] == (datetime.date(2011, 1, 1), last_date) - c.execute('''SELECT COUNT(*) FROM + c.execute( + '''SELECT COUNT(*) FROM GENERATE_SERIES(%s::date, %s::date, '1 day'::interval) AS series(date) FULL OUTER JOIN public.dates AS dates ON series.date = dates.date WHERE dates.date IS NULL OR series.date IS NULL''', - vars=[datetime.date(2011, 1, 1), last_date]) + vars=[datetime.date(2011, 1, 1), last_date], + ) assert c.fetchone()[0] == 0 # verify JSON schema - with (olap_cmd.model_dir / 'olap.model').open() as fd, \ - (pathlib.Path(__file__).parent / 'olap.model').open() as fd2: + with (olap_cmd.model_dir / 'olap.model').open() as fd, ( + pathlib.Path(__file__).parent / 'olap.model' + ).open() as fd2: json_schema = json.load(fd) expected_json_schema = json.load(fd2) expected_json_schema['pg_dsn'] = postgres_db.dsn @@ -183,10 +188,12 @@ FULL OUTER JOIN public.dates AS dates ON series.date = dates.date WHERE dates.da with postgres_db.conn() as conn: with conn.cursor() as c: c.execute('SET search_path = olap') - c.execute('SELECT field_good_duplicate, count(id) ' - 'FROM formdata_demande ' - 'GROUP BY field_good_duplicate ' - 'ORDER BY field_good_duplicate') + c.execute( + 'SELECT field_good_duplicate, count(id) ' + 'FROM formdata_demande ' + 'GROUP BY field_good_duplicate ' + 'ORDER BY field_good_duplicate' + ) assert dict(c.fetchall()) == { 'a': 37, 'b': 13, @@ -238,7 +245,7 @@ def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog assert len(open_refs) == 3 # Change an item of the field - script = u""" + script = """ import datetime import random from quixote import get_publisher @@ -292,8 +299,10 @@ formdata.store() assert new_open_refs[-1][1] == 'open_new_value' open_new_id = new_open_refs[-1][0] - c.execute('''SELECT field_item, "field_itemOpen" - FROM formdata_demande ORDER BY id''') + c.execute( + '''SELECT field_item, "field_itemOpen" + FROM 
formdata_demande ORDER BY id''' + ) formdata = c.fetchone() assert formdata[0] == bazouka_id assert formdata[1] == open_new_id diff --git a/tests/utils.py b/tests/utils.py index 2648e7a..283a398 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,7 +2,6 @@ import os import subprocess import sys - HOSTNAME = '127.0.0.1' WCS_MANAGE = os.environ.get('WCS_MANAGE') @@ -14,5 +13,15 @@ def run_wcs_script(wcs_dir, script, script_name): fd.write(script) subprocess.check_call( - [sys.executable, WCS_MANAGE, 'runscript', '--app-dir', str(wcs_dir), '--vhost', HOSTNAME, str(script_path)], - env={'DJANGO_SETTINGS_MODULE': 'wcs.settings'}) + [ + sys.executable, + WCS_MANAGE, + 'runscript', + '--app-dir', + str(wcs_dir), + '--vhost', + HOSTNAME, + str(script_path), + ], + env={'DJANGO_SETTINGS_MODULE': 'wcs.settings'}, + ) diff --git a/wcs_olap/cmd.py b/wcs_olap/cmd.py index 5258029..3ab6c1f 100644 --- a/wcs_olap/cmd.py +++ b/wcs_olap/cmd.py @@ -14,7 +14,7 @@ except ImportError: else: from sentry_sdk.integrations.logging import LoggingIntegration -from . import wcs_api, feeder +from . import feeder, wcs_api def main(): @@ -63,15 +63,14 @@ def configure_sentry(config): # get DEBUG level logs as breadcrumbs logger.setLevel(logging.DEBUG) - sentry_logging = LoggingIntegration( - level=logging.DEBUG, - event_level=logging.ERROR) + sentry_logging = LoggingIntegration(level=logging.DEBUG, event_level=logging.ERROR) sentry_sdk.init( dsn=config.get('sentry', 'dsn'), environment=config.get('sentry', 'environment', fallback=None), attach_stacktrace=True, - integrations=[sentry_logging]) + integrations=[sentry_logging], + ) scopes.append(sentry_sdk.push_scope) @@ -82,17 +81,17 @@ def main2(): except locale.Error: # current locale does not exist pass - parser = argparse.ArgumentParser(description='Export W.C.S. data as a star schema in a ' - 'postgresql DB', add_help=False) + parser = argparse.ArgumentParser( + description='Export W.C.S. data as a star schema in a ' 'postgresql DB', add_help=False + ) parser.add_argument('config_path', nargs='?', default=None) group = parser.add_mutually_exclusive_group() - parser.add_argument('--no-feed', dest='feed', help='only produce the model', - action='store_false', default=True) - parser.add_argument('--no-log-errors', dest='no_log_errors', - action='store_true', default=False) + parser.add_argument( + '--no-feed', dest='feed', help='only produce the model', action='store_false', default=True + ) + parser.add_argument('--no-log-errors', dest='no_log_errors', action='store_true', default=False) parser.add_argument('--fake', action='store_true', default=False) - group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true', - default=False) + group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true', default=False) group.add_argument('--url', help='url of the w.c.s. 
instance', required=False, default=None) args, rest = parser.parse_known_args() feed = args.feed @@ -100,8 +99,7 @@ def main2(): config = get_config(path=args.config_path) configure_sentry(config) # list all known urls - urls = [url for url in config.sections() if url.startswith('http://') - or url.startswith('https://')] + urls = [url for url in config.sections() if url.startswith('http://') or url.startswith('https://')] defaults = {} if not args.all: try: @@ -129,7 +127,7 @@ def main2(): if config.has_section('wcs-olap'): defaults.update(config.items('wcs-olap')) if config.has_section(url): - defaults.update((config.items(url))) + defaults.update(config.items(url)) try: key = defaults['key'] orig = defaults['orig'] @@ -142,14 +140,25 @@ def main2(): logger.error('configuration incomplete for %s: %s', url, e) else: try: - api = wcs_api.WcsApi(url=url, orig=orig, key=key, - batch_size=batch_size, - verify=(defaults.get('verify', 'True') == 'True')) - logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url, - pg_dsn) + api = wcs_api.WcsApi( + url=url, + orig=orig, + key=key, + batch_size=batch_size, + verify=(defaults.get('verify', 'True') == 'True'), + ) + logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url, pg_dsn) olap_feeder = feeder.WcsOlapFeeder( - api=api, schema=schema, pg_dsn=pg_dsn, logger=logger, - config=defaults, do_feed=feed, fake=fake, slugs=slugs, scope=scope) + api=api, + schema=schema, + pg_dsn=pg_dsn, + logger=logger, + config=defaults, + do_feed=feed, + fake=fake, + slugs=slugs, + scope=scope, + ) olap_feeder.feed() logger.info('finished') feed_result = False @@ -163,5 +172,6 @@ def main2(): if failure: sys.exit(1) + if __name__ == '__main__': main() diff --git a/wcs_olap/feeder.py b/wcs_olap/feeder.py index cc78882..850198c 100644 --- a/wcs_olap/feeder.py +++ b/wcs_olap/feeder.py @@ -1,24 +1,22 @@ -# -*- coding: utf-8 -*- - -from __future__ import unicode_literals - -from collections import OrderedDict import contextlib -import datetime import copy -import itertools -import os -import json +import datetime import hashlib +import itertools +import json +import os import reprlib -from .utils import Whatever +import time +from collections import OrderedDict + import psycopg2 import psycopg2.errorcodes -import time - from cached_property import cached_property + from wcs_olap.wcs_api import WcsApiError +from .utils import Whatever + psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) @@ -29,9 +27,10 @@ def truncate_pg_identifier(identifier, hash_length=6, force_hash=False): else: # insert hash in the middle, to keep some readability return ( - identifier[:(63 - hash_length) // 2] + identifier[: (63 - hash_length) // 2] + hashlib.md5(identifier.encode('utf-8')).hexdigest()[:hash_length] - + identifier[-(63 - hash_length) // 2:]) + + identifier[-(63 - hash_length) // 2 :] + ) @contextlib.contextmanager @@ -51,7 +50,7 @@ def slugify(s): return s.replace('-', '_').replace(' ', '_') -class Context(object): +class Context: def __init__(self): self.stack = [] @@ -68,7 +67,7 @@ class Context(object): return r -class WcsOlapFeeder(object): +class WcsOlapFeeder: channels = [ [1, 'web', 'web'], @@ -80,18 +79,20 @@ class WcsOlapFeeder(object): [7, 'fax', 'fax'], [8, 'social-network', 'réseau social'], ] - channel_to_id = dict((c[1], c[0]) for c in channels) - id_to_channel = dict((c[0], c[1]) for c in channels) + channel_to_id = {c[1]: c[0] for c in channels} + 
id_to_channel = {c[0]: c[1] for c in channels} status = [ [1, 'Nouveau'], [2, 'En cours'], [3, 'Terminé'], ] - status_to_id = dict((c[1], c[0]) for c in channels) - id_to_status = dict((c[0], c[1]) for c in channels) + status_to_id = {c[1]: c[0] for c in channels} + id_to_status = {c[0]: c[1] for c in channels} - def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False, slugs=None, scope=None): + def __init__( + self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False, slugs=None, scope=None + ): self.api = api self.slugs = slugs self.fake = fake @@ -103,23 +104,25 @@ class WcsOlapFeeder(object): self.schema_temp = truncate_pg_identifier(schema + '_temp') self.do_feed = do_feed self.ctx = Context() - self.ctx.push({ - 'schema': self.schema, - 'schema_temp': self.schema_temp, - 'role_table': 'role', - 'channel_table': 'channel', - 'category_table': 'category', - 'form_table': 'formdef', - 'generic_formdata_table': 'formdata', - 'generic_status_table': 'status', - 'generic_evolution_table': 'evolution', - 'year_table': 'year', - 'month_table': 'month', - 'day_table': 'day', - 'dow_table': 'dow', - 'hour_table': 'hour', - 'agent_table': 'agent', - }) + self.ctx.push( + { + 'schema': self.schema, + 'schema_temp': self.schema_temp, + 'role_table': 'role', + 'channel_table': 'channel', + 'category_table': 'category', + 'form_table': 'formdef', + 'generic_formdata_table': 'formdata', + 'generic_status_table': 'status', + 'generic_evolution_table': 'evolution', + 'year_table': 'year', + 'month_table': 'month', + 'day_table': 'day', + 'dow_table': 'dow', + 'hour_table': 'hour', + 'agent_table': 'agent', + } + ) self.config = config or {} self.model = { 'label': self.config.get('cubes_label', schema), @@ -276,19 +279,19 @@ class WcsOlapFeeder(object): 'label': 'pourcentage des demandes', 'type': 'percent', "expression": 'case (select count({fact_table}.id) from {table_expression} ' - 'where {where_conditions}) when 0 then null else ' - 'count({fact_table}.id) * 100. / (select ' - 'count({fact_table}.id) from {table_expression} where ' - '{where_conditions}) end', + 'where {where_conditions}) when 0 then null else ' + 'count({fact_table}.id) * 100. 
/ (select ' + 'count({fact_table}.id) from {table_expression} where ' + '{where_conditions}) end', }, { 'name': 'geolocation', 'label': 'localisation géographique', 'type': 'point', 'expression': 'array_agg({fact_table}.geolocation_base) ' - 'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)', - } - ] + 'FILTER (WHERE {fact_table}.geolocation_base IS NOT NULL)', + }, + ], } self.model['cubes'].append(cube) self.base_cube = self.model['cubes'][0] @@ -309,7 +312,11 @@ class WcsOlapFeeder(object): @cached_property def formdefs(self): - return [formdef for formdef in self.api.formdefs if (not self.slugs or formdef.slug in self.slugs) and not formdef.is_empty] + return [ + formdef + for formdef in self.api.formdefs + if (not self.slugs or formdef.slug in self.slugs) and not formdef.is_empty + ] @cached_property def roles(self): @@ -343,7 +350,9 @@ class WcsOlapFeeder(object): try: self.cur.execute(sql, vars=vars) except Exception as e: - self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e) + self.logger.warning( + 'Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e + ) raise @contextlib.contextmanager @@ -381,23 +390,33 @@ class WcsOlapFeeder(object): Drop tables one by one in order to avoid reaching max_locks_per_transaction """ # drop foreign key constraints first - self.ex("SELECT table_name, constraint_name FROM " - "information_schema.key_column_usage " - "WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'", vars=[schema]) + self.ex( + "SELECT table_name, constraint_name FROM " + "information_schema.key_column_usage " + "WHERE table_schema = %s AND constraint_name LIKE '%%_fkey'", + vars=[schema], + ) for table_name, constraint_name in self.cur.fetchall(): # drop of PK constraints can have effects on FK constraint on other tables. with ignore_undefined_object_or_table(): - self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE' - % (quote(schema), quote(table_name), quote(constraint_name))) + self.ex( + 'ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE' + % (quote(schema), quote(table_name), quote(constraint_name)) + ) # remove others - self.ex("SELECT table_name, constraint_name FROM " - "information_schema.key_column_usage " - "WHERE table_schema = %s", vars=[schema]) + self.ex( + "SELECT table_name, constraint_name FROM " + "information_schema.key_column_usage " + "WHERE table_schema = %s", + vars=[schema], + ) for table_name, constraint_name in self.cur.fetchall(): # drop of PK constraints can have effects on FK constraint on other tables. 
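            # [editor's note, not part of the patch] constraints, indexes and
            # tables are dropped one statement at a time (rather than one big
            # DROP SCHEMA ... CASCADE) to stay under max_locks_per_transaction;
            # ignore_undefined_object_or_table() tolerates objects already
            # removed by an earlier CASCADE.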
with ignore_undefined_object_or_table(): - self.ex('ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE' - % (quote(schema), quote(table_name), quote(constraint_name))) + self.ex( + 'ALTER TABLE %s.%s DROP CONSTRAINT IF EXISTS %s CASCADE' + % (quote(schema), quote(table_name), quote(constraint_name)) + ) # then drop indexes self.ex("SELECT tablename, indexname FROM pg_indexes WHERE schemaname = %s", vars=[schema]) for table_name, index_name in self.cur.fetchall(): @@ -405,7 +424,9 @@ class WcsOlapFeeder(object): self.ex('DROP INDEX %s.%s CASCADE' % (quote(schema), quote(index_name))) # finally drop tables, cascade will have no effect - self.ex("SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC", vars=[schema]) + self.ex( + "SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename DESC", vars=[schema] + ) for table in self.cur.fetchall(): tablename = '%s.%s' % (quote(schema), quote(table[0])) self.ex('DROP TABLE IF EXISTS %s;' % tablename) @@ -418,7 +439,8 @@ class WcsOlapFeeder(object): first_date = max_date or datetime.date(2010, 1, 1) last_date = (datetime.date.today() + datetime.timedelta(days=150)).replace(month=12, day=31) - self.ex('''INSERT INTO public.dates SELECT + self.ex( + '''INSERT INTO public.dates SELECT the_date.the_date::date AS date, to_char(the_date.the_date, 'TMday') AS day, to_char(the_date.the_date, 'TMmonth') AS month @@ -427,7 +449,8 @@ class WcsOlapFeeder(object): LEFT JOIN public.dates AS dates ON the_date.the_date = dates.date WHERE dates.date IS NULL''', - vars=[first_date, last_date]) + vars=[first_date, last_date], + ) def create_table(self, name, columns, inherits=None, comment=None, unlogged=True): if unlogged: @@ -448,18 +471,21 @@ class WcsOlapFeeder(object): return self.cur.fetchone()[0] def update_table_sequence_number(self, name): - self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'), - (SELECT GREATEST(1, MAX(id)) FROM {name}))""", ctx={'name': quote(name)}) + self.ex( + """SELECT setval(pg_get_serial_sequence('{name}', 'id'), + (SELECT GREATEST(1, MAX(id)) FROM {name}))""", + ctx={'name': quote(name)}, + ) def create_labeled_table_serial(self, name, comment): self.create_table( - name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment, unlogged=False) + name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment, unlogged=False + ) if self.prev_table_exists(name): # Insert data from previous table self.ex( - 'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}', - ctx={'name': quote(name)} + 'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}', ctx={'name': quote(name)} ) self.update_table_sequence_number(name) @@ -476,12 +502,15 @@ class WcsOlapFeeder(object): self.ex( 'SELECT ref, {column} FROM {name} WHERE ref = %s', ctx={'name': quote(name), 'column': quote(update_column)}, - vars=(ref,)) + vars=(ref,), + ) if self.cur.fetchall(): for item in self.cur.fetchall(): - self.ex('UPDATE {name} SET {column}=%s WHERE ref=%s', - ctx={'name': quote(name), 'column': quote(update_column)}, - vars=[item[1], ref]) + self.ex( + 'UPDATE {name} SET {column}=%s WHERE ref=%s', + ctx={'name': quote(name), 'column': quote(update_column)}, + vars=[item[1], ref], + ) else: to_insert.append(item) if to_insert: @@ -489,35 +518,32 @@ class WcsOlapFeeder(object): tmpl = ', '.join(['(DEFAULT, %s)' % columns_values] * len(data)) query = 'INSERT INTO {name} VALUES %s' % tmpl - self.ex(query, ctx={'name': quote(name)}, # 'column': quote(update_column)}, - 
vars=list(itertools.chain(*to_insert))) + self.ex( + query, + ctx={'name': quote(name)}, # 'column': quote(update_column)}, + vars=list(itertools.chain(*to_insert)), + ) result = {} - self.ex('SELECT id, {column} FROM {name}', ctx={'name': quote(name), - 'column': result_column}) - for _id, column in self.cur.fetchall(): + self.ex('SELECT id, {column} FROM {name}', ctx={'name': quote(name), 'column': result_column}) + for _id, column in self.cur.fetchall(): result[column] = _id return result def create_labeled_table(self, name, labels, comment=None): self.create_table( - name, - [ - ['id', 'integer primary key'], - ['label', 'varchar'] - ], comment=comment, unlogged=False) + name, [['id', 'integer primary key'], ['label', 'varchar']], comment=comment, unlogged=False + ) if self.prev_table_exists(name): # Insert data from previous table self.ex( - 'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}', - ctx={'name': quote(name)} + 'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}', ctx={'name': quote(name)} ) # Find what is missing to_insert = [] for _id, _label in labels: - self.ex( - 'SELECT * FROM {name} WHERE label = %s', ctx={'name': quote(name)}, vars=(_label,)) + self.ex('SELECT * FROM {name} WHERE label = %s', ctx={'name': quote(name)}, vars=(_label,)) if self.cur.fetchone() is None: to_insert.append(_label) @@ -551,19 +577,19 @@ class WcsOlapFeeder(object): tmp_cat_map = self.do_referenced_data(table_name, categories_data, 'label') self.update_table_sequence_number(table_name) # remap categories ids to ids in the table - return dict((c.title, tmp_cat_map[c.title]) for c in self.categories) + return {c.title: tmp_cat_map[c.title] for c in self.categories} def do_formdef_table(self): categories_mapping = self.do_category_table() - formdef_fields = [['category_id', 'integer REFERENCES {category_table} (id)'], - ['label', 'varchar'] - ] + formdef_fields = [['category_id', 'integer REFERENCES {category_table} (id)'], ['label', 'varchar']] table_name = self.default_ctx['form_table'] self.create_referenced_table(table_name, formdef_fields, 'types de formulaire') - formdefs = [(formdef.slug, categories_mapping.get(formdef.schema.category), - formdef.schema.name) for formdef in self.formdefs] + formdefs = [ + (formdef.slug, categories_mapping.get(formdef.schema.category), formdef.schema.name) + for formdef in self.formdefs + ] self.formdefs_mapping = self.do_referenced_data(table_name, formdefs, 'ref') self.formdefs_unique_slug = {} seen = set() @@ -580,23 +606,19 @@ class WcsOlapFeeder(object): def do_base_table(self): # channels - self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels], - comment='canal') + self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels], comment='canal') # roles - roles = dict((i, role.name) for i, role in enumerate(self.roles)) + roles = {i: role.name for i, role in enumerate(self.roles)} tmp_role_map = self.create_labeled_table('role', roles.items(), comment='role') - self.role_mapping = dict( - (role.id, tmp_role_map[role.name]) for role in self.roles) + self.role_mapping = {role.id: tmp_role_map[role.name] for role in self.roles} # forms self.do_formdef_table() - self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))), - comment='heures') + self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))), comment='heures') - self.create_labeled_table('status', self.status, - comment='statuts simplifiés') + self.create_labeled_table('status', self.status, 
comment='statuts simplifiés') # agents self.create_labeled_table_serial('agent', comment='agents') @@ -630,14 +652,20 @@ class WcsOlapFeeder(object): self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,)) self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s', vars=('tous les formulaires',)) # evolutions - self.create_table('{generic_evolution_table}', [ - ['id', 'serial primary key'], - ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'], - ['formdata_id', 'integer'], # "REFERENCES {generic_formdata_table} (id)" is impossible because FK constraints do not work on inherited tables - ['time', 'timestamp'], - ['date', 'date'], - ['hour_id', 'smallint REFERENCES {hour_table} (id)'], - ]) + self.create_table( + '{generic_evolution_table}', + [ + ['id', 'serial primary key'], + ['generic_status_id', 'smallint REFERENCES {generic_status_table} (id)'], + [ + 'formdata_id', + 'integer', + ], # "REFERENCES {generic_formdata_table} (id)" is impossible because FK constraints do not work on inherited tables + ['time', 'timestamp'], + ['date', 'date'], + ['hour_id', 'smallint REFERENCES {hour_table} (id)'], + ], + ) self.ex('COMMENT ON TABLE {generic_evolution_table} IS %s', vars=('evolution générique',)) def feed(self): @@ -678,37 +706,38 @@ class WcsOlapFeeder(object): def switch_schemas(self): self.old_schema = truncate_pg_identifier( - self.schema - + '_old_' - + datetime.datetime.now().strftime('%Y%m%d%H%M')) + self.schema + '_old_' + datetime.datetime.now().strftime('%Y%m%d%H%M') + ) self.ctx.push({'old_schema': self.old_schema}) try: + def switch(): with self.atomic(): - self.logger.info('renaming schema %s to %s and schema %s to %s', - self.schema, self.old_schema, - self.schema_temp, self.schema) + self.logger.info( + 'renaming schema %s to %s and schema %s to %s', + self.schema, + self.old_schema, + self.schema_temp, + self.schema, + ) if self.schema_exists(self.schema): self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}') self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}') - self.retry_on_db_error( - switch, - times=33, - sleep_time=1) + + self.retry_on_db_error(switch, times=33, sleep_time=1) except Exception: self.logger.warning('failed to switch schemas 3-times in 33 seconds') raise try: + def drop_old_schema(): self.logger.info('dropping schema %s', self.old_schema) self.drop_tables_sequencially(self.old_schema) self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE') - self.retry_on_db_error( - drop_old_schema, - times=33, - sleep_time=1) + + self.retry_on_db_error(drop_old_schema, times=33, sleep_time=1) except Exception: self.logger.exception('could not drop schema %s', self.old_schema) @@ -730,10 +759,14 @@ class WcsOlapFeeder(object): def create_formdata_json_index(self, table_name, varname): if varname in self.formdata_json_index: return - index_name = self.hash_table_name('%s_%s_json_idx' % (table_name, varname), hash_length=8, - force_hash=True) - self.ex('CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))', - ctx={'index_name': quote(index_name)}, vars=[varname]) + index_name = self.hash_table_name( + '%s_%s_json_idx' % (table_name, varname), hash_length=8, force_hash=True + ) + self.ex( + 'CREATE INDEX {index_name} ON {generic_formdata_table} (("json_data"->>%s))', + ctx={'index_name': quote(index_name)}, + vars=[varname], + ) # prevent double creation self.formdata_json_index.append(varname) @@ -742,7 +775,7 @@ def has_digits_validation(field): return field.validation and 
field.validation.get('type') == 'digits' -class WcsFormdefFeeder(object): +class WcsFormdefFeeder: def __init__(self, olap_feeder, formdef, do_feed=True): self.olap_feeder = olap_feeder self.formdef = formdef @@ -771,26 +804,31 @@ class WcsFormdefFeeder(object): def do_statuses(self): statuses = self.formdef.schema.workflow.statuses tmp_status_map = self.olap_feeder.create_labeled_table( - self.status_table_name, enumerate([s.name for s in statuses]), - comment='statuts du formulaire « %s »' % self.formdef.schema.name) - self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses) + self.status_table_name, + enumerate([s.name for s in statuses]), + comment='statuts du formulaire « %s »' % self.formdef.schema.name, + ) + self.status_mapping = {s.id: tmp_status_map[s.name] for s in statuses} def do_data_table(self): columns = OrderedDict() - columns['status_id'] = {'sql_col_name': 'status_id', 'sql_col_def': 'smallint REFERENCES {status_table} (id)'} + columns['status_id'] = { + 'sql_col_name': 'status_id', + 'sql_col_def': 'smallint REFERENCES {status_table} (id)', + } # add item fields for field in self.good_fields.values(): if field.type == 'item': - comment = ('valeurs du champ « %s » du formulaire %s' - % (field.label, self.formdef.schema.name)) + comment = 'valeurs du champ « %s » du formulaire %s' % (field.label, self.formdef.schema.name) table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname)) # create table and mapping if field.items: # filter non string items items = [item for item in field.items if isinstance(item, str)] self.items_mappings[field.varname] = self.create_labeled_table( - table_name, enumerate(items), comment=comment) + table_name, enumerate(items), comment=comment + ) else: # open item field, from data sources... 
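                    # [editor's note, not part of the patch] an open item field
                    # has no fixed list of choices, so its dimension table starts
                    # empty (serial primary key) and insert_item_value() adds
                    # labels as they are first encountered during the feed.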
self.create_labeled_table_serial(table_name, comment=comment) @@ -839,15 +877,19 @@ class WcsFormdefFeeder(object): self.columns.append(columns[key]['sql_col_name']) self.columns.remove('geolocation_base') - self.create_table(self.table_name, - [(columns[key]['sql_col_name'], columns[key]['sql_col_def']) for key in columns], - inherits='{generic_formdata_table}', - comment='formulaire %s' % self.formdef.schema.name) + self.create_table( + self.table_name, + [(columns[key]['sql_col_name'], columns[key]['sql_col_def']) for key in columns], + inherits='{generic_formdata_table}', + comment='formulaire %s' % self.formdef.schema.name, + ) for key in columns: column = columns[key] if column.get('sql_comment'): - self.ex('COMMENT ON COLUMN {formdata_table}.%s IS %%s' % quote(column['sql_col_name']), - vars=(column['sql_comment'],)) + self.ex( + 'COMMENT ON COLUMN {formdata_table}.%s IS %%s' % quote(column['sql_col_name']), + vars=(column['sql_comment'],), + ) # Creat index for JSON fields if self.has_jsonb: @@ -864,26 +906,37 @@ class WcsFormdefFeeder(object): self.ex('ALTER TABLE {formdata_table} ADD CONSTRAINT %s' % constraint) self.ex('ALTER TABLE {formdata_table} ADD PRIMARY KEY (id)') # table des evolutions - self.create_table(self.evolution_table_name, [ - ['id', 'serial primary key'], - ['status_id', 'smallint REFERENCES {status_table} (id)'], - ['formdata_id', 'integer REFERENCES {formdata_table} (id)'], - ['time', 'timestamp'], - ['date', 'date'], - ['hour_id', 'smallint REFERENCES {hour_table} (id)'], - ]) - self.ex('COMMENT ON TABLE {evolution_table} IS %s', - vars=('evolution des demandes %s' % self.formdef.schema.name,)) + self.create_table( + self.evolution_table_name, + [ + ['id', 'serial primary key'], + ['status_id', 'smallint REFERENCES {status_table} (id)'], + ['formdata_id', 'integer REFERENCES {formdata_table} (id)'], + ['time', 'timestamp'], + ['date', 'date'], + ['hour_id', 'smallint REFERENCES {hour_table} (id)'], + ], + ) + self.ex( + 'COMMENT ON TABLE {evolution_table} IS %s', + vars=('evolution des demandes %s' % self.formdef.schema.name,), + ) def insert_item_value(self, field, value): table_name = self.hash_table_name('%s_field_%s' % (self.table_name, field.varname)) - self.ex('SELECT id FROM {item_table} WHERE label = %s', - ctx={'item_table': quote(table_name)}, vars=(value,)) + self.ex( + 'SELECT id FROM {item_table} WHERE label = %s', + ctx={'item_table': quote(table_name)}, + vars=(value,), + ) res = self.cur.fetchone() if res: return res[0] - self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,), - ctx={'item_table': quote(table_name)}) + self.ex( + 'INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', + vars=(value,), + ctx={'item_table': quote(table_name)}, + ) return self.cur.fetchone()[0] def get_item_id(self, field, value): @@ -930,8 +983,9 @@ class WcsFormdefFeeder(object): try: status = data.formdef.schema.workflow.statuses_map[status_id] except KeyError: - self.logger.warning('%s.%s unknown status status_id %s', - data.formdef.schema.name, data.id, status_id) + self.logger.warning( + '%s.%s unknown status status_id %s', data.formdef.schema.name, data.id, status_id + ) continue channel = data.submission.channel.lower() @@ -1022,17 +1076,16 @@ class WcsFormdefFeeder(object): try: status = data.formdef.schema.workflow.statuses_map[evo.status] except KeyError: - self.logger.warning('%s.%s unknown status in evolution %s', - data.formdef.schema.name, data.id, evo.status) + self.logger.warning( + '%s.%s unknown status in 
evolution %s', data.formdef.schema.name, data.id, evo.status + ) continue status_id = self.status_mapping[status.id] generic_status_id = self.generic_status(status) - evolution.append( - [0, status_id, evo.time, evo.time.date(), evo.time.hour]) + evolution.append([0, status_id, evo.time, evo.time.date(), evo.time.hour]) if generic_status_id == last_status: continue - generic_evolution.append( - [0, generic_status_id, evo.time, evo.time.date(), evo.time.hour]) + generic_evolution.append([0, generic_status_id, evo.time, evo.time.date(), evo.time.hour]) last_status = generic_status_id generic_evolution_values.append(generic_evolution) evolution_values.append(evolution) @@ -1041,12 +1094,11 @@ class WcsFormdefFeeder(object): return insert_columns = ['%s' % quote(column) for column in self.columns[1:]] insert_columns = ', '.join(insert_columns) - self.ex('INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id', - ctx=dict( - columns=insert_columns, - values=', '.join(['%s'] * len(values)) - ), - vars=values) + self.ex( + 'INSERT INTO {formdata_table} ({columns}) VALUES {values} RETURNING id', + ctx=dict(columns=insert_columns, values=', '.join(['%s'] * len(values))), + vars=values, + ) # insert generic evolutions generic_evolutions = [] @@ -1056,14 +1108,24 @@ class WcsFormdefFeeder(object): row[0] = formdata_id generic_evolutions.append(tuple(row)) if len(generic_evolutions) == 500: - self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % ( - ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']), - ', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions) + self.ex( + 'INSERT INTO {generic_evolution_table} (%s) VALUES %s' + % ( + ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']), + ', '.join(['%s'] * len(generic_evolutions)), + ), + vars=generic_evolutions, + ) generic_evolutions = [] if generic_evolutions: - self.ex('INSERT INTO {generic_evolution_table} (%s) VALUES %s' % ( - ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']), - ', '.join(['%s'] * len(generic_evolutions))), vars=generic_evolutions) + self.ex( + 'INSERT INTO {generic_evolution_table} (%s) VALUES %s' + % ( + ', '.join(['formdata_id', 'generic_status_id', 'time', 'date', 'hour_id']), + ', '.join(['%s'] * len(generic_evolutions)), + ), + vars=generic_evolutions, + ) # insert evolutions evolutions = [] @@ -1072,14 +1134,24 @@ class WcsFormdefFeeder(object): row[0] = formdata_id evolutions.append(tuple(row)) if len(evolutions) == 500: - self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % ( - ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']), - ', '.join(['%s'] * len(evolutions))), vars=evolutions) + self.ex( + 'INSERT INTO {evolution_table} (%s) VALUES %s' + % ( + ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']), + ', '.join(['%s'] * len(evolutions)), + ), + vars=evolutions, + ) evolutions = [] if evolutions: - self.ex('INSERT INTO {evolution_table} (%s) VALUES %s' % ( - ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']), - ', '.join(['%s'] * len(evolutions))), vars=evolutions) + self.ex( + 'INSERT INTO {evolution_table} (%s) VALUES %s' + % ( + ', '.join(['formdata_id', 'status_id', 'time', 'date', 'hour_id']), + ', '.join(['%s'] * len(evolutions)), + ), + vars=evolutions, + ) def get_first_agent_in_evolution(self, formdata): for evo in formdata.evolution: @@ -1087,11 +1159,13 @@ class WcsFormdefFeeder(object): return self.get_agent(evo.who) def feed(self): - 
self.olap_feeder.ctx.push({ - 'formdata_table': quote(self.table_name), - 'status_table': quote(self.status_table_name), - 'evolution_table': quote(self.evolution_table_name), - }) + self.olap_feeder.ctx.push( + { + 'formdata_table': quote(self.table_name), + 'status_table': quote(self.status_table_name), + 'evolution_table': quote(self.evolution_table_name), + } + ) # create cube cube = self.cube = copy.deepcopy(self.base_cube) @@ -1102,51 +1176,62 @@ class WcsFormdefFeeder(object): # remove json field from formdef cubes cube.pop('json_field', None) - cube.update({ - 'name': self.table_name, - 'label': self.formdef.schema.name, - 'fact_table': quote(self.table_name), - 'key': 'id', - }) - cube['dimensions'] = [dimension for dimension in cube['dimensions'] - if dimension['name'] not in ('category', 'formdef')] + cube.update( + { + 'name': self.table_name, + 'label': self.formdef.schema.name, + 'fact_table': quote(self.table_name), + 'key': 'id', + } + ) + cube['dimensions'] = [ + dimension for dimension in cube['dimensions'] if dimension['name'] not in ('category', 'formdef') + ] # add dimension for status - cube['joins'].append({ - 'name': 'status', - 'table': quote(self.status_table_name), - 'master': 'status_id', - 'detail': 'id', - 'kind': 'left', - }) - cube['dimensions'].append({ - 'name': 'status', - 'label': 'statut', - 'join': ['status'], - 'type': 'integer', - 'value': 'status.id', - 'value_label': 'status.label', - }) + cube['joins'].append( + { + 'name': 'status', + 'table': quote(self.status_table_name), + 'master': 'status_id', + 'detail': 'id', + 'kind': 'left', + } + ) + cube['dimensions'].append( + { + 'name': 'status', + 'label': 'statut', + 'join': ['status'], + 'type': 'integer', + 'value': 'status.id', + 'value_label': 'status.label', + } + ) # add dimension for function for function, name in self.formdef.schema.workflow.functions.items(): at = 'function_%s' % slugify(function) - cube['joins'].append({ - 'name': at, - 'table': 'role', - 'master': quote(at), - 'detail': 'id', - 'kind': 'left', - }) - cube['dimensions'].append({ - 'name': at, - 'label': 'fonction %s' % name.lower(), - 'join': [at], - 'type': 'integer', - 'value': '%s.id' % quote(at), - 'value_label': '%s.label' % quote(at), - 'filter': False, - }) + cube['joins'].append( + { + 'name': at, + 'table': 'role', + 'master': quote(at), + 'detail': 'id', + 'kind': 'left', + } + ) + cube['dimensions'].append( + { + 'name': at, + 'label': 'fonction %s' % name.lower(), + 'join': [at], + 'type': 'integer', + 'value': '%s.id' % quote(at), + 'value_label': '%s.label' % quote(at), + 'filter': False, + } + ) # add dimensions for item fields fields = self.formdef.schema.fields @@ -1171,7 +1256,8 @@ class WcsFormdefFeeder(object): add_warning( 'Le champ « %(label)s » a un nom de variable dupliqué « %(varname)s » ' 'mais pas le même type que « %(label_good)s », tous les champs avec ce nom seront ignorés ' - '(%(type)s != %(type_good)s).' % { + '(%(type)s != %(type_good)s).' + % { 'label': field.label, 'varname': field.varname, 'type': field.type, @@ -1183,8 +1269,10 @@ class WcsFormdefFeeder(object): bad_varnames.add(field.varname) continue if field.varname in bad_varnames: - add_warning('Le champ « %s » est un doublon d\'un champ de type différent, il a été ignoré.' - % field.label) + add_warning( + 'Le champ « %s » est un doublon d\'un champ de type différent, il a été ignoré.' 
+ % field.label + ) continue self.good_fields[field.varname] = field @@ -1221,8 +1309,9 @@ class WcsFormdefFeeder(object): 'type': 'bool', 'value': quote(field_name), 'value_label': '(CASE WHEN %(field)s IS NULL THEN NULL' - ' WHEN %(field)s THEN \'Oui\'' - ' ELSE \'Non\' END)' % { + ' WHEN %(field)s THEN \'Oui\'' + ' ELSE \'Non\' END)' + % { 'field': quote(field_name), }, 'filter': True, @@ -1230,12 +1319,14 @@ class WcsFormdefFeeder(object): elif field.type == 'string': if has_digits_validation(field): # we will define a SUM measure instead - cube['measures'].append({ - 'name': 'sum_' + dimension_name, - 'label': 'total du champ « %s »' % dimension_label, - 'type': 'integer', - 'expression': 'SUM({fact_table}.%s)' % quote(field_name), - }) + cube['measures'].append( + { + 'name': 'sum_' + dimension_name, + 'label': 'total du champ « %s »' % dimension_label, + 'type': 'integer', + 'expression': 'SUM({fact_table}.%s)' % quote(field_name), + } + ) continue else: dimension = { diff --git a/wcs_olap/signature.py b/wcs_olap/signature.py index b57e21f..1920efe 100644 --- a/wcs_olap/signature.py +++ b/wcs_olap/signature.py @@ -1,9 +1,9 @@ -import urllib.parse as urlparse -import datetime import base64 -import hmac +import datetime import hashlib +import hmac import random +import urllib.parse as urlparse '''Simple signature scheme for query strings''' # from http://repos.entrouvert.org/portail-citoyen.git/tree/portail_citoyen/apps/data_source_plugin/signature.py @@ -24,10 +24,7 @@ def sign_query(query, key, algo='sha256', timestamp=None, nonce=None): new_query = query if new_query: new_query += '&' - new_query += urlparse.urlencode(( - ('algo', algo), - ('timestamp', timestamp), - ('nonce', nonce))) + new_query += urlparse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce))) signature = base64.b64encode(sign_string(new_query, key, algo=algo)) new_query += '&signature=' + urlparse.quote(signature) return new_query @@ -50,8 +47,7 @@ def check_url(url, key, known_nonce=None, timedelta=30): def check_query(query, key, known_nonce=None, timedelta=30): parsed = urlparse.parse_qs(query) - if not ('signature' in parsed and 'algo' in parsed - and 'timestamp' in parsed and 'nonce' in parsed): + if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed and 'nonce' in parsed): return False unsigned_query, signature_content = query.split('&signature=', 1) if '&' in signature_content: diff --git a/wcs_olap/utils.py b/wcs_olap/utils.py index 3a3f54f..24cb50e 100644 --- a/wcs_olap/utils.py +++ b/wcs_olap/utils.py @@ -1,4 +1,4 @@ -class Whatever(object): +class Whatever: def __call__(*args, **kwargs): pass diff --git a/wcs_olap/wcs_api.py b/wcs_olap/wcs_api.py index 63ee58d..be6b893 100644 --- a/wcs_olap/wcs_api.py +++ b/wcs_olap/wcs_api.py @@ -14,19 +14,18 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import collections import base64 -import copy -import logging -import datetime +import collections import contextlib +import copy +import datetime import json - -import requests -import isodate - +import logging import urllib.parse as urlparse +import isodate +import requests + from . 
import signature @@ -34,7 +33,7 @@ class WcsApiError(Exception): pass -class JSONFile(object): +class JSONFile: def __init__(self, d): self.d = d @@ -62,7 +61,7 @@ def to_dict(o): return o -class BaseObject(object): +class BaseObject: def __init__(self, wcs_api, **kwargs): self._wcs_api = wcs_api self.__dict__.update(**kwargs) @@ -82,7 +81,7 @@ class FormDataWorkflow(BaseObject): fields = None def __init__(self, wcs_api, **kwargs): - super(FormDataWorkflow, self).__init__(wcs_api, **kwargs) + super().__init__(wcs_api, **kwargs) self.status = BaseObject(wcs_api, **self.status) if self.status else None self.real_status = BaseObject(wcs_api, **self.real_status) if self.real_status else None self.fields = self.fields or {} @@ -101,7 +100,7 @@ class Evolution(BaseObject): parts = None def __init__(self, wcs_api, **kwargs): - super(Evolution, self).__init__(wcs_api, **kwargs) + super().__init__(wcs_api, **kwargs) self.time = isodate.parse_datetime(self.time) self.parts = [BaseObject(wcs_api, **part) for part in self.parts or []] self.who = EvolutionUser(wcs_api, **self.who) if self.who else None @@ -117,7 +116,7 @@ class FormData(BaseObject): def __init__(self, wcs_api, forms, **kwargs): self.forms = forms - super(FormData, self).__init__(wcs_api, **kwargs) + super().__init__(wcs_api, **kwargs) self.receipt_time = isodate.parse_datetime(self.receipt_time) self.submission = BaseObject(wcs_api, **self.submission or {}) self.workflow = FormDataWorkflow(wcs_api, **self.workflow or {}) @@ -157,7 +156,7 @@ class FormData(BaseObject): @property def endpoint_delay(self): '''Compute delay as the time when the last not endpoint status precedes an endpoint - status.''' + status.''' statuses_map = self.formdef.schema.workflow.statuses_map s = 0 for evo in self.evolution[::-1]: @@ -188,13 +187,13 @@ class Workflow(BaseObject): fields = None def __init__(self, wcs_api, **kwargs): - super(Workflow, self).__init__(wcs_api, **kwargs) + super().__init__(wcs_api, **kwargs) self.statuses = [BaseObject(wcs_api, **v) for v in (self.statuses or [])] assert not hasattr(self.statuses[0], 'startpoint'), 'startpoint is exported by w.c.s. 
FIXME' for status in self.statuses: status.startpoint = False self.statuses[0].startpoint = True - self.statuses_map = dict((s.id, s) for s in self.statuses) + self.statuses_map = {s.id: s for s in self.statuses} self.fields = [Field(wcs_api, **field) for field in (self.fields or [])] @@ -214,13 +213,13 @@ class Schema(BaseObject): geolocations = None def __init__(self, wcs_api, **kwargs): - super(Schema, self).__init__(wcs_api, **kwargs) + super().__init__(wcs_api, **kwargs) self.workflow = Workflow(wcs_api, **self.workflow) self.fields = [Field(wcs_api, **f) for f in self.fields or []] self.geolocations = sorted((k, v) for k, v in (self.geolocations or {}).items()) -class FormDatas(object): +class FormDatas: def __init__(self, wcs_api, formdef, full=False, anonymize=False): self.wcs_api = wcs_api self.formdef = formdef @@ -231,13 +230,14 @@ class FormDatas(object): def __getitem__(self, slice_or_id): # get batch of forms if isinstance(slice_or_id, slice): + def helper(): if slice_or_id.stop <= slice_or_id.start or slice_or_id.step: raise ValueError('invalid slice %s' % slice_or_id) offset = slice_or_id.start limit = slice_or_id.stop - slice_or_id.start - url_parts = ['api/forms/{self.formdef.slug}/list'.format(self=self)] + url_parts = [f'api/forms/{self.formdef.slug}/list'] query = {} query['full'] = 'on' if self._full else 'off' if offset: @@ -252,10 +252,11 @@ class FormDatas(object): if not d.get('receipt_time'): continue yield FormData(wcs_api=self.wcs_api, forms=self, formdef=self.formdef, **d) + return helper() # or get one form else: - url_parts = ['api/forms/{formdef.slug}/{id}/'.format(formdef=self.formdef, id=slice_or_id)] + url_parts = [f'api/forms/{self.formdef.slug}/{slice_or_id}/'] if self.anonymize: url_parts.append('?anonymise=true') d = self.wcs_api.get_json(*url_parts) @@ -282,7 +283,7 @@ class FormDatas(object): start = 0 while True: empty = True - for formdef in self[start:start + self.batch_size]: + for formdef in self[start : start + self.batch_size]: empty = False yield formdef if empty: @@ -290,14 +291,14 @@ class FormDatas(object): start += self.batch_size def __len__(self): - return len(list((o for o in self))) + return len(list(o for o in self)) class CancelSubmitError(Exception): pass -class FormDefSubmit(object): +class FormDefSubmit: formdef = None data = None user_email = None @@ -346,16 +347,13 @@ class FormDefSubmit(object): if value is None or value == {} or value == []: self.data.pop(varname, None) elif hasattr(self, '_set_type_%s' % field.type): - getattr(self, '_set_type_%s' % field.type)( - varname=varname, - field=field, - value=value, **kwargs) + getattr(self, '_set_type_%s' % field.type)(varname=varname, field=field, value=value, **kwargs) else: self.data[varname] = value def _set_type_item(self, varname, field, value, **kwargs): if isinstance(value, dict): - if not set(value).issuperset(set(['id', 'text'])): + if not set(value).issuperset({'id', 'text'}): raise ValueError('item field value must have id and text value') # clean previous values self.data.pop(varname, None) @@ -378,7 +376,7 @@ class FormDefSubmit(object): has_dict = False for choice in value: if isinstance(value, dict): - if not set(value).issuperset(set(['id', 'text'])): + if not set(value).issuperset({'id', 'text'}): raise ValueError('items field values must have id and text value') has_dict = True if has_dict: @@ -407,7 +405,7 @@ class FormDefSubmit(object): elif isinstance(value, bytes): content = base64.b64encode(value).decode('ascii') elif isinstance(value, dict): - if not 
set(value).issuperset(set(['filename', 'content'])): + if not set(value).issuperset({'filename', 'content'}): raise ValueError('file field needs a dict value with filename and content') content = value['content'] filename = value['filename'] @@ -432,7 +430,7 @@ class FormDefSubmit(object): def _set_type_map(self, varname, field, value): if not isinstance(value, dict): raise TypeError('value must be a dict for a map field') - if set(value) != set(['lat', 'lon']): + if set(value) != {'lat', 'lon'}: raise ValueError('map field expect keys lat and lon') self.data[varname] = value @@ -470,22 +468,19 @@ class FormDef(BaseObject): @property def schema(self): if not hasattr(self, '_schema'): - d = self._wcs_api.get_json('api/formdefs/{self.slug}/schema'.format(self=self)) + d = self._wcs_api.get_json(f'api/formdefs/{self.slug}/schema') self._schema = Schema(self._wcs_api, **d) return self._schema @contextlib.contextmanager def submit(self, **kwargs): - submitter = FormDefSubmit( - wcs_api=self._wcs_api, - formdef=self, - **kwargs) + submitter = FormDefSubmit(wcs_api=self._wcs_api, formdef=self, **kwargs) try: yield submitter except CancelSubmitError: return payload = submitter.payload() - d = self._wcs_api.post_json(payload, 'api/formdefs/{self.slug}/submit'.format(self=self)) + d = self._wcs_api.post_json(payload, f'api/formdefs/{self.slug}/submit') if d['err'] != 0: raise WcsApiError('submited returned an error: %s' % d) submitter.result = BaseObject(self._wcs_api, **d['data']) @@ -499,7 +494,7 @@ class Category(BaseObject): pass -class WcsObjects(object): +class WcsObjects: url = None object_class = None @@ -519,7 +514,7 @@ class WcsObjects(object): yield self.object_class(wcs_api=self.wcs_api, **d) def __len__(self): - return len(list((o for o in self))) + return len(list(o for o in self)) class Roles(WcsObjects): @@ -538,9 +533,19 @@ class Categories(WcsObjects): object_class = Category -class WcsApi(object): - def __init__(self, url, email=None, name_id=None, batch_size=1000, - session=None, logger=None, orig=None, key=None, verify=True): +class WcsApi: + def __init__( + self, + url, + email=None, + name_id=None, + batch_size=1000, + session=None, + logger=None, + orig=None, + key=None, + verify=True, + ): self.url = url self.email = email self.name_id = name_id @@ -602,7 +607,8 @@ class WcsApi(object): final_url, data=json.dumps(data), headers={'content-type': 'application/json'}, - verify=self.verify) + verify=self.verify, + ) response.raise_for_status() except requests.RequestException as e: content = getattr(getattr(e, 'response', None), 'content', None)
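
Editor's note (illustrative sketch, not part of the patch): truncate_pg_identifier() in wcs_olap/feeder.py keeps generated table names inside PostgreSQL's 63-byte identifier limit by splicing a short md5 fragment between the head and the tail of over-long names instead of blindly truncating. A minimal self-contained sketch of the behaviour; the pass-through branch for short names sits outside the hunk shown above and is assumed here:

    import hashlib

    def truncate_pg_identifier(identifier, hash_length=6, force_hash=False):
        # assumed pass-through branch (outside the hunk shown above)
        if len(identifier) <= 63 and not force_hash:
            return identifier
        # 28 head chars + 6 hash chars + 29 tail chars == 63
        return (
            identifier[: (63 - hash_length) // 2]
            + hashlib.md5(identifier.encode('utf-8')).hexdigest()[:hash_length]
            + identifier[-(63 - hash_length) // 2 :]
        )

    name = 'formdata_' + 'x' * 80 + '_field_item_open'  # hypothetical long name
    assert len(truncate_pg_identifier(name)) == 63

Keeping the hash in the middle preserves both the table prefix and the field suffix, so the truncated name stays recognizable in catalog listings.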
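
Editor's note (illustrative sketch, not part of the patch): wcs_olap/signature.py implements the signed-query scheme used to authenticate against the w.c.s. API: sign_query() appends algo, timestamp and nonce parameters to the query string, then an HMAC signature over the whole query. A hedged round-trip sketch; it assumes the default known_nonce=None skips replay detection and that the clock is within the timedelta=30 seconds window (the check internals are outside this diff):

    from wcs_olap.signature import check_query, sign_query

    signed = sign_query('orig=olap&limit=100', key='shared-secret')
    # signed now ends with &algo=sha256&timestamp=...&nonce=...&signature=...
    assert check_query(signed, key='shared-secret')

This is why the configuration pairs an `orig` with a `key` (see the [api-secrets] section written by tests/conftest.py): orig identifies the client, and the key signs every request to the instance.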