hobo/hobo/matomo/utils.py

352 lines
12 KiB
Python

# hobo - portal to configure and deploy applications
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import hashlib
import re
import secrets
import string
import urllib.parse
from random import choice, randint

import requests
from django.conf import settings
from django.core import exceptions
from django.utils.encoding import force_bytes
from lxml import etree

from hobo.environment.models import Combo, Fargo, Variable, Wcs
# JavaScript fragment that upgrade_javascript_tag() inserts right after the
# `var _paq = ...` line of the Matomo tracking snippet.  It recomputes the time
# left before the visitor cookie's original expiry (creation timestamp plus the
# 33696000 seconds the script calls "13 months", minus now) and sets the cookie
# timeout to that value, so that later visits do not push the expiry date back.
# compute_cnil_acknowledgment_level() looks for the function name
# `getOriginalVisitorCookieTimeout` to recognise this fragment.
CNIL_JS = """
// disallow cookie's time extension
_paq.push([function() {
var self = this;
function getOriginalVisitorCookieTimeout() {
var now = new Date(),
nowTs = Math.round(now.getTime() / 1000),
visitorInfo = self.getVisitorInfo();
var createTs = parseInt(visitorInfo[2]);
var cookieTimeout = 33696000; // 13 months in seconds
var originalTimeout = createTs + cookieTimeout - nowTs;
return originalTimeout;
}
this.setVisitorCookieTimeout( getOriginalVisitorCookieTimeout() );
}]);
"""
def get_variable(name, default=''):
    """Return the hobo ``Variable`` called *name*, creating it when missing.

    A row created here is flagged ``auto`` and initialised with *default*;
    an already existing row is returned as stored.
    """
    creation_values = {'auto': True, 'value': default}
    return Variable.objects.get_or_create(name=name, defaults=creation_values)[0]
def get_variable_value(name, default=''):
    """Return the value of the hobo variable *name*.

    Unlike ``get_variable`` nothing is written to the database: *default* is
    returned when no variable with that name exists.
    """
    try:
        return Variable.objects.get(name=name).value
    except exceptions.ObjectDoesNotExist:
        return default
def get_tracking_js():
    """Return the stored tracking JS.

    The result is the content of ``cnil_compliant_visits_tracking_js``
    followed by the content of ``visits_tracking_js``; a missing variable
    contributes an empty string.
    """
    compliant_js = get_variable_value('cnil_compliant_visits_tracking_js')
    other_js = get_variable_value('visits_tracking_js')
    return compliant_js + other_js
def put_tracking_js(tracking_js):
    """Store *tracking_js* in exactly one of the two tracking variables.

    Code whose CNIL level is not ``'bad'`` goes to
    ``cnil_compliant_visits_tracking_js``, anything else to
    ``visits_tracking_js``; the other variable is deleted.  An empty string
    deletes both variables.
    """
    compliant_var = get_variable('cnil_compliant_visits_tracking_js')
    other_var = get_variable('visits_tracking_js')

    # nothing to store: drop both variables
    if tracking_js == '':
        compliant_var.delete()
        other_var.delete()
        return

    if compute_cnil_acknowledgment_level(tracking_js) == 'bad':
        compliant_var.delete()
        other_var.value = tracking_js
        other_var.save()
    else:
        compliant_var.value = tracking_js
        compliant_var.save()
        other_var.delete()
def get_tenant_name_and_public_urls():
    """Return ``(tenant_name, site_urls)`` for the Matomo configuration.

    ``tenant_name`` is the network location of the first non-secondary Combo
    whose template name contains ``portal-user`` (``None`` when there is no
    such service).  ``site_urls`` lists the non-empty base URLs of those
    portals followed by the non-secondary Wcs and Fargo services.
    """
    portals = [
        combo
        for combo in Combo.objects.all()
        if 'portal-user' in combo.template_name and not combo.secondary
    ]

    tenant_name = None
    # NOTE(review): `portals[0] != ''` compares a service object with a string,
    # so it presumably always holds; it may have been meant to test base_url.
    # Kept as is -- confirm the intent before changing it.
    if portals != [] and portals[0] != '':
        tenant_name = urllib.parse.urlparse(portals[0].base_url).netloc

    monitored = portals + [wcs for wcs in Wcs.objects.all() if not wcs.secondary]
    monitored.extend(fargo for fargo in Fargo.objects.all() if not fargo.secondary)

    site_urls = [service.base_url for service in monitored if service.base_url != '']
    return tenant_name, site_urls
