751 lines
26 KiB
Python
751 lines
26 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2022 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import collections
|
|
import datetime
|
|
import http
|
|
import io
|
|
import json
|
|
import socket
|
|
import urllib.parse
|
|
import uuid
|
|
import xml.etree.ElementTree as ET
|
|
from contextlib import contextmanager
|
|
|
|
import requests
|
|
from django.core.handlers.wsgi import WSGIRequest
|
|
from django.utils.timezone import now
|
|
from quixote import get_publisher, get_session_manager
|
|
from urllib3 import HTTPResponse
|
|
|
|
from wcs import sql
|
|
from wcs.compat import CompatHTTPRequest
|
|
from wcs.fields import Field, PageField
|
|
from wcs.qommon.form import FileWithPreviewWidget, Form, get_selection_error_text
|
|
from wcs.qommon.storage import Equal
|
|
from wcs.qommon.template import TemplateError
|
|
from wcs.qommon.xml_storage import XmlStorableObject
|
|
from wcs.workflows import WorkflowStatusItem
|
|
|
|
from .qommon import _
|
|
|
|
|
|
class TestError(Exception):
    """Failure raised while running a test definition.

    Attributes set by the constructor:
      msg      -- full message describing the failure.
      error    -- short error text; falls back to ``msg`` when not given.
      details  -- list of extra detail entries (empty list by default).
      field_id -- identifier of the field or page the failure relates to, if any.
    """

    # prevent pytest from trying to collect this class (#75521)
    __test__ = False

    action_uuid = None

    def __init__(self, msg, error=None, details=None, field_id=None):
        self.msg = msg
        self.field_id = field_id
        # falsy values fall back to their defaults, exactly like "x or default"
        self.error = error if error else msg
        self.details = details if details else []
|
|
|
|
|
|
class TestDefXmlProxy(XmlStorableObject):
    """XML-storable stand-in carrying the attributes of a test definition.

    The list of XML nodes is derived from the static columns of
    ``sql.TestDef`` plus two extra nodes (webservice responses and workflow
    tests) that have dedicated export/import methods below.
    """

    xml_root_node = 'testdef'
    _names = 'testdef'
    readonly = True

    # prevent pytest from trying to collect this class
    __test__ = False

    _webservice_responses = []

    @classmethod
    @property
    def XML_NODES(cls):
        """Return the list of (attribute name, xml type) pairs to serialise."""
        xml_type_by_sql_kind = {
            'varchar': 'str',
            'boolean': 'bool',
            'jsonb': 'jsonb',
        }
        skipped_columns = ('id', 'object_type', 'object_id')

        nodes = []
        for column, sql_kind in sql.TestDef._table_static_fields:
            if column in skipped_columns:
                continue
            nodes.append((column, xml_type_by_sql_kind[sql_kind]))

        # nodes that are not plain table columns
        nodes.append(('_webservice_responses', 'webservice_responses'))
        nodes.append(('workflow_tests', 'workflow_tests'))
        return nodes

    def export_jsonb_to_xml(self, element, attribute_name, **kwargs):
        """Store the attribute as a JSON string in the element text."""
        attribute_value = getattr(self, attribute_name)
        element.text = json.dumps(attribute_value)

    def import_jsonb_from_xml(self, element, **kwargs):
        """Decode the JSON string held in the element text."""
        return json.loads(element.text)

    def export_workflow_tests_to_xml(self, element, attribute_name, **kwargs):
        """Append every item yielded by the workflow tests XML export to element."""
        exported = self.workflow_tests.export_to_xml()
        for child in exported:
            element.append(child)

    def import_workflow_tests_from_xml(self, element, **kwargs):
        """Build a WorkflowTests object from the given XML element."""
        # imported locally, as in the rest of this module
        from wcs.workflow_tests import WorkflowTests

        return WorkflowTests.import_from_xml_tree(element)

    def export_webservice_responses_to_xml(self, element, attribute_name, **kwargs):
        """Append the XML export of each webservice response to element."""
        for webservice_response in self._webservice_responses:
            exported = webservice_response.export_to_xml()
            element.append(exported)

    def import_webservice_responses_from_xml(self, element, **kwargs):
        """Return one WebserviceResponse per child of element."""
        imported = []
        for child in element:
            imported.append(WebserviceResponse.import_from_xml_tree(child))
        return imported
