manager: add ou and role imports (#45082)

This commit is contained in:
Valentin Deniaud 2020-08-24 14:44:38 +02:00
parent 7c1e2e1d2b
commit 4f3c6a47d6
10 changed files with 238 additions and 22 deletions

View File

@ -115,13 +115,19 @@ def search_ou(ou_d):
return None
def search_role(role_d):
def search_role(role_d, ou=None):
Role = get_role_model()
try:
Role = get_role_model()
return Role.objects.get_by_natural_key_json(role_d)
role = Role.objects.get_by_natural_key_json(role_d)
except Role.DoesNotExist:
return None
else:
if ou and role.ou != ou:
# Allow creation of the role in a different OU
role_d.pop('uuid')
return None
return role
class ImportContext(object):
@ -153,7 +159,8 @@ class ImportContext(object):
role_parentings_update=True,
role_permissions_update=True,
role_attributes_update=True,
ou_delete_orphans=False):
ou_delete_orphans=False,
set_ou=None):
self.import_roles = import_roles
self.import_ous = import_ous
self.role_delete_orphans = role_delete_orphans
@ -161,6 +168,7 @@ class ImportContext(object):
self.role_parentings_update = role_parentings_update
self.role_permissions_update = role_permissions_update
self.role_attributes_update = role_attributes_update
self.set_ou = set_ou
class RoleDeserializer(object):
@ -196,11 +204,17 @@ class RoleDeserializer(object):
@wraps_validationerror
def deserialize(self):
ou_d = self._role_d['ou']
has_ou = bool(ou_d)
ou = None if not has_ou else search_ou(ou_d)
if has_ou and not ou:
raise ValidationError(_("Can't import role because missing Organizational Unit: %s") % ou_d)
if self._import_context.set_ou:
ou = self._import_context.set_ou
has_ou = True
else:
ou_d = self._role_d['ou']
has_ou = bool(ou_d)
ou = None if not has_ou else search_ou(ou_d)
if has_ou and not ou:
raise ValidationError(_("Can't import role because missing Organizational Unit: %s") % ou_d)
obj = search_role(self._role_d, ou=self._import_context.set_ou)
kwargs = self._role_d.copy()
kwargs.pop('ou', None)
@ -208,7 +222,6 @@ class RoleDeserializer(object):
if has_ou:
kwargs['ou'] = ou
obj = search_role(self._role_d)
if obj: # Role already exist
self._obj = obj
status = 'updated'

View File

