migrate to python3 (#39430)

This commit is contained in:
Benjamin Dauvergne 2020-01-30 21:03:06 +01:00
parent c304e0f8f8
commit dce418d99d
11 changed files with 568 additions and 284 deletions

10
debian/control vendored
View File

@ -2,13 +2,11 @@ Source: wcs-olap
Section: python Section: python
Priority: optional Priority: optional
Maintainer: Benjamin Dauvergne <bdauvergne@entrouvert.com> Maintainer: Benjamin Dauvergne <bdauvergne@entrouvert.com>
Build-Depends: python-setuptools (>= 0.6b3), python-all (>= 2.6), debhelper (>= 9), dh-python Build-Depends: python3-setuptools, python3-all, debhelper (>= 9), dh-python
Standards-Version: 3.9.1 Standards-Version: 3.9.6
X-Python-Version: >= 2.7 Homepage: http://dev.entrouvert.org/projects/wcs-olap/
Homepage: http://dev.entrouvert.org/projects/publik-bi/
Package: wcs-olap Package: wcs-olap
Architecture: all Architecture: all
Depends: ${python:Depends} Depends: ${python3:Depends}
XB-Python-Version: ${python:Versions}
Description: Export w.c.s. datas into a snowflake schema built on PostgreSQL Description: Export w.c.s. datas into a snowflake schema built on PostgreSQL

View File

@ -1,3 +1,3 @@
isodate python-isodate isodate python3-isodate
psycopg2 python-psycopg2 psycopg2 python3-psycopg2
cached_property python-cached-property cached_property python3-cached-property

4
debian/rules vendored
View File

@ -1,6 +1,8 @@
#!/usr/bin/make -f #!/usr/bin/make -f
export PYBUILD_NAME=wcs-olap
%: %:
dh $@ --with python2 dh $@ --with python3 --buildsystem=pybuild

View File

@ -9,37 +9,39 @@ from setuptools.command.sdist import sdist
class eo_sdist(sdist): class eo_sdist(sdist):
def run(self): def run(self):
print "creating VERSION file"
if os.path.exists('VERSION'): if os.path.exists('VERSION'):
os.remove('VERSION') os.remove('VERSION')
version = get_version() version = get_version()
version_file = open('VERSION', 'w') with open('VERSION', 'w') as fd:
version_file.write(version) fd.write(version)
version_file.close()
sdist.run(self) sdist.run(self)
print "removing VERSION file"
if os.path.exists('VERSION'): if os.path.exists('VERSION'):
os.remove('VERSION') os.remove('VERSION')
def get_version(): def get_version():
'''Use the VERSION, if absent generates a version with git describe, if not '''Use the VERSION, if absent generates a version with git describe, if not
tag exists, take 0.0.0- and add the length of the commit log. tag exists, take 0.0- and add the length of the commit log.
''' '''
if os.path.exists('VERSION'): if os.path.exists('VERSION'):
with open('VERSION', 'r') as v: with open('VERSION', 'r') as v:
return v.read() return v.read()
if os.path.exists('.git'): if os.path.exists('.git'):
p = subprocess.Popen(['git', 'describe', '--dirty', '--match=v*'], stdout=subprocess.PIPE, p = subprocess.Popen(
stderr=subprocess.PIPE) ['git', 'describe', '--dirty=.dirty', '--match=v*'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result = p.communicate()[0] result = p.communicate()[0]
if p.returncode == 0: if p.returncode == 0:
result = result.split()[0][1:] result = result.decode('ascii').strip()[1:] # strip spaces/newlines and initial v
if '-' in result: # not a tagged version
real_number, commit_count, commit_hash = result.split('-', 2)
version = '%s.post%s+%s' % (real_number, commit_count, commit_hash)
else:
version = result
return version
else: else:
result = '0.0.0-%s' % len(subprocess.check_output( return '0.0.post%s' % len(subprocess.check_output(['git', 'rev-list', 'HEAD']).splitlines())
['git', 'rev-list', 'HEAD']).splitlines()) return '0.0'
return result.replace('-', '.').replace('.g', '+g')
return '0.0.0'
setup(name="wcs-olap", setup(name="wcs-olap",
@ -54,7 +56,13 @@ setup(name="wcs-olap",
maintainer_email="bdauvergne@entrouvert.com", maintainer_email="bdauvergne@entrouvert.com",
packages=find_packages(), packages=find_packages(),
include_package_data=True, include_package_data=True,
install_requires=['requests', 'psycopg2', 'isodate', 'six', 'cached-property'], install_requires=[
'requests',
'psycopg2',
'isodate',
'six',
'cached-property'
],
entry_points={ entry_points={
'console_scripts': ['wcs-olap=wcs_olap.cmd:main'], 'console_scripts': ['wcs-olap=wcs_olap.cmd:main'],
}, },

View File

@ -1,12 +1,9 @@
from __future__ import unicode_literals
import json import json
import pytest import pytest
import pathlib
import requests import requests
import pathlib2 import httmock
import mock
import utils import utils
@ -90,34 +87,41 @@ def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
# verify JSON schema # verify JSON schema
with (olap_cmd.model_dir / 'olap.model').open() as fd, \ with (olap_cmd.model_dir / 'olap.model').open() as fd, \
(pathlib2.Path(__file__).parent / 'olap.model').open() as fd2: (pathlib.Path(__file__).parent / 'olap.model').open() as fd2:
json_schema = json.load(fd) json_schema = json.load(fd)
expected_json_schema = json.load(fd2) expected_json_schema = json.load(fd2)
expected_json_schema['pg_dsn'] = postgres_db.dsn expected_json_schema['pg_dsn'] = postgres_db.dsn
assert json_schema == expected_json_schema assert json_schema == expected_json_schema
def test_requests_exception(wcs, postgres_db, tmpdir, olap_cmd, caplog): def test_requests_exception(wcs, postgres_db, tmpdir, olap_cmd, caplog):
with mock.patch('requests.get', side_effect=requests.RequestException('wat!')): @httmock.urlmatch()
def requests_raise(url, request):
raise requests.RequestException('wat!')
with httmock.HTTMock(requests_raise):
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
olap_cmd(no_log_errors=False) olap_cmd(no_log_errors=False)
assert 'wat!' in caplog.text assert 'wat!' in caplog.text
def test_requests_not_ok(wcs, postgres_db, tmpdir, olap_cmd, caplog): def test_requests_not_ok(wcs, postgres_db, tmpdir, olap_cmd, caplog):
with mock.patch('requests.get') as mocked_get: @httmock.urlmatch()
mocked_get.return_value.ok = False def return_401(url, request):
mocked_get.return_value.status_code = 401 return {'status_code': 401, 'content': {"err": 1, "err_desc": "invalid signature"}}
mocked_get.return_value.text = '{"err": 1, "err_desc": "invalid signature"}'
with httmock.HTTMock(return_401):
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
olap_cmd(no_log_errors=False) olap_cmd(no_log_errors=False)
assert 'invalid signature' in caplog.text assert 'invalid signature' in caplog.text
def test_requests_not_json(wcs, postgres_db, tmpdir, olap_cmd, caplog): def test_requests_not_json(wcs, postgres_db, tmpdir, olap_cmd, caplog):
with mock.patch('requests.get') as mocked_get: @httmock.urlmatch()
mocked_get.return_value.ok = True def return_invalid_json(url, request):
mocked_get.return_value.json.side_effect = ValueError('invalid JSON') return 'x'
with httmock.HTTMock(return_invalid_json):
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
olap_cmd(no_log_errors=False) olap_cmd(no_log_errors=False)
assert 'Invalid JSON content' in caplog.text assert 'Invalid JSON content' in caplog.text

View File

@ -5,7 +5,7 @@
[tox] [tox]
toxworkdir = {env:TMPDIR:/tmp}/tox-{env:USER}/wcs-olap/{env:BRANCH_NAME:} toxworkdir = {env:TMPDIR:/tmp}/tox-{env:USER}/wcs-olap/{env:BRANCH_NAME:}
envlist = py2-coverage envlist = py3-coverage
[testenv] [testenv]
usedevelop = true usedevelop = true
@ -17,14 +17,17 @@ deps =
pytest pytest
pytest-cov pytest-cov
pytest-random pytest-random
quixote<3.0 quixote>=3
psycopg2-binary psycopg2-binary
vobject vobject
pyproj pyproj
django-ratelimit<3 django-ratelimit<3
gadjo gadjo
mock httmock
django>=1.11,<1.12 django>=1.11,<1.12
commands = commands =
./get_wcs.sh ./get_wcs.sh
py.test {env:COVERAGE:} {posargs:--random-group tests} py.test {env:COVERAGE:} {posargs:--random-group tests}
[pytest]
junit_family=xunit2

View File

@ -1,14 +1,12 @@
import sys
import argparse import argparse
import ConfigParser import configparser
import os import locale
import logging import logging
import logging.config import logging.config
from . import wcs_api import os
from .feeder import WcsOlapFeeder import sys
import locale
from . import tb from . import wcs_api, feeder
def main(): def main():
@ -16,13 +14,10 @@ def main():
main2() main2()
except SystemExit: except SystemExit:
raise raise
except:
tb.print_tb()
raise SystemExit(1)
def get_config(path=None): def get_config(path=None):
config = ConfigParser.ConfigParser() config = configparser.ConfigParser()
global_config_path = '/etc/wcs-olap/config.ini' global_config_path = '/etc/wcs-olap/config.ini'
user_config_path = os.path.expanduser('~/.wcs-olap.ini') user_config_path = os.path.expanduser('~/.wcs-olap.ini')
if not path: if not path:
@ -60,14 +55,14 @@ def main2():
fake = args.fake fake = args.fake
config = get_config(path=args.config_path) config = get_config(path=args.config_path)
# list all known urls # list all known urls
urls = [url for url in config.sections() if url.startswith('http://') or urls = [url for url in config.sections() if url.startswith('http://')
url.startswith('https://')] or url.startswith('https://')]
defaults = {} defaults = {}
if not args.all: if not args.all:
try: try:
url = args.url or urls[0] url = args.url or urls[0]
except IndexError: except IndexError:
print 'no url found' print('no url found')
raise SystemExit(1) raise SystemExit(1)
urls = [url] urls = [url]
if config.has_section(args.url): if config.has_section(args.url):
@ -97,22 +92,23 @@ def main2():
pg_dsn = defaults['pg_dsn'] pg_dsn = defaults['pg_dsn']
slugs = defaults.get('slugs', '').strip().split() or getattr(args, 'slug', []) slugs = defaults.get('slugs', '').strip().split() or getattr(args, 'slug', [])
batch_size = int(defaults.get('batch_size', 500)) batch_size = int(defaults.get('batch_size', 500))
except KeyError, e: except KeyError as e:
failure = True failure = True
logger.error('configuration incomplete for %s: %s', url, e) logger.error('configuration incomplete for %s: %s', url, e)
else: else:
try: try:
api = wcs_api.WcsApi(url=url, orig=orig, key=key, slugs=slugs, api = wcs_api.WcsApi(url=url, orig=orig, key=key,
verify=defaults.get('verify', 'True') == 'True', batch_size=batch_size,
batch_size=batch_size) verify=(defaults.get('verify', 'True') == 'True'))
logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url, logger.info('starting synchronizing w.c.s. at %r with PostgreSQL at %s', url,
pg_dsn) pg_dsn)
feeder = WcsOlapFeeder(api=api, schema=schema, pg_dsn=pg_dsn, logger=logger, olap_feeder = feeder.WcsOlapFeeder(
config=defaults, do_feed=feed, fake=fake) api=api, schema=schema, pg_dsn=pg_dsn, logger=logger,
feeder.feed() config=defaults, do_feed=feed, fake=fake, slugs=slugs)
olap_feeder.feed()
logger.info('finished') logger.info('finished')
feed_result = False feed_result = False
except: except Exception:
if args.no_log_errors: if args.no_log_errors:
raise raise
feed_result = True feed_result = True

View File

@ -2,7 +2,7 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from collections import OrderedDict, Counter from collections import OrderedDict
import datetime import datetime
import six import six
import copy import copy
@ -10,7 +10,7 @@ import itertools
import os import os
import json import json
import hashlib import hashlib
from utils import Whatever from .utils import Whatever
import psycopg2 import psycopg2
from cached_property import cached_property from cached_property import cached_property
@ -78,8 +78,9 @@ class WcsOlapFeeder(object):
status_to_id = dict((c[1], c[0]) for c in channels) status_to_id = dict((c[1], c[0]) for c in channels)
id_to_status = dict((c[0], c[1]) for c in channels) id_to_status = dict((c[0], c[1]) for c in channels)
def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False): def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False, slugs=None):
self.api = api self.api = api
self.slugs = slugs
self.fake = fake self.fake = fake
self.logger = logger or Whatever() self.logger = logger or Whatever()
self.schema = schema self.schema = schema
@ -291,7 +292,7 @@ class WcsOlapFeeder(object):
@cached_property @cached_property
def formdefs(self): def formdefs(self):
return self.api.formdefs return [formdef for formdef in self.api.formdefs if not self.slugs or formdef.slug in self.slugs]
@cached_property @cached_property
def roles(self): def roles(self):
@ -441,7 +442,7 @@ CREATE TABLE public.dates AS (SELECT
if isinstance(o, six.string_types): if isinstance(o, six.string_types):
return o.format(**ctx) return o.format(**ctx)
elif isinstance(o, dict): elif isinstance(o, dict):
return dict((k, helper(v)) for k, v in o.iteritems()) return dict((k, helper(v)) for k, v in o.items())
elif isinstance(o, list): elif isinstance(o, list):
return [helper(v) for v in o] return [helper(v) for v in o]
elif isinstance(o, (bool, int, float)): elif isinstance(o, (bool, int, float)):
@ -466,8 +467,8 @@ CREATE TABLE public.dates AS (SELECT
# categories # categories
tmp_cat_map = self.create_labeled_table( tmp_cat_map = self.create_labeled_table(
'category', enumerate(c.name for c in self.categories), comment='catégorie') 'category', enumerate(c.title for c in self.categories), comment='catégorie')
self.categories_mapping = dict((c.id, tmp_cat_map[c.name]) for c in self.categories) self.categories_mapping = dict((c.slug, tmp_cat_map[c.title]) for c in self.categories)
self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))), self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
comment='heures') comment='heures')
@ -506,7 +507,7 @@ CREATE TABLE public.dates AS (SELECT
'geolocation_base': 'position géographique', 'geolocation_base': 'position géographique',
} }
self.create_table('{generic_formdata_table}', self.columns) self.create_table('{generic_formdata_table}', self.columns)
for at, comment in self.comments.iteritems(): for at, comment in self.comments.items():
self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,)) self.ex('COMMENT ON COLUMN {generic_formdata_table}.%s IS %%s' % at, vars=(comment,))
self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s', vars=('tous les formulaires',)) self.ex('COMMENT ON TABLE {generic_formdata_table} IS %s', vars=('tous les formulaires',))
# evolutions # evolutions
@ -663,7 +664,7 @@ class WcsFormdefFeeder(object):
} }
# add function fields # add function fields
for function, name in self.formdef.schema.workflow.functions.iteritems(): for function, name in self.formdef.schema.workflow.functions.items():
at = 'function_%s' % slugify(function) at = 'function_%s' % slugify(function)
columns[at] = { columns[at] = {
'sql_col_name': at, 'sql_col_name': at,
@ -746,7 +747,7 @@ class WcsFormdefFeeder(object):
values = [] values = []
generic_evolution_values = [] generic_evolution_values = []
evolution_values = [] evolution_values = []
for data in self.formdef.datas: for data in self.formdef.formdatas.anonymized.full:
json_data = {} json_data = {}
# ignore formdata without status # ignore formdata without status
@ -818,7 +819,7 @@ class WcsFormdefFeeder(object):
v = '(%.6f, %.6f)' % (v.get('lon'), v.get('lat')) v = '(%.6f, %.6f)' % (v.get('lon'), v.get('lat'))
row['geolocation_%s' % geolocation] = v row['geolocation_%s' % geolocation] = v
# add function fields value # add function fields value
for function, name in self.formdef.schema.workflow.functions.iteritems(): for function, name in self.formdef.schema.workflow.functions.items():
try: try:
v = data.functions[function] v = data.functions[function]
except KeyError: except KeyError:
@ -949,7 +950,7 @@ class WcsFormdefFeeder(object):
}) })
# add dimension for function # add dimension for function
for function, name in self.formdef.schema.workflow.functions.iteritems(): for function, name in self.formdef.schema.workflow.functions.items():
at = 'function_%s' % slugify(function) at = 'function_%s' % slugify(function)
cube['joins'].append({ cube['joins'].append({
'name': at, 'name': at,

View File

@ -1,12 +1,12 @@
import urllib.parse as urlparse
import datetime import datetime
import base64 import base64
import hmac import hmac
import hashlib import hashlib
import urllib
import random import random
import urlparse
'''Simple signature scheme for query strings''' '''Simple signature scheme for query strings'''
# from http://repos.entrouvert.org/portail-citoyen.git/tree/portail_citoyen/apps/data_source_plugin/signature.py
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None): def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
@ -20,23 +20,25 @@ def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
timestamp = datetime.datetime.utcnow() timestamp = datetime.datetime.utcnow()
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
if nonce is None: if nonce is None:
nonce = hex(random.SystemRandom().getrandbits(128))[2:-1] nonce = hex(random.getrandbits(128))[2:]
new_query = query new_query = query
if new_query: if new_query:
new_query += '&' new_query += '&'
new_query += urllib.urlencode(( new_query += urlparse.urlencode((
('algo', algo), ('algo', algo),
('timestamp', timestamp), ('timestamp', timestamp),
('nonce', nonce))) ('nonce', nonce)))
signature = base64.b64encode(sign_string(new_query, key, algo=algo)) signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&signature=' + urllib.quote(signature) new_query += '&signature=' + urlparse.quote(signature)
return new_query return new_query
def sign_string(s, key, algo='sha256', timedelta=30): def sign_string(s, key, algo='sha256', timedelta=30):
digestmod = getattr(hashlib, algo) digestmod = getattr(hashlib, algo)
if isinstance(key, unicode): if isinstance(key, str):
key = key.encode('utf-8') key = key.encode('utf-8')
if isinstance(s, str):
s = s.encode('utf-8')
hash = hmac.HMAC(key, digestmod=digestmod, msg=s) hash = hmac.HMAC(key, digestmod=digestmod, msg=s)
return hash.digest() return hash.digest()
@ -48,12 +50,17 @@ def check_url(url, key, known_nonce=None, timedelta=30):
def check_query(query, key, known_nonce=None, timedelta=30): def check_query(query, key, known_nonce=None, timedelta=30):
parsed = urlparse.parse_qs(query) parsed = urlparse.parse_qs(query)
if not ('signature' in parsed and 'algo' in parsed
and 'timestamp' in parsed and 'nonce' in parsed):
return False
unsigned_query, signature_content = query.split('&signature=', 1)
if '&' in signature_content:
return False # signature must be the last parameter
signature = base64.b64decode(parsed['signature'][0]) signature = base64.b64decode(parsed['signature'][0])
algo = parsed['algo'][0] algo = parsed['algo'][0]
timestamp = parsed['timestamp'][0] timestamp = parsed['timestamp'][0]
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
nonce = parsed['nonce'] nonce = parsed['nonce']
unsigned_query = query.split('&signature=')[0]
if known_nonce is not None and known_nonce(nonce): if known_nonce is not None and known_nonce(nonce):
return False return False
if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta): if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta):
@ -68,5 +75,12 @@ def check_string(s, signature, key, algo='sha256'):
return False return False
res = 0 res = 0
for a, b in zip(signature, signature2): for a, b in zip(signature, signature2):
res |= ord(a) ^ ord(b) res |= a ^ b
return res == 0 return res == 0
if __name__ == '__main__':
key = '12345'
signed_query = sign_query('NameId=_12345&orig=montpellier', key)
assert check_query(signed_query, key, timedelta=0) is False
assert check_query(signed_query, key) is True

View File

@ -1,55 +0,0 @@
from StringIO import StringIO
import sys
import linecache
def print_tb():
exc_type, exc_value, tb = sys.exc_info()
if exc_value:
exc_value = unicode(str(exc_value), errors='ignore')
error_file = StringIO()
limit = None
if hasattr(sys, 'tracebacklimit'):
limit = sys.tracebacklimit
print >>error_file, "Exception:"
print >>error_file, " type = '%s', value = '%s'" % (exc_type, exc_value)
print >>error_file
# format the traceback
print >>error_file, 'Stack trace (most recent call first):'
n = 0
while tb is not None and (limit is None or n < limit):
frame = tb.tb_frame
function = frame.f_code.co_name
filename = frame.f_code.co_filename
exclineno = frame.f_lineno
locals = frame.f_locals.items()
print >>error_file, ' File "%s", line %s, in %s' % (filename, exclineno, function)
linecache.checkcache(filename)
for lineno in range(exclineno - 2, exclineno + 3):
line = linecache.getline(filename, lineno, frame.f_globals)
if line:
if lineno == exclineno:
print >>error_file, '>%5s %s' % (lineno, line.rstrip())
else:
print >>error_file, ' %5s %s' % (lineno, line.rstrip())
print >>error_file
if locals:
print >>error_file, " locals: "
for key, value in locals:
print >>error_file, " %s =" % key,
try:
repr_value = repr(value)
if len(repr_value) > 10000:
repr_value = repr_value[:10000] + ' [...]'
print >>error_file, repr_value,
except:
print >>error_file, "<ERROR WHILE PRINTING VALUE>",
print >>error_file
print >>error_file
tb = tb.tb_next
n = n + 1
print >>sys.stderr, error_file.getvalue()

View File

@ -1,61 +1,80 @@
import six # wcs_olap
import requests # Copyright (C) 2020 Entr'ouvert
import urlparse #
import urllib # This program is free software: you can redistribute it and/or modify it
import isodate # under the terms of the GNU Affero General Public License as published
import logging # by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import base64
import copy
import logging
import datetime
import contextlib
import json
import requests
import isodate
import urllib.parse as urlparse
from . import signature from . import signature
logger = logging.getLogger(__name__)
def exception_to_text(e):
try:
return six.text_type(e)
except Exception:
pass
try:
return six.text_type(e.decode('utf8'))
except Exception:
pass
try:
return six.text_type(repr(e))
except Exception:
pass
try:
args = e.args
try:
content = six.text_type(repr(args)) if args != [] else ''
except Exception:
content = '<exception-while-rendering-args>'
except AttributeError:
content = ''
return u'%s(%s)' % (e.__class__.__name__, content)
class WcsApiError(Exception): class WcsApiError(Exception):
def __init__(self, message, **kwargs): pass
super(WcsApiError, self).__init__(message)
self.kwargs = kwargs
def __str__(self):
kwargs = self.kwargs.copy() class JSONFile(object):
if 'exception' in kwargs: def __init__(self, d):
kwargs['exception'] = exception_to_text(kwargs['exception']) self.d = d
return '%s: %s' % (self.args[0], ' '.join('%s=%s' % (key, value) for key, value in kwargs.items()))
@property
def filename(self):
return self.d.get('filename', '')
@property
def content_type(self):
return self.d.get('content_type', 'application/octet-stream')
@property
def content(self):
return base64.b64decode(self.d['content'])
def to_dict(o):
if hasattr(o, 'to_dict'):
return o.to_dict()
elif isinstance(o, dict):
return {k: to_dict(v) for k, v in o.items()}
elif isinstance(o, (list, tuple)):
return [to_dict(v) for v in o]
else:
return o
class BaseObject(object): class BaseObject(object):
def __init__(self, wcs_api, **kwargs): def __init__(self, wcs_api, **kwargs):
self.__wcs_api = wcs_api self._wcs_api = wcs_api
self.__dict__.update(**kwargs) self.__dict__.update(**kwargs)
def to_dict(self):
d = collections.OrderedDict()
for key, value in self.__dict__.items():
if key[0] == '_':
continue
d[key] = to_dict(value)
return d
class FormDataWorkflow(BaseObject): class FormDataWorkflow(BaseObject):
status = None status = None
@ -92,17 +111,24 @@ class Evolution(BaseObject):
class FormData(BaseObject): class FormData(BaseObject):
geolocations = None geolocations = None
evolution = None evolution = None
submission = None
workflow = None
roles = None
with_files = False
def __init__(self, wcs_api, **kwargs): def __init__(self, wcs_api, forms, **kwargs):
self.forms = forms
super(FormData, self).__init__(wcs_api, **kwargs) super(FormData, self).__init__(wcs_api, **kwargs)
self.receipt_time = isodate.parse_datetime(self.receipt_time) self.receipt_time = isodate.parse_datetime(self.receipt_time)
self.submission = BaseObject(wcs_api, **self.submission) if self.submission:
self.workflow = FormDataWorkflow(wcs_api, **self.workflow) self.submission = BaseObject(wcs_api, **self.submission)
if self.workflow:
self.workflow = FormDataWorkflow(wcs_api, **self.workflow)
self.evolution = [Evolution(wcs_api, **evo) for evo in self.evolution or []] self.evolution = [Evolution(wcs_api, **evo) for evo in self.evolution or []]
self.functions = {} self.functions = {}
self.concerned_roles = [] self.concerned_roles = []
self.action_roles = [] self.action_roles = []
for function in self.roles: for function in self.roles or []:
roles = [Role(wcs_api, **r) for r in self.roles[function]] roles = [Role(wcs_api, **r) for r in self.roles[function]]
if function == 'concerned': if function == 'concerned':
self.concerned_roles.extend(roles) self.concerned_roles.extend(roles)
@ -113,11 +139,23 @@ class FormData(BaseObject):
self.functions[function] = roles[0] self.functions[function] = roles[0]
except IndexError: except IndexError:
self.functions[function] = None self.functions[function] = None
del self.roles if 'roles' in self.__dict__:
del self.roles
def __repr__(self): def __str__(self):
return '<{klass} {display_id!r}>'.format(klass=self.__class__.__name__, return '{self.formdef} - {self.id}'.format(self=self)
display_id=self.id)
@property
def full(self):
if self.with_files:
return self
if not hasattr(self, '_full'):
self._full = self.forms[self.id]
return self._full
@property
def anonymized(self):
return self.forms.anonymized[self.id]
@property @property
def endpoint_delay(self): def endpoint_delay(self):
@ -140,6 +178,13 @@ class FormData(BaseObject):
else: else:
return return
def __getitem__(self, key):
value = self.full.fields.get(key)
# unserialize files
if isinstance(value, dict) and 'content' in value:
return JSONFile(value)
return value
class Workflow(BaseObject): class Workflow(BaseObject):
statuses = None statuses = None
@ -148,11 +193,10 @@ class Workflow(BaseObject):
def __init__(self, wcs_api, **kwargs): def __init__(self, wcs_api, **kwargs):
super(Workflow, self).__init__(wcs_api, **kwargs) super(Workflow, self).__init__(wcs_api, **kwargs)
self.statuses = [BaseObject(wcs_api, **v) for v in (self.statuses or [])] self.statuses = [BaseObject(wcs_api, **v) for v in (self.statuses or [])]
if self.statuses: assert not hasattr(self.statuses[0], 'startpoint'), 'startpoint is exported by w.c.s. FIXME'
assert not hasattr(self.statuses[0], 'startpoint'), 'startpoint is exported by w.c.s. FIXME' for status in self.statuses:
for status in self.statuses: status.startpoint = False
status.startpoint = False self.statuses[0].startpoint = True
self.statuses[0].startpoint = True
self.statuses_map = dict((s.id, s) for s in self.statuses) self.statuses_map = dict((s.id, s) for s in self.statuses)
self.fields = [Field(wcs_api, **field) for field in (self.fields or [])] self.fields = [Field(wcs_api, **field) for field in (self.fields or [])]
@ -177,29 +221,269 @@ class Schema(BaseObject):
self.geolocations = sorted((k, v) for k, v in (self.geolocations or {}).items()) self.geolocations = sorted((k, v) for k, v in (self.geolocations or {}).items())
class FormDatas(object):
def __init__(self, wcs_api, formdef, full=False, anonymize=False, batch=1000):
self.wcs_api = wcs_api
self.formdef = formdef
self._full = full
self.anonymize = anonymize
self.batch = batch
def __getitem__(self, slice_or_id):
# get batch of forms
if isinstance(slice_or_id, slice):
def helper():
if slice_or_id.stop <= slice_or_id.start or slice_or_id.step:
raise ValueError('invalid slice %s' % slice_or_id)
offset = slice_or_id.start
limit = slice_or_id.stop - slice_or_id.start
url_parts = ['api/forms/{self.formdef.slug}/list'.format(self=self)]
query = {}
query['full'] = 'on' if self._full else 'off'
if offset:
query['offset'] = str(offset)
if limit:
query['limit'] = str(limit)
if self.anonymize:
query['anonymise'] = 'on'
url_parts.append('?%s' % urlparse.urlencode(query))
for d in self.wcs_api.get_json(*url_parts):
# w.c.s. had a bug where some formdata lost their draft status, skip them
if not d.get('receipt_time'):
continue
yield FormData(wcs_api=self.wcs_api, forms=self, formdef=self.formdef, **d)
return helper()
# or get one form
else:
url_parts = ['api/forms/{formdef.slug}/{id}/'.format(formdef=self.formdef, id=slice_or_id)]
if self.anonymize:
url_parts.append('?anonymise=true')
d = self.wcs_api.get_json(*url_parts)
return FormData(wcs_api=self.wcs_api, forms=self, formdef=self.formdef, with_files=True, **d)
@property
def full(self):
forms = copy.copy(self)
forms._full = True
return forms
@property
def anonymized(self):
forms = copy.copy(self)
forms.anonymize = True
return forms
def batched(self, batch):
forms = copy.copy(self)
forms.batch = batch
return forms
def __iter__(self):
start = 0
while True:
empty = True
for formdef in self[start:start + self.batch]:
empty = False
yield formdef
if empty:
break
start += self.batch
def __len__(self):
return len(list((o for o in self)))
class CancelSubmitError(Exception):
pass
class FormDefSubmit(object):
formdef = None
data = None
user_email = None
user_name_id = None
backoffice_submission = False
submission_channel = None
submission_context = None
draft = False
def __init__(self, wcs_api, formdef, **kwargs):
self.wcs_api = wcs_api
self.formdef = formdef
self.data = {}
self.__dict__.update(kwargs)
def payload(self):
d = {
'data': self.data.copy(),
}
if self.draft:
d.setdefault('meta', {})['draft'] = True
if self.backoffice_submission:
d.setdefault('meta', {})['backoffice-submission'] = True
if self.submission_context:
d['context'] = self.submission_context
if self.submission_channel:
d.setdefault('context', {})['channel'] = self.submission_channel
if self.user_email:
d.setdefault('user', {})['email'] = self.user_email
if self.user_name_id:
d.setdefault('user', {})['NameID'] = self.user_name_id
return d
def set(self, field, value, **kwargs):
if isinstance(field, Field):
varname = field.varname
if not varname:
raise ValueError('field has no varname, submit is impossible')
else:
varname = field
try:
field = [f for f in self.formdef.schema.fields if f.varname == varname][0]
except IndexError:
raise ValueError('no field for varname %s' % varname)
if value is None or value == {} or value == []:
self.data.pop(varname, None)
elif hasattr(self, '_set_type_%s' % field.type):
getattr(self, '_set_type_%s' % field.type)(
varname=varname,
field=field,
value=value, **kwargs)
else:
self.data[varname] = value
def _set_type_item(self, varname, field, value, **kwargs):
if isinstance(value, dict):
if not set(value).issuperset(set(['id', 'text'])):
raise ValueError('item field value must have id and text value')
# clean previous values
self.data.pop(varname, None)
self.data.pop(varname + '_raw', None)
self.data.pop(varname + '_structured', None)
if isinstance(value, dict):
# structured & display values
self.data[varname + '_raw'] = value['id']
self.data[varname] = value['text']
if len(value) > 2:
self.data[varname + '_structured'] = value
else:
# raw id in varname
self.data[varname] = value
def _set_type_items(self, varname, field, value, **kwargs):
if not isinstance(value, list):
raise TypeError('%s is an ItemsField it needs a list as value' % varname)
has_dict = False
for choice in value:
if isinstance(value, dict):
if not set(value).issuperset(set(['id', 'text'])):
raise ValueError('items field values must have id and text value')
has_dict = True
if has_dict:
if not all(isinstance(choice, dict) for choice in value):
raise ValueError('ItemsField value must be all structured or none')
# clean previous values
self.data.pop(varname, None)
self.data.pop(varname + '_raw', None)
self.data.pop(varname + '_structured', None)
if has_dict:
raw = self.data[varname + '_raw'] = []
display = self.data[varname] = []
structured = self.data[varname + '_structured'] = []
for choice in value:
raw.append(choice['id'])
display.append(choice['text'])
structured.append(choice)
else:
self.data[varname] = value[:]
def _set_type_file(self, varname, field, value, **kwargs):
filename = kwargs.get('filename')
content_type = kwargs.get('content_type', 'application/octet-stream')
if hasattr(value, 'read'):
content = base64.b64encode(value.read()).decode('ascii')
elif isinstance(value, bytes):
content = base64.b64encode(value).decode('ascii')
elif isinstance(value, dict):
if not set(value).issuperset(set(['filename', 'content'])):
raise ValueError('file field needs a dict value with filename and content')
content = value['content']
filename = value['filename']
content_type = value.get('content_type', content_type)
if not filename:
raise ValueError('missing filename')
self.data[varname] = {
'filename': filename,
'content': content,
'content_type': content_type,
}
def _set_type_date(self, varname, field, value):
if isinstance(value, str):
value = datetime.datetime.strptime(value, '%Y-%m-%d').date()
if isinstance(value, datetime.datetime):
value = value.date()
if isinstance(value, datetime.date):
value = value.strftime('%Y-%m-%d')
self.data[varname] = value
def _set_type_map(self, varname, field, value):
if not isinstance(value, dict):
raise TypeError('value must be a dict for a map field')
if set(value) != set(['lat', 'lon']):
raise ValueError('map field expect keys lat and lon')
self.data[varname] = value
def _set_type_bool(self, varname, field, value):
if isinstance(value, str):
value = value.lower().strip() in ['yes', 'true', 'on']
if not isinstance(value, bool):
raise TypeError('value must be a boolean or a string true, yes, on, false, no, off')
self.data[varname] = value
    def cancel(self):
        # Abort the enclosing submit() context manager without posting anything.
        raise CancelSubmitError
class FormDef(BaseObject): class FormDef(BaseObject):
geolocations = None geolocations = None
def __init__(self, wcs_api, **kwargs): def __init__(self, wcs_api, **kwargs):
self.__wcs_api = wcs_api self._wcs_api = wcs_api
self.__dict__.update(**kwargs) self.__dict__.update(**kwargs)
def __unicode__(self): def __str__(self):
return self.title return self.title
@property @property
def datas(self): def formdatas(self):
datas = self.__wcs_api.get_formdata(self.slug) return FormDatas(wcs_api=self._wcs_api, formdef=self)
for data in datas:
data.formdef = self
yield data
@property @property
def schema(self): def schema(self):
return self.__wcs_api.get_schema(self.slug) if not hasattr(self, '_schema'):
d = self._wcs_api.get_json('api/formdefs/{self.slug}/schema'.format(self=self))
self._schema = Schema(self._wcs_api, **d)
return self._schema
def __repr__(self): @contextlib.contextmanager
return '<{klass} {slug!r}>'.format(klass=self.__class__.__name__, slug=self.slug) def submit(self, **kwargs):
submitter = FormDefSubmit(
wcs_api=self._wcs_api,
formdef=self,
**kwargs)
try:
yield submitter
except CancelSubmitError:
return
payload = submitter.payload()
d = self._wcs_api.post_json(payload, 'api/formdefs/{self.slug}/submit'.format(self=self))
if d['err'] != 0:
raise WcsApiError('submited returned an error: %s' % d)
submitter.result = BaseObject(self._wcs_api, **d['data'])
class Role(BaseObject): class Role(BaseObject):
@ -210,99 +494,128 @@ class Category(BaseObject):
pass pass
class WcsObjects(object):
    """Lazy collection of API objects fetched from a JSON endpoint.

    Subclasses define ``url`` (relative API path) and ``object_class``
    (wrapper built from each entry of the endpoint's 'data' list).
    """

    url = None
    object_class = None

    def __init__(self, wcs_api):
        self.wcs_api = wcs_api

    def __getitem__(self, slug):
        """Return the instance whose slug matches *slug* (a string or an object)."""
        if isinstance(slug, self.object_class):
            slug = slug.slug
        for candidate in self:
            if candidate.slug == slug:
                return candidate
        raise KeyError('no instance with slug %r' % slug)

    def __iter__(self):
        """Fetch the endpoint and yield one wrapped object per entry."""
        payload = self.wcs_api.get_json(self.url)
        for entry in payload['data']:
            yield self.object_class(wcs_api=self.wcs_api, **entry)

    def __len__(self):
        # iterate rather than materialize: entries are built lazily
        return sum(1 for _ in self)
class Roles(WcsObjects):
    # Collection of Role objects.
    # Paths are not coherent :/  (no trailing slash, unlike the other endpoints)
    url = 'api/roles'
    object_class = Role
class FormDefs(WcsObjects):
    # Collection of FormDef objects.  include-count=on presumably adds
    # usage counts to the listing -- confirm against the w.c.s. API.
    url = 'api/formdefs/?include-count=on'
    object_class = FormDef
class Categories(WcsObjects):
    # Collection of Category objects.
    url = 'api/categories/'
    object_class = Category
class WcsApi(object): class WcsApi(object):
def __init__(self, url, orig, key, verify=True, slugs=None, batch_size=500): def __init__(self, url, email=None, name_id=None, batch_size=1000,
session=None, logger=None, orig=None, key=None, verify=True):
self.url = url self.url = url
self.batch_size = batch_size
self.email = email
self.name_id = name_id
self.requests = session or requests.Session()
self.logger = logger or logging.getLogger(__name__)
self.orig = orig self.orig = orig
self.key = key self.key = key
self.verify = verify self.verify = verify
self.cache = {}
self.slugs = slugs or []
self.batch_size = batch_size
@property def _build_url(self, url_parts):
def formdefs_url(self): url = self.url
return urlparse.urljoin(self.url, 'api/formdefs/') for url_part in url_parts:
url = urlparse.urljoin(url, url_part)
@property return url
def forms_url(self):
return urlparse.urljoin(self.url, 'api/forms/')
@property
def roles_url(self):
return urlparse.urljoin(self.url, 'api/roles')
def get_json(self, *url_parts): def get_json(self, *url_parts):
url = reduce(lambda x, y: urlparse.urljoin(x, y), url_parts) url = self._build_url(url_parts)
params = {'orig': self.orig} params = {}
query_string = urllib.urlencode(params) if self.email:
presigned_url = url + ('&' if '?' in url else '?') + query_string params['email'] = self.email
if presigned_url in self.cache: if self.name_id:
return self.cache[presigned_url] params['NameID'] = self.name_id
signed_url = signature.sign_url(presigned_url, self.key) if self.orig:
params['orig'] = self.orig
query_string = urlparse.urlencode(params)
complete_url = url + ('&' if '?' in url else '?') + query_string
final_url = complete_url
if self.key:
final_url = signature.sign_url(final_url, self.key)
try: try:
response = requests.get(signed_url, verify=self.verify) response = self.requests.get(final_url, verify=self.verify)
response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
raise WcsApiError('GET request failed', url=signed_url, exception=e) content = getattr(getattr(e, 'response', None), 'content', None)
raise WcsApiError('GET request failed', final_url, e, content)
else: else:
if not response.ok:
try:
text = response.text
except UnicodeError:
text = '<undecodable>' + repr(response.content)
raise WcsApiError('GET response is not 200',
url=signed_url,
status_code=response.status_code,
content=text)
try: try:
content = response.json() return response.json()
self.cache[presigned_url] = content
return content
except ValueError as e: except ValueError as e:
raise WcsApiError('Invalid JSON content', url=signed_url, exception=e) raise WcsApiError('Invalid JSON content', final_url, e)
def post_json(self, data, *url_parts):
url = self._build_url(url_parts)
params = {}
if self.email:
params['email'] = self.email
if self.name_id:
params['NameID'] = self.name_id
if self.orig:
params['orig'] = self.orig
query_string = urlparse.urlencode(params)
complete_url = url + ('&' if '?' in url else '?') + query_string
final_url = complete_url
if self.key:
final_url = signature.sign_url(final_url, self.key)
try:
response = self.requests.post(
final_url,
data=json.dumps(data),
headers={'content-type': 'application/json'},
verify=self.verify)
response.raise_for_status()
except requests.RequestException as e:
content = getattr(getattr(e, 'response', None), 'content', None)
raise WcsApiError('POST request failed', final_url, e, content)
else:
try:
return response.json()
except ValueError as e:
raise WcsApiError('Invalid JSON content', final_url, e)
@property @property
def roles(self): def roles(self):
return [Role(wcs_api=self, **d) for d in self.get_json(self.roles_url)['data']] return Roles(self)
@property @property
def formdefs(self): def formdefs(self):
result = self.get_json(self.formdefs_url + '?include-count=on') return FormDefs(self)
if isinstance(result, dict):
if result['err'] == 0:
data = result['data']
else:
logger.error(u'could not retrieve formdefs from %s, err_desc: %s',
self.formdefs_url, result.get('err_desc'))
return []
else:
data = result
return [FormDef(wcs_api=self, **d) for d in data
if not self.slugs or d['slug'] in self.slugs]
@property @property
def categories(self): def categories(self):
d = {} return Categories(self)
for f in self.formdefs:
if hasattr(f.schema, 'category'):
d[f.schema.category_id] = f.schema.category
return [Category(wcs_api=self, id=k, name=v) for k, v in d.items()]
def get_formdata(self, slug):
offset = 0
limit = self.batch_size
while True:
data = self.get_json(self.forms_url,
slug + '/list?anonymise&full=on&offset=%d&limit=%d' % (offset, limit))
for d in data:
# w.c.s. had a bug where some formdata lost their draft status, skip them
if not d.get('receipt_time'):
continue
yield FormData(wcs_api=self, **d)
if len(data) < limit:
break
offset += limit
def get_schema(self, slug):
json_schema = self.get_json(self.formdefs_url, slug + '/', 'schema?anonymise')
return Schema(wcs_api=self, **json_schema)