wcs/wcs/backoffice/data_management.py

404 lines
14 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2019 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import csv
import datetime
import io
import json
from quixote import get_publisher, get_request, get_response, redirect
from quixote.html import htmltext
from wcs import fields
from wcs.carddef import CardDef
from wcs.categories import CardDefCategory
from ..qommon import _, errors, template
from ..qommon.afterjobs import AfterJob
from ..qommon.backoffice.menu import html_top
from ..qommon.form import FileWidget, Form
from .management import FormBackOfficeStatusPage, FormPage, ManagementDirectory
from .submission import FormFillPage
def get_import_csv_fields(carddef):
    """Return the ordered list of fields matching the columns of a CSV import.

    Only fields that are instances of ``fields.WidgetField`` are kept.  When
    ``carddef.user_support`` is ``'optional'`` a pseudo-field standing for the
    associated user is placed in front of them.
    """

    class UserField:
        """Pseudo-field describing the user column of the CSV file."""

        key = 'user'
        id = '_user'
        label = _('User (email or UUID)')

        def convert_value_from_str(self, x):
            # the cell content is kept verbatim
            return x

        def set_value(self, data, value):
            # stored under the reserved '_user' key of the data dictionary
            data['_user'] = value

    # skip non-data fields
    data_fields = list(filter(lambda field: isinstance(field, fields.WidgetField), carddef.get_all_fields()))
    if carddef.user_support != 'optional':
        return data_fields
    return [UserField(), *data_fields]
class DataManagementDirectory(ManagementDirectory):
    """Backoffice directory giving access to the card definitions."""

    do_not_call_in_templates = True
    _q_exports = ['']

    def add_breadcrumb(self):
        """Append the "Cards" entry to the breadcrumb of the response."""
        breadcrumb = get_response().breadcrumb
        breadcrumb.append(('data/', _('Cards')))

    def is_accessible(self, user):
        """Tell whether `user` may see the data management section."""
        if not user.can_go_in_backoffice():
            return False

        backoffice_root = get_publisher().get_backoffice_root()
        if backoffice_root.is_global_accessible('cards') and CardDef.keys():
            # open for admins as soon as there are cards
            return True

        # only include data management if there are accessible cards, i.e.
        # cards where one of the user roles is a submission or workflow role.
        for carddef in CardDef.select(ignore_errors=True, lightweight=True, iterator=True):
            if any(
                role_id in (carddef.backoffice_submission_roles or [])
                or role_id in (carddef.workflow_roles or {}).values()
                for role_id in user.get_roles()
            ):
                return True
        return False

    def get_carddefs(self):
        """Yield the card definitions the current user can see.

        Card definitions are loaded sorted by name, then yielded category
        after category (categories sorted by position); card definitions
        without a category come last.  Nothing is yielded when the request
        has no user.
        """
        current_user = get_request().user
        if not current_user:
            return

        visible_carddefs = [
            carddef
            for carddef in CardDef.select(order_by='name', ignore_errors=True, lightweight=True)
            if current_user.is_admin or carddef.is_of_concern_for_user(current_user)
        ]

        categories = CardDefCategory.select()
        CardDefCategory.sort_by_position(categories)

        for category in categories:
            for carddef in visible_carddefs:
                if carddef.category_id == category.id:
                    yield carddef

        # uncategorised card definitions are listed after all categories
        for carddef in visible_carddefs:
            if not carddef.category_id:
                yield carddef

    def _q_index(self):
        """Render the cards index, or an "empty site" message without cards."""
        html_top('data_management', _('Cards'))
        if CardDef.exists():
            return template.QommonTemplateResponse(
                templates=['wcs/backoffice/data-management.html'], context={'view': self}
            )
        return self.empty_site_message(_('Cards'))

    def _q_lookup(self, component):
        """Delegate any sub-path to the page of the matching card definition."""
        return CardPage(component)
