wcs/wcs/forms/root.ptl

1266 lines
50 KiB
Plaintext

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import os
import time
import urllib
import re
from quixote import get_publisher, get_request, get_response, get_session, redirect
from quixote.directory import Directory, AccessControlled
from quixote.util import randbytes
from quixote.form.widget import *
from qommon import errors, get_cfg
from wcs import formdata
from qommon import misc, get_logger
from qommon import template
from qommon.form import *
from qommon import tokens
from qommon import ezt
from wcs.anonylink import AnonymityLink
from wcs.categories import Category
from wcs.formdef import FormField, FormDef
from wcs.formdata import FormData
from wcs.users import User
from wcs.roles import logged_users_role
from wcs.workflows import Workflow, EditableWorkflowStatusItem
from qommon.admin.texts import TextsDirectory
from backoffice import FormDefUI
import qommon.idwsf as idwsf
try:
from qommon.certificate import Certificate
m2crypto = True
except ImportError:
m2crypto = False
try:
import lasso
except ImportError:
lasso = None
import qommon.liberty as liberty
import StringIO as sio
try:
import elementtree.ElementTree as ET
except ImportError:
try:
import xml.etree.ElementTree as ET
except ImportError:
ET = None
def html_top [html] (title = None):
template.html_top(title = title, default_org = _('Forms'))
def get_user_forms(formdef):
    """Return the form data objects of `formdef` owned by the session user.

    formdef - the formdef whose stored forms are looked up

    An empty list is returned when the session has no user.  Anonymous
    users are matched through AnonymityLink records carrying the session's
    name identifier; other users through the 'user_id' index and, when the
    user object has a `hash` attribute, the 'user_hash' index.
    """
    session = get_session()
    user = session.get_user()
    user_forms = []
    if not user:
        return user_forms

    if user.anonymous:
        anonylinks = AnonymityLink.select(
                lambda x: x.name_identifier == session.name_identifier and
                    x.formdata_type == 'form')
        for link in anonylinks:
            if link.formdata_def_id == formdef.id:
                user_forms.append(formdef.data_class().get(link.formdata_id))
        return user_forms

    # NOTE(review): nesting reconstructed from a flattened source; the
    # user_hash lookup is assumed to belong to the non-anonymous branch.
    user_forms.extend(formdef.data_class().get_with_indexed_value('user_id', user.id))
    try:
        user_forms.extend(formdef.data_class().get_with_indexed_value('user_hash', user.hash))
    except AttributeError:
        # user objects without a `hash` attribute are simply not looked up
        pass
    return user_forms
from wcs.forms.common import FormStatusPage
class TokenDirectory(Directory):
_q_exports = ['']
def __init__(self, formdef, token):
self.formdef = formdef
self.token = token
def _q_index [html] (self):
if self.token.type != 'form-invite':
raise errors.TraversalError()
if self.token.formdef_id != self.formdef.id:
raise errors.TraversalError()
try:
self.token.remove_self()
except OSError:
# race condition, and the token already got removed (??!)
self.token.type = None
get_session().set_user(self.token.user_id)
return redirect(self.formdef.get_url())
class TokensDirectory(Directory):
    # Resolves "<form>/tokens/<id>" URL components to a TokenDirectory.

    def __init__(self, formdef):
        self.formdef = formdef

    def _q_lookup(self, component):
        """Return the TokenDirectory for token id `component`.

        Raises errors.TraversalError when tokens.Token.get() raises KeyError.
        """
        try:
            found = tokens.Token.get(component)
        except KeyError:
            raise errors.TraversalError()
        return TokenDirectory(self.formdef, found)
class FormPage(Directory):
_q_exports = ['', 'listing', 'tempfile', 'tokens']
steps = [N_("Filling"), N_("Validating"), N_("Signing"), N_("Receipt")]
def __init__(self, component):
    """Load the formdef whose URL name is `component`.

    Raises errors.TraversalError when FormDef.get_by_urlname() raises
    KeyError.  Also feeds the formdef to the publisher substitutions,
    records the request user and appends a breadcrumb entry.
    """
    try:
        self.formdef = FormDef.get_by_urlname(component)
    except KeyError:
        raise errors.TraversalError()
    get_publisher().substitutions.feed(self.formdef)
    self.tokens = TokensDirectory(self.formdef)
    # number of pages = 1 + number of 'page' fields found after the first field
    later_page_fields = [
            field for field in self.formdef.fields[1:] if field.type == 'page']
    self.page_number = 1 + len(later_page_fields)
    self.user = get_request().user
    crumb = (component + '/', self.formdef.name)
    get_response().breadcrumb.append(crumb)
def check_role(self):
    """Check that the current user may access the form.

    Does nothing when the formdef has no roles.  Otherwise raises
    errors.AccessUnauthorizedError when there is no user, and
    errors.AccessForbiddenError when the user is not admin, the form is not
    open to the logged-users role, and none of the user's roles is one of
    the form's roles or its receiver.
    """
    if not self.formdef.roles:
        return
    if not self.user:
        raise errors.AccessUnauthorizedError()
    if logged_users_role().id in self.formdef.roles or (
            self.user and self.user.is_admin):
        return
    for role_id in (self.user and self.user.roles) or []:
        if role_id in self.formdef.roles or role_id == self.formdef.receiver_id:
            break
    else:
        # no role of the user matched
        raise errors.AccessForbiddenError()
