manager: add user import views (fixes #32833)

This commit is contained in:
Benjamin Dauvergne 2019-06-15 15:59:31 +02:00
parent fe0895da8b
commit dc3582ed45
16 changed files with 994 additions and 8 deletions

3
debian/control vendored
View File

@ -31,7 +31,8 @@ Depends: ${misc:Depends}, ${python:Depends},
python-pil,
python-tablib,
python-chardet,
python-attr
python-attr,
python-atomicwrites
Breaks: python-authentic2-auth-fc (<< 0.26)
Replaces: python-authentic2-auth-fc (<< 0.26)
Provides: ${python:Provides}, python-authentic2-auth-fc

View File

@ -142,6 +142,7 @@ setup(name="authentic2",
'tablib',
'chardet',
'attrs',
'atomicwrites',
],
zip_safe=False,
classifiers=[

View File

@ -45,6 +45,7 @@ from authentic2 import app_settings as a2_app_settings
from . import fields, app_settings, utils
User = get_user_model()
OU = get_ou_model()
logger = logging.getLogger(__name__)
@ -715,3 +716,74 @@ class UserChangeEmailForm(CssClass, FormWithRequest, forms.ModelForm):
class SiteImportForm(forms.Form):
site_json = forms.FileField(
label=_('Site Export File'))
ENCODINGS = [
('utf-8', _('Unicode')),
('cp1252', _('Western Europe (Windows-1252)')),
('iso-8859-15', _('Western Europe (ISO-8859-15)')),
]
class UserImportForm(forms.Form):
import_file = forms.FileField(
label=_('Import file'),
help_text=_('A CSV file'))
encoding = forms.ChoiceField(
label=_('Encoding'),
choices=ENCODINGS)
ou = forms.ModelChoiceField(
label=_('Organizational Unit'),
queryset=OU.objects.all())
class UserNewImportForm(UserImportForm):
def clean(self):
from authentic2.csv_import import CsvImporter
import_file = self.cleaned_data['import_file']
encoding = self.cleaned_data['encoding']
# force seek(0)
import_file.open()
importer = CsvImporter()
if not importer.run(import_file, encoding):
raise forms.ValidationError(importer.error.description or importer.error.code)
self.cleaned_data['rows_count'] = len(importer.rows)
def save(self):
from . import user_import
import_file = self.cleaned_data['import_file']
import_file.open()
new_import = user_import.UserImport.new(
import_file=import_file,
encoding=self.cleaned_data['encoding'])
with new_import.meta_update as meta:
meta['filename'] = import_file.name
meta['ou'] = self.cleaned_data['ou']
meta['rows_count'] = self.cleaned_data['rows_count']
return new_import
class UserEditImportForm(UserImportForm):
def __init__(self, *args, **kwargs):
self.user_import = kwargs.pop('user_import')
initial = kwargs.setdefault('initial', {})
initial['encoding'] = self.user_import.encoding
initial['ou'] = self.user_import.ou
super(UserEditImportForm, self).__init__(*args, **kwargs)
del self.fields['import_file']
def clean(self):
from authentic2.csv_import import CsvImporter
encoding = self.cleaned_data['encoding']
with self.user_import.import_file as fd:
importer = CsvImporter()
if not importer.run(fd, encoding):
raise forms.ValidationError(importer.error.description or importer.error.code)
self.cleaned_data['rows_count'] = len(importer.rows)
def save(self):
with self.user_import.meta_update as meta:
meta['ou'] = self.cleaned_data['ou']
meta['encoding'] = self.cleaned_data['encoding']

View File

@ -8,6 +8,10 @@
width: 100%;
}
#sidebar .buttons button {
width: auto;
}
#sidebar input[type="checkbox"] {
width: auto;
}

View File

