utils: add tools to execute actions outisde of transactions (#31204)
gitea/passerelle/pipeline/head This commit looks good Details

First use of it is to create ResourceLog objects after the current
transaction commit or abort.
This commit is contained in:
Benjamin Dauvergne 2019-03-07 19:41:54 +01:00
parent 4789f1e1ff
commit f63e250e0d
5 changed files with 288 additions and 1 deletions

View File

@ -39,6 +39,7 @@ import passerelle
from passerelle.forms import GenericConnectorForm
from passerelle.utils import ImportSiteError
from passerelle.utils.api import endpoint
from passerelle.utils.defer import run_later_if_in_transaction
from passerelle.utils.jsonresponse import APIError
from passerelle.utils.sftp import SFTP, SFTPField
@ -1058,7 +1059,10 @@ class ProxyLogger:
(exc_type, exc_value, dummy) = sys.exc_info()
attr['extra']['error_summary'] = traceback.format_exception_only(exc_type, exc_value)
ResourceLog.objects.create(**attr)
# keep log even if transaction fails if:
# * it's at least a warning
# * or if logger is configured for debug
run_later_if_in_transaction(ResourceLog.objects.create, **attr)
admins = settings.ADMINS
logging_parameters = self.connector.logging_parameters

View File

@ -77,6 +77,7 @@ MIDDLEWARE = (
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'passerelle.utils.defer.run_later_middleware',
)
ROOT_URLCONF = 'passerelle.urls'

96
passerelle/utils/defer.py Normal file
View File

@ -0,0 +1,96 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import contextlib
from contextvars import ContextVar
from django.core.management import BaseCommand
from django.db import connection
# See https://docs.python.org/3/library/contextvars.html
# ContextVar are concurrency-safe variables, they are thread safe (like
# threading.local()) and coroutine (asyncio) safe.
run_later_context: ContextVar[list] = ContextVar('run_later_context')
def is_in_transaction():
return getattr(connection, 'in_atomic_block', False)
@contextlib.contextmanager
def run_later_scope():
try:
run_later_context.get()
except LookupError:
callbacks = []
token = run_later_context.set(callbacks)
try:
yield
finally:
run_later_context.reset(token)
for func, args, kwargs in callbacks:
func(*args, **kwargs)
else:
# nested scopes have not effect, callbacks will always be called by the
# most enclosing scope, i.e. in this case:
# with run_later_scope():
# with run_later_scope():
# run_later(f)
# (1)
# ..other statements..
# (2)
#
# the function will be called at point (2), not (1)
yield
def run_later(func, *args, **kwargs):
try:
callbacks = run_later_context.get()
except LookupError:
# no scope, run immediately
return func(*args, **kwargs)
else:
callbacks.append((func, args, kwargs))
return None
def run_later_if_in_transaction(func, *args, **kwargs):
if is_in_transaction():
return run_later(func, *args, **kwargs)
else:
return func(*args, **kwargs)
class run_later_middleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
with run_later_scope():
return self.get_response(request)
# monkeypatch BaseCommand execute to provide the same service to commands
old_BaseCommand_execute = BaseCommand.execute
def BaseCommand_execute(self, *args, **kwargs):
with run_later_scope():
return old_BaseCommand_execute(self, *args, **kwargs)
BaseCommand.execute = BaseCommand_execute

View File

@ -1,3 +1,19 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import itertools
import logging
@ -5,6 +21,7 @@ import logging
import pytest
import requests
from django.core.exceptions import ValidationError
from django.db import transaction
from django.utils.log import AdminEmailHandler
from httmock import HTTMock
@ -13,6 +30,7 @@ from passerelle.apps.feeds.models import Feed
from passerelle.base.models import ProxyLogger, ResourceLog
from passerelle.contrib.stub_invoices.models import StubInvoicesConnector
from passerelle.utils.api import endpoint
from passerelle.utils.defer import run_later_scope
from passerelle.utils.jsonresponse import APIError
from tests.test_availability import down_mock, up_mock
@ -388,3 +406,33 @@ def test_proxy_logger_bytes(db, connector):
base_logger.debug('test', extra={'payload': b'\xff\xff'})
log = ResourceLog.objects.latest('id')
assert log.extra == {'payload': '\\xff\\xff'}
def test_log_in_transaction(transactional_db, connector):
qs = ResourceLog.objects.all()
assert not qs.exists()
class MyError(Exception):
pass
# without run_later_scope logs inside transactions are lost
try:
with transaction.atomic():
connector.logger.info('info')
connector.logger.warning('warning')
raise MyError
except MyError:
pass
assert qs.count() == 0
# with run_later_scope logs inside transaction are kept, because they
# inserted in the db after the rollback
try:
with run_later_scope():
with transaction.atomic():
connector.logger.info('info')
connector.logger.warning('warning')
raise MyError
except MyError:
pass
assert qs.count() == 2

138
tests/test_utils_defer.py Normal file
View File

@ -0,0 +1,138 @@
# Copyright (C) 2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading
from django.core.management import BaseCommand, call_command
from django.db import transaction
from passerelle.utils.defer import (
run_later,
run_later_if_in_transaction,
run_later_middleware,
run_later_scope,
)
def test_run_later():
x = []
def f():
x.append(1)
run_later(f)
assert x == [1]
with run_later_scope():
run_later(f)
assert x == [1]
assert x == [1, 1]
def test_run_later_nested():
x = []
def f():
x.append(1)
run_later(f)
assert x == [1]
with run_later_scope():
with run_later_scope():
run_later(f)
assert x == [1]
# f is run by the most enclosing scope
assert x == [1, 1]
def test_run_threading():
x = []
def f(i):
x.append(i)
run_later(f, 1)
assert x == [1]
with run_later_scope():
run_later(f, 2)
assert x == [1]
thread = threading.Thread(target=run_later, args=(f, 3))
thread.start()
thread.join()
assert x == [1, 3]
assert x == [1, 3, 2]
def test_run_later_if_in_transaction(transactional_db):
x = []
def f():
x.append(1)
run_later_if_in_transaction(f)
assert x == [1]
with run_later_scope():
run_later_if_in_transaction(f)
assert x == [1, 1]
with transaction.atomic():
run_later_if_in_transaction(f)
assert x == [1, 1]
assert x == [1, 1]
assert x == [1, 1, 1]
def test_middleware(rf):
x = []
def view1(request):
def f():
x.append(1)
assert x == []
run_later(f)
assert x == [1]
request = rf.get('/')
view1(request)
assert x == [1]
x = []
def view2(request):
def f():
x.append(1)
assert x == []
run_later(f)
assert x == []
run_later_middleware(view2)(request)
assert x == [1]
def test_base_command():
x = []
def f():
x.append(1)
class MyCommand(BaseCommand):
def handle(self, *args, **kwargs):
assert x == []
run_later(f)
assert x == []
call_command(MyCommand())
assert x == [1]