def step [html] (self, no, page_no = 0, log_detail = None, data = None, editing = None):
'<div id="steps"><ol>'
steps = self.steps[:]
if not self.formdef.confirmation and not self.formdef.signing:
del steps[1:3]
elif not self.formdef.confirmation:
del steps[1]
elif not self.formdef.signing:
del steps[2]
if log_detail:
get_logger().info('form %s - step %s (%s)' % (self.formdef.name, steps[no], log_detail))
else:
get_logger().info('form %s - step %s' % (self.formdef.name, steps[no]))
for i, l in enumerate(steps):
classes = ['step-%d' % i]
if no == i:
classes.append('current')
elif no < i:
classes.append('step-after')
elif no > i:
classes.append('step-before')
if i+1 == len(steps):
classes.append('last')
if i == 0:
classes.append('first')
'<li class="%s">' % ' '.join(classes)
'<span class="marker">%d</span> <span class="label">%s</span>' % (i+1, _(l))
if i == 0 and (no == i or True) and self.page_number > 1:
'<ul>'
t = 0
for field in self.formdef.fields:
if field.type != 'page':
continue
classes = []
if t == page_no and no == i:
classes.append('current')
t += 1
if not field.is_visible(data, self.formdef):
continue
'<li class="%s"><span>%s</span></li>' % (' '.join(classes), field.label)
'</ul>'
'</li>'
if editing:
break
'</ol></div>'
def get_disco_epr(self):
    """Return the discovery bootstrap EPR of the session's first assertion.

    Returns None when the session has no lasso session dump, when the dump
    cannot be turned into a lasso session, when it holds no assertion, or
    when the first assertion is not a lasso.Saml2Assertion.
    """
    session = get_session()
    if not session.lasso_session_dump:
        return None
    lasso_session = lasso.Session.newFromDump(session.lasso_session_dump)
    if not lasso_session:
        return None
    assertions = lasso_session.assertions
    if not assertions or not assertions.values():
        return None
    first_assertion = assertions.values()[0]
    if isinstance(first_assertion, lasso.Saml2Assertion):
        return first_assertion.idwsf2GetDiscoveryBootstrapEpr()
    return None
def get_eprs(self, service_type):
    """Look up `service_type` through the discovery service.

    Returns [] when no discovery EPR is available (see get_disco_epr),
    otherwise whatever DiscoveryClient2.lookupService() returns.
    """
    bootstrap_epr = self.get_disco_epr()
    if not bootstrap_epr:
        return []
    client = idwsf.DiscoveryClient2(epr=bootstrap_epr,
            server=misc.get_lasso_server(protocol='saml2'))
    client.setLogger(get_logger())
    return client.lookupService(service_type)
def do_queries(self, epr, queries, namespaces, flatten=True):
    """Run `queries` against the data service at `epr`.

    Builds an idwsf.DSTClient2 on the SAML2 lasso server and returns the
    result of its query() call, forwarding `flatten` and `namespaces`.
    """
    client = idwsf.DSTClient2(epr=epr,
            server=misc.get_lasso_server(protocol='saml2'))
    client.setLogger(get_logger())
    return client.query(queries, flatten = flatten, namespaces = namespaces)
def get_namespaces(self):
    """Return the prefix -> namespace mapping used for WSF prefill.

    Built-in prefixes are 'pp', 'ep', 'eoa' and 'cafp'; entries from the
    'namespaces' key of the 'misc' configuration are merged on top.
    """
    # support old lasso: fall back from PP11_HREF to PP_HREF
    pp_href = getattr(lasso, 'PP11_HREF', getattr(lasso, 'PP_HREF', None))
    namespaces = {}
    namespaces['pp'] = pp_href
    namespaces['ep'] = getattr(lasso, 'EP_HREF', '')
    # eo schema for adeline
    namespaces['eoa'] = 'urn:entrouvert:id-sis-adeline:2005-09'
    # eo schema for caf
    namespaces['cafp'] = 'urn:entrouvert:id-sis-caf-profile:2005-09'
    configured = get_cfg('misc', {}).get('namespaces', {})
    namespaces.update(configured)
    return namespaces
def get_queries(self, fields):
    """Map widget names ('f<field id>') to the fields' prefill expressions.

    Fields whose prefill dict has no (or an empty) 'value' are left out.
    """
    queries = {}
    for field in fields:
        expression = field.prefill.get('value')
        if not expression:
            continue
        queries['f%s' % field.id] = expression
    return queries
def prefill_form(self, form, results):
    """Set widget values from `results` (widget name -> value).

    Entries with an empty value or without a matching widget are ignored;
    values go through utf82sitecharset() before being set.
    """
    for name, raw_value in results.items():
        target = form.get_widget(name)
        if not (target and raw_value):
            continue
        target.set_value(get_publisher().utf82sitecharset(raw_value))
