general: always store&use object id as strings (#6222)

This commit is contained in:
Frédéric Péters 2014-12-31 10:08:18 +01:00
parent a2ccc22baa
commit 5178ee8b4e
16 changed files with 165 additions and 71 deletions

View File

@ -1,6 +1,7 @@
import os
import shutil
import StringIO
import time
try:
import lasso
@ -356,7 +357,7 @@ def test_form_workflow_role():
resp = resp.click('change', href='role/_receiver')
resp.forms[0]['role_id'] = 'foobar'
resp = resp.forms[0].submit('submit')
assert FormDef.get(1).workflow_roles == {'_receiver': 1}
assert FormDef.get(1).workflow_roles == {'_receiver': '1'}
def test_form_workflow_options():
create_superuser()
@ -721,6 +722,57 @@ def test_form_edit_field_advanced():
assert resp.body.index('<legend>Additional parameters</legend>') > \
resp.body.index('<label for="form_data_source">External Data Source</label>')
def test_form_legacy_int_id():
create_superuser()
create_role()
Category.wipe()
cat = Category(name='Foo')
cat.store()
Workflow.wipe()
workflow = Workflow(name='Workflow One')
# create it with a single status
workflow.possible_status = [Workflow.get_default_workflow().possible_status[-1]]
workflow.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'form title'
formdef.fields = []
role = Role(name='ZAB') # Z to get sorted last
role.store()
# set attributes using integers
formdef.category_id = int(cat.id)
formdef.workflow_id = int(workflow.id)
formdef.workflow_roles = {'_receiver': int(role.id)}
formdef.roles = ['logged-users', int(role.id)]
formdef.store()
formdef = FormDef.get(formdef.id) # will run migrate
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/')
resp = resp.click(href='category')
assert resp.forms[0]['category_id'].value
resp = app.get('/backoffice/forms/1/')
resp = resp.click('change', href='workflow')
assert resp.forms[0]['workflow_id'].value
resp = app.get('/backoffice/forms/1/')
resp = resp.click('change', href='roles')
assert resp.forms[0]['roles$element0'].value == 'Logged Users'
assert resp.forms[0]['roles$element1'].value == 'ZAB'
resp = app.get('/backoffice/forms/1/')
resp = resp.click('change', href='role/_receiver')
assert resp.forms[0]['role_id'].value == 'ZAB'
def test_workflows():
app = login(get_app(pub))
app.get('/backoffice/workflows/')
@ -978,7 +1030,7 @@ def test_users_edit():
create_superuser()
user = pub.user_class(name='foo bar')
user.store()
assert user.id == 2
assert user.id == '2'
app = login(get_app(pub))
resp = app.get('/backoffice/users/2/')
@ -994,7 +1046,7 @@ def test_users_edit_new_account():
create_superuser()
user = pub.user_class(name='foo bar')
user.store()
assert user.id == 2
assert user.id == '2'
account_count = PasswordAccount.count()
app = login(get_app(pub))
@ -1385,12 +1437,12 @@ def test_categories_reorder():
resp = app.get('/backoffice/categories/update_order?order=1;2;3;')
categories = Category.select()
Category.sort_by_position(categories)
assert [x.id for x in categories] == [1, 2, 3]
assert [x.id for x in categories] == ['1', '2', '3']
resp = app.get('/backoffice/categories/update_order?order=3;1;2;')
categories = Category.select()
Category.sort_by_position(categories)
assert [x.id for x in categories] == [3, 1, 2]
assert [x.id for x in categories] == ['3', '1', '2']
def test_settings():
app = login(get_app(pub))

View File

@ -26,7 +26,6 @@ def teardown_module(module):
def test_store():
Category.wipe()
test = Category()
test.id = 1
test.name = 'Test'
test.description = 'Hello world'
test.store()
@ -39,7 +38,6 @@ def test_store():
def test_urlname():
Category.wipe()
test = Category()
test.id = 1
test.name = 'Test'
test.description = 'Hello world'
test.store()
@ -53,7 +51,6 @@ def test_sort_positions():
categories = []
for i in range(10):
test = Category()
test.id = i+1
test.name = 'Test %s' % i
test.position = 10-i
categories.append(test)
@ -84,7 +81,6 @@ def test_xml_export():
def test_xml_import():
Category.wipe()
test = Category()
test.id = 1
test.name = 'Test'
test.description = 'Hello world'
test.store()
@ -101,7 +97,7 @@ def test_load_old_pickle():
Category.wipe()
test = Category()
test.id = 1
test.id = '1'
test.name = 'Test'
test.description = 'Hello world'

View File

@ -64,7 +64,7 @@ def test_home(pub):
def test_home_category(pub):
formdef = create_formdef()
formdef.category_id = 1
formdef.category_id = '1'
formdef.store()
home = get_app(pub).get('/')
assert 'category-foobar' in home.body

View File

@ -196,7 +196,7 @@ def test_assertion_consumer_existing_federation():
saml2 = Saml2Directory()
assert req.session.user is None
body = saml2.assertionConsumerPost()
assert req.session.user == 17
assert req.session.user == '17'
def test_assertion_consumer_redirect_after_url():
setup_environment(pub)

View File

@ -38,7 +38,7 @@ def test_store():
test.value = 'value'
test.unique_value = 'unique-value'
test.store()
assert test.id == 1
assert test.id == '1'
def test_get():
test = Foobar.get(1)
@ -209,6 +209,18 @@ def test_select():
assert len(Foobar.select([st.Contains('unique_value', [24, 25, 86])])) == 2
assert len(Foobar.select([st.NotContains('unique_value', [24, 25, 86])])) == 48
def test_select_order_by():
Foobar.wipe()
for x in range(1, 51):
test = Foobar()
test.unique_value = 51-x
test.store()
assert [int(x.id) for x in Foobar.select(order_by='id')] == range(1, 51)
assert [int(x.id) for x in Foobar.select(order_by='-id')] == range(50, 0, -1)
assert [int(x.id) for x in Foobar.select(order_by='unique_value')] == range(50, 0, -1)
assert [int(x.id) for x in Foobar.select(order_by='-unique_value')] == range(1, 51)
def test_select_datetime():
Foobar.wipe()
@ -237,10 +249,10 @@ def test_select_limit_offset():
test.store()
assert len(Foobar.select()) == 50
assert [x.id for x in Foobar.select(order_by='id', limit=10)] == range(1, 11)
assert [x.id for x in Foobar.select(order_by='id', limit=10, offset=10)] == range(11, 21)
assert [x.id for x in Foobar.select(order_by='id', limit=20, offset=20)] == range(21, 41)
assert [x.id for x in Foobar.select(order_by='id', offset=10)] == range(11, 51)
assert [int(x.id) for x in Foobar.select(order_by='id', limit=10)] == range(1, 11)
assert [int(x.id) for x in Foobar.select(order_by='id', limit=10, offset=10)] == range(11, 21)
assert [int(x.id) for x in Foobar.select(order_by='id', limit=20, offset=20)] == range(21, 41)
assert [int(x.id) for x in Foobar.select(order_by='id', offset=10)] == range(11, 51)
def test_select_criteria_overlaps():
@ -267,12 +279,13 @@ def test_select_criteria_overlaps():
def test_count():
Foobar.wipe()
for x in range(1, 51):
for x in range(50):
test = Foobar()
test.value = x + 1
test.store()
assert Foobar.count() == 50
assert Foobar.count([st.Less('id', 26)]) == 25
assert Foobar.count([st.Less('value', 26)]) == 25
def test_select_criteria_or_and():
@ -280,15 +293,16 @@ def test_select_criteria_or_and():
for x in range(50):
test = Foobar()
test.value = x+1
test.store()
assert len(Foobar.select()) == 50
assert [x.id for x in Foobar.select([st.Or([st.Less('id', 10)])], order_by='id')] == range(1, 10)
assert [x.id for x in Foobar.select([st.Or([
st.Less('id', 10), st.Equal('id', 15)])], order_by='id')] == range(1, 10) + [15]
assert [x.id for x in Foobar.select([st.And([st.Less('id', 10),
st.Greater('id', 5)])], order_by='id')] == range(6, 10)
assert [int(x.id) for x in Foobar.select([st.Or([st.Less('value', 10)])], order_by='id')] == range(1, 10)
assert [int(x.id) for x in Foobar.select([st.Or([
st.Less('value', 10), st.Equal('value', 15)])], order_by='value')] == range(1, 10) + [15]
assert [int(x.id) for x in Foobar.select([st.And([st.Less('value', 10),
st.Greater('value', 5)])], order_by='id')] == range(6, 10)
def test_select_criteria_ilike():
@ -307,8 +321,8 @@ def test_select_criteria_ilike():
assert len(Foobar.select()) == 50
assert [x.id for x in Foobar.select([st.ILike('foo', 'bar')], order_by='id')] == range(21, 50)
assert [x.id for x in Foobar.select([st.ILike('foo', 'BAR')], order_by='id')] == range(21, 50)
assert [int(x.id) for x in Foobar.select([st.ILike('foo', 'bar')], order_by='id')] == range(21, 50)
assert [int(x.id) for x in Foobar.select([st.ILike('foo', 'BAR')], order_by='id')] == range(21, 50)
def test_store_async():
Foobar.wipe()
@ -317,7 +331,7 @@ def test_store_async():
test.value = 'value'
test.unique_value = 'unique-value'
test.store(async=True)
assert test.id == 1
assert test.id == '1'
t0 = time.time()
while not Foobar.keys():
time.sleep(0.1)
@ -343,7 +357,7 @@ def test_reversed_order():
test.store()
assert len(Foobar.select()) == 50
assert [x.id for x in Foobar.select(order_by='-id', limit=10)] == range(50, 40, -1)
assert [int(x.id) for x in Foobar.select(order_by='-id', limit=10)] == range(50, 40, -1)
def test_destroy_rebuild_index():
test_get_with_indexed_value()

View File

@ -67,7 +67,7 @@ def test_action_dispatch():
st1 = wf.add_status('Status1', 'st1')
role = Role()
role.id = 5
role.id = '5'
role.name = 'Test Role'
role.store()
@ -84,7 +84,7 @@ def test_action_dispatch():
wf2 = assert_import_export_works(wf)
# checks role id is imported as integer
assert(wf2.possible_status[0].items[0].role_id == 5)
assert(wf2.possible_status[0].items[0].role_id == '5')
def test_status_actions_named_role():
@ -103,7 +103,7 @@ def test_status_actions_named_role():
def test_status_actions_named_existing_role():
role = Role()
role.id = 2
role.id = '2'
role.name = 'Test Role named existing role'
role.store()
@ -119,7 +119,7 @@ def test_status_actions_named_existing_role():
wf2 = assert_import_export_works(wf)
assert '<item role_id="2">Test Role named existing role</item>' in ET.tostring(indent(wf.export_to_xml()))
assert wf2.possible_status[0].items[0].by == [2]
assert wf2.possible_status[0].items[0].by == ['2']
# check that it works even if the role_id is not set
xml_export_orig = ET.tostring(export_to_indented_xml(wf))
@ -127,17 +127,17 @@ def test_status_actions_named_existing_role():
'<item role_id="2">Test Role named existing role</item>',
'<item>Test Role named existing role</item>')
wf3 = Workflow.import_from_xml_tree(ET.parse(StringIO.StringIO(xml_export)))
assert wf3.possible_status[0].items[0].by == [2]
assert wf3.possible_status[0].items[0].by == ['2']
def test_status_actions_named_missing_role():
role = Role()
role.id = 3
role.id = '3'
role.name = 'Test Role A'
role.store()
role = Role()
role.id = 4
role.id = '4'
role.name = 'Test Role B'
role.store()
@ -159,7 +159,7 @@ def test_status_actions_named_missing_role():
xml_export = xml_export_orig.replace('<item role_id="3">Test Role A</item>',
'<item role_id="4">Test Role A</item>')
wf3 = Workflow.import_from_xml_tree(ET.parse(StringIO.StringIO(xml_export)))
assert wf3.possible_status[0].items[0].by == [3]
assert wf3.possible_status[0].items[0].by == ['3']
# check that it creates a new role if there's no match on id and name
xml_export = xml_export_orig.replace('<item role_id="3">Test Role A</item>',
@ -174,13 +174,13 @@ def test_status_actions_named_missing_role():
xml_export = xml_export_orig.replace('<item role_id="3">Test Role A</item>',
'<item role_id="3">Test Role C</item>')
wf3 = Workflow.import_from_xml_tree(ET.parse(StringIO.StringIO(xml_export)))
assert wf3.possible_status[0].items[0].by != [3]
assert wf3.possible_status[0].items[0].by != ['3']
assert Role.count() == nb_roles+1
# on the other hand, check that it uses the id when included_id is True
wf3 = Workflow.import_from_xml_tree(ET.parse(StringIO.StringIO(xml_export)),
include_id=True)
assert wf3.possible_status[0].items[0].by == [3]
assert wf3.possible_status[0].items[0].by == ['3']
def test_display_form_action():

View File

@ -341,18 +341,9 @@ class UsersDirectory(Directory):
if 'none' in checked_roles:
criterias.append(st.And([
st.Equal('is_admin', False), st.Equal('roles', [])]))
other_roles = [str(x) for x in checked_roles if x not in ('admin', 'none')]
other_roles = [x for x in checked_roles if x not in ('admin', 'none')]
if other_roles:
other_roles_set = set(other_roles)
if not get_publisher().is_using_postgresql():
# with pickle storage we have to duplicate all roles as
# they may have been stored as integers
for other_role in other_roles:
try:
other_roles_set.add(int(other_role))
except ValueError:
continue
criterias.append(st.Intersects('roles', other_roles_set))
criterias.append(st.Intersects('roles', other_roles))
criterias = [st.Or(criterias)]
query = get_request().form.get('q')

View File

@ -161,6 +161,33 @@ class FormDef(StorableObject):
self.max_field_id = max([lax_int(x.id) for x in self.fields])
changed = True
if type(self.category_id) is int:
self.category_id = str(self.category_id)
changed = True
if type(self.workflow_id) is int:
self.workflow_id = str(self.workflow_id)
changed = True
if self.roles:
for role in self.roles:
if type(role) is int:
self.roles = [str(x) for x in self.roles]
changed = True
break
if type(self.last_modification_user_id) is int:
self.last_modification_user_id = str(self.last_modification_user_id)
changed = True
if self.workflow_roles:
workflow_roles_list = self.workflow_roles.items()
for role_key, role_id in self.workflow_roles.items():
if type(role_id) is int:
self.workflow_roles = dict([(x, str(y)) for x, y in workflow_roles_list])
changed = True
break
if changed:
self.store()
@ -688,7 +715,7 @@ class FormDef(StorableObject):
if tree.find('category') is not None:
category_node = tree.find('category')
if include_id and category_node.attrib.get('category_id'):
formdef.category_id = int(category_node.attrib.get('category_id'))
formdef.category_id = str(category_node.attrib.get('category_id'))
else:
category = category_node.text.encode(charset)
for c in Category.select():

View File

@ -134,7 +134,7 @@ class FormDefUI(object):
limit=None, query=None, order_by=None, user=None, criterias=None):
formdata_class = self.formdef.data_class()
if selected_filter == 'all':
item_ids = [int(x) for x in formdata_class.keys()]
item_ids = formdata_class.keys()
else:
applied_filters = []
if selected_filter == 'pending':

