misc: add journal application (#47155)

This commit is contained in:
Benjamin Dauvergne 2020-08-23 02:28:25 +02:00
parent 8899d25376
commit 9a1631b18a
25 changed files with 1947 additions and 2 deletions

View File

@ -14,4 +14,4 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
default_app_config = 'authentic2.apps.Authentic2Config'
default_app_config = 'authentic2.app.Authentic2Config'

View File

View File

@ -0,0 +1,18 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
default_app_config = 'authentic2.apps.journal.app.JournalAppConfig'

View File

@ -0,0 +1,70 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from django.contrib import admin
from django.utils.html import format_html
from .models import EventType, Event
class EventTypeAdmin(admin.ModelAdmin):
list_display = [
'__str__',
'name',
]
admin.site.register(EventType, EventTypeAdmin)
class EventAdmin(admin.ModelAdmin):
date_hierarchy = 'timestamp'
list_filter = ['type']
list_display = [
'timestamp',
'type',
'user',
'session_id_shortened',
'message',
]
fields = [
'timestamp',
'type',
'user',
'session_id_shortened',
'formatted_references',
'message',
'raw_json',
]
readonly_fields = [
'timestamp',
'user',
'session_id_shortened',
'formatted_references',
'message',
'raw_json',
]
def formatted_references(self, event):
return format_html('<pre>{}</pre>', event.reference_ids or [])
def raw_json(self, event):
return format_html('<pre>{}</pre>', json.dumps(event.data or {}, indent=4))
admin.site.register(Event, EventAdmin)

View File

@ -0,0 +1,29 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.apps import AppConfig
from django.utils.module_loading import autodiscover_modules
from django.utils.translation import ugettext_lazy as _
class JournalAppConfig(AppConfig):
name = 'authentic2.apps.journal'
verbose_name = _('Journal')
def ready(self):
from . import models
autodiscover_modules('journal_event_types', register_to=models)

View File

@ -0,0 +1,352 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
from django.http import QueryDict
from django.utils.formats import date_format
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
from django import forms
from . import models, search_engine
class Page:
def __init__(self, form, events, is_first_page, is_last_page):
self.form = form
self.events = events
self.is_first_page = is_first_page
self.is_last_page = is_last_page
self.limit = form.limit
@property
def previous_page_cursor(self):
return None if self.is_first_page else self.events[0].cursor
@property
def next_page_cursor(self):
return None if self.is_last_page else self.events[-1].cursor
@cached_property
def next_page_url(self):
if self.is_last_page:
return None
else:
return self.form.make_url('after_cursor', self.events[-1].cursor)
@cached_property
def first_page_url(self):
return self.form.make_url('after_cursor', '0 0')
@cached_property
def previous_page_url(self):
if self.is_first_page:
return None
else:
return self.form.make_url('before_cursor', self.events[0].cursor)
@cached_property
def last_page_url(self):
return self.form.make_url('before_cursor', '%s 0' % (2 ** 31 - 1))
def __bool__(self):
return bool(self.events)
def __iter__(self):
return reversed(self.events)
class DateHierarchy:
def __init__(self, form, year=None, month=None, day=None):
self.form = form
self.year = year
self.month = month
self.day = day
@property
def title(self):
if self.day:
return date_format(self.current_datetime, 'DATE_FORMAT')
elif self.month:
return date_format(self.current_datetime, 'F Y')
elif self.year:
return str(self.year)
@cached_property
def back_urls(self):
def helper():
if self.year:
yield _('All dates'), self.form.make_url(exclude=['year', 'month', 'day'])
if self.month:
yield str(self.year), self.form.make_url(exclude=['month', 'day'])
current_datetime = datetime(self.year, self.month or 1, self.day or 1)
month_name = date_format(current_datetime, format='F Y').title()
if self.day:
yield month_name, self.form.make_url(exclude=['day'])
yield str(self.day), '#'
else:
yield month_name, '#'
else:
yield str(self.year), '#'
else:
yield _('All dates'), '#'
return list(helper())
@property
def current_datetime(self):
return datetime(self.year or 1900, self.month or 1, self.day or 1)
@property
def month_name(self):
return date_format(self.current_datetime, format='F')
@cached_property
def choice_urls(self):
def helper():
if self.day:
return
elif self.month:
for day in self.form.days:
yield str(day), self.form.make_url('day', day)
elif self.year:
for month in self.form.months:
dt = datetime(self.year, month, 1)
month_name = date_format(dt, format='F')
yield month_name, self.form.make_url('month', month, exclude=['day'])
else:
for year in self.form.years:
yield str(year), self.form.make_url('year', year, exclude=['month', 'day'])
return list(helper())
@property
def choice_name(self):
if self.day:
return
elif self.month:
return _('Days of %s') % self.month_name
elif self.year:
return _('Months of %s') % self.year
else:
return _('Years')
class SearchField(forms.CharField):
type = 'search'
class JournalForm(forms.Form):
year = forms.CharField(label=_('year'), widget=forms.HiddenInput(), required=False)
month = forms.CharField(label=_('month'), widget=forms.HiddenInput(), required=False)
day = forms.CharField(label=_('day'), widget=forms.HiddenInput(), required=False)
after_cursor = forms.CharField(widget=forms.HiddenInput(), required=False)
before_cursor = forms.CharField(widget=forms.HiddenInput(), required=False)
search = SearchField(required=False, label='')
search_engine_class = search_engine.JournalSearchEngine
def __init__(self, *args, **kwargs):
self.queryset = kwargs.pop('queryset', None)
if self.queryset is None:
self.queryset = models.Event.objects.all()
self.limit = kwargs.pop('limit', 20)
search_engine_class = kwargs.pop('search_engine_class', None)
if search_engine_class:
self.search_engine_class = search_engine_class
super().__init__(*args, **kwargs)
@cached_property
def years(self):
self.is_valid()
return [dt.year for dt in self.queryset.datetimes('timestamp', 'year')]
@cached_property
def months(self):
self.is_valid()
if self.cleaned_data.get('year'):
return [
dt.month
for dt in self.queryset.filter(timestamp__year=self.cleaned_data['year']).datetimes(
'timestamp', 'month'
)
]
return []
@cached_property
def days(self):
self.is_valid()
if self.cleaned_data.get('month') and self.cleaned_data.get('year'):
return [
dt.day
for dt in self.queryset.filter(
timestamp__year=self.cleaned_data['year'], timestamp__month=self.cleaned_data['month']
).datetimes('timestamp', 'day')
]
return []
@staticmethod
def _clean_integer_value(value):
try:
return int(value)
except ValueError:
return None
def clean_year(self):
return self._clean_integer_value(self.cleaned_data['year'])
def clean_month(self):
return self._clean_integer_value(self.cleaned_data['month'])
def clean_day(self):
return self._clean_integer_value(self.cleaned_data['day'])
def clean(self):
super().clean()
year = self.cleaned_data.get('year')
if year not in self.years:
self.cleaned_data['year'] = None
month = self.cleaned_data.get('month')
if month not in self.months:
self.cleaned_data['month'] = None
day = self.cleaned_data.get('day')
if day not in self.days:
self.cleaned_data['day'] = None
def clean_after_cursor(self):
return models.EventCursor.parse(self.cleaned_data['after_cursor'])
def clean_before_cursor(self):
return models.EventCursor.parse(self.cleaned_data['before_cursor'])
def clean_search(self):
self.cleaned_data['_search_query'] = self.search_engine_class().query(
query_string=self.cleaned_data['search']
)
return self.cleaned_data['search']
def get_queryset(self, limit=None):
self.is_valid()
qs = self.queryset
year = self.cleaned_data.get('year')
month = self.cleaned_data.get('month')
day = self.cleaned_data.get('day')
search_query = self.cleaned_data.get('_search_query')
if year:
qs = qs.filter(timestamp__year=year)
if month:
qs = qs.filter(timestamp__month=month)
if day:
qs = qs.filter(timestamp__day=day)
if search_query:
qs = qs.filter(search_query)
return qs
def make_querydict(self, name=None, value=None, exclude=()):
querydict = QueryDict(mutable=True)
for k, v in self.cleaned_data.items():
if k.startswith('_'):
continue
if k in exclude:
continue
if v:
querydict[k] = str(v)
if name:
if name in ['after_cursor', 'before_cursor']:
querydict.pop('after_cursor', None)
querydict.pop('before_cursor', None)
assert name in self.fields
assert value is not None
querydict[name] = value
return querydict
def make_url(self, name=None, value=None, exclude=()):
return '?' + self.make_querydict(name=name, value=value, exclude=exclude).urlencode()
@cached_property
def page(self):
self.is_valid()
after_cursor = self.cleaned_data['after_cursor']
before_cursor = self.cleaned_data['before_cursor']
first = False
last = False
limit = self.limit
qs = self.get_queryset()
if after_cursor:
page = list(qs[after_cursor : (limit + 2)])
first = not (qs[-1:after_cursor])
if len(page) > limit:
last = len(page) != (limit + 2)
if page[0].cursor == after_cursor:
page = page[1 : (limit + 1)]
else:
page = page[:limit]
else:
last = True
before_cursor = after_cursor if not page else page[-1].cursor
page = list(qs[-(limit + 1) : before_cursor])
first = len(page) < (limit + 1)
page = page[-limit:]
elif before_cursor:
page = list(qs[-(limit + 2) : before_cursor])
last = not (qs[before_cursor:1])
if len(page) > limit:
first = len(page) != (limit + 2)
page = page[-(limit + 1) : -1]
else:
first = True
after_cursor = before_cursor if not page else page[0].cursor
page = list(qs[after_cursor : (limit + 1)])
last = len(page) < (limit + 1)
page = page[:limit]
else:
qs = qs.order_by('-timestamp', '-id')
page = qs[: (limit + 1) : -1]
first = len(page) <= limit
last = True
page = page[-limit:]
models.prefetch_events_references(page)
if page:
self.data = self.data.copy()
self.cleaned_data['after_cursor'] = self.data['after_cursor'] = page[0].cursor.minus_one()
self.cleaned_data['before_cursor'] = ''
return Page(self, page, first, last)
@cached_property
def date_hierarchy(self):
self.is_valid()
return DateHierarchy(
self,
year=self.cleaned_data['year'],
month=self.cleaned_data['month'],
day=self.cleaned_data['day'],
)
@property
def url(self):
return self.make_url()

View File

@ -0,0 +1,65 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import inspect
import logging
from authentic2.apps.journal.models import EventTypeDefinition
logger = logging.getLogger(__name__)
class Journal:
def __init__(self, request=None, user=None, session=None, service=None):
self.request = request
self._user = user
self._session = session
@property
def user(self):
return self._user or (
self.request.user
if hasattr(self.request, 'user') and self.request.user.is_authenticated
else None
)
@property
def session(self):
return self._session or (self.request.session if hasattr(self.request, 'session') else None)
def massage_kwargs(self, record_parameters, kwargs):
for key in ['user', 'session']:
if key in record_parameters and key not in kwargs:
kwargs[key] = getattr(self, key)
return kwargs
def record(self, event_type_name, **kwargs):
evd_class = EventTypeDefinition.get_for_name(event_type_name)
if evd_class is None:
logger.error('invalid event_type name "%s"', event_type_name)
return
try:
record = evd_class.record
record_signature = inspect.signature(record)
parameters = record_signature.parameters
kwargs = self.massage_kwargs(parameters, kwargs)
record(**kwargs)
except Exception:
logger.exception('failure to record event "%s"', event_type_name)
journal = Journal()

View File

@ -0,0 +1,118 @@
# Generated by Django 2.2.15 on 2020-08-23 16:56
from django.conf import settings
import django.contrib.postgres.fields
import django.contrib.postgres.fields.jsonb
from django.db import migrations, models
import django.db.models.deletion
from django.utils import timezone
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('authentic2', '0027_remove_deleteduser'),
('sessions', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='EventType',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('name', models.SlugField(max_length=256, unique=True, verbose_name='name')),
],
options={
'verbose_name': 'event type',
'verbose_name_plural': 'event types',
'ordering': ('name',),
},
),
migrations.CreateModel(
name='Event',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
(
'timestamp',
models.DateTimeField(
default=timezone.now, editable=False, blank=True, verbose_name='timestamp'
),
),
(
'reference_ids',
django.contrib.postgres.fields.ArrayField(
base_field=models.BigIntegerField(),
null=True,
size=None,
verbose_name='reference ids',
),
),
(
'reference_ct_ids',
django.contrib.postgres.fields.ArrayField(
base_field=models.IntegerField(),
null=True,
size=None,
verbose_name='reference ct ids',
),
),
('data', django.contrib.postgres.fields.jsonb.JSONField(null=True, verbose_name='data')),
(
'session',
models.ForeignKey(
blank=True,
db_constraint=False,
null=True,
on_delete=django.db.models.deletion.DO_NOTHING,
to='sessions.Session',
verbose_name='session',
),
),
(
'type',
models.ForeignKey(
on_delete=django.db.models.deletion.PROTECT,
to='journal.EventType',
verbose_name='type',
),
),
(
'user',
models.ForeignKey(
blank=True,
db_constraint=False,
null=True,
on_delete=django.db.models.deletion.DO_NOTHING,
to=settings.AUTH_USER_MODEL,
verbose_name='user',
),
),
],
options={
'verbose_name': 'event',
'verbose_name_plural': 'events',
'ordering': ('timestamp', 'id'),
},
),
migrations.RunSQL(
[
'CREATE INDEX journal_event_timestamp_id_idx ON journal_event USING BRIN("timestamp", "id");',
'CREATE INDEX journal_event_reference_ids_idx ON journal_event USING GIN("reference_ids");',
'CREATE INDEX journal_event_reference_ct_ids_idx ON journal_event USING GIN("reference_ct_ids");',
],
[
'DROP INDEX journal_event_reference_ct_ids_idx;',
'DROP INDEX journal_event_reference_ids_idx;',
'DROP INDEX journal_event_timestamp_id_idx;',
]
),
]