def wsf_prefill(self, form):
service_types = self.get_namespaces()
session = get_session()
if not session.lasso_session_dump:
return False
if str('DiscoveryResourceOffering') in session.lasso_session_dump:
idwsf = 1
if str('urn:liberty:disco:2006-08:DiscoveryEPR') in session.lasso_session_dump:
idwsf = 2
request = get_request()
prefill_fields = []
if request.form.has_key('prefill') and type(request.form['prefill']) is list:
prefill_fields = request.form['prefill']
prefixes = {}
for f in self.formdef.fields:
if not f.prefill:
continue
if f.prefill.get('type') != 'wsf':
continue
if prefill_fields and not f.id in prefill_fields:
continue
if f.wsf_prefill_explicit and not f.id in prefill_fields:
continue
wsf_prefill_expression = f.prefill.get('value')
m = re.search('/*(\w+):', wsf_prefill_expression)
prefix = m.group(1)
if not service_types.has_key(prefix):
get_logger().warn('Unknown prefix: %r' % prefix)
continue
if not prefixes.has_key(prefix):
prefixes[prefix] = []
prefixes[prefix].append(f)
has_prefilled = False
if idwsf == 1:
for prefix, fields in prefixes.items():
if service_types[prefix] not in ('pp', 'ep'):
# needs lasso registration
lasso.registerDstService(prefix, service_types[prefix])
disco = lasso.Discovery(misc.get_lasso_server())
disco.setSessionFromDump(get_session().lasso_session_dump)
disco.initQuery()
disco.addRequestedServiceType(service_types[prefix])
disco.buildRequestMsg()
try:
soap_answer = liberty.soap_call(disco.msgUrl, disco.msgBody)
except liberty.SOAPException:
continue
disco.processQueryResponseMsg(soap_answer)
service = disco.getService()
if not service:
continue
service.initQuery()
for f in fields:
service.addQueryItem(f.prefill.get('value'), f.id)
service.buildRequestMsg()
try:
soap_answer = liberty.soap_call(service.msgUrl, service.msgBody)
except liberty.SOAPException:
continue
service.processQueryResponseMsg(soap_answer)
for f in fields:
answer = service.getAnswerForItemId(f.id)
if not answer:
continue
answer = re.sub('<.*?>', '', answer)
form._names['f%s' % f.id].set_value(get_publisher().utf82sitecharset(answer))
has_prefilled = True
if idwsf == 2:
for prefix, fields in prefixes.items():
try:
lasso.registerIdwsf2DstService(prefix, service_types[prefix])
eprs = self.get_eprs(service_types[prefix])
queries = self.get_queries(fields)
if eprs:
# FIXME: if more than one epr
results = self.do_queries(eprs[0], queries, service_types)
self.prefill_form(form, results)
has_prefilled = True
except Exception, exception:
raise
get_logger().error("Got an exception when trying to prefill %s: %s " % (str(form), str(exception)))
return has_prefilled
def page [html] (self, page_no, page_change = True, log_detail = None, editing = None):
# Render one page of the form (step 0 of the submission process).
#
# page_no     - index of the form page to build and display
# page_change - when true the request is turned into a fake GET and the
#               widgets are filled from stored data or prefill settings
# log_detail  - forwarded to self.step() for its log line
# editing     - when set, its .data is used as the source of values instead
#               of the data stored in the session under the magictoken
if self.formdef.signing == "compulsory":
'<noscript>'
'<div class=errornotice>%s</div>' % \
_('You need to enable javascript to complete this form')
'</noscript>'
displayed_fields = []
session = get_session()
if page_no > 0:
# data from the previous pages is fed to the substitution variables
magictoken = get_request().form['magictoken']
self.feed_current_data(magictoken)
form = self.formdef.create_form(page_no, displayed_fields)
# NOTE(review): the leading "True or" short-circuits the whole condition,
# so the 'prefill' submit button is always added at this point; it is
# removed again further down.
if True or [x for x in displayed_fields if x.prefill and
x.prefill.get('type') == 'wsf'] and (
session.lasso_session_dump and
( str('DiscoveryResourceOffering') in session.lasso_session_dump or
str('urn:liberty:disco:2006-08:DiscoveryEPR') in session.lasso_session_dump)):
form.add_submit('prefill', _('Prefill'))
if form.get_submit() == 'prefill':
form.clear_errors()
if not self.wsf_prefill(form):
form.info = N_('No data found for prefilling')
request = get_request()
# drop the 'prefill' button when the request carries a single (non-list)
# 'prefill' value
if request.form.has_key('prefill') and not type(request.form['prefill']) is list:
form.submit_widgets.remove(form._names['prefill'])
del form._names['prefill']
# drop the 'prefill' button when no displayed field uses a 'wsf' prefill
if form._names.has_key('prefill') and not [x for x in displayed_fields if \
x.prefill and x.prefill.get('type') == 'wsf']:
form.submit_widgets.remove(form._names['prefill'])
del form._names['prefill']
if not editing:
# a new magictoken is created only for a first page requested without one
if page_no == 0 and not get_request().form.has_key('magictoken'):
magictoken = randbytes(8)
else:
magictoken = get_request().form['magictoken']
form.add_hidden('magictoken', magictoken)
form.add_submit('submit', _('Next'))
data = session.get_by_magictoken(magictoken, {})
else:
if page_no == self.page_number - 1:
form.add_submit('submit', _('Save Changes'))
else:
form.add_submit('submit', _('Next'))
data = editing.data
if page_no > 0:
form.add_submit('previous', _('Previous'))
if page_change:
# on page change, we fake a GET request so the form is not altered
# with errors from the previous submit; if the page was already
# visited, we restore values; otherwise we set req.form as empty.
req = get_request()
req.environ['REQUEST_METHOD'] = 'GET'
# "one" records whether at least one widget received a value
one = False
for field in displayed_fields:
k = field.id
v = None
prefilled = False
if data.has_key(k):
v = data[k]
elif field.prefill:
# no stored value: compute one from the field's prefill settings
t = field.prefill.get('type')
if t == 'string':
v = field.prefill.get('value')
elif t == 'user' and get_request().user:
x = field.prefill.get('value')
user = get_request().user
if x == 'email':
v = user.email
elif user.form_data:
userform = user.get_formdef()
for userfield in userform.fields:
if userfield.id == x:
v = user.form_data.get(x)
break
elif t == 'formula':
formula = field.prefill.get('value')
# NOTE(review): eval() of the configured formula; the bare except
# silently ignores any failure and leaves v unchanged.
try:
v = str(eval(formula,
get_publisher().get_global_eval_dict(),
get_publisher().substitutions.get_context_variables()))
except:
pass
if v:
prefilled = True
form.get_widget('f%s' % k).set_message(
_('Value has been automatically prefilled.'))
form.get_widget('f%s' % k).prefilled = True
if not prefilled and form.get_widget('f%s' % k):
form.get_widget('f%s' % k).clear_error()
if v is not None:
if not isinstance(v, str) and field.convert_value_to_str:
v = field.convert_value_to_str(v)
form.get_widget('f%s' % k).set_value(v)
req.form['f%s' % k] = v
one = True
if not one:
req.form = {}
html_top(self.formdef.name)
self.step(0, page_no, log_detail, data = data, editing = editing)
form.add_hidden('step', '0')
form.add_hidden('page', page_no)
# an 'mt' request parameter gets the cancel button labelled 'Remove Draft'
if get_request().form.has_key('mt'):
form.add_submit('cancel', _('Remove Draft'), css_class = 'cancel')
else:
form.add_submit('cancel', _('Cancel'), css_class = 'cancel')
if session.user and self.formdef.allow_drafts:
form.add_submit('savedraft', _('Save As Draft'), css_class = 'save-draft')
form.render()
def feed_current_data(self, magictoken):
    """Expose the data stored under `magictoken` as substitution variables.

    A throwaway FormData (never stored here) is filled with the formdef,
    the request user, the session data for the token and an empty status,
    then fed to the publisher substitutions.
    """
    fake = FormData()
    fake._formdef = self.formdef
    fake.user = get_request().user
    fake.data = get_session().get_by_magictoken(magictoken, {})
    fake.status = str('')
    get_publisher().substitutions.feed(fake)