|
|
|
|
|
|
class TestDef(sql.TestDef):
    """Test definition attached to a form or card definition.

    A test holds submitted data (``data``) that is replayed against the
    object definition by ``run()``: fields are filled page by page, widgets
    are validated, page post-conditions are evaluated and, when an agent is
    set and workflow test actions exist, workflow tests are run.
    """

    _names = 'testdef'

    name = ''
    object_type = None  # (formdef, carddef, etc.)
    object_id = None

    data = None  # (json export of formdata, carddata, etc.)
    is_in_backoffice = False
    expected_error = None
    agent_id = None

    # field types that are never filled nor validated by a test
    ignored_field_types = (
        'subtitle',
        'title',
        'comment',
        'computed',
        'table',
        'table-select',
        'tablerows',
        'ranked-items',
    )

    def __str__(self):
        return self.name

    @property
    def workflow_tests(self):
        """Return the associated WorkflowTests object, loading or creating it once."""
        from wcs.workflow_tests import WorkflowTests

        if hasattr(self, '_workflow_tests'):
            return self._workflow_tests

        workflow_tests_list = WorkflowTests.select([Equal('testdef_id', self.id)])
        self._workflow_tests = workflow_tests_list[0] if workflow_tests_list else WorkflowTests()
        self._workflow_tests.testdef = self
        return self._workflow_tests

    @workflow_tests.setter
    def workflow_tests(self, value):
        self._workflow_tests = value

    def get_webservice_responses(self):
        """Return the webservice responses attached to this test, ordered by name."""
        return WebserviceResponse.select([Equal('testdef_id', self.id)], order_by='name')

    def get_admin_url(self):
        """Return the backoffice URL of this test."""
        base_url = get_publisher().get_backoffice_url()
        objects_dir = 'forms' if self.object_type == 'formdefs' else 'cards'
        return '%s/%s/%s/tests/%s/' % (base_url, objects_dir, self.object_id, self.id)

    def store(self, *args, **kwargs):
        """Store the test, then its workflow tests (which need the test id)."""
        super().store(*args, **kwargs)

        self.workflow_tests.testdef_id = self.id
        self.workflow_tests.testdef = self
        self.workflow_tests.store()

    @classmethod
    def remove_object(cls, id):
        """Remove the test and the workflow tests / webservice responses tied to it."""
        super().remove_object(id)
        from wcs.workflow_tests import WorkflowTests

        workflow_tests_list = WorkflowTests.select([Equal('testdef_id', id)])
        for workflow_tests in workflow_tests_list:
            workflow_tests.remove_self()

        responses = WebserviceResponse.select([Equal('testdef_id', id)])
        for response in responses:
            response.remove_self()

    @classmethod
    def select_for_objectdef(cls, objectdef):
        """Return all tests attached to the given object definition."""
        return cls.select(
            [Equal('object_type', objectdef.get_table_name()), Equal('object_id', objectdef.id)]
        )

    @classmethod
    def create_from_formdata(cls, formdef, formdata, add_workflow_tests=False):
        """Build (without storing) a test from an existing formdata.

        Field values are converted with ``get_json_value`` when the field
        provides it; ``_raw``, ``_display`` and ``_structured`` companions
        are copied verbatim.
        """
        testdef = cls()
        testdef.object_type = formdef.get_table_name()
        testdef.object_id = formdef.id
        testdef.is_in_backoffice = formdata.backoffice_submission

        field_data = {}
        for field in formdef.fields:
            if field.key in cls.ignored_field_types:
                continue

            if field.id in formdata.data:
                value = formdata.data[field.id]

                if value and hasattr(field, 'get_json_value'):
                    value = field.get_json_value(value)

                field_data[field.id] = value

            for suffix in ('raw', 'display', 'structured'):
                key = '%s_%s' % (field.id, suffix)
                if key in formdata.data:
                    field_data[key] = formdata.data[key]

        testdef.data = {
            'fields': field_data,
            'user': formdata.user.get_json_export_dict() if formdata.user else None,
        }

        if add_workflow_tests:
            testdef.workflow_tests.add_actions_from_formdata(formdata)

        return testdef

    def build_formdata(self, objectdef, include_fields=False):
        """Create a fresh formdata for objectdef, optionally filled with test data."""
        formdata = objectdef.data_class()()
        formdata.just_created()
        formdata.backoffice_submission = self.is_in_backoffice

        formdata.workflow_traces = []

        if self.data['user']:
            formdata.set_user_from_json(self.data['user'])

        if include_fields:
            for field in objectdef.fields:
                if field.id not in self.data['fields']:
                    continue

                value = self.data['fields'].get(field.id)
                if value is not None:
                    value = field.from_json_value(value)

                self.add_value_to_formdata(field, formdata, value)

        return formdata

    @contextmanager
    def fake_request(self):
        """Install a fake POST request, error recorder and mock HTTP adapter.

        The publisher's request, ``record_error`` and ``_http_adapter`` are
        put back to their previous values on exit, even if the body raises.
        """

        def record_error(error_summary=None, exception=None, *args, **kwargs):
            self.recorded_errors.append(str(error_summary or exception))

        real_record_error = get_publisher().record_error
        # the attribute name must not carry a leading dot, otherwise the lookup
        # never matches and the previous adapter is lost when leaving the context
        real_http_adapter = getattr(get_publisher(), '_http_adapter', None)

        true_request = get_publisher().get_request()
        wsgi_request = WSGIRequest({'REQUEST_METHOD': 'POST', 'wsgi.input': io.StringIO()})
        fake_request = CompatHTTPRequest(wsgi_request)
        fake_request.is_in_backoffice_forced_value = self.is_in_backoffice
        try:
            get_publisher()._set_request(fake_request)
            fake_request.session = get_session_manager().new_session(None)
            get_publisher().record_error = record_error
            get_publisher()._http_adapter = MockWebserviceResponseAdapter(self)
            yield
        finally:
            get_publisher()._set_request(true_request)
            get_publisher().record_error = real_record_error
            get_publisher()._http_adapter = real_http_adapter

    def run(self, objectdef):
        """Run the test against objectdef.

        Raises TestError when the test fails, when it fails with an error
        different from ``expected_error``, or when ``expected_error`` is set
        but the test completes successfully.
        """
        self.exception = None
        self.sent_requests = []
        self.used_webservice_responses = []
        self.recorded_errors = []
        self.missing_required_fields = []
        with self.fake_request():
            try:
                self._run(objectdef)
            except TestError as e:
                if not self.expected_error:
                    raise

                if e.error != self.expected_error:
                    raise TestError(
                        _('Expected error "%(expected_error)s" but got error "%(error)s" instead.')
                        % {'expected_error': self.expected_error, 'error': e.error},
                        field_id=e.field_id,
                    )
            else:
                if self.expected_error:
                    raise TestError(
                        _('Expected error "%s" but test completed with success.') % self.expected_error
                    )

    def _run(self, objectdef):
        """Fill the form, then run workflow tests when an agent and actions exist."""
        formdata = self.run_form_fill(objectdef)
        if self.agent_id and self.workflow_tests.actions:
            self.workflow_tests.run(formdata)

    def run_form_fill(self, objectdef):
        """Fill a new formdata page by page and return it."""
        self.formdata = formdata = self.build_formdata(objectdef)

        get_publisher().reset_formdata_state()
        get_publisher().substitutions.feed(objectdef)
        get_publisher().substitutions.feed(formdata)

        self.form = Form(action='#')

        # group fields under the page field that precedes them
        fields = []
        fields_by_page = {}
        for field in objectdef.fields:
            if field.key == 'page':
                fields = fields_by_page[field] = []
                continue
            fields.append(field)

        if not fields_by_page:  # form without pages
            fields_by_page[PageField()] = fields

        previous_page = None
        for i, (page, fields) in enumerate(fields_by_page.items(), 1):
            page.index = i

            if previous_page:
                self.evaluate_page_conditions(previous_page, formdata, objectdef)

            if page and not page.is_visible(formdata.data, objectdef):
                fields_with_data = [
                    field for field in fields if self.data['fields'].get(field.id) is not None
                ]
                if fields_with_data:
                    raise TestError(
                        _('Tried to fill field "%(label)s" on page %(no)d but page was not shown.')
                        % {'label': fields_with_data[0].label, 'no': page.index},
                        field_id=page.id,
                    )
                continue

            self.fill_page_fields(fields, page, formdata, objectdef)
            previous_page = page

        if previous_page:  # evaluate last page post conditions
            self.evaluate_page_conditions(previous_page, formdata, objectdef)

        return formdata

    def fill_page_fields(self, fields, page, formdata, objectdef):
        """Validate and store the test value of every visible field of a page."""
        self.handle_computed_fields(fields, formdata)
        for field in fields:
            if field.key in self.ignored_field_types:
                continue

            if not field.is_visible(formdata.data, objectdef):
                if self.data['fields'].get(field.id) is not None:
                    raise TestError(
                        _('Tried to fill field "%(label)s" on page %(no)d but it is hidden.')
                        % {'label': field.label, 'no': page.index},
                        field_id=field.id,
                    )
                continue

            # make sure to never request remote data source
            if getattr(field, 'data_source', None) and not field.data_source.get('type', '').startswith(
                'carddef:'
            ):
                field.data_source = None
                field.had_data_source = True
            elif hasattr(field, 'block'):
                for x in field.block.fields:
                    if getattr(x, 'data_source', None) and not x.data_source.get('type', '').startswith(
                        'carddef:'
                    ):
                        x.data_source = None
                        x.had_data_source = True

            value = self.data['fields'].get(field.id)
            if value is not None:
                value = field.from_json_value(value)

            self.run_widget_validation(field, value)

            self.add_value_to_formdata(field, formdata, value)

            get_publisher().substitutions.invalidate_cache()

        self.handle_computed_fields(fields, formdata, exclude_frozen=True)

    def add_value_to_formdata(self, field, formdata, value):
        """Store value for field in formdata without querying a data source."""
        if field.key in ('item', 'items') and (field.data_source or hasattr(field, 'had_data_source')):
            # add values without requesting data source
            formdata.data[field.id] = value
            for suffix in ('raw', 'display', 'structured'):
                key = '%s_%s' % (field.id, suffix)
                if key in self.data['fields']:
                    formdata.data[key] = self.data['fields'][key]
        else:
            field.set_value(formdata.data, value)

    def evaluate_page_conditions(self, page, formdata, objectdef):
        """Raise TestError if a post-condition of page is unmet or cannot be evaluated."""
        for post_condition in page.post_conditions or []:
            condition = post_condition.get('condition', {})
            try:
                if not Field.evaluate_condition(formdata.data, objectdef, condition, record_errors=False):
                    raise TestError(
                        _('Page %(no)d post condition was not met (%(condition)s).')
                        % {'no': page.index, 'condition': condition.get('value')},
                        error=post_condition.get('error_message'),
                        field_id=page.id,
                    )
            except RuntimeError:
                raise TestError(
                    _('Failed to evaluate page %d post condition.') % page.index, field_id=page.id
                )

    def run_widget_validation(self, field, value):
        """Feed value to the field widget and raise TestError on a real validation error."""
        widget = field.add_to_form(self.form)

        if isinstance(widget, FileWithPreviewWidget):
            widget.get_value_from_token = False

        widget.set_value(value)
        widget.transfer_form_value(get_publisher().get_request())

        # force a new parse so that the value set above gets validated
        widget._parsed = False
        widget.parse()

        widget = TestDef.get_error_widget(widget, self)
        if not widget:
            return

        field_label = _('"%s"') % field.label

        if getattr(widget, 'is_subwidget', False):
            # error comes from a widget nested in a block: report the subfield
            value = widget.value
            field = widget.field
            field_label = _('"%(subfield)s" (of field %(field)s)') % {
                'subfield': field.label,
                'field': field_label,
            }

        if field.convert_value_to_str:
            value = field.convert_value_to_str(value)

        error_msg = _('Invalid value "%s"') % value if value else _('Empty value')
        raise TestError(
            _('%(error)s for field %(label)s: %(details)s')
            % {
                'error': error_msg,
                'label': field_label,
                'details': widget.error,
            },
            error=widget.error,
            field_id=field.id,
        )

    def handle_computed_fields(self, fields, formdata, exclude_frozen=False):
        """Compute the value of computed fields and store it in formdata.

        Fields whose template fails to render are skipped; string values
        longer than 10000 characters are replaced by None.
        """
        for field in fields:
            if field.key != 'computed':
                continue
            if exclude_frozen and field.freeze_on_initial_value:
                continue

            with get_publisher().complex_data():
                try:
                    value = WorkflowStatusItem.compute(field.value_template, raises=True, allow_complex=True)
                except TemplateError:
                    continue
                else:
                    value = get_publisher().get_cached_complex_data(value)

            if isinstance(value, str) and len(value) > 10000:
                value = None

            formdata.data[field.id] = value
            get_publisher().substitutions.invalidate_cache()

    @staticmethod
    def widget_has_real_error(widget, testdef):
        """Tell whether the widget error should make the test fail.

        "Required" errors are only recorded on testdef (when given), and
        invalid-selection errors are ignored for fields that have, or had,
        a data source.
        """
        if widget.error == widget.REQUIRED_ERROR:
            if testdef:
                label = widget.block.name if hasattr(widget, 'block') else widget.field.label
                testdef.missing_required_fields.append(label)
            return False

        ignore_invalid_selection = bool(
            widget.error == get_selection_error_text()
            and (widget.field.data_source or hasattr(widget.field, 'had_data_source'))
        )
        if ignore_invalid_selection:
            return False

        return True

    @classmethod
    def get_error_widget(cls, widget, testdef=None):
        """Return the widget (or block subwidget) holding a real error, or None."""
        if not widget.has_error():
            return

        if widget.field.key == 'block' and (not widget.error or widget.error == widget.REQUIRED_ERROR):
            widget.error = None
            return cls.get_error_subwidget(widget, testdef)

        if cls.widget_has_real_error(widget, testdef):
            return widget

    @classmethod
    def get_error_subwidget(cls, widget, testdef):
        """Search the subwidgets of widget, depth first, for one with a real error."""
        for subwidget in widget.get_widgets():
            subwidget.is_subwidget = True

            if subwidget.error and cls.widget_has_real_error(subwidget, testdef):
                return subwidget

            if hasattr(subwidget, 'get_widgets'):
                nested_widget = cls.get_error_subwidget(subwidget, testdef)
                if nested_widget:
                    return nested_widget

    def export_to_xml(self, include_id=False):
        """Return the XML export of this test, webservice responses included."""
        self._webservice_responses = self.get_webservice_responses()

        testdef_xml = TestDefXmlProxy()
        for field, dummy in TestDefXmlProxy.XML_NODES:  # pylint: disable=not-an-iterable
            setattr(testdef_xml, field, getattr(self, field))

        return testdef_xml.export_to_xml(include_id=include_id)

    @classmethod
    def import_from_xml(cls, fd, formdef, include_id=False):
        """Parse fd as XML and import it; raise ValueError if parsing fails."""
        try:
            tree = ET.parse(fd)
        except Exception:
            raise ValueError
        return cls.import_from_xml_tree(tree, formdef, include_id=include_id)

    @classmethod
    def import_from_xml_tree(cls, tree, formdef, include_id=False):
        """Create and store a test (and its webservice responses) from an XML tree."""
        testdef = TestDef.create_from_formdata(formdef, formdef.data_class()())

        testdef_xml = TestDefXmlProxy.import_from_xml_tree(tree, include_id)
        for field, dummy in TestDefXmlProxy.XML_NODES:  # pylint: disable=not-an-iterable
            if hasattr(testdef_xml, field):
                setattr(testdef, field, getattr(testdef_xml, field))

        testdef.store()

        # responses can only be linked once the test has an id
        for response in testdef._webservice_responses:
            response.testdef_id = testdef.id
            response.store()

        return testdef
