bijoe/bijoe/engine.py

399 lines
15 KiB
Python

# bijoe - BI dashboard
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import logging
import itertools
import collections
import psycopg2
from . import schemas
# Register psycopg2's UNICODE and UNICODEARRAY typecasters. No scope argument
# is passed, so the registration is not tied to one connection or cursor.
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
def cast_point(value, cur):
    '''Typecaster turning the text form of a point into a schemas.Point.

    The first and last characters of *value* (presumably the surrounding
    parentheses) are dropped, the rest is split on commas and every piece is
    converted to float. ``None`` is passed through unchanged. *cur* is part
    of the typecaster signature and is not used.
    '''
    if value is not None:
        coordinates = [float(piece) for piece in value[1:-1].split(',')]
        return schemas.Point._make(coordinates)
    return None
# Build a typecaster named "POINT" from cast_point for type OID 600 and
# register it, then derive and register the matching array typecaster
# "POINT[]" for OID 1017. POINT must exist before POINT_ARRAY is created.
# NOTE(review): 600 / 1017 are presumably the PostgreSQL OIDs of ``point``
# and ``point[]`` - confirm against pg_type.
POINT = psycopg2.extensions.new_type((600,), "POINT", cast_point)
psycopg2.extensions.register_type(POINT)
POINT_ARRAY = psycopg2.extensions.new_array_type((1017,), "POINT[]", POINT)
psycopg2.extensions.register_type(POINT_ARRAY)
def to_tuple(cur, values):
    '''Render *values* as a comma separated list of SQL literals.

    One ``%s`` placeholder is generated per value and the actual quoting is
    delegated to ``cur.mogrify``; its result is returned unchanged.
    '''
    placeholders = ['%s' for _ in range(len(values))]
    template = ', '.join(placeholders)
    return cur.mogrify(template, values)
# (id, label) record yielded for every member of a dimension.
Member = collections.namedtuple('Member', 'id label')
class EngineDimension(object):
    '''Bind a schema dimension to an engine and to one of its cubes.

    Attributes that are not set on the wrapper itself are looked up on the
    wrapped schema dimension.
    '''

    def __init__(self, engine, engine_cube, dimension):
        self.engine = engine
        self.engine_cube = engine_cube
        self.dimension = dimension

    def __getattr__(self, name):
        '''Delegate unknown attributes to the wrapped dimension.

        Raises AttributeError when the wrapper has no dimension yet.
        '''
        # __getattr__ only runs when normal lookup failed. The wrapped
        # object is read from the instance dict because evaluating
        # self.dimension on an instance that was never initialised (as done
        # by copy or pickle) would re-enter __getattr__ and recurse forever.
        try:
            dimension = self.__dict__['dimension']
        except KeyError:
            raise AttributeError(name)
        return getattr(dimension, name)

    @property
    def members(self):
        '''Yield a Member(id, label) for each value of the dimension.

        The dimension's own ``members_query`` is used when it is set,
        otherwise a SELECT ... GROUP BY ... ORDER BY query is built from the
        dimension's value, label, joins and ordering. Rows whose first
        column is NULL are skipped. Date dimensions are rejected by an
        assertion.
        '''
        assert self.type != 'date'
        value = self.value
        value_label = self.value_label or value
        order_by = self.order_by

        with self.engine.get_cursor() as cursor:
            sql = self.members_query
            if not sql:
                table_expression = '"%s"' % self.engine_cube.fact_table
                if self.join:
                    table_expression = self.engine_cube.build_table_expression(
                        self.join, self.engine_cube.fact_table)
                sql = 'SELECT %s AS value, %s::text AS label ' % (value, value_label)
                sql += 'FROM %s ' % table_expression
                if order_by:
                    # order_by may be a single expression
                    if not isinstance(order_by, list):
                        order_by = [order_by]
                else:
                    order_by = [value]
                # every selected or ordering expression must be grouped
                group_by = [value]
                if value_label not in group_by:
                    group_by.append(value_label)
                for order_value in order_by:
                    if order_value not in group_by:
                        group_by.append(order_value)
                sql += 'GROUP BY %s ' % ', '.join(group_by)
                sql += 'ORDER BY (%s) ' % ', '.join(order_by)
            # custom members queries may contain a {fact_table} placeholder
            sql = sql.format(fact_table=self.engine_cube.fact_table)
            self.engine.log.debug('SQL: %s', sql)
            cursor.execute(sql)
            for row in cursor.fetchall():
                if row[0] is None:
                    continue
                yield Member(*row)
