wcs-olap/wcs_olap/wcs_api.py

618 lines
20 KiB
Python

# wcs_olap
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import base64
import copy
import logging
import datetime
import contextlib
import json
import requests
import isodate
import urllib.parse as urlparse
from . import signature
class WcsApiError(Exception):
    """Error raised when talking to the w.c.s. API fails.

    Used in this module for failed HTTP requests, responses that are not
    valid JSON and submissions reported as errors by the server.
    """
class JSONFile:
    """Read-only wrapper around the dict serialization of a file.

    The wrapped mapping is kept as ``self.d``; the properties below expose
    its 'filename', 'content_type' and 'content' entries.
    """

    def __init__(self, d):
        self.d = d

    @property
    def filename(self):
        """Name of the file, or an empty string when none is recorded."""
        return self.d.get('filename', '')

    @property
    def content_type(self):
        """MIME type of the file, 'application/octet-stream' by default."""
        return self.d.get('content_type', 'application/octet-stream')

    @property
    def content(self):
        """Bytes obtained by base64-decoding the 'content' entry.

        Raises KeyError when the mapping has no 'content' key.
        """
        encoded = self.d['content']
        return base64.b64decode(encoded)
def to_dict(o):
    """Recursively convert *o* into plain dicts and lists.

    Objects exposing a ``to_dict`` method are converted by calling it,
    dicts are rebuilt with converted values, lists and tuples become lists
    of converted items, and anything else is returned unchanged.
    """
    if hasattr(o, 'to_dict'):
        return o.to_dict()
    if isinstance(o, dict):
        return dict((key, to_dict(item)) for key, item in o.items())
    if isinstance(o, (list, tuple)):
        return list(map(to_dict, o))
    return o
class BaseObject:
    """Generic attribute bag built from keyword arguments.

    Every keyword argument becomes an instance attribute; the API client is
    kept in the private ``_wcs_api`` attribute.
    """

    def __init__(self, wcs_api, **kwargs):
        self._wcs_api = wcs_api
        vars(self).update(kwargs)

    def to_dict(self):
        """Return an OrderedDict of the public instance attributes.

        Attributes whose name starts with an underscore are left out; values
        are converted with the module-level ``to_dict`` helper.
        """
        public_items = (
            (name, to_dict(attr))
            for name, attr in vars(self).items()
            if name[0] != '_'
        )
        return collections.OrderedDict(public_items)
class FormDataWorkflow(BaseObject):
    """Workflow section of a form data: current status and workflow fields."""

    status = None
    fields = None

    def __init__(self, wcs_api, **kwargs):
        super().__init__(wcs_api, **kwargs)
        raw_status = self.status
        if raw_status:
            # wrap the status mapping so its keys are reachable as attributes
            self.status = BaseObject(wcs_api, **raw_status)
        else:
            self.status = None
        if not self.fields:
            self.fields = {}
class EvolutionUser(BaseObject):
    """User attached to an evolution entry (the ``who`` of an Evolution)."""

    # Defaults used when the corresponding key is not given to the constructor.
    NameID = None
    email = None
    id = None
    name = None
class Evolution(BaseObject):
    """One entry of a form data history.

    The constructor parses ``time`` into a datetime, wraps each item of
    ``parts`` in a BaseObject and ``who`` in an EvolutionUser.
    """

    who = None
    status = None
    parts = None

    def __init__(self, wcs_api, **kwargs):
        super().__init__(wcs_api, **kwargs)
        # 'time' has no class-level default: it must be present in kwargs
        self.time = isodate.parse_datetime(self.time)
        raw_parts = self.parts or []
        self.parts = [BaseObject(wcs_api, **raw_part) for raw_part in raw_parts]
        author = self.who
        self.who = EvolutionUser(wcs_api, **author) if author else None
