wcs/wcs/backoffice/data_management.py

333 lines
13 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2019 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import csv
import datetime
from quixote import get_publisher, get_request, get_response, redirect
from quixote.html import TemplateIO, htmltext, htmlescape
from django.utils.encoding import force_text
from django.utils.six import StringIO
from ..qommon import _, N_
from ..qommon import errors
from ..qommon import template
from ..qommon.form import Form, FileWidget
from ..qommon.backoffice.menu import html_top
from ..qommon import template
from ..qommon.afterjobs import AfterJob
from wcs.carddef import CardDef
from wcs import fields
from .management import ManagementDirectory, FormPage, FormFillPage, FormBackOfficeStatusPage
class DataManagementDirectory(ManagementDirectory):
    """Backoffice directory listing the cards and dispatching to card pages."""

    do_not_call_in_templates = True
    _q_exports = ['']

    def _q_traverse(self, path):
        """Push the "Cards" breadcrumb entry, then continue the traversal."""
        get_response().breadcrumb.append(('data/', _('Cards')))
        # NOTE(review): super() is anchored on ManagementDirectory, so
        # ManagementDirectory's own _q_traverse is skipped -- presumably on
        # purpose; confirm before "fixing" it.
        return super(ManagementDirectory, self)._q_traverse(path)

    def is_accessible(self, user):
        """Tell whether the data management section is open to `user`.

        The user must be allowed in the backoffice and hold at least one role
        that is either a backoffice submission role or a workflow role of
        some card definition.
        """
        if not user.can_go_in_backoffice():
            return False

        # only include data management if there are accessible cards
        for carddef in CardDef.select(ignore_errors=True, lightweight=True, iterator=True):
            if any(
                role_id in (carddef.backoffice_submission_roles or [])
                or role_id in (carddef.workflow_roles or {}).values()
                for role_id in user.get_roles()
            ):
                return True
        return False

    def get_carddefs(self):
        """Yield, ordered by name, the card definitions the current user may see."""
        user = get_request().user
        if not user:
            return
        for carddef in CardDef.select(order_by='name', ignore_errors=True, lightweight=True):
            if user.is_admin or carddef.is_of_concern_for_user(user):
                yield carddef

    def _q_index(self):
        """Render the cards index, or the "empty site" message if no card exists."""
        html_top('data_management', _('Cards'))
        if not CardDef.count():
            return self.empty_site_message(_('Cards'))
        return template.QommonTemplateResponse(
            templates=['wcs/backoffice/data-management.html'],
            context={'view': self},
        )

    def _q_lookup(self, component):
        """Map an URL component to the page of the matching card definition."""
        return CardPage(component)
