# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2018 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import copy
from collections import OrderedDict

import jsonschema

from .logging import ignore_loggers

with ignore_loggers('xmlschema'):
    import xmlschema

    try:
        import xmlschema.names as xmlschema_names  # xmlschema >= 1.1
    except ModuleNotFoundError:
        import xmlschema.qnames as xmlschema_names  # xmlschema < 1.1


def text_content(node):
    """Extract the text content of a node and of all its descendants.

    Equivalent to xmlNodeGetContent from libxml.

    Returns the empty string when ``node`` is None, otherwise the
    concatenation, in document order, of every non-empty ``text`` and of the
    ``tail`` of every descendant (the tail of ``node`` itself is not included).
    """
    if node is None:
        return ''

    def fragments(element):
        # Own leading text first, then each child's content followed by the
        # text that comes right after that child.
        if element.text:
            yield element.text
        for child in element:
            yield from fragments(child)
            if child.tail:
                yield child.tail

    return ''.join(fragments(node))


def to_json(root):
    """Convert an XML document (a rooted tree) into a dictionary compatible
    with JSON serialization, following these rules:

    - root is converted into a dictionary, its children's node names are the
      keys,
    - all child nodes without children are considered to be only text and are
      converted to a JSON string (empty ones are skipped),
    - all child nodes with children are converted to an array, with their
      children as roots of a new conversion from XML to JSON.

    Ex.:

        <root>
            <child1>wtv</child1>
            <rows>
                <row>
                    <child2>2</child2>
                </row>
                <row>
                    <child3>3</child3>
                </row>
            </rows>
        </root>

    is converted to:

        {
            "child1": "wtv",
            "rows": [
                {"child2": "2"},
                {"child3": "3"}
            ]
        }
    """
    d = {}
    for child in root:
        if len(child) == 0:  # text node
            value = text_content(child)
            if value:
                d[child.tag] = value
        else:
            # Rows of same-named siblings are accumulated in the same list.
            values = d.setdefault(child.tag, [])
            for row in child:
                if len(row) == 0 and row.text:
                    child_content = row.text
                else:
                    child_content = to_json(row)
                # Empty rows (no text, no non-empty children) are dropped.
                if child_content:
                    values.append(child_content)
    return d


class TransformConverter(xmlschema.UnorderedConverter):
    """Converter that lets an optional ``transformer`` object rewrite values.

    For an element whose XSD type name is a key of ``transformer.type_map``,
    the value is passed through ``transformer.decode_<mapped>`` (decoding) or
    ``transformer.encode_<mapped>`` (encoding) when such an attribute exists,
    before being handed over to the parent converter.
    """

    def __init__(self, *args, **kwargs):
        # 'transformer' is consumed here; the parent constructor never sees it.
        self.transformer = kwargs.pop('transformer', None)
        super().__init__(*args, **kwargs)

    def copy(self, **kwargs):
        """Return a copy of the converter, carrying over the transformer
        unless another one is given with the ``transformer`` keyword."""
        transformer = kwargs.pop('transformer', self.transformer)
        new = super().copy(**kwargs)
        new.transformer = transformer
        return new

    def _apply_transformer_hook(self, direction, value, xsd_element):
        """Return ``value`` passed through the ``<direction>_<mapped>`` hook
        of the transformer, or unchanged when no hook applies."""
        transformer = self.transformer
        if not transformer:
            return value
        type_map = getattr(transformer, 'type_map', {})
        type_name = xsd_element.type.name
        if type_name not in type_map:
            return value
        hook_name = f'{direction}_{type_map[type_name]}'
        if not hasattr(transformer, hook_name):
            return value
        return getattr(transformer, hook_name)(value)

    def element_decode(self, data, xsd_element, *args, **kwargs):
        """Apply the transformer's decode hook, then decode as the parent does.

        Extra positional and keyword arguments are forwarded untouched to the
        parent implementation.
        """
        data = self._apply_transformer_hook('decode', data, xsd_element)
        return super().element_decode(data, xsd_element, *args, **kwargs)

    def element_encode(self, obj, xsd_element, *args, **kwargs):
        """Apply the transformer's encode hook, then encode as the parent does.

        Extra positional and keyword arguments are forwarded untouched to the
        parent implementation.
        """
        obj = self._apply_transformer_hook('encode', obj, xsd_element)
        return super().element_encode(obj, xsd_element, *args, **kwargs)


