wcs/wcs/backoffice/management.py

1248 lines
50 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2015 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import csv
import cStringIO
import datetime
import json
import time
try:
import xlwt
except ImportError:
xlwt = None
from quixote import get_session, get_publisher, get_request, get_response, redirect
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext
from qommon.backoffice.menu import html_top
from qommon import misc, get_logger
from qommon.afterjobs import AfterJob
from qommon import errors
from qommon import ods
from qommon.form import *
from qommon.storage import Equal, NotEqual, LessOrEqual, GreaterOrEqual, Or
from wcs.forms.backoffice import FormDefUI
from wcs.forms.common import FormStatusPage
from wcs.categories import Category
from wcs.formdef import FormDef
class ManagementDirectory(Directory):
    """Backoffice "management" section.

    Exposes an index page listing forms for the current user, a global
    statistics page, and hands any other path component to FormPage.
    """

    _q_exports = ['', 'statistics']

    def _q_traverse(self, path):
        """Push the section breadcrumb, then run the regular traversal."""
        get_response().breadcrumb.append(('management/', _('Management')))
        return super(ManagementDirectory, self)._q_traverse(path)

    def _q_index(self):
        """Render the index page.

        Enabled forms are listed when the user is an admin or when
        formdef.is_of_concern_for_user(user) is true.  Forms are split in
        two blocks depending on whether they have pending (non endpoint
        status) items.
        """
        html_top('management', _('Management'))
        get_response().filter['sidebar'] = self.get_sidebar()
        r = TemplateIO(html=True)
        user = get_request().user

        forms_without_pending_stuff = []
        forms_with_pending_stuff = []

        def append_form_entry(formdef):
            # the data class is looked up once and reused for all queries
            formdata_class = formdef.data_class()
            # drafts are not counted in the total
            count_forms = formdata_class.count() - len(
                formdata_class.get_ids_with_indexed_value('status', 'draft'))
            pending_forms = []
            for status in formdef.workflow.get_not_endpoint_status():
                pending_forms.extend(formdata_class.get_ids_with_indexed_value(
                    'status', 'wf-%s' % status.id))

            if formdef.acl_read != 'all' and pending_forms:
                # restrict pending items to those indexed under one of the
                # user roles
                concerned_ids = set()
                for role in set(user.roles or []):
                    concerned_ids |= set(formdata_class.get_ids_with_indexed_value(
                        'concerned_roles', str(role)))
                pending_forms = set(pending_forms).intersection(concerned_ids)

            entry = (formdef, len(pending_forms), count_forms)
            if pending_forms:
                forms_with_pending_stuff.append(entry)
            else:
                forms_without_pending_stuff.append(entry)

        if user:
            for formdef in FormDef.select(order_by='name', ignore_errors=True):
                if formdef.disabled:
                    continue
                if user.is_admin or formdef.is_of_concern_for_user(user):
                    append_form_entry(formdef)

        if forms_with_pending_stuff:
            r += htmltext('<div class="bo-block" id="forms-in-your-care">')
            r += htmltext('<h2>%s</h2>') % _('Forms in your care')
            r += self.display_forms(forms_with_pending_stuff)
            r += htmltext('</div>')

        if forms_without_pending_stuff:
            r += htmltext('<div class="bo-block" id="other-forms">')
            r += htmltext('<h2>%s</h2>') % _('Other Forms')
            r += self.display_forms(forms_without_pending_stuff)
            r += htmltext('</div>')

        return r.getvalue()

    def get_sidebar(self):
        """Return the index page sidebar (link to the global statistics)."""
        r = TemplateIO(html=True)
        r += htmltext('<div class="bo-block">')
        r += htmltext('<ul id="sidebar-actions">')
        r += htmltext('<li><a href="statistics">%s</a></li>') % _('Global statistics')
        r += htmltext('</ul>')
        r += htmltext('</div>')
        return r.getvalue()

    def get_stats_sidebar(self):
        """Return the statistics sidebar: period form and period shortcuts.

        Shortcuts are links setting the "start" (and, for past periods,
        "end") query parameters, formatted with misc.date_format().
        """
        get_response().add_javascript(['jquery.js'])
        DateWidget.prepare_javascript()
        form = Form(use_tokens=False)
        form.add(DateWidget, 'start', title=_('Start Date'))
        form.add(DateWidget, 'end', title=_('End Date'))
        form.add_submit('submit', _('Submit'))

        r = TemplateIO(html=True)
        r += htmltext('<h3>%s</h3>') % _('Period')
        r += form.render()

        now = datetime.datetime.now()
        date_format = misc.date_format()

        r += htmltext('<h3>%s</h3>') % _('Shortcuts')
        r += htmltext('<ul>')  # presets
        current_month_start = now.replace(day=1)
        start = current_month_start.strftime(date_format)
        r += htmltext(' <li><a href="?start=%s">%s</a></li>') % (
            start, _('Current Month'))

        # stepping back two days from the first of the month always lands in
        # the previous month; it is then snapped to its first day.
        previous_month_start = current_month_start - datetime.timedelta(days=2)
        previous_month_start = previous_month_start.replace(day=1)
        start = previous_month_start.strftime(date_format)
        end = current_month_start.strftime(date_format)
        r += htmltext(' <li><a href="?start=%s&end=%s">%s</a></li>') % (
            start, end, _('Previous Month'))

        current_year_start = now.replace(month=1, day=1)
        start = current_year_start.strftime(date_format)
        r += htmltext(' <li><a href="?start=%s">%s</a></li>') % (
            start, _('Current Year'))

        previous_year_start = current_year_start.replace(
            year=current_year_start.year - 1)
        start = previous_year_start.strftime(date_format)
        end = current_year_start.strftime(date_format)
        r += htmltext(' <li><a href="?start=%s&end=%s">%s</a></li>') % (
            start, end, _('Previous Year'))
        # the list (and its items) used to be left unclosed
        r += htmltext('</ul>')
        return r.getvalue()

    def statistics(self):
        """Render the global statistics page (counts per category and form).

        The criteria come from get_stats_criteria(); when PostgreSQL is used
        and the "postgresql_views" site option is not 'false', a graphs
        column produced by do_graphs_section() is added.
        """
        html_top('management', _('Global statistics'))
        get_response().breadcrumb.append(('statistics', _('Global statistics')))
        get_response().filter['sidebar'] = self.get_stats_sidebar()
        r = TemplateIO(html=True)
        r += htmltext('<h2>%s</h2>') % _('Global statistics')

        formdefs = FormDef.select(lambda x: not x.is_disabled() or x.disabled_redirection,
                order_by='name', ignore_errors=True)

        counts = {}
        # filled by get_stats_criteria() with the parsed period boundaries
        parsed_values = {}
        criterias = get_stats_criteria(get_request(), parsed_values)
        for formdef in formdefs:
            values = formdef.data_class().select(criterias)
            counts[formdef.id] = len(values)

        do_graphs = bool(
            get_publisher().is_using_postgresql() and
            get_publisher().get_site_option('postgresql_views') != 'false')

        r += htmltext('<p>%s %s</p>') % (_('Total count:'), sum(counts.values()))

        if do_graphs:
            r += htmltext('<div class="splitcontent-left">')
        for cat in Category.select():
            category_formdefs = [x for x in formdefs if x.category_id == cat.id]
            r += self.category_global_stats(cat.name, category_formdefs, counts)

        category_formdefs = [x for x in formdefs if x.category_id is None]
        r += self.category_global_stats(_('Misc'), category_formdefs, counts)

        if do_graphs:
            r += htmltext('</div>')
            r += htmltext('<div class="splitcontent-right">')
            period_start = parsed_values.get('period_start')
            period_end = parsed_values.get('period_end')
            r += do_graphs_section(period_start, period_end)
            r += htmltext('</div>')

        return r.getvalue()

    def category_global_stats(self, title, category_formdefs, counts):
        """Return the statistics block for a category.

        title -- heading of the block
        category_formdefs -- formdefs belonging to the category
        counts -- dictionary mapping formdef ids to item counts

        Returns None when the category has no formdef or a total of zero.
        """
        if not category_formdefs:
            return
        category_formdefs_ids = [x.id for x in category_formdefs]
        category_total = sum(
            y for x, y in counts.items() if x in category_formdefs_ids)
        if category_total == 0:
            return

        r = TemplateIO(html=True)
        r += htmltext('<div class="bo-block">')
        r += htmltext('<h3>%s</h3>') % title
        r += htmltext('<p>%s %s</p>') % (_('Count:'), category_total)
        r += htmltext('<ul>')
        for category_formdef in category_formdefs:
            formdef_count = counts.get(category_formdef.id)
            if not formdef_count:
                continue
            r += htmltext('<li>%s %s</li>') % (
                _('%s:') % category_formdef.name,
                formdef_count)
        r += htmltext('</ul>')
        r += htmltext('</div>')
        return r.getvalue()

    def display_forms(self, forms_list):
        """Return a list of forms grouped by category.

        forms_list -- list of (formdef, number of pending items, total
        number of items) tuples.  Forms without a category are listed last,
        under "Misc".
        """
        r = TemplateIO(html=True)
        r += htmltext('<ul class="biglist">')
        cats = Category.select(order_by='name')
        for c in cats + [None]:
            if c is None:
                l2 = [x for x in forms_list if not x[0].category_id]
                cat_name = _('Misc')
            else:
                l2 = [x for x in forms_list if x[0].category_id == c.id]
                cat_name = c.name
            if not l2:
                continue
            r += htmltext('<li><h3>%s</h3></li>') % cat_name
            for formdef, no_pending, no_total in l2:
                r += htmltext('<li><strong><a href="%s/">%s</a></strong>') % (
                    formdef.url_name, formdef.name)
                klass = ''
                if no_total:
                    klass = 'badge'
                r += htmltext('<p class="details %s">' % klass)
                if no_pending:
                    r += _('%(pending)s open on %(total)s') % {'pending': no_pending,
                                                                'total': no_total}
                else:
                    r += _('%(total)s items') % {'total': no_total}
                r += htmltext('</p>')
                r += htmltext('</li>')
        r += htmltext('</ul>')
        return r.getvalue()

    def _q_lookup(self, component):
        """Hand any other path component to the matching form page."""
        return FormPage(component)
