manager: import roles using CSV (#24921)

This commit is contained in:
Valentin Deniaud 2021-03-16 11:17:12 +01:00
parent b3bc13d26c
commit 36fd7c6545
9 changed files with 249 additions and 5 deletions

View File

@ -14,10 +14,13 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import csv
import hashlib
import json
import logging
import smtplib
from collections import defaultdict
from io import StringIO
from django import forms
from django.contrib.auth import get_user_model
@ -40,12 +43,13 @@ from authentic2.utils import (
)
from django_rbac.backends import DjangoRBACBackend
from django_rbac.models import Operation
from django_rbac.utils import get_ou_model, get_permission_model, get_role_model
from django_rbac.utils import generate_slug, get_ou_model, get_permission_model, get_role_model
from . import app_settings, fields, utils
User = get_user_model()
OU = get_ou_model()
Role = get_role_model()
logger = logging.getLogger(__name__)
@ -756,3 +760,100 @@ class UserEditImportForm(UserImportForm):
with self.user_import.meta_update as meta:
meta['ou'] = self.cleaned_data['ou']
meta['encoding'] = self.cleaned_data['encoding']
class RolesCsvImportForm(LimitQuerysetFormMixin, forms.Form):
import_file = forms.FileField(
label=_('Roles file'),
required=True,
help_text=_('CSV file with role name and optionnaly role slug and organizational unit.'),
)
ou = forms.ModelChoiceField(
label=_('Organizational unit'), queryset=get_ou_model().objects, initial=lambda: get_default_ou().pk
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if utils.get_ou_count() < 2:
self.fields['ou'].widget = forms.HiddenInput()
def clean(self):
super().clean()
content = self.cleaned_data['import_file'].read()
if b'\0' in content:
raise ValidationError(_('Invalid file format.'))
for charset in ('utf-8-sig', 'iso-8859-15'):
try:
content = content.decode(charset)
break
except UnicodeDecodeError:
continue
# all byte-sequences are ok for iso-8859-15 so we will always reach
# this line with content being a unicode string.
try:
dialect = csv.Sniffer().sniff(content)
except csv.Error:
dialect = None
all_roles = Role.objects.all()
roles_by_slugs = defaultdict(dict)
for role in all_roles:
roles_by_slugs[role.ou][role.slug] = role
roles_by_names = defaultdict(dict)
for role in all_roles:
if role.name:
roles_by_names[role.ou][role.name] = role
self.roles = []
for i, csvline in enumerate(csv.reader(StringIO(content), dialect=dialect, delimiter=',')):
if not csvline:
continue
if i == 0:
if csvline != ['name', 'slug', 'ou'][: len(csvline)]:
header = ','.join(csvline)
raise ValidationError(_('Invalid file header "%s", expected "name,slug,ou".') % header)
continue
name = csvline[0]
if not name:
self.add_line_error(_('Name is required.'), i)
continue
slug = ''
if len(csvline) > 1:
slug = csvline[1]
ou = self.cleaned_data['ou']
if len(csvline) > 2 and csvline[2]:
try:
ou = OU.objects.get(slug=csvline[2])
except OU.DoesNotExist:
self.add_line_error(_('Organizational Unit %s does not exist.') % csvline[2], i)
continue
if name in roles_by_names.get(ou, {}):
role = roles_by_names[ou][name]
role.slug = slug or role.slug
elif slug in roles_by_slugs.get(ou, {}):
role = roles_by_slugs[ou][slug]
role.name = name
else:
role = Role(name=name, slug=slug)
if not role.slug:
role.slug = generate_slug(role.name, seen_slugs=roles_by_slugs[ou])
roles_by_slugs[ou][role.slug] = role
roles_by_names[ou][role.name] = role
role.ou = ou
self.roles.append(role)
def add_line_error(self, error, line):
error = _('%s (line %d)') % (error, line + 1)
self.add_error('import_file', error)

View File

@ -686,6 +686,51 @@ class RolesImportView(
roles_import = RolesImportView.as_view()
class RolesCsvImportView(
views.PermissionMixin, views.TitleMixin, views.MediaMixin, views.FormNeedsRequest, FormView
):
form_class = forms.RolesCsvImportForm
model = get_role_model()
template_name = 'authentic2/manager/roles_csv_import_form.html'
title = _('Roles CSV Import')
def get_initial(self):
initial = super().get_initial()
search_ou = self.request.GET.get('search-ou')
if search_ou:
initial['ou'] = search_ou
return initial
def post(self, request, *args, **kwargs):
if not self.can_add:
raise PermissionDenied
return super().post(request, *args, **kwargs)
def form_valid(self, form):
self.ou = form.cleaned_data['ou']
for role in form.roles:
role.save()
return super().form_valid(form)
def get_success_url(self):
messages.success(
self.request,
_('Roles have been successfully imported inside "%s" organizational unit.') % self.ou,
)
return reverse('a2-manager-roles') + '?search-ou=%s' % self.ou.pk
roles_csv_import = RolesCsvImportView.as_view()
class RolesCsvImportSampleView(TemplateView):
template_name = 'authentic2/manager/sample_roles.txt'
content_type = 'text/csv'
roles_csv_import_sample = RolesCsvImportSampleView.as_view()
class RoleJournal(views.PermissionMixin, JournalViewWithContext, BaseJournalView):
template_name = 'authentic2/manager/role_journal.html'
permissions = ['a2_rbac.view_role']

View File

@ -19,6 +19,7 @@
<li><a download href="{% url 'a2-manager-roles-export' format="json" %}?{{ request.GET.urlencode }}">{% trans 'Export' %}</a></li>
{% if view.can_add %}
<li><a href="{% url 'a2-manager-roles-import' %}?{{ request.GET.urlencode }}" rel="popup">{% trans 'Import' %}</a></li>
<li><a href="{% url 'a2-manager-roles-csv-import' %}?{{ request.GET.urlencode }}" rel="popup">{% trans 'CSV import' %}</a></li>
{% endif %}
</ul>
</span>

View File

@ -0,0 +1,16 @@
{% extends "authentic2/manager/import_form.html" %}
{% load i18n gadjo %}
{% block content %}
<form method="post" enctype="multipart/form-data">
{% csrf_token %}
{{ form|with_template }}
<p>
<a href="{% url 'a2-manager-roles-csv-import-sample' %}">{% trans 'Download sample file' %}</a>
</p>
<div class="buttons">
<button>{% trans "Import" %}</button>
<a class="cancel" href="{% url 'a2-manager-homepage' %}">{% trans 'Cancel' %}</a>
</div>
</form>
{% endblock %}

View File

@ -0,0 +1,2 @@
name,slug,ou
Role Name,role_slug,ou_slug

View File

@ -116,6 +116,12 @@ urlpatterns = required(
# Authentic2 roles
url(r'^roles/$', role_views.listing, name='a2-manager-roles'),
url(r'^roles/import/$', role_views.roles_import, name='a2-manager-roles-import'),
url(r'^roles/csv-import/$', role_views.roles_csv_import, name='a2-manager-roles-csv-import'),
url(
r'^roles/csv-import-sample/$',
role_views.roles_csv_import_sample,
name='a2-manager-roles-csv-import-sample',
),
url(r'^roles/add/$', role_views.add, name='a2-manager-role-add'),
url(r'^roles/export/(?P<format>csv|json)/$', role_views.export, name='a2-manager-roles-export'),
url(r'^roles/journal/$', role_views.roles_journal, name='a2-manager-roles-journal'),

View File

@ -5,7 +5,6 @@ import operator
from django.conf import settings
from django.db import models
from django.db.models.query import Prefetch, Q
from django.utils.text import slugify
from django.utils.translation import ugettext_lazy as _
try:
@ -45,7 +44,7 @@ class AbstractBase(models.Model):
def save(self, *args, **kwargs):
# truncate slug and add a hash if it's too long
if not self.slug:
self.slug = slugify(str(self.name)).lstrip('_')
self.slug = utils.generate_slug(self.name)
if len(self.slug) > 256:
self.slug = self.slug[:252] + hashlib.md5(self.slug).hexdigest()[:4]
if not self.uuid:

View File

@ -2,6 +2,7 @@ import uuid
from django.apps import apps
from django.conf import settings
from django.utils.text import slugify
from . import constants
@ -80,3 +81,12 @@ def get_operation(operation_tpl):
operation, created = models.Operation.objects.get_or_create(slug=operation_tpl.slug)
return operation
def generate_slug(name, seen_slugs=None):
slug = base_slug = slugify(name).lstrip('_')
if seen_slugs:
i = 1
while slug in seen_slugs:
slug = '%s-%s' % (base_slug, i)
return slug

View File

@ -38,7 +38,7 @@ def test_manager_role_export(app, admin, ou1, role_ou1, ou2, role_ou2):
assert len(export['roles']) == 2
assert set([role['slug'] for role in export['roles']]) == set(['role_ou1', 'role_ou2'])
export_response = response.click('CSV')
export_response = response.click('CSV', href='/export/')
reader = csv.reader(
[force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
)
@ -59,7 +59,7 @@ def test_manager_role_export(app, admin, ou1, role_ou1, ou2, role_ou2):
assert len(export['roles']) == 1
assert export['roles'][0]['slug'] == 'role_ou1'
export_response = search_response.click('CSV')
export_response = search_response.click('CSV', href='/export/')
reader = csv.reader(
[force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
)
@ -272,3 +272,67 @@ def test_roles_displayed_fields(app, admin, ou1, ou2):
('role1', '', 'checked', None),
('role2 (LDAP)', 'role1 ', None, 'disabled'),
]
def test_manager_role_csv_import(app, admin, ou1, ou2):
roles_count = Role.objects.count()
resp = login(app, admin, '/manage/roles/')
resp = resp.click('CSV import')
csv_header = b'name,slug,ou\n'
csv_content = 'Role Name,role_slug,%s' % ou1.slug
resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
resp.form.submit(status=302)
assert Role.objects.get(name='Role Name', slug='role_slug', ou=ou1)
assert Role.objects.count() == roles_count + 1
csv_content = 'Role 2,role2,\nRole 3,,%s' % ou2.slug
resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
resp.form.submit(status=302)
assert Role.objects.get(name='Role 2', slug='role2', ou=get_default_ou())
assert Role.objects.get(name='Role 3', slug='role-3', ou=ou2)
assert Role.objects.count() == roles_count + 3
# slug can be updated using name, name can be updated using slug
csv_content = 'Role two,role2,\nRole 3,role-three,%s' % ou2.slug
resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
resp.form.submit(status=302)
assert Role.objects.get(name='Role two', slug='role2', ou=get_default_ou())
assert Role.objects.get(name='Role 3', slug='role-three', ou=ou2)
assert Role.objects.count() == roles_count + 3
# conflict in auto-generated slug is handled
csv_header = b'name\n'
csv_content = 'Role!2'
resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
resp.form.submit(status=302)
assert Role.objects.get(name='Role!2', slug='role2-1', ou=get_default_ou())
assert Role.objects.count() == roles_count + 4
# Identical roles are created only once
csv_content = 'Role 4,role-4,\nRole 4,,\nRole 4,role-4,'
resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
resp.form.submit(status=302)
assert Role.objects.get(name='Role 4', slug='role-4', ou=get_default_ou())
assert Role.objects.count() == roles_count + 5
csv_content = 'xx\0xx,,'
resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
resp = resp.form.submit()
assert 'Invalid file format.' in resp.text
wrong_header = b'a,b,c\n'
resp.form['import_file'] = Upload('t.csv', wrong_header, 'text/csv')
resp = resp.form.submit()
assert 'Invalid file header' in resp.text
csv_content = ',slug-but-no-name,\nRole,,unknown-ou'
resp = app.get('/manage/roles/csv-import/')
resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
resp = resp.form.submit()
assert 'Name is required. (line 2)' in resp.text
assert 'Organizational Unit unknown-ou does not exist. (line 3)' in resp.text
resp = app.get('/manage/roles/csv-import/')
resp = resp.click('Download sample')
assert 'name,slug,ou' in resp.text