multitenant: rename uwsgidecorators to spooler (#76423)
gitea/hobo/pipeline/head This commit looks good Details

Fixed: key and values must be passed as bytes, and decoded if necessary
at reception.

Logging of succesfull spool jobs is augmented with the duration.

Add a universal 'launch_in_spooler' function which does not necessitate
to declare a .spooler module, given that the function can be pickled
(works with global functions, class methods and instance methods, if the
instance is picklable).

Remove any limitation on the size of arguments to spooled functions,
using the body parameter of spool jobs (which can be of any size).

Keeps compatibility with hobo.provisionning.spooler/uwsgidecorators
until all bricks are moved to the new interface.
This commit is contained in:
Benjamin Dauvergne 2023-04-08 12:14:18 +02:00
parent 2daa7b3170
commit 56a97bb138
10 changed files with 323 additions and 214 deletions

1
debian/control vendored
View File

@ -39,7 +39,6 @@ Depends: python3-django (>= 1.8),
python3-gadjo,
python3-hobo (= ${binary:Version}),
python3-psycopg2,
python3-uwsgidecorators,
uwsgi,
uwsgi-plugin-python3,
Recommends: erlang-nox (>= 1:17.1),

View File

@ -13,8 +13,7 @@ chmod-socket = 666
vacuum = true
spooler-processes = 3
spooler-python-import = hobo.applications.spooler
spooler-python-import = hobo.provisionning.spooler
spooler-python-import = hobo.multitenant.spooler
spooler-max-tasks = 20
master = true

View File

@ -18,14 +18,13 @@ import collections
import io
import json
import os
import sys
import tarfile
import traceback
import urllib.parse
from django.conf import settings
from django.core.files.base import ContentFile
from django.db import connection, models
from django.db import models
from django.db.models import JSONField
from django.utils.text import slugify
from django.utils.timezone import now
@ -34,6 +33,7 @@ from django.utils.translation import gettext_lazy as _
from hobo.deploy.signals import notify_agents
from hobo.environment.models import Variable
from hobo.environment.utils import get_installed_services
from hobo.multitenant import spooler
from .utils import Requests
@ -565,15 +565,10 @@ class AsyncJob(models.Model):
progression_urls = JSONField(blank=True, default=dict)
details = JSONField(blank=True, default=dict)
raise_exception = True
def run(self, spool=False):
if 'uwsgi' in sys.modules and spool:
from hobo.applications.spooler import run_job
tenant = getattr(connection, 'tenant', None)
domain = getattr(tenant, 'domain_url', '')
run_job.spool(domain=domain.encode(), job_id=str(self.pk).encode())
def run(self, spool=False, raise_exception=True):
# tests depends on raised Exception envent if spool is True
if spool and spooler.uwsgi:
spooler.run(self.run, raise_exception=False)
return
self.status = 'running'
self.save()
@ -591,12 +586,12 @@ class AsyncJob(models.Model):
except ApplicationError as e:
self.status = 'failed'
self.exception = e.msg
if self.raise_exception:
if raise_exception:
raise
except Exception:
self.status = 'failed'
self.exception = traceback.format_exc()
if self.raise_exception:
if raise_exception:
raise
finally:
if self.status == 'running':

View File

@ -1,31 +0,0 @@
# hobo - portal to configure and deploy applications
# Copyright (C) 2015-2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from uwsgidecorators import spool
from hobo.provisionning.spooler import ensure_db, set_connection
from .models import AsyncJob
@spool
@ensure_db
def run_job(args):
set_connection(args['domain'])
job = AsyncJob.objects.get(id=args['job_id'])
job.raise_exception = False
job.run()
print('got job:', job)

186
hobo/multitenant/spooler.py Normal file
View File

@ -0,0 +1,186 @@
# Copyright (C) 2021 Entr'ouvert
import contextlib
import datetime
import functools
import logging
import pickle
import sys
import time
try:
import uwsgi
except ImportError:
uwsgi = None
logger = logging.getLogger('spooler')
@contextlib.contextmanager
def close_db():
if 'django' in sys.modules:
from django.db import close_old_connections
close_old_connections()
try:
yield None
finally:
close_old_connections()
else:
yield
@contextlib.contextmanager
def tenant_context(domain):
if domain:
from tenant_schemas.utils import tenant_context
from hobo.multitenant.middleware import TenantMiddleware
tenant = TenantMiddleware.get_tenant_by_hostname(domain)
with tenant_context(tenant):
yield
else:
yield
def get_tenant():
if 'django.db' not in sys.modules:
return ''
from django.db import connection
tenant_model = getattr(connection, 'tenant', None)
return getattr(tenant_model, 'domain_url', '')
class Retry(Exception):
at = None
def __init__(self, at=None):
if isinstance(at, datetime.datetime):
self.at = at
elif isinstance(at, int):
self.at = datetime.datetime.now() + datetime.timedelta(seconds=at)
def raw_spool(name, args, kwargs, spooler=None, at=None, priority=None):
assert uwsgi, 'uwsgi is not available'
kwargs = {
b'name': name.encode(),
b'tenant': get_tenant().encode(),
b'body': pickle.dumps({'args': args, 'kwargs': kwargs}),
}
if spooler:
kwargs[b'spooler'] = spooler.encode()
if isinstance(at, datetime.datetime):
kwargs[b'at'] = str(at.timestamp()).encode()
if isinstance(priority, (str, int)):
kwargs[b'priority'] = str(priority)
uwsgi.spool(kwargs)
class SpoolFunc:
def __init__(self, name, spooler=None, at=None, priority=None):
self.name = name
self.spooler = spooler
self.at = at
self.priority = priority
def context(self, spooler=None, at=None, priority=None):
return SpoolFunc(
self.name, spooler=spooler or self.spooler, at=at or self.at, priority=priority or self.priority
)
def __call__(self, *args, **kwargs):
raw_spool(self.name, args, kwargs, spooler=self.spooler, at=self.at, priority=self.priority)
spooler_registry = {}
def spool(func):
if uwsgi:
name = '%s.%s' % (func.__module__, func.__qualname__)
spooler_registry[name] = func
wrapper = functools.wraps(func)(SpoolFunc(name))
else:
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Retry:
pass
wrapper.func = func
return wrapper
@spool
def run(func, *args, **kwargs):
name = '%s.%s' % (func.__module__, func.__qualname__)
start = time.time()
try:
func(*args, **kwargs)
except Retry as e:
if e.at:
logger.info('retrying function %s at %s (%f seconds)', name, e.at, time.time() - start)
else:
logger.info('retrying function %s (%f seconds)', name, time.time() - start)
raise e
except Exception:
# interpolating name in message to make the function name part of the trace fingerprint (sentry)
logger.exception(
f'failed function {name} (%%f seconds)', # noqa pylint: disable=logging-not-lazy
time.time() - start,
)
else:
logger.info('finished function %s (%f seconds)', name, time.time() - start)
if uwsgi:
def spooler_function(env):
# env is not encoded uniformly :/
env = {key.decode() if isinstance(key, bytes) else key: value for key, value in env.items()}
start = time.time()
try:
try:
name = env.get('name').decode()
tenant = env.get('tenant', b'').decode()
body = env.get('body')
except Exception as e:
logger.error('failure, env parsing failed env.keys()=%s error=%s', env.keys(), e)
return uwsgi.SPOOL_OK
try:
params = pickle.loads(body)
args = params['args']
kwargs = params['kwargs']
except Exception as e:
logger.exception('depickling of body failed, name=%s tenant=%s error=%s', name, tenant, e)
return uwsgi.SPOOL_OK
try:
function = spooler_registry[name]
except KeyError:
logger.error('no function named "%s" in spooler_registry', name)
# prevent connections to leak between jobs
# maintain current tenant when spool is launched
ret = uwsgi.SPOOL_OK
with close_db(), tenant_context(tenant):
try:
function(*args, **kwargs)
except Retry as e:
if not e.at:
ret = uwsgi.SPOOL_RETRY
else:
raw_spool(name, args, kwargs, at=e.at)
return ret
except Exception:
# interpolating name in message to make the function name part of the trace fingerprint (sentry)
logger.exception(
f'failed function {name} (%%f seconds)', # noqa pylint: disable=logging-not-lazy
time.time() - start,
)
return uwsgi.SPOOL_OK
uwsgi.spooler = spooler_function

View File

@ -1,102 +0,0 @@
# Copyright (C) 2021 Entr'ouvert
import contextlib
import logging
import pickle
import sys
try:
import uwsgi
except ImportError:
uwsgi = None
logger = logging.getLogger(__name__)
spooler_registry = {}
@contextlib.contextmanager
def close_db():
if 'django' in sys.modules:
from django.db import close_old_connections
close_old_connections()
try:
yield None
finally:
close_old_connections()
else:
yield
@contextlib.contextmanager
def tenant_context(domain):
if domain:
from tenant_schemas.utils import tenant_context
from hobo.multitenant.middleware import TenantMiddleware
tenant = TenantMiddleware.get_tenant_by_hostname(domain)
with tenant_context(tenant):
yield
else:
yield
def get_tenant():
if 'django.db' not in sys.modules:
return ''
from django.db import connection
tenant_model = getattr(connection, 'tenant', None)
return getattr(tenant_model, 'domain_url', '')
def spool(func):
if uwsgi:
name = '%s.%s' % (func.__module__, func.__name__)
spooler_registry[name] = func
def spool_function(*args, **kwargs):
uwsgi.spool(
name=name.encode(),
tenant=get_tenant().encode(),
body=pickle.dumps({'args': args, 'kwargs': kwargs}),
)
logger.debug('spooler: spooled function %s', name)
func.spool = spool_function
return func
if uwsgi:
def spooler_function(env):
try:
try:
name = env.get('name').decode()
tenant = env.get('tenant', b'').decode()
body = env.get('body')
except Exception:
logger.error('spooler: no name or body found: env.keys()=%s', env.keys())
return uwsgi.SPOOL_OK
try:
params = pickle.loads(body)
args = params['args']
kwargs = params['kwargs']
except Exception:
logger.exception('spooler: depickling of body failed')
return uwsgi.SPOOL_OK
try:
function = spooler_registry[name]
except KeyError:
logger.error('spooler: no function named "%s"', name)
# prevent connections to leak between jobs
# maintain current tenant when spool is launched
with close_db(), tenant_context(tenant):
function(*args, **kwargs)
except Exception:
logger.exception('spooler: function "%s" raised', name)
return uwsgi.SPOOL_OK
uwsgi.spooler = spooler_function

View File

@ -16,15 +16,14 @@
import json
import logging
import sys
import urllib.parse
from django.conf import settings
from django.db import connection
from django.http import HttpResponseBadRequest, HttpResponseForbidden, JsonResponse
from django.utils.deprecation import MiddlewareMixin
from django.utils.encoding import force_bytes, force_str
from django.utils.encoding import force_str
from hobo.multitenant import spooler
from hobo.provisionning.utils import NotificationProcessing
from hobo.rest_authentication import PublikAuthentication, PublikAuthenticationFailed
@ -62,7 +61,22 @@ class ProvisionningMiddleware(MiddlewareMixin, NotificationProcessing):
msg = 'received request for %sing %%d %%s objects (HTTP)' % action
logger.info(msg, len(notification['objects']['data']), object_type)
if 'uwsgi' in sys.modules and 'sync' not in request.GET:
if 'sync' in request.GET:
self.provision(object_type=object_type, issuer=issuer, action=action, data=data, full=full)
elif getattr(settings, 'USE_NEW_SPOOLER', False):
spooler.run(
NotificationProcessing.provision,
object_type=object_type,
issuer=issuer,
action=action,
data=data,
full=full,
)
else:
from django.db import connection
from django.utils.encoding import force_bytes
from hobo.provisionning.spooler import provision
tenant = getattr(connection, 'tenant', None)
@ -81,8 +95,6 @@ class ProvisionningMiddleware(MiddlewareMixin, NotificationProcessing):
body=force_bytes(body),
full=force_bytes(full),
)
else:
self.provision(object_type=object_type, issuer=issuer, action=action, data=data, full=full)
return JsonResponse({'err': 0})
def hobo_specific_setup(self):

