authentic/src/authentic2/middleware.py

243 lines
9.3 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
try:
import threading
except ImportError:
threading = None
from django.conf import settings
from django.contrib import messages
from django.utils.deprecation import MiddlewareMixin
from django.utils.functional import SimpleLazyObject
from django.utils.translation import ugettext as _
from django.utils.six.moves.urllib import parse as urlparse
from django.shortcuts import render
from django import http
from . import app_settings, utils, plugins
from .utils.service import get_service_from_request, get_service_from_session
class CollectIPMiddleware(MiddlewareMixin):
def process_response(self, request, response):
# only collect IP if session is used
if not hasattr(request, 'session') or request.session.is_empty():
return response
ips = set(request.session.setdefault('ips', []))
ip = request.META.get('REMOTE_ADDR', None)
if ip and ip not in ips:
ips.add(ip)
request.session['ips'] = list(ips)
request.session.modified = True
return response
class OpenedSessionCookieMiddleware(MiddlewareMixin):
def process_response(self, request, response):
# do not emit cookie for API requests
if request.path.startswith('/api/'):
return response
if not app_settings.A2_OPENED_SESSION_COOKIE_DOMAIN:
return response
name = app_settings.A2_OPENED_SESSION_COOKIE_NAME
if app_settings.A2_OPENED_SESSION_COOKIE_DOMAIN == 'parent':
domain = request.get_host().split('.', 1)[1]
else:
domain = app_settings.A2_OPENED_SESSION_COOKIE_DOMAIN
if hasattr(request, 'user') and request.user.is_authenticated:
response.set_cookie(name, value='1', max_age=None, domain=domain,
secure=app_settings.A2_OPENED_SESSION_COOKIE_SECURE)
elif app_settings.A2_OPENED_SESSION_COOKIE_NAME in request.COOKIES:
response.delete_cookie(name, domain=domain)
return response
class RequestIdMiddleware(MiddlewareMixin):
def process_request(self, request):
if not hasattr(request, 'request_id'):
request_id_header = getattr(settings, 'REQUEST_ID_HEADER', None)
if request_id_header and request.META.get(request_id_header):
request.request_id = request.META[request_id_header]
else:
request.request_id = 'r:' + hex(id(request))[2:].upper()
class StoreRequestMiddleware(MiddlewareMixin):
collection = {}
def process_request(self, request):
StoreRequestMiddleware.collection[threading.currentThread()] = request
def process_response(self, request, response):
StoreRequestMiddleware.collection.pop(threading.currentThread(), None)
return response
def process_exception(self, request, exception):
StoreRequestMiddleware.collection.pop(threading.currentThread(), None)
@classmethod
def get_request(cls):
return cls.collection.get(threading.currentThread())
class ViewRestrictionMiddleware(MiddlewareMixin):
RESTRICTION_SESSION_KEY = 'view-restriction'
def check_view_restrictions(self, request):
'''Check if a restriction on accessible views must be applied'''
from django.db.models import Model
from .models import PasswordReset
user = request.user
b = user.is_authenticated
if b and isinstance(user, Model):
now = time.time()
last_time = request.session.get('last_password_reset_check', 0)
if now - last_time > 10:
if PasswordReset.objects.filter(user=request.user).exists():
return 'password_change'
request.session['last_password_reset_check'] = now
for plugin in plugins.get_plugins():
if hasattr(plugin, 'check_view_restrictions'):
view = plugin.check_view_restrictions(request)
if view:
return view
def process_view(self, request, view_func, view_args, view_kwargs):
'''If current view is not the one we should be, redirect'''
view = self.check_view_restrictions(request)
if not view or request.resolver_match.url_name in (view, 'auth_logout'):
return
if view == 'password_change':
messages.warning(request, _('You must change your password to continue'))
return utils.redirect_and_come_back(request, view)
class XForwardedForMiddleware(MiddlewareMixin):
'''Copy the first address from X-Forwarded-For header to the REMOTE_ADDR meta.
This middleware should only be used if you are sure the header cannot be
forged (behind a reverse proxy for example).'''
def process_request(self, request):
if 'HTTP_X_FORWARDED_FOR' in request.META:
request.META['REMOTE_ADDR'] = request.META['HTTP_X_FORWARDED_FOR'].split(",")[0].strip()
return None
class DisplayMessageBeforeRedirectMiddleware(MiddlewareMixin):
'''Verify if messages are currently stored and if there is a redirection to another domain, in
this case show an intermediate page.
'''
def process_response(self, request, response):
# Check if response is a redirection
if response.status_code not in (301, 302, 303, 307, 308):
return response
# Check if location is to another domain
url = response['Location']
if not url:
return response
if not getattr(response, 'display_message', True):
return response
parsed_url = urlparse.urlparse(url)
if not parsed_url.scheme and not parsed_url.netloc:
return response
parsed_request_url = urlparse.urlparse(request.build_absolute_uri())
if (parsed_request_url.scheme == parsed_url.scheme or not parsed_url.scheme) and \
(parsed_request_url.netloc == parsed_url.netloc):
return response
# Check if there is some messages to show
storage = messages.get_messages(request)
if not storage:
return response
only_info = True
some_message = False
for message in storage:
some_message = True
if message.level != messages.INFO:
# If there are warnin or error messages, the intermediate page must not redirect
# automatically but should ask for an user confirmation
only_info = False
storage.used = False
if not some_message:
return response
return render(request, 'authentic2/display_message_and_continue.html',
{'url': url, 'only_info': only_info})
class ServiceAccessControlMiddleware(MiddlewareMixin):
def process_exception(self, request, exception):
if not isinstance(exception, (utils.ServiceAccessDenied,)):
return None
return utils.unauthorized_view(request, exception.service)
class CookieTestMiddleware(MiddlewareMixin):
COOKIE_NAME = 'cookie-test'
@classmethod
def check(cls, request):
return cls.COOKIE_NAME in request.COOKIES
def process_response(self, request, response):
if not self.check(request):
# set test cookie for 1 year
response.set_cookie(self.COOKIE_NAME, '1', max_age=365 * 24 * 3600)
return response
class SaveServiceInSessionMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
service = get_service_from_request(request)
if service:
request.session['service_pk'] = service.pk
request.service = SimpleLazyObject(lambda: get_service_from_session(request))
return self.get_response(request)
def journal_middleware(get_response):
from . import journal
def middleware(request):
request.journal = journal.Journal(request=request)
return get_response(request)
return middleware
def null_character_middleware(get_response):
def middleware(request):
def check_query_dict(qd):
for key in qd:
for value in qd.getlist(key):
if '\0' in value:
return False
return True
if not check_query_dict(request.GET):
return http.HttpResponseBadRequest('null character in query string')
if request.content_type == 'application/x-www-form-urlencoded':
if not check_query_dict(request.POST):
return http.HttpResponseBadRequest('null character in form data')
return get_response(request)
return middleware