misc: make hobo_notify a django management command (#20411)
gitea/wcs/pipeline/head This commit looks good Details

This commit is contained in:
Frédéric Péters 2023-10-04 08:50:41 +02:00
parent 98f96c98fe
commit 73e8ab9b24
3 changed files with 79 additions and 48 deletions

View File

@ -1,10 +1,14 @@
import json
import os
import shutil
import tempfile
import uuid
import pytest
from django.core.management import call_command
from wcs.api_utils import sign_url
from wcs.ctl.hobo_notify import CmdHoboNotify
from wcs.ctl.management.commands.hobo_notify import Command as HoboNotifyCommand
from wcs.qommon import force_str
from wcs.qommon.afterjobs import AfterJob
from wcs.qommon.http_request import HTTPRequest
@ -13,6 +17,13 @@ from wcs.sql_criterias import NotNull
from .utilities import create_temporary_pub, get_app
@pytest.fixture
def alt_tempdir():
alt_tempdir = tempfile.mkdtemp()
yield alt_tempdir
shutil.rmtree(alt_tempdir)
@pytest.fixture
def pub(request):
pub = create_temporary_pub()
@ -74,7 +85,7 @@ def test_process_notification_role_wrong_audience(pub):
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
assert pub.role_class.select()[0].allows_backoffice_access is False
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].name == 'Service étt civil'
assert pub.role_class.select()[0].slug == 'service-ett-civil'
@ -121,7 +132,7 @@ def test_process_notification_role(pub):
assert pub.role_class.select()[0].emails_to_members is False
assert pub.role_class.select()[0].allows_backoffice_access is False
existing_role_id = pub.role_class.select()[0].id
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 2
old_role = pub.role_class.get(existing_role_id)
assert old_role.name == 'Service état civil'
@ -159,7 +170,7 @@ def test_process_notification_role(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].id == new_role.id
assert pub.role_class.select()[0].uuid == uuid1
@ -174,7 +185,7 @@ def test_process_notification_role(pub):
role.allows_backoffice_access = True
role.store()
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
pub.role_class.select()[0].refresh_from_storage()
assert pub.role_class.select()[0].name == 'Service enfance'
@ -203,7 +214,7 @@ def test_process_notification_internal_role(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
role = pub.role_class.select()[0]
assert role.is_internal()
@ -247,7 +258,7 @@ def test_process_notification_role_description(pub):
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
existing_role_id = pub.role_class.select()[0].id
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 2
old_role = pub.role_class.get(existing_role_id)
assert old_role.name == 'Service état civil'
@ -282,7 +293,7 @@ def test_process_notification_role_description(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].id == new_role.id
assert pub.role_class.select()[0].name == 'Service enfance'
@ -321,7 +332,7 @@ def test_process_notification_role_deprovision(pub):
role.store()
assert pub.role_class.count() == 2
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].slug == 'bar'
@ -329,7 +340,7 @@ def test_process_notification_role_deprovision(pub):
r.uuid = uuid1
r.store()
assert pub.role_class.count() == 2
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].slug == 'bar'
@ -507,7 +518,7 @@ def test_process_notification_user_provision(pub):
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
existing_role_id = pub.role_class.select()[0].id
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert pub.role_class.count() == 2
old_role = pub.role_class.get(existing_role_id)
assert old_role.name == 'Service état civil'
@ -556,7 +567,7 @@ def test_process_notification_user_provision(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
user = User.select()[0]
assert user.form_data is not None
@ -604,7 +615,7 @@ def test_process_notification_user_provision(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
user = User.select()[0]
assert user.form_data is not None
@ -646,7 +657,7 @@ def test_process_notification_user_provision(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
if birthdate not in (None, ''): # wrong value : no nothing
assert User.select()[0].form_data['_birthdate'].tm_year == 2000
@ -686,7 +697,7 @@ def test_process_notification_user_provision(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
assert User.select()[0].first_name == 'John'
assert not User.select()[0].email
@ -725,7 +736,7 @@ def test_process_notification_user_provision(pub):
],
},
}
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
assert User.select()[0].first_name == 'John'
assert not User.select()[0].phone
@ -773,7 +784,7 @@ def test_process_notification_user_with_errors(pub):
},
}
with pytest.raises(NotImplementedError) as e:
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert e.value.args == ('full is not supported for users',)
assert User.count() == 0
@ -785,7 +796,7 @@ def test_process_notification_user_with_errors(pub):
backup = notification['objects']['data'][0][key]
del notification['objects']['data'][0][key]
with pytest.raises(Exception) as e:
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert e.type == ValueError
assert e.value.args == ('invalid user',)
assert User.count() == 0
@ -794,7 +805,7 @@ def test_process_notification_user_with_errors(pub):
notification['@type'] = 'deprovision'
del notification['objects']['data'][0]['uuid']
with pytest.raises(Exception) as e:
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert e.type == KeyError
assert e.value.args == ('user without uuid',)
@ -822,14 +833,14 @@ def test_process_notification_role_with_errors(pub):
},
}
with pytest.raises(KeyError) as e:
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert e.value.args == ('role without uuid',)
assert pub.role_class.count() == 0
notification['objects']['data'][0]['uuid'] = '12345'
del notification['objects']['data'][0]['name']
with pytest.raises(ValueError) as e:
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert e.value.args == ('invalid role',)
assert pub.role_class.count() == 0
@ -865,7 +876,7 @@ def test_process_user_deprovision(pub):
assert User.count() == 1
assert len(User.select([NotNull('deleted_timestamp')])) == 0
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
assert len(User.select([NotNull('deleted_timestamp')])) == 1
@ -914,7 +925,7 @@ def test_process_user_deprovision_with_data(pub):
assert User.count() == 1
assert len(User.select([NotNull('deleted_timestamp')])) == 0
CmdHoboNotify.process_notification(notification)
HoboNotifyCommand().process_notification(notification)
assert User.count() == 1
assert len(User.select([NotNull('deleted_timestamp')])) == 1
@ -956,3 +967,32 @@ def test_provision_http_endpoint(pub):
get_app(pub).post_json(sign_url('/__provision__/?orig=coucou&sync=1', '1234'), notification)
assert AfterJob.count() == 0 # sync
assert pub.user_class.count() == 1
def test_hobo_notify_call_command(pub, alt_tempdir):
uuid1 = str(uuid.uuid4())
with open(os.path.join(alt_tempdir, 'message.json'), 'w') as fd:
json.dump(
{
'@type': 'deprovision',
'audience': ['test'],
'full': False,
'objects': {
'@type': 'role',
'data': [
{
'@type': 'role',
'uuid': uuid1,
},
],
},
},
fd,
)
pub.role_class.wipe()
role = pub.role_class('foo')
role.id = uuid1
role.store()
call_command('hobo_notify', os.path.join(alt_tempdir, 'message.json'))
assert pub.role_class.count() == 0