@ -0,0 +1,51 @@
#import-report-table tr.row-valid td, .legend-row-valid {
background-color: #d5f5e3 ;
}
#import-report-table tr.row-invalid td, .legend-row-invalid {
background-color: #ff4408;
}
#import-report-table tr td.cell-action-updated, .legend-cell-action-updated {
background-color: #abebc6;
}
#import-report-table tr td.cell-errors, .legend-cell-errors {
background-color: #cd6155;
}
.header-flag-key::after {
content: "\f084"; /* fa-key */
font-family: FontAwesome;
padding-left: 1ex;
}
.header-flag-unique::after,
.header-flag-globally-unique::after
{
content: "\f0cd"; /* fa-underline */
font-family: FontAwesome;
padding-left: 1ex;
}
.header-flag-create::after {
content: "\f0fe"; /* fa-plus-square */
font-family: FontAwesome;
padding-left: 1ex;
}
.header-flag-update::after {
content: "\f040"; /* fa-pencil */
font-family: FontAwesome;
padding-left: 1ex;
}
.header-flag-verified::after {
content: "\f023"; /* fa-lock */
font-family: FontAwesome;
padding-left: 1ex;
}
span.icon-check::after {
content: "\f00c";
font-family: FontAwesome;
}

View File

@ -0,0 +1,67 @@
{% extends "authentic2/manager/base.html" %}
{% load i18n gadjo staticfiles %}
{% block page-title %}{{ block.super }} - {% trans "Import Users" %}{% endblock %}
{% block css %}
{{ block.super }}
<link rel="stylesheet" href="{% static "authentic2/manager/css/user_import.css" %}">
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
<a href="{% url 'a2-manager-users' %}">{% trans 'Users' %}</a>
<a href="{% url 'a2-manager-users-imports' %}">{% trans 'Imports' %}</a>
<a href="{% url 'a2-manager-users-import' uuid=user_import.uuid %}">{% trans "User Import" %} {{ user_import.created }}</a>
{% endblock %}
{% block sidebar %}
<aside id="sidebar">
<div>
<h3>{% trans "Modify import" %}</h3>
<form method="post" id="action-form">
{% csrf_token %}
{{ form|with_template }}
<div class="buttons">
<button name="modify">{% trans "Modify" %}</button>
<button name="simulate">{% trans "Simulate" %}</button>
<button name="execute">{% trans "Execute" %}</button>
</div>
</form>
</div>
<div>
<h3>{% trans "Download" %}</h3>
<form action="download/{{ user_import.filename }}" id="download-form">
<div class="buttons">
<button>{% trans "Download" %}</button>
</div>
</form>
</div>
</aside>
{% endblock %}
{% block main %}
<h2>{% trans "User Import" %} - {{ user_import.created }} - {{ user_import.user }}</h2>
<p>{% trans "Rows count:" %} {{ user_import.rows_count }}</p>
<h2>{% trans "Reports" %}</h2>
<table class="main">
<thead>
<tr>
<td>{% trans "Creation date" %}</td>
<td>{% trans "State" %}</td>
<td>{% trans "Imported" %}</td>
<td></td>
</tr>
</thead>
<tbody>
{% for report in reports %}
<tr data-uuid="{{ report.uuid }}">
<td class="created">{% if report.state != 'running' %}<a href="{% url "a2-manager-users-import-report" import_uuid=user_import.uuid report_uuid=report.uuid %}">{{ report.created }}</a>{% else %}{{ report.created }}{% endif %}</td>
<td class="state"><span>{{ report.state }}</span> {% if report.state == 'error' %}"{{ report.exception }}"{% endif %} {% if not report.is_running %}!failed before finishing!{% endif %}</td>
<td class="applied">{% if not report.simulate %}<span class="icon-check"></span>{% endif %}</td>
<td class="delete-action">{% if report.simulate %}<form method="post" id="delete-form-{{ report.uuid }}">{% csrf_token %}<button name="delete" value="{{ report.uuid }}">{% trans "Delete" %}</button></form>{% endif %}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endblock %}

View File

