215 lines
7.5 KiB
Python
215 lines
7.5 KiB
Python
import json
|
|
import os
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
from django.core.management import BaseCommand, call_command, load_command_class
|
|
from django.core.management.base import CommandError
|
|
from django.utils.encoding import force_bytes
|
|
|
|
from hobo.multitenant.models import Tenant
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
class RecordTenant:
    """Callable that remembers which tenant was active each time it was called.

    Meant to be plugged in as a mock ``side_effect``: every invocation appends
    the current ``connection.tenant`` to :attr:`tenants`.
    """

    def __init__(self):
        # One entry per call, in call order.
        self.tenants = []

    def __call__(self, *args, **kwargs):
        """Record the tenant currently set on the Django connection; arguments are ignored."""
        # Imported at call time, as in the rest of this module.
        from django.db import connection as db_connection

        active_tenant = db_connection.tenant
        self.tenants.append(active_tenant)
|
|
|
|
|
|
def get_schemas():
    """Return the names of all schemas listed in ``information_schema.schemata``.

    The query runs on the default Django connection. The cursor is used as a
    context manager so it is closed as soon as the rows have been read, even
    if the query raises.
    """
    from django.db import connection

    with connection.cursor() as cursor:
        cursor.execute('select schema_name from information_schema.schemata')
        # Each row is a 1-tuple holding the schema name.
        return [row[0] for row in cursor.fetchall()]
|
|
|
|
|
|
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_all_tenants(handle, tenants):
    """``tenant_command --all-tenants`` runs the wrapped command once for each tenant."""
    from django.core.management import execute_from_command_line

    recorder = RecordTenant()
    handle.side_effect = recorder

    argv = ['manage.py', 'tenant_command', 'clearsessions', '--all-tenants']
    execute_from_command_line(argv)

    assert handle.call_count == 2
    assert len(recorder.tenants) == 2
    # Both tenants must have been the active one at some point.
    seen_domains = {tenant.domain_url for tenant in recorder.tenants}
    expected_domains = {
        'tenant1.example.net',
        'tenant2.example.net',
    }
    assert seen_domains == expected_domains
|
|
|
|
|
|
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_all_tenants_disable_cron(handle, tenants, settings):
    """With ``DISABLE_CRON_JOBS`` set, ``--all-tenants`` does not run the wrapped command."""
    from django.core.management import execute_from_command_line

    settings.DISABLE_CRON_JOBS = True

    recorder = RecordTenant()
    handle.side_effect = recorder

    argv = ['manage.py', 'tenant_command', 'clearsessions', '--all-tenants']
    execute_from_command_line(argv)

    # Neither the mock nor the recorder must have seen any call.
    assert handle.call_count == 0
    assert len(recorder.tenants) == 0
|
|
|
|
|
|
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_all_tenants_disable_cron_for_specific_tenant(handle, tenants, settings):
    """A tenant whose ``settings.json`` sets ``TENANT_DISABLE_CRON_JOBS`` is skipped.

    The first tenant gets a ``settings.json`` disabling cron jobs; running
    ``tenant_command --all-tenants`` must then call the wrapped command exactly
    once, and that single call must not be for the disabled tenant.
    """
    from django.core.management import execute_from_command_line

    disabled_tenant = tenants[0]
    settings_path = os.path.join(disabled_tenant.get_directory(), 'settings.json')
    with open(settings_path, 'w') as fd:
        json.dump(
            {'TENANT_DISABLE_CRON_JOBS': True},
            fd,
        )

    handle.side_effect = RecordTenant()
    execute_from_command_line(['manage.py', 'tenant_command', 'clearsessions', '--all-tenants'])

    assert handle.call_count == 1
    assert len(handle.side_effect.tenants) == 1
    # Counting calls alone would also pass if the *wrong* tenant had been
    # skipped, so check that the tenant that ran is not the disabled one.
    assert handle.side_effect.tenants[0].domain_url != disabled_tenant.domain_url
|
|
|
|
|
|
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_all_tenants_global_disable_cron_with_force_job(handle, tenants, settings):
    """``--force-job`` runs the command on every tenant even when ``DISABLE_CRON_JOBS`` is set."""
    from django.core.management import execute_from_command_line

    settings.DISABLE_CRON_JOBS = True

    recorder = RecordTenant()
    handle.side_effect = recorder

    argv = ['manage.py', 'tenant_command', 'clearsessions', '--all-tenants', '--force-job']
    execute_from_command_line(argv)

    # Both tenants are processed despite the global switch.
    assert handle.call_count == 2
    assert len(recorder.tenants) == 2
|
|
|
|
|
|
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_one_tenant(handle, tenants, tenant_in_call=None):
    """``tenant_command -d <domain>`` runs the wrapped command for that tenant only."""
    from django.core.management import execute_from_command_line

    recorder = RecordTenant()
    handle.side_effect = recorder

    argv = ['manage.py', 'tenant_command', 'clearsessions', '-d', 'tenant2.example.net']
    execute_from_command_line(argv)

    assert handle.call_count == 1
    assert len(recorder.tenants) == 1
    # The only recorded tenant is the one selected with -d.
    selected = recorder.tenants[0]
    assert selected.domain_url == 'tenant2.example.net'
