summaryrefslogtreecommitdiffstats
path: root/bijoe/engine.py
blob: 8e0793d145714bc9f7a3fab0c647eb8112d12c33 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import logging

import collections
import psycopg2

# Register psycopg2's UNICODE and UNICODEARRAY typecasters. No scope argument
# is passed, so (per psycopg2 docs) they apply globally to every connection
# in the process — presumably so text columns come back as unicode objects
# under Python 2; confirm if porting.
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)

def to_tuple(cur, values):
    """Bind *values* into a comma-separated placeholder list.

    Builds ``%s, %s, ...`` with one ``%s`` per value and returns the result
    of ``cur.mogrify`` on it, so quoting is done by the cursor.
    """
    placeholders = ', '.join(('%s',) * len(values))
    return cur.mogrify(placeholders, values)


# A dimension member: its raw value (``id``) and display text (``label``).
Member = collections.namedtuple('Member', 'id label')


class EngineDimension(object):
    """Engine-side wrapper around a cube dimension.

    Attributes not set on the wrapper are looked up on the wrapped
    ``dimension`` (see ``__getattr__``), so ``self.value`` reads
    ``dimension.value``, and so on.
    """

    def __init__(self, engine, engine_cube, dimension):
        self.engine = engine
        self.engine_cube = engine_cube
        self.dimension = dimension

    def __getattr__(self, name):
        # Only reached when normal lookup fails: delegate to the dimension.
        return getattr(self.dimension, name)

    @property
    def members(self):
        """Yield the dimension's members as ``Member(id, label)`` tuples.

        If the dimension has joins, rows are read from the table of its last
        join (aliased to the join's name). Otherwise they are read from the
        cube's fact table. Rows are ordered by ``order_by``, falling back to
        ``value``. Not supported for ``date`` dimensions.

        NOTE(review): the query has no DISTINCT/GROUP BY, so the fact-table
        branch presumably yields one member per fact row — confirm intent.
        """
        assert self.dimension.type != 'date'
        label = self.value_label or self.value
        order_by = self.order_by or self.value
        if self.dimension.join:
            join = self.engine_cube.get_join(self.dimension.join[-1])
            sql = 'SELECT %s AS value, %s::text AS label FROM %s AS %s ORDER BY %s' % (
                self.value, label, join.table, join.name, order_by)
        else:
            sql = 'SELECT %s AS value, %s::text AS label FROM {fact_table} ORDER BY %s' % (
                self.value, label, order_by)
        sql = sql.format(fact_table=self.engine_cube.fact_table)
        self.engine.log.debug('SQL: %s', sql)
        cursor = self.engine.get_cursor()
        try:
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            # get_cursor() opens a dedicated connection per call; release it
            # before yielding so an abandoned generator does not hold it.
            cursor.close()
            cursor.connection.close()
        for row in rows:
            yield Member(*row)


class EngineMeasure(object):
    """Engine-side wrapper around a cube measure.

    Any attribute not set on the wrapper itself is read from ``measure``.
    """

    def __init__(self, engine, engine_cube, measure):
        self.engine, self.engine_cube, self.measure = engine, engine_cube, measure

    def __getattr__(self, name):
        wrapped = self.measure
        return getattr(wrapped, name)


class ProxyList(object):
    """Lazy view over a list attribute of ``engine_cube.cube``.

    Items are wrapped on access as ``cls(engine, engine_cube, item)``. The
    underlying attribute is re-read on every access.
    """

    def __init__(self, engine, engine_cube, attribute, cls):
        self.engine = engine
        self.engine_cube = engine_cube
        self.attribute = attribute
        self.cls = cls

    def _items(self):
        # The raw, unwrapped items held by the cube.
        return getattr(self.engine_cube.cube, self.attribute)

    def _wrap(self, item):
        # Wrap one raw item with the configured class.
        return self.cls(self.engine, self.engine_cube, item)

    def __iter__(self):
        return (self._wrap(o) for o in self._items())

    def __getitem__(self, name):
        """Return the wrapped item whose ``name`` equals *name*.

        :raises KeyError: carrying *name* when no item matches.
        """
        for o in self._items():
            if o.name == name:
                return self._wrap(o)
        raise KeyError(name)


class ProxyListDescriptor(object):
    """Descriptor exposing ``obj.cube.<attribute>`` as a ProxyList.

    Accessed on an instance, it returns a fresh ProxyList that wraps items
    with *cls*. Accessed on the owning class, it returns the descriptor
    itself.
    """

    def __init__(self, attribute, cls):
        self.attribute = attribute
        self.cls = cls

    def __get__(self, obj, t=None):
        if obj is None:
            # Class-level access (e.g. ``EngineCube.dimensions``): there is no
            # instance to read ``engine`` from.
            return self
        return ProxyList(obj.engine, obj, self.attribute, self.cls)


