wcs/wcs/backoffice/management.py

2332 lines
99 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2015 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import csv
import cStringIO
import datetime
import json
import time
import urllib
import vobject
try:
import xlwt
except ImportError:
xlwt = None
from quixote import get_session, get_publisher, get_request, get_response, redirect
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext, htmlescape
from qommon import _, ngettext
from qommon.admin.emails import EmailsDirectory
from qommon.admin.menu import command_icon
from qommon.backoffice.menu import html_top
from qommon.backoffice.listing import pagination_links
from qommon import misc, get_logger
from qommon.evalutils import make_datetime
from qommon.misc import C_, ellipsize
from qommon.afterjobs import AfterJob
from qommon import emails
import qommon.sms
from qommon import errors
from qommon import ezt
from qommon import ods
from qommon.form import *
from qommon.storage import (Equal, NotEqual, LessOrEqual, GreaterOrEqual, Or,
Intersects, ILike, FtsMatch, Contains, Null)
from wcs.api_utils import get_user_from_api_query_string
from wcs.forms.backoffice import FormDefUI
from wcs.forms.common import FormStatusPage
from wcs.admin.settings import UserFieldsFormDef
from wcs.categories import Category
from wcs.formdata import FormData
from wcs.formdef import FormDef
from wcs.roles import logged_users_role
from .submission import FormFillPage
def geojson_formdatas(formdatas, geoloc_key='base', fields=None):
    """Build a GeoJSON FeatureCollection out of geolocated formdatas.

    Formdatas that have no position stored under ``geoloc_key`` are skipped.

    :param formdatas: iterable of formdata objects.
    :param geoloc_key: key looked up in ``formdata.geolocations``.
    :param fields: optional list of fields; the (label, value) pair of each
        non-map field with a non-empty value is added to the feature
        properties as ``display_fields``.
    :returns: a dictionary suitable for ``json.dumps``.
    """
    features = []
    for formdata in formdatas:
        if not formdata.geolocations or geoloc_key not in formdata.geolocations:
            continue
        coords = formdata.geolocations[geoloc_key]
        status = formdata.get_status()
        # the status object may be missing or incomplete; fall back to
        # neutral values for both attributes instead of failing on the name
        # after having tolerated the absence of the colour.
        status_colour = getattr(status, 'colour', 'ffffff')
        status_name = getattr(status, 'name', '')
        formdata_backoffice_url = formdata.get_url(backoffice=True)
        display_fields = []
        for field in (fields or []):
            if field.type == 'map':
                continue
            value = formdata.get_field_view_value(field, max_length=60)
            if not value:
                # check emptiness first, so an empty/missing value is never
                # dereferenced by the replace() call below.
                continue
            value = value.replace('[download]', formdata_backoffice_url)
            if not value:
                continue
            display_fields.append(
                (str(htmlescape(field.label)), str(htmlescape(value))))
        features.append({
            'type': 'Feature',
            'properties': {
                'name': str(htmlescape(formdata.get_display_name())),
                'url': formdata_backoffice_url,
                'status_name': str(htmlescape(status_name)),
                'status_colour': '#%s' % status_colour,
                'display_fields': display_fields,
                'view_label': _('View'),
            },
            'geometry': {
                'type': 'Point',
                # GeoJSON positions are ordered longitude, latitude
                'coordinates': [coords['lon'], coords['lat']],
            },
        })
    return {
        'type': 'FeatureCollection',
        'features': features,
    }
class SendCodeFormdefDirectory(Directory):
    """Pages used by agents to send a tracking code to a form submitter.

    Instances are bound to a formdef; the next URL component is the id of
    the formdata whose tracking code is to be sent.
    """
    formdef = None

    def __init__(self, formdef):
        self.formdef = formdef

    def _q_lookup(self, formdata_id):
        """Display, then process, the "send tracking code" form.

        The message is sent by email, or by SMS when the SMS feature is
        enabled and selected.  A tracking code is created if the formdata
        has none.

        :param formdata_id: identifier of the formdata, from the URL.
        :raises errors.TraversalError: if the formdata does not exist.
        """
        html_top('management', _('Management'))
        try:
            formdata = self.formdef.data_class().get(formdata_id)
        except KeyError:
            # unknown identifier: answer with a 404 rather than a traceback
            raise errors.TraversalError()
        submitter_email = formdata.formdef.get_submitter_email(formdata)
        mail_subject = EmailsDirectory.get_subject('tracking-code-reminder')
        mail_body = EmailsDirectory.get_body('tracking-code-reminder')

        form = Form()
        form.add(TextWidget, 'text', title=_('Message'), required=True,
                cols=60, rows=5, value=mail_body)
        form.add(EmailWidget, 'email', title=_('Email'), required=False,
                value=submitter_email)
        sms_class = None
        if get_publisher().use_sms_feature:
            sms_cfg = get_cfg('sms', {})
            mode = sms_cfg.get('mode', 'none')
            sms_class = qommon.sms.SMS.get_sms_class(mode)
        if sms_class:
            form.add(StringWidget, 'sms', title=_('SMS Number'), required=False)
            form.add(RadiobuttonsWidget, 'method',
                    options=[('email', _('Email')),
                             ('sms', _('SMS'))],
                    value='email',
                    required=True)
        form.add_submit('submit', _('Send'))
        form.add_submit('cancel', _('Cancel'))

        if form.get_submit() == 'cancel':
            # leaving the form must never send anything
            return redirect('../..')

        if not form.is_submitted() or form.has_errors():
            r = TemplateIO(html=True)
            r += htmltext('<h2>%s</h2>') % _('Send tracking code')
            r += form.render()
            return r.getvalue()

        if not formdata.tracking_code:
            tracking_code = get_publisher().tracking_code_class()
            tracking_code.formdata = formdata  # this stores both objects

        data = {
            'form_tracking_code': formdata.tracking_code,
            'tracking_code': formdata.tracking_code,
            'email': form.get_widget('email').parse(),
        }
        data.update(self.formdef.get_substitution_variables(minimal=True))

        # expand the ezt template typed by the agent
        msg_template = ezt.Template(compress_whitespace=False)
        msg_template.parse(form.get_widget('text').parse())
        fd = cStringIO.StringIO()
        msg_template.generate(fd, data)
        msg_body = fd.getvalue()

        if sms_class and form.get_widget('method').parse() == 'sms':
            # send sms
            sitename = get_cfg('misc', {}).get('sitename') or 'w.c.s.'
            sender = sms_cfg.get('sender', sitename)[:11]
            try:
                sms_class.send(sender, [form.get_widget('sms').parse()], msg_body)
            except errors.SMSError as e:
                get_logger().error(e)
                # do not report a success when the SMS could not be sent
                get_session().message = (
                    'error', _('Failed to send SMS with tracking code'))
            else:
                get_session().message = (
                    'info', _('SMS with tracking code sent to the user'))
        else:
            # send mail
            emails.ezt_email(mail_subject, msg_body,
                    mail_body_data=data,
                    email_rcpt=form.get_widget('email').parse())
            get_session().message = (
                'info', _('Email with tracking code sent to the user'))
        return redirect('../..')
class SendCodeDirectory(Directory):
    """Map a formdef URL name to its "send tracking code" directory."""

    def _q_lookup(self, component):
        """Return the send-code directory of the formdef named ``component``.

        A traversal error is raised when the formdef does not exist or when
        tracking codes are not enabled for it.
        """
        try:
            formdef = FormDef.get_by_urlname(component)
        except KeyError:
            raise errors.TraversalError()
        if formdef.enable_tracking_codes:
            return SendCodeFormdefDirectory(formdef)
        raise errors.TraversalError()
class UserViewDirectory(Directory):
    """Backoffice page about a single user: profile fields and open forms."""
    _q_exports = ['', 'sendcode']
    sendcode = SendCodeDirectory()
    user = None

    def __init__(self, user):
        self.user = user

    def _q_index(self):
        """Render the user profile and the list of its open formdatas.

        Formdatas are grouped by category; only those the current agent is
        concerned with (through its roles) are rendered as links.
        """
        get_response().breadcrumb.append(('%s/' % self.user.id, self.user.display_name))
        html_top('management', _('Management'))

        # display list of open formdata for the user
        formdefs = [x for x in FormDef.select() if not x.skip_from_360_view]
        user_roles = set([logged_users_role().id] + (get_request().user.roles or []))
        criterias = [Equal('is_at_endpoint', False),
                     Equal('user_id', str(self.user.id)),
                     Contains('formdef_id', [x.id for x in formdefs]),
                    ]
        from wcs import sql
        formdatas = sql.AnyFormData.select(criterias, order_by='receipt_time')

        # subset of formdatas the current agent is allowed to open
        criterias = [Equal('is_at_endpoint', False),
                     Equal('user_id', str(self.user.id)),
                     Intersects('concerned_roles_array', user_roles),
                    ]
        viewable_formdatas_ids = set(
            (x.formdef.id, x.id) for x in sql.AnyFormData.select(criterias))

        r = TemplateIO(html=True)
        r += get_session().display_message()
        r += self._render_profile()

        if formdatas:
            categories = {}
            formdata_by_category = {}
            for formdata in formdatas:
                category_id = formdata.formdef.category_id
                if category_id not in categories:
                    categories[category_id] = formdata.formdef.category
                    formdata_by_category[category_id] = []
                formdata_by_category[category_id].append(formdata)
            cats = categories.values()
            Category.sort_by_position(cats)
            for cat in cats:
                r += htmltext('<div class="bo-block">')
                if len(cats) > 1:
                    if cat is None:
                        r += htmltext('<h2>%s</h2>') % _('Misc')
                        cat_formdatas = formdata_by_category[None]
                    else:
                        r += htmltext('<h2>%s</h2>') % cat.name
                        cat_formdatas = formdata_by_category[cat.id]
                else:
                    # single category: no title, all formdatas in one list
                    cat_formdatas = formdatas
                r += htmltext('<ul class="biglist c-360-user-view">')
                for formdata in cat_formdatas:
                    viewable = (formdata.formdef.id, formdata.id) in viewable_formdatas_ids
                    r += self._render_formdata(formdata, viewable)
                r += htmltext('</ul>')
                r += htmltext('</div>')
        return r.getvalue()

    def _render_profile(self):
        """Return the block with the user name and its filled profile fields."""
        r = TemplateIO(html=True)
        r += htmltext('<div class="bo-block">')
        r += htmltext('<h2>%s</h2>') % self.user.display_name
        # form_data may be unset for users without profile data
        form_data = self.user.form_data or {}
        r += htmltext('<div class="form">')
        for field in UserFieldsFormDef().get_all_fields():
            if not hasattr(field, str('get_view_value')):
                continue
            value = form_data.get(field.id)
            if not value:
                continue
            r += htmltext('<div class="title">')
            r += field.label
            r += htmltext('</div>')
            r += htmltext('<div class="StringWidget content">')
            r += field.get_view_value(value)
            r += htmltext('</div>')
        r += htmltext('</div>')
        r += htmltext('</div>')
        return r.getvalue()

    def _render_formdata(self, formdata, viewable):
        """Return the <li> element for a formdata, as a link if ``viewable``."""
        r = TemplateIO(html=True)
        status_label = formdata.get_status_label()
        submit_date = misc.strftime.strftime(
                misc.date_format(), formdata.receipt_time)
        # values are substituted *into* the htmltext template so that the
        # form name and status label get HTML-escaped.
        if viewable:
            r += htmltext('<li><a href="%s">%s, '
                          '<span class="datetime">%s</span> '
                          '<span class="status">(%s)</span></a>') % (
                              formdata.get_url(backoffice=True),
                              formdata.formdef.name,
                              submit_date, status_label)
        else:
            r += htmltext('<li><span>%s, '
                          '<span class="datetime">%s</span> '
                          '<span class="status">(%s)</span></span>') % (
                              formdata.formdef.name,
                              submit_date, status_label)
        if formdata.formdef.enable_tracking_codes:
            r += htmltext('<p class="commands">')
            r += command_icon('sendcode/%s/%s' %
                    (formdata.formdef.url_name, formdata.id),
                    'export',
                    label=_('Send tracking code'),
                    popup=True)
            r += htmltext('</p>')
        r += htmltext('</li>')
        return r.getvalue()
