# w.c.s. - web application for online forms # Copyright (C) 2005-2013 Entr'ouvert # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, see . import base64 import copy import datetime import json import re import urllib.parse from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse from django.utils.encoding import force_bytes from django.utils.timezone import localtime, make_naive from quixote import get_publisher, get_request, get_response, get_session, redirect from quixote.directory import Directory from quixote.errors import MethodNotAllowedError, RequestError from quixote.html import TemplateIO, htmltext import wcs.qommon.storage as st from wcs.admin.settings import UserFieldsFormDef from wcs.api_utils import get_query_flag, get_user_from_api_query_string, is_url_signed, sign_url_auto_orig from wcs.carddef import CardDef from wcs.categories import Category from wcs.conditions import Condition, ValidationError from wcs.ctl.management.commands.hobo_notify import Command as CmdHoboNotify from wcs.data_sources import NamedDataSource from wcs.data_sources import get_object as get_data_source_object from wcs.data_sources import request_json_items from wcs.formdef import FormDef from wcs.forms.common import FileDirectory, FormStatusPage from wcs.qommon import get_cfg from wcs.qommon.afterjobs import AfterJob from wcs.roles import logged_users_role from wcs.sql_criterias import ( Contains, ElementIntersects, Equal, FtsMatch, ILike, Intersects, NotContains, Nothing, Null, Or, StrictNotEqual, ) from wcs.workflows import ContentSnapshotPart from wcs.wscalls import UnflattenKeysException, unflatten_keys from .backoffice.data_management import CardPage as BackofficeCardPage from .backoffice.management import FormPage as BackofficeFormPage from .backoffice.management import ManagementDirectory from .backoffice.submission import SubmissionDirectory from .qommon import _, misc from .qommon.errors import AccessForbiddenError, TraversalError, UnknownNameIdAccessForbiddenError from .qommon.template import Template, TemplateError def posted_json_data_to_formdata_data(formdef, data): data = copy.deepcopy(data) # remap fields from varname to field id for field in formdef.get_all_fields(): if not field.varname: continue if field.varname not in data: continue raw = '%s_raw' % field.varname structured = '%s_structured' % field.varname if field.store_display_value and raw in data: data[field.id] = data.pop(raw) data['%s_display' % field.id] = data.pop(field.varname) else: data[field.id] = data.pop(field.varname) if field.store_structured_value and structured in data: data['%s_structured' % field.id] = data.pop(structured) # merge unnamed fields if they exist if '_unnamed' in data: unnamed_data = data.pop('_unnamed') for k in unnamed_data.keys(): data[k] = unnamed_data.get('%s_raw' % k, unnamed_data.get(k)) # prefer raw value # create a temporary formdata so datasources using previous fields in # parameters can find their values. transient_formdata = formdef.data_class()() transient_formdata.data = data with get_publisher().substitutions.temporary_feed(transient_formdata, force_mode='lazy'): # complete/adapt field values for field in formdef.get_all_fields(): structured = '%s_structured' % field.id display = '%s_display' % field.id if data.get(field.id) is None: continue if hasattr(field, 'from_json_value'): data[field.id] = field.from_json_value(data[field.id]) # only fill display/structured if both are absent if display not in data and structured not in data: if field.store_display_value: display_value = field.store_display_value(data, field.id) if display_value is not None: data[display] = display_value if field.store_structured_value: structured_value = field.store_structured_value(data, field.id) if structured_value is not None: data[structured] = structured_value return data def get_formdata_dict(formdata, user, consider_status_visibility=True): if consider_status_visibility and not formdata.is_draft(): status = formdata.get_visible_status(user=user) else: status = formdata.get_status() status_name = None if formdata.is_draft(): status_name = _('Draft') elif status: status_name = status.name d = { 'name': formdata.formdef.name, 'url': formdata.get_url(), 'datetime': misc.strftime('%Y-%m-%d %H:%M:%S', formdata.receipt_time), 'status': status_name, 'status_css_class': status.extra_css_class if status else None, 'keywords': formdata.formdef.keywords_list, 'draft': formdata.is_draft(), } if formdata.last_update_time: d['last_update_time'] = misc.strftime('%Y-%m-%d %H:%M:%S', formdata.last_update_time) if formdata.is_draft(): d['form_number_raw'] = d['form_number'] = None d['title'] = _('%(name)s (draft)') % {'name': formdata.formdef.name} else: d['title'] = _('%(name)s #%(id)s (%(status)s)') % { 'name': formdata.formdef.name, 'id': formdata.get_display_id(), 'status': status_name or _('unknown'), } d.update(formdata.get_static_substitution_variables(minimal=True)) if get_request().form.get('full') == 'on': d.update(formdata.get_json_export_dict(include_files=False, user=user)) if d.get('form_receipt_datetime'): d['form_receipt_datetime'] = make_naive(d['form_receipt_datetime'].replace(microsecond=0)) if d.get('form_last_update_datetime'): d['form_last_update_datetime'] = make_naive(d['form_last_update_datetime'].replace(microsecond=0)) return d class ApiFormdataPage(FormStatusPage): _q_exports_orig = ['', 'download'] def _q_index(self): if get_request().get_method() == 'POST': return self.post() return self.json() def post(self): get_response().set_content_type('application/json') api_user = get_user_from_api_query_string() if self.formdata.is_draft(): raise AccessForbiddenError('formdata is not editable (still a draft)') # check the formdata is currently editable wf_status = self.formdata.get_status() for item in wf_status.items: if not item.key == 'editable': continue if not item.check_auth(self.formdata, api_user): continue json_input = get_request().json if not isinstance(json_input, dict): raise RequestError('payload is not a dict') if 'data' not in json_input: raise RequestError('missing data entry in payload') data = posted_json_data_to_formdata_data(self.formdef, json_input['data']) old_data = copy.deepcopy(self.formdata.data) self.formdata.data.update(data) self.formdata.store() if self.formdata.jump_status(item.status): self.formdata.record_workflow_event('api-post-edit-action', action_item_id=item.id) self.formdata.perform_workflow() ContentSnapshotPart.take(formdata=self.formdata, old_data=old_data, user=api_user) self.formdata.store() return json.dumps({'err': 0, 'data': {'id': str(self.formdata.id)}}) raise AccessForbiddenError('formdata is not editable by given user') def check_receiver(self): api_user = get_user_from_api_query_string() if not api_user: if get_request().user and get_request().user.is_admin: return # grant access to admins, to ease debug raise AccessForbiddenError('user not authenticated') if not self.formdef.is_user_allowed_read_status_and_history(api_user, self.filled): raise AccessForbiddenError('unsufficient roles') class ApiFormPageMixin: def __init__(self, component): try: self.formdef = self.formdef_class.get_by_urlname(component) except KeyError: raise TraversalError() self._view = None def check_access(self, api_name=None): if get_request().user and get_request().user.is_admin: return # grant access to admins, to ease debug if get_request().has_anonymised_data_api_restriction() and is_url_signed(): # when requesting anonymous data, a signature is enough return api_user = get_user_from_api_query_string(api_name=api_name) if not api_user: raise AccessForbiddenError('user not authenticated') if not self.formdef.is_of_concern_for_user(api_user): raise AccessForbiddenError('unsufficient roles') def _q_lookup(self, component): if component == 'ics': return self.ics() if not misc.is_ascii_digit(component) and not self._view: for view in self.get_custom_views( [StrictNotEqual('visibility', 'owner'), Equal('slug', component)] ): # /api/cards//// self._view = view return self # check access for all paths (except webooks), to block access to # formdata that would otherwise be accessible if the user is the # submitter. if not self.is_webhook: self.check_access() try: formdata = self.formdef.data_class().get_by_id(component) except KeyError: raise TraversalError() return ApiFormdataPage(self.formdef, formdata, custom_view=self._view) def _q_traverse(self, path): if path[0] in ('list', 'ods', 'geojson') and len([x for x in path if x]) < 3: # /api/cards/// # /api/cards//// if path[-1]: # always consider a trailing slash path.append('') if path[1]: for view in self.get_custom_views( [StrictNotEqual('visibility', 'owner'), Equal('slug', path[1])] ): self._view = view break else: path = ['not-found'] path = [path[0]] if len(path) >= 2 and path[1] == 'ics': for view in self.get_custom_views( [StrictNotEqual('visibility', 'owner'), Equal('slug', path[0])] ): self._view = view path = path[1:] if misc.is_ascii_digit(path[-1]): # allow trailing / after path.append('') self.is_webhook = False if len(path) > 1: # webhooks have their own access checks, request cannot be blocked # at this point. self.is_webhook = bool(path[1] == 'hooks') return super()._q_traverse(path) class ApiFormPage(ApiFormPageMixin, BackofficeFormPage): _q_exports = [('list', 'json'), 'geojson', 'ods'] # restrict to API endpoints class ApiCardPage(ApiFormPageMixin, BackofficeCardPage): _q_exports = [ # restricted to API endpoints ('list', 'json'), ('import-csv', 'import_csv'), ('import-json', 'import_json'), 'geojson', 'ods', ('@schema', 'schema'), 'submit', ] def check_access(self, api_name=None): if is_url_signed() and get_user_from_api_query_string(api_name=api_name) is None: # signed but no user specified, grant access. class ApiAdminUser: id = Ellipsis # make sure it fails all over the place if used is_admin = True is_api_user = True get_roles = lambda x: [] get_request()._user = ApiAdminUser() return True return super().check_access(api_name=api_name) def schema(self): if is_url_signed() or self.formdef.has_admin_access(get_user_from_api_query_string()): get_response().set_content_type('application/json') return self.formdef.export_to_json(with_user_fields=True) raise AccessForbiddenError() def submit(self): get_response().set_content_type('application/json') user = get_user_from_api_query_string() if user and user.is_api_user: pass # API users are ok else: get_request()._user = user json_input = get_request().json formdata = self.formdef.data_class()() if not (user and self.formdef.can_user_add_cards(user)): raise AccessForbiddenError('cannot create card') if not isinstance(json_input, dict): raise RequestError('invalid payload') if 'data' in json_input: # the published API expects data in 'data'. data = json_input['data'] elif 'fields' in json_input: # but the API also supports data in 'fields', to match the json # output produded by wf/wscall.py. data = json_input['fields'] if 'workflow' in json_input and json_input['workflow'].get('fields'): # handle workflow fields, put them all in the same data dictionary. data.update(json_input['workflow']['fields']) if 'extra' in json_input: data.update(json_input['extra']) else: data = {} if not isinstance(data, dict): raise RequestError('invalid data parameter') formdata.data = posted_json_data_to_formdata_data(self.formdef, data) if 'user' in json_input: if not isinstance(json_input['user'], dict): raise RequestError('invalid user parameter') formdata.set_user_from_json(json_input['user']) elif user and not user.is_api_user: formdata.user_id = user.id formdata.store() formdata.just_created() formdata.store() formdata.refresh_from_storage() formdata.record_workflow_event('api-created') formdata.perform_workflow() formdata.store() return json.dumps( { 'err': 0, 'data': { 'id': str(formdata.id), 'url': formdata.get_url(), 'backoffice_url': formdata.get_url(backoffice=True), 'api_url': formdata.get_api_url(), }, } ) def import_csv(self): return self.import_file('csv') def import_json(self): return self.import_file('json') def import_file(self, file_format): if get_request().get_method() not in ('POST', 'PUT'): raise MethodNotAllowedError(allowed_methods=['POST', 'PUT']) get_request()._user = get_user_from_api_query_string() if not (get_request()._user and self.formdef.can_user_add_cards(get_request()._user)): raise AccessForbiddenError('cannot import cards') afterjob = bool(get_request().form.get('async') == 'on') do_update = bool(get_request().form.get('update') == 'on') get_response().set_content_type('application/json') try: if file_format == 'csv': if get_request().get_method() == 'POST': try: content = base64.decodebytes(force_bytes(get_request().json['file']['content'])) except (ValueError, KeyError): return json.dumps( {'err': 1, 'err_desc': 'Invalid format (must be {"file": {"content": base64}})'} ) else: content = get_request().stdin.read() job = self.import_csv_submit(content, afterjob=afterjob, api=True) elif file_format == 'json': job = self.import_json_submit( get_request().json, update_existing_cards=do_update, afterjob=afterjob, api=True ) except ValueError as e: return json.dumps({'err': 1, 'err_desc': str(e)}) if job is None: return json.dumps({'err': 0}) return json.dumps( { 'err': 0, 'data': { 'job': { 'id': str(job.id), 'url': get_publisher().get_frontoffice_url() + '/api/jobs/%s/' % job.id, } }, } ) class CardFileByTokenDirectory(Directory): def _q_lookup(self, component): get_request().ignore_session = True try: token = get_session().get_token('card-file-by-token', component) except KeyError: raise TraversalError() context = token.data carddef = CardDef.get_by_urlname(context['carddef_slug']) data = carddef.data_class().get(context['data_id']) for field_data in data.get_all_file_data(with_history=True): if not hasattr(field_data, 'file_digest'): continue if field_data.file_digest() == context['file_digest']: return FileDirectory.serve_file(field_data) raise TraversalError() class SignUrlTokenDirectory(Directory): def _q_lookup(self, component): get_request().ignore_session = True try: token = get_session().get_token('sign-url-token', component) except KeyError: raise TraversalError() return redirect(sign_url_auto_orig(token.data['url'])) class ApiFormsDirectory(Directory): _q_exports = ['', 'geojson'] def check_access(self): if not is_url_signed(): api_user = get_user_from_api_query_string() if api_user and api_user.is_api_user: # API users are ok return # grant access to admins, to ease debug if not (get_request().user and get_request().user.is_admin): raise AccessForbiddenError('user not authenticated') ignore_roles = get_query_flag('ignore-roles') if ignore_roles and not get_request().user.can_go_in_backoffice(): raise AccessForbiddenError('user not allowed to ignore roles') def _q_index(self): self.check_access() get_request()._user = get_user_from_api_query_string() or get_request().user if get_request().form.get('full') == 'on': raise RequestError('no such parameter "full"') if not (FormDef.exists()): # early return, this avoids running a query against a missing SQL view. get_response().set_content_type('application/json') return json.dumps({'data': []}, cls=misc.JSONEncoder) from wcs import sql management_directory = ManagementDirectory() criterias = management_directory.get_global_listing_criterias() if get_query_flag('ignore-roles'): roles_criterias = criterias criterias = management_directory.get_global_listing_criterias(ignore_user_roles=True) if not get_query_flag('include-anonymised', default=False): criterias.append(Null('anonymised')) related_filter = get_request().form.get('related') if related_filter: try: formdef_type, formdef_slug, formdata_id = related_filter.split(':') key = f'{formdef_type}:{formdef_slug}' criterias.append(ElementIntersects('relations_data', key, [formdata_id])) except ValueError: criterias.append(Nothing()) limit = misc.get_int_or_400( get_request().form.get('limit', get_publisher().get_site_option('default-page-size') or 20) ) offset = misc.get_int_or_400(get_request().form.get('offset', 0)) order_by = misc.get_order_by_or_400( get_request().form.get( 'order_by', get_publisher().get_site_option('default-sort-order') or '-receipt_time' ) ) formdatas = sql.AnyFormData.select(criterias, order_by=order_by, limit=limit, offset=offset) if get_query_flag('ignore-roles'): # When ignoring roles formdatas will be returned even if they are # not readable by the user, an additional attribute (readable) is # added to differentiate readable and non-readable formdatas. # # A full SQL query is run as it will benefit from cached # concerned_roles/action_roles. limited_formdatas = [ (x.formdef.id, x.id) for x in sql.AnyFormData.select( roles_criterias, order_by=order_by, limit=limit, offset=offset ) ] output = [] for formdata in formdatas: readable = bool((formdata.formdef.id, formdata.id) in limited_formdatas) if not readable and formdata.formdef.skip_from_360_view: continue formdata_dict = get_formdata_dict( formdata, user=get_request().user, consider_status_visibility=False ) formdata_dict['readable'] = readable output.append(formdata_dict) else: output = [ get_formdata_dict(x, user=get_request().user, consider_status_visibility=False) for x in formdatas ] get_response().set_content_type('application/json') return json.dumps({'data': output}, cls=misc.JSONEncoder) def geojson(self): self.check_access() get_request()._user = get_user_from_api_query_string() or get_request().user return ManagementDirectory().geojson() def _q_lookup(self, component): return ApiFormPage(component) class ApiCardsDirectory(Directory): _q_exports = [('@list', 'list')] def list(self): def get_custom_views(carddef): custom_views = [] for view in get_publisher().custom_view_class.select( clause=[StrictNotEqual('visibility', 'owner')] ): if view.match(user=None, formdef=carddef): custom_views.append({'id': view.slug, 'text': view.title}) custom_views.sort(key=lambda x: misc.simplify(x['text'])) return custom_views get_response().set_content_type('application/json') if not is_url_signed(): user = get_user_from_api_query_string() or get_request().user if not get_publisher().get_backoffice_root().is_global_accessible('cards', user=user): raise AccessForbiddenError('unsigned request or API user has no access to cards') carddefs = CardDef.select(order_by='name', ignore_errors=True, lightweight=True) data = [ { 'id': x.url_name, 'text': x.name, 'title': x.name, 'slug': x.url_name, 'url': x.get_url(), 'category_slug': x.category.url_name if x.category else None, 'category_name': x.category.name if x.category else None, 'description': x.description or '', 'keywords': x.keywords_list, 'custom_views': get_custom_views(x), } for x in carddefs ] return json.dumps({'data': data, 'err': 0}) def _q_lookup(self, component): return ApiCardPage(component) class ApiFormdefDirectory(Directory): _q_exports = ['schema', 'submit'] def __init__(self, formdef): self.formdef = formdef def schema(self): if is_url_signed() or self.formdef.has_admin_access(get_user_from_api_query_string()): get_response().set_content_type('application/json') return self.formdef.export_to_json() raise AccessForbiddenError() def submit(self): # expects json as input # { # "meta": { # "attr": "value" # }, # "data": { # "0": "value", # "1": "value", # ... # } # } get_response().set_content_type('application/json') if self.formdef.is_disabled(): raise AccessForbiddenError('disabled form') user = get_user_from_api_query_string() if user and user.is_api_user: pass # API users are ok json_input = get_request().json formdata = self.formdef.data_class()() if not isinstance(json_input, dict): raise RequestError('invalid payload') if 'data' in json_input: # the published API expects data in 'data'. data = json_input['data'] elif 'fields' in json_input: # but the API also supports data in 'fields', to match the json # output produded by wf/wscall.py. data = json_input['fields'] if 'workflow' in json_input and json_input['workflow'].get('fields'): # handle workflow fields, put them all in the same data dictionary. data.update(json_input['workflow']['fields']) if 'extra' in json_input: data.update(json_input['extra']) else: data = {} if not isinstance(data, dict): raise RequestError('invalid data parameter') formdata.data = posted_json_data_to_formdata_data(self.formdef, data) meta = json_input.get('meta') or {} if meta.get('backoffice-submission') or user and user.is_api_user: if not user: raise AccessForbiddenError('no user set for backoffice submission') if not self.formdef.backoffice_submission_roles: raise AccessForbiddenError('no backoffice submission roles') if not set(user.get_roles()).intersection(self.formdef.backoffice_submission_roles): raise AccessForbiddenError('not cleared for backoffice submit') formdata.backoffice_submission = True if not meta.get('backoffice-submission'): if 'user' in json_input: if not isinstance(json_input['user'], dict): raise RequestError('invalid user parameter') formdata.set_user_from_json(json_input['user']) elif user and not user.is_api_user: formdata.user_id = user.id if json_input.get('context'): formdata.submission_context = json_input['context'] formdata.submission_channel = formdata.submission_context.pop('channel', None) formdata.user_id = formdata.submission_context.pop('user_id', None) if self.formdef.only_allow_one and formdata.user_id: user_id = formdata.user_id user_forms = self.formdef.data_class().select([Equal('user_id', str(user_id))]) if [x for x in user_forms if not x.is_draft()]: raise AccessForbiddenError('only one formdata by user is allowed') if meta.get('backoffice-submission') and not user.is_api_user: # keep track of the agent that did the submit formdata.submission_agent_id = str(user.id) formdata.store() if self.formdef.enable_tracking_codes: code = get_publisher().tracking_code_class() code.formdata = formdata # this will .store() the code if meta.get('draft'): formdata.status = 'draft' formdata.receipt_time = localtime() formdata.store() else: formdata.just_created() formdata.store() formdata.refresh_from_storage() formdata.record_workflow_event('api-created') formdata.perform_workflow() formdata.store() return json.dumps( { 'err': 0, 'data': { 'id': str(formdata.id), 'url': formdata.get_url(), 'backoffice_url': formdata.get_url(backoffice=True), 'api_url': formdata.get_api_url(), }, } ) class ApiFormdefsDirectory(Directory): _q_exports = [''] def __init__(self, category=None): self.category = category def get_list_forms(self, user, list_all_forms=False, formdefs=None, backoffice_submission=False): list_forms = [] if not user and backoffice_submission: return list_forms if get_request().form.get('q'): from wcs import sql object_ids = sql.SearchableFormDef.search(FormDef.xml_root_node, get_request().form.get('q')) if formdefs is None: formdefs = FormDef.get_ids(object_ids, ignore_errors=True, lightweight=True) else: formdefs = [x for x in formdefs if str(x.id) in object_ids] elif formdefs is None: formdefs = FormDef.select(order_by='name', ignore_errors=True, lightweight=True) include_disabled = get_query_flag('include-disabled') category_slugs = (get_request().form.get('category_slugs') or '').split(',') category_slugs = [c.strip() for c in category_slugs if c.strip()] if not include_disabled: if backoffice_submission: formdefs = [x for x in formdefs if not x.is_disabled()] else: formdefs = [x for x in formdefs if not x.is_disabled() or x.disabled_redirection] if self.category: formdefs = [x for x in formdefs if str(x.category_id) == str(self.category.id)] elif category_slugs: formdefs = [x for x in formdefs if x.category and (x.category.url_name in category_slugs)] include_count = get_query_flag('include-count') for formdef in formdefs: authentication_required = False if formdef.roles and not list_all_forms and not backoffice_submission: if not user: if not formdef.always_advertise: continue authentication_required = True elif logged_users_role().id not in formdef.roles: for q in user.get_roles(): if q in formdef.roles: break else: if not formdef.always_advertise: continue authentication_required = True elif backoffice_submission: if not formdef.backoffice_submission_roles: continue for role in user.get_roles(): if role in formdef.backoffice_submission_roles: break else: continue elif formdef.roles and user is None and list_all_forms: # anonymous API call, mark authentication as required authentication_required = True formdict = { 'title': formdef.name, 'slug': formdef.url_name, 'url': formdef.get_url(), 'description': formdef.description or '', 'keywords': formdef.keywords_list, 'authentication_required': authentication_required, 'always_advertise': formdef.always_advertise, } if formdef.required_authentication_contexts: formdict['required_authentication_contexts'] = formdef.required_authentication_contexts if backoffice_submission: formdict['backoffice_submission_url'] = formdef.get_backoffice_submission_url() formdict['redirection'] = bool(formdef.is_disabled() and formdef.disabled_redirection) if include_count: # we include the count of submitted forms so it's possible to sort # them by "popularity" from wcs import sql # 4 * number of submitted forms of last 2 days # + 2 * number of submitted forms of last 8 days # + 1 * number of submitted forms of last 30 days # exclude drafts criterias = [Equal('formdef_id', formdef.id), StrictNotEqual('status', 'draft')] d_now = datetime.datetime.now() count = 4 * sql.get_period_total( period_start=d_now - datetime.timedelta(days=2), include_start=True, criterias=criterias, ) count += 2 * sql.get_period_total( period_start=d_now - datetime.timedelta(days=8), include_start=True, period_end=d_now - datetime.timedelta(days=2), include_end=False, criterias=criterias, ) count += sql.get_period_total( period_start=d_now - datetime.timedelta(days=30), include_start=True, period_end=d_now - datetime.timedelta(days=8), include_end=False, criterias=criterias, ) formdict['count'] = count formdict['functions'] = {} formdef_workflow_roles = formdef.workflow_roles or {} for wf_role_id, wf_role_label in formdef.workflow.roles.items(): workflow_function = {'label': wf_role_label} role_id = formdef_workflow_roles.get(wf_role_id) if role_id: try: workflow_function['role'] = ( get_publisher().role_class.get(role_id).get_json_export_dict() ) except KeyError: pass formdict['functions'][wf_role_id] = workflow_function if formdef.category: formdict['category'] = formdef.category.name formdict['category_slug'] = formdef.category.url_name list_forms.append(formdict) return list_forms def _q_index(self): try: user = get_user_from_api_query_string() except UnknownNameIdAccessForbiddenError: # if authenticating the user via the query string failed, return # results for the anonymous case; user is set to 'False' as a # signed URL with a None user is considered like an appropriate # webservice call. user = False url_signed = is_url_signed() if user and user.is_api_user: pass # API users are ok elif not url_signed: if not (get_request().user and get_request().user.is_admin): raise AccessForbiddenError('user not authenticated') user = get_request().user list_all_forms = (user and user.is_admin) or (url_signed and user is None) backoffice_submission = get_query_flag('backoffice-submission') list_forms = self.get_list_forms(user, list_all_forms, backoffice_submission=backoffice_submission) get_response().set_content_type('application/json') return json.dumps({'err': 0, 'data': list_forms}) def _q_lookup(self, component): try: formdef = FormDef.get_by_urlname(component) except KeyError: raise TraversalError() return ApiFormdefDirectory(formdef) class ApiCategoryDirectory(Directory): _q_exports = ['formdefs'] def __init__(self, category): self.category = category self.formdefs = ApiFormdefsDirectory(category) class ApiCategoriesDirectory(Directory): _q_exports = [''] def __init__(self): pass def _q_index(self): try: user = get_user_from_api_query_string() or get_request().user except UnknownNameIdAccessForbiddenError: # the name id was unknown, return the categories for anonymous # users. user = None list_all_forms = (user and user.is_admin) or (is_url_signed() and user is None) backoffice_submission = get_request().form.get('backoffice-submission') == 'on' list_categories = [] categories = Category.select() Category.sort_by_position(categories) all_formdefs = FormDef.select(order_by='name', ignore_errors=True, lightweight=True) for category in categories: d = {} d['title'] = category.name d['slug'] = category.url_name d['url'] = category.get_url() if category.description: d['description'] = str(category.get_description_html_text()) formdefs = ApiFormdefsDirectory(category).get_list_forms( user, formdefs=all_formdefs, list_all_forms=list_all_forms, backoffice_submission=backoffice_submission, ) if not formdefs: # don't advertise empty categories continue keywords = {} for formdef in formdefs: for keyword in formdef['keywords']: keywords[keyword] = True d['keywords'] = list(keywords.keys()) if get_request().form.get('full') == 'on': d['forms'] = formdefs list_categories.append(d) get_response().set_content_type('application/json') return json.dumps({'data': list_categories}) def _q_lookup(self, component): try: return ApiCategoryDirectory(Category.get_by_urlname(component)) except KeyError: raise TraversalError() class ApiUserDirectory(Directory): _q_exports = ['', 'forms', 'drafts', 'preferences'] def __init__(self, user=None): self.user = user def _q_index(self): get_response().set_content_type('application/json') user = self.user or get_user_from_api_query_string() or get_request().user if not user: raise AccessForbiddenError('no user specified') if user.is_api_user: raise AccessForbiddenError('restricted API access') user_info = user.get_substitution_variables(prefix='') del user_info['user'] user_info['id'] = user.id user_roles = [get_publisher().role_class.get(x, ignore_errors=True) for x in user.roles or []] user_info['user_roles'] = [x.get_json_export_dict() for x in user_roles if x] return json.dumps(user_info, cls=misc.JSONEncoder) def get_user_forms(self, user, include_drafts=False, include_non_drafts=True): if not (FormDef.exists()): # early return, this avoids running a query against a missing SQL view. return [] category_slugs = (get_request().form.get('category_slugs') or '').split(',') category_slugs = [c.strip() for c in category_slugs if c.strip()] if category_slugs: categories = Category.select([st.Contains('url_name', category_slugs)]) from wcs import sql order_by = 'receipt_time' if get_request().form.get('sort') == 'desc': order_by = '-receipt_time' if get_query_flag('include-accessible'): user_roles = user.get_roles() criterias = [ Or( [ Intersects('concerned_roles_array', user_roles), Equal('user_id', str(user.id)), ] ) ] else: criterias = [Equal('user_id', str(user.id))] if category_slugs: criterias.append(Contains('category_id', [c.id for c in categories])) status_criteria = get_request().form.get('status') or 'all' if status_criteria == 'open': criterias.append(Equal('is_at_endpoint', False)) elif status_criteria == 'done': criterias.append(Equal('is_at_endpoint', True)) elif status_criteria == 'all': pass else: return HttpResponseBadRequest('invalid status parameter value') related_filter = get_request().form.get('related') if related_filter: try: formdef_type, formdef_slug, formdata_id = related_filter.split(':') key = f'{formdef_type}:{formdef_slug}' criterias.append(ElementIntersects('relations_data', key, [formdata_id])) except ValueError: criterias.append(Nothing()) if include_drafts: disabled_formdef_ids = [formdef.id for formdef in FormDef.select() if formdef.is_disabled()] if disabled_formdef_ids: criterias.append( Or( [ StrictNotEqual('status', 'draft'), NotContains('formdef_id', disabled_formdef_ids), ] ) ) else: criterias.append(StrictNotEqual('status', 'draft')) if not include_non_drafts: criterias.append(Equal('status', 'draft')) user_forms = sql.AnyFormData.select( criterias, limit=misc.get_int_or_400(get_request().form.get('limit')), offset=misc.get_int_or_400(get_request().form.get('offset')), order_by=order_by, ) if get_request().form.get('full') == 'on': # load full objects formdefs = {x.formdef_id: x.formdef for x in user_forms} formdef_user_forms = {} for formdef_id, formdef in formdefs.items(): formdef_user_forms.update( { (formdef_id, x.id): x for x in formdef.data_class().select( [Contains('id', [x.id for x in user_forms if x.formdef_id == formdef_id])] ) } ) # and put them back in order sorted_user_forms_tuples = [(x.formdef_id, x.id) for x in user_forms] user_forms = [formdef_user_forms.get(x) for x in sorted_user_forms_tuples] else: # prefetch evolutions to avoid individual loads when computing # formdata.get_visible_status(). sql.AnyFormData.load_all_evolutions(user_forms) return user_forms def drafts(self): return self.forms(include_drafts=True, include_non_drafts=False) def forms(self, include_drafts=False, include_non_drafts=True): include_drafts = include_drafts or get_query_flag('include-drafts') get_response().set_content_type('application/json') try: user = self.user or get_user_from_api_query_string() or get_request().user except UnknownNameIdAccessForbiddenError: return json.dumps({'err': 1, 'err_desc': 'unknown NameID', 'data': []}) if not user: return json.dumps({'err': 1, 'err_desc': 'no user specified', 'data': []}) if user.is_api_user: raise AccessForbiddenError('restricted API access') query_user = get_user_from_api_query_string() or get_request().user if query_user and query_user.is_api_user and query_user.api_access.restrict_to_anonymised_data: raise AccessForbiddenError('restricted API access') forms = self.get_user_forms( user, include_drafts=include_drafts, include_non_drafts=include_non_drafts ) if self.user: # call to /api/users//forms, this returns the forms of the # given user filtered according to the permissions of the caller # (from query string or session). if query_user and query_user.id != self.user.id: if not query_user.is_api_user and not query_user.can_go_in_backoffice(): raise AccessForbiddenError('user not allowed to query data from others') # mark forms that are readable by querying user user_roles = set(query_user.get_roles()) # use concerned_roles_array attribute that was saved in the # table. for form in forms: form.readable = bool(set(form.concerned_roles_array).intersection(user_roles)) # ignore confidential forms forms = [x for x in forms if x.readable or not x.formdef.skip_from_360_view] result = [] for form in forms: if form.is_draft(): if not include_drafts: continue if form.formdef.is_disabled(): # the form or its draft support has been disabled continue elif not include_non_drafts: continue formdata_dict = get_formdata_dict(form, user) if not formdata_dict: # skip hidden forms continue formdata_dict['readable'] = getattr(form, 'readable', True) result.append(formdata_dict) return json.dumps({'err': 0, 'data': result}, cls=misc.JSONEncoder) def preferences(self): if get_request().get_method() != 'POST': raise MethodNotAllowedError(allowed_methods=['POST']) get_response().set_content_type('application/json') user = self.user or get_request().user if not user: raise AccessForbiddenError('user not authenticated') if int(get_request().environ.get('CONTENT_LENGTH')) > 1000: # protect against storing "huge" blobs raise RequestError('too much data') user.update_preferences(get_request().json) return json.dumps({'err': 0}) class ApiUsersDirectory(Directory): _q_exports = [''] def can_create_cards(self, user): for role_id in user.roles or []: ids = CardDef.get_ids_with_indexed_value('backoffice_submission_roles', role_id) if ids: return True return False def _q_index(self): get_response().set_content_type('application/json') if not ( is_url_signed() or ( get_request().user and ( get_request().user.can_go_in_admin() or SubmissionDirectory().is_accessible(get_request().user) or self.can_create_cards(get_request().user) ) ) ): # request must be signed, or user must be an administrator or # allowed to submit forms (as they have a form to select an user). raise AccessForbiddenError('unsigned request or user has no access to backoffice') api_user = get_user_from_api_query_string() if api_user and api_user.is_api_user: raise AccessForbiddenError('restricted API access') criterias = [Null('deleted_timestamp')] query = get_request().form.get('q') if query: formdef = UserFieldsFormDef() criteria_fields = [ ILike('name', query), ILike('ascii_name', misc.simplify(query, ' ')), ILike('email', query), ] for field in formdef.fields: if field.key in ('string', 'text', 'email'): criteria_fields.append(ILike('f%s' % field.id, query)) criteria_fields.append(FtsMatch(query)) criterias.append(Or(criteria_fields)) roles = get_request().form.get('roles') if roles: criterias.append(Intersects('roles', roles.split(','))) def as_dict(user): users_cfg = get_cfg('users', {}) template = ( users_cfg.get('search_result_template') or get_publisher().user_class.default_search_result_template ) user_info = user.get_substitution_variables(prefix='') del user_info['user'] user_info['user_id'] = user.id user_roles = [get_publisher().role_class.get(x, ignore_errors=True) for x in user.roles or []] user_info['user_roles'] = [x.get_json_export_dict() for x in user_roles if x] # add attributes to be usable as datasource user_info['id'] = user.id user_info['text'] = user_info['user_display_name'] if Template.is_template_string(template): try: user_info['description'] = Template(template).render(user_info) except TemplateError: pass return user_info limit = misc.get_int_or_400(get_request().form.get('limit')) users = get_publisher().user_class.select(order_by='name', clause=criterias, limit=limit) data = [as_dict(x) for x in users] return json.dumps({'data': data, 'err': 0}, cls=misc.JSONEncoder) def _q_lookup(self, component): api_user = get_user_from_api_query_string() if api_user and api_user.is_api_user: # API users are ok except if they are restricted to anonymised data if api_user.api_access.restrict_to_anonymised_data: raise AccessForbiddenError('restricted API access') elif not (is_url_signed() or (get_request().user and get_request().user.can_go_in_admin())): raise AccessForbiddenError('unsigned request or user has no access to backoffice') user_class = get_publisher().user_class try: int(component) # makes sure this is an id user = user_class.get(component) except (KeyError, ValueError): try: user = user_class.get_users_with_name_identifier(component)[0] except IndexError: raise TraversalError() return ApiUserDirectory(user) class ApiTrackingCodeDirectory(Directory): def _q_lookup(self, component): # /api/code/$code # * allows signed requests # * allows HTTP basic auth requests with API user with no role restriction get_response().set_content_type('application/json') user = get_user_from_api_query_string() if not (user and user.is_api_user and not user.roles): # HTTP auth if not is_url_signed(): # signed request raise AccessForbiddenError('missing signature') try: tracking_code = get_publisher().tracking_code_class.get(component) except KeyError: raise TraversalError() try: formdata = tracking_code.formdata except KeyError: raise TraversalError() if formdata.formdef.enable_tracking_codes is False: raise TraversalError() # return load_url with a temporary access URL so the caller can directly # redirect the user to the formdata. data = { 'err': 0, 'url': formdata.get_url(backoffice=get_query_flag('backoffice')), 'load_url': formdata.get_temporary_access_url( duration=300, backoffice=get_query_flag('backoffice') ), } return json.dumps(data) class AutocompleteDirectory(Directory): def _q_lookup(self, component): get_request().ignore_session = True try: autocomplete_context = get_session().get_token('autocomplete', component) if autocomplete_context.data.get('url') == '': # this is a datasource without a json url # (typically a Python source) raise KeyError() except KeyError: raise AccessForbiddenError() get_response().set_content_type('application/json') info = autocomplete_context.data if 'url' in info: named_data_source = None cache_duration = 0 if info.get('data_source'): named_data_source = NamedDataSource.get(info['data_source']) if named_data_source.cache_duration: cache_duration = int(named_data_source.cache_duration) url = info['url'] url += urllib.parse.quote(get_request().form.get('q', '')) get_response().set_content_type('application/json') entries = request_json_items( url, named_data_source and named_data_source.extended_data_source, cache_duration=cache_duration, ) if entries is not None: return json.dumps({'err': 0, 'data': entries}) return json.dumps({'err': 1, 'data': []}) # carddef_ref in info carddef_ref = info['carddef_ref'] custom_view = None if 'dynamic_custom_view' in info: custom_view = get_publisher().custom_view_class.get(info['dynamic_custom_view']) custom_view.filters = info['dynamic_custom_view_filters'] query = get_request().form.get('q', '') limit = misc.get_int_or_400(get_request().form.get('page_limit')) with_related = info.get('with_related') with_related_urls = query and limit and with_related values = CardDef.get_data_source_items( carddef_ref, custom_view=custom_view, query=query, limit=limit, with_related_urls=with_related_urls, ) keys = ['id', 'text'] if with_related_urls: keys += ['edit_related_url', 'view_related_url'] return json.dumps({'data': [{key: x.get(key, '') for key in keys} for x in values]}) class GeoJsonDirectory(Directory): def _q_lookup(self, component): url = None try: data_source = get_data_source_object({'type': component}, ignore_errors=False) except KeyError: try: context = get_session().get_token('geojson', component) except KeyError: raise TraversalError() info = context.data try: data_source = get_data_source_object({'type': info['slug']}, ignore_errors=False) except KeyError: raise TraversalError() url = info['url'] get_response().set_content_type('application/json') return json.dumps(data_source.get_geojson_data(force_url=url)) class AfterJobDirectory(Directory): _q_exports = [''] def __init__(self, afterjob): self.afterjob = afterjob def _q_index(self): get_response().set_content_type('application/json') return json.dumps( { 'err': 0, 'data': { 'status': self.afterjob.status, 'label': self.afterjob.label, 'creation_time': self.afterjob.creation_time, 'completion_time': self.afterjob.completion_time, 'completion_status': self.afterjob.get_completion_status(), }, }, cls=misc.JSONEncoder, ) class AfterJobsDirectory(Directory): _q_exports = [] def _q_lookup(self, component): api_user = get_user_from_api_query_string() if api_user and api_user.is_api_user: pass # API users are ok elif not (is_url_signed() or (get_request().user and get_request().user.is_admin)): raise AccessForbiddenError('unsigned request or user is not admin') try: afterjob = AfterJob.get(component, ignore_errors=False) except KeyError: raise TraversalError() return AfterJobDirectory(afterjob) class ApiDirectory(Directory): _q_exports = [ 'forms', 'roles', ('reverse-geocoding', 'reverse_geocoding'), 'formdefs', 'categories', 'user', 'users', 'code', 'autocomplete', 'cards', 'geojson', 'jobs', ('card-file-by-token', 'card_file_by_token'), ('preview-payload-structure', 'preview_payload_structure'), ('sign-url-token', 'sign_url_token'), ] cards = ApiCardsDirectory() forms = ApiFormsDirectory() formdefs = ApiFormdefsDirectory() categories = ApiCategoriesDirectory() user = ApiUserDirectory() users = ApiUsersDirectory() code = ApiTrackingCodeDirectory() autocomplete = AutocompleteDirectory() geojson = GeoJsonDirectory() jobs = AfterJobsDirectory() card_file_by_token = CardFileByTokenDirectory() sign_url_token = SignUrlTokenDirectory() def roles(self): get_response().set_content_type('application/json') if not (is_url_signed() or (get_request().user and get_request().user.can_go_in_admin())): raise AccessForbiddenError('unsigned request or user has no access to backoffice') list_roles = [] for role in get_publisher().role_class.select(): if not role.is_internal(): list_roles.append(role.get_json_export_dict()) get_response().set_content_type('application/json') return json.dumps({'err': 0, 'data': list_roles}) def preview_payload_structure(self): if not (get_request().user and get_request().user.can_go_in_admin()): raise AccessForbiddenError('user has no access to backoffice') def parse_payload(): payload = {} for param, value in get_request().form.items(): # skip elements which are not part of payload if 'post_data$element' not in param or param.endswith('value_python'): continue prefix, order, field = re.split(r'(\d+)(?!\d)', param) # noqa pylint: disable=unused-variable # skip elements that aren't ordered if not order: continue if order not in payload: payload[order] = [] if field == 'key': # skip empty keys if not value: continue # insert key on first position payload[order].insert(0, value) else: payload[order].append(value) return dict([v for v in payload.values() if len(v) > 1]) def format_payload(o, html=htmltext(''), last_element=True): if isinstance(o, (list, tuple)): html += htmltext('[') while True: try: head, tail = o[0], o[1:] except IndexError: break html = format_payload(head, html=html, last_element=len(tail) < 1) o = tail html += htmltext(']') elif isinstance(o, dict): html += htmltext('{') for i, (k, v) in enumerate(o.items()): html += htmltext('"%s": ' % k) html = format_payload(v, html=html, last_element=i == len(o) - 1) html += htmltext('}') else: # check if it's empty string, a template with text around or just text if not o or re.sub('^({[{|%]).+([%|}]})$', '', o): # and add double quotes html += htmltext('"%s"' % o) else: html += htmltext('%s' % o) # last element doesn't need separator if not last_element: html += htmltext(',') return html payload = parse_payload() r = TemplateIO(html=True) r += htmltext('