class FormPage(Directory):
_q_exports = ['', 'csv', 'stats', 'xls', 'ods', 'json', 'pending', 'export']
def __init__(self, component):
    """Load the form definition named by the URL and check access.

    Raises errors.TraversalError when no form matches the component,
    errors.AccessUnauthorizedError when there is no user, and
    AccessForbiddenError / AccessUnauthorizedError when the user is neither
    an admin nor concerned by the form.
    """
    try:
        self.formdef = FormDef.get_by_urlname(component)
    except KeyError:
        raise errors.TraversalError()

    session = get_session()
    user = get_request().user
    if user is None and get_publisher().user_class.count() == 0:
        # no user exists at all: use a transient admin user
        user = get_publisher().user_class()
        user.is_admin = True

    if not user:
        raise errors.AccessUnauthorizedError()

    has_access = user.is_admin or self.formdef.is_of_concern_for_user(user)
    if not has_access:
        if session.user:
            raise errors.AccessForbiddenError()
        raise errors.AccessUnauthorizedError()

    get_response().breadcrumb.append((component + '/', self.formdef.name))
def get_formdata_sidebar(self, qs=''):
    """Return the sidebar with export and statistics links.

    qs -- query string (including the leading "?") appended to the export
    links.  The Excel link is only present when xlwt is available.
    """
    export_links = [
        (htmltext(' <li><a data-base-href="ods" href="ods%s">%s</a></li>'),
         _('Open Document Format Export')),
        (htmltext(' <li><a data-base-href="csv" href="csv%s">%s</a></li>'),
         _('CSV Export')),
    ]
    if xlwt:
        export_links.append(
            (htmltext('<li><a data-base-href="xls" href="xls%s">%s</a></li>'),
             _('Excel Export')))

    sidebar = TemplateIO(html=True)
    sidebar += htmltext('<ul id="sidebar-actions">')
    for link_template, link_label in export_links:
        sidebar += link_template % (qs, link_label)
    sidebar += htmltext(' <li><a href="stats">%s</a></li>') % _('Statistics')
    sidebar += htmltext('</ul>')
    return sidebar.getvalue()