class FormData(BaseObject):
    """One form submission built from a w.c.s. API payload.

    Payload keys become attributes. The constructor then parses
    ``receipt_time``, wraps ``submission``, ``workflow`` and ``evolution`` in
    objects, and splits the ``roles`` mapping into ``concerned_roles``,
    ``action_roles`` and ``functions`` (one role, or None, per other
    function name).
    """

    geolocations = None
    evolution = None
    submission = None
    workflow = None
    roles = None
    # set to True by FormDatas when the form data was fetched individually
    with_files = False

    def __init__(self, wcs_api, forms, **kwargs):
        self.forms = forms
        super().__init__(wcs_api, **kwargs)
        self.receipt_time = isodate.parse_datetime(self.receipt_time)
        self.submission = BaseObject(wcs_api, **self.submission or {})
        self.workflow = FormDataWorkflow(wcs_api, **self.workflow or {})
        self.evolution = [Evolution(wcs_api, **evo) for evo in self.evolution or []]
        self.functions = {}
        self.concerned_roles = []
        self.action_roles = []
        for function, raw_roles in (self.roles or {}).items():
            roles = [Role(wcs_api, **r) for r in raw_roles]
            if function == 'concerned':
                self.concerned_roles.extend(roles)
            elif function == 'actions':
                # Fix: this branch used to be a copy of the 'concerned' one,
                # which left action_roles permanently empty.
                self.action_roles.extend(roles)
                # NOTE(review): action roles were historically added to
                # concerned_roles; kept so existing consumers see the same
                # list. Confirm whether this merge is intended.
                self.concerned_roles.extend(roles)
            else:
                # other functions keep only their first role, if any
                self.functions[function] = roles[0] if roles else None
        # the raw mapping has been dispatched, drop the instance attribute
        if 'roles' in self.__dict__:
            del self.roles

    def __str__(self):
        return '{self.formdef} - {self.id}'.format(self=self)

    @property
    def full(self):
        """Return the individually fetched version of this form data.

        Returns ``self`` when it was already fetched individually; otherwise
        the form data is fetched through ``self.forms`` once and cached.
        """
        if self.with_files:
            return self
        if not hasattr(self, '_full'):
            self._full = self.forms[self.id]
        return self._full

    @property
    def anonymized(self):
        """Fetch this form data again through the anonymized collection."""
        return self.forms.anonymized[self.id]

    @property
    def endpoint_delay(self):
        """Delay between receipt and the entry into the final endpoint run.

        The history is walked backwards. While the statuses met are
        endpoints, the delay (evolution time minus receipt time) of the
        current one is remembered; the first non-endpoint status met stops
        the walk and the remembered delay is returned.

        Returns None when the most recent status is not an endpoint, when no
        non-endpoint status precedes the endpoints, or when a status is
        unknown to the current workflow.
        """
        statuses_map = self.formdef.schema.workflow.statuses_map
        delay = None
        for evo in reversed(self.evolution):
            if not evo.status:
                continue
            try:
                status = statuses_map[evo.status]
            except KeyError:  # happen when workflow has changed
                return None
            if not status.endpoint:
                return delay
            delay = evo.time - self.receipt_time
        return None

    def __getitem__(self, key):
        """Return the value of field *key*, files being wrapped in JSONFile."""
        value = self.full.fields.get(key)
        # unserialize files
        if isinstance(value, dict) and 'content' in value:
            return JSONFile(value)
        return value
class Workflow(BaseObject):
    """Workflow description attached to a form schema.

    ``statuses`` is turned into a list of BaseObject, each flagged with a
    ``startpoint`` boolean (True for the first status only), ``statuses_map``
    indexes them by id and ``fields`` is turned into a list of Field.
    """

    statuses = None
    fields = None

    def __init__(self, wcs_api, **kwargs):
        super().__init__(wcs_api, **kwargs)
        self.statuses = [BaseObject(wcs_api, **v) for v in (self.statuses or [])]
        # Fix: a workflow without statuses used to fail with IndexError on
        # self.statuses[0]; the startpoint marking only applies when there
        # is at least one status.
        if self.statuses:
            assert not hasattr(self.statuses[0], 'startpoint'), 'startpoint is exported by w.c.s. FIXME'
            for status in self.statuses:
                status.startpoint = False
            self.statuses[0].startpoint = True
        self.statuses_map = {s.id: s for s in self.statuses}
        self.fields = [Field(wcs_api, **field) for field in (self.fields or [])]
class Field(BaseObject):
    """Description of a form or workflow field.

    The class attributes are the defaults used when the corresponding key
    is not given to the constructor.
    """

    varname = None
    items = None
    options = None
    anonymise = None
    in_filters = False
    # NOTE: this dict is shared by every instance created without its own
    # 'validation' value; treat it as read-only.
    validation = dict()
