608 lines
24 KiB
Python
608 lines
24 KiB
Python
# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
import sys
|
|
import time
|
|
|
|
from quixote import get_publisher, get_request, get_response, get_session, redirect
|
|
from quixote.directory import Directory
|
|
from quixote.html import TemplateIO, htmltext
|
|
|
|
from wcs.api_utils import get_user_from_api_query_string, is_url_signed
|
|
from wcs.fields import WidgetField, FileField
|
|
from wcs.workflows import EditableWorkflowStatusItem
|
|
|
|
from django.template import RequestContext
|
|
|
|
from qommon import _
|
|
from qommon import template
|
|
from qommon import get_logger
|
|
from qommon.form import *
|
|
from qommon.strftime import strftime
|
|
|
|
from qommon.admin.texts import TextsDirectory
|
|
|
|
from qommon import errors
|
|
|
|
|
|
class FileDirectory(Directory):
    """Traversal node that serves one uploaded file stored in a formdata.

    ``reference`` is the key of ``formdata.data`` under which the upload is
    looked up; the path component handled by ``_q_lookup`` is compared with
    the upload's ``base_filename``.
    """

    _q_exports = []
    # Names of the methods tried, in order, by _q_lookup() to locate the
    # upload; each one is called with a ``filename`` keyword argument.
    _lookup_methods = ['lookup_file_field']

    def __init__(self, formdata, reference):
        self.formdata = formdata
        self.reference = reference

    def lookup_file_field(self, filename):
        """Return the value stored under ``self.reference``, or None.

        ``filename`` is part of the lookup-method signature used by
        _q_lookup() but is not needed to locate the value here.
        """
        if self.reference in self.formdata.data:
            return self.formdata.data[self.reference]
        return None

    def _q_lookup(self, component):
        """Send the referenced upload as the response body.

        Raises errors.TraversalError when no lookup method yields a value or
        when ``component`` is non-empty and differs from the upload's
        ``base_filename``.
        """
        for lookup_method_name in self._lookup_methods:
            upload = getattr(self, lookup_method_name)(filename=component)
            if upload:
                break
        else:
            # no such file
            raise errors.TraversalError()

        if component and component != upload.base_filename:
            raise errors.TraversalError()

        # Use a single fallback value for both the header and the
        # inline/attachment decision: the original code called
        # .startswith() on the raw attribute and crashed when it was None.
        content_type = upload.content_type or 'application/octet-stream'

        response = get_response()
        response.set_content_type(content_type)
        if upload.charset:
            response.set_charset(upload.charset)
        if upload.base_filename:
            # images and PDF files are displayed by the browser, anything
            # else is offered as a download
            if content_type.startswith('image/') or content_type == 'application/pdf':
                disposition = 'inline'
            else:
                disposition = 'attachment'
            response.set_header(
                'content-disposition',
                '%s; filename="%s"' % (disposition, upload.base_filename))

        return upload.get_file_pointer().read()
|
|
|
|
|
|
class FilesDirectory(Directory):
    """Parent node of the per-file directories of a formdata.

    Each path component below this node is handed to FileDirectory as the
    reference of the file to serve.
    """

    def __init__(self, formdata):
        self.formdata = formdata

    def _q_lookup(self, component):
        """Return the FileDirectory bound to ``component``."""
        file_directory = FileDirectory(self.formdata, reference=component)
        return file_directory