def get_filter_sidebar(self, selected_filter=None, mode='listing'):
    """Return the "Filters" part of the sidebar.

    selected_filter -- id of the status filter to preselect in the status
    <select>
    mode -- 'listing' or 'stats'; only used to pick the filters enabled by
    default when the request has no form data

    The output has two parts: the widgets of the enabled filters, then a
    hidden list of checkboxes (one per possible filter) used as the content
    of the filter settings dialog.

    Note that this sets the "enabled" attribute on every field object
    returned by get_formdef_fields(), and sets "required" to False on item
    fields that are rendered.
    """
    r = TemplateIO(html=True)

    waitpoint_status = self.formdef.workflow.get_waitpoint_status()
    period_fake_fields = [
        FakeField('start', 'period-date', _('Start')),
        FakeField('end', 'period-date', _('End')),
    ]
    filter_fields = []
    for field in period_fake_fields + self.get_formdef_fields():
        field.enabled = False
        # only item, period and status fields can be used as filters
        if field.type not in ('item', 'period-date', 'status'):
            continue
        # a status filter is pointless without any waitpoint status
        if field.type == 'status' and not waitpoint_status:
            continue
        filter_fields.append(field)

        if get_request().form:
            # explicit request: a filter is enabled when filter-<id> is
            # among the submitted keys
            field.enabled = 'filter-%s' % field.id in get_request().form
        else:
            if mode == 'listing':
                # enable status filter by default
                field.enabled = (field.id in ('status',))
            elif mode == 'stats':
                # enable period filters by default
                field.enabled = (field.id in ('start', 'end'))

    r += htmltext('<h3><span>%s</span> <span class="change">(<a id="filter-settings">%s</a>)</span></h3>' % (
        _('Filters'), _('change')))

    for filter_field in filter_fields:
        if not filter_field.enabled:
            continue

        # current value of the filter, as submitted in the request
        filter_field_key = 'filter-%s-value' % filter_field.id
        filter_field_value = get_request().form.get(filter_field_key)

        if filter_field.type == 'status':
            r += htmltext('<div class="widget">')
            r += htmltext('<div class="title">%s</div>') % _('Status to display')
            r += htmltext('<div class="content">')
            r += htmltext('<select name="filter">')
            # (id, label, colour) tuples: three generic filters, then one
            # entry per waitpoint status
            filters = [('all', _('All'), None),
                    ('pending', _('Pending'), None),
                    ('done', _('Done'), None)]
            for status in waitpoint_status:
                filters.append((status.id, status.name, status.colour))
            for filter_id, filter_label, filter_colour in filters:
                if filter_id == selected_filter:
                    selected = ' selected="selected"'
                else:
                    selected = ''
                style = ''
                # coloured statuses (other than white) get a matching
                # background and a foreground from
                # misc.get_foreground_colour()
                if filter_colour and filter_colour != 'FFFFFF':
                    fg_colour = misc.get_foreground_colour(filter_colour)
                    style = 'style="background: #%s; color: %s;"' % (
                        filter_colour, fg_colour)
                # NOTE(review): the opening tag is formatted before being
                # wrapped in htmltext, so filter_id is inserted as is.
                r += htmltext('<option value="%s"%s %s>' % (filter_id, selected, style))
                r += htmltext('%s</option>') % filter_label
            r += htmltext('</select>')
            r += htmltext('</div>')
            r += htmltext('</div>')

        elif filter_field.type == 'period-date':
            r += DateWidget(filter_field_key, title=filter_field.label,
                    value=filter_field_value, render_br=False).render()

        elif filter_field.type == 'item':
            filter_field.required = False
            options = filter_field.get_options()
            if options:
                r += SingleSelectWidget(filter_field_key, title=filter_field.label,
                        options=options, value=filter_field_value,
                        render_br=False).render()
            else:
                # There may be no options because the field is using
                # a jsonp data source, or a json source using a
                # parametrized URL depending on unavailable variables.
                #
                # In that case fall back on a string widget.
                r += StringWidget(filter_field_key, title=filter_field.label,
                        value=filter_field_value, render_br=False).render()

    # field filter dialog content
    r += htmltext('<div style="display: none;">')
    r += htmltext('<ul id="field-filter">')
    for field in filter_fields:
        r += htmltext('<li><input type="checkbox" name="filter-%s"') % field.id
        if field.enabled:
            r += htmltext(' checked="checked"')
        r += htmltext(' id="fields-filter-%s"') % field.id
        r += htmltext('/>')
        r += htmltext('<label for="fields-filter-%s">%s</label>') % (
            field.id, misc.ellipsize(field.label, 70))
        r += htmltext('</li>')
    r += htmltext('</ul>')
    r += htmltext('</div>')

    return r.getvalue()
def get_fields_sidebar(self, selected_filter, fields, offset=None,
        limit=None, order_by=None):
    """Return the listing settings form displayed in the sidebar.

    selected_filter -- status filter passed on to get_filter_sidebar()
    fields -- fields currently displayed as columns (checked in the
    columns dialog)
    offset, limit, order_by -- current listing parameters, kept as hidden
    inputs; order_by and the search box are only present when PostgreSQL
    is used
    """
    response = get_response()
    response.add_javascript(['jquery.js', 'jquery-ui.js', 'wcs.listing.js'])
    response.add_css_include('../js/smoothness/jquery-ui-1.10.0.custom.min.css')

    sidebar = TemplateIO(html=True)
    sidebar += htmltext('<form id="listing-settings" action=".">')
    if offset or limit:
        sidebar += htmltext('<input type="hidden" name="offset" value="%s"/>') % (
            offset or 0)
    if limit:
        sidebar += htmltext('<input type="hidden" name="limit" value="%s"/>') % limit

    if get_publisher().is_using_postgresql():
        if order_by is None:
            order_by = ''
        sidebar += htmltext('<input type="hidden" name="order_by" value="%s"/>') % order_by

        sidebar += htmltext('<h3>%s</h3>') % _('Search')
        q = get_request().form.get('q')
        if q:
            # normalise to unicode, then back to a site charset byte string
            if type(q) is not unicode:
                q = unicode(q, get_publisher().site_charset)
            sidebar += htmltext('<input name="q" value="%s">') % q.encode(
                get_publisher().site_charset)
        else:
            sidebar += htmltext('<input name="q">')
        sidebar += htmltext('<input type="submit" value="%s"/>') % _('Search')

    sidebar += self.get_filter_sidebar(selected_filter=selected_filter)

    sidebar += htmltext('<button class="refresh">%s</button>') % _('Refresh')
    sidebar += htmltext('<button id="columns-settings">%s</button>') % _('Columns Settings')

    # column settings dialog content
    displayed_ids = [x.id for x in fields]
    sidebar += htmltext('<div style="display: none;">')
    sidebar += htmltext('<ul id="columns-filter">')
    for column in self.get_formdef_fields():
        if not hasattr(column, str('get_view_value')):
            continue
        sidebar += htmltext('<li><input type="checkbox" name="%s"') % column.id
        if column.id in displayed_ids:
            sidebar += htmltext(' checked="checked"')
        sidebar += htmltext(' id="fields-column-%s"') % column.id
        sidebar += htmltext('/>')
        sidebar += htmltext('<label for="fields-column-%s">%s</label>') % (
            column.id, misc.ellipsize(column.label, 70))
        sidebar += htmltext('</li>')
    sidebar += htmltext('</ul>')
    sidebar += htmltext('</div>')
    sidebar += htmltext('</form>')
    return sidebar.getvalue()
def get_formdef_fields(self):
    """Return the form fields surrounded by fake fields.

    The id, time and user label fake fields come first, then the fields of
    the form definition, then the status and anonymised fake fields.
    """
    all_fields = [
        FakeField('id', 'id', _('Identifier')),
        FakeField('time', 'time', _('Time')),
        FakeField('user-label', 'user-label', _('User Label')),
    ]
    all_fields.extend(self.formdef.fields)
    all_fields += [
        FakeField('status', 'status', _('Status')),
        FakeField('anonymised', 'anonymised', _('Anonymised')),
    ]
    return all_fields
def get_fields_from_query(self):
    """Return the fields to display, according to the request.

    Every key of the request form is taken as a candidate field id; when
    the request has none, the defaults are id, time, user label, the form
    fields flagged in_listing (and having get_view_value) and the status.
    The result follows the order of get_formdef_fields().
    """
    wanted_ids = list(get_request().form.keys())
    if not wanted_ids:
        wanted_ids = ['id', 'time', 'user-label']
        wanted_ids.extend(
            field.id for field in self.formdef.fields
            if hasattr(field, str('get_view_value')) and field.in_listing)
        wanted_ids.append('status')

    return [field for field in self.get_formdef_fields()
            if field.id in wanted_ids]
