353 lines
12 KiB
Python
353 lines
12 KiB
Python
# hobo - portal to configure and deploy applications
|
|
# Copyright (C) 2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import hashlib
|
|
import re
|
|
import string
|
|
import urllib.parse
|
|
from random import choice, randint
|
|
|
|
import requests
|
|
from django.conf import settings
|
|
from django.core import exceptions
|
|
from django.db import connection
|
|
from django.utils.encoding import force_bytes
|
|
from lxml import etree
|
|
|
|
from hobo.environment.models import Combo, Fargo, Variable, Wcs
|
|
|
|
CNIL_JS = """
|
|
// disallow cookie's time extension
|
|
_paq.push([function() {
|
|
var self = this;
|
|
function getOriginalVisitorCookieTimeout() {
|
|
var now = new Date(),
|
|
nowTs = Math.round(now.getTime() / 1000),
|
|
visitorInfo = self.getVisitorInfo();
|
|
var createTs = parseInt(visitorInfo[2]);
|
|
var cookieTimeout = 33696000; // 13 months in seconds
|
|
var originalTimeout = createTs + cookieTimeout - nowTs;
|
|
return originalTimeout;
|
|
}
|
|
this.setVisitorCookieTimeout( getOriginalVisitorCookieTimeout() );
|
|
}]);
|
|
"""
|
|
|
|
|
|
def get_variable(name, default=''):
|
|
"""get hobo variables from DB
|
|
set it to '' into DB if not already created
|
|
"""
|
|
variable, created = Variable.objects.get_or_create(name=name, defaults={'auto': True, 'value': default})
|
|
return variable
|
|
|
|
|
|
def get_variable_value(name, default=''):
|
|
"""get hobo variables's value from DB"""
|
|
try:
|
|
value = Variable.objects.get(name=name).value
|
|
except exceptions.ObjectDoesNotExist:
|
|
value = default
|
|
return value
|
|
|
|
|
|
def get_tracking_js():
|
|
"""merge JS code from the 2 above variables"""
|
|
tracking_js = get_variable_value('cnil_compliant_visits_tracking_js')
|
|
tracking_js += get_variable_value('visits_tracking_js')
|
|
return tracking_js
|
|
|
|
|
|
def put_tracking_js(tracking_js):
|
|
"""store JS code into only one of the 2 above variables"""
|
|
variable1 = get_variable('cnil_compliant_visits_tracking_js')
|
|
variable2 = get_variable('visits_tracking_js')
|
|
if tracking_js != '':
|
|
if compute_cnil_acknowledgment_level(tracking_js) != 'bad':
|
|
variable1.value = tracking_js
|
|
variable1.save()
|
|
variable2.delete()
|
|
else:
|
|
variable1.delete()
|
|
variable2.value = tracking_js
|
|
variable2.save()
|
|
else:
|
|
variable1.delete()
|
|
variable2.delete()
|
|
|
|
|
|
def get_tenant_name_and_public_urls():
|
|
"""get an alias for our matomo's id and urls to monitor"""
|
|
tenant_name = None
|
|
services = [x for x in Combo.objects.all() if 'portal-user' in x.template_name and not x.secondary]
|
|
if services != [] and services[0] != '':
|
|
tenant_name = urllib.parse.urlparse(services[0].base_url).netloc
|
|
services += [x for x in Wcs.objects.all() if not x.secondary]
|
|
services += [x for x in Fargo.objects.all() if not x.secondary]
|
|
site_urls = [x.base_url for x in services if x.base_url != '']
|
|
return tenant_name, site_urls
|
|
|
|
|
|
class MatomoException(Exception):
|
|
"""unexpected Matomo internal error"""
|
|
|
|
|
|
class MatomoError(MatomoException):
|
|
"""expected Matomo error responses"""
|
|
|
|
|
|
class MatomoWS(object):
|
|
"""api for matomo webservices"""
|
|
|
|
def __init__(self):
|
|
config = settings.MATOMO_SERVER
|
|
try:
|
|
self.url_ws_base = config['URL']
|
|
self.token_auth = config['TOKEN_AUTH']
|
|
self.email_template = config['EMAIL_TEMPLATE']
|
|
if '%s' in self.email_template:
|
|
self.email_template = self.email_template.replace('%s', '%(user_login)s')
|
|
except KeyError as exc:
|
|
raise MatomoError('no settings for matomo: %s' % str(exc))
|
|
|
|
@staticmethod
|
|
def parse_response(content):
|
|
try:
|
|
tree = etree.fromstring(content)
|
|
except etree.XMLSyntaxError as exc:
|
|
raise MatomoException('etree.XMLSyntaxError: %s' % str(exc))
|
|
return tree
|
|
|
|
@staticmethod
|
|
def raise_on_error(tree):
|
|
"""handle matomo XML error messages"""
|
|
tags = tree.xpath('/result/error')
|
|
if tags != []:
|
|
try:
|
|
attr = tags[0].items()[0]
|
|
if attr[0] == 'message':
|
|
raise MatomoError(attr[1])
|
|
except IndexError:
|
|
pass
|
|
raise MatomoException('internal error')
|
|
|
|
@staticmethod
|
|
def assert_success(tree, message='matomo'):
|
|
"""handle generic 'ok' responses"""
|
|
success = True
|
|
tags = tree.xpath('/result/success')
|
|
if tags != []:
|
|
try:
|
|
attr = tags[0].items()[0]
|
|
if attr[0] != 'message' or attr[1] != 'ok':
|
|
success = False
|
|
except IndexError:
|
|
success = False
|
|
if not success:
|
|
raise MatomoException(message + ' fails')
|
|
return success
|
|
|
|
def call(self, data):
|
|
data['module'] = 'API'
|
|
data['token_auth'] = self.token_auth
|
|
data['language'] = 'en'
|
|
resp = requests.post(self.url_ws_base, data=data)
|
|
if resp.status_code != 200:
|
|
raise MatomoException('unexpected status code: %s' % resp.status_code)
|
|
tree = self.parse_response(resp.content)
|
|
self.raise_on_error(tree)
|
|
return tree
|
|
|
|
def get_site_id_from_site_url(self, url):
|
|
data = {'method': 'SitesManager.getSitesIdFromSiteUrl', 'url': url}
|
|
tree = self.call(data)
|
|
try:
|
|
if tree.xpath('/result[not(*)]')[0].text is None:
|
|
raise MatomoError('url not found')
|
|
except IndexError:
|
|
pass
|
|
try:
|
|
tag = tree.xpath('/result/row/idsite')[0]
|
|
except IndexError:
|
|
raise MatomoException('get_site_id_from_site_url fails')
|
|
return tag.text
|
|
|
|
def add_site(self, site_name):
|
|
data = {'method': 'SitesManager.addSite', 'siteName': site_name}
|
|
tree = self.call(data)
|
|
try:
|
|
tag = tree.xpath('/result')[0]
|
|
except IndexError:
|
|
raise MatomoException('add_site fails')
|
|
return tag.text
|
|
|
|
def add_site_alias_urls(self, id_site, site_urls):
|
|
data = {'method': 'SitesManager.addSiteAliasUrls', 'idSite': id_site}
|
|
cpt = 0
|
|
for url in site_urls:
|
|
key = 'urls[%i]' % cpt
|
|
data[key] = url
|
|
cpt += 1
|
|
tree = self.call(data)
|
|
try:
|
|
tag = tree.xpath('/result')[0]
|
|
except IndexError:
|
|
raise MatomoException('add_site_alias_urls fails')
|
|
return tag.text
|
|
|
|
def add_user(self, user_login, password, initial_id_site):
|
|
data = {
|
|
'method': 'UsersManager.addUser',
|
|
'userLogin': user_login,
|
|
'password': password,
|
|
'email': self.email_template % {'user_login': user_login},
|
|
'initialIdSite': initial_id_site,
|
|
}
|
|
tree = self.call(data)
|
|
return self.assert_success(tree, 'add_user')
|
|
|
|
def del_user(self, user_login):
|
|
data = {'method': 'UsersManager.deleteUser', 'userLogin': user_login}
|
|
tree = self.call(data)
|
|
return self.assert_success(tree, 'del_user')
|
|
|
|
def get_javascript_tag(self, id_site):
|
|
data = {'method': 'SitesManager.getJavascriptTag', 'idSite': id_site}
|
|
tree = self.call(data)
|
|
try:
|
|
tag = tree.xpath('/result')[0]
|
|
except IndexError:
|
|
raise MatomoException('get_javascript_tag fails')
|
|
return tag.text
|
|
|
|
def create_fake_first_tracking_visit(self, id_site):
|
|
"""this function use a different matomo's webservice API"""
|
|
url = "%s/matomo.php" % self.url_ws_base
|
|
data = {'requests': ['?idsite=%s&action_name=ping&rec=1' % id_site]}
|
|
resp = requests.post(url, json=data)
|
|
if resp.status_code != 200:
|
|
raise MatomoException('unexpected status code: %s' % resp.status_code)
|
|
try:
|
|
tree = resp.json()
|
|
except ValueError:
|
|
raise MatomoException('internal error on ping (JSON expected): %s' % resp.content)
|
|
if not isinstance(tree, dict):
|
|
raise MatomoException('internal error on ping (dict expected): %s' % resp.content)
|
|
if 'status' not in tree:
|
|
raise MatomoException('internal error on ping (status expected): %s' % resp.content)
|
|
if tree['status'] != 'success':
|
|
raise MatomoError('ping fails: %s' % resp.content)
|
|
return True
|
|
|
|
|
|
def upgrade_site(matomo, tenant_name, site_urls):
|
|
try:
|
|
# tenant name match because it is the basename of one of registered urls
|
|
id_site = matomo.get_site_id_from_site_url(tenant_name)
|
|
except MatomoError as exc:
|
|
if str(exc) == 'url not found':
|
|
id_site = matomo.add_site(tenant_name)
|
|
else:
|
|
raise exc
|
|
matomo.add_site_alias_urls(id_site, site_urls)
|
|
return id_site
|
|
|
|
|
|
def upgrade_user(matomo, user_login, id_site):
|
|
# API is not obvious to change password (need the previous one)
|
|
try:
|
|
matomo.del_user(user_login)
|
|
except MatomoError:
|
|
pass
|
|
|
|
# generate a password and add a new user
|
|
characters = string.ascii_letters + string.punctuation + string.digits
|
|
password = "".join(choice(characters) for x in range(randint(8, 16)))
|
|
matomo.add_user(user_login, password, id_site)
|
|
|
|
# build the user's login url
|
|
password_md5 = hashlib.md5(force_bytes(password)).hexdigest()
|
|
logme_url = '%s/index.php?module=Login&action=logme&login=%s&password=%s' % (
|
|
matomo.url_ws_base,
|
|
user_login,
|
|
password_md5,
|
|
)
|
|
return logme_url
|
|
|
|
|
|
def upgrade_javascript_tag(matomo, id_site):
|
|
"""addapt JS return by Matomo and merge it whith previous JS code we have"""
|
|
matomo_tag = matomo.get_javascript_tag(id_site)
|
|
lines = matomo_tag.split('\n')
|
|
|
|
# acording to publik-base-theme/templates/includes/tracking.html,
|
|
# we need to remove <script> tags from matomo's output javascript,
|
|
regex = re.compile('</?script.*>')
|
|
count = len(lines)
|
|
while count > 0:
|
|
count -= 1
|
|
if regex.match(lines[count]):
|
|
del lines[count]
|
|
|
|
# and we also need to addapt matomo HTML comments to JS
|
|
regex = re.compile('<!-- (.*) -->')
|
|
for count, line in enumerate(lines):
|
|
lines[count] = regex.sub('// \\1', line)
|
|
|
|
# disallow cookie's time extension
|
|
regex = re.compile(r'\s*var _paq = window._paq \|\| \[\];')
|
|
for count, line in enumerate(lines):
|
|
if regex.match(line):
|
|
lines.insert(count + 1, CNIL_JS)
|
|
break
|
|
|
|
enhanced_tag = '\n'.join(lines)
|
|
return enhanced_tag
|
|
|
|
|
|
def compute_cnil_acknowledgment_level(tracking_js):
|
|
bad_tracker_words = [
|
|
'google',
|
|
'hotjar',
|
|
'ATInternet.Tracker.Tag',
|
|
]
|
|
for word in bad_tracker_words:
|
|
if word in tracking_js:
|
|
return 'bad'
|
|
if 'paq.push' in tracking_js and 'getOriginalVisitorCookieTimeout' in tracking_js:
|
|
# piwik/matomo and life time extension prevention
|
|
return 'excellent'
|
|
return 'good'
|
|
|
|
|
|
def auto_configure_matomo(matomo):
|
|
"""main function"""
|
|
tenant_name, site_urls = get_tenant_name_and_public_urls()
|
|
if tenant_name is None:
|
|
raise MatomoException("no portal-user's url available")
|
|
|
|
# update matomo account
|
|
id_site = upgrade_site(matomo, tenant_name, site_urls)
|
|
logme_url = upgrade_user(matomo, tenant_name, id_site)
|
|
tracking_js = upgrade_javascript_tag(matomo, id_site)
|
|
|
|
# save matomo's variables
|
|
logme_url_var = get_variable('matomo_logme_url')
|
|
logme_url_var.value = logme_url
|
|
logme_url_var.save()
|
|
put_tracking_js(tracking_js)
|
|
return id_site
|