misc: add one-time token model (#39745)

This commit is contained in:
Benjamin Dauvergne 2020-02-11 12:05:49 +01:00
parent 208dd0755c
commit 3d55b1a0c5
4 changed files with 176 additions and 1 deletions

View File

@ -39,3 +39,5 @@ class Command(BaseCommand):
manager = getattr(model, 'objects', None)
if hasattr(manager, 'cleanup'):
manager.cleanup()
if hasattr(model, 'cleanup'):
model.cleanup()

View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.20 on 2020-02-11 10:27
from __future__ import unicode_literals
import django.contrib.postgres.fields.jsonb
from django.db import migrations, models
import uuid
class Migration(migrations.Migration):
dependencies = [
('authentic2', '0025_auto_20191009_1047'),
]
operations = [
migrations.CreateModel(
name='Token',
fields=[
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False, verbose_name='Identifier')),
('kind', models.CharField(max_length=32, verbose_name='Kind')),
('content', django.contrib.postgres.fields.jsonb.JSONField(blank=True, verbose_name='Content')),
('created', models.DateTimeField(verbose_name='Creation date', auto_now_add=True)),
('expires', models.DateTimeField(verbose_name='Expires')),
],
options={
'ordering': ('-expires', 'kind', 'uuid'),
},
),
]

View File

@ -14,21 +14,25 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import time
import uuid
from django.utils.http import urlquote
from django.conf import settings
from django.db import models, transaction
from django.db.models.query import Q
from django.utils import six
from django.utils import six, timezone
from django.utils.translation import ugettext_lazy as _
from django.utils.six.moves.urllib import parse as urlparse
from django.core.exceptions import ValidationError
from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.fields import jsonb
from model_utils.managers import QueryManager
from authentic2.a2_rbac.models import Role
from authentic2.crypto import base64url_encode, base64url_decode
from django_rbac.utils import get_role_model_name
try:
@ -456,3 +460,68 @@ Service._meta.natural_key = [['slug', 'ou']]
class AuthorizedRole(models.Model):
service = models.ForeignKey(Service, on_delete=models.CASCADE)
role = models.ForeignKey(get_role_model_name(), on_delete=models.CASCADE)
class Token(models.Model):
uuid = models.UUIDField(
verbose_name=_('Identifier'),
primary_key=True,
default=uuid.uuid4,
editable=False)
kind = models.CharField(
verbose_name=_('Kind'),
max_length=32)
content = jsonb.JSONField(
verbose_name=_('Content'),
blank=True)
created = models.DateTimeField(
verbose_name=_('Creation date'),
auto_now_add=True)
expires = models.DateTimeField(
verbose_name=_('Expires'))
class Meta:
ordering = ('-expires', 'kind', 'uuid')
@property
def uuid_b64url(self):
return base64url_encode(self.uuid.bytes).decode('ascii')
@classmethod
def create(cls, kind, content, expires=None, duration=60):
expires = expires or (timezone.now() + datetime.timedelta(seconds=duration))
return cls.objects.create(
kind=kind,
content=content,
expires=expires)
@classmethod
def _decode_uuid(cls, _uuid):
try:
_uuid = uuid.UUID(_uuid)
except (TypeError, ValueError):
pass
else:
return _uuid
if isinstance(_uuid, six.text_type):
_uuid = _uuid.encode('ascii')
_uuid = base64url_decode(_uuid)
return uuid.UUID(bytes=_uuid)
@classmethod
def use(cls, kind, _uuid, now=None, delete=True):
'''Can raise TypeError, ValueError if uuid is invalid, DoesNotExist if uuid is unknown or expired.'''
now = now or timezone.now()
if not isinstance(_uuid, uuid.UUID):
_uuid = cls._decode_uuid(_uuid)
with transaction.atomic():
token = cls.objects.get(kind=kind, uuid=_uuid, expires__gt=now)
if delete:
token.delete()
return token
@classmethod
def cleanup(cls, now=None):
now = now or timezone.now()
cls.objects.filter(expires__lte=now).delete()

74
tests/test_token.py Normal file
View File

@ -0,0 +1,74 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import pytest
from authentic2.models import Token
def test_base(db):
assert Token.objects.count() == 0
token = Token.create('su', {'user_pk': 36})
assert Token.objects.count() == 1
assert Token.use('su', token.uuid, delete=False) == token
assert Token.use('su', token.uuid.bytes, delete=False) == token
assert Token.use('su', token.uuid.hex, delete=False) == token
assert Token.use('su', token.uuid_b64url, delete=False) == token
token2 = Token.use('su', str(token.uuid), delete=False)
with pytest.raises(Token.DoesNotExist):
Token.use('wtf', str(token.uuid))
assert token2.content == {'user_pk': 36}
Token.use('su', token.uuid)
assert Token.objects.count() == 0
with pytest.raises(Token.DoesNotExist):
Token.use('su', token.uuid)
def test_default_expires(db, freezer):
freezer.move_to('2020-01-01')
assert Token.objects.count() == 0
token = Token.create('su', {'user_pk': 36})
Token.use('su', str(token.uuid), delete=False)
freezer.tick(60) # advance 60 seconds
with pytest.raises(Token.DoesNotExist):
Token.use('su', str(token.uuid), delete=False)
def test_default_integer_expires(db, freezer):
freezer.move_to('2020-01-01')
assert Token.objects.count() == 0
token = Token.create('su', {'user_pk': 36}, duration=120)
Token.use('su', str(token.uuid), delete=False)
freezer.tick(60) # advance 60 seconds
Token.use('su', str(token.uuid), delete=False)
freezer.tick(60) # advance 60 seconds
with pytest.raises(Token.DoesNotExist):
Token.use('su', str(token.uuid), delete=False)
def test_cleanup(db, freezer):
freezer.move_to('2020-01-01')
Token.create('su', {'user_pk': 36})
assert Token.objects.count() == 1
Token.cleanup()
assert Token.objects.count() == 1
freezer.tick(60) # advance 60 seconds
Token.cleanup()
assert Token.objects.count() == 0