use force_str only when necessary (#64309)

This commit is contained in:
Paul Marillonnet 2022-04-19 11:30:35 +02:00
parent b4704b16c9
commit dedd924f99
5 changed files with 23 additions and 24 deletions

View File

@ -32,7 +32,6 @@ from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core.exceptions import FieldDoesNotExist, PermissionDenied
from django.core.files.storage import default_storage
from django.utils.encoding import force_text
from django.utils.translation import gettext as _
from . import app_settings, models, models_utils, utils
@ -276,7 +275,7 @@ class DefaultAdapter:
realm = utils.get_setting(idp, 'REALM')
username_template = utils.get_setting(idp, 'USERNAME_TEMPLATE')
try:
username = force_text(username_template).format(realm=realm, attributes=saml_attributes, idp=idp)[
username = username_template.format(realm=realm, attributes=saml_attributes, idp=idp)[
: self.user_class._meta.get_field('username').max_length
]
except ValueError:
@ -476,7 +475,7 @@ class DefaultAdapter:
attribute_set = False
for field, tpl in attribute_mapping.items():
try:
value = force_text(tpl).format(realm=realm, attributes=saml_attributes, idp=idp)
value = tpl.format(realm=realm, attributes=saml_attributes, idp=idp)
except ValueError:
logger.warning('mellon: invalid attribute mapping template %r', tpl)
except (AttributeError, KeyError, IndexError, ValueError) as e:

View File

@ -27,7 +27,7 @@ from django.conf import settings
from django.contrib import auth
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils.encoding import force_text
from django.utils.encoding import force_str
from django.utils.timezone import get_default_timezone, is_aware, make_aware, make_naive, now
from . import app_settings
@ -213,10 +213,10 @@ def get_setting(idp, name, default=None):
def make_session_dump(lasso_name_id, indexes):
session_infos = []
name_id = force_text(lasso_name_id.content)
name_id_format = force_text(lasso_name_id.format)
name_qualifier = lasso_name_id.nameQualifier and force_text(lasso_name_id.nameQualifier)
sp_name_qualifier = lasso_name_id.spNameQualifier and force_text(lasso_name_id.spNameQualifier)
name_id = force_str(lasso_name_id.content)
name_id_format = force_str(lasso_name_id.format)
name_qualifier = lasso_name_id.nameQualifier and force_str(lasso_name_id.nameQualifier)
sp_name_qualifier = lasso_name_id.spNameQualifier and force_str(lasso_name_id.spNameQualifier)
for index in indexes:
issuer = index.saml_identifier.issuer.entity_id
session_infos.append(

View File

@ -32,7 +32,7 @@ from django.db import transaction
from django.http import Http404, HttpResponse, HttpResponseForbidden, HttpResponseRedirect
from django.shortcuts import render, resolve_url
from django.urls import reverse
from django.utils.encoding import force_str, force_text
from django.utils.encoding import force_str
from django.utils.http import urlencode
from django.utils.translation import gettext as _
from django.views.decorators.csrf import csrf_exempt
@ -264,14 +264,14 @@ class LoginView(ProfileMixin, LogMixin, View):
if login.nameIdentifier:
name_id = login.nameIdentifier
name_id_format = force_text(name_id.format or lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED)
name_id_format = force_str(name_id.format or lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED)
attributes.update(
{'name_id_content': lasso_decode(name_id.content), 'name_id_format': name_id_format}
)
if name_id.nameQualifier:
attributes['name_id_name_qualifier'] = force_text(name_id.nameQualifier)
attributes['name_id_name_qualifier'] = force_str(name_id.nameQualifier)
if name_id.spNameQualifier:
attributes['name_id_sp_name_qualifier'] = force_text(name_id.spNameQualifier)
attributes['name_id_sp_name_qualifier'] = force_str(name_id.spNameQualifier)
authn_statement = login.assertion.authnStatement[0]
if authn_statement.authnInstant:
attributes['authn_instant'] = utils.iso8601_to_datetime(authn_statement.authnInstant)
@ -663,12 +663,12 @@ class LogoutView(ProfileMixin, LogMixin, View):
except lasso.Error as e:
return HttpResponseBadRequest('error processing logout request: %r' % e)
entity_id = force_text(logout.remoteProviderId)
session_indexes = {force_text(sessionIndex) for sessionIndex in logout.request.sessionIndexes}
entity_id = force_str(logout.remoteProviderId)
session_indexes = {force_str(sessionIndex) for sessionIndex in logout.request.sessionIndexes}
saml_identifier = (
models.UserSAMLIdentifier.objects.filter(
name_id=force_text(logout.nameIdentifier.content),
name_id=force_str(logout.nameIdentifier.content),
issuer=models_utils.get_issuer(entity_id),
)
.select_related('user', 'issuer')
@ -708,7 +708,7 @@ class LogoutView(ProfileMixin, LogMixin, View):
except lasso.Error as e:
return HttpResponseBadRequest('error processing logout request: %r' % e)
if logout.msgBody:
return HttpResponse(force_text(logout.msgBody), content_type='text/xml')
return HttpResponse(force_str(logout.msgBody), content_type='text/xml')
else:
return HttpResponseRedirect(logout.msgUrl)

View File

@ -210,7 +210,7 @@ class MockIdp:
if body:
logout.processResponseMsg(force_str(body))
else:
logout.processResponseMsg(force_str(url.split('?', 1)[-1]))
logout.processResponseMsg(url.split('?', 1)[-1])
def process_logout_request_redirect(self, url):
logout = lasso.Logout(self.server)
@ -346,7 +346,7 @@ def test_sso_idp_slo_soap(db, app, idp, caplog, sp_settings):
app.cookiejar.clear()
url, body, relay_state = idp.init_slo(method=lasso.HTTP_METHOD_SOAP)
response = app.post(url, params=body, headers={'Content-Type': force_str('text/xml')})
response = app.post(url, params=body, headers={'Content-Type': 'text/xml'})
assert Session.objects.count() == 1
idp.check_slo_return(body=response.content)
@ -424,7 +424,7 @@ def test_sso_idp_slo_full_soap(db, app, idp, caplog, sp_settings):
# idp logout
app.cookiejar.clear()
url, body, relay_state = idp.init_slo(method=lasso.HTTP_METHOD_SOAP, full=True)
response = app.post(url, params=body, headers={'Content-Type': force_str('text/xml')})
response = app.post(url, params=body, headers={'Content-Type': 'text/xml'})
assert Session.objects.count() == 0
idp.check_slo_return(body=response.content)
@ -677,7 +677,7 @@ def test_passive_auth_middleware_ok(db, app, idp, caplog, settings):
assert 'MELLON_PASSIVE_TRIED' not in app.cookies
# webtest-lint is against unicode
app.set_cookie('IDP_SESSION', '1')
response = app.get('/', headers={'Accept': force_str('text/html')}, status=302)
response = app.get('/', headers={'Accept': 'text/html'}, status=302)
assert urlparse.urlparse(response.location).path == '/login/'
assert urlparse.parse_qs(urlparse.urlparse(response.location).query, keep_blank_values=True) == {
'next': ['http://testserver/'],
@ -695,7 +695,7 @@ def test_passive_auth_middleware_ok(db, app, idp, caplog, settings):
# check passive authentication is tried again
app.set_cookie('IDP_SESSION', '1')
response = app.get('/', headers={'Accept': force_str('text/html')}, status=302)
response = app.get('/', headers={'Accept': 'text/html'}, status=302)
assert urlparse.urlparse(response.location).path == '/login/'
assert urlparse.parse_qs(urlparse.urlparse(response.location).query, keep_blank_values=True) == {
'next': ['http://testserver/'],
@ -709,7 +709,7 @@ def test_passive_auth_middleware_no_passive_auth_parameter(db, app, idp, caplog,
assert 'MELLON_PASSIVE_TRIED' not in app.cookies
# webtest-lint is against unicode
app.set_cookie('IDP_SESSION', '1')
app.get('/?no-passive-auth', headers={'Accept': force_str('text/html')}, status=200)
app.get('/?no-passive-auth', headers={'Accept': 'text/html'}, status=200)
def test_sso_user_change(db, app, idp, caplog, sp_settings):

View File

@ -22,7 +22,7 @@ from urllib.parse import parse_qs, urlparse
import lasso
import pytest
from django.urls import reverse
from django.utils.encoding import force_text
from django.utils.encoding import force_str
from django.utils.http import urlencode
from httmock import HTTMock
from utils import error_500, html_response
@ -271,7 +271,7 @@ def test_malfortmed_artifact(private_settings, client, caplog):
def artifact():
entity_id = b'http://idp5/metadata'
token = b'x' * 20
return force_text(base64.b64encode(b'\x00\x04\x00\x00' + hashlib.sha1(entity_id).digest() + token))
return force_str(base64.b64encode(b'\x00\x04\x00\x00' + hashlib.sha1(entity_id).digest() + token))
def test_error_500_on_artifact_resolve(private_settings, client, caplog, artifact):