class UsersViewDirectory(Directory):
    """Backoffice "per user" view: user search and per-user subpages.

    The whole directory is only reachable on sites using PostgreSQL.
    """
    _q_exports = ['']

    def _q_traverse(self, path):
        if not get_publisher().is_using_postgresql():
            raise errors.TraversalError()
        get_response().breadcrumb.append(('users', _('Per User View')))
        return super(UsersViewDirectory, self)._q_traverse(path)

    def get_search_sidebar(self, offset=None, limit=None, order_by=None):
        """Return the HTML of the search form displayed in the sidebar.

        Current paging and sort settings are kept as hidden inputs so they
        are sent again along with the search text.
        """
        get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'wcs.listing.js'])
        r = TemplateIO(html=True)
        r += htmltext('<form id="listing-settings" action=".">')
        if offset or limit:
            if not offset:
                offset = 0
            r += htmltext('<input type="hidden" name="offset" value="%s"/>') % offset
        if limit:
            r += htmltext('<input type="hidden" name="limit" value="%s"/>') % limit
        if order_by is None:
            order_by = ''
        r += htmltext('<input type="hidden" name="order_by" value="%s"/>') % order_by
        r += htmltext('<h3>%s</h3>') % _('Search')
        q = get_request().form.get('q')
        if q:
            charset = get_publisher().site_charset
            if not isinstance(q, unicode):
                q = unicode(q, charset)
            r += htmltext('<input name="q" value="%s">') % q.encode(charset)
        else:
            r += htmltext('<input name="q">')
        r += htmltext('<input type="submit" value="%s"/>') % _('Search')
        r += htmltext('</form>')
        return r.getvalue()

    def _q_index(self):
        """Render the user search page (or only its listing for ajax calls)."""
        html_top('management', _('Management'))
        r = TemplateIO(html=True)
        limit = int(get_request().form.get('limit',
            get_publisher().get_site_option('default-page-size')) or 20)
        offset = int(get_request().form.get('offset', 0))
        order_by = get_request().form.get('order_by', None) or 'name'
        query = get_request().form.get('q')
        get_response().filter['sidebar'] = self.get_search_sidebar(
                limit=limit, offset=offset, order_by=order_by)

        if not query:
            r += htmltext('<div id="listing">')
            r += htmltext('<div class="big-msg-info">')
            r += htmltext('<p>%s</p>') % _('Use the search field on the right '
                                           'to look for an user.')
            r += htmltext('</div>')
            r += htmltext('</div>')
            return self._finish(r)

        formdef = UserFieldsFormDef()
        all_fields = formdef.get_all_fields()
        criteria_fields = [
                ILike('name', query),
                ILike('ascii_name', misc.simplify(query, ' ')),
                ILike('email', query)]
        for field in all_fields:
            if field.type in ('string', 'text', 'email'):
                criteria_fields.append(ILike('f%s' % field.id, query))
        if get_publisher().is_using_postgresql():
            criteria_fields.append(FtsMatch(query))
        criterias = [Or(criteria_fields)]
        users = get_publisher().user_class.select(order_by=order_by,
                clause=criterias, limit=limit, offset=offset)
        total_count = get_publisher().user_class.count(criterias)

        # name and email columns are displayed unless they are mapped to
        # profile fields in the users configuration
        users_cfg = get_cfg('users', {})
        include_name_column = not(users_cfg.get('field_name'))
        include_email_column = not(users_cfg.get('field_email'))
        listing_fields = [x for x in all_fields if x.in_listing]

        r += htmltext('<div id="listing">')
        r += htmltext('<table class="main">')
        r += htmltext('<thead>')
        r += htmltext('<tr>')
        if include_name_column:
            r += htmltext('<th data-field-sort-key="name"><span>%s</span></th>') % _('Name')
        if include_email_column:
            r += htmltext('<th data-field-sort-key="email"><span>%s</span></th>') % _('Email')
        for field in listing_fields:
            r += htmltext('<th data-field-sort-key="f%s"><span>%s</span></th>') % (
                    field.id, field.label)
        r += htmltext('</tr>')
        r += htmltext('</thead>')
        r += htmltext('<tbody>')
        for user in users:
            # form_data may be unset for users without profile data
            form_data = user.form_data or {}
            r += htmltext('<tr data-link="%s/">') % user.id
            if include_name_column:
                r += htmltext('<td>%s</td>') % (user.name or '')
            if include_email_column:
                r += htmltext('<td>%s</td>') % (user.email or '')
            for field in listing_fields:
                r += htmltext('<td>%s</td>') % (form_data.get(field.id) or '')
            r += htmltext('</tr>')
        r += htmltext('</tbody>')
        r += htmltext('</table>')
        if get_publisher().is_using_postgresql():
            r += pagination_links(offset, limit, total_count)
        r += htmltext('</div>')
        return self._finish(r)

    def _finish(self, r):
        """Return the page content, without page decoration for ajax calls."""
        if get_request().form.get('ajax') == 'true':
            get_response().filter = None
        return r.getvalue()

    def _q_lookup(self, component):
        """Return the view of the user whose id is ``component``."""
        try:
            user = get_publisher().user_class.get(component)
        except KeyError:
            raise errors.TraversalError()
        return UserViewDirectory(user)
# Root directory of the backoffice "management" section.
# _q_exports lists the URL components served by methods/attributes of this
# class; any other component is handed to _q_lookup().
class ManagementDirectory(Directory):
_q_exports = ['', 'forms', 'listing', 'statistics', 'lookup', 'count',
'users', 'geojson', 'map']
# per-user view, served under users/
users = UsersViewDirectory()
def is_accessible(self, user):
    """Tell whether ``user`` may enter this section.

    The decision is fully delegated to ``user.can_go_in_backoffice()``.
    """
    allowed = user.can_go_in_backoffice()
    return allowed
def _q_traverse(self, path):
    """Register the section in the breadcrumb, then traverse as usual."""
    breadcrumb = get_response().breadcrumb
    breadcrumb.append(('management/', _('Management')))
    return super(ManagementDirectory, self)._q_traverse(path)
def _q_index(self):
    """Redirect to the default view of the section.

    The global listing is used when the 'default-to-global-view' site
    option is set, the per-form view otherwise.
    """
    use_global_view = get_publisher().has_site_option('default-to-global-view')
    target = 'listing' if use_global_view else 'forms'
    return redirect(target)
def forms(self):
    """Render the per-form view of the management section.

    Enabled forms the user is concerned with (all of them for admins) are
    split in two blocks: those with formdatas waiting for an action from
    the user, and the others.
    """
    html_top('management', _('Management'))
    get_response().filter['sidebar'] = self.get_sidebar()
    r = TemplateIO(html=True)
    r += get_session().display_message()

    user = get_request().user
    forms_without_pending_stuff = []
    forms_with_pending_stuff = []
    if user:
        # roles are only read once the user is known to exist, otherwise
        # the guard above would be pointless.
        user_roles = [logged_users_role().id] + (user.roles or [])
        for formdef in FormDef.select(order_by='name', ignore_errors=True):
            if formdef.disabled:
                continue
            if not (user.is_admin or formdef.is_of_concern_for_user(user)):
                continue
            formdef_data_class = formdef.data_class()
            # drafts are not counted in the total
            count_forms = formdef_data_class.count() - len(
                formdef_data_class.get_ids_with_indexed_value('status', 'draft'))
            waiting_forms_count = formdef_data_class.get_actionable_count(user_roles)
            entry = (formdef, waiting_forms_count, count_forms)
            if waiting_forms_count == 0:
                forms_without_pending_stuff.append(entry)
            else:
                forms_with_pending_stuff.append(entry)

    if forms_with_pending_stuff:
        r += htmltext('<div class="bo-block" id="forms-in-your-care">')
        r += htmltext('<h2>%s</h2>') % _('Forms in your care')
        r += self.display_forms(forms_with_pending_stuff)
        r += htmltext('</div>')
    if forms_without_pending_stuff:
        r += htmltext('<div class="bo-block" id="other-forms">')
        r += htmltext('<h2>%s</h2>') % _('Other Forms')
        r += self.display_forms(forms_without_pending_stuff)
        r += htmltext('</div>')
    return r.getvalue()
# Build the sidebar of the forms view and return it as rendered HTML: a list
# of links to the other views of the section, followed by the lookup box.
def get_sidebar(self):
r = TemplateIO(html=True)
r += htmltext('<div class="bo-block">')
r += htmltext('<ul id="sidebar-actions">')
r += htmltext('<li class="stats"><a href="statistics">%s</a></li>') % _('Global statistics')
# the global view needs PostgreSQL and can be switched off by setting the
# 'postgresql_views' site option to 'false'.
if get_publisher().is_using_postgresql() and \
get_publisher().get_site_option('postgresql_views') != 'false':
r += htmltext('<li><a href="listing">%s</a></li>') % _('Global View')
r += htmltext('<li><a href="users/">%s</a></li>') % _('Per User View')
# a single "Map View" link is added as soon as one form with geolocations is
# found (hence the break).
# NOTE(review): this <li> is never closed.
for formdef in FormDef.select():
if formdef.geolocations:
r += htmltext('<li><a href="map">%s</a>') % _('Map View')
break
r += htmltext('</ul>')
r += htmltext('</div>')
r += self.get_lookup_sidebox()
return r.getvalue()
def lookup(self):
    """Redirect to the formdata matching a tracking code or an identifier.

    The ``query`` parameter is first tried as a tracking code (if at least
    one form has tracking codes enabled), then as a
    ``<formdef id>-<formdata id>`` identifier.  When nothing matches, an
    error message is recorded and the user is sent back to the ``back``
    location (current directory by default).
    """
    # imported locally so the method does not depend on names leaking from
    # star imports
    import re
    import urlparse

    query = get_request().form.get('query', '').strip()
    if any(x.enable_tracking_codes for x in FormDef.select()):
        try:
            tracking_code = get_publisher().tracking_code_class.get(query)
            formdata = tracking_code.formdata
            get_session().mark_anonymous_formdata(formdata)
            return redirect(formdata.get_url(backoffice=True))
        except KeyError:
            pass

    if re.match(r'^\d+-\d{1,10}$', query):
        formdef_id, formdata_id = query.split('-')
        try:
            formdef = FormDef.get(formdef_id)
            formdata = formdef.data_class().get(formdata_id)
        except KeyError:
            pass
        else:
            return redirect(formdata.get_url(backoffice=True))

    get_session().message = ('error', _('No such tracking code or identifier.'))
    back = get_request().form.get('back') or '.'
    # "back" comes from the request: only accept locations relative to this
    # site, never a redirection to another host.
    parsed_back = urlparse.urlparse(back)
    if parsed_back.scheme or parsed_back.netloc or '\\' in back:
        back = '.'
    return redirect(back)
def get_lookup_sidebox(self, back_place=''):
    """Return the HTML of the "look up by tracking code" box.

    :param back_place: location to go back to when the lookup fails; it is
        sent as the hidden ``back`` parameter of the form.
    """
    r = TemplateIO(html=True)
    r += htmltext('<div id="lookup-box">')
    # substitute into the htmltext template (as done everywhere else) so
    # the translated title is HTML-escaped.
    r += htmltext('<h3>%s</h3>') % _('Look up by tracking code or identifier')
    r += htmltext('<form action="lookup">')
    r += htmltext('<input type="hidden" name="back" value="%s"/>') % back_place
    r += htmltext('<input class="inline-input" size="12" name="query"/>')
    r += htmltext('<button>%s</button>') % _('Look up')
    r += htmltext('</form>')
    r += htmltext('</div>')
    return r.getvalue()
# Build the sidebar of the global listing (also used by the global map) and
# return it as rendered HTML: the lookup box followed by a filter form
# (status, period, category, channel, free text).
# limit, offset and order_by are the current listing settings; they are kept
# in the form as hidden fields, with defaults applied when not given.
def get_global_listing_sidebar(self, limit=None, offset=None, order_by=None):
get_response().add_javascript(['jquery.js'])
DateWidget.prepare_javascript()
form = Form(use_tokens=False, id='listing-settings')
form.add(SingleSelectWidget, 'status', title=_('Status'),
options=[
('waiting', _('Waiting for an action'), 'waiting'),
('open', C_('formdata|Open'), 'open'),
('done', _('Done'), 'done'),
('all', _('All'), 'all')])
form.add(DateWidget, 'start', title=_('Start Date'))
form.add(DateWidget, 'end', title=_('End Date'))
categories = Category.select()
if categories:
Category.sort_by_position(categories)
# first option (None value) stands for "no category filter"
category_options = [(None, C_('categories|All'), '')] + [(x.id, x.name, x.id) for x in categories]
form.add(SingleSelectWidget, 'category_id',
title=_('Category'),
options=category_options)
# the channel filter is only offered when a 'welco_url' variable is set in
# the site options.
if bool(get_publisher().get_site_option('welco_url', 'variables')):
form.add(SingleSelectWidget, 'submission_channel',
title=_('Channel'),
options=[(None, C_('channel|All'), '')] +
[(x, y, x) for x, y in FormData.get_submission_channels().items()])
form.add(StringWidget, 'q', title=_('Text'))
# defaults for the listing settings: first page, page size from the
# 'default-page-size' site option (or 20), sort order from the
# 'default-sort-order' site option (or most recent first).
if not offset:
offset = 0
if not limit:
limit = int(get_publisher().get_site_option('default-page-size') or 20)
if not order_by:
order_by = get_publisher().get_site_option('default-sort-order') or '-receipt_time'
form.add_hidden('offset', offset)
form.add_hidden('limit', limit)
form.add_hidden('order_by', order_by)
form.add_submit('submit', _('Submit'))
r = TemplateIO(html=True)
# a failed lookup made from this box goes back to the listing
r += self.get_lookup_sidebox('listing')
r += htmltext('<div>')
r += htmltext('<h3>%s</h3>') % _('Filters')
r += form.render()
r += htmltext('</div>')
return r.getvalue()
def get_stats_sidebar(self):
    """Return the HTML of the statistics sidebar.

    It holds a form to select a period (start and end dates) and a list of
    shortcuts for the current/previous month and year.
    """
    get_response().add_javascript(['jquery.js'])
    DateWidget.prepare_javascript()
    form = Form(use_tokens=False)
    form.add(DateWidget, 'start', title=_('Start Date'))
    form.add(DateWidget, 'end', title=_('End Date'))
    form.add_submit('submit', _('Submit'))

    r = TemplateIO(html=True)
    r += htmltext('<h3>%s</h3>') % _('Period')
    r += form.render()
    r += htmltext('<h3>%s</h3>') % _('Shortcuts')

    date_format = misc.date_format()
    # single time reference, so all shortcuts are consistent with each other
    now = datetime.datetime.now()

    r += htmltext('<ul>')  # presets
    current_month_start = now.replace(day=1)
    start = current_month_start.strftime(date_format)
    r += htmltext(' <li><a href="?start=%s">%s</a></li>') % (start, _('Current Month'))

    # going back two days from the first of the month always lands in the
    # previous month
    previous_month_start = current_month_start - datetime.timedelta(days=2)
    previous_month_start = previous_month_start.replace(day=1)
    start = previous_month_start.strftime(date_format)
    end = current_month_start.strftime(date_format)
    r += htmltext(' <li><a href="?start=%s&end=%s">%s</a></li>') % (
            start, end, _('Previous Month'))

    current_year_start = now.replace(month=1, day=1)
    start = current_year_start.strftime(date_format)
    r += htmltext(' <li><a href="?start=%s">%s</a></li>') % (start, _('Current Year'))

    previous_year_start = current_year_start.replace(year=current_year_start.year-1)
    start = previous_year_start.strftime(date_format)
    end = current_year_start.strftime(date_format)
    r += htmltext(' <li><a href="?start=%s&end=%s">%s</a></li>') % (
            start, end, _('Previous Year'))
    r += htmltext('</ul>')
    return r.getvalue()
