hobo/hobo/matomo/utils.py

328 lines
11 KiB
Python

# hobo - portal to configure and deploy applications
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import hashlib
import re
import requests
import string
from lxml import etree
from random import choice, randint
from django.db import connection
from django.conf import settings
from django.core import exceptions
from django.utils.six.moves.urllib import parse as urlparse
from hobo.environment.models import Variable, Wcs, Combo, Fargo
CNIL_JS = """
// disallow cookie's time extension
_paq.push([function() {
var self = this;
function getOriginalVisitorCookieTimeout() {
var now = new Date(),
nowTs = Math.round(now.getTime() / 1000),
visitorInfo = self.getVisitorInfo();
var createTs = parseInt(visitorInfo[2]);
var cookieTimeout = 33696000; // 13 months in seconds
var originalTimeout = createTs + cookieTimeout - nowTs;
return originalTimeout;
}
this.setVisitorCookieTimeout( getOriginalVisitorCookieTimeout() );
}]);
"""
def get_variable(name, default=''):
"""get hobo variables from DB
set it to '' into DB if not already created
"""
variable, created = Variable.objects.get_or_create(
name=name,
defaults={'auto': True, 'value': default})
return variable
def get_variable_value(name, default=''):
"""get hobo variables's value from DB"""
try:
value = Variable.objects.get(name=name).value
except exceptions.ObjectDoesNotExist:
value = default
return value
def get_tracking_js():
"""merge JS code from the 2 above variables"""
tracking_js = get_variable_value('cnil_compliant_visits_tracking_js')
tracking_js += get_variable_value('visits_tracking_js')
return tracking_js
def put_tracking_js(tracking_js):
"""store JS code into only one of the 2 above variables"""
variable1 = get_variable('cnil_compliant_visits_tracking_js')
variable2 = get_variable('visits_tracking_js')
if tracking_js != '':
if compute_cnil_acknowledgment_level(tracking_js) != 'error':
variable1.value = tracking_js
variable1.save()
variable2.delete()
else:
variable1.delete()
variable2.value = tracking_js
variable2.save()
else:
variable1.delete()
variable2.delete()
def get_tenant_name_and_public_urls():
"""get an alias for our matomo's id and urls to monitor"""
tenant_name = None
services = [x for x in Combo.objects.all() if 'portal-user' in x.template_name]
if services != [] and services[0] != '':
tenant_name = urlparse.urlparse(services[0].base_url).netloc
services += [x for x in Wcs.objects.all()]
services += [x for x in Fargo.objects.all()]
site_urls = [x.base_url for x in services if x.base_url != '']
return tenant_name, site_urls
class MatomoException(Exception):
"""unexpected Matomo internal error"""
class MatomoError(MatomoException):
"""expected Matomo error responses"""
class MatomoWS(object):
"""api for matomo webservices"""
def __init__(self):
config = settings.MATOMO_SERVER
try:
self.url_ws_base = config['URL']
self.token_auth = config['TOKEN_AUTH']
self.email_template = config['EMAIL_TEMPLATE']
except KeyError as exc:
raise MatomoError('no settings for matomo: %s' % str(exc))
@staticmethod
def parse_response(content):
try:
tree = etree.fromstring(content)
except etree.XMLSyntaxError as exc:
raise MatomoException('etree.XMLSyntaxError: %s' % str(exc))
return tree
@staticmethod
def raise_on_error(tree):
"""handle matomo XML error messages"""
tags = tree.xpath('/result/error')
if tags != []:
try:
attr = tags[0].items()[0]
if attr[0] == 'message':
raise MatomoError(attr[1])
except IndexError:
pass
raise MatomoException('internal error')
@staticmethod
def assert_success(tree, message='matomo'):
"""handle generic 'ok' responses"""
success = True
tags = tree.xpath('/result/success')
if tags != []:
try:
attr = tags[0].items()[0]
if attr[0] != 'message' or attr[1] != 'ok':
success = False
except IndexError:
success = False
if not success:
raise MatomoException(message + ' fails')
return success
def call(self, data):
data['module'] = 'API'
data['token_auth'] = self.token_auth
data['language'] = 'en'
resp = requests.post(self.url_ws_base, data=data)
if resp.status_code != 200:
raise MatomoException('unexpected status code: %s' % resp.status_code)
tree = self.parse_response(resp.content)
self.raise_on_error(tree)
return tree
def get_site_id_from_site_url(self, url):
data = {'method': 'SitesManager.getSitesIdFromSiteUrl',
'url': url}
tree = self.call(data)
try:
if tree.xpath('/result[not(*)]')[0].text is None:
raise MatomoError('url not found')
except IndexError:
pass
try:
tag = tree.xpath('/result/row/idsite')[0]
except IndexError:
raise MatomoException('get_site_id_from_site_url fails')
return tag.text
def add_site(self, site_name, site_urls):
data = {'method': 'SitesManager.addSite',
'siteName': site_name}
cpt = 0
for url in site_urls:
key = 'urls[%i]' % cpt
data[key] = url
cpt += 1
tree = self.call(data)
try:
tag = tree.xpath('/result')[0]
except IndexError:
raise MatomoException('add_site fails')
return tag.text
def add_user(self, user_login, password, initial_id_site):
data = {'method': 'UsersManager.addUser',
'userLogin': user_login,
'password': password,
'email': self.email_template % user_login,
'initialIdSite': initial_id_site}
tree = self.call(data)
return self.assert_success(tree, 'add_user')
def del_user(self, user_login):
data = {'method': 'UsersManager.deleteUser',
'userLogin': user_login}
tree = self.call(data)
return self.assert_success(tree, 'del_user')
def get_javascript_tag(self, id_site):
data = {'method': 'SitesManager.getJavascriptTag',
'idSite': id_site}
tree = self.call(data)
try:
tag = tree.xpath('/result')[0]
except IndexError:
raise MatomoException('get_javascript_tag fails')
return tag.text
def create_fake_first_tracking_visit(self, id_site):
"""this function use a different matomo's webservice API"""
url = "%s/matomo.php" % self.url_ws_base
data = {'requests': ['?idsite=%s&action_name=ping&rec=1' % id_site]}
resp = requests.post(url, json=data)
if resp.status_code != 200:
raise MatomoException('unexpected status code: %s' % resp.status_code)
try:
tree = resp.json()
except ValueError:
raise MatomoException(
'internal error on ping (JSON expected): %s' % resp.content)
if not isinstance(tree, dict):
raise MatomoException(
'internal error on ping (dict expected): %s' % resp.content)
if 'status' not in tree:
raise MatomoException(
'internal error on ping (status expected): %s' % resp.content)
if tree['status'] != 'success':
raise MatomoError('ping fails: %s' % resp.content)
return True
def upgrade_site(matomo, tenant_name, site_urls):
try:
# tenant name match because it is the basename of one of registered urls
id_site = matomo.get_site_id_from_site_url(tenant_name)
except MatomoError as exc:
if str(exc) == 'url not found':
id_site = matomo.add_site(tenant_name, site_urls)
else:
raise exc
return id_site
def upgrade_user(matomo, user_login, id_site):
# API is not obvious to change password (need the previous one)
try:
matomo.del_user(user_login)
except MatomoError:
pass
# generate a password and add a new user
characters = string.ascii_letters + string.punctuation + string.digits
password = "".join(choice(characters) for x in range(randint(8, 16)))
matomo.add_user(user_login, password, id_site)
# build the user's login url
password_md5 = hashlib.md5(password).hexdigest()
logme_url = '%s/index.php?module=Login&action=logme&login=%s&password=%s' % (
matomo.url_ws_base, user_login, password_md5)
return logme_url
def upgrade_javascript_tag(matomo, id_site):
"""addapt JS return by Matomo and merge it whith previous JS code we have"""
matomo_tag = matomo.get_javascript_tag(id_site)
lines = matomo_tag.split('\n')
# acording to publik-base-theme/templates/includes/tracking.html,
# we need to remove <script> tags from matomo's output javascript,
regex = re.compile('</?script.*>')
count = len(lines)
while count > 0:
count -= 1
if regex.match(lines[count]):
del lines[count]
# and we also need to addapt matomo HTML comments to JS
regex = re.compile('<!-- (.*) -->')
for count, line in enumerate(lines):
lines[count] = regex.sub('// \\1', line)
# disallow cookie's time extension
regex = re.compile(r'\s*var _paq = window._paq \|\| \[\];')
for count, line in enumerate(lines):
if regex.match(line):
lines.insert(count+1, CNIL_JS)
break
enhanced_tag = '\n'.join(lines)
return enhanced_tag
def compute_cnil_acknowledgment_level(tracking_js):
if tracking_js.find('google') != -1:
# google reference found into javascript
return 'error'
if tracking_js.find('getOriginalVisitorCookieTimeout') == -1:
# can't find cookie's life time extension prevention
return 'warning'
return 'success'
def auto_configure_matomo(matomo):
"""main function"""
tenant_name, site_urls = get_tenant_name_and_public_urls()
if tenant_name is None:
raise MatomoException("no portal-user's url available")
# update matomo account
id_site = upgrade_site(matomo, tenant_name, site_urls)
logme_url = upgrade_user(matomo, tenant_name, id_site)
tracking_js = upgrade_javascript_tag(matomo, id_site)
# save matomo's variables
logme_url_var = get_variable('matomo_logme_url')
logme_url_var.value = logme_url
logme_url_var.save()
put_tracking_js(tracking_js)
return id_site