bijoe/bijoe/engine.py

534 lines
19 KiB
Python

# bijoe - BI dashboard
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import contextlib
import logging
import itertools
import hashlib
import psycopg2
from django.core.cache import cache
from django.conf import settings
from django.utils.encoding import force_bytes, force_text
from django.utils.translation import ugettext_lazy as _
from . import schemas
# Make psycopg2 return text values (and arrays of text) as unicode strings.
# register_type() is called without a connection/cursor scope, so this is a
# process-wide setting applied at import time.
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
class DimensionCell(collections.namedtuple('_Cell', 'dimension value value_label')):
    '''One dimension coordinate of a result row.

    Holds the dimension, the raw value read from the database and, when the
    query selected one, a ready-made label for that value.
    '''

    @property
    def label(self):
        '''Display text for the cell.

        A non-empty ``value_label`` wins; a NULL value shows the dimension's
        ``absent_label``; boolean dimensions show Yes/No; anything else is the
        value converted to text.
        '''
        explicit_label = self.value_label
        if explicit_label:
            return explicit_label
        raw = self.value
        if raw is None:
            return self.dimension.absent_label
        if self.dimension.type != 'bool':
            return force_text(raw)
        if raw:
            return _('Yes')
        return _('No')

    def __unicode__(self):
        return force_text(self.label)
class MeasureCell(collections.namedtuple('_Cell', ['measure', 'value'])):
    '''One measure value of a result row, formatted according to the measure type.'''

    @property
    def label(self):
        '''Return the value formatted for display.

        Supported measure types:

        - ``percent``: ``"12,50 %"`` (comma as decimal separator), N/A when
          missing or not numeric;
        - ``duration``: a timedelta-like value (``days`` and ``seconds``
          attributes) rendered as days and whole hours;
        - ``bool``: Yes/No, N/A when missing;
        - ``integer``: the value as text, ``'0'`` when missing.

        Raises NotImplementedError for any other measure type.
        '''
        value = self.value
        measure_type = self.measure.type
        if measure_type == 'percent':
            if value is None:
                return _('N/A')
            try:
                return (u'%4.2f' % float(value)).replace('.', ',') + u' %'
            except (TypeError, ValueError):
                # the value cannot be converted to a number
                return _('N/A')
        elif measure_type == 'duration':
            if value is None:
                return u'0'
            parts = []
            if value.days:
                parts.append(u'%d jour(s)' % value.days)
            # floor division: on Python 3 ``/`` is true division and any
            # duration shorter than one hour was rendered as " 0 heure(s)"
            hours = value.seconds // 3600
            if hours:
                parts.append(u'%d heure(s)' % hours)
            # joining avoids the leading space when there are no days
            return u' '.join(parts) or u'moins d\'1 heure'
        elif measure_type == 'bool':
            if value is None:
                return _('N/A')
            return _('Yes') if value else _('No')
        elif measure_type == 'integer':
            if value is None:
                return '0'
            return force_text(value)
        else:
            raise NotImplementedError('unknown type %s' % measure_type)

    def __unicode__(self):
        return force_text(self.label)
class Cells(collections.namedtuple('Cells', ['dimensions', 'measures'])):
    '''One result row: a list of dimension cells and a list of measure cells.'''

    def __new__(cls, dimensions=(), measures=()):
        # Immutable defaults (instead of shared mutable lists). The arguments
        # are copied into fresh lists, so every row owns appendable lists.
        return super(Cells, cls).__new__(cls, list(dimensions), list(measures))
def quote(s):
    '''Quote ``s`` as a PostgreSQL delimited (double-quoted) identifier.

    PostgreSQL escapes a double quote inside a quoted identifier by doubling
    it; a backslash has no special meaning there, so the previous ``\\"``
    escaping produced invalid SQL.
    '''
    return '"%s"' % s.replace('"', '""')
def cast_point(value, cur):
    '''psycopg2 typecaster for the PostgreSQL ``point`` type.

    ``value`` is the text received from the server (``(x,y)``) or None for
    SQL NULL; ``cur`` is unused. Returns a ``schemas.Point`` built from the
    float coordinates, or None.
    '''
    if value is not None:
        coordinates = [float(part) for part in value[1:-1].split(',')]
        return schemas.Point._make(coordinates)
    return None