class Schema(BaseObject):
    """Schema of a form definition.

    ``workflow`` is wrapped in a Workflow, ``fields`` in a list of Field and
    ``geolocations`` is converted to a sorted list of (key, value) pairs.
    """

    category_id = None
    category = None
    geolocations = None

    def __init__(self, wcs_api, **kwargs):
        super().__init__(wcs_api, **kwargs)
        # 'workflow' and 'fields' have no class-level default: they must be
        # present in kwargs
        self.workflow = Workflow(wcs_api, **self.workflow)
        raw_fields = self.fields or []
        self.fields = [Field(wcs_api, **raw_field) for raw_field in raw_fields]
        raw_geolocations = self.geolocations or {}
        self.geolocations = sorted(raw_geolocations.items())
class FormDatas:
    """Lazy collection of the form datas of one form definition.

    Indexing with a slice yields a batch of FormData from the list
    endpoint; indexing with an id fetches a single form data. Iterating
    fetches successive batches of ``batch_size`` items.
    """

    def __init__(self, wcs_api, formdef, full=False, anonymize=False):
        self.wcs_api = wcs_api
        self.formdef = formdef
        self._full = full
        self.anonymize = anonymize
        self.batch_size = wcs_api.batch_size

    def __getitem__(self, slice_or_id):
        """Return a generator of FormData (slice) or one FormData (id)."""
        # get batch of forms
        if isinstance(slice_or_id, slice):
            return self._iter_slice(slice_or_id)
        # or get one form
        url_parts = ['api/forms/{formdef.slug}/{id}/'.format(formdef=self.formdef, id=slice_or_id)]
        if self.anonymize:
            url_parts.append('?anonymise=true')
        d = self.wcs_api.get_json(*url_parts)
        return FormData(wcs_api=self.wcs_api, forms=self, formdef=self.formdef, with_files=True, **d)

    def _iter_slice(self, window):
        """Yield the FormData of the list endpoint for the slice *window*.

        A missing start means 0 and a missing stop means no limit (these
        used to fail with TypeError). Slices with a step, or whose stop is
        not greater than their start, raise ValueError when the generator
        is first advanced.
        """
        offset = window.start or 0
        stop = window.stop
        if window.step or (stop is not None and stop <= offset):
            raise ValueError('invalid slice %s' % window)
        limit = None if stop is None else stop - offset
        url_parts = ['api/forms/{self.formdef.slug}/list'.format(self=self)]
        query = {}
        query['full'] = 'on' if self._full else 'off'
        if offset:
            query['offset'] = str(offset)
        if limit:
            query['limit'] = str(limit)
        if self.anonymize:
            query['anonymise'] = 'on'
        url_parts.append('?%s' % urlparse.urlencode(query))
        for d in self.wcs_api.get_json(*url_parts):
            # w.c.s. had a bug where some formdata lost their draft status, skip them
            if not d.get('receipt_time'):
                continue
            yield FormData(wcs_api=self.wcs_api, forms=self, formdef=self.formdef, **d)

    @property
    def full(self):
        """Copy of this collection requesting full form datas."""
        forms = copy.copy(self)
        forms._full = True
        return forms

    @property
    def anonymized(self):
        """Copy of this collection requesting anonymised form datas."""
        forms = copy.copy(self)
        forms.anonymize = True
        return forms

    def batched(self, batch):
        """Copy of this collection using *batch* as batch size."""
        forms = copy.copy(self)
        forms.batch_size = batch
        return forms

    def __iter__(self):
        """Yield every form data, one batch request at a time.

        Iteration stops at the first batch that yields nothing.
        """
        start = 0
        while True:
            empty = True
            for formdata in self[start:start + self.batch_size]:
                empty = False
                yield formdata
            if empty:
                break
            start += self.batch_size

    def __len__(self):
        """Count the form datas by iterating over the whole collection."""
        # Count through a generator: no intermediate list is built, and
        # list(self) is avoided because it may call __len__ for a size hint.
        return sum(1 for _ in self)
class CancelSubmitError(Exception):
    """Raised by FormDefSubmit.cancel() to abort a submission in progress."""
