169 lines
6.2 KiB
Python
169 lines
6.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
import importlib
|
|
import json
|
|
import mock
|
|
import os
|
|
import pytest
|
|
|
|
from django.utils.encoding import force_text
|
|
|
|
from hobo.agent.worker.services import deploy, notify
|
|
from hobo.agent.worker import settings
|
|
|
|
|
|
# Deployment environment fed to deploy() by the tests below.  The first four
# services carry a secret_key; the last two are fixture entries whose titles
# state why they are expected to be skipped.
ENVIRONMENT = {
    'timestamp': '2022-02-22',
    'services': [
        # primary hobo instance, flagged as "this"
        {
            'service-id': 'hobo',
            'slug': 'hobo',
            'title': 'Hobo primary',
            'base_url': 'https://hobo1.dev.publik.love/',
            'this': True,
            'secret_key': '1nesüper5Cr!eteKAaY~',
        },
        {
            'service-id': 'combo',
            'slug': 'portal',
            'title': 'Compte citoyen',
            'base_url': 'https://combo.dev.publik.love/',
            'secret_key': '1nesüper5Cr!eteKAaY~',
        },
        {
            'service-id': 'wcs',
            'slug': 'eservices',
            'title': 'Démarches',
            'base_url': 'https://wcs.dev.publik.love/',
            'secret_key': '1nesüper5Cr!eteKAaY~',
        },
        # secondary hobo instance, excluded by the '!*hobo2*' host pattern
        # declared in LOCAL_SETTINGS
        {
            'service-id': 'hobo',
            'slug': 'hobo',
            'title': 'skip beacause of agent host patterns',
            'base_url': 'https://hobo2.dev.publik.love/',
            'secondary': True,
            'secret_key': '1nesüper5Cr!eteKAaY~',
        },
        # no secret_key on this entry
        {
            'service-id': 'passerelle',
            'slug': 'passerelle',
            'title': 'skipped because no secret_key',
            'base_url': 'https://passerelle.dev.publik.love/',
        },
        # service-id that is not a known service; no slug, no base_url
        {
            'service-id': 'unknown',
            'title': 'skipped because unknown service-id',
        },
    ],
}
|
|
|
|
LOCAL_SETTINGS = """
|
|
AGENT_HOST_PATTERNS = {
|
|
'wcs': ['*.dev.publik.love'],
|
|
'hobo': ['*hobo1*', '!*hobo2*'],
|
|
}
|
|
"""
|
|
|
|
# Provisioning message passed to notify() by the tests below: a single role
# object addressed to the combo service URL.
NOTIFICATION = {
    '@type': 'provision',
    'audience': ['https://combo.dev.publik.love/'],
    'objects': {
        '@type': 'role',
        'data': [
            {
                'uuid': '12345',
                'name': 'Service petite enfance',
                'slug': 'service-petite-enfance',
                'description': 'Role du service petite enfance',
            },
        ],
    },
}
|
|
|
|
|
|
@pytest.fixture
def local_settings(tmpdir):
    """Write LOCAL_SETTINGS to a temporary file and point the agent at it.

    The file path is published through the ``HOBO_AGENT_SETTINGS_FILE``
    environment variable (presumably read by ``hobo.agent.worker.settings``
    when it is imported or reloaded -- the test using this fixture reloads
    that module).  On teardown the variable is restored to its previous
    state so the temporary path does not leak into later tests.

    Yields:
        The path of the generated settings file.
    """
    path = os.path.join(str(tmpdir), 'local_settings.py')
    # Use a context manager so the content is flushed and the handle closed
    # before anything tries to read the file back.
    with open(path, 'w') as settings_file:
        settings_file.write(LOCAL_SETTINGS)

    env_key = 'HOBO_AGENT_SETTINGS_FILE'
    previous = os.environ.get(env_key)
    os.environ[env_key] = path
    try:
        yield path
    finally:
        # Put the environment back exactly as it was found.
        if previous is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = previous