def statistics(self):
    """Render the global statistics page.

    Formdatas matching the global criteria of the request are counted for
    each form and the counts are displayed per category.  On sites using
    PostgreSQL (unless the 'postgresql_views' option is 'false') the page
    is split in two columns and a graphs section is added.
    """
    html_top('management', _('Global statistics'))
    get_response().breadcrumb.append(('statistics', _('Global statistics')))
    get_response().filter['sidebar'] = self.get_stats_sidebar()
    page = TemplateIO(html=True)
    page += htmltext('<h2>%s</h2>') % _('Global statistics')

    formdefs = FormDef.select(lambda x: not x.is_disabled() or x.disabled_redirection,
            order_by='name', ignore_errors=True)

    counts = {}
    parsed_values = {}
    criterias = get_global_criteria(get_request(), parsed_values)
    for formdef in formdefs:
        counts[formdef.id] = len(formdef.data_class().select(criterias))

    do_graphs = False
    if get_publisher().is_using_postgresql():
        if get_publisher().get_site_option('postgresql_views') != 'false':
            do_graphs = True

    page += htmltext('<p>%s %s</p>') % (_('Total count:'), sum(counts.values()))

    if do_graphs:
        page += htmltext('<div class="splitcontent-left">')

    # one block per category, then forms without a category
    for category in Category.select():
        in_category = [x for x in formdefs if x.category_id == category.id]
        page += self.category_global_stats(category.name, in_category, counts)
    uncategorized = [x for x in formdefs if x.category_id is None]
    page += self.category_global_stats(_('Misc'), uncategorized, counts)

    if do_graphs:
        page += htmltext('</div>')
        page += htmltext('<div class="splitcontent-right">')
        period_start = parsed_values.get('period_start')
        period_end = parsed_values.get('period_end')
        page += do_graphs_section(period_start, period_end,
                criterias=[NotEqual('status', 'draft')])
        page += htmltext('</div>')
    return page.getvalue()
def category_global_stats(self, title, category_formdefs, counts):
    """Return the HTML block with the counts of the forms of a category.

    :param title: title of the block.
    :param category_formdefs: formdefs belonging to the category.
    :param counts: dictionary mapping formdef ids to formdata counts.
    :returns: rendered HTML, or None when there are no forms or when the
        counts of the category sum to zero.
    """
    if not category_formdefs:
        return
    formdef_ids = [formdef.id for formdef in category_formdefs]
    category_total = sum(
        [count for formdef_id, count in counts.items() if formdef_id in formdef_ids])
    if category_total == 0:
        return

    block = TemplateIO(html=True)
    block += htmltext('<div class="bo-block">')
    block += htmltext('<h3>%s</h3>') % title
    block += htmltext('<p>%s %s</p>') % (_('Count:'), category_total)
    block += htmltext('<ul>')
    for formdef in category_formdefs:
        formdef_count = counts.get(formdef.id)
        if formdef_count:
            # forms without any formdata are not listed
            block += htmltext('<li>%s %s</li>') % (
                    _('%s:') % formdef.name, formdef_count)
    block += htmltext('</ul>')
    block += htmltext('</div>')
    return block.getvalue()
def display_forms(self, forms_list):
    """Return an HTML list of forms, grouped by category.

    :param forms_list: list of (formdef, pending count, total count)
        tuples.  Categories are taken in name order; forms without a
        category come last, under a "Misc" title.  Empty groups are not
        displayed.
    """
    out = TemplateIO(html=True)
    out += htmltext('<ul class="biglist">')
    for category in Category.select(order_by='name') + [None]:
        if category is not None:
            entries = [x for x in forms_list if x[0].category_id == category.id]
            group_title = category.name
        else:
            entries = [x for x in forms_list if not x[0].category_id]
            group_title = _('Misc')
        if not entries:
            continue
        out += htmltext('<li><h3>%s</h3></li>') % group_title
        for formdef, no_pending, no_total in entries:
            out += htmltext('<li><strong><a href="%s/">%s</a></strong>') % (
                    formdef.url_name, formdef.name)
            # the badge class is only set when the form has formdatas
            klass = 'badge' if no_total else ''
            out += htmltext('<p class="details %s">' % klass)
            if no_pending:
                out += _('%(pending)s open on %(total)s') % {
                        'pending': no_pending, 'total': no_total}
            else:
                out += ngettext('%(total)s item', '%(total)s items', no_total) % {
                        'total': no_total}
            out += htmltext('</p>')
            out += htmltext('</li>')
    out += htmltext('</ul>')
    return out.getvalue()
def get_global_listing_criterias(self):
    """Return the list of criterias matching the filters of the request.

    On top of the global criteria, formdatas are filtered on the ``status``
    parameter ('waiting' by default, or 'open', 'done', 'all'), always
    restricted to the roles of the current user, then on the optional
    ``submission_channel``, ``category_id`` and ``q`` parameters.
    """
    parsed_values = {}
    user_roles = [logged_users_role().id] + (get_request().user.roles or [])
    criterias = get_global_criteria(get_request(), parsed_values)

    query_parameters = (get_request().form or {}).copy()
    query_parameters.pop('callback', None)  # when using jsonp

    status = query_parameters.get('status', 'waiting')
    # compatibility with ?waiting=yes|no parameter, still used in
    # the /count endpoint used for indicators
    waiting = query_parameters.get('waiting')
    if waiting == 'yes':
        status = 'waiting'
    elif waiting == 'no':
        status = 'open'

    # first the endpoint criteria, if any ...
    if status in ('waiting', 'open'):
        criterias.append(Equal('is_at_endpoint', False))
    elif status == 'done':
        criterias.append(Equal('is_at_endpoint', True))
    # ... then the roles criteria
    if status == 'waiting':
        criterias.append(Intersects('actions_roles_array', user_roles))
    elif status in ('open', 'done', 'all'):
        criterias.append(Intersects('concerned_roles_array', user_roles))

    request_form = get_request().form
    channel = request_form.get('submission_channel')
    if channel:
        if channel == 'web':
            # the web channel is matched with a Null criteria
            criterias.append(Null('submission_channel'))
        else:
            criterias.append(Equal('submission_channel', channel))

    category_id = request_form.get('category_id')
    if category_id:
        criterias.append(Equal('category_id', category_id))

    text = request_form.get('q')
    if text:
        criterias.append(FtsMatch(text))
    return criterias
def listing(self):
    """Render the global listing of formdatas, across all forms.

    Paging and sort order are read from the ``limit``, ``offset`` and
    ``order_by`` parameters; filters are those handled by
    get_global_listing_criterias().  With ``ajax=true`` only the table (and
    pagination links) is returned, without any page decoration.

    :raises errors.TraversalError: if the site is not using PostgreSQL.
    """
    if not get_publisher().is_using_postgresql():
        raise errors.TraversalError()
    get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'wcs.listing.js'])
    from wcs import sql
    html_top('management', _('Management'))

    if FormDef.count() == 0:
        r = TemplateIO(html=True)
        r += htmltext('<div class="top-title">')
        r += htmltext('<h2>%s</h2>') % _('Global View')
        r += htmltext('</div>')
        r += htmltext('<div class="big-msg-info">')
        r += htmltext('<p>%s</p>') % _(
            'This site is currently empty. It is required to first add forms.')
        r += htmltext('</div>')
        return r.getvalue()

    limit = int(get_request().form.get('limit',
        get_publisher().get_site_option('default-page-size') or 20))
    offset = int(get_request().form.get('offset', 0))
    order_by = get_request().form.get('order_by',
        get_publisher().get_site_option('default-sort-order') or '-receipt_time')

    criterias = self.get_global_listing_criterias()
    total_count = sql.AnyFormData.count(criterias)
    if offset > total_count:
        # out of range page, go back to the first one
        get_request().form['offset'] = '0'
        return redirect('listing?' + urllib.urlencode(get_request().form))
    formdatas = sql.AnyFormData.select(criterias,
            order_by=order_by, limit=limit, offset=offset)

    include_submission_channel = bool(
            get_publisher().get_site_option('welco_url', 'variables'))
    include_criticality_level = get_publisher().has_site_option('workflow-criticality-levels')

    r = TemplateIO(html=True)
    r += htmltext('<table id="listing" class="main">')
    r += htmltext('<thead><tr>')
    if include_criticality_level:
        r += htmltext('<th data-field-sort-key="criticality_level"><span></span></th>')
    else:
        r += htmltext('<th></th>')  # lock
    if include_submission_channel:
        r += htmltext('<th data-field-sort-key="submission_channel"><span>%s</span></th>') % _('Channel')
    r += htmltext('<th data-field-sort-key="formdef_name"><span>%s</span></th>') % _('Form')
    r += htmltext('<th><span>%s</span></th>') % _('Number')
    r += htmltext('<th data-field-sort-key="receipt_time"><span>%s</span></th>') % _('Created')
    r += htmltext('<th data-field-sort-key="last_update_time"><span>%s</span></th>') % _('Last Modified')
    r += htmltext('<th data-field-sort-key="user_name"><span>%s</span></th>') % C_('frontoffice|User')
    r += htmltext('<th class="nosort"><span>%s</span></th>') % _('Status')
    r += htmltext('</tr></thead>')
    r += htmltext('<tbody>')

    workflows = {}  # workflows met in the page, to emit status colours
    visited_objects = get_publisher().get_visited_objects(exclude_user=get_session().user)
    for formdata in formdatas:
        if formdata.formdef.workflow_id not in workflows:
            workflows[formdata.formdef.workflow_id] = formdata.formdef.workflow

        classes = ['status-%s-%s' % (formdata.formdef.workflow.id, formdata.status)]
        object_key = 'formdata-%s-%s' % (formdata.formdef.url_name, formdata.id)
        if object_key in visited_objects:
            classes.append('advisory-lock')

        level = None
        if include_criticality_level:
            try:
                level = formdata.get_criticality_level_object()
            except IndexError:
                pass
            else:
                classes.append('criticality-level')

        formdata_url = formdata.get_url(backoffice=True)
        # values are substituted into htmltext templates so they get escaped
        r += htmltext('<tr class="%s" data-link="%s">') % (
                ' '.join(classes), formdata_url)
        # lock
        if level is not None:
            r += htmltext('<td style="border-left-color: #%s;"></td>') % level.colour
        else:
            r += htmltext('<td ></td>')
        if include_submission_channel:
            r += htmltext('<td>%s</td>') % formdata.get_submission_channel_label()
        r += htmltext('<td>%s</td>') % formdata.formdef.name
        r += htmltext('<td><a href="%s">%s</a></td>') % (
                formdata_url, formdata.get_display_id())
        r += htmltext('<td class="cell-time">%s</td>') % misc.localstrftime(
                formdata.receipt_time)
        r += htmltext('<td class="cell-time">%s</td>') % misc.localstrftime(
                formdata.last_update_time)
        try:
            user_name = get_publisher().user_class.get(formdata.user_id).display_name
        except Exception:
            # best effort: whatever prevents getting the user name, the row
            # is displayed without it; SystemExit and KeyboardInterrupt are
            # no longer swallowed.
            r += htmltext('<td class="cell-user cell-no-user">-</td>')
        else:
            r += htmltext('<td class="cell-user">%s</td>') % user_name
        r += htmltext('<td class="cell-status">%s</td>') % formdata.get_status_label()
        r += htmltext('</tr>\n')

    if workflows:
        colours = []
        for workflow in workflows.values():
            for status in workflow.possible_status:
                if status.colour and status.colour != 'FFFFFF':
                    fg_colour = misc.get_foreground_colour(status.colour)
                    colours.append((workflow.id, status.id, status.colour, fg_colour))
        if colours:
            r += htmltext('<style>')
            for workflow_id, status_id, bg_colour, fg_colour in colours:
                r += htmltext('tr.status-%s-wf-%s td.cell-status { '
                        'background-color: #%s !important; color: %s !important; }\n' % (
                        workflow_id, status_id, bg_colour, fg_colour))
            r += htmltext('</style>')
    r += htmltext('</tbody></table>')

    if (offset > 0) or (total_count > limit > 0):
        r += pagination_links(offset, limit, total_count)

    if get_request().form.get('ajax') == 'true':
        # listing refresh: table only, no page decoration
        get_response().filter = None
        return r.getvalue()

    get_response().filter['sidebar'] = self.get_global_listing_sidebar(
            limit=limit, offset=offset, order_by=order_by)
    rt = TemplateIO(html=True)
    rt += htmltext('<div class="top-title">')
    rt += htmltext('<h2>%s</h2>') % _('Global View')
    rt += htmltext('<div class="alt-views">')
    rt += htmltext('<a href="forms">%s</a>') % _('Forms View')
    for formdef in FormDef.select():
        if formdef.geolocations:
            # a single link, as soon as one form has geolocations
            rt += htmltext(' - <a href="map">%s</a>') % _('Map View')
            break
    rt += htmltext('</div>')
    rt += htmltext('</div>')
    rt += get_session().display_message()
    rt += r.getvalue()
    return rt.getvalue()