class FormDefSubmit:
    """Builder for the payload of a form submission.

    Field values are accumulated in ``self.data`` through ``set()``; the
    other attributes (user, channel, context, draft...) can be given as
    keyword arguments to the constructor. ``payload()`` returns the dict to
    send to the API.
    """

    formdef = None
    data = None
    user_email = None
    user_name_id = None
    backoffice_submission = False
    submission_channel = None
    submission_context = None
    draft = False

    def __init__(self, wcs_api, formdef, **kwargs):
        self.wcs_api = wcs_api
        self.formdef = formdef
        self.data = {}
        self.__dict__.update(kwargs)

    def payload(self):
        """Return the submission payload as a dict.

        Always contains a copy of ``data``; 'meta', 'context' and 'user'
        are only present when the matching attributes are set.
        """
        d = {
            'data': self.data.copy(),
        }
        if self.draft:
            d.setdefault('meta', {})['draft'] = True
        if self.backoffice_submission:
            d.setdefault('meta', {})['backoffice-submission'] = True
        if self.submission_context:
            # Fix: copy the context so that adding the channel below does
            # not modify the dict supplied by the caller.
            d['context'] = dict(self.submission_context)
        if self.submission_channel:
            d.setdefault('context', {})['channel'] = self.submission_channel
        if self.user_email:
            d.setdefault('user', {})['email'] = self.user_email
        if self.user_name_id:
            d.setdefault('user', {})['NameID'] = self.user_name_id
        return d

    def set(self, field, value, **kwargs):
        """Set the value of a field, given as a Field or as a varname.

        None, ``{}`` and ``[]`` remove the value. When a ``_set_type_<type>``
        method exists for the field type it handles the value (and receives
        the extra keyword arguments); otherwise the value is stored as is.

        Raises ValueError when the Field has no varname or when no field of
        the form schema matches the varname.
        """
        if isinstance(field, Field):
            varname = field.varname
            if not varname:
                raise ValueError('field has no varname, submit is impossible')
        else:
            varname = field
            matches = [f for f in self.formdef.schema.fields if f.varname == varname]
            if not matches:
                raise ValueError('no field for varname %s' % varname)
            field = matches[0]
        if value is None or value == {} or value == []:
            self.data.pop(varname, None)
            return
        setter = getattr(self, '_set_type_%s' % field.type, None)
        if setter is None:
            self.data[varname] = value
        else:
            setter(varname=varname, field=field, value=value, **kwargs)

    def _clear_item_data(self, varname):
        """Remove the display, raw and structured entries of an item field."""
        for suffix in ('', '_raw', '_structured'):
            self.data.pop(varname + suffix, None)

    def _set_type_item(self, varname, field, value, **kwargs):
        """Store an item value: a raw id, or a dict with 'id' and 'text'."""
        structured = isinstance(value, dict)
        if structured and not {'id', 'text'}.issubset(value):
            raise ValueError('item field value must have id and text value')
        # clean previous values
        self._clear_item_data(varname)
        if structured:
            # structured & display values
            self.data[varname + '_raw'] = value['id']
            self.data[varname] = value['text']
            if len(value) > 2:
                self.data[varname + '_structured'] = value
        else:
            # raw id in varname
            self.data[varname] = value

    def _set_type_items(self, varname, field, value, **kwargs):
        """Store a list of choices, all raw ids or all 'id'/'text' dicts.

        Raises TypeError when *value* is not a list and ValueError when a
        dict choice lacks 'id' or 'text' or when dict and raw choices are
        mixed.
        """
        if not isinstance(value, list):
            raise TypeError('%s is an ItemsField it needs a list as value' % varname)
        # Fix: the original tested isinstance(value, dict) on the list
        # itself, so structured choices were never detected nor validated.
        structured_choices = [choice for choice in value if isinstance(choice, dict)]
        for choice in structured_choices:
            if not {'id', 'text'}.issubset(choice):
                raise ValueError('items field values must have id and text value')
        if structured_choices and len(structured_choices) != len(value):
            raise ValueError('ItemsField value must be all structured or none')
        # clean previous values
        self._clear_item_data(varname)
        if structured_choices:
            self.data[varname + '_raw'] = [choice['id'] for choice in value]
            self.data[varname] = [choice['text'] for choice in value]
            self.data[varname + '_structured'] = list(value)
        else:
            self.data[varname] = value[:]

    def _set_type_file(self, varname, field, value, **kwargs):
        """Store a file given as a file-like object, bytes or a dict.

        File-like objects and bytes are base64-encoded and need a
        ``filename`` keyword argument (``content_type`` is optional). A dict
        must provide 'filename' and 'content', the latter being stored
        unchanged.

        Raises TypeError for any other value type and ValueError when the
        dict is incomplete or no filename is available.
        """
        filename = kwargs.get('filename')
        content_type = kwargs.get('content_type', 'application/octet-stream')
        if hasattr(value, 'read'):
            content = base64.b64encode(value.read()).decode('ascii')
        elif isinstance(value, bytes):
            content = base64.b64encode(value).decode('ascii')
        elif isinstance(value, dict):
            if not {'filename', 'content'}.issubset(value):
                raise ValueError('file field needs a dict value with filename and content')
            content = value['content']
            filename = value['filename']
            content_type = value.get('content_type', content_type)
        else:
            # Fix: other types used to fail later on an unbound 'content'.
            raise TypeError('file field value must be a file-like object, bytes or a dict')
        if not filename:
            raise ValueError('missing filename')
        self.data[varname] = {
            'filename': filename,
            'content': content,
            'content_type': content_type,
        }

    def _set_type_date(self, varname, field, value, **kwargs):
        """Store a date given as a 'YYYY-MM-DD' string, a date or a datetime.

        Dates are stored as 'YYYY-MM-DD' strings; values of other types are
        stored unchanged.
        """
        if isinstance(value, str):
            # round trip through strptime to validate the format
            value = datetime.datetime.strptime(value, '%Y-%m-%d').date()
        if isinstance(value, datetime.datetime):
            value = value.date()
        if isinstance(value, datetime.date):
            value = value.strftime('%Y-%m-%d')
        self.data[varname] = value

    def _set_type_map(self, varname, field, value, **kwargs):
        """Store a map position given as a dict with exactly 'lat' and 'lon'."""
        if not isinstance(value, dict):
            raise TypeError('value must be a dict for a map field')
        if set(value) != {'lat', 'lon'}:
            raise ValueError('map field expect keys lat and lon')
        self.data[varname] = value

    def _set_type_bool(self, varname, field, value, **kwargs):
        """Store a boolean; strings are True only for 'yes', 'true' or 'on'."""
        if isinstance(value, str):
            value = value.lower().strip() in ['yes', 'true', 'on']
        if not isinstance(value, bool):
            raise TypeError('value must be a boolean or a string true, yes, on, false, no, off')
        self.data[varname] = value

    def cancel(self):
        """Abort the submission by raising CancelSubmitError."""
        raise CancelSubmitError