class EngineCube(object):
    """Engine-side wrapper around a cube that builds and runs SQL queries.

    Attributes not set on the wrapper are looked up on the wrapped ``cube``
    (see ``__getattr__``). ``dimensions`` and ``measures`` expose the cube's
    ``all_dimensions`` and ``measures`` wrapped in EngineDimension and
    EngineMeasure objects.
    """
    dimensions = ProxyListDescriptor('all_dimensions', EngineDimension)
    measures = ProxyListDescriptor('measures', EngineMeasure)

    # Join kinds in the order they are rendered, with their SQL keywords.
    _JOIN_KEYWORDS = (
        ('left', 'LEFT OUTER JOIN'),
        ('inner', 'INNER JOIN'),
        ('right', 'RIGHT OUTER JOIN'),
    )

    def __init__(self, warehouse, cube):
        # NOTE(review): despite the parameter name, this object is used as the
        # engine (get_cursor(), log, warehouse); Engine passes itself here.
        self.engine = warehouse
        self.cube = cube

    def __getattr__(self, name):
        return getattr(self.cube, name)

    @staticmethod
    def _release(cursor):
        """Close *cursor* and the connection it belongs to.

        Engine.get_cursor() opens a new connection on every call, so closing
        only the cursor would leak the connection.
        """
        cursor.close()
        cursor.connection.close()

    def count(self):
        """Return ``count(<key>)`` over the cube's fact table."""
        cursor = self.engine.get_cursor()
        try:
            cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
            return cursor.fetchone()[0]
        finally:
            self._release(cursor)

    def _join_tree(self, join_names):
        """Index joins as ``{master_table: {kind: {join_name: join}}}``.

        A join whose ``master`` has no ``table.`` prefix hangs off the fact
        table.
        """
        join_tree = {}
        for join_name in join_names:
            join = self.cube.get_join(join_name)
            if '.' in join.master:
                master_table = join.master.split('.', 1)[0]
            else:
                master_table = self.fact_table
            join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join
        return join_tree

    def _table_expression(self, join_tree, table_name, alias=None, top=True):
        """Render *table_name* (optionally ``AS alias``) with its joins.

        Joins hanging off *table_name* are rendered per kind, in the order
        left, inner, right. Several joins of one kind are CROSS JOINed
        together and attached with one ON clause that ANDs their conditions.
        A nested (non-top) expression is parenthesized when it has joins.
        """
        sql = table_name
        if alias:
            sql += ' AS %s' % alias
        has_joins = False
        for kind, keyword in self._JOIN_KEYWORDS:
            kind_joins = join_tree.get(table_name, {}).get(kind)
            if not kind_joins:
                continue
            has_joins = True
            sub_joins = []
            conditions = []
            for join in kind_joins.values():
                sub_joins.append(
                    self._table_expression(join_tree, join.table, join.name, top=False))
                conditions.append('%s.%s = %s.%s' % (alias or table_name,
                                                     join.master.split('.')[-1],
                                                     join.name, join.detail))
            sub_sql = ' CROSS JOIN '.join(sub_joins)
            if len(sub_joins) > 1:
                sub_sql = '(%s)' % sub_sql
            sql += ' %s %s ON %s' % (keyword, sub_sql, ' AND '.join(conditions))
        if not top and has_joins:
            sql = '(%s)' % sql
        return sql

    def sql_query(self, filters, drilldown, measures, **kwargs):
        """Build, but do not run, the SQL for an aggregate query.

        :param filters: iterable of ``(dimension_name, values)`` pairs. Each
            dimension's ``filter(values)`` returns ``(condition, params)``,
            and the params are bound into the condition with ``mogrify``.
        :param drilldown: names of dimensions to project, group and order by.
        :param measures: names of measures to project.
        :param kwargs: accepted for interface compatibility; unused.
        :returns: the SQL text, with ``{fact_table}``,
            ``{table_expression}`` and ``{where_conditions}`` placeholders
            substituted.
        """
        projections = []
        joins = set()
        where = []
        group_by = []
        order_by = []

        # A connection is only needed to bind filter parameters, so open it
        # lazily and release it as soon as binding is done.
        cursor = None
        try:
            for dimension_name, values in filters:
                dimension = self.cube.get_dimension(dimension_name)
                assert dimension.filter
                condition, values = dimension.filter(values)
                if cursor is None:
                    cursor = self.engine.get_cursor()
                where.append(cursor.mogrify(condition, values))
                joins.update(dimension.join)
        finally:
            if cursor is not None:
                self._release(cursor)

        for dimension_name in drilldown:
            dimension = self.cube.get_dimension(dimension_name)
            joins.update(dimension.join)
            projections.append('%s AS %s' % (dimension.value_label or dimension.value,
                                             dimension.name))
            group_by.append(dimension.group_by or dimension.value)
            order_by.append(dimension.order_by or dimension.value)

        for measure_name in measures:
            measure = self.cube.get_measure(measure_name)
            # NOTE(review): projections hold "<expr> AS <name>" strings, so
            # this test on the bare expression practically never matches. It
            # is kept as-is because query() zips one cell per requested
            # measure with each row, and dropping a column would misalign it.
            if measure.expression not in projections:
                projections.append(measure.expression + ' AS ' + measure.name)

        sql = 'SELECT ' + ', '.join(projections)
        table_expression = ' %s' % self.cube.fact_table
        if joins:
            table_expression = self._table_expression(self._join_tree(joins),
                                                      self.fact_table)
        sql += ' FROM %s' % table_expression
        where_conditions = 'true'
        if where:
            where_conditions = ' AND '.join(where)
            sql += ' WHERE %s' % where_conditions
        if group_by:
            sql += ' GROUP BY %s' % ', '.join(group_by)
        if order_by:
            sql += ' ORDER BY %s' % ', '.join(order_by)
        # Substitute twice. The second pass resolves placeholders introduced
        # by the first, e.g. a {fact_table} inside an inserted
        # {where_conditions}; presumably deliberate.
        # NOTE(review): brace characters inside bound filter values would be
        # parsed as format fields here — confirm inputs cannot contain them.
        for _ in range(2):
            sql = sql.format(fact_table=self.cube.fact_table,
                             table_expression=table_expression,
                             where_conditions=where_conditions)
        return sql

    def query(self, filters, drilldown, measures, **kwargs):
        """Run the aggregate query and yield ``zip(cells, row)`` per result row.

        ``cells`` holds the EngineDimension objects for *drilldown*, followed
        by the EngineMeasure objects for *measures*. This matches the order in
        which sql_query() projects columns.
        """
        self.engine.log.debug('%s.%s query filters=%s drilldown=%s measures=%s',
                              self.engine.warehouse.name, self.cube.name, filters, drilldown,
                              measures)
        cells = [self.dimensions[dimension_name] for dimension_name in drilldown]
        cells.extend(self.measures[measure_name] for measure_name in measures)
        sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
        self.engine.log.debug('SQL: %s', sql)
        cursor = self.engine.get_cursor()
        try:
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            self._release(cursor)
        for row in rows:
            yield zip(cells, row)