View File

@ -0,0 +1,430 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime, timedelta
import logging
import re
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.postgres.fields import ArrayField, JSONField
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.db.models import QuerySet, Q, F, Value
from django.utils.translation import ugettext_lazy as _
from django.utils.timezone import utc, now
from authentic2.decorators import GlobalCache
from . import sql
logger = logging.getLogger(__name__)
User = get_user_model()
_registry = {}
@contextmanager
def clean_registry():
global _registry
old_registry = _registry
_registry = {}
yield
_registry = old_registry
class EventTypeDefinitionMeta(type):
def __new__(cls, name, bases, namespace, **kwargs):
global _registry
new_cls = type.__new__(cls, name, bases, namespace, **kwargs)
name = namespace.get('name')
if name:
assert (
new_cls.retention_days is None or new_cls.retention_days >= 0
), 'retention_days must be None or >= 0'
assert new_cls.name, 'name is missing'
assert re.match(r'^[a-z_]+(?:\.[a-z_]+)*$', new_cls.name), (
'%r is not proper event type name' % new_cls.name
)
assert new_cls.label, 'label is missing'
assert new_cls.name not in _registry, 'name %r is already registered' % new_cls.name
_registry[new_cls.name] = new_cls
return new_cls
class EventTypeDefinition(metaclass=EventTypeDefinitionMeta):
name = ''
label = None
# used to group type of events
# how long to keep this type of events
retention_days = None
@classmethod
def record(cls, user=None, session=None, references=None, data=None):
event_type = EventType.objects.get_for_name(cls.name)
Event.objects.create(
type=event_type,
user=user,
session_id=session and session.session_key,
references=references or None, # NULL values take less space
data=data or None, # NULL values take less space
)
@classmethod
def get_for_name(cls, name):
return _registry.get(name)
@classmethod
def search_by_name(self, name):
for evd_name, evd in _registry.items():
if evd_name == name or evd_name.rsplit('.', 1)[-1] == name:
yield evd
@classmethod
def get_message(self, event, context=None):
return self.label
def __repr__(self):
return '<EventTypeDefinition %r %s>' % (self.name, self.label)
@GlobalCache
def event_type_cache(name):
event_type, created = EventType.objects.get_or_create(name=name)
return event_type
class EventTypeManager(models.Manager):
def get_for_name(self, name):
return event_type_cache(name)
class EventType(models.Model):
name = models.SlugField(verbose_name=_('name'), max_length=256, unique=True)
@property
def definition(self):
return EventTypeDefinition.get_for_name(self.name)
def __str__(self):
definition = self.definition
if definition:
return str(definition.label)
else:
return '%s (definition not found)' % self.name
objects = EventTypeManager()
class Meta:
verbose_name = _('event type')
verbose_name_plural = _('event types')
ordering = ('name',)
class EventQuerySet(QuerySet):
@classmethod
def _which_references_query(cls, instance_or_model_class_or_queryset):
if isinstance(instance_or_model_class_or_queryset, type) and issubclass(
instance_or_model_class_or_queryset, models.Model
):
ct = ContentType.objects.get_for_model(instance_or_model_class_or_queryset)
q = Q(reference_ct_ids__contains=[ct.pk])
# users can also be references by the user_id column
if instance_or_model_class_or_queryset is User:
q |= Q(user__isnull=False)
return q
elif isinstance(instance_or_model_class_or_queryset, QuerySet):
qs = instance_or_model_class_or_queryset
model = qs.model
ct_model = ContentType.objects.get_for_model(model)
qs_array = qs.values_list(Value(ct_model.id << 32) + F('pk'), flat=True)
q = Q(reference_ids__overlap=sql.ArraySubquery(qs_array))
if issubclass(model, User):
q = q | Q(user__in=qs)
else:
instance = instance_or_model_class_or_queryset
q = Q(reference_ids__contains=[reference_integer(instance)])
if isinstance(instance, User):
q = q | Q(user=instance)
return q
def which_references(self, instance_or_queryset):
return self.filter(self._which_references_query(instance_or_queryset))
def from_cursor(self, cursor):
return self.filter(
Q(timestamp=cursor.timestamp, id__gte=cursor.event_id) | Q(timestamp__gt=cursor.timestamp)
)
def to_cursor(self, cursor):
return self.filter(
Q(timestamp=cursor.timestamp, id__lte=cursor.event_id) | Q(timestamp__lt=cursor.timestamp)
)
def __getitem__(self, i):
# slice by cursor:
# [cursor..20] or [-20..cursor]
# it simplifies pagination
if isinstance(i, slice) and i.step is None:
_slice = i
if isinstance(_slice.start, EventCursor) and isinstance(_slice.stop, int) and _slice.stop >= 0:
return self.from_cursor(_slice.start)[: _slice.stop]
if isinstance(_slice.start, int) and _slice.start <= 0 and isinstance(_slice.stop, EventCursor):
qs = self.order_by('-timestamp', '-id').to_cursor(_slice.stop)[: -_slice.start]
return list(reversed(qs))
return super().__getitem__(i)
def prefetch_references(self):
prefetch_events_references(self)
return self
class EventManager(models.Manager):
def get_queryset(self):
return super().get_queryset().select_related('type', 'user')
# contains/overlap operator on postresql ARRAY do not really support nested arrays,
# so we cannot use them to query an array generic foreign keys repsented as two
# elements integers arrays like '{{1,100},{2,300}}' as any integer will match.
# ex.:
# authentic_multitenant=# select '{{1,2}}'::int[] && '{{1,3}}'::int[];
# ?column?
# ----------
# t
# (1 line)
#
# To work around this limitation we map pair of integers (content_type.pk,
# instance.pk) to a corresponding unique 64-bit integer using the reversible
# mapping (content_type.pk << 32 + instance.pk).
def n_2_pairing(a, b):
return a * 2 ** 32 + b
def n_2_pairing_rev(n):
return (n >> 32, n & (2 ** 32 - 1))
def reference_integer(instance):
return n_2_pairing(ContentType.objects.get_for_model(instance).pk, instance.pk)
class Event(models.Model):
timestamp = models.DateTimeField(verbose_name=_('timestamp'), default=now, editable=False, blank=True)
user = models.ForeignKey(
verbose_name=_('user'),
to=User,
on_delete=models.DO_NOTHING,
db_constraint=False,
blank=True,
null=True,
)
session = models.ForeignKey(
verbose_name=_('session'),
to='sessions.Session',
on_delete=models.DO_NOTHING,
db_constraint=False,
blank=True,
null=True,
)
type = models.ForeignKey(verbose_name=_('type'), to=EventType, on_delete=models.PROTECT)
reference_ids = ArrayField(
verbose_name=_('reference ids'), base_field=models.BigIntegerField(), null=True,
)
reference_ct_ids = ArrayField(
verbose_name=_('reference ct ids'), base_field=models.IntegerField(), null=True,
)
data = JSONField(verbose_name=_('data'), null=True)
objects = EventManager.from_queryset(EventQuerySet)()
def __init__(self, *args, **kwargs):
references = kwargs.pop('references', ())
super().__init__(*args, **kwargs)
for reference in references or ():
self.add_reference_to_instance(reference)
def add_reference_to_instance(self, instance):
self.reference_ids = self.reference_ids or []
self.reference_ct_ids = self.reference_ct_ids or []
if instance is not None:
self.reference_ids.append(reference_integer(instance))
self.reference_ct_ids.append(ContentType.objects.get_for_model(instance).pk)
else:
self.reference_ids.append(0)
self.reference_ct_ids.append(0)
def get_reference_ids(self):
return map(n_2_pairing_rev, self.reference_ids or ())
@property
def references(self):
if not hasattr(self, '_references_cache'):
self._references_cache = []
for content_type_id, instance_pk in self.get_reference_ids():
if content_type_id != 0:
content_type = ContentType.objects.get_for_id(content_type_id)
try:
self._references_cache.append(content_type.get_object_for_this_type(pk=instance_pk))
continue
except ObjectDoesNotExist:
pass
self._references_cache.append(None)
return self._references_cache
@property
def cursor(self):
return EventCursor.from_event(self)
def __repr__(self):
return '<Event id:%s %s %s>' % (self.id, self.timestamp, self.type.name)
@classmethod
def cleanup(cls):
'''Expire old events by default retention days or customized at the
EventTypeDefinition level.'''
event_types_by_retention_days = defaultdict(set)
default_retention_days = getattr(settings, 'JOURNAL_DEFAULT_RETENTION_DAYS', 365 * 2)
for event_type in EventType.objects.all():
evd = event_type.definition
retention_days = evd.retention_days if evd else None
if retention_days == 0:
# do not expire
continue
if retention_days is None:
retention_days = default_retention_days
event_types_by_retention_days[retention_days].add(event_type)
for retention_days, event_types in event_types_by_retention_days.items():
threshold = now() - timedelta(days=retention_days)
Event.objects.filter(type__in=event_types).filter(timestamp__lt=threshold).delete()
@property
def session_id_shortened(self):
return (self.session_id and self.session_id[:6]) or '-'
@property
def message(self):
return self.message_in_context(None)
def message_in_context(self, context):
if self.type.definition:
try:
return self.type.definition.get_message(self, context)
except Exception:
logger.exception('could not render message of event type "%s"', self.type.name)
return self.type.name
def get_data(self, key, default=None):
return (self.data or {}).get(key, default)
def get_typed_references(self, *reference_types):
count = 0
for reference_type, reference in zip(reference_types, self.references):
if reference_type is None:
yield None
else:
if isinstance(reference, reference_type):
yield reference
else:
yield None
count += 1
for i in range(len(reference_types) - count):
yield None
class Meta:
verbose_name = _('event')
verbose_name_plural = _('events')
ordering = ('timestamp', 'id')
class EventCursor(str):
'''Represents a point in the journal'''
def __new__(cls, value):
self = super().__new__(cls, value)
try:
timestamp, event_id = value.split(' ', 1)
timestamp = float(timestamp)
event_id = int(event_id)
timestamp = datetime.fromtimestamp(timestamp, tz=utc)
except ValueError as e:
raise ValueError('invalid event cursor') from e
self.timestamp = timestamp
self.event_id = event_id
return self
@classmethod
def parse(cls, value):
try:
return cls(value)
except ValueError:
return None
@classmethod
def from_event(cls, event):
assert event.id is not None
assert event.timestamp is not None
cursor = super().__new__(cls, '%s %s' % (event.timestamp.timestamp(), event.id))
cursor.timestamp = event.timestamp
cursor.event_id = event.id
return cursor
def minus_one(self):
return EventCursor('%s %s' % (self.timestamp.timestamp(), self.event_id - 1))
def prefetch_events_references(events):
'''Prefetch references on an iterable of events, prevent N+1 queries problem.'''
grouped_references = defaultdict(set)
references = {}
# group reference ids
for event in events:
for content_type_id, instance_pk in event.get_reference_ids():
grouped_references[content_type_id].add(instance_pk)
# make batched queries for each CT
for content_type_id, instance_pks in grouped_references.items():
content_type = ContentType.objects.get_for_id(content_type_id)
for instance in content_type.get_all_objects_for_this_type(pk__in=instance_pks):
references[(content_type_id, instance.pk)] = instance
# assign references to events
for event in events:
event._references_cache = [
references.get((content_type_id, instance_pk))
for content_type_id, instance_pk in event.get_reference_ids()
]