class CardPage(FormPage):
    """Backoffice page of a single card definition, including CSV import."""

    _q_exports = ['', 'csv', 'xls', 'ods', 'json', 'export', 'map', 'geojson', 'add',
                  ('save-view', 'save_view'), ('delete-view', 'delete_view'),
                  ('import-csv', 'import_csv'),
                  ('data-sample-csv', 'data_sample_csv')]
    admin_permission = 'cards'

    def __init__(self, component=None, formdef=None, view=None):
        """Bind the page to a card definition.

        `formdef` is used when given, otherwise the card definition is looked
        up from the `component` URL name; an unknown name raises
        errors.TraversalError.  `view`, when given, is kept as `self.view`.
        """
        try:
            self.formdef = formdef if formdef else CardDef.get_by_urlname(component)
        except KeyError:
            raise errors.TraversalError()
        self.add = CardFillPage(self.formdef.url_name)
        if view:
            # NOTE(review): when no view is given self.view is not assigned
            # here; presumably the parent class provides a default -- confirm.
            self.view = view

    def can_user_add_cards(self):
        """Return True if the current user holds a backoffice submission role."""
        if not self.formdef.backoffice_submission_roles:
            return False
        for role in get_request().user.get_roles():
            if role in self.formdef.backoffice_submission_roles:
                return True
        return False

    def listing_top_actions(self):
        """Return the "Add" link markup, or '' if the user cannot add cards."""
        if not self.can_user_add_cards():
            return ''
        return htmltext('<span class="actions"><a href="./add/">%s</a></span>') % _('Add')

    def get_default_filters(self, mode):
        """Return the filters of the current view, or an empty tuple."""
        if self.view:
            return self.view.get_default_filters()
        return ()

    def get_default_columns(self):
        """Return the ids of the columns displayed by default.

        Uses the columns of the current view when there is one, otherwise
        'id', 'time' and every field flagged for inclusion in listings.
        """
        if self.view:
            field_ids = self.view.get_columns()
        else:
            field_ids = ['id', 'time']
            for field in self.formdef.get_all_fields():
                if hasattr(field, 'get_view_value') and field.include_in_listing:
                    field_ids.append(field.id)
        return field_ids

    def get_filter_from_query(self, default=Ellipsis):
        """Same as the parent method but with 'all' as the fallback default."""
        return super(CardPage, self).get_filter_from_query(
            default='all' if default is Ellipsis else default)

    def get_formdata_sidebar_actions(self, qs=''):
        """Extend the parent's sidebar actions with the CSV import link."""
        r = super(CardPage, self).get_formdata_sidebar_actions(qs=qs)
        if self.can_user_add_cards():
            r += htmltext('<li><a rel="popup" href="import-csv">%s</a></li>'
                          ) % _('Import data from a CSV file')
        return r

    def get_import_csv_fields(self):
        """Return the card fields that map to CSV columns."""
        # skip non-data fields
        return [x for x in self.formdef.get_all_fields() if isinstance(x, fields.WidgetField)]

    def data_sample_csv(self):
        """Return a sample CSV file: a header row plus one example line."""
        carddef_fields = self.get_import_csv_fields()
        output = StringIO()
        csv_output = csv.writer(output)
        csv_output.writerow([f.label for f in carddef_fields])
        sample_line = []
        for f in carddef_fields:
            if f.convert_value_from_str is None:
                value = _('will be ignored - type %s not supported') % _(f.description)
            elif isinstance(f, fields.DateField):
                value = datetime.date.today()
            elif isinstance(f, fields.BoolField):
                value = _('Yes')
            elif isinstance(f, fields.EmailField):
                value = 'foo@example.com'
            elif isinstance(f, fields.MapField):
                value = get_publisher().get_default_position()
            else:
                value = 'value'
            sample_line.append(value)
        csv_output.writerow(sample_line)
        response = get_response()
        response.set_content_type('text/plain')
        response.set_header('content-disposition', 'attachment; filename=%s-sample.csv' % self.formdef.url_name)
        return output.getvalue()

    def import_csv(self):
        """Display the CSV import form, or the progress of a running import.

        When the request carries a known `job` id the page shows that job,
        otherwise it shows (and processes) the upload form.  Raises
        errors.AccessForbiddenError if the user cannot add cards.
        """
        if not self.can_user_add_cards():
            raise errors.AccessForbiddenError()
        context = {
            'unsupported_fields': [],
            'required_fields': []
        }

        try:
            job = AfterJob.get(get_request().form.get('job'))
            get_response().add_javascript(['jquery.js', 'afterjob.js'])
            context['job'] = job
        except KeyError:
            # no (known) job: present the upload form
            form = Form(enctype='multipart/form-data', use_tokens=False)
            form.add(FileWidget, 'file', title=_('File'), required=False)
            form.add_submit('submit', _('Submit'))
            form.add_submit('cancel', _('Cancel'))

            if form.get_widget('cancel').parse():
                return redirect('.')

            if form.is_submitted() and not form.has_errors():
                try:
                    return self.import_csv_submit(form)
                except ValueError as e:
                    # validation problems are reported on the file widget
                    form.set_error('file', e)

            context['form'] = form

        # the breadcrumb component must be the exported URL name
        # ('import-csv', see _q_exports), not the method name.
        get_response().breadcrumb.append(('import-csv', _('Import CSV')))
        html_top('data_management', _('Import CSV'))

        for field in self.get_import_csv_fields():
            if field.convert_value_from_str is None:
                context['unsupported_fields'].append(field)
            if not hasattr(field, 'required'):
                continue
            if field.required and field.convert_value_from_str is None:
                context['required_fields'].append(field.label)

        return template.QommonTemplateResponse(
            templates=['wcs/backoffice/card-data-import-form.html'],
            context=context)

    def import_csv_submit(self, form):
        """Validate the uploaded CSV file and schedule the import job.

        Returns a redirect to the import page tracking the job.  Raises
        ValueError (with a translated message) when no file was sent, the
        file contains NUL bytes, it has no header line, the header has fewer
        columns than the card has fields, or some data lines do not have the
        expected number of columns.
        """
        upload = form.get_widget('file').parse()
        if not upload:
            raise ValueError(_('You have to enter a file.'))
        content = upload.fp.read()
        if b'\0' in content:
            raise ValueError(_('Invalid file format.'))

        for charset in ('utf-8', 'iso-8859-15'):
            try:
                content = content.decode(charset)
                break
            except UnicodeDecodeError:
                continue

        try:
            dialect = csv.Sniffer().sniff(content)
        except csv.Error:
            # dialect could not be guessed, let csv.reader use its defaults
            dialect = None

        reader = csv.reader(content.splitlines(keepends=True), dialect=dialect)
        try:
            caption = next(reader)
        except StopIteration:
            raise ValueError(_('Invalid CSV file.'))

        carddef_fields = self.get_import_csv_fields()
        # NOTE(review): the header may have more columns than the card has
        # fields, while each data line must match exactly -- confirm intent.
        if len(caption) < len(carddef_fields):
            raise ValueError(_('CSV file contains less columns than card fields.'))

        data_lines = []
        incomplete_lines = []
        for line_no, csv_line in enumerate(reader):
            data_line = {}
            if len(csv_line) != len(carddef_fields):
                # +2 because header and counting from 1.
                incomplete_lines.append(str(line_no + 2))
                continue
            for i, field in enumerate(carddef_fields):
                value = csv_line[i].strip()
                # skip empty values
                if not value:
                    continue
                # skip unsupported field types
                if field.convert_value_from_str is None:
                    continue
                field_id = str(field.id)
                data_line[field_id] = field.convert_value_from_str(value)
                if field.store_display_value:
                    display_value = field.store_display_value(data_line, field_id)
                    data_line['%s_display' % field_id] = display_value
                if field.store_structured_value:
                    structured_value = field.store_structured_value(data_line, field_id)
                    if structured_value:
                        if isinstance(structured_value, dict) and structured_value.get('id'):
                            # in case of list field, override id
                            data_line[field_id] = str(structured_value.get('id'))
                        data_line['%s_structured' % field_id] = structured_value
            data_lines.append(data_line)

        if incomplete_lines:
            error_message = _('CSV file contains lines with wrong number of columns.')
            # up to five line numbers are listed in full; "and more" is only
            # added when some line numbers are actually left out.
            if len(incomplete_lines) <= 5:
                error_message += ' ' + _('(line numbers %s)') % ', '.join(incomplete_lines)
            else:
                error_message += ' ' + _('(line numbers %s and more)') % ', '.join(incomplete_lines[:5])
            raise ValueError(error_message)

        class ImportAction:
            # creates and stores one card per parsed CSV line
            def __init__(self, data_class, lines):
                self.data_class = data_class
                self.lines = lines

            def execute(self, job):
                for item in self.lines:
                    data_instance = self.data_class()
                    data_instance.data = item
                    data_instance.just_created()
                    data_instance.store()
                    data_instance.perform_workflow()

        action = ImportAction(self.formdef.data_class(), data_lines)
        job = get_response().add_after_job(N_('Importing data into cards'),
                                           action.execute)
        job.store()
        return redirect('import-csv?job=%s' % job.id)

    def _q_lookup(self, component):
        """Resolve `component` to a custom view page or to a card status page.

        Custom views are only considered when no view is already selected.
        Raises errors.TraversalError if no card has this id.
        """
        if not self.view:
            for view in self.get_custom_views():
                if view.get_url_slug() == component:
                    return self.__class__(formdef=self.formdef, view=view)

        try:
            filled = self.formdef.data_class().get(component)
        except KeyError:
            raise errors.TraversalError()

        return CardBackOfficeStatusPage(self.formdef, filled)
