wcs/wcs/wf/export_to_model.py

540 lines
23 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import base64
import collections
from StringIO import StringIO
from xml.etree import ElementTree as ET
import zipfile
import random
import subprocess
import tempfile
import shutil
from quixote import get_response, get_request, get_publisher
from quixote.directory import Directory
from quixote.html import htmltext
from qommon import _, ezt
from qommon import get_logger
from qommon.form import (SingleSelectWidget, WidgetList, CheckboxWidget,
StringWidget, UploadWidget, WysiwygTextWidget, Upload,
UploadedFile, UploadValidationError, VarnameWidget,
RadiobuttonsWidget, PicklableUpload, ComputedExpressionWidget)
from qommon.errors import PublishError
from qommon.template import TemplateError
import qommon
from wcs.fields import SubtitleField, TitleField, CommentField, PageField
from wcs.workflows import (WorkflowStatusItem, AttachmentEvolutionPart,
template_on_formdata, register_item_class,
get_formdata_template_context, template_on_context)
from wcs.portfolio import has_portfolio, push_document
def _clark(namespace, local_name):
    '''Return local_name qualified by namespace, written as {namespace}name
    (the notation ElementTree uses for tags and attribute names).
    '''
    return '{' + namespace + '}' + local_name


# XML namespaces found in OpenDocument content
OO_TEXT_NS = 'urn:oasis:names:tc:opendocument:xmlns:text:1.0'
OO_OFFICE_NS = 'urn:oasis:names:tc:opendocument:xmlns:office:1.0'
OO_DRAW_NS = 'urn:oasis:names:tc:opendocument:xmlns:drawing:1.0'
XLINK_NS = 'http://www.w3.org/1999/xlink'

# qualified element names
USER_FIELD_DECL = _clark(OO_TEXT_NS, 'user-field-decl')
USER_FIELD_GET = _clark(OO_TEXT_NS, 'user-field-get')
DRAW_FRAME = _clark(OO_DRAW_NS, 'frame')
DRAW_IMAGE = _clark(OO_DRAW_NS, 'image')

# qualified attribute names
STRING_VALUE = _clark(OO_OFFICE_NS, 'string-value')
DRAW_NAME = _clark(OO_DRAW_NS, 'name')
XLINK_HREF = _clark(XLINK_NS, 'href')
NAME = _clark(OO_TEXT_NS, 'name')
try:
    # Probe once, at import time, for a libreoffice binary on the PATH.
    with open('/dev/null', 'w') as _dev_null:
        subprocess.check_call(['which', 'libreoffice'], stdout=_dev_null)
except (subprocess.CalledProcessError, EnvironmentError):
    # libreoffice was not found, or the probe itself could not be run
    # (`which` or /dev/null missing): PDF conversion is disabled.
    transform_to_pdf = None
else:
    def transform_to_pdf(instream):
        '''Convert a document to PDF using a headless libreoffice.

        instream is a file-like object; its content is copied, by chunks,
        to a temporary file which is then given to libreoffice.

        Return an open file object on the generated PDF.  The temporary
        directory is removed before returning, the returned file therefore
        relies on the platform keeping an unlinked file readable for as long
        as it stays open; the caller should close it once done.

        Raise Exception('libreoffice is failing') if libreoffice exits with
        a non-zero status.
        '''
        # created outside of the try block so the cleanup in finally never
        # refers to an undefined name
        temp_dir = tempfile.mkdtemp()
        try:
            # /dev/null is opened for writing as it receives the output of
            # the child process
            with open('/dev/null', 'w') as dev_null, \
                    tempfile.NamedTemporaryFile(dir=temp_dir) as infile:
                while True:
                    chunk = instream.read(100000)
                    if not chunk:
                        break
                    infile.write(chunk)
                # make sure everything is on disk before libreoffice reads it
                infile.flush()
                subprocess.check_call(
                    ['libreoffice', '--headless', '--convert-to', 'pdf',
                     infile.name, '--outdir', temp_dir],
                    stdout=dev_null, stderr=dev_null)
                # PDF is binary content
                return open(infile.name + '.pdf', 'rb')
        except subprocess.CalledProcessError:
            raise Exception('libreoffice is failing')
        finally:
            shutil.rmtree(temp_dir)