class FormDef(BaseObject):
    """Form definition, giving access to its schema, its form datas and
    to the submission of new form datas."""

    geolocations = None

    def __init__(self, wcs_api, **kwargs):
        self._wcs_api = wcs_api
        vars(self).update(kwargs)

    def __str__(self):
        return self.title

    @property
    def formdatas(self):
        """New FormDatas collection bound to this form definition."""
        return FormDatas(wcs_api=self._wcs_api, formdef=self)

    @property
    def schema(self):
        """Schema of the form, fetched on first access and then cached."""
        try:
            return self._schema
        except AttributeError:
            pass
        schema_url = 'api/formdefs/{self.slug}/schema'.format(self=self)
        raw_schema = self._wcs_api.get_json(schema_url)
        self._schema = Schema(self._wcs_api, **raw_schema)
        return self._schema

    @contextlib.contextmanager
    def submit(self, **kwargs):
        """Context manager yielding a FormDefSubmit and posting it on exit.

        Nothing is posted when the body raises CancelSubmitError (see
        FormDefSubmit.cancel); any other exception propagates. After a
        successful post the response data is stored on the submitter as
        ``result``.

        Raises WcsApiError when the response 'err' value is not 0.
        """
        submitter = FormDefSubmit(wcs_api=self._wcs_api, formdef=self, **kwargs)
        try:
            yield submitter
        except CancelSubmitError:
            return
        submit_url = 'api/formdefs/{self.slug}/submit'.format(self=self)
        response = self._wcs_api.post_json(submitter.payload(), submit_url)
        if response['err'] != 0:
            raise WcsApiError('submited returned an error: %s' % response)
        submitter.result = BaseObject(self._wcs_api, **response['data'])
class Role(BaseObject):
    """Role object; all attributes come from the constructor keyword arguments."""
class Category(BaseObject):
    """Category object; all attributes come from the constructor keyword arguments."""
class WcsObjects:
    """Base class of the API collections indexed by slug.

    Subclasses set ``url`` (endpoint listing the objects) and
    ``object_class`` (class instantiated for each listed item). The
    endpoint is requested again on every iteration.
    """

    url = None
    object_class = None

    def __init__(self, wcs_api):
        self.wcs_api = wcs_api

    def __getitem__(self, slug):
        """Return the object whose slug is *slug*.

        *slug* may also be an instance of ``object_class``, in which case
        its own slug is looked up. Raises KeyError when nothing matches.
        """
        wanted = slug.slug if isinstance(slug, self.object_class) else slug
        found = next((candidate for candidate in self if candidate.slug == wanted), None)
        if found is None:
            raise KeyError('no instance with slug %r' % wanted)
        return found

    def __iter__(self):
        listing = self.wcs_api.get_json(self.url)
        for raw in listing['data']:
            yield self.object_class(wcs_api=self.wcs_api, **raw)

    def __len__(self):
        # Count through a generator rather than list(self), which may call
        # __len__ again to obtain a size hint.
        return sum(1 for _ in self)