class SchemaJSONDimension(schemas.Dimension):
    '''Generated dimensions for JSON fields keys

    One instance describes one key of a cube's JSON field: its values are
    read through a join named ``json_<name>`` and its members are listed by
    a dedicated query on the fact table.
    '''
    filter = False
    order_by = None
    group_by = None
    join = ()
    type = 'string'

    def __init__(self, json_field, name):
        '''Build the dimension for key *name* of the column *json_field*.'''
        # NOTE(review): name is interpolated into SQL text without escaping;
        # a key containing quotes, braces or percent signs would break the
        # generated statements - confirm where keys come from.
        name = str(name)
        self.name = name
        self.label = name.title()
        expr = '"json_%s".value' % name
        self.value_label = expr
        self.value = expr
        self.join = ['json_' + name]
        # {{fact_table}} survives the format() call below as {fact_table}
        # and is left for whoever runs the query to fill in.
        sql = ('SELECT DISTINCT "{json_field}"->>\'%s\' AS v, "{json_field}"->>\'%s\' AS v'
               ' FROM "{{fact_table}}" WHERE ("{json_field}"->>\'%s\') IS NOT NULL ORDER BY v' %
               (self.name, self.name, self.name))
        self.members_query = sql.format(json_field=json_field)
        # Table and column are quoted separately: "table.column" inside a
        # single pair of double quotes is one identifier in PostgreSQL, not
        # a qualified column reference.
        self.filter_expression = ('("{fact_table}".id IS NULL '
                                  'OR ("{fact_table}"."%s"->>\'%s\') IN (%%s))'
                                  % (json_field, name))
        self.filter_needs_join = False
class EngineJSONDimension(EngineDimension):
    '''Engine dimension wrapping a SchemaJSONDimension generated for one
    key of the cube's JSON field.'''

    def __init__(self, engine, engine_cube, name):
        self.engine = engine
        self.engine_cube = engine_cube
        json_field = self.engine_cube.json_field
        self.dimension = SchemaJSONDimension(json_field, name)

    def to_json(self):
        '''Return the name and label of the dimension as a dict.'''
        description = {}
        description['name'] = self.name
        description['label'] = self.label
        return description
class EngineMeasure(object):
    '''Bind a schema measure to an engine and to one of its cubes.

    Attributes that are not set on the wrapper itself are looked up on the
    wrapped schema measure.
    '''

    def __init__(self, engine, engine_cube, measure):
        self.engine = engine
        self.engine_cube = engine_cube
        self.measure = measure

    def __getattr__(self, name):
        '''Delegate unknown attributes to the wrapped measure.

        Raises AttributeError when the wrapper has no measure yet.
        '''
        # Read the wrapped object from the instance dict: evaluating
        # self.measure on an instance that was never initialised (as done by
        # copy or pickle) would re-enter __getattr__ and recurse forever.
        try:
            measure = self.__dict__['measure']
        except KeyError:
            raise AttributeError(name)
        return getattr(measure, name)
class ProxyList(object):
    '''Lazy view over a list attribute of a cube, wrapping each element.

    Every object found in ``getattr(engine_cube.cube, attribute)`` is
    wrapped with ``cls(engine, engine_cube, obj)``. An optional *chain*
    factory, called as ``chain(engine, engine_cube)``, provides a second
    collection consulted after the cube's own objects.
    '''
    chain = None

    def __init__(self, engine, engine_cube, attribute, cls, chain=None):
        self.engine = engine
        self.engine_cube = engine_cube
        self.attribute = attribute
        self.cls = cls
        if chain:
            self.chain = chain(engine, engine_cube)

    def __iter__(self):
        '''Iterate over the wrapped objects, then over the chained ones.'''
        wrapped = (self.cls(self.engine, self.engine_cube, o)
                   for o in getattr(self.engine_cube.cube, self.attribute))
        if self.chain:
            wrapped = itertools.chain(wrapped, self.chain)
        return wrapped

    def __getitem__(self, name):
        '''Return the wrapped object called *name*.

        Falls back on the chained collection when there is one; raises
        KeyError(name) when nothing matches.
        '''
        for o in getattr(self.engine_cube.cube, self.attribute):
            if o.name == name:
                return self.cls(self.engine, self.engine_cube, o)
        if self.chain:
            return self.chain[name]
        # carry the missing name so the error is actionable
        raise KeyError(name)