def count(self):
    """Return, as a JSON response, the number of formdatas matching the
    filters of the request ({'count': <number>}).

    A traversal error is raised on sites not using PostgreSQL; a site
    without any form gets a zero count without querying formdatas.
    """
    if not get_publisher().is_using_postgresql():
        raise errors.TraversalError()
    if FormDef.count() == 0:
        return misc.json_response({'count': 0})
    from wcs import sql
    matching = sql.AnyFormData.count(self.get_global_listing_criterias())
    return misc.json_response({'count': matching})
def geojson(self):
    """Return, as GeoJSON, the formdatas matching the filters of the request.

    :raises errors.TraversalError: if the site is not using PostgreSQL, as
        for the other global views (listing, count, map) built on
        ``sql.AnyFormData``.
    """
    if not get_publisher().is_using_postgresql():
        raise errors.TraversalError()
    from wcs import sql
    criterias = self.get_global_listing_criterias()
    formdatas = sql.AnyFormData.select(criterias)
    get_response().set_content_type('application/json')
    return json.dumps(geojson_formdatas(formdatas))
def map(self):
    """Render the global map page.

    The map element gets its data from the ``geojson`` endpoint, called
    with the query string of the current request so the same filters
    apply.

    :raises errors.TraversalError: if the site is not using PostgreSQL.
    """
    if not get_publisher().is_using_postgresql():
        raise errors.TraversalError()
    get_response().add_javascript(['wcs.listing.js', 'qommon.map.js'])
    html_top('management', _('Global Map'))
    r = TemplateIO(html=True)
    get_response().breadcrumb.append(('map', _('Global Map')))
    attrs = {
        'class': 'qommon-map',
        'id': 'backoffice-map',
        'data-readonly': True,
        'data-geojson-url': '%s/geojson?%s' % (
            get_request().get_url(1), get_request().get_query())
    }
    attrs.update(get_publisher().get_map_attributes())
    get_response().filter['sidebar'] = self.get_global_listing_sidebar()
    r += htmltext('<h2>%s</h2>') % _('Global Map')
    # attribute values include the raw query string of the request: they
    # are substituted into htmltext templates so they get HTML-escaped.
    rendered_attrs = htmltext(' ').join(
        [htmltext('%s="%s"') % item for item in attrs.items()])
    r += htmltext('<div %s></div>') % rendered_attrs
    return r.getvalue()
def _q_lookup(self, component):
    """Hand any URL component that is not exported to a FormPage."""
    form_page = FormPage(component)
    return form_page
class FormPage(Directory):
_q_exports = ['', 'csv', 'stats', 'xls', 'ods', 'json', 'export', 'map',
'geojson']
def __init__(self, component):
    """Bind the page to the formdef whose URL name is ``component``.

    A traversal error is raised when no such formdef exists; otherwise the
    form is added to the breadcrumb.
    """
    try:
        formdef = FormDef.get_by_urlname(component)
    except KeyError:
        raise errors.TraversalError()
    self.formdef = formdef
    crumb = (component + '/', formdef.name)
    get_response().breadcrumb.append(crumb)
def check_access(self, api_name=None):
    """Check the current user is allowed to access this form's data.

    api_name is optional and not used by this implementation; it is
    accepted so that callers naming the API they serve (ics() calls
    check_access('ics')) do not fail with a TypeError.

    When the request has no user and the user class has no stored user at
    all, a transient admin user is used instead.

    Raises AccessUnauthorizedError when there is no user, or when the user
    is not allowed and the session holds no user; raises
    AccessForbiddenError when the session holds a user who is neither admin
    nor concerned by the form.
    """
    session = get_session()
    user = get_request().user
    if user is None and get_publisher().user_class.count() == 0:
        user = get_publisher().user_class()
        user.is_admin = True
    if not user:
        raise errors.AccessUnauthorizedError()
    if not user.is_admin and not self.formdef.is_of_concern_for_user(user):
        if session.user:
            raise errors.AccessForbiddenError()
        else:
            raise errors.AccessUnauthorizedError()
def get_formdata_sidebar(self, qs=''):
    """Return the HTML list of export, map and statistics sidebar links.

    `qs` is appended as-is to the target of the export and map links; the
    statistics link never carries it.  The Excel entry is only present when
    xlwt could be imported, the map entry only when the form definition has
    geolocations.
    """
    sidebar = TemplateIO(html=True)
    sidebar += htmltext('<ul id="sidebar-actions">')

    ods_label = _('Export a Spreadsheet')
    sidebar += htmltext(' <li><a data-base-href="ods" href="ods%s">%s</a></li>') % (
        qs, ods_label)

    csv_label = _('Export as CSV File')
    sidebar += htmltext(' <li><a data-base-href="csv" href="csv%s">%s</a></li>') % (
        qs, csv_label)

    if xlwt:
        xls_label = _('Excel Export')
        sidebar += htmltext('<li><a data-base-href="xls" href="xls%s">%s</a></li>') % (
            qs, xls_label)

    if self.formdef.geolocations:
        map_label = _('Plot on a Map')
        sidebar += htmltext(' <li><a data-base-href="map" href="map%s">%s</a></li>') % (
            qs, map_label)

    sidebar += htmltext(' <li class="stats"><a href="stats">%s</a></li>') % _('Statistics')
    sidebar += htmltext('</ul>')
    return sidebar.getvalue()
# Render the "Filters" part of the sidebar and return it as HTML.
#
# selected_filter: id of the entry to preselect in the status <select>.
# mode: 'listing' or 'stats'; picks which filters are enabled by default when
#   the request carries no form data.
#
# Candidate filters are the two fake period fields ('start' and 'end') plus
# the fields from get_formdef_fields() of type item, items, bool, period-date
# or status.  As a side effect every inspected field object gets an `enabled`
# attribute.
def get_filter_sidebar(self, selected_filter=None, mode='listing'):
r = TemplateIO(html=True)
waitpoint_status = self.formdef.workflow.get_waitpoint_status()
period_fake_fields = [
FakeField('start', 'period-date', _('Start')),
FakeField('end', 'period-date', _('End')),
]
filter_fields = []
for field in period_fake_fields + self.get_formdef_fields():
field.enabled = False
if field.type not in ('item', 'bool', 'items', 'period-date', 'status'):
continue
# no status filter when the workflow has no waitpoint status
if field.type == 'status' and not waitpoint_status:
continue
filter_fields.append(field)
# with form data, a filter is enabled when "filter-<id>" was submitted
if get_request().form:
field.enabled = 'filter-%s' % field.id in get_request().form
else:
if mode == 'listing':
# enable status filter by default
field.enabled = (field.id in ('status',))
elif mode == 'stats':
# enable period filters by default
field.enabled = (field.id in ('start', 'end'))
# item fields follow their own in_filters setting
if field.type == 'item':
field.enabled = field.in_filters
# heading, with the link carrying the "filter-settings" id
r += htmltext('<h3><span>%s</span> <span class="change">(<a id="filter-settings">%s</a>)</span></h3>' % (
_('Filters'), _('change')))
# one widget per enabled filter; its current value is read from the
# "filter-<id>-value" request variable
for filter_field in filter_fields:
if not filter_field.enabled:
continue
filter_field_key = 'filter-%s-value' % filter_field.id
filter_field_value = get_request().form.get(filter_field_key)
if filter_field.type == 'status':
r += htmltext('<div class="widget">')
r += htmltext('<div class="title">%s</div>') % _('Status to display')
r += htmltext('<div class="content">')
r += htmltext('<select name="filter">')
# generic choices first, then one choice per waitpoint status
filters = [
('waiting', _('Waiting for an action'), None),
('all', _('All'), None),
('pending', C_('formdata|Open'), None),
('done', _('Done'), None),
]
for status in waitpoint_status:
filters.append((status.id, status.name, status.colour))
for filter_id, filter_label, filter_colour in filters:
if filter_id == selected_filter:
selected = ' selected="selected"'
else:
selected = ''
style = ''
# coloured option, except when there is no colour or it is FFFFFF
if filter_colour and filter_colour != 'FFFFFF':
fg_colour = misc.get_foreground_colour(filter_colour)
style = 'style="background: #%s; color: %s;"' % (
filter_colour, fg_colour)
r += htmltext('<option value="%s"%s %s>' % (filter_id, selected, style))
r += htmltext('%s</option>') % filter_label
r += htmltext('</select>')
r += htmltext('</div>')
r += htmltext('</div>')
elif filter_field.type == 'period-date':
r += DateWidget(filter_field_key, title=filter_field.label,
value=filter_field_value, render_br=False).render()
elif filter_field.type in ('item', 'items'):
filter_field.required = False
options = filter_field.get_options()
if options:
# 2-tuple options are widened to 3-tuples by repeating the first element
if len(options[0]) == 2:
options = [(x[0], x[1], x[0]) for x in options]
# empty choice
options.insert(0, (None, '', ''))
r += SingleSelectWidget(filter_field_key, title=filter_field.label,
options=options, value=filter_field_value,
render_br=False).render()
else:
# There may be no options because the field is using
# a jsonp data source, or a json source using a
# parametrized URL depending on unavailable variables.
#
# In that case fall back on a string widget.
r += StringWidget(filter_field_key, title=filter_field.label,
value=filter_field_value, render_br=False).render()
elif filter_field.type == 'bool':
options = [(None, '', ''), (True, _('Yes'), 'true'), (False, _('No'), 'false')]
r += SingleSelectWidget(filter_field_key, title=filter_field.label,
options=options, value=filter_field_value,
render_br=False).render()
# field filter dialog content
r += htmltext('<div style="display: none;">')
r += htmltext('<ul id="field-filter">')
# one checkbox per candidate filter, checked when the filter is enabled
for field in filter_fields:
r += htmltext('<li><input type="checkbox" name="filter-%s"') % field.id
if field.enabled:
r += htmltext(' checked="checked"')
r += htmltext(' id="fields-filter-%s"') % field.id
r += htmltext('/>')
r += htmltext('<label for="fields-filter-%s">%s</label>') % (
field.id, misc.ellipsize(field.label, 70))
r += htmltext('</li>')
r += htmltext('</ul>')
r += htmltext('</div>')
return r.getvalue()
# Render the "listing-settings" sidebar form (pagination and sort hidden
# inputs, search box, filters, refresh button, column selection list) and
# return it as HTML.
#
# selected_filter: passed on to get_filter_sidebar().
# fields: field objects currently displayed; their ids get a pre-checked
#   checkbox in the column selection list.
# offset, limit, order_by: current listing settings, emitted as hidden inputs.
# columns_settings_label: label of the "columns-settings" button; no button is
#   emitted when it is empty.
def get_fields_sidebar(self, selected_filter, fields, offset=None,
limit=None, order_by=None, columns_settings_label=None):
get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'wcs.listing.js'])
r = TemplateIO(html=True)
r += htmltext('<form id="listing-settings" action=".">')
if offset or limit:
if not offset:
offset = 0
r += htmltext('<input type="hidden" name="offset" value="%s"/>') % offset
if limit:
r += htmltext('<input type="hidden" name="limit" value="%s"/>') % limit
if get_publisher().is_using_postgresql():
# same default sort order as the one used by _q_index()
if order_by is None:
order_by = get_publisher().get_site_option('default-sort-order') or '-receipt_time'
r += htmltext('<input type="hidden" name="order_by" value="%s"/>') % order_by
# the search box is only offered with the PostgreSQL backend
if get_publisher().is_using_postgresql():
r += htmltext('<h3>%s</h3>') % _('Search')
if get_request().form.get('q'):
q = get_request().form.get('q')
# decode byte strings using the site charset (Python 2 unicode type),
# the value is encoded back to the site charset for output
if type(q) is not unicode:
q = unicode(q, get_publisher().site_charset)
r += htmltext('<input class="inline-input" name="q" value="%s">') % q.encode(get_publisher().site_charset)
else:
r += htmltext('<input class="inline-input" name="q">')
r += htmltext('<input type="submit" class="side-button" value="%s"/>') % _('Search')
r += self.get_filter_sidebar(selected_filter=selected_filter)
r += htmltext('<button class="refresh">%s</button>') % _('Refresh')
if columns_settings_label:
r += htmltext('<button id="columns-settings">%s</button>') % columns_settings_label
# column settings dialog content
r += htmltext('<div style="display: none;">')
r += htmltext('<ul id="columns-filter">')
for field in self.get_formdef_fields():
# fields without a get_view_value attribute are not offered as columns
if not hasattr(field, str('get_view_value')):
continue
r += htmltext('<li><input type="checkbox" name="%s"') % field.id
if field.id in [x.id for x in fields]:
r += htmltext(' checked="checked"')
r += htmltext(' id="fields-column-%s"') % field.id
r += htmltext('/>')
r += htmltext('<label for="fields-column-%s">%s</label>') % (
field.id, misc.ellipsize(field.label, 70))
r += htmltext('</li>')
r += htmltext('</ul>')
r += htmltext('</div>')
r += htmltext('</form>')
return r.getvalue()
def get_formdef_fields(self):
    """Return every field that can be used as a listing column.

    The list is made of fake fields for the form data metadata (number,
    optional submission channel and agent, dates, user label), followed by
    all the fields of the form definition, and ends with the status and
    anonymised fake fields.
    """
    columns = [FakeField('id', 'id', _('Number'))]
    if get_publisher().get_site_option('welco_url', 'variables'):
        columns.append(FakeField('submission_channel', 'submission_channel', _('Channel')))
    if self.formdef.backoffice_submission_roles:
        columns.append(FakeField('submission_agent', 'submission_agent', _('Submission By')))
    columns += [
        FakeField('time', 'time', _('Created')),
        FakeField('last_update_time', 'last_update_time', _('Last Modified')),
        FakeField('user-label', 'user-label', _('User Label')),
    ]
    columns += self.formdef.get_all_fields()
    columns += [
        FakeField('status', 'status', _('Status')),
        FakeField('anonymised', 'anonymised', _('Anonymised')),
    ]
    return columns
