diff --git a/passerelle/utils/files.py b/passerelle/utils/files.py new file mode 100644 index 00000000..50661b31 --- /dev/null +++ b/passerelle/utils/files.py @@ -0,0 +1,53 @@ +# Copyright (C) 2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import os.path +import contextlib +import tempfile +import errno + +from django.conf import settings + + +@contextlib.contextmanager +def atomic_write(filepath, **kwargs): + '''Return a file descriptor to a temporary file using NamedTemporaryFile + which will be atomically renamed to filepath if possible. + + Atomic renaming is only possible on the same filesystem, so the + temporary file will be created in the same directory as the target file + + You can pass any possible argument to NamedTemporaryFile with kwargs. + ''' + + tmp_dir = kwargs.pop('dir', None) + if not tmp_dir: + tmp_dir = os.path.join(settings.MEDIA_ROOT, 'tmp') + if not os.path.exists(tmp_dir): + try: + os.makedirs(tmp_dir) + except OSError as e: + if e.errno != errno.EEXIST: + raise + fd = tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False, **kwargs) + try: + with fd: + yield fd + fd.flush() + os.fsync(fd.fileno()) + os.rename(fd.name, filepath) + except Exception: + os.unlink(fd.name) + raise diff --git a/tests/test_utils_files.py b/tests/test_utils_files.py new file mode 100644 index 00000000..fab9f53b --- /dev/null +++ b/tests/test_utils_files.py @@ -0,0 +1,42 @@ +# Copyright (C) 2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import os + +import pytest + +from passerelle.utils.files import atomic_write + + +def test_atomic_write(settings, tmpdir): + settings.MEDIA_ROOT = str(tmpdir.mkdir('media')) + target_dir = tmpdir.mkdir('target') + filepath = str(target_dir.join('test')) + + assert not os.path.exists(os.path.join(settings.MEDIA_ROOT, 'tmp')) + with pytest.raises(Exception): + with atomic_write(filepath) as fd: + fd.write('coucou') + raise Exception() + assert os.path.exists(os.path.join(settings.MEDIA_ROOT, 'tmp')) + assert not os.path.exists(filepath) + assert os.listdir(str(target_dir)) == [] + + with atomic_write(filepath) as fd: + fd.write('coucou') + + assert os.path.exists(filepath) + with open(filepath) as fd: + assert fd.read() == 'coucou'