summaryrefslogtreecommitdiffstats
path: root/tests_multitenant/test_tenant_command.py
blob: b258f7276ee33fca7e3ac753a415b5cef3997bf8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# -*- coding: utf-8 -*-

import pytest
import mock
import os

from django.core.management import BaseCommand, call_command, load_command_class

pytestmark = pytest.mark.django_db


class RecordTenant(object):
    """Callable that remembers which tenant was active on each invocation.

    Intended for use as a mock ``side_effect``: every call, whatever its
    arguments, appends ``django.db.connection.tenant`` to ``self.tenants``
    and returns ``None``.
    """

    def __init__(self):
        # Tenants observed so far, in call order.
        self.tenants = list()

    def __call__(self, *args, **kwargs):
        # Import deferred to call time.
        from django.db import connection as db_connection

        active_tenant = db_connection.tenant
        self.tenants.append(active_tenant)


@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_all_tenants(handle, tenants):
    """Run ``clearsessions`` through ``tenant_command --all-tenants``.

    The patched ``handle`` records the tenant set on the database connection
    at each call; the test expects exactly two calls, covering
    ``tenant1.example.net`` and ``tenant2.example.net``.
    """
    from django.core.management import execute_from_command_line

    recorder = RecordTenant()
    handle.side_effect = recorder

    argv = ['manage.py', 'tenant_command', 'clearsessions', '--all-tenants']
    execute_from_command_line(argv)

    assert handle.call_count == 2
    assert len(recorder.tenants) == 2
    seen_domains = {tenant.domain_url for tenant in recorder.tenants}
    assert seen_domains == {'tenant1.example.net', 'tenant2.example.net'}


@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_one_tenant(handle, tenants, tenant_in_call=None):
    """Run ``clearsessions`` through ``tenant_command -d tenant2.example.net``.

    The patched ``handle`` must be called once, and the tenant recorded
    during that call must be ``tenant2.example.net``.
    """
    from django.core.management import execute_from_command_line

    recorder = RecordTenant()
    handle.side_effect = recorder

    argv = ['manage.py', 'tenant_command', 'clearsessions', '-d', 'tenant2.example.net']
    execute_from_command_line(argv)

    assert handle.call_count == 1
    recorded = recorder.tenants
    assert len(recorded) == 1
    assert recorded[0].domain_url == 'tenant2.example.net'


def test_delete_tenant(tenants):
    """Check the effects of ``delete_tenant tenant2.example.net``.

    Before the command runs, neither the tenants' parent directory nor the
    database schema list may contain a name with 'removed' in it, and two
    tenants are known. Afterwards both must contain such a name and only
    one tenant remains.
    """
    from django.core.management import execute_from_command_line
    from hobo.multitenant.middleware import TenantMiddleware

    def get_schemas():
        """Return the schema names listed in ``information_schema.schemata``."""
        from django.db import connection
        cursor = connection.cursor()
        cursor.execute('select schema_name from information_schema.schemata')
        return [row[0] for row in cursor.fetchall()]

    def has_removed(names):
        """Tell whether any of ``names`` contains the substring 'removed'."""
        return any('removed' in name for name in names)

    # Parent directory of the first tenant's directory.
    base = os.path.dirname(tenants[0].get_directory())

    # Starting state: nothing flagged as removed, two tenants.
    assert not has_removed(os.listdir(base))
    assert not has_removed(get_schemas())
    assert len(list(TenantMiddleware.get_tenants())) == 2

    execute_from_command_line(['manage.py', 'delete_tenant', 'tenant2.example.net'])

    # End state: a 'removed' entry on disk and in the schemas, one tenant left.
    assert has_removed(os.listdir(base))
    assert has_removed(get_schemas())
    assert len(list(TenantMiddleware.get_tenants())) == 1


def test_tenant_command_all_tenants_errors(tenants, monkeypatch, capsys):
    """Check how ``tenant_command --all-tenants`` reports failing sub-commands.

    Four throwaway commands are registered by monkeypatching
    ``tenant_command.get_commands``. Each raises an exception that is awkward
    to print: unicode text, UTF-8 encoded bytes, a list mixing both, and an
    exception whose ``__str__`` and ``__repr__`` themselves raise. For every
    one of them the run must end with ``SystemExit`` and stderr must contain
    the expected text.

    Byte strings are written with ``\\x`` escapes: a bytes literal holding
    non-ASCII characters is a SyntaxError on Python 3, which would prevent
    the whole module from being imported. With the file's utf-8 coding line
    the escaped form is the same value the raw literal had on Python 2.
    """
    from hobo.multitenant.management.commands import tenant_command

    # Keep a reference to the real function; the patched one wraps it.
    get_commands = tenant_command.get_commands

    class UnicodeErrorCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            raise Exception(u'héhé')

    class BytesErrorCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            # UTF-8 encoding of u'héhé'.
            raise Exception(b'h\xc3\xa9h\xc3\xa9')

    class MixOfBothCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            # First item is the UTF-8 encoding of u'héhé'.
            raise Exception([b'h\xc3\xa9h\xc3\xa9', u'hého'])

    class WtfExceptionCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            # An exception that cannot be converted to text at all.
            class WTF(Exception):
                def __str__(self):
                    raise Exception

                def __repr__(self):
                    raise Exception

            raise WTF()

    def new_get_commands():
        """Return a copy of the real command registry plus the four failing commands."""
        d = get_commands().copy()
        d['uni-error'] = UnicodeErrorCommand()
        d['bytes-error'] = BytesErrorCommand()
        d['mix-error'] = MixOfBothCommand()
        d['wtf-error'] = WtfExceptionCommand()
        return d

    monkeypatch.setattr(tenant_command, 'get_commands', new_get_commands)

    # Look up tenant_command itself through the unpatched registry; when the
    # entry has no __call__ attribute it is loaded with load_command_class.
    klass = get_commands()['tenant_command']
    if not hasattr(klass, '__call__'):
        klass = load_command_class(klass, 'tenant_command')

    # Discard anything captured so far so each check sees only its own output.
    capsys.readouterr()
    with pytest.raises(SystemExit):
        klass.run_from_argv(['manage.py', 'tenant_command', 'uni-error', '--all-tenants'])
    captured = capsys.readouterr()
    assert u'héhé' in captured.err

    with pytest.raises(SystemExit):
        klass.run_from_argv(['manage.py', 'tenant_command', 'bytes-error', '--all-tenants'])

    captured = capsys.readouterr()
    assert u'héhé' in captured.err

    with pytest.raises(SystemExit):
        klass.run_from_argv(['manage.py', 'tenant_command', 'mix-error', '--all-tenants'])

    captured = capsys.readouterr()
    assert repr(b'h\xc3\xa9h\xc3\xa9') in captured.err
    assert repr(u'hého') in captured.err

    with pytest.raises(SystemExit):
        klass.run_from_argv(['manage.py', 'tenant_command', 'wtf-error', '--all-tenants'])

    captured = capsys.readouterr()
    assert 'Unrepresentable exception' in captured.err