def get_fields_from_query(self, ignore_form=False):
    """Return the fields to display, as selected by the request.

    The names of the request variables are taken as field ids.  When the
    request has no variable, or when `ignore_form` is set, the default
    selection is used instead: number, dates, user label, form fields
    flagged `in_listing` and status.

    If the request does not select any known field the method calls itself
    again with ignore_form=True to fall back on the default selection.
    """
    field_ids = list(get_request().form.keys())
    if ignore_form or not field_ids:
        field_ids = ['id', 'time', 'last_update_time', 'user-label']
        field_ids.extend(
            formdef_field.id for formdef_field in self.formdef.get_all_fields()
            if hasattr(formdef_field, str('get_view_value')) and formdef_field.in_listing)
        field_ids.append('status')

    selected = [candidate for candidate in self.get_formdef_fields()
                if candidate.id in field_ids]
    if selected:
        return selected
    return self.get_fields_from_query(ignore_form=True)
def get_filter_from_query(self, default='waiting'):
    """Return the status filter to apply.

    The "filter" request variable wins when present; otherwise `default` is
    used when the workflow has possible statuses, and 'all' when it has
    none.
    """
    form = get_request().form
    if 'filter' in form:
        return form['filter']
    return default if self.formdef.workflow.possible_status else 'all'
# Build the list of storage criteria matching the filters of the request.
#
# Only item, items, bool and period-date fields (plus the fake 'start' and
# 'end' period fields) are considered.  A filter value is read either from
# "filter-<varname>" directly, or from "filter-<id>-value" when "filter-<id>"
# is set; when both forms are present the second one is assigned last and
# therefore wins.
#
# Period and item/items criteria get a human readable `_label` attribute
# (read back by stats()); bool criteria are appended without one.
#
# NOTE(review): time.strptime() raises ValueError when a period value does
# not match misc.date_format(); it is not caught here.
def get_criterias_from_query(self):
period_fake_fields = [
FakeField('start', 'period-date', _('Start')),
FakeField('end', 'period-date', _('End')),
]
# NOTE(review): filter_fields is never used in this method
filter_fields = []
criterias = []
format_string = misc.date_format()
for filter_field in period_fake_fields + self.get_formdef_fields():
if filter_field.type not in ('item', 'bool', 'items', 'period-date'):
continue
filter_field_key = None
if filter_field.varname:
# if this is a field with a varname and filter-%(varname)s is
# present in the query string, enable this filter.
if get_request().form.get('filter-%s' % filter_field.varname):
filter_field_key = 'filter-%s' % filter_field.varname
if get_request().form.get('filter-%s' % filter_field.id):
# if there's a filter-%(id)s, it is used to enable the actual
# filter, and the value will be found in filter-%s-value.
filter_field_key = 'filter-%s-value' % filter_field.id
if not filter_field_key:
# if there is no known filter key, skip.
continue
filter_field_value = get_request().form.get(filter_field_key)
if not filter_field_value:
continue
if filter_field.id == 'start':
period_start = time.strptime(filter_field_value, format_string)
criterias.append(GreaterOrEqual('receipt_time', period_start))
criterias[-1]._label = '%s: %s' % (_('Start'), filter_field_value)
elif filter_field.id == 'end':
period_end = time.strptime(filter_field_value, format_string)
criterias.append(LessOrEqual('receipt_time', period_end))
criterias[-1]._label = '%s: %s' % (_('End'), filter_field_value)
elif filter_field.type in ('item', 'items') and filter_field_value not in (None, 'None'):
if filter_field.type == 'item':
criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
elif filter_field.type == 'items':
criterias.append(Intersects('f%s' % filter_field.id, [filter_field_value]))
# look for the option matching the value (on its first or last element)
# and use its second element in the label
field_options = filter_field.get_options()
if field_options and type(field_options[0]) in (list, tuple):
for option in field_options:
if option[0] == filter_field_value or option[-1] == filter_field_value:
filter_field_value = option[1]
break
criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value)
elif filter_field.type == 'bool' and filter_field_value not in (None, 'None'):
# values other than 'true' and 'false' add no criteria
if filter_field_value == 'true':
criterias.append(Equal('f%s' % filter_field.id, True))
elif filter_field_value == 'false':
criterias.append(Equal('f%s' % filter_field.id, False))
return criterias
def _q_index(self):
    """Render the listing page of the form's data.

    Fields, status filter, criteria, pagination, sort order and search
    query are all read from the request.  The bare listing table is
    returned when the listing triggered a redirect (status code 302) or
    when the request has ajax=true; otherwise the full page is returned and
    the sidebar is installed in the response filter.
    """
    self.check_access()
    get_logger().info('backoffice - form %s - listing' % self.formdef.name)

    fields = self.get_fields_from_query()
    selected_filter = self.get_filter_from_query()
    criterias = self.get_criterias_from_query()

    if get_publisher().is_using_postgresql():
        # pagination is only enabled in SQL mode, as there is no sorting
        # in the other case.
        default_limit = int(get_publisher().get_site_option('default-page-size') or 20)
    else:
        default_limit = 0
    limit = get_request().form.get('limit', default_limit)
    offset = get_request().form.get('offset', 0)
    default_order = get_publisher().get_site_option('default-sort-order') or '-receipt_time'
    order_by = get_request().form.get('order_by', default_order)
    query = get_request().form.get('q')

    query_string = get_request().get_query()
    qs = '?' + query_string if query_string else ''

    table = FormDefUI(self.formdef).listing(
        fields=fields,
        selected_filter=selected_filter,
        limit=int(limit),
        offset=int(offset),
        query=query,
        order_by=order_by,
        criterias=criterias)

    if get_response().status_code == 302:
        # catch early redirect
        return table

    if get_request().form.get('ajax') == 'true':
        # ajax refresh: only the table, without page decoration
        get_response().filter = None
        return table

    html_top('management', '%s - %s' % (_('Listing'), self.formdef.name))
    page = TemplateIO(html=True)
    page += htmltext('<h2>%s - %s</h2>') % (self.formdef.name, _('Listing'))
    page += table

    sidebar = self.get_formdata_sidebar(qs)
    sidebar += self.get_fields_sidebar(
        selected_filter, fields, limit=limit, offset=offset,
        order_by=order_by, columns_settings_label=_('Columns Settings'))
    get_response().filter['sidebar'] = sidebar
    return page.getvalue()
def csv_tuple_heading(self, fields):
    """Return the flat list of column headings for `fields`.

    Each field may contribute several headings, as returned by its
    get_csv_heading() method; they are concatenated in field order.
    """
    return [heading for field in fields for heading in field.get_csv_heading()]
def get_spreadsheet_line(self, fields, data):
    """Return the spreadsheet cells of the form data `data` for `fields`.

    Each cell is a dictionary with the keys 'field' (the field object),
    'value' (one of the values returned by the field's get_csv_value()) and
    'native_value' (the view value of the field, or '' when it is empty).
    The stored "<id>_display" value is passed to get_csv_value() for fields
    that store a display value, None otherwise.
    """
    cells = []
    for field in fields:
        native_value = data.get_field_view_value(field) or ''
        if field.store_display_value:
            display_value = data.data.get('%s_display' % field.id) or ''
        else:
            display_value = None
        cells.extend(
            {'field': field, 'value': value, 'native_value': native_value}
            for value in field.get_csv_value(native_value, display_value=display_value))
    return cells
# Export the listing as CSV, using the fields, status filter, search query
# and criteria found in the request.
#
# When the form has more than 100 stored data the export is registered as an
# after job and the user is redirected to the "export" page for that job;
# otherwise the export runs immediately and its content is returned, served
# as text/plain.
def csv(self):
self.check_access()
fields = self.get_fields_from_query()
selected_filter = self.get_filter_from_query()
user = get_request().user
query = get_request().form.get('q')
criterias = self.get_criterias_from_query()
# Helper holding what the export needs; user, query and criterias are taken
# from the enclosing scope.
class Exporter(object):
def __init__(self, formpage, formdef, fields, selected_filter):
self.formpage = formpage
self.formdef = formdef
self.fields = fields
self.selected_filter = selected_filter
# Write the heading row then one row per form data into self.output; when
# called with a job, the result is also stored on it.
def export(self, job=None):
self.output = cStringIO.StringIO()
csv_output = csv.writer(self.output)
csv_output.writerow(self.formpage.csv_tuple_heading(self.fields))
items, total_count = FormDefUI(self.formdef).get_listing_items(
self.selected_filter, user=user, query=query,
criterias=criterias)
for filled in items:
csv_output.writerow(tuple(
[x['value'] for x in self.formpage.get_spreadsheet_line(self.fields, filled)]))
if job:
job.file_content = self.output.getvalue()
job.content_type = 'text/csv'
job.store()
get_logger().info('backoffice - form %s - listing csv' % self.formdef.name)
# the threshold applies to the total number of data of the form, not to the
# number of data matching the filters
count = self.formdef.data_class().count()
exporter = Exporter(self, self.formdef, fields, selected_filter)
if count > 100: # Arbitrary threshold
job = get_response().add_after_job(
str(N_('Exporting forms in CSV')),
exporter.export)
job.file_name = '%s.csv' % self.formdef.url_name
job.store()
return redirect('export?job=%s' % job.id)
else:
exporter.export()
response = get_response()
response.set_content_type('text/plain')
#response.set_header('content-disposition', 'attachment; filename=%s.csv' % self.formdef.url_name)
return exporter.output.getvalue()
def export(self):
    """Render the progress page of an export job.

    With a "download" request variable the work is delegated to
    export_download().  Otherwise the job named by the "job" variable is
    loaded (an unknown job redirects to the listing) and a page showing its
    label, status and a download link is returned.
    """
    self.check_access()

    if get_request().form.get('download'):
        return self.export_download()

    try:
        job = AfterJob.get(get_request().form.get('job'))
    except KeyError:
        return redirect('.')

    html_top('management', title=_('Exporting'))
    page = TemplateIO(html=True)
    page += get_session().display_message()
    get_response().add_javascript(['jquery.js', 'afterjob.js'])

    # job label and current status
    page += htmltext('<dl class="job-status"><dt>')
    page += _(job.label)
    page += htmltext('</dt><dd>')
    page += htmltext('<span class="afterjob" id="%s">') % job.id
    page += _(job.status)
    page += htmltext('</span></dd></dl>')

    # download link
    page += htmltext('<div class="done">')
    page += htmltext('<a download="%s" href="export?download=%s">%s</a>') % (
        job.file_name, job.id, _('Download Export'))
    page += htmltext('</div>')
    return page.getvalue()
