passerelle/passerelle/utils/json.py

204 lines
6.5 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2018 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from passerelle.utils.validation import is_number
# Default separator between the path components of a flattened key; it is the
# default value of the `separator` argument of the functions below.
FLATTEN_SEPARATOR = '/'
def unflatten(d, separator=FLATTEN_SEPARATOR):
    """Rebuild a nested structure from a flat dict keyed by joined paths.

    Transform:

       {"a/b/0/x": "1234"}

    into:

       {"a": {"b": [{"x": "1234"}]}}

    Keys are split on single occurrences of ``separator``; a doubled
    separator is an escape and yields one literal separator inside a path
    component. Components accepted by ``is_number`` are converted with
    ``int`` and used as array indexes, the other ones are used as dict keys.

    :param d: the flat dict; any other value, or an empty dict, is returned
        unchanged.
    :param separator: string separating the path components inside the keys.
    :returns: a list if the top-level components are indexes, else a dict.
    :raises ValueError: if an array index is used before the previous ones
        have been filled, or if the keys describe the same position in
        incompatible ways (array, object or plain value).
    """
    if not isinstance(d, dict) or not d:  # unflattening an empty dict has no sense
        return d

    # ok d is a dict

    # Honour the separator given by the caller. It is escaped because it may
    # contain regex metacharacters. Only occurrences that are not part of a
    # doubled (i.e. escaped) separator split the key.
    escaped = re.escape(separator)
    split_re = re.compile(r'(?<!%s)%s(?!%s)' % (escaped, escaped, escaped))
    doubled_separator = separator * 2

    def split_key(key):
        # re.split() only returns strings, each one is either an index or a name
        return [
            int(part) if is_number(part) else part.replace(doubled_separator, separator)
            for part in split_re.split(key)
        ]

    keys = [(split_key(key), key) for key in d]
    try:
        # Sorting puts array indexes in increasing order and the path of a
        # container before the paths of its content.
        keys.sort()
    except TypeError as e:
        # int and str components cannot be ordered: the same position is used
        # both as an array and as an object.
        raise ValueError('keys mix array indexes and object properties at the same level') from e

    def set_path(path, orig_key, container, value):
        last = len(path) - 1
        for i, key in enumerate(path):
            is_index = isinstance(key, int)
            if not isinstance(container, list if is_index else dict):
                # e.g. "a" holds a plain value and "a/b" needs it to be an object
                raise ValueError(
                    'inconsistent structure at %s in %s'
                    % (separator.join(map(str, path[: i + 1])), orig_key)
                )
            if i == last:  # end of path, set the value
                if is_index:
                    if len(container) != key:
                        raise ValueError('incomplete array before %s' % orig_key)
                    container.append(value)
                else:
                    container[key] = value
                return
            # the type of the next component tells which container to create
            child = [] if isinstance(path[i + 1], int) else {}
            if is_index:
                if len(container) < key:
                    raise ValueError(
                        'incomplete array before %s in %s'
                        % (separator.join(map(str, path[: i + 1])), orig_key)
                    )
                if len(container) == key:
                    container.append(child)
                else:
                    child = container[key]
            else:
                child = container.setdefault(key, child)
            container = child

    # Is the first level an array or a dict ?
    new = [] if isinstance(keys[0][0][0], int) else {}
    for path, key in keys:
        set_path(path, key, new, d[key])
    return new
def flatten(data, separator=FLATTEN_SEPARATOR):
    """Collapse nested lists and dicts into a single-level dict.

    Every value which is neither a list nor a dict becomes an entry whose key
    is the path leading to it: list indexes and dict keys converted with
    ``str`` and joined with ``separator``. Empty lists and dicts produce no
    entry at all.

    :param data: a list or a dict (checked with an assertion).
    :param separator: string inserted between the path components.
    :returns: a dict mapping joined paths to leaf values.
    """
    assert isinstance(data, (list, dict))

    def walk(node, prefix):
        if isinstance(node, list):
            children = enumerate(node)
        elif isinstance(node, dict):
            children = node.items()
        else:
            # leaf reached: emit the path accumulated so far
            yield separator.join(prefix), node
            return
        for name, child in children:
            yield from walk(child, prefix + (str(name),))

    flattened = {}
    for flat_key, leaf in walk(data, ()):
        flattened[flat_key] = leaf
    return flattened