class JSONDimensions(object):
    '''Collection of the dimensions generated from the keys found in the
    JSON field of a cube.'''
    __cache = None

    def __init__(self, engine, engine_cube):
        self.engine = engine
        self.engine_cube = engine_cube

    @property
    def cache(self):
        '''Return the sorted list of distinct keys of the cube's JSON field.

        An empty list is returned when the cube has no JSON field. The
        database is queried on first access only and the result, even an
        empty one, is kept on the instance.
        '''
        json_field = self.engine_cube.json_field
        if not json_field:
            return []
        # compare with None so that an empty result is cached too
        if self.__cache is None:
            with self.engine.get_cursor() as cursor:
                # use the column and table declared by the cube rather than
                # fixed names
                sql = 'select distinct jsonb_object_keys("%s") as a from "%s" order by a' % (
                    json_field, self.engine_cube.fact_table)
                cursor.execute(sql)
                self.__cache = [row[0] for row in cursor.fetchall()]
        return self.__cache

    def __iter__(self):
        '''Yield an EngineJSONDimension for every known key.'''
        for name in self.cache:
            yield EngineJSONDimension(self.engine, self.engine_cube, name)

    def __getitem__(self, key):
        '''Return the EngineJSONDimension for *key*.

        Raises KeyError(key) when the JSON field has no such key.
        '''
        for name in self.cache:
            if name == key:
                return EngineJSONDimension(self.engine, self.engine_cube, name)
        raise KeyError(key)
class ProxyListDescriptor(object):
    '''Descriptor building a ProxyList for the instance it is read from.

    The owning instance is expected to expose an ``engine`` attribute; it is
    itself passed to ProxyList as the engine cube.
    '''

    def __init__(self, attribute, cls, chain=None):
        self.attribute = attribute
        self.cls = cls
        self.chain = chain

    def __get__(self, obj, t=None):
        '''Return a new ProxyList bound to *obj*.

        When read from the class (``obj`` is None) the descriptor itself is
        returned, following the usual descriptor convention, instead of
        failing on ``None.engine``.
        '''
        if obj is None:
            return self
        return ProxyList(obj.engine, obj, self.attribute, self.cls, chain=self.chain)