View File

@ -1035,7 +1035,8 @@ class RootDirectory(AccessControlled, Directory):
r += htmltext('</div>')
if self.category:
formdefs = FormDef.select(lambda x: x.category_id == self.category.id,
formdefs = FormDef.select(
lambda x: str(x.category_id) == str(self.category.id),
order_by='name', ignore_errors=True)
else:
formdefs = FormDef.select(order_by='name', ignore_errors=True)
@ -1076,8 +1077,8 @@ class RootDirectory(AccessControlled, Directory):
Category.sort_by_position(cats)
one = False
for c in cats:
l2 = [x for x in list_forms if x.category_id == c.id]
l2_advertise = [x for x in advertised_forms if x.category_id == c.id]
l2 = [x for x in list_forms if str(x.category_id) == str(c.id)]
l2_advertise = [x for x in advertised_forms if str(x.category_id) == str(c.id)]
if l2 or l2_advertise:
r += self.form_list(l2, category = c,
session = session, user_forms = user_forms,
@ -1218,7 +1219,8 @@ class RootDirectory(AccessControlled, Directory):
if self.category:
formdefs = FormDef.select(lambda x: (
x.category_id == self.category.id and (not x.is_disabled() or x.disabled_redirection)),
str(x.category_id) == str(self.category.id) and (
not x.is_disabled() or x.disabled_redirection)),
order_by = 'name')
else:
formdefs = FormDef.select(lambda x: not x.is_disabled() or x.disabled_redirection,
@ -1280,7 +1282,7 @@ class RootDirectory(AccessControlled, Directory):
cats = Category.select()
Category.sort_by_position(cats)
for c in cats:
if [x for x in list_forms if x.category_id == c.id]:
if [x for x in list_forms if str(x.category_id) == str(c.id)]:
result.append(c)
return result

View File

@ -230,7 +230,11 @@ class StorableObject(object):
reverse = False
# only list can be sorted
objects = list(objects)
objects.sort(lambda x,y: cmp(getattr(x, order_by), getattr(y, order_by)))
if order_by == 'id':
cmp_function = lambda x, y: cmp(lax_int(x.id), lax_int(y.id))
else:
cmp_function = lambda x, y: cmp(getattr(x, order_by), getattr(y, order_by))
objects.sort(cmp_function)
if reverse:
objects.reverse()
if limit or offset:
@ -259,7 +263,7 @@ class StorableObject(object):
except OSError:
return cls.get_new_id(create=True)
os.close(fd)
return id
return str(id)
get_new_id = classmethod(get_new_id)
def get(cls, id, ignore_errors=False, ignore_migration=False):
@ -336,8 +340,10 @@ class StorableObject(object):
return None
raise KeyError()
o.__class__ = cls
if not ignore_migration and hasattr(cls, 'migrate'):
o.migrate()
if not ignore_migration:
o.id = str(o.id) # makes sure 'id' is a string
if hasattr(cls, 'migrate'):
o.migrate()
return o
get_filename = classmethod(get_filename)

View File

@ -82,7 +82,7 @@ class XmlStorableObject(StorableObject):
tree = tree.getroot()
if include_id and tree.attrib.get('id'):
obj.id = int(tree.attrib.get('id'))
obj.id = tree.attrib.get('id')
for attribute in cls.XML_NODES:
attribute_name, attribute_type = attribute

View File

@ -65,6 +65,13 @@ class User(StorableObject):
self.roles = [x for x in self.roles if x != 'site-admin']
changed = True
if self.roles:
for role in self.roles:
if type(role) is int:
self.roles = [str(x) for x in self.roles]
changed = True
break
if changed:
self.store()
@ -129,7 +136,7 @@ class User(StorableObject):
def get_users_with_role(cls, role_id):
# this will be slow with the pickle backend as there is no index
# suitable for Intersects()
return cls.select([st.Intersects('roles', [role_id])])
return cls.select([st.Intersects('roles', [str(role_id)])])
get_users_with_role = classmethod(get_users_with_role)
def get_users_with_name_identifier(cls, name_identifier):

View File

@ -45,7 +45,7 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
options=[(None, '----')] + self.parent.parent.roles.items())
if 'role_id' in parameters:
form.add(SingleSelectWidget, '%srole_id' % prefix,
title=_('Value for role'), value=self.role_id,
title=_('Value for role'), value=str(self.role_id),
options=[(None, '----')] + get_user_roles())
def perform(self, formdata):
@ -53,7 +53,7 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
return
if not formdata.workflow_roles:
formdata.workflow_roles = {}
formdata.workflow_roles[self.role_key] = self.role_id
formdata.workflow_roles[self.role_key] = str(self.role_id)
formdata.store()
register_item_class(DispatchWorkflowStatusItem)

View File

@ -31,7 +31,7 @@ class AddRoleWorkflowStatusItem(WorkflowStatusItem):
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'role_id' in parameters:
form.add(SingleSelectWidget, '%srole_id' % prefix,
title=_('Role to Add'), value=self.role_id,
title=_('Role to Add'), value=str(self.role_id),
options=[(None, '----')] + get_user_roles())
def role_id_export_to_xml(self, item, charset, include_id=False):
@ -45,6 +45,7 @@ class AddRoleWorkflowStatusItem(WorkflowStatusItem):
def perform(self, formdata):
if not self.role_id:
return
self.role_id = str(self.role_id)
if not formdata.user_id:
# we can't work on anonymous or user_hash'ed forms
return
@ -80,6 +81,7 @@ class RemoveRoleWorkflowStatusItem(WorkflowStatusItem):
def perform(self, formdata):
if not self.role_id:
return
self.role_id = str(self.role_id)
if not formdata.user_id:
# we can't work on anonymous or user_hash'ed forms
return

View File

@ -942,10 +942,7 @@ class WorkflowStatusItem(object):
return None
role_id = str(elem.attrib['role_id'])
if Role.has_key(role_id):
try:
return int(role_id)
except ValueError:
return str(role_id)
return role_id
else:
return None
@ -1021,9 +1018,9 @@ def get_role_translation(formdata, role_name):
role_id = formdata.workflow_roles.get(role_name)
if not role_id and formdata.formdef.workflow_roles:
role_id = formdata.formdef.workflow_roles.get(role_name)
return role_id
return str(role_id)
else:
return role_name
return str(role_name)
def get_role_translation_label(workflow, role_id):
if role_id == logged_users_role().id: