This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
publik-bi/wcs_olap/wcs_api.py

126 lines
3.7 KiB
Python

import requests
import urlparse
import urllib
import datetime
from . import signature
class WcsApiError(Exception):
pass
class BaseObject(object):
def __init__(self, wcs_api, **kwargs):
self.__wcs_api = wcs_api
self.__dict__.update(**kwargs)
def json(self):
d = self.__dict__.copy()
for key in d.keys():
if key.startswith('_'):
del d[key]
return d
class FormData(BaseObject):
def json(self):
d = super(FormData, self).json()
formdef = d.pop('formdef')
receipt_time = datetime.datetime.strptime(d['receipt_time'], "%Y-%m-%dT%H:%M:%SZ")
d['receipt_time__dow'] = receipt_time.strftime('%A')
d['receipt_time__dow_int'] = int(receipt_time.strftime('%w'))
d['receipt_time__month'] = receipt_time.strftime('%B')
d['receipt_time__month_int'] = int(receipt_time.strftime('%m'))
d['receipt_time__hour'] = int(receipt_time.strftime('%H'))
d['formdef_slug'] = formdef.slug
return d
def __repr__(self):
return '<{klass} {display_id!r}>'.format(klass=self.__class__.__name__,
display_id=self.id)
class FormDef(BaseObject):
def __init__(self, wcs_api, **kwargs):
self.__wcs_api = wcs_api
self.__dict__.update(**kwargs)
def __unicode__(self):
return self.title
@property
def datas(self):
datas = self.__wcs_api.get_formdata(self.slug)
for data in datas:
data.formdef = self
return datas
@property
def schema(self):
return self.__wcs_api.get_schema(self.slug)
def __repr__(self):
return '<{klass} {slug!r}>'.format(klass=self.__class__.__name__, slug=self.slug)
class Role(BaseObject):
pass
class WcsApi(object):
def __init__(self, url, orig, key, name_id=None, email=None, verify=False):
self.url = url
self.orig = orig
self.key = key
self.email = email
self.name_id = name_id
self.verify = verify
@property
def formdefs_url(self):
return urlparse.urljoin(self.url, 'api/formdefs/')
@property
def forms_url(self):
return urlparse.urljoin(self.url, 'api/forms/')
@property
def roles_url(self):
return urlparse.urljoin(self.url, 'api/roles')
def get_json(self, *url_parts):
url = reduce(lambda x, y: urlparse.urljoin(x, y), url_parts)
params = {'orig': self.orig}
if self.email:
params['email'] = self.email
if self.name_id:
params['NameID'] = self.name_id
query_string = urllib.urlencode(params)
presigned_url = url + ('&' if '?' in url else '?') + query_string
signed_url = signature.sign_url(presigned_url, self.key)
try:
response = requests.get(signed_url, verify=False)
response.raise_for_status()
except requests.RequestException, e:
raise WcsApiError('GET request failed', signed_url, e)
else:
try:
return response.json()
except ValueError, e:
raise WcsApiError('Invalid JSON content', signed_url, e)
@property
def roles(self):
return [Role(wcs_api=self, **d) for d in self.get_json(self.roles_url)['data']]
@property
def formdefs(self):
return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)]
def get_formdata(self, slug):
for d in self.get_json(self.forms_url, slug + '/'):
yield FormData(wcs_api=self, **d)
def get_schema(self, slug):
return BaseObject(wcs_api=self, **self.get_json(self.formdefs_url, slug + '/', 'schema'))