# Source: wcs/wcs/wf/wscall.py (604 lines, 22 KiB, Python)

# w.c.s. - web application for online forms
# Copyright (C) 2005-2013 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import collections
import datetime
import io
import mimetypes
import sys
import traceback
import xml.etree.ElementTree as ET
from django.utils.encoding import force_text
from quixote import get_publisher, get_request
from quixote.html import TemplateIO, htmltext
from wcs.workflows import (
AbortActionException,
AttachmentEvolutionPart,
WorkflowStatusItem,
register_item_class,
)
from wcs.wscalls import call_webservice, get_app_error_code
from ..qommon import N_, _, force_str
from ..qommon.errors import ConnectionError
from ..qommon.form import (
CheckboxWidget,
ComputedExpressionWidget,
HtmlWidget,
RadiobuttonsWidget,
SingleSelectWidget,
StringWidget,
VarnameWidget,
WidgetDict,
)
from ..qommon.misc import json_loads
class JournalWsCallErrorPart:
    """History part describing a failed webservice call.

    Instances are attached to a form evolution by
    ``WebserviceCallStatusItem.action_on_error`` when errors are to be
    recorded in the log.

    Attributes:
        summary: one-line description of the error.
        label: label of the webservice action, if any.
        data: raw response body, truncated to 10000 items, or None.
    """

    content = None
    data = None
    label = None

    def __init__(self, summary, label=None, data=None):
        self.summary = summary
        self.label = label
        if data:
            self.data = data[:10000]  # beware of huge responses

    def view(self):
        """Return the HTML rendering of the error, or '' outside of backoffice.

        The part is only rendered when there is a current request whose path
        starts with ``/backoffice/``.
        """
        request = get_request()
        if not (request and request.get_path().startswith('/backoffice/')):
            return ''

        r = TemplateIO(html=True)
        r += htmltext('<div class="ws-error">')
        r += htmltext('<h4 class="foldable folded">')
        if self.label:
            r += _('Error during webservice call "%s"') % self.label
        else:
            r += _('Error during webservice call')
        r += htmltext('</h4>')
        r += htmltext('<div>')
        r += htmltext('<p>%s</p>\n') % self.summary
        if self.data:
            try:
                json_data = json_loads(self.data)
            except ValueError:
                # not JSON (or JSON cut by the truncation), only show the summary
                json_data = None
            # only a JSON object can carry the error attributes; anything else
            # (list, string, number) would make the lookups below fail.
            if isinstance(json_data, dict):
                labels = {
                    'err': _('Error Code'),
                    'err_class': _('Error Class'),
                    'err_desc': _('Error Description'),
                    'reason': _('Reason'),
                }
                r += htmltext('<ul>')
                for attr, attr_label in labels.items():
                    if attr in json_data:
                        r += htmltext('<li>%s: %s</li>\n') % (attr_label, json_data[attr])
                r += htmltext('</ul>')
        r += htmltext('</div>')
        r += htmltext('</div>')
        return r.getvalue()

    def get_json_export_dict(self, anonymise=False, include_files=True):
        """Return a dictionary describing this part.

        When ``anonymise`` is true only the part type is exported; summary,
        label and data are left out. ``include_files`` is accepted but not
        used by this part.
        """
        d = {
            'type': 'wscall-error',
        }
        if not anonymise:
            d.update(
                {
                    'summary': self.summary,
                    'label': self.label,
                    'data': self.data,
                }
            )
        return d
