tenant_command: convert exception to UTF-8 (#30559)
This commit is contained in:
parent
874130fde1
commit
11c79a9770
|
@ -9,16 +9,39 @@ import argparse
|
|||
import sys
|
||||
|
||||
import django
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.core.management.base import (BaseCommand, CommandError,
|
||||
SystemCheckError, handle_default_options)
|
||||
SystemCheckError,
|
||||
handle_default_options)
|
||||
from django.core.management import call_command, get_commands, load_command_class
|
||||
from django.db import connection, connections
|
||||
|
||||
from hobo.multitenant.management.commands import InteractiveTenantOption
|
||||
from hobo.multitenant.middleware import TenantMiddleware
|
||||
|
||||
|
||||
def exception_to_text(e):
|
||||
try:
|
||||
return six.text_type(e)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
return force_text(str(e), errors='ignore')
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
return force_text(repr(e), errors='ignore')
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return 'Unrepresentable exception'
|
||||
|
||||
|
||||
def run_command_from_argv(command, argv):
|
||||
# copied/adapted from Django run_from_argv
|
||||
command._called_from_command_line = True
|
||||
|
@ -44,7 +67,7 @@ def run_command_from_argv(command, argv):
|
|||
command.stderr.write(str(e), lambda x: x)
|
||||
else:
|
||||
command.stderr.write('%s: %s: %s' % (
|
||||
connection.get_tenant(), e.__class__.__name__, e))
|
||||
connection.get_tenant(), e.__class__.__name__, exception_to_text(e)))
|
||||
return e
|
||||
finally:
|
||||
try:
|
||||
|
|
|
@ -1,9 +1,14 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import pytest
|
||||
import mock
|
||||
import os
|
||||
|
||||
from django.core.management import BaseCommand, call_command, load_command_class
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
class RecordTenant(object):
|
||||
def __init__(self):
|
||||
self.tenants = []
|
||||
|
@ -12,6 +17,7 @@ class RecordTenant(object):
|
|||
from django.db import connection
|
||||
self.tenants.append(connection.tenant)
|
||||
|
||||
|
||||
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
|
||||
def test_all_tenants(handle, tenants):
|
||||
from django.core.management import execute_from_command_line
|
||||
|
@ -19,8 +25,9 @@ def test_all_tenants(handle, tenants):
|
|||
execute_from_command_line(['manage.py', 'tenant_command', 'clearsessions', '--all-tenants'])
|
||||
assert handle.call_count == 2
|
||||
assert len(handle.side_effect.tenants) == 2
|
||||
assert set(tenant.domain_url for tenant in handle.side_effect.tenants) == \
|
||||
set(['tenant1.example.net', 'tenant2.example.net'])
|
||||
assert (set(tenant.domain_url for tenant in handle.side_effect.tenants)
|
||||
== set(['tenant1.example.net', 'tenant2.example.net']))
|
||||
|
||||
|
||||
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
|
||||
def test_one_tenant(handle, tenants, tenant_in_call=None):
|
||||
|
@ -31,25 +38,94 @@ def test_one_tenant(handle, tenants, tenant_in_call=None):
|
|||
assert len(handle.side_effect.tenants) == 1
|
||||
assert handle.side_effect.tenants[0].domain_url == 'tenant2.example.net'
|
||||
|
||||
|
||||
def test_delete_tenant(tenants):
|
||||
from django.core.management import execute_from_command_line
|
||||
from hobo.multitenant.middleware import TenantMiddleware
|
||||
base = os.path.dirname(tenants[0].get_directory())
|
||||
if any('removed' in d for d in os.listdir(base)):
|
||||
assert False
|
||||
|
||||
def get_schemas():
|
||||
from django.db import connection
|
||||
cursor = connection.cursor()
|
||||
cursor.execute('select schema_name from information_schema.schemata')
|
||||
return [x[0] for x in cursor.fetchall()]
|
||||
|
||||
if any('removed' in x for x in get_schemas()):
|
||||
assert False
|
||||
all_tenants = list(TenantMiddleware.get_tenants())
|
||||
assert len(all_tenants) == 2
|
||||
execute_from_command_line(['manage.py', 'delete_tenant', 'tenant2.example.net'])
|
||||
if not any('removed' in d for d in os.listdir(base)):
|
||||
assert False
|
||||
if not any('removed' in x for x in get_schemas()):
|
||||
assert False
|
||||
assert any('removed' in d for d in os.listdir(base))
|
||||
assert any('removed' in x for x in get_schemas())
|
||||
all_tenants = list(TenantMiddleware.get_tenants())
|
||||
assert len(all_tenants) == 1
|
||||
|
||||
|
||||
def test_tenant_command_all_tenants_errors(tenants, monkeypatch, capsys):
|
||||
from hobo.multitenant.management.commands import tenant_command
|
||||
|
||||
get_commands = tenant_command.get_commands
|
||||
|
||||
class UnicodeErrorCommand(BaseCommand):
|
||||
def handle(self, *args, **kwargs):
|
||||
raise Exception(u'héhé')
|
||||
|
||||
class BytesErrorCommand(BaseCommand):
|
||||
def handle(self, *args, **kwargs):
|
||||
raise Exception(b'héhé')
|
||||
|
||||
class MixOfBothCommand(BaseCommand):
|
||||
def handle(self, *args, **kwargs):
|
||||
raise Exception([b'héhé', u'hého'])
|
||||
|
||||
class WtfExceptionCommand(BaseCommand):
|
||||
def handle(self, *args, **kwargs):
|
||||
class WTF(Exception):
|
||||
def __str__(self):
|
||||
raise Exception
|
||||
|
||||
def __repr__(self):
|
||||
raise Exception
|
||||
|
||||
raise WTF()
|
||||
|
||||
def new_get_commands():
|
||||
d = get_commands().copy()
|
||||
d['uni-error'] = UnicodeErrorCommand()
|
||||
d['bytes-error'] = BytesErrorCommand()
|
||||
d['mix-error'] = MixOfBothCommand()
|
||||
d['wtf-error'] = WtfExceptionCommand()
|
||||
return d
|
||||
|
||||
monkeypatch.setattr(tenant_command, 'get_commands', new_get_commands)
|
||||
|
||||
klass = get_commands()['tenant_command']
|
||||
if not hasattr(klass, '__call__'):
|
||||
klass = load_command_class(klass, 'tenant_command')
|
||||
|
||||
capsys.readouterr()
|
||||
with pytest.raises(SystemExit):
|
||||
klass.run_from_argv(['manage.py', 'tenant_command', 'uni-error', '--all-tenants'])
|
||||
captured = capsys.readouterr()
|
||||
assert u'héhé' in captured.err
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
klass.run_from_argv(['manage.py', 'tenant_command', 'bytes-error', '--all-tenants'])
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert u'héhé' in captured.err
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
klass.run_from_argv(['manage.py', 'tenant_command', 'mix-error', '--all-tenants'])
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert repr(b'héhé') in captured.err
|
||||
assert repr(u'hého') in captured.err
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
klass.run_from_argv(['manage.py', 'tenant_command', 'wtf-error', '--all-tenants'])
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert 'Unrepresentable exception' in captured.err
|
||||
|
|
Loading…
Reference in New Issue