hobo/hobo/multitenant/management/commands/migrate_schemas.py

122 lines
5.4 KiB
Python

# hobo - portal to configure and deploy applications
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import django
from django.apps import apps
from django.conf import settings
from django.core.management.commands.migrate import Command as MigrateCommand
from django.db import connection
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.recorder import MigrationRecorder
from django.utils.timezone import localtime
from tenant_schemas.postgresql_backend.base import FakeTenant
from tenant_schemas.utils import get_public_schema_name, schema_exists
from hobo.multitenant.management.commands import SyncCommon
from hobo.multitenant.middleware import TenantMiddleware, TenantNotFound
class MigrateSchemasCommand(SyncCommon):
    """Run Django's ``migrate`` command inside tenant schemas.

    Depending on what the base class parsed from the command line, the
    migration is run for a single tenant (``self.domain``), for a single
    schema (``self.schema_name``) or for every tenant returned by
    ``TenantMiddleware.get_tenants()``.

    NOTE(review): ``self.domain``, ``self.schema_name``, ``self.args`` and
    ``self.options`` are not set in this class; presumably they are populated
    by ``SyncCommon.handle()`` -- confirm against the base class.
    """

    help = "Updates database schema. Manages both apps with migrations and those without."
    requires_system_checks = []

    def add_arguments(self, parser):
        """Register the base class options plus every stock ``migrate`` option."""
        super().add_arguments(parser)
        # Borrow the argument definitions of Django's own migrate command so
        # that its options can be forwarded unchanged.
        MigrateCommand().add_arguments(parser)
        # Quiet by default; progress notices are only printed at verbosity >= 1.
        parser.set_defaults(verbosity=0)

    def handle(self, *args, **options):
        """Dispatch to a single tenant, a single schema, or all tenants.

        Raises:
            RuntimeError: if ``self.domain`` is set but no tenant matches it.
        """
        options['skip_checks'] = True
        super().handle(*args, **options)

        if self.domain:
            try:
                tenant = TenantMiddleware.get_tenant_by_hostname(self.domain)
            except TenantNotFound as exc:
                # Keep the original lookup failure attached as the cause.
                raise RuntimeError(f'Tenant "{self.domain}" does not exist') from exc
            self.run_migrations(tenant, settings.TENANT_APPS)
            return

        if self.schema_name:
            self.run_migrations_on_schema(self.schema_name, settings.TENANT_APPS)
            return

        self._migrate_all_tenants(options)

    def _migrate_all_tenants(self, options):
        """Migrate every tenant, skipping those that are already up to date."""
        app_labels = [
            app.label for app in apps.get_app_configs() if app.name in settings.TENANT_APPS
        ]

        # Collect the (app_label, migration_name) pairs present on disk for
        # the tenant applications; no database connection is needed for that.
        loader = MigrationLoader(None)
        loader.load_disk()
        all_migrations = {
            (app, migration) for app, migration in loader.disk_migrations if app in app_labels
        }

        # Never skip a tenant if explicit migration actions are given. This
        # does not depend on the tenant, so it is decided once, and in that
        # case the django_migrations table is not queried at all.
        force_run = bool(
            options.get('fake') or options.get('migration_name') or options.get('app_label')
        )

        tenants = list(TenantMiddleware.get_tenants())
        len_tenants = len(tenants)
        start_datetime = localtime()

        for step, tenant in enumerate(tenants, start=1):
            if not force_run and self._is_up_to_date(tenant, app_labels, all_migrations):
                if self._is_verbose():
                    self._notice(
                        "=== Skipping migrations of tenant %s (%s/%s)"
                        % (tenant.domain_url, step, len_tenants)
                    )
                continue

            self.run_migrations(tenant, settings.TENANT_APPS, step, len_tenants)

            if self._is_verbose():
                # Linear extrapolation: average time per processed tenant so
                # far, multiplied by the total number of tenants.
                eta = start_datetime + len_tenants * (localtime() - start_datetime) / step
                self._notice('=== migrate_schemas ETA: %s' % eta, flush=True)

    def _is_up_to_date(self, tenant, app_labels, all_migrations):
        """Return True if every on-disk migration is recorded for ``tenant``."""
        connection.set_tenant(tenant, include_public=False)
        try:
            applied = {tuple(row) for row in self.get_applied_migrations(app_labels)}
        finally:
            # Do not leave the shared connection on the tenant schema,
            # whether the probe succeeded or not.
            connection.set_schema_to_public()
        return all_migrations <= applied

    def _is_verbose(self):
        """Return True if progress notices should be printed (verbosity >= 1)."""
        return int(self.options.get('verbosity', 1)) >= 1

    def get_applied_migrations(self, app_labels):
        """Return the ``(app, name)`` rows of ``django_migrations`` for ``app_labels``.

        The query runs on the module-level ``connection`` as currently
        configured; callers select the tenant before calling this method.
        """
        wanted = set(app_labels)
        with connection.cursor() as cursor:
            cursor.execute('SELECT app, name FROM django_migrations')
            return [row for row in cursor.fetchall() if row[0] in wanted]

    def run_migrations(self, tenant, included_apps, step=1, steps=1):
        """Run ``migrate`` with the connection switched to ``tenant``.

        ``included_apps`` is accepted for interface compatibility but is not
        used. ``step`` and ``steps`` only feed the progress notice.
        """
        if self._is_verbose():
            self._notice("=== Running migrate for tenant %s (%s/%s)" % (tenant.domain_url, step, steps))
        connection.set_tenant(tenant, include_public=False)
        self._run_migrate_command()

    def run_migrations_on_schema(self, schema, included_apps):
        """Run ``migrate`` with the connection switched to the schema named ``schema``.

        ``included_apps`` is accepted for interface compatibility but is not
        used.
        """
        if self._is_verbose():
            self._notice("=== Running migrate for schema %s" % schema)
        connection.set_schema(schema, include_public=False)
        self._run_migrate_command()

    def _run_migrate_command(self):
        """Execute Django's ``migrate`` on the current schema, then reset to public."""
        command = MigrateCommand()
        # Same (falsy) value as the class attribute above: no system checks
        # and no migration checks for the inner command.
        command.requires_system_checks = []
        command.requires_migrations_checks = False
        try:
            command.execute(*self.args, **self.options)
        finally:
            # Always switch back, even when the migration raised, so that a
            # failure does not leave the connection on a tenant schema.
            connection.set_schema_to_public()

    def _notice(self, output, flush=False):
        """Write ``output`` to stdout using the NOTICE style, optionally flushing."""
        self.stdout.write(self.style.NOTICE(output))
        if flush:
            self.stdout.flush()
# Alias the class as ``Command``: Django's management command loader
# instantiates the attribute named ``Command`` from the command module.
Command = MigrateSchemasCommand