%s

') % _('Payload structure preview') r += htmltext('
') try: unflattened_payload = unflatten_keys(payload) r += htmltext('
') r += format_payload(unflattened_payload) r += htmltext('
') except UnflattenKeysException as e: r += htmltext('

%s

%s %s

') % ( _('Unable to preview payload.'), _('Following error occured: '), e, ) r += htmltext('
') return r.getvalue() def _q_traverse(self, path): get_request().is_json_marker = True return super()._q_traverse(path) def reverse_geocoding(request, *args, **kwargs): if not ('lat' in request.GET and 'lon' in request.GET): return HttpResponseBadRequest() lat = request.GET['lat'] lon = request.GET['lon'] return HttpResponse(misc.get_reverse_geocoding_data(lat, lon), content_type='application/json') def geocoding(request, *args, **kwargs): if 'q' not in request.GET: return HttpResponseBadRequest() q = request.GET['q'] url = get_publisher().get_geocoding_service_url() if '?' in url: url += '&' else: url += '?' url += 'format=json&q=%s' % urllib.parse.quote(q.encode('utf-8')) url += '&accept-language=%s' % (get_publisher().get_site_language() or 'en') return HttpResponse(misc.urlopen(url).read(), content_type='application/json') def validate_condition(request, *args, **kwargs): condition = {} condition['type'] = request.GET.get('type') or '' condition['value'] = request.GET.get('value_' + condition['type']) or '' hint = {'msg': ''} try: Condition(condition).validate() except ValidationError as e: hint['msg'] = str(e) else: if request.GET.get('warn-on-datetime') == 'true' and condition['type'] == 'django': variables = re.compile(r'\b(today|now)\b') filters = re.compile(r'\|age_in_(years|months|days|hours)') if variables.search(condition['value']) or filters.search(condition['value']): hint['msg'] = _( 'Warning: conditions are only evaluated when entering the action, ' 'you may need to set a timeout if you want it to be evaluated regularly.' ) return JsonResponse(hint) class ProvisionAfterJob(AfterJob): def __init__(self, json_data, **kwargs): super().__init__(**kwargs) self.json_data = json_data def execute(self): CmdHoboNotify().process_notification(self.json_data) def provisionning(request): if not is_url_signed(): raise AccessForbiddenError() sync = request.GET.get('sync') == '1' if sync: CmdHoboNotify().process_notification(get_request().json) else: job = ProvisionAfterJob(json_data=get_request().json) job.run(spool=True) return JsonResponse({'err': 0})