misc: remove six module usage (#52503)

This commit is contained in:
Valentin Deniaud 2021-03-30 14:10:43 +02:00
parent 8b506d4281
commit 4751846fed
82 changed files with 296 additions and 343 deletions

1
debian/control vendored
View File

@ -29,7 +29,6 @@ Depends: ${misc:Depends}, ${python3:Depends},
python3-djangorestframework (<< 3.10),
python3-markdown (>= 2.1),
python3-ldap (>= 2.4),
python3-six (>= 1.0),
python3-jwcrypto (>= 0.3.1),
python3-cryptography (>= 1.3.4),
python3-django-filters (>= 1),

View File

@ -9,7 +9,6 @@ XStatic_jQuery python3-xstatic-jquery
XStatic_jquery_ui python3-xstatic-jquery-ui
django-import-export python3-django-import-export
django-sekizai python3-django-sekizai
six python3-six
pycryptodome python3-pycryptodome
ldaptools python3-ldaptools
django-mellon python3-django-mellon

View File

@ -23,7 +23,6 @@ The other Authentic 2 dependencies are:
- gadjo>=0.6
- django-import-export>=0.2.7,<=0.4.5
- djangorestframework>=3.3
- six>=1.9
- Markdown>=2.5
- python-ldap

View File

@ -129,7 +129,6 @@ setup(
'gadjo>=0.53',
'django-import-export>=1,<2',
'djangorestframework>=3.3,<3.10',
'six>=1',
'Markdown>=2.1',
'python-ldap',
'django-filter>1,<2.3',

View File

@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib import admin
from django.utils import six
from django.utils.translation import ugettext_lazy as _
from . import models
@ -90,7 +89,7 @@ class PermissionAdmin(admin.ModelAdmin):
list_select_related = True
def name(self, obj):
return six.text_type(obj)
return str(obj)
name.short_description = _('name')

View File

@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib.contenttypes.models import ContentType
from django.utils import six
from django.utils.text import slugify
from django.utils.translation import ugettext
from django.utils.translation import ugettext_lazy as _
@ -41,9 +40,9 @@ def update_ou_admin_roles(ou):
# do not create scoped admin roles if the model is not scopable
if not ou_model:
continue
name = six.text_type(MANAGED_CT[key]['name'])
name = str(MANAGED_CT[key]['name'])
slug = '_a2-' + slugify(name)
scoped_name = six.text_type(MANAGED_CT[key]['scoped_name'])
scoped_name = str(MANAGED_CT[key]['scoped_name'])
name = scoped_name.format(ou=ou)
ou_slug = slug + '-' + ou.slug
if app_settings.MANAGED_CONTENT_TYPES == ():
@ -123,7 +122,7 @@ def update_content_types_roles():
if ct_tuple not in MANAGED_CT:
continue
# General admin role
name = six.text_type(MANAGED_CT[ct_tuple]['name'])
name = str(MANAGED_CT[ct_tuple]['name'])
slug = '_a2-' + slugify(name)
if (
app_settings.MANAGED_CONTENT_TYPES is not None

View File

@ -3,7 +3,6 @@
from __future__ import unicode_literals
from django.db import migrations
from django.utils.six import text_type
from authentic2.a2_rbac.models import MANAGE_MEMBERS_OP
from django_rbac.models import CHANGE_OP
@ -14,8 +13,8 @@ def update_self_administration_perm(apps, schema_editor):
Permission = apps.get_model('a2_rbac', 'Permission')
Operation = apps.get_model('django_rbac', 'Operation')
ContentType = apps.get_model('contenttypes', 'ContentType')
change_op, _ = Operation.objects.get_or_create(slug=text_type(CHANGE_OP.slug))
manage_members_op, _ = Operation.objects.get_or_create(slug=text_type(MANAGE_MEMBERS_OP.slug))
change_op, _ = Operation.objects.get_or_create(slug=str(CHANGE_OP.slug))
manage_members_op, _ = Operation.objects.get_or_create(slug=str(MANAGE_MEMBERS_OP.slug))
ct = ContentType.objects.get_for_model(Role)
perms_to_delete = []
for role in Role.objects.all():

View File

@ -20,7 +20,6 @@ from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.validators import MinValueValidator
from django.db import models
from django.utils import six
from django.utils.text import slugify
from django.utils.translation import pgettext_lazy
from django.utils.translation import ugettext_lazy as _
@ -248,8 +247,8 @@ class Role(RoleAbstractBase):
admin_role = self.__class__.objects.get_admin_role(
self,
name=_('Managers of role "{role}"').format(role=six.text_type(self)),
slug='_a2-managers-of-role-{role}'.format(role=slugify(six.text_type(self))),
name=_('Managers of role "{role}"').format(role=str(self)),
slug='_a2-managers-of-role-{role}'.format(role=slugify(str(self))),
permissions=(view_user_perm,),
self_administered=True,
update_name=True,

View File

@ -16,7 +16,6 @@
import sys
import six
from django.core.exceptions import ImproperlyConfigured
from django.utils.translation import ugettext_lazy as _
@ -26,7 +25,7 @@ class Setting(object):
def __init__(self, default=SENTINEL, definition='', names=None):
self.names = names or []
if isinstance(self.names, six.string_types):
if isinstance(self.names, str):
self.names = [self.names]
self.names = set(self.names)
self.default = default

View File

@ -28,7 +28,7 @@ from django.core.files.storage import default_storage
from django.core.validators import RegexValidator
from django.db.models import query
from django.urls import reverse
from django.utils import formats, html, six
from django.utils import formats, html
from django.utils.functional import keep_lazy
from django.utils.translation import pgettext_lazy
from django.utils.translation import ugettext_lazy as _
@ -42,7 +42,7 @@ from .forms import fields, widgets
from .plugins import collect_from_plugins
@keep_lazy(six.text_type)
@keep_lazy(str)
def capfirst(value):
return value and value[0].upper() + value[1:]
@ -75,7 +75,7 @@ class DateRestField(serializers.DateField):
# Test for the empty string here so that it does not get validated,
# and so that subclasses do not need to handle it explicitly
# inside the `to_internal_value()` method.
if data == '' or (self.trim_whitespace and six.text_type(data).strip() == ''):
if data == '' or (self.trim_whitespace and str(data).strip() == ''):
if not self.allow_blank:
self.fail('blank')
return ''

View File

@ -16,11 +16,8 @@
import abc
from django.utils import six
@six.add_metaclass(abc.ABCMeta)
class BaseAttributeSource(object):
class BaseAttributeSource(object, metaclass=abc.ABCMeta):
"""
Base class for attribute sources
"""

View File

@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib.auth import get_user_model
from django.utils import six
from django.utils.translation import ugettext_lazy as _
from django_rbac.utils import get_role_model
@ -91,7 +90,7 @@ def get_attributes(instance, ctx):
ctx['django_user_' + str(av.attribute.name)] = serialized
ctx['django_user_' + str(av.attribute.name) + ':verified'] = av.verified
ctx['django_user_groups'] = [group for group in user.groups.all()]
ctx['django_user_group_names'] = [six.text_type(group) for group in user.groups.all()]
ctx['django_user_group_names'] = [str(group) for group in user.groups.all()]
if user.username:
splitted = user.username.rsplit('@', 1)
ctx['django_user_domain'] = splitted[1] if '@' in user.username else ''

View File

@ -14,7 +14,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import six
from django.core.exceptions import ImproperlyConfigured
from ...decorators import to_list
@ -62,7 +61,7 @@ def get_instances(ctx):
config_error(UNEXPECTED_KEYS_ERROR, unexpected)
if 'name' not in keys or 'template' not in keys:
config_error(BAD_CONFIG_ERROR)
if not isinstance(d['template'], six.string_types):
if not isinstance(d['template'], str):
config_error(TYPE_ERROR)
yield d

View File

@ -16,8 +16,6 @@
import inspect
from django.utils import six
try:
from django.utils.deprecation import CallableTrue
except ImportError:
@ -71,8 +69,7 @@ class Authentic2Authentication(BasicAuthentication):
pass
# try BasicAuthentication
if (
six.PY3
and 'request'
'request'
in inspect.signature(super(Authentic2Authentication, self).authenticate_credentials).parameters
):
# compatibility with DRF 3.4

View File

@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import hashlib
import urllib.parse
try:
import ldap
@ -36,6 +37,7 @@ import logging
import os
import random
import time
import urllib.parse
from django.conf import settings
from django.contrib import messages
@ -43,9 +45,7 @@ from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core.cache import cache
from django.core.exceptions import ImproperlyConfigured
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ngettext
from django.utils.translation import ugettext as _
@ -290,7 +290,7 @@ elif PYTHON_LDAP3 is False:
def map_text(d):
if d is None:
return d
elif isinstance(d, six.string_types):
elif isinstance(d, str):
return force_text(d)
elif isinstance(d, (list, tuple)):
return d.__class__(map_text(x) for x in d)
@ -1031,7 +1031,7 @@ class LDAPBackend(object):
'''Obtain a Django role'''
kwargs = {}
slug = None
if isinstance(role_id, six.string_types):
if isinstance(role_id, str):
slug = role_id
elif isinstance(role_id, (tuple, list)):
try:
@ -1302,7 +1302,7 @@ class LDAPBackend(object):
attribute, param = attribute.split(':')
quote = 'noquote' not in param.split(',')
if quote:
decoded.append((attribute, urlparse.unquote(value)))
decoded.append((attribute, urllib.parse.unquote(value)))
else:
decoded.append((attribute, force_text(value)))
filters = [filter_format(u'(%s=%s)', (a, b)) for a, b in decoded]
@ -1322,7 +1322,7 @@ class LDAPBackend(object):
if isinstance(part, list):
part = part[0]
if quote:
part = urlparse.quote(part.encode('utf-8'))
part = urllib.parse.quote(part.encode('utf-8'))
parts.append(part)
return ' '.join(part for part in parts)
@ -1676,7 +1676,7 @@ class LDAPBackend(object):
# convert string to list of strings for settings accepting it
for i in cls._TO_ITERABLE:
if i in block and isinstance(block[i], six.string_types):
if i in block and isinstance(block[i], str):
block[i] = (block[i],)
for d in cls._DEFAULTS:
@ -1685,8 +1685,8 @@ class LDAPBackend(object):
if d == 'user_filter' and app_settings.A2_ACCEPT_EMAIL_AUTHENTICATION:
block[d] = '(|(mail=%s)(uid=%s))'
else:
if isinstance(cls._DEFAULTS[d], six.string_types):
if not isinstance(block[d], six.string_types):
if isinstance(cls._DEFAULTS[d], str):
if not isinstance(block[d], str):
raise ImproperlyConfigured('LDAP_AUTH_SETTINGS: attribute %r must be a string' % d)
try:
block[d] = force_text(block[d])
@ -1710,7 +1710,7 @@ class LDAPBackend(object):
for key in cls._TO_LOWERCASE:
# we handle strings, list of strings and list of list or tuple whose first element is a
# string
if isinstance(block[key], six.string_types):
if isinstance(block[key], str):
block[key] = force_text(block[key]).lower()
elif isinstance(block[key], (list, tuple)):
new_seq = []

View File

@ -16,10 +16,11 @@
from __future__ import unicode_literals
import functools
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
from django.db import models
from django.utils import six
from authentic2.backends import get_user_queryset
from authentic2.user_login_failure import user_login_failure, user_login_success
@ -58,7 +59,7 @@ class ModelBackend(ModelBackend):
queries.append(models.Q(**{username_field: upn(username, realm)}))
else:
queries.append(models.Q(**{username_field: upn(username, realm)}))
queries = six.moves.reduce(models.Q.__or__, queries)
queries = functools.reduce(models.Q.__or__, queries)
if ou:
queries &= models.Q(ou=ou)
return queries

View File

@ -15,8 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib.parse
from django.conf import settings
from django.utils.six.moves.urllib import parse as urlparse
from . import app_settings, plugins
from .decorators import SessionCache
@ -24,7 +25,7 @@ from .decorators import SessionCache
def make_origin(url):
'''Build origin of an URL'''
parsed = urlparse.urlparse(url)
parsed = urllib.parse.urlparse(url)
if ':' in parsed.netloc:
host, port = parsed.netloc.split(':', 1)
if parsed.scheme == 'http' and port == 80:

View File

@ -26,7 +26,6 @@ from Cryptodome.Hash import HMAC, SHA256
from Cryptodome.Protocol.KDF import PBKDF2
from django.utils.crypto import constant_time_compare
from django.utils.encoding import force_bytes
from django.utils.six import text_type
class DecryptionError(Exception):
@ -123,7 +122,7 @@ def aes_base64url_deterministic_encrypt(key, data, salt, hash_name='sha256', cou
key_size = 16
hmac_size = key_size
if isinstance(salt, text_type):
if isinstance(salt, str):
salt = force_bytes(salt)
iv = hashmod.new(salt).digest()
@ -174,7 +173,7 @@ def aes_base64url_deterministic_decrypt(key, urlencoded, salt, raise_on_error=Tr
if not crypted or not hmac or prf(key, crypted)[:hmac_size] != hmac:
raise DecryptionError('invalid HMAC')
if isinstance(salt, text_type):
if isinstance(salt, str):
salt = force_bytes(salt)
iv = hashmod.new(salt).digest()

View File

@ -27,7 +27,6 @@ from django.core.exceptions import FieldDoesNotExist, ValidationError
from django.core.validators import RegexValidator
from django.db import IntegrityError, models
from django.db.transaction import atomic
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.translation import ugettext as _
@ -109,9 +108,9 @@ class CsvImporter(object):
encoding = None
def run(self, fd_or_str, encoding):
if isinstance(fd_or_str, six.binary_type):
if isinstance(fd_or_str, bytes):
input_fd = io.BytesIO(fd_or_str)
elif isinstance(fd_or_str, six.text_type):
elif isinstance(fd_or_str, str):
input_fd = io.StringIO(fd_or_str)
elif not hasattr(fd_or_str, 'read1'):
try:
@ -122,7 +121,7 @@ class CsvImporter(object):
except Exception:
pass
content = fd_or_str.read()
if isinstance(content, six.text_type):
if isinstance(content, str):
input_fd = io.StringIO(content)
else:
input_fd = io.BytesIO(content)
@ -539,7 +538,7 @@ class UserCsvImporter(object):
form.is_valid()
def get_form_errors(form, name):
return [Error('data-error', six.text_type(value)) for value in form.errors.get(name, [])]
return [Error('data-error', str(value)) for value in form.errors.get(name, [])]
cells = [
CsvCell(

View File

@ -24,7 +24,7 @@ import os
from django.core.exceptions import MultipleObjectsReturned, ValidationError
from django.core.mail import send_mail
from django.db import models, transaction
from django.utils import six, timezone
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
try:
@ -233,7 +233,7 @@ class User(AbstractBaseUser, PermissionMixin):
return '%s (%s)' % (human_name, short_id)
def __repr__(self):
return '<User: %r>' % six.text_type(self)
return '<User: %r>' % str(self)
def clean(self):
if not (self.username or self.email or (self.first_name and self.last_name)):

View File

@ -25,7 +25,6 @@ from json import dumps as json_dumps
from django.core.cache import cache as django_cache
from django.core.exceptions import ValidationError
from django.http import Http404, HttpResponse, HttpResponseBadRequest, HttpResponseForbidden
from django.utils import six
from django.views.debug import technical_404_response
from . import app_settings, middleware
@ -195,12 +194,12 @@ class CacheDecoratorBase(object):
for i, arg in enumerate(args):
if self.args and i not in self.args:
continue
parts.append(six.text_type(arg))
parts.append(str(arg))
for kw, arg in sorted(kwargs.items(), key=lambda x: x[0]):
if kw not in self.kwargs:
continue
parts.append(u'%s-%s' % (six.text_type(kw), six.text_type(arg)))
parts.append(u'%s-%s' % (str(kw), str(arg)))
return u'|'.join(parts)

View File

@ -23,13 +23,13 @@
import logging
import urllib.parse
from xml.dom.minidom import parseString
from django.conf.urls import url
from django.http import HttpResponseRedirect
from django.urls import reverse
from django.utils.http import urlquote
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext as _
from authentic2 import settings
@ -117,21 +117,21 @@ def get_disco_return_url_from_metadata(entity_id):
def is_param_id_in_return_url(return_url, returnIDParam):
url = urlparse.urlparse(return_url)
if url.query and returnIDParam in urlparse.parse_qs(url.query):
url = urllib.parse.urlparse(return_url)
if url.query and returnIDParam in urllib.parse.parse_qs(url.query):
return True
return False
def add_param_to_url(url, param_name, value):
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
if query:
qs = urlparse.parse_qs(query)
qs = urllib.parse.parse_qs(query)
qs[param_name] = [value]
query = urlparse.urlencode(qs)
else:
query = '%s=%s' % (param_name, value)
return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
return urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
def disco(request):

View File

@ -19,7 +19,6 @@ import logging
import time
from django.core.cache import cache
from django.utils import six
class ExponentialRetryTimeout(object):
@ -45,7 +44,7 @@ class ExponentialRetryTimeout(object):
self.key_prefix = key_prefix
def key(self, keys):
key = u'-'.join(six.text_type(key) for key in keys)
key = u'-'.join(str(key) for key in keys)
key = key.encode('utf-8')
return '%s%s' % (self.key_prefix or self.KEY_PREFIX, hashlib.md5(key).hexdigest())

View File

@ -14,15 +14,15 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import functools
import logging
import operator
import random
from urllib.parse import quote
from django.db.models import Q
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils import six
from django.utils.six.moves.urllib.parse import quote
from django.utils.translation import ugettext as _
import authentic2.idp.saml.saml2_endpoints as saml2_endpoints
@ -68,7 +68,7 @@ class SamlBackend(object):
queries.append(
q.filter(sp_options_policy__isnull=True, liberty_provider__entity_id__in=sessions_eids)
)
qs = six.moves.reduce(operator.__or__, queries)
qs = functools.reduce(operator.__or__, queries)
# do some prefetching
qs = qs.prefetch_related('liberty_provider')
qs = qs.select_related('sp_options_policy')

View File

@ -41,6 +41,7 @@ import random
import string
import xml.etree.cElementTree as ctree
from functools import wraps
from urllib.parse import quote, urlencode
from django.conf import settings
from django.contrib import messages
@ -50,9 +51,7 @@ from django.core.exceptions import ObjectDoesNotExist
from django.http import HttpResponse, HttpResponseBadRequest, HttpResponseForbidden, HttpResponseRedirect
from django.shortcuts import redirect, render
from django.urls import reverse
from django.utils import six
from django.utils.encoding import force_bytes, force_str, force_text
from django.utils.six.moves.urllib.parse import quote, urlencode
from django.utils.translation import ugettext as _
from django.utils.translation import ugettext_noop as N_
from django.views.decorators.cache import never_cache
@ -821,7 +820,7 @@ def sso_after_process_request(
if not access_granted:
logger.debug('access denied, return answer to the requester')
set_saml2_response_responder_status_code(
login.response, lasso.SAML2_STATUS_CODE_REQUEST_DENIED, msg=six.text_type(dic['message'])
login.response, lasso.SAML2_STATUS_CODE_REQUEST_DENIED, msg=str(dic['message'])
)
return finish_sso(request, login)

View File

@ -16,8 +16,6 @@
import logging
from django.utils import six
class RequestContextFilter(logging.Filter):
DEFAULT_USERNAME = '-'
@ -45,7 +43,7 @@ class RequestContextFilter(logging.Filter):
if not hasattr(record, 'user'):
if hasattr(request, 'user') and request.user.is_authenticated:
record.user = six.text_type(request.user)
record.user = str(request.user)
else:
record.user = self.DEFAULT_USERNAME

View File

@ -27,7 +27,6 @@ from django.db import connection
from django.db.models import Count, Q
from django.db.models.functions import Lower
from django.db.transaction import atomic
from django.utils.six.moves import input
from django.utils.timezone import localtime
from authentic2 import app_settings

View File

@ -17,6 +17,7 @@
from __future__ import print_function
import logging
import urllib.parse
from datetime import timedelta
from django.conf import settings
@ -25,7 +26,6 @@ from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import F
from django.utils import timezone, translation
from django.utils.six.moves.urllib import parse as urlparse
from authentic2.backends.ldap_backend import LDAPBackend
from authentic2.utils import send_templated_mail
@ -110,7 +110,7 @@ class Command(BaseCommand):
ctx = {
'user': user,
'days_to_deletion': days_to_deletion,
'login_url': urlparse.urljoin(settings.SITE_BASE_URL, settings.LOGIN_URL),
'login_url': urllib.parse.urljoin(settings.SITE_BASE_URL, settings.LOGIN_URL),
}
with transaction.atomic():
if not self.fake:

View File

@ -16,13 +16,13 @@
from __future__ import print_function
import io
import logging
import re
import sys
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand
from django.utils import six
from ldap.dn import escape_dn_chars
from ldif import LDIFWriter
@ -48,7 +48,7 @@ class Command(BaseCommand):
def ldap(self, command, attrs):
self.logger.debug('received command %s %s', command, attrs)
if command == 'SEARCH':
out = six.BytesIO()
out = io.BytesIO()
ldif_writer = LDIFWriter(out)
qs = get_user_model().objects.all()
if attrs['filter'] != '(objectClass=*)':
@ -85,7 +85,7 @@ class Command(BaseCommand):
for user in qs:
o = {}
for user_attribute, ldap_attribute in MAPPING.items():
o[ldap_attribute] = [six.text_type(getattr(user, user_attribute)).encode('utf-8')]
o[ldap_attribute] = [str(getattr(user, user_attribute)).encode('utf-8')]
o['objectClass'] = ['inetOrgPerson']
dn = 'uid=%s,%s' % (escape_dn_chars(o['uid'][0]), attrs['suffix'])
self.logger.debug(u'sending entry %s %s', dn, o)

View File

@ -23,7 +23,6 @@ from django import forms
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.utils import six
from django.utils.text import slugify
from django.utils.translation import pgettext, ugettext
from django.utils.translation import ugettext_lazy as _
@ -67,7 +66,7 @@ class SlugMixin(forms.ModelForm):
def save(self, commit=True):
instance = self.instance
if not instance.slug:
instance.slug = slugify(six.text_type(instance.name)).lstrip('_')
instance.slug = slugify(str(instance.name)).lstrip('_')
qs = instance.__class__.objects.all()
if instance.pk:
qs = qs.exclude(pk=instance.pk)
@ -441,7 +440,7 @@ class OUSearchForm(FormWithRequest):
if self.show_all_ou and (len(self.ou_qs) > 1 or self.search_all_ous):
choices.append(('all', all_ou_label))
for ou in self.ou_qs:
choices.append((str(ou.pk), six.text_type(ou)))
choices.append((str(ou.pk), str(ou)))
if self.show_none_ou and self.search_all_ous:
choices.append(('none', pgettext('organizational unit', 'None')))
@ -703,7 +702,7 @@ class UserImportForm(forms.Form):
@staticmethod
def raise_validation_error(error_message):
message_prefix = ugettext('Invalid import file')
raise forms.ValidationError('%s : %s' % (message_prefix, six.text_type(error_message)))
raise forms.ValidationError('%s : %s' % (message_prefix, str(error_message)))
class UserNewImportForm(UserImportForm):

View File

@ -21,7 +21,6 @@ from django.core.exceptions import PermissionDenied, ValidationError
from django.db import transaction
from django.http import HttpResponseRedirect
from django.urls import reverse
from django.utils import six
from django.utils.translation import ugettext as _
from django.views.generic import FormView
@ -68,7 +67,7 @@ class OrganizationalUnitDetailView(views.BaseDetailView):
@property
def title(self):
return six.text_type(self.object)
return str(self.object)
def authorize(self, request, *args, **kwargs):
super(OrganizationalUnitDetailView, self).authorize(request, *args, **kwargs)

View File

@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib.auth import get_user_model
from django.utils import six
from import_export.fields import Field
from import_export.resources import ModelResource
from import_export.widgets import Widget
@ -30,7 +29,7 @@ class ListWidget(Widget):
raise NotImplementedError
def render(self, value, object):
return u', '.join(six.text_type(v) for v in value.all())
return u', '.join(str(v) for v in value.all())
class UserResource(ModelResource):
@ -42,7 +41,7 @@ class UserResource(ModelResource):
result.add(role)
for pr in role.parent_relation.all():
result.add(pr.parent)
return ', '.join(six.text_type(x) for x in result)
return ', '.join(str(x) for x in result)
class Meta:
model = User

View File

@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib import messages
from django.utils import six
from django.utils.translation import ugettext as _
from authentic2.models import Service
@ -53,7 +52,7 @@ class ServiceView(
@property
def title(self):
return six.text_type(self.object)
return str(self.object)
def get_table_queryset(self):
return self.object.authorized_roles.all()

View File

@ -30,7 +30,6 @@ from atomicwrites import AtomicWriter
from django.conf import settings
from django.core.files.storage import default_storage
from django.db import connection
from django.utils import six
from django.utils.functional import cached_property
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
@ -225,7 +224,7 @@ class Report(object):
logger.exception('error during report %s:%s run', self.user_import.uuid, self.uuid)
state = self.STATE_ERROR
try:
exception = six.text_type(e)
exception = str(e)
except Exception:
exception = repr(repr(e))
else:

View File

@ -26,7 +26,6 @@ from django.db import transaction
from django.forms import MediaDefiningClass
from django.http import Http404, HttpResponse
from django.urls import reverse, reverse_lazy
from django.utils import six
from django.utils.encoding import force_text
from django.utils.functional import cached_property
from django.utils.timezone import now
@ -62,8 +61,7 @@ class MultipleOUMixin(object):
return super(MultipleOUMixin, self).get_context_data(**kwargs)
@six.add_metaclass(MediaMixinBase)
class MediaMixin(object):
class MediaMixin(object, metaclass=MediaMixinBase):
'''Expose needed CSS and JS files as a media object'''
class Media:
@ -401,7 +399,7 @@ class ModelNameMixin(MediaMixin):
def get_instance_name(self):
if hasattr(self, 'get_object'):
return six.text_type(self.get_object())
return str(self.get_object())
return u''
def get_context_data(self, **kwargs):
@ -674,9 +672,9 @@ class MenuJson(HomepageView):
continue
menu_entries.append(
{
'label': six.text_type(entry['label']),
'label': str(entry['label']),
'slug': entry.get('slug', ''),
'url': request.build_absolute_uri(six.text_type(entry['href'])),
'url': request.build_absolute_uri(str(entry['href'])),
}
)
return menu_entries

View File

@ -15,12 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import functools
import operator
import pickle
from django.contrib.auth import get_user_model
from django.core import signing
from django.utils import six
from django.utils.encoding import force_text
from django_select2.forms import ModelSelect2MultipleWidget, ModelSelect2Widget
@ -43,7 +43,7 @@ class SplitTermMixin(object):
queries = []
for term in term.split():
queries.append(super(SplitTermMixin, self).filter_queryset(term, queryset=qs))
qs = six.moves.reduce(self.split_term_operator, queries)
qs = functools.reduce(self.split_term_operator, queries)
return qs
@ -101,7 +101,7 @@ class SearchRoleWidgetMixin(SplitTermMixin):
]
def label_from_instance(self, obj):
label = six.text_type(obj)
label = str(obj)
if obj.ou and utils.get_ou_count() > 1:
label = u'{ou} - {obj}'.format(ou=obj.ou, obj=obj)
return label

View File

@ -21,12 +21,13 @@ try:
except ImportError:
threading = None
import urllib.parse
from django import http
from django.conf import settings
from django.contrib import messages
from django.utils.deprecation import MiddlewareMixin
from django.utils.functional import SimpleLazyObject
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext as _
from . import app_settings, plugins, utils
@ -162,10 +163,10 @@ class DisplayMessageBeforeRedirectMiddleware(MiddlewareMixin):
return response
if not getattr(response, 'display_message', True):
return response
parsed_url = urlparse.urlparse(url)
parsed_url = urllib.parse.urlparse(url)
if not parsed_url.scheme and not parsed_url.netloc:
return response
parsed_request_url = urlparse.urlparse(request.build_absolute_uri())
parsed_request_url = urllib.parse.urlparse(request.build_absolute_uri())
if (parsed_request_url.scheme == parsed_url.scheme or not parsed_url.scheme) and (
parsed_request_url.netloc == parsed_url.netloc
):

View File

@ -1,7 +1,6 @@
import itertools
from django.db.migrations.operations.base import Operation
from django.utils import six
class CreatePartialIndexes(Operation):
@ -64,10 +63,10 @@ class CreatePartialIndexes(Operation):
for clause in where:
if isinstance(clause, tuple):
clause, params = clause
assert isinstance(clause, six.string_types)
assert isinstance(clause, str)
assert isinstance(params, tuple)
clause = clause % tuple(schema_editor.quote_value(param) for param in params)
assert isinstance(clause, six.string_types)
assert isinstance(clause, str)
clauses.append(clause)
where_clause = ' AND '.join(clauses)
# SQLite does not accept parameters in partial index creations, don't ask why :/

View File

@ -16,6 +16,7 @@
import datetime
import time
import urllib.parse
import uuid
import django
@ -27,9 +28,8 @@ from django.contrib.postgres.search import SearchVectorField
from django.core.exceptions import ValidationError
from django.db import models, transaction
from django.db.models.query import Q
from django.utils import six, timezone
from django.utils import timezone
from django.utils.http import urlquote
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext_lazy as _
from model_utils.managers import QueryManager
@ -117,7 +117,7 @@ class LogoutUrlAbstract(models.Model):
def get_logout_url(self, request):
ok_icon_url = (
request.build_absolute_uri(urlparse.urljoin(settings.STATIC_URL, 'authentic2/images/ok.png'))
request.build_absolute_uri(urllib.parse.urljoin(settings.STATIC_URL, 'authentic2/images/ok.png'))
+ '?nonce=%s' % time.time()
)
return self.logout_url.format(urlquote(ok_icon_url))
@ -363,7 +363,7 @@ class PasswordReset(models.Model):
verbose_name_plural = _('password reset')
def __str__(self):
return six.text_type(self.user)
return str(self.user)
class Service(models.Model):
@ -418,7 +418,7 @@ class Service(models.Model):
return self.name
def __repr__(self):
return '<%s %r>' % (self.__class__.__name__, six.text_type(self))
return '<%s %r>' % (self.__class__.__name__, str(self))
def authorize(self, user):
if not self.authorized_roles.exists():
@ -492,7 +492,7 @@ class Token(models.Model):
else:
return _uuid
if isinstance(_uuid, six.text_type):
if isinstance(_uuid, str):
_uuid = _uuid.encode('ascii')
_uuid = base64url_decode(_uuid)
return uuid.UUID(bytes=_uuid)

View File

@ -20,7 +20,6 @@ import re
import string
from django.core.exceptions import ValidationError
from django.utils import six
from django.utils.functional import lazy
from django.utils.module_loading import import_string
from django.utils.translation import ugettext as _
@ -50,8 +49,7 @@ def generate_password():
return ''.join(new_password)
@six.add_metaclass(abc.ABCMeta)
class PasswordChecker(object):
class PasswordChecker(object, metaclass=abc.ABCMeta):
class Check(object):
def __init__(self, label, result):
self.label = label
@ -134,4 +132,4 @@ def password_help_text(password='', only_errors=False):
return ''
password_help_text = lazy(password_help_text, six.text_type)
password_help_text = lazy(password_help_text, str)

View File

@ -22,7 +22,6 @@ from django.conf.urls import url
from django.contrib import admin, messages
from django.core.exceptions import ValidationError
from django.forms import ModelForm
from django.utils import six
from django.utils.translation import ugettext as _
try:
@ -72,7 +71,7 @@ class TextAndFileWidget(forms.widgets.MultiWidget):
def render(self, name, value, attrs=None, **kwargs):
if attrs is None:
attrs = {}
if isinstance(value, (str, six.text_type)):
if isinstance(value, (str, str)):
attrs['rows'] = value.count('\n') + 5
attrs['cols'] = min(max((len(x) for x in value.split('\n'))), 150)
return super(TextAndFileWidget, self).render(name, value, attrs, **kwargs)

View File

@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import functools
import logging
import os.path
import re
@ -25,7 +26,6 @@ from django.core.exceptions import ValidationError
from django.http import Http404, HttpResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse
from django.utils import six
from django.utils.encoding import force_text
from authentic2.compat_lasso import lasso
@ -337,7 +337,7 @@ def retrieve_metadata_and_create(request, provider_id, sp_or_idp):
return None
logger.debug('loaded %d bytes', len(metadata))
try:
metadata = six.text_type(metadata, 'utf-8')
metadata = str(metadata, 'utf-8')
except UnicodeDecodeError:
logging.error('SAML metadata autoload: retrieved metadata for entity id %s is not UTF-8', provider_id)
return None
@ -586,5 +586,5 @@ def get_session_not_on_or_after(assertion):
value,
)
if session_not_on_or_afters:
return six.moves.reduce(min, session_not_on_or_afters)
return functools.reduce(min, session_not_on_or_afters)
return None

View File

@ -27,7 +27,6 @@ from django.contrib.humanize.templatetags.humanize import apnumber
from django.core.exceptions import ValidationError
from django.db import models
from django.template.defaultfilters import pluralize
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.text import capfirst
@ -56,7 +55,7 @@ def dumps(value):
# Initial author: Oliver Beattie
class PickledObject(six.text_type):
class PickledObject(str):
"""A subclass of string so it can be told whether a string is
a pickled object or not (if the object is an instance of this class
then it must [well, should] be a pickled one)."""
@ -161,7 +160,7 @@ class MultiSelectField(models.Field):
return MultiSelectFormField(**defaults)
def get_db_prep_value(self, value, connection, prepared=False):
if isinstance(value, six.string_types):
if isinstance(value, str):
return value
elif isinstance(value, list):
return ",".join(value)

View File

@ -16,6 +16,7 @@
from __future__ import print_function
import io
import os
import sys
import warnings
@ -26,7 +27,6 @@ from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand, CommandError
from django.db.transaction import atomic
from django.template.defaultfilters import slugify
from django.utils import six
from django.utils.translation import gettext as _
from authentic2.compat_lasso import lasso
@ -340,11 +340,11 @@ Any other kind of attribute filter policy is unsupported.
response = requests.get(metadata_file_path)
if not response.ok:
raise CommandError('Unable to open url %s' % metadata_file_path)
metadata_file = six.BytesIO(response.content)
metadata_file = io.BytesIO(response.content)
else:
try:
with open(metadata_file_path, 'rb') as fd:
metadata_file = six.BytesIO(fd.read())
metadata_file = io.BytesIO(fd.read())
except IOError:
raise CommandError('Unable to open file %s' % metadata_file_path)

View File

@ -25,7 +25,6 @@ from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import models
from django.db.models import Q
from django.db.models.query import QuerySet
from django.utils import six
from django.utils.encoding import force_str, force_text
from django.utils.translation import ugettext_lazy as _
@ -484,7 +483,7 @@ class LibertyServiceProvider(models.Model):
return (self.liberty_provider.slug,)
def __str__(self):
return six.text_type(self.liberty_provider)
return str(self.liberty_provider)
class Meta:
verbose_name = _('SAML service provider')

View File

@ -22,7 +22,6 @@ import re
import time
import xml.etree.ElementTree as etree
from django.utils import six
from django.utils.encoding import force_text
from authentic2.compat_lasso import lasso
@ -30,14 +29,14 @@ from authentic2.saml import x509utils
def filter_attribute_private_key(message):
if isinstance(message, six.string_types):
if isinstance(message, str):
return re.sub(r' (\w+:)?(PrivateKey=")([&#;\w/ +-=])+(")', '', message)
else:
return message
def filter_element_private_key(message):
if isinstance(message, six.string_types):
if isinstance(message, str):
return re.sub(
r'(<saml)(p)?(:PrivateKeyFile>-----BEGIN RSA PRIVATE KEY-----)'
r'([&#;\w/+=\s])+'

View File

@ -19,14 +19,12 @@ import os
import subprocess
import tempfile
import six
_openssl = 'openssl'
def decapsulate_pem_file(file_or_string):
'''Remove PEM header lines'''
if not isinstance(file_or_string, six.string_types):
if not isinstance(file_or_string, str):
content = file_or_string.read()
else:
content = file_or_string

View File

@ -23,7 +23,6 @@ from django.core.serializers.base import DeserializationError
from django.core.serializers.json import Serializer as JSONSerializer
from django.core.serializers.python import _get_model
from django.db import DEFAULT_DB_ALIAS
from django.utils import six
class Serializer(JSONSerializer):
@ -73,7 +72,7 @@ def Deserializer(stream_or_string, **options):
"""
from django.core.serializers.python import Deserializer as PythonDeserializer
if not isinstance(stream_or_string, (bytes, six.string_types)):
if not isinstance(stream_or_string, (bytes, str)):
stream_or_string = stream_or_string.read()
if isinstance(stream_or_string, bytes):
stream_or_string = stream_or_string.decode('utf-8')
@ -85,5 +84,4 @@ def Deserializer(stream_or_string, **options):
except GeneratorExit:
raise
except Exception as e:
# Map to deserializer error
six.reraise(DeserializationError, DeserializationError(e), sys.exc_info()[2])
raise DeserializationError(e)

View File

@ -19,6 +19,7 @@ import ctypes
import logging
import random
import time
import urllib.parse
import uuid
from functools import wraps
from importlib import import_module
@ -40,10 +41,9 @@ from django.shortcuts import render, resolve_url
from django.template.context import make_context
from django.template.loader import TemplateDoesNotExist, render_to_string, select_template
from django.urls import reverse
from django.utils import html, six, timezone
from django.utils import html, timezone
from django.utils.encoding import iri_to_uri, uri_to_iri
from django.utils.formats import localize
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ungettext
try:
@ -171,7 +171,7 @@ def get_backends(setting_name='IDP_BACKENDS'):
backends = []
for backend_path in getattr(app_settings, setting_name):
kwargs = {}
if not isinstance(backend_path, six.string_types):
if not isinstance(backend_path, str):
backend_path, kwargs = backend_path
kwargs_settings = getattr(app_settings, setting_name + '_KWARGS', {})
backend = load_backend(backend_path, kwargs_settings)
@ -232,9 +232,9 @@ def get_authenticator_method(authenticator, method, parameters):
def add_arg(url, key, value=None):
'''Add a parameter to an URL'''
key = urlparse.quote(key)
key = urllib.parse.quote(key)
if value is not None:
add = '%s=%s' % (key, urlparse.quote(value))
add = '%s=%s' % (key, urllib.parse.quote(value))
else:
add = key
if '?' in url:
@ -262,7 +262,7 @@ class Service(object):
def field_names(list_of_field_name_and_titles):
for t in list_of_field_name_and_titles:
if isinstance(t, six.string_types):
if isinstance(t, str):
yield t
else:
yield t[0]
@ -270,7 +270,7 @@ def field_names(list_of_field_name_and_titles):
def is_valid_url(url):
try:
parsed = urlparse.urlparse(url)
parsed = urllib.parse.urlparse(url)
if parsed.scheme in ('http', 'https', ''):
return True
except Exception:
@ -307,8 +307,8 @@ def make_url(
else:
url = to
url = iri_to_uri(url)
scheme, netloc, path, query_string, o_fragment = urlparse.urlsplit(url)
url = uri_to_iri(urlparse.urlunsplit((scheme, netloc, path, '', '')))
scheme, netloc, path, query_string, o_fragment = urllib.parse.urlsplit(url)
url = uri_to_iri(urllib.parse.urlunsplit((scheme, netloc, path, '', '')))
fragment = fragment or o_fragment
# Django < 1.6 compat, query_string is not optional
url_params = QueryDict(query_string=query_string, mutable=True)
@ -348,7 +348,7 @@ def make_url(
if request:
url = request.build_absolute_uri(url)
elif hasattr(settings, 'SITE_BASE_URL'):
url = urlparse.urljoin(settings.SITE_BASE_URL, url)
url = urllib.parse.urljoin(settings.SITE_BASE_URL, url)
else:
raise TypeError('make_url() absolute cannot be used without request')
# keep using unicode
@ -426,13 +426,13 @@ def record_authentication_event(request, how, nonce=None):
# explicitly state that the session has been modified
request.session.modified = True
event = {
'who': six.text_type(request.user),
'who': str(request.user),
'who_id': getattr(request.user, 'pk', None),
'how': how,
'when': int(time.time()),
}
kwargs = {
'who': six.text_type(request.user)[:80],
'who': str(request.user)[:80],
'how': how,
}
nonce = nonce or get_nonce(request)
@ -621,12 +621,12 @@ def to_iter(func):
def normalize_attribute_values(values):
'''Take a list of values or a single one and normalize it'''
values_set = set()
if isinstance(values, six.string_types) or not hasattr(values, '__iter__'):
if isinstance(values, str) or not hasattr(values, '__iter__'):
values = [values]
for value in values:
if isinstance(value, bool):
value = str(value).lower()
values_set.add(six.text_type(value))
values_set.add(str(value))
return values_set
@ -682,7 +682,7 @@ def send_templated_mail(
"""
from .. import middleware
if isinstance(template_names, six.string_types):
if isinstance(template_names, str):
template_names = [template_names]
if per_ou_templates and getattr(user_or_email, 'ou', None):
new_template_names = []
@ -1055,7 +1055,7 @@ def select_next_url(request, default, field_name=None, include_post=False, repla
if good_next_url(request, next_url):
if replace:
for key, value in replace.items():
next_url = next_url.replace(key, urlparse.quote(value))
next_url = next_url.replace(key, urllib.parse.quote(value))
return next_url
return default
@ -1127,7 +1127,7 @@ def same_origin(url1, url2):
If not scheme and not port are given, port compare is skipped.
The last two rules allow authorizing complete domains easily.
"""
p1, p2 = urlparse.urlparse(url1), urlparse.urlparse(url2)
p1, p2 = urllib.parse.urlparse(url1), urllib.parse.urlparse(url2)
p1_host, p1_port = netloc_to_host_port(p1.netloc)
p2_host, p2_port = netloc_to_host_port(p2.netloc)

View File

@ -25,7 +25,6 @@ except ImportError:
import ast
from django.utils import six
from django.utils.translation import ugettext as _
@ -110,7 +109,7 @@ class BaseExpressionValidator(ast.NodeVisitor):
# set the nearer expr node as the node of the error
if e.node is None and hasattr(node, 'col_offset'):
e.set_node(node)
six.reraise(*sys.exc_info())
raise e
@lru_cache(maxsize=1024)
def __call__(self, expression):
@ -126,7 +125,7 @@ class BaseExpressionValidator(ast.NodeVisitor):
if e.text is None:
e.text = expression
e.params = {'expression': expression}
six.reraise(*sys.exc_info())
raise e
return compile(tree, expression, mode='eval')
@ -179,8 +178,7 @@ class ConditionValidator(BaseExpressionValidator):
super(ConditionValidator, self).__init__(
authorized_nodes=authorized_nodes, forbidden_nodes=forbidden_nodes
)
if six.PY3:
self.authorized_nodes.append(ast.NameConstant)
self.authorized_nodes.append(ast.NameConstant)
def check_Name(self, node):
if node.id.startswith('_'):
@ -225,7 +223,7 @@ def evaluate_condition(expression, ctx=None, validator=None, on_raise=None):
raise ExpressionError(
_('variable is not defined: %s') % e, code='undefined-variable', text=expression, column=0
)
except Exception:
except Exception as e:
if on_raise is not None:
return on_raise
six.reraise(*sys.exc_info())
raise e

View File

@ -16,7 +16,6 @@
from __future__ import unicode_literals
from django.utils import six
from django.utils.encoding import force_text
from django.utils.functional import keep_lazy
from django.utils.text import format_lazy
@ -30,7 +29,7 @@ def lazy_join(join, args):
return format_lazy(fstring, *args)
@keep_lazy(six.text_type)
@keep_lazy(str)
def lazy_label(default, func):
"""Allow using a getter for a label, with late binding.

View File

@ -42,7 +42,7 @@ from django.shortcuts import get_object_or_404, render
from django.template import loader
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils import six, timezone
from django.utils import timezone
from django.utils.translation import ugettext as _
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie
@ -452,7 +452,7 @@ class ProfileView(cbv.TemplateNamesMixin, TemplateView):
value = filter(None, value)
value = [html_value(attribute, at_value) for at_value in value]
if not title:
title = six.text_type(attribute)
title = str(attribute)
else:
# fallback to model attributes
try:
@ -471,7 +471,7 @@ class ProfileView(cbv.TemplateNamesMixin, TemplateView):
if not isinstance(value, (list, tuple)):
value = (value,)
raw_value = value
value = [six.text_type(v) for v in value]
value = [str(v) for v in value]
if value or app_settings.A2_PROFILE_DISPLAY_EMPTY_FIELDS:
profile.append((title, value))
attributes.append({'attribute': attribute, 'values': raw_value})

View File

@ -18,11 +18,11 @@ import base64
import hashlib
import hmac
import json
import urllib.parse
from django.conf import settings
from django.db import models
from django.utils.encoding import force_bytes, force_text
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
@ -66,11 +66,11 @@ def parse_id_token(id_token, client_id=None, client_secret=None):
return None, 'id_token is expired'
def check_issuer():
parsed = urlparse.urlparse(app_settings.authorize_url)
parsed = urllib.parse.urlparse(app_settings.authorize_url)
if 'iss' not in payload:
return False
try:
parsed_issuer = urlparse.urlparse(payload['iss'])
parsed_issuer = urllib.parse.urlparse(payload['iss'])
except Exception:
return False
return parsed_issuer.scheme == parsed.scheme and parsed_issuer.netloc == parsed.netloc

View File

@ -16,6 +16,7 @@
import json
import logging
import urllib.parse
import uuid
import requests
@ -31,7 +32,6 @@ from django.http import Http404, HttpResponseRedirect
from django.shortcuts import render, resolve_url
from django.urls import reverse
from django.utils.http import is_safe_url, urlencode
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext as _
from django.views.generic import FormView, View
from requests_oauthlib import OAuth2Session
@ -509,9 +509,9 @@ class RegistrationView(PopupViewMixin, LoggerMixin, View):
redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
# Prevent errors when redirect_to does not contain fc-login-or-link view
parsed_redirect_to = urlparse.urlparse(redirect_to)
parsed_redirect_to = urllib.parse.urlparse(redirect_to)
if parsed_redirect_to.path == reverse('fc-login-or-link'):
redirect_to = urlparse.parse_qs(parsed_redirect_to.query).get(
redirect_to = urllib.parse.parse_qs(parsed_redirect_to.query).get(
REDIRECT_FIELD_NAME, [a2_utils.make_url('auth_homepage')]
)[0]
params = {

View File

@ -20,7 +20,6 @@ import logging
import requests
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
from django.utils import six
from django.utils.timezone import now
from jwcrypto.jwk import JWK
from jwcrypto.jwt import JWT
@ -88,7 +87,7 @@ class OIDCBackend(ModelBackend):
jwt = JWT(jwt=original_id_token, key=key_or_keyset, check_claims={}, algs=algs)
jwt.claims
if isinstance(id_token.aud, six.text_type) and provider.client_id != id_token.aud:
if isinstance(id_token.aud, str) and provider.client_id != id_token.aud:
logger.warning(
'auth_oidc: invalid id_token audience %s != provider client_id %s',
id_token.aud,

View File

@ -15,11 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import urllib.parse
import requests
from django.shortcuts import get_object_or_404
from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import utc
from django.utils.translation import ugettext as _
from jwcrypto.common import JWException, base64url_encode, json_decode
@ -93,14 +92,14 @@ def parse_id_token(encoded, provider):
REQUIRED_ID_TOKEN_KEYS = set(['iss', 'sub', 'aud', 'exp', 'iat'])
KEY_TYPES = {
'iss': six.text_type,
'sub': six.text_type,
'iss': str,
'sub': str,
'exp': int,
'iat': int,
'auth_time': int,
'nonce': six.text_type,
'acr': six.text_type,
'azp': six.text_type,
'nonce': str,
'acr': str,
'azp': str,
# aud and amr havec specific checks
}
@ -120,7 +119,7 @@ class IDToken(object):
nonce = None
def __init__(self, encoded):
if not isinstance(encoded, (six.binary_type, six.string_types)):
if not isinstance(encoded, (bytes, str)):
raise IDTokenError(_('Encoded ID Token must be either binary or string data'))
self._encoded = encoded
@ -139,7 +138,7 @@ class IDToken(object):
if key == 'amr':
if not isinstance(decoded['amr'], list):
raise IDTokenError(_('invalid amr value: %s') % decoded['amr'])
if not all(isinstance(v, six.text_type) for v in decoded['amr']):
if not all(isinstance(v, str) for v in decoded['amr']):
raise IDTokenError(_('invalid amr value: %s') % decoded['amr'])
elif key in KEY_TYPES:
if not isinstance(decoded[key], KEY_TYPES[key]):
@ -197,7 +196,7 @@ OPENID_CONFIGURATION_REQUIRED = set(
def check_https(url):
return urlparse.urlparse(url).scheme == 'https'
return urllib.parse.urlparse(url).scheme == 'https'
def register_issuer(name, issuer=None, openid_configuration=None, verify=True, timeout=None, ou=None):
@ -293,10 +292,12 @@ def register_issuer(name, issuer=None, openid_configuration=None, verify=True, t
def get_openid_configuration_url(issuer):
parsed = urlparse.urlparse(issuer)
parsed = urllib.parse.urlparse(issuer)
if parsed.query or parsed.fragment or parsed.scheme != 'https':
raise ValueError(
_('invalid issuer URL, it must use the https:// scheme and not have a ' 'query or fragment')
)
issuer = urlparse.urlunparse((parsed.scheme, parsed.netloc, parsed.path.rstrip('/'), None, None, None))
issuer = urllib.parse.urlunparse(
(parsed.scheme, parsed.netloc, parsed.path.rstrip('/'), None, None, None)
)
return issuer + '/.well-known/openid-configuration'

View File

@ -21,7 +21,6 @@ import logging
from django.contrib import messages
from django.core.exceptions import MultipleObjectsReturned
from django.db.transaction import atomic
from django.utils import six
from django.utils.translation import ugettext as _
from mellon.adapters import DefaultAdapter, UserCreationError
from mellon.utils import get_setting
@ -44,7 +43,7 @@ class MappingError(Exception):
super(MappingError, self).__init__(message)
def __str__(self):
s = six.text_type(self.args[0])
s = str(self.args[0])
if self.details:
s += ' ' + repr(self.details)
return s
@ -127,7 +126,7 @@ class AuthenticAdapter(DefaultAdapter):
action = mapping.get('action', 'set-attribute')
mandatory = mapping.get('mandatory', False) is True
method = None
if isinstance(action, six.string_types):
if isinstance(action, str):
try:
method = getattr(self, 'action_' + action.replace('-', '_'))
except AttributeError:
@ -149,21 +148,21 @@ class AuthenticAdapter(DefaultAdapter):
def action_rename(self, user, idp, saml_attributes, mapping):
from_name = mapping.get('from')
if not from_name or not isinstance(from_name, six.string_types):
if not from_name or not isinstance(from_name, str):
raise MappingError('missing from in rename')
to_name = mapping.get('to')
if not to_name or not isinstance(to_name, six.string_types):
if not to_name or not isinstance(to_name, str):
raise MappingError('missing to in rename')
if from_name in saml_attributes:
saml_attributes[to_name] = saml_attributes[from_name]
def action_set_attribute(self, user, idp, saml_attributes, mapping):
attribute = mapping.get('attribute')
if not attribute or not isinstance(attribute, six.string_types):
if not attribute or not isinstance(attribute, str):
raise MappingError('missing attribute key')
saml_attribute = mapping.get('saml_attribute')
if not saml_attribute or not isinstance(saml_attribute, six.string_types):
if not saml_attribute or not isinstance(saml_attribute, str):
raise MappingError('missing saml_attribute key')
if saml_attribute not in saml_attributes:
@ -199,14 +198,14 @@ class AuthenticAdapter(DefaultAdapter):
slug = ou_desc.get('slug')
name = ou_desc.get('name')
if slug:
if not isinstance(slug, six.string_types):
if not isinstance(slug, str):
raise MappingError('invalid ou.slug in ou description')
try:
return OU.objects.get(slug=slug)
except OU.DoesNotExist:
raise MappingError('unknown ou', details={'slug': slug})
elif name:
if not isinstance(name, six.string_types):
if not isinstance(name, str):
raise MappingError('invalid ou.slug in ou description')
try:
return OU.objects.get(name=name)
@ -228,11 +227,11 @@ class AuthenticAdapter(DefaultAdapter):
kwargs['ou'] = ou
if slug:
if not isinstance(slug, six.string_types):
if not isinstance(slug, str):
raise MappingError('invalid role slug', details={'slug': slug})
kwargs['slug'] = slug
elif name:
if not isinstance(name, six.string_types):
if not isinstance(name, str):
raise MappingError('invalid role name', details={'name': name})
kwargs['name'] = name
else:
@ -249,7 +248,7 @@ class AuthenticAdapter(DefaultAdapter):
condition = mapping.get('condition')
if condition is None:
return True
if not isinstance(condition, six.string_types):
if not isinstance(condition, str):
raise MappingError('invalid condition')
try:
# use a proxy to simplify condition expressions as subscript is forbidden
@ -258,7 +257,7 @@ class AuthenticAdapter(DefaultAdapter):
logger.debug('auth_saml: condition %r is %s', condition, value, extra={'user': user})
return value
except Exception as e:
raise MappingError('condition evaluation failed', details={'error': six.text_type(e)})
raise MappingError('condition evaluation failed', details={'error': str(e)})
def action_add_role(self, user, idp, saml_attributes, mapping):
role = self.get_role(mapping)

View File

@ -14,11 +14,11 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib.parse
from datetime import timedelta
from django.db import models
from django.db.models import query
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
@ -37,7 +37,7 @@ class TicketQuerySet(query.QuerySet):
class ServiceQuerySet(query.QuerySet):
def for_service(self, service):
'''Find service with the longest match'''
parsed = urlparse.urlparse(service)
parsed = urllib.parse.urlparse(service)
matches = []
for match in self.filter(urls__contains=parsed.netloc):
urls = match.get_urls()

View File

@ -21,7 +21,6 @@ from xml.etree import ElementTree as ET
import requests
from django.http import HttpResponse, HttpResponseBadRequest
from django.utils import six
from django.utils.timezone import now
from django.views.generic.base import View
@ -262,7 +261,7 @@ class ValidateBaseView(CasMixin, View):
if st.service.identifier_attribute not in attributes:
self.logger.error(
'unable to compute an identifier for user %r and service %s',
six.text_type(st.user),
str(st.user),
st.service_url,
)
return self.validation_failure(request, service, INTERNAL_ERROR)
@ -299,7 +298,7 @@ class ValidateBaseView(CasMixin, View):
'validation success service: %r ticket: %s user: %r identifier: %r',
st.service_url,
st.ticket_id,
six.text_type(st.user),
str(st.user),
identifier,
)
return self.real_validation_success(request, st, identifier)
@ -327,7 +326,7 @@ class ServiceValidateView(ValidateBaseView):
root = ET.Element(SERVICE_RESPONSE_ELT)
success = ET.SubElement(root, AUTHENTICATION_SUCCESS_ELT)
user = ET.SubElement(success, USER_ELT)
user.text = six.text_type(identifier)
user.text = str(identifier)
self.provision_pgt(request, st, success)
self.provision_attributes(request, st, success)
return HttpResponse(ET.tostring(root, encoding='utf-8'), content_type='text/xml')
@ -349,7 +348,7 @@ class ServiceValidateView(ValidateBaseView):
for key, values in values.items():
for value in values:
attribute_elt = ET.SubElement(attributes_elt, '{%s}%s' % (CAS_NAMESPACE, key))
attribute_elt.text = six.text_type(value)
attribute_elt.text = str(value)
def provision_pgt(self, request, st, success):
"""Provision a PGT ticket if requested"""

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib.parse
import uuid
from importlib import import_module
@ -22,8 +23,6 @@ from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelatio
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.core.validators import URLValidator
from django.db import models
from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
@ -34,7 +33,7 @@ from . import app_settings, managers, utils
def generate_uuid():
return six.text_type(uuid.uuid4())
return str(uuid.uuid4())
def validate_https_url(data):
@ -173,9 +172,9 @@ class OIDCClient(Service):
if len(redirect_uri) > app_settings.REDIRECT_URI_MAX_LENGTH:
raise ValueError('redirect_uri length > %s' % app_settings.REDIRECT_URI_MAX_LENGTH)
parsed_uri = urlparse.urlparse(redirect_uri)
parsed_uri = urllib.parse.urlparse(redirect_uri)
for valid_redirect_uri in self.redirect_uris.split():
parsed_valid_uri = urlparse.urlparse(valid_redirect_uri)
parsed_valid_uri = urllib.parse.urlparse(valid_redirect_uri)
if parsed_uri.scheme != parsed_valid_uri.scheme:
continue
if parsed_valid_uri.netloc.startswith('*'):
@ -257,8 +256,8 @@ class OIDCAuthorization(models.Model):
def __repr__(self):
return '<OIDCAuthorization client:%r user:%r scopes:%r>' % (
self.client_id and six.text_type(self.client),
self.user_id and six.text_type(self.user),
self.client_id and str(self.client),
self.user_id and str(self.user),
self.scopes,
)
@ -324,8 +323,8 @@ class OIDCCode(SessionMixin, models.Model):
def __repr__(self):
return '<OIDCCode uuid:%s client:%s user:%s expired:%s scopes:%s>' % (
self.uuid,
self.client_id and six.text_type(self.client),
self.user_id and six.text_type(self.user),
self.client_id and str(self.client),
self.user_id and str(self.user),
self.expired,
self.scopes,
)
@ -362,8 +361,8 @@ class OIDCAccessToken(SessionMixin, models.Model):
def __repr__(self):
return '<OIDCAccessToken uuid:%s client:%s user:%s expired:%s scopes:%s>' % (
self.uuid,
self.client_id and six.text_type(self.client),
self.user_id and six.text_type(self.user),
self.client_id and str(self.client),
self.user_id and str(self.user),
self.expired,
self.scopes,
)

View File

@ -17,13 +17,12 @@
import base64
import hashlib
import json
import urllib.parse
import uuid
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.six.moves.urllib import parse as urlparse
from jwcrypto.jwk import JWK, InvalidJWKValue, JWKSet
from jwcrypto.jwt import JWT
@ -107,7 +106,7 @@ def clean_words(data):
def url_domain(url):
return urlparse.urlparse(url).netloc.split(':')[0]
return urllib.parse.urlparse(url).netloc.split(':')[0]
def make_sub(client, user):
@ -165,7 +164,7 @@ def reverse_pairwise_sub(client, sub):
def normalize_claim_values(values):
values_list = []
if isinstance(values, six.string_types) or not hasattr(values, '__iter__'):
if isinstance(values, str) or not hasattr(values, '__iter__'):
return values
for value in values:
if isinstance(value, bool):

View File

@ -36,7 +36,6 @@ from django.contrib.auth import authenticate
from django.http import HttpResponse, HttpResponseNotAllowed, JsonResponse
from django.shortcuts import render
from django.urls import reverse
from django.utils import six
from django.utils.encoding import force_text
from django.utils.http import urlencode
from django.utils.timezone import now, utc
@ -423,7 +422,7 @@ def authorize_for_client(request, client, redirect_uri):
'idp_oidc: sending code %s for scopes %s for service %s', code.uuid, ' '.join(scopes), client
)
params = {
'code': six.text_type(code.uuid),
'code': str(code.uuid),
}
if state is not None:
params['state'] = state
@ -664,7 +663,7 @@ def idtoken_from_user_credential(request):
)
return JsonResponse(
{
'access_token': six.text_type(access_token.uuid),
'access_token': str(access_token.uuid),
'token_type': 'Bearer',
'expires_in': int(expires_in.total_seconds()),
'id_token': utils.make_idtoken(client, id_token),
@ -726,7 +725,7 @@ def tokens_from_authz_code(request):
id_token['nonce'] = oidc_code.nonce
return JsonResponse(
{
'access_token': six.text_type(access_token.uuid),
'access_token': str(access_token.uuid),
'token_type': 'Bearer',
'expires_in': int(expires_in.total_seconds()),
'id_token': utils.make_idtoken(client, id_token),

View File

@ -1,9 +1,9 @@
import copy
import functools
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.db.models.query import Q
from django.utils import six
try:
from django.core.exceptions import FieldDoesNotExist
@ -145,7 +145,7 @@ class DjangoRBACBackend(object):
return False
if user_obj.is_superuser:
return True
if isinstance(perm_or_perms, six.string_types):
if isinstance(perm_or_perms, str):
perm_or_perms = [perm_or_perms]
perm_or_perms = set(perm_or_perms)
cache = self.get_permission_cache(user_obj)
@ -174,7 +174,7 @@ class DjangoRBACBackend(object):
return False
if user_obj.is_superuser:
return True
if isinstance(perm_or_perms, six.string_types):
if isinstance(perm_or_perms, str):
perm_or_perms = [perm_or_perms]
perm_or_perms = set(perm_or_perms)
cache = self.get_permission_cache(user_obj)
@ -197,7 +197,7 @@ class DjangoRBACBackend(object):
ct_id, fk = key.split('.')
q.append(Q(pk=int(fk)))
if q:
return six.moves.reduce(Q.__or__, q)
return functools.reduce(Q.__or__, q)
return False
def filter_by_perm(self, user_obj, perm_or_perms, qs):

View File

@ -1,4 +1,5 @@
import contextlib
import functools
import threading
from django.contrib.auth import get_user_model
@ -6,7 +7,6 @@ from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.db.models import query
from django.db.models.query import Prefetch, Q
from django.utils import six
from . import utils
@ -210,7 +210,7 @@ class RoleParentingManager(models.Manager):
obsolete = old - add
if obsolete:
queries = (query.Q(parent_id=a, child_id=b, direct=False) for a, b in obsolete)
self.model.objects.filter(six.moves.reduce(query.Q.__or__, queries)).delete()
self.model.objects.filter(functools.reduce(query.Q.__or__, queries)).delete()
@contextlib.contextmanager

View File

@ -1,10 +1,10 @@
import functools
import hashlib
import operator
from django.conf import settings
from django.db import models
from django.db.models.query import Prefetch, Q
from django.utils import six
from django.utils.text import slugify
from django.utils.translation import ugettext_lazy as _
@ -45,7 +45,7 @@ class AbstractBase(models.Model):
def save(self, *args, **kwargs):
# truncate slug and add a hash if it's too long
if not self.slug:
self.slug = slugify(six.text_type(self.name)).lstrip('_')
self.slug = slugify(str(self.name)).lstrip('_')
if len(self.slug) > 256:
self.slug = self.slug[:252] + hashlib.md5(self.slug).hexdigest()[:4]
if not self.uuid:
@ -373,7 +373,7 @@ class PermissionMixin(models.Model):
if hasattr(backend, "filter_by_perm"):
results.append(backend.filter_by_perm(self, perm_or_perms, qs))
if results:
return six.moves.reduce(operator.__or__, results)
return functools.reduce(operator.__or__, results)
else:
return qs

View File

@ -18,7 +18,7 @@ import base64
import contextlib
import datetime
import json
import urllib.parse as urlparse
import urllib.parse
import uuid
import httmock
@ -60,7 +60,7 @@ class FranceConnectMock:
def handle_authorization(self, app, url, **kwargs):
assert url.startswith('https://fcp.integ01')
parsed_url = urlparse.urlparse(url)
parsed_url = urllib.parse.urlparse(url)
query = QueryDict(parsed_url.query)
assert_equals_url(query['redirect_uri'], self.callback_url)
assert query['client_id'] == self.client_id
@ -90,7 +90,7 @@ class FranceConnectMock:
app.session.flush()
response = app.get(path)
self.callback_params = {
k: v for k, v in QueryDict(urlparse.urlparse(response.location).query).items()
k: v for k, v in QueryDict(urllib.parse.urlparse(response.location).query).items()
}
response = response.follow()
response = response.click(href='callback')
@ -142,7 +142,7 @@ class FranceConnectMock:
def handle_logout(self, app, url):
assert url.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/logout')
parsed_url = urlparse.urlparse(url)
parsed_url = urllib.parse.urlparse(url)
query = QueryDict(parsed_url.query)
assert_equals_url(query['post_logout_redirect_uri'], 'http://testserver' + reverse('fc-logout'))
assert query['state']

View File

@ -16,12 +16,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import urllib.parse
import mock
import requests
from django.contrib.auth import get_user_model
from django.urls import reverse
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
from authentic2.custom_user.models import DeletedUser
@ -34,7 +34,7 @@ User = get_user_model()
def path(url):
return urlparse.urlparse(url).path
return urllib.parse.urlparse(url).path
def test_login_redirect(app, franceconnect):

View File

@ -16,6 +16,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib.parse
import django
import django_webtest
import mock
@ -25,7 +27,6 @@ from django.core.cache import cache
from django.core.management import call_command
from django.db import connection
from django.db.migrations.executor import MigrationExecutor
from django.utils.six.moves.urllib import parse as urlparse
from pytest_django.migrations import DisableMigrations
from authentic2 import hooks as a2_hooks
@ -458,7 +459,7 @@ def assert_external_redirect(external_redirect):
else:
def check_location(response, default_return):
assert urlparse.urljoin('http://testserver/', default_return).endswith(response['Location'])
assert urllib.parse.urljoin('http://testserver/', default_return).endswith(response['Location'])
return check_location

View File

@ -17,7 +17,7 @@
from __future__ import unicode_literals
from django.utils.six.moves.urllib.parse import urlparse
from urllib.parse import urlparse
from authentic2.custom_user.models import User
from authentic2.models import Attribute

View File

@ -17,6 +17,7 @@
import base64
import json
import urllib.parse
import pytest
from django.contrib.auth import get_user_model
@ -29,8 +30,6 @@ from django.test.client import Client
from django.test.utils import override_settings
from django.urls import reverse
from django.utils.encoding import force_text
from django.utils.six import text_type
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext as _
from rest_framework import status, test
@ -203,7 +202,8 @@ class UtilsTests(Authentic2TestCase):
response = login_require(request, login_hint=['backoffice'])
self.assertEqualsURL(response['Location'].split('?', 1)[0], '/login/')
self.assertEqualsURL(
urlparse.parse_qs(response['Location'].split('?', 1)[1])['next'][0], '/coin?nonce=xxx&next=/zob/'
urllib.parse.parse_qs(response['Location'].split('?', 1)[1])['next'][0],
'/coin?nonce=xxx&next=/zob/',
)
self.assertEqual(request.session['login-hint'], ['backoffice'])
@ -385,7 +385,7 @@ class AttributeKindsTest(TestCase):
for i, name in enumerate(attribute_kinds.get_attribute_kinds()):
fields['field_%d' % i] = attribute_kinds.get_form_field(name)
AttributeKindForm = type('AttributeKindForm', (forms.Form,), fields)
text_type(AttributeKindForm().as_p())
str(AttributeKindForm().as_p())
class APITest(TestCase):

View File

@ -21,6 +21,7 @@ import os
import random
import re
import time
import urllib.parse
import pytest
from django.contrib.auth import get_user_model
@ -28,7 +29,6 @@ from django.db import IntegrityError, transaction
from django.http import QueryDict
from django.urls import reverse
from django.utils.encoding import force_str, force_text
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now, utc
from httmock import HTTMock, urlmatch
from jwcrypto.common import base64url_decode, base64url_encode, json_encode
@ -248,13 +248,13 @@ def oidc_provider_mock(
provides_kid_header=False,
kid=None,
):
token_endpoint = urlparse.urlparse(oidc_provider.token_endpoint)
userinfo_endpoint = urlparse.urlparse(oidc_provider.userinfo_endpoint)
token_revocation_endpoint = urlparse.urlparse(oidc_provider.token_revocation_endpoint)
token_endpoint = urllib.parse.urlparse(oidc_provider.token_endpoint)
userinfo_endpoint = urllib.parse.urlparse(oidc_provider.userinfo_endpoint)
token_revocation_endpoint = urllib.parse.urlparse(oidc_provider.token_revocation_endpoint)
@urlmatch(netloc=token_endpoint.netloc, path=token_endpoint.path)
def token_endpoint_mock(url, request):
if urlparse.parse_qs(request.body).get('code') == [code]:
if urllib.parse.parse_qs(request.body).get('code') == [code]:
exp = now() + datetime.timedelta(seconds=10)
id_token = {
'iss': oidc_provider.issuer,
@ -341,7 +341,7 @@ def oidc_provider_mock(
@urlmatch(netloc=token_revocation_endpoint.netloc, path=token_revocation_endpoint.path)
def token_revocation_endpoint_mock(url, request):
query = urlparse.parse_qs(request.body)
query = urllib.parse.parse_qs(request.body)
assert 'token' in query
return {
'status_code': 200,
@ -458,8 +458,8 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
response = app.get('/admin/').maybe_follow()
assert oidc_provider.name in response.text
response = response.click(oidc_provider.name)
location = urlparse.urlparse(response.location)
endpoint = urlparse.urlparse(oidc_provider.authorization_endpoint)
location = urllib.parse.urlparse(response.location)
endpoint = urllib.parse.urlparse(oidc_provider.authorization_endpoint)
assert location.scheme == endpoint.scheme
assert location.netloc == endpoint.netloc
assert location.path == endpoint.path
@ -510,7 +510,7 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
assert set(hooks.auth_oidc_backend_modify_user[0]['kwargs']) >= set(
['user', 'provider', 'user_info', 'id_token', 'access_token']
)
assert urlparse.urlparse(response['Location']).path == '/admin/'
assert urllib.parse.urlparse(response['Location']).path == '/admin/'
assert User.objects.count() == 1
user = User.objects.get()
assert user.ou == get_default_ou()
@ -588,7 +588,7 @@ def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwks
response = app.get('/').maybe_follow()
assert oidc_provider.name in response.text
response = response.click(oidc_provider.name)
location = urlparse.urlparse(response.location)
location = urllib.parse.urlparse(response.location)
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']
@ -603,7 +603,7 @@ def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwks
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub=simple_user.uuid, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert urlparse.urlparse(response['Location']).path == '/'
assert urllib.parse.urlparse(response['Location']).path == '/'
assert User.objects.count() == 1
user = User.objects.get()
# verify user was not modified
@ -630,7 +630,7 @@ def test_strategy_create(app, caplog, code, oidc_provider, oidc_provider_jwkset)
response = app.get('/').maybe_follow()
assert oidc_provider.name in response.text
response = response.click(oidc_provider.name)
location = urlparse.urlparse(response.location)
location = urllib.parse.urlparse(response.location)
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']
@ -658,7 +658,7 @@ def test_register_issuer(db, app, caplog, oidc_provider_jwkset):
config_file = os.path.join(config_dir, 'openid_configuration.json')
with open(config_file) as f:
oidc_conf = json.load(f)
jwks_uri = urlparse.urlparse(oidc_conf['jwks_uri'])
jwks_uri = urllib.parse.urlparse(oidc_conf['jwks_uri'])
@urlmatch(netloc=jwks_uri.netloc, path=jwks_uri.path)
def jwks_mock(url, request):
@ -705,7 +705,7 @@ def test_invalid_kid(app, caplog, code, oidc_provider_rsa, oidc_provider_jwkset,
response = app.get('/').maybe_follow()
assert oidc_provider_rsa.name in response.text
response = response.click(oidc_provider_rsa.name)
location = urlparse.urlparse(response.location)
location = urllib.parse.urlparse(response.location)
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']
@ -771,7 +771,7 @@ def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider
response = app.get('/').maybe_follow()
response = response.click(oidc_provider.name)
location = urlparse.urlparse(response.location)
location = urllib.parse.urlparse(response.location)
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']
@ -799,7 +799,7 @@ def test_lost_state(app, caplog, code, oidc_provider, oidc_provider_jwkset, hook
# As the oidc-state is used during a redirect from a third-party, we need
# it to be lax.
assert re.search('Set-Cookie.* oidc-state=.*SameSite=Lax', str(response))
qs = urlparse.parse_qs(urlparse.urlparse(response.location).query)
qs = urllib.parse.parse_qs(urllib.parse.urlparse(response.location).query)
state = qs['state']
# reset the session to forget the state
@ -858,7 +858,7 @@ def test_multiple_users_with_same_email(app, caplog, code, oidc_provider_jwkset,
response = app.get('/').maybe_follow()
assert oidc_provider.name in response.text
response = response.click(oidc_provider.name)
location = urlparse.urlparse(response.location)
location = urllib.parse.urlparse(response.location)
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']
@ -877,7 +877,7 @@ def test_multiple_users_with_same_email(app, caplog, code, oidc_provider_jwkset,
response = app.get('/').maybe_follow()
assert oidc_provider.name in response.text
response = response.click(oidc_provider.name)
location = urlparse.urlparse(response.location)
location = urllib.parse.urlparse(response.location)
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']

View File

@ -14,11 +14,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib.parse
from django.contrib.auth import get_user_model
from django.test.client import Client, RequestFactory
from django.test.utils import override_settings
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urlparse
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.constants import AUTHENTICATION_EVENTS_SESSION_KEY, NONCE_FIELD_NAME
@ -132,9 +133,9 @@ class CasTests(Authentic2TestCase):
assert service.authorized_roles.exists() is True
response = client.get('/idp/cas/login', {constants.SERVICE_PARAM: self.URL})
location = response['Location']
query = urlparse.parse_qs(location.split('?')[1])
query = urllib.parse.parse_qs(location.split('?')[1])
next_url, next_url_query = query['next'][0].split('?')
next_url_query = urlparse.parse_qs(next_url_query)
next_url_query = urllib.parse.parse_qs(next_url_query)
response = client.get(location)
response = client.post(
location,
@ -152,9 +153,9 @@ class CasTests(Authentic2TestCase):
assert service.authorized_roles.exists() is True
response = client.get('/idp/cas/login', {constants.SERVICE_PARAM: self.URL})
location = response['Location']
query = urlparse.parse_qs(location.split('?')[1])
query = urllib.parse.parse_qs(location.split('?')[1])
next_url, next_url_query = query['next'][0].split('?')
next_url_query = urlparse.parse_qs(next_url_query)
next_url_query = urllib.parse.parse_qs(next_url_query)
response = client.get(location)
response = client.post(
location,
@ -163,7 +164,7 @@ class CasTests(Authentic2TestCase):
)
response = client.get(response.url)
client = Client()
ticket_id = urlparse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
ticket_id = urllib.parse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
response = client.get(
'/idp/cas/validate', {constants.TICKET_PARAM: ticket_id, constants.SERVICE_PARAM: self.URL}
)
@ -174,13 +175,13 @@ class CasTests(Authentic2TestCase):
ticket = Ticket.objects.get()
location = response['Location']
url = location.split('?')[0]
query = urlparse.parse_qs(location.split('?')[1])
query = urllib.parse.parse_qs(location.split('?')[1])
self.assertTrue(url.endswith('/login/'))
self.assertIn('nonce', query)
self.assertIn('next', query)
self.assertEqual(query['nonce'], [ticket.ticket_id])
next_url, next_url_query = query['next'][0].split('?')
next_url_query = urlparse.parse_qs(next_url_query)
next_url_query = urllib.parse.parse_qs(next_url_query)
self.assertEqual(next_url, '/idp/cas/continue/')
self.assertEqual(set(next_url_query.keys()), set([constants.SERVICE_PARAM, NONCE_FIELD_NAME]))
self.assertEqual(next_url_query[constants.SERVICE_PARAM], [self.URL])
@ -214,7 +215,7 @@ class CasTests(Authentic2TestCase):
# Do not the same client for direct calls from the CAS service provider
# to prevent use of the user session
client = Client()
ticket_id = urlparse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
ticket_id = urllib.parse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
response = client.get(
'/idp/cas/validate', {constants.TICKET_PARAM: ticket_id, constants.SERVICE_PARAM: self.URL}
)
@ -231,13 +232,13 @@ class CasTests(Authentic2TestCase):
ticket = Ticket.objects.get()
location = response['Location']
url = location.split('?')[0]
query = urlparse.parse_qs(location.split('?')[1])
query = urllib.parse.parse_qs(location.split('?')[1])
self.assertTrue(url.endswith('/login/'))
self.assertIn('nonce', query)
self.assertIn('next', query)
self.assertEqual(query['nonce'], [ticket.ticket_id])
next_url, next_url_query = query['next'][0].split('?')
next_url_query = urlparse.parse_qs(next_url_query)
next_url_query = urllib.parse.parse_qs(next_url_query)
self.assertEqual(next_url, '/idp/cas/continue/')
self.assertEqual(set(next_url_query.keys()), set([constants.SERVICE_PARAM, NONCE_FIELD_NAME]))
self.assertEqual(next_url_query[constants.SERVICE_PARAM], [self.URL])
@ -271,7 +272,7 @@ class CasTests(Authentic2TestCase):
# Do not the same client for direct calls from the CAS service provider
# to prevent use of the user session
client = Client()
ticket_id = urlparse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
ticket_id = urllib.parse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
response = client.get(
'/idp/cas/serviceValidate', {constants.TICKET_PARAM: ticket_id, constants.SERVICE_PARAM: self.URL}
)
@ -292,13 +293,13 @@ class CasTests(Authentic2TestCase):
ticket = Ticket.objects.get()
location = response['Location']
url = location.split('?')[0]
query = urlparse.parse_qs(location.split('?')[1])
query = urllib.parse.parse_qs(location.split('?')[1])
self.assertTrue(url.endswith('/login/'))
self.assertIn('nonce', query)
self.assertIn('next', query)
self.assertEqual(query['nonce'], [ticket.ticket_id])
next_url, next_url_query = query['next'][0].split('?')
next_url_query = urlparse.parse_qs(next_url_query)
next_url_query = urllib.parse.parse_qs(next_url_query)
self.assertEqual(next_url, '/idp/cas/continue/')
self.assertEqual(set(next_url_query.keys()), set([constants.SERVICE_PARAM, NONCE_FIELD_NAME]))
self.assertEqual(next_url_query[constants.SERVICE_PARAM], [self.URL])
@ -332,7 +333,7 @@ class CasTests(Authentic2TestCase):
# Do not the same client for direct calls from the CAS service provider
# to prevent use of the user session
client = Client()
ticket_id = urlparse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
ticket_id = urllib.parse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
response = client.get(
'/idp/cas/serviceValidate',
{constants.TICKET_PARAM: ticket_id, constants.SERVICE_PARAM: self.URL, constants.RENEW_PARAM: ''},
@ -351,13 +352,13 @@ class CasTests(Authentic2TestCase):
ticket = Ticket.objects.get()
location = response['Location']
url = location.split('?')[0]
query = urlparse.parse_qs(location.split('?')[1])
query = urllib.parse.parse_qs(location.split('?')[1])
self.assertTrue(url.endswith('/login/'))
self.assertIn('nonce', query)
self.assertIn('next', query)
self.assertEqual(query['nonce'], [ticket.ticket_id])
next_url, next_url_query = query['next'][0].split('?')
next_url_query = urlparse.parse_qs(next_url_query)
next_url_query = urllib.parse.parse_qs(next_url_query)
self.assertEqual(next_url, '/idp/cas/continue/')
self.assertEqual(set(next_url_query.keys()), set([constants.SERVICE_PARAM, NONCE_FIELD_NAME]))
self.assertEqual(next_url_query[constants.SERVICE_PARAM], [self.URL])
@ -391,7 +392,7 @@ class CasTests(Authentic2TestCase):
# Do not the same client for direct calls from the CAS service provider
# to prevent use of the user session
client = Client()
ticket_id = urlparse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
ticket_id = urllib.parse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
response = client.get(
'/idp/cas/proxyValidate', {constants.TICKET_PARAM: ticket_id, constants.SERVICE_PARAM: self.URL}
)
@ -413,13 +414,13 @@ class CasTests(Authentic2TestCase):
ticket = Ticket.objects.get()
location = response['Location']
url = location.split('?')[0]
query = urlparse.parse_qs(location.split('?')[1])
query = urllib.parse.parse_qs(location.split('?')[1])
self.assertTrue(url.endswith('/login/'))
self.assertIn('nonce', query)
self.assertIn('next', query)
self.assertEqual(query['nonce'], [ticket.ticket_id])
next_url, next_url_query = query['next'][0].split('?')
next_url_query = urlparse.parse_qs(next_url_query)
next_url_query = urllib.parse.parse_qs(next_url_query)
self.assertEqual(next_url, '/idp/cas/continue/')
self.assertEqual(set(next_url_query.keys()), set([constants.SERVICE_PARAM, NONCE_FIELD_NAME]))
self.assertEqual(next_url_query[constants.SERVICE_PARAM], [self.URL])
@ -453,7 +454,7 @@ class CasTests(Authentic2TestCase):
# Do not the same client for direct calls from the CAS service provider
# to prevent use of the user session
client = Client()
ticket_id = urlparse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
ticket_id = urllib.parse.parse_qs(response.url.split('?')[1])[constants.TICKET_PARAM][0]
response = client.get(
'/idp/cas/serviceValidate',
{

View File

@ -18,6 +18,7 @@ import base64
import datetime
import functools
import json
import urllib.parse
from importlib import import_module
import pytest
@ -28,7 +29,6 @@ from django.http import QueryDict
from django.test.utils import override_settings
from django.urls import reverse
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
from jwcrypto.jwk import JWK, JWKSet
from jwcrypto.jwt import JWT
@ -301,9 +301,9 @@ def test_authorization_code_sso(
assert code.auth_time <= now()
assert code.expired >= now()
assert response['Location'].startswith(redirect_uri)
location = urlparse.urlparse(response['Location'])
location = urllib.parse.urlparse(response['Location'])
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
query = urlparse.parse_qs(location.query)
query = urllib.parse.parse_qs(location.query)
assert set(query.keys()) == set(['code', 'state'])
assert query['code'] == [code.uuid]
code = query['code'][0]
@ -328,7 +328,7 @@ def test_authorization_code_sso(
id_token = response.json['id_token']
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
assert location.fragment
query = urlparse.parse_qs(location.fragment)
query = urllib.parse.parse_qs(location.fragment)
assert OIDCAccessToken.objects.count() == 1
access_token = OIDCAccessToken.objects.get()
assert set(query.keys()) == set(['access_token', 'token_type', 'expires_in', 'id_token', 'state'])
@ -434,13 +434,13 @@ def check_authorize_error(
):
# check next_url qs
if message:
location = urlparse.urlparse(response.location)
location = urllib.parse.urlparse(response.location)
assert location.path == '/continue/'
if check_next:
location_qs = QueryDict(location.query or '')
assert 'next' in location_qs
assert location_qs['next'].startswith(redirect_uri)
next_url = urlparse.urlparse(location_qs['next'])
next_url = urllib.parse.urlparse(location_qs['next'])
next_url_qs = QueryDict(next_url.fragment if fragment else next_url.query)
assert next_url_qs['error'] == error
assert next_url_qs['error_description'] == error_description
@ -449,7 +449,7 @@ def check_authorize_error(
assert error_description in continue_response.pyquery('.error').text()
elif check_next:
assert response.location.startswith(redirect_uri)
location = urlparse.urlparse(response.location)
location = urllib.parse.urlparse(response.location)
location_qs = QueryDict(location.fragment if fragment else location.query)
assert location_qs['error'] == error
assert location_qs['error_description'] == error_description
@ -466,7 +466,7 @@ def check_authorize_error(
def assert_authorization_response(response, fragment=False, **kwargs):
location = urlparse.urlparse(response.location)
location = urllib.parse.urlparse(response.location)
location_qs = QueryDict(location.fragment if fragment else location.query)
assert set(location_qs) == set(kwargs)
for key, value in kwargs.items():
@ -769,7 +769,7 @@ def test_invalid_request(oidc_client, caplog, oidc_settings, simple_user, app):
},
)
response = app.get(authorize_url)
assert urlparse.urlparse(response['Location']).path == reverse('auth_login')
assert urllib.parse.urlparse(response['Location']).path == reverse('auth_login')
if oidc_client.authorization_mode != oidc_client.AUTHORIZATION_MODE_NONE:
# prompt is none, but consent is required
@ -873,8 +873,8 @@ def test_invalid_request(oidc_client, caplog, oidc_settings, simple_user, app):
code.expired = now() - datetime.timedelta(seconds=120)
assert not code.is_valid()
code.save()
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
assert set(query.keys()) == set(['code'])
assert query['code'] == [code.uuid]
code = query['code'][0]
@ -1000,8 +1000,8 @@ def test_client_secret_post_authentication(oidc_settings, app, simple_oidc_clien
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
response = response.form.submit('accept')
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(
@ -1067,9 +1067,9 @@ def test_role_control_access(login_first, oidc_settings, oidc_client, simple_use
assert OIDCAuthorization.objects.get()
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
code = OIDCCode.objects.get()
location = urlparse.urlparse(response['Location'])
location = urllib.parse.urlparse(response['Location'])
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
query = urlparse.parse_qs(location.query)
query = urllib.parse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(
@ -1083,7 +1083,7 @@ def test_role_control_access(login_first, oidc_settings, oidc_client, simple_use
)
id_token = response.json['id_token']
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
query = urlparse.parse_qs(location.fragment)
query = urllib.parse.parse_qs(location.fragment)
id_token = query['id_token'][0]
if oidc_client.idtoken_algo in (oidc_client.ALGO_RSA, oidc_client.ALGO_EC):
@ -1118,12 +1118,12 @@ def test_registration_service_slug(oidc_settings, app, simple_oidc_client, simpl
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
assert query['service'] == ['default client']
response = response.follow().click('Register')
location = urlparse.urlparse(response.request.url)
query = urlparse.parse_qs(location.query)
location = urllib.parse.urlparse(response.request.url)
query = urllib.parse.parse_qs(location.query)
assert query['service'] == ['default client']
response.form.set('email', 'john.doe@example.com')
@ -1317,8 +1317,8 @@ def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
@ -1420,8 +1420,8 @@ def test_claim_templated(oidc_settings, normal_oidc_client, simple_user, app):
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')

View File

@ -21,6 +21,7 @@ import base64
import datetime
import hashlib
import re
import urllib.parse
import xml.etree.ElementTree as ET
import lasso
@ -31,7 +32,6 @@ from django.core.files import File
from django.template import Context, Template
from django.urls import reverse
from django.utils.encoding import force_bytes, force_str, force_text
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import gettext as _
from authentic2.a2_rbac.models import OrganizationalUnit, Role
@ -272,7 +272,7 @@ class SamlSP(object):
),
)
login.buildAuthnRequestMsg()
url_parsed = urlparse.urlparse(login.msgUrl)
url_parsed = urllib.parse.urlparse(login.msgUrl)
assert url_parsed.path == reverse('a2-idp-saml-sso'), 'msgUrl should target the sso endpoint'
if self.keys:
assert 'rsa-sha256' in login.msgUrl
@ -288,7 +288,7 @@ class SamlSP(object):
if response.location:
method = lasso.HTTP_METHOD_ARTIFACT_GET
query_string = response.location.split('?', 1)[1]
parsed_query_string = urlparse.parse_qs(query_string)
parsed_query_string = urllib.parse.parse_qs(query_string)
self.relay_state = parsed_query_string.get('RelayState')
login.msgRelayState = force_str(self.relay_state)
else: # lasso.HTTP_METHOD_ARTIFACT_POST, never happens
@ -334,7 +334,7 @@ class Scenario(object):
REDIRECT_FIELD_NAME: make_url('a2-idp-saml-continue', params={NONCE_FIELD_NAME: request_id}),
},
)
self.nonce = urlparse.parse_qs(urlparse.urlparse(response['Location']).query)['nonce'][0]
self.nonce = urllib.parse.parse_qs(urllib.parse.urlparse(response['Location']).query)['nonce'][0]
url = response['Location']
response = self.app.get(url)
assert response.status_code == 200

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import builtins as __builtin__
import json
import random
@ -21,7 +22,6 @@ import pytest
from django import VERSION
from django.core import management
from django.core.exceptions import ValidationError
from django.utils.six.moves import builtins as __builtin__
from django_rbac.utils import get_role_model

View File

@ -18,6 +18,7 @@
import json
import os
import time
import urllib.parse
import ldap
import mock
@ -27,7 +28,6 @@ from django.core import mail, management
from django.core.exceptions import ImproperlyConfigured
from django.utils import timezone
from django.utils.encoding import force_bytes, force_text
from django.utils.six.moves.urllib import parse as urlparse
from ldap.dn import escape_dn_chars
from ldaptools.slapd import Slapd, has_slapd
@ -1479,7 +1479,7 @@ def test_sync_ldap_users(slapd, settings, app, db, capsys):
assert all(
[
user.userexternalid_set.first().external_id
== urlparse.quote(user.username.split('@')[0].encode('utf-8'))
== urllib.parse.quote(user.username.split('@')[0].encode('utf-8'))
for user in User.objects.all()
]
)

View File

@ -14,9 +14,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from urllib.parse import quote
import pytest
from django.contrib.auth import get_user_model
from django.utils.six.moves.urllib.parse import quote
from authentic2 import models

View File

@ -18,6 +18,7 @@
from __future__ import unicode_literals
import json
from urllib.parse import urlparse
import pytest
from django.contrib.auth import get_user_model
@ -25,7 +26,6 @@ from django.contrib.contenttypes.models import ContentType
from django.core import mail
from django.urls import reverse
from django.utils.encoding import force_bytes, force_str
from django.utils.six.moves.urllib.parse import urlparse
from webtest import Upload
from authentic2.a2_rbac.models import MANAGE_MEMBERS_OP

View File

@ -16,11 +16,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import date
from urllib.parse import urlparse
from django.contrib.auth import REDIRECT_FIELD_NAME, get_user_model
from django.urls import reverse
from django.utils.http import urlquote
from django.utils.six.moves.urllib.parse import urlparse
from authentic2 import models, utils
from authentic2.apps.journal.models import Event

View File

@ -27,7 +27,6 @@ import pytest
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from django.utils.six import text_type
from webtest import Upload
from authentic2.a2_rbac.utils import get_default_ou, get_view_user_perm
@ -230,12 +229,12 @@ def test_manager_user_change_email(app, superuser_or_admin, simple_user, mailout
response = login(
app,
superuser_or_admin,
reverse('a2-manager-user-by-uuid-detail', kwargs={'slug': text_type(simple_user.uuid)}),
reverse('a2-manager-user-by-uuid-detail', kwargs={'slug': str(simple_user.uuid)}),
)
assert 'Change user email' in response.text
# cannot click it's a submit button :/
response = app.get(
reverse('a2-manager-user-by-uuid-change-email', kwargs={'slug': text_type(simple_user.uuid)})
reverse('a2-manager-user-by-uuid-change-email', kwargs={'slug': str(simple_user.uuid)})
)
assert response.form['new_email'].value == simple_user.email
response.form.set('new_email', NEW_EMAIL)
@ -270,12 +269,12 @@ def test_manager_user_change_email_no_change(app, superuser_or_admin, simple_use
response = login(
app,
superuser_or_admin,
reverse('a2-manager-user-by-uuid-detail', kwargs={'slug': text_type(simple_user.uuid)}),
reverse('a2-manager-user-by-uuid-detail', kwargs={'slug': str(simple_user.uuid)}),
)
assert 'Change user email' in response.text
# cannot click it's a submit button :/
response = app.get(
reverse('a2-manager-user-by-uuid-change-email', kwargs={'slug': text_type(simple_user.uuid)})
reverse('a2-manager-user-by-uuid-change-email', kwargs={'slug': str(simple_user.uuid)})
)
assert response.form['new_email'].value == simple_user.email
assert len(mailoutbox) == 0

View File

@ -16,11 +16,11 @@
# authentic2
import datetime
from urllib.parse import urlparse
import pytest
from django.urls import reverse
from django.utils.html import escape
from django.utils.six.moves.urllib.parse import urlparse
from authentic2.custom_user.models import DeletedUser, User

View File

@ -18,6 +18,7 @@
import base64
import re
import socket
import urllib.parse
from contextlib import closing, contextmanager
import httmock
@ -25,9 +26,7 @@ from django.core.management import call_command as django_call_command
from django.shortcuts import resolve_url
from django.test import TestCase
from django.urls import reverse
from django.utils import six
from django.utils.encoding import force_text, iri_to_uri
from django.utils.six.moves.urllib import parse as urlparse
from lxml import etree
from authentic2 import models, utils
@ -98,13 +97,13 @@ def assert_equals_url(url1, url2, **kwargs):
value.
"""
url1 = iri_to_uri(utils.make_url(url1, params=None))
splitted1 = urlparse.urlsplit(url1)
splitted1 = urllib.parse.urlsplit(url1)
url2 = iri_to_uri(utils.make_url(url2, params=kwargs))
splitted2 = urlparse.urlsplit(url2)
splitted2 = urllib.parse.urlsplit(url2)
for i, (elt1, elt2) in enumerate(zip(splitted1, splitted2)):
if i == 3:
elt1 = urlparse.parse_qs(elt1, True)
elt2 = urlparse.parse_qs(elt2, True)
elt1 = urllib.parse.parse_qs(elt1, True)
elt2 = urllib.parse.parse_qs(elt2, True)
for k, v in elt1.items():
elt1[k] = set(v)
for k, v in elt2.items():
@ -117,11 +116,11 @@ def assert_equals_url(url1, url2, **kwargs):
def assert_redirects_complex(response, expected_url, **kwargs):
assert response.status_code == 302, 'code should be 302'
scheme, netloc, path, query, fragment = urlparse.urlsplit(response.url)
e_scheme, e_netloc, e_path, e_query, e_fragment = urlparse.urlsplit(expected_url)
scheme, netloc, path, query, fragment = urllib.parse.urlsplit(response.url)
e_scheme, e_netloc, e_path, e_query, e_fragment = urllib.parse.urlsplit(expected_url)
e_scheme = e_scheme if e_scheme else scheme
e_netloc = e_netloc if e_netloc else netloc
expected_url = urlparse.urlunsplit((e_scheme, e_netloc, e_path, e_query, e_fragment))
expected_url = urllib.parse.urlunsplit((e_scheme, e_netloc, e_path, e_query, e_fragment))
assert_equals_url(response['Location'], expected_url, **kwargs)
@ -132,7 +131,7 @@ def assert_xpath_constraints(xml, constraints, namespaces):
for xpath, content in constraints:
nodes = doc.xpath(xpath, namespaces=namespaces)
assert len(nodes) > 0, 'xpath %s not found' % xpath
if isinstance(content, six.string_types):
if isinstance(content, str):
for node in nodes:
if hasattr(node, 'text'):
assert node.text == content, 'xpath %s does not contain %s but %s' % (