general: remove fargo.oauth2 (#67570)

This commit is contained in:
Frédéric Péters 2022-07-20 23:15:40 +02:00
parent 61680f8410
commit c4a28c47c1
29 changed files with 11 additions and 1473 deletions

View File

@ -28,7 +28,7 @@ class DocumentManager(models.Manager):
n = n or now()
# use a window of 60 seconds to be sure this document will never be used
qs = self.filter(creation_date__lt=n - datetime.timedelta(seconds=60))
qs = qs.filter(user_documents__isnull=True, oauth2_tempfiles__isnull=True)
qs = qs.filter(user_documents__isnull=True)
for document in qs:
document.content.delete(False)
qs.delete()

View File

@ -1,73 +0,0 @@
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib import admin
from django.utils.translation import ugettext_lazy as _
from .models import OAuth2Authorize, OAuth2Client, OAuth2TempFile
class OAuth2ClientAdmin(admin.ModelAdmin):
fields = ('client_name', 'client_id', 'client_secret', 'redirect_uris')
list_display = ['client_name', 'client_id', 'client_secret', 'redirect_uris']
class OAuth2AuthorizeAdmin(admin.ModelAdmin):
list_display = [
'id',
'client_name',
'user_document',
'thumbnail',
'access_token',
'code',
'creation_date',
]
raw_id_fields = ['user_document']
search_fields = [
'client__client_name',
'user_document__user__email',
'user_document__user__first_name',
'user_document__user__last_name',
'user_document__filename',
'user_document__user__contenat_has',
]
def thumbnail(self, instance):
return instance.user_document.document.thumbnail_img_tag
thumbnail.short_description = _('thumbnail')
def client_name(self, instance):
return instance.client.client_name
class OAuth2TempFileAdmin(admin.ModelAdmin):
list_display = ['uuid', 'client_name', 'filename', 'thumbnail', 'creation_date']
raw_id_fields = ['document']
search_fields = ['filename', 'uuid', 'client__client_name']
def thumbnail(self, instance):
return instance.document.thumbnail_img_tag
thumbnail.short_description = _('thumbnail')
def client_name(self, instance):
return instance.client.client_name
admin.site.register(OAuth2Client, OAuth2ClientAdmin)
admin.site.register(OAuth2Authorize, OAuth2AuthorizeAdmin)
admin.site.register(OAuth2TempFile, OAuth2TempFileAdmin)

View File

@ -1,104 +0,0 @@
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import requests
from django.conf import settings
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext_lazy as _
from rest_framework.authentication import BasicAuthentication
from rest_framework.exceptions import AuthenticationFailed
from .models import OAuth2Client
class OAuth2User:
"""Fake user class to return in case OAuth2 Client authentication"""
def __init__(self, oauth2_client):
self.oauth2_client = oauth2_client
self.authenticated = False
def has_perm(self, *args, **kwargs):
return True
def has_perm_any(self, *args, **kwargs):
return True
def has_ou_perm(self, *args, **kwargs):
return True
def filter_by_perm(self, perms, queryset):
return queryset
def is_authenticated(self):
return self.authenticated
def is_staff(self):
return False
class FargoOAUTH2Authentication(BasicAuthentication):
def authenticate_through_idp(self, client_id, client_secret):
"""Check client_id and client_secret with configured IdP, and verify it is an OIDC
client.
"""
logger = logging.getLogger(__name__)
authentic_idp = getattr(settings, 'FARGO_IDP_URL', None)
if not authentic_idp:
logger.warning('idp check-password not configured')
return False, ''
url = urlparse.urljoin(authentic_idp, 'api/check-password/')
try:
response = requests.post(
url,
json={'username': client_id, 'password': client_secret},
auth=(client_id, client_secret),
verify=False,
)
response.raise_for_status()
except requests.RequestException as e:
logger.warning('idp check-password API failed: %s', e)
return False, 'idp is down'
try:
response = response.json()
except ValueError as e:
logger.warning('idp check-password API failed: %s, %r', e, response.content)
return False, 'idp is down'
if response.get('result') == 0:
logger.warning('idp check-password API failed')
return False, response.get('errors', [''])[0]
return True, None
def authenticate_credentials(self, client_id, client_secret, request=None):
try:
client = OAuth2Client.objects.get(client_id=client_id, client_secret=client_secret)
except OAuth2Client.DoesNotExist:
success, error = self.authenticate_through_idp(client_id, client_secret)
if not success:
raise AuthenticationFailed(error or _('Invalid client_id/client_secret.'))
client = OAuth2Client.objects.get(client_id=client_id)
client.client_secret = client_secret
client.save()
user = OAuth2User(client)
user.authenticated = True
return user, True

View File

@ -1,34 +0,0 @@
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import forms
from django.utils.translation import ugettext_lazy as _
from fargo.fargo.models import UserDocument
class UserDocModelChoiceField(forms.ModelChoiceField):
def label_from_instance(self, obj):
return obj.filename
class OAuth2AuthorizeForm(forms.Form):
document = UserDocModelChoiceField(queryset=None)
def __init__(self, user, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields['document'].queryset = UserDocument.objects.filter(user=user)
self.fields['document'].label = _('Document')

View File

@ -1,40 +0,0 @@
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.core.management.base import BaseCommand
from fargo.oauth2.models import OAuth2Client
class Command(BaseCommand):
help = 'Create an OAuth2 client'
def add_arguments(self, parser):
parser.add_argument('client_name')
parser.add_argument('redirect_uris')
parser.add_argument('--client-id', required=False, default=None)
parser.add_argument('--client-secret', required=False, default=None)
def handle(self, client_name, redirect_uris, client_id, client_secret, **options):
kwargs = {
'client_name': client_name,
'redirect_uris': redirect_uris,
}
if client_id:
kwargs['client_id'] = client_id
if client_secret:
kwargs['client_secret'] = client_secret
OAuth2Client.objects.create(**kwargs)

View File

@ -1,47 +0,0 @@
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from django.core.files.base import ContentFile
from django.core.management.base import BaseCommand
from django.urls import reverse
from fargo.fargo.models import Document
from fargo.oauth2.models import OAuth2Client, OAuth2TempFile
from fargo.utils import make_url
class Command(BaseCommand):
help = 'Push documents inside fargo, returns URLs'
def add_arguments(self, parser):
parser.add_argument('--client-id', type=int)
parser.add_argument('redirect_uri')
parser.add_argument('paths', nargs='+')
def handle(self, redirect_uri, paths, client_id, **options):
client = OAuth2Client.objects.get(id=client_id)
for path in paths:
with open(path, 'rb') as file_object:
filename = os.path.basename(path)
f = ContentFile(file_object.read(), name=filename)
document = Document.objects.get_by_file(f)
oauth2_document = OAuth2TempFile.objects.create(
client=client, document=document, filename=filename
)
uri = reverse('oauth2-put-document-authorize', args=[oauth2_document.pk])
self.stdout.write('https://localhost:8000' + make_url(uri, redirect_uri=redirect_uri))

View File

@ -1,60 +0,0 @@
from django.db import migrations, models
import fargo.oauth2.models
class Migration(migrations.Migration):
dependencies = [
('fargo', '0013_document_mime_type'),
]
operations = [
migrations.CreateModel(
name='OAuth2Authorize',
fields=[
(
'id',
models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True),
),
('access_token', models.CharField(default=fargo.oauth2.models.generate_uuid, max_length=255)),
('code', models.CharField(default=fargo.oauth2.models.generate_uuid, max_length=255)),
('creation_date', models.DateTimeField(auto_now=True)),
('user_document', models.ForeignKey(to='fargo.UserDocument', on_delete=models.CASCADE)),
],
),
migrations.CreateModel(
name='OAuth2Client',
fields=[
(
'id',
models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True),
),
(
'client_secret',
models.CharField(default=fargo.oauth2.models.generate_uuid, max_length=255),
),
('client_id', models.CharField(default=fargo.oauth2.models.generate_uuid, max_length=255)),
('client_name', models.CharField(max_length=255)),
(
'redirect_uris',
models.TextField(
verbose_name='redirect URIs', validators=[fargo.oauth2.models.validate_https_url]
),
),
],
),
migrations.CreateModel(
name='OAuth2TempFile',
fields=[
('hash_key', models.CharField(max_length=128, serialize=False, primary_key=True)),
('filename', models.CharField(max_length=512)),
(
'document',
models.ForeignKey(
to='fargo.Document', related_name='oauth2_tempfiles', on_delete=models.CASCADE
),
),
],
),
]

