multitenant: wrap schema creation in atomic() (#23119)

This commit is contained in:
Benjamin Dauvergne 2018-04-12 11:10:21 +02:00
parent 27e006a180
commit 2263f59679
2 changed files with 68 additions and 2 deletions

View File

@ -1,7 +1,7 @@
import os
import sys
from django.db import connection
from django.db import connection, transaction
from django.core.management.base import CommandError, BaseCommand
from hobo.multitenant.middleware import TenantMiddleware, get_tenant_model
@ -47,7 +47,8 @@ class Command(BaseCommand):
print
print self.style.NOTICE("=== Creating schema ") \
+ self.style.SQL_TABLE(tenant.schema_name)
tenant.create_schema(check_if_exists=True)
with transaction.atomic():
tenant.create_schema(check_if_exists=True)
except Exception as e:
os.rmdir(tenant_dir_tmp)
raise CommandError('tenant creation failed (%s)' % str(e))

View File

@ -0,0 +1,65 @@
# hobo - portal to configure and deploy applications
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.db import connection
from django.core.management import call_command
from django.core.management.base import CommandError
from django.contrib.auth.models import User
from tenant_schemas.utils import tenant_context
from hobo.multitenant.middleware import TenantMiddleware
import mock
import pytest
@pytest.fixture(autouse=True)
def configuration(settings, tmpdir):
settings.TENANT_BASE = str(tmpdir.mkdir('tenants'))
def schema_exists(schema_name):
with connection.cursor() as cursor:
cursor.execute('select schema_name from information_schema.schemata')
return 'www_example_com' in [row[0] for row in cursor.fetchall()]
def test_create_tenant(db):
assert not schema_exists('www_example_com')
call_command('create_tenant', 'www.example.com')
assert schema_exists('www_example_com')
tenants = list(TenantMiddleware.get_tenants())
assert len(tenants) == 1
tenant = tenants[0]
assert tenant.domain_url == 'www.example.com'
assert tenant.schema_name == 'www_example_com'
with tenant_context(tenant):
User.objects.create(username='admin')
def test_create_tenant_failure(db, caplog):
with mock.patch('hobo.multitenant.management.commands.migrate_schemas.MigrateSchemasCommand.handle') as handle:
handle.side_effect = CommandError('unable to migrate')
assert not schema_exists('www_example_com')
with pytest.raises(CommandError) as exc_info:
call_command('create_tenant', 'www.example.com')
assert str(exc_info.value) == 'tenant creation failed (unable to migrate)'
assert not schema_exists('www_example_com')
with connection.cursor() as cursor:
cursor.execute('select schema_name from information_schema.schemata')
assert 'www_example_com' not in [row[0] for row in cursor.fetchall()]