def export_download(self):
    """Return the file produced by the export job named in the request.

    The job id is read from the "download" request variable.  An unknown
    job redirects to the listing; a job that is not completed raises
    TraversalError.  The response gets the content type of the job and an
    attachment content-disposition header with the job file name.
    """
    try:
        job = AfterJob.get(get_request().form.get('download'))
    except KeyError:
        return redirect('.')

    if job.status != 'completed':
        raise errors.TraversalError()

    disposition = 'attachment; filename=%s' % job.file_name
    response = get_response()
    response.set_content_type(job.content_type)
    response.set_header('content-disposition', disposition)
    return job.file_content
# Export the listing as an Excel (xls) file, using the fields, status filter,
# search query and criteria found in the request.
#
# Raises TraversalError when xlwt is not available.  When the form has more
# than 100 stored data the export is registered as an after job and the user
# is redirected to the "export" page; otherwise the file is returned directly
# as an attachment.
def xls(self):
self.check_access()
if xlwt is None:
raise errors.TraversalError()
fields = self.get_fields_from_query()
selected_filter = self.get_filter_from_query()
user = get_request().user
query = get_request().form.get('q')
criterias = self.get_criterias_from_query()
# Helper holding what the export needs; user, query and criterias are taken
# from the enclosing scope.
class Exporter(object):
def __init__(self, formpage, formdef, fields, selected_filter):
self.formpage = formpage
self.formdef = formdef
self.fields = fields
self.selected_filter = selected_filter
# Fill a single sheet (headings on row 0, then one row per form data) and
# save the workbook into self.output; when called with a job, the result is
# also stored on it.
def export(self, job=None):
w = xlwt.Workbook(encoding=get_publisher().site_charset)
ws = w.add_sheet('1')
for i, f in enumerate(self.formpage.csv_tuple_heading(self.fields)):
ws.write(0, i, f)
items, total_count = FormDefUI(self.formdef).get_listing_items(
self.selected_filter, user=user, query=query,
criterias=criterias)
for i, filled in enumerate(items):
# note: `fields` is the variable of the enclosing xls() call here, not self.fields
for j, item in enumerate(self.formpage.get_spreadsheet_line(fields, filled)):
elem = item['value']
if elem and len(elem) > 32767:
# xls cells have a limit of 32767 characters, cut
# it down.
elem = elem[:32760] + ' [...]'
ws.write(i+1, j, elem)
self.output = cStringIO.StringIO()
w.save(self.output)
if job:
job.file_content = self.output.getvalue()
job.content_type = 'application/vnd.ms-excel'
job.store()
get_logger().info('backoffice - form %s - as excel' % self.formdef.name)
# the threshold applies to the total number of data of the form, not to the
# number of data matching the filters
count = self.formdef.data_class().count()
exporter = Exporter(self, self.formdef, fields, selected_filter)
if count > 100: # Arbitrary threshold
job = get_response().add_after_job(
str(N_('Exporting forms in Excel format')),
exporter.export)
job.file_name = '%s.xls' % self.formdef.url_name
job.store()
return redirect('export?job=%s' % job.id)
else:
exporter.export()
response = get_response()
response.set_content_type('application/vnd.ms-excel')
response.set_header('content-disposition', 'attachment; filename=%s.xls' % self.formdef.url_name)
return exporter.output.getvalue()
# Export the listing as an OpenDocument spreadsheet (ods), using the fields,
# status filter, search query and criteria found in the request.
#
# When the form has more than 100 stored data the export is registered as an
# after job and the user is redirected to the "export" page; otherwise the
# file is returned directly as an attachment.
def ods(self):
self.check_access()
fields = self.get_fields_from_query()
selected_filter = self.get_filter_from_query()
user = get_request().user
query = get_request().form.get('q')
criterias = self.get_criterias_from_query()
# Helper holding what the export needs; user, query and criterias are taken
# from the enclosing scope.
class Exporter(object):
def __init__(self, formpage, formdef, fields, selected_filter):
self.formpage = formpage
self.formdef = formdef
self.fields = fields
self.selected_filter = selected_filter
# Fill a single sheet (headings on row 0, then one row per form data) and
# save the workbook into self.output; when called with a job, the result is
# also stored on it.
def export(self, job=None):
w = ods.Workbook(encoding=get_publisher().site_charset)
ws = w.add_sheet('1')
for i, f in enumerate(self.formpage.csv_tuple_heading(self.fields)):
ws.write(0, i, f)
items, total_count = FormDefUI(self.formdef).get_listing_items(
self.selected_filter, user=user, query=query,
criterias=criterias)
for i, formdata in enumerate(items):
# note: `fields` is the variable of the enclosing ods() call here, not self.fields
for j, item in enumerate(self.formpage.get_spreadsheet_line(fields, formdata)):
# the form data, field and native value are handed to the sheet along
# with the cell value
ws.write(i+1, j, item['value'],
formdata=formdata,
data_field=item['field'],
native_value=item['native_value'])
self.output = cStringIO.StringIO()
w.save(self.output)
if job:
job.file_content = self.output.getvalue()
job.content_type = 'application/vnd.oasis.opendocument.spreadsheet'
job.store()
get_logger().info('backoffice - form %s - as ods' % self.formdef.name)
# the threshold applies to the total number of data of the form, not to the
# number of data matching the filters
count = self.formdef.data_class().count()
exporter = Exporter(self, self.formdef, fields, selected_filter)
if count > 100: # Arbitrary threshold
job = get_response().add_after_job(
str(N_('Exporting forms in Open Document format')),
exporter.export)
job.file_name = '%s.ods' % self.formdef.url_name
job.store()
return redirect('export?job=%s' % job.id)
else:
exporter.export()
response = get_response()
response.set_content_type('application/vnd.oasis.opendocument.spreadsheet')
response.set_header('content-disposition', 'attachment; filename=%s.ods' % self.formdef.url_name)
return exporter.output.getvalue()
def json(self):
    """Return the listing of form data as a JSON document.

    Request variables: "anonymise" (no user and no search query are used,
    and data are exported anonymised), "filter" (defaults to 'all'),
    "order_by", "q", "offset" and "limit" (converted to integers) and
    "full" (when 'on', the complete export dictionary of each form data is
    returned instead of the id/url/dates summary).
    """
    anonymise = 'anonymise' in get_request().form
    self.check_access()
    get_response().set_content_type('application/json')

    if anonymise:
        user = None
    else:
        user = get_user_from_api_query_string() or get_request().user
    selected_filter = self.get_filter_from_query(default='all')
    criterias = self.get_criterias_from_query()

    form = get_request().form
    order_by = form.get('order_by', None)
    query = None if anonymise else form.get('q')
    offset = int(form['offset']) if 'offset' in form else None
    limit = int(form['limit']) if 'limit' in form else None

    items, total_count = FormDefUI(self.formdef).get_listing_items(
        selected_filter, user=user, query=query, criterias=criterias,
        order_by=order_by, anonymise=anonymise, offset=offset, limit=limit)

    if get_request().form.get('full') == 'on':
        output = [
            formdata.get_json_export_dict(include_files=False, anonymise=anonymise)
            for formdata in items]
    else:
        output = [
            {
                'id': formdata.id,
                'url': formdata.get_url(),
                'receipt_time': formdata.receipt_time,
                'last_update_time': formdata.last_update_time,
            }
            for formdata in items]

    return json.dumps(
        output,
        cls=misc.JSONEncoder,
        encoding=get_publisher().site_charset)
def geojson(self):
    """Return the listing of form data as a GeoJSON document.

    Raises TraversalError when the form definition has no geolocations and
    AccessForbiddenError when the request carries "anonymise".  The user,
    status filter, fields, criteria and search query are read from the
    request; the selected fields are handed to geojson_formdatas().
    """
    if not self.formdef.geolocations:
        raise errors.TraversalError()
    if 'anonymise' in get_request().form:
        # api/ will let this pass but we don't want that.
        raise errors.AccessForbiddenError()
    self.check_access()
    get_response().set_content_type('application/json')

    user = get_user_from_api_query_string() or get_request().user
    selected_filter = self.get_filter_from_query()
    fields = self.get_fields_from_query()
    criterias = self.get_criterias_from_query()
    query = get_request().form.get('q')
    items, total_count = FormDefUI(self.formdef).get_listing_items(
        selected_filter, user=user, query=query, criterias=criterias)

    # The former lookup of the first geolocation key was never used and
    # has been removed.
    return json.dumps(geojson_formdatas(items, fields=fields))
# Return the directory serving iCalendar exports of the form data listing.
#
# Requests carrying "anonymise" are refused with AccessForbiddenError.  The
# user, status filter, criteria and search query are read from the request
# when ics() is called and captured by the returned IcsDirectory, which then
# resolves ics/<varname>.
def ics(self):
if 'anonymise' in get_request().form:
# api/ will let this pass but we don't want that.
raise errors.AccessForbiddenError()
self.check_access('ics')
user = get_user_from_api_query_string('ics') or get_request().user
formdef = self.formdef
selected_filter = self.get_filter_from_query()
# NOTE(review): fields is computed but not used below
fields = self.get_fields_from_query()
criterias = self.get_criterias_from_query()
query = get_request().form.get('q')
class IcsDirectory(Directory):
# ics/<component> with <component> being the identifier (varname)
# of the field to use as start date (may be a date field or a
# string field).
def _q_lookup(self, component):
# find the field whose varname is <component>, TraversalError if none
for field in formdef.get_all_fields():
if not getattr(field, 'varname', None) == component:
continue
datefield_field_id = field.id
break
else:
raise errors.TraversalError()
formdatas, total_count = FormDefUI(formdef).get_listing_items(
selected_filter, user=user, query=query, criterias=criterias)
cal = vobject.iCalendar()
cal.add('prodid').value = '-//Entr\'ouvert//NON SGML Publik'
# one event per form data; form data with no value for the date field are skipped
for formdata in formdatas:
if not formdata.data.get(datefield_field_id):
continue
vevent = vobject.newFromBehavior('vevent')
# uid is built from the server name, the form URL name and the form data id
vevent.add('uid').value = '%s-%s-%s' % (
get_request().get_server().lower(),
formdef.url_name,
formdata.id)
vevent.add('summary').value = formdata.get_display_name()
vevent.add('dtstart').value = make_datetime(formdata.data[datefield_field_id])
vevent.dtstart.value_param = 'DATE'
vevent.add('url').value = formdata.get_url(backoffice=True)
cal.add(vevent)
get_response().set_content_type('text/calendar')
return cal.serialize()
return IcsDirectory()
def map(self):
    """Render the map page for this form.

    Access is checked first, like in the other exported views of this
    directory.  The page holds a single map container whose data-*
    attributes tell the client-side code where to fetch the GeoJSON data
    (same query string as the current request).

    Returns the page body as HTML; the sidebar is installed in the response
    filter.
    """
    self.check_access()
    get_response().add_javascript(['qommon.map.js'])
    html_top('management', '%s - %s' % (_('Form'), self.formdef.name))
    r = TemplateIO(html=True)
    get_response().breadcrumb.append(('map', _('Map')))
    attrs = {
        'class': 'qommon-map',
        'id': 'backoffice-map',
        'data-readonly': True,
        'data-geojson-url': '%s/geojson?%s' % (
            get_request().get_url(1), get_request().get_query())
    }
    attrs.update(get_publisher().get_map_attributes())

    fields = self.get_fields_from_query()
    selected_filter = self.get_filter_from_query()
    get_response().filter['sidebar'] = self.get_fields_sidebar(
        selected_filter, fields, columns_settings_label=_('Markers Settings'))

    r += htmltext('<h2>%s - %s</h2>') % (self.formdef.name, _('Map'))
    # The attribute values include the raw query string of the request;
    # format them through htmltext so they are escaped and cannot break out
    # of the attribute.
    rendered_attrs = htmltext(' ').join(
        [htmltext('%s="%s"') % item for item in attrs.items()])
    r += htmltext('<div %s></div>') % rendered_attrs
    return r.getvalue()
def get_stats_sidebar(self, selected_filter):
    """Return the sidebar form of the statistics page as HTML.

    The form holds the filters (in 'stats' mode), a refresh button and,
    when PDF decoration is available, a PDF download button.
    """
    get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'wcs.listing.js'])

    form = TemplateIO(html=True)
    form += htmltext('<form id="listing-settings" action="stats">')
    form += self.get_filter_sidebar(selected_filter=selected_filter, mode='stats')
    form += htmltext('<button class="refresh">%s</button>') % _('Refresh')
    if misc.can_decorate_as_pdf():
        form += htmltext('<button class="pdf">%s</button>') % _('Download PDF')
    form += htmltext('</form>')
    return form.getvalue()
