588 lines
20 KiB
Python
588 lines
20 KiB
Python
# bijoe - BI dashboard
|
|
# Copyright (C) 2015 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import collections
|
|
import contextlib
|
|
import datetime
|
|
import hashlib
|
|
import itertools
|
|
import logging
|
|
|
|
import psycopg2
|
|
from django.conf import settings
|
|
from django.core.cache import cache
|
|
from django.utils.encoding import force_bytes, force_text
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
|
from . import schemas
|
|
|
|
# Make psycopg2 decode text columns to unicode strings (instead of bytes),
# both for scalar values and for arrays of text.
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
|
|
|
|
|
|
class DimensionCell(collections.namedtuple('_Cell', ['dimension', 'value', 'value_label'])):
    """One drilldown coordinate: a dimension plus the cell's raw value and
    its optional, pre-computed display label."""

    @property
    def label(self):
        """Human readable rendering of the cell value."""
        # A label computed by the SQL query always takes precedence.
        if self.value_label:
            return self.value_label
        # Missing values are rendered with the dimension's dedicated label.
        if self.value is None:
            return self.dimension.absent_label
        if self.dimension.type == 'bool':
            return _('Yes') if self.value else _('No')
        return force_text(self.value)

    def __str__(self):
        return force_text(self.label)
|
|
|
|
|
|
class MeasureCell(collections.namedtuple('_Cell', ['measure', 'value'])):
    """A measure definition paired with one aggregated value."""

    def _percent_label(self):
        # French-style decimal comma; non-numeric values degrade to N/A.
        if self.value is None:
            return _('N/A')
        try:
            return ('%4.2f' % float(self.value)).replace('.', ',') + ' %'
        except TypeError:
            return _('N/A')

    def _duration_label(self):
        # Renders a timedelta as days/hours (French wording).
        if self.value is None:
            return '0'
        rendered = ''
        if self.value.days:
            rendered += '%d jour(s)' % self.value.days
        if self.value.seconds // 3600:
            rendered += ' %d heure(s)' % (self.value.seconds // 3600)
        return rendered or 'moins d\'1 heure'

    def _bool_label(self):
        if self.value is None:
            return _('N/A')
        return _('Yes') if self.value else _('No')

    def _integer_label(self):
        return '0' if self.value is None else force_text(self.value)

    @property
    def label(self):
        """Human readable rendering of the value, driven by the measure type.

        Raises NotImplementedError for an unknown measure type.
        """
        renderers = {
            'percent': self._percent_label,
            'duration': self._duration_label,
            'bool': self._bool_label,
            'integer': self._integer_label,
        }
        renderer = renderers.get(self.measure.type)
        if renderer is None:
            raise NotImplementedError('unknown type %s' % self.measure.type)
        return renderer()

    def __str__(self):
        return force_text(self.label)
|
|
|
|
|
|
class Cells(collections.namedtuple('Cells', ['dimensions', 'measures'])):
    """One result row: a list of DimensionCell and a list of MeasureCell.

    Unlike a plain namedtuple, both fields are freshly-built mutable lists so
    callers can append to them after construction.
    """

    def __new__(cls, dimensions=(), measures=()):
        # Fix: the defaults used to be mutable lists (``[]``); use immutable
        # tuples instead — the list() copies below keep the behavior (each
        # instance owns independent lists) while removing the shared-mutable-
        # default-argument hazard.
        dimensions = list(dimensions)
        measures = list(measures)
        return super().__new__(cls, dimensions, measures)
|
|
|
|
|
|
def quote(s):
    """Return *s* wrapped in double quotes, with embedded double quotes
    backslash-escaped (SQL identifier quoting as used by this module)."""
    escaped = s.replace('"', '\\"')
    return '"{}"'.format(escaped)
|
|
|
|
|
|
def cast_point(value, cur):
    """psycopg2 typecaster: parse PostgreSQL's textual ``(x,y)`` point form
    into a ``schemas.Point``; NULL maps to None."""
    if value is None:
        return None
    coordinates = value[1:-1].split(',')
    return schemas.Point._make(float(coordinate) for coordinate in coordinates)
|
|
|
|
|
|
# Register typecasters for PostgreSQL's builtin "point" type (OID 600) and
# its array form (OID 1017), so point columns are returned as schemas.Point.
POINT = psycopg2.extensions.new_type((600,), 'POINT', cast_point)
psycopg2.extensions.register_type(POINT)

POINT_ARRAY = psycopg2.extensions.new_array_type((1017,), 'POINT[]', POINT)
psycopg2.extensions.register_type(POINT_ARRAY)
|
|
|
|
|
|
def to_tuple(cur, values):
    """Mogrify *values* into a comma-separated list of SQL literals, suitable
    for inclusion in an ``IN (...)`` clause."""
    placeholders = ', '.join(['%s'] * len(values))
    return force_text(cur.mogrify(placeholders, values))
|
|
|
|
|
|
# A single selectable value of a dimension: raw id plus display label.
Member = collections.namedtuple('Member', ['id', 'label'])
|
|
|
|
|
|
class EngineDimension:
    """Engine-side wrapper of a schema dimension, able to list the
    dimension's members from the database (with Django-cache memoization)."""

    def __init__(self, engine, engine_cube, dimension):
        self.engine = engine
        self.engine_cube = engine_cube
        self.dimension = dimension

    def __getattr__(self, name):
        # Delegate unknown attributes to the wrapped schema dimension.
        return getattr(self.dimension, name)

    def cache_key(self, filters):
        """Cache key for the member list, unique per
        warehouse/cube/dimension/filters combination."""
        key = self.engine.path + self.engine_cube.name + self.name + repr(filters)
        return hashlib.md5(force_bytes(key)).hexdigest()

    def members(self, filters=()):
        """Return the list of Member (id, label) for this dimension.

        *filters* is a sequence of (dimension_name, values) pairs; only
        filters bearing on this very dimension, or on a dimension sharing a
        join with it, restrict the member list.  Results are cached for 600
        seconds.  Raises schemas.DimensionNotFound for an unknown filter
        dimension.
        """
        # Boolean dimensions have a fixed member list, no query needed.
        if self.type == 'bool':
            return [Member(id=True, label=_('Yes')), Member(id=False, label=_('No'))]

        cache_key = self.cache_key(filters)
        members = cache.get(cache_key)
        if members is not None:
            self.engine.log.debug(
                'MEMBERS: (from cache) dimension %s.%s filters=%s: %s',
                self.engine_cube.name,
                self.name,
                filters,
                members,
            )
            return members

        members = []

        value = self.value
        # fall back to the value expression when no label expression exists
        value_label = self.value_label or value
        order_by = self.order_by

        joins = set(self.join or [])
        conditions = []
        for dimension_name, values in filters:
            try:
                dimension = self.engine_cube.dimensions[dimension_name]
            except KeyError as e:
                raise schemas.DimensionNotFound(str(e))
            # we build a filter on two sufficient conditions :
            # * the filter is on the same dimension as the one we currently query for members,
            # * or, the dimension of the filter has impact on a join shared with the current dimension.
            if dimension_name != self.name and not (set(dimension.join or []) & set(self.join or [])):
                continue
            condition, values = dimension.build_filter(values)
            if not condition:
                continue
            # interpolate the filter values into the condition text
            with self.engine.get_cursor() as cursor:  # Ugly...
                condition = force_text(cursor.mogrify(condition, values))
            conditions.append(condition)
            # add all joins needed by the filter's dimension
            if dimension.join:
                joins.update(dimension.join)

        with self.engine.get_cursor() as cursor:
            # a dimension may ship its own members query (JSON dimensions do);
            # otherwise build a grouped SELECT over the fact table and joins.
            sql = self.members_query
            if not sql:
                table_expression = '%s' % self.engine_cube.fact_table
                if joins:
                    # joins are forced to 'right' here — presumably so members
                    # without matching facts still appear; confirm intended.
                    table_expression = self.engine_cube.build_table_expression(
                        joins, self.engine_cube.fact_table, force_join='right'
                    )
                sql = 'SELECT %s AS value, %s::text AS label ' % (value, value_label)
                sql += 'FROM %s ' % table_expression
                if order_by:
                    # normalize a scalar order_by to a one-element list
                    if not isinstance(order_by, list):
                        order_by = [order_by]
                else:
                    order_by = [value]
                # GROUP BY must cover the value, its label and every sort key
                group_by = [value]
                if value_label not in group_by:
                    group_by.append(value_label)
                for order_value in order_by:
                    if order_value not in group_by:
                        group_by.append(order_value)
                if conditions:
                    sql += 'WHERE %s ' % ' AND '.join(conditions)
                sql += 'GROUP BY %s ' % ', '.join(group_by)
                sql += 'ORDER BY (%s) ' % ', '.join(order_by)
            # fill the {fact_table} placeholder used by custom member queries
            sql = sql.format(fact_table=self.engine_cube.fact_table)
            self.engine.log.debug('SQL: %s', sql)
            cursor.execute(sql)
            for row in cursor.fetchall():
                # NULL is not a member; it is rendered through absent_label
                if row[0] is None:
                    continue
                members.append(Member(id=row[0], label=force_text(row[1])))
        cache.set(cache_key, members, 600)
        self.engine.log.debug(
            'MEMBERS: dimension %s.%s filters=%s: %s', self.engine_cube.name, self.name, filters, members
        )
        return members
|
|
|
|
|
|
class SchemaJSONDimension(schemas.Dimension):
    '''Generated dimensions for JSON fields keys'''

    # Class-level defaults expected by the engine; __init__ overrides
    # value/value_label/join per JSON key.
    filter = False
    order_by = None
    group_by = None
    join = ()
    type = 'string'

    def __init__(self, json_field, name):
        """Build a string dimension for key *name* of JSON column *json_field*."""
        super().__init__()
        name = str(name)
        self.name = name
        self.label = name.title()
        # Value and label both come from the generated "json_<name>" join.
        expr = '\"json_%s\".value' % name
        self.value_label = expr
        self.value = expr
        self.join = ['json_' + name]
        # Members are the distinct non-NULL values of the key; {{fact_table}}
        # survives the .format() below as a {fact_table} placeholder filled in
        # later by EngineDimension.members().
        # NOTE(review): both projected columns share the alias "v"; members()
        # reads them positionally so this appears deliberate — confirm.
        sql = (
            'SELECT DISTINCT {json_field}->>\'%s\' AS v, {json_field}->>\'%s\' AS v'
            ' FROM {{fact_table}} WHERE ({json_field}->>\'%s\') IS NOT NULL ORDER BY v'
            % (self.name, self.name, self.name)
        )
        self.members_query = sql.format(json_field=json_field)
        # Filter also matches fact-less rows (fact id NULL from outer joins).
        self.filter_expression = '({fact_table}.id IS NULL OR ({fact_table}.%s->>\'%s\') IN (%%s))' % (
            json_field,
            name,
        )
        self.filter_needs_join = False
        self.absent_label = _('None')
|
|
|
|
|
|
class EngineJSONDimension(EngineDimension):
    """Engine wrapper around a dimension generated from a JSON field key."""

    def __init__(self, engine, engine_cube, name):
        self.engine = engine
        self.engine_cube = engine_cube
        self.dimension = SchemaJSONDimension(self.engine_cube.json_field, name)

    def cache_key(self, filters):
        """Member-list cache key; also covers the JSON field name."""
        parts = (
            self.engine.path,
            self.engine_cube.json_field,
            self.engine_cube.name,
            self.name,
            repr(filters),
        )
        return hashlib.md5(force_bytes(''.join(parts))).hexdigest()

    def to_json(self):
        """Serializable description of the dimension."""
        return {'name': self.name, 'label': self.label}
|
|
|
|
|
|
class EngineMeasure:
    """Engine-side wrapper delegating attribute access to a schema measure."""

    def __init__(self, engine, engine_cube, measure):
        self.engine, self.engine_cube, self.measure = engine, engine_cube, measure

    def __getattr__(self, name):
        # Anything not defined here comes from the wrapped schema measure.
        return getattr(self.measure, name)
|
|
|
|
|
|
class ProxyList:
    """Lazy sequence/mapping of engine wrappers over schema objects.

    Iterates and looks up objects from ``engine_cube.cube.<attribute>``,
    wrapping each in ``cls(engine, engine_cube, obj)``.  An optional *chain*
    factory supplies extra, dynamically generated items (e.g. JSON
    dimensions) appended on iteration and consulted on failed lookups.
    """

    chain = None

    def __init__(self, engine, engine_cube, attribute, cls, chain=None, keyerror_class=KeyError):
        self.engine = engine
        self.engine_cube = engine_cube
        self.attribute = attribute  # list attribute name on the schema cube
        self.cls = cls  # wrapper class applied to each schema object
        self.keyerror_class = keyerror_class  # raised when a lookup fails
        if chain:
            self.chain = chain(engine, engine_cube)

    def __iter__(self):
        i = (
            self.cls(self.engine, self.engine_cube, o) for o in getattr(self.engine_cube.cube, self.attribute)
        )
        if self.chain:
            i = itertools.chain(i, self.chain)
        return i

    def __getitem__(self, name):
        """Return the wrapped object whose ``name`` matches, consulting the
        chained source last; raises ``keyerror_class`` when not found."""
        try:
            for o in getattr(self.engine_cube.cube, self.attribute):
                if o.name == name:
                    return self.cls(self.engine, self.engine_cube, o)
            if self.chain:
                return self.chain[name]
        except KeyError:
            raise self.keyerror_class(name)
        # Bug fix: a missing name with no chained source previously fell
        # through and silently returned None; raise the lookup error instead.
        raise self.keyerror_class(name)
|
|
|
|
|
|
class JSONDimensions:
    """Dynamic collection of EngineJSONDimension objects, one per key found
    in the cube's JSON field (discovered by querying the fact table)."""

    # memoized list of JSON keys (name-mangled to _JSONDimensions__cache)
    __cache = None

    def __init__(self, engine, engine_cube):
        self.engine = engine
        self.engine_cube = engine_cube

    @property
    def cache(self):
        """Distinct JSON keys of the cube's JSON field, memoized per instance.

        Returns an empty list when the cube defines no JSON field.
        """
        if not self.engine_cube.json_field:
            return []
        if not self.__cache:
            # NOTE(review): an empty key list is falsy, so a fact table with
            # no JSON keys is re-queried on every access — confirm intended.
            with self.engine.get_cursor() as cursor:
                sql = 'select distinct jsonb_object_keys(%s) as a from %s order by a' % (
                    self.engine_cube.json_field,
                    self.engine_cube.fact_table,
                )
                cursor.execute(sql)
                self.__cache = [row[0] for row in cursor.fetchall()]
        return self.__cache

    def __iter__(self):
        # Yield an engine wrapper for each discovered JSON key.
        for name in self.cache:
            yield EngineJSONDimension(self.engine, self.engine_cube, name)

    def __getitem__(self, key):
        for name in self.cache:
            if name == key:
                return EngineJSONDimension(self.engine, self.engine_cube, name)
        raise KeyError(key)
|
|
|
|
|
|
class ProxyListDescriptor:
    """Descriptor lazily building a ProxyList once per owner instance and
    caching it in the instance's ``__dict__``."""

    def __init__(self, attribute, cls, chain=None, keyerror_class=KeyError):
        self.attribute = attribute
        self.cls = cls
        self.chain = chain
        self.keyerror_class = keyerror_class

    def __get__(self, obj, t=None):
        # One cache slot per descriptor instance, keyed by its id().
        key = '_proxy_list_cache_%s' % id(self)
        try:
            return obj.__dict__[key]
        except KeyError:
            proxy = ProxyList(
                obj.engine,
                obj,
                self.attribute,
                self.cls,
                chain=self.chain,
                keyerror_class=self.keyerror_class,
            )
            obj.__dict__[key] = proxy
            return proxy
|
|
|
|
|
|
class EngineCube:
    """Engine-side wrapper of a schema cube: exposes its dimensions and
    measures and builds/executes the aggregation SQL queries."""

    # Dimension lookup chains the cube's declared dimensions with the
    # dynamically discovered JSON dimensions.
    dimensions = ProxyListDescriptor(
        'all_dimensions', EngineDimension, chain=JSONDimensions, keyerror_class=schemas.DimensionNotFound
    )
    measures = ProxyListDescriptor('measures', EngineMeasure)

    def __init__(self, warehouse, cube):
        # NOTE(review): the first argument looks like an Engine instance
        # despite the parameter name "warehouse" — confirm against callers.
        self.engine = warehouse
        self.cube = cube

    def __getattr__(self, name):
        # Delegate unknown attributes to the wrapped schema cube.
        return getattr(self.cube, name)

    def count(self):
        """Return the number of facts in the cube's fact table."""
        with self.engine.get_cursor() as cursor:
            cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
            return cursor.fetchone()[0]

    def get_join(self, name):
        """Return the join description named *name*.

        Names starting with ``json_`` get a synthesized left join against the
        distinct values of the corresponding JSON key; other names are
        resolved by the schema cube.
        """
        if name.startswith('json_'):
            json_key = name[5:]
            return schemas.Join(
                name=name,
                table=(
                    '(SELECT DISTINCT %s.%s->>\'%s\' AS value FROM %s '
                    'WHERE (%s.%s->>\'%s\') IS NOT NULL ORDER BY value)'
                )
                % (
                    self.fact_table,
                    self.json_field,
                    json_key,
                    self.fact_table,
                    self.fact_table,
                    self.json_field,
                    json_key,
                ),
                master='%s->>\'%s\'' % (self.json_field, json_key),
                detail='value',
                kind='left',
            )
        return self.cube.get_join(name)

    def sql_query(self, filters, drilldown, measures, **kwargs):
        """Build and return the aggregation SQL for *filters*, *drilldown*
        dimensions and *measures*, without executing it."""
        with self.engine.get_cursor() as cursor:
            projections = []
            joins = set()
            where = []
            group_by = []
            order_by = []

            # translate each filter into an interpolated WHERE condition
            for dimension_name, values in filters:
                dimension = self.dimensions[dimension_name]
                # assert dimension.filter
                condition, values = dimension.build_filter(values)
                if not condition:
                    continue
                condition = force_text(cursor.mogrify(condition, values))
                if dimension.filter_needs_join and dimension.join:
                    joins.update(dimension.join)
                if dimension.filter_in_join:
                    assert False, 'filter in join is not supported anymore'
                else:
                    where.append(condition)

            # one value column (and optional label column) per drilldown
            # dimension; also collect its joins, grouping and sort keys
            for dimension in drilldown:
                joins.update(dimension.join or [])
                projections.append('%s AS %s' % (dimension.value, dimension.name + '_value'))
                if dimension.value_label:
                    projections.append('%s AS %s' % (dimension.value_label, dimension.name + '_label'))
                group_by.append(dimension.group_by or dimension.value)
                order_by.extend(dimension.order_by or [dimension.value])

            # every sort key must also be grouped on
            for order_value in order_by:
                if order_value not in group_by:
                    group_by.append(order_value)

            for measure in measures:
                if measure.expression not in projections:
                    projections.append(measure.expression + ' AS ' + measure.name)
            sql = 'SELECT ' + ', '.join(projections)
            table_expression = ' %s' % self.cube.fact_table
            if joins:
                table_expression = self.build_table_expression(joins, self.fact_table)
            sql += ' FROM %s' % table_expression
            where_conditions = 'true'
            if where:
                where_conditions = ' AND '.join(where)
                where_conditions = where_conditions.format(fact_table=self.cube.fact_table)
            sql += ' WHERE %s' % where_conditions
            if group_by:
                sql += ' GROUP BY %s' % ', '.join(group_by)
            if order_by:
                sql += ' ORDER BY %s' % ', '.join(order_by)
            # resolve remaining placeholders used by dimension expressions
            sql = sql.format(
                fact_table=self.cube.fact_table,
                table_expression=table_expression,
                where_conditions=where_conditions,
            )
            return sql

    def query(self, filters, drilldown, measures, **kwargs):
        """Execute the aggregation query and yield one Cells per result row.

        Each row starts with one value column (plus one label column when the
        dimension defines value_label) per drilldown dimension, followed by
        one column per measure.
        """
        self.engine.log.debug(
            '%s.%s query filters=%s drilldown=%s measures=%s',
            self.engine.warehouse.name,
            self.cube.name,
            filters,
            drilldown,
            measures,
        )
        with self.engine.get_cursor() as cursor:
            sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
            self.engine.log.debug('SQL: %s', sql)
            cursor.execute(sql)
            for row in cursor.fetchall():
                cells = Cells()
                j = 0  # index of the next column to consume
                for dimension in drilldown:
                    value = row[j]
                    if not dimension.value_label:
                        value_label = None
                        j += 1
                    else:
                        value_label = row[j + 1]
                        j += 2
                    cells.dimensions.append(
                        DimensionCell(
                            dimension=dimension,
                            value=value,
                            value_label=value_label,
                        )
                    )
                for i, measure in enumerate(measures):
                    cells.measures.append(
                        MeasureCell(
                            measure=measure,
                            value=row[j + i],
                        )
                    )
                yield cells

    # SQL keywords for each join kind declared in the schema
    JOIN_KINDS = {
        'inner': 'INNER JOIN',
        'left': 'LEFT OUTER JOIN',
        'right': 'RIGHT OUTER JOIN',
        'full': 'FULL OUTER JOIN',
    }

    def build_table_expression(self, joins, table_name, force_join=None):
        """Recursively build the table expression from the join tree,
        starting from the fact table.

        *joins* is a collection of join names, *table_name* the root (fact)
        table; *force_join*, when given, overrides every join's kind.
        """

        join_tree = {}
        # Build join tree: master table -> join kind -> join name -> join
        for join_name in joins:
            join = self.get_join(join_name)
            master_table = join.master_table or self.fact_table
            join_tree.setdefault(master_table, {}).setdefault(force_join or join.kind, {})[join.name] = join

        def build_table_expression_helper(join_tree, table_name, alias=None, top=True):
            # Emit table_name (aliased if needed) followed by all joins whose
            # master is this table, recursing into each joined table.
            contain_joins = False

            sql = table_name
            if alias:
                sql += ' AS %s' % quote(alias)

            for kind in ['left', 'inner', 'right', 'full']:
                joins = join_tree.get(alias or table_name, {}).get(kind)
                if not joins:
                    continue
                for join_name, join in joins.items():
                    contain_joins = True
                    sql += ' %s ' % self.JOIN_KINDS[kind]
                    sql += ' ' + build_table_expression_helper(
                        join_tree, join.table, alias=join.name, top=False
                    )
                    condition = '%s.%s = %s.%s' % (
                        alias or table_name,
                        join.master.split('.')[-1],
                        quote(join.name),
                        join.detail,
                    )
                    sql += ' ON ' + condition

            # if the table expression contains joins and is not the full table
            # expression but used in another join, it must be quoted with
            # parenthesis
            if not top and contain_joins:
                sql = '(%s)' % sql

            return sql

        return build_table_expression_helper(join_tree, table_name)
|
|
|
|
|
|
class Engine:
    """Entry point for querying a warehouse: wraps a schemas warehouse,
    hands out EngineCube objects and opens psycopg2 cursors on demand."""

    def __init__(self, warehouse):
        self.warehouse = warehouse
        self.log = logging.getLogger(__name__)

    @property
    def cubes(self):
        # Engine wrapper over every schema cube of the warehouse.
        for cube in self.warehouse.cubes:
            yield EngineCube(self, cube)

    def __getitem__(self, name):
        """Return the EngineCube named *name*."""
        return EngineCube(self, self.warehouse.get_cube(name))

    def __getattr__(self, name):
        # Delegate unknown attributes to the wrapped warehouse description.
        return getattr(self.warehouse, name)

    def timestamp(self):
        """Warehouse build time as a datetime, or None when not set."""
        return datetime.datetime.fromtimestamp(self.warehouse.timestamp) if self.warehouse.timestamp else None

    @contextlib.contextmanager
    def get_cursor(self):
        """Yield a psycopg2 cursor on a fresh connection, with the warehouse
        search_path set and any BIJOE_INIT_SQL statements applied; the
        connection is closed when the context exits."""
        with contextlib.closing(psycopg2.connect(self.warehouse.pg_dsn)) as connection:
            with connection.cursor() as cursor:
                search_path = ', '.join(['%s' % namespace for namespace in self.warehouse.search_path])
                cursor.execute('SET SEARCH_PATH = %s' % search_path)
                for statement in getattr(settings, 'BIJOE_INIT_SQL', []):
                    cursor.execute(statement)
                yield cursor
|