# Register cast_point for the PostgreSQL type OID 600 ("POINT"), and an array
# typecaster built on it for OID 1017 ("POINT[]"). Both are registered without
# a connection/cursor scope, i.e. process-wide.
POINT = psycopg2.extensions.new_type((600,), "POINT", cast_point)
psycopg2.extensions.register_type(POINT)
POINT_ARRAY = psycopg2.extensions.new_array_type((1017,), "POINT[]", POINT)
psycopg2.extensions.register_type(POINT_ARRAY)
def to_tuple(cur, values):
    '''Render ``values`` as a comma separated list of SQL literals.

    Quoting is delegated to ``cur.mogrify``; ``values`` must be a sized
    sequence. The rendered SQL is returned as text.
    '''
    placeholders = ', '.join(len(values) * ['%s'])
    rendered = cur.mogrify(placeholders, values)
    return force_text(rendered)
# A dimension member: ``id`` is the raw value, ``label`` its display text.
Member = collections.namedtuple('Member', 'id label')
class EngineDimension(object):
    '''Binds a schema dimension to an engine and a cube.

    Attribute reads not found on this object are forwarded to the wrapped
    schema dimension.
    '''

    def __init__(self, engine, engine_cube, dimension):
        self.engine = engine
        self.engine_cube = engine_cube
        self.dimension = dimension

    def __getattr__(self, name):
        return getattr(self.dimension, name)

    def cache_key(self, filters):
        '''Return the cache key of this dimension's members for ``filters``.'''
        key = self.engine.path + self.engine_cube.name + self.name + repr(filters)
        return hashlib.md5(force_bytes(key)).hexdigest()

    def members(self, filters=()):
        '''Return the members of this dimension as a list of ``Member``.

        ``filters`` is an iterable of ``(dimension_name, values)`` pairs; only
        filters whose dimension shares a join with this dimension restrict the
        members, and only when the dimension has no custom ``members_query``.
        Boolean dimensions always have the Yes/No members; date dimensions are
        not supported. Rows whose value is NULL are skipped. Results are cached
        for 600 seconds.
        '''
        assert self.type != 'date'
        if self.type == 'bool':
            return [Member(id=True, label=_('Yes')), Member(id=False, label=_('No'))]
        cache_key = self.cache_key(filters)
        members = cache.get(cache_key)
        if members is not None:
            self.engine.log.debug('MEMBERS: (from cache) dimension %s.%s filters=%s: %s',
                                  self.engine_cube.name, self.name, filters,
                                  members)
            return members
        members = []
        # A single connection is used both to render the filter conditions and
        # to run the query (previously one connection was opened per filter).
        with self.engine.get_cursor() as cursor:
            sql = self.members_query or self._members_sql(cursor, filters)
            sql = sql.format(fact_table=self.engine_cube.fact_table)
            self.engine.log.debug('SQL: %s', sql)
            cursor.execute(sql)
            for row in cursor.fetchall():
                if row[0] is None:
                    continue
                members.append(Member(id=row[0], label=force_text(row[1])))
        cache.set(cache_key, members, 600)
        self.engine.log.debug('MEMBERS: dimension %s.%s filters=%s: %s',
                              self.engine_cube.name, self.name, filters,
                              members)
        return members

    def _members_sql(self, cursor, filters):
        '''Build the default members query; ``{fact_table}`` is left for the caller to substitute.'''
        fact_table = self.engine_cube.fact_table
        value = self.value
        value_label = self.value_label or value
        own_joins = set(self.join or [])
        joins = set(own_joins)
        conditions = []
        for dimension_name, values in filters:
            dimension = self.engine_cube.dimensions[dimension_name]
            # only filters sharing a join with this dimension restrict its members
            if not (set(dimension.join or []) & own_joins):
                continue
            condition, values = dimension.build_filter(values)
            if not condition:
                continue
            # Resolve {fact_table} before rendering the values, then escape the
            # braces of the rendered condition: the whole query goes through
            # str.format() afterwards and braces inside filter values would
            # otherwise raise KeyError/IndexError.
            condition = force_text(cursor.mogrify(condition.format(fact_table=fact_table), values))
            conditions.append(condition.replace('{', '{{').replace('}', '}}'))
            # NOTE(review): the filter dimension's join is always added, whatever
            # ``filter_needs_join`` says (this was already the effective behavior).
            joins.update(dimension.join)
        table_expression = fact_table
        if joins:
            table_expression = self.engine_cube.build_table_expression(
                joins, fact_table, force_join='right')
        order_by = self.order_by
        if order_by:
            if not isinstance(order_by, list):
                order_by = [order_by]
        else:
            order_by = [value]
        # every selected or ordered expression must be grouped on
        group_by = [value]
        for expression in [value_label] + order_by:
            if expression not in group_by:
                group_by.append(expression)
        sql = 'SELECT %s AS value, %s::text AS label ' % (value, value_label)
        sql += 'FROM %s ' % table_expression
        if conditions:
            sql += 'WHERE %s ' % (' AND '.join(conditions))
        sql += 'GROUP BY %s ' % ', '.join(group_by)
        sql += 'ORDER BY (%s) ' % ', '.join(order_by)
        return sql