View File

@ -260,6 +260,9 @@ PHONE_COUNTRY_CODES = {
'596': {'region': 'MQ', 'region_desc': _('Martinique')},
}
# use the new universal spooler
USE_NEW_SPOOLER = True
local_settings_file = os.environ.get(
'HOBO_SETTINGS_FILE', os.path.join(os.path.dirname(__file__), 'local_settings.py')
)

View File

@ -0,0 +1,107 @@
import datetime
import importlib
import pickle
from unittest import mock
import pytest
from hobo.multitenant import spooler
@pytest.fixture
def uwsgi():
import sys
uwsgi = mock.Mock()
uwsgi.SPOOL_OK = -2
uwsgi.SPOOL_RETRY = -3
sys.modules['uwsgi'] = uwsgi
importlib.reload(spooler)
yield uwsgi
del sys.modules['uwsgi']
importlib.reload(spooler)
def test_basic():
@spooler.spool
def function(a, b):
return 1
assert function(1, 2) == 1
def test_mocked_uwsgi(uwsgi):
@spooler.spool
def function(a, b):
return 1
assert function(1, 2) is None
assert set(uwsgi.spool.call_args[0][0].keys()) == {b'body', b'tenant', b'name'}
assert pickle.loads(uwsgi.spool.call_args[0][0][b'body']) == {'args': (1, 2), 'kwargs': {}}
assert (
uwsgi.spool.call_args[0][0][b'name']
== b'tests_multitenant.test_spooler.test_mocked_uwsgi.<locals>.function'
)
assert uwsgi.spool.call_args[0][0][b'tenant'] == b''
def test_mocked_uwsgi_tenant(uwsgi, tenant):
from tenant_schemas.utils import tenant_context
@spooler.spool
def function(a, b):
pass
with tenant_context(tenant):
function(1, 2)
assert set(uwsgi.spool.call_args[0][0].keys()) == {b'body', b'tenant', b'name'}
assert pickle.loads(uwsgi.spool.call_args[0][0][b'body']) == {'args': (1, 2), 'kwargs': {}}
assert (
uwsgi.spool.call_args[0][0][b'name']
== b'tests_multitenant.test_spooler.test_mocked_uwsgi_tenant.<locals>.function'
)
assert uwsgi.spool.call_args[0][0][b'tenant'] == b'tenant.example.net'
def function(a, b):
return 1
def test_mocked_uwsgi_run(uwsgi):
spooler.run(function, 1, 2)
env = uwsgi.spool.call_args[0][0]
assert set(env.keys()) == {b'body', b'tenant', b'name'}
assert pickle.loads(env[b'body']) == {'args': (function, 1, 2), 'kwargs': {}}
assert env[b'name'] == b'hobo.multitenant.spooler.run'
assert env[b'tenant'] == b''
def with_retry(a, b):
raise spooler.Retry
def with_retry_at(a, b):
raise spooler.Retry(at=30)
def test_mocked_uwsgi_run_with_retry(uwsgi, freezer):
spooler.run(with_retry, 1, 2)
env = uwsgi.spool.call_args[0][0]
assert set(env.keys()) == {b'body', b'tenant', b'name'}
ret = spooler.spooler_function(env)
assert ret == uwsgi.SPOOL_RETRY
assert uwsgi.spool.call_count == 1
def test_mocked_uwsgi_run_with_retry_at(uwsgi, freezer):
freezer.move_to('2023-01-01T00:00:00Z')
spooler.run(with_retry_at, 1, 2)
env = uwsgi.spool.call_args[0][0]
ret = spooler.spooler_function(env)
assert ret == uwsgi.SPOOL_OK
assert uwsgi.spool.call_count == 2
env = uwsgi.spool.call_args[0][0]
assert set(env.keys()) == {b'body', b'tenant', b'name', b'at'}
assert float(env[b'at'].decode()) == datetime.datetime.now().timestamp() + 30

