# File: passerelle/passerelle/apps/soap/models.py
# (repository viewer metadata: 229 lines, 9.1 KiB, Python)

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import zeep
import zeep.exceptions
import zeep.helpers
import zeep.xsd
from django.db import models
from django.forms import ValidationError
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from zeep.wsse.username import UsernameToken
from passerelle.base.models import BaseResource, HTTPResource
from passerelle.utils.api import endpoint
from passerelle.utils.conversion import exception_to_text
from passerelle.utils.json import unflatten
from passerelle.utils.jsonresponse import APIError
class SOAPConnector(BaseResource, HTTPResource):
    """Connector publishing the operations of a WSDL-described SOAP service.

    The WSDL located at ``wsdl_url`` is loaded through a zeep client (see
    ``client``). Every operation of the service binding can be called through
    the generic ``method`` endpoint, and is advertised by
    ``get_endpoints_infos`` with JSON schemas derived from its XSD types.
    """

    wsdl_url = models.URLField(
        max_length=400, verbose_name=_('WSDL URL'), help_text=_('URL of the WSDL file')
    )
    # Passed to zeep.Settings(strict=...); also controls whether
    # '_raw_elements' keys are kept in JSON responses (see ``method``).
    zeep_strict = models.BooleanField(default=False, verbose_name=_('Be strict with returned XML'))
    # Passed to zeep.Settings(xsd_ignore_sequence_order=...).
    zeep_xsd_ignore_sequence_order = models.BooleanField(
        default=True, verbose_name=_('Ignore sequence order')
    )
    # When the username is not empty, a WSSE UsernameToken built from both
    # values is given to the SOAP client.
    zeep_wsse_username = models.CharField(
        max_length=256, blank=True, default='', verbose_name=_('WSSE Username')
    )
    zeep_wsse_password = models.CharField(
        max_length=256, blank=True, default='', verbose_name=_('WSSE Password')
    )

    category = _('Business Process Connectors')

    class Meta:
        verbose_name = _('SOAP connector')

    def clean(self):
        """Validate the connector by building its operations and schemas.

        Raises:
            ValidationError: if building the operations/schemas raises any
                exception; the original exception is kept as the cause.
        """
        try:
            # Evaluated only for its side effect: it fails if the operations
            # cannot be read from the client or converted to schemas.
            self.operations_and_schemas
        except Exception as e:
            raise ValidationError(e) from e

    @classmethod
    def get_manager_form_class(cls, **kwargs):
        """Return the manager form class with its fields reordered.

        The last five fields of the parent form are moved right after the
        first three fields, or after the first two when ``'slug'`` is listed
        in the ``exclude`` keyword argument.
        NOTE(review): the five moved fields are presumably the five model
        fields declared on this class -- confirm against the parent form.
        """
        exclude = kwargs.get('exclude')
        form_class = super().get_manager_form_class(**kwargs)
        fields = list(form_class.base_fields.items())
        # One leading field less when the slug field is excluded.
        head = 2 if exclude and 'slug' in exclude else 3
        form_class.base_fields = collections.OrderedDict(fields[:head] + fields[-5:] + fields[head:-5])
        return form_class

    @cached_property
    def client(self):
        """SOAP client for ``wsdl_url``, built once per instance.

        The client is configured from the ``zeep_*`` fields; WSSE
        authentication is only set up when a username is configured.
        """
        kwargs = {}
        if self.zeep_wsse_username:
            kwargs['wsse'] = UsernameToken(self.zeep_wsse_username, self.zeep_wsse_password)
        return self.soap_client(
            wsdl_url=self.wsdl_url,
            settings=zeep.Settings(
                strict=self.zeep_strict, xsd_ignore_sequence_order=self.zeep_xsd_ignore_sequence_order
            ),
            api_error=True,
            **kwargs,
        )

    @endpoint(
        methods=['post'],
        perm='can_access',
        name='method',
        pattern=r'^(?P<method_name>\w+)/$',
        example_pattern='method_name/',
        description_get=_('Call a SOAP method'),
        description_post=_('Call a SOAP method'),
        post_json_schema={'type': 'object'},
    )
    def method(self, request, method_name, post_data=None, **kwargs):
        """Call the SOAP operation ``method_name`` and return its result.

        The call arguments are built from the query-string parameters whose
        first path component (text before the first ``/``) is a property of
        the operation input schema, then overridden by ``post_data``; the
        merged mapping is passed through ``unflatten`` before the call.

        Args:
            request: the HTTP request; only ``request.GET`` is read here.
            method_name: name of the SOAP operation to call.
            post_data: optional mapping of arguments from the request body.

        Returns:
            ``{'err': 0, 'data': ...}`` where ``data`` is the serialized SOAP
            response converted to plain dicts and lists.

        Raises:
            APIError: if ``method_name`` is not an operation of the service,
                or if the service answers with a SOAP fault (the fault
                ``actor``, ``code``, ``message`` and ``subcode`` are given as
                ``data``).
            zeep.exceptions.ValidationError: re-raised with ``status_code``
                set to 400 when zeep rejects the arguments.
        """
        strict = self.zeep_strict

        def jsonify(data):
            # Recursively convert mappings and sequences to dicts and lists.
            if isinstance(data, dict):
                # ignore _raw_elements, zeep puts there the nodes not matching
                # the XSD when strict parsing is disabled.
                return {
                    jsonify(k): jsonify(v) for k, v in data.items() if (strict or k != '_raw_elements')
                }
            if isinstance(data, (list, tuple, collections.deque)):
                return [jsonify(item) for item in data]
            return data

        try:
            operation = self.client.service._binding._operations[method_name]
        except KeyError:
            # method_name comes from the URL: report an unknown operation as
            # an API error instead of letting a bare KeyError escape.
            raise APIError('unknown SOAP method', data={'method_name': method_name}) from None

        input_schema = self.type2schema(operation.input.body.type)
        payload = {}
        if input_schema and input_schema.get('type') == 'object':
            known_properties = set(input_schema.get('properties', ()))
            for key in request.GET:
                if key.split('/')[0] not in known_properties:
                    continue
                values = request.GET.getlist(key)
                # a repeated parameter is passed as a list, a single one as
                # a scalar.
                payload[key] = values if len(values) > 1 else values[0]
        payload.update(post_data or {})
        payload = unflatten(payload)
        try:
            soap_response = getattr(self.client.service, method_name)(**payload)
        except zeep.exceptions.Fault as e:
            fault_details = {
                attrib: getattr(e, attrib, None) for attrib in ('actor', 'code', 'message', 'subcode')
            }
            raise APIError('soap:Fault', data=fault_details) from e
        except zeep.exceptions.ValidationError as e:
            e.status_code = 400
            raise
        serialized = zeep.helpers.serialize_object(soap_response)
        return {'err': 0, 'data': jsonify(serialized)}

    # The decorator above declares methods=['post']; 'get' is added here.
    method.endpoint_info.methods.append('get')

    def get_endpoints_infos(self):
        """Return the parent endpoint infos plus one entry per SOAP operation.

        Each added entry carries the operation name as pattern, the input
        schema as ``post_json_schema`` (when not empty) and a response schema
        wrapping the output schema. Its ``http_method`` is ``'post'`` when the
        input schema has properties, ``'get'`` otherwise.

        If the operations cannot be built, the connector availability status
        is set to ``'down'`` and only the parent endpoints are returned.
        """
        endpoints = super().get_endpoints_infos()

        try:
            operations_and_schemas = self.operations_and_schemas
        except Exception as e:
            self.set_availability_status('down', message=exception_to_text(e)[:500])
            return endpoints

        for name, input_schema, output_schema in operations_and_schemas:
            kwargs = {
                'name': 'method',
                'pattern': f'{name}/',
                'example_pattern': f'{name}/',
                'description': f'Method {name}',
                'json_schema_response': {
                    'type': 'object',
                    'properties': collections.OrderedDict(
                        [
                            ('err', {'type': 'integer'}),
                            ('data', output_schema),
                        ]
                    ),
                },
            }
            if input_schema:
                kwargs['post_json_schema'] = input_schema
            info = endpoint(**kwargs)
            info.object = self
            # placeholder callable: these entries only describe operations.
            info.func = lambda request: None
            if input_schema and input_schema.get('properties'):
                info.http_method = 'post'
            else:
                info.http_method = 'get'
            endpoints.append(info)
        return endpoints

    @property
    def operations_and_schemas(self):
        """List of ``(name, input_schema, output_schema)`` for each operation.

        Recomputed on every access from the operations of the client service
        binding; the input schema keeps its root, the output schema is
        compressed (see ``type2schema``).
        """
        operations = self.client.service._binding._operations
        operations_and_schemas = []
        for name in operations:
            operation = operations[name]
            input_schema = self.type2schema(operation.input.body.type, keep_root=True)
            output_schema = self.type2schema(operation.output.body.type, compress=True)
            operations_and_schemas.append((name, input_schema, output_schema))
        return operations_and_schemas

    def type2schema(self, xsd_type, keep_root=False, compress=False):
        """Convert a zeep XSD type to a JSON schema dict.

        Args:
            xsd_type: the zeep XSD type to convert.
            keep_root: when true, ``xsd_type`` itself is never flattened.
            compress: when true (and ``keep_root`` is false), a complex type
                holding a single element is replaced by the schema of that
                element, wrapped in an array unless its ``max_occurs`` is 1.
                The flag is propagated to nested types.

        Returns:
            An ``object`` schema for complex types (``{'type': 'null'}`` when
            they have no element), ``{'type': 'string'}`` for builtin types
            and an empty schema otherwise; a ``description`` holding the type
            qualified name is added when the type has one.
        """
        # simplify schema: when a type contains a unique element, it will try
        # to match any dict or list with it on input and will flatten the
        # schema on output.
        if isinstance(xsd_type, zeep.xsd.ComplexType):
            elements = xsd_type.elements
            if len(elements) == 1 and not keep_root and compress:
                element = elements[0][1]
                schema = self.type2schema(element.type, compress=compress)
                if element.max_occurs != 1:
                    schema = {'type': 'array', 'items': schema}
            else:
                properties = collections.OrderedDict()
                schema = {
                    'type': 'object',
                    'properties': properties,
                }
                for key, element in elements:
                    if element.min_occurs > 0:
                        schema.setdefault('required', []).append(key)
                    element_schema = self.type2schema(element.type, compress=compress)
                    # 'unbounded' is tested first: it cannot be compared to 1.
                    if element.max_occurs == 'unbounded' or element.max_occurs > 1:
                        element_schema = {'type': 'array', 'items': element_schema}
                    properties[key] = element_schema
                if not properties:
                    schema = {'type': 'null'}
        elif isinstance(xsd_type, zeep.xsd.BuiltinType):
            schema = {'type': 'string'}
        else:
            schema = {}
        if xsd_type.qname:
            schema['description'] = str(xsd_type.qname).replace('{http://www.w3.org/2001/XMLSchema}', 'xsd:')
        return schema