misc: replace deprecated force_text by force_str (#69988)
This commit is contained in:
parent
43c256ee7b
commit
d84e49a329
|
@ -28,7 +28,7 @@ from django.core.exceptions import MultipleObjectsReturned
|
|||
from django.db import models
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.utils.dateparse import parse_datetime
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.text import slugify
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.views.decorators.cache import cache_control
|
||||
|
@ -255,7 +255,7 @@ class Register(BaseRpcView):
|
|||
response = {
|
||||
'result': 0,
|
||||
'errors': {'__all__': ['Mail sending failed']},
|
||||
'exception': force_text(e),
|
||||
'exception': force_str(e),
|
||||
}
|
||||
response_status = status.HTTP_503_SERVICE_UNAVAILABLE
|
||||
else:
|
||||
|
|
|
@ -38,7 +38,7 @@ from django.core.cache import cache
|
|||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.db.models import Q
|
||||
from django.db.transaction import atomic
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from django.utils.translation import gettext as _
|
||||
from django.utils.translation import ngettext
|
||||
from ldap.controls import DecodeControlTuples, SimplePagedResultsControl, ppolicy
|
||||
|
@ -200,7 +200,7 @@ def map_text(d):
|
|||
if d is None:
|
||||
return d
|
||||
elif isinstance(d, str):
|
||||
return force_text(d)
|
||||
return force_str(d)
|
||||
elif isinstance(d, (list, tuple)):
|
||||
return d.__class__(map_text(x) for x in d)
|
||||
elif isinstance(d, dict):
|
||||
|
@ -293,7 +293,7 @@ class LDAPUser(User):
|
|||
settings.SECRET_KEY, encrypted_bindpw, raise_on_error=False
|
||||
)
|
||||
if decrypted:
|
||||
decrypted = force_text(decrypted)
|
||||
decrypted = force_str(decrypted)
|
||||
self.ldap_data['block']['bindpw'] = decrypted
|
||||
del self.ldap_data['block']['encrypted_bindpw']
|
||||
|
||||
|
@ -302,7 +302,7 @@ class LDAPUser(User):
|
|||
data = dict(self.ldap_data)
|
||||
data['block'] = dict(data['block'])
|
||||
if data['block'].get('bindpw'):
|
||||
data['block']['encrypted_bindpw'] = force_text(
|
||||
data['block']['encrypted_bindpw'] = force_str(
|
||||
crypto.aes_base64_encrypt(settings.SECRET_KEY, force_bytes(data['block']['bindpw']))
|
||||
)
|
||||
del data['block']['bindpw']
|
||||
|
@ -339,7 +339,7 @@ class LDAPUser(User):
|
|||
cache = self.ldap_data.setdefault('password', {})
|
||||
if password is not None:
|
||||
# Prevent eavesdropping of the password through the session storage
|
||||
password = force_text(crypto.aes_base64_encrypt(settings.SECRET_KEY, force_bytes(password)))
|
||||
password = force_str(crypto.aes_base64_encrypt(settings.SECRET_KEY, force_bytes(password)))
|
||||
cache[self.dn] = password
|
||||
# ensure session is marked dirty
|
||||
self.update_request()
|
||||
|
@ -350,13 +350,13 @@ class LDAPUser(User):
|
|||
password = cache.get(self.dn)
|
||||
if password is not None:
|
||||
try:
|
||||
password = force_text(crypto.aes_base64_decrypt(settings.SECRET_KEY, password))
|
||||
password = force_str(crypto.aes_base64_decrypt(settings.SECRET_KEY, password))
|
||||
except crypto.DecryptionError:
|
||||
log.error('unable to decrypt a stored LDAP password')
|
||||
self.keep_password_in_session(None)
|
||||
password = None
|
||||
else:
|
||||
password = force_text(password)
|
||||
password = force_str(password)
|
||||
return password
|
||||
else:
|
||||
self.keep_password_in_session(None)
|
||||
|
@ -413,7 +413,7 @@ class LDAPUser(User):
|
|||
|
||||
def get_attributes(self, attribute_source, ctx):
|
||||
cache_key = hashlib.md5(
|
||||
(force_text(str(self.pk)) + ';' + force_text(self.dn)).encode('utf-8')
|
||||
(force_str(str(self.pk)) + ';' + force_str(self.dn)).encode('utf-8')
|
||||
).hexdigest()
|
||||
conn = self.get_connection()
|
||||
# prevents blocking on temporary LDAP failures
|
||||
|
@ -657,11 +657,11 @@ class LDAPBackend:
|
|||
for conn in self.get_connections(block):
|
||||
ldap_uri = conn.get_option(ldap.OPT_URI)
|
||||
authz_ids = []
|
||||
user_basedn = force_text(block.get('user_basedn') or block['basedn'])
|
||||
user_basedn = force_str(block.get('user_basedn') or block['basedn'])
|
||||
|
||||
try:
|
||||
if block['user_dn_template']:
|
||||
template = force_text(block['user_dn_template'])
|
||||
template = force_str(block['user_dn_template'])
|
||||
escaped_username = escape_dn_chars(username)
|
||||
authz_ids.append(template.format(username=escaped_username))
|
||||
else:
|
||||
|
@ -670,7 +670,7 @@ class LDAPBackend:
|
|||
authz_ids.append(username)
|
||||
elif block['user_filter']:
|
||||
# allow multiple occurences of the username in the filter
|
||||
user_filter = force_text(block['user_filter'])
|
||||
user_filter = force_str(block['user_filter'])
|
||||
n = len(user_filter.split('%s')) - 1
|
||||
try:
|
||||
query = filter_format(user_filter, (username,) * n)
|
||||
|
@ -818,7 +818,7 @@ class LDAPBackend:
|
|||
|
||||
def create_username(self, block, attributes):
|
||||
'''Build a username using the configured template'''
|
||||
username_template = force_text(block['username_template'])
|
||||
username_template = force_str(block['username_template'])
|
||||
try:
|
||||
return username_template.format(realm=block['realm'], **attributes)
|
||||
except KeyError as e:
|
||||
|
@ -955,7 +955,7 @@ class LDAPBackend:
|
|||
if member_of_attribute:
|
||||
group_dns.update([dn.lower() for dn in attributes.get(member_of_attribute, [])])
|
||||
if group_filter:
|
||||
group_filter = force_text(group_filter)
|
||||
group_filter = force_str(group_filter)
|
||||
params = attributes.copy()
|
||||
params['user_dn'] = dn
|
||||
query = FilterFormatter().format(group_filter, **params)
|
||||
|
@ -1095,7 +1095,7 @@ class LDAPBackend:
|
|||
def _get_target_ou(self, block):
|
||||
ou_slug = block['ou_slug']
|
||||
if ou_slug:
|
||||
ou_slug = force_text(ou_slug)
|
||||
ou_slug = force_str(ou_slug)
|
||||
try:
|
||||
return OrganizationalUnit.objects.get(slug=ou_slug)
|
||||
except OrganizationalUnit.DoesNotExist:
|
||||
|
@ -1120,7 +1120,7 @@ class LDAPBackend:
|
|||
|
||||
@classmethod
|
||||
def get_sync_ldap_user_filter(cls, block):
|
||||
user_filter = force_text(block['sync_ldap_users_filter'] or block['user_filter'])
|
||||
user_filter = force_str(block['sync_ldap_users_filter'] or block['user_filter'])
|
||||
user_filter = user_filter.replace('%s', '*')
|
||||
return user_filter
|
||||
|
||||
|
@ -1217,20 +1217,20 @@ class LDAPBackend:
|
|||
attribute_map = results[0][1] if results else {}
|
||||
# add mandatory attributes
|
||||
for key, mandatory_values in mandatory_attributes_values.items():
|
||||
key = force_text(key)
|
||||
key = force_str(key)
|
||||
old = attribute_map.setdefault(key, [])
|
||||
new = set(old) | set(mandatory_values)
|
||||
attribute_map[key] = list(new)
|
||||
# apply mappings
|
||||
for from_attribute, to_attribute in attribute_mappings:
|
||||
from_attribute = force_text(from_attribute)
|
||||
from_attribute = force_str(from_attribute)
|
||||
if from_attribute not in attribute_map:
|
||||
continue
|
||||
to_attribute = force_text(to_attribute)
|
||||
to_attribute = force_str(to_attribute)
|
||||
old = attribute_map.setdefault(to_attribute, [])
|
||||
new = set(old) | set(attribute_map[from_attribute])
|
||||
attribute_map[to_attribute] = list(new)
|
||||
attribute_map['dn'] = force_text(dn)
|
||||
attribute_map['dn'] = force_str(dn)
|
||||
|
||||
# extra attributes
|
||||
attribute_map = cls.get_ldap_extra_attributes(block, conn, dn, attribute_map)
|
||||
|
@ -1346,7 +1346,7 @@ class LDAPBackend:
|
|||
if quote:
|
||||
decoded.append((attribute, urllib.parse.unquote(value)))
|
||||
else:
|
||||
decoded.append((attribute, force_text(value)))
|
||||
decoded.append((attribute, force_str(value)))
|
||||
filters = [filter_format('(%s=%s)', (a, b)) for a, b in decoded]
|
||||
return '(&{})'.format(''.join(filters))
|
||||
|
||||
|
@ -1647,10 +1647,10 @@ class LDAPBackend:
|
|||
log.info('Synchronising users from realm "%s"', block['realm'])
|
||||
conn = cls.get_connection(block)
|
||||
if conn is None:
|
||||
log.warning('unable to synchronize with LDAP servers %s', force_text(block['url']))
|
||||
log.warning('unable to synchronize with LDAP servers %s', force_str(block['url']))
|
||||
return
|
||||
cls.check_group_to_role_mappings(block)
|
||||
user_basedn = force_text(block.get('user_basedn') or block['basedn'])
|
||||
user_basedn = force_str(block.get('user_basedn') or block['basedn'])
|
||||
user_filter = cls.get_sync_ldap_user_filter(block)
|
||||
attribute_names = cls.get_ldap_attributes_names(block)
|
||||
results = cls.paged_search(
|
||||
|
@ -1715,7 +1715,7 @@ class LDAPBackend:
|
|||
'external_guid', flat=True
|
||||
)
|
||||
)
|
||||
basedn = force_text(block.get('user_basedn') or block['basedn'])
|
||||
basedn = force_str(block.get('user_basedn') or block['basedn'])
|
||||
attribute_names = list(
|
||||
{a[0] for a in cls.attribute_name_from_external_id_tuple(block['external_id_tuples'])}
|
||||
| set(USUAL_GUID_ATTRIBUTES)
|
||||
|
@ -1794,7 +1794,7 @@ class LDAPBackend:
|
|||
new_attrs = {'dn': dn}
|
||||
for key in attrs:
|
||||
try:
|
||||
new_attrs[key.lower()] = [force_text(value, encoding) for value in attrs[key]]
|
||||
new_attrs[key.lower()] = [force_str(value, encoding) for value in attrs[key]]
|
||||
except UnicodeDecodeError:
|
||||
log.debug('unable to decode attribute %r as UTF-8, converting to base64', key)
|
||||
new_attrs[key.lower()] = [base64.b64encode(value).decode('ascii') for value in attrs[key]]
|
||||
|
@ -1877,7 +1877,7 @@ class LDAPBackend:
|
|||
try:
|
||||
if credentials:
|
||||
who, password = credentials[0], credentials[1]
|
||||
password = force_text(password)
|
||||
password = force_str(password)
|
||||
conn.simple_bind_s(who, password)
|
||||
log_message = 'with user %s' % who
|
||||
elif block['bindsasl']:
|
||||
|
@ -1887,8 +1887,8 @@ class LDAPBackend:
|
|||
conn.sasl_interactive_bind_s(who, auth)
|
||||
log_message = 'with account %s' % who
|
||||
elif block['binddn'] and block['bindpw']:
|
||||
who = force_text(block['binddn'])
|
||||
conn.simple_bind_s(who, force_text(block['bindpw']))
|
||||
who = force_str(block['binddn'])
|
||||
conn.simple_bind_s(who, force_str(block['bindpw']))
|
||||
log_message = 'with binddn %s' % who
|
||||
else:
|
||||
who = 'anonymous'
|
||||
|
@ -1959,7 +1959,7 @@ class LDAPBackend:
|
|||
if not isinstance(block[d], str):
|
||||
raise ImproperlyConfigured('LDAP_AUTH_SETTINGS: attribute %r must be a string' % d)
|
||||
try:
|
||||
block[d] = force_text(block[d])
|
||||
block[d] = force_str(block[d])
|
||||
except UnicodeEncodeError:
|
||||
raise ImproperlyConfigured('LDAP_AUTH_SETTINGS: attribute %r must be a string' % d)
|
||||
if isinstance(value, bool) and not isinstance(block[d], bool):
|
||||
|
@ -1981,7 +1981,7 @@ class LDAPBackend:
|
|||
# we handle strings, list of strings and list of list or tuple whose first element is a
|
||||
# string
|
||||
if isinstance(block[key], str):
|
||||
block[key] = force_text(block[key]).lower()
|
||||
block[key] = force_str(block[key]).lower()
|
||||
elif isinstance(block[key], (list, tuple)):
|
||||
new_seq = []
|
||||
for elt in block[key]:
|
||||
|
@ -1998,7 +1998,7 @@ class LDAPBackend:
|
|||
elif isinstance(block[key], dict):
|
||||
newdict = {}
|
||||
for subkey in block[key]:
|
||||
newdict[force_text(subkey).lower()] = block[key][subkey]
|
||||
newdict[force_str(subkey).lower()] = block[key][subkey]
|
||||
block[key] = newdict
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
|
@ -2036,7 +2036,7 @@ class LDAPBackendPasswordLost(LDAPBackend):
|
|||
for block in config:
|
||||
if block['authentication'] is False:
|
||||
continue
|
||||
if user_external_id.source != force_text(block['realm']):
|
||||
if user_external_id.source != force_str(block['realm']):
|
||||
continue
|
||||
for external_id_tuple in map_text(block['external_id_tuples']):
|
||||
conn = self.ldap_backend.get_connection(block)
|
||||
|
|
|
@ -26,7 +26,7 @@ from django.core.exceptions import FieldDoesNotExist, ValidationError
|
|||
from django.core.validators import RegexValidator
|
||||
from django.db import IntegrityError, models
|
||||
from django.db.transaction import atomic
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from authentic2 import app_settings
|
||||
|
@ -79,7 +79,7 @@ class UTF8Recoder:
|
|||
return self
|
||||
|
||||
def __next__(self):
|
||||
return force_text(self.fd.__next__().encode('utf-8'))
|
||||
return force_str(self.fd.__next__().encode('utf-8'))
|
||||
|
||||
next = __next__
|
||||
|
||||
|
@ -156,7 +156,7 @@ class CsvImporter:
|
|||
|
||||
def parse_csv(input_fd):
|
||||
try:
|
||||
content = force_text(input_fd.read().encode('utf-8'))
|
||||
content = force_str(input_fd.read().encode('utf-8'))
|
||||
except UnicodeDecodeError:
|
||||
self.error = Error('bad-encoding', _('Bad encoding'))
|
||||
return False
|
||||
|
|
|
@ -22,7 +22,7 @@ from django.conf import settings
|
|||
from django.contrib.auth import forms as auth_forms
|
||||
from django.forms.widgets import Media
|
||||
from django.utils import html
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.translation import gettext
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
|
@ -159,5 +159,5 @@ class AuthenticationForm(auth_forms.AuthenticationForm):
|
|||
invalid_login_message.append(_('Try again or use the forgotten password link below.'))
|
||||
elif getattr(settings, 'REGISTRATION_OPEN', True):
|
||||
invalid_login_message.append(_('Try again or create an account.'))
|
||||
error_messages['invalid_login'] = ' '.join([force_text(x) for x in invalid_login_message])
|
||||
error_messages['invalid_login'] = ' '.join([force_str(x) for x in invalid_login_message])
|
||||
return error_messages
|
||||
|
|
|
@ -34,7 +34,7 @@ from django.forms.widgets import ClearableFileInput, DateInput, DateTimeInput
|
|||
from django.forms.widgets import EmailInput as BaseEmailInput
|
||||
from django.forms.widgets import PasswordInput as BasePasswordInput
|
||||
from django.forms.widgets import TextInput, TimeInput
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.formats import get_format, get_language
|
||||
from django.utils.safestring import mark_safe
|
||||
from django.utils.text import capfirst
|
||||
|
@ -228,7 +228,7 @@ class DateWidget(PickerWidgetMixin, DateInput):
|
|||
def format_value(self, value):
|
||||
if value is not None:
|
||||
if isinstance(value, datetime.datetime):
|
||||
return force_text(value.isoformat())
|
||||
return force_str(value.isoformat())
|
||||
return value
|
||||
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ from collections import OrderedDict
|
|||
from django.contrib.auth import hashers
|
||||
from django.contrib.auth.hashers import make_password
|
||||
from django.utils.crypto import constant_time_compare
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from django.utils.translation import gettext_noop as _
|
||||
|
||||
|
||||
|
@ -160,8 +160,8 @@ def olap_password_to_dj(password):
|
|||
algo_name, salt_offset, hex_encode = OPENLDAP_ALGO_MAPPING[algo]
|
||||
salt, password = (password[salt_offset:], password[:salt_offset]) if salt_offset else ('', password)
|
||||
if hex_encode:
|
||||
password = force_text(hexlify(password), encoding='ascii')
|
||||
salt = force_text(hexlify(force_bytes(salt)), encoding='ascii')
|
||||
password = force_str(hexlify(password), encoding='ascii')
|
||||
salt = force_str(hexlify(force_bytes(salt)), encoding='ascii')
|
||||
return '%s$%s$%s' % (algo_name, salt, password)
|
||||
else:
|
||||
return make_password(password)
|
||||
|
@ -172,7 +172,7 @@ class OpenLDAPPasswordHasher(CommonPasswordHasher):
|
|||
assert password
|
||||
assert b'$' not in salt
|
||||
hash = self.digest(force_bytes(password + salt)).hexdigest()
|
||||
salt = force_text(hexlify(salt), encoding='ascii')
|
||||
salt = force_str(hexlify(salt), encoding='ascii')
|
||||
return "%s$%s$%s" % (self.algorithm, salt, hash)
|
||||
|
||||
def verify(self, password, encoded):
|
||||
|
@ -223,7 +223,7 @@ class JoomlaPasswordHasher(CommonPasswordHasher):
|
|||
assert password
|
||||
assert b'$' not in salt
|
||||
hash = self.digest(force_bytes(password) + salt).hexdigest()
|
||||
salt = force_text(hexlify(force_bytes(salt)), encoding='ascii')
|
||||
salt = force_str(hexlify(force_bytes(salt)), encoding='ascii')
|
||||
return "%s$md5$%s$%s" % (self.algorithm, salt, hash)
|
||||
|
||||
def verify(self, password, encoded):
|
||||
|
@ -247,7 +247,7 @@ class JoomlaPasswordHasher(CommonPasswordHasher):
|
|||
h, salt = encoded.split(':', 1)
|
||||
else:
|
||||
h, salt = encoded, ''
|
||||
salt = force_text(hexlify(force_bytes(salt)), encoding='ascii')
|
||||
salt = force_str(hexlify(force_bytes(salt)), encoding='ascii')
|
||||
|
||||
return '%s$md5$%s$%s' % (cls.algorithm, salt, h)
|
||||
|
||||
|
@ -259,7 +259,7 @@ class JoomlaPasswordHasher(CommonPasswordHasher):
|
|||
if subalgo != 'md5':
|
||||
raise NotImplementedError
|
||||
if salt:
|
||||
return '%s:%s' % (_hash, force_text(unhexlify(force_bytes(salt))))
|
||||
return '%s:%s' % (_hash, force_str(unhexlify(force_bytes(salt))))
|
||||
else:
|
||||
return _hash
|
||||
|
||||
|
@ -283,7 +283,7 @@ class PloneSHA1PasswordHasher(hashers.SHA1PasswordHasher):
|
|||
salt = force_bytes(salt)
|
||||
|
||||
hashed = base64.b64encode(hashlib.sha1(password + salt).digest() + salt)
|
||||
return "%s$%s%s" % (self.algorithm, self._prefix, force_text(hashed))
|
||||
return "%s$%s%s" % (self.algorithm, self._prefix, force_str(hashed))
|
||||
|
||||
def verify(self, password, encoded):
|
||||
"""Verify the given password against the encoded string."""
|
||||
|
|
|
@ -50,7 +50,7 @@ from django.core.exceptions import ObjectDoesNotExist
|
|||
from django.http import HttpResponse, HttpResponseBadRequest, HttpResponseForbidden, HttpResponseRedirect
|
||||
from django.shortcuts import redirect, render
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_bytes, force_str, force_text
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from django.utils.timezone import utc
|
||||
from django.utils.translation import gettext as _
|
||||
from django.utils.translation import gettext_noop as N_
|
||||
|
@ -271,8 +271,8 @@ def add_attributes(request, entity_id, assertion, provider, nid_format):
|
|||
seen = set()
|
||||
# Keep current attributes, mark string values as already added
|
||||
for attribute in attribute_statement.attribute:
|
||||
name = force_text(attribute.name)
|
||||
name_format = force_text(attribute.nameFormat)
|
||||
name = force_str(attribute.name)
|
||||
name_format = force_str(attribute.nameFormat)
|
||||
# sequence from lasso are always tuple, so convert to list
|
||||
attributes[(name, name_format)] = attribute, list(attribute.attributeValue)
|
||||
for atv in attribute.attributeValue:
|
||||
|
@ -282,7 +282,7 @@ def add_attributes(request, entity_id, assertion, provider, nid_format):
|
|||
and isinstance(atv.any[0], lasso.MiscTextNode)
|
||||
and atv.any[0].textChild
|
||||
):
|
||||
seen.add((name, name_format, force_text(atv.any[0].content)))
|
||||
seen.add((name, name_format, force_str(atv.any[0].content)))
|
||||
definitions = list(qs)
|
||||
|
||||
# special handling of nid format edupersontargetedid
|
||||
|
@ -360,7 +360,7 @@ def value_to_misc_text_node(value):
|
|||
elif value is None:
|
||||
value = ''
|
||||
else:
|
||||
value = force_text(value)
|
||||
value = force_str(value)
|
||||
tn = lasso.MiscTextNode.newWithString(force_str(value))
|
||||
tn.textChild = True
|
||||
return tn
|
||||
|
@ -369,7 +369,7 @@ def value_to_misc_text_node(value):
|
|||
def saml2_add_attribute_values(assertion, attributes):
|
||||
if attributes:
|
||||
logger.debug('adding attributes')
|
||||
logger.debug('assertion before processing %s', force_text(assertion.dump()))
|
||||
logger.debug('assertion before processing %s', force_str(assertion.dump()))
|
||||
logger.debug('adding attributes %s', attributes)
|
||||
if not assertion.attributeStatement:
|
||||
assertion.attributeStatement = [lasso.Saml2AttributeStatement()]
|
||||
|
@ -404,7 +404,7 @@ def saml2_add_attribute_values(assertion, attributes):
|
|||
attribute_value.any = [text_node]
|
||||
attribute_value_list.append(attribute_value)
|
||||
attribute.attributeValue = attribute_value_list
|
||||
logger.debug('assertion after processing %s', force_text(assertion.dump()))
|
||||
logger.debug('assertion after processing %s', force_str(assertion.dump()))
|
||||
|
||||
|
||||
def build_assertion(request, login, provider, nid_format='transient'):
|
||||
|
@ -463,7 +463,7 @@ def build_assertion(request, login, provider, nid_format='transient'):
|
|||
expiry_date = request.session.get_expiry_date()
|
||||
session_not_on_or_after = timezone_now + (expiry_date - timezone_now) * 0.5
|
||||
assertion.authnStatement[0].sessionNotOnOrAfter = datetime_to_xs_datetime(session_not_on_or_after)
|
||||
logger.debug('assertion building in progress %s', force_text(assertion.dump()))
|
||||
logger.debug('assertion building in progress %s', force_str(assertion.dump()))
|
||||
fill_assertion(request, login.request, assertion, login.remoteProviderId, nid_format)
|
||||
# Save federation and new session
|
||||
if nid_format == 'persistent':
|
||||
|
@ -605,7 +605,7 @@ def sso(request):
|
|||
and name_id_policy.format
|
||||
and name_id_policy.format != lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED
|
||||
):
|
||||
logger.debug('nameID policy is %s', force_text(name_id_policy.dump()))
|
||||
logger.debug('nameID policy is %s', force_str(name_id_policy.dump()))
|
||||
nid_format = saml2_urn_to_nidformat(name_id_policy.format, accepted=policy.accepted_name_id_format)
|
||||
logger.debug('nameID format %s', nid_format)
|
||||
default_nid_format = policy.default_name_id_format
|
||||
|
@ -635,7 +635,7 @@ def need_login(request, login, nid_format):
|
|||
the login form was submitted
|
||||
"""
|
||||
nonce = login.request.id or get_nonce()
|
||||
save_key_values(nonce, force_text(login.dump()), False, nid_format)
|
||||
save_key_values(nonce, force_str(login.dump()), False, nid_format)
|
||||
next_url = make_url(continue_sso, params={NONCE_FIELD_NAME: nonce})
|
||||
logger.debug('redirect to login page with next url %s', next_url)
|
||||
return login_require(
|
||||
|
@ -653,7 +653,7 @@ def get_url_with_nonce(request, function, nonce):
|
|||
|
||||
def need_consent_for_federation(request, login, nid_format):
|
||||
nonce = login.request.id or get_nonce()
|
||||
save_key_values(nonce, force_text(login.dump()), False, nid_format)
|
||||
save_key_values(nonce, force_str(login.dump()), False, nid_format)
|
||||
display_name = None
|
||||
try:
|
||||
provider = LibertyProvider.objects.get(entity_id=login.request.issuer.content)
|
||||
|
@ -943,13 +943,13 @@ def sso_after_process_request(
|
|||
logger.debug('ask the user consent now')
|
||||
return need_consent_for_federation(request, login, nid_format)
|
||||
|
||||
logger.debug('login dump before processing %s', force_text(login.dump()))
|
||||
logger.debug('login dump before processing %s', force_str(login.dump()))
|
||||
try:
|
||||
if needs_persistence(nid_format):
|
||||
logger.debug('load identity dump')
|
||||
load_federation(request, get_entity_id(request), login, user)
|
||||
login.validateRequestMsg(not user.is_anonymous, consent_obtained)
|
||||
logger.debug('validateRequestMsg %s', force_text(login.dump()))
|
||||
logger.debug('validateRequestMsg %s', force_str(login.dump()))
|
||||
except lasso.LoginRequestDeniedError:
|
||||
logger.warning('access denied due to LoginRequestDeniedError')
|
||||
set_saml2_response_responder_status_code(login.response, lasso.SAML2_STATUS_CODE_REQUEST_DENIED)
|
||||
|
@ -1003,7 +1003,7 @@ def finish_sso(request, login, user=None, return_profile=False):
|
|||
def save_artifact(request, login):
|
||||
'''Remember an artifact message for later retrieving'''
|
||||
LibertyArtifact(
|
||||
artifact=login.artifact, content=force_text(login.artifactMessage), provider_id=login.remoteProviderId
|
||||
artifact=login.artifact, content=force_str(login.artifactMessage), provider_id=login.remoteProviderId
|
||||
).save()
|
||||
logger.debug('artifact saved')
|
||||
|
||||
|
@ -1465,7 +1465,7 @@ def slo(request):
|
|||
provider_id=logout.remoteProviderId, django_session_key=request.session.session_key
|
||||
).delete()
|
||||
# Save some values for cleaning up
|
||||
save_key_values(logout.request.id, force_text(logout.dump()), request.session.session_key)
|
||||
save_key_values(logout.request.id, force_str(logout.dump()), request.session.session_key)
|
||||
|
||||
# Use the logout view and come back to the finish slo view
|
||||
next_url = make_url(finish_slo, params={'id': logout.request.id})
|
||||
|
@ -1552,7 +1552,7 @@ def idp_slo(request, provider_id=None):
|
|||
return redirect_next(request, next) or ko_icon(request)
|
||||
return process_logout_response(request, logout, soap_response, next)
|
||||
else:
|
||||
save_key_values(logout.request.id, force_text(logout.dump()), provider_id, next)
|
||||
save_key_values(logout.request.id, force_str(logout.dump()), provider_id, next)
|
||||
return HttpResponseRedirect(logout.msgUrl)
|
||||
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ import string
|
|||
|
||||
import ldap.dn
|
||||
import ldap.filter
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
|
||||
|
||||
class DnFormatter(string.Formatter):
|
||||
|
@ -39,7 +39,7 @@ class DnFormatter(string.Formatter):
|
|||
|
||||
def convert_field(self, value, conversion):
|
||||
if conversion == 's':
|
||||
return force_text(value)
|
||||
return force_str(value)
|
||||
return super().convert_field(value, conversion)
|
||||
|
||||
|
||||
|
@ -60,5 +60,5 @@ class FilterFormatter(string.Formatter):
|
|||
|
||||
def convert_field(self, value, conversion):
|
||||
if conversion == 's':
|
||||
return force_text(value)
|
||||
return force_str(value)
|
||||
return super().convert_field(value, conversion)
|
||||
|
|
|
@ -28,7 +28,7 @@ from django.db import transaction
|
|||
from django.forms import MediaDefiningClass
|
||||
from django.http import Http404, HttpResponse
|
||||
from django.urls import reverse, reverse_lazy
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.functional import cached_property
|
||||
from django.utils.timezone import now
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
@ -270,7 +270,7 @@ class AjaxFormViewMixin:
|
|||
data['location'] = location
|
||||
if hasattr(response, 'render'):
|
||||
response.render()
|
||||
data['content'] = force_text(response.content)
|
||||
data['content'] = force_str(response.content)
|
||||
return HttpResponse(json.dumps(data), content_type='application/json')
|
||||
|
||||
|
||||
|
@ -713,9 +713,9 @@ class TechnicalInformationView(TitleMixin, MediaMixin, TemplateView):
|
|||
# retrieve ldap uri, not directly visible in configuration block
|
||||
config['ldap_uri'] = conn.get_option(ldap.OPT_URI)
|
||||
# user filters need to be formatted to ldapsearch syntax
|
||||
config['user_filter'] = force_text(block.get('user_filter'), '').replace('%s', '*')
|
||||
config['user_filter'] = force_str(block.get('user_filter'), '').replace('%s', '*')
|
||||
config['sync_ldap_users_filter'] = (
|
||||
force_text(block.get('sync_ldap_users_filter'), '').replace('%s', '*').replace('%s', '*')
|
||||
force_str(block.get('sync_ldap_users_filter'), '').replace('%s', '*').replace('%s', '*')
|
||||
)
|
||||
|
||||
kwargs['ldap_list'].append(config)
|
||||
|
|
|
@ -20,7 +20,7 @@ import operator
|
|||
import pickle
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django_select2.forms import ModelSelect2MultipleWidget, ModelSelect2Widget
|
||||
|
||||
from authentic2.a2_rbac.models import Role
|
||||
|
@ -73,7 +73,7 @@ class Select2Mixin:
|
|||
attrs = super().build_attrs(*args, **kwargs)
|
||||
field_data = {
|
||||
'class': self.__class__.__name__,
|
||||
'where_clause': force_text(base64.b64encode(pickle.dumps(self.queryset.query.where))),
|
||||
'where_clause': force_str(base64.b64encode(pickle.dumps(self.queryset.query.where))),
|
||||
}
|
||||
attrs['data-field_id'] = crypto.dumps(field_data)
|
||||
return attrs
|
||||
|
|
|
@ -26,7 +26,7 @@ from django.core.exceptions import ValidationError
|
|||
from django.http import Http404, HttpResponse, HttpResponseRedirect
|
||||
from django.shortcuts import render
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
|
||||
from authentic2.compat_lasso import lasso
|
||||
from authentic2.http_utils import get_url
|
||||
|
@ -393,7 +393,7 @@ def load_provider(request, entity_id, server=None, sp_or_idp='sp', autoload=Fals
|
|||
return False
|
||||
if server:
|
||||
server.addProviderFromBuffer(
|
||||
lasso.PROVIDER_ROLE_SP, force_text(liberty_provider.metadata.encode('utf8'))
|
||||
lasso.PROVIDER_ROLE_SP, force_str(liberty_provider.metadata.encode('utf8'))
|
||||
)
|
||||
policy = get_sp_options_policy(liberty_provider)
|
||||
if policy:
|
||||
|
|
|
@ -27,7 +27,7 @@ from django.contrib.humanize.templatetags.humanize import apnumber
|
|||
from django.core.exceptions import ValidationError
|
||||
from django.db import models
|
||||
from django.template.defaultfilters import pluralize
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from django.utils.text import capfirst
|
||||
|
||||
|
||||
|
@ -39,7 +39,7 @@ def loads(value):
|
|||
|
||||
|
||||
def dumps(value):
|
||||
return PickledObject(force_text(base64.b64encode(pickle.dumps(value, protocol=0))))
|
||||
return PickledObject(force_str(base64.b64encode(pickle.dumps(value, protocol=0))))
|
||||
|
||||
|
||||
# This is a copy of http://djangosnippets.org/snippets/513/
|
||||
|
|
|
@ -20,7 +20,7 @@ import requests
|
|||
from django import forms
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from authentic2.a2_rbac.models import OrganizationalUnit
|
||||
|
@ -52,7 +52,7 @@ class AddLibertyProviderFromUrlForm(forms.Form):
|
|||
try:
|
||||
response = requests.get(url, timeout=settings.REQUESTS_TIMEOUT)
|
||||
response.raise_for_status()
|
||||
content = force_text(response.content)
|
||||
content = force_str(response.content)
|
||||
except requests.RequestException as e:
|
||||
raise ValidationError(
|
||||
_('Retrieval of %(url)s failed: %(exception)s') % {'url': url, 'exception': e}
|
||||
|
|
|
@ -26,7 +26,7 @@ from django.core.exceptions import ObjectDoesNotExist, ValidationError
|
|||
from django.db import models
|
||||
from django.db.models import Q
|
||||
from django.db.models.query import QuerySet
|
||||
from django.utils.encoding import force_str, force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from authentic2.compat_lasso import lasso
|
||||
|
@ -162,7 +162,7 @@ NAME_ID_FORMATS = collections.OrderedDict(
|
|||
]
|
||||
)
|
||||
|
||||
NAME_ID_FORMATS_CHOICES = [(force_text(x), y['caption']) for x, y in NAME_ID_FORMATS.items()]
|
||||
NAME_ID_FORMATS_CHOICES = [(force_str(x), y['caption']) for x, y in NAME_ID_FORMATS.items()]
|
||||
|
||||
ACCEPTED_NAME_ID_FORMAT_LENGTH = sum(len(x) for x, y in NAME_ID_FORMATS.items()) + len(NAME_ID_FORMATS) - 1
|
||||
|
||||
|
@ -397,7 +397,7 @@ class LibertyProvider(Service):
|
|||
|
||||
def clean(self):
|
||||
super().clean()
|
||||
p = lasso.Provider.newFromBuffer(lasso.PROVIDER_ROLE_ANY, force_text(self.metadata.encode('utf8')))
|
||||
p = lasso.Provider.newFromBuffer(lasso.PROVIDER_ROLE_ANY, force_str(self.metadata.encode('utf8')))
|
||||
if p is None:
|
||||
raise ValidationError(_('Invalid metadata file'))
|
||||
self.entity_id = p.providerId
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.functional import keep_lazy
|
||||
from django.utils.text import format_lazy
|
||||
|
||||
|
@ -34,4 +34,4 @@ def lazy_label(default, func):
|
|||
|
||||
ex.: lazy_label(_('Default label'), lambda: app_settings.CUSTOM_LABEL)
|
||||
"""
|
||||
return force_text(func() or default)
|
||||
return force_str(func() or default)
|
||||
|
|
|
@ -29,7 +29,7 @@ import requests
|
|||
from django.conf import settings
|
||||
from django.shortcuts import resolve_url
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from django.utils.http import urlencode
|
||||
from django.utils.timezone import now
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
@ -281,7 +281,7 @@ def parse_id_token(id_token, authorize_url, client_id=None, client_secret=None):
|
|||
return None, 'hmac signature does not match'
|
||||
payload = base64url_decode(str(payload))
|
||||
try:
|
||||
payload = json.loads(force_text(payload))
|
||||
payload = json.loads(force_str(payload))
|
||||
except ValueError:
|
||||
return None, 'invalid payload'
|
||||
if client_id and ('aud' not in payload or payload['aud'] != client_id):
|
||||
|
|
|
@ -23,7 +23,7 @@ import uuid
|
|||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from jwcrypto.jwk import JWK, InvalidJWKValue, JWKSet
|
||||
from jwcrypto.jwt import JWT
|
||||
|
||||
|
@ -97,7 +97,7 @@ def make_idtoken(client, claims):
|
|||
if client.idtoken_algo == client.ALGO_HMAC:
|
||||
header = {'alg': 'HS256'}
|
||||
k = base64url(client.client_secret.encode('utf-8'))
|
||||
jwk = JWK(kty='oct', k=force_text(k))
|
||||
jwk = JWK(kty='oct', k=force_str(k))
|
||||
elif client.idtoken_algo == client.ALGO_RSA:
|
||||
header = {'alg': 'RS256'}
|
||||
jwk = get_first_rsa_sig_key()
|
||||
|
@ -134,7 +134,7 @@ def make_sub(client, user, profile=None):
|
|||
if client.identifier_policy in (client.POLICY_PAIRWISE, client.POLICY_PAIRWISE_REVERSIBLE):
|
||||
return make_pairwise_sub(client, user, profile=profile)
|
||||
elif client.identifier_policy == client.POLICY_UUID:
|
||||
return force_text(user.uuid)
|
||||
return force_str(user.uuid)
|
||||
elif client.identifier_policy == client.POLICY_EMAIL:
|
||||
return user.email
|
||||
else:
|
||||
|
|
|
@ -28,7 +28,7 @@ from django.contrib.auth import authenticate
|
|||
from django.http import HttpResponse, HttpResponseNotAllowed, JsonResponse
|
||||
from django.shortcuts import render
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.http import urlencode
|
||||
from django.utils.timezone import now, utc
|
||||
from django.utils.translation import gettext as _
|
||||
|
@ -185,7 +185,7 @@ class WrongClientSecret(InvalidClient):
|
|||
error_description = _('Wrong client secret')
|
||||
|
||||
def __init__(self, *args, wrong_id, **kwargs):
|
||||
kwargs['extra_info'] = _('received %s') % force_text(wrong_id)
|
||||
kwargs['extra_info'] = _('received %s') % force_str(wrong_id)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
|
@ -540,7 +540,7 @@ def parse_http_basic(request):
|
|||
if authorization[0] != 'Basic' or len(authorization) != 2:
|
||||
return None, None
|
||||
try:
|
||||
decoded = force_text(base64.b64decode(authorization[1]))
|
||||
decoded = force_str(base64.b64decode(authorization[1]))
|
||||
except Base64Error:
|
||||
return None, None
|
||||
parts = decoded.split(':')
|
||||
|
|
|
@ -28,7 +28,7 @@ from django.contrib.auth.hashers import check_password
|
|||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core import mail
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.text import slugify
|
||||
from requests.models import Response
|
||||
|
||||
|
@ -108,7 +108,7 @@ def test_api_user(client):
|
|||
# login
|
||||
client.login(request=None, username='john.doe', password='password')
|
||||
response = client.get('/api/user/', HTTP_ORIGIN='http://testserver')
|
||||
data = json.loads(force_text(response.content))
|
||||
data = json.loads(force_str(response.content))
|
||||
assert isinstance(data, dict)
|
||||
assert set(data.keys()) == {
|
||||
'uuid',
|
||||
|
|
|
@ -30,7 +30,7 @@ from django.http import QueryDict
|
|||
from django.test.utils import override_settings
|
||||
from django.urls import reverse
|
||||
from django.utils.dateparse import parse_datetime
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.timezone import now
|
||||
from jwcrypto.jwk import JWK, JWKSet
|
||||
from jwcrypto.jwt import JWT
|
||||
|
@ -325,7 +325,7 @@ def test_authorization_code_sso(
|
|||
algs = ['RS256', 'ES256']
|
||||
elif oidc_client.idtoken_algo == oidc_client.ALGO_HMAC:
|
||||
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
|
||||
key = JWK(kty='oct', k=force_text(k))
|
||||
key = JWK(kty='oct', k=force_str(k))
|
||||
algs = ['HS256']
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
@ -1037,7 +1037,7 @@ def test_role_control_access(login_first, oidc_settings, oidc_client, simple_use
|
|||
algs = ['RS256', 'ES256']
|
||||
elif oidc_client.idtoken_algo == oidc_client.ALGO_HMAC:
|
||||
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
|
||||
key = JWK(kty='oct', k=force_text(k))
|
||||
key = JWK(kty='oct', k=force_str(k))
|
||||
algs = ['HS256']
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
@ -1152,7 +1152,7 @@ def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app
|
|||
id_token = response.json['id_token']
|
||||
|
||||
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
|
||||
key = JWK(kty='oct', k=force_text(k))
|
||||
key = JWK(kty='oct', k=force_str(k))
|
||||
jwt = JWT(jwt=id_token, key=key)
|
||||
claims = json.loads(jwt.claims)
|
||||
|
||||
|
@ -1255,7 +1255,7 @@ def test_claim_templated(oidc_settings, normal_oidc_client, simple_user, app):
|
|||
id_token = response.json['id_token']
|
||||
|
||||
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
|
||||
key = JWK(kty='oct', k=force_text(k))
|
||||
key = JWK(kty='oct', k=force_str(k))
|
||||
jwt = JWT(jwt=id_token, key=key)
|
||||
claims = json.loads(jwt.claims)
|
||||
|
||||
|
@ -1365,7 +1365,7 @@ def test_credentials_grant(app, oidc_client, admin, simple_user):
|
|||
token_url = make_url('oidc-token')
|
||||
if oidc_client.idtoken_algo == OIDCClient.ALGO_HMAC:
|
||||
k = base64url(oidc_client.client_secret.encode('utf-8'))
|
||||
jwk = JWK(kty='oct', k=force_text(k))
|
||||
jwk = JWK(kty='oct', k=force_str(k))
|
||||
elif oidc_client.idtoken_algo == OIDCClient.ALGO_RSA:
|
||||
jwk = get_first_rsa_sig_key()
|
||||
elif oidc_client.idtoken_algo == OIDCClient.ALGO_EC:
|
||||
|
|
|
@ -23,7 +23,7 @@ from uuid import UUID
|
|||
import pytest
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.timezone import now
|
||||
from jwcrypto.jwk import JWK
|
||||
from jwcrypto.jwt import JWT
|
||||
|
@ -205,7 +205,7 @@ def test_login_profile_selection(app, oidc_client, profile_user, profile_setting
|
|||
assert access_token
|
||||
id_token = response.json['id_token']
|
||||
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
|
||||
key = JWK(kty='oct', k=force_text(k))
|
||||
key = JWK(kty='oct', k=force_str(k))
|
||||
algs = ['HS256']
|
||||
jwt = JWT(jwt=id_token, key=key, algs=algs)
|
||||
claims = json.loads(jwt.claims)
|
||||
|
@ -275,7 +275,7 @@ def test_login_implicit(app, oidc_client, profile_user, profile_settings):
|
|||
access_token = query['access_token'][0]
|
||||
id_token = query['id_token'][0]
|
||||
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
|
||||
key = JWK(kty='oct', k=force_text(k))
|
||||
key = JWK(kty='oct', k=force_str(k))
|
||||
algs = ['HS256']
|
||||
jwt = JWT(jwt=id_token, key=key, algs=algs)
|
||||
claims = json.loads(jwt.claims)
|
||||
|
@ -450,7 +450,7 @@ def test_modify_user_info_hook(app, oidc_client, profile_settings, profile_user,
|
|||
)
|
||||
id_token = response.json['id_token']
|
||||
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
|
||||
key = JWK(kty='oct', k=force_text(k))
|
||||
key = JWK(kty='oct', k=force_str(k))
|
||||
algs = ['HS256']
|
||||
jwt = JWT(jwt=id_token, key=key, algs=algs)
|
||||
claims = json.loads(jwt.claims)
|
||||
|
@ -468,7 +468,7 @@ def test_profile_selection_user_credentials_grant(app, oidc_client, admin, simpl
|
|||
|
||||
if oidc_client.idtoken_algo == OIDCClient.ALGO_HMAC:
|
||||
k = base64url(oidc_client.client_secret.encode('utf-8'))
|
||||
jwk = JWK(kty='oct', k=force_text(k))
|
||||
jwk = JWK(kty='oct', k=force_str(k))
|
||||
elif oidc_client.idtoken_algo == OIDCClient.ALGO_RSA:
|
||||
jwk = get_first_rsa_sig_key()
|
||||
elif oidc_client.idtoken_algo == OIDCClient.ALGO_EC:
|
||||
|
|
|
@ -28,7 +28,7 @@ from django.test import TestCase
|
|||
from django.test.client import Client
|
||||
from django.test.utils import override_settings
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.translation import gettext as _
|
||||
from rest_framework import status, test
|
||||
|
||||
|
@ -463,7 +463,7 @@ class APITest(TestCase):
|
|||
activation_url = get_link_from_mail(mail.outbox[-1])
|
||||
response = client.get(activation_url, follow=True)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
assert utils_misc.make_url(return_url, params={'token': token}) in force_text(response.content)
|
||||
assert utils_misc.make_url(return_url, params={'token': token}) in force_str(response.content)
|
||||
self.assertEqual(User.objects.count(), user_count + 1)
|
||||
response = client.get(reverse('auth_homepage'))
|
||||
self.assertContains(response, username)
|
||||
|
@ -569,7 +569,7 @@ class APITest(TestCase):
|
|||
activation_url = get_link_from_mail(mail.outbox[0])
|
||||
response = client.get(activation_url, follow=True)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
assert utils_misc.make_url(return_url, params={'token': token}) in force_text(response.content)
|
||||
assert utils_misc.make_url(return_url, params={'token': token}) in force_str(response.content)
|
||||
self.assertEqual(User.objects.count(), user_count + 1)
|
||||
response = client.get(reverse('auth_homepage'))
|
||||
self.assertContains(response, username)
|
||||
|
@ -669,7 +669,7 @@ class APITest(TestCase):
|
|||
activation_url = get_link_from_mail(activation_mail1)
|
||||
response = client.get(activation_url, follow=True)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
assert utils_misc.make_url(return_url, params={'token': token}) in force_text(response.content)
|
||||
assert utils_misc.make_url(return_url, params={'token': token}) in force_str(response.content)
|
||||
self.assertEqual(User.objects.count(), user_count + 1)
|
||||
response = client.get(reverse('auth_homepage'))
|
||||
self.assertContains(response, username)
|
||||
|
|
|
@ -28,7 +28,7 @@ from django.core.exceptions import ValidationError
|
|||
from django.db import IntegrityError, transaction
|
||||
from django.http import QueryDict
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_str, force_text
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.timezone import now, utc
|
||||
from httmock import HTTMock, urlmatch
|
||||
from jwcrypto.common import base64url_decode, base64url_encode, json_encode
|
||||
|
@ -289,7 +289,7 @@ def oidc_provider_mock(
|
|||
else: # hmac
|
||||
jwt = JWT(header={'alg': 'HS256'}, claims=id_token)
|
||||
k = base64url_encode(oidc_provider.client_secret.encode('utf-8'))
|
||||
jwt.make_signed_token(JWK(kty='oct', k=force_text(k)))
|
||||
jwt.make_signed_token(JWK(kty='oct', k=force_str(k)))
|
||||
|
||||
content = {
|
||||
'access_token': '1234',
|
||||
|
|
|
@ -19,7 +19,7 @@ import urllib.parse
|
|||
from django.contrib.auth import get_user_model
|
||||
from django.test.client import Client, RequestFactory
|
||||
from django.test.utils import override_settings
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_str
|
||||
|
||||
from authentic2.a2_rbac.models import Role
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
|
@ -106,7 +106,7 @@ class CasTests(Authentic2TestCase):
|
|||
client = Client()
|
||||
response = client.get('/idp/cas/login')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('no service', force_text(response.content))
|
||||
self.assertIn('no service', force_str(response.content))
|
||||
response = client.get('/idp/cas/login', {constants.SERVICE_PARAM: 'http://google.com/'})
|
||||
self.assertRedirectsComplex(response, 'http://google.com/')
|
||||
response = client.get(
|
||||
|
@ -144,7 +144,7 @@ class CasTests(Authentic2TestCase):
|
|||
user=self.user,
|
||||
service=self.service,
|
||||
)
|
||||
self.assertIn('https://casclient.com/loser/', force_text(response.content))
|
||||
self.assertIn('https://casclient.com/loser/', force_str(response.content))
|
||||
|
||||
def test_role_access_control_granted(self):
|
||||
client = Client()
|
||||
|
@ -222,7 +222,7 @@ class CasTests(Authentic2TestCase):
|
|||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response['content-type'], 'text/plain')
|
||||
self.assertEqual(force_text(response.content), 'yes\n%s\n' % self.LOGIN)
|
||||
self.assertEqual(force_str(response.content), 'yes\n%s\n' % self.LOGIN)
|
||||
# Verify ticket has been deleted
|
||||
with self.assertRaises(Ticket.DoesNotExist):
|
||||
Ticket.objects.get()
|
||||
|
|
|
@ -29,7 +29,7 @@ from django.contrib.auth import REDIRECT_FIELD_NAME
|
|||
from django.core.files import File
|
||||
from django.template import Context, Template
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_bytes, force_str, force_text
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from authentic2.a2_rbac.models import OrganizationalUnit, Role
|
||||
|
@ -418,10 +418,10 @@ class Scenario:
|
|||
name_id = login.assertion.subject.nameID
|
||||
if self.sp.default_name_id_format == 'username':
|
||||
assert name_id.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED
|
||||
assert force_text(name_id.content) == user.username
|
||||
assert force_str(name_id.content) == user.username
|
||||
elif self.sp.default_name_id_format == 'uuid':
|
||||
assert name_id.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED
|
||||
assert force_text(name_id.content) == user.uuid
|
||||
assert force_str(name_id.content) == user.uuid
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
'unknown default_name_id_format %s' % self.sp.default_name_id_format
|
||||
|
@ -677,7 +677,7 @@ def add_attributes(rf):
|
|||
func.nid_format,
|
||||
)
|
||||
return {
|
||||
at.name: {''.join(force_text(mtn.dump()) for mtn in atv.any) for atv in at.attributeValue}
|
||||
at.name: {''.join(force_str(mtn.dump()) for mtn in atv.any) for atv in at.attributeValue}
|
||||
for at in assertion.attributeStatement[0].attribute
|
||||
}
|
||||
|
||||
|
@ -820,7 +820,7 @@ def test_make_edu_person_targeted_id(db, settings, rf):
|
|||
assert edpt is not None
|
||||
node = lasso.Node.newFromXmlNode(force_str(ET.tostring(edpt)))
|
||||
assert isinstance(node, lasso.Saml2NameID)
|
||||
assert force_text(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
||||
assert force_str(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
||||
assert node.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
|
||||
|
||||
assert node.nameQualifier == 'http://testserver/idp/saml2/metadata'
|
||||
|
@ -853,7 +853,7 @@ def test_add_attributes_edu_person_targeted_id_nid_format(db, settings, rf, add_
|
|||
assert len(attributes[edu_name]) == 1
|
||||
node = lasso.Node.newFromXmlNode(force_str(list(attributes[edu_name])[0]))
|
||||
assert isinstance(node, lasso.Saml2NameID)
|
||||
assert force_text(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
||||
assert force_str(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
||||
assert node.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
|
||||
assert node.nameQualifier == 'http://testserver/idp/saml2/metadata'
|
||||
assert node.spNameQualifier == 'https://sp.com/'
|
||||
|
@ -885,7 +885,7 @@ def test_add_attributes_edu_person_targeted_id_attribute(db, settings, rf, add_a
|
|||
assert len(attributes['edupersontargetedid']) == 1
|
||||
node = lasso.Node.newFromXmlNode(force_str(list(attributes['edupersontargetedid'])[0]))
|
||||
assert isinstance(node, lasso.Saml2NameID)
|
||||
assert force_text(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
||||
assert force_str(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
||||
assert node.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
|
||||
assert node.nameQualifier == 'http://testserver/idp/saml2/metadata'
|
||||
assert node.spNameQualifier == 'https://sp.com/'
|
||||
|
|
|
@ -29,7 +29,7 @@ from django.core import mail, management
|
|||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from ldap.dn import escape_dn_chars
|
||||
from ldaptools.slapd import Slapd, has_slapd
|
||||
|
||||
|
@ -392,7 +392,7 @@ def test_simple_with_binddn(slapd, settings, client):
|
|||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'binddn': force_text(DN),
|
||||
'binddn': force_str(DN),
|
||||
'bindpw': PASS,
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
|
@ -1004,8 +1004,8 @@ def test_reset_password_ldap_user(slapd, settings, app, db, caplog):
|
|||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'binddn': force_text(slapd.root_bind_dn),
|
||||
'bindpw': force_text(slapd.root_bind_password),
|
||||
'binddn': force_str(slapd.root_bind_dn),
|
||||
'bindpw': force_str(slapd.root_bind_password),
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
'attributes': ['uid', 'carLicense'],
|
||||
|
@ -1069,8 +1069,8 @@ def test_reset_password_ldap_failure(slapd, settings, app, db, caplog):
|
|||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'binddn': force_text(slapd.root_bind_dn),
|
||||
'bindpw': force_text(slapd.root_bind_password),
|
||||
'binddn': force_str(slapd.root_bind_dn),
|
||||
'bindpw': force_str(slapd.root_bind_password),
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
'attributes': ['uid', 'carLicense'],
|
||||
|
@ -1106,8 +1106,8 @@ def test_user_cannot_change_password(slapd, settings, app, db):
|
|||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'binddn': force_text(slapd.root_bind_dn),
|
||||
'bindpw': force_text(slapd.root_bind_password),
|
||||
'binddn': force_str(slapd.root_bind_dn),
|
||||
'bindpw': force_str(slapd.root_bind_password),
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
'user_can_change_password': False,
|
||||
|
@ -1669,8 +1669,8 @@ def test_login_ppolicy_pwdExpireWarning(slapd_ppolicy, settings, app, db, caplog
|
|||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd_ppolicy.ldap_url],
|
||||
'binddn': force_text(slapd_ppolicy.root_bind_dn),
|
||||
'bindpw': force_text(slapd_ppolicy.root_bind_password),
|
||||
'binddn': force_str(slapd_ppolicy.root_bind_dn),
|
||||
'bindpw': force_str(slapd_ppolicy.root_bind_password),
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
'attributes': ['carLicense'],
|
||||
|
@ -1779,7 +1779,7 @@ def test_ou_selector(slapd, settings, app, ou1):
|
|||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'binddn': force_text(DN),
|
||||
'binddn': force_str(DN),
|
||||
'bindpw': PASS,
|
||||
'basedn': 'o=ôrga',
|
||||
'ou_slug': ou1.slug,
|
||||
|
@ -1810,7 +1810,7 @@ def test_ou_selector_default_ou(slapd, settings, app, ou1):
|
|||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'binddn': force_text(DN),
|
||||
'binddn': force_str(DN),
|
||||
'bindpw': PASS,
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
|
@ -2119,8 +2119,8 @@ def test_switch_user_ldap_user(slapd, settings, app, db, caplog):
|
|||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'binddn': force_text(slapd.root_bind_dn),
|
||||
'bindpw': force_text(slapd.root_bind_password),
|
||||
'binddn': force_str(slapd.root_bind_dn),
|
||||
'bindpw': force_str(slapd.root_bind_password),
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
'attributes': ['carLicense'],
|
||||
|
@ -2210,7 +2210,7 @@ def test_technical_info_ldap(app, admin, superuser, slapd, settings, monkeypatch
|
|||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'binddn': force_text('cn=%s,o=ôrga' % escape_dn_chars('Étienne Michu')),
|
||||
'binddn': force_str('cn=%s,o=ôrga' % escape_dn_chars('Étienne Michu')),
|
||||
'bindpw': 'passé',
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
|
|
|
@ -18,7 +18,7 @@ import json
|
|||
|
||||
import django_tables2 as tables
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
from webtest import Upload
|
||||
|
||||
from authentic2.a2_rbac.models import OrganizationalUnit, Role
|
||||
|
@ -44,7 +44,7 @@ def test_manager_role_export(app, admin, ou1, role_ou1, ou2, role_ou2):
|
|||
|
||||
export_response = response.click('CSV', href='/export/')
|
||||
reader = csv.reader(
|
||||
[force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
|
||||
[force_str(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
|
||||
)
|
||||
rows = [row for row in reader]
|
||||
|
||||
|
@ -65,7 +65,7 @@ def test_manager_role_export(app, admin, ou1, role_ou1, ou2, role_ou2):
|
|||
|
||||
export_response = search_response.click('CSV', href='/export/')
|
||||
reader = csv.reader(
|
||||
[force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
|
||||
[force_str(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
|
||||
)
|
||||
rows = [row for row in reader]
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ from django.core.management import call_command as django_call_command
|
|||
from django.shortcuts import resolve_url
|
||||
from django.test import TestCase
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text, iri_to_uri
|
||||
from django.utils.encoding import force_str, iri_to_uri
|
||||
from lxml import etree
|
||||
|
||||
from authentic2.apps.journal.models import Event
|
||||
|
@ -99,7 +99,7 @@ def logout(app):
|
|||
def basic_authorization_header(user, password=None):
|
||||
cred = '%s:%s' % (user.username, password or user.username)
|
||||
b64_cred = base64.b64encode(cred.encode('utf-8'))
|
||||
return {'Authorization': 'Basic %s' % str(force_text(b64_cred))}
|
||||
return {'Authorization': 'Basic %s' % str(force_str(b64_cred))}
|
||||
|
||||
|
||||
def get_response_form(response, form='form'):
|
||||
|
|
Loading…
Reference in New Issue