694 lines
27 KiB
Python
694 lines
27 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2013 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import json
|
|
import re
|
|
import time
|
|
import sys
|
|
|
|
from quixote import get_request, get_publisher, get_response
|
|
from quixote.directory import Directory
|
|
|
|
from qommon import _
|
|
from qommon import misc
|
|
from qommon.evalutils import make_datetime
|
|
from qommon.errors import (AccessForbiddenError, QueryError, TraversalError,
|
|
UnknownNameIdAccessForbiddenError)
|
|
from qommon.form import ValidationError, ComputedExpressionWidget
|
|
|
|
from wcs.categories import Category
|
|
from wcs.formdef import FormDef
|
|
from wcs.roles import Role, logged_users_role
|
|
from wcs.forms.common import FormStatusPage
|
|
import wcs.qommon.storage as st
|
|
from wcs.api_utils import is_url_signed, get_user_from_api_query_string
|
|
|
|
from backoffice.management import FormPage as BackofficeFormPage
|
|
from backoffice.management import ManagementDirectory
|
|
|
|
def posted_json_data_to_formdata_data(formdef, data):
    """Rewrite a posted JSON payload so it is keyed by field id.

    ``data`` is modified in place and also returned.  Entries keyed by a
    field varname are moved to the field id; the companion ``<varname>_raw``
    and ``<varname>_structured`` entries are moved too, when the field stores
    a display value / a structured value.  Entries that do not match any
    field varname are left untouched.

    Afterwards, every field exposing ``from_json_value`` gets its non-None
    value converted through that method.
    """
    # first pass: remap varname-keyed entries to field ids
    for field in formdef.get_all_fields():
        varname = field.varname
        if not varname or varname not in data:
            continue
        raw_key = '%s_raw' % varname
        structured_key = '%s_structured' % varname
        if field.store_display_value and raw_key in data:
            # the raw value is the stored one, the varname entry is its label
            data[field.id] = data.pop(raw_key)
            data['%s_display' % field.id] = data.pop(varname)
        else:
            data[field.id] = data.pop(varname)
        if field.store_structured_value and structured_key in data:
            data['%s_structured' % field.id] = data.pop(structured_key)

    # second pass: let fields with a dedicated parser decode their value
    for field in formdef.get_all_fields():
        if hasattr(field, 'from_json_value') and data.get(field.id) is not None:
            data[field.id] = field.from_json_value(data[field.id])

    return data
