misc: limit site import to a subset of settings (#11484) #983

Merged
fpeters merged 3 commits from wip/11484-import-settings-subset into main 2024-01-12 16:15:56 +01:00
4 changed files with 74 additions and 40 deletions

View File

@ -1,6 +1,7 @@
import collections
import json
import os
import pickle
import shutil
import sys
import tempfile
@ -445,7 +446,7 @@ def test_import_site():
def test_export_site(tmp_path):
create_temporary_pub()
pub = create_temporary_pub()
Workflow.wipe()
FormDef.wipe()
formdef = FormDef()
@ -457,6 +458,8 @@ def test_export_site(tmp_path):
call_command('export_site', '--domain=example.net', f'--output={site_zip_path}')
with zipfile.ZipFile(site_zip_path, mode='r') as zfile:
assert set(zfile.namelist()) == {'formdefs_xml/1', 'config.pck'}
assert 'postgresql' in pub.cfg
assert 'postgresql' not in pickle.loads(zfile.read('config.pck'))
def test_shell():

View File

@ -216,6 +216,34 @@ def test_import_config_zip():
assert pub.cfg['sp'] == {'what': 'ever'}
def test_import_config_zip_no_overwrite():
pub = create_temporary_pub()
pub.cfg['emails'] = {'smtp_server': 'xxx'}
pub.cfg['misc'] = {'sitename': 'xxx'}
pub.write_cfg()
c = io.BytesIO()
with zipfile.ZipFile(c, 'w') as z:
z.writestr(
'config.pck',
pickle.dumps(
{
'language': {'language': 'fr'},
'emails': {'smtp_server': 'yyy', 'email-tracking-code-reminder': 'Hello!'},
'misc': {'sitename': 'yyy', 'default-zoom-level': '13'},
'filetypes': {1: {'mimetypes': ['application/pdf'], 'label': 'Documents PDF'}},
}
),
)
c.seek(0)
pub.import_zip(c, overwrite_settings=False)
assert pub.cfg['language'] == {'language': 'fr'}
assert pub.cfg['emails'] == {'smtp_server': 'xxx', 'email-tracking-code-reminder': 'Hello!'}
assert pub.cfg['misc'] == {'sitename': 'xxx', 'default-zoom-level': '13'}
assert pub.cfg['filetypes'] == {1: {'mimetypes': ['application/pdf'], 'label': 'Documents PDF'}}
def clear_log_files():
shutil.rmtree(os.path.join(get_publisher().APP_DIR, 'cron-logs'), ignore_errors=True)
for log_dir in glob.glob(os.path.join(get_publisher().APP_DIR, '*', 'cron-logs')):

View File

@ -19,6 +19,7 @@ import hashlib
import io
import mimetypes
import os
import pickle
try:
import lasso
@ -787,7 +788,7 @@ class SettingsDirectory(AccessControlled, Directory):
CheckboxWidget, 'comment_template_categories', title=_('Comment Templates Categories'), value=True
)
form.add(CheckboxWidget, 'data_source_categories', title=_('Data Sources Categories'), value=True)
form.add(CheckboxWidget, 'settings', title=_('Settings'), value=False)
form.add(CheckboxWidget, 'settings', title=_('Settings (customisation sections)'), value=False)
form.add(CheckboxWidget, 'datasources', title=_('Data sources'), value=True)
form.add(CheckboxWidget, 'mail-templates', title=_('Mail templates'), value=True)
form.add(CheckboxWidget, 'comment-templates', title=_('Comment templates'), value=True)
@ -1460,7 +1461,10 @@ class SiteExporter:
job.increment_count()
if self.settings:
z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck')
cfg = copy.copy(get_publisher().cfg)
cfg.pop('postgresql', None) # remove as it may be sensitive
z_info = zipfile.ZipInfo.from_file(os.path.join(self.app_dir, 'config.pck'), 'config.pck')
z.writestr(z_info, pickle.dumps(cfg, protocol=2))
if job:
job.increment_count()
for f in os.listdir(self.app_dir):
@ -1490,7 +1494,9 @@ class SiteImportAfterJob(AfterJob):
def execute(self):
error = None
try:
results = get_publisher().import_zip(io.BytesIO(self.kwargs['site_import_zip_content']))
results = get_publisher().import_zip(
io.BytesIO(self.kwargs['site_import_zip_content']), overwrite_settings=False
)
results['mail_templates'] = results['mail-templates']
results['comment_templates'] = results['comment-templates']
except zipfile.BadZipfile:

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import fnmatch
import io
import json
import os
@ -25,7 +26,6 @@ import traceback
import zipfile
from contextlib import ExitStack, contextmanager
from django.utils.encoding import force_str
from django.utils.timezone import localtime
from . import custom_views, data_sources, formdef, sessions
@ -237,7 +237,7 @@ class WcsPublisher(QommonPublisher):
self.get_site_option('local-region-code') or self.get_site_option('default-country-code') or 'FR'
)
def import_zip(self, fd):
def import_zip(self, fd, overwrite_settings=True):
results = {
'formdefs': 0,
'carddefs': 0,
@ -259,31 +259,6 @@ class WcsPublisher(QommonPublisher):
'apiaccess': 0,
}
def _decode_list(data):
rv = []
for item in data:
if isinstance(item, str):
item = force_str(item)
elif isinstance(item, list):
item = _decode_list(item)
elif isinstance(item, dict):
item = _decode_dict(item)
rv.append(item)
return rv
def _decode_dict(data):
rv = {}
for key, value in data.items():
key = force_str(key)
if isinstance(value, str):
value = force_str(value)
elif isinstance(value, list):
value = _decode_list(value)
elif isinstance(value, dict):
value = _decode_dict(value)
rv[key] = value
return rv
now = localtime()
for filename in ('config.pck', 'config.json'):
filepath = os.path.join(self.app_dir, filename)
@ -314,16 +289,38 @@ class WcsPublisher(QommonPublisher):
if f == 'config.pck':
d = pickle.loads(data)
else:
d = json.loads(force_str(data), object_hook=_decode_dict)
if 'sp' in self.cfg:
current_sp = self.cfg['sp']
d = json.loads(data)
if overwrite_settings:
if 'sp' in self.cfg:
current_sp = self.cfg['sp']
else:
current_sp = None
self.cfg = d
if current_sp:
self.cfg['sp'] = current_sp
elif 'sp' in self.cfg:
del self.cfg['sp']
else:
current_sp = None
self.cfg = d
if current_sp:
self.cfg['sp'] = current_sp
elif 'sp' in self.cfg:
del self.cfg['sp']
# only update a subset of settings, critical system parts such as
# authentication and database settings are not overwritten.
for section, section_parts in (
('emails', ('email-*',)),
('filetypes', '*'),
('language', '*'),
('misc', ('default-position', 'default-zoom-level')),
('sms', '*'),
('submission-channels', '*'),
('texts', '*'),
('users', ('*_template',)),
):
if section not in d:
continue
if section not in self.cfg:
self.cfg[section] = {}
for key in d[section]:
for pattern in section_parts:
if fnmatch.fnmatch(str(key), pattern):
self.cfg[section][key] = d[section][key]
self.write_cfg()
continue
with open(path, 'wb') as fd: