authentic/tests/test_csv_import.py

388 lines
14 KiB
Python

# -*- coding: utf-8 -*-
# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import pytest
import io
from authentic2.custom_user.models import User
from authentic2.models import Attribute
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError
ENCODINGS = [
'iso-8859-1',
'iso-8859-15',
'utf-8',
'cp1252',
]
def pytest_generate_tests(metafunc):
if 'encoding' in metafunc.fixturenames:
metafunc.parametrize('encoding', ENCODINGS)
if 'style' in metafunc.fixturenames:
metafunc.parametrize('style', ['str', 'file'])
@pytest.fixture
def profile(db):
Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
@pytest.fixture
def csv_importer_factory(encoding, style):
def factory(content):
content = content.encode(encoding)
if style == 'file':
content = io.BytesIO(content)
importer = CsvImporter()
run = importer.run
importer.run = lambda *args, **kwargs: run(content, *args, encoding=encoding, **kwargs)
return importer
return factory
@pytest.fixture
def user_csv_importer_factory(encoding, style):
def factory(content):
content = content.encode(encoding)
if style == 'file':
content = io.BytesIO(content)
importer = UserCsvImporter()
run = importer.run
importer.run = lambda *args, **kwargs: run(content, *args, encoding=encoding, **kwargs)
return importer
return factory
def test_unknown_csv_dialect_error(profile, user_csv_importer_factory):
importer = user_csv_importer_factory('')
assert not importer.run()
assert importer.has_errors
assert importer.errors == [Error('unknown-csv-dialect')]
def test_empty_header_row_error(profile, user_csv_importer_factory):
importer = user_csv_importer_factory('\n1,2,3')
assert not importer.run()
assert importer.has_errors
assert importer.errors == [Error('empty-header-row')]
def test_unknown_or_missing_attribute_error1(profile, user_csv_importer_factory):
importer = user_csv_importer_factory('email key,first_name," "\n1,2,3')
assert not importer.run()
assert importer.has_errors
assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=2)]
def test_unknown_or_missing_attribute_error2(profile, user_csv_importer_factory):
importer = user_csv_importer_factory('email key,first_name,x\n1,2,3')
assert not importer.run()
assert importer.has_errors
assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=3)]
def test_unknown_flag_error(profile, user_csv_importer_factory):
importer = user_csv_importer_factory('email key,first_name xxx\n1,2')
assert not importer.run()
assert importer.has_errors
assert importer.errors == [LineError('unknown-flag', line=1, column=2)]
def test_missing_key_column_error(profile, user_csv_importer_factory):
importer = user_csv_importer_factory('email,first_name\n1,2')
assert not importer.run()
assert importer.has_errors
assert importer.errors == [Error('missing-key-column')]
def test_too_many_key_columns_error(profile, user_csv_importer_factory):
importer = user_csv_importer_factory('email key,first_name key\n1,2')
assert not importer.run()
assert importer.has_errors
assert importer.errors == [Error('too-many-key-columns')]
def test_run(profile, user_csv_importer_factory):
assert User.objects.count() == 0
content = '''email key,first_name,last_name,phone update
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''
importer = user_csv_importer_factory(content)
assert importer.run(), importer.errors
assert importer.headers == [
CsvHeader(1, 'email', field=True, key=True, verified=True),
CsvHeader(2, 'first_name', field=True),
CsvHeader(3, 'last_name', field=True),
CsvHeader(4, 'phone', attribute=True),
]
assert importer.has_errors
assert len(importer.rows) == 3
assert all(row.is_valid for row in importer.rows[:2])
assert not importer.rows[2].is_valid
assert importer.rows[2].cells[0].errors
assert all(error == Error('data-error') for error in importer.rows[2].cells[0].errors)
assert not importer.rows[2].cells[1].errors
assert not importer.rows[2].cells[2].errors
assert importer.rows[2].cells[3].errors
assert all(error == Error('data-error') for error in importer.rows[2].cells[3].errors)
assert importer.updated == 0
assert importer.created == 2
assert User.objects.count() == 2
thomas = User.objects.get(email='tnoel@entrouvert.com')
assert thomas.ou == get_default_ou()
assert thomas.email_verified is True
assert thomas.first_name == 'Thomas'
assert thomas.attributes.first_name == 'Thomas'
assert thomas.last_name == 'Noël'
assert thomas.attributes.last_name == 'Noël'
assert thomas.attributes.phone == '1234'
fpeters = User.objects.get(email='fpeters@entrouvert.com')
assert fpeters.ou == get_default_ou()
assert fpeters.first_name == 'Frédéric'
assert fpeters.email_verified is True
assert fpeters.attributes.first_name == 'Frédéric'
assert fpeters.last_name == 'Péters'
assert fpeters.attributes.last_name == 'Péters'
assert fpeters.attributes.phone == '5678'
def test_simulate(profile, user_csv_importer_factory):
assert User.objects.count() == 0
content = '''email key,first_name,last_name,phone update
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''
importer = user_csv_importer_factory(content)
assert importer.run(simulate=True), importer.errors
assert importer.headers == [
CsvHeader(1, 'email', field=True, key=True, verified=True),
CsvHeader(2, 'first_name', field=True),
CsvHeader(3, 'last_name', field=True),
CsvHeader(4, 'phone', attribute=True),
]
assert importer.has_errors
assert len(importer.rows) == 3
assert all(row.is_valid for row in importer.rows[:2])
assert not importer.rows[2].is_valid
assert importer.rows[2].cells[0].errors
assert all(error == Error('data-error') for error in importer.rows[2].cells[0].errors)
assert not importer.rows[2].cells[1].errors
assert not importer.rows[2].cells[2].errors
assert importer.rows[2].cells[3].errors
assert all(error == Error('data-error') for error in importer.rows[2].cells[3].errors)
assert importer.updated == 0
assert importer.created == 2
assert User.objects.count() == 0
def test_create_unique_error(profile, user_csv_importer_factory):
content = '''email key verified,first_name,last_name,phone unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
importer = user_csv_importer_factory(content)
user = User.objects.create(ou=get_default_ou())
user.attributes.phone = '1234'
assert importer.run()
assert importer.created == 0
assert importer.updated == 0
assert len(importer.rows) == 1
assert not importer.rows[0].is_valid
assert importer.rows[0].action == 'create'
assert all(not cell.errors for cell in importer.rows[0])
assert all(not cell.action for cell in importer.rows[0])
assert importer.rows[0].errors == [Error('unique-constraint-failed')]
def test_create_unique_in_ou(profile, user_csv_importer_factory):
content = '''email key verified,first_name,last_name,phone unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
importer = user_csv_importer_factory(content)
user = User.objects.create()
user.attributes.phone = '1234'
assert importer.run()
assert len(importer.rows) == 1
assert importer.rows[0].is_valid
assert importer.rows[0].action == 'create'
assert all(not cell.errors for cell in importer.rows[0])
assert all(cell.action == 'updated' for cell in importer.rows[0])
assert importer.created == 1
assert importer.updated == 0
def test_create_unique_globally_error(profile, user_csv_importer_factory):
content = '''email key verified,first_name,last_name,phone globally-unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
importer = user_csv_importer_factory(content)
user = User.objects.create()
user.attributes.phone = '1234'
assert importer.run()
assert importer.created == 0
assert importer.updated == 0
assert len(importer.rows) == 1
assert not importer.rows[0].is_valid
assert importer.rows[0].action == 'create'
assert all(not cell.errors for cell in importer.rows[0])
assert all(not cell.action for cell in importer.rows[0])
assert importer.rows[0].errors == [Error('unique-constraint-failed')]
def test_create_key_self_reference_error(profile, user_csv_importer_factory):
content = '''email key,first_name,last_name,phone
tnoel@entrouvert.com,Thomas,Noël,1234
tnoel@entrouvert.com,Frédéric,Péters,1234'''
importer = user_csv_importer_factory(content)
assert importer.run()
assert importer.created == 1
assert importer.updated == 0
assert len(importer.rows) == 2
assert importer.rows[0].is_valid
assert importer.rows[0].action == 'create'
assert not importer.rows[1].is_valid
assert importer.rows[1].action == 'update'
assert importer.rows[1].errors == [Error('unique-constraint-failed')]
def test_update_unique_error(profile, user_csv_importer_factory):
content = '''email key verified,first_name,last_name,phone unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
importer = user_csv_importer_factory(content)
user = User.objects.create(ou=get_default_ou())
user.attributes.phone = '1234'
user = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
assert importer.run()
assert importer.created == 0
assert importer.updated == 0
assert len(importer.rows) == 1
assert not importer.rows[0].is_valid
assert importer.rows[0].action == 'update'
assert all(not cell.errors for cell in importer.rows[0])
assert all(not cell.action for cell in importer.rows[0])
assert importer.rows[0].errors == [Error('unique-constraint-failed')]
def test_update_unique_globally_error(profile, user_csv_importer_factory):
content = '''email key verified,first_name,last_name,phone globally-unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
importer = user_csv_importer_factory(content)
user = User.objects.create()
user.attributes.phone = '1234'
User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
assert importer.run()
assert importer.created == 0
assert importer.updated == 0
assert len(importer.rows) == 1
assert not importer.rows[0].is_valid
assert importer.rows[0].action == 'update'
assert all(not cell.errors for cell in importer.rows[0])
assert all(not cell.action for cell in importer.rows[0])
assert importer.rows[0].errors == [Error('unique-constraint-failed')]
def test_update_unique_globally(profile, user_csv_importer_factory):
content = '''email key verified no-update,first_name no-update,last_name no-update,phone unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
importer = user_csv_importer_factory(content)
user = User.objects.create()
user.attributes.phone = '1234'
thomas = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
assert importer.run()
assert importer.created == 0
assert importer.updated == 1
assert len(importer.rows) == 1
assert importer.rows[0].is_valid
assert importer.rows[0].action == 'update'
assert all(not cell.errors for cell in importer.rows[0])
assert all(cell.action == 'nothing' for cell in importer.rows[0].cells[:3])
assert importer.rows[0].cells[3].action == 'updated'
thomas.refresh_from_db()
assert not thomas.first_name
assert not thomas.last_name
assert thomas.attributes.phone == '1234'
def test_external_id(profile, user_csv_importer_factory):
assert User.objects.count() == 0
content = '''_source_name,_source_id,email,first_name,last_name,phone
app1,1,tnoel@entrouvert.com,Thomas,Noël,1234
app1,2,tnoel@entrouvert.com,Thomas,Noël,1234
'''
importer = user_csv_importer_factory(content)
assert importer.run(), importer.errors
assert importer.headers == [
CsvHeader(1, '_source_name'),
CsvHeader(2, '_source_id', key=True),
CsvHeader(3, 'email', field=True, verified=True),
CsvHeader(4, 'first_name', field=True),
CsvHeader(5, 'last_name', field=True),
CsvHeader(6, 'phone', attribute=True),
]
assert not importer.has_errors
assert len(importer.rows) == 2
for external_id in ['1', '2']:
thomas = User.objects.get(
userexternalid__source='app1',
userexternalid__external_id=external_id)
assert thomas.ou == get_default_ou()
assert thomas.email_verified is True
assert thomas.first_name == 'Thomas'
assert thomas.attributes.first_name == 'Thomas'
assert thomas.last_name == 'Noël'
assert thomas.attributes.last_name == 'Noël'
assert thomas.attributes.phone == '1234'