wcs/wcs/backoffice/management.py

2616 lines
113 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2015 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import csv
import cStringIO
import datetime
import json
import re
import time
import urllib
import vobject
try:
import xlwt
except ImportError:
xlwt = None
from quixote import get_session, get_publisher, get_request, get_response, redirect
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext, htmlescape
from qommon import _, ngettext, ezt
from qommon.admin.emails import EmailsDirectory
from qommon.admin.menu import command_icon
from qommon.backoffice.menu import html_top
from qommon.backoffice.listing import pagination_links
from qommon import misc, get_logger
from qommon.evalutils import make_datetime
from qommon.misc import C_, ellipsize
from qommon.afterjobs import AfterJob
from qommon import emails
import qommon.sms
from qommon import errors
from qommon import ods
from qommon.form import *
from qommon.storage import (Equal, NotEqual, LessOrEqual, GreaterOrEqual, Or,
Intersects, ILike, FtsMatch, Contains, Null)
from wcs.api_utils import get_user_from_api_query_string
from wcs.conditions import Condition
from wcs.forms.backoffice import FormDefUI
from wcs.forms.common import FormStatusPage
from wcs.admin.settings import UserFieldsFormDef
from wcs.categories import Category
from wcs.formdata import FormData
from wcs.formdef import FormDef
from wcs.roles import logged_users_role, Role
from wcs.workflows import get_role_translation, template_on_formdata
from .submission import FormFillPage
def geojson_formdatas(formdatas, geoloc_key='base', fields=None):
geojson = {
'type': 'FeatureCollection',
'features': []
}
for formdata in formdatas:
if not formdata.geolocations or not geoloc_key in formdata.geolocations:
continue
coords = misc.normalize_geolocation(formdata.geolocations[geoloc_key])
status = formdata.get_status()
try:
status_colour = status.colour
except AttributeError:
status_colour = 'ffffff'
display_fields = []
formdata_backoffice_url = formdata.get_url(backoffice=True)
if fields:
for field in fields:
if field.type == 'map':
continue
html_value = formdata.get_field_view_value(field, max_length=60)
html_value = html_value.replace('[download]', '%sdownload' % formdata_backoffice_url)
value = formdata.get_field_view_value(field)
if not html_value and not value:
continue
display_fields.append({
'varname': field.varname,
'label': field.label,
'value': str(value),
'html_value': str(htmlescape(html_value))
})
feature = {
'type': 'Feature',
'properties': {
'name': str(htmlescape(formdata.get_display_name())),
'url': formdata_backoffice_url,
'status_name': str(htmlescape(status.name)),
'status_colour': '#%s' % status_colour,
'display_fields': display_fields,
'view_label': _('View'),
},
'geometry': {
'type': 'Point',
'coordinates': [coords['lon'], coords['lat']],
}
}
geojson['features'].append(feature)
return geojson
class SendCodeFormdefDirectory(Directory):
formdef = None
def __init__(self, formdef):
self.formdef = formdef
def _q_lookup(self, formdata_id):
html_top('management', _('Management'))
formdata = self.formdef.data_class().get(formdata_id)
submitter_email = formdata.formdef.get_submitter_email(formdata)
mail_subject = EmailsDirectory.get_subject('tracking-code-reminder')
mail_body = EmailsDirectory.get_body('tracking-code-reminder')
form = Form()
form.add(TextWidget, 'text', title=_('Message'), required=True,
cols=60, rows=5, value=mail_body)
form.add(EmailWidget, 'email', title=_('Email'), required=False,
value=submitter_email)
sms_class = None
if get_publisher().use_sms_feature:
sms_cfg = get_cfg('sms', {})
mode = sms_cfg.get('mode', 'none')
sms_class = qommon.sms.SMS.get_sms_class(mode)
if sms_class:
form.add(StringWidget, 'sms', title=_('SMS Number'), required=False)
form.add(RadiobuttonsWidget, 'method',
options=[('email', _('Email')),
('sms', _('SMS'))],
value='email',
required=True)
form.add_submit('submit', _('Send'))
form.add_submit('cancel', _('Cancel'))
if not form.is_submitted() or form.has_errors():
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Send tracking code')
r += form.render()
return r.getvalue()
if not formdata.tracking_code:
tracking_code = get_publisher().tracking_code_class()
tracking_code.formdata = formdata # this stores both objects
message = form.get_widget('text').parse()
data = {
'form_tracking_code': formdata.tracking_code,
'tracking_code': formdata.tracking_code,
'email': form.get_widget('email').parse(),
}
data.update(self.formdef.get_substitution_variables(minimal=True))
if sms_class and form.get_widget('method').parse() == 'sms':
# send sms
sitename = get_cfg('misc', {}).get('sitename') or 'w.c.s.'
sender = sms_cfg.get('sender', sitename)[:11]
message = Template(message).render(data)
try:
sms_class.send(sender, [form.get_widget('sms').parse()], message)
except errors.SMSError, e:
get_logger().error(e)
get_session().message = ('info', _('SMS with tracking code sent to the user'))
else:
# send mail
emails.template_email(mail_subject, message,
mail_body_data=data,
email_rcpt=form.get_widget('email').parse())
get_session().message = ('info', _('Email with tracking code sent to the user'))
return redirect('../..')
class SendCodeDirectory(Directory):
def _q_lookup(self, component):
try:
formdef = FormDef.get_by_urlname(component)
if not formdef.enable_tracking_codes:
raise errors.TraversalError()
return SendCodeFormdefDirectory(formdef)
except KeyError:
raise errors.TraversalError()
class UserViewDirectory(Directory):
_q_exports = ['', 'sendcode']
sendcode = SendCodeDirectory()
user = None
def __init__(self, user):
self.user = user
def _q_index(self):
get_response().breadcrumb.append(('%s/' % self.user.id, self.user.display_name))
html_top('management', _('Management'))
# display list of open formdata for the user
formdefs = [x for x in FormDef.select(lightweight=True) if not x.skip_from_360_view]
user_roles = set([logged_users_role().id] + (get_request().user.roles or []))
criterias = [Equal('is_at_endpoint', False),
Equal('user_id', str(self.user.id)),
Contains('formdef_id', [x.id for x in formdefs]),
]
from wcs import sql
formdatas = sql.AnyFormData.select(criterias, order_by='receipt_time')
criterias = [Equal('is_at_endpoint', False),
Equal('user_id', str(self.user.id)),
Intersects('concerned_roles_array', user_roles),
]
viewable_formdatas = sql.AnyFormData.select(criterias)
viewable_formdatas_ids = {}
for viewable_formdata in viewable_formdatas:
viewable_formdatas_ids[(viewable_formdata.formdef.id, viewable_formdata.id)] = True
r = TemplateIO(html=True)
r += get_session().display_message()
r += htmltext('<div class="bo-block">')
r += htmltext('<h2>%s</h2>') % self.user.display_name
formdef = UserFieldsFormDef()
r += htmltext('<div class="form">')
for field in formdef.get_all_fields():
if not hasattr(field, str('get_view_value')):
continue
value = self.user.form_data.get(field.id)
if not value:
continue
r += htmltext('<div class="title">')
r += field.label
r += htmltext('</div>')
r += htmltext('<div class="StringWidget content">')
r += field.get_view_value(value)
r += htmltext('</div>')
r += htmltext('</div>')
r += htmltext('</div>')
if formdatas:
categories = {}
formdata_by_category = {}
for formdata in formdatas:
if not formdata.formdef.category_id in categories:
categories[formdata.formdef.category_id] = formdata.formdef.category
formdata_by_category[formdata.formdef.category_id] = []
formdata_by_category[formdata.formdef.category_id].append(formdata)
cats = categories.values()
Category.sort_by_position(cats)
for cat in cats:
r += htmltext('<div class="bo-block">')
if len(cats) > 1:
if cat is None:
r += htmltext('<h2>%s</h2>') % _('Misc')
cat_formdatas = formdata_by_category[None]
else:
r += htmltext('<h2>%s</h2>') % cat.name
cat_formdatas = formdata_by_category[cat.id]
else:
cat_formdatas = formdatas
r += htmltext('<ul class="biglist c-360-user-view">')
for formdata in cat_formdatas:
status_label = formdata.get_status_label()
submit_date = misc.strftime(
misc.date_format(), formdata.receipt_time)
formdata_key_id = (formdata.formdef.id, formdata.id)
if formdata_key_id in viewable_formdatas_ids:
r += htmltext('<li><a href="%s">%s, '
'<span class="datetime">%s</span> '
'<span class="status">(%s)</span></a>' % (
formdata.get_url(backoffice=True),
formdata.formdef.name,
submit_date, status_label))
else:
r += htmltext('<li><span>%s, '
'<span class="datetime">%s</span> '
'<span class="status">(%s)</span></span>' % (
formdata.formdef.name,
submit_date, status_label))
if formdata.formdef.enable_tracking_codes:
r += htmltext('<p class="commands">')
r += command_icon('sendcode/%s/%s' %
(formdata.formdef.url_name, formdata.id),
'export',
label=_('Send tracking code'),
popup=True)
r += htmltext('</p>')
r += htmltext('</ul>')
r += htmltext('</div>')
r += htmltext('</div>')
return r.getvalue()
class UsersViewDirectory(Directory):
_q_exports = ['']
def _q_traverse(self, path):
if not get_publisher().is_using_postgresql():
raise errors.TraversalError()
get_response().breadcrumb.append(('users', _('Per User View')))
return super(UsersViewDirectory, self)._q_traverse(path)
def get_search_sidebar(self, offset=None, limit=None, order_by=None):
get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'wcs.listing.js'])
r = TemplateIO(html=True)
r += htmltext('<form id="listing-settings" action=".">')
if offset or limit:
if not offset:
offset = 0
r += htmltext('<input type="hidden" name="offset" value="%s"/>') % offset
if limit:
r += htmltext('<input type="hidden" name="limit" value="%s"/>') % limit
if order_by is None:
order_by = ''
r += htmltext('<input type="hidden" name="order_by" value="%s"/>') % order_by
r += htmltext('<h3>%s</h3>') % _('Search')
if get_request().form.get('q'):
q = get_request().form.get('q')
if type(q) is not unicode:
q = unicode(q, get_publisher().site_charset)
r += htmltext('<input name="q" value="%s">') % q.encode(get_publisher().site_charset)
else:
r += htmltext('<input name="q">')
r += htmltext('<input type="submit" value="%s"/>') % _('Search')
return r.getvalue()
def _q_index(self):
html_top('management', _('Management'))
r = TemplateIO(html=True)
limit = int(get_request().form.get('limit',
get_publisher().get_site_option('default-page-size')) or 20)
offset = int(get_request().form.get('offset', 0))
order_by = get_request().form.get('order_by', None) or 'name'
query = get_request().form.get('q')
get_response().filter['sidebar'] = self.get_search_sidebar(
limit=limit, offset=offset, order_by=order_by)
if not query:
r += htmltext('<div id="listing">')
r += htmltext('<div class="big-msg-info">')
r += htmltext('<p>%s</p>') % _('Use the search field on the right '
'to look for an user.')
r += htmltext('</div>')
r += htmltext('</div>')
if get_request().form.get('ajax') == 'true':
get_response().filter = {'raw': True}
return r.getvalue()
formdef = UserFieldsFormDef()
criteria_fields = [
ILike('name', query),
ILike('ascii_name', misc.simplify(query, ' ')),
ILike('email', query)]
for field in formdef.get_all_fields():
if field.type in ('string', 'text', 'email'):
criteria_fields.append(ILike('f%s' % field.id, query))
if get_publisher().is_using_postgresql():
criteria_fields.append(FtsMatch(query))
criterias = [Or(criteria_fields)]
users = get_publisher().user_class.select(order_by=order_by,
clause=criterias, limit=limit, offset=offset)
total_count = get_publisher().user_class.count(criterias)
users_cfg = get_cfg('users', {})
include_name_column = not(users_cfg.get('field_name'))
include_email_column = not(users_cfg.get('field_email'))
r += htmltext('<div id="listing">')
r += htmltext('<table class="main">')
r += htmltext('<thead>')
r += htmltext('<tr>')
if include_name_column:
r += htmltext('<th data-field-sort-key="name"><span>%s</span></th>') % _('Name')
if include_email_column:
r += htmltext('<th data-field-sort-key="email"><span>%s</span></th>') % _('Email')
for field in formdef.get_all_fields():
if field.in_listing:
r += htmltext('<th data-field-sort-key="f%s"><span>%s</span></th>') % (
field.id, field.label)
r += htmltext('</tr>')
r += htmltext('</thead>')
r += htmltext('<tbody>')
for user in users:
r += htmltext('<tr data-link="%s/">') % user.id
if include_name_column:
r += htmltext('<td>%s</td>') % (user.name or '')
if include_email_column:
r += htmltext('<td>%s</td>') % (user.email or '')
for field in formdef.get_all_fields():
if field.in_listing:
r += htmltext('<td>%s</td>') % (user.form_data.get(field.id) or '')
r += htmltext('</tr>')
r += htmltext('</tbody>')
r += htmltext('</table>')
if get_publisher().is_using_postgresql():
r += pagination_links(offset, limit, total_count)
r += htmltext('</div>')
if get_request().form.get('ajax') == 'true':
get_response().filter = {'raw': True}
return r.getvalue()
return r.getvalue()
def _q_lookup(self, component):
try:
user = get_publisher().user_class.get(component)
return UserViewDirectory(user)
except KeyError:
pass
raise errors.TraversalError()
class ManagementDirectory(Directory):
_q_exports = ['', 'forms', 'listing', 'statistics', 'lookup', 'count',
'users', 'geojson', 'map']
users = UsersViewDirectory()
def is_accessible(self, user):
return user.can_go_in_backoffice()
def _q_traverse(self, path):
get_response().breadcrumb.append(('management/', _('Management')))
return super(ManagementDirectory, self)._q_traverse(path)
def _q_index(self):
if get_publisher().has_site_option('default-to-global-view'):
return redirect('listing')
else:
return redirect('forms')
def forms(self):
html_top('management', _('Management'))
formdefs = FormDef.select(order_by='name', ignore_errors=True, lightweight=True)
if len(formdefs) == 0:
return self.empty_site_message(_('Forms'))
get_response().filter['sidebar'] = self.get_sidebar(formdefs)
r = TemplateIO(html=True)
r += get_session().display_message()
user = get_request().user
user_roles = [logged_users_role().id] + (user.roles or [])
forms_without_pending_stuff = []
forms_with_pending_stuff = []
using_postgresql = get_publisher().is_using_postgresql()
if using_postgresql:
from wcs import sql
actionable_counts = sql.get_actionable_counts(user_roles)
total_counts = sql.get_total_counts(user_roles)
def append_form_entry(formdef):
if using_postgresql:
count_forms = total_counts.get(formdef.id) or 0
waiting_forms_count = actionable_counts.get(formdef.id) or 0
else:
formdef_data_class = formdef.data_class()
count_forms = formdef_data_class.count() - len(formdef_data_class.get_ids_with_indexed_value('status', 'draft'))
waiting_forms_count = formdef_data_class.get_actionable_count(user_roles)
if waiting_forms_count == 0:
forms_without_pending_stuff.append((formdef, waiting_forms_count, count_forms))
else:
forms_with_pending_stuff.append((formdef, waiting_forms_count, count_forms))
if user:
for formdef in formdefs:
if user.is_admin or formdef.is_of_concern_for_user(user):
append_form_entry(formdef)
def top_action_links(r):
r += htmltext('<span class="actions">')
if get_publisher().is_using_postgresql() and \
get_publisher().get_site_option('postgresql_views') != 'false':
r += htmltext('<a href="listing">%s</a>') % _('Global View')
if get_publisher().has_site_option('per-user-view'):
r += htmltext(' <a href="users/">%s</a>') % _('Per User View')
for formdef in formdefs:
if formdef.geolocations:
r += htmltext(' <a href="map">%s</a>') % _('Map View')
break
r += htmltext('</span>')
if forms_with_pending_stuff:
r += htmltext('<div id="appbar">')
r += htmltext('<h2>%s</h2>') % _('Forms in your care')
top_action_links(r)
r += htmltext('</div>')
r += htmltext('<div class="bo-block" id="forms-in-your-care">')
r += self.display_forms(forms_with_pending_stuff)
r += htmltext('</div>')
if forms_without_pending_stuff:
r += htmltext('<div id="appbar">')
r += htmltext('<h2>%s</h2>') % _('Other Forms')
if not forms_without_pending_stuff:
top_action_links(r)
r += htmltext('</div>')
r += htmltext('<div class="bo-block" id="other-forms">')
r += self.display_forms(forms_without_pending_stuff)
r += htmltext('</div>')
if not (forms_with_pending_stuff or forms_without_pending_stuff):
r += htmltext('<div id="appbar">')
r += htmltext('<h2>%s</h2>') % _('Forms')
top_action_links(r)
r += htmltext('</div>')
return r.getvalue()
def get_sidebar(self, formdefs):
r = TemplateIO(html=True)
r += self.get_lookup_sidebox()
r += htmltext('<div class="bo-block">')
r += htmltext('<ul id="sidebar-actions">')
r += htmltext('<li class="stats"><a href="statistics">%s</a></li>') % _('Global statistics')
r += htmltext('</ul>')
r += htmltext('</div>')
return r.getvalue()
def lookup(self):
query = get_request().form.get('query', '').strip()
if get_publisher().is_using_postgresql():
from wcs import sql
formdatas = sql.AnyFormData.select([Equal('id_display', query)])
if formdatas:
return redirect(formdatas[0].get_url(backoffice=True))
if any((x for x in FormDef.select(lightweight=True) if x.enable_tracking_codes)):
try:
tracking_code = get_publisher().tracking_code_class.get(query)
formdata = tracking_code.formdata
get_session().mark_anonymous_formdata(formdata)
return redirect(formdata.get_url(backoffice=True))
except KeyError:
pass
if re.match(r'^\d+-\d{1,10}$', query):
formdef_id, formdata_id = query.split('-')
try:
formdef = FormDef.get(formdef_id)
formdata = formdef.data_class().get(formdata_id)
except KeyError:
pass
else:
return redirect(formdata.get_url(backoffice=True))
get_session().message = ('error', _('No such tracking code or identifier.'))
return redirect(get_request().form.get('back') or '.')
def get_lookup_sidebox(self, back_place=''):
r = TemplateIO(html=True)
r += htmltext('<div id="lookup-box">')
r += htmltext('<h3>%s</h3>' % _('Look up by tracking code or identifier'))
r += htmltext('<form action="lookup">')
r += htmltext('<input type="hidden" name="back" value="%s"/>') % back_place
r += htmltext('<input class="inline-input" size="12" name="query"/>')
r += htmltext('<button>%s</button>') % _('Look up')
r += htmltext('</form>')
r += htmltext('</div>')
return r.getvalue()
def get_global_listing_sidebar(self, limit=None, offset=None, order_by=None):
get_response().add_javascript(['jquery.js'])
DateWidget.prepare_javascript()
form = Form(use_tokens=False, id='listing-settings')
form.add(SingleSelectWidget, 'status', title=_('Status'),
options=[
('waiting', _('Waiting for an action'), 'waiting'),
('open', C_('formdata|Open'), 'open'),
('done', _('Done'), 'done'),
('all', _('All'), 'all')])
form.add(DateWidget, 'start', title=_('Start Date'))
form.add(DateWidget, 'end', title=_('End Date'))
categories = Category.select()
if categories:
Category.sort_by_position(categories)
category_options = [(None, C_('categories|All'), '')] + [(x.id, x.name, x.id) for x in categories]
form.add(SingleSelectWidget, 'category_id',
title=_('Category'),
options=category_options)
if bool(get_publisher().get_site_option('welco_url', 'variables')):
form.add(SingleSelectWidget, 'submission_channel',
title=_('Channel'),
options=[(None, C_('channel|All'), '')] +
[(x, y, x) for x, y in FormData.get_submission_channels().items()])
form.add(StringWidget, 'q', title=_('Text'))
if not offset:
offset = 0
if not limit:
limit = int(get_publisher().get_site_option('default-page-size') or 20)
if not order_by:
order_by = get_publisher().get_site_option('default-sort-order') or '-receipt_time'
form.add_hidden('offset', offset)
form.add_hidden('limit', limit)
form.add_hidden('order_by', order_by)
form.add_submit('submit', _('Submit'))
r = TemplateIO(html=True)
r += self.get_lookup_sidebox('listing')
r += htmltext('<div>')
r += htmltext('<h3>%s</h3>') % _('Filters')
r += form.render()
r += htmltext('</div>')
return r.getvalue()
def get_stats_sidebar(self):
get_response().add_javascript(['jquery.js'])
DateWidget.prepare_javascript()
form = Form(use_tokens=False)
form.add(DateWidget, 'start', title=_('Start Date'))
form.add(DateWidget, 'end', title=_('End Date'))
form.add_submit('submit', _('Submit'))
r = TemplateIO(html=True)
r += htmltext('<h3>%s</h3>') % _('Period')
r += form.render()
r += htmltext('<h3>%s</h3>') % _('Shortcuts')
r += htmltext('<ul>') # presets
current_month_start = datetime.datetime.now().replace(day=1)
start = current_month_start.strftime(misc.date_format())
r += htmltext(' <li><a href="?start=%s">%s</a>') % (start, _('Current Month'))
previous_month_start = current_month_start - datetime.timedelta(days=2)
previous_month_start = previous_month_start.replace(day=1)
start = previous_month_start.strftime(misc.date_format())
end = current_month_start.strftime(misc.date_format())
r += htmltext(' <li><a href="?start=%s&end=%s">%s</a>') % (
start, end, _('Previous Month'))
current_year_start = datetime.datetime.now().replace(month=1, day=1)
start = current_year_start.strftime(misc.date_format())
r += htmltext(' <li><a href="?start=%s">%s</a>') % (start, _('Current Year'))
previous_year_start = current_year_start.replace(year=current_year_start.year-1)
start = previous_year_start.strftime(misc.date_format())
end = current_year_start.strftime(misc.date_format())
r += htmltext(' <li><a href="?start=%s&end=%s">%s</a>') % (
start, end, _('Previous Year'))
return r.getvalue()
def statistics(self):
html_top('management', _('Global statistics'))
get_response().breadcrumb.append(('statistics', _('Global statistics')))
get_response().filter['sidebar'] = self.get_stats_sidebar()
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Global statistics')
formdefs = FormDef.select(order_by='name', ignore_errors=True)
counts = {}
parsed_values = {}
criterias = get_global_criteria(get_request(), parsed_values)
for formdef in formdefs:
values = formdef.data_class().select(criterias)
counts[formdef.id] = len(values)
do_graphs = False
if get_publisher().is_using_postgresql() and \
get_publisher().get_site_option('postgresql_views') != 'false':
do_graphs = True
r += htmltext('<p>%s %s</p>') % (_('Total count:'), sum(counts.values()))
if do_graphs:
r += htmltext('<div class="splitcontent-left">')
cats = Category.select()
for cat in cats:
category_formdefs = [x for x in formdefs if x.category_id == cat.id]
r += self.category_global_stats(cat.name, category_formdefs, counts)
category_formdefs = [x for x in formdefs if x.category_id is None]
r += self.category_global_stats(_('Misc'), category_formdefs, counts)
if do_graphs:
r += htmltext('</div>')
r += htmltext('<div class="splitcontent-right">')
period_start = parsed_values.get('period_start')
period_end = parsed_values.get('period_end')
r += do_graphs_section(period_start, period_end,
criterias=[NotEqual('status', 'draft')])
r += htmltext('</div>')
return r.getvalue()
def category_global_stats(self, title, category_formdefs, counts):
r = TemplateIO(html=True)
category_formdefs_ids = [x.id for x in category_formdefs]
if not category_formdefs:
return
cat_counts = dict([(x, y) for x, y in counts.items() if x in
category_formdefs_ids])
if sum(cat_counts.values()) == 0:
return
r += htmltext('<div class="bo-block">')
r += htmltext('<h3>%s</h3>') % title
r += htmltext('<p>%s %s</p>') % (_('Count:'), sum(cat_counts.values()))
r += htmltext('<ul>')
for category_formdef in category_formdefs:
if not counts.get(category_formdef.id):
continue
r += htmltext('<li>%s %s</li>') % (
_('%s:') % category_formdef.name,
counts.get(category_formdef.id))
r += htmltext('</ul>')
r += htmltext('</div>')
return r.getvalue()
def display_forms(self, forms_list):
r = TemplateIO(html=True)
r += htmltext('<ul class="biglist">')
cats = Category.select(order_by = 'name')
for c in cats + [None]:
if c is None:
l2 = [x for x in forms_list if not x[0].category_id]
cat_name = _('Misc')
else:
l2 = [x for x in forms_list if x[0].category_id == c.id]
cat_name = c.name
if not l2:
continue
r += htmltext('<li><h3>%s</h3></li>') % cat_name
for formdef, no_pending, no_total in l2:
r += htmltext('<li><strong><a href="%s/">%s</a></strong>') % (formdef.url_name, formdef.name)
klass = ''
if no_total:
klass = 'badge'
r += htmltext('<p class="details %s">' % klass)
if no_pending:
r += _('%(pending)s open on %(total)s') % {'pending': no_pending,
'total': no_total}
else:
r += ngettext('%(total)s item', '%(total)s items', no_total) % {'total': no_total}
r += htmltext('</p>')
r += htmltext('</li>')
r += htmltext('</ul>')
return r.getvalue()
def get_global_listing_criterias(self, ignore_user_roles=False):
parsed_values = {}
user_roles = [logged_users_role().id]
if get_request().user and get_request().user.roles:
user_roles.extend(get_request().user.roles)
criterias = get_global_criteria(get_request(), parsed_values)
query_parameters = (get_request().form or {}).copy()
query_parameters.pop('callback', None) # when using jsonp
status = query_parameters.get('status', 'waiting')
if query_parameters.get('waiting') == 'yes':
# compatibility with ?waiting=yes|no parameter, still used in
# the /count endpoint used for indicators
status = 'waiting'
elif query_parameters.get('waiting') == 'no':
status = 'open'
if status == 'waiting':
criterias.append(Equal('is_at_endpoint', False))
if not ignore_user_roles:
criterias.append(Intersects('actions_roles_array', user_roles))
elif status == 'open':
criterias.append(Equal('is_at_endpoint', False))
if not ignore_user_roles:
criterias.append(Intersects('concerned_roles_array', user_roles))
elif status == 'done':
criterias.append(Equal('is_at_endpoint', True))
if not ignore_user_roles:
criterias.append(Intersects('concerned_roles_array', user_roles))
elif status == 'all':
if not ignore_user_roles:
criterias.append(Intersects('concerned_roles_array', user_roles))
if get_request().form.get('submission_channel'):
if get_request().form.get('submission_channel') == 'web':
criterias.append(Null('submission_channel'))
else:
criterias.append(Equal('submission_channel',
get_request().form.get('submission_channel')))
if get_request().form.get('category_id'):
criterias.append(Equal('category_id',
get_request().form.get('category_id')))
if get_request().form.get('q'):
criterias.append(FtsMatch(get_request().form.get('q')))
return criterias
def empty_site_message(self, title):
r = TemplateIO(html=True)
r += htmltext('<div class="top-title">')
r += htmltext('<h2>%s</h2>') % title
r += htmltext('</div>')
r += htmltext('<div class="big-msg-info">')
r += htmltext('<p>%s</p>') % _(
'This site is currently empty. It is required to first add forms.')
r += htmltext('</div>')
return r.getvalue()
def listing(self):
if not get_publisher().is_using_postgresql():
raise errors.TraversalError()
get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'wcs.listing.js'])
from wcs import sql
html_top('management', _('Management'))
if FormDef.count() == 0:
return self.empty_site_message(_('Global View'))
limit = int(get_request().form.get('limit',
get_publisher().get_site_option('default-page-size') or 20))
offset = int(get_request().form.get('offset', 0))
order_by = get_request().form.get('order_by',
get_publisher().get_site_option('default-sort-order') or '-receipt_time')
criterias = self.get_global_listing_criterias()
total_count = sql.AnyFormData.count(criterias)
if offset > total_count:
get_request().form['offset'] = '0'
return redirect('listing?' + urllib.urlencode(get_request().form))
formdatas = sql.AnyFormData.select(criterias,
order_by=order_by, limit=limit, offset=offset)
include_submission_channel = bool(
get_publisher().get_site_option('welco_url', 'variables'))
r = TemplateIO(html=True)
r += htmltext('<table id="listing" class="main">')
r += htmltext('<thead><tr>')
r += htmltext('<th data-field-sort-key="criticality_level"><span></span></th>')
if include_submission_channel:
r += htmltext('<th data-field-sort-key="submission_channel"><span>%s</span></th>') % _('Channel')
r += htmltext('<th data-field-sort-key="formdef_name"><span>%s</span></th>') % _('Form')
r += htmltext('<th><span>%s</span></th>') % _('Reference')
r += htmltext('<th data-field-sort-key="receipt_time"><span>%s</span></th>') % _('Created')
r += htmltext('<th data-field-sort-key="last_update_time"><span>%s</span></th>') % _('Last Modified')
r += htmltext('<th data-field-sort-key="user_name"><span>%s</span></th>') % C_('frontoffice|User')
r += htmltext('<th class="nosort"><span>%s</span></th>') % _('Status')
r += htmltext('</tr></thead>')
r += htmltext('<tbody>')
workflows = {}
visited_objects = get_publisher().get_visited_objects(exclude_user=get_session().user)
for formdata in formdatas:
if not formdata.formdef.workflow_id in workflows:
workflows[formdata.formdef.workflow_id] = formdata.formdef.workflow
classes = ['status-%s-%s' % (formdata.formdef.workflow.id, formdata.status)]
object_key = 'formdata-%s-%s' % (formdata.formdef.url_name, formdata.id)
if object_key in visited_objects:
classes.append('advisory-lock')
if formdata.backoffice_submission:
classes.append('backoffice-submission')
style = ''
try:
level = formdata.get_criticality_level_object()
except IndexError:
pass
else:
classes.append('criticality-level')
style = 'style="border-left-color: #%s;"' % level.colour
r += htmltext('<tr class="%s" data-link="%s">' % (
' '.join(classes),
formdata.get_url(backoffice=True)))
r += htmltext('<td %s></td>' % style) # lock
if include_submission_channel:
r += htmltext('<td>%s</td>') % formdata.get_submission_channel_label()
r += htmltext('<td>%s') % formdata.formdef.name
if formdata.digest:
r += htmltext(' <small>%s</small>') % formdata.digest
r += htmltext('</td>')
r += htmltext('<td><a href="%s">%s</a></td>') % (
formdata.get_url(backoffice=True), formdata.get_display_id())
r += htmltext('<td class="cell-time">%s</td>') % misc.localstrftime(
formdata.receipt_time)
r += htmltext('<td class="cell-time">%s</td>') % misc.localstrftime(
formdata.last_update_time)
try:
value = get_publisher().user_class.get(formdata.user_id).display_name
r += htmltext('<td class="cell-user">%s</td>') % value
except:
r += htmltext('<td class="cell-user cell-no-user">-</td>')
r += htmltext('<td class="cell-status">%s</td>') % formdata.get_status_label()
r += htmltext('</tr>\n')
if workflows:
colours = []
for workflow in workflows.values():
for status in workflow.possible_status:
if status.colour and status.colour != 'FFFFFF':
fg_colour = misc.get_foreground_colour(status.colour)
colours.append((workflow.id, status.id, status.colour, fg_colour))
if colours:
r += htmltext('<style>')
for workflow_id, status_id, bg_colour, fg_colour in colours:
r += htmltext('tr.status-%s-wf-%s td.cell-status { '\
'background-color: #%s !important; color: %s !important; }\n' % (
workflow_id, status_id, bg_colour, fg_colour))
r += htmltext('</style>')
r += htmltext('</tbody></table>')
if (offset > 0) or (total_count > limit > 0):
r += pagination_links(offset, limit, total_count)
if get_request().form.get('ajax') == 'true':
get_response().filter = {'raw': True}
return r.getvalue()
get_response().filter['sidebar'] = self.get_global_listing_sidebar(
limit=limit, offset=offset, order_by=order_by)
rt = TemplateIO(html=True)
rt += htmltext('<div id="appbar">')
rt += htmltext('<h2>%s</h2>') % _('Global View')
rt += htmltext('<span class="actions">')
rt += htmltext('<a href="forms">%s</a>') % _('Forms View')
for formdef in FormDef.select(lightweight=True):
if formdef.geolocations:
rt += htmltext(' <a href="map">%s</a>') % _('Map View')
break
rt += htmltext('</span>')
rt += htmltext('</div>')
rt += get_session().display_message()
rt += r.getvalue()
r = rt
return rt.getvalue()
def count(self):
if not get_publisher().is_using_postgresql():
raise errors.TraversalError()
if FormDef.count() == 0:
return misc.json_response({'count': 0})
from wcs import sql
criterias = self.get_global_listing_criterias()
count = sql.AnyFormData.count(criterias)
return misc.json_response({'count': count})
def geojson(self):
from wcs import sql
criterias = self.get_global_listing_criterias()
formdatas = sql.AnyFormData.select(criterias)
get_response().set_content_type('application/json')
return json.dumps(geojson_formdatas(formdatas))
def map(self):
if not get_publisher().is_using_postgresql():
raise errors.TraversalError()
get_response().add_javascript(['wcs.listing.js', 'qommon.map.js'])
html_top('management', _('Global Map'))
r = TemplateIO(html=True)
get_response().breadcrumb.append(('map', _('Global Map')))
attrs = {
'class': 'qommon-map',
'id': 'backoffice-map',
'data-readonly': True,
'data-geojson-url': '%s/geojson?%s' % (
get_request().get_url(1), get_request().get_query())
}
attrs.update(get_publisher().get_map_attributes())
get_response().filter['sidebar'] = self.get_global_listing_sidebar()
r += htmltext('<h2>%s</h2>') % _('Global Map')
r += htmltext('<div %s></div>' % ' '.join(['%s="%s"' % x for x in attrs.items()]))
return r.getvalue()
def _q_lookup(self, component):
return FormPage(component)
class FormPage(Directory):
_q_exports = ['', 'csv', 'stats', 'xls', 'ods', 'json', 'export', 'map',
'geojson']
def __init__(self, component):
try:
self.formdef = FormDef.get_by_urlname(component)
except KeyError:
raise errors.TraversalError()
get_response().breadcrumb.append( (component + '/', self.formdef.name) )
def check_access(self, api_name=None):
session = get_session()
user = get_request().user
if user is None and get_publisher().user_class.count() == 0:
user = get_publisher().user_class()
user.is_admin = True
if not user:
raise errors.AccessUnauthorizedError()
if not user.is_admin and not self.formdef.is_of_concern_for_user(user):
if session.user:
raise errors.AccessForbiddenError()
else:
raise errors.AccessUnauthorizedError()
def get_formdata_sidebar(self, qs=''):
r = TemplateIO(html=True)
r += htmltext('<ul id="sidebar-actions">')
#' <li><a href="list%s">%s</a></li>' % (qs, _('List of results'))
r += htmltext(' <li><a data-base-href="ods" href="ods%s">%s</a></li>') % (
qs, _('Export a Spreadsheet'))
r += htmltext(' <li><a data-base-href="csv" href="csv%s">%s</a></li>') % (
qs, _('Export as CSV File'))
if xlwt and get_publisher().has_site_option('legacy-excel-export'):
r += htmltext('<li><a data-base-href="xls" href="xls%s">%s</a></li>') % (
qs, _('Excel Export'))
if self.formdef.geolocations:
r += htmltext(' <li><a data-base-href="map" href="map%s">%s</a></li>') % (
qs, _('Plot on a Map'))
r += htmltext(' <li class="stats"><a href="stats">%s</a></li>') % _('Statistics')
r += htmltext('</ul>')
return r.getvalue()
def get_filter_sidebar(self, selected_filter=None, mode='listing'):
r = TemplateIO(html=True)
waitpoint_status = self.formdef.workflow.get_waitpoint_status()
period_fake_fields = [
FakeField('start', 'period-date', _('Start')),
FakeField('end', 'period-date', _('End')),
]
filter_fields = []
for field in period_fake_fields + self.get_formdef_fields():
field.enabled = False
if field.type not in ('item', 'bool', 'items', 'period-date', 'status'):
continue
if field.type == 'status' and not waitpoint_status:
continue
filter_fields.append(field)
if get_request().form:
field.enabled = 'filter-%s' % field.id in get_request().form
else:
if mode == 'listing':
# enable status filter by default
field.enabled = (field.id in ('status',))
elif mode == 'stats':
# enable period filters by default
field.enabled = (field.id in ('start', 'end'))
if field.type in ('item', 'items'):
field.enabled = field.in_filters
r += htmltext('<h3><span>%s</span> <span class="change">(<a id="filter-settings">%s</a>)</span></h3>' % (
_('Filters'), _('change')))
for filter_field in filter_fields:
if not filter_field.enabled:
continue
filter_field_key = 'filter-%s-value' % filter_field.id
filter_field_value = get_request().form.get(filter_field_key)
if filter_field.type == 'status':
r += htmltext('<div class="widget">')
r += htmltext('<div class="title">%s</div>') % _('Status to display')
r += htmltext('<div class="content">')
r += htmltext('<select name="filter">')
filters = [
('waiting', _('Waiting for an action'), None),
('all', _('All'), None),
('pending', C_('formdata|Open'), None),
('done', _('Done'), None),
]
for status in waitpoint_status:
filters.append((status.id, status.name, status.colour))
for filter_id, filter_label, filter_colour in filters:
if filter_id == selected_filter:
selected = ' selected="selected"'
else:
selected = ''
style = ''
if filter_colour and filter_colour != 'FFFFFF':
fg_colour = misc.get_foreground_colour(filter_colour)
style = 'style="background: #%s; color: %s;"' % (
filter_colour, fg_colour)
r += htmltext('<option value="%s"%s %s>' % (filter_id, selected, style))
r += htmltext('%s</option>') % filter_label
r += htmltext('</select>')
r += htmltext('</div>')
r += htmltext('</div>')
elif filter_field.type == 'period-date':
r += DateWidget(filter_field_key, title=filter_field.label,
value=filter_field_value, render_br=False).render()
elif filter_field.type in ('item', 'items'):
filter_field.required = False
options = filter_field.get_options()
if options:
if len(options[0]) == 2:
options = [(x[0], x[1], x[0]) for x in options]
options.insert(0, (None, '', ''))
r += SingleSelectWidget(filter_field_key, title=filter_field.label,
options=options, value=filter_field_value,
render_br=False).render()
else:
# There may be no options because the field is using
# a jsonp data source, or a json source using a
# parametrized URL depending on unavailable variables.
#
# In that case fall back on a string widget.
r += StringWidget(filter_field_key, title=filter_field.label,
value=filter_field_value, render_br=False).render()
elif filter_field.type == 'bool':
options = [(None, '', ''), (True, _('Yes'), 'true'), (False, _('No'), 'false')]
r += SingleSelectWidget(filter_field_key, title=filter_field.label,
options=options, value=filter_field_value,
render_br=False).render()
# field filter dialog content
r += htmltext('<div style="display: none;">')
r += htmltext('<ul id="field-filter">')
for field in filter_fields:
r += htmltext('<li><input type="checkbox" name="filter-%s"') % field.id
if field.enabled:
r += htmltext(' checked="checked"')
r += htmltext(' id="fields-filter-%s"') % field.id
r += htmltext('/>')
r += htmltext('<label for="fields-filter-%s">%s</label>') % (
field.id, misc.ellipsize(field.label, 70))
r += htmltext('</li>')
r += htmltext('</ul>')
r += htmltext('</div>')
return r.getvalue()
def get_fields_sidebar(self, selected_filter, fields, offset=None,
limit=None, order_by=None, columns_settings_label=None):
get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'wcs.listing.js'])
r = TemplateIO(html=True)
r += htmltext('<form id="listing-settings" action=".">')
if offset or limit:
if not offset:
offset = 0
r += htmltext('<input type="hidden" name="offset" value="%s"/>') % offset
if limit:
r += htmltext('<input type="hidden" name="limit" value="%s"/>') % limit
if get_publisher().is_using_postgresql():
if order_by is None:
order_by = get_publisher().get_site_option('default-sort-order') or '-receipt_time'
r += htmltext('<input type="hidden" name="order_by" value="%s"/>') % order_by
if get_publisher().is_using_postgresql():
r += htmltext('<h3>%s</h3>') % _('Search')
if get_request().form.get('q'):
q = get_request().form.get('q')
if type(q) is not unicode:
q = unicode(q, get_publisher().site_charset)
r += htmltext('<input class="inline-input" name="q" value="%s">') % q.encode(get_publisher().site_charset)
else:
r += htmltext('<input class="inline-input" name="q">')
r += htmltext('<input type="submit" class="side-button" value="%s"/>') % _('Search')
r += self.get_filter_sidebar(selected_filter=selected_filter)
r += htmltext('<button class="refresh">%s</button>') % _('Refresh')
if columns_settings_label:
r += htmltext('<button id="columns-settings">%s</button>') % columns_settings_label
# column settings dialog content
r += htmltext('<div style="display: none;">')
r += htmltext('<ul id="columns-filter">')
for field in self.get_formdef_fields():
if not hasattr(field, str('get_view_value')):
continue
r += htmltext('<li><input type="checkbox" name="%s"') % field.id
if field.id in [x.id for x in fields]:
r += htmltext(' checked="checked"')
r += htmltext(' id="fields-column-%s"') % field.id
r += htmltext('/>')
r += htmltext('<label for="fields-column-%s">%s</label>') % (
field.id, misc.ellipsize(field.label, 70))
r += htmltext('</li>')
r += htmltext('</ul>')
r += htmltext('</div>')
r += htmltext('</form>')
return r.getvalue()
def get_formdef_fields(self):
fields = []
fields.append(FakeField('id', 'id', _('Number')))
if get_publisher().get_site_option('welco_url', 'variables'):
fields.append(FakeField('submission_channel', 'submission_channel', _('Channel')))
if self.formdef.backoffice_submission_roles:
fields.append(FakeField('submission_agent', 'submission_agent', _('Submission By')))
fields.append(FakeField('time', 'time', _('Created')))
fields.append(FakeField('last_update_time', 'last_update_time', _('Last Modified')))
fields.append(FakeField('user-label', 'user-label', _('User Label')))
fields.extend(self.formdef.get_all_fields())
fields.append(FakeField('status', 'status', _('Status')))
fields.append(FakeField('anonymised', 'anonymised', _('Anonymised')))
return fields
def get_fields_from_query(self, ignore_form=False):
field_ids = [x for x in get_request().form.keys()]
if not field_ids or ignore_form:
field_ids = ['id', 'time', 'last_update_time', 'user-label']
for field in self.formdef.get_all_fields():
if hasattr(field, str('get_view_value')) and field.in_listing:
field_ids.append(field.id)
field_ids.append('status')
fields = []
for field in self.get_formdef_fields():
if field.id in field_ids:
fields.append(field)
if not fields:
return self.get_fields_from_query(ignore_form=True)
return fields
def get_filter_from_query(self, default='waiting'):
if 'filter' in get_request().form:
return get_request().form['filter']
if self.formdef.workflow.possible_status:
return default
return 'all'
def get_criterias_from_query(self):
period_fake_fields = [
FakeField('start', 'period-date', _('Start')),
FakeField('end', 'period-date', _('End')),
]
filter_fields = []
criterias = []
for filter_field in period_fake_fields + self.get_formdef_fields():
if filter_field.type not in ('item', 'bool', 'items', 'period-date'):
continue
filter_field_key = None
if filter_field.varname:
# if this is a field with a varname and filter-%(varname)s is
# present in the query string, enable this filter.
if get_request().form.get('filter-%s' % filter_field.varname):
filter_field_key = 'filter-%s' % filter_field.varname
if get_request().form.get('filter-%s' % filter_field.id):
# if there's a filter-%(id)s, it is used to enable the actual
# filter, and the value will be found in filter-%s-value.
filter_field_key = 'filter-%s-value' % filter_field.id
if not filter_field_key:
# if there's not known filter key, skip.
continue
filter_field_value = get_request().form.get(filter_field_key)
if not filter_field_value:
continue
if filter_field.id == 'start':
try:
period_start = misc.get_as_datetime(filter_field_value).timetuple()
except ValueError:
pass
else:
criterias.append(GreaterOrEqual('receipt_time', period_start))
criterias[-1]._label = '%s: %s' % (_('Start'), filter_field_value)
elif filter_field.id == 'end':
try:
period_end = misc.get_as_datetime(filter_field_value).timetuple()
except ValueError:
pass
else:
criterias.append(LessOrEqual('receipt_time', period_end))
criterias[-1]._label = '%s: %s' % (_('End'), filter_field_value)
elif filter_field.type in ('item', 'items') and filter_field_value not in (None, 'None'):
if filter_field.type == 'item':
criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
elif filter_field.type == 'items':
criterias.append(Intersects('f%s' % filter_field.id, [filter_field_value]))
field_options = filter_field.get_options()
if field_options and type(field_options[0]) in (list, tuple):
for option in field_options:
if option[0] == filter_field_value or option[-1] == filter_field_value:
filter_field_value = option[1]
break
criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value)
elif filter_field.type == 'bool' and filter_field_value not in (None, 'None'):
if filter_field_value == 'true':
criterias.append(Equal('f%s' % filter_field.id, True))
elif filter_field_value == 'false':
criterias.append(Equal('f%s' % filter_field.id, False))
return criterias
def _q_index(self):
self.check_access()
get_logger().info('backoffice - form %s - listing' % self.formdef.name)
fields = self.get_fields_from_query()
selected_filter = self.get_filter_from_query()
criterias = self.get_criterias_from_query()
if get_publisher().is_using_postgresql():
# only enable pagination in SQL mode, as we do not have sorting in
# the other case.
limit = get_request().form.get('limit',
int(get_publisher().get_site_option('default-page-size') or 20))
else:
limit = get_request().form.get('limit', 0)
offset = get_request().form.get('offset', 0)
order_by = get_request().form.get('order_by',
get_publisher().get_site_option('default-sort-order') or '-receipt_time')
query = get_request().form.get('q')
qs = ''
if get_request().get_query():
qs = '?' + get_request().get_query()
table = FormDefUI(self.formdef).listing(fields=fields,
selected_filter=selected_filter,
limit=int(limit), offset=int(offset), query=query,
order_by=order_by, criterias=criterias)
if get_response().status_code == 302:
# catch early redirect
return table
if get_request().form.get('ajax') == 'true':
get_response().filter = {'raw': True}
return table
html_top('management', '%s - %s' % (_('Listing'), self.formdef.name))
r = TemplateIO(html=True)
r += htmltext('<h2>%s - %s</h2>') % (self.formdef.name, _('Listing'))
r += table
get_response().filter['sidebar'] = self.get_formdata_sidebar(qs) + \
self.get_fields_sidebar(selected_filter, fields, limit=limit,
offset=offset, order_by=order_by,
columns_settings_label=_('Columns Settings'))
return r.getvalue()
def csv_tuple_heading(self, fields):
heading_fields = [] # '#id', _('time'), _('userlabel'), _('status')]
for field in fields:
heading_fields.extend(field.get_csv_heading())
return heading_fields
def get_spreadsheet_line(self, fields, data):
elements = []
for field in fields:
element = data.get_field_view_value(field) or ''
display_value = None
if field.store_display_value:
display_value = data.data.get('%s_display' % field.id) or ''
for value in field.get_csv_value(element, display_value=display_value):
elements.append({'field': field, 'value': value, 'native_value': element})
return elements
def csv(self):
self.check_access()
fields = self.get_fields_from_query()
selected_filter = self.get_filter_from_query()
user = get_request().user
query = get_request().form.get('q')
criterias = self.get_criterias_from_query()
class Exporter(object):
def __init__(self, formpage, formdef, fields, selected_filter):
self.formpage = formpage
self.formdef = formdef
self.fields = fields
self.selected_filter = selected_filter
def export(self, job=None):
self.output = cStringIO.StringIO()
csv_output = csv.writer(self.output)
csv_output.writerow(self.formpage.csv_tuple_heading(self.fields))
items, total_count = FormDefUI(self.formdef).get_listing_items(
self.selected_filter, user=user, query=query,
criterias=criterias)
for filled in items:
csv_output.writerow(tuple(
[x['value'] for x in self.formpage.get_spreadsheet_line(self.fields, filled)]))
if job:
job.file_content = self.output.getvalue()
job.content_type = 'text/csv'
job.store()
get_logger().info('backoffice - form %s - listing csv' % self.formdef.name)
count = self.formdef.data_class().count()
exporter = Exporter(self, self.formdef, fields, selected_filter)
if count > 100: # Arbitrary threshold
job = get_response().add_after_job(
str(N_('Exporting forms in CSV')),
exporter.export)
job.file_name = '%s.csv' % self.formdef.url_name
job.store()
return redirect('export?job=%s' % job.id)
else:
exporter.export()
response = get_response()
response.set_content_type('text/plain')
#response.set_header('content-disposition', 'attachment; filename=%s.csv' % self.formdef.url_name)
return exporter.output.getvalue()
def export(self):
self.check_access()
if get_request().form.get('download'):
return self.export_download()
try:
job = AfterJob.get(get_request().form.get('job'))
except KeyError:
return redirect('.')
html_top('management', title=_('Exporting'))
r = TemplateIO(html=True)
r += get_session().display_message()
get_response().add_javascript(['jquery.js', 'afterjob.js'])
r += htmltext('<dl class="job-status">')
r += htmltext('<dt>')
r += _(job.label)
r += htmltext('</dt>')
r += htmltext('<dd>')
r += htmltext('<span class="afterjob" id="%s">') % job.id
r += _(job.status)
r += htmltext('</span>')
r += htmltext('</dd>')
r += htmltext('</dl>')
r += htmltext('<div class="done">')
r += htmltext('<a download="%s" href="export?download=%s">%s</a>') % (
job.file_name, job.id, _('Download Export'))
r += htmltext('</div>')
return r.getvalue()
def export_download(self):
try:
job = AfterJob.get(get_request().form.get('download'))
except KeyError:
return redirect('.')
if not job.status == 'completed':
raise errors.TraversalError()
response = get_response()
response.set_content_type(job.content_type)
response.set_header('content-disposition',
'attachment; filename=%s' % job.file_name)
return job.file_content
def xls(self):
self.check_access()
if xlwt is None or not get_publisher().has_site_option('legacy-excel-export'):
raise errors.TraversalError()
fields = self.get_fields_from_query()
selected_filter = self.get_filter_from_query()
user = get_request().user
query = get_request().form.get('q')
criterias = self.get_criterias_from_query()
class Exporter(object):
def __init__(self, formpage, formdef, fields, selected_filter):
self.formpage = formpage
self.formdef = formdef
self.fields = fields
self.selected_filter = selected_filter
def export(self, job=None):
w = xlwt.Workbook(encoding=get_publisher().site_charset)
ws = w.add_sheet('1')
for i, f in enumerate(self.formpage.csv_tuple_heading(self.fields)):
ws.write(0, i, f)
items, total_count = FormDefUI(self.formdef).get_listing_items(
self.selected_filter, user=user, query=query,
criterias=criterias)
for i, filled in enumerate(items):
for j, item in enumerate(self.formpage.get_spreadsheet_line(fields, filled)):
elem = item['value']
if elem and len(elem) > 32767:
# xls cells have a limit of 32767 characters, cut
# it down.
elem = elem[:32760] + ' [...]'
ws.write(i+1, j, elem)
self.output = cStringIO.StringIO()
w.save(self.output)
if job:
job.file_content = self.output.getvalue()
job.content_type = 'application/vnd.ms-excel'
job.store()
get_logger().info('backoffice - form %s - as excel' % self.formdef.name)
count = self.formdef.data_class().count()
exporter = Exporter(self, self.formdef, fields, selected_filter)
if count > 100: # Arbitrary threshold
job = get_response().add_after_job(
str(N_('Exporting forms in Excel format')),
exporter.export)
job.file_name = '%s.xls' % self.formdef.url_name
job.store()
return redirect('export?job=%s' % job.id)
else:
exporter.export()
response = get_response()
response.set_content_type('application/vnd.ms-excel')
response.set_header('content-disposition', 'attachment; filename=%s.xls' % self.formdef.url_name)
return exporter.output.getvalue()
def ods(self):
self.check_access()
fields = self.get_fields_from_query()
selected_filter = self.get_filter_from_query()
user = get_request().user
query = get_request().form.get('q')
criterias = self.get_criterias_from_query()
class Exporter(object):
def __init__(self, formpage, formdef, fields, selected_filter):
self.formpage = formpage
self.formdef = formdef
self.fields = fields
self.selected_filter = selected_filter
def export(self, job=None):
w = ods.Workbook(encoding=get_publisher().site_charset)
ws = w.add_sheet('1')
for i, f in enumerate(self.formpage.csv_tuple_heading(self.fields)):
ws.write(0, i, f)
items, total_count = FormDefUI(self.formdef).get_listing_items(
self.selected_filter, user=user, query=query,
criterias=criterias)
for i, formdata in enumerate(items):
for j, item in enumerate(self.formpage.get_spreadsheet_line(fields, formdata)):
ws.write(i+1, j, item['value'],
formdata=formdata,
data_field=item['field'],
native_value=item['native_value'])
self.output = cStringIO.StringIO()
w.save(self.output)
if job:
job.file_content = self.output.getvalue()
job.content_type = 'application/vnd.oasis.opendocument.spreadsheet'
job.store()
get_logger().info('backoffice - form %s - as ods' % self.formdef.name)
count = self.formdef.data_class().count()
exporter = Exporter(self, self.formdef, fields, selected_filter)
if count > 100: # Arbitrary threshold
job = get_response().add_after_job(
str(N_('Exporting forms in Open Document format')),
exporter.export)
job.file_name = '%s.ods' % self.formdef.url_name
job.store()
return redirect('export?job=%s' % job.id)
else:
exporter.export()
response = get_response()
response.set_content_type('application/vnd.oasis.opendocument.spreadsheet')
response.set_header('content-disposition', 'attachment; filename=%s.ods' % self.formdef.url_name)
return exporter.output.getvalue()
def json(self):
anonymise = 'anonymise' in get_request().form
self.check_access()
get_response().set_content_type('application/json')
user = get_user_from_api_query_string() or get_request().user if not anonymise else None
selected_filter = self.get_filter_from_query(default='all')
criterias = self.get_criterias_from_query()
order_by = get_request().form.get('order_by', None)
query = get_request().form.get('q') if not anonymise else None
offset = None
if 'offset' in get_request().form:
try:
offset = int(get_request().form['offset'])
except ValueError:
raise errors.RequestError('invalid offset parameter')
limit = None
if 'limit' in get_request().form:
try:
limit = int(get_request().form['limit'])
except ValueError:
raise errors.RequestError('invalid limit parameter')
items, total_count = FormDefUI(self.formdef).get_listing_items(
selected_filter, user=user, query=query, criterias=criterias,
order_by=order_by, anonymise=anonymise, offset=offset, limit=limit)
if get_publisher().is_using_postgresql():
self.formdef.data_class().load_all_evolutions(items)
if get_request().form.get('full') == 'on':
output = [filled.get_json_export_dict(include_files=False, anonymise=anonymise)
for filled in items]
else:
output = [{'id': filled.id,
'url': filled.get_url(),
'receipt_time': filled.receipt_time,
'last_update_time': filled.last_update_time} for filled in items]
return json.dumps(output,
cls=misc.JSONEncoder,
encoding=get_publisher().site_charset)
def geojson(self):
if not self.formdef.geolocations:
raise errors.TraversalError()
if 'anonymise' in get_request().form:
# api/ will let this pass but we don't want that.
raise errors.AccessForbiddenError()
self.check_access()
get_response().set_content_type('application/json')
user = get_user_from_api_query_string() or get_request().user
selected_filter = self.get_filter_from_query()
fields = self.get_fields_from_query()
criterias = self.get_criterias_from_query()
query = get_request().form.get('q')
items, total_count = FormDefUI(self.formdef).get_listing_items(
selected_filter, user=user, query=query, criterias=criterias)
# only consider first key for now
geoloc_key = self.formdef.geolocations.keys()[0]
return json.dumps(geojson_formdatas(items, fields=fields))
def ics(self):
if 'anonymise' in get_request().form:
# api/ will let this pass but we don't want that.
raise errors.AccessForbiddenError()
charset = get_publisher().site_charset
self.check_access('ics')
user = get_user_from_api_query_string('ics') or get_request().user
formdef = self.formdef
selected_filter = self.get_filter_from_query()
fields = self.get_fields_from_query()
criterias = self.get_criterias_from_query()
query = get_request().form.get('q')
class IcsDirectory(Directory):
# ics/<component> with <component> being the identifier (varname)
# of the field to use as start date (may be a date field or a
# string field).
# ics/<component>/<component2> with <component2> as the identifier
# of the field to use as end date (ditto, date or string field)
def _q_traverse(self, path):
if not path[-1]:
# allow trailing slash
path = path[:-1]
start_date_field_varname = path[0]
end_date_field_varname = None
if len(path) == 2:
end_date_field_varname = path[1]
elif len(path) > 2:
raise errors.TraversalError()
start_date_field_id = None
end_date_field_id = None
for field in formdef.get_all_fields():
if getattr(field, 'varname', None) == start_date_field_varname:
start_date_field_id = field.id
if end_date_field_varname and getattr(field, 'varname', None) == end_date_field_varname:
end_date_field_id = field.id
if not start_date_field_id:
raise errors.TraversalError()
if end_date_field_varname and not end_date_field_id:
raise errors.TraversalError()
formdatas, total_count = FormDefUI(formdef).get_listing_items(
selected_filter, user=user, query=query, criterias=criterias)
cal = vobject.iCalendar()
cal.add('prodid').value = '-//Entr\'ouvert//NON SGML Publik'
for formdata in formdatas:
if not formdata.data.get(start_date_field_id):
continue
vevent = vobject.newFromBehavior('vevent')
vevent.add('uid').value = '%s-%s-%s' % (
get_request().get_server().lower(),
formdef.url_name,
formdata.id)
vevent.add('summary').value = unicode(formdata.get_display_name(), charset)
vevent.add('dtstart').value = make_datetime(formdata.data[start_date_field_id])
if end_date_field_id and formdata.data.get(end_date_field_id):
vevent.add('dtend').value = make_datetime(formdata.data[end_date_field_id])
vevent.dtstart.value_param = 'DATE'
backoffice_url = formdata.get_url(backoffice=True)
vevent.add('url').value = backoffice_url
form_name = unicode(formdef.name, charset)
status_name = unicode(formdata.get_status_label(), charset)
description = '%s | %s | %s\n' % (form_name, formdata.get_display_id(), status_name)
description += backoffice_url
# TODO: improve performance by loading all users in one
# single query before the loop
if formdata.user:
description += '\n%s' % unicode(formdata.user.get_display_name(), charset)
vevent.add('description').value = description
cal.add(vevent)
get_response().set_content_type('text/calendar')
return cal.serialize()
return IcsDirectory()
def map(self):
get_response().add_javascript(['qommon.map.js'])
html_top('management', '%s - %s' % (_('Form'), self.formdef.name))
r = TemplateIO(html=True)
get_response().breadcrumb.append(('map', _('Map')))
attrs = {
'class': 'qommon-map',
'id': 'backoffice-map',
'data-readonly': True,
'data-geojson-url': '%s/geojson?%s' % (
get_request().get_url(1), get_request().get_query())
}
attrs.update(get_publisher().get_map_attributes())
fields = self.get_fields_from_query()
selected_filter = self.get_filter_from_query()
get_response().filter['sidebar'] = self.get_fields_sidebar(selected_filter,
fields, columns_settings_label=_('Markers Settings'))
r += htmltext('<h2>%s - %s</h2>') % (self.formdef.name, _('Map'))
r += htmltext('<div %s></div>' % ' '.join(['%s="%s"' % x for x in attrs.items()]))
return r.getvalue()
def get_stats_sidebar(self, selected_filter):
get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'wcs.listing.js'])
r = TemplateIO(html=True)
r += htmltext('<form id="listing-settings" action="stats">')
r += self.get_filter_sidebar(selected_filter=selected_filter, mode='stats')
r += htmltext('<button class="refresh">%s</button>') % _('Refresh')
if misc.can_decorate_as_pdf():
r += htmltext('<button class="pdf">%s</button>') % _('Download PDF')
r += htmltext('</form>')
return r.getvalue()
def stats(self):
self.check_access()
get_logger().info('backoffice - form %s - stats' % self.formdef.name)
html_top('management', '%s - %s' % (_('Form'), self.formdef.name))
r = TemplateIO(html=True)
get_response().breadcrumb.append( ('stats', _('Statistics')) )
selected_filter = self.get_filter_from_query(default='all')
criterias = self.get_criterias_from_query()
get_response().filter['sidebar'] = self.get_formdata_sidebar() + \
self.get_stats_sidebar(selected_filter)
do_graphs = get_publisher().is_using_postgresql()
if selected_filter and selected_filter != 'all':
if selected_filter == 'pending':
applied_filters = ['wf-%s' % x.id for x in
self.formdef.workflow.get_not_endpoint_status()]
criteria_label = _('Status: %s') % _('Pending')
elif selected_filter == 'done':
applied_filters = ['wf-%s' % x.id for x in
self.formdef.workflow.get_endpoint_status()]
criteria_label = _('Status: %s') % _('Done')
else:
criteria_label = _('Status: %s') % self.formdef.workflow.get_status(
selected_filter).name
applied_filters = ['wf-%s' % selected_filter]
criterias.append(Or([Equal('status', x) for x in applied_filters]))
criterias[-1]._label = criteria_label
displayed_criterias = criterias
else:
displayed_criterias = criterias
criterias = [NotEqual('status', 'draft')] + displayed_criterias
values = self.formdef.data_class().select(criterias)
if get_publisher().is_using_postgresql():
# load all evolutions in a single batch, to avoid as many query as
# there are formdata when computing resolution times statistics.
self.formdef.data_class().load_all_evolutions(values)
r += htmltext('<div id="statistics">')
if displayed_criterias:
r += htmltext('<div class="criterias bo-block">')
r += htmltext('<h2>%s</h2>') % _('Filters')
r += htmltext('<ul>')
for criteria in displayed_criterias:
criteria_label = getattr(criteria, '_label', None)
if criteria_label:
r += htmltext('<li>%s</li>') % criteria_label
r += htmltext('</ul></div>')
if do_graphs:
r += htmltext('<div class="splitcontent-left">')
no_forms = len(values)
r += htmltext('<div class="bo-block">')
r += htmltext('<p>%s %d</p>') % (_('Total number of records:'), no_forms)
if self.formdef.workflow:
r += htmltext('<ul>')
for status in self.formdef.workflow.possible_status:
r += htmltext('<li>%s: %d</li>') % (status.name,
len([x for x in values if x.status == 'wf-%s' % status.id]))
r += htmltext('</ul>')
r += htmltext('</div>')
excluded_fields = []
for criteria in displayed_criterias:
if not isinstance(criteria, Equal):
continue
excluded_fields.append(criteria.attribute[1:])
stats_for_fields = self.stats_fields(values,
excluded_fields=excluded_fields)
if stats_for_fields:
r += htmltext('<div class="bo-block">')
r += stats_for_fields
r += htmltext('</div>')
stats_times = self.stats_resolution_time(values)
if stats_times:
r += htmltext('<div class="bo-block">')
r += stats_times
r += htmltext('</div>')
if do_graphs:
r += htmltext('</div>')
r += htmltext('<div class="splitcontent-right">')
criterias.append(Equal('formdef_id', int(self.formdef.id)))
r += do_graphs_section(criterias=criterias)
r += htmltext('</div>')
r += htmltext('</div>') # id="statistics"
if get_request().form.get('ajax') == 'true':
get_response().filter = {'raw': True}
return r.getvalue()
page = TemplateIO(html=True)
page += htmltext('<h2>%s - %s</h2>') % (self.formdef.name, _('Statistics'))
page += htmltext(r)
page += htmltext('<a class="back" href=".">%s</a>') % _('Back')
if 'pdf' in get_request().form:
pdf_content = misc.decorate_as_pdf(page.getvalue())
response = get_response()
response.set_content_type('application/pdf')
return pdf_content
return page.getvalue()
def stats_fields(self, values, excluded_fields=None):
r = TemplateIO(html=True)
had_page = False
last_page = None
last_title = None
for f in self.formdef.get_all_fields():
if excluded_fields and f.id in excluded_fields:
continue
if f.type == 'page':
last_page = f.label
last_title = None
continue
if f.type == 'title':
last_title = f.label
continue
if not f.stats:
continue
t = f.stats(values)
if not t:
continue
if last_page:
if had_page:
r += htmltext('</div>')
r += htmltext('<div class="page">')
r += htmltext('<h3>%s</h3>') % last_page
had_page = True
last_page = None
if last_title:
r += htmltext('<h3>%s</h3>') % last_title
last_title = None
r += t
if had_page:
r += htmltext('</div>')
return r.getvalue()
def stats_resolution_time(self, values):
possible_status = [('wf-%s' % x.id, x.id) for x in self.formdef.workflow.possible_status]
if len(possible_status) < 2:
return
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Resolution time')
for status, status_id in possible_status:
res_time_forms = [
(time.mktime(x.evolution[-1].time) - time.mktime(x.receipt_time)) \
for x in values if x.status == status and x.evolution]
if not res_time_forms:
continue
res_time_forms.sort()
sum_times = sum(res_time_forms)
len_times = len(res_time_forms)
r += htmltext('<h3>%s</h3>') % (_('To Status "%s"') % self.formdef.workflow.get_status(status_id).name)
r += htmltext('<ul>')
r += htmltext(' <li>%s %s</li>') % (_('Minimum Time:'), format_time(min(res_time_forms)))
r += htmltext(' <li>%s %s</li>') % (_('Maximum Time:'), format_time(max(res_time_forms)))
r += htmltext(' <li>%s %s</li>') % (_('Range:'), format_time(max(res_time_forms)-min(res_time_forms)))
mean = sum_times/len_times
r += htmltext(' <li>%s %s</li>') % (_('Mean:'), format_time(mean))
if len_times % 2:
median = res_time_forms[len_times/2]
else:
midpt = len_times/2
median = (res_time_forms[midpt-1]+res_time_forms[midpt])/2
r += htmltext(' <li>%s %s</li>') % (_('Median:'), format_time(median))
# variance...
x = 0
for t in res_time_forms:
x += (t - mean)**2.0
try:
variance = x/(len_times+1)
except:
variance = 0
# not displayed since in square seconds which is not easy to grasp
from math import sqrt
# and standard deviation
std_dev = sqrt(variance)
r += htmltext(' <li>%s %s</li>') % (_('Standard Deviation:'), format_time(std_dev))
r += htmltext('</ul>')
return r.getvalue()
def _q_lookup(self, component):
if component == 'ics':
return self.ics()
try:
filled = self.formdef.data_class().get(component)
except KeyError:
raise errors.TraversalError()
return FormBackOfficeStatusPage(self.formdef, filled)
class FormBackOfficeStatusPage(FormStatusPage):
_q_exports_orig = ['', 'download', 'json', 'action',
'inspect', ('inspect-tool', 'inspect_tool'),
('user-pending-forms', 'user_pending_forms')]
form_page_class = FormFillPage
def html_top(self, title = None):
return html_top('management', title)
def _q_index(self):
if self.filled.status == 'draft':
if self.filled.backoffice_submission:
for role in get_request().user.roles or []:
if role in self.formdef.backoffice_submission_roles:
return redirect('../../../submission/%s/%s' % (
self.formdef.url_name, self.filled.id))
raise errors.AccessForbiddenError()
get_response().filter['sidebar'] = self.get_sidebar()
return self.status()
def receipt(self, *args, **kwargs):
r = TemplateIO(html=True)
if get_session() and get_session().is_anonymous_submitter(self.filled):
r += htmltext('<div class="infonotice">')
r += _('This form has been accessed via its tracking code, it is '
'therefore displayed like you were also its owner.')
r += htmltext('</div>')
r += super(FormBackOfficeStatusPage, self).receipt(*args, **kwargs)
return r.getvalue()
def get_sidebar(self):
return self.get_extra_context_bar()
def user_pending_forms(self):
self.check_receiver()
get_response().filter = {'raw': True}
response = self.get_user_pending_forms()
# preemptive locking of forms
object_key = 'formdata-%s-%s' % (self.formdef.url_name, self.filled.id)
all_visitors = get_publisher().get_object_visitors(object_key)
visitors = [x for x in all_visitors if x[0] != get_session().user]
me_in_visitors = bool(get_session().user in [x[0] for x in all_visitors])
if not visitors or me_in_visitors:
related_user_forms = getattr(self.filled, 'related_user_forms', None) or []
user_roles = set(get_request().user.roles or [])
for user_formdata in related_user_forms:
if user_roles.intersection(user_formdata.actions_roles):
user_formdata.mark_as_being_visited()
return response
def get_extra_context_bar(self):
formdata = self.filled
r = TemplateIO(html=True)
if not formdata.is_draft():
r += htmltext('<div class="extra-context">')
if (formdata.backoffice_submission and formdata.submission_context and
formdata.submission_context.get('agent_id') == get_request().user.id and
formdata.tracking_code and
time.time() - time.mktime(formdata.receipt_time) < 30*60):
# keep displaying tracking code to submission agent for 30
# minutes after submission
r += htmltext('<h3>%s</h3>') % _('Tracking Code')
r += htmltext('<p>%s</p>') % formdata.tracking_code
r += htmltext('<h3>%s</h3>') % _('General Information')
r += htmltext('<p>')
tm = misc.localstrftime(formdata.receipt_time)
agent_user = None
if formdata.submission_context and 'agent_id' in formdata.submission_context:
agent_user = get_publisher().user_class.get(
formdata.submission_context['agent_id'], ignore_errors=True)
if agent_user:
r += _('The form has been recorded on %(date)s with the number %(number)s by %(agent)s.') % {
'date': tm, 'number': formdata.get_display_id(),
'agent': agent_user.get_display_name()}
else:
r += _('The form has been recorded on %(date)s with the number %(number)s.') % {
'date': tm, 'number': formdata.get_display_id()}
r += htmltext('</p>')
try:
status_colour = formdata.get_status().colour
except AttributeError:
status_colour = None
status_colour = status_colour or 'ffffff'
fg_colour = misc.get_foreground_colour(status_colour)
r += htmltext('<p class="current-status"><span class="item" style="background: #%s; color: %s;"></span>' %
(status_colour, fg_colour))
r += htmltext('<span>%s %s</span></p>') % (_('Status:'), formdata.get_status_label())
if formdata.formdef.workflow.criticality_levels:
try:
level = formdata.get_criticality_level_object()
except IndexError:
pass
else:
r += htmltext('<p class="current-level">')
if level.colour:
r += htmltext('<span class="item" style="background: #%s;"></span>' % level.colour)
r += htmltext('<span>%s %s</span></p>') % (_('Criticality Level:'), level.name)
if formdata.anonymised:
r += htmltext('<div class="infonotice">')
r += htmltext(_('This form has been anonymised on %(date)s.')) % {
'date': formdata.anonymised.strftime(misc.date_format())}
r += htmltext('</div>')
r += htmltext('</div>')
if formdata.submission_context or formdata.submission_channel:
extra_context = formdata.submission_context or {}
r += htmltext('<div class="extra-context">')
if extra_context.get('orig_formdef_id'):
r += htmltext('<h3>%s</h3>') % _('Original form')
try:
formdata = FormDef.get(extra_context.get('orig_formdef_id')
).data_class().get(extra_context.get('orig_formdata_id'))
except KeyError:
r += htmltext('<p>%s</p>') % _('(deleted)')
else:
r += htmltext('<p><a href="%s">%s %s</a></p>') % (
formdata.get_url(backoffice=True),
formdata.formdef.name,
formdata.get_display_id())
if formdata.submission_channel:
r += htmltext('<h3>%s</h3>') % '%s: %s' % (
_('Channel'), formdata.get_submission_channel_label())
if extra_context.get('thumbnail_url'):
r += htmltext('<p class="thumbnail"><img src="%s" alt=""/></p>'
) % extra_context.get('thumbnail_url')
if extra_context.get('mail_url'):
r += htmltext('<p><a href="%s">%s</a></p>') % (
extra_context.get('mail_url'), _('Open'))
if extra_context.get('comments'):
r += htmltext('<h3>%s</h3>') % _('Comments')
r += htmltext('<p>%s</p>') % extra_context.get('comments')
if extra_context.get('summary_url'):
r += htmltext('<div data-content-url="%s"></div>' %
(extra_context.get('summary_url')))
r += htmltext('</div>')
if formdata.user_id and formdata.get_user():
r += htmltext('<div class="extra-context">')
r += htmltext('<h3>%s</h3>') % _('Associated User')
r += htmltext('<p>%s</p>') % formdata.get_user().display_name
r += htmltext('</div>')
if formdata.formdef.geolocations and formdata.geolocations:
r += htmltext('<div class="geolocations">')
for geoloc_key in formdata.formdef.geolocations:
if not geoloc_key in formdata.geolocations:
continue
r += htmltext('<h3>%s</h3>') % formdata.formdef.geolocations[geoloc_key]
geoloc_value = formdata.geolocations[geoloc_key]
map_widget = MapWidget('geoloc_%s' % geoloc_key,
readonly=True,
value='%(lat)s;%(lon)s' % geoloc_value)
r += map_widget.render()
r += htmltext('</div>')
if formdata.user_id and get_publisher().is_using_postgresql():
r += htmltext('<div data-async-url="%suser-pending-forms"></div>' % formdata.get_url(backoffice=True))
if (get_publisher().get_backoffice_root().is_accessible('forms') or
get_publisher().get_backoffice_root().is_accessible('workflows')):
r += htmltext('<div class="extra-context">')
r += htmltext('<p><a href="inspect">%s</a></p>') % _('Form Inspector')
r += htmltext('</div>')
return r.getvalue()
def get_user_pending_forms(self):
from wcs import sql
formdata = self.filled
r = TemplateIO(html=True)
user_roles = [logged_users_role().id] + (get_request().user.roles or [])
criterias = [Equal('is_at_endpoint', False),
Equal('user_id', str(formdata.user_id)),
Intersects('concerned_roles_array', user_roles),
]
formdatas = sql.AnyFormData.select(criterias, order_by='receipt_time')
self.filled.related_user_forms = formdatas
if formdatas:
r += htmltext('<div class="user-pending-forms">')
r += htmltext('<h3>%s</h3>') % _('User Pending Forms')
categories = {}
formdata_by_category = {}
for formdata in formdatas:
if not formdata.formdef.category_id in categories:
categories[formdata.formdef.category_id] = formdata.formdef.category
formdata_by_category[formdata.formdef.category_id] = []
formdata_by_category[formdata.formdef.category_id].append(formdata)
cats = categories.values()
Category.sort_by_position(cats)
if self.formdef.category_id in categories:
# move current category to the top
cats.remove(categories[self.formdef.category_id])
cats.insert(0, categories[self.formdef.category_id])
for cat in cats:
if len(cats) > 1:
if cat is None:
r += htmltext('<h4>%s</h4>') % _('Misc')
cat_formdatas = formdata_by_category[None]
else:
r += htmltext('<h4>%s</h4>') % cat.name
cat_formdatas = formdata_by_category[cat.id]
else:
cat_formdatas = formdatas
r += htmltext('<ul class="user-formdatas">')
for formdata in cat_formdatas:
status = formdata.get_status()
if status:
status_label = status.name
else:
status_label = _('Unknown')
submit_date = misc.strftime(
misc.date_format(), formdata.receipt_time)
if str(formdata.formdef_id) == str(self.formdef.id) and (
str(formdata.id) == str(self.filled.id)):
r += htmltext('<li class="self"><span class="formname">%s</span> '
) % formdata.formdef.name
else:
r += htmltext('<li><a href="%s">%s</a> ') % (
formdata.get_url(backoffice=True),
formdata.formdef.name)
r += htmltext('(<span class="id">%s</span>), ') % formdata.get_display_id()
r += htmltext('<span class="datetime">%s</span> '
'<span class="status">(%s)</span>') % (
submit_date, status_label)
if formdata.digest:
r += htmltext('<small>%s</small>') % formdata.digest
r += htmltext('</li>')
r += htmltext('</ul>')
r += htmltext('</div>')
return r.getvalue()
def test_tools_form(self):
form = Form(use_tokens=False)
form.add(RadiobuttonsWidget, 'test_mode',
options=[
('django-condition', _('Condition (Django)'), 'django-condition'),
('python-condition', _('Condition (Python)'), 'python-condition'),
('template', '%s / %s' % (_('Template'), _('Django Expression')), 'template'),
('html_template', _('HTML Template (WYSIWYG)'), 'html_template'),
],
value='django-condition',
attrs={'data-dynamic-display-parent': 'true'})
form.add(StringWidget, 'django-condition',
extra_css_class='grid-1-1',
attrs={
'data-dynamic-display-child-of': 'test_mode',
'data-dynamic-display-value': 'django-condition'
})
form.add(StringWidget, 'python-condition',
attrs={
'data-dynamic-display-child-of': 'test_mode',
'data-dynamic-display-value': 'python-condition'
})
form.add(WysiwygTextWidget, 'html_template',
attrs={
'data-dynamic-display-child-of': 'test_mode',
'data-dynamic-display-value': 'html_template'
})
form.add(TextWidget, 'template',
attrs={
'data-dynamic-display-child-of': 'test_mode',
'data-dynamic-display-value': 'template'
})
form.add_submit('submit', _('Evaluate'))
return form
def test_tool_result(self):
form = self.test_tools_form()
r = TemplateIO(html=True)
if form.is_submitted() and not form.has_errors():
# show test result
test_mode = form.get_widget('test_mode').parse()
if test_mode in ('django-condition', 'python-condition'):
condition = Condition({
'value': form.get_widget(test_mode).parse(),
'type': test_mode.split('-')[0]})
condition.log_errors = False
condition.record_errors = False
try:
result = condition.unsafe_evaluate()
except Exception as exception:
r += htmltext('<div class="errornotice">')
r += htmltext('<p>%s</p>') % _('Failed to evaluate condition')
r += htmltext('<p>%s <code>%s: %s</code></p>') % (
_('Error message:'), exception.__class__.__name__, str(exception))
r += htmltext('</div>')
else:
r += htmltext('<div class="test-tool-result infonotice">')
r += htmltext('<h3>%s</h3>') % _('Condition result:')
r += htmltext('<p><span class="result-%s">%s</span>') % (
str(bool(result)).lower(), _('True') if result else _('False'))
if condition.type == 'python':
r += htmltext(' &mdash; %s <strong><code>%r</code></strong> '
'<span class="type">(%r)</span>') % (
_('Python actual result is'), result, type(result))
r += htmltext('</p>')
r += htmltext('</div>')
elif test_mode == 'template':
try:
template = form.get_widget('template').parse() or ''
result = template_on_formdata(self.filled, template, raises=True)
except Exception as exception:
r += htmltext('<div class="errornotice">')
r += htmltext('<p>%s</p>') % _('Failed to evaluate template')
r += htmltext('<p>%s <code>%s: %s</code></p>') % (
_('Error message:'), exception.__class__.__name__, str(exception))
else:
r += htmltext('<div class="test-tool-result infonotice">')
r += htmltext('<h3>%s</h3>') % _('Template rendering:')
if result and result[0] == '<': # seems to be HTML
r += htmltext('<div class="test-tool-result-html">')
r += htmltext(result)
r += htmltext('</div>')
r += htmltext('<h3>%s</h3>') % _('HTML Source:')
r += htmltext('<pre class="test-tool-result-plain">%s</pre>') % result
else:
r += htmltext('<div class="test-tool-result-plain">%s</div>') % result
r += htmltext('</div>')
elif test_mode == 'html_template':
try:
html_template = form.get_widget('html_template').parse() or ''
result = template_on_formdata(self.filled, html_template, raises=True,
ezt_format=ezt.FORMAT_HTML)
except Exception as exception:
r += htmltext('<div class="errornotice">')
r += htmltext('<p>%s</p>') % _('Failed to evaluate HTML template')
r += htmltext('<p>%s <code>%s: %s</code></p>') % (
_('Error message:'), exception.__class__.__name__, str(exception))
r += htmltext('</div>')
else:
r += htmltext('<div class="test-tool-result infonotice">')
r += htmltext('<h3>%s</h3>') % _('Template rendering:')
r += htmltext('<div class="test-tool-result-html">')
r += htmltext(result)
r += htmltext('</div>')
r += htmltext('<h3>%s</h3>') % _('HTML Source:')
r += htmltext('<pre class="test-tool-result-plain">%s</pre>') % result
r += htmltext('</div>')
return r.getvalue()
def inspect(self):
if not (get_publisher().get_backoffice_root().is_accessible('forms') or
get_publisher().get_backoffice_root().is_accessible('workflows')):
raise errors.AccessForbiddenError()
charset = get_publisher().site_charset
get_response().breadcrumb.append(('inspect', _('Form Inspector')))
self.html_top(self.formdef.name)
r = TemplateIO(html=True)
r += htmltext('<div id="appbar">')
r += htmltext('<h2>%s</h2>') % _('Form Inspector')
r += htmltext('<span class="actions">')
if get_publisher().get_backoffice_root().is_accessible('forms'):
r += htmltext(' <a href="../../../forms/%s/">%s</a>') % (
self.formdef.id, _('View Form'))
if get_publisher().get_backoffice_root().is_accessible('workflows'):
r += htmltext(' <a href="../../../workflows/%s/">%s</a>') % (
self.formdef.workflow.id, _('View Workflow'))
r += htmltext('</span>')
r += htmltext('</div>')
r += htmltext('<div id="inspect-test-tools" class="section">')
r += htmltext('<h2>%s</h2>') % _('Test tools')
r += htmltext('<div>')
form = self.test_tools_form()
r += form.render()
r += htmltext('<div id="test-tool-result">')
r += self.test_tool_result()
r += htmltext('</div>')
r += htmltext('</div></div>')
r += htmltext('<div id="inspect-variables" class="section">')
r += htmltext('<h2>%s</h2>') % _('Variables')
r += htmltext('<ul class="form-inspector biglist">')
substvars = self.filled.get_static_substitution_variables()
substvars.update(self.filled.formdef.get_static_substitution_variables())
def safe(v):
if isinstance(v, str):
try:
unicode(v, charset)
except UnicodeDecodeError:
v = repr(v)
else:
try:
v = unicode(v).encode(charset)
except:
v = repr(v)
return v
backward_compatibility_varnames = re.compile('^(attachments|form_field_.*|'
'form_f[0-9]+|form_fbo[0-9]+|form_user_f[0-9]+|'
'form_user_field_.*|form_user_f_.*)$')
for k, v in sorted(substvars.items()):
if backward_compatibility_varnames.search(k):
continue
k = safe(k)
r += htmltext('<li><code title="%s">%s</code>') % (k, k)
r += htmltext(' <div class="value"><span>%s</span>') % ellipsize(safe(v), 10000)
if not isinstance(v, basestring):
r += htmltext(' <span class="type">(%r)</span>') % type(v)
r += htmltext('</div></li>')
r += htmltext('</div>')
# assigned functions
if self.formdef.workflow.roles:
workflow = self.formdef.workflow
r += htmltext('<div id="inspect-functions" class="section">')
r += htmltext('<h2>%s</h2></li>\n') % _('Functions')
r += htmltext('<ul class="form-inspector biglist">')
for key, label in (workflow.roles or {}).items():
r += htmltext('<li><span class="label">%s</span>') % label
acting_role_id = get_role_translation(self.filled, key)
if acting_role_id:
try:
acting_role = Role.get(acting_role_id)
r += htmltext('<div class="value"><span>%s</span></div>') % acting_role.name
except KeyError:
r += htmltext('<div class="value"><span>%s %s</span></div>') % (
acting_role_id, _('(deleted)'))
else:
r += htmltext('<div class="value"><span class="unset">%s</span></div>') % _('unset')
r += htmltext('</li>\n')
r += htmltext('</ul>')
r += htmltext('</div>')
# markers stack
if '_markers_stack' in (self.filled.workflow_data or {}):
r += htmltext('<div id="inspect-markers" class="section">')
r += htmltext('<h2>%s</h2>') % _('Markers Stack')
r += htmltext('<ul class="form-inspector biglist">')
for marker in reversed(self.filled.workflow_data['_markers_stack']):
status = self.filled.get_status(marker['status_id'])
if status:
r += htmltext('<li><span class="status">%s</span></li>') % status.name
else:
r += htmltext('<li><span class="status">%s</span></li>') % _('Unknown')
r += htmltext('</ul>')
r += htmltext('</div>')
return r.getvalue()
def inspect_tool(self):
if not (get_publisher().get_backoffice_root().is_accessible('forms') or
get_publisher().get_backoffice_root().is_accessible('workflows')):
raise errors.AccessForbiddenError()
get_response().filter = {'raw': True}
return self.test_tool_result()
class FakeField(object):
def __init__(self, id, type_, label):
self.id = id
self.type = type_
self.label = label
self.fake = True
self.varname = id.replace('-', '_')
self.store_display_value = None
def get_view_value(self, value):
# just here to quack like a duck
return None
def get_csv_heading(self):
return [self.label]
def get_csv_value(self, element, **kwargs):
return [element]
def do_graphs_section(period_start=None, period_end=None, criterias=None):
from wcs import sql
r = TemplateIO(html=True)
monthly_totals = sql.get_monthly_totals(period_start, period_end, criterias)[-12:]
yearly_totals = sql.get_yearly_totals(period_start, period_end, criterias)[-10:]
if not monthly_totals:
monthly_totals = [('%s-%s' % datetime.date.today().timetuple()[:2], 0)]
if not yearly_totals:
yearly_totals = [(datetime.date.today().year, 0)]
weekday_totals = sql.get_weekday_totals(period_start, period_end, criterias)
weekday_line = []
weekday_names = [_('Sunday'), _('Monday'), _('Tuesday'),
_('Wednesday'), _('Thursday'), _('Friday'), _('Saturday')]
for weekday, total in weekday_totals:
label = weekday_names[weekday]
weekday_line.append((label, total))
# move Sunday to the last place
weekday_line = weekday_line[1:] + [weekday_line[0]]
hour_totals = sql.get_hour_totals(period_start, period_end, criterias)
r += htmltext('''<script>
var weekday_line = %(weekday_line)s;
var hour_line = %(hour_line)s;
var month_line = %(month_line)s;
var year_line = %(year_line)s;
</script>''' % {
'weekday_line': json.dumps(weekday_line),
'hour_line': json.dumps(hour_totals),
'month_line': json.dumps(monthly_totals),
'year_line': json.dumps(yearly_totals),
})
if len(yearly_totals) > 1:
r += htmltext('<h3>%s</h3>') % _('Submissions by year')
r += htmltext('<div id="chart_years" style="height:160px; width:100%;"></div>')
r += htmltext('<h3>%s</h3>') % _('Submissions by month')
r += htmltext('<div id="chart_months" style="height:160px; width:100%;"></div>')
r += htmltext('<h3>%s</h3>') % _('Submissions by weekday')
r += htmltext('<div id="chart_weekdays" style="height:160px; width:100%;"></div>')
r += htmltext('<h3>%s</h3>') % _('Submissions by hour')
r += htmltext('<div id="chart_hours" style="height:160px; width:100%;"></div>')
get_response().add_javascript(['jquery.js', 'jqplot/jquery.jqplot.min.js',
'jqplot/plugins/jqplot.canvasTextRenderer.min.js',
'jqplot/plugins/jqplot.canvasAxisLabelRenderer.min.js',
'jqplot/plugins/jqplot.canvasAxisTickRenderer.min.js',
'jqplot/plugins/jqplot.categoryAxisRenderer.min.js',
'jqplot/plugins/jqplot.barRenderer.min.js',
])
get_response().add_javascript_code('''
function wcs_draw_graphs() {
$.jqplot ('chart_weekdays', [weekday_line], {
series:[{renderer:$.jqplot.BarRenderer}],
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer } }
});
$.jqplot ('chart_hours', [hour_line], {
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer }, yaxis: {min: 0} }
});
$.jqplot ('chart_months', [month_line], {
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer }, yaxis: {min: 0} }
});
if ($('#chart_years').length) {
$.jqplot ('chart_years', [year_line], {
series:[{renderer:$.jqplot.BarRenderer}],
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer }, yaxis: {min: 0} }
});
}
}
$(document).ready(function(){
wcs_draw_graphs();
});
''')
return r.getvalue()
def get_global_criteria(request, parsed_values=None):
"""
Parses the request query string and returns a list of criterias suitable
for select() usage. The parsed_values parameter can be given a dictionary,
to be filled with the parsed values.
"""
criterias = [NotEqual('status', 'draft')]
try:
period_start = misc.get_as_datetime(request.form.get('start')).timetuple()
criterias.append(GreaterOrEqual('receipt_time', period_start))
parsed_values['period_start'] = period_start
except (ValueError, TypeError):
pass
try:
period_end = misc.get_as_datetime(request.form.get('end')).timetuple()
criterias.append(LessOrEqual('receipt_time', period_end))
parsed_values['period_end'] = period_end
except (ValueError, TypeError):
pass
return criterias
def format_time(t, units = 2):
days = int(t/86400)
hours = int((t-days*86400)/3600)
minutes = int((t-days*86400-hours*3600)/60)
seconds = t % 60
if units == 1:
if days:
return _('%d day(s)') % days
if hours:
return _('%d hour(s)') % hours
if minutes:
return _('%d minute(s)') % minutes
elif units == 2:
if days:
return _('%(days)d day(s) and %(hours)d hour(s)') % {
'days': days, 'hours': hours}
if hours:
return _('%(hours)d hour(s) and %(minutes)d minute(s)') % {
'hours': hours, 'minutes': minutes}
if minutes:
return _('%(minutes)d minute(s) and %(seconds)d seconds') % {
'minutes': minutes, 'seconds': seconds}
return _('%d seconds') % seconds