@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import hashlib
import json
import smtplib
import logging
@ -659,8 +660,29 @@ class UserChangeEmailForm(CssClass, FormWithRequest, forms.ModelForm):
class SiteImportForm(forms.Form):
site_json = forms.FileField(
label=_('Site Export File'))
file_field_label = _('Site Export File')
site_json = forms.FileField(label=file_field_label)
def clean_site_json(self):
try:
return json.loads(self.cleaned_data['site_json'].read().decode())
except ValueError:
raise ValidationError(_('File is not in the expected JSON format.'))
class OusImportForm(SiteImportForm):
file_field_label = _('Organizational Units Export File')
class RolesImportForm(HideOUFieldMixin, LimitQuerysetFormMixin, SiteImportForm):
file_field_label = _('Roles Export File')
ou = forms.ModelChoiceField(
label=_('Organizational unit'),
queryset=get_ou_model().objects,
initial=lambda: get_default_ou().pk
)
ENCODINGS = [

View File

@ -19,8 +19,12 @@ import json
from django_rbac.utils import get_ou_model
from django.http import HttpResponseRedirect
from django.contrib import messages
from django.core.exceptions import PermissionDenied, ValidationError
from django.db import transaction
from django.urls import reverse
from django.utils import six
from django.utils.translation import ugettext as _
from django.views.generic import FormView
from authentic2 import data_transfer
@ -98,6 +102,8 @@ delete = OrganizationalUnitDeleteView.as_view()
class OusExportView(views.ExportMixin, OrganizationalUnitView):
export_prefix = 'ous-export-'
def get(self, request, *args, **kwargs):
export = data_transfer.export_site(
data_transfer.ExportContext(
@ -108,3 +114,34 @@ class OusExportView(views.ExportMixin, OrganizationalUnitView):
export = OusExportView.as_view()
class OusImportView(views.PermissionMixin, views.TitleMixin, views.MediaMixin, views.FormNeedsRequest,
FormView):
form_class = forms.OusImportForm
model = get_ou_model()
template_name = 'authentic2/manager/import_form.html'
title = _('Organizational Units Import')
def post(self, request, *args, **kwargs):
if not self.can_add:
raise PermissionDenied
return super().post(request, *args, **kwargs)
def form_valid(self, form):
try:
context = data_transfer.ImportContext(import_roles=False)
with transaction.atomic():
data_transfer.import_site(form.cleaned_data['site_json'], context)
except ValidationError as e:
form.add_error('site_json', e)
return self.form_invalid(form)
return super().form_valid(form)
def get_success_url(self):
messages.success(self.request, _('Organizational Units have been successfully imported.'))
return reverse('a2-manager-ous')
ous_import = OusImportView.as_view()

View File

@ -16,13 +16,14 @@
import json
from django.core.exceptions import PermissionDenied
from django.core.exceptions import PermissionDenied, ValidationError
from django.utils.translation import ugettext_lazy as _
from django.urls import reverse
from django.views.generic import FormView, TemplateView
from django.views.generic.detail import SingleObjectMixin
from django.contrib import messages
from django.contrib.contenttypes.models import ContentType
from django.db import transaction
from django.db.models.query import Q, Prefetch
from django.db.models import Count, F
from django.contrib.auth import get_user_model
@ -101,6 +102,7 @@ add = RoleAddView.as_view()
class RolesExportView(views.ExportMixin, RolesView):
resource_class = resources.RoleResource
export_prefix = 'roles-export-'
def get(self, request, *args, **kwargs):
export_format = kwargs['format'].lower()
@ -509,3 +511,38 @@ class RoleRemoveAdminUserView(views.TitleMixin, views.AjaxFormViewMixin,
return redirect(self.request, self.success_url)
remove_admin_user = RoleRemoveAdminUserView.as_view()
class RolesImportView(views.PermissionMixin, views.TitleMixin, views.MediaMixin, views.FormNeedsRequest,
FormView):
form_class = forms.RolesImportForm
model = get_role_model()
template_name = 'authentic2/manager/import_form.html'
title = _('Roles Import')
def post(self, request, *args, **kwargs):
if not self.can_add:
raise PermissionDenied
return super().post(request, *args, **kwargs)
def form_valid(self, form):
self.ou = form.cleaned_data['ou']
try:
context = data_transfer.ImportContext(import_ous=False, set_ou=self.ou)
with transaction.atomic():
data_transfer.import_site(form.cleaned_data['site_json'], context)
except ValidationError as e:
form.add_error('site_json', e)
return self.form_invalid(form)
return super().form_valid(form)
def get_success_url(self):
messages.success(
self.request,
_('Roles have been successfully imported inside "%s" organizational unit.') % self.ou
)
return reverse('a2-manager-roles') + '?search-ou=%s' % self.ou.pk
roles_import = RolesImportView.as_view()

View File

@ -16,6 +16,9 @@
{% endif %}
<ul class="extra-actions-menu">
<li><a download href="{% url 'a2-manager-ou-export' format="json" %}?{{ request.GET.urlencode }}">{% trans 'Export' %}</a></li>
{% if view.can_add %}
<li><a href="{% url 'a2-manager-ous-import' %}" rel="popup">{% trans 'Import' %}</a></li>
{% endif %}
</ul>
</span>
</span>

View File

@ -14,6 +14,9 @@
{% endif %}
<ul class="extra-actions-menu">
<li><a download href="{% url 'a2-manager-roles-export' format="json" %}?{{ request.GET.urlencode }}">{% trans 'Export' %}</a></li>
{% if view.can_add %}
<li><a href="{% url 'a2-manager-roles-import' %}" rel="popup">{% trans 'Import' %}</a></li>
{% endif %}
</ul>
</span>
{% endblock %}

View File

@ -85,6 +85,8 @@ urlpatterns = required(
# Authentic2 roles
url(r'^roles/$', role_views.listing,
name='a2-manager-roles'),
url(r'^roles/import/$', role_views.roles_import,
name='a2-manager-roles-import'),
url(r'^roles/add/$', role_views.add,
name='a2-manager-role-add'),
url(r'^roles/export/(?P<format>csv|json)/$',
@ -135,6 +137,9 @@ urlpatterns = required(
url(r'^organizational-units/export/(?P<format>json)/$',
ou_views.export,
name='a2-manager-ou-export'),
url(r'^organizational-units/import/$',
ou_views.ous_import,
name='a2-manager-ous-import'),
# Services
url(r'^services/$', service_views.listing,

View File

@ -723,16 +723,9 @@ class SiteImportView(MediaMixin, TitleMixin, FormView):
title = _('Site Import')
def form_valid(self, form):
try:
json_site = json.loads(
force_text(self.request.FILES['site_json'].read()))
except ValueError:
form.add_error('site_json', _('File is not in the expected JSON format.'))
return self.form_invalid(form)
try:
with transaction.atomic():
import_site(json_site, ImportContext())
import_site(form.cleaned_data['site_json'], ImportContext())
except ValidationError as e:
form.add_error('site_json', e)
return self.form_invalid(form)

View File

@ -13,8 +13,18 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from webtest import Upload
from django_rbac.utils import get_ou_model, get_role_model
from .utils import login
OU = get_ou_model()
Role = get_role_model()
def test_manager_ou_export(app, admin, ou1, role_ou1, ou2, role_ou2):
response = login(app, admin, 'a2-manager-ous')
@ -34,3 +44,41 @@ def test_manager_ou_export(app, admin, ou1, role_ou1, ou2, role_ou2):
assert len(export['ous']) == 1
assert export['ous'][0]['slug'] == 'ou1'
def test_manager_ou_import(app, admin, ou1, role_ou1, ou2, role_ou2):
response = login(app, admin, 'a2-manager-ous')
export_response = response.click('Export')
export = export_response.json
assert len(export['ous']) == 3
assert not 'roles' in export
ou1.delete()
ou2.delete()
resp = app.get('/manage/organizational-units/')
resp = resp.click('Import')
resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit().follow()
assert OU.objects.filter(name=ou1.name).exists()
assert OU.objects.filter(name=ou2.name).exists()
export_response = response.click('Export')
new_export = export_response.json
assert len(export['ous']) == 3
assert new_export['ous'][1]['uuid'] == export['ous'][1]['uuid']
assert new_export['ous'][2]['uuid'] == export['ous'][2]['uuid']
# in case roles are present in export file, they must not be imported
export['roles'] = [{
"uuid": "27255f404cb140df9a577da76b59f285",
"slug": "should_not_exist",
"name": "should_not_exist",
}]
resp = resp.click('Import')
resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit().follow()
assert not Role.objects.filter(slug="should_not_exist").exists()

View File

@ -14,10 +14,15 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from webtest import Upload
from django.utils.encoding import force_bytes, force_text
from authentic2.custom_user.models import User
from authentic2.a2_rbac.models import Role
from authentic2.a2_rbac.models import Role, OrganizationalUnit
from authentic2.a2_rbac.utils import get_default_ou
from .utils import login, text_content
@ -111,3 +116,53 @@ def test_role_members_via(app, admin):
('user1', '', ''),
('user2', '', 'role2'),
]
def test_manager_role_import(app, admin, ou1, role_ou1, ou2, role_ou2):
response = login(app, admin, 'a2-manager-roles')
export_response = response.click('Export')
export = export_response.json
assert len(export['roles']) == 2
assert not 'ous' in export
Role.objects.filter(ou__in=[ou1, ou2]).delete()
resp = app.get('/manage/roles/')
resp = resp.click('Import')
resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit().follow()
assert Role.objects.filter(name=role_ou1.name, ou=get_default_ou()).exists()
assert Role.objects.filter(name=role_ou2.name, ou=get_default_ou()).exists()
response.form.set('search-text', 'role_ou1')
search_response = response.form.submit()
export_response = response.click('Export')
new_export = export_response.json
assert len(export['roles']) == 2
assert new_export['roles'][0]['uuid'] == export['roles'][0]['uuid']
assert new_export['roles'][1]['uuid'] == export['roles'][1]['uuid']
resp = app.get('/manage/roles/')
resp = resp.click('Import')
resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp.form['ou'] = ou1.pk
resp = resp.form.submit().follow()
assert Role.objects.filter(name=role_ou1.name, ou=get_default_ou()).exists()
assert Role.objects.filter(name=role_ou2.name, ou=get_default_ou()).exists()
assert Role.objects.filter(ou=ou1).count() == 4
# in case ous are present in export file, they must not be imported
export['ous'] = [{
"uuid": "27255f404cb140df9a577da76b59f285",
"slug": "should_not_exist",
"name": "should_not_exist",
}]
resp = resp.click('Import')
resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json')
resp = resp.form.submit().follow()
assert not OrganizationalUnit.objects.filter(slug="should_not_exist").exists()