# wcs/wcs/wf/export_to_model.py
# (repository listing: 659 lines, 29 KiB, Python)

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import base64
import collections
from xml.etree import ElementTree as ET
import zipfile
import os
import random
import subprocess
import tempfile
import time
import shutil
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.six import BytesIO, StringIO
from quixote import get_response, get_request, get_publisher
from quixote.directory import Directory
from quixote.html import htmltext
from ..qommon import _, N_, ezt, misc, force_str
from ..qommon import get_logger
from ..qommon.form import (SingleSelectWidget, WidgetList, CheckboxWidget,
StringWidget, UploadWidget, WysiwygTextWidget, Upload,
UploadedFile, UploadValidationError, VarnameWidget,
RadiobuttonsWidget, PicklableUpload, ComputedExpressionWidget)
from ..qommon.errors import PublishError
from ..qommon.template import TemplateError
from wcs.fields import SubtitleField, TitleField, CommentField, PageField
from wcs.workflows import (WorkflowStatusItem, AttachmentEvolutionPart,
template_on_formdata, register_item_class,
get_formdata_template_context, template_on_context)
from wcs.portfolio import has_portfolio, push_document
# XML namespaces of the OpenDocument vocabularies (and XLink) used below.
OO_TEXT_NS = 'urn:oasis:names:tc:opendocument:xmlns:text:1.0'
OO_OFFICE_NS = 'urn:oasis:names:tc:opendocument:xmlns:office:1.0'
OO_STYLE_NS = 'urn:oasis:names:tc:opendocument:xmlns:style:1.0'
OO_DRAW_NS = 'urn:oasis:names:tc:opendocument:xmlns:drawing:1.0'
OO_FO_NS = 'urn:oasis:names:tc:opendocument:xmlns:xsl-fo-compatible:1.0'
XLINK_NS = 'http://www.w3.org/1999/xlink'

