Merge pull request #454 from mcanaves/master

Added parallel migrations. Thank you @mcanaves
This commit is contained in:
Bernardo Pires 2017-04-23 22:53:23 +02:00 committed by GitHub
commit a4fb6cd936
14 changed files with 183 additions and 45 deletions

View File

@ -1,21 +1,22 @@
sudo: false
language: python
python:
- 2.7
- 3.5
- 2.7
- 3.5
services:
- postgresql
- postgresql
addons:
postgresql: '9.4'
install:
- pip install tox
before_script:
- psql -c "CREATE DATABASE dts_test_project;" -U postgres
script:
- tox -e py${TRAVIS_PYTHON_VERSION/./}-dj${DJANGO/./}
install: pip install -q tox-travis
env:
- DJANGO=1.8
- DJANGO=1.10
- DJANGO=1.11
- DJANGO=1.8
- DJANGO=1.9
- DJANGO=1.10
- DJANGO=1.11
matrix:
fast_finish: true
script: tox
before_script: psql -c "CREATE DATABASE dts_test_project;" -U postgres
deploy:
provider: pypi
user: bcarneiro

View File

@ -11,8 +11,6 @@
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys
import os
import datetime
# If extensions (or modules to document with autodoc) are in another directory,

View File

@ -98,6 +98,26 @@ The options given to ``migrate_schemas`` are also passed to every ``migrate``. H
``migrate_schemas`` raises an exception when an tenant schema is missing.
migrate_schemas in parallel
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Once the number of tenants grow, migrating all the tenants can become a bottleneck. To speed up this process, you can run tenant migrations in parallel like this:
.. code-block:: bash
python manage.py migrate_schemas --executor=parallel
In fact, you can write your own executor which will run tenant migrations in
any way you want, just take a look at ``tenant_schemas/migration_executors``.
The ``parallel`` executor accepts the following settings:
* ``TENANT_PARALLEL_MIGRATION_MAX_PROCESSES`` (default: 2) - maximum number of
processes for migration pool (this is to avoid exhausting the database
connection pool)
* ``TENANT_PARALLEL_MIGRATION_CHUNKS`` (default: 2) - number of migrations to be
sent at once to every worker
tenant_command
~~~~~~~~~~~~~~

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python
from os.path import exists
from version import get_git_version
try:
@ -15,6 +16,7 @@ setup(
author_email='carneiro.be@gmail.com',
packages=[
'tenant_schemas',
'tenant_schemas.migration_executors',
'tenant_schemas.postgresql_backend',
'tenant_schemas.management',
'tenant_schemas.management.commands',

View File

@ -149,11 +149,14 @@ class SyncCommon(BaseCommand):
help=('Database state will be brought to the state after that '
'migration. Use the name "zero" to unapply all migrations.'))
parser.add_argument("-s", "--schema", dest="schema_name")
parser.add_argument('--executor', action='store', dest='executor', default=None,
help='Executor for running migrations [standard (default)|parallel]')
def handle(self, *args, **options):
self.sync_tenant = options.get('tenant')
self.sync_public = options.get('shared')
self.schema_name = options.get('schema_name')
self.executor = options.get('executor')
self.installed_apps = settings.INSTALLED_APPS
self.args = args
self.options = options

View File

@ -1,11 +1,10 @@
import django
from django.conf import settings
from django.core.management.commands.migrate import Command as MigrateCommand
from django.db import connection
from tenant_schemas.management.commands import SyncCommon
from tenant_schemas.utils import get_tenant_model, get_public_schema_name, schema_exists
from tenant_schemas.migration_executors import get_executor
from tenant_schemas.utils import get_public_schema_name, get_tenant_model, schema_exists
if django.VERSION >= (1, 9, 0):
from django.db.migrations.exceptions import MigrationSchemaMissing
@ -34,35 +33,21 @@ class Command(SyncCommon):
super(Command, self).handle(*args, **options)
self.PUBLIC_SCHEMA_NAME = get_public_schema_name()
executor = get_executor(codename=self.executor)(self.args, self.options)
if self.sync_public and not self.schema_name:
self.schema_name = self.PUBLIC_SCHEMA_NAME
if self.sync_public:
self.run_migrations(self.schema_name, settings.SHARED_APPS)
executor.run_migrations(tenants=[self.schema_name])
if self.sync_tenant:
if self.schema_name and self.schema_name != self.PUBLIC_SCHEMA_NAME:
if not schema_exists(self.schema_name):
raise MigrationSchemaMissing('Schema "{}" does not exist'.format(
self.schema_name))
else:
self.run_migrations(self.schema_name, settings.TENANT_APPS)
tenants = [self.schema_name]
else:
all_tenants = get_tenant_model().objects.exclude(schema_name=get_public_schema_name())
for tenant in all_tenants:
self.run_migrations(tenant.schema_name, settings.TENANT_APPS)
def run_migrations(self, schema_name, included_apps):
if int(self.options.get('verbosity', 1)) >= 1:
self._notice("=== Running migrate for schema %s" % schema_name)
if not schema_exists(schema_name):
raise MigrationSchemaMissing('Schema "{}" does not exist'.format(
schema_name))
connection.set_schema(schema_name)
command = MigrateCommand()
command.execute(*self.args, **self.options)
connection.set_schema_to_public()
def _notice(self, output):
self.stdout.write(self.style.NOTICE(output))
tenants = get_tenant_model().objects.exclude(schema_name=get_public_schema_name()).values_list(
'schema_name', flat=True)
executor.run_migrations(tenants=tenants)

View File

@ -0,0 +1,15 @@
import os
from tenant_schemas.migration_executors.base import MigrationExecutor
from tenant_schemas.migration_executors.parallel import ParallelExecutor
from tenant_schemas.migration_executors.standard import StandardExecutor
def get_executor(codename=None):
codename = codename or os.environ.get('EXECUTOR', StandardExecutor.codename)
for klass in MigrationExecutor.__subclasses__():
if klass.codename == codename:
return klass
raise NotImplementedError('No executor with codename %s' % codename)

View File

@ -0,0 +1,64 @@
import sys
from django.core.management.commands.migrate import Command as MigrateCommand
from django.db import transaction
from tenant_schemas.utils import get_public_schema_name
def run_migrations(args, options, executor_codename, schema_name, allow_atomic=True):
from django.core.management import color
from django.core.management.base import OutputWrapper
from django.db import connection
style = color.color_style()
def style_func(msg):
return '[%s:%s] %s' % (
style.NOTICE(executor_codename),
style.NOTICE(schema_name),
msg
)
stdout = OutputWrapper(sys.stdout)
stdout.style_func = style_func
stderr = OutputWrapper(sys.stderr)
stderr.style_func = style_func
if int(options.get('verbosity', 1)) >= 1:
stdout.write(style.NOTICE("=== Running migrate for schema %s" % schema_name))
connection.set_schema(schema_name)
MigrateCommand(stdout=stdout, stderr=stderr).execute(*args, **options)
try:
transaction.commit()
connection.close()
connection.connection = None
except transaction.TransactionManagementError:
if not allow_atomic:
raise
# We are in atomic transaction, don't close connections
pass
connection.set_schema_to_public()
class MigrationExecutor(object):
codename = None
def __init__(self, args, options):
self.args = args
self.options = options
def run_migrations(self, tenants):
public_schema_name = get_public_schema_name()
if public_schema_name in tenants:
run_migrations(self.args, self.options, self.codename, public_schema_name)
tenants.pop(tenants.index(public_schema_name))
self.run_tenant_migrations(tenants)
def run_tenant_migrations(self, tenant):
raise NotImplementedError

View File

@ -0,0 +1,30 @@
import functools
import multiprocessing
from django.conf import settings
from tenant_schemas.migration_executors.base import MigrationExecutor, run_migrations
class ParallelExecutor(MigrationExecutor):
codename = 'parallel'
def run_tenant_migrations(self, tenants):
if tenants:
processes = getattr(settings, 'TENANT_PARALLEL_MIGRATION_MAX_PROCESSES', 2)
chunks = getattr(settings, 'TENANT_PARALLEL_MIGRATION_CHUNKS', 2)
from django.db import connection
connection.close()
connection.connection = None
run_migrations_p = functools.partial(
run_migrations,
self.args,
self.options,
self.codename,
allow_atomic=False
)
p = multiprocessing.Pool(processes=processes)
p.map(run_migrations_p, tenants, chunks)

View File

@ -0,0 +1,9 @@
from tenant_schemas.migration_executors.base import MigrationExecutor, run_migrations
class StandardExecutor(MigrationExecutor):
codename = 'standard'
def run_tenant_migrations(self, tenants):
for schema_name in tenants:
run_migrations(self.args, self.options, self.codename, schema_name)

View File

@ -1,13 +1,13 @@
import re
import warnings
import psycopg2
import django.db.utils
import psycopg2
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured, ValidationError
import django.db.utils
from tenant_schemas.utils import get_public_schema_name, get_limit_set_calls
from tenant_schemas.postgresql_backend.introspection import DatabaseSchemaIntrospection
from tenant_schemas.utils import get_limit_set_calls, get_public_schema_name
ORIGINAL_BACKEND = getattr(settings, 'ORIGINAL_BACKEND', 'django.db.backends.postgresql_psycopg2')
# Django 1.9+ takes care to rename the default backend to 'django.db.backends.postgresql'

View File

@ -1,10 +1,9 @@
from django.core.management import call_command
from django.conf import settings
from django.core.management import call_command
from django.db import connection
from django.test import TestCase
from tenant_schemas.utils import get_public_schema_name
from tenant_schemas.utils import get_tenant_model
from tenant_schemas.utils import get_public_schema_name, get_tenant_model
ALLOWED_TEST_DOMAIN = '.test.com'

View File

@ -1,4 +1,5 @@
import inspect
from django.conf import settings
from django.core.management import call_command
from django.db import connection

13
tox.ini
View File

@ -1,5 +1,12 @@
[tox]
envlist = py{27,35}-dj{18,110,111}
envlist = py{27,35}-dj{18,19,110,111}-{standard,parallel}
[travis:env]
DJANGO =
1.8: dj18-{standard,parallel}
1.9: dj19-{standard,parallel}
1.10: dj110-{standard,parallel}
1.11: dj111-{standard,parallel}
[testenv]
usedevelop = True
@ -17,6 +24,10 @@ changedir = dts_test_project
passenv = PG_NAME PG_USER PG_PASSWORD PG_HOST PG_PORT
setenv =
standard: MIGRATION_EXECUTOR=standard
parallel: MIGRATION_EXECUTOR=parallel
commands =
coverage run manage.py test --noinput {posargs:tenant_schemas}
coverage report -m --include=../tenant_schemas/*