csv_import: allow adding roles (#35773)

This commit is contained in:
Valentin Deniaud 2019-10-09 15:29:24 +02:00
parent 64157da70c
commit f12353d81c
3 changed files with 216 additions and 15 deletions

View File

@ -30,12 +30,16 @@ from django.db.transaction import atomic
from django.utils import six
from django.utils.translation import ugettext as _
from django_rbac.utils import get_role_model
from authentic2 import app_settings
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.custom_user.models import User
from authentic2.forms.profile import modelform_factory, BaseUserForm
from authentic2.models import Attribute, AttributeValue, UserExternalId
Role = get_role_model()
class UTF8Recoder(object):
def __init__(self, fd):
@ -149,6 +153,8 @@ class CsvHeader(object):
unique = attr.ib(default=False, metadata={'flag': True})
globally_unique = attr.ib(default=False, metadata={'flag': True})
verified = attr.ib(default=False, metadata={'flag': True})
delete = attr.ib(default=False, metadata={'flag': True})
clear = attr.ib(default=False, metadata={'flag': True})
@property
def flags(self):
@ -186,14 +192,25 @@ class LineError(Error):
return (self.code, self.line, self.column) == (other.code, other.line, other.column)
class ImportUserForm(BaseUserForm):
def clean(self):
super(BaseUserForm, self).clean()
self._validate_unique = False
SOURCE_NAME = '_source_name'
SOURCE_ID = '_source_id'
SOURCE_COLUMNS = set([SOURCE_NAME, SOURCE_ID])
ROLE_NAME = '_role_name'
ROLE_SLUG = '_role_slug'
SPECIAL_COLUMNS = SOURCE_COLUMNS | {ROLE_NAME, ROLE_SLUG}
class ImportUserForm(BaseUserForm):
locals()[ROLE_NAME] = forms.CharField(
label=_('Role name'),
required=False)
locals()[ROLE_SLUG] = forms.CharField(
label=_('Role slug'),
required=False)
def clean(self):
super(BaseUserForm, self).clean()
self._validate_unique = False
class ImportUserFormWithExternalId(ImportUserForm):
@ -216,6 +233,7 @@ class CsvRow(object):
errors = attr.ib(default=[])
is_valid = attr.ib(default=True)
action = attr.ib(default=None)
user_first_seen = attr.ib(default=True)
def __getitem__(self, header):
for cell in self.cells:
@ -344,6 +362,10 @@ class UserCsvImporter(object):
self.add_error(
Error('invalid-external-id-pair',
_('You must have a _source_name and a _source_id column')))
if ROLE_NAME in header_names and ROLE_SLUG in header_names:
self.add_error(
Error('invalid-role-column',
_('Either specify role names or role slugs, not both')))
def parse_header(self, head, column):
splitted = head.split()
@ -360,7 +382,7 @@ class UserCsvImporter(object):
if header.name in SOURCE_COLUMNS:
if header.name == SOURCE_ID:
header.key = True
else:
elif header.name not in SPECIAL_COLUMNS:
try:
if header.name in ['email', 'first_name', 'last_name', 'username']:
User._meta.get_field(header.name)
@ -388,7 +410,7 @@ class UserCsvImporter(object):
self.headers.append(header)
if (not (header.field or header.attribute)
and header.name not in SOURCE_COLUMNS):
and header.name not in SPECIAL_COLUMNS):
self.add_error(LineError('unknown-or-missing-attribute',
_('unknown or missing attribute "%s"') % head,
line=1, column=column))
@ -463,6 +485,10 @@ class UserCsvImporter(object):
def username_is_unique(self):
return app_settings.A2_USERNAME_IS_UNIQUE or self.ou.username_is_unique
@property
def allow_duplicate_key(self):
return ROLE_NAME in self.headers_by_name or ROLE_SLUG in self.headers_by_name
def check_unique_constraints(self, row, unique_map, user=None):
ou_users = User.objects.filter(ou=self.ou)
users = User.objects.all()
@ -479,12 +505,15 @@ class UserCsvImporter(object):
else:
continue
if unique_key in unique_map:
errors.append(
Error('unique-constraint-failed',
_('Unique constraint on column "%(column)s" failed: '
'value already appear on line %(line)d') % {
'column': header.name,
'line': unique_map[unique_key]}))
if user and self.allow_duplicate_key:
row.user_first_seen = False
else:
errors.append(
Error('unique-constraint-failed',
_('Unique constraint on column "%(column)s" failed: '
'value already appear on line %(line)d') % {
'column': header.name,
'line': unique_map[unique_key]}))
else:
unique_map[unique_key] = row.line
@ -500,8 +529,12 @@ class UserCsvImporter(object):
atvs = AttributeValue.objects.filter(attribute__name=cell.header.name, content=cell.value)
unique = not qs.filter(attribute_values__in=atvs).exists()
if not unique:
errors.append(
Error('unique-constraint-failed', _('Unique constraint on column "%s" failed') % cell.header.name))
if user and self.allow_duplicate_key:
row.user_first_seen = False
else:
errors.append(
Error('unique-constraint-failed',
_('Unique constraint on column "%s" failed') % cell.header.name))
row.errors.extend(errors)
row.is_valid = row.is_valid and not bool(errors)
return not bool(errors)
@ -510,6 +543,7 @@ class UserCsvImporter(object):
def do_import_row(self, row, unique_map):
if not row.is_valid:
return False
success = True
for header in self.headers:
if header.key:
@ -553,6 +587,9 @@ class UserCsvImporter(object):
if not self.check_unique_constraints(row, unique_map, user=user):
return False
if not row.user_first_seen:
cell = next(c for c in row.cells if c.header.name in {ROLE_NAME, ROLE_SLUG})
return self.add_role(cell, user)
if not user:
user = User(ou=self.ou)
@ -597,5 +634,33 @@ class UserCsvImporter(object):
continue
cell.action = 'nothing'
for cell in row.cells:
if cell.header.field or cell.header.attribute:
continue
if cell.header.name in {ROLE_NAME, ROLE_SLUG}:
success &= self.add_role(cell, user, do_clear=True)
setattr(self, row.action + 'd', getattr(self, row.action + 'd') + 1)
return success
def add_role(self, cell, user, do_clear=False):
try:
if cell.header.name == ROLE_NAME:
role = Role.objects.get(name=cell.value, ou=self.ou)
elif cell.header.name == ROLE_SLUG:
role = Role.objects.get(slug=cell.value, ou=self.ou)
except Role.DoesNotExist:
cell.errors.append(
Error('role-not-found',
_('Role "%s" does not exist') % cell.value))
return False
if cell.header.delete:
user.roles.remove(role)
elif cell.header.clear:
if do_clear:
user.roles.clear()
user.roles.add(role)
else:
user.roles.add(role)
cell.action = 'updated'
return True