View File

@ -0,0 +1,136 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from functools import reduce
import re
from django.contrib.auth import get_user_model
from django.db.models import Q
from django.utils.translation import ugettext_lazy as _
from . import models
User = get_user_model()
QUOTED_RE = re.compile(r'^[a-z0-9_-]*:"[^"]*"$')
LEXER_RE = re.compile(r'([a-z0-9_-]*:(?:"[^"]*"|[^ ]*)|[^\s]*)\s*')
class SearchEngine:
# https://stackoverflow.com/a/35894763/6686829
q_true = ~Q(pk__in=[])
q_false = Q(pk__in=[])
def lexer(self, query_string):
# quote can be used to passe string containing spaces to prefix directives, like :
# username:"john doe", used anywhere else they are part of words.
# ex. « john "doe » gives the list ['john', '"doe']
for lexem in LEXER_RE.findall(query_string):
if not lexem:
continue
if QUOTED_RE.match(lexem):
lexem = lexem.replace('"', '')
yield lexem
def query(self, query_string):
lexems = list(self.lexer(query_string))
queries = list(self.lexems_queries(lexems))
return reduce(Q.__and__, queries, self.q_true)
def lexems_queries(self, lexems):
unmatched_lexems = []
for lexem in lexems:
queries = list(self.lexem_queries(lexem))
if queries:
yield reduce(Q.__or__, queries)
else:
unmatched_lexems.append(lexem)
if unmatched_lexems:
query = self.unmatched_lexems_query(unmatched_lexems)
if query:
yield query
else:
yield self.q_false
def unmatched_lexems_query(self, unmatched_lexems):
return None
def lexem_queries(self, lexem):
yield from self.lexem_queries_by_prefix(lexem)
def lexem_queries_by_prefix(self, lexem):
if ':' not in lexem:
return
prefix = lexem.split(':', 1)[0]
method_name = 'search_by_' + prefix.replace('-', '_')
if not hasattr(self, method_name):
return
yield from getattr(self, method_name)(lexem[len(prefix) + 1 :])
@classmethod
def documentation(cls):
yield _('You can use quote around to preserve spaces.')
yield _('You can use colon terminated prefixes to make special searches.')
for name in dir(cls):
documentation = getattr(getattr(cls, name), 'documentation', None)
if documentation:
yield documentation
class JournalSearchEngine(SearchEngine):
def search_by_session(self, session_id):
yield Q(session__session_key__startswith=session_id)
search_by_session.documentation = _(
'''\
You can use <tt>session:abcd</tt> to find all events related to the session whose key starts with <tt>abcd</tt>.'''
)
def search_by_event(self, event_name):
q = self.q_false
for evd in models.EventTypeDefinition.search_by_name(event_name.lower()):
q |= Q(type__name=evd.name)
yield q
search_by_event.documentation = _(
'''\
You can use <tt>event:login</tt> to find all events of type <tt>login</tt>.'''
)
def query_for_users(self, users):
return models.EventQuerySet._which_references_query(users)
def search_by_email(self, email):
users = User.objects.filter(email__iexact=email.lower())
yield (self.query_for_users(users) | Q(data__email__iexact=email.lower()))
search_by_event.documentation = _(
'''\
You can use <tt>email:john.doe@example.com</tt> to find all events related \
to users with this email address.</tt>.'''
)
def search_by_username(self, lexem):
users = User.objects.filter(username__startswith=lexem)
yield (self.query_for_users(users) | Q(data__username__startswith=lexem.lower()))
search_by_username.documentation = _(
'''\
You can use <tt>username:john</tt> to find all events related \
to users whose username starts with <tt>john</tt>.'''
)

