summaryrefslogtreecommitdiffstats
path: root/tests_multitenant/test_tenant_command.py
blob: 55ed76119fc7d9a21acb94718523d150e1cc6072 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# -*- coding: utf-8 -*-

import pytest
import mock
import os
import sys

from django.core.management import BaseCommand, call_command, load_command_class
from django.utils.encoding import force_bytes
from django.utils import six
from django.core.management.base import CommandError

from hobo.multitenant.models import Tenant

pytestmark = pytest.mark.django_db


class RecordTenant(object):
    def __init__(self):
        self.tenants = []

    def __call__(self, *args, **kwargs):
        from django.db import connection
        self.tenants.append(connection.tenant)


def get_schemas():
    from django.db import connection
    cursor = connection.cursor()
    cursor.execute('select schema_name from information_schema.schemata')
    return [x[0] for x in cursor.fetchall()]


@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_all_tenants(handle, tenants):
    from django.core.management import execute_from_command_line
    handle.side_effect = RecordTenant()
    execute_from_command_line(['manage.py', 'tenant_command', 'clearsessions', '--all-tenants'])
    assert handle.call_count == 2
    assert len(handle.side_effect.tenants) == 2
    assert (set(tenant.domain_url for tenant in handle.side_effect.tenants)
            == set(['tenant1.example.net', 'tenant2.example.net']))


@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_one_tenant(handle, tenants, tenant_in_call=None):
    from django.core.management import execute_from_command_line
    handle.side_effect = RecordTenant()
    execute_from_command_line(['manage.py', 'tenant_command', 'clearsessions', '-d', 'tenant2.example.net'])
    assert handle.call_count == 1
    assert len(handle.side_effect.tenants) == 1
    assert handle.side_effect.tenants[0].domain_url == 'tenant2.example.net'


def test_delete_tenant(tenants):
    from django.core.management import execute_from_command_line
    from hobo.multitenant.middleware import TenantMiddleware
    base = os.path.dirname(tenants[0].get_directory())
    if any('removed' in d for d in os.listdir(base)):
        assert False

    if any('removed' in x for x in get_schemas()):
        assert False
    all_tenants = list(TenantMiddleware.get_tenants())
    assert len(all_tenants) == 2
    execute_from_command_line(['manage.py', 'delete_tenant', 'tenant2.example.net'])
    assert any('removed' in d for d in os.listdir(base))
    assert any('removed' in x for x in get_schemas())
    all_tenants = list(TenantMiddleware.get_tenants())
    assert len(all_tenants) == 1


def test_tenant_command_all_tenants_errors(tenants, monkeypatch, capsys):
    from hobo.multitenant.management.commands import tenant_command

    get_commands = tenant_command.get_commands

    class UnicodeErrorCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            raise Exception(u'héhé')

    class BytesErrorCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            raise Exception(force_bytes('héhé'))

    class MixOfBothCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            raise Exception([force_bytes('héhé'), u'hého'])

    class WtfExceptionCommand(BaseCommand):
        def handle(self, *args, **kwargs):
            class WTF(Exception):
                def __str__(self):
                    raise Exception

                def __repr__(self):
                    raise Exception

            raise WTF()

    def new_get_commands():
        d = get_commands().copy()
        d['uni-error'] = UnicodeErrorCommand()
        d['bytes-error'] = BytesErrorCommand()
        d['mix-error'] = MixOfBothCommand()
        d['wtf-error'] = WtfExceptionCommand()
        return d

    monkeypatch.setattr(tenant_command, 'get_commands', new_get_commands)

    klass = get_commands()['tenant_command']
    if not hasattr(klass, '__call__'):
        klass = load_command_class(klass, 'tenant_command')

    capsys.readouterr()
    with pytest.raises(SystemExit):
        klass.run_from_argv(['manage.py', 'tenant_command', 'uni-error', '--all-tenants'])
    captured = capsys.readouterr()
    assert u'héhé' in captured.err

    with pytest.raises(SystemExit):
        klass.run_from_argv(['manage.py', 'tenant_command', 'bytes-error', '--all-tenants'])

    captured = capsys.readouterr()
    if six.PY2:
        assert u'héhé' in captured.err
    else:
        assert repr(force_bytes('héhé')) in captured.err

    with pytest.raises(SystemExit):
        klass.run_from_argv(['manage.py', 'tenant_command', 'mix-error', '--all-tenants'])

    captured = capsys.readouterr()
    assert repr(force_bytes('héhé')) in captured.err
    assert repr(u'hého') in captured.err

    with pytest.raises(SystemExit):
        klass.run_from_argv(['manage.py', 'tenant_command', 'wtf-error', '--all-tenants'])

    captured = capsys.readouterr()
    assert 'Unrepresentable exception' in captured.err


def test_list_tenants_command(db, tenants, capsys):
    call_command('list_tenants')
    captured = capsys.readouterr()
    assert captured.out.split('\n') == [
        'tenant1_example_net tenant1.example.net',
        'tenant2_example_net tenant2.example.net',
        ''
    ]


def test_list_tenants_command_empty(db, capsys):
    call_command('list_tenants')
    captured = capsys.readouterr()
    assert captured.out == ''


@mock.patch('hobo.multitenant.management.commands.create_schemas.TenantMiddleware.get_tenants')
def test_create_schema_command(mocked_get_tenants):
    tenant = Tenant(domain_url='my-tenant-url', schema_name='my_schema_name')
    mocked_get_tenants.return_value = [tenant]
    nb_schemas = len(get_schemas())
    call_command('create_schemas')
    assert 'my_schema_name' in get_schemas()[nb_schemas]


def test_shell_command_empty():
    with pytest.raises(CommandError, match="There are no tenants in the system."):
        call_command('shell')