This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
publik-bi/bijoe/engine.py

217 lines
8.1 KiB
Python

import logging
import collections
import psycopg2
def to_tuple(cur, values):
    """Render ``values`` as a comma-separated SQL list using ``cur.mogrify``.

    One ``%s`` placeholder is generated per element of ``values`` and the
    binding itself is left to the cursor.
    """
    placeholders = ('%s',) * len(values)
    template = ', '.join(placeholders)
    return cur.mogrify(template, values)
# One member of a dimension: its identifier and its display label.
Member = collections.namedtuple('Member', 'id label')
class EngineDimension(object):
    """Bind a cube dimension to the engine and cube used to query it.

    Attributes that are not defined on this wrapper are looked up on the
    wrapped ``dimension`` object.
    """

    def __init__(self, engine, engine_cube, dimension):
        self.engine = engine
        self.engine_cube = engine_cube
        self.dimension = dimension

    def __getattr__(self, name):
        # Only reached when regular lookup fails: delegate to the dimension.
        return getattr(self.dimension, name)

    @property
    def members(self):
        """Yield a ``Member`` for every row of the dimension's member query.

        The query selects the dimension value and its label (cast to text),
        either from the table of the last join listed on the dimension or,
        when the dimension has no join, from the cube's fact table.  Rows are
        ordered by the dimension's ``order_by`` or, failing that, its value.

        An assertion rejects dimensions whose type is ``'date'``.  As this is
        a generator, nothing is executed before the first iteration.
        """
        assert self.dimension.type != 'date'
        cursor = self.engine.get_cursor()
        label = self.value_label or self.value
        ordering = self.order_by or self.value
        if self.dimension.join:
            join = self.engine_cube.get_join(self.dimension.join[-1])
            sql = 'SELECT %s AS value, %s::text AS label FROM %s AS %s ORDER BY %s' % (
                self.value, label, join.table, join.name, ordering)
        else:
            sql = 'SELECT %s AS value, %s::text AS label FROM {fact_table} ORDER BY %s' % (
                self.value, label, ordering)
        # Resolve the {fact_table} placeholder, whether it comes from the
        # template above or from the dimension's own expressions.
        sql = sql.format(fact_table=self.engine_cube.fact_table)
        self.engine.log.debug('SQL: %s', sql)
        cursor.execute(sql)
        for row in cursor.fetchall():
            yield Member(*row)
class EngineMeasure(object):
    """Bind a cube measure to the engine and cube it is queried through.

    Any attribute not set on the wrapper itself is read from the wrapped
    ``measure`` object.
    """

    def __init__(self, engine, engine_cube, measure):
        self.engine, self.engine_cube, self.measure = engine, engine_cube, measure

    def __getattr__(self, name):
        # Fallback lookup: forward unknown attributes to the wrapped measure.
        wrapped = self.measure
        return getattr(wrapped, name)
class ProxyList(object):
    """Read-only view over a list attribute of a cube, wrapping each item.

    ``attribute`` names a list on ``engine_cube.cube``; every element is
    wrapped on access as ``cls(engine, engine_cube, element)``.
    """

    def __init__(self, engine, engine_cube, attribute, cls):
        self.engine = engine
        self.engine_cube = engine_cube
        self.attribute = attribute
        self.cls = cls

    def __iter__(self):
        """Return an iterator over the wrapped elements, in list order."""
        return (self.cls(self.engine, self.engine_cube, o)
                for o in getattr(self.engine_cube.cube, self.attribute))

    def __getitem__(self, name):
        """Return the wrapped element whose ``name`` equals ``name``.

        Raises ``KeyError(name)`` when no element matches, so the missing
        key shows up in tracebacks and logs.
        """
        for o in getattr(self.engine_cube.cube, self.attribute):
            if o.name == name:
                return self.cls(self.engine, self.engine_cube, o)
        raise KeyError(name)
class ProxyListDescriptor(object):
    """Descriptor exposing a cube list attribute as a ``ProxyList``.

    ``attribute`` is the name of the list on the wrapped cube and ``cls`` the
    wrapper class applied to each of its elements.
    """

    def __init__(self, attribute, cls):
        self.attribute = attribute
        self.cls = cls

    def __get__(self, obj, t=None):
        """Return a fresh ``ProxyList`` bound to ``obj`` and its engine.

        When accessed on the owning class (``obj`` is None) the descriptor
        itself is returned, following the usual descriptor convention,
        instead of failing on ``None.engine``.
        """
        if obj is None:
            return self
        return ProxyList(obj.engine, obj, self.attribute, self.cls)