def get_filter_from_query(self, default='pending'):
    """Return the status filter to apply.

    The "filter" request parameter wins; otherwise this is `default` when
    the workflow has possible statuses, and 'all' when it has none.
    """
    form = get_request().form
    if 'filter' in form:
        return form['filter']
    return default if self.formdef.workflow.possible_status else 'all'
def get_criterias_from_query(self):
    """Build the list of storage criteria matching the request filters.

    Only the period fake fields ("start" and "end") and item fields are
    considered.  A filter value is looked up in filter-<varname> (for
    fields with a varname) or, when filter-<id> is set, in
    filter-<id>-value; the latter takes precedence.

    Period values are parsed with misc.date_format(); a value that cannot
    be parsed is ignored instead of making the whole page fail.

    Returns a list of GreaterOrEqual / LessOrEqual / Equal criteria.
    """
    form = get_request().form
    period_fake_fields = [
        FakeField('start', 'period-date', _('Start')),
        FakeField('end', 'period-date', _('End')),
    ]
    criterias = []
    format_string = misc.date_format()
    for filter_field in period_fake_fields + self.get_formdef_fields():
        if filter_field.type not in ('item', 'period-date'):
            continue

        filter_field_key = None
        if filter_field.varname:
            # if this is a field with a varname and filter-%(varname)s is
            # present in the query string, enable this filter.
            if form.get('filter-%s' % filter_field.varname):
                filter_field_key = 'filter-%s' % filter_field.varname

        if form.get('filter-%s' % filter_field.id):
            # if there's a filter-%(id)s, it is used to enable the actual
            # filter, and the value will be found in filter-%s-value.
            filter_field_key = 'filter-%s-value' % filter_field.id

        if not filter_field_key:
            # if there's no known filter key, skip.
            continue

        filter_field_value = form.get(filter_field_key)
        if not filter_field_value:
            continue

        if filter_field.id in ('start', 'end'):
            try:
                period_bound = time.strptime(filter_field_value, format_string)
            except (TypeError, ValueError):
                # the value comes straight from the query string; a
                # malformed date must not turn into a server error.
                continue
            if filter_field.id == 'start':
                criterias.append(GreaterOrEqual('receipt_time', period_bound))
            else:
                criterias.append(LessOrEqual('receipt_time', period_bound))
        elif filter_field.type == 'item' and filter_field_value not in (None, 'None'):
            criterias.append(Equal('f%s' % filter_field.id, filter_field_value))

    return criterias
def _q_index(self):
    """Render the listing page of the form.

    Listing parameters (limit, offset, order_by, q) are read from the
    request.  When called with ajax=true only the table is returned,
    without the page decoration.
    """
    get_logger().info('backoffice - form %s - listing' % self.formdef.name)

    fields = self.get_fields_from_query()
    selected_filter = self.get_filter_from_query()
    criterias = self.get_criterias_from_query()

    request = get_request()
    form = request.form

    # only enable pagination in SQL mode, as we do not have sorting in
    # the other case.
    default_limit = 20 if get_publisher().is_using_postgresql() else 0
    limit = form.get('limit', default_limit)
    offset = form.get('offset', 0)
    order_by = form.get('order_by', None)
    query = form.get('q')

    qs = ''
    if request.get_query():
        qs = '?' + request.get_query()

    table = FormDefUI(self.formdef).listing(
        fields=fields,
        selected_filter=selected_filter,
        include_form=True,
        limit=int(limit),
        offset=int(offset),
        query=query,
        order_by=order_by,
        criterias=criterias)

    if form.get('ajax') == 'true':
        get_response().filter = None
        return table

    html_top('management', '%s - %s' % (_('Listing'), self.formdef.name))
    page = TemplateIO(html=True)
    page += htmltext('<h2>%s - %s</h2>') % (self.formdef.name, _('Listing'))
    page += table

    formdata_sidebar = self.get_formdata_sidebar(qs)
    fields_sidebar = self.get_fields_sidebar(
        selected_filter, fields, limit=limit, offset=offset,
        order_by=order_by)
    get_response().filter['sidebar'] = formdata_sidebar + fields_sidebar

    return page.getvalue()
def pending(self):
    """Render the page listing pending forms, grouped by status.

    Only non endpoint statuses are considered; statuses are split in two
    columns and, for each of them, forms are listed from the most recent
    receipt time to the oldest.
    """
    get_logger().info('backoffice - form %s - pending' % self.formdef.name)
    get_response().breadcrumb.append(('pending', _('Pending Forms')))
    html_top('management', '%s - %s' % (_('Pending Forms'), self.formdef.name))
    r = TemplateIO(html=True)
    r += htmltext('<h2>%s - %s</h2>') % (self.formdef.name, _('Pending Forms'))

    not_endpoint_status = [
        ('wf-%s' % x.id, x.name)
        for x in self.formdef.workflow.get_not_endpoint_status()]
    # index of the first status going in the right column; explicit floor
    # division so this stays an integer whatever the division semantics.
    column2 = len(not_endpoint_status) // 2

    r += htmltext('<div class="splitcontent-left">')
    for i, (status_id, status_label) in enumerate(not_endpoint_status):
        if i > 0 and i == column2:
            r += htmltext('</div>')
            r += htmltext('<div class="splitcontent-right">')

        status_forms = self.formdef.data_class().get_with_indexed_value(
            str('status'), status_id)
        if not status_forms:
            continue

        # ascending sort followed by reverse(), to keep the exact ordering
        # of forms sharing the same receipt time.
        status_forms.sort(key=lambda x: getattr(x, str('receipt_time')))
        status_forms.reverse()

        r += htmltext('<div class="bo-block">')
        r += htmltext('<h3>%s</h3>') % (_('Forms with status "%s"') % status_label)
        r += htmltext('<ul>')
        for f in status_forms:
            try:
                u = get_publisher().user_class.get(f.user_id)
                userlabel = u.display_name
            except KeyError:
                userlabel = _('unknown user')
            r += htmltext('<li><a href="%s/">%s, %s</a></li>') % (
                f.id,
                misc.localstrftime(f.receipt_time),
                userlabel)
        r += htmltext('</ul>')
        r += htmltext('</div>')
    r += htmltext('</div>')

    r += htmltext('<p class="clear"><a href=".">%s</a></p>') % _('Back')
    return r.getvalue()
def csv_tuple_heading(self, fields):
    """Return the flat list of column headings for the given fields.

    Each field contributes the items returned by its get_csv_heading()
    method, in order.
    """
    return [heading
            for field in fields
            for heading in field.get_csv_heading()]