def _q_index(self, log_detail = None, editing = None):
"""Entry point of the form: display pages and process submissions.

The hidden 'step' value of the submitted form selects the processing:
0 handles a filled page, 1 the validation page, 2 the signature page
and the final storage.  `editing`, when given, is the object being
modified; its .data is used as the initial values.
"""
self.check_role()
# disabled forms redirect to the configured URL, or are forbidden
if self.formdef.is_disabled():
if self.formdef.disabled_redirection:
# a redirection containing [...] is first processed as a template
if re.search('\[.*\]', self.formdef.disabled_redirection):
try:
variables = get_publisher().substitutions.get_context_variables()
redirect_url = template.process_template(self.formdef.disabled_redirection, variables)
return redirect(redirect_url)
except ezt.EZTException:
pass
return redirect(self.formdef.disabled_redirection)
else:
raise errors.AccessForbiddenError()
session = get_session()
existing_formdata = None
if editing:
existing_formdata = editing.data
if not get_request().form:
# first display while editing: store the current data under a new token
token = randbytes(8)
get_request().form['magictoken'] = token
session.add_magictoken(token, editing.data)
elif self.formdef.only_allow_one:
user_forms = get_user_forms(self.formdef)
# NOTE(review): the test looks for a non-draft form but the redirect
# uses user_forms[0], which may be a draft - confirm.
if [x for x in user_forms if x.status != 'draft']:
return redirect('%s/' % user_forms[0].id)
# skeleton form, only used to parse the submitted hidden values and
# to find out which button was clicked
form = Form()
form.add_hidden('step', '-1')
form.add_hidden('page', '-1')
form.add_hidden('magictoken', '-1')
form.add_submit('cancel')
if not m2crypto and self.formdef.signing == "compulsory":
get_logger().error("You need to install M2Crypto module "\
"to use the signature feature")
return self.error(_("This form is unusable because the signature "\
"is not functional. Please contact your system "\
"administrator."))
useragent = get_request().get_header('User-agent', '')
if not 'Gecko' in useragent and self.formdef.signing == 'compulsory':
return self.error(_('Browser unsupported for this form. Please use Firefox.'))
if self.formdef.allow_drafts:
form.add_submit('savedraft')
if not form.is_submitted():
# an 'mt' parameter points to data stored in the session (see _q_lookup)
if get_request().form.has_key('mt'):
magictoken = get_request().form['mt']
data = session.get_by_magictoken(magictoken, {})
if data:
# create a new one since the other has been exposed in a url
magictoken = randbytes(8)
session.add_magictoken(magictoken, data)
get_request().form['magictoken'] = magictoken
if data.has_key('page_no'):
page_no = int(data['page_no'])
del data['page_no']
# a page number of -1 sends the user to the validation page
if page_no == -1:
req = get_request()
for k, v in data.items():
req.form['f%s' % k] = v
return self.validating(data)
else:
page_no = 0
return self.page(page_no, True)
return self.page(0, editing = editing)
if form.get_submit() == 'cancel':
get_logger().info('form %s - cancel' % (self.formdef.name))
if editing:
return redirect('.')
return redirect(get_publisher().get_root_url())
try:
step = int(form.get_widget('step').parse())
except TypeError:
step = 0
# step 0: a page of the form has been submitted
if step == 0:
try:
page_no = int(form.get_widget('page').parse())
except TypeError:
page_no = -1
try:
magictoken = form.get_widget('magictoken').parse()
except KeyError:
magictoken = randbytes(8)
self.feed_current_data(magictoken)
# rebuild the real form of that page to parse the submitted values
form = self.formdef.create_form(page_no)
form.add_submit('previous')
if self.formdef.allow_drafts:
form.add_submit('savedraft')
form.add_submit('submit')
if page_no > 0 and form.get_submit() == 'previous':
return self.previous_page(page_no, magictoken, editing = editing)
if self.formdef.allow_drafts and form.get_submit() == 'savedraft':
form_data = session.get_by_magictoken(magictoken, {})
data = self.formdef.get_data(form)
form_data.update(data)
self.save_draft(form_data, page_no)
return redirect(get_publisher().get_root_url())
# form.get_submit() returns the name of the clicked button, and
# it will return True if the form has been submitted, but not
# by clicking on a submit widget; for example if an "add row"
# button is clicked.
if form.has_errors() or form.get_submit() is True:
return self.page(page_no, page_change = False, editing = editing)
# merge the page values into the data stored under the magictoken
form_data = session.get_by_magictoken(magictoken, {})
data = self.formdef.get_data(form)
form_data.update(data)
session.add_magictoken(magictoken, form_data)
# advance to the next page that is visible for the current data
while True:
page_no = int(page_no) + 1
try:
next_page = self.formdef.get_page(page_no)
except IndexError:
break
if next_page.is_visible(form_data, self.formdef):
break
if page_no == self.page_number:
# last page has been submitted
req = get_request()
for field in self.formdef.fields:
k = field.id
if form_data.has_key(k):
v = form_data[k]
if field.convert_value_to_str:
v = field.convert_value_to_str(v)
req.form['f%s' % k] = v
if editing:
form = self.formdef.create_view_form(form_data, use_tokens = False)
return self.submitted_existing(form, editing)
if self.formdef.confirmation:
return self.validating(form_data)
else:
step = 1 # so it will flow to submit
# kind of restore state
form = Form()
form.add_hidden('step', '-1')
form.add_hidden('page', '-1')
form.add_hidden('magictoken', '-1')
form.add_submit('cancel')
if self.formdef.allow_drafts:
form.add_submit('savedraft')
else:
return self.page(page_no, editing = editing)
# step 1: data has been confirmed (or no confirmation is required)
if step == 1:
form.add_submit('previous')
magictoken = form.get_widget('magictoken').parse()
if form.get_submit() == 'previous':
return self.previous_page(self.page_number, magictoken, editing = editing)
magictoken = form.get_widget('magictoken').parse()
form_data = session.get_by_magictoken(magictoken, {})
data = self.formdef.get_data(form)
form_data.update(data)
session.add_magictoken(magictoken, form_data)
if self.formdef.signing:
return self.signing(form_data)
else:
step = 2 # so it will flow to submit
form = Form()
form.add_hidden('step', '-1')
form.add_hidden('page', '-1')
form.add_hidden('magictoken', '-1')
form.add_submit('cancel')
# step 2: check the signature when the form uses signing, then store
if step == 2:
form.add_submit('previous')
magictoken = form.get_widget('magictoken').parse()
self.feed_current_data(magictoken)
form_data = session.get_by_magictoken(magictoken, {})
signature = None
if form.get_submit() == 'previous':
if self.formdef.signing:
if self.formdef.confirmation:
return self.validating(form_data)
else:
return self.previous_page(self.page_number, magictoken, editing = editing)
return self.previous_page(self.page_number, magictoken, editing = editing)
if self.formdef.signing:
cert = None
form.add(TextWidget, 'signature', style="display: none;")
cert = form.get_widget('signature').parse()
get_request().form['signature'] = ""
if cert:
# the 'signature' value may carry one of these error markers
if "userCancel" in cert:
return self.signing(form_data, _("Signature has been cancelled"))
elif "noMatchingCert" in cert:
return self.signing(form_data, _("No matching certificate found in your browser"))
elif "internalError" in cert:
return self.signing(form_data, _("Signature failed: internal error"))
cert = urllib.unquote(cert)
# NOTE(review): bare except - any failure in get_signature() is
# reported to the user as a signing failure.
try:
signature = self.get_signature(cert, form_data)
except:
return self.signing(form_data, _("Signing failled"))
if not signature and self.formdef.signing == "compulsory":
return self.signing(form_data, _("Invalid signature"))
if signature and not signature["valid"]:
return self.signing(form_data,
_("The signautre validation failed. Perhaps the administrator needs to add a CA."))
if self.formdef.allow_drafts and form.get_submit() == 'savedraft':
# page_no = -1 is read back above to reopen the draft on the validation page
self.save_draft(form_data, page_no = -1)
return redirect(get_publisher().get_root_url())
# so it gets FakeFileWidget in preview mode
form = self.formdef.create_view_form(form_data,
use_tokens = self.formdef.confirmation)
if form.has_errors():
# the only possible error here is a token error if the form is
# submitted a second time
return template.error_page(_('This form has already been submitted.'))
return self.submitted(form, existing_formdata, signature)
def previous_page(self, page_no, magictoken, editing = None):
    """Display the closest page before `page_no` that should be shown.

    Walks backwards until formdef.get_page() raises IndexError, or a page
    has no condition, or a page is visible for the data stored under
    `magictoken`; the resulting page is rendered with page_change = True.
    """
    form_data = get_session().get_by_magictoken(magictoken, {})
    while True:
        page_no = int(page_no) - 1
        try:
            candidate = self.formdef.get_page(page_no)
        except IndexError:
            break
        if not candidate.condition or candidate.is_visible(form_data, self.formdef):
            break
    return self.page(page_no, page_change = True, editing = editing)
