wip: correctly check for import errors

This commit is contained in:
Benjamin Dauvergne 2023-01-18 17:06:00 +01:00
parent ed8b7ee6ea
commit 615dbcfe2a
2 changed files with 125 additions and 79 deletions

View File

@ -326,6 +326,14 @@ class CsvRow:
def has_cell_errors(self):
return any(cell.errors for cell in self)
@property
def all_errors(self):
errors = {}
if self.errors:
errors['__all__'] = self.errors
errors.update({cell.name: cell.errors for cell in self.cells if cell.errors})
return errors
def __iter__(self):
return iter(self.cells)
@ -346,6 +354,10 @@ class CsvCell:
def column(self):
return self.header.column
@property
def name(self):
return self.header.name
class Simulate(Exception):
pass
@ -415,7 +427,7 @@ class UserCsvImporter:
break
self.has_errors = self.has_errors or bool(self.errors)
return not bool(self.errors)
return not self.has_errors
def parse_header_row(self):
self.headers = []
@ -827,3 +839,12 @@ class UserCsvImporter:
if cell.value:
PasswordReset.objects.get_or_create(user=user)
return True
@property
def all_errors(self):
errors = []
errors.extend(self.errors)
for i, row in enumerate(self.rows):
if not row.is_valid:
errors.append((i, row.all_errors))
return errors

View File