|
|
|
|
|
|
def get_formdata_dict(formdata, user, consider_status_visibility=True):
    """Build the dictionary describing ``formdata`` in API listings.

    When ``consider_status_visibility`` is set and the formdata is not a
    draft, the status is looked up with ``get_visible_status(user=user)`` and
    ``None`` is returned if there is none.  Otherwise the plain
    ``get_status()`` result is used.

    The result is completed with the minimal substitution variables and, if
    the request has ``full=on``, with the JSON export (files excluded).
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'

    if not consider_status_visibility or formdata.is_draft():
        status = formdata.get_status()
    else:
        status = formdata.get_visible_status(user=user)
        if not status:
            # skip hidden forms
            return None

    d = {
        'name': formdata.formdef.name,
        'url': formdata.get_url(),
        'datetime': misc.strftime(timestamp_format, formdata.receipt_time),
        'status': _('Draft') if formdata.is_draft() else status.name,
        'status_css_class': status.extra_css_class if status else None,
        'keywords': formdata.formdef.keywords_list,
        'draft': formdata.is_draft(),
    }
    if formdata.last_update_time:
        d['last_update_time'] = misc.strftime(timestamp_format, formdata.last_update_time)

    if not formdata.is_draft():
        d['title'] = _('%(name)s #%(id)s (%(status)s)') % {
            'name': formdata.formdef.name,
            'id': formdata.get_display_id(),
            'status': status.name,
        }
    else:
        d['url'] = d['url'].rstrip('/')
        d['form_number_raw'] = None
        d['form_number'] = None
        d['title'] = _('%(name)s (draft)') % {'name': formdata.formdef.name}

    # substitution variables are applied last and may override keys set above
    d.update(formdata.get_substitution_variables(minimal=True))
    if get_request().form.get('full') == 'on':
        d.update(formdata.get_json_export_dict(include_files=False))
    return d
|
|
|
|
|
|
class ApiFormdataPage(FormStatusPage):
    """API page for a single formdata; POST requests update its data."""

    _q_exports_orig = ['', 'download']

    def _q_index(self):
        """Route POST requests to ``post()`` and everything else to ``json()``."""
        is_post = get_request().get_method() == 'POST'
        return self.post() if is_post else self.json()

    def post(self):
        """Update the formdata with the ``data`` member of the posted JSON.

        The update goes through the first item of the current status whose
        key is ``editable`` and whose ``check_auth`` accepts the API user.  If
        that item has a target status, the formdata jumps to it and the
        workflow is performed.

        Raises AccessForbiddenError when no such item exists.
        """
        get_response().set_content_type('application/json')
        api_user = get_user_from_api_query_string()

        # look for an "editable" action the user is allowed to trigger
        current_status = self.formdata.get_status()
        for action in current_status.items:
            if action.key != 'editable':
                continue
            if not action.check_auth(self.formdata, api_user):
                continue

            payload = get_request().json
            new_values = posted_json_data_to_formdata_data(self.formdef, payload['data'])
            self.formdata.data.update(new_values)
            self.formdata.store()

            if action.status:
                self.formdata.jump_status(action.status)
                self.formdata.perform_workflow()

            response = {'err': 0, 'data': {'id': self.formdata.id}}
            return json.dumps(response)

        raise AccessForbiddenError('formdata is not editable by given user')

    def check_receiver(self):
        """Check the caller is allowed to read the formdata.

        A user given in the API query string must pass
        ``is_user_allowed_read_status_and_history``; without one, only an
        admin session user is accepted.  Raises AccessForbiddenError
        otherwise.
        """
        api_user = get_user_from_api_query_string()
        if api_user:
            if not self.formdef.is_user_allowed_read_status_and_history(api_user, self.filled):
                raise AccessForbiddenError('unsufficient roles')
            return

        session_user = get_request().user
        if session_user and session_user.is_admin:
            # grant access to admins, to ease debug
            return
        raise AccessForbiddenError('user not authenticated')
|
|
|
|
|
|
class ApiFormPage(BackofficeFormPage):
    """API variant of the backoffice form page, limited to API endpoints."""

    # restrict to API endpoints
    _q_exports = [('list', 'json'), 'geojson']

    def __init__(self, component):
        """Load the formdef whose URL name is ``component``.

        Raises TraversalError when there is no such formdef.
        """
        try:
            formdef = FormDef.get_by_urlname(component)
        except KeyError:
            raise TraversalError()
        self.formdef = formdef

    def check_access(self, api_name=None):
        """Raise AccessForbiddenError unless the caller may use this page."""
        request = get_request()

        if 'anonymise' in request.form:
            # NOTE(review): as written this rejects unsigned URLs *and* admin
            # sessions; the other access checks in this file use the form
            # "not (signed or admin)" -- confirm which one is intended here.
            if not is_url_signed() or (request.user and request.user.is_admin):
                raise AccessForbiddenError('user not authenticated')
            return

        api_user = get_user_from_api_query_string(api_name=api_name)
        if api_user:
            if not self.formdef.is_of_concern_for_user(api_user):
                raise AccessForbiddenError('unsufficient roles')
            return

        if request.user and request.user.is_admin:
            # grant access to admins, to ease debug
            return
        raise AccessForbiddenError('user not authenticated')

    def _q_lookup(self, component):
        """Serve ``ics`` or the API page of the formdata with id ``component``."""
        if component == 'ics':
            return self.ics()

        # check access for all paths, to block access to formdata that would
        # otherwise be accessible if the user is the submitter.
        self.check_access()
        try:
            filled = self.formdef.data_class().get(component)
        except KeyError:
            raise TraversalError()
        return ApiFormdataPage(self.formdef, filled)
|
|
|
|
|
|
class ApiFormsDirectory(Directory):
    """Global formdata listing endpoints; sub-paths map to ApiFormPage."""

    _q_exports = ['', 'geojson']

    def check_access(self):
        """Require a signed URL or an admin session user."""
        if is_url_signed():
            return
        # grant access to admins, to ease debug
        session_user = get_request().user
        if not session_user or not session_user.is_admin:
            raise AccessForbiddenError('user not authenticated')

    def _q_index(self):
        """Return a JSON listing of formdata across all forms.

        Only available with PostgreSQL storage (TraversalError otherwise).
        The ``limit``, ``offset`` and ``order_by`` query parameters are
        honoured; defaults come from the ``default-page-size`` and
        ``default-sort-order`` site options, falling back to 20 and
        ``-receipt_time``.
        """
        publisher = get_publisher()
        if not publisher.is_using_postgresql():
            raise TraversalError()

        self.check_access()
        request = get_request()
        request.user = get_user_from_api_query_string() or request.user

        from wcs import sql

        criterias = ManagementDirectory().get_global_listing_criterias()

        query = request.form
        default_limit = publisher.get_site_option('default-page-size') or 20
        limit = int(query.get('limit', default_limit))
        offset = int(query.get('offset', 0))
        default_order = publisher.get_site_option('default-sort-order') or '-receipt_time'
        order_by = query.get('order_by', default_order)

        rows = sql.AnyFormData.select(
            criterias, order_by=order_by, limit=limit, offset=offset)
        output = []
        for row in rows:
            output.append(get_formdata_dict(
                row, user=request.user, consider_status_visibility=False))

        get_response().set_content_type('application/json')
        return json.dumps(
            {'data': output},
            cls=misc.JSONEncoder,
            encoding=publisher.site_charset)

    def geojson(self):
        """Delegate to ``ManagementDirectory().geojson()`` after access checks."""
        if not get_publisher().is_using_postgresql():
            raise TraversalError()
        self.check_access()
        request = get_request()
        request.user = get_user_from_api_query_string() or request.user
        management_directory = ManagementDirectory()
        return management_directory.geojson()

    def _q_lookup(self, component):
        """Return the API page of the form whose URL name is ``component``."""
        return ApiFormPage(component)
|
|
|
|
|
|
class ApiFormdefDirectory(Directory):
    """API endpoints attached to a single formdef: ``schema`` and ``submit``."""

    _q_exports = ['schema', 'submit']

    def __init__(self, formdef):
        self.formdef = formdef

    def schema(self):
        """Return the JSON export of the formdef."""
        get_response().set_content_type('application/json')
        return self.formdef.export_to_json()

    def submit(self):
        """Create a formdata from the JSON body of a signed request.

        Returns a JSON string ``{"err": 0, "data": {"id": <formdata id>}}``.

        Raises AccessForbiddenError when the form is disabled, the URL is not
        signed, a backoffice submission is not allowed for the API user, or
        the form accepts a single formdata per user and one already exists.
        """
        # expects json as input
        # {
        #   "meta": {
        #     "attr": "value"
        #   },
        #   "data": {
        #     "0": "value",
        #     "1": "value",
        #     ...
        #   }
        # }
        get_response().set_content_type('application/json')
        if self.formdef.is_disabled():
            raise AccessForbiddenError('disabled form')
        if not is_url_signed():
            raise AccessForbiddenError('unsigned API call')
        user = get_user_from_api_query_string()
        json_input = get_request().json
        formdata = self.formdef.data_class()()

        if 'data' in json_input:
            # the published API expects data in 'data'.
            data = json_input['data']
        elif 'fields' in json_input:
            # but the API also supports data in 'fields', to match the json
            # output produced by wf/wscall.py.
            data = json_input['fields']
            if 'workflow' in json_input and json_input['workflow'].get('fields'):
                # handle workflow fields, put them all in the same data dictionary.
                data.update(json_input['workflow']['fields'])
            if 'extra' in json_input:
                data.update(json_input['extra'])
        else:
            data = {}

        formdata.data = posted_json_data_to_formdata_data(self.formdef, data)

        meta = json_input.get('meta') or {}
        if meta.get('backoffice-submission'):
            if not user:
                raise AccessForbiddenError('no user set for backoffice submission')
            if not self.formdef.backoffice_submission_roles:
                raise AccessForbiddenError('no backoffice submission roles')
            if not set(user.roles or []).intersection(self.formdef.backoffice_submission_roles):
                raise AccessForbiddenError('not cleared for backoffice submit')
            formdata.backoffice_submission = True
        elif 'user' in json_input:
            formdata_user = None
            # "NameID" is optional: iterate over an empty list when it is
            # missing or null, so the email fallback below is still tried
            # instead of failing with a TypeError on None.
            for name_id in json_input['user'].get('NameID') or []:
                formdata_user = get_publisher().user_class.get_users_with_name_identifier(name_id)
                if formdata_user:
                    break
            else:
                # no name identifier matched, fall back on the email address
                if json_input['user'].get('email'):
                    formdata_user = get_publisher().user_class.get_users_with_email(
                        json_input['user'].get('email'))
            if formdata_user:
                formdata.user_id = formdata_user[0].id
        elif user:
            formdata.user_id = user.id

        if json_input.get('context'):
            formdata.submission_context = json_input['context']
            formdata.submission_channel = formdata.submission_context.pop('channel', None)
            # NOTE(review): this replaces any user_id resolved above, with
            # None when the context has no 'user_id' -- confirm it is intended.
            formdata.user_id = formdata.submission_context.pop('user_id', None)

        if self.formdef.only_allow_one and formdata.user_id:
            user_id = formdata.user_id
            user_forms = self.formdef.data_class().get_with_indexed_value('user_id', user_id)
            if user_forms:
                raise AccessForbiddenError('only one formdata by user is allowed')

        if meta.get('backoffice-submission'):
            # keep track of the agent that did the submit
            if not formdata.submission_context:
                formdata.submission_context = {}
            formdata.submission_context['agent_id'] = user.id

        formdata.store()
        if self.formdef.enable_tracking_codes:
            code = get_publisher().tracking_code_class()
            code.formdata = formdata  # this will .store() the code

        if meta.get('draft'):
            formdata.status = 'draft'
            formdata.receipt_time = time.localtime()
            formdata.store()
        else:
            formdata.just_created()
            formdata.store()
            formdata.perform_workflow()
            formdata.store()
        return json.dumps({'err': 0, 'data': {'id': formdata.id}})
|
|
|
|
|
|
class ApiFormdefsDirectory(Directory):
    """Listing of formdefs, optionally restricted to one category."""

    _q_exports = ['']

    def __init__(self, category=None):
        self.category = category

    def get_list_forms(self, user, list_all_forms=False, formdefs=None):
        """Return a list of dictionaries describing the forms ``user`` can see.

        ``formdefs`` defaults to all formdefs sorted by name.  Disabled forms
        are kept only if they have a redirection, and only forms of
        ``self.category`` are kept when a category is set.  Unless
        ``list_all_forms`` is true, forms with roles that the user does not
        hold are skipped, or flagged ``authentication_required`` when the
        form is always advertised.

        Each dictionary carries a ``category_position`` key meant for sorting.
        """
        if formdefs is None:
            formdefs = FormDef.select(order_by='name', ignore_errors=True)

        # disabled forms are only listed when they redirect somewhere
        formdefs = [x for x in formdefs if x.disabled_redirection or not x.is_disabled()]

        if self.category:
            category_id = str(self.category.id)
            formdefs = [x for x in formdefs if str(x.category_id) == category_id]

        charset = get_publisher().site_charset
        list_forms = []

        for formdef in formdefs:
            authentication_required = False
            if formdef.roles and not list_all_forms:
                if not user:
                    allowed = False
                elif logged_users_role().id in formdef.roles:
                    allowed = True
                else:
                    allowed = any(role in formdef.roles for role in user.roles or [])
                if not allowed:
                    if not formdef.always_advertise:
                        continue
                    authentication_required = True

            formdict = {
                'title': unicode(formdef.name, charset),
                'slug': formdef.url_name,
                'url': formdef.get_url(),
                'description': formdef.description or '',
                'keywords': formdef.keywords_list,
                'authentication_required': authentication_required,
            }
            if formdef.required_authentication_contexts:
                formdict['required_authentication_contexts'] = \
                    formdef.required_authentication_contexts

            formdict['redirection'] = bool(
                formdef.is_disabled() and formdef.disabled_redirection)

            # we include the count of submitted forms so it's possible to sort
            # them by popularity
            formdict['count'] = formdef.data_class().count()

            functions = formdict['functions'] = {}
            assigned_roles = formdef.workflow_roles or {}
            for function_id, function_label in formdef.workflow.roles.items():
                function_info = {'label': function_label}
                role_id = assigned_roles.get(function_id)
                if role_id:
                    try:
                        function_info['role'] = Role.get(role_id).get_json_export_dict()
                    except KeyError:
                        # the role is unknown, export the label only
                        pass
                functions[function_id] = function_info

            if not formdef.category:
                # forms without a category are sorted last
                formdict['category_position'] = sys.maxint
            else:
                formdict['category'] = unicode(formdef.category.name, charset)
                formdict['category_slug'] = unicode(formdef.category.url_name, charset)
                formdict['category_position'] = (formdef.category.position or 0)

            list_forms.append(formdict)

        return list_forms

    def _q_index(self):
        """Return the JSON list of forms, sorted by category position."""
        try:
            user = get_user_from_api_query_string()
            if user is None and get_request().user:
                user = get_request().user  # helps debugging
        except UnknownNameIdAccessForbiddenError:
            # if authenticating the user via the query string failed, return
            # results for the anonymous case; user is set to 'False' as a
            # signed URL with a None user is considered like an appropriate
            # webservice call.
            user = False
        list_all_forms = (user and user.is_admin) or (is_url_signed() and user is None)

        list_forms = self.get_list_forms(user, list_all_forms)

        # the sort is stable, forms keep their relative order inside a category
        list_forms.sort(key=lambda formdict: formdict['category_position'])
        for formdict in list_forms:
            # the position is only a sort key, it is not part of the output
            del formdict['category_position']

        get_response().set_content_type('application/json')
        return json.dumps(list_forms)

    def _q_lookup(self, component):
        """Return the API directory of the formdef named ``component``."""
        try:
            formdef = FormDef.get_by_urlname(component)
        except KeyError:
            raise TraversalError()
        return ApiFormdefDirectory(formdef)
|
|
|
|
|
|
class ApiCategoryDirectory(Directory):
    """API directory of a single category, exposing its ``formdefs``."""

    _q_exports = ['formdefs']

    def __init__(self, category):
        # the formdefs listing is restricted to this category
        self.formdefs = ApiFormdefsDirectory(category)
        self.category = category
|
|
|
|
|
|
class ApiCategoriesDirectory(Directory):
    """Listing of the categories that have at least one visible form."""

    _q_exports = ['']

    def __init__(self):
        pass

    def _q_index(self):
        """Return the categories as JSON, in position order.

        Categories without any form listed for the user are omitted.  With
        ``full=on`` in the query string, each category also carries the list
        of its forms under ``forms``.
        """
        try:
            user = get_user_from_api_query_string() or get_request().user
        except UnknownNameIdAccessForbiddenError:
            # the name id was unknown, return the categories for anonymous
            # users; user is set to False (not None) because a signed URL
            # with a None user is treated below as a webservice call allowed
            # to list all forms.
            user = False
        list_all_forms = (user and user.is_admin) or (is_url_signed() and user is None)
        list_categories = []
        charset = get_publisher().site_charset
        categories = Category.select()
        Category.sort_by_position(categories)
        # load the formdefs once, they are filtered per category below
        all_formdefs = FormDef.select(order_by='name', ignore_errors=True)
        for category in categories:
            d = {}
            d['title'] = unicode(category.name, charset)
            d['slug'] = category.url_name
            d['url'] = category.get_url()
            if category.description:
                d['description'] = unicode(
                    str(category.get_description_html_text(editable=False)), charset)
            formdefs = ApiFormdefsDirectory(category).get_list_forms(
                user, formdefs=all_formdefs, list_all_forms=list_all_forms)
            if not formdefs:
                # don't advertise empty categories
                continue
            # union of the keywords of the listed forms
            keywords = {}
            for formdef in formdefs:
                for keyword in formdef['keywords']:
                    keywords[keyword] = True
            d['keywords'] = keywords.keys()
            if get_request().form.get('full') == 'on':
                d['forms'] = formdefs
            list_categories.append(d)
        get_response().set_content_type('application/json')
        return json.dumps({'data': list_categories})

    def _q_lookup(self, component):
        """Return the API directory of the category named ``component``."""
        try:
            return ApiCategoryDirectory(Category.get_by_urlname(component))
        except KeyError:
            raise TraversalError()
|
|
|
|
|
|
class ApiUserDirectory(Directory):
    """API endpoints about one user: details, forms and drafts.

    The user is the one given to the constructor or, failing that, the one
    from the API query string, then the session user.
    """

    _q_exports = ['', 'forms', 'drafts']

    def __init__(self, user=None):
        self.user = user

    def _q_index(self):
        """Return the user details and roles as JSON.

        Raises AccessForbiddenError when no user can be determined.
        """
        get_response().set_content_type('application/json')
        user = self.user or get_user_from_api_query_string() or get_request().user
        if not user:
            raise AccessForbiddenError('no user specified')

        details = user.get_substitution_variables(prefix='')
        del details['user']
        details['id'] = user.id
        # roles that cannot be loaded are silently left out
        known_roles = [Role.get(role_id, ignore_errors=True) for role_id in user.roles or []]
        details['user_roles'] = [role.get_json_export_dict() for role in known_roles if role]
        return json.dumps(details)

    def get_user_forms(self, user):
        """Return the formdatas of ``user``, ordered by receipt time."""
        full_export = get_request().form.get('full') == 'on'
        if not get_publisher().is_using_postgresql() or full_export:
            # go through the data class of every formdef
            user_forms = []
            for formdef in FormDef.select():
                matching = formdef.data_class().get_with_indexed_value('user_id', user.id)
                user_forms.extend(matching)
            user_forms.sort(key=lambda formdata: formdata.receipt_time)
            return user_forms

        from wcs import sql
        from qommon.storage import Equal
        return sql.AnyFormData.select(
            [Equal('user_id', str(user.id))], order_by='receipt_time')

    def drafts(self):
        """Return the drafts of the user as JSON."""
        return self.forms(include_drafts=True, include_non_drafts=False)

    def forms(self, include_drafts=False, include_non_drafts=True):
        """Return the formdatas of the user as a JSON list.

        Drafts are included when ``include_drafts`` is set or when the query
        string has ``include-drafts=true``; drafts of forms that are disabled
        or have no tracking codes are always skipped.  Formdatas for which
        ``get_formdata_dict`` returns nothing are skipped too.

        Raises AccessForbiddenError when no user can be determined.
        """
        get_response().set_content_type('application/json')
        user = self.user or get_user_from_api_query_string() or get_request().user
        if not user:
            raise AccessForbiddenError('no user specified')

        include_drafts = include_drafts or get_request().form.get('include-drafts') == 'true'
        forms = []
        for form in self.get_user_forms(user):
            if not form.is_draft():
                if not include_non_drafts:
                    continue
            else:
                if not include_drafts:
                    continue
                if form.formdef.is_disabled() or not form.formdef.enable_tracking_codes:
                    # the form or its draft support has been disabled
                    continue
            formdata_dict = get_formdata_dict(form, user)
            if formdata_dict:
                forms.append(formdata_dict)
            # else: skip hidden forms

        return json.dumps(
            forms,
            cls=misc.JSONEncoder,
            encoding=get_publisher().site_charset)
|
|
|
|
|
|
class ApiUsersDirectory(Directory):
    """User search endpoint; sub-paths give access to individual users."""

    _q_exports = ['']

    def _check_backoffice_access(self):
        """Require a signed URL or a session user with backoffice access."""
        if is_url_signed():
            return
        session_user = get_request().user
        if session_user and session_user.can_go_in_admin():
            return
        raise AccessForbiddenError('unsigned request or user has no access to backoffice')

    def _q_index(self):
        """Return users as JSON, ordered by name.

        The ``q`` query parameter filters on name, ascii name, email and the
        string, text and email user fields (plus a full-text match with
        PostgreSQL); ``limit`` caps the number of results.
        """
        get_response().set_content_type('application/json')
        self._check_backoffice_access()

        criterias = []
        query = get_request().form.get('q')
        if query:
            from admin.settings import UserFieldsFormDef
            formdef = UserFieldsFormDef()
            alternatives = [
                st.ILike('name', query),
                st.ILike('ascii_name', misc.simplify(query, ' ')),
                st.ILike('email', query),
            ]
            alternatives.extend(
                st.ILike('f%s' % field.id, query)
                for field in formdef.fields
                if field.type in ('string', 'text', 'email'))
            if get_publisher().is_using_postgresql():
                alternatives.append(st.FtsMatch(query))
            criterias.append(st.Or(alternatives))

        def as_dict(user):
            # JSON-serialisable description of a user
            info = user.get_substitution_variables(prefix='')
            del info['user']
            info['user_id'] = user.id
            known_roles = [Role.get(role_id, ignore_errors=True) for role_id in user.roles or []]
            info['user_roles'] = [role.get_json_export_dict() for role in known_roles if role]
            return info

        # a missing, empty or zero limit means no limit
        limit = int(get_request().form.get('limit') or '0') or None
        users = get_publisher().user_class.select(
            order_by='name', clause=criterias, limit=limit)
        return json.dumps({'data': [as_dict(user) for user in users], 'err': 0})

    def _q_lookup(self, component):
        """Return the API directory of the user designated by ``component``.

        ``component`` is tried as a user id first, then as a name identifier.
        Raises TraversalError when no user matches.
        """
        self._check_backoffice_access()
        user_class = get_publisher().user_class
        try:
            int(component)  # makes sure this is an id
            user = user_class.get(component)
        except (KeyError, ValueError):
            matches = user_class.get_users_with_name_identifier(component)
            try:
                user = matches[0]
            except IndexError:
                raise TraversalError()
        return ApiUserDirectory(user)
|
|
|
|
|
|
class ApiTrackingCodeDirectory(Directory):
    """Resolve a tracking code to the URL of its formdata."""

    def _q_lookup(self, component):
        """Return ``{"err": 0, "url": ...}`` as JSON for tracking code ``component``.

        Raises TraversalError when the code or its formdata cannot be loaded,
        or when tracking codes are disabled (``False``) on the formdef.
        """
        get_response().set_content_type('application/json')
        try:
            tracking_code = get_publisher().tracking_code_class.get(component)
            formdata = tracking_code.formdata
        except KeyError:
            raise TraversalError()
        if formdata.formdef.enable_tracking_codes is False:
            raise TraversalError()
        url = formdata.get_url().rstrip('/')
        return json.dumps({'err': 0, 'url': url})
|
|
|
|
|
|
class ApiDirectory(Directory):
    """Root of the API: mounts the sub-directories and a few helper endpoints."""

    _q_exports = ['forms', 'roles', ('reverse-geocoding', 'reverse_geocoding'),
                  'formdefs', 'categories', 'user', 'users', 'code',
                  ('validate-expression', 'validate_expression'),]

    forms = ApiFormsDirectory()
    formdefs = ApiFormdefsDirectory()
    categories = ApiCategoriesDirectory()
    user = ApiUserDirectory()
    users = ApiUsersDirectory()
    code = ApiTrackingCodeDirectory()

    def reverse_geocoding(self):
        """Proxy a reverse geocoding query to the configured Nominatim server.

        Reads ``lat`` and ``lon`` from the query string (QueryError when one
        is missing) and returns the body of the Nominatim response.
        """
        try:
            lat = get_request().form['lat']
            lon = get_request().form['lon']
        except KeyError:
            raise QueryError
        nominatim_url = get_publisher().get_site_option('nominatim_url')
        if not nominatim_url:
            nominatim_url = 'http://nominatim.openstreetmap.org'
        nominatim_reverse_zoom_level = get_publisher().get_site_option(
            'nominatim_reverse_zoom_level') or 18
        get_response().set_content_type('application/json')
        # NOTE(review): lat and lon come straight from the query string and
        # are inserted in the URL without escaping or validation.
        url = '%s/reverse?format=json&zoom=%s&addressdetails=1&lat=%s&lon=%s' % (
            nominatim_url, nominatim_reverse_zoom_level, lat, lon)
        url += '&accept-language=%s' % (get_publisher().get_site_language() or 'en')
        nominatim_key = get_publisher().get_site_option('nominatim_key')
        if nominatim_key:
            url += '&key=' + nominatim_key
        return misc.urlopen(url).read()

    def roles(self):
        """Return all roles as JSON.

        Raises AccessForbiddenError unless the URL is signed or the session
        user has access to the backoffice.
        """
        get_response().set_content_type('application/json')
        if not (is_url_signed() or (
                get_request().user and get_request().user.can_go_in_admin())):
            raise AccessForbiddenError('unsigned request or user has no access to backoffice')
        list_roles = [role.get_json_export_dict() for role in Role.select()]
        return json.dumps({'data': list_roles})

    def validate_expression(self):
        """Validate the ``expression`` query parameter and return a JSON hint.

        The hint has a ``klass`` (``None``, ``'error'`` or ``'warning'``) and
        a ``msg``.
        """
        get_response().set_content_type('application/json')
        expression = get_request().form.get('expression')
        hint = {'klass': None, 'msg': ''}
        try:
            ComputedExpressionWidget.validate(expression)
        except ValidationError as e:
            hint['klass'] = 'error'
            hint['msg'] = str(e)
        else:
            # valid expression starting with "=" but containing a [name] part
            if expression and re.match(r'^=.*\[[a-zA-Z_]\w*\]', expression):
                hint['klass'] = 'warning'
                hint['msg'] = _('Make sure you want a Python expression, not a simple template string.')
        return json.dumps(hint)

    def _q_traverse(self, path):
        """Flag the request with ``is_json_marker`` before the regular traversal."""
        get_request().is_json_marker = True
        return super(ApiDirectory, self)._q_traverse(path)
|