doublon d'utilisateur au provisionning (#75777) #192

Open
bdauvergne wants to merge 2 commits from wip/75777-Doublon-d-utilisateur-au-provisi into main
4 changed files with 148 additions and 62 deletions

View File

@ -2,6 +2,7 @@ import json
import os
import shutil
import tempfile
import threading
import uuid
import pytest
@ -12,6 +13,7 @@ from wcs.ctl.management.commands.hobo_notify import Command as HoboNotifyCommand
from wcs.qommon import force_str
from wcs.qommon.afterjobs import AfterJob
from wcs.qommon.http_request import HTTPRequest
from wcs.sql import cleanup_connection
from wcs.sql_criterias import NotNull
from .utilities import create_temporary_pub, get_app
@ -996,3 +998,37 @@ def test_hobo_notify_call_command(pub, alt_tempdir):
role.store()
call_command('hobo_notify', os.path.join(alt_tempdir, 'message.json'))
assert pub.role_class.count() == 0
def test_provisionning_concurrency(pub):
concurrency = 10
user_data = {
'uuid': 'a' * 32,
'first_name': 'John',
'last_name': 'Doé',
'email': 'john.doe@example.net',
'zipcode': '13400',
'is_superuser': False,
'is_active': True,
'roles': [],
}
for i in range(10):
pub.user_class.wipe()
b = threading.Barrier(concurrency)
def thread_function(b):
b.wait()
try:
HoboNotifyCommand.create_or_update_user(pub, user_data)
except Exception:
pass
cleanup_connection()
threads = [threading.Thread(target=thread_function, args=[b]) for i in range(concurrency)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
assert pub.user_class.count() == 1

View File

@ -3,6 +3,7 @@ import http.cookies
import io
import os
import shutil
import threading
import urllib.parse
import uuid
@ -24,6 +25,7 @@ from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.ident.idp import MethodAdminDirectory
from wcs.qommon.misc import get_lasso_server
from wcs.qommon.saml2 import Saml2Directory, SOAPException
from wcs.sql import cleanup_connection
from .test_fc_auth import get_session
from .test_hobo_notify import PROFILE
@ -835,3 +837,28 @@ def test_opened_session_backoffice_url(pub):
resp = app.get('/backoffice/studio/')
assert resp.status_int == 302
assert urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query).get('IsPassive')
def test_sso_provisionning_concurrency(pub):
concurrency = 10
directory = Saml2Directory()
login = mock.Mock()
login.identity.dump.return_value = ''
for i in range(3):
pub.user_class.wipe()
b = threading.Barrier(concurrency)
def thread_function(b):
b.wait()
directory.get_or_create_user_by_name_id(login, 'abcd')
cleanup_connection()
threads = [threading.Thread(target=thread_function, args=[b]) for i in range(concurrency)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
assert pub.user_class.count() == 1

View File

@ -15,7 +15,9 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import json
import random
import sys
import time
from quixote import get_publisher
@ -134,7 +136,6 @@ class Command(TenantCommand):
@classmethod
def provision_user(cls, publisher, issuer, action, data, full=False):
formdef = UserFieldsFormDef(publisher=publisher)
User = publisher.user_class
if full:
@ -143,45 +144,7 @@ class Command(TenantCommand):
for o in data:
try:
if action == 'provision':
if not cls.check_valid_user(o):
raise ValueError('invalid user')
uuid = o['uuid']
users = User.get_users_with_name_identifier(uuid)
if len(users) > 1:
raise Exception('duplicate users')
if users:
user = users[0]
else:
user = User(uuid)
user.form_data = user.form_data or {}
for field in formdef.fields:
if not field.id.startswith('_'):
continue
field_value = o.get(field.id[1:])
if field.convert_value_from_anything:
try:
field_value = field.convert_value_from_anything(field_value)
except ValueError as e:
publisher.record_error(exception=e, context='[PROVISIONNING]', notify=True)
continue
user.form_data[field.id] = field_value
user.name_identifiers = [uuid]
# reset roles
user.is_active = o.get('is_active', True)
user.is_admin = o.get('is_superuser', False)
user.roles = []
for role_ref in o.get('roles', []):
role = get_publisher().role_class.resolve(role_ref['uuid'])
if role and role.id not in user.roles:
user.add_roles([role.id])
user.set_attributes_from_formdata(user.form_data)
user.store()
# verify we did not produce a doublon
users = User.get_users_with_name_identifier(uuid)
for doublon in users:
if int(doublon.id) < int(user.id): # we are not the first so backoff
user.remove_self()
break
cls.create_or_update_user(publisher, o)
elif action == 'deprovision':
if 'uuid' not in o:
raise KeyError('user without uuid')
@ -190,3 +153,52 @@ class Command(TenantCommand):
user.set_deleted()
except Exception as e:
publisher.record_error(exception=e, context='[PROVISIONNING]', notify=True)
@classmethod
def create_or_update_user(cls, publisher, o, retry=0):
User = publisher.user_class
formdef = UserFieldsFormDef(publisher=publisher)
if retry > 3:
raise Exception('user provisionning failed after %s tries.' % retry)
if retry > 0:
time.sleep(random.random() * 2 * retry)
if not cls.check_valid_user(o):
raise ValueError('invalid user')
uuid = o['uuid']
users = User.get_users_with_name_identifier(uuid)
if len(users) > 1:
raise Exception('duplicate users')
if users:
user = users[0]
else:
user = User(uuid)
user.form_data = user.form_data or {}
for field in formdef.fields:
if not field.id.startswith('_'):
continue
field_value = o.get(field.id[1:])
if field.convert_value_from_anything:
try:
field_value = field.convert_value_from_anything(field_value)
except ValueError as e:
publisher.record_error(exception=e, context='[PROVISIONNING]', notify=True)
continue
user.form_data[field.id] = field_value
user.name_identifiers = [uuid]
# reset roles
user.is_active = o.get('is_active', True)
user.is_admin = o.get('is_superuser', False)
user.roles = []
for role_ref in o.get('roles', []):
role = get_publisher().role_class.resolve(role_ref['uuid'])
if role and role.id not in user.roles:
user.add_roles([role.id])
user.set_attributes_from_formdata(user.form_data)
user.store()
# verify we did not produce a doublon
users = User.get_users_with_name_identifier(uuid)
if len(users) > 1:
user.remove_self()
cls.create_or_update_user(publisher, o, retry=retry + 1)

View File

@ -15,6 +15,7 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import os
import random
import time
import urllib.parse
from xml.sax.saxutils import escape
@ -539,33 +540,43 @@ class Saml2Directory(Directory):
if save:
user.store()
def get_or_create_user_by_name_id(self, login, ni, retry=0):

T'ajoutes cette méthode uniquement pour les tests ? Elle ne devrait pas être utilisée lors du provisionning ?

T'ajoutes cette méthode uniquement pour les tests ? Elle ne devrait pas être utilisée lors du provisionning ?

C'est le souci avec les relectures qui prennent 10 mois c'est que je ne sais plus mais à regarder oui j'ai séparé le code pour le rendre plus facilement testable en isolation, soit j'ai oublié de le rebrancher dans lookup_user() initialement soit ça s'est perdu dans un rebase, je ne peux plus voir les versions intermédiaires du patch, gitea ne les a visiblement pas conservées. En attendant j'ai branché get_user_or_create_user_by_name_id dans lookup_user dans cette dernière version.

C'est le souci avec les relectures qui prennent 10 mois c'est que je ne sais plus mais à regarder oui j'ai séparé le code pour le rendre plus facilement testable en isolation, soit j'ai oublié de le rebrancher dans lookup_user() initialement soit ça s'est perdu dans un rebase, je ne peux plus voir les versions intermédiaires du patch, gitea ne les a visiblement pas conservées. En attendant j'ai branché get_user_or_create_user_by_name_id dans lookup_user dans cette dernière version.
user_class = get_publisher().user_class
if retry > 3:
raise Exception('user lookup on sso failed after %s tries.' % retry)
if retry > 0:
time.sleep(random.random() * 2 * retry)
users = sorted(
user_class.get_users_with_name_identifier(ni), key=lambda u: (u.last_seen or 0, -int(u.id))
)
if users:
# if multiple users, use the more recently used or the younger
user = users[-1]
else:
user = get_publisher().user_class(ni)
user.name_identifiers = [ni]
if login.identity:
user.lasso_dump = login.identity.dump()
user.store()
others = user_class.get_users_with_name_identifier(ni)
# there is an user mapping to the same id with a younger id:
# try again.
if len(others) > 1:
user.remove_self()
return self.get_or_create_user_by_name_id(login, ni, retry=retry + 1)
return user
def lookup_user(self, session, login):
if not login.nameIdentifier or not login.nameIdentifier.content:
return None
user_class = get_publisher().user_class
ni = login.nameIdentifier.content
session.name_identifier = ni
while True:
users = sorted(
user_class.get_users_with_name_identifier(ni), key=lambda u: (u.last_seen or 0, -int(u.id))
)
if users:
# if multiple users, use the more recently used or the younger
user = users[-1]
else:
user = get_publisher().user_class(ni)
user.name_identifiers = [ni]
if login.identity:
user.lasso_dump = login.identity.dump()
user.store()
others = user_class.get_users_with_name_identifier(ni)
# there is an user mapping to the same id with a younger id:
# try again.
if any(int(other.id) < int(user.id) for other in others):
user.remove_self()
continue
break
session.name_identifier = ni
user = self.get_or_create_user_by_name_id(login, ni)
self.fill_user_attributes(session, login, user)