def save_draft(self, data, page_no):
    """Store `data` as a new form with status 'draft'.

    data    - field values to store
    page_no - page number recorded on the draft

    The session user is recorded as owner unless its string form starts
    with 'anonymous-'.
    """
    draft = self.formdef.data_class()()
    draft.data = data
    draft.status = 'draft'
    draft.page_no = page_no
    session = get_session()
    identified = session and session.user and \
            not str(session.user).startswith('anonymous-')
    if identified:
        draft.user_id = session.user
    draft.store()
    get_logger().info('form %s - saving draft (id: %s)' % (self.formdef.name, draft.id))
def submitted(self, form, existing_formdata = None, signature = None):
    """Store the submitted form and return the page (or redirect) to show.

    form              - view form the field values are read from
    existing_formdata - form data being modified, or None for a new form
    signature         - signature dict (see get_signature) or None

    For a new form the workflow is performed and, when it returns a URL,
    the user is redirected there; otherwise the receipt page is returned.
    When the formdef allows a single form per user and a completed one
    already exists, nothing is stored and the user is redirected to it.
    """
    if existing_formdata: # modifying
        filled = existing_formdata
        filled.last_modification_time = time.localtime()
        # XXX: what about status?
    else:
        filled = self.formdef.data_class()()
        filled.just_created()

    filled.data = self.formdef.get_data(form)
    filled.signature = signature
    session = get_session()
    if session and session.user and not str(session.user).startswith('anonymous-'):
        try:
            filled.user_hash = get_request().user.hash
        except AttributeError:
            # user objects without a hash are recorded by id
            filled.user_id = get_request().user.id

    if self.formdef.only_allow_one:
        # this is already checked in _q_index but it's done a second time
        # just before a new form is to be stored.
        completed = [x for x in get_user_forms(self.formdef) if x.status != 'draft']
        if completed:
            # redirect to the completed form itself; the unfiltered list
            # may start with a draft
            return redirect('%s/' % completed[0].id)

    filled.store()

    if not filled.user_id and existing_formdata is None:
        a = AnonymityLink()
        a.formdata_type = 'form'
        a.formdata_def_id = self.formdef.id
        a.formdata_id = filled.id
        if session.name_identifier:
            a.name_identifier = session.name_identifier
        # XXX nothing with anonylink.key ?
        a.store()

    get_logger().info('form %s - done (id: %s)' % (self.formdef.name, filled.id))

    if existing_formdata is None:
        url = filled.perform_workflow()
        if url:
            return redirect(url)
    return self.receipt_page(filled)
def submitted_existing(self, form, editing):
    """Replace the data of `editing` with the values of `form` and store it.

    Returns a redirect to '.'.
    """
    # the previous data was read into a local that was never used; dropped
    editing.data = self.formdef.get_data(form)
    editing.store()
    return redirect('.')