class EngineCube(object):
    '''Bind a schema cube to an engine and build/run SQL queries on it.

    Attributes that are not set on the wrapper itself are looked up on the
    wrapped cube.
    '''
    dimensions = ProxyListDescriptor('all_dimensions', EngineDimension, chain=JSONDimensions)
    measures = ProxyListDescriptor('measures', EngineMeasure)

    def __init__(self, warehouse, cube):
        # the first argument is stored and used as the engine
        self.engine = warehouse
        self.cube = cube

    def __getattr__(self, name):
        '''Delegate unknown attributes to the wrapped cube.

        Raises AttributeError when the wrapper has no cube yet.
        '''
        # Read the wrapped object from the instance dict: evaluating
        # self.cube on an instance that was never initialised (as done by
        # copy or pickle) would re-enter __getattr__ and recurse forever.
        try:
            cube = self.__dict__['cube']
        except KeyError:
            raise AttributeError(name)
        return getattr(cube, name)

    def count(self):
        '''Return count(<key>) computed over the fact table.'''
        with self.engine.get_cursor() as cursor:
            cursor.execute('SELECT count(%s) FROM "%s"' % (self.key, self.fact_table))
            return cursor.fetchone()[0]

    def get_join(self, name):
        '''Return the join called *name*.

        Names starting with ``json_`` get a synthetic schemas.Join over the
        distinct values of the matching key of the cube's JSON field; any
        other name is looked up on the cube.
        '''
        if name.startswith('json_'):
            json_key = name[5:]
            return schemas.Join(
                name=name,
                table='(SELECT DISTINCT "%s"."%s"->>\'%s\' AS value FROM "%s" ORDER BY value)' % (
                    self.fact_table, self.json_field, json_key, self.fact_table),
                master='"%s"->>\'%s\'' % (self.json_field, json_key),
                detail='value',
            )
        return self.cube.get_join(name)

    def sql_query(self, filters, drilldown, measures, **kwargs):
        '''Return the SQL text of an aggregation query.

        *filters* is an iterable of (dimension name, values) pairs,
        *drilldown* an iterable of dimension names to project and group by,
        *measures* an iterable of measure names. Extra keyword arguments are
        accepted and ignored. A cursor is opened only to quote the filter
        values with mogrify().
        '''
        with self.engine.get_cursor() as cursor:
            projections = []
            joins = set()
            where = []
            group_by = []
            order_by = []
            join_conditions = []

            for dimension_name, values in filters:
                dimension = self.dimensions[dimension_name]
                # assert dimension.filter
                condition, values = dimension.build_filter(values)
                # NOTE(review): psycopg2 documents mogrify() as returning
                # bytes on Python 3 while the joins below expect text -
                # confirm before running this on Python 3.
                condition = cursor.mogrify(condition, values)
                if dimension.filter_needs_join and dimension.join:
                    joins.update(dimension.join)
                if dimension.filter_in_join:
                    join_conditions.append(condition)
                else:
                    where.append(condition)

            for dimension_name in drilldown:
                dimension = self.dimensions[dimension_name]
                joins.update(dimension.join or [])
                projections.append('%s AS %s' % (dimension.value_label or dimension.value,
                                                 dimension.name))
                group_by.append(dimension.group_by or dimension.value)
                dimension_order = dimension.order_by or [dimension.value]
                if not isinstance(dimension_order, (list, tuple)):
                    # a single expression must be added whole: extend() on a
                    # string would add it one character at a time
                    dimension_order = [dimension_order]
                order_by.extend(dimension_order)

            # every ordering expression must also be grouped
            for order_value in order_by:
                if order_value not in group_by:
                    group_by.append(order_value)

            for measure_name in measures:
                measure = self.get_measure(measure_name)
                if measure.expression not in projections:
                    projections.append(measure.expression + ' AS ' + measure.name)

            sql = 'SELECT ' + ', '.join(projections)
            table_expression = ' "%s"' % self.cube.fact_table
            if joins:
                table_expression = self.build_table_expression(
                    joins, self.fact_table, other_conditions=join_conditions)
            sql += ' FROM %s' % table_expression
            where_conditions = 'true'
            if where:
                where_conditions = ' AND '.join(where)
                sql += ' WHERE %s' % where_conditions
            if group_by:
                sql += ' GROUP BY %s' % ', '.join(group_by)
            if order_by:
                sql += ' ORDER BY %s' % ', '.join(order_by)
            # NOTE(review): the template is expanded twice; presumably the
            # second pass fills placeholders brought in by the first one
            # (e.g. {fact_table} inside {where_conditions}) - confirm before
            # simplifying.
            sql = sql.format(fact_table=self.cube.fact_table,
                             table_expression=table_expression,
                             where_conditions=where_conditions)
            sql = sql.format(fact_table=self.cube.fact_table,
                             table_expression=table_expression,
                             where_conditions=where_conditions)
            return sql

    def query(self, filters, drilldown, measures, **kwargs):
        '''Run the aggregation query and yield one list of cells per row.

        Each cell is a dict with the ``name``, ``label`` and ``type`` of the
        dimension or measure and the ``value`` read from the row; cells are
        ordered drilldown dimensions first, then measures.
        '''
        self.engine.log.debug('%s.%s query filters=%s drilldown=%s measures=%s',
                              self.engine.warehouse.name, self.cube.name, filters, drilldown,
                              measures)
        cells = []
        for dimension_name in drilldown:
            cells.append(self.dimensions[dimension_name])
        for measure_name in measures:
            cells.append(self.measures[measure_name])
        with self.engine.get_cursor() as cursor:
            sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
            self.engine.log.debug('SQL: %s', sql)
            cursor.execute(sql)
            for row in cursor.fetchall():
                yield [{
                    'name': cell.name,
                    'label': cell.label,
                    'type': cell.type,
                    'value': value,
                } for cell, value in zip(cells, row)]

    def build_table_expression(self, joins, table_name, other_conditions=None):
        '''Recursively build the table expression from the join tree,
           starting from the fact table

           *joins* is an iterable of join names, *table_name* the table the
           expression starts from and *other_conditions* optional extra SQL
           conditions added to the ON clause of the first join emitted.
        '''
        join_tree = {}
        # Build join tree: master table -> join kind -> join name -> join
        for join_name in joins:
            join = self.get_join(join_name)
            master_table = join.master_table or self.fact_table
            join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join

        def build_table_expression_helper(join_tree, table_name, alias=None, top=True,
                                          other_conditions=None):
            # sub-selects are used verbatim, plain table names are quoted
            if table_name.strip().startswith('('):
                sql = table_name
            else:
                sql = '"%s"' % table_name
            if alias:
                sql += ' AS "%s"' % alias
            add_paren = False
            for kind in ['left', 'inner', 'right', 'full']:
                joins = join_tree.get(alias or table_name, {}).get(kind)
                if not joins:
                    continue
                add_paren = True
                join_kinds = {
                    'inner': 'INNER JOIN',
                    'left': 'LEFT OUTER JOIN',
                    'right': 'RIGHT OUTER JOIN',
                    'full': 'FULL OUTER JOIN',
                }
                sql += ' %s ' % join_kinds[kind]
                sub_joins = []
                conditions = []
                if other_conditions:
                    # extra conditions go on the first join only; work on a
                    # copy so the caller's list is left untouched
                    conditions = list(other_conditions)
                    other_conditions = None
                # values() exists on Python 2 and 3, unlike iteritems()
                for join in joins.values():
                    sub_joins.append(
                        build_table_expression_helper(join_tree, join.table, alias=join.name,
                                                      top=False))
                    conditions.append('"%s".%s = "%s"."%s"' % (
                        alias or table_name,
                        join.master.split('.')[-1],
                        join.name, join.detail))
                sub_join = ' CROSS JOIN '.join(sub_joins)
                if len(sub_joins) > 1:
                    sub_join = '(%s)' % sub_join
                sql += sub_join
                sql += ' ON %s' % ' AND '.join(conditions)
            if not top and add_paren:
                sql = '(%s)' % sql
            return sql

        return build_table_expression_helper(
            join_tree, table_name, other_conditions=other_conditions)
