add utilities to call w.c.s. APIs (#32656)
This commit is contained in:
parent
d5352aec67
commit
7ce97ff996
|
@ -3,3 +3,4 @@ local_settings.py
|
|||
passerelle.sqlite3
|
||||
media
|
||||
/static
|
||||
/wcs
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
#!/bin/sh -xue
|
||||
|
||||
test -d wcs || git clone http://git.entrouvert.org/wcs.git
|
||||
(cd wcs && git pull)
|
|
@ -0,0 +1,736 @@
|
|||
# passerelle - uniform access to multiple data sources and services
|
||||
# Copyright (C) 2019 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import collections
|
||||
import base64
|
||||
import copy
|
||||
import logging
|
||||
import datetime
|
||||
import contextlib
|
||||
import json
|
||||
|
||||
import requests
|
||||
import isodate
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import models
|
||||
from django.core.cache import cache
|
||||
from django import forms
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from django.utils import six
|
||||
|
||||
from passerelle.base import signature
|
||||
|
||||
|
||||
class WcsApiError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class JSONFile(object):
|
||||
def __init__(self, d):
|
||||
self.d = d
|
||||
|
||||
@property
|
||||
def filename(self):
|
||||
return self.d.get('filename', '')
|
||||
|
||||
@property
|
||||
def content_type(self):
|
||||
return self.d.get('content_type', 'application/octet-stream')
|
||||
|
||||
@property
|
||||
def content(self):
|
||||
return base64.b64decode(self.d['content'])
|
||||
|
||||
|
||||
def to_dict(o):
|
||||
if hasattr(o, 'to_dict'):
|
||||
return o.to_dict()
|
||||
elif isinstance(o, dict):
|
||||
return {k: to_dict(v) for k, v in o.items()}
|
||||
elif isinstance(o, (list, tuple)):
|
||||
return [to_dict(v) for v in o]
|
||||
else:
|
||||
return o
|
||||
|
||||
|
||||
class BaseObject(object):
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
self._wcs_api = wcs_api
|
||||
self.__dict__.update(**kwargs)
|
||||
|
||||
def to_dict(self):
|
||||
d = collections.OrderedDict()
|
||||
for key, value in self.__dict__.items():
|
||||
if key[0] == '_':
|
||||
continue
|
||||
d[key] = to_dict(value)
|
||||
return d
|
||||
|
||||
|
||||
class FormDataWorkflow(BaseObject):
|
||||
status = None
|
||||
fields = None
|
||||
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
super(FormDataWorkflow, self).__init__(wcs_api, **kwargs)
|
||||
if self.status is not None:
|
||||
self.status = BaseObject(wcs_api, **self.status)
|
||||
self.fields = self.fields or {}
|
||||
|
||||
|
||||
class EvolutionUser(BaseObject):
|
||||
id = None
|
||||
name = None
|
||||
NameID = None
|
||||
email = None
|
||||
|
||||
|
||||
class Evolution(BaseObject):
|
||||
who = None
|
||||
status = None
|
||||
parts = None
|
||||
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
super(Evolution, self).__init__(wcs_api, **kwargs)
|
||||
self.time = isodate.parse_datetime(self.time)
|
||||
if self.parts:
|
||||
self.parts = [BaseObject(wcs_api, **part) for part in self.parts]
|
||||
if self.who:
|
||||
self.who = EvolutionUser(wcs_api, **self.who)
|
||||
|
||||
|
||||
@six.python_2_unicode_compatible
|
||||
class FormData(BaseObject):
|
||||
geolocations = None
|
||||
evolution = None
|
||||
submissions = None
|
||||
workflow = None
|
||||
roles = None
|
||||
with_files = False
|
||||
|
||||
def __init__(self, wcs_api, forms, **kwargs):
|
||||
self.forms = forms
|
||||
super(FormData, self).__init__(wcs_api, **kwargs)
|
||||
self.receipt_time = isodate.parse_datetime(self.receipt_time)
|
||||
if self.submissions:
|
||||
self.submission = BaseObject(wcs_api, **self.submission)
|
||||
if self.workflow:
|
||||
self.workflow = FormDataWorkflow(wcs_api, **self.workflow)
|
||||
self.evolution = [Evolution(wcs_api, **evo) for evo in self.evolution or []]
|
||||
self.functions = {}
|
||||
self.concerned_roles = []
|
||||
self.action_roles = []
|
||||
for function in self.roles or []:
|
||||
roles = [Role(wcs_api, **r) for r in self.roles[function]]
|
||||
if function == 'concerned':
|
||||
self.concerned_roles.extend(roles)
|
||||
elif function == 'actions':
|
||||
self.concerned_roles.extend(roles)
|
||||
else:
|
||||
try:
|
||||
self.functions[function] = roles[0]
|
||||
except IndexError:
|
||||
self.functions[function] = None
|
||||
if 'roles' in self.__dict__:
|
||||
del self.roles
|
||||
|
||||
def __str__(self):
|
||||
return '{self.formdef} - {self.id}'.format(self=self)
|
||||
|
||||
@property
|
||||
def full(self):
|
||||
if self.with_files:
|
||||
return self
|
||||
if not hasattr(self, '_full'):
|
||||
self._full = self.forms[self.id]
|
||||
return self._full
|
||||
|
||||
@property
|
||||
def anonymized(self):
|
||||
return self.forms.anonymized[self.id]
|
||||
|
||||
@property
|
||||
def endpoint_delay(self):
|
||||
'''Compute delay as the time when the last not endpoint status precedes an endpoint
|
||||
status.'''
|
||||
statuses_map = self.formdef.schema.workflow.statuses_map
|
||||
s = 0
|
||||
for evo in self.evolution[::-1]:
|
||||
if evo.status:
|
||||
try:
|
||||
status = statuses_map[evo.status]
|
||||
except KeyError: # happen when workflow has changed
|
||||
return
|
||||
if status.endpoint:
|
||||
s = 1
|
||||
last = evo.time - self.receipt_time
|
||||
else:
|
||||
if s == 1:
|
||||
return last
|
||||
else:
|
||||
return
|
||||
|
||||
def __getitem__(self, key):
|
||||
value = self.full.fields.get(key)
|
||||
# unserialize files
|
||||
if isinstance(value, dict) and 'content' in value:
|
||||
return JSONFile(value)
|
||||
return value
|
||||
|
||||
|
||||
class Workflow(BaseObject):
|
||||
statuses = None
|
||||
fields = None
|
||||
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
super(Workflow, self).__init__(wcs_api, **kwargs)
|
||||
self.statuses = [BaseObject(wcs_api, **v) for v in (self.statuses or [])]
|
||||
assert not hasattr(self.statuses[0], 'startpoint'), 'startpoint is exported by w.c.s. FIXME'
|
||||
for status in self.statuses:
|
||||
status.startpoint = False
|
||||
self.statuses[0].startpoint = True
|
||||
self.statuses_map = dict((s.id, s) for s in self.statuses)
|
||||
self.fields = [Field(wcs_api, **field) for field in (self.fields or [])]
|
||||
|
||||
|
||||
class Field(BaseObject):
|
||||
items = None
|
||||
options = None
|
||||
varname = None
|
||||
in_filters = False
|
||||
anonymise = None
|
||||
|
||||
|
||||
class Schema(BaseObject):
|
||||
category_id = None
|
||||
category = None
|
||||
geolocations = None
|
||||
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
super(Schema, self).__init__(wcs_api, **kwargs)
|
||||
self.workflow = Workflow(wcs_api, **self.workflow)
|
||||
self.fields = [Field(wcs_api, **f) for f in self.fields]
|
||||
self.geolocations = sorted((k, v) for k, v in (self.geolocations or {}).items())
|
||||
|
||||
|
||||
class FormDatas(object):
|
||||
def __init__(self, wcs_api, formdef, full=False, anonymize=False, batch=1000):
|
||||
self.wcs_api = wcs_api
|
||||
self.formdef = formdef
|
||||
self._full = full
|
||||
self.anonymize = anonymize
|
||||
self.batch = batch
|
||||
|
||||
def __getitem__(self, slice_or_id):
|
||||
# get batch of forms
|
||||
if isinstance(slice_or_id, slice):
|
||||
def helper():
|
||||
if slice_or_id.stop <= slice_or_id.start or slice_or_id.step:
|
||||
raise ValueError('invalid slice %s' % slice_or_id)
|
||||
offset = slice_or_id.start
|
||||
limit = slice_or_id.stop - slice_or_id.start
|
||||
|
||||
url_parts = ['api/forms/{self.formdef.slug}/list'.format(self=self)]
|
||||
query = {}
|
||||
query['full'] = 'on' if self._full else 'off'
|
||||
if offset:
|
||||
query['offset'] = str(offset)
|
||||
if limit:
|
||||
query['limit'] = str(limit)
|
||||
if self.anonymize:
|
||||
query['anonymise'] = 'on'
|
||||
url_parts.append('?%s' % urlparse.urlencode(query))
|
||||
for d in self.wcs_api.get_json(*url_parts):
|
||||
# w.c.s. had a bug where some formdata lost their draft status, skip them
|
||||
if not d.get('receipt_time'):
|
||||
continue
|
||||
yield FormData(wcs_api=self.wcs_api, forms=self, formdef=self.formdef, **d)
|
||||
return helper()
|
||||
# or get one form
|
||||
else:
|
||||
url_parts = ['api/forms/{formdef.slug}/{id}/'.format(formdef=self.formdef, id=slice_or_id)]
|
||||
if self.anonymize:
|
||||
url_parts.append('?anonymise=true')
|
||||
d = self.wcs_api.get_json(*url_parts)
|
||||
return FormData(wcs_api=self.wcs_api, forms=self, formdef=self.formdef, with_files=True, **d)
|
||||
|
||||
@property
|
||||
def full(self):
|
||||
forms = copy.copy(self)
|
||||
forms._full = True
|
||||
return forms
|
||||
|
||||
@property
|
||||
def anonymized(self):
|
||||
forms = copy.copy(self)
|
||||
forms.anonymize = True
|
||||
return forms
|
||||
|
||||
def batched(self, batch):
|
||||
forms = copy.copy(self)
|
||||
forms.batch = batch
|
||||
return forms
|
||||
|
||||
def __iter__(self):
|
||||
start = 0
|
||||
while True:
|
||||
empty = True
|
||||
for formdef in self[start:start + self.batch]:
|
||||
empty = False
|
||||
yield formdef
|
||||
if empty:
|
||||
break
|
||||
start += self.batch
|
||||
|
||||
def __len__(self):
|
||||
return len(list((o for o in self)))
|
||||
|
||||
|
||||
class CancelSubmitError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FormDefSubmit(object):
|
||||
formdef = None
|
||||
data = None
|
||||
user_email = None
|
||||
user_name_id = None
|
||||
backoffice_submission = False
|
||||
submission_channel = None
|
||||
submission_context = None
|
||||
draft = False
|
||||
|
||||
def __init__(self, wcs_api, formdef, **kwargs):
|
||||
self.wcs_api = wcs_api
|
||||
self.formdef = formdef
|
||||
self.data = {}
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
def payload(self):
|
||||
d = {
|
||||
'data': self.data.copy(),
|
||||
}
|
||||
if self.draft:
|
||||
d.setdefault('meta', {})['draft'] = True
|
||||
if self.backoffice_submission:
|
||||
d.setdefault('meta', {})['backoffice-submission'] = True
|
||||
if self.submission_context:
|
||||
d['context'] = self.submission_context
|
||||
if self.submission_channel:
|
||||
d.setdefault('context', {})['channel'] = self.submission_channel
|
||||
if self.user_email:
|
||||
d.setdefault('user', {})['email'] = self.user_email
|
||||
if self.user_name_id:
|
||||
d.setdefault('user', {})['NameID'] = self.user_name_id
|
||||
return d
|
||||
|
||||
def set(self, field, value, **kwargs):
|
||||
if isinstance(field, Field):
|
||||
varname = field.varname
|
||||
if not varname:
|
||||
raise ValueError('field has no varname, submit is impossible')
|
||||
else:
|
||||
varname = field
|
||||
try:
|
||||
field = [f for f in self.formdef.schema.fields if f.varname == varname][0]
|
||||
except IndexError:
|
||||
raise ValueError('no field for varname %s' % varname)
|
||||
|
||||
if value is None or value == {} or value == []:
|
||||
self.data.pop(varname, None)
|
||||
elif hasattr(self, '_set_type_%s' % field.type):
|
||||
getattr(self, '_set_type_%s' % field.type)(
|
||||
varname=varname,
|
||||
field=field,
|
||||
value=value, **kwargs)
|
||||
else:
|
||||
self.data[varname] = value
|
||||
|
||||
def _set_type_item(self, varname, field, value, **kwargs):
|
||||
if isinstance(value, dict):
|
||||
if not set(value).issuperset(set(['id', 'text'])):
|
||||
raise ValueError('item field value must have id and text value')
|
||||
# clean previous values
|
||||
self.data.pop(varname, None)
|
||||
self.data.pop(varname + '_raw', None)
|
||||
self.data.pop(varname + '_structured', None)
|
||||
if isinstance(value, dict):
|
||||
# structured & display values
|
||||
self.data[varname + '_raw'] = value['id']
|
||||
self.data[varname] = value['text']
|
||||
if len(value) > 2:
|
||||
self.data[varname + '_structured'] = value
|
||||
else:
|
||||
# raw id in varname
|
||||
self.data[varname] = value
|
||||
|
||||
def _set_type_items(self, varname, field, value, **kwargs):
|
||||
if not isinstance(value, list):
|
||||
raise TypeError('%s is an ItemsField it needs a list as value' % varname)
|
||||
|
||||
has_dict = False
|
||||
for choice in value:
|
||||
if isinstance(value, dict):
|
||||
if not set(value).issuperset(set(['id', 'text'])):
|
||||
raise ValueError('items field values must have id and text value')
|
||||
has_dict = True
|
||||
if has_dict:
|
||||
if not all(isinstance(choice, dict) for choice in value):
|
||||
raise ValueError('ItemsField value must be all structured or none')
|
||||
# clean previous values
|
||||
self.data.pop(varname, None)
|
||||
self.data.pop(varname + '_raw', None)
|
||||
self.data.pop(varname + '_structured', None)
|
||||
if has_dict:
|
||||
raw = self.data[varname + '_raw'] = []
|
||||
display = self.data[varname] = []
|
||||
structured = self.data[varname + '_structured'] = []
|
||||
for choice in value:
|
||||
raw.append(choice['id'])
|
||||
display.append(choice['text'])
|
||||
structured.append(choice)
|
||||
else:
|
||||
self.data[varname] = value[:]
|
||||
|
||||
def _set_type_file(self, varname, field, value, **kwargs):
|
||||
filename = kwargs.get('filename')
|
||||
content_type = kwargs.get('content_type', 'application/octet-stream')
|
||||
if hasattr(value, 'read'):
|
||||
content = base64.b64encode(value.read())
|
||||
elif isinstance(value, six.binary_type):
|
||||
content = base64.b64encode(value)
|
||||
elif isinstance(value, dict):
|
||||
if not set(value).issuperset(set(['filename', 'content'])):
|
||||
raise ValueError('file field needs a dict value with filename and content')
|
||||
content = value['content']
|
||||
filename = value['filename']
|
||||
content_type = value.get('content_type', content_type)
|
||||
if not filename:
|
||||
raise ValueError('missing filename')
|
||||
self.data[varname] = {
|
||||
'filename': filename,
|
||||
'content': content,
|
||||
'content_type': content_type,
|
||||
}
|
||||
|
||||
def _set_type_date(self, varname, field, value):
|
||||
if isinstance(value, six.string_types):
|
||||
value = datetime.datetime.strptime(value, '%Y-%m-%d').date()
|
||||
if isinstance(value, datetime.datetime):
|
||||
value = value.date()
|
||||
if isinstance(value, datetime.date):
|
||||
value = value.strftime('%Y-%m-%d')
|
||||
self.data[varname] = value
|
||||
|
||||
def _set_type_map(self, varname, field, value):
|
||||
if not isinstance(value, dict):
|
||||
raise TypeError('value must be a dict for a map field')
|
||||
if set(value) != set(['lat', 'lon']):
|
||||
raise ValueError('map field expect keys lat and lon')
|
||||
self.data[varname] = value
|
||||
|
||||
def _set_type_bool(self, varname, field, value):
|
||||
if isinstance(value, six.string_types):
|
||||
value = value.lower().strip() in ['yes', 'true', 'on']
|
||||
if not isinstance(value, bool):
|
||||
raise TypeError('value must be a boolean or a string true, yes, on, false, no, off')
|
||||
self.data[varname] = value
|
||||
|
||||
def cancel(self):
|
||||
raise CancelSubmitError
|
||||
|
||||
|
||||
@six.python_2_unicode_compatible
|
||||
class FormDef(BaseObject):
|
||||
geolocations = None
|
||||
|
||||
def __init__(self, wcs_api, **kwargs):
|
||||
self._wcs_api = wcs_api
|
||||
self.__dict__.update(**kwargs)
|
||||
|
||||
def __str__(self):
|
||||
return self.title
|
||||
|
||||
@property
|
||||
def formdatas(self):
|
||||
return FormDatas(wcs_api=self._wcs_api, formdef=self)
|
||||
|
||||
@property
|
||||
def schema(self):
|
||||
if not hasattr(self, '_schema'):
|
||||
d = self._wcs_api.get_json('api/formdefs/{self.slug}/schema'.format(self=self))
|
||||
self._schema = Schema(self._wcs_api, **d)
|
||||
return self._schema
|
||||
|
||||
@contextlib.contextmanager
|
||||
def submit(self, **kwargs):
|
||||
submitter = FormDefSubmit(
|
||||
wcs_api=self._wcs_api,
|
||||
formdef=self,
|
||||
**kwargs)
|
||||
try:
|
||||
yield submitter
|
||||
except CancelSubmitError:
|
||||
return
|
||||
payload = submitter.payload()
|
||||
d = self._wcs_api.post_json(payload, 'api/formdefs/{self.slug}/submit'.format(self=self))
|
||||
if d['err'] != 0:
|
||||
raise WcsApiError('submited returned an error: %s' % d)
|
||||
submitter.result = BaseObject(self._wcs_api, **d['data'])
|
||||
|
||||
|
||||
class Role(BaseObject):
|
||||
pass
|
||||
|
||||
|
||||
class Category(BaseObject):
|
||||
pass
|
||||
|
||||
|
||||
class WcsObjects(object):
|
||||
url = None
|
||||
object_class = None
|
||||
|
||||
def __init__(self, wcs_api):
|
||||
self.wcs_api = wcs_api
|
||||
|
||||
def __getitem__(self, slug):
|
||||
if isinstance(slug, self.object_class):
|
||||
slug = slug.slug
|
||||
for instance in self:
|
||||
if instance.slug == slug:
|
||||
return instance
|
||||
raise KeyError('no instance with slug %r' % slug)
|
||||
|
||||
def __iter__(self):
|
||||
for d in self.wcs_api.get_json(self.url)['data']:
|
||||
yield self.object_class(wcs_api=self.wcs_api, **d)
|
||||
|
||||
def __len__(self):
|
||||
return len(list((o for o in self)))
|
||||
|
||||
|
||||
class Roles(WcsObjects):
|
||||
# Paths are not coherent :/
|
||||
url = 'api/roles'
|
||||
object_class = Role
|
||||
|
||||
|
||||
class FormDefs(WcsObjects):
|
||||
url = 'api/formdefs/'
|
||||
object_class = FormDef
|
||||
|
||||
|
||||
class Categories(WcsObjects):
|
||||
url = 'api/categories/'
|
||||
object_class = Category
|
||||
|
||||
|
||||
class WcsApi(object):
|
||||
def __init__(self, url, email=None, name_id=None, batch_size=1000, session=None, logger=None, orig=None, key=None):
|
||||
self.url = url
|
||||
self.batch_size = batch_size
|
||||
self.email = email
|
||||
self.name_id = name_id
|
||||
self.requests = session or requests.Session()
|
||||
self.logger = logger or logging.getLogger(__name__)
|
||||
self.orig = orig
|
||||
self.key = key
|
||||
|
||||
def _build_url(self, url_parts):
|
||||
url = self.url
|
||||
for url_part in url_parts:
|
||||
url = urlparse.urljoin(url, url_part)
|
||||
return url
|
||||
|
||||
def get_json(self, *url_parts):
|
||||
url = self._build_url(url_parts)
|
||||
params = {}
|
||||
if self.email:
|
||||
params['email'] = self.email
|
||||
if self.name_id:
|
||||
params['NameID'] = self.name_id
|
||||
if self.orig:
|
||||
params['orig'] = self.orig
|
||||
query_string = urlparse.urlencode(params)
|
||||
complete_url = url + ('&' if '?' in url else '?') + query_string
|
||||
final_url = complete_url
|
||||
if self.key:
|
||||
final_url = signature.sign_url(final_url, self.key)
|
||||
try:
|
||||
response = self.requests.get(final_url)
|
||||
response.raise_for_status()
|
||||
except requests.RequestException as e:
|
||||
content = getattr(getattr(e, 'response', None), 'content', None)
|
||||
raise WcsApiError('GET request failed', final_url, e, content)
|
||||
else:
|
||||
try:
|
||||
return response.json()
|
||||
except ValueError as e:
|
||||
raise WcsApiError('Invalid JSON content', final_url, e)
|
||||
|
||||
def post_json(self, data, *url_parts):
|
||||
url = self._build_url(url_parts)
|
||||
params = {}
|
||||
if self.email:
|
||||
params['email'] = self.email
|
||||
if self.name_id:
|
||||
params['NameID'] = self.name_id
|
||||
if self.orig:
|
||||
params['orig'] = self.orig
|
||||
query_string = urlparse.urlencode(params)
|
||||
complete_url = url + ('&' if '?' in url else '?') + query_string
|
||||
final_url = complete_url
|
||||
if self.key:
|
||||
final_url = signature.sign_url(final_url, self.key)
|
||||
try:
|
||||
response = self.requests.post(final_url, data=json.dumps(data), headers={'content-type': 'application/json'})
|
||||
response.raise_for_status()
|
||||
except requests.RequestException as e:
|
||||
content = getattr(getattr(e, 'response', None), 'content', None)
|
||||
raise WcsApiError('POST request failed', final_url, e, content)
|
||||
else:
|
||||
try:
|
||||
return response.json()
|
||||
except ValueError as e:
|
||||
raise WcsApiError('Invalid JSON content', final_url, e)
|
||||
|
||||
@property
|
||||
def roles(self):
|
||||
return Roles(self)
|
||||
|
||||
@property
|
||||
def formdefs(self):
|
||||
return FormDefs(self)
|
||||
|
||||
@property
|
||||
def categories(self):
|
||||
return Categories(self)
|
||||
|
||||
|
||||
def get_wcs_choices(session=None):
|
||||
cached_choices = cache.get('wcs-formdef-choices')
|
||||
if cached_choices is None:
|
||||
known_services = getattr(settings, 'KNOWN_SERVICES', {})
|
||||
|
||||
def helper():
|
||||
for key, value in known_services.get('wcs', {}).items():
|
||||
api = WcsApi(
|
||||
url=value['url'],
|
||||
orig=value['orig'],
|
||||
key=value['secret'],
|
||||
session=session)
|
||||
for formdef in list(api.formdefs):
|
||||
title = '%s - %s' % (
|
||||
value['title'],
|
||||
formdef.title)
|
||||
yield key, formdef.slug, title
|
||||
cached_choices = sorted(helper(), key=lambda x: x[2])
|
||||
cache.set('wcs-formdef-choices', cached_choices, 600)
|
||||
|
||||
choices = [('', '---------')]
|
||||
for wcs_slug, formdef_slug, title in cached_choices:
|
||||
choices.append((FormDefRef(wcs_slug, formdef_slug), title))
|
||||
return choices
|
||||
|
||||
|
||||
@six.python_2_unicode_compatible
|
||||
class FormDefRef(object):
|
||||
_formdef = None
|
||||
_api = None
|
||||
session = None
|
||||
|
||||
def __init__(self, value1, value2=None):
|
||||
if value2:
|
||||
self.wcs_slug, self.formdef_slug = value1, value2
|
||||
else:
|
||||
self.wcs_slug, self.formdef_slug = six.text_type(value1).rsplit(':', 1)
|
||||
|
||||
@property
|
||||
def api(self):
|
||||
if not self._api:
|
||||
config = settings.KNOWN_SERVICES['wcs'].get(self.wcs_slug)
|
||||
self._api = WcsApi(
|
||||
url=config['url'],
|
||||
orig=config['orig'],
|
||||
key=config['secret'],
|
||||
session=self.session)
|
||||
return self._api
|
||||
|
||||
@property
|
||||
def formdef(self):
|
||||
if not self._formdef:
|
||||
self._formdef = self.api.formdefs[self.formdef_slug]
|
||||
return self._formdef
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.formdef, name)
|
||||
|
||||
def __str__(self):
|
||||
return '%s:%s' % (self.wcs_slug, self.formdef_slug)
|
||||
|
||||
def __eq__(self, other):
|
||||
if not other:
|
||||
return False
|
||||
if not hasattr(other, 'wcs_slug'):
|
||||
other = FormDefRef(other)
|
||||
return self.wcs_slug == other.wcs_slug and self.formdef_slug == other.formdef_slug
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
def __deepcopy__(self, memo):
|
||||
return self.__class__(self.wcs_slug, self.formdef_slug)
|
||||
|
||||
|
||||
class FormDefFormField(forms.TypedChoiceField):
|
||||
def __init__(self, **kwargs):
|
||||
super(FormDefFormField, self).__init__(
|
||||
choices=self.get_formdef_choices,
|
||||
coerce=FormDefRef, **kwargs)
|
||||
|
||||
def get_formdef_choices(self):
|
||||
requests = getattr(self, 'requests', None)
|
||||
return get_wcs_choices(requests)
|
||||
|
||||
|
||||
class FormDefField(models.Field):
|
||||
def get_internal_type(self):
|
||||
return 'TextField'
|
||||
|
||||
def from_db_value(self, value, *args, **kwargs):
|
||||
return self.to_python(value)
|
||||
|
||||
def to_python(self, value):
|
||||
if not value:
|
||||
return None
|
||||
if isinstance(value, FormDefRef):
|
||||
return value
|
||||
return FormDefRef(value)
|
||||
|
||||
def get_prep_value(self, value):
|
||||
if not value:
|
||||
return ''
|
||||
return str(value)
|
||||
|
||||
def formfield(self, **kwargs):
|
||||
defaults = {
|
||||
'form_class': FormDefFormField,
|
||||
}
|
||||
defaults.update(kwargs)
|
||||
return super(FormDefField, self).formfield(**defaults)
|
|
@ -0,0 +1,450 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# passerelle - uniform access to multiple data sources and services
|
||||
# Copyright (C) 2019 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import pickle
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
import shutil
|
||||
import random
|
||||
import socket
|
||||
import tempfile
|
||||
import contextlib
|
||||
import ConfigParser
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
import httmock
|
||||
|
||||
|
||||
def find_free_tcp_port():
|
||||
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
|
||||
s.bind(('', 0))
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
return s.getsockname()[1]
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def postgres_db_factory():
|
||||
database = 'db%s' % random.getrandbits(20)
|
||||
|
||||
with contextlib.closing(psycopg2.connect('')) as conn:
|
||||
conn.set_isolation_level(0)
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute('CREATE DATABASE %s' % database)
|
||||
try:
|
||||
yield PostgresDB(database)
|
||||
finally:
|
||||
with contextlib.closing(psycopg2.connect('')) as conn:
|
||||
conn.set_isolation_level(0)
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute('DROP DATABASE IF EXISTS %s' % database)
|
||||
|
||||
|
||||
class PostgresDB(object):
|
||||
def __init__(self, database):
|
||||
self.database = database
|
||||
|
||||
@property
|
||||
def dsn(self):
|
||||
return 'dbname={self.database}'.format(self=self)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def conn(self):
|
||||
with contextlib.closing(psycopg2.connect(self.dsn)) as conn:
|
||||
yield conn
|
||||
|
||||
def __repr__(self):
|
||||
return '<Postgres Database %r>' % self.database
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def postgres_db():
|
||||
with postgres_db_factory() as pg_db:
|
||||
yield pg_db
|
||||
|
||||
|
||||
class WcsRunInContextError(Exception):
|
||||
def __init__(self, msg, exception, tb):
|
||||
self.msg = msg
|
||||
self.exception = exception
|
||||
self.tb = tb
|
||||
super(WcsRunInContextError, self).__init__(msg)
|
||||
|
||||
def __str__(self):
|
||||
return '%s\n%s' % (self.msg, self.tb)
|
||||
|
||||
|
||||
class WcsHost(object):
|
||||
def __init__(self, wcs, hostname, database=None):
|
||||
self.wcs = wcs
|
||||
self.hostname = hostname
|
||||
self.app_dir = os.path.join(wcs.app_dir, hostname)
|
||||
with self.config_pck as config:
|
||||
config['misc'] = {'charset': 'utf-8'}
|
||||
config['language'] = {'language': 'en'}
|
||||
config['branding'] = {'theme': 'django'}
|
||||
if database:
|
||||
self.set_postgresql(database)
|
||||
self.__wcs_init()
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return 'http://{self.hostname}:{self.wcs.port}'.format(self=self)
|
||||
|
||||
def run_in_context(self, func):
|
||||
from multiprocessing import Pipe
|
||||
|
||||
WCSCTL = os.environ.get('WCSCTL')
|
||||
pipe_out, pipe_in = Pipe()
|
||||
pid = os.fork()
|
||||
if pid:
|
||||
pid, exit_code = os.waitpid(pid, 0)
|
||||
try:
|
||||
if pid and exit_code != 0:
|
||||
try:
|
||||
e, formatted_tb = pipe_out.recv()
|
||||
except EOFError:
|
||||
e, formatted_tb = None, None
|
||||
raise WcsRunInContextError('%s failed' % func, e, formatted_tb)
|
||||
finally:
|
||||
pipe_out.close()
|
||||
else:
|
||||
sys.path.append(os.path.dirname(WCSCTL))
|
||||
try:
|
||||
import wcs.publisher
|
||||
wcs.publisher.WcsPublisher.APP_DIR = self.wcs.app_dir
|
||||
publisher = wcs.publisher.WcsPublisher.create_publisher(
|
||||
register_tld_names=False)
|
||||
publisher.app_dir = self.app_dir
|
||||
publisher.set_config()
|
||||
func()
|
||||
except Exception as e:
|
||||
import traceback
|
||||
pipe_in.send((e, traceback.format_exc()))
|
||||
pipe_in.close()
|
||||
# FIXME: send exception to parent
|
||||
os._exit(1)
|
||||
finally:
|
||||
pipe_in.close()
|
||||
os._exit(0)
|
||||
|
||||
def __wcs_init(self):
|
||||
for name in sorted(dir(self)):
|
||||
if not name.startswith('wcs_init_'):
|
||||
continue
|
||||
method = getattr(self, name)
|
||||
if not hasattr(method, '__call__'):
|
||||
continue
|
||||
self.run_in_context(method)
|
||||
|
||||
@property
|
||||
@contextlib.contextmanager
|
||||
def site_options(self):
|
||||
config = ConfigParser.ConfigParser()
|
||||
|
||||
site_options_path = os.path.join(self.app_dir, 'site-options.cfg')
|
||||
if os.path.exists(site_options_path):
|
||||
with open(site_options_path) as fd:
|
||||
config.readfp(fd)
|
||||
yield config
|
||||
with open(site_options_path, 'w') as fd:
|
||||
fd.seek(0)
|
||||
config.write(fd)
|
||||
|
||||
@property
|
||||
@contextlib.contextmanager
|
||||
def config_pck(self):
|
||||
config_pck_path = os.path.join(self.app_dir, 'config.pck')
|
||||
if os.path.exists(config_pck_path):
|
||||
with open(config_pck_path) as fd:
|
||||
config = pickle.load(fd)
|
||||
else:
|
||||
config = {}
|
||||
yield config
|
||||
with open(config_pck_path, 'w') as fd:
|
||||
pickle.dump(config, fd)
|
||||
|
||||
def add_api_secret(self, orig, secret):
|
||||
with self.site_options as config:
|
||||
if not config.has_section('api-secrets'):
|
||||
config.add_section('api-secrets')
|
||||
config.set('api-secrets', orig, secret)
|
||||
|
||||
def set_postgresql(self, database):
|
||||
with self.site_options as config:
|
||||
if not config.has_section('options'):
|
||||
config.add_section('options')
|
||||
config.set('options', 'postgresql', 'true')
|
||||
|
||||
with self.config_pck as config:
|
||||
config['postgresql'] = {
|
||||
'database': database,
|
||||
}
|
||||
self.run_in_context(self._wcs_init_sql)
|
||||
|
||||
def _wcs_init_sql(self):
|
||||
from quixote import get_publisher
|
||||
|
||||
get_publisher().initialize_sql()
|
||||
|
||||
@property
|
||||
def api(self):
|
||||
from passerelle.utils import wcs
|
||||
self.add_api_secret('test', 'test')
|
||||
return wcs.WcsApi(self.url, name_id='xxx', orig='test', key='test')
|
||||
|
||||
@property
|
||||
def anonym_api(self):
|
||||
from passerelle.utils import wcs
|
||||
self.add_api_secret('test', 'test')
|
||||
return wcs.WcsApi(self.url, orig='test', key='test')
|
||||
|
||||
|
||||
class Wcs(object):
|
||||
def __init__(self, app_dir, port, wcs_host_class=None, **kwargs):
|
||||
self.app_dir = app_dir
|
||||
self.port = port
|
||||
self.wcs_host_class = wcs_host_class or WcsHost
|
||||
self.wcs_host_class_kwargs = kwargs
|
||||
|
||||
@contextlib.contextmanager
|
||||
def host(self, hostname='127.0.0.1', wcs_host_class=None, **kwargs):
|
||||
wcs_host_class = wcs_host_class or self.wcs_host_class
|
||||
app_dir = os.path.join(self.app_dir, hostname)
|
||||
os.mkdir(app_dir)
|
||||
try:
|
||||
init_kwargs = self.wcs_host_class_kwargs.copy()
|
||||
init_kwargs.update(kwargs)
|
||||
yield wcs_host_class(self, hostname, **init_kwargs)
|
||||
finally:
|
||||
shutil.rmtree(app_dir)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def wcs_factory(base_dir, wcs_class=Wcs, **kwargs):
|
||||
WCSCTL = os.environ.get('WCSCTL')
|
||||
if not WCSCTL:
|
||||
raise Exception('WCSCTL is not defined')
|
||||
tmp_app_dir = tempfile.mkdtemp(dir=base_dir)
|
||||
|
||||
wcs_cfg_path = os.path.join(base_dir, 'wcs.cfg')
|
||||
|
||||
with open(wcs_cfg_path, 'w') as fd:
|
||||
fd.write(u'''[main]
|
||||
app_dir = %s\n''' % tmp_app_dir)
|
||||
|
||||
local_settings_path = os.path.join(base_dir, 'local_settings.py')
|
||||
with open(local_settings_path, 'w') as fd:
|
||||
fd.write(u'''
|
||||
WCS_LEGACY_CONFIG_FILE = '{base_dir}/wcs.cfg'
|
||||
THEMES_DIRECTORY = '/'
|
||||
ALLOWED_HOSTS = ['*']
|
||||
SILENCED_SYSTEM_CHECKS = ['*']
|
||||
DEBUG = False
|
||||
LOGGING = {{
|
||||
'version': 1,
|
||||
'disable_existing_loggers': False,
|
||||
'handlers': {{
|
||||
'console': {{
|
||||
'class': 'logging.StreamHandler',
|
||||
}},
|
||||
}},
|
||||
'loggers': {{
|
||||
'django': {{
|
||||
'handlers': ['console'],
|
||||
'level': os.getenv('DJANGO_LOG_LEVEL', 'INFO'),
|
||||
}},
|
||||
}},
|
||||
}}
|
||||
'''.format(base_dir=base_dir))
|
||||
|
||||
address = '0.0.0.0'
|
||||
port = find_free_tcp_port()
|
||||
|
||||
wcs_pid = os.fork()
|
||||
try:
|
||||
# launch a Django worker for running w.c.s.
|
||||
if not wcs_pid:
|
||||
os.chdir(os.path.dirname(WCSCTL))
|
||||
os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
|
||||
os.environ['WCS_SETTINGS_FILE'] = local_settings_path
|
||||
os.execvp('python', ['python', 'manage.py', 'runserver', '--insecure', '--noreload', '%s:%s' % (address, port)])
|
||||
os._exit(0)
|
||||
|
||||
# verify w.c.s. is launched
|
||||
s = socket.socket()
|
||||
i = 0
|
||||
while True:
|
||||
i += 1
|
||||
try:
|
||||
s.connect((address, port))
|
||||
except Exception:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
s.close()
|
||||
break
|
||||
assert i < 50, 'no connection found after 5 seconds'
|
||||
|
||||
# verify w.c.s. is still running
|
||||
pid, exit_code = os.waitpid(wcs_pid, os.WNOHANG)
|
||||
if pid:
|
||||
raise Exception('w.c.s. stopped with exit-code %s' % exit_code)
|
||||
yield wcs_class(tmp_app_dir, port=port, **kwargs)
|
||||
finally:
|
||||
os.kill(wcs_pid, 9)
|
||||
shutil.rmtree(tmp_app_dir)
|
||||
|
||||
|
||||
class DefaultWcsHost(WcsHost):
|
||||
def wcs_init_01_setup_auth(self):
|
||||
from quixote import get_publisher
|
||||
|
||||
get_publisher().cfg['identification'] = {'methods': ['password']}
|
||||
get_publisher().cfg['debug'] = {'display_exceptions': 'text'}
|
||||
get_publisher().write_cfg()
|
||||
|
||||
def wcs_init_02_create_user(self):
|
||||
from quixote import get_publisher
|
||||
from qommon.ident.password_accounts import PasswordAccount
|
||||
from wcs.roles import Role
|
||||
|
||||
user = get_publisher().user_class()
|
||||
user.name = 'foo bar'
|
||||
user.email = 'foo@example.net'
|
||||
user.store()
|
||||
account = PasswordAccount(id='user')
|
||||
account.set_password('user')
|
||||
account.user_id = user.id
|
||||
account.store()
|
||||
|
||||
role = Role(name='role')
|
||||
role.store()
|
||||
|
||||
user = get_publisher().user_class()
|
||||
user.name = 'admin'
|
||||
user.email = 'admin@example.net'
|
||||
user.name_identifiers = ['xxx']
|
||||
user.is_admin = True
|
||||
user.roles = [str(role.id)]
|
||||
user.store()
|
||||
account = PasswordAccount(id='admin')
|
||||
account.set_password('admin')
|
||||
account.user_id = user.id
|
||||
account.store()
|
||||
|
||||
def wcs_init_03_create_data(self):
|
||||
import datetime
|
||||
import random
|
||||
from quixote import get_publisher
|
||||
|
||||
from wcs.categories import Category
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.roles import Role
|
||||
from wcs import fields
|
||||
|
||||
cat = Category()
|
||||
cat.name = 'Catégorie'
|
||||
cat.description = ''
|
||||
cat.store()
|
||||
|
||||
role = Role.select()[0]
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'Demande'
|
||||
formdef.category_id = cat.id
|
||||
formdef.workflow_roles = {'_receiver': role.id}
|
||||
formdef.fields = [
|
||||
fields.StringField(id='1', label='1st field', type='string', anonymise=False, varname='string'),
|
||||
fields.ItemField(id='2', label='2nd field', type='item',
|
||||
items=['foo', 'bar', 'baz'], varname='item'),
|
||||
fields.BoolField(id='3', label='3rd field', type='bool', varname='bool'),
|
||||
fields.ItemField(id='4', label='4rth field', type='item', varname='item_open'),
|
||||
fields.ItemField(id='5', label='5th field', type='item',
|
||||
varname='item_datasource',
|
||||
data_source={'type': 'json', 'value': 'http://datasource.com/'}),
|
||||
]
|
||||
formdef.store()
|
||||
|
||||
user = get_publisher().user_class.select()[0]
|
||||
|
||||
for i in range(10):
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
formdata.receipt_time = datetime.datetime(
|
||||
2018,
|
||||
random.randrange(1, 13),
|
||||
random.randrange(1, 29)
|
||||
).timetuple()
|
||||
formdata.data = {'1': 'FOO BAR %d' % i}
|
||||
if i % 4 == 0:
|
||||
formdata.data['2'] = 'foo'
|
||||
formdata.data['2_display'] = 'foo'
|
||||
formdata.data['4'] = 'open_one'
|
||||
formdata.data['4_display'] = 'open_one'
|
||||
elif i % 4 == 1:
|
||||
formdata.data['2'] = 'bar'
|
||||
formdata.data['2_display'] = 'bar'
|
||||
formdata.data['4'] = 'open_two'
|
||||
formdata.data['4_display'] = 'open_two'
|
||||
else:
|
||||
formdata.data['2'] = 'baz'
|
||||
formdata.data['2_display'] = 'baz'
|
||||
formdata.data['4'] = "open'three"
|
||||
formdata.data['4_display'] = "open'three"
|
||||
|
||||
formdata.data['3'] = bool(i % 2)
|
||||
if i % 3 == 0:
|
||||
formdata.jump_status('new')
|
||||
else:
|
||||
formdata.jump_status('finished')
|
||||
if i % 7 == 0:
|
||||
formdata.user_id = user.id
|
||||
formdata.store()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def datasource():
|
||||
@httmock.urlmatch(netloc='datasource.com')
|
||||
def handler(url, request):
|
||||
return {
|
||||
'status_code': 200,
|
||||
'content': {
|
||||
'err': 0,
|
||||
'data': [
|
||||
{'id': '1', 'text': 'hello'},
|
||||
{'id': '2', 'text': 'world'},
|
||||
]
|
||||
},
|
||||
'content-type': 'application/json',
|
||||
}
|
||||
with httmock.HTTMock(handler):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def wcs(tmp_path_factory):
|
||||
base_dir = tmp_path_factory.mktemp('wcs')
|
||||
with wcs_factory(str(base_dir), wcs_host_class=DefaultWcsHost) as wcs:
|
||||
yield wcs
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def wcs_host(wcs, postgres_db, datasource):
|
||||
with wcs.host('127.0.0.1', database=postgres_db.database) as wcs_host:
|
||||
yield wcs_host
|
|
@ -0,0 +1,79 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# passerelle - uniform access to multiple data sources and services
|
||||
# Copyright (C) 2019 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import pytest
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
import requests
|
||||
|
||||
|
||||
def test_wcs_fixture(wcs_host):
|
||||
assert wcs_host.url.startswith('http://127.0.0.1:')
|
||||
requests.get(wcs_host.url)
|
||||
response = requests.get(urlparse.urljoin(wcs_host.url, '/api/categories/'))
|
||||
assert response.json()['data'][0]['title'] == u'Catégorie'
|
||||
|
||||
|
||||
def test_wcs_api(wcs_host):
|
||||
from passerelle.utils.wcs import WcsApiError
|
||||
|
||||
api = wcs_host.api
|
||||
assert len(api.categories) == 1
|
||||
assert len(api.formdefs) == 1
|
||||
assert len(api.roles) == 1
|
||||
formdef = api.formdefs['demande']
|
||||
|
||||
assert formdef.schema.fields[4].label == '5th field'
|
||||
assert len(formdef.formdatas) == 10
|
||||
assert len(formdef.formdatas.full) == 10
|
||||
formdata = next(iter(formdef.formdatas))
|
||||
assert formdata is not formdata.full
|
||||
assert formdata.full is formdata.full
|
||||
assert formdata.full.full is formdata.full
|
||||
assert formdata.full.anonymized is not formdata.full
|
||||
|
||||
with formdef.submit() as submitter:
|
||||
with pytest.raises(ValueError):
|
||||
submitter.set('zob', '1')
|
||||
submitter.draft = True
|
||||
submitter.submission_channel = 'mdel'
|
||||
submitter.submission_context = {
|
||||
'mdel_ref': 'ABCD',
|
||||
}
|
||||
submitter.set('string', 'hello')
|
||||
submitter.set('item', 'foo')
|
||||
submitter.set('item_open', {
|
||||
'id': '1',
|
||||
'text': 'world',
|
||||
'foo': 'bar'
|
||||
})
|
||||
submitter.set('item_datasource', {
|
||||
'id': '2',
|
||||
'text': 'world',
|
||||
})
|
||||
|
||||
formdata = formdef.formdatas[submitter.result.id]
|
||||
api = wcs_host.anonym_api
|
||||
|
||||
assert len(api.categories) == 1
|
||||
assert len(api.formdefs) == 1
|
||||
assert len(api.roles) == 1
|
||||
formdef = api.formdefs['demande']
|
||||
assert formdef.schema.fields[4].label == '5th field'
|
||||
with pytest.raises(WcsApiError):
|
||||
assert len(formdef.formdatas) == 10
|
||||
assert len(formdef.formdatas.anonymized) == 10
|
6
tox.ini
6
tox.ini
|
@ -9,13 +9,14 @@ setenv =
|
|||
DJANGO_SETTINGS_MODULE=passerelle.settings
|
||||
PASSERELLE_SETTINGS_FILE=tests/settings.py
|
||||
BRANCH_NAME={env:BRANCH_NAME:}
|
||||
WCSCTL=wcs/wcsctl.py
|
||||
fast: FAST=--nomigrations
|
||||
sqlite: DB_ENGINE=django.db.backends.sqlite3
|
||||
pg: DB_ENGINE=django.db.backends.postgresql_psycopg2
|
||||
deps =
|
||||
django18: django>=1.8,<1.9
|
||||
django111: django>=1.11,<1.12
|
||||
pg: psycopg2-binary
|
||||
psycopg2-binary
|
||||
pytest-cov
|
||||
pytest-django<3.4.6
|
||||
pytest
|
||||
|
@ -32,7 +33,10 @@ deps =
|
|||
pytest-httpbin
|
||||
pytest-localserver
|
||||
pytest-sftpserver
|
||||
http://quixote.python.ca/releases/Quixote-2.7b2.tar.gz
|
||||
vobject
|
||||
commands =
|
||||
./get_wcs.sh
|
||||
django18: py.test {posargs: {env:FAST:} --junitxml=test_{envname}_results.xml --cov-report xml --cov-report html --cov=passerelle/ --cov-config .coveragerc tests/}
|
||||
django18: ./pylint.sh passerelle/
|
||||
django111: py.test {posargs: --junitxml=test_{envname}_results.xml tests/}
|
||||
|
|
Loading…
Reference in New Issue