View File

@ -1,107 +0,0 @@
# Generated by Django 1.11.11 on 2018-03-31 13:36
import django.db.models.deletion
from django.db import migrations, models
import fargo.oauth2.models
class Migration(migrations.Migration):
replaces = [
('oauth2', '0001_initial'),
('oauth2', '0002_auto_20180321_2343'),
('oauth2', '0003_auto_20180322_1016'),
('oauth2', '0004_auto_20180326_1330'),
('oauth2', '0005_auto_20180331_1532'),
]
initial = True
dependencies = [
('fargo', '0013_document_mime_type'),
]
operations = [
migrations.CreateModel(
name='OAuth2Authorize',
fields=[
(
'id',
models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True),
),
('access_token', models.CharField(default=fargo.oauth2.models.generate_uuid, max_length=255)),
('code', models.CharField(default=fargo.oauth2.models.generate_uuid, max_length=255)),
('creation_date', models.DateTimeField(auto_now_add=True)),
],
options={
'ordering': ('creation_date',),
'verbose_name': 'OAUTH2 authorization',
'verbose_name_plural': 'OAUTH2 authorizations',
},
),
migrations.CreateModel(
name='OAuth2Client',
fields=[
(
'id',
models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True),
),
('client_name', models.CharField(max_length=255)),
(
'redirect_uris',
models.TextField(
verbose_name='redirect URIs', validators=[fargo.oauth2.models.validate_https_url]
),
),
('client_id', models.CharField(default=fargo.oauth2.models.generate_uuid, max_length=255)),
(
'client_secret',
models.CharField(default=fargo.oauth2.models.generate_uuid, max_length=255),
),
],
options={
'ordering': ('client_name',),
'verbose_name': 'OAUTH2 client',
'verbose_name_plural': 'OAUTH2 clients',
},
),
migrations.CreateModel(
name='OAuth2TempFile',
fields=[
(
'uuid',
models.CharField(
default=fargo.oauth2.models.generate_uuid,
max_length=32,
serialize=False,
primary_key=True,
),
),
('filename', models.CharField(max_length=512)),
('creation_date', models.DateTimeField(auto_now_add=True)),
('client', models.ForeignKey(to='oauth2.OAuth2Client', on_delete=models.CASCADE)),
(
'document',
models.ForeignKey(
related_name='oauth2_tempfiles', to='fargo.Document', on_delete=models.CASCADE
),
),
],
options={
'ordering': ('creation_date',),
'verbose_name': 'OAUTH2 temporary file',
'verbose_name_plural': 'OAUTH2 temporary files',
},
),
migrations.AddField(
model_name='oauth2authorize',
name='client',
field=models.ForeignKey(to='oauth2.OAuth2Client', on_delete=models.CASCADE),
),
migrations.AddField(
model_name='oauth2authorize',
name='user_document',
field=models.ForeignKey(to='fargo.UserDocument', on_delete=models.CASCADE),
),
]