View File

@ -0,0 +1,29 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.db.models import Func, Subquery
from django.contrib.postgres.fields import ArrayField
class ArraySubquery(Func):
"""A Postgres ARRAY() expression"""
function = 'ARRAY'
arity = 1
def __init__(self, expression, output_field=None):
expression = Subquery(expression)
super().__init__(expression, output_field=ArrayField(output_field))

View File

@ -0,0 +1,8 @@
{% for caption, url in date_hierarchy.back_urls %}
<a class="date-hierarchy--back-url" href="{{ url }}">{{ caption }}</a>
{% endfor %}
{% if date_hierarchy.choice_urls %}
{% for caption, url in date_hierarchy.choice_urls %}
<a class="date-hierarchy--choice" href="{{ url }}">{{ caption }}</a>
{% endfor %}
{% endif %}

View File

@ -0,0 +1,35 @@
{% load i18n %}
<div class="table-container">
{% for event in page %}
{% if forloop.first %}
<table class="main">
<thead>
<tr>
<th class="journal-list--timestamp-column">{% trans "Timestamp" %}</th>
<th class="journal-list--user-column">{% trans "User" %}</th>
<th class="journal-list--session-column">{% trans "Session" %}</th>
<th class="journal-list--message-column">{% trans "Message" %}</th>
</tr>
</thead>
<tbody>
{% endif %}
<tr data-event-id="{{ event.id }}" data-event-cursor="{{ event.cursor }}" data-event-type="{{ event.type.name }}">
<td class="journal-list--timestamp-column">{% block event-timestamp %}{{ event.timestamp }}{% endblock %}</td>
<td class="journal-list--user-column" {% if event.user %}data-user-id="{{ event.user.id }}"{% endif %}>{% block event-user %}{{ event.user.get_full_name|default:"-" }}{% endblock %}</td>
<td class="journal-list--session-column">{% block event-session %}{{ event.session_id_shortened|default:"-" }}{% endblock %}</td>
<td class="journal-list--message-column">{% block event-message %}{{ event.message|default:"-" }}{% endblock %}</td>
</tr>
{% if forloop.last %}
</tbody>
</table>
{% endif %}
{% empty %}
{% block empty %}
<div>
<p>{% trans "Journal is empty." %}</p>
</div>
{% endblock %}
{% endfor %}
{% include "journal/pagination.html" with page=page %}
</div>

