combo/combo/apps/dataviz/models.py

529 lines
18 KiB
Python

# combo - content management system
# Copyright (C) 2014-2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import os
from datetime import date
from requests.exceptions import RequestException
from django.urls import reverse
from django.db import models
from django.utils import timezone
from django.utils.encoding import force_text
from django.utils.translation import ugettext_lazy as _, ungettext, gettext
from django.conf import settings
from jsonfield import JSONField
from requests.exceptions import HTTPError
import pygal
import pygal.util
from combo.data.models import CellBase
from combo.data.library import register_cell_class
from combo.utils import get_templated_url, requests
class UnsupportedDataSet(Exception):
pass
@register_cell_class
class Gauge(CellBase):
title = models.CharField(_('Title'), max_length=150, blank=True, null=True)
url = models.CharField(_('URL'), max_length=150, blank=True, null=True)
data_source = models.CharField(_('Data Source'), max_length=150, blank=True, null=True)
jsonp_data_source = models.BooleanField(_('Use JSONP to get data'), default=True)
max_value = models.PositiveIntegerField(_('Max Value'), blank=True, null=True)
template_name = 'combo/gauge-cell.html'
class Media:
js = ('js/gauge.min.js', 'js/combo.gauge.js')
class Meta:
verbose_name = _('Gauge')
def get_additional_label(self):
return self.title
def is_relevant(self, context):
return bool(self.data_source)
def get_cell_extra_context(self, context):
if self.jsonp_data_source:
data_source_url = get_templated_url(self.data_source)
else:
data_source_url = reverse('combo-ajax-gauge-count', kwargs={'cell': self.id})
return {
'cell': self,
'title': self.title,
'url': get_templated_url(self.url) if self.url else None,
'max_value': self.max_value,
'data_source_url': data_source_url,
'jsonp': self.jsonp_data_source,
}
@register_cell_class
class ChartCell(CellBase):
template_name = 'combo/dataviz-chart.html'
title = models.CharField(_('Title'), max_length=150, blank=True, null=True)
url = models.URLField(_('URL'), max_length=250, blank=True, null=True)
class Meta:
verbose_name = _('Chart (legacy)')
@classmethod
def is_enabled(self):
return (
settings.LEGACY_CHART_CELL_ENABLED
and hasattr(settings, 'KNOWN_SERVICES')
and settings.KNOWN_SERVICES.get('bijoe')
)
def get_default_form_class(self):
from .forms import ChartForm
return ChartForm
def get_additional_label(self):
if self.title:
return self.title
return ''
def get_cell_extra_context(self, context):
context = super(ChartCell, self).get_cell_extra_context(context)
context['title'] = self.title
context['url'] = self.url
return context
class StatisticManager(models.Manager):
def get_by_natural_key(self, slug, site_slug, service_slug):
return self.get_or_create(slug=slug, site_slug=site_slug, service_slug=service_slug)[0]
class Statistic(models.Model):
slug = models.SlugField(_('Slug'), max_length=256)
label = models.CharField(_('Label'), max_length=256)
site_slug = models.SlugField(_('Site slug'), max_length=256)
service_slug = models.SlugField(_('Service slug'), max_length=256)
site_title = models.CharField(_('Site title'), max_length=256)
url = models.URLField(_('Data URL'))
filters = JSONField(default=list)
available = models.BooleanField(_('Available data'), default=True)
last_update = models.DateTimeField(_('Last update'), null=True, auto_now=True)
objects = StatisticManager()
class Meta:
ordering = ['-available', 'site_title', 'label']
unique_together = ['slug', 'site_slug', 'service_slug']
def __str__(self):
name = _('%(title)s: %(label)s') % {'title': self.site_title or self.site_slug, 'label': self.label}
if not self.available:
name = _('%s (unavailable)') % name
return name
def natural_key(self):
return (self.slug, self.site_slug, self.service_slug)
@register_cell_class
class ChartNgCell(CellBase):
TIME_FILTERS = (
('current-year', _('Current year')),
('previous-year', _('Previous year')),
('current-month', _('Current month')),
('previous-month', _('Previous month')),
('range', _('Free range')),
)
statistic = models.ForeignKey(
verbose_name=_('Data'),
to=Statistic,
blank=False,
null=True,
on_delete=models.SET_NULL,
related_name='cells',
help_text=_(
'This list may take a few seconds to be updated, please refresh the page if an item is missing.'
),
)
filter_params = JSONField(default=dict)
title = models.CharField(_('Title'), max_length=150, blank=True)
time_range = models.CharField(
_('Filtering (time)'),
max_length=20,
blank=True,
choices=(
('current-year', _('Current year')),
('previous-year', _('Previous year')),
('current-month', _('Current month')),
('previous-month', _('Previous month')),
('range', _('Free range')),
),
)
time_range_start = models.DateField(_('From'), null=True, blank=True)
time_range_end = models.DateField(_('To'), null=True, blank=True)
chart_type = models.CharField(
_('Chart Type'),
max_length=20,
default='bar',
choices=(
('bar', _('Bar')),
('horizontal-bar', _('Horizontal Bar')),
('stacked-bar', _('Stacked Bar')),
('line', _('Line')),
('pie', _('Pie')),
('dot', _('Dot')),
('table', _('Table')),
),
)
height = models.CharField(
_('Height'),
max_length=20,
default='250',
choices=(
('150', _('Short (150px)')),
('250', _('Average (250px)')),
('350', _('Tall (350px)')),
),
)
sort_order = models.CharField(
_('Sort data'),
max_length=5,
default='none',
help_text=_('This setting only applies for one-dimensional charts.'),
choices=(
('none', _('None')),
('alpha', _('Alphabetically')),
('asc', _('Increasing values')),
('desc', _('Decreasing values')),
),
)
hide_null_values = models.BooleanField(
default=False,
verbose_name=_('Hide null values'),
help_text=_('This setting only applies for one-dimensional charts.'),
)
manager_form_template = 'combo/chartngcell_form.html'
class Meta:
verbose_name = _('Chart')
@classmethod
def is_enabled(self):
return settings.KNOWN_SERVICES.get('bijoe') or settings.STATISTICS_PROVIDERS
def get_default_form_class(self):
from .forms import ChartNgForm
return ChartNgForm
def get_additional_label(self):
return self.title
def is_relevant(self, context):
return bool(self.statistic)
def check_validity(self):
if not self.statistic:
return
resp = None
try:
resp = self.get_statistic_data()
except RequestException:
pass
self.set_validity_from_url(
resp, not_found_code='statistic_data_not_found', invalid_code='statistic_url_invalid'
)
def get_cell_extra_context(self, context):
ctx = super(ChartNgCell, self).get_cell_extra_context(context)
if self.chart_type == 'table' and self.statistic:
try:
chart = self.get_chart(raise_if_not_cached=not (context.get('synchronous')))
except UnsupportedDataSet:
ctx['table'] = '<p>%s</p>' % _('Unsupported dataset.')
except HTTPError as e:
if e.response.status_code == 404:
ctx['table'] = '<p>%s</p>' % _('Visualization not found.')
else:
if not chart.raw_series:
ctx['table'] = '<p>%s</p>' % _('No data.')
else:
ctx['table'] = chart.render_table(
transpose=bool(chart.axis_count == 2),
total=getattr(chart, 'compute_sum', True),
)
ctx['table'] = ctx['table'].replace('<table>', '<table class="main">')
return ctx
def get_statistic_data(self, raise_if_not_cached=False):
return requests.get(
self.statistic.url,
params=self.get_filter_params(),
cache_duration=300,
remote_service='auto',
without_user=True,
raise_if_not_cached=raise_if_not_cached,
log_errors=False,
)
def get_chart(self, width=None, height=None, raise_if_not_cached=False):
response = self.get_statistic_data(raise_if_not_cached)
response.raise_for_status()
response = response.json()
style = pygal.style.DefaultStyle(font_family='OpenSans, sans-serif', background='transparent')
chart = {
'bar': pygal.Bar,
'horizontal-bar': pygal.HorizontalBar,
'stacked-bar': pygal.StackedBar,
'line': pygal.Line,
'pie': pygal.Pie,
'dot': pygal.Dot,
'table': pygal.Bar,
}[self.chart_type](config=pygal.Config(style=copy.copy(style)))
if self.statistic.service_slug == 'bijoe':
x_labels, y_labels, data = self.parse_response(response, chart)
chart.x_labels = x_labels
self.prepare_chart(chart, width, height)
if chart.axis_count == 1:
data = self.process_one_dimensional_data(chart, data)
self.add_data_to_chart(chart, data, y_labels)
else:
data = response['data']
chart.x_labels = data['x_labels']
chart.axis_count = min(len(data['series']), 2)
self.prepare_chart(chart, width, height)
if chart.axis_count == 1:
data['series'][0]['data'] = self.process_one_dimensional_data(
chart, data['series'][0]['data']
)
if self.chart_type == 'pie':
data["series"] = [
{"label": label, "data": [data]}
for label, data in zip(chart.x_labels, data["series"][0]["data"])
if data
]
for serie in data['series']:
chart.add(serie['label'], serie['data'])
return chart
def get_filter_params(self):
params = self.filter_params.copy()
now = timezone.now().date()
if self.time_range == 'current-year':
params['start'] = date(year=now.year, month=1, day=1)
elif self.time_range == 'previous-year':
params['start'] = date(year=now.year - 1, month=1, day=1)
params['end'] = date(year=now.year, month=1, day=1)
elif self.time_range == 'current-month':
params['start'] = date(year=now.year, month=now.month, day=1)
elif self.time_range == 'previous-month':
params['start'] = date(year=now.year, month=now.month - 1, day=1)
params['end'] = date(year=now.year, month=now.month, day=1)
elif self.time_range == 'range':
if self.time_range_start:
params['start'] = self.time_range_start
if self.time_range_end:
params['end'] = self.time_range_end
return params
def parse_response(self, response, chart):
# normalize axis to have a fake axis when there are no dimensions and
# always a x axis when there is a single dimension.
data = response['data']
loop_labels = response['axis'].get('loop') or []
x_labels = response['axis'].get('x_labels') or []
y_labels = response['axis'].get('y_labels') or []
if loop_labels:
if x_labels and y_labels:
# no support for three dimensions
raise UnsupportedDataSet()
if not y_labels:
y_labels = loop_labels
else:
x_labels, y_labels = y_labels, loop_labels
if len(y_labels) != len(data) or not all([len(x) == len(x_labels) for x in data]):
# varying dimensions
raise UnsupportedDataSet()
if not x_labels and not y_labels: # unidata
x_labels = ['']
y_labels = ['']
data = [data]
chart.axis_count = 0
elif not x_labels:
x_labels = y_labels
y_labels = ['']
chart.axis_count = 1
elif not y_labels:
y_labels = ['']
chart.axis_count = 1
else:
chart.axis_count = 2
chart.compute_sum = bool(response.get('measure') == 'integer' and chart.axis_count > 0)
formatter = self.get_value_formatter(response.get('unit'), response.get('measure'))
if formatter:
chart.config.value_formatter = formatter
return x_labels, y_labels, data
def prepare_chart(self, chart, width, height):
chart.config.margin = 0
if width:
chart.config.width = width
if height:
chart.config.height = height
if width or height:
chart.config.explicit_size = True
chart.config.js = [os.path.join(settings.STATIC_URL, 'js/pygal-tooltips.js')]
chart.show_legend = bool(chart.axis_count > 1)
chart.truncate_legend = 30
# matplotlib tab10 palette
chart.config.style.colors = (
'#1f77b4',
'#ff7f0e',
'#2ca02c',
'#d62728',
'#9467bd',
'#8c564b',
'#e377c2',
'#7f7f7f',
'#bcbd22',
'#17becf',
)
if self.chart_type == 'dot':
chart.show_legend = False
# use a single colour for dots
chart.config.style.colors = ('#1f77b4',) * len(chart.x_labels)
if self.chart_type != 'pie':
if width and width < 500:
chart.legend_at_bottom = True
if self.chart_type == 'horizontal-bar':
# truncate labels
chart.x_labels = [pygal.util.truncate(x, 15) for x in chart.x_labels]
else:
chart.show_legend = True
if width and width < 500:
chart.truncate_legend = 15
def process_one_dimensional_data(self, chart, data):
if self.hide_null_values:
data = self.hide_values(chart, data)
if self.sort_order != 'none':
data = self.sort_values(chart, data)
if getattr(chart, 'compute_sum', True) and self.chart_type == 'table':
data = self.add_total_to_line_table(chart, data)
return data
@staticmethod
def hide_values(chart, data):
x_labels, new_data = [], []
for label, value in zip(chart.x_labels, data):
if value:
x_labels.append(label)
new_data.append(value)
chart.x_labels = x_labels
return new_data
def sort_values(self, chart, data):
if self.sort_order == 'alpha':
tmp_items = sorted(zip(chart.x_labels, data), key=lambda x: x[0])
elif self.sort_order == 'asc':
tmp_items = sorted(zip(chart.x_labels, data), key=lambda x: (x[1] or 0))
elif self.sort_order == 'desc':
tmp_items = sorted(zip(chart.x_labels, data), key=lambda x: (x[1] or 0), reverse=True)
x_labels, sorted_data = zip(*[(label, value) for label, value in tmp_items])
chart.x_labels = list(x_labels)
return list(sorted_data)
@staticmethod
def add_total_to_line_table(chart, data):
# workaround pygal
chart.compute_sum = False
data.append(sum(x for x in data if x is not None))
chart.x_labels.append(gettext('Total'))
return data
def add_data_to_chart(self, chart, data, y_labels):
if self.chart_type != 'pie':
for i, serie_label in enumerate(y_labels):
if chart.axis_count < 2:
values = data
else:
values = [data[i][j] for j in range(len(chart.x_labels))]
chart.add(serie_label, values)
else:
# pie, create a serie by data, to get different colours
values = data
for label, value in zip(chart.x_labels, values):
if not value:
continue
chart.add(label, value)
@staticmethod
def get_value_formatter(unit, measure):
if unit == 'seconds' or measure == 'duration':
def format_duration(value):
if value is None:
return '-'
days = value // 86400
hours = (value % 86400) // 3600
if days:
days_string = ungettext('%d day', '%d days', days) % days
if hours:
hours_string = ungettext('%d hour', '%d hours', hours) % hours
if days and hours:
value = _('%(days_string)s and %(hours_string)s') % {
'days_string': days_string,
'hours_string': hours_string,
}
elif days:
value = days_string
elif hours:
value = hours_string
else:
value = _('Less than an hour')
return force_text(value)
return format_duration
elif measure == 'percent':
percent_formatter = lambda x: '{:.1f}%'.format(x)
return percent_formatter