88 lines
3.3 KiB
Python
88 lines
3.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
# authentic2 - versatile identity manager
|
|
# Copyright (C) 2010-2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from __future__ import unicode_literals
|
|
|
|
import io
|
|
import operator
|
|
|
|
import pytest
|
|
|
|
from authentic2.manager.user_import import UserImport, Report
|
|
from authentic2.models import Attribute
|
|
|
|
|
|
@pytest.fixture
def profile(transactional_db):
    """Create the ``phone`` attribute that the import test's CSV refers to.

    The fixture only has a side effect (one ``Attribute`` row is created);
    it yields no value to the test.
    """
    Attribute.objects.create(
        name='phone',
        kind='phone_number',
        label='Numéro de téléphone',
    )
|
|
|
|
|
|
def test_user_import(transactional_db, profile):
    """Register the same CSV twice and check the outcome of both imports.

    The file is stored twice through ``UserImport.new``; a report is then
    created and run for each stored import, one after the other. The import
    created first is expected to create the two valid rows, the second one
    to update them. The trailing ``x,x,x,x`` row must be invalid in both.
    """
    content = '''email key verified,first_name,last_name,phone no-create
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''
    payload = content.encode('utf-8')
    upload = io.BytesIO(payload)

    def check_actions(user_import, row_action, leading_action, trailing_action):
        # Inspect the first report of `user_import`: the two valid rows must
        # carry `row_action`, their first three cells `leading_action` and
        # any remaining cells `trailing_action`.
        first_report = list(user_import.reports)[0]
        valid_rows = first_report.data['importer'].rows[:2]
        assert all(row.action == row_action for row in valid_rows)
        assert all(cell.action == leading_action for row in valid_rows for cell in row.cells[:3])
        assert all(cell.action == trailing_action for row in valid_rows for cell in row.cells[3:])

    # Nothing is stored before the first upload.
    assert not list(UserImport.all())

    # The very same stream object is handed over twice.
    UserImport.new(upload, encoding='utf-8')
    UserImport.new(upload, encoding='utf-8')

    assert len(list(UserImport.all())) == 2
    for stored_import in UserImport.all():
        with stored_import.import_file as stored:
            assert stored.read() == payload

    # Create and run one report per import; each run is joined before the
    # next import is handled, so the imports are processed sequentially.
    for user_import in UserImport.all():
        report = Report.new(user_import)
        # The report is looked up again through the import for every check,
        # rather than reusing the `report` object returned above.
        assert user_import.reports[report.uuid].exists()
        assert user_import.reports[report.uuid].data['encoding'] == 'utf-8'
        assert user_import.reports[report.uuid].data['state'] == 'waiting'

        worker = report.run(start=False)
        worker.start()
        worker.join()

        assert user_import.reports[report.uuid].data['state'] == 'finished'
        assert user_import.reports[report.uuid].data['importer']
        assert not user_import.reports[report.uuid].data['importer'].errors

    # Every import owns exactly one report, whose third row is rejected.
    for user_import in UserImport.all():
        found = list(user_import.reports)
        assert len(found) == 1
        only_report = found[0]
        assert only_report.created
        importer = only_report.data['importer']
        assert importer.rows[0].is_valid
        assert importer.rows[1].is_valid
        assert not importer.rows[2].is_valid

    by_creation = sorted(UserImport.all(), key=operator.attrgetter('created'))

    # Oldest import: rows are created, the first three cells are written and
    # the rest left alone (presumably because the phone column is flagged
    # ``no-create`` in the header -- to be confirmed against the importer).
    check_actions(by_creation[0], 'create', 'updated', 'nothing')

    # Second import: rows already exist, so they are updated; the first three
    # cells are unchanged and only the remaining ones are written.
    check_actions(by_creation[1], 'update', 'nothing', 'updated')
|