class CardFillPage(FormFillPage):
    """Backoffice page used to fill in a new card."""

    formdef_class = CardDef

    def submitted(self, form, *args):
        """Run the parent submission, then adjust where the user is sent.

        When the parent redirected to the generic backoffice submission page
        the user is sent back to the card listing ('..') instead.  In every
        other case the parent's own result is returned.
        """
        result = super(CardFillPage, self).submitted(form, *args)
        # the parent may not have set a Location header (no redirect), in
        # which case there is nothing to override.
        location = get_response().get_header('location')
        if location and location.endswith('/backoffice/submission/'):
            return redirect('..')
        return result
class CardBackOfficeStatusPage(FormBackOfficeStatusPage):
    """Backoffice status page of a single card, with card-specific wording."""

    form_page_class = CardFillPage

    # sidebar messages, worded for cards
    sidebar_recorded_message = N_('The card has been recorded on %(date)s with the number %(number)s.')
    sidebar_recorded_by_agent_message = N_(
        'The card has been recorded on %(date)s with the number %(number)s by %(agent)s.'
    )

    def html_top(self, title=None):
        """Render the page top within the 'data_management' section."""
        return html_top('data_management', title)

    def should_fold_summary(self, mine, request_user):
        """The summary is never folded."""
        return False

    def should_fold_history(self):
        """The history is always folded."""
        return True