def csv_tuple(self, fields, data, hint=None):
    """Return the flat list of cell values of a form for the given fields.

    fields -- fields (or fake fields) to export, one or more cells each
    data -- the form data object to export
    hint -- passed as is to the get_csv_value() method of every field

    The id, time, user label and status fake fields are computed from the
    form data object; other values are taken from data.data, with missing
    or false values replaced by an empty string.
    """
    elements = []
    for field in fields:
        if field.type == 'id':
            element = str(data.id)
        elif field.type == 'time':
            element = misc.localstrftime(data.receipt_time)
        elif field.type == 'user-label':
            try:
                element = get_publisher().user_class.get(data.user_id).display_name
            except Exception:
                # best effort: any failure to get the user gives a
                # placeholder, but KeyboardInterrupt and SystemExit are no
                # longer swallowed as they were with a bare except.
                element = '-'
        elif field.type == 'status':
            element = data.get_status_label()
        else:
            element = data.data.get(field.id, '') or ''
        elements.extend(field.get_csv_value(element, hint=hint))
    return elements
def csv(self):
    """Export the listing as CSV.

    When the form has more than 100 stored items the export is run as an
    after job and the user is redirected to the export progress page;
    otherwise the content is returned directly, as text/plain.
    """
    fields = self.get_fields_from_query()
    selected_filter = self.get_filter_from_query()
    user = get_request().user
    query = get_request().form.get('q')

    class Exporter(object):
        """Callable unit of work, usable directly or as an after job."""

        def __init__(self, formpage, formdef, fields, selected_filter):
            self.formpage = formpage
            self.formdef = formdef
            self.fields = fields
            self.selected_filter = selected_filter

        def export(self, job=None):
            self.output = cStringIO.StringIO()
            writer = csv.writer(self.output)

            writer.writerow(self.formpage.csv_tuple_heading(self.fields))

            items, total_count = FormDefUI(self.formdef).get_listing_items(
                self.selected_filter, user=user, query=query)

            for formdata in items:
                writer.writerow(self.formpage.csv_tuple(self.fields, formdata))

            if job:
                # keep the result on the job so it can be downloaded later
                job.file_content = self.output.getvalue()
                job.content_type = 'text/csv'
                job.store()

    get_logger().info('backoffice - form %s - listing csv' % self.formdef.name)

    count = self.formdef.data_class().count()
    exporter = Exporter(self, self.formdef, fields, selected_filter)

    if count > 100:  # Arbitrary threshold
        job = get_response().add_after_job(
            str(N_('Exporting forms in CSV')),
            exporter.export)
        job.file_name = '%s.csv' % self.formdef.url_name
        job.store()
        return redirect('export?job=%s' % job.id)

    exporter.export()
    get_response().set_content_type('text/plain')
    return exporter.output.getvalue()
def export(self):
    """Render the export progress page, or serve a finished export.

    With a "download" parameter the work is delegated to
    export_download(); otherwise the after job given by the "job"
    parameter is displayed.  An unknown job redirects to the listing.
    """
    form = get_request().form
    if form.get('download'):
        return self.export_download()

    try:
        job = AfterJob.get(form.get('job'))
    except KeyError:
        return redirect('.')

    html_top('management', title=_('Exporting'))
    page = TemplateIO(html=True)
    page += get_session().display_message()
    get_response().add_javascript(['jquery.js', 'afterjob.js'])

    # job label and status
    page += htmltext('<dl class="job-status">')
    page += htmltext('<dt>')
    page += _(job.label)
    page += htmltext('</dt>')
    page += htmltext('<dd>')
    page += htmltext('<span class="afterjob" id="%s">') % job.id
    page += _(job.status)
    page += htmltext('</span>')
    page += htmltext('</dd>')
    page += htmltext('</dl>')

    # download link
    page += htmltext('<div class="done">')
    page += htmltext('<a download="%s" href="export?download=%s">%s</a>') % (
        job.file_name, job.id, _('Download Export'))
    page += htmltext('</div>')

    return page.getvalue()
def export_download(self):
    """Serve the file produced by the after job named in "download".

    An unknown job redirects to the listing; a job that is not completed
    raises errors.TraversalError.
    """
    job_id = get_request().form.get('download')
    try:
        job = AfterJob.get(job_id)
    except KeyError:
        return redirect('.')

    if job.status != 'completed':
        raise errors.TraversalError()

    response = get_response()
    response.set_content_type(job.content_type)
    response.set_header(
        'content-disposition', 'attachment; filename=%s' % job.file_name)
    return job.file_content
def xls(self):
    """Export the listing as an Excel (xls) file.

    Raises errors.TraversalError when xlwt is not available.  When the
    form has more than 100 stored items the export is run as an after job
    and the user is redirected to the export progress page; otherwise the
    file is returned directly.
    """
    if xlwt is None:
        raise errors.TraversalError()

    fields = self.get_fields_from_query()
    selected_filter = self.get_filter_from_query()
    user = get_request().user
    query = get_request().form.get('q')

    class Exporter(object):
        """Callable unit of work, usable directly or as an after job."""

        def __init__(self, formpage, formdef, fields, selected_filter):
            self.formpage = formpage
            self.formdef = formdef
            self.fields = fields
            self.selected_filter = selected_filter

        def export(self, job=None):
            w = xlwt.Workbook(encoding=get_publisher().site_charset)
            ws = w.add_sheet('1')

            for i, f in enumerate(self.formpage.csv_tuple_heading(self.fields)):
                ws.write(0, i, f)

            items, total_count = FormDefUI(self.formdef).get_listing_items(
                self.selected_filter, user=user, query=query)

            for i, filled in enumerate(items):
                # use the fields given to the exporter, as the heading row
                # and the CSV exporter do, not the enclosing variable.
                for j, elem in enumerate(self.formpage.csv_tuple(self.fields, filled)):
                    if elem and len(elem) > 32767:
                        # xls cells have a limit of 32767 characters, cut
                        # it down.
                        elem = elem[:32760] + ' [...]'
                    ws.write(i+1, j, elem)

            self.output = cStringIO.StringIO()
            w.save(self.output)

            if job:
                # keep the result on the job so it can be downloaded later
                job.file_content = self.output.getvalue()
                job.content_type = 'application/vnd.ms-excel'
                job.store()

    get_logger().info('backoffice - form %s - as excel' % self.formdef.name)

    count = self.formdef.data_class().count()
    exporter = Exporter(self, self.formdef, fields, selected_filter)
    if count > 100:  # Arbitrary threshold
        job = get_response().add_after_job(
            str(N_('Exporting forms in Excel format')),
            exporter.export)
        job.file_name = '%s.xls' % self.formdef.url_name
        job.store()
        return redirect('export?job=%s' % job.id)
    else:
        exporter.export()

        response = get_response()
        response.set_content_type('application/vnd.ms-excel')
        response.set_header('content-disposition', 'attachment; filename=%s.xls' % self.formdef.url_name)
        return exporter.output.getvalue()
