wcs/wcs/testdef.py

807 lines
28 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2022 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import collections
import copy
import datetime
import http
import io
import json
import socket
import urllib.parse
import uuid
import xml.etree.ElementTree as ET
from contextlib import contextmanager
import requests
from django.core.handlers.wsgi import WSGIRequest
from django.utils.timezone import now
from quixote import get_publisher, get_session_manager
from urllib3 import HTTPResponse
from wcs import sql
from wcs.carddef import CardDef
from wcs.compat import CompatHTTPRequest
from wcs.fields import Field, PageField
from wcs.formdef import FormDef
from wcs.qommon.form import FileWithPreviewWidget, Form, get_selection_error_text
from wcs.qommon.storage import Equal
from wcs.qommon.template import TemplateError
from wcs.qommon.xml_storage import XmlStorableObject
from wcs.sql_criterias import NotNull
from wcs.workflows import WorkflowStatusItem
from .qommon import _
class TestError(Exception):
action_uuid = None
def __init__(self, msg, error=None, details=None, field_id=None):
self.msg = msg
self.error = error or msg
self.details = details or []
self.field_id = field_id
# prevent pytest from trying to collect this class (#75521)
__test__ = False
class TestDefXmlProxy(XmlStorableObject):
xml_root_node = 'testdef'
_names = 'testdef'
readonly = True
# prevent pytest from trying to collect this class
__test__ = False
_webservice_responses = []
@classmethod
@property
def XML_NODES(cls):
json_to_xml_types = {
'varchar': 'str',
'boolean': 'bool',
'jsonb': 'jsonb',
}
excluded_fields = ['id']
extra_fields = [
('_webservice_responses', 'webservice_responses'),
('workflow_tests', 'workflow_tests'),
]
return [
(field, json_to_xml_types[kind])
for field, kind in sql.TestDef._table_static_fields
if field not in excluded_fields
] + extra_fields
def export_jsonb_to_xml(self, element, attribute_name, **kwargs):
element.text = json.dumps(getattr(self, attribute_name), indent=2)
def import_jsonb_from_xml(self, element, **kwargs):
return json.loads(element.text)
def export_workflow_tests_to_xml(self, element, attribute_name, include_id=False):
workflow_tests = self.workflow_tests.export_to_xml(include_id=include_id)
if include_id:
element.set('id', workflow_tests.get('id'))
for subelement in workflow_tests:
element.append(subelement)
def import_workflow_tests_from_xml(self, element, include_id=False):
from wcs.workflow_tests import WorkflowTests
return WorkflowTests.import_from_xml_tree(element, include_id=include_id)
def export_webservice_responses_to_xml(self, element, attribute_name, include_id=False):
for response in self._webservice_responses:
element.append(response.export_to_xml(include_id=include_id))
def import_webservice_responses_from_xml(self, element, include_id=False):
return [
WebserviceResponse.import_from_xml_tree(response, include_id=include_id) for response in element
]
class TestDef(sql.TestDef):
_names = 'testdef'
name = ''
object_type = None # (formdef, carddef, etc.)
object_id = None
data = None # (json export of formdata, carddata, etc.)
is_in_backoffice = False
expected_error = None
user_uuid = None
agent_id = None
ignored_field_types = (
'subtitle',
'title',
'comment',
'computed',
'table',
'table-select',
'tablerows',
'ranked-items',
)
backoffice_class = 'wcs.admin.tests.TestPage'
xml_root_node = TestDefXmlProxy.xml_root_node
get_table_name = TestDefXmlProxy.get_table_name
is_readonly = TestDefXmlProxy.is_readonly
def __str__(self):
return self.name
@property
def workflow_tests(self):
from wcs.workflow_tests import WorkflowTests
if hasattr(self, '_workflow_tests'):
return self._workflow_tests
workflow_tests_list = WorkflowTests.select([Equal('testdef_id', self.id)])
self.workflow_tests = workflow_tests_list[0] if workflow_tests_list else WorkflowTests()
return self._workflow_tests
@workflow_tests.setter
def workflow_tests(self, value):
self._workflow_tests = value
self._workflow_tests.testdef = self
def get_webservice_responses(self):
if hasattr(self, '_webservice_responses'):
# this attribute is set by import/export, and should be used in snapshot context
return self._webservice_responses
return WebserviceResponse.select([Equal('testdef_id', self.id)], order_by='name')
def get_admin_url(self):
base_url = get_publisher().get_backoffice_url()
objects_dir = 'forms' if self.object_type == 'formdefs' else 'cards'
return '%s/%s/%s/tests/%s/' % (base_url, objects_dir, self.object_id, self.id)
def store(self, comment=None):
super().store()
self.workflow_tests.testdef_id = self.id
self.workflow_tests.store()
if hasattr(self, '_webservice_responses'):
# first store after import, attach webservice responses and delete old ones on snapshot restore
response_ids = {x.id for x in self._webservice_responses}
for response in WebserviceResponse.select([Equal('testdef_id', self.id)]):
if response.id not in response_ids:
response.remove_self()
for response in self._webservice_responses:
response.testdef_id = self.id
response.store()
del self._webservice_responses
if get_publisher().snapshot_class:
get_publisher().snapshot_class.snap(instance=self, comment=comment)
@classmethod
def remove_object(cls, id):
super().remove_object(id)
from wcs.workflow_tests import WorkflowTests
workflow_tests_list = WorkflowTests.select([Equal('testdef_id', id)])
for workflow_tests in workflow_tests_list:
workflow_tests.remove_self()
responses = WebserviceResponse.select([Equal('testdef_id', id)])
for response in responses:
response.remove_self()
@classmethod
def select_for_objectdef(cls, objectdef):
return cls.select(
[Equal('object_type', objectdef.get_table_name()), Equal('object_id', objectdef.id)]
)
@staticmethod
def get_or_create_test_user(user):
users = get_publisher().user_class.select([Equal('email', user.email), NotNull('test_uuid')])
if len(users):
return users[0], False
user.id = None
user.test_uuid = str(uuid.uuid4())
user.store()
return user, True
@classmethod
def create_from_formdata(cls, formdef, formdata, add_workflow_tests=False):
testdef = cls()
testdef.object_type = formdef.get_table_name()
testdef.object_id = formdef.id
testdef.is_in_backoffice = formdata.backoffice_submission
field_data = {}
for field in formdef.fields:
if field.key in cls.ignored_field_types:
continue
if field.id in formdata.data:
value = formdata.data[field.id]
if value and hasattr(field, 'get_json_value'):
value = field.get_json_value(value)
field_data[field.id] = value
for suffix in ('raw', 'display', 'structured'):
key = '%s_%s' % (field.id, suffix)
if key in formdata.data:
field_data[key] = formdata.data[key]
testdef.data = {
'fields': field_data,
}
if formdata.user:
user, dummy = cls.get_or_create_test_user(copy.deepcopy(formdata.user))
testdef.user_uuid = user.test_uuid
if add_workflow_tests:
testdef.workflow_tests.add_actions_from_formdata(formdata)
return testdef
def build_formdata(self, objectdef, include_fields=False):
formdata = objectdef.data_class()()
formdata.just_created()
formdata.backoffice_submission = self.is_in_backoffice
formdata.workflow_traces = []
if self.user_uuid:
try:
user = get_publisher().user_class.select([Equal('test_uuid', self.user_uuid)])[0]
except IndexError:
pass
else:
formdata.user_id = user.id
if include_fields:
for field in objectdef.fields:
if field.id not in self.data['fields']:
continue
value = self.data['fields'].get(field.id)
if value is not None:
value = field.from_json_value(value)
self.add_value_to_formdata(field, formdata, value)
return formdata
@contextmanager
def fake_request(self):
def record_error(error_summary=None, exception=None, *args, **kwargs):
self.recorded_errors.append(str(error_summary or exception))
real_record_error = get_publisher().record_error
real_http_adapter = getattr(get_publisher(), '._http_adapter', None)
true_request = get_publisher().get_request()
wsgi_request = WSGIRequest({'REQUEST_METHOD': 'POST', 'wsgi.input': io.StringIO()})
fake_request = CompatHTTPRequest(wsgi_request)
fake_request.is_in_backoffice_forced_value = self.is_in_backoffice
try:
get_publisher()._set_request(fake_request)
fake_request.session = get_session_manager().new_session(None)
get_publisher().record_error = record_error
get_publisher()._http_adapter = MockWebserviceResponseAdapter(self)
yield
finally:
get_publisher()._set_request(true_request)
get_publisher().record_error = real_record_error
get_publisher()._http_adapter = real_http_adapter
def run(self, objectdef):
self.exception = None
self.sent_requests = []
self.used_webservice_responses = []
self.recorded_errors = []
self.missing_required_fields = []
with self.fake_request():
try:
self._run(objectdef)
except TestError as e:
if not self.expected_error:
raise e
if e.error != self.expected_error:
raise TestError(
_('Expected error "%(expected_error)s" but got error "%(error)s" instead.')
% {'expected_error': self.expected_error, 'error': e.error},
field_id=e.field_id,
)
else:
if self.expected_error:
raise TestError(
_('Expected error "%s" but test completed with success.') % self.expected_error
)
def _run(self, objectdef):
formdata = self.run_form_fill(objectdef)
if self.workflow_tests.actions:
self.workflow_tests.run(formdata)
def run_form_fill(self, objectdef):
self.formdata = formdata = self.build_formdata(objectdef)
get_publisher().reset_formdata_state()
get_publisher().substitutions.feed(objectdef)
get_publisher().substitutions.feed(formdata)
self.form = Form(action='#')
fields = []
fields_by_page = {}
for field in objectdef.fields:
if field.key == 'page':
fields = fields_by_page[field] = []
continue
fields.append(field)
if not fields_by_page: # form without pages
fields_by_page[PageField()] = fields
previous_page = None
for i, (page, fields) in enumerate(fields_by_page.items(), 1):
page.index = i
if previous_page:
self.evaluate_page_conditions(previous_page, formdata, objectdef)
if page and not page.is_visible(formdata.data, objectdef):
fields_with_data = [
field for field in fields if self.data['fields'].get(field.id) is not None
]
if fields_with_data:
raise TestError(
_('Tried to fill field "%(label)s" on page %(no)d but page was not shown.')
% {'label': fields_with_data[0].label, 'no': page.index},
field_id=page.id,
)
continue
self.fill_page_fields(fields, page, formdata, objectdef)
previous_page = page
if previous_page: # evaluate last page post conditions
self.evaluate_page_conditions(previous_page, formdata, objectdef)
return formdata
def fill_page_fields(self, fields, page, formdata, objectdef):
self.handle_computed_fields(fields, formdata)
for field in fields:
if field.key in self.ignored_field_types:
continue
if not field.is_visible(formdata.data, objectdef):
if self.data['fields'].get(field.id) is not None:
raise TestError(
_('Tried to fill field "%(label)s" on page %(no)d but it is hidden.')
% {'label': field.label, 'no': page.index},
field_id=field.id,
)
continue
# make sure to never request remote data source
if getattr(field, 'data_source', None) and not field.data_source.get('type', '').startswith(
'carddef:'
):
field.data_source = None
field.had_data_source = True
elif hasattr(field, 'block'):
for x in field.block.fields:
if getattr(x, 'data_source', None) and not x.data_source.get('type', '').startswith(
'carddef:'
):
x.data_source = None
x.had_data_source = True
value = self.data['fields'].get(field.id)
if value is not None:
value = field.from_json_value(value)
self.run_widget_validation(field, value)
self.add_value_to_formdata(field, formdata, value)
get_publisher().substitutions.invalidate_cache()
self.handle_computed_fields(fields, formdata, exclude_frozen=True)
def add_value_to_formdata(self, field, formdata, value):
if field.key in ('item', 'items') and (field.data_source or hasattr(field, 'had_data_source')):
# add values without requesting data source
formdata.data[field.id] = value
for suffix in ('raw', 'display', 'structured'):
key = '%s_%s' % (field.id, suffix)
if key in self.data['fields']:
formdata.data[key] = self.data['fields'][key]
else:
field.set_value(formdata.data, value)
def evaluate_page_conditions(self, page, formdata, objectdef):
for post_condition in page.post_conditions or []:
condition = post_condition.get('condition', {})
try:
if not Field.evaluate_condition(formdata.data, objectdef, condition, record_errors=False):
raise TestError(
_('Page %(no)d post condition was not met (%(condition)s).')
% {'no': page.index, 'condition': condition.get('value')},
error=post_condition.get('error_message'),
field_id=page.id,
)
except RuntimeError:
raise TestError(
_('Failed to evaluate page %d post condition.') % page.index, field_id=page.id
)
def run_widget_validation(self, field, value):
widget = field.add_to_form(self.form)
if isinstance(widget, FileWithPreviewWidget):
widget.get_value_from_token = False
widget.set_value(value)
widget.transfer_form_value(get_publisher().get_request())
widget._parsed = False
widget.parse()
widget = TestDef.get_error_widget(widget, self)
if not widget:
return
field_label = _('"%s"') % field.label
if getattr(widget, 'is_subwidget', False):
value = widget.value
field = widget.field
field_label = _('"%(subfield)s" (of field %(field)s)') % {
'subfield': field.label,
'field': field_label,
}
if field.convert_value_to_str:
value = field.convert_value_to_str(value)
error_msg = _('Invalid value "%s"') % value if value else _('Empty value')
raise TestError(
_('%(error)s for field %(label)s: %(details)s')
% {
'error': error_msg,
'label': field_label,
'details': widget.error,
},
error=widget.error,
field_id=field.id,
)
def handle_computed_fields(self, fields, formdata, exclude_frozen=False):
for field in fields:
if field.key != 'computed':
continue
if exclude_frozen and field.freeze_on_initial_value:
continue
with get_publisher().complex_data():
try:
value = WorkflowStatusItem.compute(field.value_template, raises=True, allow_complex=True)
except TemplateError:
continue
else:
value = get_publisher().get_cached_complex_data(value)
if isinstance(value, str) and len(value) > 10000:
value = None
formdata.data[field.id] = value
get_publisher().substitutions.invalidate_cache()
@staticmethod
def widget_has_real_error(widget, testdef):
if widget.error == widget.REQUIRED_ERROR:
if testdef:
label = widget.block.name if hasattr(widget, 'block') else widget.field.label
testdef.missing_required_fields.append(label)
return False
ignore_invalid_selection = bool(
widget.error == get_selection_error_text()
and (widget.field.data_source or hasattr(widget.field, 'had_data_source'))
)
if ignore_invalid_selection:
return False
return True
@classmethod
def get_error_widget(cls, widget, testdef=None):
if not widget.has_error():
return
if widget.field.key == 'block' and (not widget.error or widget.error == widget.REQUIRED_ERROR):
widget.error = None
return cls.get_error_subwidget(widget, testdef)
if cls.widget_has_real_error(widget, testdef):
return widget
@classmethod
def get_error_subwidget(cls, widget, testdef):
for widget in widget.get_widgets():
widget.is_subwidget = True
if widget.error and cls.widget_has_real_error(widget, testdef):
return widget
if hasattr(widget, 'get_widgets'):
widget = TestDef.get_error_subwidget(widget, testdef)
if widget:
return widget
def export_to_xml(self, include_id=False):
testdef_xml = TestDefXmlProxy(id=str(self.id))
for field, dummy in TestDefXmlProxy.XML_NODES: # pylint: disable=not-an-iterable
if field == '_webservice_responses':
testdef_xml._webservice_responses = self.get_webservice_responses()
else:
setattr(testdef_xml, field, getattr(self, field))
return testdef_xml.export_to_xml(include_id=include_id)
@classmethod
def import_from_xml(cls, fd, formdef, include_id=False):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError
return cls.import_from_xml_tree(tree, formdef, include_id=include_id)
@classmethod
def import_from_xml_tree(cls, tree, formdef=None, include_id=False, **kwargs):
testdef_xml = TestDefXmlProxy.import_from_xml_tree(tree, include_id)
if not formdef:
klass = FormDef if testdef_xml.object_type == 'formdefs' else CardDef
formdef = klass.get(testdef_xml.object_id)
testdef = TestDef.create_from_formdata(formdef, formdef.data_class()())
testdef.id = int(testdef_xml.id) if testdef_xml.id else None
for field, dummy in TestDefXmlProxy.XML_NODES: # pylint: disable=not-an-iterable
if field in ('object_type', 'object_id'):
continue
if hasattr(testdef_xml, field):
setattr(testdef, field, getattr(testdef_xml, field))
return testdef
class TestResult(sql.TestResult):
_names = 'test_result'
object_type = None # (formdef, carddef, etc.)
object_id = None
timestamp = None
success = None
reason = None # reason for tests execution
results = None # results for each test associated to object
def get_admin_url(self):
base_url = get_publisher().get_backoffice_url()
objects_dir = 'forms' if self.object_type == 'formdefs' else 'cards'
return '%s/%s/%s/tests/results/%s/' % (base_url, objects_dir, self.object_id, self.id)
@classmethod
def clean(cls, publisher=None, **kwargs):
test_results_by_formdef = collections.defaultdict(list)
for test_result in cls.select(order_by='-timestamp'):
test_results_by_formdef[(test_result.object_id, test_result.object_type)].append(test_result)
deletion_timestamp_by_formdef = {}
for formdef_key, test_results in test_results_by_formdef.items():
success = False
test_results_count = 0
for test_result in test_results:
test_results_count += 1
if (
success
and test_results_count > 10
and test_result.timestamp < now() - datetime.timedelta(days=14)
):
break
success |= test_result.success
else:
continue
deletion_timestamp_by_formdef[formdef_key] = test_result.timestamp
for (object_id, object_type), deletion_timestamp in deletion_timestamp_by_formdef.items():
TestResult.wipe(
clause=[
sql.LessOrEqual('timestamp', deletion_timestamp),
sql.Equal('object_id', object_id),
sql.Equal('object_type', object_type),
]
)
class WebserviceResponseError(Exception):
pass
class MockWebserviceResponseAdapter(requests.adapters.HTTPAdapter):
def __init__(self, testdef, *args, **kwargs):
super().__init__(*args, **kwargs)
self.testdef = testdef
def send(self, request, *args, **kwargs):
try:
return self._send(request, *args, **kwargs)
except WebserviceResponseError as e:
raise requests.RequestException(str(e))
except Exception as e:
# Webservice call can happen through templates which catch all exceptions.
# Record error to ensure we have a trace nonetheless.
get_publisher().record_error(
_('Unexpected error when mocking webservice call for url %(url)s: %(error)s.')
% {'url': request.url.split('?')[0], 'error': str(e)}
)
raise e
def _send(self, request, *args, **kwargs):
request_info = {
'url': request.url.split('?')[0],
'method': request.method,
'webservice_response_id': None,
'forbidden_method': False,
}
self.testdef.sent_requests.append(request_info)
for response in self.testdef.get_webservice_responses():
if response.is_configured() and response.match_request(request):
break
else:
if request.method != 'GET':
request_info['forbidden_method'] = True
raise WebserviceResponseError(str(_('method must be GET')))
return super().send(request, *args, **kwargs)
request_info['webservice_response_id'] = response.id
self.testdef.used_webservice_responses.append(response)
headers = {
'Content-Type': 'application/json',
}
raw_response = HTTPResponse(
status=response.status_code,
body=io.BytesIO(response.payload.encode()),
headers=headers,
original_response=self.make_original_response(headers),
preload_content=False,
)
return self.build_response(request, raw_response)
def make_original_response(self, headers):
dummy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
original_response = http.client.HTTPResponse(sock=dummy_socket)
original_headers = http.client.HTTPMessage()
for k, v in headers.items():
original_headers.add_header(k, v)
original_response.msg = original_headers
return original_response
class WebserviceResponse(XmlStorableObject):
_names = 'webservice-response'
xml_root_node = 'webservice-response'
uuid = None
testdef_id = None
name = ''
payload = None
url = None
status_code = 200
qs_data = None
method = ''
post_data = None
XML_NODES = [
('uuid', 'str'),
('testdef_id', 'int'),
('name', 'str'),
('payload', 'str'),
('url', 'str'),
('status_code', 'int'),
('qs_data', 'kv_data'),
('method', 'str'),
('post_data', 'kv_data'),
]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.uuid = str(uuid.uuid4())
def __str__(self):
return self.name
def is_configured(self):
return self.payload is not None and self.url
def match_request(self, request):
if request.url.split('?')[0] != self.url:
return False
if self.method and request.method != self.method:
return False
parsed_url = urllib.parse.urlparse(request.url)
query_string = urllib.parse.parse_qs(parsed_url.query)
for param, value in (self.qs_data or {}).items():
if value not in query_string.get(param, []):
return False
try:
request_data = json.loads(request.body)
except (TypeError, ValueError):
request_data = {}
for param, value in (self.post_data or {}).items():
if request_data.get(param) != value:
return False
return True
def export_kv_data_to_xml(self, element, attribute_name, **kwargs):
for key, value in getattr(self, attribute_name).items():
item = ET.SubElement(element, 'item')
ET.SubElement(item, 'name').text = key
ET.SubElement(item, 'value').text = value
def import_kv_data_from_xml(self, element, **kwargs):
if element is None:
return
data = {}
for item in element.findall('item'):
key = item.find('name').text
value = item.find('value').text or ''
data[key] = value
return data