class WebserviceCallStatusItem(WorkflowStatusItem):
    """Workflow action (key ``webservice_call``) calling a webservice.

    The call is made with ``call_webservice``. Depending on the
    configuration the result is recorded in the form workflow data (keys
    prefixed with ``varname``), added as an attachment to the form history,
    and/or stored in a backoffice file field. Each kind of error has its
    own handling setting: ``':pass'`` (ignore), ``':stop'`` (abort the
    action) or the identifier of a status to jump to.
    """

    description = N_('Webservice')
    key = 'webservice_call'
    category = 'interaction'
    support_substitution_variables = True

    # request settings
    label = None
    url = None
    varname = None
    post = False
    request_signature_key = None
    post_data = None
    qs_data = None
    _method = None

    # response settings
    response_type = 'json'
    backoffice_filefield_id = None

    # error handling settings
    action_on_app_error = ':pass'
    action_on_4xx = ':stop'
    action_on_5xx = ':stop'
    action_on_bad_data = ':pass'
    action_on_network_errors = ':stop'
    notify_on_errors = False
    record_on_errors = True
    record_errors = False

    @property
    def waitpoint(self):
        """True when at least one error handling setting is ``':stop'``."""
        return any(
            getattr(self, jump_attribute) == ':stop'
            for jump_attribute in (
                'action_on_app_error',
                'action_on_4xx',
                'action_on_5xx',
                'action_on_bad_data',
                'action_on_network_errors',
            )
        )

    @property
    def method(self):
        """HTTP method of the call.

        The explicitly set method is returned when it is one of GET, POST,
        PUT, PATCH or DELETE; otherwise it is POST when ``post`` or
        ``post_data`` is set, and GET in all other cases.
        """
        if self._method in ('GET', 'POST', 'PUT', 'PATCH', 'DELETE'):
            return self._method
        if self.post or self.post_data:
            return 'POST'
        return 'GET'

    @method.setter
    def method(self, value):
        self._method = value

    def get_line_details(self):
        """Return the label of the action, or None when it has no label."""
        return self.label or None

    def get_parameters(self):
        """Return the names of the parameters of the action."""
        return (
            'label',
            'url',
            'request_signature_key',
            'qs_data',
            'method',
            'post',
            'post_data',
            'response_type',
            'varname',
            'backoffice_filefield_id',
            'action_on_app_error',
            'action_on_4xx',
            'action_on_5xx',
            'action_on_bad_data',
            'action_on_network_errors',
            'notify_on_errors',
            'record_on_errors',
            'record_errors',
            'condition',
        )

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
        """Add to ``form`` the widgets for the requested ``parameters``.

        Widgets are named after the parameter, prefixed with ``prefix``.
        Parameters not handled here are left to the parent class.
        """
        super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)

        if 'label' in parameters:
            form.add(StringWidget, '%slabel' % prefix, size=40, title=_('Label'), value=self.label)

        if 'url' in parameters:
            form.widgets.append(HtmlWidget(htmltext('<h3>%s</h3>') % _('Request')))
            form.add(
                StringWidget,
                '%surl' % prefix,
                title=_('URL'),
                value=self.url,
                size=80,
                hint=_('Common variables are available with the {{variable}} syntax.'),
            )

        if 'request_signature_key' in parameters:
            form.add(
                ComputedExpressionWidget,
                '%srequest_signature_key' % prefix,
                title=_('Request Signature Key'),
                value=self.request_signature_key,
                advanced=not (self.request_signature_key),
            )

        if 'qs_data' in parameters:
            form.add(
                WidgetDict,
                '%sqs_data' % prefix,
                title=_('Query string data'),
                value=self.qs_data or {},
                element_value_type=ComputedExpressionWidget,
            )

        methods = collections.OrderedDict(
            [
                ('GET', _('GET')),
                ('POST', _('POST (JSON)')),
                ('PUT', _('PUT (JSON)')),
                ('PATCH', _('PATCH (JSON)')),
                ('DELETE', _('DELETE')),
            ]
        )
        # widgets about the request body are only displayed for those methods
        body_methods_attrs = {
            'data-dynamic-display-child-of': '%smethod' % prefix,
            'data-dynamic-display-value-in': '|'.join(
                [_(methods['POST']), _(methods['PUT']), _(methods['PATCH'])]
            ),
        }

        if 'method' in parameters:
            form.add(
                RadiobuttonsWidget,
                '%smethod' % prefix,
                title=_('Method'),
                options=list(methods.items()),
                value=self.method,
                attrs={'data-dynamic-display-parent': 'true'},
                extra_css_class='widget-inline-radio',
            )

        if 'post' in parameters:
            form.add(
                CheckboxWidget,
                '%spost' % prefix,
                title=_('Post formdata'),
                value=self.post,
                attrs=dict(body_methods_attrs),
            )

        if 'post_data' in parameters:
            form.add(
                WidgetDict,
                '%spost_data' % prefix,
                title=_('POST data'),
                value=self.post_data or {},
                element_value_type=ComputedExpressionWidget,
                attrs=dict(body_methods_attrs),
            )

        response_types = collections.OrderedDict([('json', _('JSON')), ('attachment', _('Attachment'))])
        if 'response_type' in parameters:
            form.widgets.append(HtmlWidget(htmltext('<h3>%s</h3>') % _('Response')))
            form.add(
                RadiobuttonsWidget,
                '%sresponse_type' % prefix,
                title=_('Response Type'),
                options=list(response_types.items()),
                value=self.response_type,
                attrs={'data-dynamic-display-parent': 'true'},
                extra_css_class='widget-inline-radio',
            )

        if 'varname' in parameters:
            form.add(
                VarnameWidget,
                '%svarname' % prefix,
                title=_('Identifier'),
                value=self.varname,
                hint=_('This is used as prefix for webservice result variable names.'),
            )

        if 'backoffice_filefield_id' in parameters:
            options = self.get_backoffice_filefield_options()
            if options:
                form.add(
                    SingleSelectWidget,
                    '%sbackoffice_filefield_id' % prefix,
                    title=_('Store in a backoffice file field'),
                    value=self.backoffice_filefield_id,
                    options=[(None, '---', None)] + options,
                    attrs={
                        'data-dynamic-display-child-of': '%sresponse_type' % prefix,
                        'data-dynamic-display-value': response_types.get('attachment'),
                    },
                )

        if 'action_on_app_error' in parameters:
            form.widgets.append(HtmlWidget(htmltext('<h3>%s</h3>') % _('Error Handling')))

        requested_error_attributes = [
            attribute
            for attribute in (
                'action_on_app_error',
                'action_on_4xx',
                'action_on_5xx',
                'action_on_network_errors',
                'action_on_bad_data',
            )
            if attribute in parameters
        ]
        if requested_error_attributes:
            # the list of possible actions is only built when it is needed
            error_actions = [(':stop', _('Stop')), (':pass', _('Ignore'))]
            error_actions.extend(
                [(x.id, _('Jump to %s') % x.name) for x in self.parent.parent.possible_status]
            )
            error_titles = {
                'action_on_app_error': _('Action on application error'),
                'action_on_4xx': _('Action on HTTP error 4xx'),
                'action_on_5xx': _('Action on HTTP error 5xx'),
                'action_on_bad_data': _('Action on non-JSON response'),
                'action_on_network_errors': _('Action on network errors'),
            }
            for attribute in requested_error_attributes:
                if attribute == 'action_on_bad_data':
                    # only relevant when a JSON response is expected
                    attrs = {
                        'data-dynamic-display-child-of': '%sresponse_type' % prefix,
                        'data-dynamic-display-value': response_types.get('json'),
                    }
                else:
                    attrs = {}
                form.add(
                    SingleSelectWidget,
                    '%s%s' % (prefix, attribute),
                    title=error_titles.get(attribute),
                    value=getattr(self, attribute),
                    options=error_actions,
                    attrs=attrs,
                )

        if 'notify_on_errors' in parameters:
            form.add(
                CheckboxWidget,
                '%snotify_on_errors' % prefix,
                title=_('Notify on errors'),
                value=self.notify_on_errors,
            )

        if 'record_on_errors' in parameters:
            form.add(
                CheckboxWidget,
                '%srecord_on_errors' % prefix,
                title=_('Record on errors'),
                value=self.record_on_errors,
            )

        if 'record_errors' in parameters:
            form.add(
                CheckboxWidget,
                '%srecord_errors' % prefix,
                title=_('Record errors in the log'),
                value=self.record_errors,
            )

    def perform(self, formdata):
        """Call the webservice for ``formdata`` and process the result.

        Nothing is done when no URL is configured. Results are recorded in
        the workflow data only when an identifier (``varname``) is set, as
        all recorded keys are prefixed with it. Error handling settings are
        applied through ``action_on_error``, which may raise
        ``AbortActionException``.
        """
        if not self.url:
            # misconfigured action
            return

        workflow_data = {}
        if self.varname:
            workflow_data['%s_time' % self.varname] = datetime.datetime.now().isoformat()

        try:
            response, status, data = call_webservice(
                url=self.url,
                qs_data=self.qs_data,
                request_signature_key=self.request_signature_key,
                method=self.method,
                post_data=self.post_data,
                post_formdata=self.post,
                formdata=formdata,
            )
        except ConnectionError as e:
            if self.varname:
                workflow_data['%s_connection_error' % self.varname] = str(e)
            formdata.update_workflow_data(workflow_data)
            formdata.store()
            self.action_on_error(self.action_on_network_errors, formdata, exc_info=sys.exc_info())
            return

        app_error_code = get_app_error_code(response, data, self.response_type)
        app_error_code_header = response.headers.get('x-error-code')
        is_success = (status // 100) == 2 and app_error_code == 0
        has_content = status not in (204, 205)  # those statuses do not return any content

        if self.varname:
            workflow_data.update(
                {
                    '%s_status' % self.varname: status,
                    '%s_app_error_code' % self.varname: app_error_code,
                }
            )
            if app_error_code_header:
                workflow_data['%s_app_error_header' % self.varname] = app_error_code_header

            if has_content:
                if is_success:
                    self.store_response(formdata, response, data, workflow_data)
                else:  # on error, record data if it is JSON
                    try:
                        d = json_loads(data)
                    except (ValueError, TypeError):
                        pass
                    else:
                        workflow_data['%s_error_response' % self.varname] = d

        formdata.update_workflow_data(workflow_data)
        formdata.store()

        # storage in a backoffice field does not require an identifier
        if self.backoffice_filefield_id and is_success and has_content:
            filename, content_type = self.get_attachment_data(response)
            self.store_in_backoffice_filefield(
                formdata, self.backoffice_filefield_id, filename, content_type, data
            )

        if app_error_code != 0:
            self.action_on_error(self.action_on_app_error, formdata, response, data=data)
        if (status // 100) == 4:
            self.action_on_error(self.action_on_4xx, formdata, response, data=data)
        if (status // 100) == 5:
            self.action_on_error(self.action_on_5xx, formdata, response, data=data)

    def get_attachment_data(self, response):
        """Return a ``(filename, content_type)`` tuple for ``response``.

        The content type is taken from the ``content-type`` header, without
        its parameters and lowercased ('' when the header is missing). The
        filename is built from the identifier, or else from the backoffice
        field id, or else is ``file``, with an extension guessed from the
        content type.
        """
        content_type = response.headers.get('content-type') or ''
        if content_type:
            content_type = content_type.split(';')[0].strip().lower()
        extension = mimetypes.guess_extension(content_type, strict=False) or ''

        if self.varname:
            filename = '%s%s' % (self.varname, extension)
        elif self.backoffice_filefield_id:
            filename = 'file-%s%s' % (self.backoffice_filefield_id, extension)
        else:
            filename = 'file%s' % extension
        return filename, content_type

    def store_response(self, formdata, response, data, workflow_data):
        """Record a successful response.

        For a JSON response type the decoded content is added to
        ``workflow_data``; content that cannot be decoded triggers the
        ``action_on_bad_data`` handling. For other response types the
        content is added as an attachment to the last evolution of
        ``formdata``. Recorded keys are prefixed with ``self.varname``, so
        this is meant to be called only when an identifier is set.
        """
        if self.response_type == 'json':
            try:
                d = json_loads(force_text(data))
            except (ValueError, TypeError):
                # save what was collected so far as the error handling may
                # abort the action.
                formdata.update_workflow_data(workflow_data)
                formdata.store()
                self.action_on_error(
                    self.action_on_bad_data, formdata, response, data=data, exc_info=sys.exc_info()
                )
            else:
                workflow_data['%s_response' % self.varname] = d
                if isinstance(d, dict) and self.method == 'POST':
                    # if POST response contains a display_id value it is
                    # considered to be used as replacement for the form
                    # own identifier; this is used so a unique public
                    # identifier can be used between w.c.s. and a business
                    # application.
                    if isinstance(d.get('data'), dict) and d['data'].get('display_id'):
                        formdata.id_display = d['data'].get('display_id')
                    elif d.get('display_id'):
                        formdata.id_display = d.get('display_id')
        else:  # store result as attachment
            filename, content_type = self.get_attachment_data(response)
            workflow_data['%s_content_type' % self.varname] = content_type
            workflow_data['%s_length' % self.varname] = len(data)
            fp_content = io.BytesIO(data)
            attachment = AttachmentEvolutionPart(
                filename, fp_content, content_type=content_type, varname=self.varname
            )
            formdata.evolution[-1].add_part(attachment)

    def action_on_error(self, action, formdata, response=None, data=None, exc_info=None):
        """Apply the error handling setting ``action``.

        For ``':pass'`` and ``':stop'`` the error is first notified and/or
        recorded according to ``notify_on_errors``, ``record_on_errors`` and
        ``record_errors``. Then ``':pass'`` returns, ``':stop'`` raises
        ``AbortActionException``, and any other value is taken as the
        identifier of a status: the form is moved to it (if it exists) and
        ``AbortActionException`` is raised.
        """
        if action in (':pass', ':stop') and (
            self.notify_on_errors or self.record_on_errors or self.record_errors
        ):
            if exc_info:
                summary = traceback.format_exception_only(exc_info[0], exc_info[1])[-1]
            else:
                summary = '<no response>'
                if response is not None:
                    summary = '%s %s' % (response.status_code, response.reason)
                # notify_of_exception() is given exception info, build it
                # from the summary as no exception happened.
                try:
                    raise Exception(summary)
                except Exception:
                    exc_info = sys.exc_info()

            if self.notify_on_errors or self.record_on_errors:
                get_publisher().notify_of_exception(
                    exc_info, context='[WSCALL]', notify=self.notify_on_errors, record=self.record_on_errors
                )
            if self.record_errors and formdata.evolution:
                formdata.evolution[-1].add_part(JournalWsCallErrorPart(summary, self.label, data))
                formdata.store()

        if action == ':pass':
            return

        if action == ':stop':
            raise AbortActionException()

        # verify that target still exist
        try:
            self.parent.parent.get_status(action)
        except KeyError:
            try:
                raise IndexError(
                    'reference to invalid status %r in workflow %r, status %r'
                    % (action, self.parent.parent.name, self.parent.name)
                )
            except IndexError:
                get_publisher().notify_of_exception(sys.exc_info(), context='[WSCALL]')
            raise AbortActionException()

        formdata.status = 'wf-%s' % action
        formdata.store()
        raise AbortActionException()

    def get_target_status(self):
        """Return the statuses this action can lead to.

        Error handling settings referencing a status that does not exist
        are reported with ``record_error`` and skipped.
        """
        # always return self status as a target so it's included in the
        # workflow visualisation as a "normal" action, in addition to
        # jumps related to error handling.
        targets = [self.parent]
        for attribute in (
            'action_on_app_error',
            'action_on_4xx',
            'action_on_5xx',
            'action_on_bad_data',
            'action_on_network_errors',
        ):
            value = getattr(self, attribute)
            if value in (':pass', ':stop'):
                continue
            try:
                target = self.parent.parent.get_status(value)
            except KeyError:
                message = _(
                    'reference to invalid status in workflow %(workflow)s, status %(status)s, item %(item)s'
                ) % {
                    'workflow': self.parent.parent.name,
                    'status': self.parent.name,
                    'item': self.description,
                }
                get_publisher().record_error(message, workflow=self.parent.parent)
                continue
            targets.append(target)
        return targets

    def get_jump_label(self, target_id):
        """Return the label of the jump to the status ``target_id``.

        A jump to the status holding the action is labelled as the
        webservice call itself, any other jump as an error.
        """
        if target_id == self.parent.id:
            if self.label:
                return _('Webservice "%s"') % self.label
            return _('Webservice')
        if self.label:
            return _('Error calling webservice "%s"') % self.label
        return _('Error calling webservice')

    def _kv_data_export_to_xml(self, xml_item, charset, include_id, attribute):
        """Export the dictionary held in ``attribute`` under ``xml_item``.

        Nothing is exported when the dictionary is empty or unset. Each
        entry becomes an ``item`` element with ``name`` and ``value``
        children. ``AssertionError`` is raised if a key or a value is not a
        string.
        """
        assert attribute
        values = getattr(self, attribute)
        if not values:
            return
        el = ET.SubElement(xml_item, attribute)
        for key, value in values.items():
            item = ET.SubElement(el, 'item')
            # operands are wrapped in tuples so a tuple key or value is
            # reported instead of breaking the formatting.
            if not isinstance(key, str):
                raise AssertionError('unknown type for key (%r)' % (key,))
            ET.SubElement(item, 'name').text = force_text(key)
            if not isinstance(value, str):
                raise AssertionError('unknown type for value (%r)' % (value,))
            ET.SubElement(item, 'value').text = force_text(value)

    def _kv_data_init_with_xml(self, elem, charset, include_id, attribute):
        """Set ``attribute`` to the dictionary read from ``elem``.

        The attribute is left untouched when ``elem`` is None.
        """
        if elem is None:
            return
        values = {}
        for item in elem.findall('item'):
            # the text of an empty element is None, it stands for an empty
            # string and must not be turned into the 'None' string.
            key = force_str(item.find('name').text or '')
            value = force_str(item.find('value').text or '')
            values[key] = value
        setattr(self, attribute, values)

    def post_data_export_to_xml(self, xml_item, charset, include_id=False):
        """Export ``post_data`` under ``xml_item``."""
        self._kv_data_export_to_xml(xml_item, charset, include_id=include_id, attribute='post_data')

    def post_data_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
        """Load ``post_data`` from ``elem``."""
        self._kv_data_init_with_xml(elem, charset, include_id=include_id, attribute='post_data')

    def qs_data_export_to_xml(self, xml_item, charset, include_id=False):
        """Export ``qs_data`` under ``xml_item``."""
        self._kv_data_export_to_xml(xml_item, charset, include_id=include_id, attribute='qs_data')

    def qs_data_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
        """Load ``qs_data`` from ``elem``."""
        self._kv_data_init_with_xml(elem, charset, include_id=include_id, attribute='qs_data')
# Hand the action class to register_item_class (imported from wcs.workflows);
# presumably this is what makes the 'webservice_call' item available to
# workflows -- confirm against wcs.workflows.
register_item_class(WebserviceCallStatusItem)