def ods(self):
    """Export the listing as an Open Document spreadsheet.

    When the form has more than 100 stored items the export is run as an
    after job and the user is redirected to the export progress page;
    otherwise the file is returned directly.
    """
    fields = self.get_fields_from_query()
    selected_filter = self.get_filter_from_query()
    user = get_request().user
    query = get_request().form.get('q')

    class Exporter(object):
        """Callable unit of work, usable directly or as an after job."""

        def __init__(self, formpage, formdef, fields, selected_filter):
            self.formpage = formpage
            self.formdef = formdef
            self.fields = fields
            self.selected_filter = selected_filter

        def export(self, job=None):
            w = ods.Workbook(encoding=get_publisher().site_charset)
            ws = w.add_sheet('1')

            for i, f in enumerate(self.formpage.csv_tuple_heading(self.fields)):
                ws.write(0, i, f)

            items, total_count = FormDefUI(self.formdef).get_listing_items(
                self.selected_filter, user=user, query=query)

            for i, filled in enumerate(items):
                # use the fields given to the exporter, as the heading row
                # and the CSV exporter do, not the enclosing variable.
                for j, elem in enumerate(self.formpage.csv_tuple(
                        self.fields, filled, hint='ods')):
                    if type(elem) is str and '[download]' in elem:
                        # the [download] placeholder is replaced by the
                        # backoffice URL of the form and written as a link
                        elem = elem.replace('[download]', filled.get_url(backoffice=True))
                        ws.write(i+1, j, elem, hint='uri')
                    else:
                        ws.write(i+1, j, elem)

            self.output = cStringIO.StringIO()
            w.save(self.output)

            if job:
                # keep the result on the job so it can be downloaded later
                job.file_content = self.output.getvalue()
                job.content_type = 'application/vnd.oasis.opendocument.spreadsheet'
                job.store()

    get_logger().info('backoffice - form %s - as ods' % self.formdef.name)

    count = self.formdef.data_class().count()
    exporter = Exporter(self, self.formdef, fields, selected_filter)
    if count > 100:  # Arbitrary threshold
        job = get_response().add_after_job(
            str(N_('Exporting forms in Open Document format')),
            exporter.export)
        job.file_name = '%s.ods' % self.formdef.url_name
        job.store()
        return redirect('export?job=%s' % job.id)
    else:
        exporter.export()

        response = get_response()
        response.set_content_type('application/vnd.oasis.opendocument.spreadsheet')
        response.set_header('content-disposition', 'attachment; filename=%s.ods' % self.formdef.url_name)
        return exporter.output.getvalue()
def json(self):
    """Return the listing as a JSON document.

    The user comes from the API query string when it provides one, and
    from the request otherwise.  With full=on every item is exported with
    export_to_json() (without files); otherwise only the id, URL, receipt
    time and last update time are given.
    """
    get_response().set_content_type('application/json')
    from wcs.api import get_user_from_api_query_string
    user = get_user_from_api_query_string() or get_request().user

    selected_filter = self.get_filter_from_query(default='all')
    criterias = self.get_criterias_from_query()
    form = get_request().form
    order_by = form.get('order_by', None)
    query = form.get('q')

    items, total_count = FormDefUI(self.formdef).get_listing_items(
        selected_filter,
        user=user,
        query=query,
        criterias=criterias,
        order_by=order_by)

    if form.get('full') == 'on':
        output = [json.loads(formdata.export_to_json(include_files=False))
                  for formdata in items]
    else:
        output = [
            {
                'id': formdata.id,
                'url': formdata.get_url(),
                'receipt_time': formdata.receipt_time,
                'last_update_time': formdata.last_update_time,
            }
            for formdata in items]

    return json.dumps(
        output,
        cls=misc.JSONEncoder,
        encoding=get_publisher().site_charset)
def get_stats_sidebar(self, selected_filter):
    """Return the settings form displayed in the statistics sidebar.

    It contains the filters (in "stats" mode), a refresh button and, when
    misc.can_decorate_as_pdf() is true, a PDF download button.
    """
    response = get_response()
    response.add_javascript(['jquery.js', 'jquery-ui.js', 'wcs.listing.js'])
    response.add_css_include('../js/smoothness/jquery-ui-1.10.0.custom.min.css')

    sidebar = TemplateIO(html=True)
    sidebar += htmltext('<form id="listing-settings" action="stats">')
    sidebar += self.get_filter_sidebar(selected_filter=selected_filter, mode='stats')
    sidebar += htmltext('<button class="refresh">%s</button>') % _('Refresh')
    if misc.can_decorate_as_pdf():
        sidebar += htmltext('<button class="pdf">%s</button>') % _('Download PDF')
    sidebar += htmltext('</form>')
    return sidebar.getvalue()
def stats(self):
    """Render the statistics page of the form.

    The forms taken into account are selected with the criteria built
    from the request (get_criterias_from_query()) and the status filter.
    The page contains the number of records (total and per status), the
    per-field statistics, the resolution times and, when PostgreSQL is
    used, a graphs column.

    With ajax=true only the statistics block is returned, without page
    decoration; with a "pdf" parameter the page is returned as a PDF
    document.
    """
    get_logger().info('backoffice - form %s - stats' % self.formdef.name)
    html_top('management', '%s - %s' % (_('Form'), self.formdef.name))
    r = TemplateIO(html=True)
    get_response().breadcrumb.append( ('stats', _('Statistics')) )

    selected_filter = self.get_filter_from_query(default='all')
    criterias = self.get_criterias_from_query()
    get_response().filter['sidebar'] = self.get_formdata_sidebar() + \
            self.get_stats_sidebar(selected_filter)
    do_graphs = get_publisher().is_using_postgresql()

    if selected_filter and selected_filter != 'all':
        # turn the status filter into a list of status identifiers
        if selected_filter == 'pending':
            applied_filters = ['wf-%s' % x.id for x in
                    self.formdef.workflow.get_not_endpoint_status()]
        elif selected_filter == 'done':
            applied_filters = ['wf-%s' % x.id for x in
                    self.formdef.workflow.get_endpoint_status()]
        else:
            applied_filters = ['wf-%s' % selected_filter]
        criterias.append(Or([Equal('status', x) for x in applied_filters]))

    values = self.formdef.data_class().select(criterias)
    if get_publisher().is_using_postgresql():
        # load all evolutions in a single batch, to avoid as many query as
        # there are formdata when computing resolution times statistics.
        self.formdef.data_class().load_all_evolutions(values)

    r += htmltext('<div id="statistics">')
    if do_graphs:
        r += htmltext('<div class="splitcontent-left">')

    no_forms = len(values)
    r += htmltext('<div class="bo-block">')
    r += htmltext('<p>%s %d</p>') % (_('Total number of records:'), no_forms)

    if self.formdef.workflow:
        # number of selected forms in each workflow status
        r += htmltext('<ul>')
        for status in self.formdef.workflow.possible_status:
            r += htmltext('<li>%s: %d</li>') % (status.name,
                    len([x for x in values if x.status == 'wf-%s' % status.id]))
        r += htmltext('</ul>')
    r += htmltext('</div>')

    # fields used as equality filters are left out of the field statistics;
    # the first character of the criteria attribute is stripped to get the
    # field id (item criteria are built as 'f%s' % field id).
    excluded_fields = []
    for criteria in criterias:
        if not isinstance(criteria, Equal):
            continue
        excluded_fields.append(criteria.attribute[1:])

    stats_for_fields = self.stats_fields(values,
            excluded_fields=excluded_fields)
    if stats_for_fields:
        r += htmltext('<div class="bo-block">')
        r += stats_for_fields
        r += htmltext('</div>')

    stats_times = self.stats_resolution_time(values)
    if stats_times:
        r += htmltext('<div class="bo-block">')
        r += stats_times
        r += htmltext('</div>')

    if do_graphs:
        r += htmltext('</div>')
        r += htmltext('<div class="splitcontent-right">')
        # the formdef criteria is only added here, for the graphs section,
        # after the form data have been selected.
        criterias.append(Equal('formdef_id', int(self.formdef.id)))
        r += do_graphs_section(criterias=criterias)
        r += htmltext('</div>')

    r += htmltext('</div>') # id="statistics"

    if get_request().form.get('ajax') == 'true':
        # bare content, without page decoration
        get_response().filter = None
        return r.getvalue()

    page = TemplateIO(html=True)
    page += htmltext('<h2>%s - %s</h2>') % (self.formdef.name, _('Statistics'))
    page += htmltext(r)
    page += htmltext('<a class="back" href=".">%s</a>') % _('Back')

    if 'pdf' in get_request().form:
        pdf_content = misc.decorate_as_pdf(page.getvalue())
        response = get_response()
        response.set_content_type('application/pdf')
        return pdf_content

    return page.getvalue()