def tempfile(self):
    """Serve the session temporary file named by the 't' request parameter.

    Raises errors.TraversalError when the parameter or the file entry is
    missing.  The content type defaults to application/octet-stream; the
    charset is set only when the entry has one.  The entry's 'value' is
    deleted once read.
    """
    self.check_role()
    restricted = not self.formdef.acl_read == 'all'
    if restricted and (
            self.user and not self.user.id == get_session().user):
        # NOTE(review): check_receiver is not defined in the visible part
        # of this class - confirm where it comes from.
        self.check_receiver()
    try:
        name = get_request().form['t']
        entry = get_session().tempfiles[name]
        value = entry['value']
    except (KeyError, ValueError, TypeError):
        raise errors.TraversalError()
    response = get_response()
    response.set_content_type(entry['content_type'] or 'application/octet-stream')
    if entry['charset']:
        response.set_charset(entry['charset'])
    del entry['value']
    return value
def get_signature(self, cert, data):
    """Check the PKCS7 signature `cert` against the sign text of `data`.

    Returns None when `cert` is empty; otherwise a dict with the keys
    'issuer', 'cert', 'subject' and 'valid'.  The dict is returned even
    when validation fails: callers have to look at 'valid'.
    """
    if not cert:
        return None
    # a missing User-agent header must not break the "MSIE" test below
    useragent = get_request().get_header("User-agent") or ''
    cert = "-----BEGIN PKCS7-----\n%s\n-----END PKCS7-----" % cert
    cert = Certificate(cert, "pkcs7")
    data = self.formdef.get_sign_text(data).decode('utf-8')
    # the text is encoded as UTF-16-LE for MSIE and UTF-8 otherwise
    if "MSIE" in useragent:
        data = data.encode("utf_16_le")
    else:
        data = data.encode("utf8")
    valid = cert.validate(data)
    return {"issuer": cert.issuer,
            "cert": cert.cert,
            "subject": cert.subject,
            "valid": valid}
def validating [html] (self, data):
html_top(self.formdef.name)
'<div class="form-validation">'
self.step(1, data = data)
TextsDirectory.get_html_text('check-before-submit')
form = self.formdef.create_view_form(data)
token_widget = form.get_widget(form.TOKEN_NAME)
token_widget._parsed = True
form.add_submit('previous', _('Previous'))
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'), css_class = 'cancel')
session = get_session()
if session.user and self.formdef.allow_drafts:
form.add_submit('savedraft', _('Save As Draft'), css_class = 'save-draft')
if self.formdef.signing:
form.add_hidden('step', '1')
else:
form.add_hidden('step', '2')
magictoken = get_request().form['magictoken']
form.add_hidden('magictoken', magictoken)
form.render()
'</div>'
def signing [html] (self, data, error_msg = None):
# Render the signing page (step 2).
#
# data      - form data shown (hidden) in the form and turned into the
#             text to sign
# error_msg - optional message displayed in an error notice
supported_browsers = [str("Gecko"), str("MSIE")]
html_top(self.formdef.name)
self.step(2, data = data)
# without M2Crypto any given message is replaced by this one
if not m2crypto:
error_msg = _("You can't sign this form")
if error_msg:
'<div class="errornotice">%s</div>' % error_msg
# fallback content for browsers without javascript
'<noscript>'
if self.formdef.signing == 'compulsory':
'<div class="errornotice">%s</div>' % \
_('You need to enable javascript to use this feature.')
elif self.formdef.signing == 'optional':
'<p>%s</p>' % \
_("You can't use this feature without javascript. Please go to the next step.")
form = Form()
form.add_submit('next', _('Next'))
form.add_submit('cancel', _('Cancel'), css_class = 'cancel')
# NOTE(review): this session object is not used afterwards
session = get_session()
form.add_hidden('step', '2')
magictoken = get_request().form['magictoken']
form.add_hidden('magictoken', magictoken)
form.render()
'<div class="back-home-button">'
homepage = get_publisher().get_root_url()
'<a href="%s">%s</a>' % (homepage, _('Back Home'))
'</div>'
'</noscript>'
# NOTE(review): no default is given to get_header() here; a missing
# header would make the "in useragent" test below fail - confirm.
useragent = get_request().get_header(str("User-agent"))
browser_supported = False
if True in [browser in useragent for browser in supported_browsers]:
browser_supported = True
if not browser_supported:
'<div class="errornotice">%s</div>' % \
_("This feature doesn't work with your browser. Please use Firefox.")
get_response().add_javascript(['jquery.js', 'jquery.sha1.js', 'sign.js'])
form = self.formdef.create_view_form(data, visible = False)
token_widget = form.get_widget(form.TOKEN_NAME)
token_widget._parsed = True
form.add(StringWidget, 'signature', style="display: none;")
# URL-quoted sign text, embedded in the onClick handler below
text = urllib.quote(self.formdef.get_sign_text(data), str("#;,/?:@&=+$,-_.!~*()"))
form.add_submit('previous', _('Previous'))
if self.formdef.signing == "optional":
form.add_submit('next', _('Submit without signing'))
if m2crypto and browser_supported:
form.add_submit('submit', value=_('Sign and submit'),
onClick="signText(this.form, '%s')" % text)
form.add_submit('cancel', _('Cancel'), css_class = 'cancel')
session = get_session()
form.add_hidden('step', '2')
magictoken = get_request().form['magictoken']
form.add_hidden('magictoken', magictoken)
form.render()
def error [html] (self, msg):
    # Render an error page: page header, `msg` in an error notice and a
    # 'Back Home' link to the site root.
    html_top(self.formdef.name)
    root_url = get_publisher().get_root_url()
    '<div class="errornotice">%s</div>' % msg
    '<a href="%s">%s</a>' % (root_url, _('Back Home'))