@ -0,0 +1,135 @@
{% extends "authentic2/manager/base.html" %}
{% load i18n gadjo staticfiles %}
{% block page-title %}{{ block.super }} - {% trans "Import Users" %}{% endblock %}
{% block css %}
{{ block.super }}
<link rel="stylesheet" href="{% static "authentic2/manager/css/user_import.css" %}">
{% endblock %}
{% block breadcrumb %}
{{ block.super }}
<a href="{% url 'a2-manager-users' %}">{% trans 'Users' %}</a>
<a href="{% url 'a2-manager-users-imports' %}">{% trans 'Imports' %}</a>
<a href="{% url 'a2-manager-users-import' uuid=user_import.uuid %}">{% trans "User Import" %} {{ user_import.created }}</a>
<a href="{% url 'a2-manager-users-import-report' import_uuid=user_import.uuid report_uuid=report.uuid%}">{{ report_title }} {{ report.created }}</a>
{% endblock %}
{% block sidebar %}
<aside id="sidebar">
<h3>{% trans "Legend" %}</td>
<table>
<tr>
<td><span class="header-flag-key"></span></td>
<td>{% trans "value is a key" %}</td>
</tr>
<tr>
<td><span class="header-flag-unique"></span></td>
<td>{% trans "value must be unique" %}</td>
</tr>
<tr>
<td><span class="header-flag-create"></span></td>
<td>{% trans "used on creation" %}</td>
</tr>
<tr>
<td><span class="header-flag-update"></span></td>
<td>{% trans "used on update" %}</td>
</tr>
<tr>
<td><span class="header-flag-verified"></span></td>
<td>{% trans "value is verified" %}</td>
</tr>
<tr>
<td class="legend-row-valid"></td>
<td>{% trans "row is valid" %}</td>
</tr>
<tr>
<td class="legend-cell-action-updated"></td>
<td>{% trans "value will be written" %}</td>
</tr>
<tr>
<td class="legend-row-invalid"></td>
<td>{% trans "row is invalid" %}</td>
</tr>
<tr>
<td class="legend-cell-errors"></td>
<td>{% trans "value has errors" %}</td>
</tr>
</table>
</aside>
{% endblock %}
{% block main %}
<h2>{{ report_title }} - {{ report.created }} - {{ report.state }}</h2>
{% if report.exception %}
<p>{% trans "Exception:" %} {{ report.exception}}</p>
{% endif %}
{% if report.importer %}
{% with importer=report.importer %}
{% if importer.errors %}
<h4>{% trans "Errors" %}</h4>
<ul class="errors">
{% for error in importer.errors %}
<li data-code="{{ error.code }}">{% firstof error.description error.code %}</li>
{% endfor %}
</ul>
{% endif %}
<h3>{% trans "Abstract" %}</h3>
<ul>
<li>{{ importer.rows|length }} {% trans "rows" %}</li>
<li>{{ importer.created }} {% trans "user(s) created" %}</li>
<li>{{ importer.updated }} {% trans "user(s) updated" %}</li>
<li>{{ importer.rows_with_errors }} {% trans "rows have errors" %}</li>
<li>{% blocktrans with duration=report.duration %}import took {{ duration }}{% endblocktrans %}</li>
</ul>
<h3>{% trans "Details" %}</h3>
{% if importer.rows %}
<table id="import-report-table" class="main">
<thead>
<tr>
<td>{% trans "Line" %}</td>
{% for header in importer.headers %}
<td
{% if header.flags %}
title="flags: {% for flag in header.flags %}{{ flag }} {% endfor %}"
{% endif %}
>
{{ header.name }}
{% for flag in header.flags %}
<span class="header-flag-{{ flag }}"></span>
{% endfor %}
</td>
{% endfor %}
<td>{% trans "Action" %}</td>
</tr>
</thead>
<tbody>
{% for row in importer.rows %}
<tr
class="{% if row.is_valid %}row-valid{% else %}row-invalid{% endif %} row-action-{{ row.action }}"
{% if not row.is_valid %}title="{% for error in row.errors %}{% firstof error.description error.code %}
{% endfor %}{% for cell in row %}{% for error in cell.errors %}
{{ cell.header.name }}: {% firstof error.description error.code %}{% endfor %}{% endfor %}"{% endif %}
>
<td class="row-line">{{ row.line }}</td>
{% for cell in row %}
<td
class="{% if cell.errors %}cell-errors{% endif %} {% if cell.action %}cell-action-{{ cell.action }}{% endif %}"
{% if cell.errors %}title="{% for error in cell.errors %}{% firstof error.description error.code %}
{% endfor %}"{% endif %}
>
{{ cell.value }}
</td>
{% endfor %}
<td class="row-action">{% firstof row.action "-" %}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>{% trans "No row analysed." %}</p>
{% endif %}
{% endwith %}
{% endif %}
{% endblock %}

View File

