misc: remove unused files (#27255)

This commit is contained in:
Frédéric Péters 2019-07-17 11:15:50 +02:00
parent c49c2fe590
commit 69a12b7ec7
2 changed files with 208 additions and 2 deletions

View File

@ -1,17 +1,21 @@
import cPickle
import datetime
import glob
import json
import os
import sys
import shutil
import time
import pytest
from django.utils.six import StringIO
from quixote import cleanup
from wcs import formdef
from wcs import fields
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.workflows import Workflow
from wcs.qommon.form import PicklableUpload
from wcs.workflows import Workflow, AttachmentEvolutionPart
from wcs.fields import StringField, FileField, DateField, ItemField, PageField
from utilities import create_temporary_pub, clean_temporary_pub
@ -229,3 +233,146 @@ def test_page_field_migration(pub):
assert formdef.fields[0].post_conditions == [
{'condition': {'type': 'python', 'value': 'blah'},
'error_message': 'blah'}]
def test_unused_file_removal_job(pub):
    """Check clean_unused_files() against every kind of file reference.

    First verifies that without the ``unused-files-behaviour`` option the job
    leaves files in place, then, for both the ``move`` and ``remove``
    behaviours, walks through files referenced from form data, from formdef
    workflow options, from workflow attachments and from user profiles.
    """
    from wcs.formdef import clean_unused_files

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'removal job'
    formdef.fields = [
        fields.FileField(id='5', label='file', varname='filefield'),
    ]
    formdef.store()
    formdef.data_class().wipe()

    def stored_uploads():
        # entries currently sitting in the uploads directory
        return os.listdir(os.path.join(pub.app_dir, 'uploads'))

    def stored_attachments():
        # workflow attachments currently on disk
        return glob.glob(os.path.join(pub.app_dir, 'attachments', '*/*'))

    def submit_with_upload():
        # store a new formdata carrying the "hello world" upload in field 5
        submission = formdef.data_class()()
        submission.just_created()
        upload = PicklableUpload('test.txt', 'text/plain')
        submission.data = {'5': upload}
        upload.receive(['hello world'])
        submission.store()
        return submission

    # behaviour option not set
    formdata = submit_with_upload()
    assert formdata.data['5'].qfilename in stored_uploads()
    clean_unused_files(pub)
    assert stored_uploads() == [formdata.data['5'].qfilename]
    formdata.anonymise()
    clean_unused_files(pub)
    assert len(stored_uploads()) == 1  # file is not removed

    if not pub.site_options.has_section('options'):
        pub.site_options.add_section('options')

    for behaviour in ('move', 'remove'):
        pub.site_options.set('options', 'unused-files-behaviour', behaviour)

        # single formdata: file goes away once the formdata is anonymised
        formdata = submit_with_upload()
        assert formdata.data['5'].qfilename in stored_uploads()
        clean_unused_files(pub)
        assert stored_uploads() == [formdata.data['5'].qfilename]
        formdata.anonymise()
        clean_unused_files(pub)
        assert stored_uploads() == []

        for _ in range(5):
            formdata = submit_with_upload()

        # same file, deduplicated
        assert stored_uploads() == [formdata.data['5'].qfilename]
        formdata.anonymise()
        clean_unused_files(pub)
        assert len(stored_uploads()) == 1

        for stored in formdef.data_class().select():
            stored.anonymise()
        assert len(stored_uploads()) == 1
        clean_unused_files(pub)
        assert stored_uploads() == []

        # file referenced in formdef option
        workflow = Workflow(name='variables')
        from wcs.workflows import WorkflowVariablesFieldsFormDef
        workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
        workflow.variables_formdef.fields.append(
            fields.FileField(id='1', label='Test', type='file'))
        workflow.add_status('Status1', 'st1')
        workflow.store()
        formdef.workflow = workflow
        option_upload = PicklableUpload('test.txt', 'text/plain')
        formdef.workflow_options = {'1': option_upload}
        option_upload.receive(['hello world'])
        formdef.store()

        formdata = submit_with_upload()
        assert len(stored_uploads()) == 1
        clean_unused_files(pub)
        formdata.remove_self()
        clean_unused_files(pub)
        assert len(stored_uploads()) == 1

        formdef.workflow_options = {}
        formdef.store()
        clean_unused_files(pub)
        assert len(stored_uploads()) == 0

        # workflow attachment
        formdata = formdef.data_class()()
        formdata.just_created()
        formdata.data = {}
        formdata.store()
        attachment = AttachmentEvolutionPart(
            'hello.txt', fp=StringIO('hello world'), varname='testfile')
        formdata.evolution[-1].parts = [attachment]
        formdata.store()
        assert len(stored_attachments()) == 1
        clean_unused_files(pub)
        assert len(stored_attachments()) == 1
        formdata.anonymise()
        clean_unused_files(pub)
        assert len(stored_attachments()) == 0

        # files in user profile
        from wcs.admin.settings import UserFieldsFormDef
        user_formdef = UserFieldsFormDef(pub)
        user_formdef.fields.append(fields.FileField(id='3', label='test', type='file'))
        user_formdef.store()

        user = pub.user_class()
        user.email = 'bar@localhost'
        profile_upload = PicklableUpload('test.txt', 'text/plain')
        user.form_data = {'3': profile_upload}
        profile_upload.receive(['hello world 2'])
        user.store()

        assert len(stored_uploads()) == 1
        clean_unused_files(pub)
        assert len(stored_uploads()) == 1
        user.remove_self()
        assert len(stored_uploads()) == 1
        clean_unused_files(pub)
        assert len(stored_uploads()) == 0

        if behaviour == 'move':
            moved_uploads = os.listdir(os.path.join(pub.app_dir, 'unused-files/uploads/'))
            moved_attachments = glob.glob(
                os.path.join(pub.app_dir, 'unused-files/attachments/*/*'))
            # 2 files ("hello world" + "hello world 2")
            assert len(moved_uploads) == 2
            # 1 attachment
            assert len(moved_attachments) == 1

View File

@ -16,6 +16,7 @@
import base64
import copy
import glob
import new
import pickle
import sys
@ -1533,8 +1534,66 @@ def clean_drafts(publisher):
st.Less('receipt_time', removal_date.timetuple())]):
formdata.remove_self()
def clean_unused_files(publisher):
    """Remove or set aside stored files that nothing references anymore.

    The files found under ``uploads/`` and ``attachments/*/`` in
    ``publisher.app_dir`` are compared with the files still referenced by
    form definitions (workflow options), form data (field data, workflow
    data and attachment evolution parts) and users (profile field data).

    What happens to unreferenced files depends on the
    ``unused-files-behaviour`` site option:

    * ``move``: the file is moved under ``<app_dir>/unused-files/``, keeping
      its path relative to the application directory; if a file already
      exists at the destination the unused copy is simply deleted;
    * ``remove``: the file is deleted;
    * anything else (including unset): nothing is done.

    Filesystem errors on individual files are ignored so that one failure
    does not prevent the other files from being handled.
    """
    unused_files_behaviour = publisher.get_site_option('unused-files-behaviour')
    if unused_files_behaviour not in ('move', 'remove'):
        return

    # only imported once the job is known to have something to do
    import itertools
    from wcs.wf.attachment import AttachmentEvolutionPart

    app_dir = publisher.app_dir

    # Snapshot of the files on disk, taken before references are collected:
    # a file created afterwards is not part of the snapshot and therefore
    # cannot be selected for removal by this run.
    known_filenames = set(glob.glob(os.path.join(app_dir, 'uploads/*')))
    known_filenames.update(glob.glob(os.path.join(app_dir, 'attachments/*/*')))

    def accumulate_filenames():
        # yield the filename of every file still referenced somewhere
        for formdef in FormDef.select(ignore_migration=True):
            for option_data in (formdef.workflow_options or {}).values():
                if isinstance(option_data, PicklableUpload):
                    yield option_data.get_filename()
            for formdata in formdef.data_class().select(ignore_errors=True):
                # chain() instead of "+": dict views cannot be concatenated
                # on Python 3, while chain() works on both major versions
                for field_data in itertools.chain(
                        (formdata.data or {}).values(),
                        (formdata.workflow_data or {}).values()):
                    if isinstance(field_data, PicklableUpload):
                        yield field_data.get_filename()
                for evolution in (formdata.evolution or []):
                    for part in (evolution.parts or []):
                        if isinstance(part, AttachmentEvolutionPart):
                            yield part.filename
        for user in publisher.user_class.select():
            for field_data in (user.form_data or {}).values():
                if isinstance(field_data, PicklableUpload):
                    yield field_data.get_filename()

    used_filenames = set()
    for filename in accumulate_filenames():
        if not filename:
            # a reference without a filename cannot designate a stored file
            continue
        if not os.path.isabs(filename):
            filename = os.path.join(app_dir, filename)
        used_filenames.add(filename)

    unused_root = os.path.join(app_dir, 'unused-files')
    for filename in known_filenames - used_filenames:
        try:
            if unused_files_behaviour == 'move':
                # relpath() rather than slicing by len(app_dir) + 1, which
                # eats the first character of the relative path when app_dir
                # ends with a path separator
                new_filename = os.path.join(
                    unused_root, os.path.relpath(filename, app_dir))
                if os.path.exists(new_filename):
                    # an identical path was already set aside, drop this one
                    os.unlink(filename)
                else:
                    new_dirname = os.path.dirname(new_filename)
                    if not os.path.exists(new_dirname):
                        os.makedirs(new_dirname)
                    os.rename(filename, new_filename)
            else:
                os.unlink(filename)
        except OSError:
            # best effort: leave the file in place and go on with the others
            pass
# Periodic jobs are registered at import time, and only when
# get_publisher_class() returns a truthy value.
if get_publisher_class():
    # once a month, look for drafts to remove
    # (presumably the 2nd day of the month at 00:00 -- confirm against CronJob)
    get_publisher_class().register_cronjob(CronJob(clean_drafts,
        name='clean_drafts',
        days=[2], hours=[0], minutes=[0]))
    # once a day, look for unused files
    # (presumably at 02:00 -- confirm against CronJob)
    get_publisher_class().register_cronjob(CronJob(clean_unused_files,
        name='clean_unused_files',
        hours=[2], minutes=[0]))