def receipt_page [html] (self, filled):
# Render the receipt page shown once the form `filled` has been stored.
html_top(self.formdef.name)
# the receipt is the last step; its index depends on which optional
# steps (signing, confirmation) the formdef uses
if self.formdef.signing:
self.step(3)
elif self.formdef.confirmation:
self.step(2)
else:
self.step(1)
if self.formdef.workflow:
htmltext(filled.display_workflow_message())
else:
# behaviour without workflow set
tm = misc.localstrftime(filled.receipt_time)
'<div id="receipt-intro">'
TextsDirectory.get_html_text('form-recorded',
vars={'date': tm, 'number': filled.id})
if self.formdef.receiver:
if self.formdef.receiver.details:
'<p>'
_('Your case will be handled by:')
'</p>'
'<p id="receiver">'
# newlines in the receiver details become <br /> tags
htmltext(self.formdef.receiver.details.replace(str('\n'), str('<br />')))
'</p>'
'</div>'
# NOTE(review): PublicFormStatusPage is not defined or imported in the
# visible part of this file - presumably defined further down; confirm.
form_status = PublicFormStatusPage(self.formdef, filled)
form_status.receipt(show_status = False, show_signature = False,
form_url = os.path.join(get_request().get_url(1), str(filled.id), ''))
# no 'Back Home' link in iframe mode
if not get_response().iframe_mode:
'<div class="back-home-button">'
homepage = get_publisher().get_root_url()
'<a href="%s">%s</a>' % (homepage, _('Back Home'))
'</div>'
def listing [html] (self):
if not self.formdef.is_user_allowed_read(get_request().user):
raise errors.AccessForbiddenError()
get_response().breadcrumb.append( ('listing', _('Listing')) )
html_top('%s - %s' % (_('Listing'), self.formdef.name))
fields = []
for field in self.formdef.fields:
if hasattr(field, str('get_view_value')) and field.in_listing:
fields.append(field)
FormDefUI(self.formdef).listing(fields, 'all', include_form=True)
'<p><a href="..">%s</a></p>' % _('Back')
def _q_lookup(self, component):
    """Resolve a stored form id.

    Non-draft forms get their PublicFormStatusPage.  A draft is only
    reopened by its owner: its data (with the page number under
    'page_no') is moved to the session under a new magictoken, the draft
    is removed and the user is redirected to './?mt=<token>'.

    Raises errors.TraversalError for an unknown id,
    errors.AccessUnauthorizedError when a draft is requested without a
    user, and errors.AccessForbiddenError when the session user is not the
    draft's owner.
    """
    try:
        filled = self.formdef.data_class().get(component)
    except KeyError:
        raise errors.TraversalError()

    if filled.status == 'draft':
        if get_request().user is None:
            raise errors.AccessUnauthorizedError()
        session = get_session()
        if session.user != filled.user_id:
            raise errors.AccessForbiddenError()
        token = randbytes(8)
        draft_data = filled.data
        draft_data['page_no'] = filled.page_no
        session.add_magictoken(token, draft_data)
        filled.remove_self()
        return redirect('./?mt=%s' % token)

    return PublicFormStatusPage(self.formdef, filled)
class RootDirectory(AccessControlled, Directory):
_q_exports = ['']
category = None
def __init__(self, category = None):
    """Remember `category` (may be None) and feed it to the substitutions."""
    self.category = category
    publisher = get_publisher()
    publisher.substitutions.feed(category)
def _q_access(self):
    """Append the category to the breadcrumb when the directory has one."""
    if not self.category:
        return
    crumb = ('%s/' % self.category.url_name, self.category.name )
    get_response().breadcrumb.append(crumb)