class Engine(object):
    """Query engine over a warehouse definition.

    Attributes not set on the engine are looked up on ``warehouse``.
    """

    def __init__(self, warehouse):
        self.warehouse = warehouse
        self.log = logging.getLogger(__name__)

    @property
    def cubes(self):
        """Yield an EngineCube for every cube of the warehouse."""
        for cube in self.warehouse.cubes:
            yield EngineCube(self, cube)

    def __getitem__(self, name):
        """Return the EngineCube for the warehouse cube called *name*."""
        return EngineCube(self, self.warehouse.get_cube(name))

    def __getattr__(self, name):
        return getattr(self.warehouse, name)

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a double-quoted SQL identifier.

        Embedded double quotes are doubled, as SQL requires, so a namespace
        name cannot terminate the quoted identifier early.
        """
        return '"%s"' % name.replace('"', '""')

    def get_cursor(self):
        """Open a new connection to ``warehouse.pg_dsn`` and return a cursor.

        The warehouse ``search_path`` namespaces are applied with
        ``SET SEARCH_PATH``; an empty list leaves the server default in
        place. Each call opens a fresh connection, and the caller is
        responsible for closing it (``cursor.connection.close()``).
        """
        connection = psycopg2.connect(self.warehouse.pg_dsn)
        try:
            cursor = connection.cursor()
            namespaces = [self._quote_identifier(namespace)
                          for namespace in self.warehouse.search_path]
            if namespaces:
                cursor.execute('SET SEARCH_PATH = %s' % ', '.join(namespaces))
        except Exception:
            # Do not leak the connection if cursor setup fails.
            connection.close()
            raise
        return cursor