View File

@ -34,7 +34,7 @@ from wcs.api_utils import get_query_flag, get_user_from_api_query_string, is_url
from wcs.carddef import CardDef
from wcs.categories import Category
from wcs.conditions import Condition, ValidationError
from wcs.ctl.hobo_notify import CmdHoboNotify
from wcs.ctl.management.commands.hobo_notify import Command as CmdHoboNotify
from wcs.data_sources import NamedDataSource
from wcs.data_sources import get_object as get_data_source_object
from wcs.data_sources import request_json_items

View File

@ -1,5 +1,5 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2014 Entr'ouvert
# Copyright (C) 2005-2023 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -15,7 +15,6 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import json
import os
import sys
from quixote import get_publisher
@ -23,37 +22,32 @@ from quixote import get_publisher
from wcs.admin.settings import UserFieldsFormDef
from wcs.qommon import force_str
from wcs.qommon.misc import json_encode_helper
from wcs.qommon.publisher import get_cfg, get_publisher_class
from ..qommon.ctl import Command
from ..qommon.publisher import get_cfg
from . import TenantCommand
class CmdHoboNotify(Command):
name = 'hobo_notify'
class Command(TenantCommand):
def add_arguments(self, parser):
parser.add_argument('notification', metavar='NOTIFICATION', type=str)
def execute(self, base_options, sub_options, args):
self.base_options = base_options
notification = self.load_notification(args)
def handle(self, **options):
notification = self.load_notification(options.get('notification'))
if not self.check_valid_notification(notification):
sys.exit(1)
from .. import publisher
publisher.WcsPublisher.configure(self.config)
pub = publisher.WcsPublisher.create_publisher(register_tld_names=False)
for tenant in publisher.WcsPublisher.get_tenants():
if not os.path.exists(os.path.join(tenant.directory, 'config.pck')):
continue
pub.set_tenant(tenant)
self.process_notification(notification, pub)
for tenant in get_publisher_class().get_tenants():
pub = get_publisher_class().create_publisher(register_tld_names=False)
pub.set_tenant_by_hostname(tenant.hostname)
self.process_notification(notification, publisher=pub)
@classmethod
def load_notification(cls, args):
if args[0] == '-':
def load_notification(cls, notification):
if notification == '-':
# get environment definition from stdin
return json.load(sys.stdin)
else:
with open(args[0]) as fd:
with open(notification) as fd:
return json.load(fd)
@classmethod
@ -198,6 +192,3 @@ class CmdHoboNotify(Command):
user.set_deleted()
except Exception as e:
publisher.record_error(exception=e, context='[PROVISIONNING]', notify=True)
CmdHoboNotify.register()