class MatomoException(Exception):
    """Unexpected failure while talking to Matomo.

    Raised in this module for situations the code does not know how to
    recover from: an unexpected HTTP status code, a response that cannot be
    parsed, a missing result node or an error without a message.
    """
class MatomoError(MatomoException):
    """Expected, identifiable Matomo error.

    Raised in this module when Matomo answers with an error message, when a
    site URL is not found, when the ping is not successful or when the
    Matomo settings are incomplete.  Being a ``MatomoException`` subclass, it
    is also caught by handlers written for the base class.
    """
class MatomoWS:
    """Client for the Matomo web services used by hobo.

    The endpoint URL, the authentication token and the e-mail template used
    for created accounts are read from ``settings.MATOMO_SERVER`` (keys
    ``URL``, ``TOKEN_AUTH`` and ``EMAIL_TEMPLATE``).
    """

    def __init__(self):
        """Load the connection settings.

        Raises:
            MatomoError: a required key is missing from the settings.
        """
        config = settings.MATOMO_SERVER
        try:
            self.url_ws_base = config['URL']
            self.token_auth = config['TOKEN_AUTH']
            self.email_template = config['EMAIL_TEMPLATE']
        except KeyError as exc:
            raise MatomoError('no settings for matomo: %s' % str(exc)) from exc
        # a positional "%s" placeholder is rewritten to the named form
        # that add_user() fills in
        if '%s' in self.email_template:
            self.email_template = self.email_template.replace('%s', '%(user_login)s')

    @staticmethod
    def parse_response(content):
        """Parse an XML response body and return its root element.

        Raises:
            MatomoException: *content* is not well-formed XML.
        """
        try:
            return etree.fromstring(content)
        except etree.XMLSyntaxError as exc:
            raise MatomoException('etree.XMLSyntaxError: %s' % str(exc)) from exc

    @staticmethod
    def raise_on_error(tree):
        """Raise if *tree* holds a ``/result/error`` node, else return None.

        Raises:
            MatomoError: the first attribute of the error node is
                ``message``; its value becomes the exception text.
            MatomoException: the error node carries no usable message.
        """
        errors = tree.xpath('/result/error')
        if not errors:
            return
        attributes = errors[0].items()
        if attributes:
            name, value = attributes[0]
            if name == 'message':
                raise MatomoError(value)
        raise MatomoException('internal error')

    @staticmethod
    def assert_success(tree, message='matomo'):
        """Check a generic "ok" response and return True.

        When a ``/result/success`` node is present, its first attribute must
        be ``message="ok"``.  A response without any such node is accepted.

        Raises:
            MatomoException: the success node is not the expected one; the
                text is *message* followed by ``' fails'``.
        """
        successes = tree.xpath('/result/success')
        if successes:
            attributes = successes[0].items()
            if not attributes:
                raise MatomoException(message + ' fails')
            name, value = attributes[0]
            if name != 'message' or value != 'ok':
                raise MatomoException(message + ' fails')
        return True

    @staticmethod
    def _result_text(tree, operation):
        """Return the text of the ``/result`` node or fail on behalf of *operation*."""
        results = tree.xpath('/result')
        if not results:
            raise MatomoException(operation + ' fails')
        return results[0].text

    def call(self, data):
        """POST an API request and return the parsed XML answer.

        *data* holds the method name and its parameters; the ``module``,
        ``token_auth`` and ``language`` parameters are added on a copy, so
        the caller's dict is left untouched.

        Raises:
            MatomoException: unexpected HTTP status or unparsable answer.
            MatomoError: Matomo answered with an error message.
        """
        payload = dict(data)
        payload['module'] = 'API'
        payload['token_auth'] = self.token_auth
        payload['language'] = 'en'
        resp = requests.post(self.url_ws_base, data=payload, timeout=30)
        if resp.status_code != 200:
            raise MatomoException('unexpected status code: %s' % resp.status_code)
        tree = self.parse_response(resp.content)
        self.raise_on_error(tree)
        return tree

    def get_site_id_from_site_url(self, url):
        """Return the id (as text) of the site registered for *url*.

        Raises:
            MatomoError: ``'url not found'`` when the result is empty.
            MatomoException: the answer holds no ``idsite`` node.
        """
        data = {'method': 'SitesManager.getSitesIdFromSiteUrl', 'url': url}
        tree = self.call(data)
        # a <result> without children and without text means "no such site"
        childless_results = tree.xpath('/result[not(*)]')
        if childless_results and childless_results[0].text is None:
            raise MatomoError('url not found')
        site_ids = tree.xpath('/result/row/idsite')
        if not site_ids:
            raise MatomoException('get_site_id_from_site_url fails')
        return site_ids[0].text

    def add_site(self, site_name):
        """Create a site named *site_name* and return the result text."""
        data = {'method': 'SitesManager.addSite', 'siteName': site_name}
        return self._result_text(self.call(data), 'add_site')

    def add_site_alias_urls(self, id_site, site_urls):
        """Register *site_urls* as alias URLs of *id_site*; return the result text."""
        data = {'method': 'SitesManager.addSiteAliasUrls', 'idSite': id_site}
        # the URLs are sent as the indexed parameters urls[0], urls[1], ...
        for index, url in enumerate(site_urls):
            data['urls[%i]' % index] = url
        return self._result_text(self.call(data), 'add_site_alias_urls')

    def add_user(self, user_login, password, initial_id_site):
        """Create a user whose e-mail is derived from the e-mail template.

        Returns True; raises MatomoException when Matomo does not confirm.
        """
        data = {
            'method': 'UsersManager.addUser',
            'userLogin': user_login,
            'password': password,
            'email': self.email_template % {'user_login': user_login},
            'initialIdSite': initial_id_site,
        }
        return self.assert_success(self.call(data), 'add_user')

    def del_user(self, user_login):
        """Delete *user_login*.

        Returns True; raises MatomoException when Matomo does not confirm.
        """
        data = {'method': 'UsersManager.deleteUser', 'userLogin': user_login}
        return self.assert_success(self.call(data), 'del_user')

    def get_javascript_tag(self, id_site):
        """Return the tracking snippet Matomo generates for *id_site*."""
        data = {'method': 'SitesManager.getJavascriptTag', 'idSite': id_site}
        return self._result_text(self.call(data), 'get_javascript_tag')

    def create_fake_first_tracking_visit(self, id_site):
        """Record a "ping" visit for *id_site* and return True.

        This does not go through call(): the request is posted as JSON to
        ``<URL>/matomo.php`` and the answer is expected to be a JSON object
        whose ``status`` is ``'success'``.

        Raises:
            MatomoException: unexpected HTTP status or malformed answer.
            MatomoError: the status is present but is not ``'success'``.
        """
        url = '%s/matomo.php' % self.url_ws_base
        data = {'requests': ['?idsite=%s&action_name=ping&rec=1' % id_site]}
        resp = requests.post(url, json=data, timeout=30)
        if resp.status_code != 200:
            raise MatomoException('unexpected status code: %s' % resp.status_code)
        try:
            tree = resp.json()
        except ValueError as exc:
            raise MatomoException(
                'internal error on ping (JSON expected): %s' % resp.content
            ) from exc
        if not isinstance(tree, dict):
            raise MatomoException('internal error on ping (dict expected): %s' % resp.content)
        if 'status' not in tree:
            raise MatomoException('internal error on ping (status expected): %s' % resp.content)
        if tree['status'] != 'success':
            raise MatomoError('ping fails: %s' % resp.content)
        return True