class CardPage(FormPage):
    """Backoffice listing page for the data of a single card definition.

    On top of what is inherited from ``FormPage`` it offers the creation of
    cards (``add``), the import of cards from a CSV file (``import-csv``) and
    the download of a sample CSV file (``data-sample-csv``).
    """

    _q_exports = [
        '',
        'csv',
        'ods',
        'json',
        'export',
        'map',
        'geojson',
        'add',
        ('export-spreadsheet', 'export_spreadsheet'),
        ('save-view', 'save_view'),
        ('delete-view', 'delete_view'),
        ('import-csv', 'import_csv'),
        ('filter-options', 'filter_options'),
        ('data-sample-csv', 'data_sample_csv'),
    ]
    admin_permission = 'cards'
    formdef_class = CardDef
    search_label = _('Search in card content')
    formdef_view_label = _('View Card')

    @property
    def add(self):
        """Return the page used to fill in a new card."""
        return CardFillPage(self.formdef.url_name)

    def listing_top_actions(self):
        """Return the "Add" link, or an empty string if the user cannot add cards."""
        if not self.formdef.can_user_add_cards(get_request().user):
            return ''
        return htmltext('<span class="actions"><a href="./add/">%s</a></span>') % _('Add')

    def get_default_filters(self, mode):
        """Return the filters of the current custom view, or none at all."""
        if self.view:
            return self.view.get_default_filters()
        return ()

    def get_default_columns(self):
        """Return the identifiers of the columns displayed by default.

        With a custom view its columns are used; otherwise the list is made
        of 'id', 'time' and every field that has a ``get_view_value``
        attribute and is flagged ``include_in_listing``.
        """
        if self.view:
            field_ids = self.view.get_columns()
        else:
            field_ids = ['id', 'time']
            for field in self.formdef.get_all_fields():
                if hasattr(field, 'get_view_value') and field.include_in_listing:
                    field_ids.append(field.id)
        return field_ids

    def get_filter_from_query(self, default=Ellipsis):
        """Call the parent implementation, with 'all' as default filter.

        ``Ellipsis`` is the "not given" marker; any other value passed as
        `default` is forwarded untouched.
        """
        return super().get_filter_from_query(default='all' if default is Ellipsis else default)

    def get_formdata_sidebar_actions(self, qs=''):
        """Add the CSV import link to the inherited sidebar actions."""
        r = super().get_formdata_sidebar_actions(qs=qs)
        if self.formdef.can_user_add_cards(get_request().user):
            r += htmltext('<li><a rel="popup" href="import-csv">%s</a></li>') % _(
                'Import data from a CSV file'
            )
        return r

    def data_sample_csv(self):
        """Return a sample CSV file: a header line and one line of sample values.

        The header is made of the labels of the importable fields; the sample
        line holds a placeholder value picked according to the field type.
        """
        carddef_fields = get_import_csv_fields(self.formdef)
        output = io.StringIO()
        if len(carddef_fields) == 1:
            # single column: quoting is disabled, and private use code points
            # serve as delimiter and escape character.
            # NOTE(review): presumably so that values are written verbatim
            # and the file stays a plain list of values -- confirm.
            csv_output = csv.writer(output, quoting=csv.QUOTE_NONE, delimiter='\uE000', escapechar='\uE001')
        else:
            csv_output = csv.writer(output, quoting=csv.QUOTE_ALL)
        csv_output.writerow([f.label for f in carddef_fields])

        sample_line = []
        for f in carddef_fields:
            if f.convert_value_from_str is None:
                # such fields are skipped by the import job
                value = _('will be ignored - type %s not supported') % f.get_type_label()
            elif isinstance(f, fields.DateField):
                value = datetime.date.today()
            elif isinstance(f, fields.BoolField):
                value = _('Yes')
            elif isinstance(f, fields.EmailField):
                value = 'foo@example.com'
            elif isinstance(f, fields.MapField):
                value = get_publisher().get_default_position()
            elif isinstance(f, fields.ItemsField):
                value = 'id1|id2|...'
            else:
                value = 'value'
            sample_line.append(value)
        csv_output.writerow(sample_line)

        response = get_response()
        response.set_content_type('text/plain')
        response.set_header(
            'content-disposition', 'attachment; filename=%s-sample.csv' % self.formdef.url_name
        )
        return output.getvalue()

    def import_csv(self):
        """Display the CSV import form and handle its submission.

        Raises ``errors.AccessForbiddenError`` if the user is not allowed to
        add cards.  A ``ValueError`` raised while handling the uploaded file
        is reported as an error on the file widget.
        """
        if not self.formdef.can_user_add_cards(get_request().user):
            raise errors.AccessForbiddenError()

        context = {'required_fields': []}

        form = Form(enctype='multipart/form-data', use_tokens=False)
        form.add(FileWidget, 'file', title=_('File'), required=True)
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))

        if form.get_widget('cancel').parse():
            return redirect('.')

        if form.is_submitted() and not form.has_errors():
            try:
                return self.import_csv_submit(form.get_widget('file').parse().fp)
            except ValueError as e:
                form.set_error('file', e)

        get_response().breadcrumb.append(('import_csv', _('Import CSV')))
        html_top('data_management', _('Import CSV'))

        context['html_form'] = form
        # collect the required fields that cannot be filled from a CSV file
        for field in get_import_csv_fields(self.formdef):
            if not hasattr(field, 'required'):
                continue
            if field.required and field.convert_value_from_str is None:
                context['required_fields'].append(field.label)

        return template.QommonTemplateResponse(
            templates=['wcs/backoffice/card-data-import-form.html'], context=context
        )

    def import_csv_submit(self, fd, afterjob=True, api=False):
        """Validate an uploaded CSV file and create the job importing it.

        Parameters:
          fd: file object whose ``read()`` returns the file content as bytes.
          afterjob: if true the import job is added to the response as an
            after job; otherwise it is executed immediately.
          api: with `afterjob`, return the job itself instead of a redirect
            to its processing page.

        Returns the job or a redirect when `afterjob` is true, nothing
        otherwise.

        Raises ``ValueError`` when the content contains a NUL byte, has no
        header line, has fewer header columns than there are fields, or has
        data lines whose number of columns differs from the number of fields.
        """
        content = fd.read()
        if b'\0' in content:
            raise ValueError(_('Invalid file format.'))

        # iso-8859-15 is the fallback for content that is not valid utf-8
        for charset in ('utf-8', 'iso-8859-15'):
            try:
                content = content.decode(charset)
                break
            except UnicodeDecodeError:
                continue

        try:
            dialect = csv.Sniffer().sniff(content)
        except csv.Error:
            # no dialect could be detected, leave it to csv.reader
            dialect = None

        reader = csv.reader(content.splitlines(keepends=True), dialect=dialect)
        try:
            caption = next(reader)
        except StopIteration:
            raise ValueError(_('Invalid CSV file.'))

        carddef_fields = get_import_csv_fields(self.formdef)
        if len(caption) < len(carddef_fields):
            raise ValueError(_('CSV file contains less columns than card fields.'))

        data_lines = []
        incomplete_lines = []
        for line_no, csv_line in enumerate(reader):
            if len(csv_line) != len(carddef_fields):
                # +2 because header and counting from 1.
                incomplete_lines.append(str(line_no + 2))
                continue
            data_lines.append(csv_line)

        if incomplete_lines:
            error_message = _('CSV file contains lines with wrong number of columns.')
            # at most five line numbers are displayed; "and more" is only
            # mentioned when some line numbers are actually left out.
            if len(incomplete_lines) <= 5:
                error_message += ' ' + _('(line numbers %s)') % ', '.join(incomplete_lines)
            else:
                error_message += ' ' + _('(line numbers %s and more)') % ', '.join(incomplete_lines[:5])
            raise ValueError(error_message)

        job = ImportFromCsvAfterJob(carddef=self.formdef, data_lines=data_lines)
        if afterjob:
            get_response().add_after_job(job)
            if api:
                return job
            return redirect(job.get_processing_url())
        else:
            job.execute()

    def _q_lookup(self, component):
        """Resolve `component` as a custom view slug or as a card identifier.

        Custom views are only considered when no view is selected yet.
        Raises ``errors.TraversalError`` if no card has this identifier.
        """
        if not self.view:
            for view in self.get_custom_views():
                if view.get_url_slug() == component:
                    return self.__class__(formdef=self.formdef, view=view)

        try:
            filled = self.formdef.data_class().get(component)
        except KeyError:
            raise errors.TraversalError()

        return CardBackOfficeStatusPage(self.formdef, filled)