# Qualified tag/attribute names in the '{namespace}local-name' form used by
# ElementTree.
_QUALIFIED = '{%s}%s'
USER_FIELD_DECL = _QUALIFIED % (OO_TEXT_NS, 'user-field-decl')
USER_FIELD_GET = _QUALIFIED % (OO_TEXT_NS, 'user-field-get')
SECTION_NODE = _QUALIFIED % (OO_TEXT_NS, 'section')
SECTION_NAME = _QUALIFIED % (OO_TEXT_NS, 'name')
STRING_VALUE = _QUALIFIED % (OO_OFFICE_NS, 'string-value')
DRAW_FRAME = _QUALIFIED % (OO_DRAW_NS, 'frame')
DRAW_NAME = _QUALIFIED % (OO_DRAW_NS, 'name')
DRAW_IMAGE = _QUALIFIED % (OO_DRAW_NS, 'image')
XLINK_HREF = _QUALIFIED % (XLINK_NS, 'href')
# same value as SECTION_NAME: the text:name attribute
NAME = _QUALIFIED % (OO_TEXT_NS, 'name')
# transform_to_pdf is only defined when a libreoffice executable is found on
# the PATH; otherwise the name is bound to None so callers can test for it.
# shutil.which() is used instead of spawning `which`, so a system without the
# `which` command no longer breaks the import of this module.
if shutil.which('libreoffice'):
    def transform_to_pdf(instream):
        '''Convert a document to PDF with LibreOffice in headless mode.

        instream is a binary file-like object holding the document; it is
        copied to a temporary file which is then given to libreoffice.

        Returns a file object opened in binary mode on the generated PDF.
        The temporary directory is removed before returning, while that file
        object is still open (this relies on the platform keeping an open
        file readable after it has been unlinked).

        Raises Exception if libreoffice exits with an error, or if no PDF
        was produced after three attempts.
        '''
        # created outside the try block so the cleanup below never runs
        # against an unbound name
        temp_dir = tempfile.mkdtemp()
        try:
            with tempfile.NamedTemporaryFile(dir=temp_dir) as infile:
                while True:
                    chunk = instream.read(100000)
                    if not chunk:
                        break
                    infile.write(chunk)
                infile.flush()
                pdf_path = infile.name + '.pdf'
                for _attempt in range(3):
                    lo_output = subprocess.run(
                        # the temporary directory doubles as a throwaway
                        # user installation directory
                        ['libreoffice', '-env:UserInstallation=file://%s' % temp_dir,
                         '--headless', '--convert-to', 'pdf',
                         infile.name, '--outdir', temp_dir],
                        check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    if os.path.exists(pdf_path):
                        break
                    # sometimes libreoffice fails and sometimes it's ok
                    # afterwards.
                    time.sleep(0.5)
                if not os.path.exists(pdf_path):
                    raise Exception('libreoffice failed to produce pdf (stdout: %r, stderr: %r)' % (
                        lo_output.stdout, lo_output.stderr))
                return open(pdf_path, 'rb')
        except subprocess.CalledProcessError:
            raise Exception('libreoffice is failing')
        finally:
            shutil.rmtree(temp_dir)
else:
    transform_to_pdf = None
def transform_opendocument(instream, outstream, process):
    '''Take a file-like object containing an ODT, ODS, or any open-office
    format, parse meta.xml, content.xml and styles.xml with element tree and
    apply process to the root node of each of them.

    process is called as process(root, new_images); it alters root in place
    and may fill the new_images dictionary, mapping an archive member name
    to an object whose get_content() gives the replacement content.

    The resulting archive is written to outstream: the processed XML files
    first, then every other member, copied or replaced according to
    new_images.

    Raises AssertionError if the archive has no content.xml.
    '''
    xml_members = ('meta.xml', 'content.xml', 'styles.xml')
    # context managers make sure both archives are closed, even when process
    # raises
    with zipfile.ZipFile(instream, mode='r') as zin, \
            zipfile.ZipFile(outstream, mode='w') as zout:
        member_names = zin.namelist()
        if 'content.xml' not in member_names:
            # explicit raise (same exception type as the former assert
            # statement) so the check is not stripped when running with -O
            raise AssertionError('content.xml is missing from the archive')
        new_images = {}
        for filename in member_names:
            # first pass to process meta.xml, content.xml and styles.xml
            if filename not in xml_members:
                continue
            root = ET.fromstring(zin.read(filename))
            process(root, new_images)
            zout.writestr(filename, ET.tostring(root))
        for filename in member_names:
            # second pass to copy/replace other files
            if filename in xml_members:
                continue
            if filename in new_images:
                content = new_images[filename].get_content()
            else:
                content = zin.read(filename)
            zout.writestr(filename, content)
def is_opendocument(stream):
    '''Tell whether stream holds an OpenDocument file.

    stream must be a seekable binary file-like object.  It is recognized as
    OpenDocument when it is a zip archive with a "mimetype" member whose
    content starts with application/vnd.oasis.opendocument.

    Always returns a boolean, and always rewinds stream to its start.
    '''
    try:
        with zipfile.ZipFile(stream) as archive:
            if 'mimetype' not in archive.namelist():
                # explicit False, instead of falling off the end with None
                return False
            mimetype = archive.read('mimetype')
            return mimetype.startswith(b'application/vnd.oasis.opendocument.')
    except zipfile.BadZipFile:
        return False
    finally:
        stream.seek(0)
class TemplatingError(PublishError):
    '''Publishing error reported when a document model cannot be rendered.'''

    def __init__(self, description):
        # description is the detailed message; the title is always the same
        self.description = description
        self.title = _('Templating Error')
def get_varnames(fields):
    '''Extract variable names for helping people fill their templates.

    Returns a list of (name, label) pairs, one per field; titles, subtitles,
    comments and pages are left out.  The variable name (var_<varname>) is
    preferred to the numeric field name (f<id>).
    '''
    return [
        ('var_%s' % field.varname if field.varname else 'f%s' % field.id, field.label)
        for field in fields
        if not isinstance(field, (SubtitleField, TitleField, CommentField, PageField))
    ]
class ExportToModelDirectory(Directory):
    '''Directory serving the document generated by an export to model
    workflow item for a given formdata.'''
    _q_exports = ['']

    def __init__(self, formdata, wfstatusitem, wfstatus):
        # wfstatus is part of the expected signature but is not used here
        self.formdata = formdata
        self.wfstatusitem = wfstatusitem

    def _q_index(self):
        '''Return the content of the generated document, after setting the
        content type and content-disposition headers of the response.

        Raises TemplatingError if the workflow item has no model file.
        '''
        item = self.wfstatusitem
        if not item.model_file:
            raise TemplatingError(_('No model defined for this action'))
        response = get_response()
        if item.convert_to_pdf:
            response.content_type = 'application/pdf'
        else:
            response.content_type = item.model_file.content_type
        # NOTE(review): a location header is set although no redirect status
        # is set here; the reason is not visible from this file.
        response.set_header('location', '..')
        filename = item.get_filename()
        if item.convert_to_pdf:
            filename = filename.rsplit('.', 1)[0] + '.pdf'
        if response.content_type != 'text/html':
            response.set_header('content-disposition',
                                'attachment; filename="%s"' % filename)
        stream = item.apply_template_to_formdata(self.formdata)
        try:
            return stream.read()
        finally:
            # the stream can be a real file (PDF conversion), do not leak it
            stream.close()
class ExportToModel(WorkflowStatusItem):
    '''Workflow action creating a document from an RTF or OpenDocument model.

    The model is filled with the data of the form and optionally converted
    to PDF.  Depending on the settings, the result is served for download,
    attached to the form history, stored in a backoffice file field and/or
    pushed to the portfolio of the user.
    '''
    description = N_('Document Creation')
    key = 'export_to_model'
    category = 'formdata-action'
    support_substitution_variables = True
    ok_in_global_action = False

    filename = None
    endpoint = False
    waitpoint = True

    label = None
    model_file = None
    attach_to_history = False
    directory_class = ExportToModelDirectory
    by = ['_receiver']
    backoffice_info_text = None
    varname = None
    # PDF conversion is on by default only when libreoffice is available
    convert_to_pdf = bool(transform_to_pdf)
    push_to_portfolio = False
    method = 'interactive'
    backoffice_filefield_id = None

    def get_line_details(self):
        '''Return a short description of the configured model, if any.'''
        if self.model_file:
            return _('with model named %(file_name)s of %(size)s bytes') % {
                'file_name': self.model_file.base_filename,
                'size': self.model_file.size}
        else:
            return _('no model set')

    def fill_form(self, form, formdata, user, **kwargs):
        '''Add the document creation button to form (interactive method
        only).'''
        if self.method != 'interactive':
            return
        label = self.label
        if not label:
            label = _('Create Document')
        form.add_submit('button%s' % self.id, label, **{'class': 'download'})
        widget = form.get_widget('button%s' % self.id)
        widget.backoffice_info_text = self.backoffice_info_text

    def submit_form(self, form, formdata, user, evo):
        '''Create the document when the button of this action was used.

        Returns the URL where the document is served, or None when nothing
        was done or when the document is attached to the history.
        '''
        if self.method != 'interactive':
            return
        if not self.model_file:
            return
        if form.get_submit() == 'button%s' % self.id:
            if not evo.comment:
                evo.comment = _('Form exported in a model')
            self.perform_real(formdata, evo)
            in_backoffice = get_request() and get_request().is_in_backoffice()
            if self.attach_to_history:
                return
            base_url = formdata.get_url(backoffice=in_backoffice)
            return base_url + self.get_directory_name()

    def model_file_validation(self, upload):
        '''Return the kind of model held by upload: 'rtf' or 'opendocument'.

        A specific content type is trusted; when the content type is generic
        (application/octet-stream) or missing, the file extension and then
        the file content are looked at.

        Raises UploadValidationError if upload has neither an fp attribute
        nor a get_file() method, or if it is not recognized as an RTF or
        OpenDocument file.
        '''
        if hasattr(upload, 'fp'):
            fp = upload.fp
        elif hasattr(upload, 'get_file'):
            fp = upload.get_file()
        else:
            raise UploadValidationError('unknown upload object %r' % upload)

        content_type = upload.content_type
        is_generic = content_type in ('application/octet-stream', None)

        if content_type == 'application/rtf':
            return 'rtf'
        if is_generic:
            if upload.base_filename and upload.base_filename.endswith('.rtf'):
                return 'rtf'
            if fp.read(10).startswith(b'{\\rtf'):
                fp.seek(0)
                return 'rtf'
        fp.seek(0)
        if content_type and content_type.startswith('application/vnd.oasis.opendocument.'):
            return 'opendocument'
        if is_generic:
            # the extension test used to compare the list returned by
            # rsplit() to strings and so could never match
            if upload.base_filename and upload.base_filename.endswith(
                    ('.odt', '.ods', '.odc', '.odb')):
                return 'opendocument'
            if is_opendocument(fp):
                return 'opendocument'
        raise UploadValidationError(_('Only RTF and OpenDocument files can be used'))

    def get_parameters(self):
        '''Return the names of the parameters of this action; PDF conversion
        and portfolio are only listed when they are available.'''
        parameters = ('model_file',)
        if transform_to_pdf is not None:
            parameters += ('convert_to_pdf',)
        parameters += ('backoffice_filefield_id', 'attach_to_history', 'varname')
        if has_portfolio():
            parameters += ('push_to_portfolio',)
        parameters += ('method', 'by', 'label', 'backoffice_info_text', 'filename', 'condition')
        return parameters

    def add_parameters_widgets(self, form, parameters, prefix='',
                               formdef=None):
        '''Add to form the widgets for the given parameters.'''
        super(ExportToModel, self).add_parameters_widgets(
            form, parameters, prefix=prefix, formdef=formdef)
        methods = collections.OrderedDict(
            [('interactive', _('Interactive (button)')),
             ('non-interactive', _('Non interactive'))])

        if 'model_file' in parameters:
            ids = (self.parent.parent.id, self.parent.id, self.id)
            if formdef:
                hint = htmltext('%s: <ul class="varnames">') \
                    % _('Available variables')
                varnames = get_varnames(formdef.fields)
                for pair in varnames:
                    # (a stray closing </span> was removed from this markup)
                    hint += htmltext('<li><tt class="varname">{{%s}}</tt>'
                                     ' <label>%s</label></li>') % pair
                hint += htmltext('</ul>')
                ids = (formdef.id,) + ids
                filename = 'export_to_model-%s-%s-%s-%s.upload' % ids
            else:
                hint = _('You can use variables in your model using '
                         'the {{variable}} syntax, available variables '
                         'depends on the form.')
                filename = 'export_to_model-%s-%s-%s.upload' % ids
            widget_name = '%smodel_file' % prefix
            # a model set in the workflow options of the form has precedence
            if formdef and formdef.workflow_options and \
                    formdef.workflow_options.get(widget_name) is not None:
                value = formdef.workflow_options.get(widget_name)
            else:
                value = self.model_file
            if value:
                hint_prefix = htmltext('<div>%s: <a href="?file=%s">%s</a></div>') % \
                    (_('Current value'), widget_name, value.base_filename)
                hint = hint_prefix + hint
            form.add(UploadWidget, widget_name, directory='models',
                     filename=filename, title=_('Model'), hint=hint,
                     validation=self.model_file_validation, value=value)
        if 'convert_to_pdf' in parameters:
            form.add(CheckboxWidget, '%sconvert_to_pdf' % prefix,
                     title=_('Convert generated file to PDF'),
                     value=self.convert_to_pdf)
        if 'backoffice_filefield_id' in parameters:
            options = self.get_backoffice_filefield_options()
            if options:
                form.add(SingleSelectWidget, '%sbackoffice_filefield_id' % prefix,
                         title=_('Store in a backoffice file field'),
                         value=self.backoffice_filefield_id,
                         options=[(None, '---', None)] + options)
        if 'attach_to_history' in parameters:
            form.add(CheckboxWidget, '%sattach_to_history' % prefix,
                     title=_('Include generated file in the form history'),
                     value=self.attach_to_history)
        if 'varname' in parameters:
            form.add(VarnameWidget, '%svarname' % prefix,
                     title=_('Identifier'), value=self.varname,
                     hint=_('This is used to get generated document in expressions.'))
        if 'push_to_portfolio' in parameters:
            form.add(CheckboxWidget, '%spush_to_portfolio' % prefix,
                     title=_('Push generated file to portfolio'),
                     value=self.push_to_portfolio)
        if 'method' in parameters:
            form.add(RadiobuttonsWidget, '%smethod' % prefix,
                     title=_('Method'),
                     options=list(methods.items()),
                     value=self.method,
                     attrs={'data-dynamic-display-parent': 'true'})
        if 'by' in parameters:
            options = [(None, '---', None)] + self.get_list_of_roles()
            form.add(WidgetList, '%sby' % prefix, title=_('By'),
                     element_type=SingleSelectWidget,
                     value=self.by,
                     add_element_label=_('Add Role'),
                     attrs={
                         'data-dynamic-display-child-of': '%smethod' % prefix,
                         'data-dynamic-display-value': methods.get('interactive'),
                     },
                     element_kwargs={
                         'render_br': False,
                         'options': options})
        if 'label' in parameters:
            form.add(StringWidget, '%slabel' % prefix,
                     title=_('Button Label'),
                     value=self.label,
                     attrs={
                         'data-dynamic-display-child-of': '%smethod' % prefix,
                         'data-dynamic-display-value': methods.get('interactive'),
                     })
        if 'backoffice_info_text' in parameters:
            form.add(WysiwygTextWidget, '%sbackoffice_info_text' % prefix,
                     title=_('Information Text for Backoffice'),
                     value=self.backoffice_info_text,
                     attrs={
                         'data-dynamic-display-child-of': '%smethod' % prefix,
                         'data-dynamic-display-value': methods.get('interactive'),
                     })
        if 'filename' in parameters:
            form.add(ComputedExpressionWidget, name='%sfilename' % prefix, title=_('File name'),
                     value=self.filename)

    def get_filename(self):
        '''Return the name of the generated file: the computed filename
        setting, or the name of the model file, with slashes replaced by
        dashes.'''
        filename = None
        if self.filename:
            filename = self.compute(self.filename)
        if not filename:
            filename = self.model_file.base_filename
        filename = filename.replace('/', '-')
        return filename

    def get_directory_name(self):
        '''Return the name under which the generated document is served.'''
        return misc.simplify(self.label or 'export_to_model', space='_')

    directory_name = property(get_directory_name)

    def apply_template_to_formdata(self, formdata):
        '''Fill the model with formdata and return the result as a binary
        stream, converted to PDF if the action is configured to do so.'''
        assert self.model_file
        kind = self.model_file_validation(self.model_file)
        if kind == 'rtf':
            outstream = self.apply_rtf_template_to_formdata(formdata)
        elif kind == 'opendocument':
            outstream = self.apply_od_template_to_formdata(formdata)
        else:
            raise Exception('unsupported model kind %r' % kind)
        if self.convert_to_pdf:
            if transform_to_pdf is None:
                raise Exception('libreoffice is missing')
            return transform_to_pdf(outstream)
        return outstream

    def apply_rtf_template_to_formdata(self, formdata):
        '''Return a BytesIO with the RTF model filled with formdata.

        Raises TemplatingError (after logging it) on template errors.
        '''
        try:
            # force ezt_only=True because an RTF file may contain {{ characters
            # and would be seen as a Django template
            return BytesIO(force_bytes(template_on_formdata(
                formdata,
                force_text(self.model_file.get_file().read()),
                ezt_format=ezt.FORMAT_RTF,
                ezt_only=True)))
        except TemplateError as e:
            url = formdata.get_url()
            get_logger().error('error in template for export to model [%s]: %s' % (url, str(e)))
            raise TemplatingError(_('Error in template: %s') % str(e))

    def apply_od_template_to_formdata(self, formdata):
        '''Return a BytesIO with the OpenDocument model filled with
        formdata.'''
        context = get_formdata_template_context(formdata)

        def process_styles(root):
            # add the paragraph styles used by the form details section,
            # unless the document already defines one of them
            styles_node = root.find('{%s}styles' % OO_OFFICE_NS)
            if styles_node is None:
                return
            # (iterating on the element replaces getchildren(), which was
            # removed in Python 3.9)
            style_names = {x.attrib.get('{%s}name' % OO_STYLE_NS) for x in styles_node}
            for style_name in ['Page_20_Title', 'Form_20_Title', 'Form_20_Subtitle',
                               'Field_20_Label', 'Field_20_Value']:
                # if any style name is defined, don't alter styles
                if style_name in style_names:
                    return
            # the position in this list sets the font size of title styles
            for i, style_name in enumerate(['Field_20_Label', 'Field_20_Value',
                                            'Form_20_Subtitle', 'Form_20_Title',
                                            'Page_20_Title']):
                style_node = ET.SubElement(styles_node, '{%s}style' % OO_STYLE_NS)
                style_node.attrib['{%s}name' % OO_STYLE_NS] = style_name
                style_node.attrib['{%s}display-name' % OO_STYLE_NS] = style_name.replace('_20_', ' ')
                style_node.attrib['{%s}family' % OO_STYLE_NS] = 'paragraph'
                para_props = ET.SubElement(style_node, '{%s}paragraph-properties' % OO_STYLE_NS)
                if 'Value' not in style_name:
                    para_props.attrib['{%s}margin-top' % OO_FO_NS] = '0.5cm'
                else:
                    para_props.attrib['{%s}margin-left' % OO_FO_NS] = '0.25cm'
                if 'Title' in style_name:
                    text_props = ET.SubElement(style_node, '{%s}text-properties' % OO_STYLE_NS)
                    text_props.attrib['{%s}font-size' % OO_FO_NS] = '%s%%' % (90 + i * 10)
                    text_props.attrib['{%s}font-weight' % OO_FO_NS] = 'bold'

        def process_root(root, new_images):
            if root.tag == '{%s}document-styles' % OO_OFFICE_NS:
                return process_styles(root)

            # cache for keeping computed user-field-decl value around
            user_field_values = {}

            def process_text(t):
                t = template_on_context(context, force_str(t), autoescape=False)
                return force_text(t, get_publisher().site_charset)

            # take a snapshot of the nodes first, as the tree is modified
            # during the loop; nodes added by insert_form_details are thus
            # not processed as templates
            nodes = list(root.iter())
            for node in nodes:
                got_blank_lines = False
                if node.tag == SECTION_NODE and 'form_details' in node.attrib.get(SECTION_NAME, ''):
                    # custom behaviour for a section named form_details
                    # (actually any name containing form_details), create
                    # real odt markup.
                    for child in list(node):
                        node.remove(child)
                    self.insert_form_details(node, formdata)

                # apply template to user-field-decl and update user-field-get
                if node.tag == USER_FIELD_DECL and STRING_VALUE in node.attrib:
                    node.attrib[STRING_VALUE] = process_text(node.attrib[STRING_VALUE])
                    if NAME in node.attrib:
                        user_field_values[node.attrib[NAME]] = node.attrib[STRING_VALUE]
                if (node.tag == USER_FIELD_GET and NAME in node.attrib and
                        node.attrib[NAME] in user_field_values):
                    node.text = user_field_values[node.attrib[NAME]]

                if node.tag == DRAW_FRAME:
                    name = node.attrib.get(DRAW_NAME)
                    if not self.get_expression(name)['type'] == 'python':
                        continue
                    # variable image
                    try:
                        variable_image = self.compute(name)
                    except Exception:
                        # best effort: keep the image of the model
                        continue
                    if not hasattr(variable_image, 'get_content'):
                        continue
                    images = [x for x in node if x.tag == DRAW_IMAGE]
                    if not images:
                        # frame without an image, nothing to replace
                        continue
                    new_images[images[0].attrib.get(XLINK_HREF)] = variable_image

                for attr in ('text', 'tail'):
                    if not getattr(node, attr):
                        continue
                    old_value = getattr(node, attr)
                    setattr(node, attr, process_text(old_value))
                    new_value = getattr(node, attr)
                    if old_value != new_value and '\n\n' in new_value:
                        got_blank_lines = True
                if got_blank_lines:
                    # replace blank lines by forced line breaks (it would be
                    # better to be smart about the document format and create
                    # real paragraphs if we were inside a paragraph but then
                    # we would also need to copy its style and what not).
                    current_tail = node.tail or ''
                    node.tail = None
                    as_str = force_str(ET.tostring(node)).replace(
                        '\n\n',
                        2 * ('<nsa:line-break xmlns:nsa="%(ns)s"/>' % {'ns': OO_TEXT_NS}))
                    as_node = ET.fromstring(as_str)
                    node.text = as_node.text
                    for child in list(node):
                        node.remove(child)
                    for child in list(as_node):
                        node.append(child)
                    node.tail = current_tail

        outstream = BytesIO()
        transform_opendocument(self.model_file.get_file(), outstream,
                               process_root)
        outstream.seek(0)
        return outstream

    def insert_form_details(self, node, formdata):
        '''Append to node the OpenDocument markup (headings, labels and
        values) for the summary of the fields of formdata.'''
        field_details = formdata.get_summary_field_details()
        section_node = node

        for field_value_info in field_details:
            f = field_value_info['field']
            if f.type == 'page':
                page_title = ET.SubElement(section_node, '{%s}h' % OO_TEXT_NS)
                page_title.attrib['{%s}outline-level' % OO_TEXT_NS] = '1'
                page_title.attrib['{%s}style-name' % OO_TEXT_NS] = 'Page_20_Title'
                page_title.text = f.label
                continue

            if f.type in ('title', 'subtitle'):
                label = template_on_formdata(None, f.label, autoescape=False)
                title = ET.SubElement(section_node, '{%s}h' % OO_TEXT_NS)
                title.attrib['{%s}outline-level' % OO_TEXT_NS] = '2'
                title.attrib['{%s}style-name' % OO_TEXT_NS] = 'Form_20_Title'
                if f.type == 'subtitle':
                    title.attrib['{%s}outline-level' % OO_TEXT_NS] = '3'
                    title.attrib['{%s}style-name' % OO_TEXT_NS] = 'Form_20_Subtitle'
                title.text = label
                continue

            if f.type == 'comment':
                # comment can be free form HTML, ignore them.
                continue

            if not f.get_opendocument_node_value:
                # unsupported field type
                continue

            label_p = ET.SubElement(section_node, '{%s}p' % OO_TEXT_NS)
            label_p.attrib['{%s}style-name' % OO_TEXT_NS] = 'Field_20_Label'
            label_p.text = f.label
            value = field_value_info['value']
            if value is None:
                unset_value_p = ET.SubElement(section_node, '{%s}p' % OO_TEXT_NS)
                unset_value_p.attrib['{%s}style-name' % OO_TEXT_NS] = 'Field_20_Value'
                unset_value_i = ET.SubElement(unset_value_p, '{%s}span' % OO_TEXT_NS)
                unset_value_i.text = _('Not set')
            else:
                node_value = f.get_opendocument_node_value(value, formdata)
                if isinstance(node_value, list):
                    # (loop variable renamed, it used to shadow the node
                    # parameter)
                    for value_node in node_value:
                        section_node.append(value_node)
                        value_node.attrib['{%s}style-name' % OO_TEXT_NS] = 'Field_20_Value'
                elif node_value.tag in ('{%s}span' % OO_TEXT_NS, '{%s}a' % OO_TEXT_NS):
                    # inline content has to be wrapped in a paragraph
                    value_p = ET.SubElement(section_node, '{%s}p' % OO_TEXT_NS)
                    value_p.attrib['{%s}style-name' % OO_TEXT_NS] = 'Field_20_Value'
                    value_p.append(node_value)
                else:
                    node_value.attrib['{%s}style-name' % OO_TEXT_NS] = 'Field_20_Value'
                    section_node.append(node_value)

    def model_file_export_to_xml(self, xml_item, charset, include_id=False):
        '''Add a model_file element (file name, content type and base64
        encoded content) to xml_item, if a model file is set.'''
        if not self.model_file:
            return
        el = ET.SubElement(xml_item, 'model_file')
        ET.SubElement(el, 'base_filename').text = self.model_file.base_filename
        ET.SubElement(el, 'content_type').text = self.model_file.content_type
        # encodebytes() is the name of encodestring(), which was removed in
        # Python 3.9
        ET.SubElement(el, 'b64_content').text = force_text(base64.encodebytes(
            self.model_file.get_file().read()))

    def model_file_init_with_xml(self, elem, charset, include_id=False):
        '''Set the model file from a model_file element, as created by
        model_file_export_to_xml; nothing is done if elem is None.

        Raises ValueError if the element has no content.
        '''
        if elem is None:
            return
        base_filename = elem.find('base_filename').text
        content_type = elem.find('content_type').text

        content = None
        b64_node = elem.find('b64_content')
        if b64_node is not None:
            # decodebytes() is the name of decodestring(), which was removed
            # in Python 3.9
            content = base64.decodebytes(force_bytes(b64_node.text or ''))
        raw_node = elem.find('content')
        if raw_node is not None:
            # unencoded content; it has to be turned into bytes to be
            # written in the BytesIO below
            content = force_bytes(raw_node.text or '')
        if content is None:
            # was failing later on with an UnboundLocalError
            raise ValueError('model_file element has neither b64_content nor content')

        if self.parent.parent.id:
            ids = (self.parent.parent.id, self.parent.id, self.id)
        else:
            # hopefully this will be random enough.
            ids = ('i%i' % random.randint(0, 1000000), self.parent.id, self.id)
        filename = 'export_to_model-%s-%s-%s.upload' % ids

        upload = Upload(base_filename, content_type)
        upload.fp = BytesIO()
        upload.fp.write(content)
        upload.fp.seek(0)
        self.model_file = UploadedFile('models', filename, upload)

    def perform(self, formdata):
        '''Create the document right away, unless the method is interactive
        (it is then created from submit_form).'''
        if self.method == 'interactive':
            return
        self.perform_real(formdata, formdata.evolution[-1])

    def perform_real(self, formdata, evo):
        '''Create the document and dispatch it according to the settings:
        portfolio, form history (as a part of evo) and backoffice file
        field.'''
        outstream = self.apply_template_to_formdata(formdata)
        filename = self.get_filename()
        content_type = self.model_file.content_type
        if self.convert_to_pdf:
            filename = filename.rsplit('.', 1)[0] + '.pdf'
            content_type = 'application/pdf'
        if self.push_to_portfolio:
            push_document(formdata.get_user(), filename, outstream)
        if self.attach_to_history:
            evo.add_part(AttachmentEvolutionPart(
                filename,
                outstream,
                content_type=content_type,
                varname=self.varname))
            formdata.store()
        if self.backoffice_filefield_id:
            outstream.seek(0)
            self.store_in_backoffice_filefield(
                formdata,
                self.backoffice_filefield_id,
                filename,
                content_type,
                outstream.read())
# Register the action class using register_item_class, imported from
# wcs.workflows (presumably this is what makes the action available in
# workflows -- to be confirmed in wcs.workflows).
register_item_class(ExportToModel)