class Engine(object):
    '''Give access to the cubes of a warehouse and to database cursors.

    Attributes that are not set on the engine itself are looked up on the
    wrapped warehouse.
    '''

    def __init__(self, warehouse):
        self.warehouse = warehouse
        self.log = logging.getLogger(__name__)

    @property
    def cubes(self):
        '''Yield an EngineCube for every cube of the warehouse.'''
        for cube in self.warehouse.cubes:
            yield EngineCube(self, cube)

    def __getitem__(self, name):
        '''Return the EngineCube wrapping the warehouse cube called *name*.'''
        return EngineCube(self, self.warehouse.get_cube(name))

    def __getattr__(self, name):
        '''Delegate unknown attributes to the wrapped warehouse.

        Raises AttributeError when the engine has no warehouse yet.
        '''
        # Read the wrapped object from the instance dict: evaluating
        # self.warehouse on an instance that was never initialised (as done
        # by copy or pickle) would re-enter __getattr__ and recurse forever.
        try:
            warehouse = self.__dict__['warehouse']
        except KeyError:
            raise AttributeError(name)
        return getattr(warehouse, name)

    @contextlib.contextmanager
    def get_cursor(self):
        '''Context manager yielding a cursor on a new connection.

        A connection is opened from the warehouse's ``pg_dsn`` for every
        call, the search path is set from the warehouse's ``search_path``
        and the connection is closed when the block exits.
        '''
        with contextlib.closing(psycopg2.connect(self.warehouse.pg_dsn)) as connection:
            with connection.cursor() as cursor:
                search_path = ', '.join(['"%s"' % namespace
                                         for namespace in self.warehouse.search_path])
                cursor.execute('SET SEARCH_PATH = %s' % search_path)
                yield cursor