def stats_fields(self, values, excluded_fields=None):
    """Build the HTML fragment holding the statistics of every form field.

    Fields of self.formdef are visited in order.  Fields whose id is listed
    in excluded_fields are ignored.  'page' and 'title' fields produce no
    statistics themselves: their labels are remembered and only emitted as
    <h3> headings once a later field actually yields statistics.  Each
    emitted page heading opens a <div class="page"> wrapper, closed when the
    next page starts or at the end.

    values is passed untouched to each field's stats() callable.
    Returns the accumulated markup (r.getvalue() of a TemplateIO).
    """
    output = TemplateIO(html=True)
    page_div_open = False
    pending_page = None
    pending_title = None

    for field in self.formdef.fields:
        if excluded_fields and field.id in excluded_fields:
            continue

        if field.type == 'page':
            # a new page discards a title that had nothing to show under it
            pending_page, pending_title = field.label, None
            continue

        if field.type == 'title':
            pending_title = field.label
            continue

        # fields with a falsy "stats" attribute have nothing to report
        field_stats = field.stats(values) if field.stats else None
        if not field_stats:
            continue

        if pending_page:
            if page_div_open:
                output += htmltext('</div>')
            output += htmltext('<div class="page">')
            output += htmltext('<h3>%s</h3>') % pending_page
            page_div_open = True
            pending_page = None

        if pending_title:
            output += htmltext('<h3>%s</h3>') % pending_title
            pending_title = None

        output += field_stats

    if page_div_open:
        output += htmltext('</div>')
    return output.getvalue()
def stats_resolution_time(self, values):
    """Build the "Resolution time" HTML fragment for the given form data.

    For each status of the form workflow, the resolution time of a form is
    the number of seconds between its receipt_time and the time of its last
    evolution entry; only forms currently in that status and having at least
    one evolution entry are considered.  Minimum, maximum, range, mean,
    median and standard deviation are listed per status.

    values is the list of form data to aggregate.

    Returns the markup, or None when the workflow has fewer than two
    statuses (callers test the result for truthiness).
    """
    from math import sqrt

    workflow = self.formdef.workflow
    possible_status = [('wf-%s' % x.id, x.id) for x in workflow.possible_status]
    if len(possible_status) < 2:
        return

    r = TemplateIO(html=True)
    r += htmltext('<h2>%s</h2>') % _('Resolution time')

    for status, status_id in possible_status:
        res_time_forms = sorted([
            time.mktime(x.evolution[-1].time) - time.mktime(x.receipt_time)
            for x in values if x.status == status and x.evolution])
        if not res_time_forms:
            continue

        len_times = len(res_time_forms)
        # the list is sorted, so both extremes sit at its ends
        shortest = res_time_forms[0]
        longest = res_time_forms[-1]
        mean = sum(res_time_forms) / len_times

        middle = len_times // 2
        if len_times % 2:
            median = res_time_forms[middle]
        else:
            median = (res_time_forms[middle - 1] + res_time_forms[middle]) / 2

        # Sample variance (n - 1 denominator); it is not displayed as it is
        # expressed in square seconds, which is not easy to grasp.  A single
        # measure has no dispersion, and would divide by zero.
        if len_times > 1:
            squared_deviations = sum([(t - mean) ** 2.0 for t in res_time_forms])
            variance = squared_deviations / (len_times - 1)
        else:
            variance = 0
        std_dev = sqrt(variance)

        r += htmltext('<h3>%s</h3>') % (
            _('To Status "%s"') % workflow.get_status(status_id).name)
        r += htmltext('<ul>')
        r += htmltext(' <li>%s %s</li>') % (_('Minimum Time:'), format_time(shortest))
        r += htmltext(' <li>%s %s</li>') % (_('Maximum Time:'), format_time(longest))
        r += htmltext(' <li>%s %s</li>') % (_('Range:'), format_time(longest - shortest))
        r += htmltext(' <li>%s %s</li>') % (_('Mean:'), format_time(mean))
        r += htmltext(' <li>%s %s</li>') % (_('Median:'), format_time(median))
        r += htmltext(' <li>%s %s</li>') % (_('Standard Deviation:'), format_time(std_dev))
        r += htmltext('</ul>')

    return r.getvalue()
def _q_lookup(self, component):
    """Resolve a URL component into the status page of a form data.

    component is handed to the get() method of the formdef data class.
    A KeyError raised by that lookup is turned into errors.TraversalError;
    otherwise a FormBackOfficeStatusPage for the object found is returned.
    """
    try:
        formdata = self.formdef.data_class().get(component)
    except KeyError:
        raise errors.TraversalError()
    else:
        return FormBackOfficeStatusPage(self.formdef, formdata)
class FormBackOfficeStatusPage(FormStatusPage):
    """FormStatusPage rendered with the 'management' backoffice page header."""

    def html_top(self, title=None):
        """Delegate to the module-level html_top() for the 'management' section."""
        section = 'management'
        return html_top(section, title)

    def _q_index(self):
        """Register the scripts used by the page, then return self.status()."""
        scripts = ['jquery.js', 'qommon.admin.js']
        get_response().add_javascript(scripts)
        return self.status()
class FakeField(object):
    """Lightweight object carrying an id, a type and a label like a field.

    Instances are flagged with fake = True and have no varname.  The CSV
    helpers wrap their single value in a one-element list.
    """

    def __init__(self, id, type_, label):
        self.id, self.type, self.label = id, type_, label
        # marker attribute set on every instance of this class
        self.fake = True
        self.varname = None

    def get_view_value(self, value):
        """Return None whatever the value; only here to quack like a duck."""
        return None

    def get_csv_heading(self):
        """Return the single CSV column heading: the label."""
        heading = [self.label]
        return heading

    def get_csv_value(self, element, hint=None):
        """Return the element as a single CSV cell; hint is ignored."""
        cells = [element]
        return cells
