multitenant: add delete_tenant command (#15513)
This commit is contained in:
parent
5c37a4a6bc
commit
1e1ee079bd
|
@ -0,0 +1,29 @@
|
|||
import sys
|
||||
from optparse import make_option
|
||||
|
||||
from django.core.management.base import CommandError, BaseCommand
|
||||
from hobo.multitenant.middleware import TenantMiddleware
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Delete tenant(s) by hostname(s)"
|
||||
args = ['...']
|
||||
option_list = BaseCommand.option_list + (
|
||||
make_option('--force-drop', action='store_true', default=False,
|
||||
help='If you want the schema to be deleted from database'),
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
|
||||
if not args:
|
||||
raise CommandError("you must give at least one tenant hostname")
|
||||
|
||||
# if - is given on the command line, get list of hostnames from stdin
|
||||
if '-' in args:
|
||||
args = list(args)
|
||||
args.remove('-')
|
||||
args.extend([x.strip() for x in sys.stdin.readlines()])
|
||||
for hostname in args:
|
||||
tenant = TenantMiddleware.get_tenant_by_hostname(hostname)
|
||||
tenant.delete(force_drop=options['force_drop'])
|
||||
|
|
@ -1,9 +1,11 @@
|
|||
import os
|
||||
from urlparse import urljoin
|
||||
import json
|
||||
from shutil import rmtree
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import connection
|
||||
from django.utils import timezone
|
||||
|
||||
from tenant_schemas.utils import get_public_schema_name
|
||||
from tenant_schemas.models import TenantMixin
|
||||
|
@ -64,9 +66,17 @@ class Tenant(TenantMixin):
|
|||
raise Exception("Can't delete tenant outside it's own schema or "
|
||||
"the public schema. Current schema is %s."
|
||||
% connection.schema_name)
|
||||
|
||||
os.rename(self.get_directory(), self.get_directory() + '.invalid')
|
||||
if force_drop:
|
||||
rmtree(self.get_directory())
|
||||
else:
|
||||
deletion_date = timezone.now().strftime('%Y%m%d_%H%M%S_%f')
|
||||
os.rename(self.get_directory(), self.get_directory() + '.removed_%s.invalid' % deletion_date)
|
||||
|
||||
if schema_exists(self.schema_name) and (self.auto_drop_schema or force_drop):
|
||||
cursor = connection.cursor()
|
||||
cursor.execute('DROP SCHEMA %s CASCADE' % self.schema_name)
|
||||
|
||||
if schema_exists(self.schema_name) and (not self.auto_drop_schema and not force_drop):
|
||||
cursor = connection.cursor()
|
||||
schema_new_name = 'removed_%s_%s' % (deletion_date, self.schema_name)
|
||||
cursor.execute('ALTER SCHEMA %s RENAME TO %s' % (self.schema_name, schema_new_name[:63]))
|
||||
|
|
|
@ -54,8 +54,9 @@ def tenants(transactional_db, request, settings):
|
|||
tenants = [make_tenant('tenant1.example.net'), make_tenant('tenant2.example.net')]
|
||||
def fin():
|
||||
from django.db import connection
|
||||
from hobo.multitenant.middleware import TenantMiddleware
|
||||
connection.set_schema_to_public()
|
||||
for t in tenants:
|
||||
for t in TenantMiddleware.get_tenants():
|
||||
t.delete(True)
|
||||
shutil.rmtree(base)
|
||||
request.addfinalizer(fin)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import pytest
|
||||
import mock
|
||||
import os
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
@ -29,3 +30,26 @@ def test_one_tenant(handle, tenants, tenant_in_call=None):
|
|||
assert handle.call_count == 1
|
||||
assert len(handle.side_effect.tenants) == 1
|
||||
assert handle.side_effect.tenants[0].domain_url == 'tenant2.example.net'
|
||||
|
||||
def test_delete_tenant(tenants):
|
||||
from django.core.management import execute_from_command_line
|
||||
from hobo.multitenant.middleware import TenantMiddleware
|
||||
base = os.path.dirname(tenants[0].get_directory())
|
||||
if any('removed' in d for d in os.listdir(base)):
|
||||
assert False
|
||||
def get_schemas():
|
||||
from django.db import connection
|
||||
cursor = connection.cursor()
|
||||
cursor.execute('select schema_name from information_schema.schemata')
|
||||
return [x[0] for x in cursor.fetchall()]
|
||||
if any('removed' in x for x in get_schemas()):
|
||||
assert False
|
||||
all_tenants = list(TenantMiddleware.get_tenants())
|
||||
assert len(all_tenants) == 2
|
||||
execute_from_command_line(['manage.py', 'delete_tenant', 'tenant2.example.net'])
|
||||
if not any('removed' in d for d in os.listdir(base)):
|
||||
assert False
|
||||
if not any('removed' in x for x in get_schemas()):
|
||||
assert False
|
||||
all_tenants = list(TenantMiddleware.get_tenants())
|
||||
assert len(all_tenants) == 1
|
||||
|
|
Loading…
Reference in New Issue