utils: add an atomic_write() context manager (#32413)

This commit is contained in:
Benjamin Dauvergne 2019-04-17 12:06:18 +02:00
parent 5c344f80d3
commit c778669e90
2 changed files with 95 additions and 0 deletions

53
passerelle/utils/files.py Normal file
View File

@ -0,0 +1,53 @@
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os.path
import contextlib
import tempfile
import errno
from django.conf import settings
@contextlib.contextmanager
def atomic_write(filepath, **kwargs):
'''Return a file descriptor to a temporary file using NamedTemporaryFile
which will be atomically renamed to filepath if possible.
Atomic renaming is only possible on the same filesystem, so the
temporary file will be created in the same directory as the target file
You can pass any possible argument to NamedTemporaryFile with kwargs.
'''
tmp_dir = kwargs.pop('dir', None)
if not tmp_dir:
tmp_dir = os.path.join(settings.MEDIA_ROOT, 'tmp')
if not os.path.exists(tmp_dir):
try:
os.makedirs(tmp_dir)
except OSError as e:
if e.errno != errno.EEXIST:
raise
fd = tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False, **kwargs)
try:
with fd:
yield fd
fd.flush()
os.fsync(fd.fileno())
os.rename(fd.name, filepath)
except Exception:
os.unlink(fd.name)
raise

42
tests/test_utils_files.py Normal file
View File

@ -0,0 +1,42 @@
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import pytest
from passerelle.utils.files import atomic_write
def test_atomic_write(settings, tmpdir):
settings.MEDIA_ROOT = str(tmpdir.mkdir('media'))
target_dir = tmpdir.mkdir('target')
filepath = str(target_dir.join('test'))
assert not os.path.exists(os.path.join(settings.MEDIA_ROOT, 'tmp'))
with pytest.raises(Exception):
with atomic_write(filepath) as fd:
fd.write('coucou')
raise Exception()
assert os.path.exists(os.path.join(settings.MEDIA_ROOT, 'tmp'))
assert not os.path.exists(filepath)
assert os.listdir(str(target_dir)) == []
with atomic_write(filepath) as fd:
fd.write('coucou')
assert os.path.exists(filepath)
with open(filepath) as fd:
assert fd.read() == 'coucou'