View File

@ -166,6 +166,47 @@
cannot use another key column.
{% endblocktrans %}
</p>
<h4 id="help-roles">{% trans "Role operations" %}</h4>
<p>
{% blocktrans trimmed %}
Adding existing roles to users is supported. Use either
<var>_role_name</var> or <var>_role_slug</var> special columns to
specify the names or the slugs that should be added to the user. In
order to add multiple roles, simply add a new line, identical to the
first one, except for the value of the role cell. These columns also
accept special flags, as listed below.
{% endblocktrans %}
</p>
<table class="main left">
<thead>
<tr>
<th>{% trans "Flag" %}</th>
<th>{% trans "Meaning" %}</th>
<th>{% trans "Default value" %}</th>
</tr>
</thead>
<tbody>
<tr>
<td>delete</td>
<td>
{% blocktrans trimmed %}
Remove role from user instead of adding it.
{% endblocktrans %}
</td>
<td>{% trans "False" %}</td>
</tr>
<tr>
<td>clear</td>
<td>
{% blocktrans trimmed %}
Clear user roles beforehand, so that they will have no more roles
than those specified in the import file.
{% endblocktrans %}
</td>
<td>{% trans "False" %}</td>
</tr>
</tbody>
</table>
<h4>{% trans "Examples" %}</h4>
<p>{% blocktrans trimmed %}Importing first and last name of users keyed by email{% endblocktrans %}</p>
<blockquote>
@ -183,6 +224,14 @@ john.doe@example.com,John,Doe
<blockquote>
<pre>_source_name,_source_id,email,"family_reference unique",first_name,last_name
app1,1,john.doe@example.com,1234,John,Doe
</pre>
</blockquote>
<p>{% blocktrans trimmed %}Importing email, first and last name of users
while adding roles.{% endblocktrans %}</p>
<blockquote>
<pre>email key,first_name,last_name,_role_name
john.doe@example.com,John,Doe,Role1
john.doe@example.com,John,Doe,Role2
</pre>
</blockquote>
</div>

