misc: store Role also in SQL (#51772)

This commit is contained in:
Lauréline Guérin 2021-03-09 15:35:21 +01:00
parent f577aa1d47
commit a320d4f25b
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
53 changed files with 789 additions and 593 deletions

View File

@ -6,7 +6,6 @@ import pytest
from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.qommon.http_request import HTTPRequest
from wcs.roles import Role
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub
@ -50,9 +49,9 @@ def create_superuser(pub):
return user1
def create_role():
Role.wipe()
role = Role(name='foobar')
def create_role(pub):
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
return role
@ -96,7 +95,7 @@ def test_admin_redirect(pub):
def test_admin_for_all(pub):
user = create_superuser(pub)
role = create_role()
role = create_role(pub)
try:
open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w').close()

View File

@ -45,7 +45,7 @@ def teardown_module(module):
def test_block_new(pub, blocks_feature):
create_superuser(pub)
create_role()
create_role(pub)
BlockDef.wipe()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/')
@ -76,7 +76,6 @@ def test_block_new(pub, blocks_feature):
def test_block_options(pub, blocks_feature):
create_superuser(pub)
create_role()
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
@ -134,7 +133,6 @@ def test_block_options_digest_template(pub, blocks_feature):
def test_block_export_import(pub, blocks_feature):
create_superuser(pub)
create_role()
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
@ -182,7 +180,6 @@ def test_block_export_import(pub, blocks_feature):
def test_block_delete(pub, blocks_feature):
create_superuser(pub)
create_role()
BlockDef.wipe()
FormDef.wipe()
block = BlockDef()
@ -219,7 +216,6 @@ def test_block_delete(pub, blocks_feature):
def test_block_edit_duplicate_delete_field(pub, blocks_feature):
create_superuser(pub)
create_role()
BlockDef.wipe()
block = BlockDef()
block.name = 'foobar'
@ -248,7 +244,6 @@ def test_block_edit_duplicate_delete_field(pub, blocks_feature):
def test_block_use_in_formdef(pub, blocks_feature):
create_superuser(pub)
create_role()
FormDef.wipe()
BlockDef.wipe()
block = BlockDef()

View File

@ -9,7 +9,6 @@ from wcs.categories import CardDefCategory
from wcs.carddef import CardDef
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.roles import Role
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
from wcs.workflows import Workflow
from wcs.workflows import WorkflowBackofficeFieldsFormDef
@ -357,8 +356,8 @@ def test_card_user_support(pub):
def test_card_custom_view_data_source(pub):
user = create_superuser(pub)
Role.wipe()
role = Role(name='foobar')
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
user.roles = [role.id]
user.store()
@ -418,8 +417,8 @@ def test_card_custom_view_data_source(pub):
def test_carddef_usage(pub):
user = create_superuser(pub)
Role.wipe()
role = Role(name='foobar')
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
user.roles = [role.id]
user.store()

View File

@ -23,7 +23,7 @@ from wcs.carddef import CardDef
from wcs import fields
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub, HttpRequestsMocking
from .test_all import create_superuser, create_role
from .test_all import create_superuser
def pytest_generate_tests(metafunc):
@ -488,7 +488,6 @@ def test_data_sources_in_use_delete(pub):
def test_data_sources_export(pub):
create_superuser(pub)
create_role()
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
@ -508,7 +507,6 @@ def test_data_sources_export(pub):
def test_data_sources_import(pub):
create_superuser(pub)
create_role()
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')

View File

@ -17,7 +17,6 @@ from wcs.qommon.errors import ConnectionError
from wcs.categories import Category
from wcs.data_sources import NamedDataSource
from wcs.wscalls import NamedWsCall
from wcs.roles import Role
from wcs.workflows import Workflow
from wcs.formdef import FormDef
from wcs.carddef import CardDef
@ -53,18 +52,20 @@ def teardown_module(module):
def test_forms(pub):
user = create_superuser(pub)
create_superuser(pub)
pub.role_class.wipe()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/')
assert 'You first have to define roles.' in resp.text
assert not 'New Form' in resp.text
assert 'New Form' not in resp.text
def test_forms_new(pub):
create_superuser(pub)
app = login(get_app(pub))
create_role()
create_role(pub)
FormDef.wipe()
# create a new form
resp = app.get('/backoffice/forms/')
assert 'New Form' in resp.text
@ -80,14 +81,14 @@ def test_forms_new(pub):
assert formdef.name == 'form title'
assert formdef.url_name == 'form-title'
assert formdef.fields == []
assert formdef.disabled == True
assert formdef.disabled is True
def test_forms_new_popup(pub):
FormDef.wipe()
create_superuser(pub)
app = login(get_app(pub))
create_role()
create_role(pub)
# create a new form
resp = app.get('/backoffice/forms/')
@ -105,7 +106,7 @@ def test_forms_new_popup(pub):
assert formdef.name == 'form title'
assert formdef.url_name == 'form-title'
assert formdef.fields == []
assert formdef.disabled == True
assert formdef.disabled is True
def assert_option_display(resp, label, value):
@ -116,7 +117,7 @@ def assert_option_display(resp, label, value):
def test_forms_edit(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -138,7 +139,7 @@ def test_forms_edit(pub):
assert resp.location == 'http://example.net/backoffice/forms/1/'
resp = resp.follow()
assert_option_display(resp, 'Confirmation Page', 'Disabled')
assert FormDef.get(1).confirmation == False
assert FormDef.get(1).confirmation is False
# try cancel button
resp = resp.click('Confirmation Page')
@ -148,7 +149,7 @@ def test_forms_edit(pub):
assert resp.location == 'http://example.net/backoffice/forms/1/'
resp = resp.follow()
assert_option_display(resp, 'Confirmation Page', 'Disabled')
assert FormDef.get(1).confirmation == False
assert FormDef.get(1).confirmation is False
# Limit to one form
assert_option_display(resp, 'Limit to one form', 'Disabled')
@ -267,7 +268,7 @@ def test_forms_edit(pub):
def test_form_title_change(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -320,7 +321,7 @@ def test_form_title_change(pub):
def test_forms_edit_publication_date(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -355,7 +356,7 @@ def test_forms_edit_publication_date(pub):
def test_form_category(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -379,7 +380,7 @@ def test_form_category(pub):
def test_form_category_select(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -408,7 +409,7 @@ def test_form_category_select(pub):
def test_form_workflow(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -432,7 +433,7 @@ def test_form_workflow(pub):
def test_form_workflow_change(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -463,7 +464,7 @@ def test_form_workflow_change(pub):
def test_form_workflow_link(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -487,7 +488,7 @@ def test_form_workflow_link(pub):
assert '/backoffice/workflows/%s/' % workflow.id in resp.text
# check workflow link is not displayed if user has no access right
pub.cfg['admin-permissions'] = {'workflows': ['x']} # block access
pub.cfg['admin-permissions'] = {'workflows': ['x']} # block access
pub.write_cfg()
resp = app.get('/backoffice/forms/%s/' % formdef.id)
assert '/backoffice/workflows/%s/' % workflow.id not in resp.text
@ -495,7 +496,7 @@ def test_form_workflow_link(pub):
def test_form_workflow_remapping(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -561,7 +562,7 @@ def test_form_workflow_remapping(pub):
def test_form_submitter_roles(pub):
create_superuser(pub)
role = create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -594,7 +595,7 @@ def test_form_submitter_roles(pub):
assert FormDef.get(formdef.id).required_authentication_contexts == ['fedict']
# check internal roles are not advertised
role2 = Role(name='internal')
role2 = pub.role_class(name='internal')
role2.internal = True
role2.store()
@ -607,7 +608,7 @@ def test_form_submitter_roles(pub):
def test_form_workflow_role(pub):
create_superuser(pub)
role = create_role()
role = create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -627,7 +628,7 @@ def test_form_workflow_role(pub):
assert FormDef.get(1).workflow_roles == {'_receiver': '1'}
# check it doesn't fail if a second role with the same name exists
role = Role(name='foobar')
role = pub.role_class(name='foobar')
role.store()
resp = app.get('/backoffice/forms/1/')
resp = resp.click(href='role/_receiver')
@ -635,7 +636,7 @@ def test_form_workflow_role(pub):
def test_form_workflow_options(pub):
create_superuser(pub)
create_role()
create_role(pub)
Workflow.wipe()
workflow = Workflow(name='Workflow One')
@ -656,7 +657,7 @@ def test_form_workflow_options(pub):
def test_form_workflow_variables(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -721,7 +722,7 @@ def test_form_workflow_variables(pub):
def test_form_workflow_table_variables(pub):
create_superuser(pub)
create_role()
create_role(pub)
Workflow.wipe()
workflow = Workflow(name='Workflow One')
@ -768,7 +769,7 @@ def test_form_workflow_table_variables(pub):
def test_form_roles(pub):
create_superuser(pub)
role = create_role()
role = create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -791,7 +792,7 @@ def test_form_roles(pub):
def test_form_always_advertise(pub):
create_superuser(pub)
role = create_role()
role = create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -866,7 +867,7 @@ def test_form_templates(pub):
def test_form_delete(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -886,7 +887,7 @@ def test_form_delete(pub):
def test_form_delete_with_data(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -920,7 +921,7 @@ def test_form_delete_with_data(pub):
def test_form_duplicate(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -947,7 +948,7 @@ def test_form_duplicate(pub):
def test_form_export(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -967,8 +968,8 @@ def test_form_export(pub):
def test_form_import(pub):
user = create_superuser(pub)
role = create_role()
create_superuser(pub)
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1040,7 +1041,7 @@ def test_form_import(pub):
def test_form_import_from_url(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1071,7 +1072,7 @@ def test_form_import_from_url(pub):
def test_form_qrcode(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1090,7 +1091,7 @@ def test_form_qrcode(pub):
def test_form_description(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1112,7 +1113,7 @@ def test_form_description(pub):
def test_form_enable_from_fields_page(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1132,7 +1133,7 @@ def test_form_enable_from_fields_page(pub):
def test_form_new_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1184,7 +1185,7 @@ def test_form_field_without_label(pub):
def test_form_delete_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1211,7 +1212,7 @@ def test_form_delete_field(pub):
def test_form_delete_field_existing_data(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1252,7 +1253,7 @@ def test_form_delete_field_existing_data(pub):
def test_form_delete_page_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1299,8 +1300,8 @@ def test_form_delete_page_field(pub):
def test_form_duplicate_field(pub):
user = create_superuser(pub)
create_role()
create_superuser(pub)
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1323,7 +1324,7 @@ def test_form_duplicate_field(pub):
def test_form_duplicate_file_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1350,7 +1351,7 @@ def test_form_duplicate_file_field(pub):
def test_form_edit_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1371,12 +1372,12 @@ def test_form_edit_field(pub):
assert resp.location == 'http://example.net/backoffice/forms/1/fields/#itemId_1'
assert FormDef.get(1).fields[0].label == 'changed field'
assert FormDef.get(1).fields[0].required == False
assert FormDef.get(1).fields[0].required is False
def test_form_edit_field_advanced(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1449,7 +1450,7 @@ def test_form_edit_field_advanced(pub):
def test_form_prefill_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1497,7 +1498,7 @@ def test_form_prefill_field(pub):
def test_form_edit_string_field_validation(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1546,7 +1547,7 @@ def test_form_edit_string_field_validation(pub):
def test_form_edit_item_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1573,7 +1574,7 @@ def test_form_edit_item_field(pub):
resp = resp.follow()
assert FormDef.get(1).fields[0].label == 'changed field'
assert FormDef.get(1).fields[0].required == False
assert FormDef.get(1).fields[0].required is False
assert FormDef.get(1).fields[0].items is None
# edit and fill with one item
@ -1587,7 +1588,7 @@ def test_form_edit_item_field(pub):
def test_form_edit_item_field_data_source(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1676,7 +1677,7 @@ def test_form_edit_item_field_data_source(pub):
def test_form_edit_item_field_geojson_data_source(pub, http_requests):
NamedDataSource.wipe()
create_superuser(pub)
create_role()
create_role(pub)
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
@ -1715,7 +1716,7 @@ def test_form_edit_item_field_geojson_data_source(pub, http_requests):
def test_form_edit_items_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1787,7 +1788,7 @@ def test_form_edit_items_field(pub):
def test_form_edit_page_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1842,7 +1843,7 @@ def test_form_edit_page_field(pub):
def test_form_edit_comment_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1932,7 +1933,7 @@ def test_form_comment_field_textwidget_validation(pub):
def test_form_comment_field_wysiwygtextwidget_validation(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -1971,7 +1972,7 @@ def test_form_comment_field_wysiwygtextwidget_validation(pub):
def test_form_edit_map_field(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -2039,7 +2040,7 @@ def test_form_edit_map_field(pub):
def test_form_edit_field_warnings(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -2076,7 +2077,7 @@ def test_form_edit_field_warnings(pub):
def test_form_limit_display_to_page(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -2102,7 +2103,7 @@ def test_form_limit_display_to_page(pub):
def test_form_fields_reorder(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -2165,7 +2166,7 @@ def test_form_fields_reorder(pub):
def test_form_move_page_fields(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -2210,7 +2211,7 @@ def test_form_move_page_fields(pub):
def test_form_legacy_int_id(pub):
create_superuser(pub)
create_role()
create_role(pub)
Category.wipe()
cat = Category(name='Foo')
@ -2227,7 +2228,7 @@ def test_form_legacy_int_id(pub):
formdef.name = 'form title'
formdef.fields = []
role = Role(name='ZAB') # Z to get sorted last
role = pub.role_class(name='ZAB') # Z to get sorted last
role.store()
# set attributes using integers
@ -2262,7 +2263,7 @@ def test_form_legacy_int_id(pub):
def test_form_anonymise(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -2329,7 +2330,7 @@ def test_form_anonymise(pub):
def test_form_public_url(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -2345,7 +2346,7 @@ def test_form_public_url(pub):
def test_form_archive(pub):
create_superuser(pub)
create_role()
create_role(pub)
if pub.is_using_postgresql():
# this doesn't exist in SQL
@ -2395,8 +2396,8 @@ def test_form_archive(pub):
def test_form_overwrite(pub):
user = create_superuser(pub)
role = create_role()
create_superuser(pub)
create_role(pub)
FormDef.wipe()
formdef = FormDef()
@ -2511,7 +2512,7 @@ def test_form_overwrite(pub):
def test_form_export_import_export_overwrite(pub):
create_superuser(pub)
create_role()
create_role(pub)
FormDef.wipe()
formdef = FormDef()

View File

@ -3,7 +3,6 @@
import pytest
from wcs.qommon.http_request import HTTPRequest
from wcs.roles import Role
from wcs.formdef import FormDef
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub
@ -42,7 +41,7 @@ def test_roles(pub):
def test_roles_new(pub):
create_superuser(pub)
Role.wipe()
pub.role_class.wipe()
app = login(get_app(pub))
resp = app.get('/backoffice/roles/')
resp = resp.click('New Role')
@ -55,14 +54,14 @@ def test_roles_new(pub):
resp = resp.click('a new role')
assert '<h2>a new role' in resp.text
assert Role.get(1).name == 'a new role'
assert Role.get(1).details == 'bla bla bla'
assert pub.role_class.get(1).name == 'a new role'
assert pub.role_class.get(1).details == 'bla bla bla'
def test_roles_edit(pub):
create_superuser(pub)
Role.wipe()
role = Role(name='foobar')
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
app = login(get_app(pub))
@ -80,14 +79,14 @@ def test_roles_edit(pub):
assert '<h2>baz' in resp.text
assert 'Holders of this role will receive all emails adressed to the role.' in resp.text
assert Role.get(1).details == 'bla bla bla'
assert Role.get(1).emails_to_members is True
assert pub.role_class.get(1).details == 'bla bla bla'
assert pub.role_class.get(1).emails_to_members is True
def test_roles_matching_formdefs(pub):
create_superuser(pub)
Role.wipe()
role = Role(name='foo')
pub.role_class.wipe()
role = pub.role_class(name='foo')
role.store()
FormDef.wipe()
@ -119,8 +118,8 @@ def test_roles_matching_formdefs(pub):
def test_roles_delete(pub):
create_superuser(pub)
Role.wipe()
role = Role(name='foobar')
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
app = login(get_app(pub))
@ -130,4 +129,4 @@ def test_roles_delete(pub):
resp = resp.forms[0].submit()
assert resp.location == 'http://example.net/backoffice/roles/'
resp = resp.follow()
assert Role.count() == 0
assert pub.role_class.count() == 0

View File

@ -24,7 +24,6 @@ from wcs.api_access import ApiAccess
from wcs.categories import Category, CardDefCategory
from wcs.data_sources import NamedDataSource
from wcs.wscalls import NamedWsCall
from wcs.roles import Role
from wcs.workflows import Workflow, CommentableWorkflowStatusItem
from wcs.wf.export_to_model import ExportToModel
from wcs.formdef import FormDef
@ -32,7 +31,7 @@ from wcs.carddef import CardDef
from wcs import fields
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub
from .test_all import create_superuser, create_role
from .test_all import create_superuser
def pytest_generate_tests(metafunc):
@ -97,7 +96,7 @@ def test_settings_export_import(pub):
FormDef.wipe()
CardDef.wipe()
Workflow.wipe()
Role.wipe()
pub.role_class.wipe()
Category.wipe()
CardDefCategory.wipe()
NamedDataSource.wipe()
@ -133,7 +132,7 @@ def test_settings_export_import(pub):
carddef.store()
Category(name='baz').store()
CardDefCategory(name='foobar').store()
Role(name='qux').store()
pub.role_class(name='qux').store()
NamedDataSource(name='quux').store()
ds = NamedDataSource(name='agenda')
ds.external = 'agenda'
@ -176,7 +175,8 @@ def test_settings_export_import(pub):
assert 'workflows/1' not in filelist
assert 'workflows_xml/1' in filelist
assert 'models/export_to_model-1.upload' not in filelist
assert 'roles/1' in filelist
assert 'roles/1' not in filelist
assert 'roles_xml/1' in filelist
assert 'categories/1' in filelist
assert 'carddef_categories/1' in filelist
assert 'datasources/1' in filelist
@ -208,10 +208,11 @@ def test_settings_export_import(pub):
assert CardDef.count() == 1
assert CardDef.select()[0].url_name == 'bar'
assert ApiAccess.count() == 1
assert pub.role_class.count() == 1
# check roles are found by name
wipe()
role = Role(name='qux')
role = pub.role_class(name='qux')
role.store()
workflow = Workflow(name='Workflow One')
@ -250,14 +251,14 @@ def test_settings_export_import(pub):
filelist = zipf.namelist()
assert 'formdefs_xml/%s' % formdef.id in filelist
assert 'workflows_xml/%s' % workflow.id in filelist
assert 'roles/%s' % role.id not in filelist
assert 'roles_xml/%s' % role.id not in filelist
FormDef.wipe()
Workflow.wipe()
Role.wipe()
pub.role_class.wipe()
# create role beforehand, it should be matched by name
role = Role(name='qux')
role = pub.role_class(name='qux')
role.id = '012345'
role.store()
@ -282,13 +283,13 @@ def test_settings_export_import(pub):
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
assert len([x for x in filelist if 'roles/' in x]) == 0
assert len([x for x in filelist if 'roles_xml/' in x]) == 0
# check an error is displayed if such an import is then used and roles are
# missing.
FormDef.wipe()
Workflow.wipe()
Role.wipe()
pub.role_class.wipe()
resp = app.get('/backoffice/settings/import')
resp.form['file'] = Upload('export.wcs', zip_content.getvalue())
resp = resp.form.submit('submit')
@ -579,7 +580,7 @@ def test_settings_idp(pub):
def test_settings_auth_password(pub):
Role.wipe()
pub.role_class.wipe()
pub.user_class.wipe() # makes sure there are no users
pub.cfg['identification'] = {'methods': ['password']}
@ -724,12 +725,12 @@ def test_settings_geolocation(pub):
def test_settings_permissions(pub):
create_superuser(pub)
role1 = create_role()
role1.name = 'foobar1'
pub.role_class.wipe()
role1 = pub.role_class(name='foobar1')
role1.store()
role2 = Role(name='foobar2')
role2 = pub.role_class(name='foobar2')
role2.store()
role3 = Role(name='foobar3')
role3 = pub.role_class(name='foobar3')
role3.store()
app = login(get_app(pub))
@ -749,8 +750,8 @@ def test_settings_permissions(pub):
resp.forms[0]['permissions$c-0-0'].checked = False
resp.forms[0]['permissions$c-1-0'].checked = True
resp = resp.forms[0].submit()
assert Role.get(role1.id).allows_backoffice_access is False
assert Role.get(role2.id).allows_backoffice_access is True
assert pub.role_class.get(role1.id).allows_backoffice_access is False
assert pub.role_class.get(role2.id).allows_backoffice_access is True
# give some roles access to the forms workshop (2nd checkbox) and to the
# workflows workshop (4th)

View File

@ -159,7 +159,7 @@ def test_users_edit_edit_account(pub):
def test_users_edit_with_managing_idp(pub):
create_role()
create_role(pub)
pub.user_class.wipe()
pub.cfg['sp'] = {'idp-manage-user-attributes': True}
pub.write_cfg()
@ -261,7 +261,7 @@ def test_users_filter(pub):
pub.user_class.wipe()
PasswordAccount.wipe()
create_superuser(pub)
role = create_role()
role = create_role(pub)
for i in range(50):
user = pub.user_class(name='foo bar %s' % (i + 1))
user.store()
@ -346,7 +346,7 @@ def test_users_display_roles(pub):
pub.user_class.wipe()
user = create_superuser(pub)
role = create_role()
role = create_role(pub)
user.roles = [role.id, 'XXX']
user.store()

View File

@ -15,7 +15,6 @@ from webtest import Upload
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.errors import ConnectionError
from wcs.roles import Role
from wcs.workflows import (
Workflow,
WorkflowCriticalityLevel,
@ -25,6 +24,7 @@ from wcs.workflows import (
ChoiceWorkflowStatusItem,
JumpOnSubmitWorkflowStatusItem,
WorkflowVariablesFieldsFormDef,
item_classes,
)
from wcs.wf.create_carddata import CreateCarddataWorkflowStatusItem
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
@ -42,7 +42,7 @@ from wcs.formdef import FormDef
from wcs import fields
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub
from .test_all import create_superuser, create_role
from .test_all import create_superuser
def pytest_generate_tests(metafunc):
@ -95,7 +95,6 @@ def test_workflows_default(pub):
def test_workflows_new(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
app = login(get_app(pub))
resp = app.get('/backoffice/workflows/')
@ -141,7 +140,9 @@ def test_workflows_new(pub):
def test_workflows_svg(pub):
create_superuser(pub)
role = create_role()
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -367,8 +368,8 @@ def test_workflows_import_from_url(pub):
def test_workflows_export_import_create_role(pub):
create_superuser(pub)
Role.wipe()
role = Role()
pub.role_class.wipe()
role = pub.role_class()
role.name = 'PLOP'
role.store()
@ -409,12 +410,12 @@ def test_workflows_export_import_create_role(pub):
resp = resp.follow()
assert 'This workflow has been successfully imported' in resp.text
assert Workflow.get(3).name == 'Copy of foo (2)'
assert Role.count() == 1
assert Role.select()[0].name == 'PLOP'
assert Workflow.get(3).possible_status[0].items[0].by == [Role.select()[0].id]
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].name == 'PLOP'
assert Workflow.get(3).possible_status[0].items[0].by == [pub.role_class.select()[0].id]
# don't create role if they are managed by the identity provider
Role.wipe()
pub.role_class.wipe()
pub.cfg['sp'] = {'idp-manage-roles': True}
pub.write_cfg()
@ -523,7 +524,9 @@ def test_workflows_check_available_actions(pub):
def test_workflows_edit_dispatch_action(pub):
create_superuser(pub)
role = create_role()
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
Workflow.wipe()
workflow = Workflow(name='foo')
workflow.add_status(name='baz')
@ -560,7 +563,6 @@ def test_workflows_edit_dispatch_action(pub):
def test_workflows_edit_dispatch_action_repeated_function(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
workflow.add_status(name='baz')
@ -591,7 +593,6 @@ def test_workflows_edit_dispatch_action_repeated_function(pub):
def test_workflows_edit_email_action(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
st1 = workflow.add_status(name='baz')
@ -740,7 +741,6 @@ def test_workflows_edit_email_action(pub):
def test_workflows_edit_jump_previous(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
st1 = workflow.add_status(name='baz')
@ -792,7 +792,6 @@ def test_workflows_edit_jump_previous(pub):
def test_workflows_edit_jump_timeout(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
st1 = workflow.add_status(name='baz')
@ -832,7 +831,6 @@ def test_workflows_edit_jump_timeout(pub):
def test_workflows_edit_sms_action(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
workflow.add_status(name='baz')
@ -859,7 +857,6 @@ def test_workflows_edit_sms_action(pub):
def test_workflows_edit_attachment_action(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
workflow.add_status(name='baz')
@ -924,7 +921,6 @@ def test_workflows_edit_attachment_action(pub):
def test_workflows_edit_display_form_action(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
workflow.add_status(name='baz')
@ -969,7 +965,6 @@ def test_workflows_edit_display_form_action(pub):
def test_workflows_edit_choice_action(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
workflow.add_status(name='baz')
@ -1002,7 +997,6 @@ def test_workflows_edit_choice_action(pub):
def test_workflows_edit_choice_action_functions_only(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
workflow.add_status(name='baz')
@ -1033,7 +1027,6 @@ def test_workflows_edit_choice_action_functions_only(pub):
def test_workflows_edit_choice_action_line_details(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
wf = Workflow(name='foo')
@ -1136,7 +1129,6 @@ def test_workflows_edit_export_to_model_action(pub):
def test_workflows_action_subpath(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
baz_status = workflow.add_status(name='baz')
@ -1153,7 +1145,6 @@ def test_workflows_action_subpath(pub):
def test_workflows_display_action_ezt_validation(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
baz_status = workflow.add_status(name='baz')
@ -1191,7 +1182,6 @@ def test_workflows_display_action_ezt_validation(pub):
def test_workflows_delete_action(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
workflow.add_status(name='baz')
@ -1219,7 +1209,6 @@ def test_workflows_delete_action(pub):
def test_workflows_variables(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -1278,7 +1267,14 @@ def test_workflows_variables_edit(pub):
def test_workflows_variables_edit_with_all_action_types(pub):
test_workflows_add_all_actions(pub)
create_superuser(pub)
Workflow.wipe()
workflow = Workflow(name='foo')
status = workflow.add_status(name='baz')
for item_class in item_classes:
status.append_item(item_class.key)
workflow.store()
app = login(get_app(pub))
resp = app.get('/backoffice/workflows/1/')
@ -1320,7 +1316,6 @@ def test_workflows_variables_with_export_to_model_action(pub):
def test_workflows_backoffice_fields(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -1428,7 +1423,6 @@ def test_workflows_backoffice_fields(pub):
def test_workflows_functions(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -1484,7 +1478,6 @@ def test_workflows_functions(pub):
def test_workflows_functions_vs_visibility(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -1525,7 +1518,6 @@ def test_workflows_functions_vs_visibility(pub):
def test_workflows_global_actions(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -1573,7 +1565,6 @@ def test_workflows_global_actions(pub):
def test_workflows_global_actions_edit(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -1635,7 +1626,6 @@ def test_workflows_global_actions_edit(pub):
def test_workflows_global_actions_timeout_triggers(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -1687,7 +1677,6 @@ def test_workflows_global_actions_timeout_triggers(pub):
def test_workflows_global_actions_webservice_trigger(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -1825,7 +1814,6 @@ def test_workflows_external_workflow_action_config(pub):
def test_workflows_create_formdata(pub):
create_superuser(pub)
create_role()
FormDef.wipe()
target_formdef = FormDef()
@ -2086,7 +2074,6 @@ def test_workflows_edit_carddata_action(pub):
def test_workflows_criticality_levels(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -2151,7 +2138,6 @@ def test_workflows_criticality_levels(pub):
def test_workflows_wscall_label(pub):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -2175,7 +2161,6 @@ def test_workflows_wscall_label(pub):
@pytest.mark.parametrize('value', [True, False])
def test_workflows_wscall_options(pub, value):
create_superuser(pub)
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
@ -2280,7 +2265,9 @@ def test_workflows_form_action_config(pub):
def test_workflows_inspect_view(pub):
create_superuser(pub)
role = create_role()
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
Workflow.wipe()
workflow = Workflow(name='foo')

View File

@ -17,7 +17,6 @@ from wcs.api_utils import get_secret_and_orig, is_url_signed, sign_url
from wcs.qommon.errors import AccessForbiddenError
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.roles import Role
def pytest_generate_tests(metafunc):
@ -269,8 +268,8 @@ def test_sign_url(pub, local_user):
def test_get_user(pub, local_user):
Role.wipe()
role = Role(name='Foo bar')
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')
role.store()
local_user.roles = [role.id]
local_user.store()
@ -294,8 +293,8 @@ def test_api_access_from_xml_storable_object(pub, local_user, admin_user):
resp.form['access_key'] = '5678'
resp = resp.form.submit('submit')
Role.wipe()
role = Role(name='Foo bar')
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')
role.store()
local_user.roles = [role.id]
local_user.store()

View File

@ -20,7 +20,6 @@ from wcs.categories import CardDefCategory
from wcs.data_sources import NamedDataSource
from wcs.qommon.form import PicklableUpload
from wcs.qommon.http_request import HTTPRequest
from wcs.roles import Role
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
from .utils import sign_uri
@ -67,8 +66,8 @@ def local_user():
def test_cards(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
@ -164,8 +163,8 @@ def test_cards(pub, local_user):
def test_cards_import_csv(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
@ -204,8 +203,8 @@ def test_post_invalid_json(pub, local_user):
def test_card_submit(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
@ -277,8 +276,8 @@ def test_carddef_submit_with_varname(pub, local_user):
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
data_source.store()
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
@ -369,8 +368,8 @@ def test_carddef_submit_from_wscall(pub, local_user):
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
data_source.store()
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
@ -482,8 +481,8 @@ def test_carddef_submit_from_wscall(pub, local_user):
def test_formdef_submit_structured(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()

View File

@ -9,7 +9,6 @@ from utilities import clean_temporary_pub, create_temporary_pub, get_app
from wcs.categories import Category
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.roles import Role
from .utils import sign_uri
@ -205,8 +204,8 @@ def test_categories_formdefs(pub, local_user):
assert resp.json['err'] == 0
assert len(resp.json['data']) == 0
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = []
local_user.store()

View File

@ -14,7 +14,6 @@ from wcs.carddef import CardDef
from wcs.formdef import FormDef
from wcs.qommon import ods
from wcs.qommon.http_request import HTTPRequest
from wcs.roles import Role
from .utils import sign_uri
@ -61,8 +60,8 @@ def local_user():
@pytest.fixture
def test_api_custom_view_access(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()
@ -163,8 +162,8 @@ def test_api_custom_view_access(pub, local_user):
def test_api_list_formdata_custom_view(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
FormDef.wipe()
@ -212,8 +211,8 @@ def test_api_list_formdata_custom_view(pub, local_user):
def test_api_ods_formdata_custom_view(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
FormDef.wipe()
@ -265,8 +264,8 @@ def test_api_ods_formdata_custom_view(pub, local_user):
def test_api_geojson_formdata_custom_view(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
FormDef.wipe()

View File

@ -23,7 +23,6 @@ from wcs.qommon import ods
from wcs.qommon.form import PicklableUpload
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.roles import Role
from wcs.workflows import EditableWorkflowStatusItem, Workflow, WorkflowBackofficeFieldsFormDef
from .utils import sign_uri
@ -89,8 +88,8 @@ def admin_user():
@pytest.fixture
def ics_data(local_user):
Role.wipe()
role = Role(name='test')
get_publisher().role_class.wipe()
role = get_publisher().role_class(name='test')
role.store()
FormDef.wipe()
@ -150,11 +149,11 @@ def test_formdata(pub, local_user):
]
block.store()
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.id = '123'
role.store()
another_role = Role(name='another')
another_role = pub.role_class(name='another')
another_role.id = '321'
another_role.store()
FormDef.wipe()
@ -280,11 +279,11 @@ def test_formdata_backoffice_fields(pub, local_user):
def test_formdata_duplicated_varnames(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.id = '123'
role.store()
another_role = Role(name='another')
another_role = pub.role_class(name='another')
another_role.id = '321'
another_role.store()
FormDef.wipe()
@ -338,11 +337,11 @@ def test_formdata_duplicated_varnames(pub, local_user):
def test_formdata_edit(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.id = '123'
role.store()
another_role = Role(name='another')
another_role = pub.role_class(name='another')
another_role.id = '321'
another_role.store()
local_user.roles = [role.id]
@ -422,8 +421,8 @@ def test_formdata_edit(pub, local_user):
def test_formdata_with_workflow_data(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.id = '123'
role.store()
@ -465,8 +464,8 @@ def test_formdata_with_workflow_data(pub, local_user):
def test_api_list_formdata(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
FormDef.wipe()
@ -629,8 +628,8 @@ def test_api_list_formdata(pub, local_user):
def test_api_anonymized_formdata(pub, local_user, admin_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
FormDef.wipe()
@ -734,8 +733,8 @@ def test_api_anonymized_formdata(pub, local_user, admin_user):
def test_api_geojson_formdata(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
FormDef.wipe()
@ -848,8 +847,8 @@ def test_api_geojson_formdata(pub, local_user):
def test_api_ods_formdata(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
FormDef.wipe()
@ -906,8 +905,8 @@ def test_api_ods_formdata(pub, local_user):
def test_api_global_geojson(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
FormDef.wipe()
@ -965,8 +964,8 @@ def test_api_global_listing(pub, local_user):
pytest.skip('this requires SQL')
return
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
# check there's no crash if there are no formdefs
@ -1039,7 +1038,7 @@ def test_api_global_listing(pub, local_user):
def test_api_global_listing_ignored_roles(pub, local_user):
test_api_global_listing(pub, local_user)
role = Role(name='test2')
role = pub.role_class(name='test2')
role.store()
formdef = FormDef()
@ -1082,8 +1081,8 @@ def test_api_include_anonymised(pub, local_user):
pytest.skip('this requires SQL')
return
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
# add proper role to user
@ -1124,7 +1123,7 @@ def test_api_include_anonymised(pub, local_user):
def test_api_ics_formdata(pub, local_user, ics_data):
role = Role.select()[0]
role = pub.role_class.select()[0]
# check access is denied if the user has not the appropriate role
resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar', user=local_user), status=403)
@ -1189,7 +1188,7 @@ def test_api_ics_formdata(pub, local_user, ics_data):
def test_api_ics_formdata_http_auth(pub, local_user, admin_user, ics_data):
role = Role.select()[0]
role = pub.role_class.select()[0]
# check as admin
app = login(get_app(pub))
@ -1232,7 +1231,7 @@ def test_api_ics_formdata_http_auth(pub, local_user, admin_user, ics_data):
def test_api_ics_formdata_custom_view(pub, local_user, ics_data):
role = Role.select()[0]
role = pub.role_class.select()[0]
formdef = FormDef.get_by_urlname('test')

View File

@ -20,7 +20,6 @@ from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.qommon.form import PicklableUpload
from wcs.qommon.http_request import HTTPRequest
from wcs.roles import Role
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
@ -68,8 +67,8 @@ def local_user():
def test_formdef_list(pub):
Role.wipe()
role = Role(name='Foo bar')
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')
role.id = '14'
role.store()
@ -149,8 +148,8 @@ def test_formdef_list(pub):
def test_limited_formdef_list(pub, local_user):
Role.wipe()
role = Role(name='Foo bar')
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')
role.id = '14'
role.store()
@ -239,8 +238,8 @@ def test_formdef_list_redirection(pub):
def test_backoffice_submission_formdef_list(pub, local_user):
Role.wipe()
role = Role(name='Foo bar')
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')
role.id = '14'
role.store()
@ -445,8 +444,8 @@ def test_post_invalid_json(pub, local_user):
def test_formdef_submit(pub, local_user):
Role.wipe()
role = Role(name='test')
pub.role_class.wipe()
role = pub.role_class(name='test')
role.store()
local_user.roles = [role.id]
local_user.store()

View File

@ -12,7 +12,6 @@ from wcs.formdef import FormDef
from wcs.qommon.form import PicklableUpload
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.roles import Role
from wcs.workflows import Workflow, WorkflowVariablesFieldsFormDef
from .utils import sign_uri
@ -77,8 +76,8 @@ def admin_user():
def test_roles(pub, local_user):
Role.wipe()
role = Role(name='Hello World')
pub.role_class.wipe()
role = pub.role_class(name='Hello World')
role.emails = ['toto@example.com', 'zozo@example.com']
role.details = 'kouign amann'
role.store()
@ -109,7 +108,7 @@ def test_users(pub, local_user):
assert resp.json['data'][0]['user_email'] == local_user.email
assert resp.json['data'][0]['user_id'] == local_user.id
role = Role(name='Foo bar')
role = pub.role_class(name='Foo bar')
role.store()
local_user.roles = [role.id]
local_user.store()
@ -165,7 +164,7 @@ def test_user_by_nameid(pub, local_user):
def test_user_roles(pub, local_user):
local_user.name_identifiers = ['xyz']
local_user.store()
role = Role(name='Foo bar')
role = pub.role_class(name='Foo bar')
role.store()
local_user.roles = [role.id]
local_user.store()
@ -339,8 +338,8 @@ def test_user_forms_limit_offset(pub, local_user):
def test_user_forms_from_agent(pub, local_user):
Role.wipe()
role = Role(name='Foo bar')
pub.role_class.wipe()
role = pub.role_class(name='Foo bar')
role.store()
agent_user = get_publisher().user_class()

View File

@ -10,7 +10,6 @@ from wcs import fields
from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.roles import Role
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs.workflows import Workflow
@ -108,8 +107,8 @@ def test_workflow_trigger(pub, local_user):
get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX/'), status=200)
assert formdef.data_class().get(formdata.id).status == 'wf-st2'
Role.wipe()
role = Role(name='xxx')
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.store()
jump.by = [role.id]
@ -318,8 +317,8 @@ def test_workflow_global_webservice_trigger(pub, local_user):
add_to_journal.comment = 'HELLO WORLD 4'
trigger.roles = ['logged-users']
workflow.store()
Role.wipe()
role = Role(name='xxx')
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.store()
trigger.roles = [role.id]
workflow.store()

View File

@ -20,7 +20,7 @@ from wcs.qommon.form import UploadedFile
from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.qommon.http_request import HTTPRequest
import wcs.qommon.storage as st
from wcs.roles import Role, logged_users_role
from wcs.roles import logged_users_role
from wcs.workflows import (
Workflow,
CommentableWorkflowStatusItem,
@ -84,7 +84,7 @@ def create_user(pub, is_admin=False):
if user.name == 'admin':
user1 = user
user1.is_admin = is_admin
user1.roles = [x.id for x in Role.select() if x.name == 'foobar']
user1.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
user1.store()
elif user.email == 'jean.darmette@triffouilis.fr':
pass # don't remove user created by local_user fixture
@ -102,8 +102,8 @@ def create_user(pub, is_admin=False):
account1.user_id = user1.id
account1.store()
Role.wipe()
role = Role(name='foobar')
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
user1.roles = [role.id]
@ -223,14 +223,14 @@ def test_backoffice_role_user(pub):
assert 'Forms' not in resp.text
assert 'Workflows' not in resp.text
pub.cfg['admin-permissions'] = {'forms': [x.id for x in Role.select()]}
pub.cfg['admin-permissions'] = {'forms': [x.id for x in pub.role_class.select()]}
pub.write_cfg()
resp = app.get('/backoffice/')
assert 'Management' in resp.text
assert 'Forms' in resp.text
assert 'Workflows' not in resp.text
pub.cfg['admin-permissions'] = {'workflows': [x.id for x in Role.select()]}
pub.cfg['admin-permissions'] = {'workflows': [x.id for x in pub.role_class.select()]}
pub.write_cfg()
resp = app.get('/backoffice/')
assert 'Management' in resp.text
@ -238,7 +238,7 @@ def test_backoffice_role_user(pub):
assert 'Workflows' in resp.text
# check role id int->str migration
pub.cfg['admin-permissions'] = {'workflows': [int(x.id) for x in Role.select()]}
pub.cfg['admin-permissions'] = {'workflows': [int(x.id) for x in pub.role_class.select()]}
pub.write_cfg()
resp = app.get('/backoffice/')
assert 'Management' in resp.text
@ -1524,7 +1524,7 @@ def test_backoffice_multi_actions(pub):
resp = app.get('/backoffice/management/form-title/')
assert 'id="multi-actions"' in resp.text
trigger.roles = [x.id for x in Role.select() if x.name == 'foobar']
trigger.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
workflow.store()
resp = app.get('/backoffice/management/form-title/')
@ -1683,7 +1683,7 @@ def test_backoffice_multi_actions_oldest_form(pub):
register_comment.parent = workflow.possible_status[2]
trigger = action.triggers[0]
trigger.roles = [x.id for x in Role.select() if x.name == 'foobar']
trigger.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
workflow.store()
formdef.workflow_id = workflow.id
@ -1728,7 +1728,7 @@ def test_backoffice_multi_actions_using_session_user(pub):
register_comment = action.append_item('register-comment')
register_comment.comment = 'session_user={{session_user}}'
trigger = action.triggers[0]
trigger.roles = [x.id for x in Role.select() if x.name == 'foobar']
trigger.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
workflow.store()
formdef.workflow_id = workflow.id
@ -1970,7 +1970,7 @@ def test_backoffice_handling_global_action(pub):
jump = action.append_item('jump')
jump.status = 'finished'
trigger = action.triggers[0]
trigger.roles = [x.id for x in Role.select() if x.name == 'foobar']
trigger.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
workflow.store()
formdef.workflow_id = workflow.id
@ -2004,7 +2004,7 @@ def test_backoffice_global_remove_action(pub):
action = workflow.add_global_action('FOOBAR')
remove = action.append_item('remove')
trigger = action.triggers[0]
trigger.roles = [x.id for x in Role.select() if x.name == 'foobar']
trigger.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
workflow.store()
formdef.workflow_id = workflow.id
@ -2207,7 +2207,7 @@ def test_backoffice_info_text(pub):
# info text is not visible if form is locked
second_user = pub.user_class(name='foobar')
second_user.roles = Role.keys()
second_user.roles = pub.role_class.keys()
second_user.store()
account = PasswordAccount(id='foobar')
account.set_password('foobar')
@ -2248,7 +2248,7 @@ def test_backoffice_handling_post_dispatch(pub):
# check a formdata that has been dispatched to another role is accessible
# by an user with that role.
user1 = create_user(pub)
role = Role(name='foobaz')
role = pub.role_class(name='foobaz')
role.store()
user1.roles = [role.id]
user1.store()
@ -3044,7 +3044,7 @@ def test_global_listing(pub):
# change role handling a formdef, make sure they do not appear anylonger in
# the all/done views.
role = Role(name='whatever')
role = pub.role_class(name='whatever')
role.store()
formdef = FormDef.get_by_urlname('form-title')
formdef.workflow_roles = {'_receiver': role.id}
@ -3759,7 +3759,7 @@ def test_backoffice_advisory_lock(pub):
create_environment(pub)
second_user = pub.user_class(name='foobar')
second_user.roles = Role.keys()
second_user.roles = pub.role_class.keys()
second_user.store()
account = PasswordAccount(id='foobar')
account.set_password('foobar')
@ -3837,7 +3837,7 @@ def test_backoffice_advisory_lock_related_formdatas(pub):
formdatas = formdef.data_class().select(lambda x: x.status == 'wf-new')
second_user = pub.user_class(name='foobar')
second_user.roles = Role.keys()
second_user.roles = pub.role_class.keys()
second_user.store()
account = PasswordAccount(id='foobar')
account.set_password('foobar')
@ -4449,7 +4449,7 @@ def test_inspect_page(pub, local_user):
# check functions
assert re.findall('Recipient.*foobar', resp.text)
role = Role(name='plop')
role = pub.role_class(name='plop')
role.store()
formdata.workflow_roles = {'_receiver': role.id}
formdata.store()
@ -5634,7 +5634,7 @@ def test_workflow_comment_required(pub):
def test_lazy_eval_with_conditional_workflow_form(pub):
role = Role(name='foobar')
role = pub.role_class(name='foobar')
role.store()
user = create_user(pub)

View File

@ -9,7 +9,6 @@ from wcs.qommon.ident.password_accounts import PasswordAccount
from wcs.qommon.http_request import HTTPRequest
from wcs.carddef import CardDef
from wcs.formdef import FormDef
from wcs.roles import Role
from wcs import fields
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub
@ -597,8 +596,8 @@ def test_carddata_custom_view(pub):
def test_carddata_custom_view_is_default(pub):
user = create_superuser(pub)
Role.wipe()
role = Role(name='foobar')
pub.role_class.wipe()
role = pub.role_class(name='foobar')
role.store()
user.roles = [role.id]
user.store()

View File

@ -46,7 +46,7 @@ from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.blocks import BlockDef
from wcs.categories import Category
from wcs.roles import Role, logged_users_role
from wcs.roles import logged_users_role
from wcs.tracking_code import TrackingCode
from wcs.data_sources import NamedDataSource
from wcs import fields
@ -364,8 +364,8 @@ def test_form_access(pub):
formdef = create_formdef()
get_app(pub).get('/test/', status=200)
Role.wipe()
role = Role(name='xxx')
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.store()
# check a formdef protected by a role cannot be accessed
@ -602,7 +602,7 @@ def test_form_string_with_invalid_xml_chars(pub):
def test_form_submit_handling_role_info(pub):
role = Role(name='xxx')
role = pub.role_class(name='xxx')
role.details = 'Managing service'
role.store()
formdef = create_formdef()
@ -4689,8 +4689,8 @@ def test_form_workflow_trigger(pub):
resp = login(app, username='foo', password='foo').get('/')
resp = app.post(formdata.get_url() + 'jump/trigger/XXX', status=403)
Role.wipe()
role = Role(name='xxx')
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.store()
jump.by = [role.id]
@ -4753,8 +4753,8 @@ def test_form_worklow_multiple_identical_status(pub):
app = get_app(pub)
Role.wipe()
role = Role(name='xxx')
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.allows_backoffice_access = False
role.store()
@ -4819,7 +4819,7 @@ def test_form_worklow_comments_on_same_status(pub):
pub.session_manager.session_class.wipe()
user = create_user(pub)
role = Role(name='xxx')
role = pub.role_class(name='xxx')
role.store()
user.roles = [role.id]
user.store()
@ -5256,11 +5256,11 @@ def test_item_field_from_cards(pub):
def test_item_field_from_custom_view_on_cards(pub):
Role.wipe()
pub.role_class.wipe()
pub.custom_view_class.wipe()
user = create_user(pub)
role = Role(name='xxx')
role = pub.role_class(name='xxx')
role.store()
user.roles = [role.id]
user.is_admin = True
@ -5335,11 +5335,11 @@ def test_dynamic_item_field_from_custom_view_on_cards(pub):
pytest.skip('this requires SQL')
return
Role.wipe()
pub.role_class.wipe()
pub.custom_view_class.wipe()
user = create_user(pub)
role = Role(name='xxx')
role = pub.role_class(name='xxx')
role.store()
user.roles = [role.id]
user.is_admin = True
@ -6660,8 +6660,8 @@ def test_email_actions(pub, emails):
def test_manager_public_access(pub):
user, manager = create_user_and_admin(pub)
Role.wipe()
role = Role(name='xxx')
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.store()
manager.is_admin = False

View File

@ -19,7 +19,6 @@ from wcs import fields
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.qommon.form import UploadedFile
from wcs.roles import Role
from wcs.wf.attachment import AddAttachmentWorkflowStatusItem
from wcs.wf.export_to_model import ExportToModel, transform_to_pdf
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
@ -921,8 +920,8 @@ def test_formdata_generated_document_to_backoffice_field(pub):
def test_formdata_generated_document_in_private_history(pub):
user = create_user(pub)
Role.wipe()
role = Role(name='xxx')
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.store()
user.roles = [role.id]
@ -1510,10 +1509,10 @@ def test_formdata_named_wscall_in_conditions(http_requests, pub):
def test_formdata_evolution_registercommenter_to(pub):
user = create_user(pub)
Role.wipe()
role1 = Role(name='role the user does not have')
pub.role_class.wipe()
role1 = pub.role_class(name='role the user does not have')
role1.store()
role2 = Role(name='role the user does have')
role2 = pub.role_class(name='role the user does have')
role2.store()
user.roles = [role2.id]
user.store()

View File

@ -19,7 +19,6 @@ from wcs.categories import Category
from wcs.conditions import Condition
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.roles import Role
from wcs import sessions
from wcs.variables import LazyFormData
from wcs.workflows import (
@ -598,7 +597,7 @@ def variable_test_data(pub):
user.name_identifiers = ['....']
user.store()
role = Role(name='foobar')
role = pub.role_class(name='foobar')
role.store()
FormDef.wipe()
@ -1732,7 +1731,7 @@ def test_has_role_templatefilter(pub, variable_test_data):
condition = Condition({'type': 'django', 'value': 'form_user|has_role:form_role_receiver_name'})
assert condition.evaluate() is False
role = Role.select()[0]
role = pub.role_class.select()[0]
user = pub.user_class.select()[0]
user.roles = [role.id, '42'] # role.id 42 does not exist
user.store()
@ -1759,7 +1758,7 @@ def test_roles_templatefilter(pub, variable_test_data):
condition = Condition({'type': 'django', 'value': 'form_role_receiver_name in form_user|roles'})
assert condition.evaluate() is False
role = Role.select()[0]
role = pub.role_class.select()[0]
user = pub.user_class.select()[0]
user.roles = [role.id, '42'] # role.id 42 does not exist
user.store()

View File

@ -12,7 +12,6 @@ from wcs.categories import Category
from wcs.carddef import CardDef
from wcs.formdef import FormDef, fields, FormdefImportError
from wcs.workflows import Workflow
from wcs.roles import Role
from wcs.qommon.misc import indent_xml as indent
from utilities import create_temporary_pub
@ -506,8 +505,8 @@ def test_page_post_conditions():
def test_workflow_roles():
Role.wipe()
role = Role(name='blah')
pub.role_class.wipe()
role = pub.role_class(name='blah')
role.store()
formdef = FormDef()
@ -549,9 +548,9 @@ def test_geolocations():
def test_user_roles():
Role.wipe()
pub.role_class.wipe()
role = Role(name='blah')
role = pub.role_class(name='blah')
role.store()
formdef = FormDef()
@ -576,9 +575,9 @@ def test_user_roles():
def test_backoffice_submission_roles():
Role.wipe()
pub.role_class.wipe()
role = Role(name='blah')
role = pub.role_class(name='blah')
role.store()
formdef = FormDef()

View File

@ -1,14 +1,12 @@
# -*- coding: utf-8 -*-
import shutil
from quixote import cleanup
import uuid
from wcs.qommon import force_str
from wcs.qommon.http_request import HTTPRequest
from wcs.ctl.hobo_notify import CmdHoboNotify
from wcs.roles import Role
from wcs.qommon import storage as st
from utilities import create_temporary_pub, clean_temporary_pub
from utilities import create_temporary_pub
import pytest
@ -27,8 +25,8 @@ def pub(request):
pub.cfg['sp'] = {'saml2_providerid': 'test'}
pub.write_cfg()
Role.wipe()
r = Role(name='Service étt civil')
pub.role_class.wipe()
r = pub.role_class(name='Service étt civil')
r.slug = 'service-ett-civil'
r.store()
@ -36,8 +34,6 @@ def pub(request):
def test_process_notification_role_wrong_audience(pub):
User = pub.user_class
notification = {
'@type': u'provision',
'audience': [u'coin'],
@ -49,7 +45,7 @@ def test_process_notification_role_wrong_audience(pub):
'name': u'Service enfance',
'slug': u'service-enfance',
'description': u'Rôle du service petite enfance',
'uuid': u'12345',
'uuid': str(uuid.uuid4()),
'emails': [u'petite-enfance@example.com'],
'emails_to_members': False,
},
@ -58,31 +54,31 @@ def test_process_notification_role_wrong_audience(pub):
'name': u'Service état civil',
'slug': u'service-etat-civil',
'description': u'Rôle du service état civil',
'uuid': u'xyz',
'uuid': str(uuid.uuid4()),
'emails': [u'etat-civil@example.com'],
'emails_to_members': True,
},
],
},
}
assert Role.count() == 1
assert Role.select()[0].name == 'Service étt civil'
assert Role.select()[0].slug == 'service-ett-civil'
assert Role.select()[0].details is None
assert Role.select()[0].emails is None
assert Role.select()[0].emails_to_members is False
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].name == 'Service étt civil'
assert pub.role_class.select()[0].slug == 'service-ett-civil'
assert pub.role_class.select()[0].details is None
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
CmdHoboNotify.process_notification(notification)
assert Role.count() == 1
assert Role.select()[0].name == 'Service étt civil'
assert Role.select()[0].slug == 'service-ett-civil'
assert Role.select()[0].details is None
assert Role.select()[0].emails is None
assert Role.select()[0].emails_to_members is False
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].name == 'Service étt civil'
assert pub.role_class.select()[0].slug == 'service-ett-civil'
assert pub.role_class.select()[0].details is None
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
def test_process_notification_role(pub):
User = pub.user_class
uuid1 = str(uuid.uuid4())
uuid2 = str(uuid.uuid4())
notification = {
'@type': u'provision',
'audience': [u'test'],
@ -94,7 +90,7 @@ def test_process_notification_role(pub):
'name': u'Service enfance',
'slug': u'service-enfance',
'details': u'Rôle du service petite enfance',
'uuid': u'12345',
'uuid': uuid1,
'emails': [u'petite-enfance@example.com'],
'emails_to_members': False,
},
@ -102,34 +98,34 @@ def test_process_notification_role(pub):
'name': u'Service état civil',
'slug': u'service-ett-civil',
'details': u'Rôle du service état civil',
'uuid': u'xyz',
'uuid': uuid2,
'emails': [u'etat-civil@example.com'],
'emails_to_members': True,
},
],
},
}
assert Role.count() == 1
assert Role.select()[0].name == 'Service étt civil'
assert Role.select()[0].slug == 'service-ett-civil'
assert Role.select()[0].details is None
assert Role.select()[0].emails is None
assert Role.select()[0].emails_to_members is False
existing_role_id = Role.select()[0].id
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].name == 'Service étt civil'
assert pub.role_class.select()[0].slug == 'service-ett-civil'
assert pub.role_class.select()[0].details is None
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
existing_role_id = pub.role_class.select()[0].id
CmdHoboNotify.process_notification(notification)
assert Role.count() == 2
old_role = Role.get(existing_role_id)
assert pub.role_class.count() == 2
old_role = pub.role_class.get(existing_role_id)
assert old_role.name == 'Service état civil'
assert old_role.uuid == 'xyz'
assert old_role.uuid == uuid2
assert old_role.slug == 'service-ett-civil'
assert old_role.details == "Rôle du service état civil"
assert old_role.emails == ['etat-civil@example.com']
assert old_role.emails_to_members is True
new_role = Role.get_on_index('12345', 'uuid')
assert new_role.id == '12345'
new_role = pub.role_class.get_on_index(uuid1, 'uuid')
assert new_role.id == uuid1
assert new_role.name == 'Service enfance'
assert new_role.slug == 'service-enfance'
assert new_role.uuid == '12345'
assert new_role.uuid == uuid1
assert new_role.details == "Rôle du service petite enfance"
assert new_role.emails == ['petite-enfance@example.com']
assert new_role.emails_to_members is False
@ -145,7 +141,7 @@ def test_process_notification_role(pub):
'name': u'Service enfance',
'slug': u'service-enfance',
'description': u'Rôle du service petite enfance',
'uuid': u'12345',
'uuid': uuid1,
'emails': [u'petite-enfance@example.com'],
'emails_to_members': True,
},
@ -153,18 +149,18 @@ def test_process_notification_role(pub):
},
}
CmdHoboNotify.process_notification(notification)
assert Role.count() == 1
assert Role.select()[0].id == new_role.id
assert Role.select()[0].uuid == '12345'
assert Role.select()[0].name == 'Service enfance'
assert Role.select()[0].slug == 'service-enfance'
assert Role.select()[0].details is None
assert Role.select()[0].emails == ['petite-enfance@example.com']
assert Role.select()[0].emails_to_members is True
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].id == new_role.id
assert pub.role_class.select()[0].uuid == uuid1
assert pub.role_class.select()[0].name == 'Service enfance'
assert pub.role_class.select()[0].slug == 'service-enfance'
assert pub.role_class.select()[0].details is None
assert pub.role_class.select()[0].emails == ['petite-enfance@example.com']
assert pub.role_class.select()[0].emails_to_members is True
def test_process_notification_internal_role(pub):
Role.wipe()
pub.role_class.wipe()
notification = {
'@type': u'provision',
@ -177,7 +173,7 @@ def test_process_notification_internal_role(pub):
'name': u'Service enfance',
'slug': u'_service-enfance',
'details': u'Rôle du service petite enfance',
'uuid': u'12345',
'uuid': str(uuid.uuid4()),
'emails': [u'petite-enfance@example.com'],
'emails_to_members': False,
},
@ -185,15 +181,15 @@ def test_process_notification_internal_role(pub):
},
}
CmdHoboNotify.process_notification(notification)
assert Role.count() == 1
role = Role.select()[0]
assert pub.role_class.count() == 1
role = pub.role_class.select()[0]
assert role.is_internal()
def test_process_notification_role_description(pub):
User = pub.user_class
# check descriptions are not used to fill role.details
uuid1 = str(uuid.uuid4())
uuid2 = str(uuid.uuid4())
notification = {
'@type': u'provision',
'audience': [u'test'],
@ -205,7 +201,7 @@ def test_process_notification_role_description(pub):
'name': u'Service enfance',
'slug': u'service-enfance',
'description': u'Rôle du service petite enfance',
'uuid': u'12345',
'uuid': uuid1,
'emails': [u'petite-enfance@example.com'],
'emails_to_members': False,
},
@ -213,33 +209,33 @@ def test_process_notification_role_description(pub):
'name': u'Service état civil',
'slug': u'service-ett-civil',
'description': u'Rôle du service état civil',
'uuid': u'xyz',
'uuid': uuid2,
'emails': [u'etat-civil@example.com'],
'emails_to_members': True,
},
],
},
}
assert Role.count() == 1
assert Role.select()[0].name == 'Service étt civil'
assert Role.select()[0].slug == 'service-ett-civil'
assert Role.select()[0].details is None
assert Role.select()[0].emails is None
assert Role.select()[0].emails_to_members is False
existing_role_id = Role.select()[0].id
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].name == 'Service étt civil'
assert pub.role_class.select()[0].slug == 'service-ett-civil'
assert pub.role_class.select()[0].details is None
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
existing_role_id = pub.role_class.select()[0].id
CmdHoboNotify.process_notification(notification)
assert Role.count() == 2
old_role = Role.get(existing_role_id)
assert pub.role_class.count() == 2
old_role = pub.role_class.get(existing_role_id)
assert old_role.name == 'Service état civil'
assert old_role.slug == 'service-ett-civil'
assert old_role.uuid == 'xyz'
assert old_role.uuid == uuid2
assert old_role.details is None
assert old_role.emails == ['etat-civil@example.com']
assert old_role.emails_to_members is True
new_role = Role.get_on_index('12345', 'uuid')
new_role = pub.role_class.get_on_index(uuid1, 'uuid')
assert new_role.name == 'Service enfance'
assert new_role.slug == 'service-enfance'
assert new_role.uuid == '12345'
assert new_role.uuid == uuid1
assert new_role.details is None
assert new_role.emails == ['petite-enfance@example.com']
assert new_role.emails_to_members is False
@ -255,7 +251,7 @@ def test_process_notification_role_description(pub):
'name': u'Service enfance',
'slug': u'service-enfance',
'description': u'Rôle du service petite enfance',
'uuid': u'12345',
'uuid': uuid1,
'emails': [u'petite-enfance@example.com'],
'emails_to_members': True,
},
@ -263,19 +259,18 @@ def test_process_notification_role_description(pub):
},
}
CmdHoboNotify.process_notification(notification)
assert Role.count() == 1
assert Role.select()[0].id == new_role.id
assert Role.select()[0].name == 'Service enfance'
assert Role.select()[0].uuid == '12345'
assert Role.select()[0].slug == 'service-enfance'
assert Role.select()[0].details is None
assert Role.select()[0].emails == ['petite-enfance@example.com']
assert Role.select()[0].emails_to_members is True
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].id == new_role.id
assert pub.role_class.select()[0].name == 'Service enfance'
assert pub.role_class.select()[0].uuid == uuid1
assert pub.role_class.select()[0].slug == 'service-enfance'
assert pub.role_class.select()[0].details is None
assert pub.role_class.select()[0].emails == ['petite-enfance@example.com']
assert pub.role_class.select()[0].emails_to_members is True
def test_process_notification_role_deprovision(pub):
User = pub.user_class
uuid1 = str(uuid.uuid4())
notification = {
'@type': u'deprovision',
'audience': [u'test'],
@ -285,34 +280,34 @@ def test_process_notification_role_deprovision(pub):
'data': [
{
'@type': 'role',
'uuid': u'xyz',
'uuid': uuid1,
},
],
},
}
role = Role.select()[0]
role = pub.role_class.select()[0]
role.remove_self()
assert role.name == 'Service étt civil'
assert role.slug == 'service-ett-civil'
role.id = 'xyz'
role.id = uuid1
role.store()
role = Role('foo')
role = pub.role_class('foo')
role.slug = 'bar'
role.store()
assert Role.count() == 2
assert pub.role_class.count() == 2
CmdHoboNotify.process_notification(notification)
assert Role.count() == 1
assert Role.select()[0].slug == 'bar'
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].slug == 'bar'
r = Role(name='Service étt civil')
r.uuid = 'xyz'
r = pub.role_class(name='Service étt civil')
r.uuid = uuid1
r.store()
assert Role.count() == 2
assert pub.role_class.count() == 2
CmdHoboNotify.process_notification(notification)
assert Role.count() == 1
assert Role.select()[0].slug == 'bar'
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].slug == 'bar'
PROFILE = {
@ -451,6 +446,8 @@ def test_process_notification_user_provision(pub):
# setup an hobo profile
CmdCheckHobos().update_profile(PROFILE, pub)
uuid1 = str(uuid.uuid4())
uuid2 = str(uuid.uuid4())
notification = {
'@type': u'provision',
'audience': [u'test'],
@ -462,7 +459,7 @@ def test_process_notification_user_provision(pub):
'name': u'Service enfance',
'slug': u'service-enfance',
'description': u'Rôle du service petite enfance',
'uuid': u'12345',
'uuid': uuid1,
'emails': [u'petite-enfance@example.com'],
'emails_to_members': False,
},
@ -470,33 +467,33 @@ def test_process_notification_user_provision(pub):
'name': u'Service état civil',
'slug': u'service-ett-civil',
'description': u'Rôle du service état civil',
'uuid': u'xyz',
'uuid': uuid2,
'emails': [u'etat-civil@example.com'],
'emails_to_members': True,
},
],
},
}
assert Role.count() == 1
assert Role.select()[0].name == 'Service étt civil'
assert Role.select()[0].slug == 'service-ett-civil'
assert Role.select()[0].details is None
assert Role.select()[0].emails is None
assert Role.select()[0].emails_to_members is False
existing_role_id = Role.select()[0].id
assert pub.role_class.count() == 1
assert pub.role_class.select()[0].name == 'Service étt civil'
assert pub.role_class.select()[0].slug == 'service-ett-civil'
assert pub.role_class.select()[0].details is None
assert pub.role_class.select()[0].emails is None
assert pub.role_class.select()[0].emails_to_members is False
existing_role_id = pub.role_class.select()[0].id
CmdHoboNotify.process_notification(notification)
assert Role.count() == 2
old_role = Role.get(existing_role_id)
assert pub.role_class.count() == 2
old_role = pub.role_class.get(existing_role_id)
assert old_role.name == 'Service état civil'
assert old_role.uuid == 'xyz'
assert old_role.uuid == uuid2
assert old_role.slug == 'service-ett-civil'
assert old_role.details is None
assert old_role.emails == ['etat-civil@example.com']
assert old_role.emails_to_members is True
new_role = Role.get_on_index('12345', 'uuid')
new_role = pub.role_class.get_on_index(uuid1, 'uuid')
assert new_role.name == 'Service enfance'
assert new_role.slug == 'service-enfance'
assert new_role.uuid == '12345'
assert new_role.uuid == uuid1
assert new_role.details is None
assert new_role.emails == ['petite-enfance@example.com']
assert new_role.emails_to_members is False
@ -518,12 +515,12 @@ def test_process_notification_user_provision(pub):
'is_active': True,
u'roles': [
{
u'uuid': u'12345',
u'uuid': uuid1,
u'name': u'Service petite enfance',
u'description': u'etc.',
},
{
u'uuid': u'xyz',
u'uuid': uuid2,
u'name': u'Service état civil',
u'description': u'etc.',
},
@ -565,12 +562,12 @@ def test_process_notification_user_provision(pub):
'is_active': False,
u'roles': [
{
u'uuid': u'xyz',
u'uuid': uuid2,
u'name': u'Service état civil',
u'description': u'etc.',
},
{
u'uuid': u'unknown-uuid',
u'uuid': str(uuid.uuid4()),
u'name': u'Service enfance',
u'description': u'',
},
@ -612,7 +609,7 @@ def test_process_notification_user_provision(pub):
u'is_superuser': True,
u'roles': [
{
u'uuid': u'xyz',
u'uuid': uuid2,
u'name': u'Service état civil',
u'description': u'etc.',
},
@ -647,12 +644,12 @@ def test_process_notification_user_provision(pub):
'is_superuser': False,
'roles': [
{
'uuid': '12345',
'uuid': uuid1,
'name': 'Service petite enfance',
'description': 'etc.',
},
{
'uuid': 'xyz',
'uuid': uuid2,
'name': 'Service état civil',
'description': 'etc.',
},
@ -678,7 +675,7 @@ def test_process_notification_user_with_errors(pub):
from wcs.ctl.check_hobos import CmdCheckHobos
User.wipe()
Role.wipe()
pub.role_class.wipe()
CmdCheckHobos().update_profile(PROFILE, pub)
notification = {
@ -737,7 +734,7 @@ def test_process_notification_user_with_errors(pub):
def test_process_notification_role_with_errors(pub):
User = pub.user_class
User.wipe()
Role.wipe()
pub.role_class.wipe()
notification = {
'@type': u'provision',
'audience': [u'test'],
@ -759,14 +756,14 @@ def test_process_notification_role_with_errors(pub):
with pytest.raises(KeyError) as e:
CmdHoboNotify.process_notification(notification)
assert e.value.args == ('role without uuid',)
assert Role.count() == 0
assert pub.role_class.count() == 0
notification['objects']['data'][0]['uuid'] = u'12345'
del notification['objects']['data'][0]['name']
with pytest.raises(ValueError) as e:
CmdHoboNotify.process_notification(notification)
assert e.value.args == ('invalid role',)
assert Role.count() == 0
assert pub.role_class.count() == 0
def test_process_user_deprovision(pub):
@ -776,7 +773,7 @@ def test_process_user_deprovision(pub):
from wcs.ctl.check_hobos import CmdCheckHobos
User.wipe()
Role.wipe()
pub.role_class.wipe()
CmdCheckHobos().update_profile(PROFILE, pub)
user = User()

View File

@ -4,8 +4,7 @@ import pickle
from utilities import create_temporary_pub, clean_temporary_pub
from wcs.qommon.storage import StorableObject
from wcs.roles import Role, get_user_roles
from wcs.roles import get_user_roles
from quixote import get_publisher
@ -21,47 +20,47 @@ def teardown_module(module):
def test_slug():
Role.wipe()
role = Role(name='Hello world')
get_publisher().role_class.wipe()
role = get_publisher().role_class(name='Hello world')
role.store()
assert role.slug == 'hello-world'
def test_duplicated_name():
Role.wipe()
role = Role(name='Hello world')
get_publisher().role_class.wipe()
role = get_publisher().role_class(name='Hello world')
role.store()
assert role.slug == 'hello-world'
role = Role(name='Hello world')
role = get_publisher().role_class(name='Hello world')
role.store()
assert role.slug == 'hello-world-1'
def test_migrate():
Role.wipe()
role = Role(name='Hello world')
get_publisher().role_class.wipe()
role = get_publisher().role_class(name='Hello world')
role.store()
obj = pickle.load(open(role.get_object_filename(), 'rb'))
del obj.slug
pickle.dump(obj, open(role.get_object_filename(), 'wb'))
assert pickle.load(open(role.get_object_filename(), 'rb')).slug is None
assert Role.get(role.id).slug == 'hello-world'
assert get_publisher().role_class.get(role.id).slug == 'hello-world'
def test_get_user_roles():
Role.wipe()
Role(name='f1').store()
Role(name='é1').store()
Role(name='a1').store()
get_publisher().role_class.wipe()
get_publisher().role_class(name='f1').store()
get_publisher().role_class(name='é1').store()
get_publisher().role_class(name='a1').store()
assert [x[1] for x in get_user_roles()] == ['a1', 'é1', 'f1']
def test_get_emails():
User = get_publisher().user_class
User.wipe()
Role.wipe()
get_publisher().role_class.wipe()
role = Role(name='role')
role = get_publisher().role_class(name='role')
role.emails_to_members = True
role.store()

View File

@ -1,7 +1,7 @@
import datetime
import os
import sys
import shutil
import uuid
import urllib.parse
try:
@ -19,7 +19,6 @@ from wcs.qommon.saml2 import Saml2Directory
from wcs.qommon.ident.idp import MethodAdminDirectory, AdminIDPDir
from wcs.qommon import sessions, x509utils
from wcs.qommon.errors import RequestError
from wcs.roles import Role
from utilities import get_app, create_temporary_pub, clean_temporary_pub
@ -39,6 +38,9 @@ IDP_METADATA = """<?xml version="1.0"?>
</ns0:IDPSSODescriptor>
</ns0:EntityDescriptor>"""
role_uuid1 = str(uuid.uuid4())
role_uuid2 = str(uuid.uuid4())
def pytest_generate_tests(metafunc):
if 'pub' in metafunc.fixturenames:
@ -169,7 +171,7 @@ def get_authn_response_msg(pub, ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PER
role_slug_attribute.name = 'role-slug'
role_slug_attribute.nameFormat = lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC
role_uuids = []
for role_uuid in ('foo', 'bar'):
for role_uuid in (role_uuid1, role_uuid2):
text_node = lasso.MiscTextNode.newWithString(role_uuid)
text_node.textChild = True
atv = lasso.Saml2AttributeValue()
@ -263,9 +265,9 @@ def test_assertion_consumer_existing_federation(pub, caplog):
pub.write_cfg()
pub.set_config()
Role.wipe()
role = Role('Foo')
role.uuid = 'foo'
pub.role_class.wipe()
role = pub.role_class('Foo')
role.uuid = role_uuid1
role.store()
# 1st pass to generate a user
@ -280,10 +282,10 @@ def test_assertion_consumer_existing_federation(pub, caplog):
assert user.verified_fields
assert len(user.verified_fields) == 3
assert user.form_data['_birthdate'].tm_year == 2000
assert user.roles == [role.id] # bar uuid is ignored as unknown
assert user.roles == [role.id] # other uuid is ignored as unknown
assert ('enrolling user %s in Foo' % user.id) in [x.message for x in caplog.records]
assert 'role uuid bar is unknown' in [x.message for x in caplog.records]
assert 'role uuid %s is unknown' % role_uuid2 in [x.message for x in caplog.records]
req = HTTPRequest(
None,

View File

@ -150,7 +150,7 @@ def test_snapshot_user(pub):
def test_form_snapshot_comments(pub):
create_superuser(pub)
create_role()
create_role(pub)
app = login(get_app(pub))
resp = app.get('/backoffice/forms/')
@ -179,7 +179,7 @@ def test_form_snapshot_comments(pub):
def test_form_snapshot_history(pub, formdef_with_history):
create_superuser(pub)
create_role()
create_role(pub)
app = login(get_app(pub))
resp = app.get('/backoffice/forms/%s/' % formdef_with_history.id)
resp = resp.click('History')
@ -195,7 +195,7 @@ def test_form_snapshot_history(pub, formdef_with_history):
def test_form_snapshot_export(pub, formdef_with_history):
create_superuser(pub)
create_role()
create_role(pub)
app = login(get_app(pub))
resp = app.get('/backoffice/forms/%s/history/' % formdef_with_history.id)
@ -207,7 +207,7 @@ def test_form_snapshot_export(pub, formdef_with_history):
def test_form_snapshot_restore(pub, formdef_with_history):
create_superuser(pub)
create_role()
create_role(pub)
app = login(get_app(pub))
# restore as new
@ -234,7 +234,7 @@ def test_form_snapshot_restore(pub, formdef_with_history):
def test_form_snapshot_restore_with_import_error(pub):
create_superuser(pub)
create_role()
create_role(pub)
app = login(get_app(pub))
formdef = FormDef()
@ -250,7 +250,7 @@ def test_form_snapshot_restore_with_import_error(pub):
def test_block_snapshot_browse(pub, blocks_feature):
create_superuser(pub)
create_role()
create_role(pub)
BlockDef.wipe()
blockdef = BlockDef()
@ -273,7 +273,7 @@ def test_block_snapshot_browse(pub, blocks_feature):
def test_card_snapshot_browse(pub):
create_superuser(pub)
create_role()
create_role(pub)
CardDef.wipe()
carddef = CardDef()
@ -315,7 +315,7 @@ def test_card_snapshot_browse(pub):
def test_datasource_snapshot_browse(pub):
create_superuser(pub)
create_role()
create_role(pub)
NamedDataSource.wipe()
datasource = NamedDataSource(name='test')
@ -339,7 +339,7 @@ def test_datasource_snapshot_browse(pub):
def test_form_snapshot_browse(pub, formdef_with_history):
create_superuser(pub)
create_role()
create_role(pub)
app = login(get_app(pub))
pub.custom_view_class.wipe()
@ -395,7 +395,7 @@ def test_form_snapshot_browse(pub, formdef_with_history):
def test_form_snapshot_browse_with_import_error(pub):
create_superuser(pub)
create_role()
create_role(pub)
app = login(get_app(pub))
formdef = FormDef()
@ -420,7 +420,7 @@ def test_form_snapshot_browse_with_import_error(pub):
def test_workflow_snapshot_browse(pub):
create_superuser(pub)
create_role()
create_role(pub)
Workflow.wipe()
workflow = Workflow(name='test')
@ -441,7 +441,7 @@ def test_workflow_snapshot_browse(pub):
def test_workflow_with_model_snapshot_browse(pub):
create_superuser(pub)
create_role()
create_role(pub)
Workflow.wipe()
if os.path.exists(os.path.join(pub.app_dir, 'models')):
@ -484,7 +484,7 @@ def test_workflow_with_model_snapshot_browse(pub):
def test_workflow_with_form_snapshot_browse(pub):
create_superuser(pub)
create_role()
create_role(pub)
Workflow.wipe()
wf = Workflow(name='status')
@ -508,7 +508,7 @@ def test_workflow_with_form_snapshot_browse(pub):
def test_wscall_snapshot_browse(pub):
create_superuser(pub)
create_role()
create_role(pub)
NamedWsCall.wipe()
wscall = NamedWsCall(name='test')
@ -531,7 +531,7 @@ def test_wscall_snapshot_browse(pub):
def test_form_snapshot_save(pub, formdef_with_history):
create_superuser(pub)
create_role()
create_role(pub)
app = login(get_app(pub))
resp = app.get('/backoffice/forms/%s/' % formdef_with_history.id)
@ -564,7 +564,7 @@ def test_form_snapshot_save(pub, formdef_with_history):
def test_snaphost_workflow_status_item_comments(pub):
create_superuser(pub)
create_role()
create_role(pub)
Workflow.wipe()
workflow = Workflow(name='test')

View File

@ -18,7 +18,6 @@ from wcs.qommon import force_str
from wcs import publisher, fields
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.roles import Role
from wcs.workflows import Workflow, CommentableWorkflowStatusItem, WorkflowCriticalityLevel
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
@ -1911,7 +1910,7 @@ def test_view_performances():
# create roles
roles = []
for i in range(nb_roles):
role = Role(name='role%s' % i)
role = pub.role_class(name='role%s' % i)
role.store()
roles.append(role)

View File

@ -33,7 +33,6 @@ from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.fields import StringField, FileField
from wcs.qommon.form import UploadedFile
from wcs.roles import Role
from wcs.workflows import ExportToModel, WorkflowVariablesFieldsFormDef, DisplayMessageWorkflowStatusItem
@ -117,7 +116,7 @@ def test_action_dispatch(pub):
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
role = Role()
role = pub.role_class()
role.id = '5'
role.name = 'Test Role'
role.store()
@ -174,7 +173,7 @@ def test_status_actions_named_role(pub):
def test_status_actions_named_existing_role(pub):
role = Role()
role = pub.role_class()
role.id = '2'
role.name = 'Test Role named existing role'
role.store()
@ -206,12 +205,12 @@ def test_status_actions_named_existing_role(pub):
def test_status_actions_named_missing_role(pub):
role = Role()
role = pub.role_class()
role.id = '3'
role.name = 'Test Role A'
role.store()
role = Role()
role = pub.role_class()
role.id = '4'
role.name = 'Test Role B'
role.store()
@ -241,19 +240,19 @@ def test_status_actions_named_missing_role(pub):
xml_export = xml_export_orig.replace(
b'<item role_id="3">Test Role A</item>', b'<item role_id="999">foobar</item>'
)
nb_roles = Role.count()
nb_roles = pub.role_class.count()
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
assert Role.count() == nb_roles + 1
assert pub.role_class.count() == nb_roles + 1
# check that it doesn't fallback on the id if there's no match on the
# name
nb_roles = Role.count()
nb_roles = pub.role_class.count()
xml_export = xml_export_orig.replace(
b'<item role_id="3">Test Role A</item>', b'<item role_id="3">Test Role C</item>'
)
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
assert wf3.possible_status[0].items[0].by != ['3']
assert Role.count() == nb_roles + 1
assert pub.role_class.count() == nb_roles + 1
# on the other hand, check that it uses the id when included_id is True
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)), include_id=True)
@ -455,7 +454,7 @@ def test_backoffice_info_text(pub):
def test_global_actions(pub):
role = Role()
role = pub.role_class()
role.id = '5'
role.name = 'Test Role'
role.store()
@ -481,7 +480,7 @@ def test_global_actions(pub):
def test_register_comment_to(pub):
role = Role()
role = pub.role_class()
role.id = '5'
role.name = 'Test Role'
role.store()
@ -526,13 +525,13 @@ def test_complex_dispatch_action(pub):
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
Role.wipe()
pub.role_class.wipe()
role1 = Role()
role1 = pub.role_class()
role1.name = 'Test Role 1'
role1.store()
role2 = Role()
role2 = pub.role_class()
role2.name = 'Test Role 2'
role2.store()
@ -550,13 +549,13 @@ def test_complex_dispatch_action(pub):
assert wf2.possible_status[0].items[0].rules == dispatch.rules
assert wf2.possible_status[0].items[0].dispatch_type == 'automatic'
Role.wipe()
pub.role_class.wipe()
role3 = Role()
role3 = pub.role_class()
role3.name = 'Test Role 1'
role3.store()
role4 = Role()
role4 = pub.role_class()
role4.name = 'Test Role 2'
role4.store()
@ -605,9 +604,9 @@ def test_sendmail_other_destination(pub):
st1.items.append(sendmail)
sendmail.parent = st1
Role.wipe()
pub.role_class.wipe()
wf2 = assert_import_export_works(wf)
assert Role.count() == 0
assert pub.role_class.count() == 0
sendmail.to = [
'_submitter',
@ -617,7 +616,7 @@ def test_sendmail_other_destination(pub):
'foobar@localhost',
]
wf2 = assert_import_export_works(wf)
assert Role.count() == 0
assert pub.role_class.count() == 0
assert wf2.possible_status[0].items[0].to == sendmail.to
@ -648,9 +647,9 @@ def test_sms(pub):
st1.items.append(sendsms)
sendsms.parent = st1
Role.wipe()
pub.role_class.wipe()
wf2 = assert_import_export_works(wf)
assert Role.count() == 0
assert pub.role_class.count() == 0
assert wf2.possible_status[0].items[0].to == sendsms.to

View File

@ -47,7 +47,6 @@ from wcs.fields import (
BlockField,
)
from wcs.formdata import Evolution
from wcs.roles import Role
from wcs.workflows import (
Workflow,
WorkflowStatusItem,
@ -426,7 +425,7 @@ def test_check_auth(pub):
user = pub.user_class(name='foo')
user.store()
role = Role(name='bar1')
role = pub.role_class(name='bar1')
role.store()
formdef = FormDef()
@ -475,8 +474,8 @@ def test_dispatch(pub):
formdef.name = 'baz'
formdef.store()
Role.wipe()
role = Role(name='xxx')
pub.role_class.wipe()
role = pub.role_class(name='xxx')
role.store()
item = DispatchWorkflowStatusItem()
@ -516,10 +515,10 @@ def test_dispatch_auto(two_pubs):
item.perform(formdata)
assert not formdata.workflow_roles
Role.wipe()
role1 = Role('xxx1')
two_pubs.role_class.wipe()
role1 = two_pubs.role_class('xxx1')
role1.store()
role2 = Role('xxx2')
role2 = two_pubs.role_class('xxx2')
role2.store()
for variable in ('form_var_foo', '{{form_var_foo}}'):
@ -588,8 +587,8 @@ def test_dispatch_computed(two_pubs):
formdef.name = 'baz'
formdef.store()
Role.wipe()
role = Role(name='xxx')
two_pubs.role_class.wipe()
role = two_pubs.role_class(name='xxx')
role.slug = 'yyy'
role.store()
@ -679,9 +678,9 @@ def test_add_remove_computed_roles(pub):
formdata = formdef.data_class()()
formdata.user_id = user.id
role = Role(name='plop')
role = pub.role_class(name='plop')
role.store()
role2 = Role(name='xxx')
role2 = pub.role_class(name='xxx')
role2.store()
item = AddRoleWorkflowStatusItem()
@ -723,7 +722,7 @@ def test_roles_idp(pub):
user.name_identifiers = ['xxx']
user.store()
role = Role(name='bar1')
role = pub.role_class(name='bar1')
role.store()
formdef = FormDef()
@ -1156,9 +1155,9 @@ def test_register_comment_to(pub):
workflow = Workflow(name='register comment to')
st1 = workflow.add_status('Status1', 'st1')
role = Role(name='foorole')
role = pub.role_class(name='foorole')
role.store()
role2 = Role(name='no-one-role')
role2 = pub.role_class(name='no-one-role')
role2.store()
user = pub.user_class(name='baruser')
user.roles = []
@ -1262,12 +1261,12 @@ def test_email(pub, emails):
user.email = 'zorg@localhost'
user.store()
Role.wipe()
role1 = Role(name='foo')
pub.role_class.wipe()
role1 = pub.role_class(name='foo')
role1.emails = ['foo@localhost']
role1.store()
role2 = Role(name='bar')
role2 = pub.role_class(name='bar')
role2.emails = ['bar@localhost', 'baz@localhost']
role2.store()
@ -2790,7 +2789,7 @@ def test_display_form(two_pubs):
def test_display_form_and_comment(pub):
role = Role(name='bar1')
role = pub.role_class(name='bar1')
role.store()
user = pub.user_class()
@ -2854,7 +2853,7 @@ def test_display_form_migration(pub):
def test_choice_button_no_label(pub):
role = Role(name='bar1')
role = pub.role_class(name='bar1')
role.store()
user = pub.user_class()
@ -2957,9 +2956,9 @@ def test_workflow_display_message_to(pub):
workflow = Workflow(name='display message to')
st1 = workflow.add_status('Status1', 'st1')
role = Role(name='foorole')
role = pub.role_class(name='foorole')
role.store()
role2 = Role(name='no-one-role')
role2 = pub.role_class(name='no-one-role')
role2.store()
user = pub.user_class(name='baruser')
user.roles = []
@ -3049,7 +3048,7 @@ def test_workflow_display_message_line_details(pub):
display_message.position = 'actions'
assert display_message.get_line_details() == 'with actions'
role = Role(name='foorole')
role = pub.role_class(name='foorole')
role.store()
display_message.to = [role.id]
assert display_message.get_line_details() == 'with actions, for foorole'
@ -3062,13 +3061,13 @@ def test_workflow_roles(pub, emails):
user.email = 'zorg@localhost'
user.store()
Role.wipe()
role1 = Role(name='foo')
pub.role_class.wipe()
role1 = pub.role_class(name='foo')
role1.emails = ['foo@localhost']
role1.details = 'Hello World'
role1.store()
role2 = Role(name='bar')
role2 = pub.role_class(name='bar')
role2.emails = ['bar@localhost', 'baz@localhost']
role2.store()
@ -5302,7 +5301,7 @@ def test_workflow_action_condition(two_pubs):
st1 = workflow.add_status('Status1', 'st1')
workflow.store()
role = Role(name='bar1')
role = two_pubs.role_class(name='bar1')
role.store()
user = two_pubs.user_class()
@ -5424,7 +5423,7 @@ def test_notifications(pub, http_requests):
# roles (not exposed in current UI)
http_requests.empty()
role = Role(name='blah')
role = pub.role_class(name='blah')
role.store()
user1 = pub.user_class()
@ -5484,10 +5483,10 @@ def test_workflow_field_migration(pub):
def test_aggregation_email(pub, emails):
Workflow.wipe()
Role.wipe()
pub.role_class.wipe()
AggregationEmail.wipe()
role = Role(name='foobar')
role = pub.role_class(name='foobar')
role.emails = ['foobar@localhost']
role.emails_to_members = False
role.store()

View File

@ -20,6 +20,7 @@ from wcs.qommon import force_str
import wcs
import wcs.wsgi
from wcs import compat
from wcs.roles import Role
from wcs.users import User
from wcs.tracking_code import TrackingCode
import wcs.qommon.emails
@ -77,6 +78,7 @@ def create_temporary_pub(sql_mode=False, templates_mode=False, lazy_mode=False):
if sql_mode:
pub.user_class = sql.SqlUser
pub.role_class = sql.Role
pub.tracking_code_class = sql.TrackingCode
pub.session_class = sql.Session
pub.custom_view_class = sql.CustomView
@ -85,6 +87,7 @@ def create_temporary_pub(sql_mode=False, templates_mode=False, lazy_mode=False):
pub.is_using_postgresql = lambda: True
else:
pub.user_class = User
pub.role_class = Role
pub.tracking_code_class = TrackingCode
pub.session_class = sessions.BasicSession
pub.custom_view_class = custom_views.CustomView
@ -167,6 +170,7 @@ def create_temporary_pub(sql_mode=False, templates_mode=False, lazy_mode=False):
pub.write_cfg()
sql.do_user_table()
sql.do_role_table()
sql.do_tracking_code_table()
sql.do_session_table()
sql.do_custom_views_table()

View File

@ -41,7 +41,7 @@ from wcs.qommon.afterjobs import AfterJob
from wcs.formdef import FormDef, FormdefImportError, FormdefImportRecoverableError, DRAFTS_DEFAULT_LIFESPAN
from wcs.carddef import CardDef
from wcs.categories import Category
from wcs.roles import Role, logged_users_role, get_user_roles
from wcs.roles import logged_users_role, get_user_roles
from wcs.workflows import Workflow
from wcs.forms.root import qrcode
@ -694,7 +694,7 @@ class FormDefPage(Directory):
role_id = self.formdef.workflow_roles.get(wf_role_id)
if role_id:
try:
role = Role.get(role_id)
role = get_publisher().role_class.get(role_id)
role_label = role.name
except KeyError:
# removed role ?
@ -820,7 +820,7 @@ class FormDefPage(Directory):
roles.append(logged_users_role().name)
else:
try:
roles.append(Role.get(x).name)
roles.append(get_publisher().role_class.get(x).name)
except KeyError:
# removed role ?
roles.append(_('Unknown role (%s)') % x)
@ -1784,7 +1784,7 @@ class FormsDirectory(AccessControlled, Directory):
def form_actions(self):
r = TemplateIO(html=True)
has_roles = bool(Role.count())
has_roles = bool(get_publisher().role_class.count())
r += htmltext('<div id="appbar">')
r += htmltext('<h2>%s</h2>') % _('Forms')
if has_roles:
@ -1825,7 +1825,7 @@ class FormsDirectory(AccessControlled, Directory):
def new(self):
get_response().breadcrumb.append(('new', _('New')))
if Role.count() == 0:
if get_publisher().role_class.count() == 0:
return template.error_page('forms', _('You first have to define roles.'))
formdefui = self.formdef_ui_class(None)
form = formdefui.new_form_ui()

View File

@ -24,7 +24,7 @@ from wcs.qommon.form import *
from wcs.qommon.backoffice.menu import html_top
from wcs.roles import Role, get_user_roles
from wcs.roles import get_user_roles
from wcs.formdef import FormDef
@ -32,7 +32,7 @@ class RoleUI(object):
def __init__(self, role):
self.role = role
if self.role is None:
self.role = Role()
self.role = get_publisher().role_class()
def get_form(self):
form = Form(enctype="multipart/form-data")
@ -75,10 +75,10 @@ class RoleUI(object):
if self.role:
role = self.role
else:
role = Role(name=form.get_widget('name').parse())
role = get_publisher().role_class(name=form.get_widget('name').parse())
name = form.get_widget('name').parse()
role_names = [x.name for x in Role.select() if x.id != role.id]
role_names = [x.name for x in get_publisher().role_class.select() if x.id != role.id]
if name in role_names:
form.get_widget('name').set_error(_('This name is already used'))
raise ValueError()
@ -94,7 +94,7 @@ class RolePage(Directory):
def __init__(self, component):
try:
self.role = Role.get(component)
self.role = get_publisher().role_class.get(component)
except KeyError:
raise errors.TraversalError()
self.role_ui = RoleUI(self.role)

View File

@ -58,7 +58,6 @@ from wcs.carddef import CardDef
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.workflows import Workflow, WorkflowImportError
from wcs.roles import Role
from wcs.backoffice.studio import StudioDirectory
from .fields import FieldDefPage, FieldsDirectory
@ -545,7 +544,7 @@ class SettingsDirectory(QommonSettingsDirectory):
)
if enabled('permissions'):
roles = list(Role.select())
roles = list(get_publisher().role_class.select())
if roles:
r += htmltext('<dt><a href="admin-permissions">%s</a></dt> <dd>%s</dd>') % (
_('Admin Permissions'),
@ -689,7 +688,7 @@ class SettingsDirectory(QommonSettingsDirectory):
rows = []
value = []
roles = [x for x in Role.select(order_by='name') if not x.is_internal()]
roles = [x for x in get_publisher().role_class.select(order_by='name') if not x.is_internal()]
for role in roles:
rows.append(role.name)
value.append([role.allows_backoffice_access])
@ -998,7 +997,6 @@ class SettingsDirectory(QommonSettingsDirectory):
z = zipfile.ZipFile(c, 'w')
for d in self.dirs:
if d not in (
'roles',
'categories',
'carddef_categories',
'wscalls',
@ -1055,6 +1053,14 @@ class SettingsDirectory(QommonSettingsDirectory):
os.path.join('blockdefs_xml', str(blockdef.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if 'roles' in self.dirs:
for role in get_publisher().role_class.select():
node = role.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('roles_xml', str(role.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if self.settings:
z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck')

View File

@ -23,7 +23,6 @@ import wcs.qommon.storage as st
from wcs.qommon import errors
from wcs.qommon import misc, get_cfg
from wcs.qommon.backoffice.listing import pagination_links
from wcs.roles import Role
from wcs.qommon import ident
from wcs.qommon.ident.idp import is_idp_managing_user_attributes, is_idp_managing_user_roles
@ -56,7 +55,7 @@ class UserUI(object):
formdef.add_fields_to_form(form, form_data=self.user.form_data)
form.add(CheckboxWidget, 'is_admin', title=_('Administrator Account'), value=self.user.is_admin)
roles = list(Role.select(order_by='name'))
roles = list(get_publisher().role_class.select(order_by='name'))
if len(roles) and not is_idp_managing_user_roles():
form.add(
WidgetList,
@ -187,7 +186,7 @@ class UserPage(Directory):
r += htmltext('<li><strong>%s</strong></li>') % _('Site Administrator')
for k in self.user.roles or []:
try:
r += htmltext('<li>%s</li>') % Role.get(k).name
r += htmltext('<li>%s</li>') % get_publisher().role_class.get(k).name
except KeyError:
# removed role ?
r += htmltext('<li><em>')
@ -332,7 +331,7 @@ class UsersDirectory(Directory):
# optimize query by removing the roles criterias if they are all
# checked
possible_roles = ['admin', 'none']
possible_roles.extend(Role.keys())
possible_roles.extend(get_publisher().role_class.keys())
if set(possible_roles) == set(checked_roles):
checked_roles = None
@ -464,7 +463,7 @@ class UsersDirectory(Directory):
r += htmltext('<input type="hidden" name="filter" value="true"/>')
r += htmltext('<ul>')
roles = [('admin', _('Site Administrator'))]
for role in Role.select():
for role in get_publisher().role_class.select():
roles.append((role.id, role.name))
roles.append(('none', _('None')))

View File

@ -45,7 +45,7 @@ from wcs.carddef import CardDef
from wcs.formdef import FormDef
from wcs.data_sources import NamedDataSource
from wcs.data_sources import get_object as get_data_source_object
from wcs.roles import Role, logged_users_role
from wcs.roles import logged_users_role
from wcs.forms.common import FormStatusPage
import wcs.qommon.storage as st
from wcs.api_utils import sign_url_auto_orig, is_url_signed, get_user_from_api_query_string, get_query_flag
@ -702,7 +702,9 @@ class ApiFormdefsDirectory(Directory):
role_id = formdef_workflow_roles.get(wf_role_id)
if role_id:
try:
workflow_function['role'] = Role.get(role_id).get_json_export_dict()
workflow_function['role'] = (
get_publisher().role_class.get(role_id).get_json_export_dict()
)
except KeyError:
pass
formdict['functions'][wf_role_id] = workflow_function
@ -821,7 +823,7 @@ class ApiUserDirectory(Directory):
user_info = user.get_substitution_variables(prefix='')
del user_info['user']
user_info['id'] = user.id
user_roles = [Role.get(x, ignore_errors=True) for x in user.roles or []]
user_roles = [get_publisher().role_class.get(x, ignore_errors=True) for x in user.roles or []]
user_info['user_roles'] = [x.get_json_export_dict() for x in user_roles if x]
return json.dumps(user_info, cls=misc.JSONEncoder)
@ -960,7 +962,7 @@ class ApiUsersDirectory(Directory):
user_info = user.get_substitution_variables(prefix='')
del user_info['user']
user_info['user_id'] = user.id
user_roles = [Role.get(x, ignore_errors=True) for x in user.roles or []]
user_roles = [get_publisher().role_class.get(x, ignore_errors=True) for x in user.roles or []]
user_info['user_roles'] = [x.get_json_export_dict() for x in user_roles if x]
# add attributes to be usable as datasource
user_info['id'] = user.id
@ -1104,8 +1106,7 @@ class ApiDirectory(Directory):
if not (is_url_signed() or (get_request().user and get_request().user.can_go_in_admin())):
raise AccessForbiddenError('unsigned request or user has no access to backoffice')
list_roles = []
charset = get_publisher().site_charset
for role in Role.select():
for role in get_publisher().role_class.select():
if not role.is_internal():
list_roles.append(role.get_json_export_dict())
get_response().set_content_type('application/json')

View File

@ -26,7 +26,6 @@ from ..qommon.storage import NotEqual, Null
from wcs.carddef import CardDef
from wcs.categories import CardDefCategory
from wcs.roles import Role
from wcs.workflows import Workflow
from wcs.admin.categories import CardDefCategoriesDirectory
from wcs.admin.forms import FormsDirectory, FormDefPage, FormDefUI, html_top, OptionsDirectory
@ -142,7 +141,7 @@ class CardDefPage(FormDefPage):
role_id = self.formdef.workflow_roles.get(wf_role_id)
if role_id:
try:
role = Role.get(role_id)
role = get_publisher().role_class.get(role_id)
role_label = role.name
except KeyError:
# removed role ?

View File

@ -72,7 +72,7 @@ from wcs.admin.settings import UserFieldsFormDef
from wcs.categories import Category
from wcs.formdata import FormData
from wcs.formdef import FormDef
from wcs.roles import logged_users_role, Role
from wcs.roles import logged_users_role
from wcs.variables import LazyFieldVar
from wcs.workflows import template_on_formdata, WorkflowStatusItem
@ -3296,7 +3296,7 @@ class FormBackOfficeStatusPage(FormStatusPage):
acting_role_id = self.filled.get_role_translation(key)
if acting_role_id:
try:
acting_role = Role.get(acting_role_id)
acting_role = get_publisher().role_class.get(acting_role_id)
r += htmltext('<div class="value"><span>%s</span></div>') % acting_role.name
except KeyError:
r += htmltext('<div class="value"><span>%s %s</span></div>') % (

View File

@ -19,7 +19,6 @@ import sys
import json
from quixote import get_publisher
from wcs.roles import Role
from ..qommon.ctl import Command
from ..qommon.publisher import get_cfg
from wcs.admin.settings import UserFieldsFormDef
@ -116,11 +115,11 @@ class CmdHoboNotify(Command):
emails = [force_str(email) for email in o['emails']]
emails_to_members = o['emails_to_members']
# Find existing role
role = Role.resolve(uuid, slug, name)
role = get_publisher().role_class.resolve(uuid, slug, name)
if not role:
if action != 'provision':
continue
role = Role(id=uuid)
role = get_publisher().role_class(id=uuid)
if action == 'provision':
# Provision/rename
role.name = name
@ -138,7 +137,7 @@ class CmdHoboNotify(Command):
role.remove_self()
# All roles have been sent
if full and action == 'provision':
for role in Role.select(ignore_errors=True):
for role in get_publisher().role_class.select(ignore_errors=True):
if role and role.uuid not in uuids:
role.remove_self()
@ -186,7 +185,7 @@ class CmdHoboNotify(Command):
user.is_admin = o.get('is_superuser', False)
user.roles = []
for role_ref in o.get('roles', []):
role = Role.resolve(role_ref['uuid'])
role = get_publisher().role_class.resolve(role_ref['uuid'])
if role and role.id not in user.roles:
user.add_roles([role.id])
user.set_attributes_from_formdata(user.form_data)
@ -203,7 +202,7 @@ class CmdHoboNotify(Command):
users = User.get_users_with_name_identifier(o['uuid'])
for user in users:
user.set_deleted()
except Exception as e:
except Exception:
publisher.notify_of_exception(sys.exc_info(), context='[PROVISIONNING]')

View File

@ -33,7 +33,6 @@ from .qommon.publisher import get_cfg
from .qommon.substitution import Substitutions, invalidate_substitution_cache
from .qommon.template import Template
from .roles import Role
from .fields import FileField
@ -698,7 +697,7 @@ class FormData(StorableObject):
def get_handling_role(self):
try:
return Role.get(self.get_handling_role_id())
return get_publisher().role_class.get(self.get_handling_role_id())
except KeyError:
return None
@ -844,7 +843,7 @@ class FormData(StorableObject):
for role_type, role_id in workflow_roles.items():
prefix = 'form_role_%s_' % role_type.replace('-', '_').strip('_')
try:
d.update(Role.get(role_id).get_substitution_variables(prefix))
d.update(get_publisher().role_class.get(role_id).get_substitution_variables(prefix))
except KeyError:
pass
@ -1206,7 +1205,7 @@ class FormData(StorableObject):
# exclude special _submitter value
role_list = [x for x in data['roles'][role_key] if x != '_submitter']
# get role objects
role_list = [Role.get(x, ignore_errors=True) for x in role_list]
role_list = [get_publisher().role_class.get(x, ignore_errors=True) for x in role_list]
# export as json dicts
role_list = [x.get_json_export_dict() for x in role_list if x is not None]
data['roles'][role_key] = role_list

View File

@ -41,7 +41,7 @@ from .qommon.substitution import Substitutions
from .qommon.publisher import get_publisher_class
from .formdata import FormData
from .roles import Role, logged_users_role
from .roles import logged_users_role
from .categories import Category
from . import fields
from . import data_sources
@ -986,7 +986,7 @@ class FormDef(StorableObject):
role = force_text(role_id, charset)
else:
try:
role = force_text(Role.get(role_id).name, charset)
role = force_text(get_publisher().role_class.get(role_id).name, charset)
except KeyError:
role = force_text(role_id, charset)
sub = ET.SubElement(roles, 'role')
@ -1004,7 +1004,7 @@ class FormDef(StorableObject):
role = force_text(role_id, charset)
else:
try:
role = force_text(Role.get(role_id).name, charset)
role = force_text(get_publisher().role_class.get(role_id).name, charset)
except KeyError:
role = force_text(role_id, charset)
sub = ET.SubElement(roles, 'role')
@ -1198,11 +1198,11 @@ class FormDef(StorableObject):
role_id = value
elif include_id:
role_id = role_node.attrib.get('role_id')
if role_id and not Role.has_key(role_id):
if role_id and not get_publisher().role_class.get(role_id, ignore_errors=True):
role_id = None
if not role_id:
for role in Role.select(ignore_errors=True):
for role in get_publisher().role_class.select(ignore_errors=True):
if role.name == value:
role_id = role.id
break

View File

@ -55,6 +55,7 @@ from . import custom_views
from . import sessions
from .qommon.cron import CronJob
from .roles import Role
from .users import User
from .tracking_code import TrackingCode
@ -157,6 +158,7 @@ class WcsPublisher(StubWcsPublisher):
from . import sql
self.user_class = sql.SqlUser
self.role_class = sql.Role
self.tracking_code_class = sql.TrackingCode
self.session_class = sql.Session
self.custom_view_class = sql.CustomView
@ -165,6 +167,7 @@ class WcsPublisher(StubWcsPublisher):
sql.get_connection(new=True)
else:
self.user_class = User
self.role_class = Role
self.tracking_code_class = TrackingCode
self.session_class = sessions.BasicSession
self.custom_view_class = custom_views.CustomView
@ -218,7 +221,13 @@ class WcsPublisher(StubWcsPublisher):
for f in z.namelist():
if '.indexes' in f:
continue
if os.path.dirname(f) in ('formdefs_xml', 'carddefs_xml', 'workflows_xml', 'blockdefs_xml'):
if os.path.dirname(f) in (
'formdefs_xml',
'carddefs_xml',
'workflows_xml',
'blockdefs_xml',
'roles_xml',
):
continue
path = os.path.join(self.app_dir, f)
if not os.path.exists(os.path.dirname(path)):
@ -284,6 +293,15 @@ class WcsPublisher(StubWcsPublisher):
carddefs.append(carddef)
results['carddefs'] += 1
# sixth pass, roles
roles = []
for f in z.namelist():
if os.path.dirname(f) == 'roles_xml' and os.path.basename(f):
role = self.role_class.import_from_xml(z.open(f), include_id=True)
role.store()
roles.append(role)
results['roles'] += 1
# rebuild indexes for imported objects
for k, v in results.items():
if k == 'settings':
@ -306,9 +324,7 @@ class WcsPublisher(StubWcsPublisher):
klass = Category
elif k == 'roles':
from .roles import Role
klass = Role
klass = self.role_class
elif k == 'workflows':
from wcs.workflows import Workflow
@ -335,6 +351,7 @@ class WcsPublisher(StubWcsPublisher):
sql.get_connection(new=True)
sql.do_session_table()
sql.do_user_table()
sql.do_role_table()
sql.do_tracking_code_table()
sql.do_custom_views_table()
sql.do_snapshots_table()

View File

@ -36,7 +36,6 @@ from . import misc
from .publisher import get_cfg, get_logger
from . import _, force_str
from .template import error_page
from wcs.roles import Role
from . import errors
@ -495,7 +494,7 @@ class Saml2Directory(Directory):
# point roles in authentic where provisionned from w.c.s. and join
# was done on the slug field.
for uuid in m['role-slug']:
role = Role.resolve(uuid)
role = get_publisher().role_class.resolve(uuid)
if not role:
logger.warning('role uuid %s is unknown', uuid)
continue

View File

@ -27,6 +27,7 @@ import urllib.parse
import pyproj
from pyproj import Geod
from quixote import get_publisher
try:
import langdetect
@ -44,7 +45,6 @@ from wcs.qommon import calendar
from wcs.qommon import evalutils
from wcs.qommon import tokens
from wcs.qommon.admin.texts import TextsDirectory
from wcs.roles import Role
register = template.Library()
@ -704,7 +704,7 @@ def has_role(user, role_name):
return False
for role_id in user.get_roles():
try:
if role_name == Role.get(role_id).name:
if role_name == get_publisher().role_class.get(role_id).name:
return True
except KeyError: # role has been deleted
pass
@ -717,7 +717,7 @@ def roles(user):
# do not fail on non-user objects, just return empty list
return []
role_ids = user.get_roles()
roles = [Role.get(x, ignore_errors=True, ignore_migration=True) for x in role_ids]
roles = [get_publisher().role_class.get(x, ignore_errors=True, ignore_migration=True) for x in role_ids]
return [x.name for x in roles if x]

View File

@ -14,6 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import xml.etree.ElementTree as ET
from django.utils.encoding import force_text
from quixote import get_publisher
@ -24,6 +26,7 @@ from .qommon.storage import StorableObject
class Role(StorableObject):
_names = 'roles'
_indexes = ['uuid', 'slug']
xml_root_node = 'role'
name = None
uuid = None
@ -34,6 +37,9 @@ class Role(StorableObject):
emails_to_members = False
allows_backoffice_access = True
TEXT_ATTRIBUTES = ['name', 'uuid', 'slug', 'details', 'emails']
BOOLEAN_ATTRIBUTES = ['internal', 'emails_to_members', 'allows_backoffice_access']
def __init__(self, name=None, id=None):
StorableObject.__init__(self, id=id)
self.name = name
@ -98,6 +104,59 @@ class Role(StorableObject):
'id': self.id,
}
def export_to_xml(self, include_id=False):
charset = get_publisher().site_charset
root = ET.Element(self.xml_root_node)
if include_id and self.id:
root.attrib['id'] = str(self.id)
for text_attribute in list(self.TEXT_ATTRIBUTES):
if not hasattr(self, text_attribute) or not getattr(self, text_attribute):
continue
ET.SubElement(root, text_attribute).text = force_text(getattr(self, text_attribute), charset)
for boolean_attribute in self.BOOLEAN_ATTRIBUTES:
if not hasattr(self, boolean_attribute):
continue
value = getattr(self, boolean_attribute)
if value:
value = 'true'
else:
value = 'false'
ET.SubElement(root, boolean_attribute).text = value
return root
@classmethod
def import_from_xml(cls, fd, charset=None, include_id=False):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
if charset is None:
charset = get_publisher().site_charset
assert charset == 'utf-8'
role = cls()
# if the tree we get is actually a ElementTree for real, we get its
# root element and go on happily.
if not ET.iselement(tree):
tree = tree.getroot()
if include_id and tree.attrib.get('id'):
role.id = tree.attrib.get('id')
for text_attribute in list(cls.TEXT_ATTRIBUTES):
value = tree.find(text_attribute)
if value is None or value.text is None:
continue
setattr(role, text_attribute, misc.xml_node_text(value))
for boolean_attribute in cls.BOOLEAN_ATTRIBUTES:
value = tree.find(boolean_attribute)
if value is None:
continue
setattr(role, boolean_attribute, value.text == 'true')
return role
@classmethod
def resolve(cls, uuid, slug=None, name=None):
try:
@ -132,5 +191,11 @@ def logged_users_role():
def get_user_roles():
t = sorted([(misc.simplify(x.name), x.id, x.name, x.id) for x in Role.select() if not x.is_internal()])
t = sorted(
[
(misc.simplify(x.name), x.id, x.name, x.id)
for x in get_publisher().role_class.select()
if not x.is_internal()
]
)
return [x[1:] for x in t]

View File

@ -46,6 +46,7 @@ import wcs.carddata
import wcs.custom_views
import wcs.formdata
import wcs.logged_errors
import wcs.roles
import wcs.snapshots
import wcs.tracking_code
import wcs.users
@ -802,6 +803,57 @@ def do_user_table():
cur.close()
def do_role_table(concurrently=False):
conn, cur = get_connection_and_cursor()
table_name = 'roles'
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = %s''',
(table_name,),
)
if cur.fetchone()[0] == 0:
cur.execute(
'''CREATE TABLE %s (id VARCHAR PRIMARY KEY,
name VARCHAR,
uuid UUID,
slug VARCHAR UNIQUE,
internal BOOLEAN,
details VARCHAR,
emails VARCHAR[],
emails_to_members BOOLEAN,
allows_backoffice_access BOOLEAN)'''
% table_name
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = %s''',
(table_name,),
)
existing_fields = set([x[0] for x in cur.fetchall()])
needed_fields = set([x[0] for x in Role._table_static_fields])
# delete obsolete fields
for field in existing_fields - needed_fields:
cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
conn.commit()
cur.close()
def migrate_legacy_roles():
# store old pickle roles in SQL
from wcs import roles
for role_id in roles.Role.keys():
role = roles.Role.get(role_id)
role.__class__ = Role
role.store()
def do_tracking_code_table():
conn, cur = get_connection_and_cursor()
table_name = 'tracking_codes'
@ -1401,8 +1453,8 @@ class SqlMixin(object):
@classmethod
@guard_postgres
def get(cls, id, ignore_errors=False, ignore_migration=False):
if cls._numerical_id or id is None:
def get(cls, id, ignore_errors=False, ignore_migration=False, column=None):
if column is None and (cls._numerical_id or id is None):
try:
int(id)
except (TypeError, ValueError):
@ -1414,11 +1466,12 @@ class SqlMixin(object):
sql_statement = '''SELECT %s
FROM %s
WHERE id = %%(id)s''' % (
WHERE %s = %%(value)s''' % (
', '.join([x[0] for x in cls._table_static_fields] + cls.get_data_fields()),
cls._table_name,
column or 'id',
)
cur.execute(sql_statement, {'id': str(id)})
cur.execute(sql_statement, {'value': str(id)})
row = cur.fetchone()
if row is None:
cur.close()
@ -1428,6 +1481,11 @@ class SqlMixin(object):
cur.close()
return cls._row2ob(row)
@classmethod
@guard_postgres
def get_on_index(cls, value, index, ignore_errors=False, **kwargs):
return cls.get(value, ignore_errors=ignore_errors, column=index)
@classmethod
@guard_postgres
def get_ids(cls, ids, ignore_errors=False, keep_order=False, fields=None, order_by=None):
@ -2447,6 +2505,93 @@ class SqlUser(SqlMixin, wcs.users.User):
return objects
class Role(SqlMixin, wcs.roles.Role):
_table_name = 'roles'
_table_static_fields = [
('id', 'varchar'),
('name', 'varchar'),
('uuid', 'uuid'),
('slug', 'varchar'),
('internal', 'boolean'),
('details', 'varchar'),
('emails', 'varchar[]'),
('emails_to_members', 'boolean'),
('allows_backoffice_access', 'boolean'),
]
_numerical_id = False
@guard_postgres
def store(self):
if self.slug is None:
# set slug if it's not yet there
self.slug = self.get_new_slug()
sql_dict = {
'id': self.id,
'name': self.name,
'uuid': self.uuid,
'slug': self.slug,
'internal': self.internal,
'details': self.details,
'emails': self.emails,
'emails_to_members': self.emails_to_members,
'allows_backoffice_access': self.allows_backoffice_access,
}
conn, cur = get_connection_and_cursor()
column_names = sql_dict.keys()
if not self.id:
sql_dict['id'] = self.get_new_id()
sql_statement = '''INSERT INTO %s (%s)
VALUES (%s)
RETURNING id''' % (
self._table_name,
', '.join(column_names),
', '.join(['%%(%s)s' % x for x in column_names]),
)
while True:
try:
cur.execute(sql_statement, sql_dict)
except psycopg2.IntegrityError:
conn.rollback()
sql_dict['id'] = self.get_new_id()
else:
break
self.id = str_encode(cur.fetchone()[0])
else:
sql_statement = '''UPDATE %s SET %s WHERE id = %%(id)s RETURNING id''' % (
self._table_name,
', '.join(['%s = %%(%s)s' % (x, x) for x in column_names]),
)
cur.execute(sql_statement, sql_dict)
if cur.fetchone() is None:
sql_statement = '''INSERT INTO %s (%s) VALUES (%s)''' % (
self._table_name,
', '.join(column_names),
', '.join(['%%(%s)s' % x for x in column_names]),
)
cur.execute(sql_statement, sql_dict)
conn.commit()
cur.close()
@classmethod
def get_data_fields(cls):
return []
@classmethod
def _row2ob(cls, row, **kwargs):
o = cls()
for field, value in zip(cls._table_static_fields, tuple(row)):
if field[1] in ('varchar', 'varchar[]'):
setattr(o, field[0], str_encode(value))
else:
setattr(o, field[0], value)
return o
class Session(SqlMixin, wcs.sessions.BasicSession):
_table_name = 'sessions'
_table_static_fields = [
@ -3152,7 +3297,7 @@ def get_period_total(
# latest migration, number + description (description is not used
# programmaticaly but will make sure git conflicts if two migrations are
# separately added with the same number)
SQL_LEVEL = (48, 'remove acked attribute from LoggedError')
SQL_LEVEL = (49, 'Role migration')
def migrate_global_views(conn, cur):
@ -3323,6 +3468,10 @@ def migrate():
# 47: store LoggedErrors in SQL
# 48: remove acked attribute from LoggedError
do_loggederrors_table()
if sql_level < 49:
# 49: store Role in SQL
do_role_table()
migrate_legacy_roles()
cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''', (str(SQL_LEVEL[0]), 'sql_level'))

View File

@ -16,6 +16,8 @@
import datetime
from quixote import get_publisher
from .qommon import _, N_
from .qommon.misc import simplify
from .qommon.storage import StorableObject
@ -134,11 +136,10 @@ class User(StorableObject):
return True
if self.anonymous:
return False
from .roles import Role
for role_id in self.roles or []:
try:
role = Role.get(role_id)
role = get_publisher().role_class.get(role_id)
if role.allows_backoffice_access:
return True
except KeyError: # role has been deleted

View File

@ -493,8 +493,6 @@ class LazyFormData(LazyFormDef):
@property
def role(self):
from wcs.roles import Role
workflow_roles = {}
if self._formdef.workflow_roles:
workflow_roles.update(self._formdef.workflow_roles)
@ -505,7 +503,7 @@ class LazyFormData(LazyFormDef):
for role_type, role_id in workflow_roles.items():
prefix = '%s_' % role_type.replace('-', '_').strip('_')
try:
d.update(Role.get(role_id).get_substitution_variables(prefix))
d.update(get_publisher().role_class.get(role_id).get_substitution_variables(prefix))
except KeyError:
pass
return d

View File

@ -22,7 +22,6 @@ from ..qommon.cron import CronJob
from ..qommon import emails
from wcs.workflows import WorkflowStatusItem, register_item_class
from wcs.roles import Role
class AggregationEmailWorkflowStatusItem(WorkflowStatusItem):
@ -110,7 +109,7 @@ def send_aggregation_emails(publisher):
aggregate.remove_self()
try:
role = Role.get(aggregate_id)
role = get_publisher().role_class.get(aggregate_id)
except KeyError:
continue
if not role.get_emails():

View File

@ -22,7 +22,7 @@ from quixote.html import htmltext
from ..qommon import _, N_
from ..qommon.form import *
from ..qommon.template import Template
from wcs.roles import Role, get_user_roles
from wcs.roles import get_user_roles
from wcs.workflows import XmlSerialisable, WorkflowStatusItem, register_item_class, get_role_name
@ -232,7 +232,7 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
break
if new_role_id:
if not Role.has_key(new_role_id):
if not get_publisher().role_class.get(new_role_id, ignore_errors=True):
get_publisher().record_error(
_('error in dispatch, missing role (%s)') % new_role_id, formdata=formdata
)

View File

@ -23,7 +23,6 @@ from ..qommon.form import *
from ..qommon.template import TemplateError
from ..qommon import get_logger
from wcs.roles import Role
from wcs.workflows import WorkflowStatusItem, register_item_class, template_on_formdata
from .wscall import WebserviceCallStatusItem
@ -156,7 +155,7 @@ class SendNotificationWorkflowStatusItem(WebserviceCallStatusItem):
dest = formdata.get_role_translation(dest)
try:
role = Role.get(dest)
role = get_publisher().role_class.get(dest)
except KeyError:
continue
users.extend(get_publisher().user_class.get_users_with_role(role.id))

View File

@ -22,7 +22,7 @@ from quixote import get_request, get_publisher, get_response
from ..qommon import _, N_
from ..qommon.form import *
from wcs.workflows import WorkflowStatusItem, register_item_class
from wcs.roles import get_user_roles, Role
from wcs.roles import get_user_roles
from ..qommon.ident.idp import is_idp_managing_user_attributes
from ..qommon.misc import http_post_request, http_delete_request
from ..qommon.publisher import get_cfg, get_logger
@ -99,7 +99,7 @@ class AddRoleWorkflowStatusItem(WorkflowStatusItem):
request._user = user
def perform_idp(self, user, formdata, role_id):
role = Role.get(role_id)
role = get_publisher().role_class.get(role_id)
role_uuid = role.uuid or role.slug
user_uuid = user.name_identifiers[0]
try:
@ -169,7 +169,7 @@ class RemoveRoleWorkflowStatusItem(WorkflowStatusItem):
request._user = user
def perform_idp(self, user, formdata, role_id):
role = Role.get(role_id)
role = get_publisher().role_class.get(role_id)
role_uuid = role.uuid or role.slug
user_uuid = user.name_identifiers[0]
try:

View File

@ -27,7 +27,7 @@ import uuid
from django.utils.encoding import force_text
from quixote import get_request, get_response, redirect
from quixote import get_request, get_response, redirect, get_publisher
from .qommon import _, N_, force_str
from .qommon.misc import C_, get_as_datetime, file_digest, get_foreground_colour, xml_node_text
@ -41,7 +41,7 @@ from .qommon.template import Template, TemplateError
from .qommon.upload_storage import PicklableUpload, get_storage_object
from .conditions import Condition
from .roles import Role, logged_users_role, get_user_roles
from .roles import logged_users_role, get_user_roles
from .fields import FileField
from .formdef import FormDef, FormdefImportError
from .carddef import CardDef
@ -1022,13 +1022,13 @@ class XmlSerialisable(object):
# if we import using id, look at the role_id attribute
if include_id and 'role_id' in elem.attrib:
role_id = force_str(elem.attrib['role_id'])
if Role.has_key(role_id):
if get_publisher().role_class.get(role_id, ignore_errors=True):
return role_id
if WorkflowStatusItem.get_expression(role_id)['type'] in ('python', 'template'):
return role_id
# if not using id, look up on the name
for role in Role.select(ignore_errors=True):
for role in get_publisher().role_class.select(ignore_errors=True):
if role.name == value:
return role.id
@ -1042,7 +1042,7 @@ class XmlSerialisable(object):
raise WorkflowImportError(N_('Unknown referenced role (%s)'), (value,))
# and if there's no match, create a new role
role = Role()
role = get_publisher().role_class()
role.name = value
role.store()
return role.id
@ -2125,14 +2125,14 @@ class WorkflowStatusItem(XmlSerialisable):
def get_computed_role_id(self, role_id):
new_role_id = self.compute(str(role_id))
if Role.has_key(new_role_id):
if get_publisher().role_class.get(new_role_id, ignore_errors=True):
return new_role_id
# computed value, not an id, try to get role by slug
new_role = Role.get_on_index(new_role_id, 'slug', ignore_errors=True)
new_role = get_publisher().role_class.get_on_index(new_role_id, 'slug', ignore_errors=True)
if new_role:
return new_role.id
# fallback to role label
for role in Role.select():
for role in get_publisher().role_class.select():
if role.name == new_role_id:
return role.id
return None
@ -2396,7 +2396,7 @@ def get_role_translation_label(workflow, role_id):
return workflow.roles.get(role_id)
else:
try:
return Role.get(role_id).name
return get_publisher().role_class.get(role_id).name
except KeyError:
return
@ -2406,7 +2406,7 @@ def get_role_name(role_id, charset=None):
if role_id.startswith('_') or role_id == 'logged-users':
return force_text(role_id, charset)
try:
return force_text(Role.get(role_id).name, charset)
return force_text(get_publisher().role_class.get(role_id).name, charset)
except KeyError:
return force_text(role_id, charset)
@ -2943,7 +2943,7 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
dest = formdata.get_role_translation(dest)
try:
role = Role.get(dest)
role = get_publisher().role_class.get(dest)
except KeyError:
continue
addresses.extend(role.get_emails())