class Roles(WcsObjects):
    """Collection of Role objects listed by the 'api/roles' endpoint."""

    # Paths are not coherent :/
    # (presumably: this URL has no trailing slash or query string, unlike
    # those of the other collections of this module)
    object_class = Role
    url = 'api/roles'
class FormDefs(WcsObjects):
    """Collection of FormDef objects listed by the form definitions endpoint."""

    object_class = FormDef
    # the query string asks for counts and for disabled definitions too
    url = 'api/formdefs/?include-count=on&include-disabled=on'
class Categories(WcsObjects):
    """Collection of Category objects listed by the categories endpoint."""

    object_class = Category
    # the query string asks for disabled categories too
    url = 'api/categories/?include-disabled=on'
class WcsApi:
    """Client of the w.c.s. REST API.

    Requests are sent through ``session`` (a new ``requests.Session`` by
    default). The ``email``, ``name_id`` and ``orig`` values, when set, are
    added to the query string of every request, and the URL is signed with
    ``key`` when one is given.
    """

    def __init__(self, url, email=None, name_id=None, batch_size=1000,
                 session=None, logger=None, orig=None, key=None, verify=True):
        self.url = url
        self.email = email
        self.name_id = name_id
        self.requests = session or requests.Session()
        self.logger = logger or logging.getLogger(__name__)
        self.orig = orig
        self.key = key
        self.verify = verify
        self.batch_size = batch_size

    def _build_url(self, url_parts):
        """Join *url_parts* one after the other onto the base URL."""
        url = self.url
        for url_part in url_parts:
            url = urlparse.urljoin(url, url_part)
        return url

    def _final_url(self, url_parts):
        """Return the URL to request: joined parts, identity parameters
        and, when a key is configured, the signature."""
        url = self._build_url(url_parts)
        params = {}
        if self.email:
            params['email'] = self.email
        if self.name_id:
            params['NameID'] = self.name_id
        if self.orig:
            params['orig'] = self.orig
        # Only extend the query string when there is something to add; the
        # original code left a dangling '?' or '&' otherwise.
        if params:
            separator = '&' if '?' in url else '?'
            url = url + separator + urlparse.urlencode(params)
        if self.key:
            url = signature.sign_url(url, self.key)
        return url

    def _request_json(self, verb, url_parts, **request_kwargs):
        """Send a *verb* ('GET' or 'POST') request and decode its JSON body.

        Raises WcsApiError, chained to the original exception, when the
        request fails, returns an HTTP error status or has a body that is
        not valid JSON.
        """
        final_url = self._final_url(url_parts)
        send = self.requests.get if verb == 'GET' else self.requests.post
        try:
            response = send(final_url, verify=self.verify, **request_kwargs)
            response.raise_for_status()
        except requests.RequestException as e:
            # keep the response body, when there is one, to help diagnosis
            content = getattr(getattr(e, 'response', None), 'content', None)
            raise WcsApiError('%s request failed' % verb, final_url, e, content) from e
        try:
            return response.json()
        except ValueError as e:
            raise WcsApiError('Invalid JSON content', final_url, e) from e

    def get_json(self, *url_parts):
        """GET the URL made of *url_parts* and return the decoded JSON.

        Raises WcsApiError on request failure or invalid JSON.
        """
        return self._request_json('GET', url_parts)

    def post_json(self, data, *url_parts):
        """POST *data* as JSON to the URL made of *url_parts*.

        Returns the decoded JSON response; raises WcsApiError on request
        failure or invalid JSON.
        """
        return self._request_json(
            'POST',
            url_parts,
            data=json.dumps(data),
            headers={'content-type': 'application/json'},
        )

    @property
    def roles(self):
        """New Roles collection bound to this client."""
        return Roles(self)

    @property
    def formdefs(self):
        """New FormDefs collection bound to this client."""
        return FormDefs(self)

    @property
    def categories(self):
        """New Categories collection bound to this client."""
        return Categories(self)