View File

@ -1,42 +0,0 @@
# Generated by Django 1.11.11 on 2018-03-21 23:43
import django.db.models.deletion
from django.db import migrations, models
def delete_all_client_linked_models(apps, schema_editor):
OAuth2Authorize = apps.get_model('oauth2', 'OAuth2Authorize')
OAuth2TempFile = apps.get_model('oauth2', 'OAuth2TempFile')
OAuth2Authorize.objects.all().delete()
OAuth2TempFile.objects.all().delete()
def noop(apps, schema_editor):
pass
class Migration(migrations.Migration):
dependencies = [
('oauth2', '0001_initial'),
]
operations = [
migrations.RunPython(delete_all_client_linked_models, noop),
migrations.AddField(
model_name='oauth2authorize',
name='client',
field=models.ForeignKey(
default=1, on_delete=django.db.models.deletion.CASCADE, to='oauth2.OAuth2Client'
),
preserve_default=False,
),
migrations.AddField(
model_name='oauth2tempfile',
name='client',
field=models.ForeignKey(
default=1, on_delete=django.db.models.deletion.CASCADE, to='oauth2.OAuth2Client'
),
preserve_default=False,
),
]

View File

@ -1,31 +0,0 @@
# Generated by Django 1.11.11 on 2018-03-22 10:16
from django.db import migrations, models
import fargo.oauth2.models
class Migration(migrations.Migration):
dependencies = [
('oauth2', '0002_auto_20180321_2343'),
]
operations = [
migrations.RemoveField(
model_name='oauth2tempfile',
name='hash_key',
),
migrations.AddField(
model_name='oauth2tempfile',
name='creation_date',
field=models.DateTimeField(auto_now=True),
),
migrations.AddField(
model_name='oauth2tempfile',
name='uuid',
field=models.CharField(
default=fargo.oauth2.models.generate_uuid, max_length=32, primary_key=True, serialize=False
),
),
]

View File

@ -1,23 +0,0 @@
# Generated by Django 1.11.11 on 2018-03-26 13:30
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('oauth2', '0003_auto_20180322_1016'),
]
operations = [
migrations.AlterField(
model_name='oauth2authorize',
name='creation_date',
field=models.DateTimeField(auto_now_add=True),
),
migrations.AlterField(
model_name='oauth2tempfile',
name='creation_date',
field=models.DateTimeField(auto_now_add=True),
),
]

View File

@ -1,37 +0,0 @@
# Generated by Django 1.11.11 on 2018-03-31 13:32
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('oauth2', '0004_auto_20180326_1330'),
]
operations = [
migrations.AlterModelOptions(
name='oauth2authorize',
options={
'ordering': ('creation_date',),
'verbose_name': 'OAUTH2 authorization',
'verbose_name_plural': 'OAUTH2 authorizations',
},
),
migrations.AlterModelOptions(
name='oauth2client',
options={
'ordering': ('client_name',),
'verbose_name': 'OAUTH2 client',
'verbose_name_plural': 'OAUTH2 clients',
},
),
migrations.AlterModelOptions(
name='oauth2tempfile',
options={
'ordering': ('creation_date',),
'verbose_name': 'OAUTH2 temporary file',
'verbose_name_plural': 'OAUTH2 temporary files',
},
),
]

View File

@ -1,120 +0,0 @@
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import uuid
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.db import models
from django.db.models.query import QuerySet
from django.utils.encoding import python_2_unicode_compatible
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
from fargo.fargo.models import Document, UserDocument
def generate_uuid():
return uuid.uuid4().hex
def validate_https_url(data):
errors = []
data = data.strip()
if not data:
return
for url in data.split():
try:
URLValidator(schemes=['http', 'https'])(url)
except ValidationError as e:
errors.append(e)
if errors:
raise ValidationError(errors)
@python_2_unicode_compatible
class OAuth2Client(models.Model):
client_name = models.CharField(max_length=255)
redirect_uris = models.TextField(verbose_name=_('redirect URIs'), validators=[validate_https_url])
client_id = models.CharField(max_length=255, default=generate_uuid)
client_secret = models.CharField(max_length=255, default=generate_uuid)
def __repr__(self):
return 'OAuth2Client name: %s with id: %s' % (self.client_name, self.client_id)
def get_redirect_uris(self):
return self.redirect_uris.split()
def check_redirect_uri(self, redirect_uri):
return redirect_uri in self.redirect_uris.strip().split()
def __str__(self):
return self.client_name
class Meta:
ordering = ('client_name',)
verbose_name = _('OAUTH2 client')
verbose_name_plural = _('OAUTH2 clients')
class CleanupQuerySet(QuerySet):
def cleanup(self, n=None):
n = n or now()
threshold = n - datetime.timedelta(seconds=2 * self.model.get_lifetime())
self.filter(creation_date__lt=threshold).delete()
class OAuth2Authorize(models.Model):
client = models.ForeignKey(OAuth2Client, on_delete=models.CASCADE)
user_document = models.ForeignKey(UserDocument, on_delete=models.CASCADE)
access_token = models.CharField(max_length=255, default=generate_uuid)
code = models.CharField(max_length=255, default=generate_uuid)
creation_date = models.DateTimeField(auto_now_add=True)
objects = CleanupQuerySet.as_manager()
class Meta:
ordering = ('creation_date',)
verbose_name = _('OAUTH2 authorization')
verbose_name_plural = _('OAUTH2 authorizations')
@classmethod
def get_lifetime(cls):
return max(settings.FARGO_CODE_LIFETIME, settings.FARGO_ACCESS_TOKEN_LIFETIME)
def __repr__(self):
return 'OAuth2Authorize for document %r' % self.user_document
class OAuth2TempFile(models.Model):
uuid = models.CharField(max_length=32, default=generate_uuid, primary_key=True)
client = models.ForeignKey(OAuth2Client, on_delete=models.CASCADE)
document = models.ForeignKey(Document, related_name='oauth2_tempfiles', on_delete=models.CASCADE)
filename = models.CharField(max_length=512)
creation_date = models.DateTimeField(auto_now_add=True)
objects = CleanupQuerySet.as_manager()
@classmethod
def get_lifetime(cls):
return settings.FARGO_OAUTH2_TEMPFILE_LIFETIME
class Meta:
ordering = ('creation_date',)
verbose_name = _('OAUTH2 temporary file')
verbose_name_plural = _('OAUTH2 temporary files')

View File

@ -1,35 +0,0 @@
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf.urls import url
from .views import (
authorize_get_document,
authorize_put_document,
download_put_document,
get_document,
get_document_token,
put_document,
)
urlpatterns = [
url(r'get-document/authorize', authorize_get_document, name='oauth2-authorize'),
url(r'get-document/token', get_document_token, name='oauth2-get-token'),
url(r'get-document/', get_document, name='oauth2-get-document'),
url(r'put-document/$', put_document, name='oauth2-put-document'),
url(r'put-document/(?P<pk>\w+)/authorize/', authorize_put_document, name='oauth2-put-document-authorize'),
url(r'put-document/(?P<pk>\w+)/download/', download_put_document, name='oauth2-put-document-download'),
]

View File

@ -1,60 +0,0 @@
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import cgi
from django.conf import settings
from django.utils import six
from django.utils.http import unquote
from django.utils.timezone import now
from .models import OAuth2Authorize
def authenticate_bearer(request):
authorization = request.META.get('HTTP_AUTHORIZATION')
if not authorization:
return False
splitted = authorization.split()
if len(splitted) < 2:
return False
if splitted[0] != 'Bearer':
return False
token = splitted[1]
try:
authorize = OAuth2Authorize.objects.get(access_token=token)
if (now() - authorize.creation_date).total_seconds() > settings.FARGO_ACCESS_TOKEN_LIFETIME:
return False
return authorize
except OAuth2Authorize.DoesNotExist:
return False
def get_content_disposition_value(request):
if 'HTTP_CONTENT_DISPOSITION' not in request.META:
return None, 'missing content-disposition header'
content_header = request.META['HTTP_CONTENT_DISPOSITION']
disposition_type, filename = cgi.parse_header(content_header)
if disposition_type != 'attachment':
return None, 'wrong disposition type: attachment expected'
if 'filename*' in filename:
encode, country, name = filename['filename*'].split("'")
return (unquote(name, encode), None)
elif 'filename' in filename:
return filename['filename'], None
else:
# no filename in header
return None, 'missing filename(*) parameter in header'

View File

@ -1,279 +0,0 @@
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.core.files.base import ContentFile
from django.http import HttpResponse, HttpResponseBadRequest, HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.urls import reverse
from django.utils.http import quote
from django.utils.timezone import now
from django.utils.translation import ugettext as _
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import FormView, TemplateView, View
from rest_framework.response import Response
from rest_framework.views import APIView
from fargo.fargo.models import Document, UserDocument
from fargo.utils import make_url
from .authentication import FargoOAUTH2Authentication
from .forms import OAuth2AuthorizeForm
from .models import OAuth2Authorize, OAuth2Client, OAuth2TempFile
from .utils import authenticate_bearer, get_content_disposition_value
logger = logging.getLogger(__name__)
class OAuth2Exception(Exception):
pass
class OAUTH2APIViewMixin(APIView):
http_method_names = ['post']
authentication_classes = (FargoOAUTH2Authentication,)
@csrf_exempt
def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs)
class OAuth2AuthorizeView(FormView):
template_name = 'fargo/oauth2/authorize.html'
form_class = OAuth2AuthorizeForm
success_url = '/'
def redirect(self, **kwargs):
'''Return to requester'''
return HttpResponseRedirect(make_url(self.redirect_uri, **kwargs))
def dispatch(self, request):
self.redirect_uri = request.GET.get('redirect_uri')
if not self.redirect_uri:
return HttpResponseBadRequest('missing redirect_uri parameter')
client_id = request.GET.get('client_id')
response_type = request.GET.get('response_type')
if not client_id or not response_type:
return self.redirect(error='invalid_request')
if response_type != 'code':
return self.redirect(error='unsupported_response_type')
try:
self.client = OAuth2Client.objects.get(client_id=client_id)
if not self.client.check_redirect_uri(self.redirect_uri):
return self.redirect(error='invalid_redirect_uri')
except OAuth2Client.DoesNotExist:
return self.redirect(error='unauthorized_client')
self.state = request.GET.get('state', None)
return super().dispatch(request)
def post(self, request):
if 'cancel' in request.POST:
return self.redirect(error='access_denied')
return super().post(request)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def form_valid(self, form):
document = form.cleaned_data['document']
authorization = OAuth2Authorize.objects.create(client=self.client, user_document=document)
logger.info(
'user %s authorized client "%s" to get document "%s" (%s) with code "%s"',
self.request.user,
self.client,
document,
document.pk,
authorization.code,
)
return self.redirect(code=authorization.code, state=self.state)
def get_context_data(self, **kwargs):
kwargs['oauth2_client'] = self.client
return super().get_context_data(**kwargs)
authorize_get_document = login_required(OAuth2AuthorizeView.as_view())
class GetDocumentTokenView(OAUTH2APIViewMixin):
def error(self, error, description=None):
data = {
'error': error,
}
if description:
data['error_description'] = description
return Response(data, status=400)
def post(self, request):
if request.data['grant_type'] != 'authorization_code':
return self.error('unsupported_grant_type')
try:
authorize = OAuth2Authorize.objects.get(code=request.data['code'])
except OAuth2Authorize.DoesNotExist:
return self.error('invalid_grant', 'code is unknown')
if (now() - authorize.creation_date).total_seconds() > settings.FARGO_CODE_LIFETIME:
return self.error('invalid_grant', 'code is expired')
logger.info(
'client "%s" resolved code "%s" to access token "%s"',
request.user.oauth2_client,
authorize.code,
authorize.access_token,
)
return Response(
{'access_token': authorize.access_token, 'expires': settings.FARGO_ACCESS_TOKEN_LIFETIME}
)
get_document_token = GetDocumentTokenView.as_view()
def document_response(user_document):
response = HttpResponse(
content=user_document.document.content.chunks(), status=200, content_type='application/octet-stream'
)
filename = user_document.filename
ascii_filename = filename.encode('ascii', 'replace').decode()
percent_encoded_filename = quote(filename.encode('utf8'), safe='')
response['Content-Disposition'] = 'attachment; filename="%s"; filename*=UTF-8\'\'%s' % (
ascii_filename,
percent_encoded_filename,
)
return response
def get_document(request):
oauth_authorize = authenticate_bearer(request)
if not oauth_authorize:
return HttpResponseBadRequest('http bearer authentication failed: invalid authorization header')
user_document = oauth_authorize.user_document
logger.info(
'client "%s" retrieved document "%s" (%s) with access token "%s"',
oauth_authorize.client,
user_document,
user_document.pk,
oauth_authorize.access_token,
)
return document_response(user_document)
class PutDocumentAPIView(OAUTH2APIViewMixin):
def post(self, request, *args, **kwargs):
filename, error = get_content_disposition_value(request)
if error:
return HttpResponseBadRequest(error)
f = ContentFile(request.body, name=filename)
document = Document.objects.get_by_file(f)
oauth2_document = OAuth2TempFile.objects.create(
client=request.user.oauth2_client, document=document, filename=filename
)
uri = reverse('oauth2-put-document-authorize', args=[oauth2_document.pk])
response = Response()
response['Location'] = uri
logger.info(
'client "%s" uploaded document "%s" (%s)',
request.user.oauth2_client,
filename,
oauth2_document.pk,
)
return response
put_document = PutDocumentAPIView.as_view()
class OAuth2AuthorizePutView(TemplateView):
template_name = 'fargo/oauth2/confirm.html'
def redirect(self, **kwargs):
'''Return to requester'''
return HttpResponseRedirect(make_url(self.redirect_uri, **kwargs))
def dispatch(self, request, *args, **kwargs):
self.redirect_uri = request.GET.get('redirect_uri', '')
if not self.redirect_uri:
return HttpResponseBadRequest('missing redirect_uri parameter')
self.oauth2_document = OAuth2TempFile.objects.filter(pk=kwargs['pk']).first()
return super().dispatch(request)
def get_context_data(self, **kwargs):
if self.oauth2_document:
kwargs['oauth2_document'] = self.oauth2_document
kwargs['filename'] = self.oauth2_document.filename
kwargs['thumbnail_image'] = self.oauth2_document.document.thumbnail_image
kwargs['oauth2_client'] = self.oauth2_document.client
kwargs['download_url'] = reverse(
'oauth2-put-document-download', kwargs={'pk': self.oauth2_document.pk}
)
# verify if document already exists
if not UserDocument.objects.filter(
user=self.request.user, document=self.oauth2_document.document
).exists():
kwargs['error_message'] = ''
else:
kwargs['error_message'] = _('This document is already in your portfolio')
kwargs['redirect_uri'] = self.request.GET['redirect_uri']
else:
kwargs['error_message'] = _('The document has not been uploaded')
kwargs['redirect_uri'] = self.request.GET['redirect_uri']
return super().get_context_data(**kwargs)
def post(self, request):
if not self.oauth2_document:
return self.get(request)
try:
if 'cancel' in request.POST:
return self.redirect(error='access_denied')
UserDocument.objects.create(
user=request.user,
document=self.oauth2_document.document,
filename=self.oauth2_document.filename,
)
logger.info(
'user %s accepted document "%s" (%s) from client "%s"',
request.user,
self.oauth2_document.filename,
self.oauth2_document.pk,
self.oauth2_document.client,
)
return self.redirect()
finally:
self.oauth2_document.delete()
authorize_put_document = login_required(OAuth2AuthorizePutView.as_view())
class DownloadPutDocument(View):
def get(self, request, *args, **kwargs):
oauth2_document = get_object_or_404(OAuth2TempFile, pk=kwargs['pk'])
return document_response(oauth2_document)
download_put_document = login_required(DownloadPutDocument.as_view())

View File

