faire de hobo_* des commandes de management django (#20411) #755
|
@ -61,10 +61,10 @@ def setup_user_profile(pub):
|
|||
if not pub.cfg:
|
||||
pub.cfg = {}
|
||||
# create some roles
|
||||
from wcs.ctl.check_hobos import CmdCheckHobos
|
||||
from wcs.ctl.management.commands.hobo_deploy import Command
|
||||
|
||||
# setup an hobo profile
|
||||
CmdCheckHobos().update_profile(PROFILE, pub)
|
||||
Command().update_profile(PROFILE, pub)
|
||||
pub.cfg['users']['fullname_template'] = '{{ user_var_prenoms }} {{ user_var_nom }}'
|
||||
pub.user_class.wipe()
|
||||
pub.write_cfg()
|
||||
|
|
|
@ -1,21 +1,20 @@
|
|||
import collections
|
||||
import copy
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import urllib.parse
|
||||
import zipfile
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from django.core.management import call_command
|
||||
from quixote import cleanup
|
||||
|
||||
from wcs import fields, sql
|
||||
from wcs.ctl.check_hobos import CmdCheckHobos
|
||||
from wcs.publisher import WcsPublisher
|
||||
from wcs.compat import CompatWcsPublisher
|
||||
from wcs.ctl.management.commands.hobo_deploy import Command as HoboDeployCommand
|
||||
from wcs.qommon import force_str
|
||||
from wcs.sql import cleanup_connection
|
||||
|
||||
|
@ -229,17 +228,17 @@ HOBO_JSON = {
|
|||
|
||||
@pytest.fixture
|
||||
def setuptest():
|
||||
hobo_cmd = CmdCheckHobos()
|
||||
hobo_cmd = HoboDeployCommand()
|
||||
hobo_cmd.all_services = HOBO_JSON
|
||||
WcsPublisher.APP_DIR = tempfile.mkdtemp()
|
||||
CompatWcsPublisher.APP_DIR = tempfile.mkdtemp()
|
||||
pub = create_temporary_pub()
|
||||
pub.cfg['language'] = {'language': 'en'}
|
||||
|
||||
yield pub, hobo_cmd
|
||||
cleanup_connection()
|
||||
clean_temporary_pub()
|
||||
if os.path.exists(WcsPublisher.APP_DIR):
|
||||
shutil.rmtree(WcsPublisher.APP_DIR)
|
||||
if os.path.exists(CompatWcsPublisher.APP_DIR):
|
||||
shutil.rmtree(CompatWcsPublisher.APP_DIR)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -251,12 +250,12 @@ def alt_tempdir():
|
|||
|
||||
@pytest.fixture
|
||||
def deploy_setup(alt_tempdir):
|
||||
WcsPublisher.APP_DIR = alt_tempdir
|
||||
CompatWcsPublisher.APP_DIR = alt_tempdir
|
||||
with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
|
||||
hobo_json = copy.deepcopy(HOBO_JSON)
|
||||
del hobo_json['services'][1] # authentic
|
||||
fd.write(json.dumps(HOBO_JSON))
|
||||
skeleton_dir = os.path.join(WcsPublisher.APP_DIR, 'skeletons')
|
||||
skeleton_dir = os.path.join(CompatWcsPublisher.APP_DIR, 'skeletons')
|
||||
if not os.path.exists(skeleton_dir):
|
||||
os.mkdir(skeleton_dir)
|
||||
with open(os.path.join(skeleton_dir, 'export-test.wcs'), 'wb') as f:
|
||||
|
@ -313,7 +312,8 @@ def test_configure_site_options(setuptest, alt_tempdir):
|
|||
assert pub.get_site_option('local-region-code') == 'FR'
|
||||
|
||||
|
||||
def test_update_configuration(setuptest):
|
||||
def test_update_configuration(setuptest, settings):
|
||||
settings.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
|
||||
pub, hobo_cmd = setuptest
|
||||
service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
|
||||
hobo_cmd.update_configuration(service, pub)
|
||||
|
@ -325,7 +325,8 @@ def test_update_configuration(setuptest):
|
|||
assert pub.cfg['sms']['sender'] == 'EO'
|
||||
|
||||
|
||||
def test_update_themes(setuptest):
|
||||
def test_update_themes(setuptest, settings):
|
||||
settings.THEMES_DIRECTORY = ''
|
||||
pub, hobo_cmd = setuptest
|
||||
pub.cfg['branding'] = {'theme': 'django'}
|
||||
service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
|
||||
|
@ -336,17 +337,17 @@ def test_update_themes(setuptest):
|
|||
hobo_cmd.update_configuration(service, pub)
|
||||
assert pub.cfg['branding']['theme'] == 'django'
|
||||
|
||||
hobo_cmd.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
|
||||
settings.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
|
||||
hobo_cmd.update_configuration(service, pub)
|
||||
assert pub.cfg['branding']['theme'] == 'publik-base'
|
||||
assert os.readlink(os.path.join(pub.app_dir, 'static')) == os.path.join(
|
||||
hobo_cmd.THEMES_DIRECTORY, 'foobar/static'
|
||||
settings.THEMES_DIRECTORY, 'foobar/static'
|
||||
)
|
||||
assert os.readlink(os.path.join(pub.app_dir, 'templates')) == os.path.join(
|
||||
hobo_cmd.THEMES_DIRECTORY, 'foobar/templates'
|
||||
settings.THEMES_DIRECTORY, 'foobar/templates'
|
||||
)
|
||||
assert os.readlink(os.path.join(pub.app_dir, 'theme')) == os.path.join(
|
||||
hobo_cmd.THEMES_DIRECTORY, 'publik-base'
|
||||
settings.THEMES_DIRECTORY, 'publik-base'
|
||||
)
|
||||
|
||||
service['variables']['theme'] = 'foobar2'
|
||||
|
@ -354,7 +355,7 @@ def test_update_themes(setuptest):
|
|||
assert not os.path.lexists(os.path.join(pub.app_dir, 'static'))
|
||||
assert not os.path.lexists(os.path.join(pub.app_dir, 'templates'))
|
||||
assert os.readlink(os.path.join(pub.app_dir, 'theme')) == os.path.join(
|
||||
hobo_cmd.THEMES_DIRECTORY, 'foobar'
|
||||
settings.THEMES_DIRECTORY, 'foobar'
|
||||
)
|
||||
|
||||
|
||||
|
@ -485,17 +486,12 @@ def test_configure_authentication_methods(setuptest, http_requests):
|
|||
hobo_cmd.all_services = HOBO_JSON
|
||||
|
||||
|
||||
def test_deploy(setuptest, alt_tempdir, deploy_setup):
|
||||
dummy, hobo_cmd = setuptest
|
||||
def test_deploy(setuptest, alt_tempdir, deploy_setup, settings):
|
||||
settings.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
|
||||
cleanup_connection()
|
||||
cleanup()
|
||||
hobo_cmd = CmdCheckHobos()
|
||||
base_options = {}
|
||||
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
|
||||
sub_options = sub_options_class(True, False, None)
|
||||
sys.modules['publisher'] = sys.modules['wcs.publisher']
|
||||
hobo_cmd.execute(
|
||||
base_options, sub_options, ['http://wcs.example.net/', os.path.join(alt_tempdir, 'hobo.json')]
|
||||
call_command(
|
||||
'hobo_deploy', '--ignore-timestamp', 'http://wcs.example.net/', os.path.join(alt_tempdir, 'hobo.json')
|
||||
)
|
||||
assert os.path.exists(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net'))
|
||||
|
||||
|
@ -508,17 +504,20 @@ def test_deploy(setuptest, alt_tempdir, deploy_setup):
|
|||
del pub_cfg['language']
|
||||
with open(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'config.pck'), 'wb') as fd:
|
||||
pickle.dump(pub_cfg, fd)
|
||||
hobo_cmd.execute(
|
||||
base_options,
|
||||
sub_options,
|
||||
['http://wcs.example.net/', os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'hobo.json')],
|
||||
call_command(
|
||||
'hobo_deploy',
|
||||
'--ignore-timestamp',
|
||||
'http://wcs.example.net/',
|
||||
os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'hobo.json'),
|
||||
)
|
||||
with open(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'config.pck'), 'rb') as fd:
|
||||
pub_cfg = pickle.load(fd)
|
||||
assert pub_cfg['language'] == {'language': 'fr'}
|
||||
|
||||
|
||||
def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup):
|
||||
def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup, settings):
|
||||
settings.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
|
||||
pub, hobo_cmd = setuptest
|
||||
cleanup_connection()
|
||||
cleanup()
|
||||
with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
|
||||
|
@ -528,20 +527,15 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup):
|
|||
|
||||
service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
|
||||
|
||||
hobo_cmd = CmdCheckHobos()
|
||||
base_options = collections.namedtuple('Options', ['configfile'])(configfile=None)
|
||||
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
|
||||
sub_options = sub_options_class(True, False, None)
|
||||
sys.modules['publisher'] = sys.modules['wcs.publisher']
|
||||
hobo_cmd.execute(
|
||||
base_options, sub_options, ['http://wcs.example.net/', os.path.join(alt_tempdir, 'hobo.json')]
|
||||
call_command(
|
||||
'hobo_deploy', '--ignore-timestamp', 'http://wcs.example.net/', os.path.join(alt_tempdir, 'hobo.json')
|
||||
)
|
||||
assert os.path.exists(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net'))
|
||||
|
||||
cleanup_connection()
|
||||
cleanup()
|
||||
|
||||
pub = WcsPublisher.create_publisher(register_tld_names=False)
|
||||
pub = CompatWcsPublisher.create_publisher(register_tld_names=False)
|
||||
pub.app_dir = os.path.join(alt_tempdir, 'tenants', 'wcs.example.net')
|
||||
pub.cfg['postgresql'] = {
|
||||
'createdb-connection-params': {'user': 'test', 'database': 'postgres'},
|
||||
|
@ -595,3 +589,38 @@ def test_configure_postgresql(setuptest, alt_tempdir, deploy_setup):
|
|||
'application_name': 'wcs',
|
||||
}
|
||||
assert pub.initialize_sql.call_count == 3
|
||||
|
||||
|
||||
def test_redeploy(setuptest, alt_tempdir, deploy_setup, settings):
|
||||
settings.THEMES_DIRECTORY = os.path.join(os.path.dirname(__file__), 'themes')
|
||||
cleanup_connection()
|
||||
cleanup()
|
||||
call_command(
|
||||
'hobo_deploy', '--ignore-timestamp', 'http://wcs.example.net/', os.path.join(alt_tempdir, 'hobo.json')
|
||||
)
|
||||
assert os.path.exists(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net'))
|
||||
|
||||
with open(os.path.join(alt_tempdir, 'hobo2.json'), 'w') as fd:
|
||||
hobo_json = copy.deepcopy(HOBO_JSON)
|
||||
del hobo_json['services'][1] # authentic
|
||||
hobo_json['services'][1]['saml-sp-metadata-url'] = 'http://wcs2.example.net/saml/metadata'
|
||||
hobo_json['services'][1]['base_url'] = 'http://wcs2.example.net/'
|
||||
hobo_json['services'][1]['backoffice-menu-url'] = 'http://wcs2.example.net/backoffice/menu.json'
|
||||
hobo_json['services'][1]['slug'] = 'test-wcs2'
|
||||
fd.write(json.dumps(hobo_json))
|
||||
|
||||
call_command(
|
||||
'hobo_deploy',
|
||||
'--ignore-timestamp',
|
||||
'http://wcs2.example.net/',
|
||||
os.path.join(alt_tempdir, 'hobo2.json'),
|
||||
)
|
||||
assert os.path.exists(os.path.join(alt_tempdir, 'tenants', 'wcs2.example.net'))
|
||||
|
||||
with mock.patch('wcs.ctl.management.commands.hobo_deploy.Command.deploy') as deploy:
|
||||
call_command('hobo_deploy', '--redeploy')
|
||||
assert deploy.call_count == 2
|
||||
assert {x[1]['base_url'] for x in deploy.call_args_list} == {
|
||||
'http://wcs.example.net',
|
||||
'http://wcs2.example.net',
|
||||
}
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
import json
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from django.core.management import call_command
|
||||
|
||||
from wcs.api_utils import sign_url
|
||||
from wcs.ctl.hobo_notify import CmdHoboNotify
|
||||
from wcs.ctl.management.commands.hobo_notify import Command as HoboNotifyCommand
|
||||
from wcs.qommon import force_str
|
||||
from wcs.qommon.afterjobs import AfterJob
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
|
@ -13,6 +17,13 @@ from wcs.sql_criterias import NotNull
|
|||
from .utilities import create_temporary_pub, get_app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def alt_tempdir():
|
||||
alt_tempdir = tempfile.mkdtemp()
|
||||
yield alt_tempdir
|
||||
shutil.rmtree(alt_tempdir)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pub(request):
|
||||
pub = create_temporary_pub()
|
||||
|
@ -74,7 +85,7 @@ def test_process_notification_role_wrong_audience(pub):
|
|||
assert pub.role_class.select()[0].emails is None
|
||||
assert pub.role_class.select()[0].emails_to_members is False
|
||||
assert pub.role_class.select()[0].allows_backoffice_access is False
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert pub.role_class.count() == 1
|
||||
assert pub.role_class.select()[0].name == 'Service étt civil'
|
||||
assert pub.role_class.select()[0].slug == 'service-ett-civil'
|
||||
|
@ -121,7 +132,7 @@ def test_process_notification_role(pub):
|
|||
assert pub.role_class.select()[0].emails_to_members is False
|
||||
assert pub.role_class.select()[0].allows_backoffice_access is False
|
||||
existing_role_id = pub.role_class.select()[0].id
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert pub.role_class.count() == 2
|
||||
old_role = pub.role_class.get(existing_role_id)
|
||||
assert old_role.name == 'Service état civil'
|
||||
|
@ -159,7 +170,7 @@ def test_process_notification_role(pub):
|
|||
],
|
||||
},
|
||||
}
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert pub.role_class.count() == 1
|
||||
assert pub.role_class.select()[0].id == new_role.id
|
||||
assert pub.role_class.select()[0].uuid == uuid1
|
||||
|
@ -174,7 +185,7 @@ def test_process_notification_role(pub):
|
|||
role.allows_backoffice_access = True
|
||||
role.store()
|
||||
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert pub.role_class.count() == 1
|
||||
pub.role_class.select()[0].refresh_from_storage()
|
||||
assert pub.role_class.select()[0].name == 'Service enfance'
|
||||
|
@ -203,7 +214,7 @@ def test_process_notification_internal_role(pub):
|
|||
],
|
||||
},
|
||||
}
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert pub.role_class.count() == 1
|
||||
role = pub.role_class.select()[0]
|
||||
assert role.is_internal()
|
||||
|
@ -247,7 +258,7 @@ def test_process_notification_role_description(pub):
|
|||
assert pub.role_class.select()[0].emails is None
|
||||
assert pub.role_class.select()[0].emails_to_members is False
|
||||
existing_role_id = pub.role_class.select()[0].id
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert pub.role_class.count() == 2
|
||||
old_role = pub.role_class.get(existing_role_id)
|
||||
assert old_role.name == 'Service état civil'
|
||||
|
@ -282,7 +293,7 @@ def test_process_notification_role_description(pub):
|
|||
],
|
||||
},
|
||||
}
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert pub.role_class.count() == 1
|
||||
assert pub.role_class.select()[0].id == new_role.id
|
||||
assert pub.role_class.select()[0].name == 'Service enfance'
|
||||
|
@ -321,7 +332,7 @@ def test_process_notification_role_deprovision(pub):
|
|||
role.store()
|
||||
|
||||
assert pub.role_class.count() == 2
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert pub.role_class.count() == 1
|
||||
assert pub.role_class.select()[0].slug == 'bar'
|
||||
|
||||
|
@ -329,7 +340,7 @@ def test_process_notification_role_deprovision(pub):
|
|||
r.uuid = uuid1
|
||||
r.store()
|
||||
assert pub.role_class.count() == 2
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert pub.role_class.count() == 1
|
||||
assert pub.role_class.select()[0].slug == 'bar'
|
||||
|
||||
|
@ -465,11 +476,11 @@ def test_process_notification_user_provision(pub):
|
|||
User = pub.user_class
|
||||
|
||||
# create some roles
|
||||
from wcs.ctl.check_hobos import CmdCheckHobos
|
||||
from wcs.ctl.management.commands.hobo_deploy import Command
|
||||
|
||||
# setup an hobo profile
|
||||
assert 'users' not in pub.cfg
|
||||
CmdCheckHobos().update_profile(PROFILE, pub)
|
||||
Command().update_profile(PROFILE, pub)
|
||||
assert pub.cfg['users']['field_phone'] == '_phone'
|
||||
|
||||
uuid1 = str(uuid.uuid4())
|
||||
|
@ -507,7 +518,7 @@ def test_process_notification_user_provision(pub):
|
|||
assert pub.role_class.select()[0].emails is None
|
||||
assert pub.role_class.select()[0].emails_to_members is False
|
||||
existing_role_id = pub.role_class.select()[0].id
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert pub.role_class.count() == 2
|
||||
old_role = pub.role_class.get(existing_role_id)
|
||||
assert old_role.name == 'Service état civil'
|
||||
|
@ -556,7 +567,7 @@ def test_process_notification_user_provision(pub):
|
|||
],
|
||||
},
|
||||
}
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert User.count() == 1
|
||||
user = User.select()[0]
|
||||
assert user.form_data is not None
|
||||
|
@ -604,7 +615,7 @@ def test_process_notification_user_provision(pub):
|
|||
],
|
||||
},
|
||||
}
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert User.count() == 1
|
||||
user = User.select()[0]
|
||||
assert user.form_data is not None
|
||||
|
@ -646,7 +657,7 @@ def test_process_notification_user_provision(pub):
|
|||
],
|
||||
},
|
||||
}
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert User.count() == 1
|
||||
if birthdate not in (None, ''): # wrong value : no nothing
|
||||
assert User.select()[0].form_data['_birthdate'].tm_year == 2000
|
||||
|
@ -686,7 +697,7 @@ def test_process_notification_user_provision(pub):
|
|||
],
|
||||
},
|
||||
}
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert User.count() == 1
|
||||
assert User.select()[0].first_name == 'John'
|
||||
assert not User.select()[0].email
|
||||
|
@ -725,7 +736,7 @@ def test_process_notification_user_provision(pub):
|
|||
],
|
||||
},
|
||||
}
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert User.count() == 1
|
||||
assert User.select()[0].first_name == 'John'
|
||||
assert not User.select()[0].phone
|
||||
|
@ -740,11 +751,11 @@ def test_process_notification_user_with_errors(pub):
|
|||
User = pub.user_class
|
||||
|
||||
# setup an hobo profile
|
||||
from wcs.ctl.check_hobos import CmdCheckHobos
|
||||
from wcs.ctl.management.commands.hobo_deploy import Command
|
||||
|
||||
User.wipe()
|
||||
pub.role_class.wipe()
|
||||
CmdCheckHobos().update_profile(PROFILE, pub)
|
||||
Command().update_profile(PROFILE, pub)
|
||||
|
||||
notification = {
|
||||
'@type': 'provision',
|
||||
|
@ -773,7 +784,7 @@ def test_process_notification_user_with_errors(pub):
|
|||
},
|
||||
}
|
||||
with pytest.raises(NotImplementedError) as e:
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert e.value.args == ('full is not supported for users',)
|
||||
assert User.count() == 0
|
||||
|
||||
|
@ -785,7 +796,7 @@ def test_process_notification_user_with_errors(pub):
|
|||
backup = notification['objects']['data'][0][key]
|
||||
del notification['objects']['data'][0][key]
|
||||
with pytest.raises(Exception) as e:
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert e.type == ValueError
|
||||
assert e.value.args == ('invalid user',)
|
||||
assert User.count() == 0
|
||||
|
@ -794,7 +805,7 @@ def test_process_notification_user_with_errors(pub):
|
|||
notification['@type'] = 'deprovision'
|
||||
del notification['objects']['data'][0]['uuid']
|
||||
with pytest.raises(Exception) as e:
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert e.type == KeyError
|
||||
assert e.value.args == ('user without uuid',)
|
||||
|
||||
|
@ -822,14 +833,14 @@ def test_process_notification_role_with_errors(pub):
|
|||
},
|
||||
}
|
||||
with pytest.raises(KeyError) as e:
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert e.value.args == ('role without uuid',)
|
||||
assert pub.role_class.count() == 0
|
||||
|
||||
notification['objects']['data'][0]['uuid'] = '12345'
|
||||
del notification['objects']['data'][0]['name']
|
||||
with pytest.raises(ValueError) as e:
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert e.value.args == ('invalid role',)
|
||||
assert pub.role_class.count() == 0
|
||||
|
||||
|
@ -838,11 +849,11 @@ def test_process_user_deprovision(pub):
|
|||
User = pub.user_class
|
||||
|
||||
# setup an hobo profile
|
||||
from wcs.ctl.check_hobos import CmdCheckHobos
|
||||
from wcs.ctl.management.commands.hobo_deploy import Command
|
||||
|
||||
User.wipe()
|
||||
pub.role_class.wipe()
|
||||
CmdCheckHobos().update_profile(PROFILE, pub)
|
||||
Command().update_profile(PROFILE, pub)
|
||||
|
||||
user = User()
|
||||
user.name = 'Pierre'
|
||||
|
@ -865,7 +876,7 @@ def test_process_user_deprovision(pub):
|
|||
|
||||
assert User.count() == 1
|
||||
assert len(User.select([NotNull('deleted_timestamp')])) == 0
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert User.count() == 1
|
||||
assert len(User.select([NotNull('deleted_timestamp')])) == 1
|
||||
|
||||
|
@ -876,11 +887,11 @@ def test_process_user_deprovision_with_data(pub):
|
|||
User = pub.user_class
|
||||
|
||||
# setup an hobo profile
|
||||
from wcs.ctl.check_hobos import CmdCheckHobos
|
||||
from wcs.ctl.management.commands.hobo_deploy import Command
|
||||
|
||||
User.wipe()
|
||||
FormDef.wipe()
|
||||
CmdCheckHobos().update_profile(PROFILE, pub)
|
||||
Command().update_profile(PROFILE, pub)
|
||||
|
||||
user = User()
|
||||
user.name = 'Pierre'
|
||||
|
@ -914,7 +925,7 @@ def test_process_user_deprovision_with_data(pub):
|
|||
|
||||
assert User.count() == 1
|
||||
assert len(User.select([NotNull('deleted_timestamp')])) == 0
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
HoboNotifyCommand().process_notification(notification)
|
||||
assert User.count() == 1
|
||||
assert len(User.select([NotNull('deleted_timestamp')])) == 1
|
||||
|
||||
|
@ -956,3 +967,32 @@ def test_provision_http_endpoint(pub):
|
|||
get_app(pub).post_json(sign_url('/__provision__/?orig=coucou&sync=1', '1234'), notification)
|
||||
assert AfterJob.count() == 0 # sync
|
||||
assert pub.user_class.count() == 1
|
||||
|
||||
|
||||
def test_hobo_notify_call_command(pub, alt_tempdir):
|
||||
uuid1 = str(uuid.uuid4())
|
||||
with open(os.path.join(alt_tempdir, 'message.json'), 'w') as fd:
|
||||
json.dump(
|
||||
{
|
||||
'@type': 'deprovision',
|
||||
'audience': ['test'],
|
||||
'full': False,
|
||||
'objects': {
|
||||
'@type': 'role',
|
||||
'data': [
|
||||
{
|
||||
'@type': 'role',
|
||||
'uuid': uuid1,
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
fd,
|
||||
)
|
||||
|
||||
pub.role_class.wipe()
|
||||
role = pub.role_class('foo')
|
||||
role.id = uuid1
|
||||
role.store()
|
||||
call_command('hobo_notify', os.path.join(alt_tempdir, 'message.json'))
|
||||
assert pub.role_class.count() == 0
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import collections
|
||||
import copy
|
||||
import json
|
||||
import os
|
||||
|
@ -9,10 +8,11 @@ import zipfile
|
|||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from django.core.management import call_command
|
||||
from quixote import cleanup
|
||||
|
||||
from wcs.ctl.check_hobos import CmdCheckHobos
|
||||
from wcs.publisher import WcsPublisher
|
||||
from wcs.compat import CompatWcsPublisher
|
||||
from wcs.ctl.management.commands.hobo_deploy import Command as HoboDeployCommand
|
||||
from wcs.sql import cleanup_connection
|
||||
|
||||
from .utilities import clean_temporary_pub, create_temporary_pub
|
||||
|
@ -97,11 +97,11 @@ def setuptest():
|
|||
|
||||
pub = create_temporary_pub()
|
||||
pub.cfg['language'] = {'language': 'en'}
|
||||
hobo_cmd = CmdCheckHobos()
|
||||
hobo_cmd = HoboDeployCommand()
|
||||
hobo_cmd.all_services = HOBO_JSON
|
||||
WcsPublisher.APP_DIR = tempfile.mkdtemp()
|
||||
CompatWcsPublisher.APP_DIR = tempfile.mkdtemp()
|
||||
|
||||
skeleton_dir = os.path.join(WcsPublisher.APP_DIR, 'skeletons')
|
||||
skeleton_dir = os.path.join(CompatWcsPublisher.APP_DIR, 'skeletons')
|
||||
os.mkdir(skeleton_dir)
|
||||
with open(os.path.join(skeleton_dir, 'publik.zip'), 'wb') as f:
|
||||
with zipfile.ZipFile(f, 'w') as z:
|
||||
|
@ -111,8 +111,8 @@ def setuptest():
|
|||
yield pub, hobo_cmd
|
||||
|
||||
clean_temporary_pub()
|
||||
if os.path.exists(WcsPublisher.APP_DIR):
|
||||
shutil.rmtree(WcsPublisher.APP_DIR)
|
||||
if os.path.exists(CompatWcsPublisher.APP_DIR):
|
||||
shutil.rmtree(CompatWcsPublisher.APP_DIR)
|
||||
cleanup_connection()
|
||||
for dbname in (WCS_DB_NAME, NEW_WCS_DB_NAME):
|
||||
cursor.execute('DROP DATABASE IF EXISTS %s' % dbname)
|
||||
|
@ -134,29 +134,25 @@ def database_exists(database):
|
|||
|
||||
|
||||
def test_deploy(setuptest):
|
||||
_, hobo_cmd = setuptest
|
||||
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert not database_exists(WCS_DB_NAME)
|
||||
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert not database_exists(NEW_WCS_DB_NAME)
|
||||
|
||||
cleanup()
|
||||
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
fd.write(json.dumps(HOBO_JSON))
|
||||
hobo_cmd = CmdCheckHobos()
|
||||
base_options = {}
|
||||
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
|
||||
sub_options = sub_options_class(True, False, None)
|
||||
hobo_cmd.execute(
|
||||
base_options,
|
||||
sub_options,
|
||||
['http://%s/' % WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
|
||||
call_command(
|
||||
'hobo_deploy',
|
||||
'--ignore-timestamp',
|
||||
'http://%s/' % WCS_TENANT,
|
||||
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
|
||||
)
|
||||
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert database_exists(WCS_DB_NAME)
|
||||
|
||||
# deploy a new tenant
|
||||
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
hobo_json = copy.deepcopy(HOBO_JSON)
|
||||
wcs_service = hobo_json['services'][2]
|
||||
wcs_service['saml-sp-metadata-url'] = ('http://%s/saml/metadata' % WCS_TENANT,)
|
||||
|
@ -164,41 +160,38 @@ def test_deploy(setuptest):
|
|||
wcs_service['backoffice-menu-url'] = 'http://%s/backoffice/menu.json' % NEW_WCS_TENANT
|
||||
fd.write(json.dumps(hobo_json))
|
||||
|
||||
hobo_cmd.execute(
|
||||
base_options,
|
||||
sub_options,
|
||||
['http://%s/' % NEW_WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
|
||||
call_command(
|
||||
'hobo_deploy',
|
||||
'--ignore-timestamp',
|
||||
'http://%s/' % NEW_WCS_TENANT,
|
||||
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
|
||||
)
|
||||
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert database_exists(WCS_DB_NAME)
|
||||
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert database_exists(NEW_WCS_DB_NAME)
|
||||
|
||||
|
||||
def test_deploy_url_change(setuptest):
|
||||
_, hobo_cmd = setuptest
|
||||
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert not database_exists(WCS_DB_NAME)
|
||||
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert not database_exists(NEW_WCS_DB_NAME)
|
||||
|
||||
cleanup()
|
||||
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
fd.write(json.dumps(HOBO_JSON))
|
||||
hobo_cmd = CmdCheckHobos()
|
||||
base_options = {}
|
||||
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
|
||||
sub_options = sub_options_class(True, False, None)
|
||||
hobo_cmd.execute(
|
||||
base_options,
|
||||
sub_options,
|
||||
['http://%s/' % WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
|
||||
call_command(
|
||||
'hobo_deploy',
|
||||
'--ignore-timestamp',
|
||||
'http://%s/' % WCS_TENANT,
|
||||
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
|
||||
)
|
||||
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert database_exists(WCS_DB_NAME)
|
||||
|
||||
# domain change request
|
||||
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
hobo_json = copy.deepcopy(HOBO_JSON)
|
||||
wcs_service = hobo_json['services'][2]
|
||||
wcs_service['legacy_urls'] = [
|
||||
|
@ -214,17 +207,18 @@ def test_deploy_url_change(setuptest):
|
|||
wcs_service['backoffice-menu-url'] = 'http://%s/backoffice/menu.json' % NEW_WCS_TENANT
|
||||
fd.write(json.dumps(hobo_json))
|
||||
|
||||
hobo_cmd.execute(
|
||||
base_options,
|
||||
sub_options,
|
||||
['http://%s/' % NEW_WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
|
||||
call_command(
|
||||
'hobo_deploy',
|
||||
'--ignore-timestamp',
|
||||
'http://%s/' % NEW_WCS_TENANT,
|
||||
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
|
||||
)
|
||||
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert database_exists(WCS_DB_NAME)
|
||||
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert not database_exists(NEW_WCS_DB_NAME)
|
||||
|
||||
publisher = WcsPublisher.create_publisher()
|
||||
publisher = CompatWcsPublisher.create_publisher()
|
||||
publisher.set_tenant_by_hostname(NEW_WCS_TENANT)
|
||||
# check that WCS_DB_NAME is used by NEW_WCS_TENANT
|
||||
assert publisher.cfg['postgresql']['database'] == WCS_DB_NAME
|
||||
|
@ -233,35 +227,31 @@ def test_deploy_url_change(setuptest):
|
|||
|
||||
|
||||
def test_deploy_url_change_old_tenant_dir(setuptest):
|
||||
_, hobo_cmd = setuptest
|
||||
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert not database_exists(WCS_DB_NAME)
|
||||
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert not database_exists(NEW_WCS_DB_NAME)
|
||||
|
||||
cleanup()
|
||||
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
fd.write(json.dumps(HOBO_JSON))
|
||||
hobo_cmd = CmdCheckHobos()
|
||||
base_options = {}
|
||||
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
|
||||
sub_options = sub_options_class(True, False, None)
|
||||
hobo_cmd.execute(
|
||||
base_options,
|
||||
sub_options,
|
||||
['http://%s/' % WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
|
||||
call_command(
|
||||
'hobo_deploy',
|
||||
'--ignore-timestamp',
|
||||
'http://%s/' % WCS_TENANT,
|
||||
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
|
||||
)
|
||||
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert database_exists(WCS_DB_NAME)
|
||||
# move tenant to APP_DIR (legacy way)
|
||||
os.replace(
|
||||
os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT),
|
||||
os.path.join(WcsPublisher.APP_DIR, WCS_TENANT),
|
||||
os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT),
|
||||
os.path.join(CompatWcsPublisher.APP_DIR, WCS_TENANT),
|
||||
)
|
||||
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', WCS_TENANT))
|
||||
|
||||
# domain change request
|
||||
with open(os.path.join(WcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
with open(os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'), 'w') as fd:
|
||||
hobo_json = copy.deepcopy(HOBO_JSON)
|
||||
wcs_service = hobo_json['services'][2]
|
||||
wcs_service['legacy_urls'] = [
|
||||
|
@ -277,17 +267,18 @@ def test_deploy_url_change_old_tenant_dir(setuptest):
|
|||
wcs_service['backoffice-menu-url'] = 'http://%s/backoffice/menu.json' % NEW_WCS_TENANT
|
||||
fd.write(json.dumps(hobo_json))
|
||||
|
||||
hobo_cmd.execute(
|
||||
base_options,
|
||||
sub_options,
|
||||
['http://%s/' % NEW_WCS_TENANT, os.path.join(WcsPublisher.APP_DIR, 'hobo.json')],
|
||||
call_command(
|
||||
'hobo_deploy',
|
||||
'--ignore-timestamp',
|
||||
'http://%s/' % NEW_WCS_TENANT,
|
||||
os.path.join(CompatWcsPublisher.APP_DIR, 'hobo.json'),
|
||||
)
|
||||
assert not os.path.exists(os.path.join(WcsPublisher.APP_DIR, WCS_TENANT))
|
||||
assert not os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, WCS_TENANT))
|
||||
assert database_exists(WCS_DB_NAME)
|
||||
assert os.path.exists(os.path.join(WcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert os.path.exists(os.path.join(CompatWcsPublisher.APP_DIR, 'tenants', NEW_WCS_TENANT))
|
||||
assert not database_exists(NEW_WCS_DB_NAME)
|
||||
|
||||
publisher = WcsPublisher.create_publisher()
|
||||
publisher = CompatWcsPublisher.create_publisher()
|
||||
publisher.set_tenant_by_hostname(NEW_WCS_TENANT)
|
||||
# check that WCS_DB_NAME is used by NEW_WCS_TENANT
|
||||
assert publisher.cfg['postgresql']['database'] == WCS_DB_NAME
|
||||
|
|
|
@ -279,9 +279,9 @@ def test_assertion_consumer_unspecified(pub):
|
|||
|
||||
def test_assertion_consumer_existing_federation(pub, caplog):
|
||||
# setup an hobo profile
|
||||
from wcs.ctl.check_hobos import CmdCheckHobos
|
||||
from wcs.ctl.management.commands.hobo_deploy import Command as CmdHoboDeploy
|
||||
|
||||
CmdCheckHobos().update_profile(PROFILE, pub)
|
||||
CmdHoboDeploy().update_profile(PROFILE, pub)
|
||||
|
||||
pub.set_config()
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ from wcs.api_utils import get_query_flag, get_user_from_api_query_string, is_url
|
|||
from wcs.carddef import CardDef
|
||||
from wcs.categories import Category
|
||||
from wcs.conditions import Condition, ValidationError
|
||||
from wcs.ctl.hobo_notify import CmdHoboNotify
|
||||
from wcs.ctl.management.commands.hobo_notify import Command as CmdHoboNotify
|
||||
from wcs.data_sources import NamedDataSource
|
||||
from wcs.data_sources import get_object as get_data_source_object
|
||||
from wcs.data_sources import request_json_items
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# w.c.s. - web application for online forms
|
||||
# Copyright (C) 2005-2014 Entr'ouvert
|
||||
# Copyright (C) 2005-2023 Entr'ouvert
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
|
@ -24,21 +24,17 @@ import tempfile
|
|||
import urllib.parse
|
||||
|
||||
import phonenumbers
|
||||
from django.conf import settings
|
||||
from django.utils.encoding import force_bytes
|
||||
from quixote import cleanup
|
||||
|
||||
from wcs.admin.settings import UserFieldsFormDef
|
||||
from wcs.fields import DateField, EmailField, StringField
|
||||
from wcs.qommon import force_str
|
||||
from wcs.qommon import force_str, misc
|
||||
from wcs.qommon.publisher import UnknownTenantError, get_publisher_class
|
||||
from wcs.qommon.storage import atomic_write
|
||||
|
||||
from ..qommon import misc
|
||||
from ..qommon.ctl import Command, make_option
|
||||
from ..qommon.publisher import UnknownTenantError
|
||||
from ..qommon.storage import atomic_write
|
||||
|
||||
|
||||
class NoChange(Exception):
|
||||
pass
|
||||
from . import TenantCommand
|
||||
|
||||
|
||||
def atomic_symlink(src, dst):
|
||||
|
@ -50,30 +46,21 @@ def atomic_symlink(src, dst):
|
|||
os.rename(dst + '.tmp', dst)
|
||||
|
||||
|
||||
class CmdCheckHobos(Command):
|
||||
name = 'hobo_deploy'
|
||||
class NoChange(Exception):
|
||||
pass
|
||||
|
||||
# TODO: import this from django settings
|
||||
THEMES_DIRECTORY = os.environ.get('THEMES_DIRECTORY', '/usr/share/publik/themes')
|
||||
|
||||
def __init__(self):
|
||||
Command.__init__(
|
||||
self,
|
||||
[
|
||||
make_option(
|
||||
'--ignore-timestamp', action='store_true', dest='ignore_timestamp', default=False
|
||||
),
|
||||
make_option('--redeploy', action='store_true', default=False),
|
||||
],
|
||||
)
|
||||
class Command(TenantCommand):
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('base_url', metavar='BASE_URL', nargs='?', type=str)
|
||||
parser.add_argument('json_filename', metavar='JSON_FILENAME', nargs='?', type=str)
|
||||
parser.add_argument('--ignore-timestamp', dest='ignore_timestamp', action='store_true', default=False)
|
||||
parser.add_argument('--redeploy', action='store_true', default=False)
|
||||
|
||||
def execute(self, base_options, sub_options, args):
|
||||
from .. import publisher
|
||||
|
||||
publisher.WcsPublisher.configure(self.config)
|
||||
if sub_options.redeploy:
|
||||
sub_options.ignore_timestamp = True
|
||||
for tenant in publisher.WcsPublisher.get_tenants():
|
||||
def handle(self, **options):
|
||||
if options.get('redeploy'):
|
||||
options['ignore_timestamp'] = True
|
||||
for tenant in get_publisher_class().get_tenants():
|
||||
hobo_json_path = os.path.join(tenant.directory, 'hobo.json')
|
||||
if not os.path.exists(hobo_json_path):
|
||||
continue
|
||||
|
@ -85,26 +72,25 @@ class CmdCheckHobos(Command):
|
|||
pass
|
||||
else:
|
||||
cleanup()
|
||||
self.deploy(base_options, sub_options, [me['base_url'], hobo_json_path])
|
||||
options['base_url'] = me['base_url']
|
||||
options['json_filename'] = hobo_json_path
|
||||
self.init_tenant_publisher(tenant.hostname, register_tld_names=False)
|
||||
self.deploy(**options)
|
||||
else:
|
||||
self.deploy(base_options, sub_options, args)
|
||||
self.deploy(**options)
|
||||
|
||||
def deploy(self, base_options, sub_options, args):
|
||||
from .. import publisher
|
||||
|
||||
self.base_options = base_options
|
||||
publisher.WcsPublisher.configure(self.config)
|
||||
pub = publisher.WcsPublisher.create_publisher(register_tld_names=False)
|
||||
def deploy(self, **options):
|
||||
pub = get_publisher_class().create_publisher(register_tld_names=False)
|
||||
|
||||
global_app_dir = pub.app_dir
|
||||
global_tenants_dir = os.path.join(global_app_dir, 'tenants')
|
||||
base_url = args[0]
|
||||
base_url = options.get('base_url')
|
||||
|
||||
if args[1] == '-':
|
||||
if options.get('json_filename') == '-':
|
||||
# get environment definition from stdin
|
||||
self.all_services = json.load(sys.stdin)
|
||||
else:
|
||||
with open(args[1]) as fd:
|
||||
with open(options.get('json_filename')) as fd:
|
||||
self.all_services = json.load(fd)
|
||||
|
||||
try:
|
||||
|
@ -173,7 +159,7 @@ class CmdCheckHobos(Command):
|
|||
print('updating instance in', pub.app_dir)
|
||||
|
||||
try:
|
||||
self.configure_site_options(service, pub, ignore_timestamp=sub_options.ignore_timestamp)
|
||||
self.configure_site_options(service, pub, ignore_timestamp=options.get('ignore_timestamp'))
|
||||
except NoChange:
|
||||
print(' skipping')
|
||||
return
|
||||
|
@ -208,10 +194,10 @@ class CmdCheckHobos(Command):
|
|||
|
||||
theme_id = variables.get('theme')
|
||||
theme_data = None
|
||||
if theme_id and os.path.exists(self.THEMES_DIRECTORY):
|
||||
for theme_module in os.listdir(self.THEMES_DIRECTORY):
|
||||
if theme_id and os.path.exists(settings.THEMES_DIRECTORY):
|
||||
for theme_module in os.listdir(settings.THEMES_DIRECTORY):
|
||||
try:
|
||||
with open(os.path.join(self.THEMES_DIRECTORY, theme_module, 'themes.json')) as fd:
|
||||
with open(os.path.join(settings.THEMES_DIRECTORY, theme_module, 'themes.json')) as fd:
|
||||
themes_json = json.load(fd)
|
||||
except OSError:
|
||||
continue
|
||||
|
@ -233,7 +219,7 @@ class CmdCheckHobos(Command):
|
|||
)
|
||||
tenant_dir = pub.app_dir
|
||||
theme_dir = os.path.join(tenant_dir, 'theme')
|
||||
target_dir = os.path.join(self.THEMES_DIRECTORY, theme_data['module'])
|
||||
target_dir = os.path.join(settings.THEMES_DIRECTORY, theme_data['module'])
|
||||
atomic_symlink(target_dir, theme_dir)
|
||||
for component in ('static', 'templates'):
|
||||
component_dir = os.path.join(tenant_dir, component)
|
||||
|
@ -249,7 +235,8 @@ class CmdCheckHobos(Command):
|
|||
pass
|
||||
else:
|
||||
atomic_symlink(
|
||||
os.path.join(self.THEMES_DIRECTORY, theme_data['overlay'], component), component_dir
|
||||
os.path.join(settings.THEMES_DIRECTORY, theme_data['overlay'], component),
|
||||
component_dir,
|
||||
)
|
||||
|
||||
if variables.get('default_from_email'):
|
||||
|
@ -361,7 +348,7 @@ class CmdCheckHobos(Command):
|
|||
|
||||
# initialize service provider side
|
||||
if not pub.cfg['sp'].get('publickey') or force_spconfig:
|
||||
from ..qommon.ident.idp import MethodAdminDirectory
|
||||
from wcs.qommon.ident.idp import MethodAdminDirectory
|
||||
|
||||
spconfig = pub.cfg['sp']
|
||||
spconfig['saml2_base_url'] = str(service.get('base_url')) + '/saml'
|
||||
|
@ -403,7 +390,7 @@ class CmdCheckHobos(Command):
|
|||
metadata_pathname = tempfile.mkstemp('.metadata')[1]
|
||||
atomic_write(metadata_pathname, force_bytes(s))
|
||||
|
||||
from ..qommon.ident.idp import AdminIDPDir
|
||||
from wcs.qommon.ident.idp import AdminIDPDir
|
||||
|
||||
admin_dir = AdminIDPDir()
|
||||
key_provider_id = admin_dir.submit_new_remote(metadata_pathname, metadata_url)
|
||||
|
@ -672,6 +659,3 @@ class CmdCheckHobos(Command):
|
|||
secret2 = hashlib.sha256(force_bytes(secret2)).hexdigest()
|
||||
# rstrip('L') for py2/3 compatibility, as py2 formats number as 0x...L, and py3 as 0x...
|
||||
return hex(int(secret1, 16) ^ int(secret2, 16))[2:].rstrip('L')
|
||||
|
||||
|
||||
CmdCheckHobos.register()
|
|
@ -1,5 +1,5 @@
|
|||
# w.c.s. - web application for online forms
|
||||
# Copyright (C) 2005-2014 Entr'ouvert
|
||||
# Copyright (C) 2005-2023 Entr'ouvert
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
|
@ -15,7 +15,6 @@
|
|||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
from quixote import get_publisher
|
||||
|
@ -23,37 +22,32 @@ from quixote import get_publisher
|
|||
from wcs.admin.settings import UserFieldsFormDef
|
||||
from wcs.qommon import force_str
|
||||
from wcs.qommon.misc import json_encode_helper
|
||||
from wcs.qommon.publisher import get_cfg, get_publisher_class
|
||||
|
||||
from ..qommon.ctl import Command
|
||||
from ..qommon.publisher import get_cfg
|
||||
from . import TenantCommand
|
||||
|
||||
|
||||
class CmdHoboNotify(Command):
|
||||
name = 'hobo_notify'
|
||||
class Command(TenantCommand):
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('notification', metavar='NOTIFICATION', type=str)
|
||||
|
||||
def execute(self, base_options, sub_options, args):
|
||||
self.base_options = base_options
|
||||
|
||||
notification = self.load_notification(args)
|
||||
def handle(self, **options):
|
||||
notification = self.load_notification(options.get('notification'))
|
||||
if not self.check_valid_notification(notification):
|
||||
sys.exit(1)
|
||||
from .. import publisher
|
||||
|
||||
publisher.WcsPublisher.configure(self.config)
|
||||
pub = publisher.WcsPublisher.create_publisher(register_tld_names=False)
|
||||
for tenant in publisher.WcsPublisher.get_tenants():
|
||||
if not os.path.exists(os.path.join(tenant.directory, 'config.pck')):
|
||||
continue
|
||||
pub.set_tenant(tenant)
|
||||
self.process_notification(notification, pub)
|
||||
for tenant in get_publisher_class().get_tenants():
|
||||
pub = get_publisher_class().create_publisher(register_tld_names=False)
|
||||
pub.set_tenant_by_hostname(tenant.hostname)
|
||||
self.process_notification(notification, publisher=pub)
|
||||
|
||||
@classmethod
|
||||
def load_notification(cls, args):
|
||||
if args[0] == '-':
|
||||
def load_notification(cls, notification):
|
||||
if notification == '-':
|
||||
# get environment definition from stdin
|
||||
return json.load(sys.stdin)
|
||||
else:
|
||||
with open(args[0]) as fd:
|
||||
with open(notification) as fd:
|
||||
return json.load(fd)
|
||||
|
||||
@classmethod
|
||||
|
@ -198,6 +192,3 @@ class CmdHoboNotify(Command):
|
|||
user.set_deleted()
|
||||
except Exception as e:
|
||||
publisher.record_error(exception=e, context='[PROVISIONNING]', notify=True)
|
||||
|
||||
|
||||
CmdHoboNotify.register()
|
Loading…
Reference in New Issue