def upgrade_site(matomo, tenant_name, site_urls):
    """Make sure a Matomo site exists for the tenant and return its id.

    The site is looked up by *tenant_name* (which matches because it is the
    host name of one of the registered URLs) and created when Matomo reports
    ``'url not found'``; any other ``MatomoError`` is propagated.
    *site_urls* are then registered as alias URLs of the site.
    """
    try:
        id_site = matomo.get_site_id_from_site_url(tenant_name)
    except MatomoError as exc:
        if str(exc) != 'url not found':
            raise exc
        # unknown tenant: register it
        id_site = matomo.add_site(tenant_name)

    matomo.add_site_alias_urls(id_site, site_urls)
    return id_site
def upgrade_user(matomo, user_login, id_site):
    """(Re)create the Matomo user *user_login* and return its auto-login URL.

    Changing a password through the API requires the previous one, so the
    user is deleted (best effort) and created again with a freshly generated
    password of 8 to 16 characters.

    Args:
        matomo: object providing ``del_user``, ``add_user`` and ``url_ws_base``.
        user_login: login of the account to (re)create.
        id_site: id of the site the new account is initially attached to.

    Returns:
        The ``logme`` URL carrying the login and the MD5 hex digest of the
        new password.

    Raises:
        MatomoException: the user cannot be created.
    """
    # deletion is expected to fail when the user does not exist yet
    try:
        matomo.del_user(user_login)
    except MatomoError:
        pass

    # the password is a credential: draw it from the OS CSPRNG (secrets),
    # not from the predictable `random` generator
    characters = string.ascii_letters + string.punctuation + string.digits
    length = 8 + secrets.randbelow(9)  # 8..16 inclusive
    password = ''.join(secrets.choice(characters) for _ in range(length))
    matomo.add_user(user_login, password, id_site)

    # build the user's login url
    # NOTE(review): the MD5 digest is presumably what Matomo's logme action
    # expects -- confirm against the Matomo documentation
    password_md5 = hashlib.md5(force_bytes(password)).hexdigest()
    return '%s/index.php?module=Login&action=logme&login=%s&password=%s' % (
        matomo.url_ws_base,
        user_login,
        password_md5,
    )
def upgrade_javascript_tag(matomo, id_site):
    """Adapt the tracking snippet returned by Matomo for inclusion as plain JS.

    The snippet of *id_site* is fetched with ``matomo.get_javascript_tag``
    and transformed line by line:

    * lines starting with a ``<script ...>`` or ``</script>`` tag are dropped
      (according to publik-base-theme/templates/includes/tracking.html the
      stored code must not contain them);
    * HTML comments are turned into ``//`` JS comments;
    * ``CNIL_JS`` is inserted right after the first ``var _paq = ...``
      declaration, to disallow the extension of the cookie's lifetime.

    Returns:
        The adapted JavaScript code, lines joined with ``\\n``.
    """
    matomo_tag = matomo.get_javascript_tag(id_site)

    script_tag = re.compile('</?script.*>')
    html_comment = re.compile('<!-- (.*) -->')
    lines = [
        html_comment.sub('// \\1', line)
        for line in matomo_tag.split('\n')
        if not script_tag.match(line)
    ]

    # accept both `var _paq = window._paq || [];` and the
    # `var _paq = window._paq = window._paq || [];` form, so that the CNIL
    # code is not silently left out for the latter
    paq_declaration = re.compile(r'\s*var _paq = (?:window._paq = )?window._paq \|\| \[\];')
    for index, line in enumerate(lines):
        if paq_declaration.match(line):
            lines.insert(index + 1, CNIL_JS)
            break

    return '\n'.join(lines)
def compute_cnil_acknowledgment_level(tracking_js):
    """Rate *tracking_js* as ``'bad'``, ``'excellent'`` or ``'good'``.

    * ``'bad'``: the code mentions one of the listed third-party trackers;
    * ``'excellent'``: piwik/matomo code that also contains the prevention of
      the cookie's lifetime extension;
    * ``'good'``: anything else.
    """
    bad_tracker_words = ('google', 'hotjar', 'ATInternet.Tracker.Tag')
    if any(word in tracking_js for word in bad_tracker_words):
        return 'bad'

    prevents_lifetime_extension = (
        'paq.push' in tracking_js and 'getOriginalVisitorCookieTimeout' in tracking_js
    )
    return 'excellent' if prevents_lifetime_extension else 'good'
def auto_configure_matomo(matomo):
    """Configure Matomo for the current tenant and store the result in hobo.

    The Matomo site, its user (named after the tenant) and the tracking code
    are brought up to date; the auto-login URL is then saved in the
    ``matomo_logme_url`` variable and the tracking code is stored through
    ``put_tracking_js``.

    Returns:
        The id of the Matomo site.

    Raises:
        MatomoException: no portal-user URL is available to name the tenant.
    """
    tenant_name, site_urls = get_tenant_name_and_public_urls()
    if tenant_name is None:
        raise MatomoException("no portal-user's url available")

    # Matomo side: site, user, tracking code
    id_site = upgrade_site(matomo, tenant_name, site_urls)
    logme_url = upgrade_user(matomo, tenant_name, id_site)
    tracking_js = upgrade_javascript_tag(matomo, id_site)

    # hobo side: keep what Matomo gave us
    url_variable = get_variable('matomo_logme_url')
    url_variable.value = logme_url
    url_variable.save()
    put_tracking_js(tracking_js)

    return id_site