def _q_index [html] (self):
# Render the home page, or the page of self.category when one is set:
# welcome message, available forms, the user's own forms and the
# login / logout links.
template.html_top(default_org = _('Forms'))
session = get_session()
request = get_request()
user = request.user
# NOTE(review): home_cfg is not used in this method
home_cfg = get_cfg('home', {})
if not self.category:
get_logger().info('home page')
if user:
message = TextsDirectory.get_html_text('welcome-logged')
else:
message = TextsDirectory.get_html_text('welcome-unlogged')
if message:
'<div id="welcome-message">'
message
'</div>'
# list_forms: forms the user can fill
# advertised_forms: forms not accessible but flagged always_advertise
list_forms = []
advertised_forms = []
# disabled forms are kept only when they have a redirection
if self.category:
formdefs = FormDef.select(lambda x: (
x.category_id == self.category.id and (not x.is_disabled() or x.disabled_redirection)),
order_by = 'name')
else:
formdefs = FormDef.select(lambda x: not x.is_disabled() or x.disabled_redirection, order_by = 'name',
ignore_errors=True)
for formdef in formdefs:
if formdef.roles:
if not user:
if formdef.always_advertise:
advertised_forms.append(formdef)
continue
if logged_users_role().id not in formdef.roles:
for q in user.roles or []:
if q in formdef.roles:
break
else:
if formdef.always_advertise:
advertised_forms.append(formdef)
continue
list_forms.append(formdef)
user_forms = []
if user:
for formdef in list_forms:
if not formdef.is_disabled():
user_forms.extend(get_user_forms(formdef))
user_forms = [x for x in user_forms if x.formdef.is_user_allowed_read(user, x)]
# sorted by receipt time, oldest first
user_forms.sort(lambda x,y: cmp(x.receipt_time, y.receipt_time))
if self.category:
self.form_list(list_forms, category = self.category,
session = session, user_forms = user_forms,
advertised_forms = advertised_forms)
else:
# one list per category, in position order, then the forms without
# category
cats = Category.select()
Category.sort_by_position(cats)
one = False
for c in cats:
l2 = [x for x in list_forms if x.category_id == c.id]
l2_advertise = [x for x in advertised_forms if x.category_id == c.id]
if l2 or l2_advertise:
self.form_list(l2, category = c,
session = session, user_forms = user_forms,
advertised_forms = l2_advertise)
one = True
l2 = [x for x in list_forms if not x.category]
l2_advertise = [x for x in advertised_forms if not x.category]
if l2 or l2_advertise:
# the 'Misc' title is used only when a category list was displayed
if one:
title = _('Misc')
else:
title = None
self.form_list(l2, title = title,
session = session, user_forms = user_forms,
advertised_forms = l2_advertise)
root_url = get_publisher().get_root_url()
if user:
self.user_forms(user_forms)
if not self.category:
'<p id="logout">'
if user.can_go_in_backoffice():
'<a href="%sbackoffice/">%s</a> ' % (root_url, _('Back Office'))
if user.anonymous:
if not get_cfg('misc', {}).get('do-not-token', False):
'<a href="%stoken">%s</a> - ' % (root_url, _('Enter Identification Token'))
if get_cfg('saml_identities', {}).get('creation', 'admin') != 'admin':
'<a href="%sregister">%s</a> - ' % (root_url, _('Register'))
'<a href="%slogout">%s</a></p>' % (root_url, _('Logout'))
elif get_cfg('sp') or get_cfg('identification', {}).get('methods'):
# no user: offer login (and registration) when an identification
# method is configured
if not self.category:
'<p id="login"><a href="%slogin">%s</a>' % (root_url, _('Login'))
identities_cfg = get_cfg('identities', {})
if identities_cfg.get('creation') in ('self', 'moderated'):
' - <a href="%sregister">%s</a>' % (root_url, _('Register'))
'</p>'
def user_forms [html] (self, user_forms):
draft = [x for x in user_forms if x.status == 'draft']
if draft:
'<h2 id="drafts">%s</h2>' % _('Your Current Drafts')
'<ul>'
for f in draft:
'<li><a href="%s/%s">%s</a></li>' % (
f.formdef.url_name, f.id, f.formdef.name)
'</ul>'
# with workflows
workflows = [Workflow.get_default_workflow()] + Workflow.select(order_by = 'name')
for workflow in workflows:
# XXX: seperate endpoints from non-endpoints
for status in workflow.possible_status:
fms = [x for x in user_forms if \
not x.formdef.private_status_and_history and \
x.formdef.workflow.id == workflow.id and \
(x.get_visible_status() == status)]
if not fms:
continue
'<h2>%s</h2>' % _('Your forms with status "%s"') % status.name
'<ul>'
for f in fms:
'<li><a href="%s/%s/">%s</a>, %s</li>' % (
f.formdef.url_name, f.id, f.formdef.name,
misc.localstrftime(f.receipt_time))
'</ul>'
def form_list [html] (self, list, category = None, title = None,
session = None, user_forms = None, advertised_forms = []):
if title:
'<h2>%s</h2>' % title
elif category:
'<h2>%s</h2>' % category.name
formdefs_data = None
if self.category:
url_prefix = ''
if self.category.description:
if self.category.description[0] == '<':
htmltext(self.category.description)
else:
'<p>'
self.category.description
'</p>'
elif category:
url_prefix = '%s/' % category.url_name
else:
url_prefix = ''
'<ul class="catforms">'
for formdef in list:
if formdef.only_allow_one and user_forms:
if formdefs_data is None:
formdefs_data = [x.formdef.id for x in user_forms
if x.formdef.only_allow_one and x.status != 'draft']
if formdefs_data and formdef.id in formdefs_data:
# form has already been completed
'<li>%s (%s, <a href="%s%s/">%s</a>)' % (
formdef.name, _('already completed'),
url_prefix, formdef.url_name, _('review'))
else:
'<li><a href="%s%s/">%s</a>' % (url_prefix, formdef.url_name, formdef.name)
if formdef.acl_read == 'all':
' <a class="listing" href="%s%s/listing">%s</a>' % (
url_prefix, formdef.url_name, _('(listing)'))
'</li>'
for formdef in advertised_forms:
'<li>'
'<a href="%s%s/">%s</a>' % (url_prefix, formdef.url_name, formdef.name)
' (%s)</li>' % _('authentication required')
'</ul>'
def _q_lookup(self, component):
    """Return the FormPage built from the given URL component."""
    form_page = FormPage(component)
    return form_page
class PublicFormStatusPage(FormStatusPage):
_q_exports = ['', 'download', 'status', 'wfedit']
def status(self):
return redirect('%sbackoffice/%s/%s/' % (
get_publisher().get_root_url(),
self.formdef.url_name,
str(self.filled.id)))
def form_status_buttons [html] (self):
if not get_response().iframe_mode:
'<div class="back-home-button">'
'<a href="%s">%s</a>' % (get_publisher().get_root_url(), _('Back Home'))
'</div>'
def wfedit(self):
wf_status = self.filled.get_workflow_status()
for item in wf_status.items:
if not isinstance(item, EditableWorkflowStatusItem):
continue
if item.check_auth(self.filled, get_request().user):
f = FormPage(self.formdef.url_name)
get_response().breadcrumb = get_response().breadcrumb[:-1]
get_response().breadcrumb.append( ('%s/' % self.filled.id, str(self.filled.id)) )
get_response().breadcrumb.append( ('wfedit', _('Edit')) )
return f._q_index(editing = self.filled)
raise errors.AccessForbiddenError()
# Texts registered for this module; each call gives the key of the text
# and its description, plus an optional hint, category and default value.

# welcome texts of the home page
TextsDirectory.register(
        'welcome-logged',
        N_('Welcome text on home page for logged users'))

TextsDirectory.register(
        'welcome-unlogged',
        N_('Welcome text on home page for unlogged users'))

# confirmation messages; the hints name the variables usable in the text
TextsDirectory.register(
        'form-recorded',
        N_('Message when a form has been recorded'),
        hint = N_('Available variables: date, number'),
        category = N_('Forms'),
        default = N_('The form has been recorded on [date] with the number [number].'))

TextsDirectory.register(
        'form-recorded-allow-one',
        N_('Message when a form has been recorded, and the form is set to only allow one per user'),
        hint = N_('Available variable: date'),
        category = N_('Forms'),
        default = N_('The form has been recorded on [date].'))

# message of the validation step
TextsDirectory.register(
        'check-before-submit',
        N_('Message when a form is displayed before validation'),
        category = N_('Forms'),
        default = N_('Check values then click submit.'))