def flatten_json_schema(schema, separator=FLATTEN_SEPARATOR):
    """Describe a JSON schema as a single-level object schema.

    The result has one property per leaf schema, named after the path to
    that leaf joined with ``separator``:

    * ``object`` schemas are expanded through their ``properties``;
    * ``array`` schemas are expanded into ``maxItems`` numbered entries
      (3 when ``maxItems`` is absent), each following the ``items`` schema;
    * the alternatives of a ``oneOf`` are flattened separately, leaves
      sharing the same flat key are grouped under a new ``oneOf``;
    * any other schema, including one without a ``type`` keyword (such as
      ``{'enum': [0, 1]}``), is kept as a leaf.

    As stated in its description, the returned schema must never be used for
    validation.

    :param schema: the JSON schema, a dict (checked with an assertion).
    :param separator: string inserted between the path components.
    :returns: a dict, the flattened object schema.
    :raises KeyError: if an array schema has no ``items`` or an object schema
        has no ``properties``.
    """
    assert isinstance(schema, dict)

    def helper(prefix, schema):
        if 'oneOf' in schema:
            schemas_by_keys = {}
            for alternative in schema['oneOf']:
                for key, leaf in helper(prefix, alternative):
                    schemas_by_keys.setdefault(key, []).append(leaf)
            for key, leaves in schemas_by_keys.items():
                if len(leaves) > 1:
                    yield key, {'oneOf': leaves}
                else:
                    yield key, leaves[0]
            return

        # "type" is optional in JSON schema: a schema without it is a leaf
        schema_type = schema.get('type')
        if schema_type == 'array':
            prefix = prefix + separator if prefix else prefix
            items = schema['items']
            max_items = schema.get('maxItems', 3)
            for i in range(max_items):
                for key, leaf in helper(str(i), items):
                    yield '%s%s' % (prefix, key), leaf
        elif schema_type == 'object':
            prefix = prefix + separator if prefix else prefix
            properties = schema['properties']
            for name in properties:
                for key, leaf in helper(name, properties[name]):
                    yield '%s%s' % (prefix, key), leaf
        else:
            yield prefix, schema

    return {
        'type': 'object',
        'description': 'flattened schema *never* use for validation',
        'properties': dict(helper('', schema)),
        'additionalProperties': False,
    }
def response_schema(data_schema=None, toplevel_properties=None):
    """Build the JSON schema of a response object.

    The schema describes an object with a required ``err`` property whose
    value is 0 or 1.

    :param data_schema: if truthy, used as the schema of the ``data``
        property.
    :param toplevel_properties: if truthy, extra property schemas merged into
        the properties of the object, after ``err`` and ``data``.
    :returns: a new dict, the object schema.
    """
    properties = {'err': {'enum': [0, 1]}}
    if data_schema:
        properties['data'] = data_schema
    if toplevel_properties:
        properties.update(toplevel_properties)
    return {
        'type': 'object',
        'required': ['err'],
        'properties': properties,
    }
def datasource_array_schema():
    """Return the JSON schema of an array of objects with the string
    properties ``id`` and ``text``.

    A new schema is built on every call.
    """
    item_schema = {
        'type': 'object',
        'properties': {name: {'type': 'string'} for name in ('id', 'text')},
    }
    return {'type': 'array', 'items': item_schema}
def datasource_schema():
    """Return the response schema whose ``data`` property follows the
    schema built by ``datasource_array_schema``."""
    items_schema = datasource_array_schema()
    return response_schema(data_schema=items_schema)