bijoe/bijoe/schemas.py

375 lines
12 KiB
Python

# -*- coding: utf-8 -*-
#
# bijoe - BI dashboard
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import decimal
import collections
from .relative_time import RelativeDate
# Two-field (x, y) tuple; it is the Python type registered for 'point' below.
Point = collections.namedtuple('Point', 'x y')

# Type names recognised by type_cast(), each mapped to the Python type that
# represents values of that kind.
TYPE_MAP = dict(
    duration=datetime.timedelta,
    date=datetime.date,
    integer=int,
    decimal=decimal.Decimal,
    percent=float,
    point=Point,
)
class SchemaError(Exception):
    """Raised when a schema definition is invalid."""
def type_cast(t):
    """Validate a type name and return it unchanged.

    Despite the name, no conversion takes place: the function only checks
    that ``t`` is one of the keys of ``TYPE_MAP``.

    Raises:
        SchemaError: if ``t`` is not a key of ``TYPE_MAP``.
    """
    if t in TYPE_MAP:
        return t
    raise SchemaError('invalid type: %r' % t)
class Base(object):
    """Common base for schema objects described by ``__slots__`` and ``__types__``.

    Subclasses list their attribute names in ``__slots__`` and may map some of
    them to a converter in ``__types__``: either a callable, or a one-element
    list ``[callable]`` for list-valued attributes.  A converter exposing a
    ``from_json`` attribute is called through it instead of directly.
    """

    __types__ = {}

    def __init__(self, **kwargs):
        """Store every keyword argument as an attribute of the same name.

        Values are stored exactly as given; no conversion is applied here
        (conversion happens in ``from_json``).
        """
        # NOTE: a former guard here compared the whole ``__types__`` mapping to
        # the string 'str', which can never be true, so it never coerced
        # anything; that dead branch is gone and behaviour is unchanged.
        # ``items()`` works on both Python 2 and Python 3.
        for key, value in kwargs.items():
            setattr(self, key, value)

    @classmethod
    def slots(cls):
        """Return the set of names found in ``__slots__`` along the MRO."""
        names = set()
        for klass in cls.__mro__:
            names.update(getattr(klass, '__slots__', ()))
        return names

    @classmethod
    def types(cls):
        """Return the merged ``__types__`` mappings found along the MRO.

        Classes are visited from the most generic to the most specific, so an
        entry declared on a subclass overrides the one of its parents.
        """
        merged = {}
        for klass in reversed(cls.__mro__):
            merged.update(getattr(klass, '__types__', {}))
        return merged

    @staticmethod
    def _convert(kls, value):
        """Apply converter ``kls`` (or ``[kls]`` for a list) to a raw value."""
        is_list = isinstance(kls, list)
        if is_list:
            kls = kls[0]
            # A lone value is accepted where a list is expected.
            if not isinstance(value, list):
                value = [value]
        convert = getattr(kls, 'from_json', kls)
        if is_list:
            return [convert(item) for item in value]
        return convert(value)

    @classmethod
    def from_json(cls, d):
        """Build an instance from a mapping of attribute names to raw values.

        Args:
            d: a dict-like object whose keys are a subset of ``cls.slots()``.

        Returns:
            A new instance of ``cls``; attributes with an entry in
            ``cls.types()`` are converted, the others are stored as given.

        Raises:
            AssertionError: if ``d`` has no ``keys`` attribute or contains a
                key that is not a declared slot.
        """
        assert hasattr(d, 'keys')
        slots = cls.slots()
        assert set(d.keys()) <= set(slots), \
            'given keys %r does not match %s.__slots__: %r' % (d.keys(), cls.__name__, slots)
        types = cls.types()
        kwargs = {}
        for key in slots:
            # NOTE(review): every declared slot is also visible as a class
            # attribute (the slot descriptor itself), so hasattr(cls, key)
            # looks always true here and this check appears unable to fail --
            # confirm before relying on it to detect missing attributes.
            assert key in d or hasattr(cls, key), \
                '%s.%s is is a mandatory attribute' % (cls.__name__, key)
            if key not in d:
                continue
            value = d[key]
            if key in types:
                value = cls._convert(types[key], value)
            kwargs[key] = value
        return cls(**kwargs)

    def to_json(self):
        """Return a dict of the attributes that are readable on this object.

        Slots whose ``getattr`` raises ``AttributeError`` are skipped.  A
        value whose declared type exposes ``to_json`` is serialised through
        it, and list values have ``to_json()`` called on each element that
        provides it.
        """
        d = {}
        types = self.types()
        for attr in self.slots():
            try:
                v = getattr(self, attr)
            except AttributeError:
                continue
            if attr in types and hasattr(types[attr], 'to_json'):
                v = types[attr].to_json(v)
            if isinstance(v, list):
                v = [x.to_json() if hasattr(x, 'to_json') else x for x in v]
            d[attr] = v
        return d

    def __repr__(self):
        """Return ``<ClassName key=value ...>`` for the attributes that are set."""
        kwargs = ['%s=%r' % (key, getattr(self, key)) for key in self.slots() if hasattr(self, key)]
        return '<%s %s>' % (self.__class__.__name__, ' '.join(kwargs))
