support for json field (fixes #15169)
This commit is contained in:
parent
1284460d7d
commit
b6b01da8ee
127
bijoe/engine.py
127
bijoe/engine.py
|
@ -15,10 +15,14 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import logging
|
||||
import itertools
|
||||
|
||||
import collections
|
||||
import psycopg2
|
||||
|
||||
|
||||
from . import schemas
|
||||
|
||||
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
|
||||
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
|
||||
|
||||
|
@ -41,18 +45,21 @@ class EngineDimension(object):
|
|||
|
||||
@property
|
||||
def members(self):
|
||||
assert self.dimension.type != 'date'
|
||||
assert self.type != 'date'
|
||||
cursor = self.engine.get_cursor()
|
||||
sql = self.members_query
|
||||
if not sql:
|
||||
if self.dimension.join:
|
||||
join = self.engine_cube.get_join(self.dimension.join[-1])
|
||||
sql = 'SELECT %s AS value, %s::text AS label FROM %s AS %s ORDER BY %s' % (
|
||||
self.value, self.value_label or self.value, join.table, join.name,
|
||||
self.order_by or self.value)
|
||||
sql = ('SELECT %s AS value, %s::text AS label FROM %s AS "%s" '
|
||||
'GROUP BY %s, %s ORDER BY %s' % (
|
||||
self.value, self.value_label or self.value, join.table, join.name,
|
||||
self.value, self.value_label or self.value, self.order_by or self.value))
|
||||
else:
|
||||
sql = 'SELECT %s AS value, %s::text AS label FROM {fact_table} ORDER BY %s' % (
|
||||
self.value, self.value_label or self.value, self.self.order_by or self.value)
|
||||
sql = ('SELECT %s AS value, %s::text AS label FROM {fact_table} '
|
||||
'GROUP BY %s, %s ORDER BY %s' % (
|
||||
self.value, self.value_label or self.value, self.value,
|
||||
self.value_label or self.value, self.self.order_by or self.value))
|
||||
sql = sql.format(fact_table=self.engine_cube.fact_table)
|
||||
self.engine.log.debug('SQL: %s', sql)
|
||||
cursor.execute(sql)
|
||||
|
@ -60,6 +67,46 @@ class EngineDimension(object):
|
|||
yield Member(*row)
|
||||
|
||||
|
||||
class SchemaJSONDimension(schemas.Dimension):
    '''Schema dimension synthesised for a single key of a cube's JSON field.

       The constructor derives every SQL fragment from the JSON column name
       and the key:

       * ``value`` / ``value_label``: the ``value`` column of a join aliased
         ``json_<key>``;
       * ``join``: a one-element list naming that join;
       * ``members_query``: distinct non-NULL values stored under the key; it
         still contains a ``{fact_table}`` placeholder to be filled in later;
       * ``filter_expression``: matches rows whose fact id IS NULL or whose
         JSON value is in the ``%s`` list, again with ``{fact_table}``
         placeholders left in place.
    '''
    filter = False
    order_by = None
    group_by = None
    join = ()
    type = 'string'

    def __init__(self, json_field, name):
        '''Build the dimension for key `name` of JSON column `json_field`.'''
        key = str(name)
        column = '\"json_%s\".value' % key

        self.name = key
        self.label = key.title()
        self.value_label = column
        self.value = column
        self.join = ['json_' + key]

        # The key is %-interpolated first and the column name is applied with
        # str.format() afterwards; the doubled braces survive that format()
        # call as a literal {fact_table} placeholder.
        members_template = (
            'SELECT DISTINCT "{json_field}"->>\'%s\' AS v, "{json_field}"->>\'%s\' AS v'
            ' FROM "{{fact_table}}" WHERE ("{json_field}"->>\'%s\') IS NOT NULL ORDER BY v'
        ) % (key, key, key)
        self.members_query = members_template.format(json_field=json_field)

        # %%s collapses to a single %s that is left for the filter values.
        filter_template = ('("{fact_table}".id IS NULL '
                           'OR ("{fact_table}"."%s"->>\'%s\') IN (%%s))')
        self.filter_expression = filter_template % (json_field, key)
        self.filter_needs_join = False
|
||||
|
||||
|
||||
class EngineJSONDimension(EngineDimension):
    '''Engine-side dimension bound to one key of the cube's JSON field.

       Unlike a regular engine dimension it is not handed a schema dimension:
       it creates its own SchemaJSONDimension from the cube's ``json_field``
       and the key name.
    '''

    def __init__(self, engine, engine_cube, name):
        '''Store the engine and cube, then build the schema dimension for `name`.'''
        self.engine = engine
        self.engine_cube = engine_cube
        json_field = self.engine_cube.json_field
        self.dimension = SchemaJSONDimension(json_field, name)

    def to_json(self):
        '''Return the JSON description of the dimension: its name and label.'''
        # NOTE(review): `name` and `label` are presumably resolved through the
        # parent class from self.dimension -- confirm in EngineDimension.
        description = {}
        description['name'] = self.name
        description['label'] = self.label
        return description
|
||||
|
||||
|
||||
class EngineMeasure(object):
|
||||
def __init__(self, engine, engine_cube, measure):
|
||||
self.engine = engine
|
||||
|
@ -71,33 +118,66 @@ class EngineMeasure(object):
|
|||
|
||||
|
||||
class ProxyList(object):
    '''Lazy, list-like view over one attribute of the wrapped cube.

       Every object found in ``getattr(engine_cube.cube, attribute)`` is
       wrapped on the fly as ``cls(engine, engine_cube, obj)``.  An optional
       `chain` factory, called as ``chain(engine, engine_cube)``, provides a
       secondary collection which is iterated after, and looked up after, the
       cube's own objects.
    '''
    chain = None

    def __init__(self, engine, engine_cube, attribute, cls, chain=None):
        '''Remember the lookup parameters and instantiate the chain, if any.'''
        self.engine = engine
        self.engine_cube = engine_cube
        self.attribute = attribute
        self.cls = cls
        if chain:
            self.chain = chain(engine, engine_cube)

    def __iter__(self):
        '''Iterate over the wrapped cube objects, then over the chain.'''
        wrapped = (self.cls(self.engine, self.engine_cube, o)
                   for o in getattr(self.engine_cube.cube, self.attribute))
        if self.chain:
            return itertools.chain(wrapped, self.chain)
        return wrapped

    def __getitem__(self, name):
        '''Return the wrapped object called `name`.

           The cube's own objects take precedence over the chain.

           Raises KeyError(name) when neither the cube nor the chain knows
           the name.
        '''
        for o in getattr(self.engine_cube.cube, self.attribute):
            if o.name == name:
                return self.cls(self.engine, self.engine_cube, o)
        if self.chain:
            found = self.chain[name]
            # A chain may answer None for an unknown name; report it the same
            # way as the unchained path instead of handing None to the caller.
            if found is not None:
                return found
        raise KeyError(name)
|
||||
|
||||
|
||||
class JSONDimensions(object):
    '''Collection of the dimensions generated from the cube's JSON field.

       The distinct top-level keys found in the JSON column of the fact table
       are discovered with one SQL query, memoized on the instance, and each
       key is exposed as an EngineJSONDimension.
    '''
    __cache = None

    def __init__(self, engine, engine_cube):
        '''Remember the engine (cursor provider) and the cube to inspect.'''
        self.engine = engine
        self.engine_cube = engine_cube

    @property
    def cache(self):
        '''Sorted list of the JSON keys, computed on first access.

           Returns an empty list, without touching the database, when the
           cube declares no JSON field.
        '''
        # Compare with None so that an empty key list is memoized too instead
        # of re-running the query on every access.
        if self.__cache is None:
            json_field = self.engine_cube.json_field
            if not json_field:
                self.__cache = []
            else:
                cursor = self.engine.get_cursor()
                # Use the cube's own column and table, quoted like the other
                # JSON queries of this module, rather than fixed names.
                sql = ('select distinct jsonb_object_keys("%s") as a from "%s" order by a'
                       % (json_field, self.engine_cube.fact_table))
                cursor.execute(sql)
                self.__cache = [row[0] for row in cursor.fetchall()]
        return self.__cache

    def __iter__(self):
        '''Yield one EngineJSONDimension per known JSON key.'''
        for name in self.cache:
            yield EngineJSONDimension(self.engine, self.engine_cube, name)

    def __getitem__(self, key):
        '''Return the dimension for JSON key `key`.

           Raises KeyError(key) when the key is not present in the data.
        '''
        for name in self.cache:
            if name == key:
                return EngineJSONDimension(self.engine, self.engine_cube, name)
        raise KeyError(key)
|
||||
|
||||
|
||||
class ProxyListDescriptor(object):
    '''Descriptor exposing a cube attribute as a ProxyList.

       `attribute` names the list to read on the wrapped cube, `cls` is the
       wrapper applied to each of its elements and `chain` is an optional
       factory handed to ProxyList to supply extra elements.
    '''

    def __init__(self, attribute, cls, chain=None):
        self.attribute = attribute
        self.cls = cls
        self.chain = chain

    def __get__(self, obj, t=None):
        '''Return a new ProxyList bound to `obj` on every access.

           Accessed on the owning class itself (`obj` is None) the descriptor
           is returned, following the usual descriptor convention, instead of
           failing on ``None.engine``.
        '''
        if obj is None:
            return self
        return ProxyList(obj.engine, obj, self.attribute, self.cls, chain=self.chain)
|
||||
|
||||
|
||||
def build_table_expression(join_tree, table_name, alias=None, top=True, other_conditions=None):
|
||||
|
@ -143,7 +223,7 @@ def build_table_expression(join_tree, table_name, alias=None, top=True, other_co
|
|||
|
||||
|
||||
class EngineCube(object):
|
||||
dimensions = ProxyListDescriptor('all_dimensions', EngineDimension)
|
||||
dimensions = ProxyListDescriptor('all_dimensions', EngineDimension, chain=JSONDimensions)
|
||||
measures = ProxyListDescriptor('measures', EngineMeasure)
|
||||
|
||||
def __init__(self, warehouse, cube):
|
||||
|
@ -159,6 +239,15 @@ class EngineCube(object):
|
|||
return cursor.fetchone()[0]
|
||||
|
||||
def get_join(self, name):
    '''Return the join called `name`.

       Names starting with ``json_`` do not exist in the cube definition:
       a join is generated for them against a subquery listing the distinct
       values stored under that key of the JSON field.  Any other name is
       looked up on the wrapped cube.
    '''
    prefix = 'json_'
    if not name.startswith(prefix):
        return self.cube.get_join(name)

    # The remainder of the name is the key inside the JSON field.
    json_key = name[len(prefix):]
    distinct_values = '(SELECT DISTINCT "%s"."%s"->>\'%s\' AS value FROM "%s" ORDER BY value)' % (
        self.fact_table, self.json_field, json_key, self.fact_table)
    master_expression = '"%s"->>\'%s\'' % (self.json_field, json_key)
    return schemas.Join(
        name=name,
        table=distinct_values,
        master=master_expression,
        detail='value',
    )
|
||||
|
||||
def sql_query(self, filters, drilldown, measures, **kwargs):
|
||||
|
@ -172,8 +261,8 @@ class EngineCube(object):
|
|||
join_conditions = []
|
||||
|
||||
for dimension_name, values in filters:
|
||||
dimension = self.cube.get_dimension(dimension_name)
|
||||
assert dimension.filter
|
||||
dimension = self.dimensions[dimension_name]
|
||||
# assert dimension.filter
|
||||
condition, values = dimension.build_filter(values)
|
||||
condition = cursor.mogrify(condition, values)
|
||||
if dimension.filter_needs_join:
|
||||
|
@ -184,7 +273,7 @@ class EngineCube(object):
|
|||
where.append(condition)
|
||||
|
||||
for dimension_name in drilldown:
|
||||
dimension = self.cube.get_dimension(dimension_name)
|
||||
dimension = self.dimensions[dimension_name]
|
||||
joins.update(dimension.join)
|
||||
projections.append('%s AS %s' % (dimension.value_label or dimension.value,
|
||||
dimension.name))
|
||||
|
@ -192,7 +281,7 @@ class EngineCube(object):
|
|||
order_by.append(dimension.order_by or dimension.value)
|
||||
|
||||
for measure_name in measures:
|
||||
measure = self.cube.get_measure(measure_name)
|
||||
measure = self.get_measure(measure_name)
|
||||
if measure.expression not in projections:
|
||||
projections.append(measure.expression + ' AS ' + measure.name)
|
||||
sql = 'SELECT ' + ', '.join(projections)
|
||||
|
|
|
@ -47,6 +47,8 @@ class Base(object):
|
|||
|
||||
def __init__(self, **kwargs):
|
||||
for k, v in kwargs.iteritems():
|
||||
if k in self.__types__ and self.__types__ == 'str':
|
||||
v = str(v)
|
||||
setattr(self, k, v)
|
||||
|
||||
@classmethod
|
||||
|
@ -272,17 +274,20 @@ class Join(Base):
|
|||
|
||||
|
||||
class Cube(Base):
|
||||
__slots__ = ['name', 'label', 'fact_table', 'key', 'joins', 'dimensions', 'measures']
|
||||
__slots__ = ['name', 'label', 'fact_table', 'json_field', 'key', 'joins', 'dimensions',
|
||||
'measures']
|
||||
__types__ = {
|
||||
'name': str,
|
||||
'label': unicode,
|
||||
'fact_table': str,
|
||||
'json_field': str,
|
||||
'key': str,
|
||||
'joins': [Join],
|
||||
'dimensions': [Dimension],
|
||||
'measures': [Measure],
|
||||
}
|
||||
|
||||
json_field = None
|
||||
joins = ()
|
||||
dimensions = ()
|
||||
measures = ()
|
||||
|
|
Loading…
Reference in New Issue