wcs-olap/wcs_olap/wcs_api.py

309 lines
9.4 KiB
Python

import six
import requests
import urlparse
import urllib
import isodate
import logging
from . import signature
logger = logging.getLogger(__name__)
def exception_to_text(e):
try:
return six.text_type(e)
except Exception:
pass
try:
return six.text_type(e.decode('utf8'))
except Exception:
pass
try:
return six.text_type(repr(e))
except Exception:
pass
try:
args = e.args
try:
content = six.text_type(repr(args)) if args != [] else ''
except Exception:
content = '<exception-while-rendering-args>'
except AttributeError:
content = ''
return u'%s(%s)' % (e.__class__.__name__, content)
class WcsApiError(Exception):
def __init__(self, message, **kwargs):
super(WcsApiError, self).__init__(message)
self.kwargs = kwargs
def __str__(self):
kwargs = self.kwargs.copy()
if 'exception' in kwargs:
kwargs['exception'] = exception_to_text(kwargs['exception'])
return '%s: %s' % (self.args[0], ' '.join('%s=%s' % (key, value) for key, value in kwargs.items()))
class BaseObject(object):
def __init__(self, wcs_api, **kwargs):
self.__wcs_api = wcs_api
self.__dict__.update(**kwargs)
class FormDataWorkflow(BaseObject):
status = None
fields = None
def __init__(self, wcs_api, **kwargs):
super(FormDataWorkflow, self).__init__(wcs_api, **kwargs)
if self.status is not None:
self.status = BaseObject(wcs_api, **self.status)
self.fields = self.fields or {}
class EvolutionUser(BaseObject):
id = None
name = None
NameID = None
email = None
class Evolution(BaseObject):
who = None
status = None
parts = None
def __init__(self, wcs_api, **kwargs):
super(Evolution, self).__init__(wcs_api, **kwargs)
self.time = isodate.parse_datetime(self.time)
if self.parts:
self.parts = [BaseObject(wcs_api, **part) for part in self.parts]
if self.who:
self.who = EvolutionUser(wcs_api, **self.who)
class FormData(BaseObject):
geolocations = None
evolution = None
def __init__(self, wcs_api, **kwargs):
super(FormData, self).__init__(wcs_api, **kwargs)
self.receipt_time = isodate.parse_datetime(self.receipt_time)
self.submission = BaseObject(wcs_api, **self.submission)
self.workflow = FormDataWorkflow(wcs_api, **self.workflow)
self.evolution = [Evolution(wcs_api, **evo) for evo in self.evolution or []]
self.functions = {}
self.concerned_roles = []
self.action_roles = []
for function in self.roles:
roles = [Role(wcs_api, **r) for r in self.roles[function]]
if function == 'concerned':
self.concerned_roles.extend(roles)
elif function == 'actions':
self.concerned_roles.extend(roles)
else:
try:
self.functions[function] = roles[0]
except IndexError:
self.functions[function] = None
del self.roles
def __repr__(self):
return '<{klass} {display_id!r}>'.format(klass=self.__class__.__name__,
display_id=self.id)
@property
def endpoint_delay(self):
'''Compute delay as the time when the last not endpoint status precedes an endpoint
status.'''
statuses_map = self.formdef.schema.workflow.statuses_map
s = 0
for evo in self.evolution[::-1]:
if evo.status:
try:
status = statuses_map[evo.status]
except KeyError: # happen when workflow has changed
return
if status.endpoint:
s = 1
last = evo.time - self.receipt_time
else:
if s == 1:
return last
else:
return
class Workflow(BaseObject):
statuses = None
fields = None
def __init__(self, wcs_api, **kwargs):
super(Workflow, self).__init__(wcs_api, **kwargs)
self.statuses = [BaseObject(wcs_api, **v) for v in (self.statuses or [])]
if self.statuses:
assert not hasattr(self.statuses[0], 'startpoint'), 'startpoint is exported by w.c.s. FIXME'
for status in self.statuses:
status.startpoint = False
self.statuses[0].startpoint = True
self.statuses_map = dict((s.id, s) for s in self.statuses)
self.fields = [Field(wcs_api, **field) for field in (self.fields or [])]
class Field(BaseObject):
items = None
options = None
varname = None
in_filters = False
anonymise = None
class Schema(BaseObject):
category_id = None
category = None
geolocations = None
def __init__(self, wcs_api, **kwargs):
super(Schema, self).__init__(wcs_api, **kwargs)
self.workflow = Workflow(wcs_api, **self.workflow)
self.fields = [Field(wcs_api, **f) for f in self.fields]
self.geolocations = sorted((k, v) for k, v in (self.geolocations or {}).items())
class FormDef(BaseObject):
geolocations = None
def __init__(self, wcs_api, **kwargs):
self.__wcs_api = wcs_api
self.__dict__.update(**kwargs)
def __unicode__(self):
return self.title
@property
def datas(self):
datas = self.__wcs_api.get_formdata(self.slug)
for data in datas:
data.formdef = self
yield data
@property
def schema(self):
return self.__wcs_api.get_schema(self.slug)
def __repr__(self):
return '<{klass} {slug!r}>'.format(klass=self.__class__.__name__, slug=self.slug)
class Role(BaseObject):
pass
class Category(BaseObject):
pass
class WcsApi(object):
def __init__(self, url, orig, key, verify=True, slugs=None, batch_size=500):
self.url = url
self.orig = orig
self.key = key
self.verify = verify
self.cache = {}
self.slugs = slugs or []
self.batch_size = batch_size
@property
def formdefs_url(self):
return urlparse.urljoin(self.url, 'api/formdefs/')
@property
def forms_url(self):
return urlparse.urljoin(self.url, 'api/forms/')
@property
def roles_url(self):
return urlparse.urljoin(self.url, 'api/roles')
def get_json(self, *url_parts):
url = reduce(lambda x, y: urlparse.urljoin(x, y), url_parts)
params = {'orig': self.orig}
query_string = urllib.urlencode(params)
presigned_url = url + ('&' if '?' in url else '?') + query_string
if presigned_url in self.cache:
return self.cache[presigned_url]
signed_url = signature.sign_url(presigned_url, self.key)
try:
response = requests.get(signed_url, verify=self.verify)
except requests.RequestException as e:
raise WcsApiError('GET request failed', url=signed_url, exception=e)
else:
if not response.ok:
try:
text = response.text
except UnicodeError:
text = '<undecodable>' + repr(response.content)
raise WcsApiError('GET response is not 200',
url=signed_url,
status_code=response.status_code,
content=text)
try:
content = response.json()
self.cache[presigned_url] = content
return content
except ValueError as e:
raise WcsApiError('Invalid JSON content', url=signed_url, exception=e)
@property
def roles(self):
return [Role(wcs_api=self, **d) for d in self.get_json(self.roles_url)['data']]
@property
def formdefs(self):
result = self.get_json(self.formdefs_url + '?include-count=on')
if isinstance(result, dict):
if result['err'] == 0:
data = result['data']
else:
logger.error(u'could not retrieve formdefs from %s, err_desc: %s',
self.formdefs_url, result.get('err_desc'))
return []
else:
data = result
return [FormDef(wcs_api=self, **d) for d in data
if not self.slugs or d['slug'] in self.slugs]
@property
def categories(self):
d = {}
for f in self.formdefs:
if hasattr(f.schema, 'category'):
d[f.schema.category_id] = f.schema.category
return [Category(wcs_api=self, id=k, name=v) for k, v in d.items()]
def get_formdata(self, slug):
offset = 0
limit = self.batch_size
while True:
data = self.get_json(self.forms_url,
slug + '/list?anonymise&full=on&offset=%d&limit=%d' % (offset, limit))
for d in data:
# w.c.s. had a bug where some formdata lost their draft status, skip them
if not d.get('receipt_time'):
continue
yield FormData(wcs_api=self, **d)
if len(data) < limit:
break
offset += limit
def get_schema(self, slug):
json_schema = self.get_json(self.formdefs_url, slug + '/', 'schema?anonymise')
return Schema(wcs_api=self, **json_schema)