authentic/src/authentic2/middleware.py

274 lines
9.6 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
try:
import threading
except ImportError:
threading = None
import urllib.parse
from django import http
from django.conf import settings
from django.contrib import messages
from django.db.models import Model
from django.utils.deprecation import MiddlewareMixin
from django.utils.functional import SimpleLazyObject
from django.utils.translation import ugettext as _
from . import app_settings, plugins
from .utils import misc as utils_misc
from .utils.service import get_service_from_request, get_service_from_session
class CollectIPMiddleware(MiddlewareMixin):
def process_response(self, request, response):
# only collect IP if session is used
if not hasattr(request, 'session') or request.session.is_empty():
return response
ips = set(request.session.setdefault('ips', []))
ip = request.META.get('REMOTE_ADDR', None)
if ip and ip not in ips:
ips.add(ip)
request.session['ips'] = list(ips)
request.session.modified = True
return response
class OpenedSessionCookieMiddleware(MiddlewareMixin):
def process_response(self, request, response):
# do not emit cookie for API requests
if request.path.startswith('/api/'):
return response
if not app_settings.A2_OPENED_SESSION_COOKIE_DOMAIN:
return response
name = app_settings.A2_OPENED_SESSION_COOKIE_NAME
if app_settings.A2_OPENED_SESSION_COOKIE_DOMAIN == 'parent':
domain = request.get_host().split('.', 1)[1]
else:
domain = app_settings.A2_OPENED_SESSION_COOKIE_DOMAIN
if hasattr(request, 'user') and request.user.is_authenticated:
response.set_cookie(
name,
value='1',
max_age=None,
domain=domain,
secure=app_settings.A2_OPENED_SESSION_COOKIE_SECURE,
)
elif app_settings.A2_OPENED_SESSION_COOKIE_NAME in request.COOKIES:
response.delete_cookie(name, domain=domain)
return response
class RequestIdMiddleware(MiddlewareMixin):
def process_request(self, request):
if not hasattr(request, 'request_id'):
request_id_header = getattr(settings, 'REQUEST_ID_HEADER', None)
if request_id_header and request.META.get(request_id_header):
request.request_id = request.META[request_id_header]
else:
request.request_id = 'r:' + hex(id(request))[2:].upper()
class StoreRequestMiddleware(MiddlewareMixin):
collection = {}
def process_request(self, request):
StoreRequestMiddleware.collection[threading.currentThread()] = request
def process_response(self, request, response):
StoreRequestMiddleware.collection.pop(threading.currentThread(), None)
return response
def process_exception(self, request, exception):
StoreRequestMiddleware.collection.pop(threading.currentThread(), None)
@classmethod
def get_request(cls):
return cls.collection.get(threading.currentThread())
class ViewRestrictionMiddleware(MiddlewareMixin):
RESTRICTION_SESSION_KEY = 'view-restriction'
def check_view_restrictions(self, request):
'''Check if a restriction on accessible views must be applied'''
user = request.user
# If the session is unlogged, do nothing
if user is None or not user.is_authenticated:
return None
# If the latest check was succesfull, do nothing.
now = time.time()
last_time = request.session.get('last_view_restriction_check', 0)
if now - last_time <= 60:
return None
view = self.check_password_reset_view_restriction(request, user)
if view:
return view
for plugin in plugins.get_plugins():
if hasattr(plugin, 'check_view_restrictions'):
view = plugin.check_view_restrictions(request, user)
if view:
return view
# do not check for 60 seconds
request.session['last_password_reset_check'] = now
return None
def check_password_reset_view_restriction(self, request, user):
# If user is authenticated and a password_reset_flag is set, force
# redirect to password change and show a message.
from . import models
if (
user.is_authenticated
and isinstance(user, Model)
and models.PasswordReset.objects.filter(user=request.user).exists()
):
if request.resolver_match.url_name != 'password_change':
messages.warning(request, _('You must change your password to continue'))
return 'password_change'
def process_view(self, request, view_func, view_args, view_kwargs):
'''If current view is not the one where we should be, redirect'''
view = self.check_view_restrictions(request)
if not view:
return
url_name = request.resolver_match.url_name
# do not block on the restricted view
if url_name == view:
return
# prevent blocking people when they logout
if url_name == 'auth_logout':
return
return utils_misc.redirect_and_come_back(request, view)
class XForwardedForMiddleware(MiddlewareMixin):
"""Copy the first address from X-Forwarded-For header to the REMOTE_ADDR meta.
This middleware should only be used if you are sure the header cannot be
forged (behind a reverse proxy for example)."""
def process_request(self, request):
if 'HTTP_X_FORWARDED_FOR' in request.META:
request.META['REMOTE_ADDR'] = request.META['HTTP_X_FORWARDED_FOR'].split(",")[0].strip()
return None
class DisplayMessageBeforeRedirectMiddleware(MiddlewareMixin):
"""Verify if messages are currently stored and if there is a redirection to another domain, in
this case show an intermediate page.
"""
def process_response(self, request, response):
# Check if response is a redirection
if response.status_code not in (301, 302, 303, 307, 308):
return response
# Check if location is to another domain
url = response['Location']
if not url:
return response
if not getattr(response, 'display_message', True):
return response
parsed_url = urllib.parse.urlparse(url)
if not parsed_url.scheme and not parsed_url.netloc:
return response
parsed_request_url = urllib.parse.urlparse(request.build_absolute_uri())
if (parsed_request_url.scheme == parsed_url.scheme or not parsed_url.scheme) and (
parsed_request_url.netloc == parsed_url.netloc
):
return response
# Check if there is some messages to show
storage = messages.get_messages(request)
if not len(storage):
return response
return utils_misc.redirect(request, 'continue', resolve=True, params={'next': url})
class ServiceAccessControlMiddleware(MiddlewareMixin):
def process_exception(self, request, exception):
if not isinstance(exception, (utils_misc.ServiceAccessDenied,)):
return None
return utils_misc.unauthorized_view(request, exception.service)
class CookieTestMiddleware(MiddlewareMixin):
COOKIE_NAME = 'cookie-test'
@classmethod
def check(cls, request):
return cls.COOKIE_NAME in request.COOKIES
def process_response(self, request, response):
if not self.check(request):
# set test cookie for 1 year
response.set_cookie(self.COOKIE_NAME, '1', max_age=365 * 24 * 3600)
return response
class SaveServiceInSessionMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
service = get_service_from_request(request)
if service:
request.session['service_pk'] = service.pk
request.service = SimpleLazyObject(lambda: get_service_from_session(request))
return self.get_response(request)
def journal_middleware(get_response):
from . import journal
def middleware(request):
request.journal = journal.Journal(request=request)
return get_response(request)
return middleware
def null_character_middleware(get_response):
def middleware(request):
def check_query_dict(qd):
for key in qd:
for value in qd.getlist(key):
if '\0' in value:
return False
return True
if not check_query_dict(request.GET):
return http.HttpResponseBadRequest('null character in query string')
if request.content_type == 'application/x-www-form-urlencoded':
if not check_query_dict(request.POST):
return http.HttpResponseBadRequest('null character in form data')
return get_response(request)
return middleware