def do_graphs_section(period_start=None, period_end=None, criterias=None):
    """Build the charts part of a statistics page.

    The three parameters are passed as-is to the totals functions of
    wcs.sql (monthly, yearly, weekday and hour totals).  Only the last 12
    monthly entries and the last 10 yearly entries are kept; when one of
    those series is empty a single zero entry for the current month/year
    is used instead.

    The totals are serialized as JSON into a <script> element defining the
    weekday_line, hour_line, month_line and year_line variables, followed
    by one placeholder <div> per chart (the yearly chart is only included
    when there is more than one year of data).  The jqPlot scripts and the
    drawing code are registered on the current response.

    Returns the HTML fragment.
    """
    from wcs import sql

    r = TemplateIO(html=True)

    monthly_totals = sql.get_monthly_totals(period_start, period_end, criterias)[-12:]
    yearly_totals = sql.get_yearly_totals(period_start, period_end, criterias)[-10:]
    if not monthly_totals:
        monthly_totals = [('%s-%s' % datetime.date.today().timetuple()[:2], 0)]
    if not yearly_totals:
        yearly_totals = [(datetime.date.today().year, 0)]

    weekday_totals = sql.get_weekday_totals(period_start, period_end, criterias)
    weekday_names = [_('Sunday'), _('Monday'), _('Tuesday'),
                     _('Wednesday'), _('Thursday'), _('Friday'), _('Saturday')]
    # Index 0 is Sunday; the chart goes from Monday to Sunday.  The line is
    # built from the weekday numbers rather than by rotating the rows, so
    # that a missing or unordered row cannot shift the labels (and an empty
    # result cannot raise IndexError); missing days count for zero.
    totals_by_weekday = dict(weekday_totals)
    weekday_line = [(weekday_names[weekday], totals_by_weekday.get(weekday, 0))
                    for weekday in (1, 2, 3, 4, 5, 6, 0)]

    hour_totals = sql.get_hour_totals(period_start, period_end, criterias)

    r += htmltext('''<script>
var weekday_line = %(weekday_line)s;
var hour_line = %(hour_line)s;
var month_line = %(month_line)s;
var year_line = %(year_line)s;
</script>''' % {
        'weekday_line': json.dumps(weekday_line),
        'hour_line': json.dumps(hour_totals),
        'month_line': json.dumps(monthly_totals),
        'year_line': json.dumps(yearly_totals),
    })

    if len(yearly_totals) > 1:
        r += htmltext('<h3>%s</h3>') % _('Submissions by year')
        r += htmltext('<div id="chart_years" style="height:160px; width:100%;"></div>')

    r += htmltext('<h3>%s</h3>') % _('Submissions by month')
    r += htmltext('<div id="chart_months" style="height:160px; width:100%;"></div>')
    r += htmltext('<h3>%s</h3>') % _('Submissions by weekday')
    r += htmltext('<div id="chart_weekdays" style="height:160px; width:100%;"></div>')
    r += htmltext('<h3>%s</h3>') % _('Submissions by hour')
    r += htmltext('<div id="chart_hours" style="height:160px; width:100%;"></div>')

    get_response().add_javascript(['jquery.js', 'jqplot/jquery.jqplot.min.js',
        'jqplot/plugins/jqplot.canvasTextRenderer.min.js',
        'jqplot/plugins/jqplot.canvasAxisLabelRenderer.min.js',
        'jqplot/plugins/jqplot.canvasAxisTickRenderer.min.js',
        'jqplot/plugins/jqplot.categoryAxisRenderer.min.js',
        'jqplot/plugins/jqplot.barRenderer.min.js',
        ])

    # the yearly chart is drawn only if its placeholder is in the page
    get_response().add_javascript_code('''
function wcs_draw_graphs() {
$.jqplot ('chart_weekdays', [weekday_line], {
series:[{renderer:$.jqplot.BarRenderer}],
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer } }
});
$.jqplot ('chart_hours', [hour_line], {
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer }, yaxis: {min: 0} }
});
$.jqplot ('chart_months', [month_line], {
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer }, yaxis: {min: 0} }
});
if ($('#chart_years').length) {
$.jqplot ('chart_years', [year_line], {
series:[{renderer:$.jqplot.BarRenderer}],
axesDefaults: {
tickRenderer: $.jqplot.CanvasAxisTickRenderer,
tickOptions: { angle: -30, }
},
axes: { xaxis: { renderer: $.jqplot.CategoryAxisRenderer }, yaxis: {min: 0} }
});
}
}
$(document).ready(function(){
wcs_draw_graphs();
});
''')
    return r.getvalue()
def get_stats_criteria(request, parsed_values=None):
    """
    Build the list of select() criterias matching the request query string.

    Drafts are always excluded.  The 'start' and 'end' form values, when
    they can be parsed with misc.date_format(), add a lower and an upper
    bound on receipt_time.  A missing or unparsable date is ignored.

    If parsed_values is a dictionary, the parsed dates are stored in it as
    datetime objects under the 'period_start' and 'period_end' keys.
    """
    date_format = misc.date_format()
    criterias = [NotEqual('status', 'draft')]

    # (query string key, criteria class, key filled in parsed_values)
    bounds = (
        ('start', GreaterOrEqual, 'period_start'),
        ('end', LessOrEqual, 'period_end'),
    )
    for form_key, criteria_class, parsed_key in bounds:
        try:
            boundary = time.strptime(request.form.get(form_key), date_format)
            criterias.append(criteria_class('receipt_time', boundary))
            if parsed_values is not None:
                timestamp = time.mktime(boundary)
                parsed_values[parsed_key] = datetime.datetime.fromtimestamp(timestamp)
        except (ValueError, TypeError):
            # TypeError: value absent (None); ValueError: not a valid date
            continue

    return criterias
def format_time(t, units=2):
    """Format a duration given in seconds as a translated, readable string.

    The duration is split into days, hours, minutes and seconds and reported
    starting from its largest non-zero part:

    - units == 1: that part only ("%d day(s)", "%d hour(s)", ...);
    - units == 2: that part and the following one.

    Durations under a minute, and any other value of units, give the
    "%d seconds" form built from t modulo 60.
    """
    days = int(t / 86400)
    after_days = t - days * 86400
    hours = int(after_days / 3600)
    minutes = int((after_days - hours * 3600) / 60)
    seconds = t % 60

    if units in (1, 2):
        single_part = (units == 1)
        if days:
            if single_part:
                return _('%d day(s)') % days
            return _('%(days)d day(s) and %(hours)d hour(s)') % dict(
                days=days, hours=hours)
        if hours:
            if single_part:
                return _('%d hour(s)') % hours
            return _('%(hours)d hour(s) and %(minutes)d minute(s)') % dict(
                hours=hours, minutes=minutes)
        if minutes:
            if single_part:
                return _('%d minute(s)') % minutes
            return _('%(minutes)d minute(s) and %(seconds)d seconds') % dict(
                minutes=minutes, seconds=seconds)

    return _('%d seconds') % seconds