bijoe/bijoe/schemas.py

470 lines
15 KiB
Python

#
# bijoe - BI dashboard
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import datetime
import decimal
from django.utils import six
from django.utils.encoding import force_text
from django.utils.functional import Promise
from django.utils.translation import ugettext_lazy as _
from .relative_time import RelativeDate
Point = collections.namedtuple('Point', ['x', 'y'])
TYPE_MAP = {
'duration': datetime.timedelta,
'date': datetime.date,
'integer': int,
'decimal': decimal.Decimal,
'percent': float,
'point': Point,
}
class SchemaError(Exception):
pass
def type_cast(t):
if t not in TYPE_MAP:
raise SchemaError('invalid type: %r' % t)
return t
class Base:
__types__ = {}
def __init__(self, **kwargs):
for k, v in kwargs.items():
if k in self.__types__ and self.__types__ == 'str':
v = str(v)
setattr(self, k, v)
@classmethod
def slots(cls):
s = set()
for cls in reversed(cls.__mro__):
if hasattr(cls, '__slots__'):
s.update(cls.__slots__)
return s
@classmethod
def types(cls):
d = {}
for cls in reversed(cls.__mro__):
if hasattr(cls, '__types__'):
d.update(cls.__types__)
return d
@classmethod
def from_json(cls, d):
assert hasattr(d, 'keys')
slots = cls.slots()
assert set(d.keys()) <= set(slots), 'given keys %r does not match %s.__slots__: %r' % (
d.keys(),
cls.__name__,
slots,
)
types = cls.types()
kwargs = {}
for key in slots:
assert key in d or hasattr(cls, key), '%s.%s is is a mandatory attribute' % (cls.__name__, key)
if not key in d:
continue
value = d[key]
if key in types:
kls = types[key]
if isinstance(kls, list):
kls = kls[0]
if not isinstance(value, list):
value = [value]
if hasattr(kls, 'from_json'):
value = [kls.from_json(v) for v in value]
else:
value = [kls(v) for v in value]
elif hasattr(kls, 'from_json'):
value = kls.from_json(value)
else:
value = kls(value)
kwargs[key] = value
return cls(**kwargs)
def to_json(self):
d = {}
types = self.types()
for attr in self.slots():
try:
v = getattr(self, attr)
except AttributeError:
pass
else:
if attr in types and hasattr(types[attr], 'to_json'):
v = types[attr].to_json(v)
if isinstance(v, list):
v = [x.to_json() if hasattr(x, 'to_json') else x for x in v]
if isinstance(v, Promise):
v = force_text(v)
d[attr] = v
return d
def __repr__(self):
kwargs = ['%s=%r' % (key, getattr(self, key)) for key in self.slots() if hasattr(self, key)]
return '<%s %s>' % (self.__class__.__name__, ' '.join(kwargs))
class Measure(Base):
__slots__ = ['name', 'label', 'type', 'expression']
__types__ = {
'name': str,
'label': str,
'type': type_cast,
'expression': str,
}
@property
def default_value(self):
if self.type in ['integer', 'percent']:
return 0
return None
class Dimension(Base):
__slots__ = [
'name',
'label',
'type',
'join',
'value',
'value_label',
'order_by',
'group_by',
'filter_in_join',
'filter',
'filter_value',
'filter_needs_join',
'filter_expression',
'absent_label',
]
__types__ = {
'name': str,
'label': str,
'type': str,
'join': [str],
'value': str,
'value_label': str,
'order_by': [str],
'group_by': str,
'filter': bool,
'members_query': str,
# do filtering need joins ?
'filter_in_join': bool,
'filter_value': str,
'filter_needs_join': bool,
'absent_label': str,
}
def __init__(self, **kwargs):
self.label = None
self.value_label = None
self.order_by = None
self.group_by = None
self.join = None
self.filter = True
self.filter_in_join = False
self.filter_value = None
self.filter_expression = None
self.filter_needs_join = True
self.members_query = None
self.absent_label = None
super().__init__(**kwargs)
if not self.absent_label:
if self.type in ('date', 'integer', 'string'):
self.absent_label = _('None')
elif self.type in ('bool',):
self.absent_label = _('N/A')
else:
raise NotImplementedError('not absent label for type %r' % self.type)
@property
def dimensions(self):
if self.type == 'date':
filter_value = self.filter_value or self.value
return [
self,
Dimension(
label='année (%s)' % self.label,
name=self.name + '__year',
type='integer',
join=self.join,
filter_value='EXTRACT(year from %s)::integer' % filter_value,
filter_in_join=self.filter_in_join,
value='EXTRACT(year from %s)::integer' % self.value,
filter=False,
),
Dimension(
label='année et mois (%s)' % self.label,
name=self.name + '__yearmonth',
type='integer',
join=self.join,
filter_value='EXTRACT(year from %s) || \'M\' || EXTRACT(month from %s)'
% (filter_value, filter_value),
filter_in_join=self.filter_in_join,
value='TO_CHAR(EXTRACT(month from %s), \'00\') || \'/\' || EXTRACT(year from %s)'
% (self.value, self.value),
group_by='EXTRACT(year from %s), EXTRACT(month from %s)' % (self.value, self.value),
order_by=['EXTRACT(year from %s), EXTRACT(month from %s)' % (self.value, self.value)],
filter=False,
),
Dimension(
label='mois (%s)' % self.label,
name=self.name + '__month',
type='integer',
filter_value='EXTRACT(month from %s)' % filter_value,
filter_in_join=self.filter_in_join,
join=self.join,
value='EXTRACT(month from %s)' % self.value,
value_label='to_char(date_trunc(\'month\', %s), \'TMmonth\')' % self.value,
group_by='EXTRACT(month from %s), to_char(date_trunc(\'month\', %s), \'TMmonth\')'
% (self.value, self.value),
filter=False,
),
Dimension(
label='jour de la semaine (%s)' % self.label,
name=self.name + '__dow',
type='integer',
join=self.join,
filter_value='EXTRACT(dow from %s)' % filter_value,
filter_in_join=self.filter_in_join,
value='EXTRACT(dow from %s)' % self.value,
order_by=['(EXTRACT(dow from %s) + 6)::integer %% 7' % self.value],
value_label=(
'to_char(date_trunc(\'week\', current_date)::date '
'+ EXTRACT(dow from %s)::integer - 1, \'TMday\')'
)
% self.value,
filter=False,
),
Dimension(
label='semaine (%s)' % self.label,
name=self.name + '__isoweek',
type='integer',
join=self.join,
filter_value='EXTRACT(isoyear from %s) || \'S\' || EXTRACT(week from %s)'
% (filter_value, filter_value),
filter_in_join=self.filter_in_join,
value='EXTRACT(isoyear from %s) || \'S\' || EXTRACT(week from %s)'
% (self.value, self.value),
group_by='EXTRACT(isoyear from %s), EXTRACT(week from %s)' % (self.value, self.value),
order_by=['EXTRACT(isoyear from %s), EXTRACT(week from %s)' % (self.value, self.value)],
filter=False,
),
]
return [self]
def build_filter(self, filter_values):
value = self.filter_value or self.value
if self.type == 'date':
assert isinstance(filter_values, dict) and set(filter_values.keys()) == {'start', 'end'}
filters = []
values = []
def date_filter(tpl, filter_value):
if not isinstance(filter_value, (datetime.date, datetime.datetime)):
filter_value = RelativeDate(filter_value)
filters.append(tpl % (value, '%s'))
values.append(filter_value)
try:
if filter_values['start']:
date_filter('%s >= %s', filter_values['start'])
except IndexError:
pass
try:
if filter_values['end']:
date_filter('%s < %s', filter_values['end'])
except IndexError:
pass
return ' AND '.join(filters), values
else:
if not isinstance(filter_values, list):
filter_values = [filter_values]
if not filter_values:
return '', []
is_none = None in filter_values
filter_values = [v for v in filter_values if v is not None]
values = []
for filter_value in filter_values:
try:
if self.type == 'string':
filter_value = str(filter_value)
elif self.type == 'integer':
filter_value = int(filter_value)
elif self.type == 'bool':
filter_value = bool(filter_value)
else:
raise NotImplementedError
except (ValueError, TypeError):
continue
values.append(filter_value)
expressions = []
if values:
s = ', '.join(['%s'] * len(values))
if self.filter_expression:
expressions.append(self.filter_expression % s)
else:
expressions.append('%s IN (%s)' % (value, s))
if is_none:
expressions.append('%s IS NULL' % value)
if len(expressions) == 0:
expression = 'FALSE'
elif len(expressions) == 1:
expression = expressions[0]
else:
expression = ' OR '.join('(%s)' % e for e in expressions)
return expression, values
def join_kind(kind):
if kind not in ('inner', 'left', 'right', 'full'):
raise ValueError('bad join kind: %s' % kind)
return kind
class Join(Base):
__slots__ = ['name', 'table', 'master', 'detail', 'kind']
__types__ = {
'name': str,
'table': str,
'master': str,
'detail': str,
'kind': join_kind,
}
def __init__(self, **kwargs):
self.kind = 'full'
super().__init__(**kwargs)
@property
def master_table(self):
if '.' in self.master:
return self.master.split('.', 1)[0]
return None
class Cube(Base):
__slots__ = [
'name',
'label',
'fact_table',
'json_field',
'key',
'joins',
'dimensions',
'measures',
'warnings',
]
__types__ = {
'name': str,
'label': str,
'fact_table': str,
'json_field': str,
'key': str,
'joins': [Join],
'dimensions': [Dimension],
'measures': [Measure],
'warnings': [str],
}
def __init__(self, **kwargs):
self.json_field = None
self.joins = ()
self.dimensions = ()
self.measures = ()
self.warnings = ()
super().__init__(**kwargs)
def check(self):
names = collections.Counter()
names.update(join.name for join in self.joins)
names.update(dimension.name for dimension in self.dimensions)
names.update(measure.name for measure in self.measures)
duplicates = [k for k, v in names.items() if v > 1]
if duplicates:
raise SchemaError(
'More than one join, dimension or measure with name(s) %s' % ', '.join(duplicates)
)
@property
def all_dimensions(self):
for dimension in self.dimensions:
yield from dimension.dimensions
def get_dimension(self, name):
for dimension in self.dimensions:
for sub_dimension in dimension.dimensions:
if sub_dimension.name == name:
return sub_dimension
raise KeyError
def get_join(self, name):
for join in self.joins:
if join.name == name:
return join
raise KeyError
def get_measure(self, name):
for measure in self.measures:
if measure.name == name:
return measure
raise KeyError
class Warehouse(Base):
__slots__ = ['name', 'slug', 'label', 'pg_dsn', 'search_path', 'cubes', 'path', 'timestamp']
__types__ = {
'name': str,
'slug': str,
'label': str,
'pg_dsn': str,
'search_path': [str],
'cubes': [Cube],
'path': str,
}
def __init__(self, **kwargs):
self.path = None
self.slug = None
self.timestamp = None
super().__init__(**kwargs)
def check(self):
names = collections.Counter(cube.name for cube in self.cubes)
duplicates = [k for k, v in names.items() if v > 1]
if duplicates:
raise SchemaError('More than one cube with name(s) %s' % ', '.join(duplicates))
def get_cube(self, name):
for cube in self.cubes:
if cube.name == name:
return cube
raise KeyError