class SchemaJSONDimension(schemas.Dimension):
    '''Dimension generated for one key of a cube's JSON field.'''
    filter = False
    order_by = None
    group_by = None
    join = ()
    type = 'string'

    def __init__(self, json_field, name):
        name = str(name)
        self.name = name
        self.label = name.title()
        # The key is embedded in SQL text: escape it for each context it
        # appears in (double quotes doubled inside a quoted identifier, single
        # quotes doubled inside a string literal).
        identifier = name.replace('"', '""')
        literal = name.replace("'", "''")
        expr = '"json_%s".value' % identifier
        self.value_label = expr
        self.value = expr
        self.join = ['json_' + name]
        # {{fact_table}} survives this format() as {fact_table}, substituted later
        sql = ('SELECT DISTINCT {json_field}->>\'%s\' AS v, {json_field}->>\'%s\' AS v'
               ' FROM {{fact_table}} WHERE ({json_field}->>\'%s\') IS NOT NULL ORDER BY v' %
               (literal, literal, literal))
        self.members_query = sql.format(json_field=json_field)
        self.filter_expression = ('({fact_table}.id IS NULL '
                                  'OR ({fact_table}.%s->>\'%s\') IN (%%s))'
                                  % (json_field, literal))
        self.filter_needs_join = False
        self.absent_label = _('None')
class EngineJSONDimension(EngineDimension):
    '''Engine dimension generated from one key of the cube's JSON field.'''

    def __init__(self, engine, engine_cube, name):
        # EngineDimension.__init__ is not called: the schema dimension is
        # synthesized here from the JSON key.
        self.engine = engine
        self.engine_cube = engine_cube
        self.dimension = SchemaJSONDimension(engine_cube.json_field, name)

    def cache_key(self, filters):
        '''Members cache key; the JSON field name is part of the key.'''
        parts = [
            self.engine.path,
            self.engine_cube.json_field,
            self.engine_cube.name,
            self.name,
            repr(filters),
        ]
        return hashlib.md5(force_bytes(''.join(parts))).hexdigest()

    def to_json(self):
        return dict(name=self.name, label=self.label)
class EngineMeasure(object):
    '''Binds a schema measure to an engine and a cube.

    Attribute reads not found on this object are forwarded to the measure.
    '''

    def __init__(self, engine, engine_cube, measure):
        self.engine, self.engine_cube, self.measure = engine, engine_cube, measure

    def __getattr__(self, name):
        # only reached when normal attribute lookup fails
        wrapped = self.measure
        return getattr(wrapped, name)
