Add migration executors

This commit is contained in:
Mateu Cànaves 2017-03-10 16:06:03 +01:00
parent 8f71ffed76
commit 323cfcd38a
7 changed files with 132 additions and 24 deletions

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python
from os.path import exists
from version import get_git_version
try:
@ -15,6 +16,7 @@ setup(
author_email='carneiro.be@gmail.com',
packages=[
'tenant_schemas',
'tenant_schemas.migration_executors',
'tenant_schemas.postgresql_backend',
'tenant_schemas.management',
'tenant_schemas.management.commands',

View File

@ -149,11 +149,14 @@ class SyncCommon(BaseCommand):
help=('Database state will be brought to the state after that '
'migration. Use the name "zero" to unapply all migrations.'))
parser.add_argument("-s", "--schema", dest="schema_name")
parser.add_argument('--executor', action='store', dest='executor', default=None,
help='Executor for running migrations [standard (default)|parallel]')
def handle(self, *args, **options):
self.sync_tenant = options.get('tenant')
self.sync_public = options.get('shared')
self.schema_name = options.get('schema_name')
self.executor = options.get('executor')
self.installed_apps = settings.INSTALLED_APPS
self.args = args
self.options = options

View File

@ -1,11 +1,10 @@
import django
from django.conf import settings
from django.core.management.commands.migrate import Command as MigrateCommand
from django.db import connection
from tenant_schemas.management.commands import SyncCommon
from tenant_schemas.utils import get_tenant_model, get_public_schema_name, schema_exists
from tenant_schemas.migration_executors import get_executor
from tenant_schemas.utils import get_public_schema_name, get_tenant_model, schema_exists
if django.VERSION >= (1, 9, 0):
from django.db.migrations.exceptions import MigrationSchemaMissing
@ -34,35 +33,21 @@ class Command(SyncCommon):
super(Command, self).handle(*args, **options)
self.PUBLIC_SCHEMA_NAME = get_public_schema_name()
executor = get_executor(codename=self.executor)(self.args, self.options)
if self.sync_public and not self.schema_name:
self.schema_name = self.PUBLIC_SCHEMA_NAME
if self.sync_public:
self.run_migrations(self.schema_name, settings.SHARED_APPS)
executor.run_migrations(tenants=[self.schema_name])
if self.sync_tenant:
if self.schema_name and self.schema_name != self.PUBLIC_SCHEMA_NAME:
if not schema_exists(self.schema_name):
raise MigrationSchemaMissing('Schema "{}" does not exist'.format(
self.schema_name))
else:
self.run_migrations(self.schema_name, settings.TENANT_APPS)
tenants = [self.schema_name]
else:
all_tenants = get_tenant_model().objects.exclude(schema_name=get_public_schema_name())
for tenant in all_tenants:
self.run_migrations(tenant.schema_name, settings.TENANT_APPS)
def run_migrations(self, schema_name, included_apps):
if int(self.options.get('verbosity', 1)) >= 1:
self._notice("=== Running migrate for schema %s" % schema_name)
if not schema_exists(schema_name):
raise MigrationSchemaMissing('Schema "{}" does not exist'.format(
schema_name))
connection.set_schema(schema_name)
command = MigrateCommand()
command.execute(*self.args, **self.options)
connection.set_schema_to_public()
def _notice(self, output):
self.stdout.write(self.style.NOTICE(output))
tenants = get_tenant_model().objects.exclude(schema_name=get_public_schema_name()).values_list(
'schema_name', flat=True)
executor.run_migrations(tenants=tenants)

View File

@ -0,0 +1,15 @@
import os
from tenant_schemas.migration_executors.base import MigrationExecutor
from tenant_schemas.migration_executors.parallel import ParallelExecutor
from tenant_schemas.migration_executors.standard import StandardExecutor
def get_executor(codename=None):
codename = codename or os.environ.get('EXECUTOR', StandardExecutor.codename)
for klass in MigrationExecutor.__subclasses__():
if klass.codename == codename:
return klass
raise NotImplementedError('No executor with codename %s' % codename)

View File

@ -0,0 +1,64 @@
import sys
from django.core.management.commands.migrate import Command as MigrateCommand
from django.db import transaction
from tenant_schemas.utils import get_public_schema_name
def run_migrations(args, options, executor_codename, schema_name, allow_atomic=True):
from django.core.management import color
from django.core.management.base import OutputWrapper
from django.db import connection
style = color.color_style()
def style_func(msg):
return '[%s:%s] %s' % (
style.NOTICE(executor_codename),
style.NOTICE(schema_name),
msg
)
stdout = OutputWrapper(sys.stdout)
stdout.style_func = style_func
stderr = OutputWrapper(sys.stderr)
stderr.style_func = style_func
if int(options.get('verbosity', 1)) >= 1:
stdout.write(style.NOTICE("=== Running migrate for schema %s" % schema_name))
connection.set_schema(schema_name)
MigrateCommand(stdout=stdout, stderr=stderr).execute(*args, **options)
try:
transaction.commit()
connection.close()
connection.connection = None
except transaction.TransactionManagementError:
if not allow_atomic:
raise
# We are in atomic transaction, don't close connections
pass
connection.set_schema_to_public()
class MigrationExecutor(object):
codename = None
def __init__(self, args, options):
self.args = args
self.options = options
def run_migrations(self, tenants):
public_schema_name = get_public_schema_name()
if public_schema_name in tenants:
run_migrations(self.args, self.options, self.codename, public_schema_name)
tenants.pop(tenants.index(public_schema_name))
self.run_tenant_migrations(tenants)
def run_tenant_migrations(self, tenant):
raise NotImplementedError

View File

@ -0,0 +1,30 @@
import functools
import multiprocessing
from django.conf import settings
from tenant_schemas.migration_executors.base import MigrationExecutor, run_migrations
class ParallelExecutor(MigrationExecutor):
codename = 'parallel'
def run_tenant_migrations(self, tenants):
if tenants:
processes = getattr(settings, 'TENANT_PARALLEL_MIGRATION_MAX_PROCESSES', 2)
chunks = getattr(settings, 'TENANT_PARALLEL_MIGRATION_CHUNKS', 2)
from django.db import connection
connection.close()
connection.connection = None
run_migrations_p = functools.partial(
run_migrations,
self.args,
self.options,
self.codename,
allow_atomic=False
)
p = multiprocessing.Pool(processes=processes)
p.map(run_migrations_p, tenants, chunks)

View File

@ -0,0 +1,9 @@
from tenant_schemas.migration_executors.base import MigrationExecutor, run_migrations
class StandardExecutor(MigrationExecutor):
codename = 'standard'
def run_tenant_migrations(self, tenants):
for schema_name in tenants:
run_migrations(self.args, self.options, self.codename, schema_name)