|
|
|
|
|
|
class TestResult(sql.TestResult):
    """Outcome of one execution of the tests attached to an object definition."""

    _names = 'test_result'

    object_type = None  # (formdef, carddef, etc.)
    object_id = None
    timestamp = None
    success = None
    reason = None  # reason for tests execution
    results = None  # results for each test associated to object

    def get_admin_url(self):
        """Return the backoffice URL of this result."""
        if self.object_type == 'formdefs':
            section = 'forms'
        else:
            section = 'cards'
        backoffice_url = get_publisher().get_backoffice_url()
        return '%s/%s/%s/tests/results/%s/' % (backoffice_url, section, self.object_id, self.id)

    @classmethod
    def clean(cls, publisher=None, **kwargs):
        """Wipe old results, per object definition.

        Results are walked from newest to oldest; once a successful result
        has been seen, more than 10 results have been walked and the current
        one is older than 14 days, that result and all older ones are wiped.
        """
        results_by_object = collections.defaultdict(list)
        for result in cls.select(order_by='-timestamp'):
            results_by_object[(result.object_id, result.object_type)].append(result)

        cutoff_by_object = {}
        for object_key, object_results in results_by_object.items():
            seen_success = False
            for position, result in enumerate(object_results, 1):
                if (
                    seen_success
                    and position > 10
                    and result.timestamp < now() - datetime.timedelta(days=14)
                ):
                    # everything from this result on can be removed
                    cutoff_by_object[object_key] = result.timestamp
                    break

                seen_success |= result.success

        for (object_id, object_type), cutoff in cutoff_by_object.items():
            wipe_criteria = [
                sql.LessOrEqual('timestamp', cutoff),
                sql.Equal('object_id', object_id),
                sql.Equal('object_type', object_type),
            ]
            TestResult.wipe(clause=wipe_criteria)