class ProxyList(object):
    '''Lazy view wrapping each object of ``engine_cube.cube.<attribute>`` in ``cls``.

    An optional ``chain`` factory, called with ``(engine, engine_cube)``,
    provides extra items appended on iteration and consulted for names that
    are not found on the cube.
    '''
    chain = None

    def __init__(self, engine, engine_cube, attribute, cls, chain=None):
        self.engine = engine
        self.engine_cube = engine_cube
        self.attribute = attribute
        self.cls = cls
        if chain:
            self.chain = chain(engine, engine_cube)

    def __iter__(self):
        i = (self.cls(self.engine, self.engine_cube, o)
             for o in getattr(self.engine_cube.cube, self.attribute))
        if self.chain:
            i = itertools.chain(i, self.chain)
        return i

    def __getitem__(self, name):
        for o in getattr(self.engine_cube.cube, self.attribute):
            if o.name == name:
                return self.cls(self.engine, self.engine_cube, o)
        if self.chain:
            return self.chain[name]
        # carry the missing name so the error is actionable
        raise KeyError(name)
class JSONDimensions(object):
    '''Dimensions generated from the keys of a cube's JSON field.

    Iterating yields one EngineJSONDimension per key; indexing by key returns
    that dimension or raises KeyError.
    '''
    __cache = None

    def __init__(self, engine, engine_cube):
        self.engine = engine
        self.engine_cube = engine_cube

    @property
    def cache(self):
        '''Sorted distinct keys of the JSON field, queried once per instance.

        Empty (and no query is run) when the cube has no JSON field.
        '''
        if not self.engine_cube.json_field:
            return []
        # ``is None`` so that an empty key list is cached too
        if self.__cache is None:
            with self.engine.get_cursor() as cursor:
                # read the keys from the cube's own fact table (was hard-coded
                # to ``formdata``)
                sql = ('select distinct jsonb_object_keys(%s) as a from %s order by a'
                       % (self.engine_cube.json_field, self.engine_cube.fact_table))
                cursor.execute(sql)
                self.__cache = [row[0] for row in cursor.fetchall()]
        return self.__cache

    def __iter__(self):
        for name in self.cache:
            yield EngineJSONDimension(self.engine, self.engine_cube, name)

    def __getitem__(self, key):
        for name in self.cache:
            if name == key:
                return EngineJSONDimension(self.engine, self.engine_cube, name)
        # consistent with ProxyList: unknown names raise instead of returning None
        raise KeyError(key)
class ProxyListDescriptor(object):
    '''Descriptor exposing a lazily built ProxyList, cached per instance.

    The ProxyList is stored in the instance ``__dict__`` under a key derived
    from this descriptor's id, so each instance builds it at most once.
    '''

    def __init__(self, attribute, cls, chain=None):
        self.attribute = attribute
        self.cls = cls
        self.chain = chain

    def __get__(self, obj, t=None):
        if obj is None:
            # accessed on the owner class (e.g. introspection): previously
            # this crashed with AttributeError on ``None.__dict__``
            return self
        key = '_proxy_list_cache_%s' % id(self)
        instance_dict = obj.__dict__
        if key not in instance_dict:
            instance_dict[key] = ProxyList(obj.engine, obj, self.attribute, self.cls, chain=self.chain)
        return instance_dict[key]