View File

@ -21,12 +21,16 @@ import pytest
import io
from django_rbac.utils import get_role_model
from authentic2.custom_user.models import User
from authentic2.models import Attribute
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError
Role = get_role_model()
ENCODINGS = [
'iso-8859-1',
'iso-8859-15',
@ -389,3 +393,86 @@ app1,2,tnoel@entrouvert.com,Thomas,Noël,1234
importer = user_csv_importer_factory(content)
assert importer.run(), importer.errors
assert not importer.has_errors
def test_user_roles_csv(profile, user_csv_importer_factory):
role_name = 'test_name'
role_slug = 'test_slug'
role = Role.objects.create(name=role_name, slug=role_slug, ou=get_default_ou())
role2 = Role.objects.create(name='test2', ou=get_default_ou())
base_header = 'email key,first_name,last_name,phone,'
base_user = 'tnoel@entrouvert.com,Thomas,Noël,1234,'
content_name_add = '\n'.join((base_header + '_role_name', base_user + role_name))
importer = user_csv_importer_factory(content_name_add)
assert importer.run()
thomas = User.objects.get(email='tnoel@entrouvert.com')
assert thomas in role.members.all()
thomas.roles.add(role2)
importer = user_csv_importer_factory(content_name_add)
assert importer.run()
thomas.refresh_from_db()
assert thomas in role2.members.all()
content_name_delete = '\n'.join((base_header + '_role_name delete', base_user + role_name))
importer = user_csv_importer_factory(content_name_delete)
assert importer.run()
thomas.refresh_from_db()
assert thomas not in role.members.all()
assert thomas in role2.members.all()
content_name_clear = '\n'.join((base_header + '_role_name clear', base_user + role_name))
importer = user_csv_importer_factory(content_name_clear)
assert importer.run()
thomas.refresh_from_db()
assert thomas in role.members.all()
assert thomas not in role2.members.all()
thomas.roles.remove(role)
content_name_add_multiple = '\n'.join((base_header + '_role_name', base_user + role_name,
base_user + 'test2'))
importer = user_csv_importer_factory(content_name_add_multiple)
assert importer.run()
thomas.refresh_from_db()
assert thomas in role.members.all()
assert thomas in role2.members.all()
thomas.roles.remove(role)
thomas.roles.remove(role2)
content_name_clear_multiple = '\n'.join((base_header + '_role_name clear',
base_user + role_name,
base_user + 'test2'))
importer = user_csv_importer_factory(content_name_clear_multiple)
assert importer.run()
thomas.refresh_from_db()
assert thomas in role.members.all()
assert thomas in role2.members.all()
thomas.roles.remove(role)
content_slug_add = '\n'.join((base_header + '_role_slug', base_user + role_slug))
importer = user_csv_importer_factory(content_slug_add)
assert importer.run()
thomas.refresh_from_db()
assert thomas in role.members.all()
thomas.roles.remove(role)
content_only_key = '''email key,_role_name
tnoel@entrouvert.com,test_name'''
importer = user_csv_importer_factory(content_slug_add)
assert importer.run()
thomas.refresh_from_db()
assert thomas in role.members.all()
content_name_error = '\n'.join((base_header + '_role_name', base_user + 'bad_name'))
importer = user_csv_importer_factory(content_name_error)
assert importer.run()
assert importer.has_errors
assert importer.rows[0].cells[-1].errors[0].code == 'role-not-found'
content_header_error = '\n'.join((base_header + '_role_name,_role_slug',
base_user + ','.join((role_name, role_slug))))
importer = user_csv_importer_factory(content_header_error)
assert not importer.run()
assert importer.has_errors
assert importer.errors[0].code == 'invalid-role-column'