api/statistics: add inactivity-related events (#85790)
gitea/authentic/pipeline/head This commit looks good Details

This commit is contained in:
Paul Marillonnet 2024-01-23 10:27:11 +01:00
parent 06ea77c46d
commit b41cca7ec5
2 changed files with 169 additions and 4 deletions

View File

@ -62,7 +62,12 @@ from .a2_rbac.models import OrganizationalUnit, Role, RoleParenting
from .a2_rbac.utils import get_default_ou
from .apps.journal.models import Event
from .custom_user.models import Profile, ProfileType, User
from .journal_event_types import UserLogin, UserRegistration
from .journal_event_types import (
UserDeletionForInactivity,
UserLogin,
UserNotificationInactivity,
UserRegistration,
)
from .models import APIClient, Attribute, PasswordReset, Service
from .passwords import get_password_checker, get_password_strength
from .utils import hooks
@ -1638,7 +1643,12 @@ class StatisticsAPI(ViewSet):
url = self.reverse_action(action.url_name)
filters = common_filters.copy()
if action.url_name in ('login-new', 'registration-new'):
if action.url_name in (
'login-new',
'registration-new',
'inactivity-alert',
'inactivity-deletion',
):
filters.append(group_by_filter)
deprecated = False
else:
@ -1760,6 +1770,14 @@ class StatisticsAPI(ViewSet):
def registration_new(self, request):
return self.get_statistics(request, UserRegistration)
@stat(name=_('Deletion for inactivity count'))
def inactivity_deletion(self, request):
return self.get_statistics(request, UserDeletionForInactivity)
@stat(name=_('Inactivity alert count'))
def inactivity_alert(self, request):
return self.get_statistics(request, UserNotificationInactivity)
@stat(name=_('Login count by authentication type'), filters=('services_ou', 'users_ou', 'service'))
def login(self, request):
return self.get_statistics(request, UserLogin, 'get_method_statistics')

View File

@ -33,7 +33,7 @@ User = get_user_model()
def test_api_statistics_list(app, admin):
headers = basic_authorization_header(admin)
resp = app.get('/api/statistics/', headers=headers)
assert len(resp.json['data']) == 2
assert len(resp.json['data']) == 4
login_new = {
'deprecated': False,
'filters': [
@ -94,8 +94,69 @@ def test_api_statistics_list(app, admin):
}
assert login_new in resp.json['data']
assert register_new in resp.json['data']
assert {
'name': 'Inactivity alert count',
'url': 'https://testserver/api/statistics/inactivity_alert/',
'id': 'inactivity-alert',
'filters': [
{
'id': 'time_interval',
'label': 'Time interval',
'options': [
{'id': 'day', 'label': 'Day'},
{'id': 'month', 'label': 'Month'},
{'id': 'year', 'label': 'Year'},
],
'required': True,
'default': 'month',
},
{
'id': 'group_by',
'label': 'Group by',
'options': [
{'id': 'authentication_type', 'label': 'Authentication type'},
{'id': 'service', 'label': 'Service'},
{'id': 'service_ou', 'label': 'Organizational unit'},
],
'has_subfilters': True,
},
],
'deprecated': False,
} in resp.json['data']
assert {
'name': 'Deletion for inactivity count',
'url': 'https://testserver/api/statistics/inactivity_deletion/',
'id': 'inactivity-deletion',
'filters': [
{
'id': 'time_interval',
'label': 'Time interval',
'options': [
{'id': 'day', 'label': 'Day'},
{'id': 'month', 'label': 'Month'},
{'id': 'year', 'label': 'Year'},
],
'required': True,
'default': 'month',
},
{
'id': 'group_by',
'label': 'Group by',
'options': [
{'id': 'authentication_type', 'label': 'Authentication type'},
{'id': 'service', 'label': 'Service'},
{'id': 'service_ou', 'label': 'Organizational unit'},
],
'has_subfilters': True,
},
],
'deprecated': False,
} in resp.json['data']
actions_id = {elt['id'] for elt in resp.json['data']}
assert {'login-new', 'registration-new'} == actions_id
assert {'login-new', 'registration-new', 'inactivity-alert', 'inactivity-deletion'} == actions_id
def test_api_statistics_list_new(app, admin):
@ -338,6 +399,92 @@ def test_api_statistics(app, admin, freezer, event_type_name, event_name):
assert 'count' in resp.json['data']['series'][0]['label']
@pytest.mark.parametrize(
'event_type_name,event_name,event_description',
[
(
'user.notification.inactivity',
'inactivity_alert',
'Inactivity alert count',
),
(
'user.deletion.inactivity',
'inactivity_deletion',
'Deletion for inactivity count',
),
],
)
def test_api_statistics_inactivity_events(
app, admin, freezer, event_type_name, event_name, event_description
):
headers = basic_authorization_header(admin)
resp = app.get('/api/statistics/login/?time_interval=month', headers=headers)
assert resp.json == {'data': {'series': [], 'x_labels': [], 'subfilters': []}, 'err': 0}
event_type = EventType.objects.get_for_name(event_type_name)
freezer.move_to('2020-02-03 12:00')
Event.objects.create(
type=event_type, data=dict(days_of_inactivity=3, identifier='john.doe', days_to_deletion=5)
)
Event.objects.create(
type=event_type, data=dict(days_of_inactivity=5, identifier='john.doe', days_to_deletion=8)
)
freezer.move_to('2020-03-04 13:00')
Event.objects.create(
type=event_type, data=dict(days_of_inactivity=3, identifier='john.doe', days_to_deletion=5)
)
Event.objects.create(
type=event_type, data=dict(days_of_inactivity=5, identifier='john.doe', days_to_deletion=8)
)
params = {}
url = '/api/statistics/%s/' % event_name
resp = app.get(url, headers=headers, params=params)
assert resp.json['data']['x_labels'] == ['2020-02', '2020-03']
assert resp.json['data']['series'] == [
{'label': event_description, 'data': [2, 2]},
]
# default time interval is 'month'
month_data = resp.json['data']
resp = app.get(url, headers=headers, params=params)
assert month_data == resp.json['data']
resp = app.get(url, headers=headers, params=params)
assert resp.json['data']['x_labels'] == ['2020-02', '2020-03']
assert resp.json['data']['series'] == [{'label': event_description, 'data': [2, 2]}]
resp = app.get(url, headers=headers, params={'users_ou': 'default', **params})
assert resp.json['data']['x_labels'] == ['2020-02', '2020-03']
assert resp.json['data']['series'] == [{'label': event_description, 'data': [2, 2]}]
resp = app.get(url, headers=headers, params={'start': '2020-03-01T01:01', **params})
assert resp.json['data']['x_labels'] == ['2020-03']
assert resp.json['data']['series'] == [{'label': event_description, 'data': [2]}]
resp = app.get(url, headers=headers, params={'end': '2020-03-01T01:01', **params})
assert resp.json['data']['x_labels'] == ['2020-02']
assert resp.json['data']['series'] == [{'label': event_description, 'data': [2]}]
resp = app.get(url, headers=headers, params={'end': '2020-03-01', **params})
assert resp.json['data']['x_labels'] == ['2020-02']
assert resp.json['data']['series'] == [{'label': event_description, 'data': [2]}]
resp = app.get(
url, headers=headers, params={'time_interval': 'year', 'service': 'portal second', **params}
)
assert resp.json['data']['x_labels'] == ['2020']
assert resp.json['data']['series'] == [{'label': event_description, 'data': [4]}]
# forbidden filter is ignored
resp = app.get(url, headers=headers, params={'service': 'portal second', **params})
assert resp.json['data']['x_labels'] == ['2020-02', '2020-03']
assert resp.json['data']['series'] == [{'label': event_description, 'data': [2, 2]}]
def test_api_statistics_no_crash_older_drf(app, admin):
headers = basic_authorization_header(admin)
app.get('/api/statistics/login/?time_interval=month', headers=headers)