@ -58,20 +58,6 @@ def profile(db):
Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
@pytest.fixture
def csv_importer_factory(encoding, style):
def factory(content):
content = content.encode(encoding)
if style == 'file':
content = io.BytesIO(content)
importer = CsvImporter()
run = importer.run
importer.run = lambda *args, **kwargs: run(content, *args, encoding=encoding, **kwargs)
return importer
return factory
@pytest.fixture
def user_csv_importer_factory(encoding, style):
def factory(content):
@ -196,7 +182,7 @@ fpeters@entrouvert.com,Frédéric,Péters,+3281005678
x,x,x,x'''
importer = user_csv_importer_factory(content)
assert importer.run(), importer.errors
importer.run()
assert importer.headers == [
CsvHeader(1, 'email', field=True, key=True, verified=True),
CsvHeader(2, 'first_name', field=True),
@ -248,7 +234,7 @@ fpeters@entrouvert.com,Frédéric,Péters,+3281005678
x,x,x,x'''
importer = user_csv_importer_factory(content)
assert importer.run(simulate=True), importer.errors
assert not importer.run(simulate=True)
assert importer.headers == [
CsvHeader(1, 'email', field=True, key=True, verified=True),
CsvHeader(2, 'first_name', field=True),
@ -281,8 +267,8 @@ tnoel@entrouvert.com,Thomas,Noël,0123456789'''
user = User.objects.create(ou=get_default_ou())
user.attributes.phone = '+33123456789'
assert importer.run()
assert not importer.run()
assert importer.has_error
assert importer.created == 0
assert importer.updated == 0
assert len(importer.rows) == 1
@ -302,7 +288,7 @@ tnoel@entrouvert.com,Thomas,Noël,0123456789'''
user = User.objects.create()
user.attributes.phone = '+33123456789'
assert importer.run()
assert importer.run(), importer.all_errors
assert len(importer.rows) == 1
assert importer.rows[0].is_valid
@ -322,8 +308,8 @@ tnoel@entrouvert.com,Thomas,Noël,0123456789'''
user = User.objects.create()
user.attributes.phone = '+33123456789'
assert importer.run()
assert not importer.run()
assert importer.has_errors
assert importer.created == 0
assert importer.updated == 0
assert len(importer.rows) == 1
@ -340,8 +326,8 @@ tnoel@entrouvert.com,Thomas,Noël,0606060606
tnoel@entrouvert.com,Frédéric,Péters,+3281123456'''
importer = user_csv_importer_factory(content)
assert importer.run()
assert not importer.run()
assert importer.has_errors
assert importer.created == 1
assert importer.updated == 0
assert len(importer.rows) == 2
@ -362,8 +348,8 @@ tnoel@entrouvert.com,Thomas,Noël,0123456789'''
user = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
assert importer.run()
assert not importer.run()
assert importer.has_errors
assert importer.created == 0
assert importer.updated == 0
assert len(importer.rows) == 1
@ -384,8 +370,8 @@ tnoel@entrouvert.com,Thomas,Noël,0123456789'''
User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
assert importer.run()
assert not importer.run()
assert importer.has_errors
assert importer.created == 0
assert importer.updated == 0
assert len(importer.rows) == 1
@ -406,7 +392,7 @@ tnoel@entrouvert.com,Thomas,Noël,0123456789'''
thomas = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
assert importer.run()
assert importer.run(), importer.all_errors
assert importer.created == 0
assert importer.updated == 1
@ -431,7 +417,7 @@ app1,2,tnoel@entrouvert.com,Thomas,Noël,0606060606
'''
importer = user_csv_importer_factory(content)
assert importer.run(), importer.errors
assert importer.run(), importer.all_errors
assert importer.headers == [
CsvHeader(1, '_source_name'),
CsvHeader(2, '_source_id', key=True),
@ -454,7 +440,7 @@ app1,2,tnoel@entrouvert.com,Thomas,Noël,0606060606
assert thomas.attributes.phone == '+33606060606'
importer = user_csv_importer_factory(content)
assert importer.run(), importer.errors
assert importer.run(), importer.all_errors
assert not importer.has_errors
@ -468,26 +454,26 @@ def test_user_roles_csv(profile, user_csv_importer_factory):
content_name_add = '\n'.join((base_header + '_role_name', base_user + role_name))
importer = user_csv_importer_factory(content_name_add)
assert importer.run()
assert importer.run(), importer.all_errors
thomas = User.objects.get(email='tnoel@entrouvert.com')
assert thomas in role.members.all()
thomas.roles.add(role2)
importer = user_csv_importer_factory(content_name_add)
assert importer.run()
assert importer.run(), importer.all_errors
thomas.refresh_from_db()
assert thomas in role2.members.all()
content_name_delete = '\n'.join((base_header + '_role_name delete', base_user + role_name))
importer = user_csv_importer_factory(content_name_delete)
assert importer.run()
assert importer.run(), importer.all_errors
thomas.refresh_from_db()
assert thomas not in role.members.all()
assert thomas in role2.members.all()
content_name_clear = '\n'.join((base_header + '_role_name clear', base_user + role_name))
importer = user_csv_importer_factory(content_name_clear)
assert importer.run()
assert importer.run(), importer.all_errors
thomas.refresh_from_db()
assert thomas in role.members.all()
assert thomas not in role2.members.all()
@ -497,7 +483,7 @@ def test_user_roles_csv(profile, user_csv_importer_factory):
(base_header + '_role_name', base_user + role_name, base_user + 'test2')
)
importer = user_csv_importer_factory(content_name_add_multiple)
assert importer.run()
assert importer.run(), importer.all_errors
thomas.refresh_from_db()
assert thomas in role.members.all()
assert thomas in role2.members.all()
@ -508,7 +494,7 @@ def test_user_roles_csv(profile, user_csv_importer_factory):
(base_header + '_role_name clear', base_user + role_name, base_user + 'test2')
)
importer = user_csv_importer_factory(content_name_clear_multiple)
assert importer.run()
assert importer.run(), importer.all_errors
thomas.refresh_from_db()
assert thomas in role.members.all()
assert thomas in role2.members.all()
@ -516,7 +502,7 @@ def test_user_roles_csv(profile, user_csv_importer_factory):
thomas.roles.remove(role)
content_slug_add = '\n'.join((base_header + '_role_slug', base_user + role_slug))
importer = user_csv_importer_factory(content_slug_add)
assert importer.run()
assert importer.run(), importer.all_errors
thomas.refresh_from_db()
assert thomas in role.members.all()
@ -524,13 +510,13 @@ def test_user_roles_csv(profile, user_csv_importer_factory):
content_only_key = '''email key,_role_name
tnoel@entrouvert.com,test_name'''
importer = user_csv_importer_factory(content_only_key)
assert importer.run()
assert importer.run(), importer.all_errors
thomas.refresh_from_db()
assert thomas in role.members.all()
content_name_error = '\n'.join((base_header + '_role_name', base_user + 'bad_name'))
importer = user_csv_importer_factory(content_name_error)
assert importer.run()
assert not importer.run()
assert importer.has_errors
assert importer.rows[0].cells[-1].errors[0].code == 'role-not-found'
@ -554,11 +540,11 @@ def test_csv_registration_options(profile, user_csv_importer_factory):
tnoel@entrouvert.com,Thomas,Noël,'''
importer = user_csv_importer_factory(content + 'send-email')
assert importer.run(simulate=True)
assert importer.run(simulate=True), importer.all_errors
assert not mail.outbox
importer = user_csv_importer_factory(content + 'send-email')
assert importer.run()
assert importer.run(), importer.all_errors
thomas = User.objects.get(email='tnoel@entrouvert.com')
assert len(mail.outbox) == 1
assert 'https://testserver/password/reset/confirm/' in mail.outbox[0].body
@ -566,41 +552,41 @@ tnoel@entrouvert.com,Thomas,Noël,'''
password = thomas.password
del mail.outbox[0]
importer = user_csv_importer_factory(content + 'send-email')
assert importer.run()
assert importer.run(), importer.all_errors
thomas.refresh_from_db()
assert thomas.password == password
assert not mail.outbox
importer = user_csv_importer_factory(content + 'invalid-option')
assert importer.run()
assert not importer.run()
assert importer.has_errors
assert importer.rows[0].cells[-1].errors[0].code == 'data-error'
def test_csv_force_password_reset(profile, user_csv_importer_factory):
importer = user_csv_importer_factory('email key,first_name,last_name\ntnoel@entrouvert.com,Thomas,Noël')
assert importer.run()
assert importer.run(), importer.all_errors
thomas = User.objects.get(email='tnoel@entrouvert.com')
assert not PasswordReset.objects.filter(user=thomas).exists()
csv = 'email key,first_name,last_name,@force-password-reset\ntnoel@entrouvert.com,Thomas,Noël,'
importer = user_csv_importer_factory(csv)
assert importer.run()
assert importer.run(), importer.all_errors
thomas = User.objects.get(email='tnoel@entrouvert.com')
assert not PasswordReset.objects.filter(user=thomas).exists()
importer = user_csv_importer_factory(csv + 'false')
assert importer.run()
assert importer.run(), importer.all_errors
thomas = User.objects.get(email='tnoel@entrouvert.com')
assert not PasswordReset.objects.filter(user=thomas).exists()
importer = user_csv_importer_factory(csv + 'true')
assert importer.run()
assert importer.run(), importer.all_errors
thomas = User.objects.get(email='tnoel@entrouvert.com')
assert PasswordReset.objects.filter(user=thomas).exists()
importer = user_csv_importer_factory(csv + 'any other value')
assert importer.run()
assert importer.run(), importer.all_errors
thomas = User.objects.get(email='tnoel@entrouvert.com')
assert PasswordReset.objects.filter(user=thomas).exists()
@ -627,30 +613,50 @@ remote,8,john.doe,,,
for i, line in enumerate(content.splitlines()):
print(i, line.count(','))
importer = user_csv_importer_factory(content)
importer.run()
assert importer.has_errors
assert not importer.errors
assert not importer.run()
assert importer.headers_by_name['username'].unique
assert importer.headers_by_name['username'].globally_unique
assert importer.headers_by_name['email'].unique
assert not importer.headers_by_name['email'].globally_unique
row_errors = {
row.line: [error.description for error in row.errors] for row in importer.rows if row.errors
}
assert row_errors == {
6: [
'Unique constraint on column "email" failed: value already appear on line 2',
'Unique constraint on column "email" failed',
],
7: ['Unique constraint on column "username" failed: value already appear on line 3'],
9: ['Unique constraint on column "username" failed'],
}
cell_errors = {
row.line: {cell.header.name: [error for error in cell.errors] for cell in row.cells if cell.errors}
for row in importer.rows
if any(cell.errors for cell in row.cells)
}
assert cell_errors == {}
assert importer.all_errors == [
(
4,
{
'__all__': [
Error(
code='unique-constraint-failed',
description='Unique constraint on column "email" failed: value already appear on line 2',
),
Error(
code='unique-constraint-failed',
description='Unique constraint on column "email" failed',
),
]
},
),
(
5,
{
'__all__': [
Error(
code='unique-constraint-failed',
description='Unique constraint on column "username" failed: value already appear on line 3',
)
]
},
),
(
7,
{
'__all__': [
Error(
code='unique-constraint-failed',
description='Unique constraint on column "username" failed',
)
]
},
),
]
def test_csv_password_hash(profile, user_csv_importer_factory):
@ -659,25 +665,45 @@ tnoel@entrouvert.com,Thomas,Noël,%s'''
password_hash = make_password('hop')
importer = user_csv_importer_factory(content % password_hash)
assert importer.run()
assert importer.run(), importer.all_errors
thomas = User.objects.get(email='tnoel@entrouvert.com')
assert check_password('hop', thomas.password)
password_hash = make_password('test', hasher='sha256')
importer = user_csv_importer_factory(content % password_hash)
assert importer.run()
assert importer.run(), importer.all_errors
thomas.refresh_from_db()
assert check_password('test', thomas.password)
importer = user_csv_importer_factory(content % 'wrong-format')
assert importer.run()
assert importer.has_errors
assert 'unknown hashing algorithm' in importer.rows[0].cells[-1].errors[0].description
assert not importer.run()
assert importer.all_errors == [
(
0,
{
'password_hash': [
Error(
code='data-error', description='Invalid password format or unknown hashing algorithm.'
)
]
},
),
]
importer = user_csv_importer_factory(content)
assert importer.run()
assert importer.has_errors
assert 'unknown hashing algorithm' in importer.rows[0].cells[-1].errors[0].description
assert not importer.run(), importer.all_errors
assert importer.all_errors == [
(
0,
{
'password_hash': [
Error(
code='data-error', description='Invalid password format or unknown hashing algorithm.'
)
]
},
),
]
def test_same_key_different_ou(db, user_csv_importer_factory):
@ -693,8 +719,7 @@ def test_same_key_different_ou(db, user_csv_importer_factory):
john.doe@example.com,John,Doe2'''
importer = user_csv_importer_factory(content)
importer.run(ou=ou_agent)
assert not importer.has_errors, importer.errors
assert importer.run(ou=ou_agent), importer.all_errors
assert User.objects.count() == 2
user1.refresh_from_db()
user2.refresh_from_db()