class CardFillPage(FormFillPage):
    """Backoffice page used to fill in a card."""

    formdef_class = CardDef
    has_channel_support = False
    has_user_support = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        user_is_optional = self.formdef.user_support == 'optional'
        if user_is_optional:
            # user support is only enabled for card definitions asking for it
            self.has_user_support = True

    def get_default_return_url(self):
        """Return the URL to go back to once the page is left."""
        if not self.formdef.is_of_concern_for_user(get_request().user):
            # redirect to cards index page if the user is not allowed to see the cards
            return '%s/data/' % get_publisher().get_backoffice_url()
        return '%s/data/%s/' % (get_publisher().get_backoffice_url(), self.formdef.url_name)

    def redirect_after_submitted(self, form, filled):
        """Return the response sent once the card has been submitted.

        When the request carries a ``_popup`` value, the popup response
        template is rendered with the id and digest of the new card,
        serialized as JSON; otherwise the parent behaviour applies.
        """
        if not get_request().form.get('_popup'):
            return super().redirect_after_submitted(form, filled)

        card_info = {
            'value': str(filled.id),
            'obj': str(filled.default_digest),
        }
        return template.QommonTemplateResponse(
            templates=['wcs/backoffice/popup_response.html'],
            context={'popup_response_data': json.dumps(card_info)},
            is_django_native=True,
        )

    def create_form(self, *args, **kwargs):
        """Create the form, carrying over the ``_popup`` marker if present."""
        form = super().create_form(*args, **kwargs)
        in_popup = get_request().form.get('_popup')
        if in_popup:
            form.add_hidden('_popup', 1)
        return form
class CardBackOfficeStatusPage(FormBackOfficeStatusPage):
    """Backoffice status page of a single card."""

    form_page_class = CardFillPage

    # messages used instead of the ones defined for forms
    sidebar_recorded_message = _(
        'The card has been recorded on %(date)s with the number %(number)s.'
    )
    sidebar_recorded_by_agent_message = _(
        'The card has been recorded on %(date)s with the number %(number)s by %(agent)s.'
    )

    def html_top(self, title=None):
        """Call the module-level ``html_top`` for the 'data_management' section."""
        section = 'data_management'
        return html_top(section, title)

    def should_fold_summary(self, mine, request_user):
        """Always false, whatever the arguments."""
        return False

    def should_fold_history(self):
        """Always true."""
        return True
class ImportFromCsvAfterJob(AfterJob):
    """Job creating one card per line of an already validated CSV file."""

    def __init__(self, carddef, data_lines):
        # the class and id of the card definition are kept, not the object
        # itself; it is loaded again when the job runs.
        job_parameters = {
            'carddef_class': carddef.__class__,
            'carddef_id': carddef.id,
            'data_lines': data_lines,
        }
        super().__init__(label=_('Importing data into cards'), **job_parameters)

    def user_lookup(self, user_value):
        """Return the user matching `user_value`.

        Always None unless the card definition has optional user support.
        """
        if self.carddef.user_support == 'optional':
            return get_publisher().user_class.lookup_by_string(user_value)
        return None

    def execute(self):
        """Create, store and run the workflow of a card for each data line."""
        self.carddef = self.kwargs['carddef_class'].get(self.kwargs['carddef_id'])
        new_card = self.carddef.data_class()
        self.total_count = len(self.kwargs['data_lines'])
        self.store()

        columns = get_import_csv_fields(self.carddef)

        for row in self.kwargs['data_lines']:
            card = new_card()
            card.data = {}

            for position, column in enumerate(columns):
                cell = row[position].strip()
                # skip empty values and unsupported field types
                if not cell or column.convert_value_from_str is None:
                    continue
                column.set_value(card.data, column.convert_value_from_str(cell))

            # the user column is not card data, it is taken out of the
            # dictionary and turned into the user of the card.
            card.user = self.user_lookup(card.data.pop('_user', None))
            card.just_created()
            card.store()
            card.perform_workflow(event='csv-import-created')
            self.increment_count()

    def done_action_url(self):
        """Return the URL of the card definition the data was imported into."""
        carddef_class = self.kwargs['carddef_class']
        return carddef_class.get(self.kwargs['carddef_id']).get_url()

    def done_action_label(self):
        """Return the label of the action displayed once the job is done."""
        return _('Back to Listing')

    def done_button_attributes(self):
        """Return the extra attributes of the "done" button."""
        return {'data-redirect-auto': 'true'}