def transform_opendocument(instream, outstream, process):
    '''Take a file-like object containing an ODT, ODS, or any open-office
    format, parse meta.xml and content.xml with element tree and apply
    process to their root node.

    process is called as process(root, new_images); it can modify root in
    place and register replacement files in the new_images dictionary,
    mapping an archive member name to an object with a get_content()
    method.

    The resulting archive is written to outstream, members are kept in the
    order they had in the input archive (OpenDocument packaging expects
    the mimetype member to come first).

    Raise AssertionError if the archive has no content.xml member.
    '''
    processed_names = ('meta.xml', 'content.xml')
    with zipfile.ZipFile(instream, mode='r') as zin:
        names = zin.namelist()
        if 'content.xml' not in names:
            # explicit raise (same exception type as the former assert
            # statement) so the check is not removed when running with -O
            raise AssertionError('content.xml is missing from the archive')

        # first pass to process meta.xml and content.xml; it has to be
        # completed before anything is written as it is what fills
        # new_images.
        new_images = {}
        processed = {}
        for filename in names:
            if filename not in processed_names:
                continue
            root = ET.fromstring(zin.read(filename))
            process(root, new_images)
            processed[filename] = ET.tostring(root)

        # second pass to write everything, in the original order
        with zipfile.ZipFile(outstream, mode='w') as zout:
            for filename in names:
                if filename in processed:
                    content = processed[filename]
                elif filename in new_images:
                    content = new_images[filename].get_content()
                else:
                    content = zin.read(filename)
                zout.writestr(filename, content)
def is_opendocument(stream):
    '''Tell if stream is an OpenDocument file.

    stream is a seekable file-like object.  It is considered to be an
    OpenDocument file when it is a zip archive with a mimetype member whose
    content starts with application/vnd.oasis.opendocument.

    Always return a boolean; the stream is rewound to its start in all
    cases.
    '''
    try:
        with zipfile.ZipFile(stream) as z:
            if 'mimetype' not in z.namelist():
                return False
            # content of archive members is bytes, compare with bytes
            return z.read('mimetype').startswith(
                b'application/vnd.oasis.opendocument.')
    except zipfile.BadZipfile:
        return False
    finally:
        stream.seek(0)
class TemplatingError(PublishError):
    '''Publishing error reported when a model cannot be turned into a
    document; it carries a fixed title and the description it is given.
    '''

    def __init__(self, description):
        self.description = description
        self.title = _('Templating Error')
def get_varnames(fields):
    '''Build the list of (variable name, label) pairs used to help people
    fill their templates.

    Subtitle, title, comment and page fields are left out.  A field with a
    varname is listed as var_<varname>, others as f<id>.
    '''
    def variable_name(field):
        # prefer the variable name to the numeric field name
        if field.varname:
            return 'var_%s' % field.varname
        return 'f%s' % field.id

    return [
        (variable_name(field), field.label)
        for field in fields
        if not isinstance(field, (SubtitleField, TitleField, CommentField,
                                  PageField))
    ]
class ExportToModelDirectory(Directory):
    '''Directory serving the document generated by an export-to-model
    action for a given formdata.
    '''
    _q_exports = ['']

    def __init__(self, formdata, wfstatusitem, wfstatus):
        # wfstatus is part of the expected signature but is not used here
        self.formdata = formdata
        self.wfstatusitem = wfstatusitem

    def _q_index(self):
        '''Return the generated document, after setting the response
        headers.

        Raise TemplatingError if the action has no model file.
        '''
        item = self.wfstatusitem
        if not item.model_file:
            raise TemplatingError(_('No model defined for this action'))
        response = get_response()
        if item.convert_to_pdf:
            response.content_type = 'application/pdf'
        else:
            response.content_type = item.model_file.content_type
        response.set_header('location', '..')
        filename = item.get_filename()
        if item.convert_to_pdf:
            # replace the extension of the model by .pdf
            filename = filename.rsplit('.', 1)[0] + '.pdf'
        if response.content_type != 'text/html':
            response.set_header('content-disposition',
                                'attachment; filename="%s"' % filename)
        stream = item.apply_template_to_formdata(self.formdata)
        try:
            return stream.read()
        finally:
            # the stream can be a real file (PDF conversion), do not leave
            # it opened until garbage collection
            stream.close()