|
|
|
|
|
|
class WebserviceResponseError(Exception):
    """Error raised while looking up a mocked webservice response."""
|
|
|
|
|
|
class MockWebserviceResponseAdapter(requests.adapters.HTTPAdapter):
    """HTTP adapter answering requests with the responses configured on a test.

    Every request is logged in ``testdef.sent_requests``. When no configured
    response matches, GET requests are passed to the regular adapter and any
    other method is refused.
    """

    def __init__(self, testdef, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.testdef = testdef

    def send(self, request, *args, **kwargs):
        """Send request through the mock; unexpected errors are recorded, then re-raised."""
        try:
            return self._send(request, *args, **kwargs)
        except WebserviceResponseError as e:
            raise requests.RequestException(str(e))
        except Exception as e:
            # Webservice call can happen through templates which catch all exceptions.
            # Record error to ensure we have a trace nonetheless.
            url_without_query = request.url.split('?')[0]
            message = _('Unexpected error when mocking webservice call for url %(url)s: %(error)s.') % {
                'url': url_without_query,
                'error': str(e),
            }
            get_publisher().record_error(message)
            raise e

    def _send(self, request, *args, **kwargs):
        """Return the first matching configured response, built as a requests response."""
        request_info = {
            'url': request.url.split('?')[0],
            'method': request.method,
            'webservice_response_id': None,
            'forbidden_method': False,
        }
        self.testdef.sent_requests.append(request_info)

        # first configured response matching the request wins
        no_match = object()
        response = no_match
        for candidate in self.testdef.get_webservice_responses():
            if candidate.is_configured() and candidate.match_request(request):
                response = candidate
                break

        if response is no_match:
            if request.method == 'GET':
                return super().send(request, *args, **kwargs)
            request_info['forbidden_method'] = True
            raise WebserviceResponseError(str(_('method must be GET')))

        request_info['webservice_response_id'] = response.id
        self.testdef.used_webservice_responses.append(response)

        headers = {'Content-Type': 'application/json'}
        payload_stream = io.BytesIO(response.payload.encode())
        raw_response = HTTPResponse(
            status=response.status_code,
            body=payload_stream,
            headers=headers,
            original_response=self.make_original_response(headers),
            preload_content=False,
        )
        return self.build_response(request, raw_response)

    def make_original_response(self, headers):
        """Return an http.client.HTTPResponse whose ``msg`` carries the given headers."""
        # the socket is never connected, it is only there to satisfy the constructor
        placeholder_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        original_response = http.client.HTTPResponse(sock=placeholder_socket)

        message = http.client.HTTPMessage()
        for header_name, header_value in headers.items():
            message.add_header(header_name, header_value)

        original_response.msg = message
        return original_response
|
|
|
|
|
|
class WebserviceResponse(XmlStorableObject):
    """Canned webservice response attached to a test definition.

    A response is selected for a request when the URL (query string
    excluded), the optional method, the expected query string parameters and
    the expected JSON body parameters all match.
    """

    _names = 'webservice-response'
    xml_root_node = 'webservice-response'

    uuid = None
    testdef_id = None
    name = ''
    payload = None
    url = None
    status_code = 200
    qs_data = None
    method = ''
    post_data = None

    XML_NODES = [
        ('uuid', 'str'),
        ('testdef_id', 'int'),
        ('name', 'str'),
        ('payload', 'str'),
        ('url', 'str'),
        ('status_code', 'int'),
        ('qs_data', 'kv_data'),
        ('method', 'str'),
        ('post_data', 'kv_data'),
    ]

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.uuid = str(uuid.uuid4())

    def __str__(self):
        return self.name

    def is_configured(self):
        """Return a truthy value when both a payload and a URL are set."""
        return self.payload is not None and self.url

    def match_request(self, request):
        """Return True if request matches the criteria of this response.

        request is expected to expose ``url``, ``method`` and ``body``.
        """
        if request.url.split('?')[0] != self.url:
            return False

        if self.method and request.method != self.method:
            return False

        parsed_url = urllib.parse.urlparse(request.url)
        query_string = urllib.parse.parse_qs(parsed_url.query)
        for param, value in (self.qs_data or {}).items():
            if value not in query_string.get(param, []):
                return False

        try:
            request_data = json.loads(request.body)
        except (TypeError, ValueError):
            request_data = {}

        # a valid JSON body is not necessarily an object (it can be a list, a
        # string, a number...); such a body holds no named parameter to compare
        if not isinstance(request_data, dict):
            request_data = {}

        for param, value in (self.post_data or {}).items():
            if request_data.get(param) != value:
                return False

        return True

    def export_kv_data_to_xml(self, element, attribute_name, **kwargs):
        """Write a key/value mapping as <item><name/><value/></item> children."""
        # attributes default to None, which is exported like an empty mapping
        for key, value in (getattr(self, attribute_name) or {}).items():
            item = ET.SubElement(element, 'item')
            ET.SubElement(item, 'name').text = key
            ET.SubElement(item, 'value').text = value

    def import_kv_data_from_xml(self, element, **kwargs):
        """Read back a key/value mapping; return None when element is None."""
        if element is None:
            return

        data = {}
        for item in element.findall('item'):
            key = item.find('name').text
            # an empty <value/> has no text, it is read back as an empty string
            value = item.find('value').text or ''
            data[key] = value

        return data
|