wcs/wcs/forms/common.py

734 lines
29 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import sys
import time
from quixote import get_publisher, get_request, get_response, get_session, redirect
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext
from wcs import data_sources
from wcs.api_utils import get_user_from_api_query_string, is_url_signed
from wcs.fields import WidgetField, FileField
from wcs.workflows import EditableWorkflowStatusItem
from ..qommon import _
from ..qommon import misc
from ..qommon import template
from ..qommon import get_logger
from ..qommon.form import *
from wcs.qommon.admin.texts import TextsDirectory
from ..qommon import errors
class FileDirectory(Directory):
_q_exports = []
_lookup_methods = ['lookup_file_field']
thumbnails = False
def __init__(self, formdata, reference):
self.formdata = formdata
self.reference = reference
def lookup_file_field(self, filename):
if self.reference in self.formdata.data:
return self.formdata.data[self.reference]
def _q_lookup(self, component):
if component == 'thumbnail':
self.thumbnails = True
return self
upload = None
for lookup_method_name in self._lookup_methods:
lookup_method = getattr(self, lookup_method_name)
file = lookup_method(filename=component)
if file:
break
else:
# no such file
raise errors.TraversalError()
if component and component != file.base_filename:
raise errors.TraversalError()
response = get_response()
if file.content_type:
response.set_content_type(file.content_type)
else:
response.set_content_type('application/octet-stream')
if file.charset:
response.set_charset(file.charset)
if file.base_filename:
content_disposition = 'attachment'
if file.content_type.startswith('image/') and not file.content_type.startswith('image/svg'):
content_disposition = 'inline'
elif file.content_type == 'application/pdf':
content_disposition = 'inline'
response.set_header('content-disposition',
'%s; filename="%s"' % (content_disposition, file.base_filename))
if self.thumbnails and misc.can_thumbnail(file.content_type):
try:
thumbnail = misc.get_thumbnail(file.get_filename(),
content_type=file.content_type)
response.set_content_type('image/png')
return thumbnail
except misc.ThumbnailError:
pass
return file.get_file_pointer().read()
class FilesDirectory(Directory):
def __init__(self, formdata):
self.formdata = formdata
def _q_lookup(self, component):
return FileDirectory(self.formdata, reference=component)
class FormTemplateMixin(object):
def get_formdef_template_variants(self, template_names):
template_part_names = [(os.path.dirname(x), os.path.basename(x)) for x in template_names]
for dirname, basename in template_part_names:
for keyword in self.formdef.appearance_keywords_list:
yield os.path.join(dirname, 'appearance-' + keyword, basename)
if self.formdef.category_id:
yield os.path.join(dirname, 'category-' + self.formdef.category.url_name, basename)
yield os.path.join(dirname, basename)
class FormStatusPage(Directory, FormTemplateMixin):
_q_exports_orig = ['', 'download', 'json', 'action', 'live']
_q_extra_exports = []
form_page_class = None
do_not_call_in_templates = True
history_templates = ['wcs/formdata_history.html']
status_templates = ['wcs/formdata_status.html']
def html_top(self, title = None):
template.html_top(title = title, default_org = _('Forms'))
def __init__(self, formdef, filled, register_workflow_subdirs=True):
get_publisher().substitutions.feed(filled)
self.formdef = formdef
self.formdata = filled
self.filled = filled
self._q_exports = self._q_exports_orig[:]
for q in self._q_extra_exports:
if not q in self._q_exports:
self._q_exports.append(q)
if self.formdef.workflow and register_workflow_subdirs:
for name, directory in self.formdef.workflow.get_subdirectories(self.filled):
self._q_exports.append(name)
setattr(self, name, directory)
def check_auth(self, api_call=False):
session = get_session()
mine = False
if api_call:
if 'anonymise' in get_request().form:
if is_url_signed() or (get_request().user and get_request().user.is_admin):
return None
else:
raise errors.AccessUnauthorizedError()
else:
user = get_user_from_api_query_string() or get_request().user
else:
user = get_request().user
if user and not user.anonymous:
if self.filled.is_submitter(user):
mine = True
else:
if session and session.is_anonymous_submitter(self.filled):
mine = True
self.check_receiver()
return mine
def json(self):
self.check_auth(api_call=True)
anonymise = 'anonymise' in get_request().form
return self.export_to_json(anonymise=anonymise)
def workflow_messages(self, position='top'):
if self.formdef.workflow:
workflow_messages = self.filled.get_workflow_messages(position=position)
if workflow_messages:
r = TemplateIO(html=True)
if position == 'top':
r += htmltext('<div id="receipt-intro" class="workflow-messages %s">' % position)
else:
r += htmltext('<div class="workflow-messages %s">' % position)
for workflow_message in workflow_messages:
if workflow_message.startswith('<'):
r += htmltext(workflow_message)
else:
r += htmltext('<p>%s</p>' % workflow_message)
r += htmltext('</div>')
return r.getvalue()
return ''
def actions_workflow_messages(self):
return self.workflow_messages(position='actions')
def bottom_workflow_messages(self):
return self.workflow_messages(position='bottom')
def recorded_message(self):
r = TemplateIO(html=True)
# behaviour if workflow doesn't display any message
if self.filled.receipt_time is not None:
tm = misc.localstrftime(self.filled.receipt_time)
else:
tm = '???'
if self.formdef.only_allow_one:
r += TextsDirectory.get_html_text('form-recorded-allow-one', vars={'date': tm})
else:
r += TextsDirectory.get_html_text('form-recorded',
vars={'date': tm, 'number': self.filled.get_display_id()})
return r.getvalue()
def get_handling_role_info_text(self):
handling_role = self.filled.get_handling_role()
if not (handling_role and handling_role.details):
return ''
r = TemplateIO(html=True)
endpoint_status = self.formdef.workflow.get_endpoint_status()
r += htmltext('<p>')
if self.filled.status in [x.id for x in endpoint_status]:
r += _('Your case has been handled by:')
else:
r += _('Your case is handled by:')
r += htmltext('</p>')
r += htmltext('<p id="receiver">')
r += htmltext(handling_role.details.replace(str('\n'), str('<br />')))
r += htmltext('</p>')
return r.getvalue()
def _q_index(self):
mine = self.check_auth()
if not mine and not get_request().is_in_backoffice():
# Access authorized but the form doesn't belong to the user; if the
# user has access to the backoffice, redirect.
# Unless ?debug=whatever is set.
if get_request().user.can_go_in_backoffice() and not get_request().form.get('debug'):
return redirect(self.filled.get_url(backoffice=True))
get_request().view_name = 'status'
get_logger().info('form %s - id: %s - view' % (self.formdef.name, self.filled.id))
user = get_request().user
form = self.get_workflow_form(user)
response = self.check_submitted_form(form)
if response:
return response
if form:
form.add_media()
get_response().add_javascript(['jquery.js', 'qommon.forms.js', 'qommon.map.js'])
self.html_top(self.formdef.name)
context = {
'view': self,
'mine': mine,
'formdata': self.filled,
'workflow_form': form,
}
return template.QommonTemplateResponse(
templates=list(self.get_formdef_template_variants(self.status_templates)),
context=context)
def get_workflow_form(self, user):
submitted_fields = []
form = self.filled.get_workflow_form(user, displayed_fields=submitted_fields)
if form and form.is_submitted():
with get_publisher().substitutions.temporary_feed(self.filled, force_mode='lazy'):
# remove fields that could be required but are not visible
self.filled.evaluate_live_workflow_form(user, form)
get_publisher().substitutions.invalidate_cache()
get_publisher().substitutions.feed(self.filled)
# recreate form to get live data source items
form = self.filled.get_workflow_form(user, displayed_fields=submitted_fields)
for field in submitted_fields:
if not field.is_visible(self.filled.data, self.formdef) and 'f%s' % field.id in form._names:
del form._names['f%s' % field.id]
if form:
form.attrs['data-live-url'] = self.filled.get_url() + 'live'
return form
def check_submitted_form(self, form):
if form and form.is_submitted() and not form.has_errors():
url = self.submit(form)
if url is None:
url = get_request().get_frontoffice_url()
response = get_response()
response.set_status(303)
response.headers[str('location')] = url
response.content_type = 'text/plain'
return "Your browser should redirect you"
def export_to_json(self, anonymise=False):
get_response().set_content_type('application/json')
return self.filled.export_to_json(anonymise=anonymise)
def history(self):
if not self.filled.evolution:
return
if not self.formdef.is_user_allowed_read_status_and_history(get_request().user, self.filled):
return
include_authors_in_form_history = get_publisher().get_site_option('include_authors_in_form_history', 'variables') != 'False'
include_authors = get_request().is_in_backoffice() or include_authors_in_form_history
return template.render(
list(self.get_formdef_template_variants(self.history_templates)),
{
'formdata': self.filled,
'include_authors': include_authors,
'view': self,
})
def check_receiver(self):
session = get_session()
if not session or not session.user:
if not self.filled.formdef.is_user_allowed_read(None, self.filled):
raise errors.AccessUnauthorizedError()
user = get_request().user
if self.filled.formdef is None:
raise errors.AccessForbiddenError()
if not self.filled.formdef.is_user_allowed_read(user, self.filled):
raise errors.AccessForbiddenError()
return user
def should_fold_summary(self, mine, request_user):
# fold the summary if the form has already been seen by the user, i.e.
# if it's user own form or if the user is present in the formdata log
# (evolution).
if mine or (request_user and self.filled.is_submitter(request_user)):
return True
elif request_user and self.filled.evolution:
for evo in self.filled.evolution:
if (str(evo.who) == str(request_user.id) or
(evo.who == '_submitter' and self.filled.is_submitter(request_user))):
return True
return False
def should_fold_history(self):
return False
def receipt(self, always_include_user=False, show_status=True, form_url='', mine=True):
request_user = user = get_request().user
if not always_include_user and get_request().user and \
get_request().user.id == self.filled.user_id:
user = None
else:
try:
user = get_publisher().user_class.get(self.filled.user_id)
except KeyError:
user = None
# this is custom code so it is possible to mark forms as anonyms, this
# is done through the VoteAnonymity field, this is very specific but
# isn't generalised yet into an useful extension mechanism, as it's not
# clear at the moment what could be useful.
for f in self.formdef.fields:
if f.key == 'vote-anonymity':
user = None
break
r = TemplateIO(html=True)
klasses = 'foldable'
if self.should_fold_summary(mine, request_user):
klasses += ' folded'
r += htmltext('<div class="section %s" id="summary">' % klasses)
r += htmltext('<h2>%s</h2>') % _('Summary')
r += htmltext('<div class="dataview">')
if user:
r += htmltext('<div class="field username"><span class="label">%s</span>') % _('User name')
r += htmltext('<span class="value">%s</span></div>') % user.display_name
r += self.display_fields(form_url=form_url)
if show_status and self.formdef.is_user_allowed_read_status_and_history(
get_request().user, self.filled):
wf_status = self.filled.get_visible_status()
if wf_status:
r += htmltext('<div class="field status"><span class="label">%s</span> ') % _('Status')
r += htmltext('<span class="value">%s</span></div>') % wf_status.name
r += htmltext('</div>') # .dataview
r += htmltext('</div>') # .section
return r.getvalue()
def display_fields(self, fields=None, form_url='', include_unset_required_fields=False):
import wcs.workflows
if fields is None:
fields = self.formdef.fields
r = TemplateIO(html=True)
on_page = False
on_disabled_page = False
pages = []
current_page_fields = []
def get_value(f):
if not self.filled.data.has_key(f.id):
value = None
else:
if f.store_display_value and ('%s_display' % f.id) in self.filled.data:
value = self.filled.data['%s_display' % f.id]
else:
value = self.filled.data[f.id]
if value is None or value == '':
value = None
return value
for i, f in enumerate(fields):
if f.type == 'page':
on_page = f
current_page_fields = []
pages.append({'page': f, 'fields': current_page_fields})
continue
if f.type == 'title' and on_page and not current_page_fields and on_page.label == f.label:
# don't include first title of a page if that title has the
# same text as the page.
continue
if f.type in ('title', 'subtitle', 'comment') and f.include_in_summary_page:
current_page_fields.append({'field': f})
continue
if not hasattr(f, 'get_view_value'):
continue
if not f.include_in_summary_page:
continue
value = get_value(f)
if value is None and not (f.required and include_unset_required_fields):
continue
current_page_fields.append({'field': f, 'value': value})
if not pages:
fields = [x['field'] for x in current_page_fields]
else:
# ignore empty pages
fields = []
for page in pages:
if not any([x.has_key('value') for x in page['fields']]):
continue
fields.append(page['page'])
fields.extend([x['field'] for x in page['fields']])
on_page = None
for f in fields:
if f.type == 'page':
if on_page:
r += htmltext('</div>')
r += htmltext('</div>')
r += htmltext('<div class="page">')
r += htmltext('<h3>%s</h3>') % f.label
r += htmltext('<div>')
on_page = f
continue
if f.type == 'title':
label = wcs.workflows.template_on_formdata(None, f.label, autoescape=False)
r += htmltext('<div class="title %s"><h3>%s</h3></div>') % (f.extra_css_class or '', label)
continue
if f.type == 'subtitle':
label = wcs.workflows.template_on_formdata(None, f.label, autoescape=False)
r += htmltext('<div class="subtitle %s"><h4>%s</h4></div>') % (f.extra_css_class or '', label)
continue
if f.type == 'comment':
r += htmltext('<div class="comment-field %s">%s</div>' % (f.extra_css_class or '', f.get_text()))
continue
css_classes = ['field', 'field-type-%s' % f.key]
if f.extra_css_class:
css_classes.append(f.extra_css_class)
r += htmltext('<div class="%s">' % ' '.join(css_classes))
r += htmltext('<span class="label">%s</span> ') % f.label
value = get_value(f)
if value is None:
r += htmltext('<div class="value"><i>%s</i></div>') % _('Not set')
else:
r += htmltext('<div class="value">')
s = f.get_view_value(value)
s = s.replace(str('[download]'), str('%sdownload' % form_url))
r += s
r += htmltext('</div>')
r += htmltext('</div>')
if on_page:
r += htmltext('</div></div>')
return r.getvalue()
def backoffice_fields_section(self):
backoffice_fields = self.formdef.workflow.get_backoffice_fields()
if not backoffice_fields:
return
content = self.display_fields(backoffice_fields,
include_unset_required_fields=True)
if not len(content):
return
r = TemplateIO(html=True)
r += htmltext('<div class="section foldable">')
r += htmltext('<h2>%s</h2>') % _('Backoffice Data')
r += htmltext('<div class="dataview">')
r += content
r += htmltext('</div>')
r += htmltext('</div>')
return r.getvalue()
def status(self):
object_key = 'formdata-%s-%s' % (self.formdef.url_name, self.filled.id)
if get_request().get_query() == 'unlock':
# mark user as active visitor of the object, then redirect to self,
# the unlocked form will appear.
self.filled.mark_as_being_visited()
return redirect('./#lock-notice')
user = self.check_receiver()
form = self.get_workflow_form(user)
response = self.check_submitted_form(form)
if response:
get_session().unmark_visited_object(object_key)
return response
get_logger().info('form %s - id: %s - view status' % (self.formdef.name, self.filled.id))
get_response().add_javascript(['jquery.js', 'qommon.forms.js'])
self.html_top('%s - %s' % (self.formdef.name, self.filled.id))
r = TemplateIO(html=True)
r += htmltext(self.workflow_messages())
r += self.receipt(always_include_user=True, mine=False)
r += self.backoffice_fields_section()
r += self.history()
r += htmltext(self.bottom_workflow_messages())
locked = False
if form:
all_visitors = get_publisher().get_object_visitors(object_key)
visitors = [x for x in all_visitors if x[0] != get_session().user]
me_in_visitors = bool(get_session().user in [x[0] for x in all_visitors])
if visitors:
current_timestamp = time.time()
visitor_users = []
for visitor_id, visitor_timestamp in visitors:
try:
visitor_name = get_publisher().user_class.get(visitor_id).display_name
except KeyError:
continue
minutes_ago = int((current_timestamp - visitor_timestamp) / 60)
if minutes_ago < 1:
time_ago = _('less than a minute ago')
else:
time_ago = _('less than %s minutes ago') % (minutes_ago + 1)
visitor_users.append('%s (%s)' % (visitor_name, time_ago))
if visitor_users:
r += htmltext('<div id="lock-notice" class="infonotice"><p>')
r += _('Be warned forms of this user are also being looked at by: '
'%s.') % ', '.join(visitor_users)
r += ' '
r += htmltext('</p>')
if not me_in_visitors:
locked = True
r += htmltext('<p class="action"><a href="?unlock">%s</a></p>'
) % _('(unlock actions)')
r += htmltext('</div>')
if not visitors or me_in_visitors:
r += htmltext(self.actions_workflow_messages())
r += form.render()
self.filled.mark_as_being_visited()
if not locked:
if (self.filled.get_status() and self.filled.get_status().backoffice_info_text) or (
form and any((getattr(button, 'backoffice_info_text', None)
for button in form.get_submit_widgets()))):
r += htmltext('<div class="backoffice-description bo-block">')
if self.filled.get_status().backoffice_info_text:
r += htmltext(self.filled.get_status().backoffice_info_text)
if form:
for button in form.get_submit_widgets():
if not getattr(button, 'backoffice_info_text', None):
continue
r += htmltext('<div class="action-info-text" data-button-name="%s">' % button.name)
r += htmltext(button.backoffice_info_text)
r += htmltext('</div>')
r += htmltext('</div>')
r += htmltext('<a href="..">%s</a>') % _('Back to Listing')
return r.getvalue()
def submit(self, form):
current_status = self.filled.status
user = get_request().user
next_url = self.filled.handle_workflow_form(user, form)
if next_url:
return next_url
if form.has_errors():
return
if current_status != self.filled.status:
get_logger().info('form %s - id: %s - status -> %s' % (
self.formdef.name, self.filled.id, self.filled.status))
try:
self.check_auth()
except errors.AccessError:
# the user no longer has access to the form; redirect to a
# different page
if 'backoffice/' in [x[0] for x in get_response().breadcrumb]:
user = get_request().user
if user and (user.is_admin or self.formdef.is_of_concern_for_user(user)):
# user has access to the formdef, redirect to the
# listing.
return '..'
else:
return get_publisher().get_backoffice_url()
else:
return get_publisher().get_root_url()
def download(self):
self.check_receiver()
try:
fn = get_request().form['f']
f = self.filled.data[fn]
except (KeyError, ValueError):
raise errors.TraversalError()
file = self.filled.data[fn]
if not hasattr(file, 'content_type'):
raise errors.TraversalError()
file_url = 'files/%s/' % fn
if get_request().form.get('thumbnail') == '1':
file_url += 'thumbnail/'
if getattr(file, 'base_filename'):
file_url += file.base_filename
return redirect(file_url)
@classmethod
def live_process_fields(cls, form, formdata, displayed_fields):
result = {}
for field in displayed_fields:
result[field.id] = {'visible': field.is_visible(formdata.data, formdata.formdef)}
modified_field_varname = None
for field in displayed_fields:
if field.id == get_request().form.get('modified_field_id'):
modified_field_varname = field.varname
for field in displayed_fields:
if field.key == 'item' and field.data_source:
data_source = data_sources.get_object(field.data_source)
if data_source.type != 'json':
continue
varnames = field.get_referenced_varnames(
formdef=field.formdef,
value=data_source.data_source.get('value'))
if (modified_field_varname is None or modified_field_varname in varnames) and (
field.display_mode == 'autocomplete' and data_source.query_parameter):
# computed earlier, in perform_more_widget_changes, when the field
# was added to the form
result[field.id]['source_url'] = field.url
if modified_field_varname in varnames:
result[field.id]['items'] = [
{'id': x[2], 'text': x[1]} for x in field.get_options(mode='lazy')]
for widget in form.widgets:
if not getattr(widget, 'field', None):
continue
if widget.field.key == 'comment':
result[widget.field.id]['content'] = widget.content
return json.dumps({'result': result})
def live(self):
get_request().ignore_session = True
# live evaluation of fields
get_response().set_content_type('application/json')
def result_error(reason):
return json.dumps({'result': 'error', 'reason': reason})
session = get_session()
if not session:
return result_error('missing session')
displayed_fields = []
user = get_request().user
form = self.filled.get_workflow_form(user, displayed_fields=displayed_fields)
self.filled.evaluate_live_workflow_form(user, form)
get_publisher().substitutions.unfeed(lambda x: x is self.filled)
get_publisher().substitutions.feed(self.filled)
# reevaluate workflow form according to possible new content
displayed_fields = []
form = self.filled.get_workflow_form(user, displayed_fields=displayed_fields)
return self.live_process_fields(form, self.filled, displayed_fields)
def _q_lookup(self, component):
if component == 'files':
self.check_receiver()
return FilesDirectory(self.filled)
if component.startswith('wfedit-'):
return self.wfedit(component[len('wfedit-'):])
return Directory._q_lookup(self, component)
def _q_traverse(self, path):
get_response().breadcrumb.append(
(str(self.filled.id) + '/' , self.filled.get_display_id()))
return super(FormStatusPage, self)._q_traverse(path)
def wfedit(self, action_id):
wf_status = self.filled.get_status()
for item in wf_status.items:
if item.id != action_id:
continue
if not isinstance(item, EditableWorkflowStatusItem):
break
if not item.check_auth(self.filled, get_request().user):
break
f = self.form_page_class(self.formdef.url_name)
f.edit_mode = True
f.edited_data = self.filled
f.edit_action_id = action_id
f.action_url = 'wfedit-%s' % action_id
self.filled.mark_as_being_visited()
get_response().breadcrumb = get_response().breadcrumb[:-1]
get_response().breadcrumb.append((f.action_url, _('Edit')))
return f._q_index()
raise errors.AccessForbiddenError()