utils: add conversion from XMLSchema to JSON schema (#37488)

We target the Draft 7 jsonschema specification.
This commit is contained in:
Benjamin Dauvergne 2019-10-15 11:16:42 +02:00
parent d4d3e59e3d
commit 50c17adbfe
4 changed files with 358 additions and 1 deletions

View File

@ -14,6 +14,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import OrderedDict
import copy
import jsonschema
import xmlschema
def text_content(node):
'''Extract text content from node and all its children. Equivalent to
@ -81,3 +87,202 @@ def to_json(root):
if child_content:
d[child.tag].append(child_content)
return d
class JSONSchemaFromXMLSchema(object):
    '''Build a JSON schema (Draft 7 is the target) from an XML Schema.

       The resulting schema is stored in ``self.json_schema``; it describes
       an object with a single required property named after the root
       element. Unsupported XML Schema constructs raise
       NotImplementedError.
    '''

    def __init__(self, xml_schema, root_element):
        '''Convert the declaration of root_element found in xml_schema.

           xml_schema is either an xmlschema.XMLSchema instance or any
           value accepted by the xmlschema.XMLSchema constructor;
           root_element is the name of a global element of that schema.
        '''
        if not isinstance(xml_schema, xmlschema.XMLSchema):
            xml_schema = xmlschema.XMLSchema(xml_schema)
        self.xml_schema = xml_schema
        self.json_schema = {
            'type': 'object',
            'properties': {
                root_element: self.element_to_jsonschema(
                    xml_schema.elements[root_element]),
            },
            'required': [root_element],
            'additionalProperties': False,
        }

    @classmethod
    def simpletype_to_jsonschema(cls, simple_type):
        '''Return the JSON schema of an XSD simple type.

           Only the string, integer, boolean and double builtins are
           handled, together with restrictions of them using enumeration
           or (for strings) length facets; anything else raises
           NotImplementedError.
        '''
        assert isinstance(simple_type, xmlschema.validators.XsdSimpleType)

        if isinstance(simple_type, xmlschema.validators.XsdAtomicBuiltin):
            if (simple_type.min_length
                    or simple_type.max_length
                    or simple_type.white_space not in ('collapse', 'preserve')
                    or simple_type.patterns):
                raise NotImplementedError(simple_type)

            if simple_type.name == xmlschema.qnames.XSD_STRING:
                schema = {'type': 'string'}
            elif simple_type.name == xmlschema.qnames.XSD_INTEGER:
                schema = {'type': 'integer'}
            elif simple_type.name == xmlschema.qnames.XSD_BOOLEAN:
                schema = {'type': 'boolean'}
            elif simple_type.name == xmlschema.qnames.XSD_DOUBLE:
                schema = {'type': 'number'}
            else:
                raise NotImplementedError(simple_type)
            return schema
        elif isinstance(simple_type, xmlschema.validators.XsdAtomicRestriction):
            if (simple_type.white_space not in ('collapse', 'preserve')
                    or simple_type.patterns):
                raise NotImplementedError(simple_type)
            # start from a copy of the base type schema, then add one
            # keyword per facet
            schema = OrderedDict(cls.simpletype_to_jsonschema(simple_type.base_type))
            base_is_string = simple_type.base_type.name == xmlschema.qnames.XSD_STRING
            for validator in simple_type.validators:
                if isinstance(validator, xmlschema.validators.XsdEnumerationFacets):
                    schema['enum'] = validator.enumeration
                elif (isinstance(validator, xmlschema.validators.XsdMinLengthFacet)
                        and base_is_string):
                    schema['minLength'] = validator.value
                elif (isinstance(validator, xmlschema.validators.XsdMaxLengthFacet)
                        and base_is_string):
                    schema['maxLength'] = validator.value
                elif (isinstance(validator, xmlschema.validators.XsdLengthFacet)
                        and base_is_string):
                    # an exact length is expressed as identical bounds
                    schema['minLength'] = validator.value
                    schema['maxLength'] = validator.value
                else:
                    raise NotImplementedError(validator)
            return schema
        raise NotImplementedError(simple_type)

    @classmethod
    def attributegroup_to_jsonschema(cls, attributegroup, schema, required=None):
        '''Add one property to schema (modified in place) for each
           non-prohibited attribute of attributegroup.

           Attributes are only appended to schema['required'] when the
           required argument is not None and the attribute is not
           optional; attributes declared by reference raise
           NotImplementedError.
        '''
        assert isinstance(attributegroup, xmlschema.validators.XsdAttributeGroup)

        properties = schema.setdefault('properties', OrderedDict())
        for component in attributegroup.iter_component():
            if component.use == 'prohibited':
                continue
            # NOTE(review): with the default required=None, no attribute is
            # ever marked as required; confirm this is intended.
            if required is not None and component.use != 'optional':
                if component.name not in schema.get('required', []):
                    schema.setdefault('required', []).append(component.name)
            if component.ref:
                raise NotImplementedError(component)
            properties[component.name] = cls.simpletype_to_jsonschema(component.type)

    @classmethod
    def group_to_alternatives(cls, group, alternatives=None):
        '''Expand a model group into a list of alternatives, each
           alternative being the list of elements of one possible content.

           When a non-empty alternatives list is given it is extended in
           place, otherwise a new list holding one empty alternative is
           used. The list is returned.
        '''
        alternatives = alternatives or [[]]

        if group.model == 'choice':
            cls.choice_to_alternatives(group, alternatives=alternatives)
        elif group.model == 'sequence' or group.model == 'all':
            cls.sequence_to_alternatives(group, alternatives=alternatives)
        else:
            raise NotImplementedError(group)
        return alternatives

    @classmethod
    def choice_to_alternatives(cls, group, alternatives):
        '''Replace, in place, the content of alternatives by the previous
           alternatives combined with each member of the choice group.
        '''
        # keep a reference to the caller's list, it is emptied and refilled;
        # work from a snapshot of its previous content
        new_alternatives = alternatives
        alternatives = list(alternatives)
        new_alternatives[:] = []

        for component in group:
            if isinstance(component, xmlschema.validators.XsdElement):
                for alternative in alternatives:
                    new_alternatives.append(alternative + [component])
            elif isinstance(component, xmlschema.validators.XsdGroup):
                # expand the nested group on copies so that the snapshot
                # stays untouched for the following members
                sub_alternatives = [list(alternative) for alternative in alternatives]
                cls.group_to_alternatives(component, alternatives=sub_alternatives)
                new_alternatives.extend(sub_alternatives)
            else:
                raise NotImplementedError(component)

    @classmethod
    def sequence_to_alternatives(cls, group, alternatives):
        '''Append, in place, every member of a sequence (or all) group to
           each of the alternatives; nested groups are expanded
           recursively.
        '''
        for component in group:
            if isinstance(component, xmlschema.validators.XsdElement):
                for alternative in alternatives:
                    alternative.append(component)
            elif isinstance(component, xmlschema.validators.XsdGroup):
                cls.group_to_alternatives(component, alternatives=alternatives)
            else:
                raise NotImplementedError(component)

    @classmethod
    def group_to_jsonschema(cls, group, schema, base_schema=None):
        '''Complete schema (modified in place) with the properties
           described by a model group.

           If the group expands to a single alternative its elements are
           added as properties of schema; otherwise schema is emptied and
           rebuilt as a oneOf of one copy of the previous schema per
           alternative. The base_schema argument is not read, it is kept
           for compatibility of the signature.
        '''
        assert isinstance(group, xmlschema.validators.XsdGroup)

        alternatives = cls.group_to_alternatives(group)

        assert len(alternatives) >= 1 and all(len(alternative) >= 1 for alternative in alternatives), alternatives

        def fill_schema_with_alternative(schema, alternative):
            # declare each element as a property, required if it must
            # occur at least once
            for component in alternative:
                properties = schema.setdefault('properties', OrderedDict())
                properties[component.name] = cls.element_to_jsonschema(component)
                if (component.min_occurs > 0
                        and component.name not in schema.get('required', [])):
                    schema.setdefault('required', []).append(component.name)

        if len(alternatives) == 1:
            fill_schema_with_alternative(schema, alternatives[0])
        elif len(alternatives) > 1:
            # what was set so far (type, attributes...) is shared by all
            # the alternatives
            base_schema = copy.deepcopy(schema)
            schema.clear()
            one_of = []
            schema['oneOf'] = one_of
            for alternative in alternatives:
                new_schema = copy.deepcopy(base_schema)
                fill_schema_with_alternative(new_schema, alternative)
                one_of.append(new_schema)

    @classmethod
    def type_to_jsonschema(cls, xmltype, depth=0):
        '''Return the JSON schema of an XSD type.

           At depth 0 simple types and simple-content types also accept
           null. Complex types with mixed content, or which are not
           element-only, raise NotImplementedError.
        '''
        assert isinstance(xmltype, xmlschema.validators.XsdType)

        if xmltype.is_simple():
            schema = cls.simpletype_to_jsonschema(xmltype)
            if depth == 0:
                schema = {'oneOf': [schema, {'type': 'null'}]}
            return schema
        elif xmltype.has_simple_content():
            base_schema = cls.type_to_jsonschema(xmltype.base_type, depth=depth + 1)
            if not xmltype.attributes:
                schema = base_schema
            else:
                # the value becomes an object holding the attributes and,
                # under the '$' key, the text content
                # NOTE(review): attributes are keyed by their bare name;
                # confirm it matches the converter used to decode documents.
                schema = OrderedDict({'type': 'object'})
                cls.attributegroup_to_jsonschema(xmltype.attributes, schema)
                schema['properties']['$'] = base_schema
            if depth == 0:
                schema = {'oneOf': [schema, {'type': 'null'}]}
            return schema
        else:
            if xmltype.has_mixed_content() or not xmltype.is_element_only():
                raise NotImplementedError(xmltype)

            schema = OrderedDict({'type': 'object'})
            schema['additionalProperties'] = False
            if xmltype.attributes:
                cls.attributegroup_to_jsonschema(xmltype.attributes, schema)
            cls.group_to_jsonschema(xmltype.content_type, schema)
            return schema

    @classmethod
    def element_to_jsonschema(cls, element):
        '''Return the JSON schema of an element: the schema of its type,
           wrapped in an array schema if the element can occur more than
           once.
        '''
        assert isinstance(element, xmlschema.validators.XsdElement)

        # max_occurs is None for unbounded elements; it must be tested
        # first as None cannot be ordered against an integer on Python 3
        is_array = element.max_occurs is None or element.max_occurs > 1
        type_schema = cls.type_to_jsonschema(element.type)
        if not is_array:
            return type_schema

        d = {
            'type': 'array',
            'items': type_schema,
            'minItems': element.min_occurs,
        }
        if element.max_occurs is not None:
            d['maxItems'] = element.max_occurs
        return d

    def validate(self, instance):
        '''Validate instance against the generated JSON schema, through
           jsonschema.validate().
        '''
        return jsonschema.validate(instance=instance, schema=self.json_schema)

View File

@ -110,6 +110,7 @@ setup(name='passerelle',
'paramiko',
'pdfrw',
'httplib2',
'xmlschema',
],
cmdclass={
'build': build,

101
tests/data/pacs-doc.xml Normal file
View File

@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8" ?>
<PACS>
<partenaire1>
<civilite>MME</civilite>
<nomNaissance>Doe</nomNaissance>
<prenoms>Jane</prenoms>
<codeNationalite>FRA</codeNationalite>
<codeNationalite>BHS</codeNationalite>
<codeNationalite>BEL</codeNationalite>
<jourNaissance>28</jourNaissance>
<moisNaissance>01</moisNaissance>
<anneeNaissance>1950</anneeNaissance>
<LieuNaissance>
<localite>ST ETIENNE</localite>
<codePostal>42000</codePostal>
<codeInsee>42218</codeInsee>
<departement>Loire</departement>
<codePays>FRA</codePays>
</LieuNaissance>
<ofpra>false</ofpra>
<mesureJuridique>true</mesureJuridique>
<adressePostale>
<NumeroLibelleVoie>1 rue du test</NumeroLibelleVoie>
<Complement1>Appartement, étage, escalier</Complement1>
<Complement2>Résidence, bâtiment ou immeuble</Complement2>
<LieuDitBpCommuneDeleguee>BP1</LieuDitBpCommuneDeleguee>
<CodePostal>05100</CodePostal>
<Localite>VILLAR ST PANCRACE</Localite>
<Pays>FRA</Pays>
</adressePostale>
<adresseElectronique>mates@entrouvert.com</adresseElectronique>
<telephone>+33123456789</telephone>
<titreIdentiteVerifie>true</titreIdentiteVerifie>
</partenaire1>
<partenaire2>
<civilite>MME</civilite>
<nomNaissance>Doe</nomNaissance>
<prenoms>Jane</prenoms>
<codeNationalite>BEL</codeNationalite>
<jourNaissance>28</jourNaissance>
<moisNaissance>01</moisNaissance>
<anneeNaissance>1982</anneeNaissance>
<LieuNaissance>
<localite>CLERMONT FERRAND</localite>
<codePostal>63000</codePostal>
<codeInsee>63113</codeInsee>
<departement>Puy-de-dôme</departement>
<codePays>FRA</codePays>
</LieuNaissance>
<ofpra>false</ofpra>
<mesureJuridique>true</mesureJuridique>
<adressePostale>
<NumeroLibelleVoie>2 rue du test</NumeroLibelleVoie>
<CodePostal>05100</CodePostal>
<Localite>VILLAR ST PANCRACE</Localite>
<Pays>FRA</Pays>
</adressePostale>
<adresseElectronique>mates@entrouvert.com</adresseElectronique>
<telephone>+33123456789</telephone>
<titreIdentiteVerifie>false</titreIdentiteVerifie>
</partenaire2>
<convention>
<conventionType>
<aideMaterielMontant>100000</aideMaterielMontant>
<regimePacs>legal</regimePacs>
<aideMateriel>
<typeAideMateriel>aideFixe</typeAideMateriel>
</aideMateriel>
</conventionType>
</convention>
<residenceCommune>
<NumeroLibelleVoie>3 place du test</NumeroLibelleVoie>
<CodePostal>05100</CodePostal>
<Localite>VILLAR ST PANCRACE</Localite>
<Pays></Pays>
</residenceCommune>
<attestationHonneur>
<nonParente>true</nonParente>
<residenceCommune>true</residenceCommune>
</attestationHonneur>
</PACS>

View File

@ -1,6 +1,27 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import xml.etree.ElementTree as ET
from passerelle.utils.xml import to_json, text_content
import xmlschema
import jsonschema
from passerelle.utils.xml import to_json, text_content, JSONSchemaFromXMLSchema
from passerelle.utils.json import flatten_json_schema, flatten, unflatten
def test_text_content():
@ -31,3 +52,32 @@ def test_to_json():
{'text3': '4'},
]
}
def test_xmlschema_to_jsonschema():
    '''Check the XML Schema to JSON schema conversion on the PACS schema,
       including a flatten/unflatten round trip of the decoded document.
    '''
    schema_path = 'passerelle/apps/sp_fr/depotDossierPACS.XSD'

    # go from XML to JSON,
    # convert XMLSchema to JSONSchema
    # validate jsonschema, on converted data,
    # flatten the JSON schema,
    # flatten the data,
    # validate flattened data with flattened JSON schema
    # unflatten data
    # convert unflattened data to XML
    # convert XML to JSON
    # then compare to initially converted JSON data

    schema = xmlschema.XMLSchema(schema_path, converter=xmlschema.UnorderedConverter)
    json_schema = JSONSchemaFromXMLSchema(schema, 'PACS')
    d = schema.elements['PACS'].decode(ET.parse('tests/data/pacs-doc.xml').getroot())
    d = {'PACS': d}
    json_schema.validate(d)

    flattened_json_schema = flatten_json_schema(json_schema.json_schema)
    flattened_d = flatten(d)
    jsonschema.validate(instance=flattened_d, schema=flattened_json_schema)

    # unflatten the *flattened* data, otherwise the round trip announced
    # above is never exercised
    d2 = unflatten(flattened_d)
    json_schema.validate(d2)

    tree = schema.elements['PACS'].encode(d2['PACS'], converter=xmlschema.UnorderedConverter)
    d3 = schema.elements['PACS'].decode(tree)
    assert d == {'PACS': d3}