passerelle/passerelle/contrib/utils/axel.py

264 lines
8.6 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import datetime
import os
import re
import xml.etree.ElementTree as ET
from collections import namedtuple
import pytz
import xmlschema
from django.utils.encoding import force_str
from passerelle.utils.xml import JSONSchemaFromXMLSchema
boolean_type = {
'oneOf': [
{'type': 'boolean'},
{
'type': 'string',
'pattern': '[Oo][Uu][Ii]|[Nn][Oo][Nn]|[Tt][Rr][Uu][Ee]|[Ff][Aa][Ll][Ss][Ee]|1|0',
},
]
}
date_type = {
'type': 'string',
'pattern': '[0-9]{4}-[0-9]{2}-[0-9]{2}',
}
datetime_type = {
'type': 'string',
'pattern': '[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}',
}
json_date_format = '%Y-%m-%d'
json_datetime_format = '%Y-%m-%dT%H:%M:%S'
xml_date_format = '%d/%m/%Y'
xml_datetime_format = '%d/%m/%Y %H:%M:%S'
def indent(tree, space=" ", level=0):
# backport from Lib/xml/etree/ElementTree.py python 3.9
if isinstance(tree, ET.ElementTree):
tree = tree.getroot()
if level < 0:
raise ValueError("Initial indentation level must be >= 0, got {level}".format(level=level))
if len(tree) == 0:
return
# Reduce the memory consumption by reusing indentation strings.
indentations = ["\n" + level * space]
def _indent_children(elem, level):
# Start a new indentation level for the first child.
child_level = level + 1
try:
child_indentation = indentations[child_level]
except IndexError:
child_indentation = indentations[level] + space
indentations.append(child_indentation)
if not elem.text or not elem.text.strip():
elem.text = child_indentation
for child in elem:
if len(child):
_indent_children(child, child_level)
if not child.tail or not child.tail.strip():
child.tail = child_indentation
# Dedent after the last child by overwriting the previous indentation.
if not child.tail.strip():
child.tail = indentations[level]
_indent_children(tree, 0)
def encode_bool(obj):
if obj is True or str(obj).lower() in ['true', 'oui', '1']:
return 'OUI'
if obj is False or str(obj).lower() in ['false', 'non', '0']:
return 'NON'
return obj
def parse_datetime(value):
try:
dt = datetime.datetime.strptime(value, json_datetime_format)
except ValueError:
return None
return pytz.utc.localize(dt)
def encode_datetime(dt):
return dt.astimezone(pytz.timezone('Europe/Paris')).strftime(xml_datetime_format)
class AxelError(Exception):
def __init__(self, message, xml_request=None, xml_response=None, *args):
self.message = message
self.xml_request = xml_request
self.xml_response = xml_response
super().__init__(message, *args)
def __str__(self):
return self.message
class AxelSchema(JSONSchemaFromXMLSchema):
type_map = {
'{urn:AllAxelTypes}DATEREQUIREDType': 'date',
'{urn:AllAxelTypes}DATEType': 'date_optional',
'{urn:AllAxelTypes}OUINONREQUIREDType': 'bool',
'{urn:AllAxelTypes}OUINONType': 'bool_optional',
}
@classmethod
def schema_date(cls):
return {
'type': 'string',
'pattern': '[0-9]{4}-[0-9]{2}-[0-9]{2}',
}
def encode_date(self, obj):
try:
return datetime.datetime.strptime(obj, json_date_format).strftime(xml_date_format)
except ValueError:
return obj
def encode_date_optional(self, obj):
if not obj:
return obj
return self.encode_date(obj)
def decode_date(self, data):
value = datetime.datetime.strptime(data.text, xml_date_format).strftime(json_date_format)
return xmlschema.ElementData(
tag=data.tag, text=value, content=data.content, attributes=data.attributes
)
def decode_date_optional(self, data):
if not data.text:
return data
return self.decode_date(data)
@classmethod
def schema_bool(cls):
return copy.deepcopy(boolean_type)
def encode_bool(self, obj):
return encode_bool(obj)
def decode_bool(self, data):
value = False
if data.text.lower() == 'oui':
value = True
return xmlschema.ElementData(
tag=data.tag, text=value, content=data.content, attributes=data.attributes
)
@classmethod
def schema_bool_optional(cls):
schema_bool_optional = cls.schema_bool()
schema_bool_optional['oneOf'].append({'type': 'string', 'enum': ['']})
return schema_bool_optional
def encode_bool_optional(self, obj):
return self.encode_bool(obj)
def decode_bool_optional(self, data):
if not data.text:
return data
return self.decode_bool(data)
def xml_schema_converter(axel_schema, base_xsd_path, name, root_element):
xsd_path = os.path.join(base_xsd_path, name)
if not os.path.exists(xsd_path):
return None
return axel_schema(xsd_path, root_element)
OperationResult = namedtuple('OperationResult', ['json_response', 'xml_request', 'xml_response'])
class Operation:
base_xsd_path = None
default_prefix = ''
axel_schema = AxelSchema
def __init__(self, operation, prefix=None, request_root_element='PORTAIL', data_method='getData'):
if prefix is None:
prefix = self.default_prefix
self.operation = operation
self.request_converter = xml_schema_converter(
self.axel_schema, self.base_xsd_path, '%sQ_%s.xsd' % (prefix, operation), request_root_element
)
self.response_converter = xml_schema_converter(
self.axel_schema, self.base_xsd_path, '%sR_%s.xsd' % (prefix, operation), 'PORTAILSERVICE'
)
self.name = re.sub(
'(.?)([A-Z])', lambda s: s.group(1) + ('-' if s.group(1) else '') + s.group(2).lower(), operation
)
self.snake_name = self.name.replace('-', '_')
self.data_method = data_method
@property
def request_schema(self):
schema = self.request_converter.json_schema
schema['flatten'] = True
schema['merge_extra'] = True
return schema
def __call__(self, resource, request_data=None):
client = resource.soap_client(api_error=True)
serialized_request = ''
if self.request_converter:
try:
serialized_request = self.request_converter.encode(request_data)
except xmlschema.XMLSchemaValidationError as e:
raise AxelError('invalid request %s' % str(e))
indent(serialized_request)
serialized_request = force_str(ET.tostring(serialized_request))
try:
self.request_converter.xml_schema.validate(serialized_request)
except xmlschema.XMLSchemaValidationError as e:
raise AxelError('invalid request %s' % str(e), xml_request=serialized_request)
result = getattr(client.service, self.data_method)(
self.operation, serialized_request, ''
) # FIXME: What is the user parameter for ?
xml_result = ET.fromstring(result.encode('utf-8'))
indent(xml_result)
pretty_result = force_str(ET.tostring(xml_result))
if xml_result.find('RESULTAT/STATUS').text != 'OK':
msg = xml_result.find('RESULTAT/COMMENTAIRES').text
raise AxelError(msg, xml_request=serialized_request, xml_response=pretty_result)
try:
return OperationResult(
json_response=self.response_converter.decode(xml_result),
xml_request=serialized_request,
xml_response=pretty_result,
)
except xmlschema.XMLSchemaValidationError as e:
raise AxelError(
'invalid response %s' % str(e), xml_request=serialized_request, xml_response=pretty_result
)