faire de hobo_* des commandes de management django (#20411) #755

Merged
fpeters merged 2 commits from wip/20411-hobo-management-commands into main 2023-10-27 08:33:28 +02:00
8 changed files with 260 additions and 225 deletions

View File

@ -61,10 +61,10 @@ def setup_user_profile(pub):
if not pub.cfg:
pub.cfg = {}
# create some roles
from wcs.ctl.check_hobos import CmdCheckHobos
from wcs.ctl.management.commands.hobo_deploy import Command
# setup an hobo profile
CmdCheckHobos().update_profile(PROFILE, pub)
Command().update_profile(PROFILE, pub)
pub.cfg['users']['fullname_template'] = '{{ user_var_prenoms }} {{ user_var_nom }}'
pub.user_class.wipe()
pub.write_cfg()

View File

@ -1,21 +1,20 @@
import collections
import copy
import json
import os
import pickle
import shutil
import sys
import tempfile
import urllib.parse
import zipfile
from unittest import mock
import pytest
from django.core.management import call_command
from quixote import cleanup
from wcs import fields, sql
from wcs.ctl.check_hobos import CmdCheckHobos
from wcs.publisher import WcsPublisher
from wcs.compat import CompatWcsPublisher
from wcs.ctl.management.commands.hobo_deploy import Command as HoboDeployCommand
from wcs.qommon import force_str
from wcs.sql import cleanup_connection
@ -229,17 +228,17 @@ HOBO_JSON = {
@pytest.fixture
def setuptest():
hobo_cmd = CmdCheckHobos()
hobo_cmd = HoboDeployCommand()
hobo_cmd.all_services = HOBO_JSON
WcsPublisher.APP_DIR = tempfile.mkdtemp()
CompatWcsPublisher.APP_DIR = tempfile.mkdtemp()
pub = create_temporary_pub()
pub.cfg['language'] = {'language': 'en'}
yield pub, hobo_cmd
cleanup_connection()
clean_temporary_pub()
if os.path.exists(WcsPublisher.APP_DIR):
shutil.rmtree(WcsPublisher.APP_DIR)
if os.path.exists(CompatWcsPublisher.APP_DIR):
shutil.rmtree(CompatWcsPublisher.APP_DIR)
@pytest.fixture
@ -251,12 +250,12 @@ def alt_tempdir():
@pytest.fixture
def deploy_setup(alt_tempdir):
WcsPublisher.APP_DIR = alt_tempdir
CompatWcsPublisher.APP_DIR = alt_tempdir
with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
hobo_json = copy.deepcopy(HOBO_JSON)
del hobo_json['services'][1] # authentic
fd.write(json.dumps(HOBO_JSON))
skeleton_dir = os.path.join(WcsPublisher.APP_DIR, 'skeletons')
skeleton_dir = os.path.join(CompatWcsPublisher.APP_DIR, 'skeletons')
if not os.path.exists(skeleton_dir):
os.mkdir(skeleton_dir)
with open(os.path.join(skeleton_dir, 'export-test.wcs'), 'wb') as f:
@ -313,7 +312,8 @@ def test_configure_site_options(setuptest, alt_tempdir):
assert pub.get_site_option('local-region-code') == 'FR'
def test_update_configuration(setuptest):
def test_update_configuration(setuptest, settings):
settings.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
pub, hobo_cmd = setuptest
service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
hobo_cmd.update_configuration(service, pub)
@ -325,7 +325,8 @@ def test_update_configuration(setuptest):
assert pub.cfg['sms']['sender'] == 'EO'
def test_update_themes(setuptest):
def test_update_themes(setuptest, settings):
settings.THEMES_DIRECTORY = ''
pub, hobo_cmd = setuptest
pub.cfg['branding'] = {'theme': 'django'}
service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
@ -336,17 +337,17 @@ def test_update_themes(setuptest):
hobo_cmd.update_configuration(service, pub)
assert pub.cfg['branding']['theme'] == 'django'
hobo_cmd.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
settings.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
hobo_cmd.update_configuration(service, pub)
assert pub.cfg['branding']['theme'] == 'publik-base'
assert os.readlink(os.path.join(pub.app_dir, 'static')) == os.path.join(
hobo_cmd.THEMES_DIRECTORY, 'foobar/static'
settings.THEMES_DIRECTORY, 'foobar/static'
)
assert os.readlink(os.path.join(pub.app_dir, 'templates')) == os.path.join(
hobo_cmd.THEMES_DIRECTORY, 'foobar/templates'
settings.THEMES_DIRECTORY, 'foobar/templates'
)
assert os.readlink(os.path.join(pub.app_dir, 'theme')) == os.path.join(
hobo_cmd.THEMES_DIRECTORY, 'publik-base'
settings.THEMES_DIRECTORY, 'publik-base'
)
service['variables']['theme'] = 'foobar2'
@ -354,7 +355,7 @@ def test_update_themes(setuptest):
assert not os.path.lexists(os.path.join(pub.app_dir, 'static'))
assert not os.path.lexists(os.path.join(pub.app_dir, 'templates'))
assert os.readlink(os.path.join(pub.app_dir, 'theme')) == os.path.join(
hobo_cmd.THEMES_DIRECTORY, 'foobar'
settings.THEMES_DIRECTORY, 'foobar'
)
@ -485,17 +486,12 @@ def test_configure_authentication_methods(setuptest, http_requests):
hobo_cmd.all_services = HOBO_JSON
def test_deploy(setuptest, alt_tempdir, deploy_setup):
dummy, hobo_cmd = setuptest
def test_deploy(setuptest, alt_tempdir, deploy_setup, settings):
settings.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
cleanup_connection()
cleanup()
hobo_cmd = CmdCheckHobos()
base_options = {}
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
sub_options = sub_options_class(True, False, None)
sys.modules['publisher'] = sys.modules['wcs.publisher']
hobo_cmd.execute(
base_options, sub_options, ['http://wcs.example.net/', os.path.join(alt_tempdir, 'hobo.json')]
call_command(
'hobo_deploy', '--ignore-timestamp', 'http://wcs.example.net/', os.path.join(alt_tempdir, 'hobo.json')
)
assert os.path.exists(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net'))
@ -508,17 +504,20 @@ def test_deploy(setuptest, alt_tempdir, deploy_setup):
del pub_cfg['language']
with open(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'config.pck'), 'wb') as fd:
pickle.dump(pub_cfg, fd)
hobo_cmd.execute(
base_options,
sub_options,
['http://wcs.example.net/', os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'hobo.json')],
call_command(
'hobo_deploy',
'--ignore-timestamp',
'http://wcs.example.net/',
os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'hobo.json'),
)
with open(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'config.pck'), 'rb') as fd:
pub_cfg = pickle.load(fd)
assert pub_cfg['language'] == {'language': 'fr'}
def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup):
def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup, settings):
settings.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
pub, hobo_cmd = setuptest
cleanup_connection()
cleanup()
with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
@ -528,20 +527,15 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup):
service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
hobo_cmd = CmdCheckHobos()
base_options = collections.namedtuple('Options', ['configfile'])(configfile=None)
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
sub_options = sub_options_class(True, False, None)
sys.modules['publisher'] = sys.modules['wcs.publisher']
hobo_cmd.execute(
base_options, sub_options, ['http://wcs.example.net/', os.path.join(alt_tempdir, 'hobo.json')]
call_command(
'hobo_deploy', '--ignore-timestamp', 'http://wcs.example.net/', os.path.join(alt_tempdir, 'hobo.json')
)
assert os.path.exists(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net'))
cleanup_connection()
cleanup()
pub = WcsPublisher.create_publisher(register_tld_names=False)
pub = CompatWcsPublisher.create_publisher(register_tld_names=False)
pub.app_dir = os.path.join(alt_tempdir, 'tenants', 'wcs.example.net')
pub.cfg['postgresql'] = {
'createdb-connection-params': {'user': 'test', 'database': 'postgres'},
@ -595,3 +589,38 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup):
'application_name': 'wcs',
}
assert pub.initialize_sql.call_count == 3
def test_redeploy(setuptest, alt_tempdir, deploy_setup, settings):
settings.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
cleanup_connection()
cleanup()
call_command(
'hobo_deploy', '--ignore-timestamp', 'http://wcs.example.net/', os.path.join(alt_tempdir, 'hobo.json')
)
assert os.path.exists(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net'))
with open(os.path.join(alt_tempdir, 'hobo2.json'), 'w') as fd:
hobo_json = copy.deepcopy(HOBO_JSON)
del hobo_json['services'][1] # authentic
hobo_json['services'][1]['saml-sp-metadata-url'] = 'http://wcs2.example.net/saml/metadata'
hobo_json['services'][1]['base_url'] = 'http://wcs2.example.net/'
hobo_json['services'][1]['backoffice-menu-url'] = 'http://wcs2.example.net/backoffice/menu.json'
hobo_json['services'][1]['slug'] = 'test-wcs2'
fd.write(json.dumps(hobo_json))
call_command(
'hobo_deploy',
'--ignore-timestamp',
'http://wcs2.example.net/',
os.path.join(alt_tempdir, 'hobo2.json'),
)
assert os.path.exists(os.path.join(alt_tempdir, 'tenants', 'wcs2.example.net'))
with mock.patch('wcs.ctl.management.commands.hobo_deploy.Command.deploy') as deploy:
call_command('hobo_deploy', '--redeploy')
assert deploy.call_count == 2
assert {x[1]['base_url'] for x in deploy.call_args_list} == {
'http://wcs.example.net',
'http://wcs2.example.net',
}

View File

@ -1,10 +1,14 @@
import json
import os
import shutil
import tempfile
import uuid
import pytest
from django.core.management import call_command
from wcs.api_utils import sign_url
from wcs.ctl.hobo_notify import CmdHoboNotify
from wcs.ctl.management.commands.hobo_notify import Command as HoboNotifyCommand
from wcs.qommon import force_str
from wcs.qommon.afterjobs import AfterJob
from wcs.qommon.http_request import HTTPRequest
@ -13,6 +17,13 @@ from wcs.sql_criterias import NotNull
from .utilities import create_temporary_pub, get_app
@pytest.fixture
def alt_tempdir():
alt_tempdir = tempfile.mkdtemp()
yield alt_tempdir
shutil.rmtree(alt_tempdir)
@pytest.fixture
def pub(request):
pub = create_temporary_pub()
@ -74,7 +85,7 @@ def test_process_notification_role_wrong_audience(pub):
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
assert pub.role_class.select()[0].allows_backoffice_access is False
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].name == 'Service étt civil'
assert pub.role_class.select()[0].slug == 'service-ett-civil'
@ -121,7 +132,7 @@ def test_process_notification_role(pub):
assert pub.role_class.select()[0].emails_to_members is False
assert pub.role_class.select()[0].allows_backoffice_access is False
existing_role_id = pub.role_class.select()[0].id
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 2
old_role = pub.role_class.get(existing_role_id)
assert old_role.name == 'Service état civil'
@ -159,7 +170,7 @@ def test_process_notification_role(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].id == new_role.id
assert pub.role_class.select()[0].uuid == uuid1
@ -174,7 +185,7 @@ def test_process_notification_role(pub):
role.allows_backoffice_access = True
role.store()
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
pub.role_class.select()[0].refresh_from_storage()
assert pub.role_class.select()[0].name == 'Service enfance'
@ -203,7 +214,7 @@ def test_process_notification_internal_role(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
role = pub.role_class.select()[0]
assert role.is_internal()
@ -247,7 +258,7 @@ def test_process_notification_role_description(pub):
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
existing_role_id = pub.role_class.select()[0].id
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 2
old_role = pub.role_class.get(existing_role_id)
assert old_role.name == 'Service état civil'
@ -282,7 +293,7 @@ def test_process_notification_role_description(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].id == new_role.id
assert pub.role_class.select()[0].name == 'Service enfance'
@ -321,7 +332,7 @@ def test_process_notification_role_deprovision(pub):
role.store()
assert pub.role_class.count() == 2
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].slug == 'bar'
@ -329,7 +340,7 @@ def test_process_notification_role_deprovision(pub):
r.uuid = uuid1
r.store()
assert pub.role_class.count() == 2
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].slug == 'bar'
@ -465,11 +476,11 @@ def test_process_notification_user_provision(pub):
User = pub.user_class
# create some roles
from wcs.ctl.check_hobos import CmdCheckHobos
from wcs.ctl.management.commands.hobo_deploy import Command
# setup an hobo profile
assert 'users' not in pub.cfg
CmdCheckHobos().update_profile(PROFILE, pub)
Command().update_profile(PROFILE, pub)
assert pub.cfg['users']['field_phone'] == '_phone'
uuid1 = str(uuid.uuid4())
@ -507,7 +518,7 @@ def test_process_notification_user_provision(pub):
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
existing_role_id = pub.role_class.select()[0].id
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 2
old_role = pub.role_class.get(existing_role_id)
assert old_role.name == 'Service état civil'
@ -556,7 +567,7 @@ def test_process_notification_user_provision(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
user = User.select()[0]
assert user.form_data is not None
@ -604,7 +615,7 @@ def test_process_notification_user_provision(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
user = User.select()[0]
assert user.form_data is not None
@ -646,7 +657,7 @@ def test_process_notification_user_provision(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
if birthdate not in (None, ''): # wrong value : no nothing
assert User.select()[0].form_data['_birthdate'].tm_year == 2000
@ -686,7 +697,7 @@ def test_process_notification_user_provision(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
assert User.select()[0].first_name == 'John'
assert not User.select()[0].email
@ -725,7 +736,7 @@ def test_process_notification_user_provision(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
assert User.select()[0].first_name == 'John'
assert not User.select()[0].phone
@ -740,11 +751,11 @@ def test_process_notification_user_with_errors(pub):
User = pub.user_class
# setup an hobo profile
from wcs.ctl.check_hobos import CmdCheckHobos
from wcs.ctl.management.commands.hobo_deploy import Command
User.wipe()
pub.role_class.wipe()
CmdCheckHobos().update_profile(PROFILE, pub)
Command().update_profile(PROFILE, pub)
notification = {
'@type': 'provision',
@ -773,7 +784,7 @@ def test_process_notification_user_with_errors(pub):
},
}
with pytest.raises(NotImplementedError) as e:
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert e.value.args == ('full is not supported for users',)
assert User.count() == 0
@ -785,7 +796,7 @@ def test_process_notification_user_with_errors(pub):
backup = notification['objects']['data'][0][key]
del notification['objects']['data'][0][key]
with pytest.raises(Exception) as e:
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert e.type == ValueError
assert e.value.args == ('invalid user',)
assert User.count() == 0
@ -794,7 +805,7 @@ def test_process_notification_user_with_errors(pub):
notification['@type'] = 'deprovision'
del notification['objects']['data'][0]['uuid']
with pytest.raises(Exception) as e:
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert e.type == KeyError
assert e.value.args == ('user without uuid',)
@ -822,14 +833,14 @@ def test_process_notification_role_with_errors(pub):
},
}
with pytest.raises(KeyError) as e:
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert e.value.args == ('role without uuid',)
assert pub.role_class.count() == 0
notification['objects']['data'][0]['uuid'] = '12345'
del notification['objects']['data'][0]['name']
with pytest.raises(ValueError) as e:
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert e.value.args == ('invalid role',)
assert pub.role_class.count() == 0
@ -838,11 +849,11 @@ def test_process_user_deprovision(pub):
User = pub.user_class
# setup an hobo profile
from wcs.ctl.check_hobos import CmdCheckHobos
from wcs.ctl.management.commands.hobo_deploy import Command
User.wipe()
pub.role_class.wipe()
CmdCheckHobos().update_profile(PROFILE, pub)
Command().update_profile(PROFILE, pub)
user = User()
user.name = 'Pierre'
@ -865,7 +876,7 @@ def test_process_user_deprovision(pub):
assert User.count() == 1
assert len(User.select([NotNull('deleted_timestamp')])) == 0
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
assert len(User.select([NotNull('deleted_timestamp')])) == 1
@ -876,11 +887,11 @@ def test_process_user_deprovision_with_data(pub):
User = pub.user_class
# setup an hobo profile
from wcs.ctl.check_hobos import CmdCheckHobos
from wcs.ctl.management.commands.hobo_deploy import Command
User.wipe()
FormDef.wipe()
CmdCheckHobos().update_profile(PROFILE, pub)
Command().update_profile(PROFILE, pub)
user = User()
user.name = 'Pierre'
@ -914,7 +925,7 @@ def test_process_user_deprovision_with_data(pub):
assert User.count() == 1
assert len(User.select([NotNull('deleted_timestamp')])) == 0
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
assert len(User.select([NotNull('deleted_timestamp')])) == 1
@ -956,3 +967,32 @@ def test_provision_http_endpoint(pub):
get_app(pub).post_json(sign_url('/__provision__/?orig=coucou&sync=1', '1234'), notification)
assert AfterJob.count() == 0 # sync
assert pub.user_class.count() == 1
def test_hobo_notify_call_command(pub, alt_tempdir):
uuid1 = str(uuid.uuid4())
with open(os.path.join(alt_tempdir, 'message.json'), 'w') as fd:
json.dump(
{
'@type': 'deprovision',
'audience': ['test'],
'full': False,
'objects': {
'@type': 'role',
'data': [
{
'@type': 'role',
'uuid': uuid1,
},
],
},
},
fd,
)
pub.role_class.wipe()
role = pub.role_class('foo')
role.id = uuid1
role.store()
call_command('hobo_notify', os.path.join(alt_tempdir, 'message.json'))
assert pub.role_class.count() == 0

View File

@ -1,4 +1,3 @@
import collections
import copy
import json
import os
@ -9,10 +8,11 @@ import zipfile
import psycopg2
import pytest
from django.core.management import call_command
from quixote import cleanup
from wcs.ctl.check_hobos import CmdCheckHobos
from wcs.publisher import WcsPublisher
from wcs.compat import CompatWcsPublisher
from wcs.ctl.management.commands.hobo_deploy import Command as HoboDeployCommand
from wcs.sql import cleanup_connection
from .utilities import clean_temporary_pub, create_temporary_pub
@ -97,11 +97,11 @@ def setuptest():
pub = create_temporary_pub()
pub.cfg['language'] = {'language': 'en'}
hobo_cmd = CmdCheckHobos()
hobo_cmd = HoboDeployCommand()
hobo_cmd.all_services = HOBO_JSON
WcsPublisher.APP_DIR = tempfile.mkdtemp()
CompatWcsPublisher.APP_DIR = tempfile.mkdtemp()
skeleton_dir = os.path.join(WcsPublisher.APP_DIR, 'skeletons')
skeleton_dir = os.path.join(CompatWcsPublisher.APP_DIR, 'skeletons')
os.mkdir(skeleton_dir)
with open(os.path.join(skeleton_dir, 'publik.zip'), 'wb') as f:
with zipfile.ZipFile(f, 'w') as z:
@ -111,8 +111,8 @@ def setuptest():
yield pub, hobo_cmd
clean_temporary_pub()
if os.path.exists(WcsPublisher.APP_DIR):
shutil.rmtree(WcsPublisher.APP_DIR)
if os.path.exists(CompatWcsPublisher.APP_DIR):
shutil.rmtree(CompatWcsPublisher.APP_DIR)
cleanup_connection()
for dbname in (WCS_DB_NAME, NEW_WCS_DB_NAME):
cursor.execute('DROP DATABASE IF EXISTS %s' % dbname)
@ -134,29 +134,25 @@ def database_exists(database):
def test_deploy(setuptest):
_, hobo_cmd = setuptest
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not database_exists(WCS_DB_NAME)
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert not database_exists(NEW_WCS_DB_NAME)
cleanup()
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
fd.write(json.dumps(HOBO_JSON))
hobo_cmd = CmdCheckHobos()
base_options = {}
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
sub_options = sub_options_class(True, False, None)
hobo_cmd.execute(
base_options,
sub_options,
['http://%s/' % WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
call_command(
'hobo_deploy',
'--ignore-timestamp',
'http://%s/' % WCS_TENANT,
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
)
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert database_exists(WCS_DB_NAME)
# deploy a new tenant
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
hobo_json = copy.deepcopy(HOBO_JSON)
wcs_service = hobo_json['services'][2]
wcs_service['saml-sp-metadata-url'] = ('http://%s/saml/metadata' % WCS_TENANT,)
@ -164,41 +160,38 @@ def test_deploy(setuptest):
wcs_service['backoffice-menu-url'] = 'http://%s/backoffice/menu.json' % NEW_WCS_TENANT
fd.write(json.dumps(hobo_json))
hobo_cmd.execute(
base_options,
sub_options,
['http://%s/' % NEW_WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
call_command(
'hobo_deploy',
'--ignore-timestamp',
'http://%s/' % NEW_WCS_TENANT,
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
)
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert database_exists(WCS_DB_NAME)
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert database_exists(NEW_WCS_DB_NAME)
def test_deploy_url_change(setuptest):
_, hobo_cmd = setuptest
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not database_exists(WCS_DB_NAME)
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert not database_exists(NEW_WCS_DB_NAME)
cleanup()
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
fd.write(json.dumps(HOBO_JSON))
hobo_cmd = CmdCheckHobos()
base_options = {}
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
sub_options = sub_options_class(True, False, None)
hobo_cmd.execute(
base_options,
sub_options,
['http://%s/' % WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
call_command(
'hobo_deploy',
'--ignore-timestamp',
'http://%s/' % WCS_TENANT,
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
)
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert database_exists(WCS_DB_NAME)
# domain change request
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
hobo_json = copy.deepcopy(HOBO_JSON)
wcs_service = hobo_json['services'][2]
wcs_service['legacy_urls'] = [
@ -214,17 +207,18 @@ def test_deploy_url_change(setuptest):
wcs_service['backoffice-menu-url'] = 'http://%s/backoffice/menu.json' % NEW_WCS_TENANT
fd.write(json.dumps(hobo_json))
hobo_cmd.execute(
base_options,
sub_options,
['http://%s/' % NEW_WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
call_command(
'hobo_deploy',
'--ignore-timestamp',
'http://%s/' % NEW_WCS_TENANT,
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
)
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert database_exists(WCS_DB_NAME)
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert not database_exists(NEW_WCS_DB_NAME)
publisher = WcsPublisher.create_publisher()
publisher = CompatWcsPublisher.create_publisher()
publisher.set_tenant_by_hostname(NEW_WCS_TENANT)
# check that WCS_DB_NAME is used by NEW_WCS_TENANT
assert publisher.cfg['postgresql']['database'] == WCS_DB_NAME
@ -233,35 +227,31 @@ def test_deploy_url_change(setuptest):
def test_deploy_url_change_old_tenant_dir(setuptest):
_, hobo_cmd = setuptest
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not database_exists(WCS_DB_NAME)
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert not database_exists(NEW_WCS_DB_NAME)
cleanup()
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
fd.write(json.dumps(HOBO_JSON))
hobo_cmd = CmdCheckHobos()
base_options = {}
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
sub_options = sub_options_class(True, False, None)
hobo_cmd.execute(
base_options,
sub_options,
['http://%s/' % WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
call_command(
'hobo_deploy',
'--ignore-timestamp',
'http://%s/' % WCS_TENANT,
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
)
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert database_exists(WCS_DB_NAME)
# move tenant to APP_DIR (legacy way)
os.replace(
os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT),
os.path.join(WcsPublisher.APP_DIR, WCS_TENANT),
os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT),
os.path.join(CompatWcsPublisher.APP_DIR, WCS_TENANT),
)
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
# domain change request
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
hobo_json = copy.deepcopy(HOBO_JSON)
wcs_service = hobo_json['services'][2]
wcs_service['legacy_urls'] = [
@ -277,17 +267,18 @@ def test_deploy_url_change_old_tenant_dir(setuptest):
wcs_service['backoffice-menu-url'] = 'http://%s/backoffice/menu.json' % NEW_WCS_TENANT
fd.write(json.dumps(hobo_json))
hobo_cmd.execute(
base_options,
sub_options,
['http://%s/' % NEW_WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
call_command(
'hobo_deploy',
'--ignore-timestamp',
'http://%s/' % NEW_WCS_TENANT,
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
)
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, WCS_TENANT))
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, WCS_TENANT))
assert database_exists(WCS_DB_NAME)
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
assert not database_exists(NEW_WCS_DB_NAME)
publisher = WcsPublisher.create_publisher()
publisher = CompatWcsPublisher.create_publisher()
publisher.set_tenant_by_hostname(NEW_WCS_TENANT)
# check that WCS_DB_NAME is used by NEW_WCS_TENANT
assert publisher.cfg['postgresql']['database'] == WCS_DB_NAME

View File

@ -279,9 +279,9 @@ def test_assertion_consumer_unspecified(pub):
def test_assertion_consumer_existing_federation(pub, caplog):
# setup an hobo profile
from wcs.ctl.check_hobos import CmdCheckHobos
from wcs.ctl.management.commands.hobo_deploy import Command as CmdHoboDeploy
CmdCheckHobos().update_profile(PROFILE, pub)
CmdHoboDeploy().update_profile(PROFILE, pub)
pub.set_config()

View File

@ -34,7 +34,7 @@ from wcs.api_utils import get_query_flag, get_user_from_api_query_string, is_url
from wcs.carddef import CardDef
from wcs.categories import Category
from wcs.conditions import Condition, ValidationError
from wcs.ctl.hobo_notify import CmdHoboNotify
from wcs.ctl.management.commands.hobo_notify import Command as CmdHoboNotify
from wcs.data_sources import NamedDataSource
from wcs.data_sources import get_object as get_data_source_object
from wcs.data_sources import request_json_items

View File

@ -1,5 +1,5 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2014 Entr'ouvert
# Copyright (C) 2005-2023 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -24,21 +24,17 @@ import tempfile
import urllib.parse
import phonenumbers
from django.conf import settings
from django.utils.encoding import force_bytes
from quixote import cleanup
from wcs.admin.settings import UserFieldsFormDef
from wcs.fields import DateField, EmailField, StringField
from wcs.qommon import force_str
from wcs.qommon import force_str, misc
from wcs.qommon.publisher import UnknownTenantError, get_publisher_class
from wcs.qommon.storage import atomic_write
from ..qommon import misc
from ..qommon.ctl import Command, make_option
from ..qommon.publisher import UnknownTenantError
from ..qommon.storage import atomic_write
class NoChange(Exception):
pass
from . import TenantCommand
def atomic_symlink(src, dst):
@ -50,30 +46,21 @@ def atomic_symlink(src, dst):
os.rename(dst + '.tmp', dst)
class CmdCheckHobos(Command):
name = 'hobo_deploy'
class NoChange(Exception):
pass
# TODO: import this from django settings
THEMES_DIRECTORY = os.environ.get('THEMES_DIRECTORY', '/usr/share/publik/themes')
def __init__(self):
Command.__init__(
self,
[
make_option(
'--ignore-timestamp', action='store_true', dest='ignore_timestamp', default=False
),
make_option('--redeploy', action='store_true', default=False),
],
)
class Command(TenantCommand):
def add_arguments(self, parser):
parser.add_argument('base_url', metavar='BASE_URL', nargs='?', type=str)
parser.add_argument('json_filename', metavar='JSON_FILENAME', nargs='?', type=str)
parser.add_argument('--ignore-timestamp', dest='ignore_timestamp', action='store_true', default=False)
parser.add_argument('--redeploy', action='store_true', default=False)
def execute(self, base_options, sub_options, args):
from .. import publisher
publisher.WcsPublisher.configure(self.config)
if sub_options.redeploy:
sub_options.ignore_timestamp = True
for tenant in publisher.WcsPublisher.get_tenants():
def handle(self, **options):
if options.get('redeploy'):
options['ignore_timestamp'] = True
for tenant in get_publisher_class().get_tenants():
hobo_json_path = os.path.join(tenant.directory, 'hobo.json')
if not os.path.exists(hobo_json_path):
continue
@ -85,26 +72,25 @@ class CmdCheckHobos(Command):
pass
else:
cleanup()
self.deploy(base_options, sub_options, [me['base_url'], hobo_json_path])
options['base_url'] = me['base_url']
options['json_filename'] = hobo_json_path
self.init_tenant_publisher(tenant.hostname, register_tld_names=False)
self.deploy(**options)
else:
self.deploy(base_options, sub_options, args)
self.deploy(**options)
def deploy(self, base_options, sub_options, args):
from .. import publisher
self.base_options = base_options
publisher.WcsPublisher.configure(self.config)
pub = publisher.WcsPublisher.create_publisher(register_tld_names=False)
def deploy(self, **options):
pub = get_publisher_class().create_publisher(register_tld_names=False)
global_app_dir = pub.app_dir
global_tenants_dir = os.path.join(global_app_dir, 'tenants')
base_url = args[0]
base_url = options.get('base_url')
if args[1] == '-':
if options.get('json_filename') == '-':
# get environment definition from stdin
self.all_services = json.load(sys.stdin)
else:
with open(args[1]) as fd:
with open(options.get('json_filename')) as fd:
self.all_services = json.load(fd)
try:
@ -173,7 +159,7 @@ class CmdCheckHobos(Command):
print('updating instance in', pub.app_dir)
try:
self.configure_site_options(service, pub, ignore_timestamp=sub_options.ignore_timestamp)
self.configure_site_options(service, pub, ignore_timestamp=options.get('ignore_timestamp'))
except NoChange:
print(' skipping')
return
@ -208,10 +194,10 @@ class CmdCheckHobos(Command):
theme_id = variables.get('theme')
theme_data = None
if theme_id and os.path.exists(self.THEMES_DIRECTORY):
for theme_module in os.listdir(self.THEMES_DIRECTORY):
if theme_id and os.path.exists(settings.THEMES_DIRECTORY):
for theme_module in os.listdir(settings.THEMES_DIRECTORY):
try:
with open(os.path.join(self.THEMES_DIRECTORY, theme_module, 'themes.json')) as fd:
with open(os.path.join(settings.THEMES_DIRECTORY, theme_module, 'themes.json')) as fd:
themes_json = json.load(fd)
except OSError:
continue
@ -233,7 +219,7 @@ class CmdCheckHobos(Command):
)
tenant_dir = pub.app_dir
theme_dir = os.path.join(tenant_dir, 'theme')
target_dir = os.path.join(self.THEMES_DIRECTORY, theme_data['module'])
target_dir = os.path.join(settings.THEMES_DIRECTORY, theme_data['module'])
atomic_symlink(target_dir, theme_dir)
for component in ('static', 'templates'):
component_dir = os.path.join(tenant_dir, component)
@ -249,7 +235,8 @@ class CmdCheckHobos(Command):
pass
else:
atomic_symlink(
os.path.join(self.THEMES_DIRECTORY, theme_data['overlay'], component), component_dir
os.path.join(settings.THEMES_DIRECTORY, theme_data['overlay'], component),
component_dir,
)
if variables.get('default_from_email'):
@ -361,7 +348,7 @@ class CmdCheckHobos(Command):
# initialize service provider side
if not pub.cfg['sp'].get('publickey') or force_spconfig:
from ..qommon.ident.idp import MethodAdminDirectory
from wcs.qommon.ident.idp import MethodAdminDirectory
spconfig = pub.cfg['sp']
spconfig['saml2_base_url'] = str(service.get('base_url')) + '/saml'
@ -403,7 +390,7 @@ class CmdCheckHobos(Command):
metadata_pathname = tempfile.mkstemp('.metadata')[1]
atomic_write(metadata_pathname, force_bytes(s))
from ..qommon.ident.idp import AdminIDPDir
from wcs.qommon.ident.idp import AdminIDPDir
admin_dir = AdminIDPDir()
key_provider_id = admin_dir.submit_new_remote(metadata_pathname, metadata_url)
@ -672,6 +659,3 @@ class CmdCheckHobos(Command):
secret2 = hashlib.sha256(force_bytes(secret2)).hexdigest()
# rstrip('L') for py2/3 compatibility, as py2 formats number as 0x...L, and py3 as 0x...
return hex(int(secret1, 16) ^ int(secret2, 16))[2:].rstrip('L')
CmdCheckHobos.register()

View File

@ -1,5 +1,5 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2014 Entr'ouvert
# Copyright (C) 2005-2023 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -15,7 +15,6 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import json
import os
import sys
from quixote import get_publisher
@ -23,37 +22,32 @@ from quixote import get_publisher
from wcs.admin.settings import UserFieldsFormDef
from wcs.qommon import force_str
from wcs.qommon.misc import json_encode_helper
from wcs.qommon.publisher import get_cfg, get_publisher_class
from ..qommon.ctl import Command
from ..qommon.publisher import get_cfg
from . import TenantCommand
class CmdHoboNotify(Command):
name = 'hobo_notify'
class Command(TenantCommand):
def add_arguments(self, parser):
parser.add_argument('notification', metavar='NOTIFICATION', type=str)
def execute(self, base_options, sub_options, args):
self.base_options = base_options
notification = self.load_notification(args)
def handle(self, **options):
notification = self.load_notification(options.get('notification'))
if not self.check_valid_notification(notification):
sys.exit(1)
from .. import publisher
publisher.WcsPublisher.configure(self.config)
pub = publisher.WcsPublisher.create_publisher(register_tld_names=False)
for tenant in publisher.WcsPublisher.get_tenants():
if not os.path.exists(os.path.join(tenant.directory, 'config.pck')):
continue
pub.set_tenant(tenant)
self.process_notification(notification, pub)
for tenant in get_publisher_class().get_tenants():
pub = get_publisher_class().create_publisher(register_tld_names=False)
pub.set_tenant_by_hostname(tenant.hostname)
self.process_notification(notification, publisher=pub)
@classmethod
def load_notification(cls, args):
if args[0] == '-':
def load_notification(cls, notification):
if notification == '-':
# get environment definition from stdin
return json.load(sys.stdin)
else:
with open(args[0]) as fd:
with open(notification) as fd:
return json.load(fd)
@classmethod
@ -198,6 +192,3 @@ class CmdHoboNotify(Command):
user.set_deleted()
except Exception as e:
publisher.record_error(exception=e, context='[PROVISIONNING]', notify=True)
CmdHoboNotify.register()