@ -0,0 +1,47 @@
{% extends "authentic2/manager/base.html" %}
{% load i18n gadjo staticfiles %}
{% block page-title %}{{ block.super }} - {% trans "Import Users" %}{% endblock %}
{% block breadcrumb %}
{{ block.super }}
<a href="{% url 'a2-manager-users' %}">{% trans 'Users' %}</a>
<a href="{% url 'a2-manager-users-imports' %}">{% trans 'Import Users' %}</a>
{% endblock %}
{% block sidebar %}
<aside id="sidebar">
<h3>{% trans "Create new import" %}</h3>
<form method="post" enctype="multipart/form-data">
{% csrf_token %}
{{ form|with_template }}
<button name="create">{% trans "Create" %}</button>
</form>
</aside>
{% endblock %}
{% block content %}
<h3>{% trans "Imports" %}</h3>
<table class="main">
<thead>
<tr>
<td>{% trans "Filename" %}</td>
<td>{% trans "Creation date" %}</td>
<td>{% trans "By" %}</td>
<td>{% trans "Rows" %}</td>
<td></td>
</tr>
</thead>
<tbody>
{% for import in imports %}
<tr>
<td><a href="{% url "a2-manager-users-import" uuid=import.uuid %}">{{ import.filename }}</a></td>
<td>{{ import.created }}</td>
<td>{{ import.user }}</td>
<td>{{ import.rows_count }}</td>
<td><form method="post">{% csrf_token %}<button name="delete" value="{{ import.uuid }}">{% trans "Delete" %}</button></form></td>
</tr>
{% endfor %}
</tbody>
</table>
{% endblock %}

View File

@ -6,6 +6,7 @@
{% block appbar %}
{{ block.super }}
<span class="actions">
<a class="extra-actions-menu-opener"></a>
{% if add_ou %}
<a
href="{% url "a2-manager-user-add" ou_pk=add_ou.pk %}"
@ -20,7 +21,15 @@
{% trans "Add user" %}
</a>
{% endif %}
{% if extra_actions %}
<ul class="extra-actions-menu">
{% for extra_action in extra_actions %}
<li><a href="{{ extra_action.url }}">{{ extra_action.label }}</a></li>
{% endfor %}
</ul>
{% endif %}
</span>
{% endblock %}
{% block breadcrumb %}

View File

@ -39,6 +39,14 @@ urlpatterns = required(
user_views.users_export, name='a2-manager-users-export'),
url(r'^users/add/$', user_views.user_add_default_ou,
name='a2-manager-user-add-default-ou'),
url(r'^users/import/$',
user_views.user_imports, name='a2-manager-users-imports'),
url(r'^users/import/(?P<uuid>[a-z0-9]+)/download/(?P<filename>.*)$',
user_views.user_import, name='a2-manager-users-import-download'),
url(r'^users/import/(?P<uuid>[a-z0-9]+)/$',
user_views.user_import, name='a2-manager-users-import'),
url(r'^users/import/(?P<import_uuid>[a-z0-9]+)/(?P<report_uuid>[a-z0-9]+)/$',
user_views.user_import_report, name='a2-manager-users-import-report'),
url(r'^users/(?P<ou_pk>\d+)/add/$', user_views.user_add,
name='a2-manager-user-add'),
url(r'^users/(?P<pk>\d+)/$', user_views.user_detail,

View File

@ -0,0 +1,250 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import base64
import contextlib
import datetime
import logging
import os
import pickle
import shutil
import uuid
import threading
from atomicwrites import atomic_write
from django.core.files.storage import default_storage
from django.db import connection
from django.utils import six
from django.utils.functional import cached_property
from django.utils.timezone import utc
from authentic2.utils import gettid
logger = logging.getLogger(__name__)
def new_id():
return (base64.b32encode(uuid.uuid4().get_bytes())
.strip('=')
.lower()
.decode('ascii'))
class UserImport(object):
def __init__(self, uuid):
self.uuid = uuid
self.path = os.path.join(self.base_path(), self.uuid)
self.import_path = os.path.join(self.path, 'content')
self.meta_path = os.path.join(self.path, 'meta.pck')
def exists(self):
return os.path.exists(self.import_path) and os.path.exists(self.meta_path)
@cached_property
def created(self):
return datetime.datetime.fromtimestamp(os.path.getctime(self.path), utc)
@property
def import_file(self):
return open(self.import_path, 'rb')
@cached_property
def meta(self):
meta = {}
if os.path.exists(self.meta_path):
with open(self.meta_path) as fd:
meta = pickle.load(fd)
return meta
@property
@contextlib.contextmanager
def meta_update(self):
try:
yield self.meta
finally:
with atomic_write(self.meta_path, overwrite=True) as fd:
pickle.dump(self.meta, fd)
@classmethod
def base_path(self):
path = default_storage.path('user_imports')
if not os.path.exists(path):
os.makedirs(path)
return path
@classmethod
def new(cls, import_file, encoding):
o = cls(new_id())
os.makedirs(o.path)
with open(o.import_path, 'wb') as fd:
import_file.seek(0)
fd.write(import_file.read())
with o.meta_update as meta:
meta['encoding'] = encoding
return o
@classmethod
def all(cls):
for subpath in os.listdir(cls.base_path()):
user_import = UserImport(subpath)
if user_import.exists():
yield user_import
@property
def reports(self):
return Reports(self)
def __getattr__(self, name):
try:
return self.meta[name]
except KeyError:
raise AttributeError(name)
def delete(self):
if self.exists():
shutil.rmtree(self.path)
class Report(object):
def __init__(self, user_import, uuid):
self.user_import = user_import
self.uuid = uuid
self.path = os.path.join(self.user_import.path, '%s%s' % (Reports.PREFIX, uuid))
@cached_property
def created(self):
return datetime.datetime.fromtimestamp(os.path.getctime(self.path), utc)
@cached_property
def data(self):
data = {}
if os.path.exists(self.path):
with open(self.path) as fd:
data = pickle.load(fd)
return data
@property
@contextlib.contextmanager
def data_update(self):
try:
yield self.data
finally:
with atomic_write(self.path, overwrite=True) as fd:
pickle.dump(self.data, fd)
@classmethod
def new(cls, user_import):
report = cls(user_import, new_id())
with report.data_update as data:
data['encoding'] = user_import.meta['encoding']
data['ou'] = user_import.meta.get('ou')
data['state'] = 'waiting'
return report
def run(self, start=True, simulate=False):
assert self.data.get('state') == 'waiting'
with self.data_update as data:
data['simulate'] = simulate
def target():
from authentic2.csv_import import UserCsvImporter
with self.user_import.import_file as fd:
importer = UserCsvImporter()
start = datetime.datetime.now()
with self.data_update as data:
data['state'] = 'running'
data['pid'] = os.getpid()
data['tid'] = gettid()
try:
importer.run(fd,
encoding=self.data['encoding'],
ou=self.data['ou'],
simulate=simulate)
except Exception as e:
logger.exception('error during report %s:%s run', self.user_import.uuid, self.uuid)
state = 'error'
try:
exception = six.text_type(e)
except Exception:
exception = repr(repr(e))
else:
exception = None
state = 'finished'
finally:
duration = datetime.datetime.now() - start
try:
connection.close()
except Exception:
logger.exception('cannot close connection to DB')
with self.data_update as data:
data['state'] = state
data['exception'] = exception
data['importer'] = importer
data['duration'] = duration
t = threading.Thread(target=target)
t.daemon = True
if start:
t.start()
return t
def is_running(self):
try:
pid = self.pid
tid = self.tid
return os.path.exists('/proc/%s/task/%s/' % (pid, tid))
except AttributeError:
return False
def __getattr__(self, name):
try:
return self.data[name]
except KeyError:
raise AttributeError(name)
def exists(self):
return os.path.exists(self.path)
def delete(self):
if self.simulate and self.exists():
os.unlink(self.path)
class Reports(object):
PREFIX = 'report-'
def __init__(self, user_import):
self.user_import = user_import
def __getitem__(self, uuid):
report = Report(self.user_import, uuid)
if not report.exists():
raise KeyError
return report
def __iter__(self):
for name in os.listdir(self.user_import.path):
if name.startswith(self.PREFIX):
try:
yield self[name[len(self.PREFIX):]]
except KeyError:
pass

View File

@ -16,6 +16,7 @@
import datetime
import collections
import operator
from django.db import models
from django.utils.translation import ugettext_lazy as _, ugettext
@ -26,6 +27,8 @@ from django.core.urlresolvers import reverse
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.contrib import messages
from django.views.generic import FormView, TemplateView
from django.http import Http404, FileResponse
import tablib
@ -36,12 +39,15 @@ from authentic2 import hooks
from django_rbac.utils import get_role_model, get_role_parenting_model, get_ou_model
from .views import BaseTableView, BaseAddView, \
BaseEditView, ActionMixin, OtherActionsMixin, Action, ExportMixin, \
BaseSubTableView, HideOUColumnMixin, BaseDeleteView, BaseDetailView
from .views import (BaseTableView, BaseAddView, BaseEditView, ActionMixin,
OtherActionsMixin, Action, ExportMixin, BaseSubTableView,
HideOUColumnMixin, BaseDeleteView, BaseDetailView,
PermissionMixin, MediaMixin)
from .tables import UserTable, UserRolesTable, OuUserRolesTable
from .forms import (UserSearchForm, UserAddForm, UserEditForm,
UserChangePasswordForm, ChooseUserRoleForm, UserRoleSearchForm, UserChangeEmailForm)
UserChangePasswordForm, ChooseUserRoleForm,
UserRoleSearchForm, UserChangeEmailForm, UserNewImportForm,
UserEditImportForm)
from .resources import UserResource
from .utils import get_ou_count, has_show_username
from . import app_settings
@ -104,6 +110,12 @@ class UsersView(HideOUColumnMixin, BaseTableView):
ou = self.search_form.cleaned_data.get('ou')
if ou and self.request.user.has_ou_perm('custom_user.add_user', ou):
ctx['add_ou'] = ou
extra_actions = ctx['extra_actions'] = []
if self.request.user.has_perm('custom_user.admin_user'):
extra_actions.append({
'url': reverse('a2-manager-users-imports'),
'label': _('Import users'),
})
return ctx
@ -618,3 +630,125 @@ class UserDeleteView(BaseDeleteView):
user_delete = UserDeleteView.as_view()
class UserImportsView(MediaMixin, PermissionMixin, FormView):
form_class = UserNewImportForm
permissions = ['custom_user.admin_user']
permissions_global = True
template_name = 'authentic2/manager/user_imports.html'
def post(self, request, *args, **kwargs):
from . import user_import
if 'delete' in request.POST:
uuid = request.POST['delete']
user_import.UserImport(uuid).delete()
return redirect(self.request, 'a2-manager-users-imports')
return super(UserImportsView, self).post(request, *args, **kwargs)
def form_valid(self, form):
user_import = form.save()
with user_import.meta_update as meta:
meta['user'] = self.request.user.get_full_name()
meta['user_pk'] = self.request.user.pk
return redirect(self.request, 'a2-manager-users-import', kwargs={'uuid': user_import.uuid})
def get_context_data(self, **kwargs):
from . import user_import
ctx = super(UserImportsView, self).get_context_data(**kwargs)
ctx['imports'] = sorted(user_import.UserImport.all(), key=operator.attrgetter('created'), reverse=True)
return ctx
user_imports = UserImportsView.as_view()
class UserImportView(MediaMixin, PermissionMixin, FormView):
form_class = UserEditImportForm
permissions = ['custom_user.admin_user']
permissions_global = True
template_name = 'authentic2/manager/user_import.html'
def dispatch(self, request, uuid, **kwargs):
from user_import import UserImport
self.user_import = UserImport(uuid)
if not self.user_import.exists():
raise Http404
return super(UserImportView, self).dispatch(request, uuid, **kwargs)
def get(self, request, uuid, filename=None):
if filename:
return FileResponse(self.user_import.import_file, content_type='text/csv')
return super(UserImportView, self).get(request, uuid=uuid, filename=filename)
def get_form_kwargs(self):
kwargs = super(UserImportView, self).get_form_kwargs()
kwargs['user_import'] = self.user_import
return kwargs
def post(self, request, *args, **kwargs):
from . import user_import
if 'delete' in request.POST:
uuid = request.POST['delete']
try:
report = self.user_import.reports[uuid]
except KeyError:
pass
else:
report.delete()
return redirect(request, 'a2-manager-users-import', kwargs={'uuid': self.user_import.uuid})
simulate = 'simulate' in request.POST
execute = 'execute' in request.POST
if simulate or execute:
report = user_import.Report.new(self.user_import)
report.run(simulate=simulate)
return redirect(request, 'a2-manager-users-import', kwargs={'uuid': self.user_import.uuid})
return super(UserImportView, self).post(request, *args, **kwargs)
def form_valid(self, form):
form.save()
return super(UserImportView, self).form_valid(form)
def get_success_url(self):
return reverse('a2-manager-users-import', kwargs={'uuid': self.user_import.uuid})
def get_context_data(self, **kwargs):
ctx = super(UserImportView, self).get_context_data(**kwargs)
ctx['user_import'] = self.user_import
ctx['reports'] = sorted(self.user_import.reports, key=operator.attrgetter('created'), reverse=True)
return ctx
user_import = UserImportView.as_view()
class UserImportReportView(MediaMixin, PermissionMixin, TemplateView):
form_class = UserEditImportForm
permissions = ['custom_user.admin_user']
permissions_global = True
template_name = 'authentic2/manager/user_import_report.html'
def dispatch(self, request, import_uuid, report_uuid):
from user_import import UserImport
self.user_import = UserImport(import_uuid)
if not self.user_import.exists():
raise Http404
try:
self.report = self.user_import.reports[report_uuid]
except KeyError:
raise Http404
return super(UserImportReportView, self).dispatch(request, import_uuid, report_uuid)
def get_context_data(self, **kwargs):
ctx = super(UserImportReportView, self).get_context_data(**kwargs)
ctx['user_import'] = self.user_import
ctx['report'] = self.report
if self.report.simulate:
ctx['report_title'] = _('Simulation')
else:
ctx['report_title'] = _('Execution')
return ctx
user_import_report = UserImportReportView.as_view()

View File

@ -101,6 +101,7 @@ class MediaMixin(object):
class PermissionMixin(object):
'''Control access to views based on permissions'''
permissions = None
permissions_global = False
def authorize(self, request, *args, **kwargs):
if hasattr(self, 'model'):
@ -130,9 +131,11 @@ class PermissionMixin(object):
and not request.user.has_perm_any(self.permissions):
raise PermissionDenied
else:
if self.permissions \
and not request.user.has_perm_any(self.permissions):
raise PermissionDenied
if self.permissions:
if self.permissions_global and not request.user.has_perms(self.permissions):
raise PermissionDenied
if not self.permissions_global and not request.user.has_perm_any(self.permissions):
raise PermissionDenied
def dispatch(self, request, *args, **kwargs):
response = self.authorize(request, *args, **kwargs)

View File

@ -21,6 +21,7 @@ import logging
import uuid
import datetime
import copy
import ctypes
from functools import wraps
from itertools import islice, chain, count
@ -1144,3 +1145,10 @@ def get_authentication_events(request=None, session=None):
if session is not None:
return session.get(constants.AUTHENTICATION_EVENTS_SESSION_KEY, [])
return []
def gettid():
"""Returns OS thread id - Specific to Linux"""
libc = ctypes.cdll.LoadLibrary('libc.so.6')
SYS_gettid = 186
return libc.syscall(SYS_gettid)