class Measure(Base):
    """Schema of a measure: a name, a label, a type and an expression.

    The ``type`` attribute is validated by ``type_cast`` when the measure is
    loaded through ``from_json``.
    """

    __slots__ = ['name', 'label', 'type', 'expression']
    __types__ = dict(
        name=str,
        label=unicode,
        type=type_cast,
        expression=str,
    )
class Dimension(Base):
    """Schema of a dimension, with helpers to derive sub-dimensions and filters.

    ``value`` (and the optional ``filter_value``, ``value_label``, ``group_by``
    and ``order_by``) hold SQL expression fragments that are interpolated into
    the strings built by ``dimensions`` and ``build_filter``.
    """

    __slots__ = ['name', 'label', 'type', 'join', 'value', 'value_label',
                 'order_by', 'group_by', 'filter_in_join', 'filter', 'filter_value',
                 'filter_needs_join', 'filter_expression']
    __types__ = {
        'name': str,
        'label': unicode,
        'type': str,
        'join': [str],
        'value': str,
        'value_label': str,
        'order_by': [str],
        'group_by': str,
        'filter': bool,
        # NOTE(review): 'members_query' has a type and a default but is not
        # listed in __slots__, so from_json() rejects it as an unknown key --
        # confirm whether that is intended.
        'members_query': str,
        # does filtering need joins?
        'filter_in_join': bool,
        'filter_value': str,
        'filter_needs_join': bool,
    }

    # Defaults for the optional attributes.
    label = None
    value_label = None
    order_by = None
    group_by = None
    join = None
    filter = True
    filter_in_join = False
    filter_value = None
    filter_expression = None
    filter_needs_join = True
    members_query = None

    @property
    def dimensions(self):
        """Return this dimension followed by its derived sub-dimensions.

        For a dimension of type ``'date'`` the list also contains four
        integer-typed, non-filterable dimensions named ``<name>__year``,
        ``<name>__month``, ``<name>__dow`` and ``<name>__isoweek``.  For any
        other type the list only contains ``self``.
        """
        if self.type != 'date':
            return [self]
        filter_value = self.filter_value or self.value
        return [
            self,
            Dimension(
                label=u'année (%s)' % self.label,
                name=self.name + '__year',
                type='integer',
                join=self.join,
                filter_value='EXTRACT(year from %s)::integer' % filter_value,
                filter_in_join=self.filter_in_join,
                value='EXTRACT(year from %s)::integer' % self.value,
                filter=False),
            Dimension(
                label=u'mois (%s)' % self.label,
                name=self.name + '__month',
                type='integer',
                filter_value='EXTRACT(month from %s)' % filter_value,
                filter_in_join=self.filter_in_join,
                join=self.join,
                value='EXTRACT(month from %s)' % self.value,
                value_label='to_char(date_trunc(\'month\', %s), \'TMmonth\')' % self.value,
                group_by='EXTRACT(month from %s), '
                         'to_char(date_trunc(\'month\', %s), \'TMmonth\')'
                         % (self.value, self.value),
                filter=False),
            Dimension(
                label=u'jour de la semaine (%s)' % self.label,
                name=self.name + '__dow',
                type='integer',
                join=self.join,
                filter_value='EXTRACT(dow from %s)' % filter_value,
                filter_in_join=self.filter_in_join,
                value='EXTRACT(dow from %s)' % self.value,
                # Shift the day-of-week number by 6 modulo 7 for ordering.
                order_by=['(EXTRACT(dow from %s) + 6)::integer %% 7' % self.value],
                value_label='to_char(date_trunc(\'week\', current_date)::date '
                            '+ EXTRACT(dow from %s)::integer - 1, \'TMday\')' % self.value,
                filter=False),
            Dimension(
                label=u'semaine (%s)' % self.label,
                name=self.name + '__isoweek',
                type='integer',
                join=self.join,
                filter_value='EXTRACT(isoyear from %s) || \'S\' || EXTRACT(week from %s)'
                             % (filter_value, filter_value),
                filter_in_join=self.filter_in_join,
                value='EXTRACT(isoyear from %s) || \'S\' || EXTRACT(week from %s)'
                      % (self.value, self.value),
                group_by='EXTRACT(isoyear from %s), EXTRACT(week from %s)' % (self.value,
                                                                              self.value),
                order_by=['EXTRACT(isoyear from %s), EXTRACT(week from %s)' % (self.value,
                                                                               self.value)],
                filter=False),
        ]

    def build_filter(self, filter_values):
        """Build a parameterised SQL condition restricting this dimension.

        Args:
            filter_values: for a ``'date'`` dimension, a dict with exactly the
                keys ``'start'`` and ``'end'``; each value is either falsy
                (bound ignored), a date/datetime, or anything else, which is
                wrapped in ``RelativeDate``.  For other types, a single value
                or a list of values; ``None`` in the list selects NULL.

        Returns:
            A ``(condition, parameters)`` pair where ``condition`` uses ``%s``
            placeholders for ``parameters``.  ``('', [])`` is returned for an
            empty list of values.
        """
        value = self.filter_value or self.value
        if self.type == 'date':
            assert isinstance(filter_values, dict) and set(filter_values.keys()) == set(['start',
                                                                                         'end'])
            filters = []
            values = []
            # Lower bound is inclusive, upper bound is exclusive.
            bounds = (
                ('%s >= %s', filter_values.get('start')),
                ('%s < %s', filter_values.get('end')),
            )
            for template, bound in bounds:
                if not bound:
                    continue
                if not isinstance(bound, (datetime.date, datetime.datetime)):
                    bound = RelativeDate(bound)
                filters.append(template % (value, '%s'))
                values.append(bound)
            return ' AND '.join(filters), values

        if not isinstance(filter_values, list):
            filter_values = [filter_values]
        if not filter_values:
            return '', []
        is_none = None in filter_values
        values = [v for v in filter_values if v is not None]
        if self.type == 'integer':
            values = [int(v) for v in values]
        if not values:
            # Only None was requested: an empty "IN ()" list is not valid SQL,
            # so test for NULL alone.
            return '(%s IS NULL)' % value, []
        placeholders = ', '.join(['%s'] * len(values))
        if self.filter_expression:
            expression = self.filter_expression % placeholders
        else:
            expression = '%s IN (%s)' % (value, placeholders)
        if is_none:
            expression = '((%s) OR (%s IS NULL))' % (expression, value)
        return expression, values