class JSONSchemaFromXMLSchema:
    """Build a JSON schema from one root element of an XML schema.

    The resulting schema is stored in ``json_schema``; ``validate``,
    ``encode`` and ``decode`` work on data shaped as
    ``{root_element: content}``.

    Subclasses may define a ``type_map`` dictionary (XSD type name -> short
    name) together with ``schema_<short name>``, ``decode_<short name>`` and
    ``encode_<short name>`` attributes to customize the handling of some
    types; all of them are looked up dynamically and are optional.
    """

    # JSON schema type used for each supported XSD builtin type.
    SIMPLE_TYPE_MAPPING = {
        xmlschema_names.XSD_STRING: 'string',
        xmlschema_names.XSD_INTEGER: 'integer',
        xmlschema_names.XSD_INT: 'integer',
        xmlschema_names.XSD_POSITIVE_INTEGER: 'integer',
        xmlschema_names.XSD_UNSIGNED_INT: 'integer',
        xmlschema_names.XSD_NON_NEGATIVE_INTEGER: 'integer',
        xmlschema_names.XSD_BOOLEAN: 'boolean',
        xmlschema_names.XSD_DOUBLE: 'number',
        xmlschema_names.XSD_DECIMAL: 'number',
    }

    def __init__(self, xml_schema, root_element):
        """Load the schema and compute the JSON schema of ``root_element``.

        ``xml_schema`` is either an ``xmlschema.XMLSchema`` instance, used as
        is, or anything accepted by the ``xmlschema.XMLSchema`` constructor.
        ``root_element`` is the key of the root element in
        ``xml_schema.elements``.
        """
        if not isinstance(xml_schema, xmlschema.XMLSchema):
            with ignore_loggers('xmlschema'):
                xml_schema = xmlschema.XMLSchema(xml_schema, converter=TransformConverter(transformer=self))
        self.xml_schema = xml_schema
        self.root_element = root_element
        self.json_schema = {
            'type': 'object',
            'properties': {
                root_element: self.element_to_jsonschema(xml_schema.elements[root_element]),
            },
            'required': [root_element],
            'additionalProperties': False,
        }

    @classmethod
    def simpletype_to_jsonschema(cls, simple_type):
        """Return the JSON schema of an XSD simple type.

        Raises NotImplementedError for any type, facet or white-space mode
        that is not handled.
        """
        assert isinstance(simple_type, xmlschema.validators.XsdSimpleType)

        def add_patterns(schema):
            # Several patterns are merged into a single alternation.
            if simple_type.patterns:
                if len(simple_type.patterns) == 1:
                    schema['pattern'] = simple_type.patterns.regexps[0]
                else:
                    schema['pattern'] = '|'.join(simple_type.patterns.regexps)

        # A schema_<short name> hook, when present, takes over entirely.
        if simple_type.name in getattr(cls, 'type_map', {}):
            mapped = cls.type_map[simple_type.name]
            if hasattr(cls, 'schema_%s' % mapped):
                return getattr(cls, 'schema_%s' % mapped)()

        if isinstance(simple_type, xmlschema.validators.XsdAtomicBuiltin):
            if (
                simple_type.min_length
                or simple_type.max_length
                or simple_type.white_space not in ('collapse', 'preserve')
            ):
                raise NotImplementedError(simple_type)
            if simple_type.name not in cls.SIMPLE_TYPE_MAPPING:
                raise NotImplementedError(simple_type)
            schema = {'type': cls.SIMPLE_TYPE_MAPPING[simple_type.name]}
            add_patterns(schema)
            return schema

        if isinstance(simple_type, xmlschema.validators.XsdAtomicRestriction):
            if simple_type.white_space not in ('collapse', 'preserve'):
                raise NotImplementedError(simple_type)
            # Start from the schema of the base type, then add one keyword
            # per facet of the restriction.
            schema = OrderedDict(cls.simpletype_to_jsonschema(simple_type.base_type))
            base_name = simple_type.base_type.name
            is_string = base_name == xmlschema_names.XSD_STRING
            is_decimal = base_name == xmlschema_names.XSD_DECIMAL
            for validator in simple_type.validators:
                if isinstance(validator, xmlschema.validators.XsdEnumerationFacets):
                    schema['enum'] = validator.enumeration
                elif isinstance(validator, xmlschema.validators.XsdMinLengthFacet) and is_string:
                    schema['minLength'] = validator.value
                elif isinstance(validator, xmlschema.validators.XsdMaxLengthFacet) and is_string:
                    schema['maxLength'] = validator.value
                elif isinstance(validator, xmlschema.validators.XsdLengthFacet) and is_string:
                    schema['minLength'] = validator.value
                    schema['maxLength'] = validator.value
                elif isinstance(validator, xmlschema.validators.XsdMinInclusiveFacet):
                    schema['minimum'] = validator.value
                elif isinstance(validator, xmlschema.validators.XsdMaxInclusiveFacet):
                    schema['maximum'] = validator.value
                elif isinstance(validator, xmlschema.validators.XsdTotalDigitsFacet) and is_decimal:
                    schema['exclusiveMaximum'] = 10**validator.value
                elif isinstance(validator, xmlschema.validators.XsdFractionDigitsFacet) and is_decimal:
                    schema['multipleOf'] = 1 / 10.0**validator.value
                else:
                    # Length/digits facets on another base type end up here too.
                    raise NotImplementedError(validator)
            add_patterns(schema)
            return schema

        if isinstance(simple_type, xmlschema.validators.XsdUnion):
            return {'oneOf': [cls.simpletype_to_jsonschema(m) for m in simple_type.member_types]}

        raise NotImplementedError(simple_type)

    @classmethod
    def attributegroup_to_jsonschema(cls, attributegroup, schema, required=None):
        """Add the attributes of ``attributegroup`` as properties of ``schema``.

        ``schema`` is modified in place.  Prohibited attributes are skipped.
        Non-optional attributes are listed in ``schema['required']`` only when
        ``required`` is not None.
        """
        assert isinstance(attributegroup, xmlschema.validators.XsdAttributeGroup)

        properties = schema.setdefault('properties', OrderedDict())
        for component in attributegroup.values():
            if component.use == 'prohibited':
                continue
            if required is not None and component.use != 'optional':
                if component.name not in schema.get('required', []):
                    schema.setdefault('required', []).append(component.name)
            properties[component.name] = cls.simpletype_to_jsonschema(component.type)

    @classmethod
    def group_to_alternatives(cls, group, alternatives=None):
        """Expand a model group into the list of its alternatives.

        Each alternative is a list of elements.  ``alternatives`` is the list
        of alternatives built so far; it is extended in place and returned.
        A missing or empty ``alternatives`` is replaced by a new list holding
        one empty alternative, in which case the caller's list, if any, is
        left untouched.

        Raises NotImplementedError for unsupported models or components.
        """
        alternatives = alternatives or [[]]

        if group.model == 'choice':
            cls.choice_to_alternatives(group, alternatives=alternatives)
        elif group.model in ('sequence', 'all'):
            cls.sequence_to_alternatives(group, alternatives=alternatives)
        else:
            raise NotImplementedError(group)
        return alternatives

    @classmethod
    def choice_to_alternatives(cls, group, alternatives):
        """Replace, in place, every alternative by one copy per branch of the
        choice, each copy being completed with the content of that branch."""
        # Keep the alternatives built so far aside, then refill the caller's
        # list branch after branch.
        previous = list(alternatives)
        alternatives[:] = []
        for component in group:
            if isinstance(component, xmlschema.validators.XsdElement):
                alternatives.extend(alternative + [component] for alternative in previous)
            elif isinstance(component, xmlschema.validators.XsdGroup):
                # Expand the nested group on private copies so that branches
                # do not leak into each other.
                sub_alternatives = [list(alternative) for alternative in previous]
                cls.group_to_alternatives(component, alternatives=sub_alternatives)
                alternatives.extend(sub_alternatives)
            else:
                raise NotImplementedError(component)

    @classmethod
    def sequence_to_alternatives(cls, group, alternatives):
        """Complete, in place, every alternative with the content of the
        sequence."""
        for component in group:
            if isinstance(component, xmlschema.validators.XsdElement):
                for alternative in alternatives:
                    alternative.append(component)
            elif isinstance(component, xmlschema.validators.XsdGroup):
                cls.group_to_alternatives(component, alternatives=alternatives)
            else:
                raise NotImplementedError(component)

    @classmethod
    def group_to_jsonschema(cls, group, schema, base_schema=None):
        """Describe the content of ``group`` in ``schema``, in place.

        With a single alternative, its elements are added as properties of
        ``schema``.  With several, ``schema`` is emptied and becomes a
        ``oneOf`` of copies of its previous content, one per alternative.

        ``base_schema`` is not used; it is kept for compatibility.
        """
        assert isinstance(group, xmlschema.validators.XsdGroup)

        alternatives = cls.group_to_alternatives(group)

        def fill_schema_with_alternative(schema, alternative):
            for component in alternative:
                properties = schema.setdefault('properties', OrderedDict())
                properties[component.name] = cls.element_to_jsonschema(component)
                if component.min_occurs > 0 and component.name not in schema.get('required', []):
                    schema.setdefault('required', []).append(component.name)

        if len(alternatives) == 1:
            fill_schema_with_alternative(schema, alternatives[0])
        elif len(alternatives) > 1:
            # What is already in schema (type, attributes...) is common to
            # every alternative.
            template = copy.deepcopy(schema)
            schema.clear()
            one_of = []
            schema['oneOf'] = one_of
            for alternative in alternatives:
                new_schema = copy.deepcopy(template)
                fill_schema_with_alternative(new_schema, alternative)
                one_of.append(new_schema)

    @classmethod
    def type_to_jsonschema(cls, xmltype):
        """Return the JSON schema of an XSD type, simple or complex.

        Raises NotImplementedError for mixed content and for anyType.
        """
        assert isinstance(xmltype, xmlschema.validators.XsdType)

        if xmltype.is_simple():
            base_schema = cls.simpletype_to_jsonschema(xmltype)
            # When the empty string is a valid value for the type, null is
            # accepted too.
            try:
                xmltype.decode('')
            except xmlschema.XMLSchemaValidationError:
                return base_schema
            if base_schema.get('oneOf'):
                base_schema['oneOf'].append({'type': 'null'})
                return base_schema
            return {'oneOf': [{'type': 'null'}, base_schema]}

        if xmltype.has_simple_content():
            base_schema = cls.type_to_jsonschema(xmltype.base_type)
            if not xmltype.attributes:
                return base_schema
            # With attributes, the text content moves under the '$' key.
            schema = OrderedDict({'type': 'object', 'properties': OrderedDict()})
            schema['properties']['$'] = base_schema
            cls.attributegroup_to_jsonschema(xmltype.attributes, schema)
            return schema

        if xmltype.has_mixed_content() or xmltype.name == xmlschema_names.XSD_ANY_TYPE:
            raise NotImplementedError(xmltype)

        schema = OrderedDict({'type': 'object'})
        schema['additionalProperties'] = False
        if xmltype.attributes:
            cls.attributegroup_to_jsonschema(xmltype.attributes, schema)
        cls.group_to_jsonschema(xmltype.content_type, schema)
        return schema

    @classmethod
    def element_to_jsonschema(cls, element):
        """Return the JSON schema of an XSD element.

        An element which may occur more than once is described as an array
        bounded by its occurrence constraints; otherwise the schema of its
        type is returned directly.
        """
        assert isinstance(element, xmlschema.validators.XsdElement)

        # max_occurs is None when unbounded.
        is_array = element.max_occurs is None or element.max_occurs > 1

        item_schema = cls.type_to_jsonschema(element.type)
        if is_array:
            item_schema = {
                'type': 'array',
                'items': item_schema,
                'minItems': element.min_occurs,
            }
            if element.max_occurs is not None:
                item_schema['maxItems'] = element.max_occurs
        return item_schema

    def validate(self, instance):
        """Validate ``instance`` against the computed JSON schema, returning
        whatever ``jsonschema.validate`` returns."""
        return jsonschema.validate(instance=instance, schema=self.json_schema)

    def encode(self, instance):
        """Encode ``instance[root_element]`` with the root XSD element, using
        this object as transformer."""
        return self.xml_schema.elements[self.root_element].encode(
            instance[self.root_element], converter=TransformConverter, transformer=self
        )

    def decode(self, source):
        """Decode ``source`` with the root XSD element, using this object as
        transformer."""
        return self.xml_schema.elements[self.root_element].decode(
            source, converter=TransformConverter, transformer=self
        )