View File

@ -0,0 +1,14 @@
{% load i18n %}
<p class="paginator">
{% if not page.is_first_page %}
<a href="{{ page.first_page_url }}">{% trans "First page" %}</a>
<a href="{{ page.previous_page_url }}">{% trans "Previous page" %}</a>
{% endif %}
{% if not page.is_first_page and not page.is_last_page %}
{% endif %}
{% if not page.is_last_page %}
<a href="{{ page.next_page_url }}">{% trans "Next page" %}</a>
<a href="{{ page.last_page_url }}">{% trans "Last page" %}</a>
{% endif %}
</p>

View File

@ -0,0 +1,32 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
def _json_value(value):
if isinstance(value, (dict, list, str, int, bool)) or value is None:
return value
return str(value)
def form_to_old_new(form):
old = {}
new = {}
for key in form.changed_data:
old_value = form.initial.get(key)
if old_value is not None:
old[key] = _json_value(old_value)
new[key] = _json_value(form.cleaned_data.get(key))
return {'old': old, 'new': new}

View File

@ -0,0 +1,83 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.views.generic import TemplateView
from django.views.generic.edit import FormMixin
from . import models, forms
class JournalView(FormMixin, TemplateView):
form_class = forms.JournalForm
limit = 20
def get_events(self):
return models.Event.objects.all()
def get_form_kwargs(self):
queryset = self.get_events()
return {
'data': self.request.GET,
'limit': self.limit,
'queryset': queryset,
}
def get_context_data(self, **kwargs):
ctx = super().get_context_data(**kwargs)
form = ctx['form']
ctx['page'] = form.page
ctx['date_hierarchy'] = form.date_hierarchy
return ctx
class ContextDecoratedEvent:
def __init__(self, context, event):
self.context = context
self.event = event
def __getattr__(self, name):
return getattr(self.event, name)
@property
def message(self):
return self.event.message_in_context(self.context)
class ContextDecoratedPage:
def __init__(self, context, page):
self.context = context
self.page = page
def __getattr__(self, name):
return getattr(self.page, name)
def __iter__(self):
return (ContextDecoratedEvent(self.context, event) for event in self.page)
class JournalViewWithContext(JournalView):
context = None
def get_events(self):
qs = super().get_events()
qs = qs.which_references(self.context)
return qs
def get_context_data(self, **kwargs):
ctx = super().get_context_data(**kwargs)
ctx['context'] = self.context
ctx['page'] = ContextDecoratedPage(self.context, ctx['page'])
return ctx