def join_kind(kind):
    """Validate a join kind and return it unchanged.

    Accepted values are 'inner', 'left', 'right' and 'full'.

    Raises:
        ValueError: for any other value.
    """
    if kind in ('inner', 'left', 'right', 'full'):
        return kind
    raise ValueError('bad join kind: %s' % kind)
class Join(Base):
    """Schema of a join: a name, a table, master/detail references and a kind.

    ``kind`` defaults to 'right' and is validated by ``join_kind`` when the
    join is loaded through ``from_json``.
    """

    __slots__ = ['name', 'table', 'master', 'detail', 'kind']
    __types__ = dict(
        name=str,
        table=str,
        master=str,
        detail=str,
        kind=join_kind,
    )

    kind = 'right'

    @property
    def master_table(self):
        """Return the part of ``master`` before its first '.', if any.

        ``None`` is returned when ``master`` contains no '.'.
        """
        prefix, dot, _rest = self.master.partition('.')
        return prefix if dot else None
class Cube(Base):
    """Schema of a cube: a fact table with its joins, dimensions and measures."""

    __slots__ = ['name', 'label', 'fact_table', 'json_field', 'key', 'joins', 'dimensions',
                 'measures']
    __types__ = {
        'name': str,
        'label': unicode,
        'fact_table': str,
        'json_field': str,
        'key': str,
        'joins': [Join],
        'dimensions': [Dimension],
        'measures': [Measure],
    }

    # Defaults for the optional attributes.
    json_field = None
    joins = ()
    dimensions = ()
    measures = ()

    def check(self):
        """Verify that joins, dimensions and measures have distinct names.

        The three kinds share one namespace: a join and a measure with the
        same name count as a duplicate.

        Raises:
            SchemaError: if a name is used more than once.
        """
        names = collections.Counter()
        names.update(join.name for join in self.joins)
        names.update(dimension.name for dimension in self.dimensions)
        names.update(measure.name for measure in self.measures)
        # items() works on both Python 2 and Python 3.
        duplicates = [name for name, count in names.items() if count > 1]
        if duplicates:
            raise SchemaError(
                'More than one join, dimension or measure with name(s) %s' % ', '.join(duplicates))

    @property
    def all_dimensions(self):
        """Yield every dimension together with its derived sub-dimensions."""
        for dimension in self.dimensions:
            for sub_dimension in dimension.dimensions:
                yield sub_dimension

    def get_dimension(self, name):
        """Return the dimension or sub-dimension called ``name``.

        Raises:
            KeyError: if no such dimension exists.
        """
        for dimension in self.all_dimensions:
            if dimension.name == name:
                return dimension
        raise KeyError(name)

    def get_join(self, name):
        """Return the join called ``name``.

        Raises:
            KeyError: if no such join exists.
        """
        for join in self.joins:
            if join.name == name:
                return join
        raise KeyError(name)

    def get_measure(self, name):
        """Return the measure called ``name``.

        Raises:
            KeyError: if no such measure exists.
        """
        for measure in self.measures:
            if measure.name == name:
                return measure
        raise KeyError(name)
class Warehouse(Base):
    """Schema of a warehouse: connection settings and a list of cubes."""

    __slots__ = ['name', 'label', 'pg_dsn', 'search_path', 'cubes']
    # 'search_path' used to be listed twice in this literal; the duplicate key
    # was redundant and has been dropped.
    __types__ = {
        'name': str,
        'label': unicode,
        'pg_dsn': str,
        'search_path': [str],
        'cubes': [Cube],
    }

    def check(self):
        """Verify that all cubes have distinct names.

        Raises:
            SchemaError: if a cube name is used more than once.
        """
        names = collections.Counter(cube.name for cube in self.cubes)
        # items() works on both Python 2 and Python 3.
        duplicates = [name for name, count in names.items() if count > 1]
        if duplicates:
            raise SchemaError('More than one cube with name(s) %s' % ', '.join(duplicates))

    def get_cube(self, name):
        """Return the cube called ``name``.

        Raises:
            KeyError: if no such cube exists.
        """
        for cube in self.cubes:
            if cube.name == name:
                return cube
        raise KeyError(name)