class EngineCube(object):
    '''Query interface over one cube of a warehouse.

    Attribute reads not found on this object are forwarded to the schema cube
    (``fact_table``, ``key``, ``json_field``, ...).
    '''
    dimensions = ProxyListDescriptor('all_dimensions', EngineDimension, chain=JSONDimensions)
    measures = ProxyListDescriptor('measures', EngineMeasure)

    # join kind -> SQL join keyword
    JOIN_KINDS = {
        'inner': 'INNER JOIN',
        'left': 'LEFT OUTER JOIN',
        'right': 'RIGHT OUTER JOIN',
        'full': 'FULL OUTER JOIN',
    }

    def __init__(self, warehouse, cube):
        # NOTE: despite its name, ``warehouse`` is stored as ``self.engine``
        # and used as the engine (get_cursor, log, ...)
        self.engine = warehouse
        self.cube = cube

    def __getattr__(self, name):
        return getattr(self.cube, name)

    def count(self):
        '''Return ``count(key)`` over the fact table.'''
        with self.engine.get_cursor() as cursor:
            cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
            return cursor.fetchone()[0]

    def get_join(self, name):
        '''Return the join named ``name``.

        Names starting with ``json_`` designate a left join on the distinct
        values of the matching key of the JSON field; other names are looked
        up on the schema cube.
        '''
        if name.startswith('json_'):
            json_key = name[5:]
            # the key is embedded in single-quoted SQL literals
            literal = json_key.replace("'", "''")
            return schemas.Join(
                name=name,
                table=(
                    '(SELECT DISTINCT %s.%s->>\'%s\' AS value FROM %s '
                    'WHERE (%s.%s->>\'%s\') IS NOT NULL ORDER BY value)' % (
                        self.fact_table, self.json_field, literal, self.fact_table,
                        self.fact_table, self.json_field, literal
                    )
                ),
                master='%s->>\'%s\'' % (self.json_field, literal),
                detail='value',
                kind='left',
            )
        return self.cube.get_join(name)

    def sql_query(self, filters, drilldown, measures, **kwargs):
        '''Build the aggregation SQL query.

        ``filters`` is an iterable of ``(dimension_name, values)`` pairs,
        ``drilldown`` the dimensions to group on and ``measures`` the measures
        to compute. Extra keyword arguments are accepted and ignored.
        '''
        fact_table = self.cube.fact_table
        projections = []
        joins = set()
        where = []
        group_by = []
        order_by = []
        with self.engine.get_cursor() as cursor:
            for dimension_name, values in filters:
                dimension = self.dimensions[dimension_name]
                condition, values = dimension.build_filter(values)
                if not condition:
                    continue
                # resolve {fact_table} before rendering the values so that
                # braces inside filter values never reach str.format()
                condition = force_text(cursor.mogrify(condition.format(fact_table=fact_table), values))
                if dimension.filter_needs_join and dimension.join:
                    joins.update(dimension.join)
                if dimension.filter_in_join:
                    # explicit raise: an assert is stripped under ``python -O``
                    # and the filter would then be silently ignored
                    raise AssertionError('filter in join is not supported anymore')
                where.append(condition)
        for dimension in drilldown:
            joins.update(dimension.join or [])
            projections.append('%s AS %s' % (dimension.value, dimension.name + '_value'))
            if dimension.value_label:
                projections.append('%s AS %s' % (dimension.value_label, dimension.name + '_label'))
            group_by.append(dimension.group_by or dimension.value)
            dimension_order_by = dimension.order_by or [dimension.value]
            # a single expression must not be extended character by character
            # (members() applies the same normalization)
            if not isinstance(dimension_order_by, (list, tuple)):
                dimension_order_by = [dimension_order_by]
            order_by.extend(dimension_order_by)
        for order_value in order_by:
            if order_value not in group_by:
                group_by.append(order_value)
        for measure in measures:
            if measure.expression not in projections:
                projections.append(measure.expression + ' AS ' + measure.name)
        sql = 'SELECT ' + ', '.join(projections)
        table_expression = ' %s' % fact_table
        if joins:
            table_expression = self.build_table_expression(joins, self.fact_table)
        sql += ' FROM %s' % table_expression
        where_conditions = ' AND '.join(where) if where else 'true'
        # the conditions are fully rendered: double their braces so the final
        # format() below leaves them untouched
        sql += ' WHERE %s' % where_conditions.replace('{', '{{').replace('}', '}}')
        if group_by:
            sql += ' GROUP BY %s' % ', '.join(group_by)
        if order_by:
            sql += ' ORDER BY %s' % ', '.join(order_by)
        # table_expression / where_conditions are offered as placeholders too
        # (presumably for measure expressions using sub-queries)
        sql = sql.format(fact_table=fact_table,
                         table_expression=table_expression,
                         where_conditions=where_conditions)
        return sql

    def query(self, filters, drilldown, measures, **kwargs):
        '''Run the query built by ``sql_query`` and yield one ``Cells`` per row.

        Row columns are read in projection order: for each drilldown dimension
        its value (then its label when it has a ``value_label``), then one
        column per measure.
        '''
        self.engine.log.debug('%s.%s query filters=%s drilldown=%s measures=%s',
                              self.engine.warehouse.name, self.cube.name, filters, drilldown,
                              measures)
        with self.engine.get_cursor() as cursor:
            sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
            self.engine.log.debug('SQL: %s', sql)
            cursor.execute(sql)
            for row in cursor.fetchall():
                cells = Cells()
                j = 0
                for dimension in drilldown:
                    value = row[j]
                    if not dimension.value_label:
                        value_label = None
                        j += 1
                    else:
                        value_label = row[j + 1]
                        j += 2
                    cells.dimensions.append(DimensionCell(
                        dimension=dimension,
                        value=value,
                        value_label=value_label,
                    ))
                for i, measure in enumerate(measures):
                    cells.measures.append(MeasureCell(
                        measure=measure,
                        value=row[j + i],
                    ))
                yield cells

    def build_table_expression(self, joins, table_name, force_join=None):
        '''Recursively build the table expression from the join tree,
        starting from the fact table.

        ``force_join``, when given, replaces the kind of every join.
        '''
        join_tree = {}
        # Build join tree: master table or alias -> join kind -> join name -> join.
        # Names are sorted so the generated SQL text is deterministic.
        for join_name in sorted(joins):
            join = self.get_join(join_name)
            master_table = join.master_table or self.fact_table
            join_tree.setdefault(master_table, {}).setdefault(force_join or join.kind, {})[join.name] = join

        def build_table_expression_helper(join_tree, table_name, alias=None, top=True):
            contain_joins = False
            sql = table_name
            if alias:
                sql += ' AS %s' % quote(alias)
            # name used by the ON conditions to refer to this table: an alias
            # must be quoted exactly as in the AS clause, otherwise aliases with
            # upper-case or special characters do not match
            reference = quote(alias) if alias else table_name
            for kind in ['left', 'inner', 'right', 'full']:
                joins = join_tree.get(alias or table_name, {}).get(kind)
                if not joins:
                    continue
                for join_name, join in sorted(joins.items(), key=lambda item: item[0]):
                    contain_joins = True
                    sql += ' %s ' % self.JOIN_KINDS[kind]
                    sql += ' ' + build_table_expression_helper(join_tree, join.table, alias=join.name, top=False)
                    # NOTE(review): split('.') also cuts a master expression
                    # containing a dot (e.g. a JSON key like 'a.b') - confirm
                    condition = '%s.%s = %s.%s' % (
                        reference,
                        join.master.split('.')[-1],
                        quote(join.name),
                        join.detail)
                    sql += ' ON ' + condition
            # if the table expression contains joins and is not the full table
            # expression but used in another join, it must be quoted with
            # parenthesis
            if not top and contain_joins:
                sql = '(%s)' % sql
            return sql
        return build_table_expression_helper(join_tree, table_name)