class EngineCube(object):
    """Bind a cube definition to an engine and build/run SQL queries on it.

    Attributes that are not defined on this wrapper are looked up on the
    wrapped ``cube`` object.
    """

    dimensions = ProxyListDescriptor('all_dimensions', EngineDimension)
    measures = ProxyListDescriptor('measures', EngineMeasure)

    # Supported join kinds, in the order they are emitted, with their SQL
    # keywords.  Joins of any other kind are ignored.
    _JOIN_KEYWORDS = (
        ('left', 'LEFT OUTER JOIN'),
        ('inner', 'INNER JOIN'),
        ('right', 'RIGHT OUTER JOIN'),
    )

    def __init__(self, warehouse, cube):
        # The first argument is stored as the engine: it must provide
        # get_cursor() and a ``log`` logger.
        self.engine = warehouse
        self.cube = cube

    def __getattr__(self, name):
        # Only reached when regular lookup fails: delegate to the cube.
        return getattr(self.cube, name)

    def _build_join_tree(self, join_names):
        """Group the named joins as ``{master table: {kind: {name: join}}}``.

        The master table is the part of ``join.master`` before the first dot,
        or the fact table when ``join.master`` is an unqualified column.
        """
        join_tree = {}
        # Sorted so that the generated SQL does not depend on set ordering.
        for join_name in sorted(join_names):
            join = self.cube.get_join(join_name)
            if '.' in join.master:
                master_table = join.master.split('.', 1)[0]
            else:
                master_table = self.cube.fact_table
            join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join
        return join_tree

    def _build_table_expression(self, join_tree, table_name, alias=None, top=True):
        """Return the FROM expression for ``table_name`` and the joins below it.

        Joins of the same kind hanging off one table are combined with
        CROSS JOIN and share a single ON clause.  Nested expressions that
        contain joins are parenthesised.
        """
        # NOTE(review): children are looked up by table name, not by alias --
        # confirm this is what qualified ``master`` values are expected to use.
        sql = table_name
        if alias:
            sql += ' AS %s' % alias
        add_paren = False
        for kind, keyword in self._JOIN_KEYWORDS:
            kind_joins = join_tree.get(table_name, {}).get(kind)
            if not kind_joins:
                continue
            add_paren = True
            sql += ' %s ' % keyword
            sub_joins = []
            conditions = []
            for join in kind_joins.values():
                sub_joins.append(
                    self._build_table_expression(join_tree, join.table, join.name, top=False))
                conditions.append('%s.%s = %s.%s' % (alias or table_name,
                                                     join.master.split('.')[-1],
                                                     join.name, join.detail))
            sub_sql = ' CROSS JOIN '.join(sub_joins)
            if len(sub_joins) > 1:
                sub_sql = '(%s)' % sub_sql
            sql += sub_sql
            sql += ' ON %s' % ' AND '.join(conditions)
        if not top and add_paren:
            sql = '(%s)' % sql
        return sql

    def sql_query(self, filters, drilldown, measures, **kwargs):
        """Build the SQL text of an aggregate query on the cube.

        ``filters`` is an iterable of ``(dimension name, values)`` pairs,
        ``drilldown`` an iterable of dimension names to group by and
        ``measures`` an iterable of measure names to compute.  Extra keyword
        arguments are accepted and ignored.

        Each filtered dimension must have a truthy ``filter`` (enforced by an
        assertion); it is called with the values and returns a condition and
        its parameters, which are bound with ``cursor.mogrify``.

        Returns the SQL string.
        """
        projections = []
        joins = set()
        where = []
        group_by = []
        order_by = []

        # A cursor is only needed to bind filter parameters, so it is opened
        # lazily and closed as soon as the conditions are rendered.
        cursor = None
        try:
            for dimension_name, values in filters:
                dimension = self.cube.get_dimension(dimension_name)
                assert dimension.filter
                condition, values = dimension.filter(values)
                if cursor is None:
                    cursor = self.engine.get_cursor()
                condition = cursor.mogrify(condition, values)
                # psycopg2 returns bytes on Python 3; the text joins below
                # need str.  Assumes a UTF-8 client encoding -- TODO confirm.
                if isinstance(condition, bytes) and not isinstance(condition, str):
                    condition = condition.decode('utf-8')
                where.append(condition)
                # ``join`` may be empty/None for dimensions on the fact table.
                joins.update(dimension.join or ())
        finally:
            if cursor is not None:
                cursor.close()

        for dimension_name in drilldown:
            dimension = self.cube.get_dimension(dimension_name)
            joins.update(dimension.join or ())
            projections.append('%s AS %s' % (dimension.value_label or dimension.value,
                                             dimension.name))
            group_by.append(dimension.group_by or dimension.value)
            order_by.append(dimension.order_by or dimension.value)

        for measure_name in measures:
            measure = self.cube.get_measure(measure_name)
            # The bare expression is compared against aliased projections, so
            # this rarely filters anything; kept because query() pairs one
            # cell with each requested measure.
            if measure.expression not in projections:
                projections.append(measure.expression + ' AS ' + measure.name)

        sql = 'SELECT ' + ', '.join(projections)
        table_expression = ' %s' % self.cube.fact_table
        if joins:
            join_tree = self._build_join_tree(joins)
            table_expression = self._build_table_expression(join_tree, self.cube.fact_table)
        sql += ' FROM %s' % table_expression

        # 'true' is the neutral value used for the {where_conditions}
        # placeholder when there is no filter.
        where_conditions = 'true'
        if where:
            where_conditions = ' AND '.join(where)
            sql += ' WHERE %s' % where_conditions
        if group_by:
            sql += ' GROUP BY %s' % ', '.join(group_by)
        if order_by:
            sql += ' ORDER BY %s' % ', '.join(order_by)

        # Placeholders are resolved in two passes.
        # NOTE(review): presumably so that placeholders carried by substituted
        # values (e.g. {fact_table} inside where_conditions) are resolved too
        # -- confirm before simplifying.  Literal braces in expressions or
        # filter values are interpreted by str.format as well.
        format_args = {
            'fact_table': self.cube.fact_table,
            'table_expression': table_expression,
            'where_conditions': where_conditions,
        }
        sql = sql.format(**format_args)
        sql = sql.format(**format_args)
        return sql

    def query(self, filters, drilldown, measures, **kwargs):
        """Run the aggregate query and yield one list of pairs per result row.

        Each yielded item pairs every drilldown dimension wrapper, then every
        measure wrapper, with the corresponding column of the row.  Arguments
        are the same as for ``sql_query``.  As this is a generator, nothing is
        executed before the first iteration.
        """
        self.engine.log.debug('%s.%s query filters=%s drilldown=%s measures=%s',
                              self.engine.warehouse.name, self.cube.name, filters, drilldown,
                              measures)
        cells = [self.dimensions[dimension_name] for dimension_name in drilldown]
        cells.extend(self.measures[measure_name] for measure_name in measures)
        cursor = self.engine.get_cursor()
        try:
            sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures,
                                 **kwargs)
            self.engine.log.debug('SQL: %s', sql)
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            # All rows are in memory: release the cursor before yielding.
            cursor.close()
        for row in rows:
            # A list (not a lazy zip object) so callers can iterate it twice.
            yield list(zip(cells, row))
