matomo: manage matomo's webservices (#31778)
This commit is contained in:
parent
95e60632ab
commit
71fdf9f7c5
|
@ -0,0 +1,302 @@
|
|||
# hobo - portal to configure and deploy applications
|
||||
# Copyright (C) 2019 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import hashlib
|
||||
import re
|
||||
import requests
|
||||
import string
|
||||
|
||||
from lxml import etree
|
||||
from random import choice, randint
|
||||
|
||||
from django.db import connection
|
||||
from django.conf import settings
|
||||
from django.core import exceptions
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from hobo.environment.models import Variable, Wcs, Combo, Fargo
|
||||
|
||||
CNIL_JS = """
|
||||
// disallow cookie's time extension
|
||||
_paq.push([function() {
|
||||
var self = this;
|
||||
function getOriginalVisitorCookieTimeout() {
|
||||
var now = new Date(),
|
||||
nowTs = Math.round(now.getTime() / 1000),
|
||||
visitorInfo = self.getVisitorInfo();
|
||||
var createTs = parseInt(visitorInfo[2]);
|
||||
var cookieTimeout = 33696000; // 13 months in seconds
|
||||
var originalTimeout = createTs + cookieTimeout - nowTs;
|
||||
return originalTimeout;
|
||||
}
|
||||
this.setVisitorCookieTimeout( getOriginalVisitorCookieTimeout() );
|
||||
}]);
|
||||
"""
|
||||
|
||||
def get_variable(name, default=''):
|
||||
"""get hobo variables from DB
|
||||
set it to '' into DB if not already created
|
||||
"""
|
||||
variable, created = Variable.objects.get_or_create(
|
||||
name=name,
|
||||
defaults={'auto': True, 'value': default})
|
||||
return variable
|
||||
|
||||
def get_variable_value(name, default=''):
|
||||
"""get hobo variables's value from DB"""
|
||||
try:
|
||||
value = Variable.objects.get(name=name).value
|
||||
except exceptions.ObjectDoesNotExist:
|
||||
value = default
|
||||
return value
|
||||
|
||||
def get_tracking_js():
|
||||
"""merge JS code from the 2 above variables"""
|
||||
tracking_js = get_variable_value('cnil_compliant_visits_tracking_js')
|
||||
tracking_js += get_variable_value('visits_tracking_js')
|
||||
return tracking_js
|
||||
|
||||
def put_tracking_js(tracking_js):
|
||||
"""store JS code into only one of the 2 above variables"""
|
||||
variable1 = get_variable('cnil_compliant_visits_tracking_js')
|
||||
variable2 = get_variable('visits_tracking_js')
|
||||
if tracking_js != '':
|
||||
if compute_cnil_acknowledgment_level(tracking_js) != 'error':
|
||||
variable1.value = tracking_js
|
||||
variable1.save()
|
||||
variable2.delete()
|
||||
else:
|
||||
variable1.delete()
|
||||
variable2.value = tracking_js
|
||||
variable2.save()
|
||||
else:
|
||||
variable1.delete()
|
||||
variable2.delete()
|
||||
|
||||
def get_tenant_name_and_public_urls():
|
||||
"""get an alias for our matomo's id and urls to monitor"""
|
||||
tenant_name = None
|
||||
services = [x for x in Combo.objects.all() if x.template_name == 'portal-user']
|
||||
if services != [] and services[0] != '':
|
||||
tenant_name = urlparse.urlparse(services[0].base_url).netloc
|
||||
services += [x for x in Wcs.objects.all()]
|
||||
services += [x for x in Fargo.objects.all()]
|
||||
site_urls = [x.base_url for x in services if x.base_url != '']
|
||||
return tenant_name, site_urls
|
||||
|
||||
class MatomoException(Exception):
|
||||
"""unexpected Matomo internal error"""
|
||||
|
||||
class MatomoError(MatomoException):
|
||||
"""expected Matomo error responses"""
|
||||
|
||||
class MatomoWS(object):
|
||||
"""api for matomo webservices"""
|
||||
|
||||
def __init__(self):
|
||||
config = settings.MATOMO_SERVER
|
||||
try:
|
||||
self.url_ws_base = config['URL']
|
||||
self.token_auth = config['TOKEN_AUTH']
|
||||
self.email_template = config['EMAIL_TEMPLATE']
|
||||
except KeyError as exc:
|
||||
raise MatomoError('no settings for matomo: %s' % str(exc))
|
||||
|
||||
@staticmethod
|
||||
def parse_response(content):
|
||||
try:
|
||||
tree = etree.fromstring(content)
|
||||
except etree.XMLSyntaxError as exc:
|
||||
raise MatomoException('etree.XMLSyntaxError: %s' % str(exc))
|
||||
return tree
|
||||
|
||||
@staticmethod
|
||||
def raise_on_error(tree):
|
||||
"""handle matomo XML error messages"""
|
||||
tags = tree.xpath('/result/error')
|
||||
if tags != []:
|
||||
try:
|
||||
attr = tags[0].items()[0]
|
||||
if attr[0] == 'message':
|
||||
raise MatomoError(attr[1])
|
||||
except IndexError:
|
||||
pass
|
||||
raise MatomoException('internal error')
|
||||
|
||||
@staticmethod
|
||||
def assert_success(tree, message='matomo'):
|
||||
"""handle generic 'ok' responses"""
|
||||
success = True
|
||||
tags = tree.xpath('/result/success')
|
||||
if tags != []:
|
||||
try:
|
||||
attr = tags[0].items()[0]
|
||||
if attr[0] != 'message' or attr[1] != 'ok':
|
||||
success = False
|
||||
except IndexError:
|
||||
success = False
|
||||
if not success:
|
||||
raise MatomoException(message + ' fails')
|
||||
return success
|
||||
|
||||
def call(self, data):
|
||||
data['module'] = 'API'
|
||||
data['token_auth'] = self.token_auth
|
||||
resp = requests.post(self.url_ws_base, data=data)
|
||||
tree = self.parse_response(resp.content)
|
||||
self.raise_on_error(tree)
|
||||
return tree
|
||||
|
||||
def get_site_id_from_site_url(self, url):
|
||||
data = {'method': 'SitesManager.getSitesIdFromSiteUrl',
|
||||
'url': url}
|
||||
tree = self.call(data)
|
||||
try:
|
||||
if tree.xpath('/result[not(*)]')[0].text is None:
|
||||
raise MatomoError('url not found')
|
||||
except IndexError:
|
||||
pass
|
||||
try:
|
||||
tag = tree.xpath('/result/row/idsite')[0]
|
||||
except IndexError:
|
||||
raise MatomoException('get_site_id_from_site_url fails')
|
||||
return tag.text
|
||||
|
||||
def add_site(self, site_name, site_urls):
|
||||
data = {'method': 'SitesManager.addSite',
|
||||
'siteName': site_name}
|
||||
cpt = 0
|
||||
for url in site_urls:
|
||||
key = 'urls[%i]' % cpt
|
||||
data[key] = url
|
||||
cpt += 1
|
||||
tree = self.call(data)
|
||||
try:
|
||||
tag = tree.xpath('/result')[0]
|
||||
except IndexError:
|
||||
raise MatomoException('add_site fails')
|
||||
return tag.text
|
||||
|
||||
def add_user(self, user_login, password, initial_id_site):
|
||||
data = {'method': 'UsersManager.addUser',
|
||||
'userLogin': user_login,
|
||||
'password': password,
|
||||
'email': self.email_template % user_login,
|
||||
'initialIdSite': initial_id_site}
|
||||
tree = self.call(data)
|
||||
return self.assert_success(tree, 'add_user')
|
||||
|
||||
def del_user(self, user_login):
|
||||
data = {'method': 'UsersManager.deleteUser',
|
||||
'userLogin': user_login}
|
||||
tree = self.call(data)
|
||||
return self.assert_success(tree, 'del_user')
|
||||
|
||||
def get_javascript_tag(self, id_site):
|
||||
data = {'method': 'SitesManager.getJavascriptTag',
|
||||
'idSite': id_site}
|
||||
tree = self.call(data)
|
||||
try:
|
||||
tag = tree.xpath('/result')[0]
|
||||
except IndexError:
|
||||
raise MatomoException('get_javascript_tag fails')
|
||||
return tag.text
|
||||
|
||||
def upgrade_site(matomo, tenant_name, site_urls):
|
||||
try:
|
||||
# tenant name match because it is the basename of one of registered urls
|
||||
id_site = matomo.get_site_id_from_site_url(tenant_name)
|
||||
except MatomoError as exc:
|
||||
if str(exc) == 'url not found':
|
||||
id_site = matomo.add_site(tenant_name, site_urls)
|
||||
else:
|
||||
raise exc
|
||||
return id_site
|
||||
|
||||
def upgrade_user(matomo, user_login, id_site):
|
||||
# API is not obvious to change password (need the previous one)
|
||||
try:
|
||||
matomo.del_user(user_login)
|
||||
except MatomoError:
|
||||
pass
|
||||
|
||||
# generate a password and add a new user
|
||||
characters = string.ascii_letters + string.punctuation + string.digits
|
||||
password = "".join(choice(characters) for x in range(randint(8, 16)))
|
||||
matomo.add_user(user_login, password, id_site)
|
||||
|
||||
# build the user's login url
|
||||
password_md5 = hashlib.md5(password).hexdigest()
|
||||
logme_url = '%s/index.php?module=Login&action=logme&login=%s&password=%s' % (
|
||||
matomo.url_ws_base, user_login, password_md5)
|
||||
return logme_url
|
||||
|
||||
def upgrade_javascript_tag(matomo, id_site):
|
||||
"""addapt JS return by Matomo and merge it whith previous JS code we have"""
|
||||
matomo_tag = matomo.get_javascript_tag(id_site)
|
||||
lines = matomo_tag.split('\n')
|
||||
|
||||
# acording to publik-base-theme/templates/includes/tracking.html,
|
||||
# we need to remove <script> tags from matomo's output javascript,
|
||||
regex = re.compile('</?script.*>')
|
||||
count = len(lines)
|
||||
while count > 0:
|
||||
count -= 1
|
||||
if regex.match(lines[count]):
|
||||
del lines[count]
|
||||
|
||||
# and we also need to addapt matomo HTML comments to JS
|
||||
regex = re.compile('<!-- (.*) -->')
|
||||
for count, line in enumerate(lines):
|
||||
lines[count] = regex.sub('// \\1', line)
|
||||
|
||||
# disallow cookie's time extension
|
||||
regex = re.compile(r'\s*var _paq = window._paq \|\| \[\];')
|
||||
for count, line in enumerate(lines):
|
||||
if regex.match(line):
|
||||
lines.insert(count+1, CNIL_JS)
|
||||
break
|
||||
|
||||
enhanced_tag = '\n'.join(lines)
|
||||
return enhanced_tag
|
||||
|
||||
def compute_cnil_acknowledgment_level(tracking_js):
|
||||
if tracking_js.find('google') != -1:
|
||||
# google reference found into javascript
|
||||
return 'error'
|
||||
if tracking_js.find('getOriginalVisitorCookieTimeout') == -1:
|
||||
# can't find cookie's life time extension prevention
|
||||
return 'warning'
|
||||
return 'success'
|
||||
|
||||
def auto_configure_matomo():
|
||||
"""main function"""
|
||||
tenant_name, site_urls = get_tenant_name_and_public_urls()
|
||||
if tenant_name is None:
|
||||
raise MatomoException("no portal-user's url available")
|
||||
|
||||
# update matomo account
|
||||
matomo = MatomoWS()
|
||||
id_site = upgrade_site(matomo, tenant_name, site_urls)
|
||||
logme_url = upgrade_user(matomo, tenant_name, id_site)
|
||||
tracking_js = upgrade_javascript_tag(matomo, id_site)
|
||||
|
||||
# save matomo's variables
|
||||
logme_url_var = get_variable('matomo_logme_url')
|
||||
logme_url_var.value = logme_url
|
||||
logme_url_var.save()
|
||||
put_tracking_js(tracking_js)
|
||||
return True
|
|
@ -200,6 +200,23 @@ VARIABLE_SETTINGS_DEFAULTS = {}
|
|||
|
||||
MELLON_USERNAME_TEMPLATE = '{attributes[name_id_content]}'
|
||||
|
||||
# MATOMO_SERVER: allow automatic configuration on a matomo server.
|
||||
# This variable excepts:
|
||||
# - the URL of the matomo server to connect
|
||||
# - an authentication token for a matomo admin user
|
||||
# - an email template for new emails associated with new matomo users
|
||||
# The token is available on the matomo GUI into the personal parameters
|
||||
# of the user.
|
||||
#
|
||||
# Example:
|
||||
# MATOMO_SERVER = {
|
||||
# 'URL': 'https://matomo.domain.org',
|
||||
# 'TOKEN_AUTH': '0123456789abcdef0123456789abcdef',
|
||||
# 'EMAIL_TEMPLATE': 'noreply+%s@domain.org'
|
||||
# }
|
||||
|
||||
MATOMO_SERVER = {}
|
||||
|
||||
local_settings_file = os.environ.get('HOBO_SETTINGS_FILE',
|
||||
os.path.join(os.path.dirname(__file__), 'local_settings.py'))
|
||||
if os.path.exists(local_settings_file):
|
||||
|
|
|
@ -2,4 +2,5 @@ django>=1.8,<1.9
|
|||
-e git+http://repos.entrouvert.org/gadjo.git/#egg=gadjo
|
||||
celery<4
|
||||
django-mellon
|
||||
lxml
|
||||
prometheus_client
|
||||
|
|
3
setup.py
3
setup.py
|
@ -111,7 +111,8 @@ setup(
|
|||
'django-tenant-schemas',
|
||||
'prometheus_client',
|
||||
'djangorestframework>=3.1, <3.7',
|
||||
'dnspython'
|
||||
'dnspython',
|
||||
'lxml',
|
||||
],
|
||||
zip_safe=False,
|
||||
cmdclass={
|
||||
|
|
|
@ -0,0 +1,706 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
from requests import Response
|
||||
|
||||
from django.test import override_settings
|
||||
|
||||
from hobo.environment.models import Variable, Wcs, Combo, Fargo
|
||||
from hobo.matomo.utils import \
|
||||
get_variable, get_variable_value, get_tracking_js, put_tracking_js, \
|
||||
get_tenant_name_and_public_urls, MatomoError, MatomoException, MatomoWS, \
|
||||
upgrade_site, upgrade_user, upgrade_javascript_tag, \
|
||||
compute_cnil_acknowledgment_level, auto_configure_matomo
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
CONFIG = {'URL': 'https://matomo.test',
|
||||
'TOKEN_AUTH': '1234',
|
||||
'EMAIL_TEMPLATE': 'noreply+%s@entrouvert.test'}
|
||||
|
||||
MATOMO_SUCCESS = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<success message="ok" />
|
||||
</result>
|
||||
"""
|
||||
|
||||
MATOMO_ERROR = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<error message="here is the error message" />
|
||||
</result>
|
||||
"""
|
||||
|
||||
MATOMO_BAD_RESPONSE_1 = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<success message="KO" />
|
||||
</result>
|
||||
"""
|
||||
|
||||
MATOMO_BAD_RESPONSE_2 = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<success>no message attribute</success>
|
||||
<not_success>no success tag</not_success>
|
||||
</result>
|
||||
"""
|
||||
|
||||
GET_SITE_42_FROM_URL = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<row>
|
||||
<idsite>42</idsite>
|
||||
<moretags>...</moretags>
|
||||
</row>
|
||||
</result>
|
||||
"""
|
||||
|
||||
GET_NO_SITE_FROM_URL = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result />
|
||||
"""
|
||||
|
||||
GET_SITE_BAD_QUERY = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<error message="Please specify a value for 'url'." />
|
||||
</result>
|
||||
"""
|
||||
|
||||
GET_SITE_BAD_RESPONSE = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<row>
|
||||
<not_idsite>there is no idsite tag</not_idsite>
|
||||
<moretags>...</moretags>
|
||||
</row>
|
||||
</result>
|
||||
"""
|
||||
|
||||
ADD_SITE_SUCCESS = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>42</result>
|
||||
"""
|
||||
|
||||
ADD_SITE_ERROR = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<error message="Please specify a value for 'siteName'." />
|
||||
</result>
|
||||
"""
|
||||
|
||||
ADD_SITE_BAD_RESPONSE = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<not_result>no result tag</not_result>
|
||||
"""
|
||||
|
||||
USER_ALREADY_THERE = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<error message="Username 'hobo.dev.publik.love' already exists." />
|
||||
</result>"""
|
||||
|
||||
MAIL_ALREADY_THERE = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<error message="User with email 'hobo.dev.publik.love@testor.org' already exists." />
|
||||
</result>"""
|
||||
|
||||
BAD_CREDENTIAL = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<error message="You can\'t access this resource as it requires a \'superuser\' access." />
|
||||
</result>"""
|
||||
|
||||
DEL_UNKNOWN_USER = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<error message="User 'hobo.dev.publik.love' doesn't exist." />
|
||||
</result>
|
||||
"""
|
||||
|
||||
JAVASCRIPT_TAG = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result><!-- Matomo -->
|
||||
<script type="text/javascript">
|
||||
var _paq = window._paq || [];
|
||||
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
|
||||
_paq.push(['trackPageView']);
|
||||
_paq.push(['enableLinkTracking']);
|
||||
(function() {
|
||||
var u="//matomo-test.entrouvert.org/";
|
||||
_paq.push(['setTrackerUrl', u+'piwik.php']);
|
||||
_paq.push(['setSiteId', '7']);
|
||||
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
|
||||
g.type='text/javascript'; g.async=true; g.defer=true; g.src=u+'piwik.js'; s.parentNode.insertBefore(g,s);
|
||||
})();
|
||||
</script>
|
||||
<!-- End Matomo Code -->
|
||||
</result>
|
||||
"""
|
||||
|
||||
ENHANCED_JAVASCRIPT_TAG = """// Matomo
|
||||
var _paq = window._paq || [];
|
||||
|
||||
// disallow cookie's time extension
|
||||
_paq.push([function() {
|
||||
var self = this;
|
||||
function getOriginalVisitorCookieTimeout() {
|
||||
var now = new Date(),
|
||||
nowTs = Math.round(now.getTime() / 1000),
|
||||
visitorInfo = self.getVisitorInfo();
|
||||
var createTs = parseInt(visitorInfo[2]);
|
||||
var cookieTimeout = 33696000; // 13 months in seconds
|
||||
var originalTimeout = createTs + cookieTimeout - nowTs;
|
||||
return originalTimeout;
|
||||
}
|
||||
this.setVisitorCookieTimeout( getOriginalVisitorCookieTimeout() );
|
||||
}]);
|
||||
|
||||
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
|
||||
_paq.push(['trackPageView']);
|
||||
_paq.push(['enableLinkTracking']);
|
||||
(function() {
|
||||
var u="//matomo-test.entrouvert.org/";
|
||||
_paq.push(['setTrackerUrl', u+'piwik.php']);
|
||||
_paq.push(['setSiteId', '7']);
|
||||
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
|
||||
g.type='text/javascript'; g.async=true; g.defer=true; g.src=u+'piwik.js'; s.parentNode.insertBefore(g,s);
|
||||
})();
|
||||
// End Matomo Code
|
||||
"""
|
||||
|
||||
JAVASCRIPT_TAG_BAD_RESPONSE = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<no_result_tag/>
|
||||
"""
|
||||
|
||||
def requests_post_mocked_replies(contents):
|
||||
"""buid an iterator for mock's side_effect parameter"""
|
||||
responses = []
|
||||
for content in contents:
|
||||
response = Response()
|
||||
response._content = content
|
||||
response.status_code = 200
|
||||
responses.append(response)
|
||||
return responses
|
||||
|
||||
def test_get_variable():
|
||||
"""hobo variables from"""
|
||||
|
||||
# create the variable with '' value if not there
|
||||
id_site_var = get_variable('name1')
|
||||
assert id_site_var.value == ''
|
||||
|
||||
# retrieve the variable if already there
|
||||
Variable.objects.create(name='name2', value='42')
|
||||
id_site_var = get_variable('name2')
|
||||
assert id_site_var.value == '42'
|
||||
|
||||
def test_get_variable_value():
|
||||
"""hobo variables from DB"""
|
||||
|
||||
# variable not there: return default value
|
||||
assert get_variable_value('name1') == ''
|
||||
assert get_variable_value('name2', default='42') == '42'
|
||||
|
||||
# variable already there
|
||||
get_variable('name3', '42')
|
||||
assert get_variable_value('name3') == '42'
|
||||
|
||||
def test_get_tenant_name_and_public_urls():
|
||||
Combo.objects.create(base_url='https://combo.dev.publik.love',
|
||||
template_name='portal-user')
|
||||
Wcs.objects.create(base_url='https://wcs.dev.publik.love')
|
||||
Fargo.objects.create(base_url='https://fargo.dev.publik.love')
|
||||
tenant_name, site_urls = get_tenant_name_and_public_urls()
|
||||
assert tenant_name == 'combo.dev.publik.love'
|
||||
assert site_urls[2] == 'https://fargo.dev.publik.love/'
|
||||
|
||||
def test_matomo_constructor():
|
||||
"""build the matomo webservice object"""
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
assert matomo.url_ws_base == 'https://matomo.test'
|
||||
assert matomo.token_auth == '1234'
|
||||
|
||||
with override_settings(MATOMO_SERVER={}):
|
||||
try:
|
||||
matomo = MatomoWS()
|
||||
except MatomoError as exc:
|
||||
assert str(exc) == "no settings for matomo: 'URL'"
|
||||
else:
|
||||
assert False
|
||||
|
||||
def test_parse_response():
|
||||
"""parser used by all matomo webservice calls"""
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# no error (expected format)
|
||||
content = """<?xml version="1.0" encoding="utf-8" ?><ok/>"""
|
||||
tree = matomo.parse_response(content)
|
||||
assert tree.tag == 'ok'
|
||||
|
||||
# error (not XML format)
|
||||
content = """this is not XML"""
|
||||
try:
|
||||
tree = matomo.parse_response(content)
|
||||
except MatomoException as exc:
|
||||
assert str(exc).find("XMLSyntaxError: Start tag expected") != -1
|
||||
else:
|
||||
assert False
|
||||
|
||||
def test_parse_error_message():
|
||||
"""error handler used by all matomo webservice calls"""
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# no error (expected format)
|
||||
content = """<?xml version="1.0" encoding="utf-8" ?><ok/>"""
|
||||
tree = matomo.parse_response(content)
|
||||
matomo.raise_on_error(tree)
|
||||
assert tree.tag == 'ok'
|
||||
|
||||
# error (expected format)
|
||||
content = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<error message="here is the error message" />
|
||||
</result>
|
||||
"""
|
||||
tree = matomo.parse_response(content)
|
||||
try:
|
||||
matomo.raise_on_error(tree)
|
||||
except MatomoError as exc:
|
||||
assert str(exc) == 'here is the error message'
|
||||
else:
|
||||
assert False
|
||||
|
||||
# error (unexpected format)
|
||||
content = """<?xml version="1.0" encoding="utf-8" ?>
|
||||
<result>
|
||||
<error>no 'message' attribute here</error>
|
||||
</result>
|
||||
"""
|
||||
tree = matomo.parse_response(content)
|
||||
try:
|
||||
matomo.raise_on_error(tree)
|
||||
except MatomoException as exc:
|
||||
assert str(exc) == 'internal error'
|
||||
else:
|
||||
assert False
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_assert_success(mocked_post):
|
||||
"""webservice to add new user"""
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# success
|
||||
tree = matomo.parse_response(MATOMO_SUCCESS)
|
||||
matomo.raise_on_error(tree)
|
||||
assert matomo.assert_success(tree, 'me') is True
|
||||
|
||||
# error (KO instead of ok)
|
||||
tree = matomo.parse_response(MATOMO_BAD_RESPONSE_1)
|
||||
matomo.raise_on_error(tree)
|
||||
try:
|
||||
matomo.assert_success(tree, 'me')
|
||||
except MatomoException as exc:
|
||||
assert str(exc).find('me fails') != -1
|
||||
else:
|
||||
assert False
|
||||
|
||||
# error (no message attribute)
|
||||
tree = matomo.parse_response(MATOMO_BAD_RESPONSE_2)
|
||||
matomo.raise_on_error(tree)
|
||||
try:
|
||||
matomo.assert_success(tree, 'me')
|
||||
except MatomoException as exc:
|
||||
assert str(exc).find('me fails') != -1
|
||||
else:
|
||||
assert False
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_get_site_from_site_url(mocked_post):
|
||||
"""webservice to test if the site is already registered"""
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# site already here
|
||||
content = GET_SITE_42_FROM_URL
|
||||
mocked_post.return_value.content = content
|
||||
assert matomo.get_site_id_from_site_url('combo.dev.publik.love') == '42'
|
||||
|
||||
# no such url
|
||||
content = GET_NO_SITE_FROM_URL
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
matomo.get_site_id_from_site_url('combo.dev.publik.love')
|
||||
except MatomoError as exc:
|
||||
assert str(exc).find('url not found') != -1
|
||||
else:
|
||||
assert False
|
||||
|
||||
# error on empty id
|
||||
content = GET_SITE_BAD_QUERY
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
matomo.get_site_id_from_site_url('combo.dev.publik.love')
|
||||
except MatomoError as exc:
|
||||
assert str(exc) == "Please specify a value for 'url'."
|
||||
else:
|
||||
assert False
|
||||
|
||||
# bad response (error on success response)
|
||||
content = GET_SITE_BAD_RESPONSE
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
matomo.get_site_id_from_site_url('combo.dev.publik.love')
|
||||
except MatomoException as exc:
|
||||
assert str(exc) == 'get_site_id_from_site_url fails'
|
||||
else:
|
||||
assert False
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_add_site(mocked_post):
|
||||
"""webservice to add a new site"""
|
||||
urls = ['https://combo.dev.publik.love',
|
||||
'https://wcs.dev.publik.love']
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# success
|
||||
content = ADD_SITE_SUCCESS
|
||||
mocked_post.return_value.content = content
|
||||
site_id = matomo.add_site("hobo.dev.publik.love", urls)
|
||||
assert site_id == '42'
|
||||
|
||||
# error
|
||||
content = ADD_SITE_ERROR
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
site_id = matomo.add_site("hobo.dev.publik.love", urls)
|
||||
except MatomoError as exc:
|
||||
assert str(exc) == "Please specify a value for 'siteName'."
|
||||
else:
|
||||
assert False
|
||||
|
||||
# strange message
|
||||
content = ADD_SITE_BAD_RESPONSE
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
site_id = matomo.add_site("hobo.dev.publik.love", urls)
|
||||
except MatomoException as exc:
|
||||
assert str(exc) == 'add_site fails'
|
||||
else:
|
||||
assert False
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_add_user(mocked_post):
|
||||
"""webservice to add new user"""
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# success
|
||||
content = MATOMO_SUCCESS
|
||||
mocked_post.return_value.content = content
|
||||
matomo.add_user('hobo.dev.publik.love', 'xxx', '42')
|
||||
assert True
|
||||
|
||||
# error (user already here)
|
||||
content = USER_ALREADY_THERE
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
matomo.add_user('hobo.dev.publik.love', 'xxx', '42')
|
||||
except MatomoError as exc:
|
||||
assert str(exc).find("Username 'hobo.dev.publik.love' already") != -1
|
||||
else:
|
||||
assert False
|
||||
|
||||
# error (mail already registered)
|
||||
content = MAIL_ALREADY_THERE
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
matomo.add_user('hobo.dev.publik.love', 'xxx', '42')
|
||||
except MatomoError as exc:
|
||||
assert str(exc).find("email 'hobo.dev.publik.love@testor.org'") != -1
|
||||
else:
|
||||
assert False
|
||||
|
||||
# error (bad credentials)
|
||||
content = BAD_CREDENTIAL
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
matomo.add_user('hobo.dev.publik.love', 'xxx', '42')
|
||||
except MatomoError as exc:
|
||||
assert str(exc).find("You can\'t access this resource") != -1
|
||||
else:
|
||||
assert False
|
||||
|
||||
# bad success message (wrong attribute value)
|
||||
content = MATOMO_BAD_RESPONSE_1
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
matomo.add_user('hobo.dev.publik.love', 'xxx', '42')
|
||||
except MatomoException as exc:
|
||||
assert str(exc) == 'add_user fails'
|
||||
else:
|
||||
assert False
|
||||
|
||||
# bad success message (no message attribute)
|
||||
content = MATOMO_BAD_RESPONSE_2
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
matomo.add_user('hobo.dev.publik.love', 'xxx', '42')
|
||||
except MatomoException as exc:
|
||||
assert str(exc) == 'add_user fails'
|
||||
else:
|
||||
assert False
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_del_user(mocked_post):
|
||||
"""webservice to add new user"""
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# success
|
||||
content = MATOMO_SUCCESS
|
||||
mocked_post.return_value.content = content
|
||||
matomo.del_user('hobo.dev.publik.love')
|
||||
assert True
|
||||
|
||||
# error (unknown user)
|
||||
content = DEL_UNKNOWN_USER
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
matomo.del_user('hobo.dev.publik.love')
|
||||
except MatomoError as exc:
|
||||
assert str(exc).find("User 'hobo.dev.publik.love' doesn't exist.") != -1
|
||||
else:
|
||||
assert False
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_get_javascript_tag(mocked_post):
|
||||
"""webservice to get matomo JS tag"""
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# success
|
||||
content = JAVASCRIPT_TAG
|
||||
mocked_post.return_value.content = content
|
||||
javascript_tag = matomo.get_javascript_tag('42')
|
||||
assert javascript_tag.find('(function() {') != -1
|
||||
|
||||
# error (bad credentials)
|
||||
content = BAD_CREDENTIAL
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
javascript_tag = matomo.get_javascript_tag('42')
|
||||
except MatomoError as exc:
|
||||
assert str(exc).find("You can't access this resource ") != -1
|
||||
else:
|
||||
assert False
|
||||
|
||||
# bad response (no result tag)
|
||||
content = JAVASCRIPT_TAG_BAD_RESPONSE
|
||||
mocked_post.return_value.content = content
|
||||
try:
|
||||
javascript_tag = matomo.get_javascript_tag('42')
|
||||
except MatomoException as exc:
|
||||
assert str(exc) == 'get_javascript_tag fails'
|
||||
else:
|
||||
assert False
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_upgrade_site(mocked_post):
|
||||
"""function to test if the site is already regisered"""
|
||||
urls = ['https://combo.dev.publik.love',
|
||||
'https://wcs.dev.publik.love']
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# site not already here
|
||||
contents = [GET_NO_SITE_FROM_URL, ADD_SITE_SUCCESS]
|
||||
mocked_post.side_effect = requests_post_mocked_replies(contents)
|
||||
site_id = upgrade_site(matomo, "hobo.dev.publik.love", urls)
|
||||
assert site_id == '42'
|
||||
|
||||
# site already here
|
||||
contents = [GET_SITE_42_FROM_URL]
|
||||
mocked_post.side_effect = requests_post_mocked_replies(contents)
|
||||
site_id = upgrade_site(matomo, "hobo.dev.publik.love", urls)
|
||||
assert site_id == '42'
|
||||
|
||||
# error while adding new site
|
||||
contents = [GET_NO_SITE_FROM_URL, MATOMO_ERROR]
|
||||
mocked_post.side_effect = requests_post_mocked_replies(contents)
|
||||
try:
|
||||
upgrade_site(matomo, "hobo.dev.publik.love", urls)
|
||||
except MatomoException as exc:
|
||||
assert True
|
||||
else:
|
||||
assert False
|
||||
|
||||
# error while looking for site already there
|
||||
contents = [MATOMO_ERROR]
|
||||
mocked_post.side_effect = requests_post_mocked_replies(contents)
|
||||
try:
|
||||
upgrade_site(matomo, "hobo.dev.publik.love", urls)
|
||||
except MatomoException as exc:
|
||||
assert str(exc) == 'here is the error message'
|
||||
else:
|
||||
assert False
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_upgrade_user(mocked_post):
|
||||
"""function to assert we have a user"""
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# success (add a new user)
|
||||
contents = [DEL_UNKNOWN_USER, MATOMO_SUCCESS]
|
||||
mocked_post.side_effect = requests_post_mocked_replies(contents)
|
||||
logme_url = upgrade_user(matomo, 'hobo.dev.publik.love', '42')
|
||||
assert logme_url.find('action=logme&login=hobo.dev.publik.love') != -1
|
||||
|
||||
# success (user already here)
|
||||
contents = [MATOMO_SUCCESS, MATOMO_SUCCESS]
|
||||
mocked_post.side_effect = requests_post_mocked_replies(contents)
|
||||
logme_url = upgrade_user(matomo, 'hobo.dev.publik.love', '42')
|
||||
assert logme_url.find('action=logme&login=hobo.dev.publik.love') != -1
|
||||
|
||||
# recover on error (del user fails)
|
||||
contents = [MATOMO_ERROR, MATOMO_SUCCESS]
|
||||
mocked_post.side_effect = requests_post_mocked_replies(contents)
|
||||
logme_url = upgrade_user(matomo, 'hobo.dev.publik.love', '42')
|
||||
assert logme_url.find('action=logme&login=hobo.dev.publik.love') != -1
|
||||
|
||||
# error (add user fails)
|
||||
contents = [MATOMO_SUCCESS, MATOMO_ERROR]
|
||||
mocked_post.side_effect = requests_post_mocked_replies(contents)
|
||||
try:
|
||||
upgrade_user(matomo, 'hobo.dev.publik.love', '42')
|
||||
except MatomoError:
|
||||
assert True
|
||||
else:
|
||||
assert False
|
||||
|
||||
def test_compute_cnil_acknowledgment_level():
|
||||
"""function use to inspect javascript content"""
|
||||
warning_content = JAVASCRIPT_TAG
|
||||
|
||||
# can't find cookie's life time extension prevention
|
||||
assert compute_cnil_acknowledgment_level(warning_content) == 'warning'
|
||||
|
||||
# ok
|
||||
success_content = warning_content + '\n...getOriginalVisitorCookieTimeout...'
|
||||
assert compute_cnil_acknowledgment_level(success_content) == 'success'
|
||||
|
||||
# google reference found into javascript
|
||||
error_content = success_content + '\n...google...'
|
||||
assert compute_cnil_acknowledgment_level(error_content) == 'error'
|
||||
|
||||
def test_get_tracking_js():
|
||||
"""read previous tracking JS from hobo variables"""
|
||||
var1 = get_variable('cnil_compliant_visits_tracking_js', 'content1')
|
||||
assert get_tracking_js() == 'content1'
|
||||
|
||||
var1.delete()
|
||||
var1 = get_variable('cnil_compliant_visits_tracking_js', '')
|
||||
var2 = get_variable('visits_tracking_js', 'content2')
|
||||
assert get_tracking_js() == 'content2'
|
||||
|
||||
var1.delete()
|
||||
var2.delete()
|
||||
get_variable('cnil_compliant_visits_tracking_js', 'content1')
|
||||
get_variable('visits_tracking_js', 'content2')
|
||||
assert get_tracking_js() == "content1content2"
|
||||
|
||||
def test_put_tracking_js():
|
||||
"""write tracking js into hobo variables:
|
||||
- visits_tracking_js: a banner will be displayed (javascript may be not removed)
|
||||
- cnil_compliant_visits_tracking_js: javascript is dislayed normally
|
||||
"""
|
||||
# JS is stored into 'cnil_compliant_visits_tracking_js'
|
||||
put_tracking_js('/* no gafa => no banner */')
|
||||
value1 = get_variable_value('cnil_compliant_visits_tracking_js', 'undefined')
|
||||
value2 = get_variable_value('visits_tracking_js', 'undefined')
|
||||
assert value1 == '/* no gafa => no banner */'
|
||||
assert value2 == 'undefined'
|
||||
|
||||
# JS is stord into 'visits_tracking_js'
|
||||
put_tracking_js('/* google => banner */')
|
||||
value1 = get_variable_value('cnil_compliant_visits_tracking_js', 'undefined')
|
||||
value2 = get_variable_value('visits_tracking_js', 'undefined')
|
||||
assert value1 == 'undefined'
|
||||
assert value2 == '/* google => banner */'
|
||||
|
||||
# test we remove variables when no more used
|
||||
put_tracking_js('')
|
||||
value1 = get_variable_value('cnil_compliant_visits_tracking_js', 'undefined')
|
||||
value2 = get_variable_value('visits_tracking_js', 'undefined')
|
||||
assert value1 == 'undefined'
|
||||
assert value2 == 'undefined'
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_upgrade_javascript_tag(mocked_post):
|
||||
"""function to get matomo JS tag"""
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
matomo = MatomoWS()
|
||||
|
||||
# success
|
||||
content = JAVASCRIPT_TAG
|
||||
mocked_post.return_value.content = content
|
||||
javascript_tag = upgrade_javascript_tag(matomo, '42')
|
||||
assert javascript_tag.find('(function() {') != -1
|
||||
assert javascript_tag.find('<script') == -1
|
||||
assert javascript_tag.find('script>') == -1
|
||||
assert javascript_tag == ENHANCED_JAVASCRIPT_TAG
|
||||
assert compute_cnil_acknowledgment_level(javascript_tag) == 'success'
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_auto_configure_matomo(mocked_post):
|
||||
tracking_js_var = get_variable('visits_tracking_js', 'js_code')
|
||||
logme_url_var = get_variable('matomo_logme_url', '')
|
||||
|
||||
Combo.objects.create(base_url='https://combo.dev.publik.love',
|
||||
template_name='portal-user')
|
||||
Wcs.objects.create(base_url='https://wcs.dev.publik.love')
|
||||
Fargo.objects.create(base_url='https://fargo.dev.publik.love')
|
||||
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
contents = [GET_NO_SITE_FROM_URL, ADD_SITE_SUCCESS,
|
||||
DEL_UNKNOWN_USER, MATOMO_SUCCESS,
|
||||
JAVASCRIPT_TAG]
|
||||
mocked_post.side_effect = requests_post_mocked_replies(contents)
|
||||
assert auto_configure_matomo() is True
|
||||
logme_url_var = get_variable('matomo_logme_url')
|
||||
assert logme_url_var.value != ''
|
||||
tracking_js_var = get_variable('visits_tracking_js')
|
||||
assert tracking_js_var.value == ''
|
||||
tracking_js2_var = get_variable('cnil_compliant_visits_tracking_js')
|
||||
assert tracking_js2_var.value != ''
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_auto_configure_matomo_no_url(mocked_post):
|
||||
# no Wc url so as to raise
|
||||
Wcs.objects.create(base_url='https://wcs.dev.publik.love')
|
||||
Fargo.objects.create(base_url='https://fargo.dev.publik.love')
|
||||
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
try:
|
||||
auto_configure_matomo()
|
||||
except MatomoException as exc:
|
||||
assert str(exc) == "no portal-user's url available"
|
||||
else:
|
||||
assert False
|
||||
|
||||
@mock.patch('requests.post')
|
||||
def test_auto_configure_matomo_error(mocked_post):
|
||||
tracking_js_var = get_variable('visits_tracking_js', 'js_code')
|
||||
|
||||
Combo.objects.create(base_url='https://combo.dev.publik.love',
|
||||
template_name='portal-user')
|
||||
Wcs.objects.create(base_url='https://wcs.dev.publik.love')
|
||||
Fargo.objects.create(base_url='https://fargo.dev.publik.love')
|
||||
|
||||
with override_settings(MATOMO_SERVER=CONFIG):
|
||||
contents = [GET_NO_SITE_FROM_URL, ADD_SITE_SUCCESS,
|
||||
DEL_UNKNOWN_USER, MATOMO_SUCCESS,
|
||||
JAVASCRIPT_TAG_BAD_RESPONSE]
|
||||
mocked_post.side_effect = requests_post_mocked_replies(contents)
|
||||
try:
|
||||
auto_configure_matomo()
|
||||
except MatomoException as exc:
|
||||
assert str(exc) == "get_javascript_tag fails"
|
||||
else:
|
||||
assert False
|
||||
tracking_js_var = get_variable('visits_tracking_js')
|
||||
assert tracking_js_var.value == 'js_code'
|
Loading…
Reference in New Issue