View File

@ -1,59 +0,0 @@
import importlib
import pickle
from unittest import mock
import pytest
import hobo.multitenant.uwsgidecorators
@pytest.fixture
def uwsgi():
import sys
uwsgi = mock.Mock()
uwsgi.SPOOL_OK = -2
sys.modules['uwsgi'] = uwsgi
importlib.reload(hobo.multitenant.uwsgidecorators)
yield uwsgi
del sys.modules['uwsgi']
importlib.reload(hobo.multitenant.uwsgidecorators)
def test_basic():
@hobo.multitenant.uwsgidecorators.spool
def function(a, b):
pass
function(1, 2)
with pytest.raises(AttributeError):
function.spool(1, 2)
def test_mocked_uwsgi(uwsgi):
@hobo.multitenant.uwsgidecorators.spool
def function(a, b):
pass
function(1, 2)
function.spool(1, 2)
assert set(uwsgi.spool.call_args[1].keys()) == {'body', 'tenant', 'name'}
assert pickle.loads(uwsgi.spool.call_args[1]['body']) == {'args': (1, 2), 'kwargs': {}}
assert uwsgi.spool.call_args[1]['name'] == b'tests_multitenant.test_uwsgidecorators.function'
assert uwsgi.spool.call_args[1]['tenant'] == b''
def test_mocked_uwsgi_tenant(uwsgi, tenant):
from tenant_schemas.utils import tenant_context
@hobo.multitenant.uwsgidecorators.spool
def function(a, b):
pass
with tenant_context(tenant):
function.spool(1, 2)
assert set(uwsgi.spool.call_args[1].keys()) == {'body', 'tenant', 'name'}
assert pickle.loads(uwsgi.spool.call_args[1]['body']) == {'args': (1, 2), 'kwargs': {}}
assert uwsgi.spool.call_args[1]['name'] == b'tests_multitenant.test_uwsgidecorators.function'
assert uwsgi.spool.call_args[1]['tenant'] == b'tenant.example.net'