class Engine(object):
    """Entry point giving access to the cubes of a warehouse.

    ``engine[name]`` returns an ``EngineCube`` for the named cube.  Attributes
    not defined on the engine are looked up on the wrapped ``warehouse``.
    """

    def __init__(self, warehouse):
        self.warehouse = warehouse
        self.log = logging.getLogger(__name__)

    def __getitem__(self, name):
        return EngineCube(self, self.warehouse.get_cube(name))

    def __getattr__(self, name):
        # Only reached when regular lookup fails: delegate to the warehouse.
        return getattr(self.warehouse, name)

    @staticmethod
    def _quote_identifier(identifier):
        """Return ``identifier`` double-quoted, with embedded quotes doubled."""
        return '"%s"' % ('%s' % identifier).replace('"', '""')

    def get_cursor(self):
        """Open a new connection and return a cursor with the search path set.

        A connection to ``warehouse.pg_dsn`` is opened on every call and its
        search path is set to the schemas listed in ``warehouse.search_path``.
        If the setup fails, the connection is closed before the error is
        re-raised.
        """
        connection = psycopg2.connect(self.warehouse.pg_dsn)
        try:
            cursor = connection.cursor()
            search_path = ', '.join(
                self._quote_identifier(namespace)
                for namespace in self.warehouse.search_path)
            cursor.execute('SET SEARCH_PATH = %s' % search_path)
        except Exception:
            # Do not leak a connection nobody holds a reference to.
            connection.close()
            raise
        return cursor