class ExportToModel(WorkflowStatusItem):
    '''Workflow action creating a document from a model file (RTF or
    OpenDocument) filled with the data of a form.

    With the 'interactive' method the document is created when the user
    clicks on a button; with any other method it is created when the
    action is performed.
    '''
    description = N_('Document Creation')
    key = 'export_to_model'
    category = 'formdata-action'
    support_substitution_variables = True
    ok_in_global_action = False

    # default values of the action parameters
    filename = None
    endpoint = False
    waitpoint = True
    label = None
    model_file = None
    attach_to_history = False
    directory_class = ExportToModelDirectory
    by = ['_receiver']
    backoffice_info_text = None
    varname = None
    convert_to_pdf = False
    push_to_portfolio = False
    method = 'interactive'
    backoffice_filefield_id = None

    def get_line_details(self):
        '''Return a short description of the configured model.'''
        if self.model_file:
            return _('with model named %(file_name)s of %(size)s bytes') % {
                'file_name': self.model_file.base_filename,
                'size': self.model_file.size}
        else:
            return _('no model set')

    def fill_form(self, form, formdata, user):
        '''Add the document creation button to form (interactive method
        only).
        '''
        if self.method != 'interactive':
            return
        label = self.label
        if not label:
            label = _('Create Document')
        form.add_submit('button%s' % self.id, label, **{'class': 'download'})
        widget = form.get_widget('button%s' % self.id)
        widget.backoffice_info_text = self.backoffice_info_text

    def submit_form(self, form, formdata, user, evo):
        '''Create the document if the button of this action was used.

        Return the URL to redirect to: the formdata itself when the
        document is attached to the history, the directory serving the
        document otherwise.  Return None when there is nothing to do.
        '''
        if self.method != 'interactive':
            return
        if not self.model_file:
            return
        if form.get_submit() == 'button%s' % self.id:
            if not evo.comment:
                evo.comment = _('Form exported in a model')
            self.perform_real(formdata, evo)
            in_backoffice = get_request() and get_request().is_in_backoffice()
            if self.attach_to_history:
                return formdata.get_url(backoffice=in_backoffice)
            base_url = formdata.get_url(backoffice=in_backoffice)
            return base_url + self.get_directory_name()

    def model_file_validation(self, upload):
        '''Return the kind of model upload is, 'rtf' or 'opendocument'.

        The declared content type is looked at first, then the file name
        extension when the content type is generic or missing, and finally
        the content of the file itself.

        Raise UploadValidationError if upload is not a known kind of upload
        object or if it is neither an RTF nor an OpenDocument file.
        '''
        if hasattr(upload, 'fp'):
            fp = upload.fp
        elif hasattr(upload, 'get_file'):
            fp = upload.get_file()
        else:
            raise UploadValidationError('unknown upload object %r' % upload)

        # the extension is only trusted when the content type says nothing
        generic_content_type = (
            upload.content_type is None or
            upload.content_type == 'application/octet-stream')

        if upload.content_type and upload.content_type == 'application/rtf':
            return 'rtf'
        if generic_content_type:
            if upload.base_filename and upload.base_filename.endswith('.rtf'):
                return 'rtf'
        looks_like_rtf = fp.read(10).startswith(b'{\\rtf')
        fp.seek(0)
        if looks_like_rtf:
            return 'rtf'

        if upload.content_type and upload.content_type.startswith(
                'application/vnd.oasis.opendocument.'):
            return 'opendocument'
        if generic_content_type:
            # compare the extension itself; rsplit() returns a list, which
            # can never be equal to one of the strings
            if upload.base_filename and '.' in upload.base_filename and \
                    upload.base_filename.rsplit('.', 1)[1] in ('odt', 'ods', 'odc', 'odb'):
                return 'opendocument'
        if is_opendocument(fp):
            return 'opendocument'
        raise UploadValidationError(_('Only RTF and OpenDocument files can be used'))

    def get_parameters(self):
        '''Return the names of the parameters of the action; PDF conversion
        and push to portfolio are only offered when available.
        '''
        parameters = ('model_file',)
        if transform_to_pdf is not None:
            parameters += ('convert_to_pdf',)
        parameters += ('backoffice_filefield_id', 'attach_to_history', 'varname')
        if has_portfolio():
            parameters += ('push_to_portfolio',)
        parameters += ('method', 'by', 'label', 'backoffice_info_text', 'filename', 'condition')
        return parameters

    def add_parameters_widgets(self, form, parameters, prefix='',
                               formdef=None):
        '''Add to form the widgets matching the requested parameters.'''
        super(ExportToModel, self).add_parameters_widgets(
            form, parameters, prefix=prefix, formdef=formdef)
        methods = collections.OrderedDict(
            [('interactive', _('Interactive (button)')),
             ('non-interactive', _('Non interactive'))])

        if 'model_file' in parameters:
            ids = (self.parent.parent.id, self.parent.id, self.id)
            if formdef:
                # list the variables available for this form
                hint = htmltext('%s: <ul class="varnames">') \
                    % _('Available variables')
                varnames = get_varnames(formdef.fields)
                for pair in varnames:
                    hint += htmltext('<li><tt class="varname">{{%s}}</tt>'
                                     ' <label>%s</label></li>') % pair
                hint += htmltext('</ul>')
                ids = (formdef.id,) + ids
                filename = 'export_to_model-%s-%s-%s-%s.upload' % ids
            else:
                hint = _('You can use variables in your model using '
                         'the {{variable}} syntax, available variables '
                         'depends on the form.')
                filename = 'export_to_model-%s-%s-%s.upload' % ids
            widget_name = '%smodel_file' % prefix
            # a model set in the workflow options of the form has priority
            if formdef and formdef.workflow_options and \
                    formdef.workflow_options.get(widget_name) is not None:
                value = formdef.workflow_options.get(widget_name)
            else:
                value = self.model_file
            if value:
                hint_prefix = htmltext('<div>%s: <a href="?file=%s">%s</a></div>') % \
                    (_('Current value'), widget_name, value.base_filename)
                hint = hint_prefix + hint
            form.add(UploadWidget, widget_name, directory='models',
                     filename=filename, title=_('Model'), hint=hint,
                     validation=self.model_file_validation, value=value)
        if 'convert_to_pdf' in parameters:
            form.add(CheckboxWidget, '%sconvert_to_pdf' % prefix,
                     title=_('Convert generated file to PDF'),
                     value=self.convert_to_pdf)
        if 'backoffice_filefield_id' in parameters:
            options = self.get_backoffice_filefield_options()
            if options:
                form.add(SingleSelectWidget, '%sbackoffice_filefield_id' % prefix,
                         title=_('Store in a backoffice file field'),
                         value=self.backoffice_filefield_id,
                         options=[(None, '---', None)] + options)
        if 'attach_to_history' in parameters:
            form.add(CheckboxWidget, '%sattach_to_history' % prefix,
                     title=_('Include generated file in the form history'),
                     value=self.attach_to_history)
        if 'varname' in parameters:
            form.add(VarnameWidget, '%svarname' % prefix,
                     title=_('Identifier'), value=self.varname,
                     hint=_('This is used to get generated document in expressions.'))
        if 'push_to_portfolio' in parameters:
            form.add(CheckboxWidget, '%spush_to_portfolio' % prefix,
                     title=_('Push generated file to portfolio'),
                     value=self.push_to_portfolio)
        if 'method' in parameters:
            form.add(RadiobuttonsWidget, '%smethod' % prefix,
                     title=_('Method'),
                     options=methods.items(),
                     value=self.method,
                     attrs={'data-dynamic-display-parent': 'true'})
        if 'by' in parameters:
            options = [(None, '---', None)] + self.get_list_of_roles()
            form.add(WidgetList, '%sby' % prefix, title=_('By'),
                     element_type=SingleSelectWidget,
                     value=self.by,
                     add_element_label=_('Add Role'),
                     attrs={
                         'data-dynamic-display-child-of': '%smethod' % prefix,
                         'data-dynamic-display-value': methods.get('interactive'),
                     },
                     element_kwargs={
                         'render_br': False,
                         'options': options})
        if 'label' in parameters:
            form.add(StringWidget, '%slabel' % prefix,
                     title=_('Button Label'),
                     value=self.label,
                     attrs={
                         'data-dynamic-display-child-of': '%smethod' % prefix,
                         'data-dynamic-display-value': methods.get('interactive'),
                     })
        if 'backoffice_info_text' in parameters:
            form.add(WysiwygTextWidget, '%sbackoffice_info_text' % prefix,
                     title=_('Information Text for Backoffice'),
                     value=self.backoffice_info_text,
                     attrs={
                         'data-dynamic-display-child-of': '%smethod' % prefix,
                         'data-dynamic-display-value': methods.get('interactive'),
                     })
        if 'filename' in parameters:
            form.add(ComputedExpressionWidget, name='%sfilename' % prefix, title=_('File name'),
                     value=self.filename)

    def get_filename(self):
        '''Return the name of the generated file: the computed filename
        parameter, or the name of the model file if it is not set or
        computes to an empty value.
        '''
        filename = None
        if self.filename:
            filename = self.compute(self.filename)
        if not filename:
            filename = self.model_file.base_filename
        return filename

    def get_directory_name(self):
        '''Return the name of the directory serving the document, derived
        from the label of the action.
        '''
        return qommon.misc.simplify(self.label or 'export_to_model', space='_')
    directory_name = property(get_directory_name)

    def apply_template_to_formdata(self, formdata):
        '''Generate the document for formdata and return it as a file-like
        object, converted to PDF if the action is configured to do so.
        '''
        assert self.model_file
        kind = self.model_file_validation(self.model_file)
        if kind == 'rtf':
            outstream = self.apply_rtf_template_to_formdata(formdata)
        elif kind == 'opendocument':
            outstream = self.apply_od_template_to_formdata(formdata)
        else:
            raise Exception('unsupported model kind %r' % kind)
        if self.convert_to_pdf:
            if transform_to_pdf is None:
                raise Exception('libreoffice is missing')
            return transform_to_pdf(outstream)
        return outstream

    def apply_rtf_template_to_formdata(self, formdata):
        '''Fill the RTF model with formdata and return the result as a
        StringIO object.

        Raise TemplatingError (after logging it) on template errors.
        '''
        try:
            # force ezt_only=True because an RTF file may contain {{ characters
            # and would be seen as a Django template
            return StringIO(template_on_formdata(formdata, self.model_file.get_file().read(),
                                                 ezt_format=ezt.FORMAT_RTF, ezt_only=True))
        except TemplateError as e:
            url = formdata.get_url()
            get_logger().error('error in template for export to model [%s]: %s' % (url, str(e)))
            raise TemplatingError(_('Error in template: %s') % str(e))

    def apply_od_template_to_formdata(self, formdata):
        '''Fill the OpenDocument model with formdata and return the result
        as a StringIO object positioned at its start.
        '''
        context = get_formdata_template_context(formdata)

        def process_root(root, new_images):
            # cache for keeping computed user-field-decl value around
            user_field_values = {}

            def process_text(t):
                # template_on_context() is given a byte string; the result
                # is decoded back using the site charset
                if isinstance(t, unicode):
                    t = t.encode(get_publisher().site_charset)
                t = template_on_context(context, t, autoescape=False)
                return unicode(t, get_publisher().site_charset)

            for node in root.iter():
                got_blank_lines = False
                # apply template to user-field-decl and update user-field-get
                if node.tag == USER_FIELD_DECL and STRING_VALUE in node.attrib:
                    node.attrib[STRING_VALUE] = process_text(node.attrib[STRING_VALUE])
                    if NAME in node.attrib:
                        user_field_values[node.attrib[NAME]] = node.attrib[STRING_VALUE]
                if (node.tag == USER_FIELD_GET and NAME in node.attrib and
                        node.attrib[NAME] in user_field_values):
                    node.text = user_field_values[node.attrib[NAME]]
                if node.tag == DRAW_FRAME:
                    # NOTE(review): every "continue" below also skips the
                    # processing of the text and tail of the frame, as it
                    # always did -- confirm this is wanted.
                    name = node.attrib.get(DRAW_NAME)
                    if self.get_expression(name)['type'] != 'python':
                        continue
                    # variable image
                    try:
                        variable_image = self.compute(name)
                    except Exception:
                        # best effort: a frame whose name cannot be computed
                        # is kept unchanged
                        continue
                    if not hasattr(variable_image, 'get_content'):
                        continue
                    images = [x for x in node if x.tag == DRAW_IMAGE]
                    if not images:
                        # frame without an image (a text box for example),
                        # there is nothing to replace
                        continue
                    new_images[images[0].attrib.get(XLINK_HREF)] = variable_image
                for attr in ('text', 'tail'):
                    if not getattr(node, attr):
                        continue
                    old_value = getattr(node, attr)
                    setattr(node, attr, process_text(old_value))
                    new_value = getattr(node, attr)
                    if old_value != new_value and '\n\n' in new_value:
                        got_blank_lines = True
                if got_blank_lines:
                    # replace blank lines by forced line breaks (it would be
                    # better to be smart about the document format and create
                    # real paragraphs if we were inside a paragraph but then
                    # we would also need to copy its style and what not).
                    current_tail = node.tail or ''
                    node.tail = None
                    as_str = ET.tostring(node).replace(
                        '\n\n',
                        2 * ('<nsa:line-break xmlns:nsa="%(ns)s"/>' % {'ns': OO_TEXT_NS}))
                    as_node = ET.fromstring(as_str)
                    node.text = as_node.text
                    # replace children using the public API rather than the
                    # private _children attribute
                    node[:] = list(as_node)
                    node.tail = current_tail

        outstream = StringIO()
        transform_opendocument(self.model_file.get_file(), outstream,
                               process_root)
        outstream.seek(0)
        return outstream

    def model_file_export_to_xml(self, xml_item, charset, include_id=False):
        '''Add a model_file element to xml_item, with the name, the content
        type and the base64 encoded content of the model file, if any.
        '''
        if not self.model_file:
            return
        el = ET.SubElement(xml_item, 'model_file')
        ET.SubElement(el, 'base_filename').text = self.model_file.base_filename
        ET.SubElement(el, 'content_type').text = self.model_file.content_type
        ET.SubElement(el, 'b64_content').text = base64.encodestring(
            self.model_file.get_file().read())

    def model_file_init_with_xml(self, elem, charset, include_id=False):
        '''Restore the model file from the given model_file XML element.

        The content is taken from a b64_content child or from a content
        child (the latter wins if both are present).  Nothing is done if
        elem is None or if it has none of those children.
        '''
        if elem is None:
            return
        base_filename = elem.find('base_filename').text
        content_type = elem.find('content_type').text
        b64_content_node = elem.find('b64_content')
        content_node = elem.find('content')
        if b64_content_node is None and content_node is None:
            # no content at all: there is no model to restore (this used to
            # fail on an unbound variable)
            return
        if b64_content_node is not None:
            content = base64.decodestring(b64_content_node.text)
        if content_node is not None:
            content = content_node.text

        if self.parent.parent.id:
            ids = (self.parent.parent.id, self.parent.id, self.id)
        else:
            # hopefully this will be random enough.
            ids = ('i%i' % random.randint(0, 1000000), self.parent.id, self.id)
        filename = 'export_to_model-%s-%s-%s.upload' % ids

        upload = Upload(base_filename, content_type)
        upload.fp = StringIO()
        upload.fp.write(content)
        upload.fp.seek(0)
        self.model_file = UploadedFile('models', filename, upload)

    def perform(self, formdata):
        '''Create the document, unless the action is interactive (the
        document is then created from submit_form()).
        '''
        if self.method == 'interactive':
            return
        self.perform_real(formdata, formdata.evolution[-1])

    def perform_real(self, formdata, evo):
        '''Generate the document and hand it to every configured
        destination: portfolio, history of the form, backoffice file field.
        '''
        outstream = self.apply_template_to_formdata(formdata)
        filename = self.get_filename()
        content_type = self.model_file.content_type
        if self.convert_to_pdf:
            filename = filename.rsplit('.', 1)[0] + '.pdf'
            content_type = 'application/pdf'
        # the same stream is given to several consumers, it is rewound
        # before each of them so none gets an already consumed stream
        if self.push_to_portfolio:
            outstream.seek(0)
            push_document(formdata.get_user(), filename, outstream)
        if self.attach_to_history:
            outstream.seek(0)
            evo.add_part(AttachmentEvolutionPart(
                filename,
                outstream,
                content_type=content_type,
                varname=self.varname))
            formdata.store()
        if self.backoffice_filefield_id:
            outstream.seek(0)
            self.store_in_backoffice_filefield(
                formdata,
                self.backoffice_filefield_id,
                filename,
                content_type,
                outstream.read())
# register the action class using the function imported from wcs.workflows
register_item_class(ExportToModel)