|
|
|
|
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
@mock.patch('hobo.agent.worker.services.subprocess')
def test_deploy(mocked_subprocess, mocked_exists):
    """deploy() runs one hobo_deploy command per eligible service.

    With ``subprocess`` and ``os.path.exists`` patched inside
    ``hobo.agent.worker.services``, the test checks which commands are
    started, that every started process receives the full environment as
    JSON on its input, and that a non-zero return code raises RuntimeError.
    """
    mocked_communicate = mock.Mock(return_value=('', ''))
    mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
    mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
    deploy(ENVIRONMENT)

    # process called: first positional argument of each Popen() call
    commands = {call[1][0] for call in mocked_subprocess.Popen.mock_calls}
    assert commands == {
        '/usr/bin/hobo-manage hobo_deploy https://hobo1.dev.publik.love/ -',
        '/usr/bin/wcsctl hobo_deploy https://wcs.dev.publik.love/ -',
        '/usr/lib/combo/manage.py hobo_deploy https://combo.dev.publik.love/ -',
        '/usr/bin/hobo-manage hobo_deploy https://hobo2.dev.publik.love/ -',
    }

    # environment sent: check the payload of *every* communicate() call
    # (the previous loop always inspected call 0 only)
    communicate_calls = mocked_communicate.mock_calls
    assert len(communicate_calls) == 4
    for call in communicate_calls:
        assert json.loads(force_text(call[2]['input'])) == ENVIRONMENT

    # a failing command is reported as a RuntimeError
    mocked_opened.returncode = 1
    with pytest.raises(RuntimeError, match='failed: '):
        deploy(ENVIRONMENT)
|
|
|
|
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
@mock.patch('hobo.agent.worker.services.subprocess')
def test_deploy_host_with_agent_patterns(mocked_subprocess, mocked_exists, local_settings):
    """deploy() leaves out hobo2 once AGENT_HOST_PATTERNS is loaded.

    The local_settings fixture provides the settings file; reloading the
    settings module makes the patterns effective before deploy() runs.
    """
    fake_process = mock.Mock(
        communicate=mock.Mock(return_value=('', '')),
        returncode=0,
    )
    mocked_subprocess.Popen = mock.Mock(return_value=fake_process)

    # load AGENT_HOST_PATTERNS from local_settings fixture
    importlib.reload(settings)
    deploy(ENVIRONMENT)

    # process called: hobo1, wcs and combo only -- hobo2 is not expected
    launched = {call[1][0] for call in mocked_subprocess.Popen.mock_calls}
    assert launched == {
        '/usr/bin/hobo-manage hobo_deploy https://hobo1.dev.publik.love/ -',
        '/usr/bin/wcsctl hobo_deploy https://wcs.dev.publik.love/ -',
        '/usr/lib/combo/manage.py hobo_deploy https://combo.dev.publik.love/ -',
    }
|
|
|
|
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
@mock.patch('hobo.agent.worker.services.os.listdir')
@mock.patch('hobo.agent.worker.services.subprocess')
def test_notify(mocked_subprocess, mocked_listdir, mocked_exists):
    """notify() forwards the notification to the combo hobo_notify command.

    Also checks that a non-zero return code raises RuntimeError and that an
    OSError raised when starting the process does not propagate.
    """
    # Fake tenant listing built from the 4th path component of the directory.
    # NOTE(review): this returns a str, whereas os.listdir() returns a list
    # -- confirm against how notify() consumes the listing.
    mocked_listdir.side_effect = lambda path: '%s.dev.publik.love' % path.split('/')[3]

    communicate = mock.Mock(return_value=('', ''))
    fake_process = mock.Mock(communicate=communicate, returncode=0)
    mocked_subprocess.Popen = mock.Mock(return_value=fake_process)
    notify(NOTIFICATION)

    # process called
    launched = {call[1][0] for call in mocked_subprocess.Popen.mock_calls}
    assert launched == {'/usr/lib/combo/manage.py hobo_notify -'}

    # notification sent
    sent = communicate.mock_calls
    assert len(sent) == 1
    payload = json.loads(force_text(sent[0][2]['input']))
    assert payload['objects']['data'][0]['uuid'] == '12345'

    # command failure is reported
    fake_process.returncode = 1
    with pytest.raises(RuntimeError, match='failed: '):
        notify(NOTIFICATION)

    # ignore subprocess error
    communicate.reset_mock()
    mocked_subprocess.Popen.side_effect = OSError
    notify(NOTIFICATION)
|
|
|
|
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=False)
@mock.patch('hobo.agent.worker.services.subprocess')
def test_notify_no_tenant_path(mocked_subprocess, mocked_exists):
    """notify() sends nothing when os.path.exists() reports False."""
    communicate = mock.Mock(return_value=('', ''))
    mocked_subprocess.Popen = mock.Mock(
        return_value=mock.Mock(communicate=communicate, returncode=0),
    )

    notify(NOTIFICATION)

    # no process ever received any input
    assert not communicate.mock_calls
|