|
|
|
|
class FormStatusPage(Directory):
    """Directory displaying a submitted form (formdata) and its workflow form.

    ``_q_index`` renders the receipt view and ``status`` renders the view
    carrying the lock notice and the backoffice information blocks; both
    process the workflow form when it has been submitted.
    """

    # Base list of exported names; it is copied for each instance in
    # __init__ so that per-instance additions never touch the class attribute.
    _q_exports_orig = ['', 'download', 'json', 'action']
    # Additional exported names merged into the per-instance export list.
    _q_extra_exports = []
    # Class instantiated by wfedit() with the formdef url_name; None here,
    # so it has to be provided before wfedit() can work.
    form_page_class = None

    # Template names handed to template.render() by history().
    history_templates = ['wcs/formdata_history.html']

    def html_top(self, title=None):
        """Emit the page header through template.html_top()."""
        template.html_top(title=title, default_org=_('Forms'))

    def __init__(self, formdef, filled, register_workflow_subdirs=True):
        """Bind the page to ``formdef`` and to the formdata ``filled``.

        When the formdef has a workflow and ``register_workflow_subdirs`` is
        true, every (name, directory) pair returned by
        ``filled.get_workflow_subdirectories()`` is exported and set as an
        attribute of the page.
        """
        get_publisher().substitutions.feed(filled)
        self.formdef = formdef
        # both names refer to the same formdata object
        self.formdata = filled
        self.filled = filled
        self._q_exports = self._q_exports_orig[:]
        for q in self._q_extra_exports:
            if q not in self._q_exports:
                self._q_exports.append(q)

        if self.formdef.workflow and register_workflow_subdirs:
            for name, directory in self.filled.get_workflow_subdirectories():
                self._q_exports.append(name)
                setattr(self, name, directory)

    def check_auth(self, api_call=False):
        """Check read access and tell whether the form belongs to the visitor.

        For API calls carrying an ``anonymise`` parameter, access is granted
        (None is returned) when the URL is signed or the request user is an
        administrator, and errors.AccessUnauthorizedError is raised
        otherwise.  In every other case check_receiver() is run and a boolean
        is returned: True when the user is the submitter, or when there is no
        identified user and the session is the anonymous submitter.
        """
        session = get_session()
        request = get_request()
        mine = False
        if api_call:
            if 'anonymise' in request.form:
                if is_url_signed() or (request.user and request.user.is_admin):
                    return None
                raise errors.AccessUnauthorizedError()
            user = get_user_from_api_query_string() or request.user
        else:
            user = request.user

        if user and not user.anonymous:
            if self.filled.is_submitter(user):
                mine = True
        elif session and session.is_anonymous_submitter(self.filled):
            mine = True

        self.check_receiver()
        return mine

    def json(self):
        """Return the JSON export of the formdata after an API access check."""
        self.check_auth(api_call=True)
        anonymise = 'anonymise' in get_request().form
        return self.export_to_json(anonymise=anonymise)

    def workflow_messages(self):
        """Return the workflow messages wrapped in a receipt-intro block.

        An empty string is returned when there is no workflow or no message.
        Messages starting with ``<`` are inserted as they are, the others are
        wrapped in a paragraph; no escaping is applied in either case.
        """
        if self.formdef.workflow:
            workflow_messages = self.filled.get_workflow_messages()
            if workflow_messages:
                r = TemplateIO(html=True)
                r += htmltext('<div id="receipt-intro">')
                for workflow_message in workflow_messages:
                    if workflow_message.startswith('<'):
                        r += htmltext(workflow_message)
                    else:
                        r += htmltext('<p>%s</p>' % workflow_message)
                r += htmltext('</div>')
                return r.getvalue()
        return ''

    def receipt_message(self, mine=False):
        """Return the introduction block shown above the receipt.

        Workflow messages take precedence when there are some.  Otherwise
        the standard "form recorded" text is produced, followed by the
        handling role details (only when ``mine`` is true) and by the
        tracking code when the formdef enables tracking codes and the
        formdata has one.
        """
        workflow_messages = self.workflow_messages()
        if workflow_messages:
            return workflow_messages

        r = TemplateIO(html=True)
        # behaviour if workflow doesn't display any message
        if self.filled.receipt_time is not None:
            # NOTE(review): ``misc`` is not imported explicitly in this
            # module; presumably it is provided by the star import of
            # qommon.form -- confirm.
            tm = misc.localstrftime(self.filled.receipt_time)
        else:
            tm = '???'

        r += htmltext('<div id="receipt-intro">')
        if self.formdef.only_allow_one:
            r += TextsDirectory.get_html_text('form-recorded-allow-one', vars={'date': tm})
        else:
            r += TextsDirectory.get_html_text(
                'form-recorded',
                vars={'date': tm, 'number': self.filled.get_display_id()})

        if mine:
            handling_role = self.filled.get_handling_role()
            if handling_role and handling_role.details:
                endpoint_status = self.formdef.workflow.get_endpoint_status()
                r += htmltext('<p>')
                if self.filled.status in [x.id for x in endpoint_status]:
                    r += _('Your case has been handled by:')
                else:
                    r += _('Your case is handled by:')
                r += htmltext('</p>')
                r += htmltext('<p id="receiver">')
                r += htmltext(handling_role.details.replace(str('\n'), str('<br />')))
                r += htmltext('</p>')

        if self.formdef.enable_tracking_codes and self.filled.tracking_code:
            r += htmltext('<p id="tracking-code">')
            r += _('You can get back to this page using the following '
                   'tracking code: ')
            r += htmltext('<a href="../code/%s/" rel="popup">%s</a>') % (
                self.filled.tracking_code, self.filled.tracking_code)
            r += htmltext('</p>')

        r += htmltext('</div>')

        return r.getvalue()

    def _redirect_after_submit(self, url):
        """Set up a 303 redirect to ``url`` and return the response body.

        The frontoffice URL of the current request is used when ``url`` is
        None.
        """
        if url is None:
            url = get_request().get_frontoffice_url()
        response = get_response()
        response.set_status(303)
        response.headers[str('location')] = url
        response.content_type = 'text/plain'
        return "Your browser should redirect you"

    def _q_index(self):
        """Render the receipt page, processing the workflow form if submitted."""
        mine = self.check_auth()
        get_logger().info('form %s - id: %s - view' % (self.formdef.name, self.filled.id))

        self.html_top(self.formdef.name)

        r = TemplateIO(html=True)

        r += self.receipt_message(mine=mine)
        r += self.receipt()
        r += self.history()

        user = get_request().user
        form = self.filled.get_workflow_form(user)

        if form and form.is_submitted():
            if not form.has_errors():
                url = self.submit(form, comment_only=True)
                # errors are checked a second time after submit(); when
                # there are some the form is displayed again below
                if not form.has_errors():
                    return self._redirect_after_submit(url)

        if form:
            r += form.render()

        r += self.form_status_buttons()
        return r.getvalue()

    def export_to_json(self, anonymise=False):
        """Set the JSON content type and return the formdata JSON export."""
        get_response().set_content_type('application/json')
        return self.filled.export_to_json(anonymise=anonymise)

    def form_status_buttons(self):
        """Return the "Back Home" link block, or None in iframe mode."""
        if get_response().iframe_mode:
            return None
        r = TemplateIO(html=True)
        r += htmltext('<div class="back-home-button">')
        r += htmltext('<a href="%s">%s</a>') % (get_publisher().get_root_url(), _('Back Home'))
        r += htmltext('</div>')
        return r.getvalue()

    def history(self):
        """Return the rendered history of the formdata.

        None is returned when the formdata has no evolution or when the
        request user is not allowed to read status and history.
        """
        if not self.filled.evolution:
            return None
        if not self.formdef.is_user_allowed_read_status_and_history(get_request().user, self.filled):
            return None

        context = RequestContext(get_request().django_request, {'formdata': self.filled})
        return template.render(self.history_templates, context)

    def check_receiver(self):
        """Check the request user may read the formdata and return that user.

        Raises errors.AccessForbiddenError when the formdata has no formdef
        or when the user is not allowed to read it, and
        errors.AccessUnauthorizedError when there is no session user and
        reading is not allowed without a user.
        """
        formdef = self.filled.formdef
        # The formdef has to be checked first: the original code
        # dereferenced it in the session branch before reaching this guard.
        if formdef is None:
            raise errors.AccessForbiddenError()
        session = get_session()
        if not session or not session.user:
            if not formdef.is_user_allowed_read(None, self.filled):
                raise errors.AccessUnauthorizedError()
        user = get_request().user
        if not formdef.is_user_allowed_read(user, self.filled):
            raise errors.AccessForbiddenError()
        return user

    def receipt(self, always_include_user=False, show_status=True, form_url='', folded=False):
        """Return the "Summary" block: user name, fields and status.

        The user name line shows the formdata user; it is left out when the
        viewer is that user (unless ``always_include_user`` is true), when
        the user cannot be loaded, or when the formdef has a
        ``vote-anonymity`` field.  The status line is added when
        ``show_status`` is true and the request user is allowed to read
        status and history.  ``folded`` adds the ``folded`` class to the
        block title and ``form_url`` is forwarded to display_fields().
        """
        viewer = get_request().user
        if not always_include_user and viewer and viewer.id == self.filled.user_id:
            user = None
        else:
            try:
                user = get_publisher().user_class.get(self.filled.user_id)
            except KeyError:
                user = None

        # this is custom code so it is possible to mark forms as anonyms, this
        # is done through the VoteAnonymity field, this is very specific but
        # isn't generalised yet into an useful extension mechanism, as it's not
        # clear at the moment what could be useful.
        for f in self.formdef.fields:
            if f.key == 'vote-anonymity':
                user = None
                break

        r = TemplateIO(html=True)
        r += htmltext('<div class="bo-block" id="summary">')
        klasses = 'foldable'
        if folded:
            klasses += ' folded'
        r += htmltext('<h2 class="%s">' % klasses)
        r += htmltext('%s</h2>') % _('Summary')
        r += htmltext('<div class="dataview">')

        if user:
            r += htmltext('<div class="field"><span class="label">%s</span>') % _('User name')
            r += htmltext('<span class="value">%s</span></div>') % user.display_name

        r += self.display_fields(self.formdef.fields, form_url)

        if show_status and self.formdef.is_user_allowed_read_status_and_history(
                get_request().user, self.filled):
            wf_status = self.filled.get_visible_status()
            if wf_status:
                r += htmltext('<div><span class="label">%s</span> ') % _('Status')
                r += htmltext('<span class="value">%s</span></div>') % wf_status.name

        r += htmltext('</div>')  # .dataview
        r += htmltext('</div>')  # .bo-block

        return r.getvalue()

    def display_fields(self, fields, form_url='', include_unset_required_fields=False):
        """Return the HTML view of ``fields`` with the values of the formdata.

        Pages that are not visible, or that are not followed by any
        WidgetField before the next page, are skipped together with their
        fields.  Fields without a value are skipped too, unless they are
        required and ``include_unset_required_fields`` is true, in which case
        they are displayed as "Not set".  ``form_url`` is prepended to
        ``download`` in place of the ``[download]`` marker of field values.
        """
        r = TemplateIO(html=True)
        on_page = False
        on_disabled_page = False
        for f in fields:
            if f.type == 'page':
                on_disabled_page = False
                if not f.is_visible(self.filled.data, self.formdef):
                    on_disabled_page = True

                # a page is also disabled when no widget field follows it;
                # the scan always uses the formdef fields, whatever the
                # ``fields`` argument is
                form_field = False
                for f1 in self.formdef.fields[self.formdef.fields.index(f)+1:]:
                    if f1.key == 'page':
                        break
                    if isinstance(f1, WidgetField):
                        form_field = True
                        break
                if form_field is False:
                    on_disabled_page = True

            if on_disabled_page:
                continue

            if f.type == 'page':
                if on_page:
                    # close the previous page (inner and outer blocks)
                    r += htmltext('</div>')
                    r += htmltext('</div>')
                r += htmltext('<div class="page">')
                r += htmltext('<h3>%s</h3>') % f.label
                r += htmltext('<div>')
                on_page = True
                continue

            if not hasattr(f, str('get_view_value')):
                continue

            if f.id not in self.filled.data:
                value = None
            else:
                if f.store_display_value and ('%s_display' % f.id) in self.filled.data:
                    value = self.filled.data['%s_display' % f.id]
                else:
                    value = self.filled.data[f.id]

                # an empty string is handled like a missing value
                if value is None or value == '':
                    value = None

            if value is None and not (f.required and include_unset_required_fields):
                continue

            css_classes = ['field']
            if f.extra_css_class:
                css_classes.append(f.extra_css_class)
            r += htmltext('<div class="%s">' % ' '.join(css_classes))
            r += htmltext('<span class="label">%s</span> ') % f.label
            if value is None:
                r += htmltext('<div class="value"><i>%s</i></div>') % _('Not set')
            elif isinstance(f, FileField):
                r += htmltext(self.display_file_field(form_url, f, value))
            else:
                # normal display
                r += htmltext('<div class="value">')
                s = f.get_view_value(value)
                s = s.replace(str('[download]'), str('%sdownload' % form_url))
                r += s
                r += htmltext('</div>')
            r += htmltext('</div>')

        if on_page:
            r += htmltext('</div></div>')

        return r.getvalue()

    def backoffice_fields_section(self):
        """Return the "Backoffice Data" block, or None when it would be empty."""
        backoffice_fields = self.formdef.workflow.get_backoffice_fields()
        if not backoffice_fields:
            return None
        content = self.display_fields(backoffice_fields,
                                      include_unset_required_fields=True)
        if not len(content):
            return None
        r = TemplateIO(html=True)
        r += htmltext('<div class="bo-block">')
        r += htmltext('<h2 class="foldable">%s</h2>') % _('Backoffice Data')
        r += htmltext('<div class="dataview">')
        r += content
        r += htmltext('</div>')
        r += htmltext('</div>')
        return r.getvalue()

    def status(self):
        """Render the status view with lock notice and backoffice information.

        A submitted workflow form without errors is processed and answered
        with a 303 redirect.  Otherwise the page shows the workflow
        messages, the summary, the backoffice fields and the history; when
        there is a workflow form it is rendered unless other users are
        registered as visitors of the formdata and the current session user
        is not, in which case an "unlock" link is displayed instead.
        """
        object_key = 'formdata-%s-%s' % (self.formdef.url_name, self.filled.id)

        # Access is checked before anything else so that the "unlock" request
        # below cannot mark the formdata as visited for someone who is not
        # allowed to read it.
        user = self.check_receiver()

        if get_request().get_query() == 'unlock':
            # mark user as active visitor of the object, then redirect to self,
            # the unlocked form will appear.
            self.filled.mark_as_being_visited()
            return redirect('./#lock-notice')

        form = None

        try:
            form = self.filled.get_workflow_form(user)
        except Exception:
            # XXX: probably because there are mixed forms, with and without
            # workflow; send a trace nevertheless.
            get_publisher().notify_of_exception(sys.exc_info(), context='[BACKOFFICE]')
            form = Form()

        if form and form.is_submitted() and not form.has_errors():
            url = self.submit(form)
            get_session().unmark_visited_object(object_key)
            return self._redirect_after_submit(url)

        get_logger().info('form %s - id: %s - view status' % (self.formdef.name, self.filled.id))
        self.html_top('%s - %s' % (self.formdef.name, self.filled.id))
        r = TemplateIO(html=True)

        r += htmltext(self.workflow_messages())

        # fold the summary if the form has already been seen by the user, i.e. if the user is
        # present in the formdata log (evolution).
        folded = False
        if user and self.filled.evolution:
            for evo in self.filled.evolution:
                if (str(evo.who) == str(user.id) or
                        (evo.who == '_submitter' and self.filled.is_submitter(user))):
                    folded = True
                    break

        r += self.receipt(always_include_user=True, folded=folded)
        r += self.backoffice_fields_section()

        r += self.history()

        locked = False
        if form:
            session_user = get_session().user
            all_visitors = get_publisher().get_object_visitors(object_key)
            visitors = [x for x in all_visitors if x[0] != session_user]
            me_in_visitors = bool(session_user in [x[0] for x in all_visitors])
            if visitors:
                current_timestamp = time.time()
                visitor_users = []
                for visitor_id, visitor_timestamp in visitors:
                    try:
                        visitor_name = get_publisher().user_class.get(visitor_id).display_name
                    except KeyError:
                        continue
                    minutes_ago = int((current_timestamp - visitor_timestamp) / 60)
                    if minutes_ago < 1:
                        time_ago = _('less than a minute ago')
                    else:
                        time_ago = _('less than %s minutes ago') % (minutes_ago + 1)
                    visitor_users.append('%s (%s)' % (visitor_name, time_ago))
                if visitor_users:
                    r += htmltext('<div id="lock-notice" class="infonotice"><p>')
                    r += _('Be warned forms of this user are also being looked at by: '
                           '%s.') % ', '.join(visitor_users)
                    r += ' '
                    r += htmltext('</p>')
                    if not me_in_visitors:
                        locked = True
                        r += htmltext('<p class="action"><a href="?unlock">%s</a></p>'
                                      ) % _('(unlock actions)')
                    r += htmltext('</div>')
            if not visitors or me_in_visitors:
                r += form.render()
                self.filled.mark_as_being_visited()
                related_user_forms = getattr(self.filled, 'related_user_forms', None) or []
                # ``user`` is the request user returned by check_receiver();
                # it is guarded as it may be None
                user_roles = set((user and user.roles) or [])
                for user_formdata in related_user_forms:
                    if user_roles.intersection(user_formdata.actions_roles):
                        user_formdata.mark_as_being_visited()

        if not locked:
            # The status is fetched once and may be None: the original code
            # dereferenced it again inside the block, which failed when only
            # a button carried an information text.
            wf_status = self.filled.get_status()
            status_info_text = wf_status.backoffice_info_text if wf_status else None
            info_buttons = []
            if form:
                info_buttons = [button for button in form.get_submit_widgets()
                                if getattr(button, 'backoffice_info_text', None)]
            if status_info_text or info_buttons:
                r += htmltext('<div class="backoffice-description bo-block">')
                if status_info_text:
                    r += htmltext(status_info_text)
                for button in info_buttons:
                    r += htmltext('<div class="action-info-text" data-button-name="%s">' % button.name)
                    r += htmltext(button.backoffice_info_text)
                    r += htmltext('</div>')
                r += htmltext('</div>')

        r += htmltext('<a href="..">%s</a>') % _('Back to Listing')
        return r.getvalue()

    def submit(self, form, comment_only=False):
        """Hand the submitted workflow form to the formdata.

        Returns the URL given by ``handle_workflow_form()`` when there is
        one, or a fallback URL when the user lost access to the formdata
        after the action; None is returned otherwise (including when the
        form has errors).  ``comment_only`` is accepted but not used here.
        """
        current_status = self.filled.status
        user = get_request().user
        next_url = self.filled.handle_workflow_form(user, form)
        if next_url:
            return next_url
        if form.has_errors():
            return None
        if current_status != self.filled.status:
            get_logger().info('form %s - id: %s - status -> %s' % (
                self.formdef.name, self.filled.id, self.filled.status))
        try:
            self.check_auth()
        except errors.AccessError:
            # the user no longer has access to the form; redirect to a
            # different page
            if 'backoffice/' in [x[0] for x in get_response().breadcrumb]:
                user = get_request().user
                if user and (user.is_admin or self.formdef.is_of_concern_for_user(user)):
                    # user has access to the formdef, redirect to the
                    # listing.
                    return '..'
                return get_publisher().get_backoffice_url()
            return get_publisher().get_root_url()
        return None

    def download(self):
        """Redirect to the ``files/`` URL of the upload named by ``f``.

        Raises errors.TraversalError when the ``f`` request parameter is
        missing, does not match a key of the formdata, or matches a value
        that has no ``content_type`` attribute.
        """
        self.check_receiver()
        try:
            fn = get_request().form['f']
            upload = self.filled.data[fn]
        except (KeyError, ValueError):
            raise errors.TraversalError()

        if not hasattr(upload, 'content_type'):
            raise errors.TraversalError()

        base_filename = getattr(upload, 'base_filename', None)
        if base_filename:
            return redirect('files/%s/%s' % (fn, base_filename))
        return redirect('files/%s/' % fn)

    def display_file_field(self, form_url, field, value):
        """Return, as a string, the value block of a file field."""
        r = TemplateIO(html=True)
        r += htmltext('<div class="value">')
        s = field.get_view_value(value)
        s = s.replace(str('[download]'), str('%sdownload' % form_url))
        r += s
        r += htmltext('</div>')
        return str(r)

    def _q_lookup(self, component):
        """Resolve ``files`` and ``wfedit-<action id>`` path components."""
        if component == 'files':
            self.check_receiver()
            return FilesDirectory(self.filled)
        if component.startswith('wfedit-'):
            return self.wfedit(component[len('wfedit-'):])
        return Directory._q_lookup(self, component)

    def _q_traverse(self, path):
        """Add the formdata to the breadcrumb, then continue the traversal."""
        get_response().breadcrumb.append(
            (str(self.filled.id) + '/', self.filled.get_display_id()))
        return super(FormStatusPage, self)._q_traverse(path)

    def wfedit(self, action_id):
        """Open the form page in edit mode for the workflow item ``action_id``.

        Raises errors.AccessForbiddenError when the formdata has no current
        status, when that status has no item with this id, when the item is
        not an EditableWorkflowStatusItem, or when its check_auth() refuses
        the request user.
        """
        wf_status = self.filled.get_status()
        if wf_status is None:
            # nothing can be edited without a current status
            raise errors.AccessForbiddenError()
        for item in wf_status.items:
            if item.id != action_id:
                continue
            if not isinstance(item, EditableWorkflowStatusItem):
                break
            if not item.check_auth(self.filled, get_request().user):
                break
            f = self.form_page_class(self.formdef.url_name)
            f.edit_mode = True
            f.edited_data = self.filled
            f.edit_action_id = action_id
            f.action_url = 'wfedit-%s' % action_id
            self.filled.mark_as_being_visited()
            # replace the last breadcrumb entry with an "Edit" entry
            get_response().breadcrumb = get_response().breadcrumb[:-1]
            get_response().breadcrumb.append((f.action_url, _('Edit')))
            return f._q_index()

        raise errors.AccessForbiddenError()