|
|
|
|
|
|
def test_delete_tenant(tenants):
    """``delete_tenant`` leaves a 'removed' directory and schema and drops the tenant from the list.

    Before the command runs, neither the tenants' parent directory nor the
    database schemas contain a name with 'removed' in it and two tenants are
    listed; afterwards both contain such a name and one tenant is listed.
    """
    from django.core.management import execute_from_command_line

    from hobo.multitenant.middleware import TenantMiddleware

    base = os.path.dirname(tenants[0].get_directory())

    # Preconditions: nothing is marked as removed yet. Plain asserts (rather
    # than "if ...: assert False") let pytest show the failing expression.
    assert not any('removed' in d for d in os.listdir(base))
    assert not any('removed' in x for x in get_schemas())
    all_tenants = list(TenantMiddleware.get_tenants())
    assert len(all_tenants) == 2

    execute_from_command_line(['manage.py', 'delete_tenant', 'tenant2.example.net'])

    # Postconditions: a 'removed' entry exists on disk and among the schemas,
    # and only one tenant is still listed.
    assert any('removed' in d for d in os.listdir(base))
    assert any('removed' in x for x in get_schemas())
    all_tenants = list(TenantMiddleware.get_tenants())
    assert len(all_tenants) == 1
|
|
|
|
|
|
def test_tenant_command_all_tenants_errors(tenants, monkeypatch, capsys):
    """Check how ``tenant_command --all-tenants`` reports exceptions raised by the wrapped command.

    Four throw-away commands are registered through a patched ``get_commands``;
    each raises an exception with a different kind of payload (text, bytes, a
    list mixing both, and an exception whose ``__str__``/``__repr__`` raise).
    Every run must end with ``SystemExit`` and write the expected report to
    stderr.
    """
    from hobo.multitenant.management.commands import tenant_command

    # Keep a reference to the real function: it is used below both by the
    # replacement (which would otherwise call itself) and to look up the
    # tenant_command entry.
    get_commands = tenant_command.get_commands

    class UnicodeErrorCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            raise Exception('héhé')

    class BytesErrorCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            raise Exception(force_bytes('héhé'))

    class MixOfBothCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            raise Exception([force_bytes('héhé'), 'hého'])

    class WtfExceptionCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            # An exception that cannot be converted to text at all.
            class WTF(Exception):
                def __str__(self):
                    raise Exception

                def __repr__(self):
                    raise Exception

            raise WTF()

    def new_get_commands():
        # Work on a copy so the mapping returned by the real get_commands()
        # is not modified.
        d = get_commands().copy()
        d['uni-error'] = UnicodeErrorCommand()
        d['bytes-error'] = BytesErrorCommand()
        d['mix-error'] = MixOfBothCommand()
        d['wtf-error'] = WtfExceptionCommand()
        return d

    monkeypatch.setattr(tenant_command, 'get_commands', new_get_commands)

    klass = get_commands()['tenant_command']
    # callable() is the reliable spelling of the former
    # hasattr(klass, '__call__') check.
    if not callable(klass):
        klass = load_command_class(klass, 'tenant_command')

    def run_failing(command_name):
        """Run *command_name* on all tenants, expect SystemExit, return the captured stderr."""
        with pytest.raises(SystemExit):
            klass.run_from_argv(['manage.py', 'tenant_command', command_name, '--all-tenants'])
        return capsys.readouterr().err

    # Drop anything captured so far so the assertions only see the output of
    # the runs below.
    capsys.readouterr()

    err = run_failing('uni-error')
    assert 'Tenant tenant1.example.net (tenant1_example_net): Exception: héhé' in err
    assert 'Tenant tenant2.example.net (tenant2_example_net): Exception: héhé' in err
    assert 'Command failed on multiple tenants' in err

    err = run_failing('bytes-error')
    assert repr(force_bytes('héhé')) in err

    err = run_failing('mix-error')
    assert repr(force_bytes('héhé')) in err
    assert repr('hého') in err

    err = run_failing('wtf-error')
    assert 'Unrepresentable exception' in err
|
|
|
|
|
|
def test_list_tenants_command(db, tenants, capsys):
    """``list_tenants`` prints one "<schema name> <domain>" line per tenant."""
    call_command('list_tenants')

    stdout = capsys.readouterr().out
    expected_lines = [
        'tenant1_example_net tenant1.example.net',
        'tenant2_example_net tenant2.example.net',
        # The output ends with a newline, hence the trailing empty item.
        '',
    ]
    assert stdout.split('\n') == expected_lines
|
|
|
|
|
|
def test_list_tenants_command_empty(db, capsys):
    """``list_tenants`` prints nothing when the ``tenants`` fixture is not in use."""
    call_command('list_tenants')

    stdout = capsys.readouterr().out
    assert stdout == ''
|
|
|
|
|
|
@mock.patch('hobo.multitenant.management.commands.create_schemas.TenantMiddleware.get_tenants')
def test_create_schema_command(mocked_get_tenants):
    """``create_schemas`` creates the schema of each tenant returned by ``get_tenants``."""
    schema_name = 'my_schema_name'
    # The command is fed a single, not yet saved tenant through the mock.
    mocked_get_tenants.return_value = [Tenant(domain_url='my-tenant-url', schema_name=schema_name)]

    call_command('create_schemas')

    existing_schemas = get_schemas()
    assert schema_name in existing_schemas
|
|
|
|
|
|
def test_shell_command_empty():
    """``shell`` raises ``CommandError`` when the ``tenants`` fixture is not in use."""
    # ``match`` is a regular expression: the final dot is escaped so that it
    # only matches a literal '.' instead of any character.
    with pytest.raises(CommandError, match=r'There are no tenants in the system\.'):
        call_command('shell')
|