# Render the statistics page of the form.
#
# The status filter (default 'all') and the criteria from the query string
# restrict the form data taken into account; drafts are always excluded.
# The output holds the list of applied filters, the number of records (total
# and per workflow status), per-field statistics, resolution times and, with
# the PostgreSQL backend, a graphs section.
#
# With ajax=true in the request only the statistics block is returned, without
# page decoration; with "pdf" in the request the page is converted to PDF and
# served as application/pdf.
def stats(self):
self.check_access()
get_logger().info('backoffice - form %s - stats' % self.formdef.name)
html_top('management', '%s - %s' % (_('Form'), self.formdef.name))
r = TemplateIO(html=True)
get_response().breadcrumb.append( ('stats', _('Statistics')) )
selected_filter = self.get_filter_from_query(default='all')
criterias = self.get_criterias_from_query()
get_response().filter['sidebar'] = self.get_formdata_sidebar() + \
self.get_stats_sidebar(selected_filter)
# graphs are only produced with the PostgreSQL backend
do_graphs = get_publisher().is_using_postgresql()
# turn the status filter into a criteria on the 'wf-<status id>' values
if selected_filter and selected_filter != 'all':
if selected_filter == 'pending':
applied_filters = ['wf-%s' % x.id for x in
self.formdef.workflow.get_not_endpoint_status()]
criteria_label = _('Status: %s') % _('Pending')
elif selected_filter == 'done':
applied_filters = ['wf-%s' % x.id for x in
self.formdef.workflow.get_endpoint_status()]
criteria_label = _('Status: %s') % _('Done')
else:
criteria_label = _('Status: %s') % self.formdef.workflow.get_status(
selected_filter).name
applied_filters = ['wf-%s' % selected_filter]
criterias.append(Or([Equal('status', x) for x in applied_filters]))
criterias[-1]._label = criteria_label
displayed_criterias = criterias
else:
displayed_criterias = criterias
# drafts are excluded but this criteria is not part of the displayed ones
criterias = [NotEqual('status', 'draft')] + displayed_criterias
values = self.formdef.data_class().select(criterias)
if get_publisher().is_using_postgresql():
# load all evolutions in a single batch, to avoid as many query as
# there are formdata when computing resolution times statistics.
self.formdef.data_class().load_all_evolutions(values)
r += htmltext('<div id="statistics">')
# list of applied filters; criteria without a _label are not listed
if displayed_criterias:
r += htmltext('<div class="criterias bo-block">')
r += htmltext('<h2>%s</h2>') % _('Filters')
r += htmltext('<ul>')
for criteria in displayed_criterias:
criteria_label = getattr(criteria, '_label', None)
if criteria_label:
r += htmltext('<li>%s</li>') % criteria_label
r += htmltext('</ul></div>')
if do_graphs:
r += htmltext('<div class="splitcontent-left">')
no_forms = len(values)
r += htmltext('<div class="bo-block">')
r += htmltext('<p>%s %d</p>') % (_('Total number of records:'), no_forms)
if self.formdef.workflow:
r += htmltext('<ul>')
for status in self.formdef.workflow.possible_status:
r += htmltext('<li>%s: %d</li>') % (status.name,
len([x for x in values if x.status == 'wf-%s' % status.id]))
r += htmltext('</ul>')
r += htmltext('</div>')
# fields already used in an Equal criteria are left out of the per-field
# statistics; the leading character of the criteria attribute is dropped to
# get the field id
excluded_fields = []
for criteria in displayed_criterias:
if not isinstance(criteria, Equal):
continue
excluded_fields.append(criteria.attribute[1:])
stats_for_fields = self.stats_fields(values,
excluded_fields=excluded_fields)
if stats_for_fields:
r += htmltext('<div class="bo-block">')
r += stats_for_fields
r += htmltext('</div>')
stats_times = self.stats_resolution_time(values)
if stats_times:
r += htmltext('<div class="bo-block">')
r += stats_times
r += htmltext('</div>')
if do_graphs:
r += htmltext('</div>')
r += htmltext('<div class="splitcontent-right">')
# graphs are restricted to this form definition
criterias.append(Equal('formdef_id', int(self.formdef.id)))
r += do_graphs_section(criterias=criterias)
r += htmltext('</div>')
r += htmltext('</div>') # id="statistics"
if get_request().form.get('ajax') == 'true':
get_response().filter = None
return r.getvalue()
page = TemplateIO(html=True)
page += htmltext('<h2>%s - %s</h2>') % (self.formdef.name, _('Statistics'))
page += htmltext(r)
page += htmltext('<a class="back" href=".">%s</a>') % _('Back')
if 'pdf' in get_request().form:
pdf_content = misc.decorate_as_pdf(page.getvalue())
response = get_response()
response.set_content_type('application/pdf')
return pdf_content
return page.getvalue()
# Render the per-field statistics for `values` and return them as HTML.
#
# values: form data handed to the stats() method of each field.
# excluded_fields: optional collection of field ids to leave out.
#
# 'page' and 'title' fields produce no statistics themselves: their labels are
# remembered and emitted as <h3> headings ahead of the statistics of a later
# field.  Fields with a false `stats` attribute, or whose stats(values) result
# is empty, are skipped.
def stats_fields(self, values, excluded_fields=None):
r = TemplateIO(html=True)
# had_page tells whether a <div class="page"> has been opened
had_page = False
last_page = None
last_title = None
for f in self.formdef.get_all_fields():
if excluded_fields and f.id in excluded_fields:
continue
if f.type == 'page':
# a new page also forgets the pending title
last_page = f.label
last_title = None
continue
if f.type == 'title':
last_title = f.label
continue
if not f.stats:
continue
t = f.stats(values)
if not t:
continue
if last_page:
if had_page:
r += htmltext('</div>')
r += htmltext('<div class="page">')
r += htmltext('<h3>%s</h3>') % last_page
had_page = True
last_page = None
if last_title:
r += htmltext('<h3>%s</h3>') % last_title
last_title = None
r += t
if had_page:
r += htmltext('</div>')
return r.getvalue()
def stats_resolution_time(self, values):
    """Render resolution time statistics for `values` as HTML.

    For every workflow status, the form data currently in that status and
    having at least one evolution are considered; their resolution time is
    the delay between their receipt time and the time of their last
    evolution.  Minimum, maximum, range, mean, median and standard
    deviation are displayed for each status having such form data.

    Returns None when the workflow has fewer than two possible statuses.
    """
    from math import sqrt

    possible_status = [('wf-%s' % x.id, x.id) for x in self.formdef.workflow.possible_status]
    if len(possible_status) < 2:
        return

    r = TemplateIO(html=True)
    r += htmltext('<h2>%s</h2>') % _('Resolution time')

    for status, status_id in possible_status:
        res_time_forms = sorted(
            time.mktime(x.evolution[-1].time) - time.mktime(x.receipt_time)
            for x in values if x.status == status and x.evolution)
        if not res_time_forms:
            continue

        len_times = len(res_time_forms)
        # the list is sorted: extremes are its first and last items
        shortest = res_time_forms[0]
        longest = res_time_forms[-1]
        mean = sum(res_time_forms) / len_times

        # explicit integer division, as the result is used as a list index
        midpt = len_times // 2
        if len_times % 2:
            median = res_time_forms[midpt]
        else:
            median = (res_time_forms[midpt - 1] + res_time_forms[midpt]) / 2

        # The variance itself is not displayed since it is in square
        # seconds, which is not easy to grasp.  The divisor is at least 2,
        # so the division cannot fail.
        # NOTE(review): the divisor is len_times + 1 (kept as is), neither
        # the population (n) nor the sample (n - 1) variance -- confirm
        # this is intended.
        variance = sum((t - mean) ** 2.0 for t in res_time_forms) / (len_times + 1)
        std_dev = sqrt(variance)

        r += htmltext('<h3>%s</h3>') % (
            _('To Status "%s"') % self.formdef.workflow.get_status(status_id).name)
        r += htmltext('<ul>')
        r += htmltext(' <li>%s %s</li>') % (_('Minimum Time:'), format_time(shortest))
        r += htmltext(' <li>%s %s</li>') % (_('Maximum Time:'), format_time(longest))
        r += htmltext(' <li>%s %s</li>') % (_('Range:'), format_time(longest - shortest))
        r += htmltext(' <li>%s %s</li>') % (_('Mean:'), format_time(mean))
        r += htmltext(' <li>%s %s</li>') % (_('Median:'), format_time(median))
        r += htmltext(' <li>%s %s</li>') % (_('Standard Deviation:'), format_time(std_dev))
        r += htmltext('</ul>')

    return r.getvalue()
def _q_lookup(self, component):
    """Resolve a path component that is not part of _q_exports.

    'ics' gives the iCalendar export directory; anything else is taken as
    the id of a form data and gives its backoffice status page, or a
    TraversalError when there is no such form data.
    """
    if component == 'ics':
        return self.ics()

    try:
        formdata = self.formdef.data_class().get(component)
    except KeyError:
        raise errors.TraversalError()
    else:
        return FormBackOfficeStatusPage(self.formdef, formdata)
# Backoffice variant of the form data status page (instantiated by
# FormPage._q_lookup with a form definition and a form data).
class FormBackOfficeStatusPage(FormStatusPage):
_q_exports_orig = ['', 'download', 'json', 'action', 'inspect']
form_page_class = FormFillPage
def html_top(self, title=None):
    """Start the page in the 'management' section of the backoffice."""
    section = 'management'
    return html_top(section, title)
# Display the status page of the form data, with the backoffice sidebar.
#
# Drafts get a special treatment: for a draft created from the backoffice, a
# user holding one of the backoffice submission roles of the form is
# redirected to the matching submission page.
# NOTE(review): presumably any other access to a draft ends in
# AccessForbiddenError -- confirm against the indentation of the raise.
def _q_index(self):
if self.filled.status == 'draft':
if self.filled.backoffice_submission:
for role in get_request().user.roles or []:
if role in self.formdef.backoffice_submission_roles:
return redirect('../../../submission/%s/%s' % (
self.formdef.url_name, self.filled.id))
raise errors.AccessForbiddenError()
get_response().filter['sidebar'] = self.get_sidebar()
return self.status()
def receipt(self, *args, **kwargs):
    """Return the receipt of the parent class, with an optional notice.

    When the session says the current visitor is the anonymous submitter of
    the form data (tracking code access), an information notice is put
    ahead of the receipt.  All arguments are passed on to the parent
    implementation.
    """
    output = TemplateIO(html=True)
    session = get_session()
    if session and session.is_anonymous_submitter(self.filled):
        notice = _('This form has been accessed via its tracking code, it is '
                   'therefore displayed like you were also its owner.')
        output += htmltext('<div class="infonotice">')
        output += notice
        output += htmltext('</div>')
    output += super(FormBackOfficeStatusPage, self).receipt(*args, **kwargs)
    return output.getvalue()
def get_sidebar(self):
    """Return the sidebar content, which is the extra context bar."""
    sidebar = self.get_extra_context_bar()
    return sidebar