View File

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import io
import operator
import pytest
from authentic2.manager.user_import import UserImport, Report
from authentic2.models import Attribute
from utils import skipif_sqlite
@pytest.fixture
def profile(transactional_db):
Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
@skipif_sqlite
def test_user_import(media, transactional_db, profile):
content = '''email key verified,first_name,last_name,phone no-create
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''
fd = io.BytesIO(content.encode('utf-8'))
assert len(list(UserImport.all())) == 0
UserImport.new(fd, encoding='utf-8')
UserImport.new(fd, encoding='utf-8')
assert len(list(UserImport.all())) == 2
for user_import in UserImport.all():
with user_import.import_file as fd:
assert fd.read() == content.encode('utf-8')
for user_import in UserImport.all():
report = Report.new(user_import)
assert user_import.reports[report.uuid].exists()
assert user_import.reports[report.uuid].data['encoding'] == 'utf-8'
assert user_import.reports[report.uuid].data['state'] == 'waiting'
t = report.run(start=False)
t.start()
t.join()
assert user_import.reports[report.uuid].data['state'] == 'finished'
assert user_import.reports[report.uuid].data['importer']
assert not user_import.reports[report.uuid].data['importer'].errors
for user_import in UserImport.all():
reports = list(user_import.reports)
assert len(reports) == 1
assert reports[0].created
importer = reports[0].data['importer']
assert importer.rows[0].is_valid
assert importer.rows[1].is_valid
assert not importer.rows[2].is_valid
user_imports = sorted(UserImport.all(), key=operator.attrgetter('created'))
user_import1 = user_imports[0]
report1 = list(user_import1.reports)[0]
importer = report1.data['importer']
assert all(row.action == 'create' for row in importer.rows[:2])
assert all(cell.action == 'updated' for row in importer.rows[:2] for cell in row.cells[:3])
assert all(cell.action == 'nothing' for row in importer.rows[:2] for cell in row.cells[3:])
user_import2 = user_imports[1]
report2 = list(user_import2.reports)[0]
importer = report2.data['importer']
assert all(row.action == 'update' for row in importer.rows[:2])
assert all(cell.action == 'nothing' for row in importer.rows[:2] for cell in row.cells[:3])
assert all(cell.action == 'updated' for row in importer.rows[:2] for cell in row.cells[3:])

View File

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
@ -14,6 +15,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import csv
import time
import pytest
from webtest import Upload
from django.core.urlresolvers import reverse
@ -25,6 +30,7 @@ from django_rbac.utils import get_ou_model
from authentic2.custom_user.models import User
from authentic2.models import Attribute, AttributeValue
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.manager import user_import
from utils import login, get_link_from_mail, skipif_sqlite
@ -210,3 +216,103 @@ def test_user_table(app, admin, user_ou1, ou1):
response = app.get('/manage/users/?search-ou=%s' % ou1.id)
assert response.pyquery('td.username')
@skipif_sqlite
@pytest.mark.parametrize('encoding', ['utf-8', 'cp1252', 'iso-8859-15'])
def test_user_import(encoding, transactional_db, app, admin, ou1, admin_ou1, media):
Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
user_count = User.objects.count()
assert Attribute.objects.count() == 3
response = login(app, admin, '/manage/users/')
response = response.click('Import users')
response.form.set('import_file',
Upload(
'users.csv',
u'''email key verified,first_name,last_name,phone
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''.encode(encoding),
'application/octet-stream'))
response.form.set('encoding', encoding)
response.form.set('ou', str(get_default_ou().pk))
response = response.form.submit()
imports = list(user_import.UserImport.all())
assert len(imports) == 1
_import_uuid = response.location.split('/')[-2]
_import = user_import.UserImport(uuid=_import_uuid)
assert _import.exists()
response = response.follow()
response = response.forms['action-form'].submit(name='modify').follow()
response = response.forms['action-form'].submit(name='simulate')
reports = list(_import.reports)
assert len(reports) == 1
uuid = reports[0].uuid
response = response.follow()
def assert_timeout(duration, wait_function):
start = time.time()
while True:
result = wait_function()
if result is not None:
return result
assert time.time() - start < duration, '%s timed out after %s seconds' % (wait_function, duration)
time.sleep(0.001)
def wait_finished():
new_resp = response.click('User Import')
if new_resp.pyquery('tr[data-uuid="%s"] td.state span' % uuid).text() == 'finished':
return new_resp
simulate = reports[0]
assert simulate.simulate
response = assert_timeout(2, wait_finished)
response = response.click(href=simulate.uuid)
assert len(response.pyquery('table.main tbody tr')) == 3
assert len(response.pyquery('table.main tbody tr.row-valid')) == 2
assert len(response.pyquery('table.main tbody tr.row-invalid')) == 1
assert User.objects.count() == user_count
response = response.click('User Import')
response = response.forms['action-form'].submit(name='execute')
execute = list(report for report in _import.reports if not report.simulate)[0]
uuid = execute.uuid
response = response.follow()
response = assert_timeout(2, wait_finished)
assert User.objects.count() == user_count + 2
assert User.objects.filter(
email='tnoel@entrouvert.com',
first_name=u'Thomas',
last_name=u'Noël',
attribute_values__content='1234').count() == 1
assert User.objects.filter(
email='fpeters@entrouvert.com',
first_name=u'Frédéric',
last_name=u'Péters',
attribute_values__content='5678').count() == 1
# logout
app.session.flush()
response = login(app, admin_ou1, '/manage/users/')
app.get('/manage/users/import/', status=403)
app.get('/manage/users/import/%s/' % _import.uuid, status=403)
app.get('/manage/users/import/%s/%s/' % (_import.uuid, simulate.uuid), status=403)
app.get('/manage/users/import/%s/%s/' % (_import.uuid, execute.uuid), status=403)