View File

@ -144,6 +144,7 @@ INSTALLED_APPS = (
'authentic2.attribute_aggregator',
'authentic2.disco_service',
'authentic2.manager',
'authentic2.apps.journal',
'authentic2',
'django_rbac',
'authentic2.a2_rbac',

443
tests/test_journal.py Normal file
View File

@ -0,0 +1,443 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime, timedelta
import random
import mock
import pytest
from django.contrib.auth import get_user_model
from django.core.management import call_command
from django.utils.timezone import make_aware, make_naive
from authentic2.apps.journal.forms import JournalForm
from authentic2.apps.journal.journal import Journal
from authentic2.apps.journal.models import EventTypeDefinition, EventType, Event, clean_registry
from authentic2.models import Service
User = get_user_model()
@pytest.fixture
def clean_event_types_definition_registry(request):
'''Protect EventTypeDefinition registry'''
with clean_registry():
yield
@pytest.fixture
def some_event_types(clean_event_types_definition_registry):
class UserRegistrationRequest(EventTypeDefinition):
name = 'user.registration.request'
label = 'registration request'
@classmethod
def record(cls, email):
super().record(data={'email': email.lower()})
class UserRegistration(EventTypeDefinition):
name = 'user.registration'
label = 'registration'
@classmethod
def record(cls, user, session, how):
super().record(user=user, session=session, data={'how': how})
class UserLogin(EventTypeDefinition):
name = 'user.login'
label = 'login'
@classmethod
def record(cls, user, session, how):
super().record(user=user, session=session, data={'how': how})
class UserLogout(EventTypeDefinition):
name = 'user.logout'
label = 'logout'
@classmethod
def record(cls, user, session):
super().record(user=user, session=session)
yield locals()
def test_models(db, django_assert_num_queries):
service = Service.objects.create(name='service', slug='service')
service2 = Service.objects.create(name='service2', slug='service2')
user = User.objects.create(username='john.doe')
sso_event = EventType.objects.create(name='sso')
whatever_event = EventType.objects.create(name='whatever')
ev1 = Event.objects.create(user=user, type=sso_event, data={'method': 'oidc'}, references=[service])
events = [ev1]
events.append(Event.objects.create(type=whatever_event, references=[user]))
for i in range(10):
events.append(Event.objects.create(type=whatever_event, references=[service if i % 2 else service2]))
ev2 = events[6]
# check extended queryset methods
assert Event.objects.count() == 12
assert Event.objects.which_references(user).count() == 2
assert Event.objects.which_references(User).count() == 2
assert Event.objects.filter(user=user).count() == 1
assert Event.objects.which_references(service).count() == 6
assert Event.objects.which_references(Service).count() == 11
assert Event.objects.from_cursor(ev1.cursor).count() == 12
assert list(Event.objects.all()[ev2.cursor:2]) == events[6:8]
assert list(Event.objects.all()[-4:ev2.cursor]) == events[3:7]
assert set(Event.objects.which_references(service)[0].references) == set([service])
# verify type, user and service are prefetched
with django_assert_num_queries(3):
events = list(Event.objects.prefetch_references())
assert len(events) == 12
event = events[0]
event.type.name
assert event.user == user
assert len(event.references) == 1
assert event.references[0] == service
# check foreign key constraints are not enforced, log should not change if an object is deleted
Service.objects.all().delete()
User.objects.all().delete()
assert Event.objects.count() == 12
assert Event.objects.filter(user_id=user.id).count() == 1
assert Event.objects.which_references(user).count() == 2
assert Event.objects.which_references(service).count() == 6
assert list(Event.objects.all())
def test_references(db):
user = User.objects.create(username='user')
service = Service.objects.create(name='service', slug='service')
event_type = EventType.objects.get_for_name('user.login')
event = Event.objects.create(type=event_type, references=[user, service], user=user)
event = Event.objects.get()
assert list(event.get_typed_references(None, Service)) == [None, service]
event = Event.objects.get()
assert list(event.get_typed_references(User, None)) == [user, None]
event = Event.objects.get()
assert list(event.get_typed_references(Service, User)) == [None, None]
assert list(event.get_typed_references(User, Service)) == [user, service]
user.delete()
service.delete()
event = Event.objects.get()
assert list(event.get_typed_references(None, Service)) == [None, None]
event = Event.objects.get()
assert list(event.get_typed_references(User, None)) == [None, None]
event = Event.objects.get()
assert list(event.get_typed_references(Service, User)) == [None, None]
def test_event_types(clean_event_types_definition_registry):
class UserEventTypes(EventTypeDefinition):
name = 'user'
label = 'User events'
class SSO(UserEventTypes):
name = 'user.sso'
label = 'Single sign On'
# user is an abstract type
assert EventTypeDefinition.get_for_name('user') is UserEventTypes
assert EventTypeDefinition.get_for_name('user.sso') is SSO
with pytest.raises(AssertionError, match='already registered'):
class SSO2(UserEventTypes):
name = 'user.sso'
label = 'Single Sign On'
@pytest.mark.urls('tests.test_journal_app.urls')
def test_integration(clean_event_types_definition_registry, app_factory, db, settings):
settings.INSTALLED_APPS = [
'django.contrib.auth',
'django.contrib.sessions',
'authentic2.custom_user',
'authentic2.apps.journal',
'tests.test_journal_app',
]
app = app_factory()
# the whole test is in a transaction :/
app.get('/login/john.doe/')
assert Event.objects.count() == 1
event = Event.objects.get()
assert event.type.name == 'login'
assert event.user.username == 'john.doe'
assert event.session_id == app.session.session_key
assert event.reference_ids is None
assert event.data is None
@pytest.fixture
def random_events(db):
count = 100
from_date = make_aware(datetime(2000, 1, 1))
to_date = make_aware(datetime(2010, 1, 1))
duration = (to_date - from_date).total_seconds()
events = []
event_types = []
for name in 'abcdef':
event_types.append(EventType.objects.create(name=name))
for i in range(count):
events.append(
Event(
type=random.choice(event_types),
timestamp=from_date + timedelta(seconds=random.uniform(0, duration)),
)
)
Event.objects.bulk_create(events)
return list(Event.objects.order_by('timestamp', 'id'))
def test_journal_form_date_hierarchy(random_events, rf):
request = rf.get('/')
form = JournalForm(data=request.GET)
assert len(form.years) > 1 # 1 chance on 10**100 of false negative
assert all(2000 <= year < 2010 for year in form.years)
assert form.months == []
assert form.days == []
assert form.get_queryset().count() == 100
year = random.choice(form.years)
request = rf.get('/?year=%s' % year)
form = JournalForm(data=request.GET)
assert len(form.years) > 1
assert all(2000 <= year < 2010 for year in form.years)
assert len(form.months)
assert all(1 <= month <= 12 for month in form.months)
assert form.days == []
assert form.get_queryset().count() == len(
[
# use make_naive() as filter(timestamp__year=..) convert value to local datetime
# but event.timestamp only return UTC timezoned datetimes.
event
for event in random_events
if make_naive(event.timestamp).year == year
]
)
month = random.choice(form.months)
request = rf.get('/?year=%s&month=%s' % (year, month))
form = JournalForm(data=request.GET)
assert len(form.years) > 1
assert all(2000 <= year < 2010 for year in form.years)
assert len(form.months)
assert all(1 <= month <= 12 for month in form.months)
assert len(form.days)
assert all(1 <= day <= 31 for day in form.days)
assert form.get_queryset().count() == len(
[
# use make_naive() as filter(timestamp__year=..) convert value to local datetime
# but event.timestamp only return UTC timezoned datetimes.
event
for event in random_events
if make_naive(event.timestamp).year == year and make_naive(event.timestamp).month == month
]
)
day = random.choice(form.days)
datetime(year, month, day)
request = rf.get('/?year=%s&month=%s&day=%s' % (year, month, day))
form = JournalForm(data=request.GET)
assert len(form.years) > 1
assert all(2000 <= year < 2010 for year in form.years)
assert len(form.months) > 1
assert all(1 <= month <= 12 for month in form.months)
assert len(form.days)
assert all(1 <= day <= 31 for day in form.days)
assert form.get_queryset().count() == len(
[
event
for event in random_events
if event.timestamp.year == year and event.timestamp.month == month and event.timestamp.day == day
]
)
def test_journal_form_pagination(random_events, rf):
request = rf.get('/')
page = JournalForm(data=request.GET).page
assert not page.is_first_page
assert page.is_last_page
assert not page.next_page_url
assert page.previous_page_url
assert page.events == random_events[-page.limit:]
request = rf.get('/' + page.previous_page_url)
page = JournalForm(data=request.GET).page
assert not page.is_first_page
assert not page.is_last_page
assert page.next_page_url
assert page.previous_page_url
assert page.events == random_events[-2 * page.limit:-page.limit]
request = rf.get('/' + page.previous_page_url)
page = JournalForm(data=request.GET).page
assert not page.is_first_page
assert not page.is_last_page
assert page.next_page_url
assert page.previous_page_url
assert page.events == random_events[-3 * page.limit:-2 * page.limit]
request = rf.get('/' + page.next_page_url)
form = JournalForm(data=request.GET)
page = form.page
assert not page.is_first_page
assert not page.is_last_page
assert page.next_page_url
assert page.previous_page_url
assert page.events == random_events[-2 * page.limit:-page.limit]
event_after_the_first_page = random_events[page.limit]
request = rf.get('/' + form.make_url('before_cursor', event_after_the_first_page.cursor))
form = JournalForm(data=request.GET)
page = form.page
assert page.is_first_page
assert not page.is_last_page
assert page.next_page_url
assert not page.previous_page_url
assert page.events == random_events[: page.limit]
# Test cursors out of queryset range
request = rf.get('/?' + form.make_url('after_cursor', random_events[0].cursor))
form = JournalForm(
queryset=Event.objects.filter(
timestamp__range=[random_events[1].timestamp, random_events[20].timestamp]
),
data=request.GET,
)
page = form.page
assert page.is_first_page
assert page.is_last_page
assert not page.previous_page_url
assert not page.next_page_url
assert page.events == random_events[1:21]
request = rf.get('/' + form.make_url('before_cursor', random_events[21].cursor))
page = JournalForm(
queryset=Event.objects.filter(
timestamp__range=[random_events[1].timestamp, random_events[20].timestamp]
),
data=request.GET,
).page
assert page.is_first_page
assert page.is_last_page
assert not page.previous_page_url
assert not page.next_page_url
assert page.events == random_events[1:21]
@pytest.fixture
def user_events(db, some_event_types):
user = User.objects.create(username='john.doe', email='john.doe@example.com')
journal = Journal(user=user)
count = 100
journal.record('user.registration.request', email=user.email)
journal.record('user.registration', how='fc')
journal.record('user.logout')
for i in range(count):
journal.record('user.login', how='fc')
journal.record('user.logout')
return list(Event.objects.order_by('timestamp', 'id'))
def test_journal_form_search(user_events, rf):
request = rf.get('/')
form = JournalForm(data=request.GET)
assert form.get_queryset().count() == len(user_events)
request = rf.get('/', data={'search': 'email:jane.doe@example.com'})
form = JournalForm(data=request.GET)
assert form.get_queryset().count() == 0
request = rf.get('/', data={'search': 'email:john.doe@example.com event:registration'})
form = JournalForm(data=request.GET)
assert form.get_queryset().count() == 1
User.objects.update(username='john doe')
request = rf.get('/', data={'search': 'username:"john doe" event:registration'})
form = JournalForm(data=request.GET)
assert form.get_queryset().count() == 1
# unhandled lexems make the queryset empty
request = rf.get('/', data={'search': 'john doe event:registration'})
form = JournalForm(data=request.GET)
assert form.get_queryset().count() == 0
# unhandled prefix make unhandled lexems
request = rf.get('/', data={'search': 'test:john'})
form = JournalForm(data=request.GET)
assert form.get_queryset().count() == 0
def test_cleanup(user_events, some_event_types, freezer, monkeypatch):
monkeypatch.setattr(some_event_types['UserRegistration'], 'retention_days', 0)
count = Event.objects.count()
freezer.move_to(timedelta(days=365 * 2 - 1))
call_command('cleanupauthentic')
assert Event.objects.count() == count
freezer.move_to(timedelta(days=2))
call_command('cleanupauthentic')
assert Event.objects.count() == 1
def test_record_exception_handling(db, some_event_types, caplog):
journal = Journal()
journal.record('user.registration.request', email='john.doe@example.com')
assert len(caplog.records) == 0
with mock.patch.object(some_event_types['UserRegistrationRequest'], 'record', side_effect=Exception('boum')):
journal.record('user.registration.request', email='john.doe@example.com')
assert len(caplog.records) == 1
assert caplog.records[0].levelname == 'ERROR'
assert caplog.records[0].message == 'failure to record event "user.registration.request"'
def test_message_in_context_exception_handling(db, some_event_types, caplog):
user = User.objects.create(username='john.doe', email='john.doe@example.com')
journal = Journal()
journal.record('user.login', user=user, how='password')
event = Event.objects.get()
event.message
assert not(caplog.records)
caplog.clear()
with mock.patch.object(some_event_types['UserLogin'], 'get_message', side_effect=Exception('boum')):
event.message
assert len(caplog.records) == 1
assert caplog.records[0].levelname == 'ERROR'
assert caplog.records[0].message == 'could not render message of event type "user.login"'
caplog.clear()
with mock.patch.object(some_event_types['UserLogin'], 'get_message', side_effect=Exception('boum')):
event.message_in_context(None)
assert len(caplog.records) == 1
assert caplog.records[0].levelname == 'ERROR'
assert caplog.records[0].message == 'could not render message of event type "user.login"'

View File

View File

@ -0,0 +1,27 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from authentic2.apps.journal.models import EventTypeDefinition
class Login(EventTypeDefinition):
name = 'login'
label = 'Login'
@classmethod
def record(cls, user=None, session=None):
super().record(user=user, session=session)

View File

@ -0,0 +1,23 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf.urls import url
from . import views
urlpatterns = [
url('^login/(?P<name>[^/]+)/', views.login_view),
]

View File

@ -0,0 +1,33 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib.auth import get_user_model, login, authenticate
from django.http import HttpResponse
from authentic2.apps.journal.journal import journal
User = get_user_model()
def login_view(request, name):
user = User.objects.create(username=name)
user.set_password('coin')
user.save()
user = authenticate(username=name, password='coin')
login(request, user)
journal.record('login', user=user, session=request.session)
return HttpResponse('logged in', content_type='text/plain')

View File

@ -130,7 +130,6 @@ source =
authentic2_auth_saml
authentic2_idp_cas
authentic2_idp_oidc
authentic2_journal
authentic2_provisionning_ldap
django_rbac
branch = True