@ -55,8 +55,6 @@ INSTALLED_APPS = (
'django_tables2',
'gadjo',
'fargo.fargo',
'rest_framework',
'fargo.oauth2',
'sorl.thumbnail',
)
@ -258,7 +256,6 @@ THUMBNAIL_FORCE_OVERWRITE = False
FARGO_CODE_LIFETIME = 300
FARGO_ACCESS_TOKEN_LIFETIME = 3600
FARGO_OAUTH2_TEMPFILE_LIFETIME = 86400
local_settings_file = os.environ.get(
'FARGO_SETTINGS_FILE', os.path.join(os.path.dirname(__file__), 'local_settings.py')

View File

@ -1,22 +0,0 @@
{% extends "fargo/base.html" %}
{% load i18n %}
{% block content %}
<div id="fargo-oauth2-authorize">
{% block form-intro %}
{% blocktrans %}
<p>The service {{ oauth2_client }} want to get one of your documents.</p>
{% endblocktrans %}
{% endblock %}
{% block form %}
<form method="post" enctype="multipart/form-data">
{% csrf_token %}
{{ form.as_p }}
<div class="buttons">
<button name="submit">{% trans "Choose" %}</button>
<button name="cancel">{% trans "Cancel" %}</button>
</div>
</form>
{% endblock %}
</div>
{% endblock %}

View File

@ -1,33 +0,0 @@
{% extends "fargo/base.html" %}
{% load i18n %}
{% block content %}
<div id="fargo-oauth2-confirm">
{% if oauth2_document %}
{% block form-intro %}
<p>
{% blocktrans %}
The service {{ oauth2_client }} want to add the document "<a href="{{ download_url }}"><em class="filename">{{ filename }}</em></a>" to your portfolio.
{% endblocktrans %}
</p>
{% if thumbnail %}<p class="fargo-thumbnail"><img src="{{ thumbnail.src }}" height="{{ thumbnail.height }}" width="{{ thumbnail.width }}"/></p>{% endif %}
{% endblock %}
{% endif %}
{% if error_message %}
{% block error-message %}
<p>{% trans error_message %}</p>
{% endblock %}
{% endif %}
{% block form %}
<form id="send-file" method="post">
{% csrf_token %}
<div class="buttons">
{% if not error_message %}
<button name="submit">{% trans "Allow" %}</button>
{% endif %}
<button name="cancel">{% trans "Cancel" %}</button>
</div>
</form>
{% endblock %}
</div>
{% endblock %}

View File

@ -55,7 +55,6 @@ urlpatterns = [
url(r'^api/documents/push/$', push_document, name='fargo-api-push-document'),
url(r'^api/documents/recently-added/$', recent_documents),
url(r'^api/', include(router.urls)),
url(r'^api/', include('fargo.oauth2.urls')),
]
if settings.DEBUG and 'debug_toolbar' in settings.INSTALLED_APPS:

View File

@ -100,8 +100,8 @@ def admin_user(db):
@pytest.fixture
def document():
with open('tests/test_oauth2.txt', 'rb') as f:
content = ContentFile(f.read(), 'test_oauth2.txt')
with open('tests/test_file.txt', 'rb') as f:
content = ContentFile(f.read(), 'test_file.txt')
return Document.objects.get_by_file(content)

View File

@ -21,46 +21,30 @@ from django.core.files.base import ContentFile
from django.core.management import call_command
from fargo.fargo.models import Document, UserDocument
from fargo.oauth2.models import OAuth2Client, OAuth2TempFile
def test_cleanup(freezer, john_doe):
start = freezer()
client = OAuth2Client.objects.create(client_name='c', redirect_uris='')
foo = Document.objects.create(content=ContentFile(b'foo', name='foo.txt'))
bar = Document.objects.create(content=ContentFile(b'bar', name='bar.txt'))
UserDocument.objects.create(user=john_doe, document=foo, filename='foo.txt', title='', description='')
OAuth2TempFile.objects.create(document=bar, client=client, filename='bar.txt')
call_command('fargo-cleanup')
assert UserDocument.objects.all().count()
assert OAuth2TempFile.objects.all().count()
assert Document.objects.all().count() == 2
assert Document.objects.all().count() == 1
User.objects.all().delete()
assert not UserDocument.objects.all().count()
assert Document.objects.all().count() == 2
assert Document.objects.all().count() == 1
call_command('fargo-cleanup')
assert Document.objects.all().count() == 2
freezer.move_to(start + datetime.timedelta(seconds=120))
call_command('fargo-cleanup')
assert Document.objects.all().count() == 1
freezer.move_to(start + datetime.timedelta(days=3))
call_command('fargo-cleanup')
assert not OAuth2TempFile.objects.count()
assert Document.objects.count()
call_command('fargo-cleanup')
assert not Document.objects.count()

6
tests/test_file.txt Normal file
View File

@ -0,0 +1,6 @@
Lorem ipsum dolor sit amet, atqui animal constituto sit no, pri liber mandamus
ea, usu no duis etiam copiosae. Ius liber scripserit at, nam nisl nonumes ne.
Ut vidit clita possim eum, eos eu melius perfecto. Ne ius intellegam
reformidans, pri repudiare conceptam definitiones cu, duo tota bonorum no.
Lorem omnesque principes in ius, facilis erroribus cu usu. Eum liber homero
qualisque id, cu pri illum consetetur.

View File

@ -1,265 +0,0 @@
# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import os
from unittest import mock
import pytest
from django.core.management import call_command
from django.urls import reverse
from django.utils.http import quote, urlencode
from django.utils.six.moves.urllib import parse as urlparse
from test_manager import login
from fargo.fargo.models import UserDocument
from fargo.oauth2.models import OAuth2Authorize, OAuth2Client, OAuth2TempFile
pytestmark = pytest.mark.django_db
class FakedResponse(mock.Mock):
def json(self):
return json.loads(self.content)
@pytest.fixture
def oauth2_client():
return OAuth2Client.objects.create(
client_name='test_oauth2',
client_id='client-id',
client_secret='client-secret',
redirect_uris='https://example.net/document https://doc.example.net/ https://example.com',
)
def assert_error_redirect(url, error):
assert urlparse.urlparse(url).query == 'error=%s' % error
def test_get_document_oauth2(app, john_doe, oauth2_client, user_doc):
login(app, user=john_doe)
url = reverse('oauth2-authorize')
params = {'client_secret': oauth2_client.client_secret, 'response_type': 'code', 'state': 'achipeachope'}
# test missing redirect_uri
resp = app.get(url, params={}, status=400)
assert resp.text == 'missing redirect_uri parameter'
# test missing client id
params['redirect_uri'] = 'https://toto.example.com'
resp = app.get(url, params=params, status=302)
assert_error_redirect(resp.url, 'invalid_request')
# test invalid response type
params['client_id'] = oauth2_client.client_id
params['response_type'] = 'token'
resp = app.get(url, params=params, status=302)
assert_error_redirect(resp.url, 'unsupported_response_type')
# test invalid redirect uri
params['response_type'] = 'code'
resp = app.get(url, params=params, status=302)
assert_error_redirect(resp.url, 'invalid_redirect_uri')
params['redirect_uri'] = 'https://example.com'
resp = app.get(url, params=params)
assert resp.status_code == 200
assert len(resp.form['document'].options) == 2
options = resp.form['document'].options
assert 'éléphant.txt' in options[1]
# select the second document 'éléphant.txt'
resp.form['document'].select(options[1][0])
resp = resp.form.submit()
# check that the authorization has been registered for the user document
assert len(OAuth2Authorize.objects.filter(user_document__user=john_doe)) == 1
auth = OAuth2Authorize.objects.filter(user_document__user=john_doe)[0]
assert resp.status_code == 302
query = urlparse.urlparse(resp.location).query
assert [auth.code] == urlparse.parse_qs(query)['code']
assert ['achipeachope'] == urlparse.parse_qs(query)['state']
params.pop('response_type')
params.pop('state')
params['grant_type'] = 'authorization_code'
params['code'] = auth.code
url = reverse('oauth2-get-token')
app.authorization = ('Basic', (oauth2_client.client_id, oauth2_client.client_secret))
resp = app.post(url, params=params, status=200)
assert 'access_token' in resp.json
assert 'expires' in resp.json
assert resp.json['access_token'] == auth.access_token
url = reverse('oauth2-get-document')
app.authorization = ('Bearer', str(auth.access_token))
resp = app.get(url, status=200)
assert resp.content_type == 'application/octet-stream'
assert 'Content-disposition' in resp.headers
content_disposition = resp.content_disposition.replace(' ', '').split(';')
assert content_disposition[0] == 'attachment'
assert content_disposition[1] == 'filename="?l?phant.txt"'
assert content_disposition[2] == 'filename*=UTF-8\'\'%C3%A9l%C3%A9phant.txt'
def test_put_document(app, john_doe, oauth2_client):
login(app, user=john_doe)
with open('tests/test_oauth2.txt', 'rb') as f:
data = f.read()
url = reverse('oauth2-put-document')
resp = app.post(url, params=data, status=401)
app.authorization = ('Basic', (str(oauth2_client.client_id), str(oauth2_client.client_secret)))
resp = app.post(url, params=data, status=400)
assert 'missing content-disposition header' in resp.text
filename = 'éléphant.txt'
percent_encode_filename = quote(filename, safe='')
headers = {
'Content-disposition': 'attachment; filename="%s"; filename*=UTF-8\'\'%s'
% (filename, percent_encode_filename)
}
assert len(OAuth2TempFile.objects.all()) == 0
resp = app.post(url, params=data, headers=headers, status=200)
# test that we can still push the same document
resp = app.post(url, params=data, headers=headers, status=200)
assert len(OAuth2TempFile.objects.all()) == 2
doc = OAuth2TempFile.objects.latest('creation_date')
location = reverse('oauth2-put-document-authorize', kwargs={'pk': doc.pk})
assert location in resp.location
app.authorization = None
url = location + '?%s' % urlencode({'redirect_uri': 'https://example.com'})
resp = app.get(url, status=200)
assert OAuth2TempFile.objects.count() == 2
assert UserDocument.objects.count() == 0
resp = resp.form.submit()
assert resp.status_code == 302
assert resp.location == 'https://example.com'
assert OAuth2TempFile.objects.count() == 1
assert UserDocument.objects.count() == 1
assert OAuth2TempFile.objects.get().document == UserDocument.objects.get().document
assert UserDocument.objects.filter(user=john_doe, document=doc.document, filename='éléphant.txt').exists()
def test_confirm_put_document_file_exception(app, oauth2_client, john_doe, user_doc):
login(app, user=john_doe)
oauth_tmp_file = OAuth2TempFile.objects.create(
client=oauth2_client, document=user_doc.document, filename=user_doc.filename
)
url = reverse('oauth2-put-document-authorize', kwargs={'pk': 'fakemofo'})
url += '?%s' % urlencode({'redirect_uri': 'https://example.com'})
resp = app.get(url)
assert 'The document has not been uploaded' in resp.text
url = reverse('oauth2-put-document-authorize', kwargs={'pk': oauth_tmp_file.pk})
url += '?%s' % urlencode({'redirect_uri': 'https://example.com'})
resp = app.get(url)
assert 'This document is already in your portfolio' in resp.text
@mock.patch('fargo.oauth2.authentication.requests.post')
def test_idp_authentication(mocked_post, settings, app, oauth2_client, john_doe, user_doc):
login(app, user=john_doe)
url = reverse('oauth2-authorize')
params = {
'client_id': oauth2_client.client_id,
'client_secret': 'fake',
'response_type': 'code',
'state': 'achipeachope',
'redirect': 'https://example.com/',
}
params['redirect_uri'] = 'https://example.com'
resp = app.get(url, params=params)
options = resp.form['document'].options
assert 'éléphant.txt' in options[1]
resp.form['document'].select(options[1][0])
resp = resp.form.submit()
auth = OAuth2Authorize.objects.filter(user_document__user=john_doe)[0]
params.pop('response_type')
params.pop('state')
params['grant_type'] = 'authorization_code'
params['code'] = auth.code
url = reverse('oauth2-get-token')
# when remote remote idp not set
app.authorization = ('Basic', ('client-id', 'fake'))
resp = app.post(url, params=params, status=401)
resp.json['detail'] == 'Invalid client_id/client_secret.'
# when remote idp fails to authenticate rp
settings.FARGO_IDP_URL = 'https://idp.example.org'
response = {"result": 0, "errors": ["Invalid username/password."]}
mocked_post.return_value = FakedResponse(content=json.dumps(response))
resp = app.post(url, params=params, status=401)
resp.json['detail'] == 'Invalid client_id/client_secret.'
# when remote idp authenticates rp
response = {"result": 1, "errors": []}
mocked_post.return_value = FakedResponse(content=json.dumps(response))
resp = app.post(url, params=params, status=200)
assert resp.json['access_token'] == auth.access_token
url = reverse('oauth2-get-document')
app.authorization = ('Bearer', str(auth.access_token))
resp = app.get(url, status=200)
def test_command_create_client(db):
call_command('oauth2-create-client', 'test', 'https://example.com/')
client = OAuth2Client.objects.get()
assert client.client_name == 'test'
assert client.redirect_uris == 'https://example.com/'
assert client.client_id
assert client.client_secret
OAuth2Client.objects.all().delete()
call_command(
'oauth2-create-client', 'test', 'https://example.com/', '--client-id=wtf', '--client-secret=whocares'
)
client = OAuth2Client.objects.get()
assert client.client_name == 'test'
assert client.redirect_uris == 'https://example.com/'
assert client.client_id == 'wtf'
assert client.client_secret == 'whocares'
def test_command_put_document(db, capsys, app, john_doe):
call_command('oauth2-create-client', 'test', 'https://example.com/')
client = OAuth2Client.objects.get()
path = os.path.join(os.path.dirname(__file__), 'pdf-sample.pdf')
redirect_uri = 'https://example.com/'
call_command('oauth2-put-document', '--client-id=%s' % client.pk, redirect_uri, path)
out, err = capsys.readouterr()
assert err == ''
url = out.strip()
response = app.get(url).follow()
response.form.set('username', john_doe.username)
response.form.set('password', john_doe.username)
response = response.form.submit().follow()
assert 'pdf-sample.pdf' in response
temp_file = OAuth2TempFile.objects.get()
assert temp_file.uuid in response
response = response.form.submit('accept')
assert response['Location'] == redirect_uri
assert UserDocument.objects.filter(user=john_doe, document=temp_file.document).exists()
assert OAuth2TempFile.objects.count() == 0

View File

@ -1,36 +0,0 @@
Poème hymne à la beauté du recueil les fleurs du mal de Charles Baudelaire
Viens-tu du ciel profond ou sors-tu de l'abîme,
Ô Beauté ! ton regard, infernal et divin,
Verse confusément le bienfait et le crime,
Et l'on peut pour cela te comparer au vin.
Tu contiens dans ton oeil le couchant et l'aurore ;
Tu répands des parfums comme un soir orageux ;
Tes baisers sont un philtre et ta bouche une amphore
Qui font le héros lâche et l'enfant courageux.
Sors-tu du gouffre noir ou descends-tu des astres ?
Le Destin charmé suit tes jupons comme un chien ;
Tu sèmes au hasard la joie et les désastres,
Et tu gouvernes tout et ne réponds de rien.
Tu marches sur des morts, Beauté, dont tu te moques ;
De tes bijoux l'Horreur n'est pas le moins charmant,
Et le Meurtre, parmi tes plus chères breloques,
Sur ton ventre orgueilleux danse amoureusement.
L'éphémère ébloui vole vers toi, chandelle,
Crépite, flambe et dit : Bénissons ce flambeau !
L'amoureux pantelant incliné sur sa belle
A l'air d'un moribond caressant son tombeau.
Que tu viennes du ciel ou de l'enfer, qu'importe,
Ô Beauté ! monstre énorme, effrayant, ingénu !
Si ton oeil, ton souris, ton pied, m'ouvrent la porte
D'un Infini que j'aime et n'ai jamais connu ?
De Satan ou de Dieu, qu'importe ? Ange ou Sirène,
Qu'importe, si tu rends, - fée aux yeux de velours,
Rythme, parfum, lueur, ô mon unique reine ! -
L'univers moins hideux et les instants moins lourds ?