class Engine(object):
    '''Query engine over one warehouse.

    Attribute reads not found on this object are forwarded to the warehouse.
    '''

    def __init__(self, warehouse):
        self.log = logging.getLogger(__name__)
        self.warehouse = warehouse

    @property
    def cubes(self):
        '''Yield an EngineCube for each cube of the warehouse.'''
        for schema_cube in self.warehouse.cubes:
            yield EngineCube(self, schema_cube)

    def __getitem__(self, name):
        schema_cube = self.warehouse.get_cube(name)
        return EngineCube(self, schema_cube)

    def __getattr__(self, name):
        return getattr(self.warehouse, name)

    @contextlib.contextmanager
    def get_cursor(self):
        '''Open a new connection to the warehouse and yield a cursor.

        The search path is set from ``warehouse.search_path`` and every
        statement of ``settings.BIJOE_INIT_SQL`` (if defined) is run first; the
        connection is closed when the context exits.
        '''
        warehouse = self.warehouse
        connection = psycopg2.connect(warehouse.pg_dsn)
        with contextlib.closing(connection):
            with connection.cursor() as cursor:
                namespaces = ', '.join('%s' % namespace for namespace in warehouse.search_path)
                cursor.execute('SET SEARCH_PATH = %s' % namespaces)
                for statement in getattr(settings, 'BIJOE_INIT_SQL', []):
                    cursor.execute(statement)
                yield cursor