manager: move user export code (#43153)

This commit is contained in:
Valentin Deniaud 2021-03-09 17:23:41 +01:00
parent 6937f2f3ad
commit 413604d06b
2 changed files with 77 additions and 49 deletions

View File

@ -0,0 +1,74 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import datetime
import tablib
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from authentic2.manager.resources import UserResource
from authentic2.models import Attribute, AttributeValue
def get_user_dataset(qs):
user_resource = UserResource()
fields = user_resource._meta.export_order + ('email_verified', 'is_active', 'modified')
attributes = [attr.name for attr in Attribute.objects.all()]
headers = fields + tuple(['attribute_%s' % attr for attr in attributes])
at_mapping = {a.id: a for a in Attribute.objects.all()}
avs = (
AttributeValue.objects.filter(content_type=ContentType.objects.get_for_model(get_user_model()))
.filter(attribute__disabled=False)
.values()
)
user_attrs = collections.defaultdict(dict)
for av in avs:
user_attrs[av['object_id']][at_mapping[av['attribute_id']].name] = av['content']
def iso(rec):
if rec is None or rec == {}:
return ''
if hasattr(rec, 'strftime'):
if isinstance(rec, datetime.datetime):
_format = "%Y-%m-%d %H:%M:%S"
else:
_format = "%Y-%m-%d"
return rec.strftime(_format)
return rec
def create_record(user):
record = []
for field in fields:
if field == 'roles':
value = user_resource.dehydrate_roles(user)
else:
value = getattr(user, field)
record.append(value)
attr_d = user_attrs[user.pk]
for attr in attributes:
record.append(attr_d.get(attr))
return [iso(x) for x in record]
dataset = tablib.Dataset(headers=headers)
for user in qs:
dataset.append(create_record(user))
return dataset

View File

@ -16,13 +16,10 @@
import base64
import collections
import datetime
import operator
import tablib
from django.contrib import messages
from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import PermissionDenied
from django.core.mail import EmailMultiAlternatives
from django.db import models, transaction
@ -41,7 +38,7 @@ from django.views.generic.edit import BaseFormView
from authentic2 import hooks
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.apps.journal.views import JournalViewWithContext
from authentic2.models import Attribute, AttributeValue, PasswordReset
from authentic2.models import Attribute, PasswordReset
from authentic2.utils import make_url, redirect, select_next_url, send_password_reset_mail, switch_user
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClient
from django_rbac.utils import get_ou_model, get_role_model, get_role_parenting_model
@ -63,6 +60,7 @@ from .forms import (
from .journal_views import BaseJournalView
from .resources import UserResource
from .tables import OuUserRolesTable, UserAuthorizationsTable, UserRolesTable, UserTable
from .user_export import get_user_dataset
from .utils import get_ou_count, has_show_username
from .views import (
Action,
@ -516,51 +514,7 @@ class UsersExportView(ExportMixin, UsersView):
return self._dataset.export('csv')
def get_dataset(self):
user_resource = UserResource()
fields = user_resource._meta.export_order + ('email_verified', 'is_active', 'modified')
attributes = [attr.name for attr in Attribute.objects.all()]
headers = fields + tuple(['attribute_%s' % attr for attr in attributes])
at_mapping = {a.id: a for a in Attribute.objects.all()}
avs = (
AttributeValue.objects.filter(content_type=ContentType.objects.get_for_model(get_user_model()))
.filter(attribute__disabled=False)
.values()
)
user_attrs = collections.defaultdict(dict)
for av in avs:
user_attrs[av['object_id']][at_mapping[av['attribute_id']].name] = av['content']
def iso(rec):
if rec is None or rec == {}:
return ''
if hasattr(rec, 'strftime'):
if isinstance(rec, datetime.datetime):
_format = "%Y-%m-%d %H:%M:%S"
else:
_format = "%Y-%m-%d"
return rec.strftime(_format)
return rec
def create_record(user):
record = []
for field in fields:
if field == 'roles':
value = user_resource.dehydrate_roles(user)
else:
value = getattr(user, field)
record.append(value)
attr_d = user_attrs[user.pk]
for attr in attributes:
record.append(attr_d.get(attr))
return [iso(x) for x in record]
self._dataset = tablib.Dataset(headers=headers)
for user in self.get_data():
self._dataset.append(create_record(user))
self._dataset = get_user_dataset(self.get_data())
return self