tests: add test to agent worker (#40098)

This commit is contained in:
Nicolas Roche 2020-02-25 11:53:56 +01:00
parent 4323e97b98
commit 8f8b71736b
1 changed files with 170 additions and 0 deletions

View File

@ -0,0 +1,170 @@
# -*- coding: utf-8 -*-
import importlib
import json
import mock
import os
import pytest
import six
from hobo.agent.worker.services import deploy, notify
from hobo.agent.worker import settings
ENVIRONMENT = {
u'timestamp': '2022-02-22',
u'services': [
{
u'service-id': u'hobo',
u'slug': u'hobo',
u'title': u'Hobo primary',
u'base_url': u'https://hobo1.dev.publik.love/',
u'this': True,
u'secret_key': u'1nesüper5Cr!eteKAaY~',
},
{
u'service-id': u'combo',
u'slug': u'portal',
u'title': u'Compte citoyen',
u'base_url': u'https://combo.dev.publik.love/',
u'secret_key': u'1nesüper5Cr!eteKAaY~',
},
{
u'service-id': u'wcs',
u'slug': u'eservices',
u'title': u'Démarches',
u'base_url': u'https://wcs.dev.publik.love/',
u'secret_key': u'1nesüper5Cr!eteKAaY~',
},
{
u'service-id': u'hobo',
u'slug': u'hobo',
u'title': u'skip beacause of agent host patterns',
u'base_url': u'https://hobo2.dev.publik.love/',
u'secondary': True,
u'secret_key': u'1nesüper5Cr!eteKAaY~',
},
{
u'service-id': u'passerelle',
u'slug': u'passerelle',
u'title': u'skipped because no secret_key',
u'base_url': u'https://passerelle.dev.publik.love/',
},
{
u'service-id': u'unknown',
u'title': u'skipped because unknown service-id',
}
]}
LOCAL_SETTINGS = """
AGENT_HOST_PATTERNS = {
'wcs': ['*.dev.publik.love'],
'hobo': ['*hobo1*', '!*hobo2*'],
}
"""
NOTIFICATION = {
'@type': 'provision',
'audience': ['https://combo.dev.publik.love/'],
'objects': {
'@type': 'role',
'data': [
{
'uuid': '12345',
'name': 'Service petite enfance',
'slug': 'service-petite-enfance',
'description': 'Role du service petite enfance',
}]}}
@pytest.fixture
def local_settings(tmpdir):
path = os.path.join(str(tmpdir), 'local_settings.py')
open(path, 'w').write(LOCAL_SETTINGS)
os.environ['HOBO_AGENT_SETTINGS_FILE'] = path
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
@mock.patch('hobo.agent.worker.services.subprocess')
def test_deploy(mocked_subprocess, mocked_exists):
mocked_communicate = mock.Mock(return_value=('', ''))
mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
deploy(ENVIRONMENT)
# process called
mock_calls = set(x[1][0] for x in mocked_subprocess.Popen.mock_calls)
assert len(mock_calls) == 4
assert '/usr/bin/hobo-manage hobo_deploy https://hobo1.dev.publik.love/ -' in mock_calls
assert '/usr/bin/wcsctl hobo_deploy https://wcs.dev.publik.love/ -' in mock_calls
assert '/usr/lib/combo/manage.py hobo_deploy https://combo.dev.publik.love/ -' in mock_calls
assert '/usr/bin/hobo-manage hobo_deploy https://hobo2.dev.publik.love/ -' in mock_calls
# environment sent
mock_calls = mocked_communicate.mock_calls
assert len(mock_calls) == 4
for i in range(0, len(mock_calls)):
assert json.loads(mock_calls[0][2]['input']) == ENVIRONMENT
mocked_opened.returncode = 1
with pytest.raises(RuntimeError, match='failed: '):
deploy(ENVIRONMENT)
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
@mock.patch('hobo.agent.worker.services.subprocess')
def test_deploy_host_with_agent_patterns(mocked_subprocess, mocked_exists, local_settings):
mocked_communicate = mock.Mock(return_value=('', ''))
mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
# load AGENT_HOST_PATTERNS from local_settings fixture
if six.PY2:
reload(settings)
else:
importlib.reload(settings)
deploy(ENVIRONMENT)
# process called
mock_calls = set(x[1][0] for x in mocked_subprocess.Popen.mock_calls)
assert len(mock_calls) == 3
assert '/usr/bin/hobo-manage hobo_deploy https://hobo1.dev.publik.love/ -' in mock_calls
assert '/usr/bin/wcsctl hobo_deploy https://wcs.dev.publik.love/ -' in mock_calls
assert '/usr/lib/combo/manage.py hobo_deploy https://combo.dev.publik.love/ -' in mock_calls
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
@mock.patch('hobo.agent.worker.services.os.listdir')
@mock.patch('hobo.agent.worker.services.subprocess')
def test_notify(mocked_subprocess, mocked_listdir, mocked_exists):
def listdir(path):
return '%s.dev.publik.love' % path.split('/')[3]
mocked_listdir.side_effect = listdir
mocked_communicate = mock.Mock(return_value=('', ''))
mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
notify(NOTIFICATION)
# process called
mock_calls = set(x[1][0] for x in mocked_subprocess.Popen.mock_calls)
assert mock_calls == {'/usr/lib/combo/manage.py hobo_notify -'}
# notification sent
mock_calls = mocked_communicate.mock_calls
assert len(mock_calls) == 1
assert json.loads(mock_calls[0][2]['input'])['objects']['data'][0]['uuid'] == '12345'
mocked_opened.returncode = 1
with pytest.raises(RuntimeError, match='failed: '):
notify(NOTIFICATION)
# ignore subprocess error
mocked_communicate.reset_mock()
mocked_subprocess.Popen.side_effect = OSError
notify(NOTIFICATION)
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=False)
@mock.patch('hobo.agent.worker.services.subprocess')
def test_notify_no_tenant_path(mocked_subprocess, mocked_exists):
mocked_communicate = mock.Mock(return_value=('', ''))
mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
notify(NOTIFICATION)
mock_calls = mocked_communicate.mock_calls
assert len(mock_calls) == 0