def get_extra_context_bar(self):
formdata = self.filled
r = TemplateIO(html=True)
if not formdata.is_draft():
r += htmltext('<div class="extra-context">')
if (formdata.backoffice_submission and formdata.submission_context and
formdata.submission_context.get('agent_id') == get_request().user.id and
formdata.tracking_code and
time.time() - time.mktime(formdata.receipt_time) < 30*60):
# keep displaying tracking code to submission agent for 30
# minutes after submission
r += htmltext('<h3>%s</h3>') % _('Tracking Code')
r += htmltext('<p>%s</p>') % formdata.tracking_code
r += htmltext('<h3>%s</h3>') % _('General Information')
r += htmltext('<p>')
tm = misc.localstrftime(formdata.receipt_time)
agent_user = None
if formdata.submission_context and 'agent_id' in formdata.submission_context:
agent_user = get_publisher().user_class.get(
formdata.submission_context['agent_id'], ignore_errors=True)
if agent_user:
r += _('The form has been recorded on %(date)s with the number %(number)s by %(agent)s.') % {
'date': tm, 'number': formdata.get_display_id(),
'agent': agent_user.get_display_name()}
else:
r += _('The form has been recorded on %(date)s with the number %(number)s.') % {
'date': tm, 'number': formdata.get_display_id()}
r += htmltext('</p>')
try:
status_colour = formdata.get_status().colour
except AttributeError:
status_colour = 'ffffff'
fg_colour = misc.get_foreground_colour(status_colour)
r += htmltext('<p class="current-status"><span class="item" style="background: #%s; color: %s;"></span>' %
(status_colour, fg_colour))
r += htmltext('<span>%s %s</span></p>') % (_('Status:'), formdata.get_status_label())
if formdata.formdef.workflow.criticality_levels:
try:
level = formdata.get_criticality_level_object()
except IndexError:
pass
else:
r += htmltext('<p class="current-level">')
if level.colour:
r += htmltext('<span class="item" style="background: #%s;"></span>' % level.colour)
r += htmltext('<span>%s %s</span></p>') % (_('Criticality Level:'), level.name)
if formdata.anonymised:
r += htmltext('<div class="infonotice">')
r += htmltext(_('This form has been anonymised on %(date)s.')) % {
'date': formdata.anonymised.strftime(misc.date_format())}
r += htmltext('</div>')
r += htmltext('</div>')
if formdata.submission_context or formdata.submission_channel:
extra_context = formdata.submission_context or {}
r += htmltext('<div class="extra-context">')
if extra_context.get('orig_formdef_id'):
r += htmltext('<h3>%s</h3>') % _('Original form')
try:
formdata = FormDef.get(extra_context.get('orig_formdef_id')
).data_class().get(extra_context.get('orig_formdata_id'))
except KeyError:
r += htmltext('<p>%s</p>') % _('(deleted)')
else:
r += htmltext('<p><a href="%s">%s %s</a></p>') % (
formdata.get_url(backoffice=True),
formdata.formdef.name,
formdata.get_display_id())
if formdata.submission_channel:
r += htmltext('<h3>%s</h3>') % '%s: %s' % (
_('Channel'), formdata.get_submission_channel_label())
if extra_context.get('thumbnail_url'):
r += htmltext('<p class="thumbnail"><img src="%s" alt=""/></p>'
) % extra_context.get('thumbnail_url')
if extra_context.get('mail_url'):
r += htmltext('<p><a href="%s">%s</a></p>') % (
extra_context.get('mail_url'), _('Open'))
if extra_context.get('comments'):
r += htmltext('<h3>%s</h3>') % _('Comments')
r += htmltext('<p>%s</p>') % extra_context.get('comments')
if extra_context.get('summary_url'):
r += htmltext('<div data-content-url="%s"></div>' %
(extra_context.get('summary_url')))
r += htmltext('</div>')
if formdata.user_id and formdata.get_user():
r += htmltext('<div class="extra-context">')
r += htmltext('<h3>%s</h3>') % _('Associated User')
r += htmltext('<p>%s</p>') % formdata.get_user().display_name
r += htmltext('</div>')
if formdata.formdef.geolocations and formdata.geolocations:
r += htmltext('<div class="geolocations">')
for geoloc_key in formdata.formdef.geolocations:
if not geoloc_key in formdata.geolocations:
continue
r += htmltext('<h3>%s</h3>') % formdata.formdef.geolocations[geoloc_key]
geoloc_value = formdata.geolocations[geoloc_key]
map_widget = MapWidget('geoloc_%s' % geoloc_key,
readonly=True,
value='%(lat)s;%(lon)s' % geoloc_value)
r += map_widget.render()
r += htmltext('</div>')
if formdata.user_id and get_publisher().is_using_postgresql():
# display list of open formdata for the same user
user_roles = [logged_users_role().id] + (get_request().user.roles or [])
criterias = [Equal('is_at_endpoint', False),
Equal('user_id', str(formdata.user_id)),
Intersects('concerned_roles_array', user_roles),
]
from wcs import sql
formdatas = sql.AnyFormData.select(criterias, order_by='receipt_time')
self.filled.related_user_forms = formdatas
if formdatas:
r += htmltext('<div class="user-pending-forms">')
r += htmltext('<h3>%s</h3>') % _('User Pending Forms')
categories = {}
formdata_by_category = {}
for formdata in formdatas:
if not formdata.formdef.category_id in categories:
categories[formdata.formdef.category_id] = formdata.formdef.category
formdata_by_category[formdata.formdef.category_id] = []
formdata_by_category[formdata.formdef.category_id].append(formdata)
cats = categories.values()
Category.sort_by_position(cats)
if self.formdef.category_id in categories:
# move current category to the top
cats.remove(categories[self.formdef.category_id])
cats.insert(0, categories[self.formdef.category_id])
for cat in cats:
if len(cats) > 1:
if cat is None:
r += htmltext('<h4>%s</h4>') % _('Misc')
cat_formdatas = formdata_by_category[None]
else:
r += htmltext('<h4>%s</h4>') % cat.name
cat_formdatas = formdata_by_category[cat.id]
else:
cat_formdatas = formdatas
r += htmltext('<ul>')
for formdata in cat_formdatas:
status = formdata.get_status()
if status:
status_label = status.name
else:
status_label = _('Unknown')
submit_date = misc.strftime.strftime(
misc.date_format(), formdata.receipt_time)
if str(formdata.formdef_id) == str(self.formdef.id) and (
str(formdata.id) == str(self.filled.id)):
r += htmltext('<li class="self"><span class="formname">%s</span> '
'(<span class="id">%s</span>), '
'<span class="datetime">%s</span> '
'<span class="status">(%s)</span>' % (
formdata.formdef.name,
formdata.get_display_id(),
submit_date, status_label))
else:
r += htmltext('<li><a href="%s">%s</a> '
'(<span class="id">%s</span>), '
'<span class="datetime">%s</span> '
'<span class="status">(%s)</span>' % (
formdata.get_url(backoffice=True),
formdata.formdef.name,
formdata.get_display_id(),
submit_date, status_label))
r += htmltext('</ul>')
r += htmltext('</div>')
return r.getvalue()
def inspect(self):
if not (get_publisher().get_backoffice_root().is_accessible('forms') or
get_publisher().get_backoffice_root().is_accessible('workflows')):
raise errors.AccessForbiddenError()
charset = get_publisher().site_charset
get_response().breadcrumb.append(('inspect', _('Form Inspector')))
self.html_top(self.formdef.name)
r = TemplateIO(html=True)
r += htmltext('<div class="bo-block">')
r += htmltext('<h2>%s</h2>') % _('Form Inspector')
r += htmltext('<ul class="biglist form-inspector">')
r += htmltext(' <li><h3>%s</h3></li>') % _('Substitution variables')
substvars = self.filled.get_substitution_variables()
substvars.update(self.filled.formdef.get_substitution_variables())
def safe(v):
if isinstance(v, str):
try:
unicode(v, charset)
except UnicodeDecodeError:
v = repr(v)
else:
try:
v = unicode(v).encode(charset)
except:
v = repr(v)
return v
backward_compatibility_varnames = ['attachments']
for k, v in sorted(substvars.items()):
if k in backward_compatibility_varnames:
continue
k = safe(k)
r += htmltext('<li><code title="%s">%s</code>') % (k, k)
r += htmltext(' <div class="value"><span>%s</span>') % ellipsize(safe(v), 10000)
if not isinstance(v, basestring):
r += htmltext(' <span class="type">(%r)</span>') % type(v)
r += htmltext('</div></li>')
if '_markers_stack' in (self.filled.workflow_data or {}):
r += htmltext('<li><h3>%s</h3></li>') % _('Markers Stack')
for marker in reversed(self.filled.workflow_data['_markers_stack']):
status = self.filled.get_status(marker['status_id'])
if status:
r += htmltext('<li><span class="status">%s</span></li>') % status.name
else:
r += htmltext('<li><span class="status">%s</span></li>') % _('Unknown')
r += htmltext('</ul>')
r += htmltext('</div>')
return r.getvalue()
class FakeField(object):
    """Minimal stand-in for a form field, identified by its 'fake' flag."""

    def __init__(self, id, type_, label):
        # markers and defaults first, then the caller supplied values
        self.fake = True
        self.varname = None
        self.store_display_value = None
        self.id = id
        self.type = type_
        self.label = label

    def get_view_value(self, value):
        """Return None whatever the value (there is nothing to render)."""
        # just here to quack like a duck
        return None

    def get_csv_heading(self):
        """Return the CSV headings: a single column with the label."""
        headings = [self.label]
        return headings

    def get_csv_value(self, element, **kwargs):
        """Return the CSV cells: a single column with element unchanged."""
        cells = [element]
        return cells
def do_graphs_section(period_start=None, period_end=None, criterias=None):
    """Return the HTML of the submission graphs (year, month, weekday, hour).

    Totals come from the wcs.sql helpers, called with the given period and
    criterias; they are written as JSON in a <script> element and drawn by
    jqplot code registered on the response.  The yearly graph is only
    included when there is more than one year of data.
    """
    from wcs import sql
    r = TemplateIO(html=True)

    # keep the last twelve months and the last ten years
    monthly_totals = sql.get_monthly_totals(period_start, period_end, criterias)[-12:]
    yearly_totals = sql.get_yearly_totals(period_start, period_end, criterias)[-10:]

    # without data, display a single empty entry for the current period
    monthly_totals = monthly_totals or [
        ('%s-%s' % datetime.date.today().timetuple()[:2], 0)]
    yearly_totals = yearly_totals or [(datetime.date.today().year, 0)]

    weekday_names = [_('Sunday'), _('Monday'), _('Tuesday'),
                     _('Wednesday'), _('Thursday'), _('Friday'), _('Saturday')]
    weekday_line = [
        (weekday_names[day_index], day_total)
        for day_index, day_total in sql.get_weekday_totals(period_start, period_end, criterias)]
    # move Sunday to the last place
    weekday_line.append(weekday_line.pop(0))

    hour_totals = sql.get_hour_totals(period_start, period_end, criterias)

    script_variables = {
        'weekday_line': json.dumps(weekday_line),
        'hour_line': json.dumps(hour_totals),
        'month_line': json.dumps(monthly_totals),
        'year_line': json.dumps(yearly_totals),
    }
    # the JSON goes into a <script> element, it must not be HTML-escaped
    r += htmltext('''<script>
var weekday_line = %(weekday_line)s;
var hour_line = %(hour_line)s;
var month_line = %(month_line)s;
var year_line = %(year_line)s;
</script>''' % script_variables)

    if len(yearly_totals) > 1:
        r += htmltext('<h3>%s</h3>') % _('Submissions by year')
        r += htmltext('<div id="chart_years" style="height:160px; width:100%;"></div>')

    r += htmltext('<h3>%s</h3>') % _('Submissions by month')
    r += htmltext('<div id="chart_months" style="height:160px; width:100%;"></div>')
    r += htmltext('<h3>%s</h3>') % _('Submissions by weekday')
    r += htmltext('<div id="chart_weekdays" style="height:160px; width:100%;"></div>')
    r += htmltext('<h3>%s</h3>') % _('Submissions by hour')
    r += htmltext('<div id="chart_hours" style="height:160px; width:100%;"></div>')

    response = get_response()
    response.add_javascript(['jquery.js', 'jqplot/jquery.jqplot.min.js',
                             'jqplot/plugins/jqplot.canvasTextRenderer.min.js',
                             'jqplot/plugins/jqplot.canvasAxisLabelRenderer.min.js',
                             'jqplot/plugins/jqplot.canvasAxisTickRenderer.min.js',
                             'jqplot/plugins/jqplot.categoryAxisRenderer.min.js',
                             'jqplot/plugins/jqplot.barRenderer.min.js',
                            ])
    get_response().add_javascript_code('''
function wcs_draw_graphs() {
$.jqplot ('chart_weekdays', [weekday_line], {
series:[{renderer:$.jqplot.BarRenderer}],
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer } }
});
$.jqplot ('chart_hours', [hour_line], {
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer }, yaxis: {min: 0} }
});
$.jqplot ('chart_months', [month_line], {
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer }, yaxis: {min: 0} }
});
if ($('#chart_years').length) {
$.jqplot ('chart_years', [year_line], {
series:[{renderer:$.jqplot.BarRenderer}],
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer }, yaxis: {min: 0} }
});
}
}
$(document).ready(function(){
wcs_draw_graphs();
});
''')
    return r.getvalue()
def get_global_criteria(request, parsed_values=None):
    """
    Builds, from the request query string, a list of criterias suitable for
    select() usage.

    Drafts are always excluded.  The 'start' and 'end' parameters, when they
    can be parsed with the date format given by misc.date_format(), add a
    lower and an upper bound on receipt_time; a missing or invalid value is
    ignored.

    The parsed_values parameter can be given a dictionary, it gets the
    parsed dates as datetime objects under the 'period_start' and
    'period_end' keys.
    """
    format_string = misc.date_format()
    criterias = [NotEqual('status', 'draft')]
    # (query string parameter, criteria class, key in parsed_values)
    bounds = (
        ('start', GreaterOrEqual, 'period_start'),
        ('end', LessOrEqual, 'period_end'),
    )
    for parameter, criteria_class, parsed_key in bounds:
        try:
            period = time.strptime(request.form.get(parameter), format_string)
            criterias.append(criteria_class('receipt_time', period))
            if parsed_values is not None:
                parsed_values[parsed_key] = datetime.datetime.fromtimestamp(
                    time.mktime(period))
        except (ValueError, TypeError):
            # missing (None) or unparsable value
            continue
    return criterias
def format_time(t, units = 2):
    """Return a human readable string for a duration t given in seconds.

    With units == 1 only the largest non-zero unit among days, hours and
    minutes is given; with units == 2 (the default) it is followed by the
    next smaller unit.  When the duration is below one minute, or for any
    other value of units, only the seconds part (t modulo 60) is given.
    """
    days = int(t / 86400)
    below_day = t - days * 86400
    hours = int(below_day / 3600)
    minutes = int((below_day - hours * 3600) / 60)
    seconds = t % 60

    if units in (1, 2):
        single = (units == 1)
        if days:
            if single:
                return _('%d day(s)') % days
            return _('%(days)d day(s) and %(hours)d hour(s)') % {
                'days': days, 'hours': hours}
        if hours:
            if single:
                return _('%d hour(s)') % hours
            return _('%(hours)d hour(s) and %(minutes)d minute(s)') % {
                'hours': hours, 'minutes': minutes}
        if minutes:
            if single:
                return _('%d minute(s)') % minutes
            return _('%(minutes)d minute(s) and %(seconds)d seconds') % {
                'minutes': minutes, 'seconds': seconds}

    return _('%d seconds') % seconds