misc: remove usage of "six" module (#51517)

This commit is contained in:
Frédéric Péters 2021-02-28 16:19:33 +01:00
parent 0d53aa7cfa
commit e7292f6f3f
91 changed files with 535 additions and 584 deletions

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import io
import json
import mock
import os
@ -10,8 +11,6 @@ try:
except ImportError:
lasso = None
from django.utils.six import StringIO
import pytest
from webtest import Upload
@ -502,7 +501,7 @@ def test_data_sources_export(pub):
resp = resp.click(href='export')
xml_export = resp.text
ds = StringIO(xml_export)
ds = io.StringIO(xml_export)
data_source2 = NamedDataSource.import_from_xml(ds)
assert data_source2.name == 'foobar'

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import datetime
import io
import mock
import os
import re
@ -11,8 +12,6 @@ import xml.etree.ElementTree as ET
import pytest
from webtest import Upload
from django.utils.six import StringIO, BytesIO
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.errors import ConnectionError
from wcs.categories import Category
@ -962,7 +961,7 @@ def test_form_export(pub):
resp = resp.click(href='export')
xml_export = resp.text
fd = StringIO(xml_export)
fd = io.StringIO(xml_export)
formdef2 = FormDef.import_from_xml(fd)
assert formdef2.name == 'form title'
@ -1064,7 +1063,7 @@ def test_form_import_from_url(pub):
resp.form['url'] = 'http://remote.invalid/test.wcs'
resp = resp.form.submit()
assert 'Error loading form' in resp
urlopen.side_effect = lambda *args: StringIO(formdef_xml.decode())
urlopen.side_effect = lambda *args: io.StringIO(formdef_xml.decode())
resp.form['url'] = 'http://remote.invalid/test.wcs'
resp = resp.form.submit()
assert FormDef.count() == 1
@ -2368,7 +2367,7 @@ def test_form_archive(pub):
resp = resp.click(href='archive')
resp = resp.form.submit('submit')
assert resp.content_type == 'application/x-wcs-archive'
tf = tarfile.open(fileobj=BytesIO(resp.body))
tf = tarfile.open(fileobj=io.BytesIO(resp.body))
assert 'formdef' in [x.name for x in tf.getmembers()]
assert len(tf.getmembers()) == 8 # 7 formdata + 1 formdef
@ -2377,7 +2376,7 @@ def test_form_archive(pub):
resp = resp.click(href='archive')
resp = resp.form.submit('submit')
assert resp.content_type == 'application/x-wcs-archive'
tf = tarfile.open(fileobj=BytesIO(resp.body))
tf = tarfile.open(fileobj=io.BytesIO(resp.body))
assert 'formdef' in [x.name for x in tf.getmembers()]
assert len(tf.getmembers()) == 1 # 0 formdata + 1 formdef

View File

@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
import io
import logging
import os
import urllib.parse
import zipfile
try:
@ -13,9 +15,6 @@ import pytest
from webtest import Upload
import mock
from django.utils.six import BytesIO
from django.utils.six.moves.urllib import parse as urlparse
from quixote.http_request import Upload as QuixoteUpload
from wcs.qommon.form import UploadedFile
@ -113,11 +112,11 @@ def test_settings_export_import(pub):
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit')
assert resp.location.startswith('http://example.net/backoffice/processing?job=')
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
resp = resp.follow()
assert 'completed' in resp.text
resp = resp.click('Download Export')
zip_content = BytesIO(resp.body)
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
assert len(filelist) == 0
@ -147,7 +146,7 @@ def test_settings_export_import(pub):
export_to.label = 'test'
upload = QuixoteUpload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00'''
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(file_content)
upload.fp.seek(0)
export_to.model_file = UploadedFile('models', 'export_to_model-1.upload', upload)
@ -164,10 +163,10 @@ def test_settings_export_import(pub):
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit')
assert resp.location.startswith('http://example.net/backoffice/processing?job=')
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
resp = resp.follow()
resp = resp.click('Download Export')
zip_content = BytesIO(resp.body)
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
assert 'formdefs/1' not in filelist
@ -243,10 +242,10 @@ def test_settings_export_import(pub):
resp.form['wscalls'] = False
resp = resp.form.submit('submit')
assert resp.location.startswith('http://example.net/backoffice/processing?job=')
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
resp = resp.follow()
resp = resp.click('Download Export')
zip_content = BytesIO(resp.body)
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
assert 'formdefs_xml/%s' % formdef.id in filelist
@ -277,10 +276,10 @@ def test_settings_export_import(pub):
resp = app.get('/backoffice/settings/export')
resp = resp.form.submit('submit')
assert resp.location.startswith('http://example.net/backoffice/processing?job=')
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
resp = resp.follow()
resp = resp.click('Download Export')
zip_content = BytesIO(resp.body)
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
assert len([x for x in filelist if 'roles/' in x]) == 0
@ -810,7 +809,7 @@ def test_settings_theme_download_upload(pub):
resp = app.get('/backoffice/settings/themes')
resp = resp.click('download', index=0)
assert resp.headers['content-type'] == 'application/zip'
zip_content = BytesIO(resp.body)
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
assert 'alto/icon.png' in filelist

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
from io import StringIO
import io
import mock
import os
import re
@ -358,7 +358,7 @@ def test_workflows_import_from_url(pub):
resp.form['url'] = 'http://remote.invalid/test.wcs'
resp = resp.form.submit()
assert 'Error loading form' in resp
urlopen.side_effect = lambda *args: StringIO(wf_export.decode())
urlopen.side_effect = lambda *args: io.StringIO(wf_export.decode())
resp.form['url'] = 'http://remote.invalid/test.wcs'
resp = resp.form.submit()
assert Workflow.count() == 2

View File

@ -1,9 +1,8 @@
# -*- coding: utf-8 -*-
import io
import xml.etree.ElementTree as ET
from django.utils.six import StringIO
import pytest
from webtest import Upload
@ -159,7 +158,7 @@ def test_wscalls_export(pub, wscall):
resp = resp.click(href='export')
xml_export = resp.text
ds = StringIO(xml_export)
ds = io.StringIO(xml_export)
wscall2 = NamedWsCall.import_from_xml(ds)
assert wscall2.name == 'xxx'

View File

@ -6,10 +6,10 @@ import hashlib
import hmac
import os
import shutil
import urllib.parse
import pytest
from django.utils.encoding import force_bytes
from django.utils.six.moves.urllib import parse as urllib
from quixote import get_publisher
from utilities import clean_temporary_pub, create_temporary_pub, get_app, login
@ -145,7 +145,7 @@ def test_get_user_from_api_query_string_error_invalid_signature(pub):
def test_get_user_from_api_query_string_error_missing_timestamp(pub):
signature = urllib.quote(
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', b'format=json&orig=coucou&algo=sha1', hashlib.sha1).digest())
)
output = get_app(pub).get(
@ -157,7 +157,9 @@ def test_get_user_from_api_query_string_error_missing_timestamp(pub):
def test_get_user_from_api_query_string_error_missing_email(pub):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha1&timestamp=' + timestamp
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
assert output.json['err_desc'] == 'no user specified'
@ -165,7 +167,9 @@ def test_get_user_from_api_query_string_error_missing_email(pub):
def test_get_user_from_api_query_string_error_unknown_nameid(pub):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha1&NameID=xxx&timestamp=' + timestamp
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
assert output.json['err_desc'] == 'unknown NameID'
@ -175,7 +179,9 @@ def test_get_user_from_api_query_string_error_missing_email_valid_endpoint(pub):
# works fine without user.
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&orig=coucou&algo=sha1&timestamp=' + timestamp
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
assert output.json == {'data': []}
output = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
@ -186,7 +192,9 @@ def test_get_user_from_api_query_string_error_unknown_nameid_valid_endpoint(pub)
# check the categories and forms endpoints accept an unknown NameID
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = 'format=json&NameID=xxx&orig=coucou&algo=sha1&timestamp=' + timestamp
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
assert output.json == {'data': []}
output = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
@ -197,11 +205,13 @@ def test_get_user_from_api_query_string_error_success_sha1(pub, local_user):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = (
'format=json&orig=coucou&algo=sha1&email='
+ urllib.quote(local_user.email)
+ urllib.parse.quote(local_user.email)
+ '&timestamp='
+ timestamp
)
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
assert output.json['user_display_name'] == u'Jean Darmette'
@ -210,11 +220,13 @@ def test_get_user_from_api_query_string_error_invalid_signature_algo_mismatch(pu
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = (
'format=json&orig=coucou&algo=sha256&email='
+ urllib.quote(local_user.email)
+ urllib.parse.quote(local_user.email)
+ '&timestamp='
+ timestamp
)
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
)
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
assert output.json['err_desc'] == 'invalid signature'
@ -223,18 +235,21 @@ def test_get_user_from_api_query_string_error_success_sha256(pub, local_user):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
query = (
'format=json&orig=coucou&algo=sha256&email='
+ urllib.quote(local_user.email)
+ urllib.parse.quote(local_user.email)
+ '&timestamp='
+ timestamp
)
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest()))
signature = urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest())
)
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
assert output.json['user_display_name'] == u'Jean Darmette'
def test_sign_url(pub, local_user):
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'http://example.net/api/user/?format=json&orig=coucou&email=%s'
% urllib.parse.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
@ -245,7 +260,8 @@ def test_sign_url(pub, local_user):
get_app(pub).get('%s&foo=bar' % url, status=403)
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'http://example.net/api/user/?format=json&orig=coucou&email=%s'
% urllib.parse.quote(local_user.email),
'12345',
)
url = signed_url[len('http://example.net') :]
@ -259,7 +275,8 @@ def test_get_user(pub, local_user):
local_user.roles = [role.id]
local_user.store()
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'http://example.net/api/user/?format=json&orig=coucou&email=%s'
% urllib.parse.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
@ -284,7 +301,7 @@ def test_api_access_from_xml_storable_object(pub, local_user, admin_user):
local_user.store()
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=UNKNOWN_ACCESS&email=%s'
% (urllib.quote(local_user.email)),
% (urllib.parse.quote(local_user.email)),
'5678',
)
url = signed_url[len('http://example.net') :]
@ -292,7 +309,8 @@ def test_api_access_from_xml_storable_object(pub, local_user, admin_user):
assert output.json['err_desc'] == 'invalid orig'
signed_url = sign_url(
'http://example.net/api/user/?format=json&orig=salut&email=%s' % (urllib.quote(local_user.email)),
'http://example.net/api/user/?format=json&orig=salut&email=%s'
% (urllib.parse.quote(local_user.email)),
'5678',
)
url = signed_url[len('http://example.net') :]
@ -313,7 +331,7 @@ def test_is_url_signed_check_nonce(pub, local_user, freezer):
pub.clean_nonces(now=0)
nonce_dir = os.path.join(pub.app_dir, 'nonces')
assert not os.path.exists(nonce_dir) or not os.listdir(nonce_dir)
signed_url = sign_url('?format=json&orig=%s&email=%s' % (ORIG, urllib.quote(local_user.email)), KEY)
signed_url = sign_url('?format=json&orig=%s&email=%s' % (ORIG, urllib.parse.quote(local_user.email)), KEY)
req = HTTPRequest(
None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net', 'QUERY_STRING': signed_url[1:]}
)
@ -343,7 +361,8 @@ def test_is_url_signed_check_nonce(pub, local_user, freezer):
def test_get_user_compat_endpoint(pub, local_user):
signed_url = sign_url(
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234'
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
output = get_app(pub).get(url)

View File

@ -1,11 +1,11 @@
# -*- coding: utf-8 -*-
import io
import json
import os
import mock
import pytest
from django.utils.six import StringIO
from quixote import get_publisher
from utilities import clean_temporary_pub, create_temporary_pub, get_app
@ -130,7 +130,7 @@ def test_validate_condition(pub):
def test_reverse_geocoding(pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(json.dumps({'address': 'xxx'}))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'address': 'xxx'}))
get_app(pub).get('/api/reverse-geocoding', status=400)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert resp.content_type == 'application/json'
@ -170,7 +170,7 @@ def test_reverse_geocoding(pub):
def test_geocoding(pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(json.dumps([{'lat': 0, 'lon': 0}]))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps([{'lat': 0, 'lon': 0}]))
get_app(pub).get('/api/geocoding', status=400)
resp = get_app(pub).get('/api/geocoding?q=test')
assert resp.content_type == 'application/json'

View File

@ -1,15 +1,15 @@
# -*- coding: utf-8 -*-
import base64
import io
import json
import os
import time
import urllib.parse
import mock
import pytest
from django.utils.encoding import force_text
from django.utils.six import StringIO
from django.utils.six.moves.urllib import parse as urllib
from quixote import get_publisher
from utilities import clean_temporary_pub, create_temporary_pub, get_app
@ -225,7 +225,7 @@ def test_card_submit(pub, local_user):
def url():
signed_url = sign_url(
'http://example.net/api/cards/test/submit'
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
'1234',
)
return signed_url[len('http://example.net') :]
@ -301,7 +301,7 @@ def test_carddef_submit_with_varname(pub, local_user):
signed_url = sign_url(
'http://example.net/api/cards/test/submit'
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
@ -429,7 +429,8 @@ def test_carddef_submit_from_wscall(pub, local_user):
def url():
signed_url = sign_url(
'http://example.net/api/cards/test/submit?orig=coucou&email=%s' % urllib.quote(local_user.email),
'http://example.net/api/cards/test/submit?orig=coucou&email=%s'
% urllib.parse.quote(local_user.email),
'1234',
)
return signed_url[len('http://example.net') :]
@ -516,13 +517,13 @@ def test_formdef_submit_structured(pub, local_user):
signed_url = sign_url(
'http://example.net/api/cards/test/submit'
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
urlopen.side_effect = lambda *args: io.StringIO(
'''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \

View File

@ -1,11 +1,11 @@
# -*- coding: utf-8 -*-
import io
import os
import xml.etree.ElementTree as ET
import zipfile
import pytest
from django.utils.six import BytesIO
from quixote import get_publisher
from utilities import clean_temporary_pub, create_temporary_pub, get_app
@ -245,7 +245,7 @@ def test_api_ods_formdata_custom_view(pub, local_user):
# check it now gets the data
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
zipf = zipfile.ZipFile(BytesIO(resp.body))
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
ods_sheet = ET.parse(zipf.open('content.xml'))
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 11
@ -259,7 +259,7 @@ def test_api_ods_formdata_custom_view(pub, local_user):
custom_view.store()
resp = get_app(pub).get(sign_uri('/api/forms/test/ods/custom-view', user=local_user))
zipf = zipfile.ZipFile(BytesIO(resp.body))
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
ods_sheet = ET.parse(zipf.open('content.xml'))
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 21

View File

@ -2,6 +2,7 @@
import base64
import datetime
import io
import os
import re
import time
@ -10,7 +11,6 @@ import zipfile
import pytest
from django.utils.encoding import force_bytes
from django.utils.six import BytesIO
from quixote import get_publisher
from utilities import clean_temporary_pub, create_temporary_pub, get_app, login
@ -900,7 +900,7 @@ def test_api_ods_formdata(pub, local_user):
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
assert resp.content_type == 'application/vnd.oasis.opendocument.spreadsheet'
zipf = zipfile.ZipFile(BytesIO(resp.body))
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
ods_sheet = ET.parse(zipf.open('content.xml'))
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 311

View File

@ -2,15 +2,15 @@
import base64
import datetime
import io
import json
import os
import time
import urllib.parse
import mock
import pytest
from django.utils.encoding import force_text
from django.utils.six import StringIO
from django.utils.six.moves.urllib import parse as urllib
from quixote import get_publisher
from utilities import clean_temporary_pub, create_temporary_pub, get_app
@ -352,7 +352,7 @@ def test_formdef_schema(pub):
formdef.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
urlopen.side_effect = lambda *args: io.StringIO(
'''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
@ -465,7 +465,7 @@ def test_formdef_submit(pub, local_user):
def url():
signed_url = sign_url(
'http://example.net/api/formdefs/test/submit'
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
'1234',
)
return signed_url[len('http://example.net') :]
@ -554,7 +554,7 @@ def test_formdef_submit_only_one(pub, local_user):
def url():
signed_url = sign_url(
'http://example.net/api/formdefs/test/submit'
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
'1234',
)
return signed_url[len('http://example.net') :]
@ -609,7 +609,7 @@ def test_formdef_submit_with_varname(pub, local_user):
signed_url = sign_url(
'http://example.net/api/formdefs/test/submit'
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
'1234',
)
url = signed_url[len('http://example.net') :]
@ -793,13 +793,13 @@ def test_formdef_submit_structured(pub, local_user):
def url():
signed_url = sign_url(
'http://example.net/api/formdefs/test/submit'
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
'?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
'1234',
)
return signed_url[len('http://example.net') :]
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
urlopen.side_effect = lambda *args: io.StringIO(
'''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \

View File

@ -4,23 +4,22 @@ import base64
import datetime
import hashlib
import hmac
import urllib.parse
from django.utils.encoding import force_bytes
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six.moves.urllib import parse as urlparse
def sign_uri(uri, user=None, format='json'):
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(uri)
if query:
query += '&'
if format:
query += 'format=%s&' % format
query += 'orig=coucou&algo=sha256&timestamp=' + timestamp
if user:
query += '&email=' + urllib.quote(user.email)
query += '&signature=%s' % urllib.quote(
query += '&email=' + urllib.parse.quote(user.email)
query += '&signature=%s' % urllib.parse.quote(
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest())
)
return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
return urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))

View File

@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
import datetime
import io
import json
import os
import re
@ -10,8 +12,6 @@ import zipfile
import mock
import pytest
from django.utils.six import StringIO, BytesIO
from quixote import get_publisher
from quixote.http_request import Upload as QuixoteUpload
from wcs.blocks import BlockDef
@ -2082,7 +2082,7 @@ def test_backoffice_download_as_zip(pub):
formdef.store()
resp = app.get('/backoffice/management/form-title/%s/' % number31.id)
resp = resp.click('Download all files as .zip')
zip_content = BytesIO(resp.body)
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
assert set(filelist) == {'1_bar', '2_bar'}
@ -2725,7 +2725,7 @@ def test_backoffice_wfedit_and_data_source_with_user_info(pub):
def side_effect(url, *args):
assert '?name_id=admin' in url
return StringIO(json.dumps(data))
return io.StringIO(json.dumps(data))
urlopen.side_effect = side_effect
@ -2779,7 +2779,7 @@ def test_backoffice_wfedit_and_workflow_data(pub):
def side_effect(url, *args):
assert '?test=foobar' in url
return StringIO(json.dumps(data))
return io.StringIO(json.dumps(data))
urlopen.side_effect = side_effect
@ -2835,7 +2835,7 @@ def test_backoffice_wfedit_and_data_source_with_field_info(pub):
def side_effect(url, *args):
assert '?xxx=FOO BAR 30' in url
return StringIO(json.dumps(data))
return io.StringIO(json.dumps(data))
urlopen.side_effect = side_effect
@ -4212,9 +4212,9 @@ def test_backoffice_workflow_form_with_live_data_source(pub):
def side_effect(url, *args):
if 'toto' not in url:
return StringIO(json.dumps(data1))
return io.StringIO(json.dumps(data1))
else:
return StringIO(json.dumps(data2))
return io.StringIO(json.dumps(data2))
urlopen.side_effect = side_effect
@ -5560,7 +5560,7 @@ def test_workflow_inspect_page(pub):
export_to.convert_to_pdf = False
export_to.label = 'create doc'
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(b'HELLO WORLD')
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)

View File

@ -1,15 +1,15 @@
# -*- coding: utf-8 -*-
import datetime
import io
import os
import time
import urllib.parse
import xml.etree.ElementTree as ET
import zipfile
import pytest
from django.utils.six import BytesIO
from django.utils.six.moves.urllib import parse as urlparse
from wcs.qommon import ods
from wcs.blocks import BlockDef
from wcs.qommon.form import PicklableUpload
@ -200,7 +200,7 @@ def test_backoffice_export_long_listings(pub, threshold):
resp = app.get('/backoffice/management/form-title/')
resp = resp.click('Export a Spreadsheet')
assert resp.location.startswith('http://example.net/backoffice/processing?job=')
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
resp = resp.follow()
assert 'completed' in resp.text
resp = resp.click('Download Export')
@ -451,7 +451,7 @@ def test_backoffice_ods(pub):
assert 'filename=form-title.ods' in resp.headers['content-disposition']
assert resp.body[:2] == b'PK' # ods has a zip container
zipf = zipfile.ZipFile(BytesIO(resp.body))
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
ods_sheet = ET.parse(zipf.open('content.xml'))
# check the ods contains a link to the document
elem = ods_sheet.findall('.//{%s}a' % ods.NS['text'])[0]

View File

@ -3,9 +3,9 @@ import datetime
import os
import re
import time
import urllib.parse
import pytest
from django.utils.six.moves.urllib import parse as urllib
from utilities import clean_temporary_pub, create_temporary_pub, get_app, login
from wcs import fields
@ -290,7 +290,7 @@ def test_backoffice_submission_initiated_from_welco(pub, welco_url):
def post_formdata():
signed_url = sign_url(
'http://example.net/api/formdefs/form-title/submit'
'?format=json&orig=coucou&email=%s' % urllib.quote(user.email),
'?format=json&orig=coucou&email=%s' % urllib.parse.quote(user.email),
'1234',
)
url = signed_url[len('http://example.net') :]

View File

@ -1,5 +1,5 @@
import configparser
import os
from django.utils.six.moves import configparser as ConfigParser
import pytest
@ -7,7 +7,7 @@ from utilities import EmailsMocking, SMSMocking, HttpRequestsMocking
def site_options(request, pub, section, variable, value):
config = ConfigParser.ConfigParser()
config = configparser.ConfigParser()
path = os.path.join(pub.app_dir, 'site-options.cfg')
if os.path.exists(path):
config.read([path])
@ -18,7 +18,7 @@ def site_options(request, pub, section, variable, value):
config.write(site_option)
def fin():
config = ConfigParser.ConfigParser()
config = configparser.ConfigParser()
if os.path.exists(path):
config.read([path])
config.remove_option(section, variable)

View File

@ -1,12 +1,15 @@
# -*- coding: utf-8 -*-
import io
import json
import pytest
import hashlib
import os
import re
import time
import urllib.parse
import zipfile
from webtest import Upload, Hidden, Radio
import mock
import xml.etree.ElementTree as ET
@ -16,9 +19,6 @@ try:
except ImportError:
Image = None
from django.utils.six import StringIO, BytesIO
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.encoding import force_bytes, force_text
from wcs.qommon import force_str
@ -2627,7 +2627,7 @@ def test_form_edit_autocomplete_list(pub):
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = app.get('/test/')
assert 'data-select2-url=' in resp.text
# simulate select2 mode, with qommon.forms.js adding an extra hidden widget
@ -2771,13 +2771,13 @@ def test_form_item_data_source_field_submit(pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
# numeric identifiers
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': 1, 'text': 'un'}, {'id': 2, 'text': 'deux'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
@ -2954,7 +2954,7 @@ def test_form_page_session_var_prefill(pub):
# check it's not set if it's not whitelisted
resp = get_app(pub).get('/?session_var_foo=hello')
assert urlparse.urlparse(resp.location).path == '/'
assert urllib.parse.urlparse(resp.location).path == '/'
resp = resp.follow()
resp = resp.click('test')
assert resp.forms[0]['f0'].value == ''
@ -2967,14 +2967,14 @@ query_string_allowed_vars = foo,bar
)
resp = get_app(pub).get('/?session_var_foo=hello')
assert urlparse.urlparse(resp.location).path == '/'
assert urllib.parse.urlparse(resp.location).path == '/'
resp = resp.follow()
resp = resp.click('test')
assert resp.forms[0]['f0'].value == 'hello'
# check it survives a login
resp = get_app(pub).get('/?session_var_foo=hello2')
assert urlparse.urlparse(resp.location).path == '/'
assert urllib.parse.urlparse(resp.location).path == '/'
resp = resp.follow()
resp = resp.click('Login')
resp = resp.follow()
@ -2987,15 +2987,15 @@ query_string_allowed_vars = foo,bar
# check repeated options are ignored
resp = get_app(pub).get('/?session_var_foo=hello&session_var_foo=hello2')
assert urlparse.urlparse(resp.location).path == '/'
assert urllib.parse.urlparse(resp.location).path == '/'
resp = resp.follow()
resp = resp.click('test')
assert resp.forms[0]['f0'].value == ''
# check extra query string parameters are not lost
resp = get_app(pub).get('/?session_var_foo=hello&foo=bar')
assert urlparse.urlparse(resp.location).path == '/'
assert urlparse.urlparse(resp.location).query == 'foo=bar'
assert urllib.parse.urlparse(resp.location).path == '/'
assert urllib.parse.urlparse(resp.location).query == 'foo=bar'
os.unlink(os.path.join(pub.app_dir, 'site-options.cfg'))
@ -3156,7 +3156,7 @@ def test_form_page_formula_prefill_items_field(pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
assert not resp.form['f0$element1'].checked
assert resp.form['f0$element2'].checked
@ -3358,7 +3358,7 @@ def test_form_file_field_with_fargo(pub, fargo_url):
fargo_resp = app.get('/fargo/pick') # display file picker
assert fargo_resp.location == 'http://fargo.example.net/pick/?pick=http%3A//example.net/fargo/pick'
with mock.patch('wcs.portfolio.urlopen') as urlopen:
urlopen.side_effect = lambda *args: BytesIO(b'...')
urlopen.side_effect = lambda *args: io.BytesIO(b'...')
fargo_resp = app.get('/fargo/pick?url=http://www.example.org/...')
assert 'window.top.document.fargo_set_token' in fargo_resp.text
resp.form['f0$file'] = None
@ -5511,7 +5511,7 @@ def test_item_field_with_disabled_items(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
resp.form['f0'] = '1'
resp.form['f0'] = '2'
@ -5524,7 +5524,7 @@ def test_item_field_with_disabled_items(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
pq = resp.pyquery.remove_namespaces()
assert pq('option[disabled=disabled][value="1"]').text() == 'hello'
@ -5550,7 +5550,7 @@ def test_item_field_with_disabled_items(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
pq = resp.pyquery.remove_namespaces()
assert len(pq('option[disabled=disabled][value="1"]')) == 0
@ -5575,7 +5575,7 @@ def test_item_field_with_disabled_items(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
resp.form['f0'] = '1'
resp.form['f0'] = '2'
@ -5588,7 +5588,7 @@ def test_item_field_with_disabled_items(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
pq = resp.pyquery.remove_namespaces()
assert len(pq('input[name="f0"][disabled=disabled][value="1"]')) == 1
@ -5619,7 +5619,7 @@ def test_items_field_with_disabled_items(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
resp.form['f0$element1'].checked = True
resp.form['f0$element2'].checked = True
@ -5632,7 +5632,7 @@ def test_items_field_with_disabled_items(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
assert 'disabled' in resp.form['f0$element1'].attrs
resp.form['f0$element1'].checked = True
@ -5650,7 +5650,7 @@ def test_items_field_with_disabled_items(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
assert not 'f0$element1' in resp.form.fields
resp.form['f0$element2'].checked = True
@ -5688,7 +5688,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
assert 'data-autocomplete="true"' in resp.text
assert resp.form['f0'].value == '1'
@ -5709,7 +5709,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
assert 'data-autocomplete="true"' in resp.text
assert 'data-hint="help text"' in resp.text
@ -5733,7 +5733,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = app.get('/test/')
assert urlopen.call_count == 0
pq = resp.pyquery.remove_namespaces()
@ -5741,7 +5741,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp2 = app.get(select2_url + '?q=hell')
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?q=hell'
@ -5757,7 +5757,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = resp.form.submit('submit') # -> validation page
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?id=1'
@ -5766,7 +5766,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = resp.form.submit('submit') # -> submit
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?id=1'
@ -5784,7 +5784,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
data = {
'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}, {'id': 2, 'text': 'world', 'extra': 'bar'}]
}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = app.get('/test/')
assert urlopen.call_count == 0
pq = resp.pyquery.remove_namespaces()
@ -5792,7 +5792,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp2 = app.get(select2_url + '?q=hell')
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?q=hell'
@ -5808,7 +5808,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = resp.form.submit('submit') # -> validation page
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?id=1'
@ -5817,7 +5817,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = resp.form.submit('submit') # -> submit
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?id=1'
@ -5844,7 +5844,7 @@ remote.example.net = 1234
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = app.get('/test/')
assert urlopen.call_count == 0
pq = resp.pyquery.remove_namespaces()
@ -5852,7 +5852,7 @@ remote.example.net = 1234
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp2 = app.get(select2_url + '?q=hell')
assert urlopen.call_count == 1
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?q=hell&orig=example.net&')
@ -5865,7 +5865,7 @@ remote.example.net = 1234
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = resp.form.submit('submit') # -> validation page
assert urlopen.call_count == 1
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?id=1&orig=example.net&')
@ -5874,7 +5874,7 @@ remote.example.net = 1234
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = resp.form.submit('submit') # -> submit
assert urlopen.call_count == 1
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?id=1&orig=example.net&')
@ -5895,7 +5895,7 @@ remote.example.net = 1234
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = app.get('/test/')
pq = resp.pyquery.remove_namespaces()
select2_url = pq('select').attr['data-select2-url']
@ -5919,7 +5919,7 @@ remote.example.net = 1234
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = app.get('/test/')
assert urlopen.call_count == 0
pq = resp.pyquery.remove_namespaces()
@ -5927,7 +5927,7 @@ remote.example.net = 1234
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp2 = app.get(select2_url + '?q=hell', status=403)
assert urlopen.call_count == 0
@ -8332,7 +8332,7 @@ def test_create_formdata_empty_item_ds_with_id_parameter(pub, create_formdata):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': create_formdata['data']}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
app = get_app(create_formdata['pub'])
resp = app.get('/source-form/')
@ -8710,7 +8710,7 @@ def test_form_item_timetable_data_source(pub, http_requests):
{"id": "3", "datetime": "2021-01-14 10:40:00", "text": "event 3"},
]
}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = app.get('/test/')
assert 'data-date="2021-01-12"' in resp and 'data-time="10:00"' in resp
@ -8764,7 +8764,7 @@ def test_form_item_timetable_data_source_with_date_alignment(pub, http_requests)
{"id": "3", "datetime": "2021-01-14 10:40:00", "text": "event 3"},
]
}
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = app.get('/test/')
resp.form['f2'] = '2021-01-14'

View File

@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
import io
import json
import mock
from django.utils.six import StringIO
import pytest
from webtest import Upload
@ -1255,7 +1255,7 @@ def test_block_with_dynamic_item_field(mock_urlopen, pub, blocks_feature):
payload = [{'id': '1', 'text': 'foo'}]
elif query == 'bar':
payload = [{'id': '2', 'text': 'bar'}]
return StringIO(json.dumps({'data': payload}))
return io.StringIO(json.dumps({'data': payload}))
mock_urlopen.side_effect = lambda url: data_source(url)

View File

@ -1,16 +1,17 @@
# -*- coding: utf-8 -*-
import base64
import io
import json
import locale
import os
import urllib.parse
import xml.etree.ElementTree as ET
import zipfile
import mock
import pytest
from django.utils.encoding import force_bytes
from django.utils.six import BytesIO, StringIO
from django.utils.six.moves.urllib import parse as urlparse
from quixote.http_request import Upload as QuixoteUpload
from webtest import Upload, Hidden
@ -332,7 +333,7 @@ def test_formdata_generated_document_download(pub):
export_to.convert_to_pdf = False
export_to.label = 'create doc'
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(b'HELLO WORLD')
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -382,7 +383,7 @@ def test_formdata_generated_document_download(pub):
# change export model to now be a RTF file, do the action again on the same form and
# check that both the old .odt file and the new .rtf file are there and valid.
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(b'HELLO NEW WORLD')
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -398,7 +399,7 @@ def test_formdata_generated_document_download(pub):
# use substitution variables on rtf: only ezt format is accepted
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(b'HELLO {{DJANGO}} WORLD [form_name]')
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -427,7 +428,7 @@ def test_formdata_generated_document_odt_download(pub, odt_template):
template_filename = os.path.join(os.path.dirname(__file__), '..', odt_template)
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/' + odt_template, content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -460,7 +461,7 @@ def test_formdata_generated_document_odt_download(pub, odt_template):
resp = resp.follow() # $form/$id/create_doc
resp = resp.follow() # $form/$id/create_doc/
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
assert_equal_zip(BytesIO(resp.body), f)
assert_equal_zip(io.BytesIO(resp.body), f)
resp = login(get_app(pub), username='foo', password='foo').get(form_location)
resp = resp.form.submit('button_export_to')
@ -487,11 +488,11 @@ def test_formdata_generated_document_odt_download(pub, odt_template):
resp = resp.follow()
assert resp.content_type == 'application/octet-stream'
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
assert_equal_zip(BytesIO(resp.body), f)
assert_equal_zip(io.BytesIO(resp.body), f)
# change file content, same name
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(b'HELLO NEW WORLD')
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -504,7 +505,7 @@ def test_formdata_generated_document_odt_download(pub, odt_template):
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
body = resp.click(odt_template, index=0).follow().body
assert_equal_zip(BytesIO(body), f)
assert_equal_zip(io.BytesIO(body), f)
assert resp.click('test.rtf', index=0).follow().body == b'HELLO NEW WORLD'
@ -519,7 +520,7 @@ def test_formdata_generated_document_odt_download_with_substitution_variable(pub
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -552,7 +553,7 @@ def test_formdata_generated_document_odt_download_with_substitution_variable(pub
resp = resp.follow() # $form/$id/create_doc
resp = resp.follow() # $form/$id/create_doc/
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
assert_equal_zip(BytesIO(resp.body), f)
assert_equal_zip(io.BytesIO(resp.body), f)
export_to.attach_to_history = True
wf.store()
@ -567,11 +568,11 @@ def test_formdata_generated_document_odt_download_with_substitution_variable(pub
response1 = resp = resp.follow()
assert resp.content_type == 'application/octet-stream'
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
assert_equal_zip(BytesIO(resp.body), f)
assert_equal_zip(io.BytesIO(resp.body), f)
# change file content, same name
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(b'HELLO NEW WORLD')
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -584,7 +585,7 @@ def test_formdata_generated_document_odt_download_with_substitution_variable(pub
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
body = resp.click('template.odt', index=0).follow().body
assert_equal_zip(BytesIO(body), f)
assert_equal_zip(io.BytesIO(body), f)
response2 = resp.click('test.rtf', index=0).follow()
assert response2.body == b'HELLO NEW WORLD'
# Test attachment substitution variables
@ -640,7 +641,7 @@ def test_formdata_generated_document_odt_to_pdf_download(pub):
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -711,7 +712,7 @@ def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -804,7 +805,7 @@ def test_formdata_generated_document_non_interactive(pub):
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -844,7 +845,7 @@ def test_formdata_generated_document_non_interactive(pub):
resp = resp.follow()
assert resp.content_type == 'application/octet-stream'
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
assert_equal_zip(BytesIO(resp.body), f)
assert_equal_zip(io.BytesIO(resp.body), f)
assert formdef.data_class().count() == 1
assert formdef.data_class().select()[0].status == 'wf-st2'
@ -866,7 +867,7 @@ def test_formdata_generated_document_to_backoffice_field(pub):
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -911,7 +912,7 @@ def test_formdata_generated_document_to_backoffice_field(pub):
resp = resp.follow()
assert resp.content_type == 'application/octet-stream'
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
assert_equal_zip(BytesIO(resp.body), f)
assert_equal_zip(io.BytesIO(resp.body), f)
assert formdef.data_class().count() == 1
assert formdef.data_class().select()[0].status == 'wf-st2'
@ -933,7 +934,7 @@ def test_formdata_generated_document_in_private_history(pub):
export_to = ExportToModel()
export_to.label = 'create doc'
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(b'HELLO WORLD')
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
@ -1087,7 +1088,7 @@ def test_formdata_form_file_download(pub):
formdata = formdef.data_class().select()[0]
assert 'xxx_var_yyy_raw' in formdata.workflow_data
download = resp.test_app.get(urlparse.urljoin(resp.location, 'files/form-xxx-yyy/test.txt'))
download = resp.test_app.get(urllib.parse.urljoin(resp.location, 'files/form-xxx-yyy/test.txt'))
assert download.content_type == 'text/plain'
assert download.body == b'foobar'
@ -1292,7 +1293,7 @@ def test_formdata_workflow_form_prefill_autocomplete(pub):
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
def side_effect(url, *args):
return StringIO(json.dumps(data))
return io.StringIO(json.dumps(data))
urlopen.side_effect = side_effect

View File

@ -1,8 +1,7 @@
import io
import pytest
import xml.etree.ElementTree as ET
from django.utils.six import BytesIO
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.misc import indent_xml as indent
from wcs.qommon.template import Template
@ -141,7 +140,7 @@ def test_xml_export_import(pub):
carddef.data_class().wipe()
pub.custom_view_class.wipe()
carddef2 = CardDef.import_from_xml(BytesIO(ET.tostring(carddef_xml)))
carddef2 = CardDef.import_from_xml(io.BytesIO(ET.tostring(carddef_xml)))
assert carddef2.name == 'foo'
assert carddef2.fields[1].data_source == {'type': 'carddef:foo'}
assert carddef2._custom_views

View File

@ -1,10 +1,10 @@
import io
import os
import pickle
import shutil
import pytest
from django.utils.six import BytesIO
from quixote import cleanup
from wcs.categories import Category, CardDefCategory
@ -108,7 +108,7 @@ def test_xml_import(category_class):
test.store()
test = category_class.get(1)
fd = BytesIO(test.export_to_xml_string(include_id=True))
fd = io.BytesIO(test.export_to_xml_string(include_id=True))
test2 = category_class.import_from_xml(fd, include_id=True)
assert test.id == test2.id
assert test.name == test2.name

View File

@ -1,13 +1,12 @@
# -*- coding: utf-8 -*-
import codecs
import io
import pytest
import os
import json
import shutil
from django.utils.six import StringIO
from django.utils.six.moves.urllib import parse as urlparse
import urllib.parse
from quixote import cleanup
from wcs.qommon.http_request import HTTPRequest
@ -889,7 +888,7 @@ def test_data_source_unicode():
data_source2 = NamedDataSource.select()[0]
assert data_source2.data_source == data_source.data_source
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
urlopen.side_effect = lambda *args: io.StringIO(
'{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}'
)
assert data_sources.get_items({'type': 'foobar'}) == [
@ -906,12 +905,12 @@ def test_data_source_signed(no_request_pub):
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO('{"data": [{"id": 0, "text": "zero"}]}')
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
signed_url = urlopen.call_args[0][0]
assert signed_url.startswith('https://api.example.com/json?')
parsed = urlparse.urlparse(signed_url)
querystring = urlparse.parse_qs(parsed.query)
parsed = urllib.parse.urlparse(signed_url)
querystring = urllib.parse.parse_qs(parsed.query)
# stupid simple (but sufficient) signature test:
assert querystring['algo'] == ['sha256']
assert querystring['orig'] == ['example.net']
@ -922,12 +921,12 @@ def test_data_source_signed(no_request_pub):
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"}
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO('{"data": [{"id": 0, "text": "zero"}]}')
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
signed_url = urlopen.call_args[0][0]
assert signed_url.startswith('https://api.example.com/json?')
parsed = urlparse.urlparse(signed_url)
querystring = urlparse.parse_qs(parsed.query)
parsed = urllib.parse.urlparse(signed_url)
querystring = urllib.parse.parse_qs(parsed.query)
assert querystring['algo'] == ['sha256']
assert querystring['orig'] == ['example.net']
assert querystring['nonce'][0]
@ -938,7 +937,7 @@ def test_data_source_signed(no_request_pub):
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"}
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO('{"data": [{"id": 0, "text": "zero"}]}')
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
unsigned_url = urlopen.call_args[0][0]
assert unsigned_url == 'https://no-secret.example.com/json'
@ -951,7 +950,7 @@ def test_named_datasource_json_cache(requests_pub):
datasource.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
urlopen.side_effect = lambda *args: io.StringIO(
json.dumps({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]})
)
@ -997,7 +996,7 @@ def test_named_datasource_id_parameter(requests_pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://whatever/?id=1'
@ -1009,27 +1008,27 @@ def test_named_datasource_id_parameter(requests_pub):
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': []})) # empty list
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': []})) # empty list
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value, 'err': 0}))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value, 'err': 0}))
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value, 'err': 1}))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value, 'err': 1}))
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
# no cache for errors
@ -1039,13 +1038,13 @@ def test_named_datasource_id_parameter(requests_pub):
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = {'id': '1', 'text': 'foo'} # not a list
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO('not json')
urlopen.side_effect = lambda *args: io.StringIO('not json')
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
@ -1053,7 +1052,7 @@ def test_named_datasource_id_parameter(requests_pub):
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('2') == value[1]
assert urlopen.call_count == 1
# try again, get from request.datasources_cache
@ -1062,7 +1061,7 @@ def test_named_datasource_id_parameter(requests_pub):
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('3') is None
assert urlopen.call_count == 1
# try again, get from request.datasources_cache

View File

@ -1,11 +1,10 @@
# -*- coding: utf-8 -*-
import io
import pytest
import json
import shutil
from django.utils.six import StringIO
from quixote import cleanup
from wcs import fields
from wcs.data_sources import NamedDataSource, build_agenda_datasources, collect_agenda_data
@ -112,7 +111,7 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
pub.load_site_options()
NamedDataSource.wipe()
urlopen.side_effect = lambda *args: StringIO('{"data": []}')
urlopen.side_effect = lambda *args: io.StringIO('{"data": []}')
assert collect_agenda_data(pub) == []
assert urlopen.call_args_list == [mock.call('http://chrono.example.net/api/agenda/')]
@ -122,7 +121,7 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
assert urlopen.call_args_list == [mock.call('http://chrono.example.net/api/agenda/')]
# events agenda
urlopen.side_effect = lambda *args: StringIO(json.dumps({"data": AGENDA_EVENTS_DATA}))
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({"data": AGENDA_EVENTS_DATA}))
urlopen.reset_mock()
assert collect_agenda_data(pub) == [
{'text': 'Events A', 'url': 'http://chrono.example.net/api/agenda/events-A/datetimes/'},
@ -132,9 +131,9 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
# meetings agenda
urlopen.side_effect = [
StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['virtual-B']})),
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['virtual-B']})),
]
urlopen.reset_mock()
assert collect_agenda_data(pub) == [
@ -164,8 +163,8 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
# if meeting types could not be collected
urlopen.side_effect = [
StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
ConnectionError,
]
urlopen.reset_mock()
@ -177,7 +176,7 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
]
urlopen.side_effect = [
StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
ConnectionError,
]
urlopen.reset_mock()

View File

@ -1,8 +1,8 @@
import datetime
import io
import pytest
import os
from django.utils.six import StringIO
from quixote import cleanup
from wcs.qommon.ezt import (
Template,
@ -36,7 +36,7 @@ def pub(request):
def test_simple_qualifier():
template = Template()
template.parse('<p>[foo]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': 'bar'})
assert output.getvalue() == '<p>bar</p>'
@ -44,7 +44,7 @@ def test_simple_qualifier():
def test_simple_qualifier_missing_variable():
template = Template()
template.parse('<p>[foo]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {})
assert output.getvalue() == '<p>[foo]</p>'
@ -54,17 +54,17 @@ def test_if_any():
template.parse('<p>[if-any foo]bar[end]</p>')
# boolean
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': True})
assert output.getvalue() == '<p>bar</p>'
# no value
output = StringIO()
output = io.StringIO()
template.generate(output, {})
assert output.getvalue() == '<p></p>'
# defined but evaluating to False
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': False})
assert output.getvalue() == '<p>bar</p>'
@ -73,11 +73,11 @@ def test_if_any_else():
template = Template()
template.parse('<p>[if-any foo]bar[else]baz[end]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': True})
assert output.getvalue() == '<p>bar</p>'
output = StringIO()
output = io.StringIO()
template.generate(output, {})
assert output.getvalue() == '<p>baz</p>'
@ -87,17 +87,17 @@ def test_is():
template.parse('<p>[is foo "bar"]bar[end]</p>')
# no value
output = StringIO()
output = io.StringIO()
template.generate(output, {})
assert output.getvalue() == '<p></p>'
# defined but other value
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': 'baz'})
assert output.getvalue() == '<p></p>'
# defined with correct value
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': 'bar'})
assert output.getvalue() == '<p>bar</p>'
@ -105,7 +105,7 @@ def test_is():
def test_callable_qualifier():
template = Template()
template.parse('<p>[foo]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': lambda x: x.write('bar')})
assert output.getvalue() == '<p>bar</p>'
@ -113,13 +113,13 @@ def test_callable_qualifier():
def test_date_qualifier(pub):
template = Template()
template.parse('<p>[foo]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': datetime.date(2019, 1, 2)})
assert output.getvalue() == '<p>2019-01-02</p>'
pub.cfg['language'] = {'language': 'fr'}
pub.write_cfg()
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': datetime.date(2019, 1, 2)})
assert output.getvalue() == '<p>02/01/2019</p>'
@ -127,13 +127,13 @@ def test_date_qualifier(pub):
def test_datetime_qualifier(pub):
template = Template()
template.parse('<p>[foo]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': datetime.datetime(2019, 1, 2, 14, 4)})
assert output.getvalue() == '<p>2019-01-02 14:04</p>'
pub.cfg['language'] = {'language': 'fr'}
pub.write_cfg()
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': datetime.datetime(2019, 1, 2, 14, 4)})
assert output.getvalue() == '<p>02/01/2019 14:04</p>'
@ -181,13 +181,13 @@ def test_missing_is_arg():
def test_array_index():
template = Template()
template.parse('<p>[foo.0]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': ['bar']})
assert output.getvalue() == '<p>bar</p>'
template = Template()
template.parse('<p>[foo.bar]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': ['bar']})
assert output.getvalue() == '<p>[foo.bar]</p>'
@ -195,7 +195,7 @@ def test_array_index():
def test_array_subindex():
template = Template()
template.parse('<p>[foo.0.1]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': [['bar', 'baz']]})
assert output.getvalue() == '<p>baz</p>'
@ -203,13 +203,13 @@ def test_array_subindex():
def test_dict_index():
template = Template()
template.parse('<p>[foo.a]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': {'a': 'bar'}})
assert output.getvalue() == '<p>bar</p>'
template = Template()
template.parse('<p>[foo.b]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, {'foo': {'a': 'bar'}})
assert output.getvalue() == '<p>[foo.b]</p>'
@ -223,14 +223,14 @@ def test_ezt_script(pub):
vars = {'script': ScriptsSubstitutionProxy()}
template = Template()
template.parse('<p>[script.hello_world]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, vars)
assert output.getvalue() == '<p>Hello world</p>'
vars = {'script': ScriptsSubstitutionProxy()}
template = Template()
template.parse('<p>[script.hello_world "fred"]</p>')
output = StringIO()
output = io.StringIO()
template.generate(output, vars)
assert output.getvalue() == '<p>Hello fred</p>'

View File

@ -1,9 +1,8 @@
import base64
import json
import urllib.parse
from django.utils.encoding import force_bytes, force_text
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six.moves.urllib import parse as urlparse
from quixote import cleanup, get_session_manager
from utilities import get_app, create_temporary_pub
@ -126,7 +125,7 @@ def test_fc_login_page(caplog):
resp = app.get('/login/')
assert resp.status_int == 302
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
qs = urlparse.parse_qs(resp.location.split('?')[1])
qs = urllib.parse.parse_qs(resp.location.split('?')[1])
nonce = qs['nonce'][0]
state = qs['state'][0]
@ -152,7 +151,7 @@ def test_fc_login_page(caplog):
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
resp = app.get(
'/ident/fc/callback?%s'
% urllib.urlencode(
% urllib.parse.urlencode(
{
'code': '1234',
'state': state,
@ -176,19 +175,19 @@ def test_fc_login_page(caplog):
assert session.extra_user_variables['fc_sub'] == 'ymca'
resp = app.get('/logout')
splitted = urlparse.urlsplit(resp.location)
splitted = urllib.parse.urlsplit(resp.location)
assert (
urlparse.urlunsplit((splitted.scheme, splitted.netloc, splitted.path, '', ''))
urllib.parse.urlunsplit((splitted.scheme, splitted.netloc, splitted.path, '', ''))
== 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout'
)
assert urlparse.parse_qs(splitted.query)['post_logout_redirect_uri'] == ['http://example.net']
assert urlparse.parse_qs(splitted.query)['id_token_hint']
assert urllib.parse.parse_qs(splitted.query)['post_logout_redirect_uri'] == ['http://example.net']
assert urllib.parse.parse_qs(splitted.query)['id_token_hint']
assert not get_session(app)
# Test error handling path
resp = app.get(
'/ident/fc/callback?%s'
% urllib.urlencode(
% urllib.parse.urlencode(
{
'state': state,
'error': 'access_denied',
@ -198,7 +197,7 @@ def test_fc_login_page(caplog):
assert 'user did not authorize login' in caplog.records[-1].message
resp = app.get(
'/ident/fc/callback?%s'
% urllib.urlencode(
% urllib.parse.urlencode(
{
'state': state,
'error': 'whatever',
@ -212,7 +211,7 @@ def test_fc_login_page(caplog):
resp = app.get(login_url)
assert resp.status_int == 302
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
qs = urlparse.parse_qs(resp.location.split('?')[1])
qs = urllib.parse.parse_qs(resp.location.split('?')[1])
state = qs['state'][0]
id_token['nonce'] = qs['nonce'][0]
token_result['id_token'] = '.%s.' % force_text(base64url_encode(json.dumps(id_token)))
@ -224,7 +223,7 @@ def test_fc_login_page(caplog):
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
resp = app.get(
'/ident/fc/callback?%s'
% urllib.urlencode(
% urllib.parse.urlencode(
{
'code': '1234',
'state': state,
@ -263,7 +262,7 @@ def test_fc_login_page(caplog):
resp = app.get('/login/')
assert resp.status_int == 302
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
qs = urlparse.parse_qs(resp.location.split('?')[1])
qs = urllib.parse.parse_qs(resp.location.split('?')[1])
state = qs['state'][0]
id_token['nonce'] = qs['nonce'][0]
token_result['id_token'] = '.%s.' % force_text(base64url_encode(json.dumps(id_token)))
@ -280,7 +279,7 @@ def test_fc_login_page(caplog):
http_get_page.return_value = (None, 200, json.dumps(bad_user_info_result), None)
resp = app.get(
'/ident/fc/callback?%s'
% urllib.urlencode(
% urllib.parse.urlencode(
{
'code': '1234',
'state': state,

View File

@ -1,5 +1,6 @@
import datetime
import glob
import io
import json
import os
import pickle
@ -9,9 +10,7 @@ import time
import pytest
from django.utils import six
from django.utils.encoding import force_bytes
from django.utils.six import BytesIO
from quixote import cleanup
from wcs import fields
from wcs.blocks import BlockDef
@ -438,7 +437,7 @@ def test_unused_file_removal_job(pub):
formdata.store()
formdata.evolution[-1].parts = [
AttachmentEvolutionPart('hello.txt', fp=BytesIO(b'hello world'), varname='testfile')
AttachmentEvolutionPart('hello.txt', fp=io.BytesIO(b'hello world'), varname='testfile')
]
formdata.store()
assert len(glob.glob(os.path.join(pub.app_dir, 'attachments', '*/*'))) == 1

View File

@ -1,11 +1,11 @@
# -*- coding: utf-8 -*-
import io
import pytest
import shutil
import time
import xml.etree.ElementTree as ET
from django.utils.six import BytesIO, StringIO
from quixote import cleanup
from wcs.categories import Category
@ -52,7 +52,7 @@ def assert_xml_import_export_works(formdef, include_id=False):
def assert_json_import_export_works(formdef, include_id=False):
formdef2 = FormDef.import_from_json(
StringIO(formdef.export_to_json(include_id=include_id)), include_id=include_id
io.StringIO(formdef.export_to_json(include_id=include_id)), include_id=include_id
)
assert_compare_formdef(formdef, formdef2, include_id=include_id)
return formdef2
@ -216,7 +216,7 @@ def test_workflow_options_with_file():
upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00'''
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(file_content)
upload.fp.seek(0)
model_file = UploadedFile(pub.APP_DIR, None, upload)
@ -339,7 +339,7 @@ def test_invalid_field_type():
formdef.fields = [fields.StringField(id='1', type='XXX')]
export = ET.tostring(export_to_indented_xml(formdef))
with pytest.raises(FormdefImportError):
FormDef.import_from_xml(BytesIO(export), include_id=True)
FormDef.import_from_xml(io.BytesIO(export), include_id=True)
def test_invalid_data_source():
@ -352,11 +352,11 @@ def test_invalid_data_source():
export = export.replace(
b'<data_source><type>xxx</type></data_source>', b'<data_source><type/></data_source>'
)
formdef2 = FormDef.import_from_xml(BytesIO(export))
formdef2 = FormDef.import_from_xml(io.BytesIO(export))
assert formdef2.fields[0].data_source == {}
export = export.replace(b'<data_source><type/></data_source>', b'<data_source> </data_source>')
formdef2 = FormDef.import_from_xml(BytesIO(export))
formdef2 = FormDef.import_from_xml(io.BytesIO(export))
assert formdef2.fields[0].data_source == {}
@ -368,12 +368,12 @@ def test_unknown_data_source():
]
export = ET.tostring(export_to_indented_xml(formdef))
FormDef.import_from_xml(BytesIO(export))
FormDef.import_from_xml(io.BytesIO(export))
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'foobar'})]
export = ET.tostring(export_to_indented_xml(formdef))
with pytest.raises(FormdefImportError, match='Unknown datasources'):
FormDef.import_from_xml(BytesIO(export))
FormDef.import_from_xml(io.BytesIO(export))
# carddef as datasource
carddef = CardDef()
@ -385,12 +385,12 @@ def test_unknown_data_source():
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'carddef:foo'})]
export = ET.tostring(export_to_indented_xml(formdef))
FormDef.import_from_xml(BytesIO(export))
FormDef.import_from_xml(io.BytesIO(export))
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'carddef:unknown'})]
export = ET.tostring(export_to_indented_xml(formdef))
with pytest.raises(FormdefImportError):
FormDef.import_from_xml(BytesIO(export))
FormDef.import_from_xml(io.BytesIO(export))
# carddef custom view as datasource
pub.custom_view_class.wipe()
@ -406,12 +406,12 @@ def test_unknown_data_source():
fields.StringField(id='1', type='string', data_source={'type': 'carddef:foo:card-view'})
]
export = ET.tostring(export_to_indented_xml(formdef))
FormDef.import_from_xml(BytesIO(export))
FormDef.import_from_xml(io.BytesIO(export))
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'carddef:foo:unknown'})]
export = ET.tostring(export_to_indented_xml(formdef))
with pytest.raises(FormdefImportError):
FormDef.import_from_xml(BytesIO(export))
FormDef.import_from_xml(io.BytesIO(export))
def test_duplicated_field_ids():
@ -425,12 +425,12 @@ def test_duplicated_field_ids():
export = ET.tostring(export_to_indented_xml(formdef, include_id=True))
with pytest.raises(FormdefImportError):
FormDef.import_from_xml(BytesIO(export))
FormDef.import_from_xml(io.BytesIO(export))
with pytest.raises(FormdefImportError):
FormDef.import_from_xml(BytesIO(export), include_id=True)
FormDef.import_from_xml(io.BytesIO(export), include_id=True)
formdef2 = FormDef.import_from_xml(BytesIO(export), fix_on_error=True)
formdef2 = FormDef.import_from_xml(io.BytesIO(export), fix_on_error=True)
assert formdef2.fields[0].id == '1'
assert formdef2.fields[1].id == '2'
assert formdef2.fields[2].id == '3'
@ -446,7 +446,7 @@ def test_wrong_max_field_id():
formdef.max_field_id = 1
export = ET.tostring(export_to_indented_xml(formdef, include_id=True))
formdef2 = FormDef.import_from_xml(BytesIO(export), include_id=True)
formdef2 = FormDef.import_from_xml(io.BytesIO(export), include_id=True)
assert formdef2.max_field_id == 2
@ -639,7 +639,7 @@ def test_field_validation():
old_format = ET.tostring(formdef_xml).replace(
b'<validation><type>regex</type><value>\\d</value></validation>', b'<validation>\\d</validation>'
)
f2 = FormDef.import_from_xml(BytesIO(old_format))
f2 = FormDef.import_from_xml(io.BytesIO(old_format))
assert len(f2.fields) == len(formdef.fields)
assert f2.fields[0].validation == {'type': 'regex', 'value': '\\d'}
@ -745,7 +745,7 @@ def test_custom_views():
formdef.data_class().wipe()
pub.custom_view_class.wipe()
formdef2 = FormDef.import_from_xml(BytesIO(ET.tostring(formdef_xml)))
formdef2 = FormDef.import_from_xml(io.BytesIO(ET.tostring(formdef_xml)))
assert formdef2.name == 'foo'
assert formdef2._custom_views

View File

@ -8,12 +8,12 @@ import pytest
import shutil
import sys
import tempfile
import urllib.parse
import mock
from utilities import create_temporary_pub, clean_temporary_pub
from django.utils.six.moves.urllib import parse as urlparse
from quixote import cleanup
from wcs.qommon import force_str
@ -243,7 +243,7 @@ def test_configure_site_options():
key = '109fca71e7dc8ec49708a08fa7c02795de13f34f7d29d27bd150f203b3e0ab40'
assert pub.get_site_option('authentic.example.net', 'api-secrets') == key
assert pub.get_site_option('authentic.example.net', 'wscall-secrets') == key
self_domain = urlparse.urlsplit(service.get('base_url')).netloc
self_domain = urllib.parse.urlsplit(service.get('base_url')).netloc
assert pub.get_site_option(self_domain, 'wscall-secrets') != '0'
service['variables']['xxx'] = None

View File

@ -1,13 +1,13 @@
# -*- coding: utf-8 -*-
import base64
import io
import os
import pytest
from webtest import Upload
import xml.etree.ElementTree as ET
from django.utils.encoding import force_bytes
from django.utils.six import StringIO
from quixote import cleanup
from wcs.formdef import FormDef
from wcs.fields import FileField
@ -325,7 +325,7 @@ def test_mail_templates_export(pub, superuser, mail_template):
resp = resp.click(href='export')
xml_export = resp.text
ds = StringIO(xml_export)
ds = io.StringIO(xml_export)
mail_template2 = MailTemplate.import_from_xml(ds)
assert mail_template2.name == 'test MT'

View File

@ -1,3 +1,4 @@
import io
import json
import re
import stat
@ -17,9 +18,7 @@ from django.core.management import call_command
from django.core.management.base import CommandError
from django.http import Http404
from django.test import override_settings
from django.utils import six
from django.utils.encoding import force_text
from django.utils.six import BytesIO, StringIO
from quixote import cleanup
from wcs.qommon import get_publisher_class
from wcs.qommon.afterjobs import AfterJob
@ -115,7 +114,7 @@ def test_finish_failed_request():
def test_finish_interrupted_request():
req = HTTPRequest(
StringIO(''),
io.StringIO(''),
{
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
@ -125,7 +124,7 @@ def test_finish_interrupted_request():
response = pub.process_request(req)
assert b'invalid content-length header' in response.getvalue()
req = HTTPRequest(
StringIO(''),
io.StringIO(''),
{
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
@ -136,7 +135,7 @@ def test_finish_interrupted_request():
response = pub.process_request(req)
assert b'Invalid request: unexpected end of request body' in response.getvalue()
req = HTTPRequest(
StringIO(''),
io.StringIO(''),
{
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
@ -148,7 +147,7 @@ def test_finish_interrupted_request():
assert b'Invalid request: multipart/form-data missing boundary' in response.getvalue()
with pytest.raises(Http404):
req = HTTPRequest(
StringIO(''),
io.StringIO(''),
{
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
@ -184,7 +183,7 @@ def test_import_config_zip():
pub.cfg['sp'] = {'what': 'ever'}
pub.write_cfg()
c = BytesIO()
c = io.BytesIO()
z = zipfile.ZipFile(c, 'w')
z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
z.close()
@ -195,7 +194,7 @@ def test_import_config_zip():
assert pub.cfg['whatever'] == ['a', 'b', 'c']
assert pub.cfg['sp'] == {'what': 'ever'}
c = BytesIO()
c = io.BytesIO()
z = zipfile.ZipFile(c, 'w')
z.writestr(
'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})

View File

@ -1,7 +1,5 @@
import shutil
from django.utils import six
from quixote import cleanup
from wcs.qommon import x509utils
@ -34,6 +32,6 @@ def test_metadata_generation():
)
assert meta != None
content = meta.get_saml2_metadata(pkey, '', True, True)
assert isinstance(content, six.string_types) and content != ''
assert isinstance(content, str) and content != ''
assert 'EntityDescriptor' in content
assert 'SPSSODescriptor' in content

View File

@ -2,6 +2,7 @@ import datetime
import os
import sys
import shutil
import urllib.parse
try:
import lasso
@ -10,7 +11,6 @@ except ImportError:
import pytest
from django.utils.six.moves.urllib import parse as urlparse
from quixote import cleanup
from quixote import get_session, get_session_manager
from wcs.qommon.http_request import HTTPRequest
@ -350,7 +350,7 @@ def test_saml_login_page(pub):
assert resp.status_int == 302
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')
request = lasso.Samlp2AuthnRequest()
request.initFromQuery(urlparse.urlparse(resp.location).query)
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
assert request.forceAuthn is False
@ -359,7 +359,7 @@ def test_saml_login_page_force_authn(pub):
assert resp.status_int == 302
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')
request = lasso.Samlp2AuthnRequest()
request.initFromQuery(urlparse.urlparse(resp.location).query)
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
assert request.forceAuthn is True
@ -380,13 +380,13 @@ def test_saml_backoffice_redirect(pub):
assert resp.location.startswith('http://example.net/login/?next=')
resp = resp.follow()
assert resp.location.startswith('http://sso.example.net/saml2/sso')
assert urlparse.parse_qs(urlparse.urlparse(resp.location).query)['SAMLRequest']
assert urlparse.parse_qs(urlparse.urlparse(resp.location).query)['RelayState'] == [
assert urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['SAMLRequest']
assert urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['RelayState'] == [
'http://example.net/backoffice/'
]
request = lasso.Samlp2AuthnRequest()
request.initFromQuery(urlparse.urlparse(resp.location).query)
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
assert ':next_url>http://example.net/backoffice/<' in request.getOriginalXmlnode()
@ -395,7 +395,7 @@ def test_saml_login_hint(pub):
assert resp.status_int == 302
assert resp.location.startswith('http://sso.example.net/saml2/sso')
request = lasso.Samlp2AuthnRequest()
request.initFromQuery(urlparse.urlparse(resp.location).query)
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
assert 'login-hint' not in request.getOriginalXmlnode()
resp = get_app(pub).get('/backoffice/')
@ -404,12 +404,12 @@ def test_saml_login_hint(pub):
resp = resp.follow()
assert resp.location.startswith('http://sso.example.net/saml2/sso')
request = lasso.Samlp2AuthnRequest()
request.initFromQuery(urlparse.urlparse(resp.location).query)
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
assert ':login-hint>backoffice<' in request.getOriginalXmlnode()
resp = get_app(pub).get('http://example.net/login/?next=/backoffice/')
request = lasso.Samlp2AuthnRequest()
request.initFromQuery(urlparse.urlparse(resp.location).query)
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
assert ':login-hint>backoffice<' in request.getOriginalXmlnode()
@ -507,7 +507,7 @@ def test_saml_idp_logout(pub):
logout.buildRequestMsg()
# process logout message
saml2.slo_idp(urlparse.urlparse(logout.msgUrl).query)
saml2.slo_idp(urllib.parse.urlparse(logout.msgUrl).query)
assert req.response.headers['location'].startswith(
'http://sso.example.net/saml2/slo_return?SAMLResponse='
)

View File

@ -1,10 +1,10 @@
import io
import os
import shutil
import xml.etree.ElementTree as ET
import pytest
from django.utils.six import BytesIO
from quixote.http_request import Upload
from wcs.blocks import BlockDef
@ -410,7 +410,7 @@ def test_workflow_with_model_snapshot_browse(pub):
export_to.label = 'test'
upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00'''
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(file_content)
upload.fp.seek(0)
export_to.model_file = UploadedFile('models', 'tmp', upload)

View File

@ -1,12 +1,11 @@
# -*- coding: utf-8 -*-
import io
import pytest
import xml.etree.ElementTree as ET
from quixote.http_request import Upload
from django.utils.six import BytesIO
from wcs.carddef import CardDef
from wcs.formdef import FormDef
from wcs.mail_templates import MailTemplate
@ -202,7 +201,7 @@ def test_status_actions_named_existing_role(pub):
b'<item role_id="2">Test Role named existing role</item>',
b'<item>Test Role named existing role</item>',
)
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)))
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
assert wf3.possible_status[0].items[0].by == ['2']
@ -235,7 +234,7 @@ def test_status_actions_named_missing_role(pub):
xml_export = xml_export_orig.replace(
b'<item role_id="3">Test Role A</item>', b'<item role_id="4">Test Role A</item>'
)
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)))
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
assert wf3.possible_status[0].items[0].by == ['3']
# check that it creates a new role if there's no match on id and name
@ -243,7 +242,7 @@ def test_status_actions_named_missing_role(pub):
b'<item role_id="3">Test Role A</item>', b'<item role_id="999">foobar</item>'
)
nb_roles = Role.count()
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)))
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
assert Role.count() == nb_roles + 1
# check that it doesn't fallback on the id if there's no match on the
@ -252,12 +251,12 @@ def test_status_actions_named_missing_role(pub):
xml_export = xml_export_orig.replace(
b'<item role_id="3">Test Role A</item>', b'<item role_id="3">Test Role C</item>'
)
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)))
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
assert wf3.possible_status[0].items[0].by != ['3']
assert Role.count() == nb_roles + 1
# on the other hand, check that it uses the id when included_id is True
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)), include_id=True)
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)), include_id=True)
assert wf3.possible_status[0].items[0].by == ['3']
@ -291,7 +290,7 @@ def test_export_to_model_action(pub):
export_to.label = 'test'
upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00'''
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(file_content)
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.APP_DIR, None, upload)
@ -308,7 +307,7 @@ def test_export_to_model_action(pub):
export_to.label = 'test'
upload = Upload('/foo/bar', content_type='text/rtf')
file_content = b''
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(file_content)
upload.fp.seek(0)
export_to.model_file = UploadedFile(pub.APP_DIR, None, upload)
@ -397,7 +396,7 @@ def test_commentable_action(pub):
assert b'<required>True</required>' in xml_export
xml_export = xml_export.replace(b'<required>True</required>', b'')
assert b'<required>True</required>' not in xml_export
wf2 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)))
wf2 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
assert wf2.possible_status[0].items[0].required is False
commentable.button_label = 'button label'
@ -930,7 +929,7 @@ def test_unknown_data_source(pub):
for wf in [wf1, wf2, wf3]:
export = ET.tostring(export_to_indented_xml(wf))
with pytest.raises(WorkflowImportError, match='Unknown datasources'):
Workflow.import_from_xml(BytesIO(export))
Workflow.import_from_xml(io.BytesIO(export))
# carddef as datasource
CardDef.wipe()
@ -945,7 +944,7 @@ def test_unknown_data_source(pub):
for wf in [wf1, wf2, wf3]:
export = ET.tostring(export_to_indented_xml(wf))
Workflow.import_from_xml(BytesIO(export))
Workflow.import_from_xml(io.BytesIO(export))
display_form.formdef.fields[0].data_source = {'type': 'carddef:unknown'}
wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:unknown'}
@ -954,7 +953,7 @@ def test_unknown_data_source(pub):
for wf in [wf1, wf2, wf3]:
export = ET.tostring(export_to_indented_xml(wf))
with pytest.raises(WorkflowImportError, match='Unknown datasources'):
Workflow.import_from_xml(BytesIO(export))
Workflow.import_from_xml(io.BytesIO(export))
# carddef custom view as datasource
pub.custom_view_class.wipe()
@ -972,7 +971,7 @@ def test_unknown_data_source(pub):
for wf in [wf1, wf2, wf3]:
export = ET.tostring(export_to_indented_xml(wf))
Workflow.import_from_xml(BytesIO(export))
Workflow.import_from_xml(io.BytesIO(export))
display_form.formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'}
wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'}
@ -981,4 +980,4 @@ def test_unknown_data_source(pub):
for wf in [wf1, wf2, wf3]:
export = ET.tostring(export_to_indented_xml(wf))
with pytest.raises(WorkflowImportError, match='Unknown datasources'):
Workflow.import_from_xml(BytesIO(export))
Workflow.import_from_xml(io.BytesIO(export))

View File

@ -1,10 +1,12 @@
import base64
import io
import json
import datetime
import os
import pytest
import shutil
import time
import urllib.parse
import zipfile
import mock
@ -14,10 +16,7 @@ try:
except ImportError:
Image = None
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.six import BytesIO, StringIO
from django.utils.six.moves.urllib import parse as urlparse
from quixote import cleanup, get_response
from wcs.qommon.errors import ConnectionError
@ -1008,7 +1007,7 @@ def test_register_comment_attachment(pub):
shutil.rmtree(os.path.join(get_publisher().app_dir, 'attachments'))
formdata.evolution[-1].parts = [
AttachmentEvolutionPart('hello.txt', fp=BytesIO(b'hello world'), varname='testfile')
AttachmentEvolutionPart('hello.txt', fp=io.BytesIO(b'hello world'), varname='testfile')
]
formdata.store()
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
@ -1961,7 +1960,7 @@ def test_webservice_call(http_requests, pub):
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('method') == 'GET'
qs = urlparse.parse_qs(http_requests.get_last('url').split('?')[1])
qs = urllib.parse.parse_qs(http_requests.get_last('url').split('?')[1])
assert set(qs.keys()) == set(['in_url', 'str', 'one', 'evalme', 'django', 'ezt'])
assert qs['in_url'] == ['1', '2']
assert qs['one'] == ['1']
@ -3188,7 +3187,7 @@ def test_geolocate_address(pub):
)
item.perform(formdata)
assert 'https://nominatim.entrouvert.org/search' in http_get_page.call_args[0][0]
assert urlparse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert urllib.parse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
@ -3203,7 +3202,7 @@ def test_geolocate_address(pub):
)
item.perform(formdata)
assert 'https://nominatim.entrouvert.org/search' in http_get_page.call_args[0][0]
assert urlparse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert urllib.parse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert 'key=KEY' in http_get_page.call_args[0][0]
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
@ -3435,7 +3434,7 @@ def test_export_to_model_image(pub):
template_filename = os.path.join(os.path.dirname(__file__), 'template-with-image.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)
@ -3499,7 +3498,7 @@ def test_export_to_model_backoffice_field(pub):
template_filename = os.path.join(os.path.dirname(__file__), 'template.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)
@ -3557,7 +3556,7 @@ def test_export_to_model_django_template(pub):
template_filename = os.path.join(os.path.dirname(__file__), 'template-django.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template-django.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)
@ -3629,7 +3628,7 @@ def test_export_to_model_form_details_section(pub, filename):
template_filename = os.path.join(os.path.dirname(__file__), filename)
template = open(template_filename, 'rb').read()
upload = QuixoteUpload(filename, content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)

View File

@ -7,6 +7,7 @@ import random
import psycopg2
import shutil
import sys
import urllib.parse
from wcs import sql, sessions, custom_views
@ -14,7 +15,6 @@ from webtest import TestApp
from quixote import cleanup, get_publisher
from django.conf import settings
from django.utils.encoding import force_bytes, force_text
from django.utils.six.moves.urllib import parse as urlparse
from wcs.qommon import force_str
import wcs
@ -330,8 +330,8 @@ class HttpRequestsMocking(object):
{'url': url, 'method': method, 'body': body, 'headers': headers, 'timeout': timeout}
)
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
base_url = urlparse.urlunparse((scheme, netloc, path, '', '', ''))
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
base_url = urllib.parse.urlunparse((scheme, netloc, path, '', '', ''))
with open(os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')) as fd:
metadata = fd.read()

View File

@ -16,14 +16,13 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import io
import xml.etree.ElementTree as ET
import datetime
import difflib
import tarfile
import time
from django.utils.six import BytesIO, StringIO
from quixote import get_publisher, get_response, redirect
from quixote.directory import Directory, AccessControlled
from quixote.html import TemplateIO, htmltext
@ -1247,7 +1246,7 @@ class FormDefPage(Directory):
if form.get_widget('file').parse():
fp = form.get_widget('file').parse().fp
elif form.get_widget('new_formdef').parse():
fp = StringIO(form.get_widget('new_formdef').parse())
fp = io.StringIO(form.get_widget('new_formdef').parse())
elif form.get_widget('url').parse():
url = form.get_widget('url').parse()
try:
@ -1513,7 +1512,7 @@ class FormDefPage(Directory):
date = time.strptime(date, misc.date_format())
all_forms = [x for x in all_forms if x.last_update_time < date]
self.fd = BytesIO()
self.fd = io.BytesIO()
t = tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd)
t.add(self.formdef.get_object_filename(), 'formdef')
for formdata in all_forms:

View File

@ -16,6 +16,7 @@
import copy
import hashlib
import io
import mimetypes
import os
@ -29,7 +30,6 @@ import shutil
import xml.etree.ElementTree as ET
from django.utils.encoding import force_bytes
from django.utils.six import BytesIO, StringIO
from quixote import get_publisher, get_request, get_response, redirect
from quixote.directory import Directory
@ -814,7 +814,7 @@ class SettingsDirectory(QommonSettingsDirectory):
return redirect('themes')
parent_theme_directory = os.path.dirname(theme_directory)
c = BytesIO()
c = io.BytesIO()
z = zipfile.ZipFile(c, 'w')
for base, dirnames, filenames in os.walk(theme_directory):
basetheme = base[len(parent_theme_directory) + 1 :]
@ -877,7 +877,7 @@ class SettingsDirectory(QommonSettingsDirectory):
get_session().message = ('error', _('Theme is missing a desc.xml file.'))
return redirect('themes')
desc_xml = z.read('%s/desc.xml' % theme_name)
theme_dict = template.get_theme_dict(StringIO(force_text(desc_xml)))
theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
if theme_dict.get('name') != theme_name:
get_session().message = ('error', _('desc.xml is missing a name attribute.'))
return redirect('themes')
@ -901,7 +901,7 @@ class SettingsDirectory(QommonSettingsDirectory):
get_session().message = ('error', _('Error loading theme (%s).') % str(e))
return redirect('themes')
return self.install_theme_from_file(StringIO(fp.read()))
return self.install_theme_from_file(io.StringIO(fp.read()))
def template(self):
from wcs.qommon.template import get_default_ezt_template
@ -994,7 +994,7 @@ class SettingsDirectory(QommonSettingsDirectory):
self.settings = settings
def export(self, job):
c = BytesIO()
c = io.BytesIO()
z = zipfile.ZipFile(c, 'w')
for d in self.dirs:
if d not in (

View File

@ -14,8 +14,6 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from django.utils import six
from quixote import get_publisher, get_response, get_request, get_session, redirect
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext
@ -327,7 +325,7 @@ class UsersDirectory(Directory):
checked_roles = None
if get_request().form.get('filter'):
checked_roles = get_request().form.get('role', [])
if isinstance(checked_roles, six.string_types):
if isinstance(checked_roles, str):
checked_roles = [checked_roles]
if checked_roles:

View File

@ -18,14 +18,13 @@
from __future__ import print_function
import io
import time
from subprocess import Popen, PIPE
import textwrap
import xml.etree.ElementTree as ET
from django.utils import six
from django.utils.encoding import force_bytes
from django.utils.six import StringIO
from quixote import redirect, get_publisher, get_response
from quixote.directory import Directory
@ -132,7 +131,7 @@ def graphviz_post_treatment(content, colours, include=False):
def graphviz(workflow, url_prefix='', select=None, svg=True, include=False):
out = StringIO()
out = io.StringIO()
# a list of colours known to graphviz, they will serve as key to get back
# to the colours defined in wcs, they are used as color attributes in
# graphviz (<= 2.38) then as class attribute on node elements for 2.40 and

View File

@ -18,13 +18,13 @@ import datetime
import json
import re
import time
import urllib.parse
from quixote import get_request, get_publisher, get_response, get_session
from quixote.errors import MethodNotAllowedError
from quixote.directory import Directory
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urllib
from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse
from .qommon import _
@ -1019,7 +1019,7 @@ class AutocompleteDirectory(Directory):
if 'url' in info:
url = info['url']
url += urllib.quote(get_request().form['q'])
url += urllib.parse.quote(get_request().form['q'])
url = sign_url_auto_orig(url)
get_response().set_content_type('application/json')
return misc.urlopen(url).read()
@ -1113,7 +1113,7 @@ def geocoding(request, *args, **kwargs):
url += '&'
else:
url += '?'
url += 'format=json&q=%s' % urllib.quote(q.encode('utf-8'))
url += 'format=json&q=%s' % urllib.parse.quote(q.encode('utf-8'))
url += '&accept-language=%s' % (get_publisher().get_site_language() or 'en')
return HttpResponse(misc.urlopen(url).read(), content_type='application/json')

View File

@ -20,13 +20,11 @@ import hashlib
import datetime
import random
import os
import urllib.parse
import errno
import calendar
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six.moves.urllib import parse as urlparse
from quixote import get_request, get_publisher
from .api_access import ApiAccess
@ -43,18 +41,18 @@ def is_url_signed(utcnow=None, duration=DEFAULT_DURATION):
if not query_string:
return False
signature = get_request().form.get('signature')
if not isinstance(signature, six.string_types):
if not isinstance(signature, str):
return False
signature = force_bytes(signature)
# verify signature
orig = get_request().form.get('orig')
if not isinstance(orig, six.string_types):
if not isinstance(orig, str):
raise AccessForbiddenError('missing/multiple orig field')
key = ApiAccess.get_access_key(orig) or get_publisher().get_site_option(orig, 'api-secrets')
if not key:
raise AccessForbiddenError('invalid orig')
algo = get_request().form.get('algo')
if not isinstance(algo, six.string_types):
if not isinstance(algo, str):
raise AccessForbiddenError('missing/multiple algo field')
if algo not in hashlib.algorithms_guaranteed:
raise AccessForbiddenError('invalid algo')
@ -69,7 +67,7 @@ def is_url_signed(utcnow=None, duration=DEFAULT_DURATION):
):
raise AccessForbiddenError('invalid signature')
timestamp = get_request().form.get('timestamp')
if not isinstance(timestamp, six.string_types):
if not isinstance(timestamp, str):
raise AccessForbiddenError('missing/multiple timestamp field')
try:
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
@ -133,7 +131,7 @@ def get_user_from_api_query_string(api_name=None):
user = None
if get_request().form.get('email'):
email = get_request().form.get('email')
if not isinstance(email, six.string_types):
if not isinstance(email, str):
raise AccessForbiddenError('multiple email field')
users = list(get_publisher().user_class.get_users_with_email(email))
if users:
@ -142,7 +140,7 @@ def get_user_from_api_query_string(api_name=None):
raise AccessForbiddenError('unknown email')
elif get_request().form.get('NameID'):
ni = get_request().form.get('NameID')
if not isinstance(ni, six.string_types):
if not isinstance(ni, str):
raise AccessForbiddenError('multiple NameID field')
users = list(get_publisher().user_class.get_users_with_name_identifier(ni))
if users:
@ -158,9 +156,9 @@ def get_user_from_api_query_string(api_name=None):
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
parsed = urlparse.urlparse(url)
parsed = urllib.parse.urlparse(url)
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
@ -173,9 +171,9 @@ def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
new_query = query
if new_query:
new_query += '&'
new_query += urllib.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
new_query += urllib.parse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
new_query += '&signature=' + urllib.quote(signature)
new_query += '&signature=' + urllib.parse.quote(signature)
return new_query
@ -191,8 +189,8 @@ class MissingSecret(Exception):
def get_secret_and_orig(url):
frontoffice_url = get_publisher().get_frontoffice_url()
orig = urlparse.urlparse(frontoffice_url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]
target_orig = urlparse.urlparse(url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]
orig = urllib.parse.urlparse(frontoffice_url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]
target_orig = urllib.parse.urlparse(url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]
secret = get_publisher().get_site_option(target_orig, 'wscall-secrets')
if not secret:
raise MissingSecret()
@ -204,11 +202,11 @@ def sign_url_auto_orig(url):
signature_key, orig = get_secret_and_orig(url)
except MissingSecret:
return url
parsed = urlparse.urlparse(url)
querystring = urlparse.parse_qsl(parsed.query)
parsed = urllib.parse.urlparse(url)
querystring = urllib.parse.parse_qsl(parsed.query)
querystring.append(('orig', orig))
querystring = urllib.urlencode(querystring)
url = urlparse.urlunparse(parsed[:4] + (querystring,) + parsed[5:6])
querystring = urllib.parse.urlencode(querystring)
url = urllib.parse.urlunparse(parsed[:4] + (querystring,) + parsed[5:6])
return sign_url(url, signature_key)

View File

@ -16,12 +16,12 @@
import csv
import datetime
import io
from quixote import get_publisher, get_request, get_response, redirect
from quixote.html import TemplateIO, htmltext, htmlescape
from django.utils.encoding import force_text
from django.utils.six import StringIO
from ..qommon import _, N_
@ -169,7 +169,7 @@ class CardPage(FormPage):
def data_sample_csv(self):
carddef_fields = self.get_import_csv_fields()
output = StringIO()
output = io.StringIO()
csv_output = csv.writer(output)
csv_output.writerow([f.label for f in carddef_fields])
sample_line = []

View File

@ -16,18 +16,17 @@
import csv
import datetime
import io
import json
import re
import time
import types
import urllib.parse
import vobject
import zipfile
from django.conf import settings
from django.utils import six
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six import BytesIO, StringIO
from quixote import get_session, get_publisher, get_request, get_response, redirect
from quixote.directory import Directory
@ -904,7 +903,7 @@ class ManagementDirectory(Directory):
total_count = sql.AnyFormData.count(criterias)
if offset > total_count:
get_request().form['offset'] = '0'
return redirect('listing?' + urllib.urlencode(get_request().form))
return redirect('listing?' + urllib.parse.urlencode(get_request().form))
formdatas = sql.AnyFormData.select(criterias, order_by=order_by, limit=limit, offset=offset)
include_submission_channel = bool(get_publisher().get_site_option('welco_url', 'variables'))
@ -2937,7 +2936,7 @@ class FormBackOfficeStatusPage(FormStatusPage):
def download_as_zip(self):
formdata = self.filled
zip_content = BytesIO()
zip_content = io.BytesIO()
zip_file = zipfile.ZipFile(zip_content, 'w')
counter = {'value': 0}
@ -3281,7 +3280,7 @@ class FormBackOfficeStatusPage(FormStatusPage):
else:
r += htmltext('<li><code title="%s">%s</code>') % (k, k)
r += htmltext(' <div class="value"><span>%s</span>') % ellipsize(safe(v), 10000)
if not isinstance(v, six.string_types):
if not isinstance(v, str):
r += htmltext(' <span class="type">(%r)</span>') % type(v)
r += htmltext('</div></li>')
r += htmltext('</div>')
@ -3706,7 +3705,7 @@ class CsvExportAfterJob(AfterJob):
return self.create_export(formdef, fields, items, total_count)
def create_export(self, formdef, fields, items, total_count):
output = StringIO()
output = io.StringIO()
csv_output = csv.writer(output)
csv_output.writerow(self.csv_tuple_heading(fields))
@ -3752,7 +3751,7 @@ class OdsExportAfterJob(CsvExportAfterJob):
native_value=item['native_value'],
)
output = BytesIO()
output = io.BytesIO()
workbook.save(output)
self.file_content = output.getvalue()

View File

@ -14,8 +14,6 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from django.utils import six
from wcs.formdata import FormData
@ -43,7 +41,7 @@ class CardData(FormData):
if not field.varname:
continue
value = self.data and self.data.get(field.id)
if isinstance(value, six.string_types):
if isinstance(value, str):
item[field.varname] = value
return item

View File

@ -16,17 +16,16 @@
from __future__ import print_function
import configparser
import json
import os
import random
import sys
import tempfile
import hashlib
import urllib.parse
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.six.moves import configparser as ConfigParser
from django.utils.six.moves.urllib import parse as urlparse
from quixote import cleanup
from wcs.qommon import force_str
@ -365,7 +364,7 @@ class CmdCheckHobos(Command):
pub.write_cfg()
def get_instance_path(self, service):
parsed_url = urlparse.urlsplit(service.get('base_url'))
parsed_url = urllib.parse.urlsplit(service.get('base_url'))
instance_path = parsed_url.netloc
if parsed_url.path:
instance_path += '+%s' % parsed_url.path.replace('/', '+')
@ -373,7 +372,7 @@ class CmdCheckHobos(Command):
def configure_site_options(self, current_service, pub, ignore_timestamp=False):
# configure site-options.cfg
config = ConfigParser.RawConfigParser()
config = configparser.RawConfigParser()
site_options_filepath = os.path.join(pub.app_dir, 'site-options.cfg')
if os.path.exists(site_options_filepath):
config.read(site_options_filepath)
@ -382,7 +381,7 @@ class CmdCheckHobos(Command):
try:
if config.get('hobo', 'timestamp') == self.all_services.get('timestamp'):
raise NoChange()
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
except (configparser.NoOptionError, configparser.NoSectionError):
pass
if not 'hobo' in config.sections():
@ -404,7 +403,7 @@ class CmdCheckHobos(Command):
if not service.get('secret_key'):
continue
domain = urlparse.urlparse(service_url).netloc.split(':')[0]
domain = urllib.parse.urlparse(service_url).netloc.split(':')[0]
if service is current_service:
if config.has_option('api-secrets', domain):
api_secrets[domain] = config.get('api-secrets', domain)
@ -455,7 +454,7 @@ class CmdCheckHobos(Command):
if value is None:
config.remove_option('variables', key)
continue
if not isinstance(value, six.string_types):
if not isinstance(value, str):
value = str(value)
value = force_str(value)
config.set('variables', key, value)
@ -480,7 +479,7 @@ class CmdCheckHobos(Command):
try:
portal_agent_url = config.get('variables', 'portal_agent_url')
except ConfigParser.NoOptionError:
except configparser.NoOptionError:
pass
else:
if portal_agent_url.endswith('/'):
@ -531,7 +530,7 @@ class CmdCheckHobos(Command):
if not createdb_cfg:
createdb_cfg = {}
for k, v in pub.cfg['postgresql'].items():
if v and isinstance(v, six.string_types):
if v and isinstance(v, str):
createdb_cfg[k] = v
try:

View File

@ -23,8 +23,6 @@ import psycopg2.errorcodes
from datetime import datetime
from shutil import rmtree
from django.utils import six
from ..qommon.ctl import Command, make_option
@ -61,7 +59,7 @@ class CmdDeleteTenant(Command):
if pub.is_using_postgresql():
postgresql_cfg = {}
for k, v in pub.cfg['postgresql'].items():
if v and isinstance(v, six.string_types):
if v and isinstance(v, str):
postgresql_cfg[k] = v
# if there's a createdb-connection-params, we can do a DROP DATABASE with

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import io
import os
import sys
import psycopg2
@ -22,7 +23,6 @@ import traceback
from django.utils.encoding import force_bytes
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from django.utils.six import StringIO
from wcs.qommon.publisher import get_publisher_class
@ -82,7 +82,7 @@ class Command(BaseCommand):
self.publisher.site_options.add_section('options')
self.publisher.site_options.set('options', 'postgresql', 'true')
options_file = os.path.join(self.publisher.app_dir, 'site-options.cfg')
stringio = StringIO()
stringio = io.StringIO()
self.publisher.site_options.write(stringio)
atomic_write(options_file, force_bytes(stringio.getvalue()))

View File

@ -14,9 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import urllib.parse
import xml.etree.ElementTree as ET
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.encoding import force_text
from quixote import get_publisher
@ -73,7 +73,7 @@ class CustomView(StorableObject):
return True
def set_from_qs(self, qs):
parsed_qs = urlparse.parse_qsl(qs)
parsed_qs = urllib.parse.parse_qsl(qs)
self.columns = {
'list': [
{'id': key} for (key, value) in parsed_qs if value == 'on' and not key.startswith('filter-')

View File

@ -16,13 +16,11 @@
import collections
import hashlib
import urllib.parse
import xml.etree.ElementTree as ET
from django.template import TemplateSyntaxError, VariableDoesNotExist
from django.utils import six
from django.utils.encoding import force_text, force_bytes
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six.moves.urllib import parse as urlparse
from quixote import get_publisher, get_request, get_session
from quixote.html import TemplateIO
@ -288,7 +286,7 @@ def get_structured_items(data_source, mode=None):
elif len(value[0]) == 1:
return [{'id': x[0], 'text': x[0]} for x in value]
return value
elif isinstance(value[0], six.string_types):
elif isinstance(value[0], str):
return [{'id': x, 'text': x} for x in value]
return value
except:
@ -619,7 +617,7 @@ class NamedDataSource(XmlStorableObject):
url += '?'
else:
url += '&'
url += param_name + '=' + urllib.quote(param_value)
url += param_name + '=' + urllib.parse.quote(param_value)
def find_item(items, name, value):
for item in items:
@ -769,7 +767,7 @@ def has_chrono(publisher):
def chrono_url(publisher, url):
chrono_url = publisher.get_site_option('chrono_url')
return urlparse.urljoin(chrono_url, url)
return urllib.parse.urljoin(chrono_url, url)
def collect_agenda_data(publisher):

View File

@ -31,7 +31,6 @@ import collections
from quixote import get_request, get_publisher
from quixote.html import htmltag, htmltext, TemplateIO
from django.utils import six
from django.utils.encoding import force_bytes, force_text, smart_text
from django.utils.formats import date_format as django_date_format
from django.utils.html import urlize
@ -313,7 +312,7 @@ class Field(object):
atname = 'item'
for v in val:
ET.SubElement(el, atname).text = force_text(v, charset, errors='replace')
elif isinstance(val, six.string_types):
elif isinstance(val, str):
el.text = force_text(val, charset, errors='replace')
else:
el.text = str(val)
@ -1039,7 +1038,7 @@ class StringField(WidgetField):
def migrate(self):
changed = super(StringField, self).migrate()
if isinstance(self.validation, six.string_types):
if isinstance(self.validation, str):
self.validation = {'type': 'regex', 'value': self.validation}
changed = True
return changed
@ -2498,7 +2497,7 @@ class PageField(Field):
def migrate(self):
changed = super(PageField, self).migrate()
if isinstance(self.condition, six.string_types):
if isinstance(self.condition, str):
if self.condition:
self.condition = {'type': 'python', 'value': self.condition}
else:
@ -2506,7 +2505,7 @@ class PageField(Field):
changed = True
for post_condition in self.post_conditions or []:
condition = post_condition.get('condition')
if isinstance(condition, six.string_types):
if isinstance(condition, str):
if condition:
post_condition['condition'] = {'type': 'python', 'value': condition}
else:

View File

@ -22,8 +22,6 @@ import re
import sys
import time
from django.utils import six
from quixote import get_request, get_publisher, get_session
from quixote.http_request import Upload
@ -438,11 +436,7 @@ class FormData(StorableObject):
if field.prefill and field.prefill.get('type') == 'user':
form_user_data[field.prefill['value']] = self.data.get(field.id)
user_label = ' '.join(
[
form_user_data.get(x)
for x in field_name_values
if isinstance(form_user_data.get(x), six.string_types)
]
[form_user_data.get(x) for x in field_name_values if isinstance(form_user_data.get(x), str)]
)
if user_label != self.user_label:
self.user_label = user_label

View File

@ -26,7 +26,6 @@ import json
import xml.etree.ElementTree as ET
import datetime
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from quixote import get_request, get_publisher
@ -839,7 +838,7 @@ class FormDef(StorableObject):
return dict([(unicode2str(k), unicode2str(v)) for k, v in v.items()])
elif isinstance(v, list):
return [unicode2str(x) for x in v]
elif isinstance(v, six.string_types):
elif isinstance(v, str):
return force_str(v)
else:
return v
@ -869,7 +868,7 @@ class FormDef(StorableObject):
):
formdef.workflow_id = value['workflow'].get('id')
elif 'workflow' in value:
if isinstance(value['workflow'], six.string_types):
if isinstance(value['workflow'], str):
workflow = value.get('workflow')
else:
workflow = value['workflow'].get('name')
@ -1009,7 +1008,7 @@ class FormDef(StorableObject):
element = ET.SubElement(options, 'option')
element.attrib['varname'] = option
option_value = self.workflow_options.get(option)
if isinstance(option_value, six.string_types):
if isinstance(option_value, str):
element.text = force_text(self.workflow_options.get(option, ''), charset)
elif hasattr(option_value, 'base_filename'):
ET.SubElement(element, 'filename').text = option_value.base_filename

View File

@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from django.utils.six.moves.urllib import parse as urllib
import urllib.parse
from quixote import get_request, get_publisher, get_session, redirect
from quixote.html import htmltext, TemplateIO
@ -63,7 +63,7 @@ class FormDefUI(object):
if offset > total_count:
get_request().form['offset'] = '0'
return redirect('?' + urllib.urlencode(get_request().form))
return redirect('?' + urllib.parse.urlencode(get_request().form))
r = TemplateIO(html=True)

View File

@ -15,8 +15,7 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import time
from django.utils.six.moves.urllib import parse as urllib
import urllib.parse
from quixote import get_publisher, get_request, get_response, get_session, redirect
from quixote.directory import Directory
@ -73,7 +72,7 @@ class FileDirectory(Directory):
# no such file
raise errors.TraversalError()
if component and component not in (file.base_filename, urllib.quote(file.base_filename)):
if component and component not in (file.base_filename, urllib.parse.quote(file.base_filename)):
raise errors.TraversalError()
if file.has_redirect_url():
@ -688,7 +687,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
raise errors.TraversalError()
if getattr(file, 'base_filename'):
file_url += urllib.quote(file.base_filename)
file_url += urllib.parse.quote(file.base_filename)
return redirect(file_url)
@classmethod

View File

@ -15,6 +15,7 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import copy
import io
import json
import time
@ -23,9 +24,7 @@ try:
except ImportError:
qrcode = None
from django.utils import six
from django.utils.http import quote
from django.utils.six import BytesIO
from django.utils.safestring import mark_safe
import ratelimit.utils
@ -1164,7 +1163,7 @@ class FormPage(Directory, FormTemplateMixin):
continue
v, locked = field.get_prefill_value(user=prefill_user)
if locked:
if not isinstance(v, six.string_types) and field.convert_value_to_str:
if not isinstance(v, str) and field.convert_value_to_str:
# convert structured data to strings as if they were
# submitted by the browser.
v = field.convert_value_to_str(v)
@ -1522,7 +1521,7 @@ class FormPage(Directory, FormTemplateMixin):
def qrcode(self):
img = qrcode.make(self.formdef.get_url())
s = BytesIO()
s = io.BytesIO()
img.save(s)
if get_request().get_query() == 'download':
get_response().set_header(

View File

@ -17,10 +17,10 @@
import json
import threading
import time
import urllib.parse
from django.http import HttpResponseBadRequest, HttpResponseRedirect
from django.utils.deprecation import MiddlewareMixin
from django.utils.six.moves.urllib import parse as urllib
from quixote import get_publisher
from quixote.errors import RequestError
@ -80,7 +80,7 @@ class PublisherInitialisationMiddleware(MiddlewareMixin):
pub.finish_successful_request() # commits session
new_query_string = ''
if compat_request.form:
new_query_string = '?' + urllib.urlencode(compat_request.form)
new_query_string = '?' + urllib.parse.urlencode(compat_request.form)
response = HttpResponseRedirect(compat_request.get_path() + new_query_string)
for name, value in compat_request.response.generate_headers():
if name == 'Content-Length':

View File

@ -16,11 +16,10 @@
import threading
import types
import urllib.parse
import django.template.base
import django.template.defaulttags
from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
import quixote
import quixote.publish
@ -78,7 +77,7 @@ def redirect(location, permanent=False):
not honor the redirect).
"""
request = _thread_local.publisher.get_request()
location = urlparse.urljoin(request.get_url(), str(location))
location = urllib.parse.urljoin(request.get_url(), str(location))
return request.response.redirect(location, permanent)
@ -105,7 +104,7 @@ def cleanup():
for key, value in list(locals().items()):
if type(value) in (types.FunctionType,) + six.class_types:
if type(value) in (types.FunctionType, type):
setattr(quixote, key, value)
setattr(quixote.publish, key, value)

View File

@ -17,10 +17,9 @@
import json
import hashlib
import base64
import urllib.parse
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six.moves.urllib import parse as urlparse
from .qommon import N_, get_logger
from .qommon.misc import http_get_page, json_loads, http_post_request, urlopen
@ -37,7 +36,7 @@ def has_portfolio():
def fargo_url(url):
fargo_url = get_publisher().get_site_option('fargo_url')
url = urlparse.urljoin(fargo_url, url)
url = urllib.parse.urljoin(fargo_url, url)
secret, orig = get_secret_and_orig(url)
if '?' in url:
url += '&orig=%s' % orig
@ -70,7 +69,7 @@ def push_document(user, filename, stream):
payload['user_nameid'] = force_text(user.name_identifiers[0], 'ascii')
elif user.email:
payload['user_email'] = force_text(user.email, 'ascii')
payload['origin'] = urlparse.urlparse(get_publisher().get_frontoffice_url()).netloc
payload['origin'] = urllib.parse.urlparse(get_publisher().get_frontoffice_url()).netloc
payload['file_name'] = force_text(filename, charset)
stream.seek(0)
payload['file_b64_content'] = force_text(base64.b64encode(stream.read()))
@ -108,9 +107,9 @@ class FargoDirectory(Directory):
# FIXME: handle error cases
url = request.form['url']
document = urlopen(request.form['url']).read()
scheme, netloc, path, qs, frag = urlparse.urlsplit(url)
scheme, netloc, path, qs, frag = urllib.parse.urlsplit(url)
path = path.split('/')
name = urllib.unquote(path[-1])
name = urllib.parse.unquote(path[-1])
from .qommon.form import PicklableUpload
download = PicklableUpload(name, content_type='application/pdf')
@ -122,7 +121,7 @@ class FargoDirectory(Directory):
frontoffice_url = get_publisher().get_frontoffice_url()
self_url = frontoffice_url
self_url += '/fargo/pick'
return redirect('%spick/?pick=%s' % (self.fargo_url, urllib.quote(self_url)))
return redirect('%spick/?pick=%s' % (self.fargo_url, urllib.parse.quote(self_url)))
def set_token(self, token, title):
get_response().add_javascript(['jquery.js'])

View File

@ -18,6 +18,7 @@ from contextlib import contextmanager
import decimal
import json
import os
import pickle
import random
import re
import socket
@ -25,10 +26,8 @@ import sys
import traceback
import zipfile
from django.utils import six
from django.utils.encoding import force_text
from django.utils.formats import localize
from django.utils.six.moves import cPickle
from .Defaults import *
@ -194,7 +193,7 @@ class WcsPublisher(StubWcsPublisher):
def _decode_list(data):
rv = []
for item in data:
if isinstance(item, six.string_types):
if isinstance(item, str):
item = force_str(item)
elif isinstance(item, list):
item = _decode_list(item)
@ -207,7 +206,7 @@ class WcsPublisher(StubWcsPublisher):
rv = {}
for key, value in data.items():
key = force_str(key)
if isinstance(value, six.string_types):
if isinstance(value, str):
value = force_str(value)
elif isinstance(value, list):
value = _decode_list(value)
@ -231,7 +230,7 @@ class WcsPublisher(StubWcsPublisher):
if f in ('config.pck', 'config.json'):
results['settings'] = 1
if f == 'config.pck':
d = cPickle.loads(data)
d = pickle.loads(data)
else:
d = json.loads(force_text(data), object_hook=_decode_dict)
if 'sp' in self.cfg:

View File

@ -14,14 +14,13 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import configparser
import copy
import threading
import django.apps
from django.conf import settings
from django.utils import six
from django.utils.encoding import force_text
from django.utils.six.moves import configparser as ConfigParser
from quixote import get_publisher
@ -153,7 +152,7 @@ class AppConfig(django.apps.AppConfig):
name = 'wcs.qommon'
def ready(self):
config = ConfigParser.ConfigParser()
config = configparser.ConfigParser()
if settings.WCS_LEGACY_CONFIG_FILE:
config.read(settings.WCS_LEGACY_CONFIG_FILE)
if hasattr(settings, 'WCS_EXTRA_MODULES') and settings.WCS_EXTRA_MODULES:

View File

@ -14,7 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from django.utils.six.moves.urllib import parse as urllib
import urllib.parse
from quixote.html import htmltext, TemplateIO
from quixote import get_request, get_response, get_publisher
@ -41,7 +42,7 @@ def pagination_links(offset, limit, total_count, load_js=True):
query['limit'] = limit
r += htmltext(
'<a class="previous-page" data-limit="%s" data-offset="%s" href="?%s"><!--%s--></a>'
) % (limit, query['offset'], urllib.urlencode(query, doseq=1), _('Previous Page'))
) % (limit, query['offset'], urllib.parse.urlencode(query, doseq=1), _('Previous Page'))
else:
r += htmltext('<span class="previous-page"><!--%s--></span>') % _('Previous Page')
@ -77,7 +78,7 @@ def pagination_links(offset, limit, total_count, load_js=True):
klass,
limit,
query['offset'],
urllib.urlencode(query, doseq=1),
urllib.parse.urlencode(query, doseq=1),
page_number,
)
r += htmltext('</span>') # <!-- .pages -->
@ -91,7 +92,7 @@ def pagination_links(offset, limit, total_count, load_js=True):
r += htmltext('<a class="next-page" data-limit="%s" data-offset="%s" href="?%s"><!--%s--></a>') % (
limit,
query['offset'],
urllib.urlencode(query, doseq=1),
urllib.parse.urlencode(query, doseq=1),
_('Next Page'),
)
else:
@ -112,7 +113,7 @@ def pagination_links(offset, limit, total_count, load_js=True):
else:
r += htmltext('<a data-limit="%s" data-offset="0"" href="?%s">%s</a>') % (
page_size,
urllib.urlencode(query, doseq=1),
urllib.parse.urlencode(query, doseq=1),
page_size,
)
if page_size >= total_count:

View File

@ -16,6 +16,7 @@
from __future__ import print_function
import configparser
import optparse
from optparse import make_option
import sys
@ -25,8 +26,6 @@ __all__ = [
'Command',
]
from django.utils.six.moves import configparser as ConfigParser
from wcs import qommon
from . import _
@ -39,7 +38,7 @@ class Command(object):
usage_args = '[ options ... ]'
def __init__(self, options=[]):
self.config = ConfigParser.ConfigParser()
self.config = configparser.ConfigParser()
self.options = options + [
make_option('--extra', metavar='DIR', action='append', dest='extra', default=[]),
make_option('--app-dir', metavar='DIR', action='store', dest='app_dir', default=None),
@ -53,7 +52,7 @@ class Command(object):
sys.exit(1)
try:
self.config.read(base_options.configfile)
except ConfigParser.ParsingError as e:
except configparser.ParsingError as e:
print('Invalid configuration file %s' % base_options.configfile, file=sys.stderr)
print(e, file=sys.stderr)
sys.exit(1)

View File

@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from django.utils.six.moves.urllib import parse as urllib
import urllib.parse
from quixote import get_publisher
import quixote
@ -61,7 +61,7 @@ class AccessUnauthorizedError(AccessForbiddenError):
if self.public_msg:
session.message = ('error', self.public_msg)
login_url = get_publisher().get_root_url() + 'login/'
login_url += '?' + urllib.urlencode({'next': request.get_frontoffice_url()})
login_url += '?' + urllib.parse.urlencode({'next': request.get_frontoffice_url()})
return quixote.redirect(login_url)

View File

@ -21,10 +21,10 @@ get_global_eval_dict.
"""
import base64
import datetime
import io
import time
from django.utils.encoding import force_bytes
from django.utils.six import BytesIO
from wcs.qommon import force_str
from wcs.qommon.upload_storage import UploadStorage
@ -155,13 +155,13 @@ def attachment(content, filename='', content_type=None, strip_metadata=False):
if strip_metadata and Image:
try:
image = Image.open(BytesIO(upload.get_content()))
image = Image.open(io.BytesIO(upload.get_content()))
except OSError:
pass
else:
image_without_exif = Image.new(image.mode, image.size)
image_without_exif.putdata(image.getdata())
content = BytesIO()
content = io.BytesIO()
image_without_exif.save(content, image.format)
upload = FileField.convert_value_from_anything(
{

View File

@ -225,10 +225,9 @@ Directives
import datetime
import html
import io
import re
import os
from django.utils import six
from django.utils.six import StringIO as cStringIO
#
# Formatting types
@ -472,7 +471,7 @@ class Template:
to the file object 'fp' and functions are called.
"""
for step in program:
if isinstance(step, six.string_types):
if isinstance(step, str):
fp.write(step)
else:
step[0](step[1], fp, ctx)
@ -562,7 +561,7 @@ class Template:
list = _get_value(valref, ctx)
except UnknownReference:
return
if isinstance(list, six.string_types):
if isinstance(list, str):
raise NeedSequenceError()
refname = valref[0]
ctx.for_index[refname] = idx = [list, 0]
@ -573,7 +572,7 @@ class Template:
def _cmd_define(self, args, fp, ctx):
((name,), unused, section) = args
valfp = cStringIO.StringIO()
valfp = io.StringIO()
if section is not None:
self._execute(section, valfp, ctx)
ctx.defines[name] = valfp.getvalue()
@ -673,7 +672,7 @@ def _get_value(value_ref, ctx):
raise UnknownReference(refname)
# make sure we return a string instead of some various Python types
if isinstance(ob, six.integer_types + (float,)):
if isinstance(ob, (int, float)):
return str(ob)
if ob is None:
return ''

View File

@ -64,7 +64,6 @@ from quixote.http_request import Upload
from quixote.util import randbytes
from django.utils.encoding import force_bytes, force_text
from django.utils import six
from django.conf import settings
from django.utils.safestring import mark_safe

View File

@ -19,7 +19,6 @@ import copy
import re
import time
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from quixote import get_session, get_publisher
import quixote.http_request

View File

@ -17,10 +17,10 @@
import base64
import hashlib
import sys
import urllib.parse
import uuid
from django.utils.encoding import force_bytes
from django.utils.six.moves.urllib import parse as urllib
from quixote import redirect, get_session, get_publisher, get_request, get_session_manager
from quixote.directory import Directory
@ -313,7 +313,7 @@ class FCAuthMethod(AuthMethod):
nonce = hashlib.sha256(force_bytes(session.id)).hexdigest()
fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback'
qs = urllib.urlencode(
qs = urllib.parse.urlencode(
{
'response_type': 'code',
'client_id': client_id,
@ -345,7 +345,7 @@ class FCAuthMethod(AuthMethod):
}
response, status, data, auth_header = http_post_request(
self.get_token_url(),
urllib.urlencode(body),
urllib.parse.urlencode(body),
headers={
'Content-Type': 'application/x-www-form-urlencoded',
},
@ -519,7 +519,7 @@ class FCAuthMethod(AuthMethod):
get_session_manager().expire_session()
logout_url = self.get_logout_url()
post_logout_redirect_uri = get_publisher().get_frontoffice_url()
logout_url += '?' + urllib.urlencode(
logout_url += '?' + urllib.parse.urlencode(
{
'id_token_hint': id_token,
'post_logout_redirect_uri': post_logout_redirect_uri,

View File

@ -15,6 +15,7 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import re
import urllib.parse
try:
import lasso
@ -22,8 +23,6 @@ except ImportError:
lasso = None
from django.utils.encoding import force_bytes, force_text
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six.moves.urllib import parse as urlparse
from quixote.directory import Directory
from quixote import redirect, get_session, get_response, get_publisher
@ -159,7 +158,7 @@ class MethodDirectory(Directory):
login_url = get_publisher().get_root_url() + 'login/idp/'
else:
login_url = get_publisher().get_root_url() + 'login/'
login_url += '?' + urllib.urlencode({'next': get_request().get_frontoffice_url()})
login_url += '?' + urllib.parse.urlencode({'next': get_request().get_frontoffice_url()})
return redirect(login_url)
if get_request().user:
@ -1066,7 +1065,7 @@ class MethodAdminDirectory(Directory):
new_domain = None
if old_common_domain_getter_url:
old_domain = urlparse.urlparse(old_common_domain_getter_url)[1]
old_domain = urllib.parse.urlparse(old_common_domain_getter_url)[1]
if ':' in old_domain:
old_domain = old_domain.split(':')[0]
old_domain_dir = os.path.normpath(os.path.join(dir, '..', old_domain))
@ -1077,7 +1076,7 @@ class MethodAdminDirectory(Directory):
# bad luck, but ignore this
pass
if new_common_domain_getter_url:
new_domain = urlparse.urlparse(new_common_domain_getter_url)[1]
new_domain = urllib.parse.urlparse(new_common_domain_getter_url)[1]
if ':' in new_domain:
new_domain = new_domain.split(':')[0]
new_domain_dir = os.path.normpath(os.path.join(dir, '..', new_domain))

View File

@ -17,8 +17,6 @@
import logging
import os
from django.utils import six
from quixote import get_publisher, get_session, get_request
from quixote.logger import DefaultLogger
@ -78,7 +76,7 @@ class Formatter(logging.Formatter):
user = request.user
if user:
if isinstance(user, six.string_types + six.integer_types):
if isinstance(user, (str, int)):
user_id = user
else:
user_id = user.id

View File

@ -18,9 +18,11 @@ import datetime
import decimal
import calendar
import html
import io
import re
import os
import time
import urllib.parse
import base64
import json
import subprocess
@ -37,13 +39,10 @@ except ImportError:
from django.conf import settings
from django.utils import datetime_safe
from django.utils import six
from django.utils.encoding import force_text
from django.utils.formats import localize
from django.utils.html import strip_tags
from django.template import TemplateSyntaxError, VariableDoesNotExist
from django.utils.six.moves.urllib.parse import quote, urlencode
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.text import Truncator
from quixote import get_publisher, get_response, get_request, redirect
@ -54,8 +53,6 @@ from . import get_cfg, get_logger, ezt
from .errors import ConnectionError, RequestError
from .template import Template
from django.utils.six import BytesIO, StringIO
try:
subprocess.check_call(['which', 'pdftoppm'], stdout=subprocess.DEVNULL)
HAS_PDFTOPPM = True
@ -167,7 +164,7 @@ def get_provider_key(provider_id):
def simplify(s, space='-'):
if s is None:
return ''
if not isinstance(s, six.text_type):
if not isinstance(s, str):
if get_publisher() and get_publisher().site_charset:
s = force_text('%s' % s, get_publisher().site_charset, errors='ignore')
else:
@ -269,7 +266,7 @@ def site_encode(s):
return None
if isinstance(s, str):
return s
if not isinstance(s, six.string_types):
if not isinstance(s, str):
s = force_text(s)
return s.encode(get_publisher().site_charset)
@ -346,7 +343,7 @@ def _http_request(
):
get_publisher().reload_cfg()
splitted_url = urlparse.urlsplit(url)
splitted_url = urllib.parse.urlsplit(url)
if splitted_url.scheme not in ('http', 'https'):
raise ConnectionError('invalid scheme in URL %s' % url)
@ -391,7 +388,7 @@ def urlopen(url, data=None):
response, status, data, auth_header = _http_request(
url, 'GET' if data is None else 'POST', body=data, raise_on_http_errors=True
)
return BytesIO(data)
return io.BytesIO(data)
def http_get_page(url, **kwargs):
@ -418,14 +415,14 @@ def get_variadic_url(url, variables, encode_query=True):
if '{{' in url or '{%' in url:
try:
url = Template(url).render(variables)
p = urlparse.urlsplit(url)
p = urllib.parse.urlsplit(url)
scheme, netloc, path, query, fragment = (p.scheme, p.netloc, p.path, p.query, p.fragment)
if path.startswith('//'):
# don't let double slash happen at the root of the URL, this
# happens when a template such as {{url}}/path is used (with
# {{url}} already ending with a slash).
path = path[1:]
return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
return urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
except (TemplateSyntaxError, VariableDoesNotExist):
return url
@ -433,16 +430,16 @@ def get_variadic_url(url, variables, encode_query=True):
def ezt_substitute(template, variables):
tmpl = ezt.Template()
tmpl.parse(template)
fd = StringIO()
fd = io.StringIO()
tmpl.generate(fd, variables)
return fd.getvalue()
def partial_quote(string):
# unquote brackets, as there may be further processing that needs them
# intact.
return quote(string).replace('%5B', '[').replace('%5D', ']')
return urllib.parse.quote(string).replace('%5B', '[').replace('%5D', ']')
p = urlparse.urlsplit(url)
p = urllib.parse.urlsplit(url)
scheme, netloc, path, query, fragment = p.scheme, p.netloc, p.path, p.query, p.fragment
if netloc and '[' in netloc:
netloc = ezt_substitute(netloc, variables)
@ -456,7 +453,7 @@ def get_variadic_url(url, variables, encode_query=True):
# there were no / in the original path (the two / comes from
# the scheme/netloc separation, this means there is no path)
before_path = ezt_substitute(path, variables)
p2 = urlparse.urlsplit(before_path)
p2 = urllib.parse.urlsplit(before_path)
scheme, netloc, path = p2.scheme, p2.netloc, p2.path
else:
# there is a path, we need to get back to the original URL and
@ -467,7 +464,7 @@ def get_variadic_url(url, variables, encode_query=True):
else:
before_path, path = path, ''
before_path = ezt_substitute(before_path, variables)
p2 = urlparse.urlsplit(before_path)
p2 = urllib.parse.urlsplit(before_path)
scheme, netloc = p2.scheme, p2.netloc
if p2.path:
if not path:
@ -487,7 +484,7 @@ def get_variadic_url(url, variables, encode_query=True):
if fragment and '[' in fragment:
fragment = partial_quote(ezt_substitute(fragment, variables))
if query and '[' in query:
p_qs = urlparse.parse_qsl(query)
p_qs = urllib.parse.parse_qsl(query)
if len(p_qs) == 0:
# this happened because the query string has no key/values,
# probably because it's a single substitution variable (ex:
@ -502,10 +499,10 @@ def get_variadic_url(url, variables, encode_query=True):
v = ezt_substitute(v, variables)
query.append((k, v))
if encode_query:
query = urlencode(query)
query = urllib.parse.urlencode(query)
else:
query = '&'.join('%s=%s' % (k, v) for (k, v) in query)
return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
return urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
def get_foreground_colour(background_colour):
@ -662,7 +659,7 @@ def get_thumbnail(filepath, content_type=None):
if content_type == 'application/pdf':
try:
fp = BytesIO(
fp = io.BytesIO(
subprocess.check_output(
['pdftoppm', '-png', '-scale-to-x', '500', '-scale-to-y', '-1', filepath]
)
@ -705,7 +702,7 @@ def get_thumbnail(filepath, content_type=None):
# * File "PIL/PngImagePlugin.py", line 119, in read
# * raise SyntaxError("broken PNG file (chunk %s)" % repr(cid))
raise IOError
image_thumb_fp = BytesIO()
image_thumb_fp = io.BytesIO()
image.save(image_thumb_fp, "PNG")
except IOError:
# failed to create thumbnail.

View File

@ -18,14 +18,15 @@ from __future__ import print_function
import codecs
import collections
from django.utils.six.moves import builtins
from django.utils.six.moves import configparser as ConfigParser
from django.utils.six.moves import cPickle
from django.utils.six.moves.urllib import parse as urllib
import builtins
import configparser
import pickle
import urllib.parse
import datetime
from decimal import Decimal
import importlib
import inspect
import io
import os
import fcntl
import hashlib
@ -43,10 +44,8 @@ import xml.etree.ElementTree as ET
from django.conf import settings
from django.http import Http404
from django.utils import six
from django.utils import translation
from django.utils.encoding import force_text, force_bytes
from django.utils.six import StringIO
from quixote.publish import Publisher, get_request, get_response, get_publisher, redirect
from .http_request import HTTPRequest
@ -128,7 +127,7 @@ class QommonPublisher(Publisher, object):
return '%s://%s%s' % (
req.get_scheme(),
req.get_server(),
urllib.quote(req.environ.get('SCRIPT_NAME')),
urllib.parse.quote(req.environ.get('SCRIPT_NAME')),
)
return 'https://%s' % os.path.basename(get_publisher().app_dir)
@ -141,7 +140,7 @@ class QommonPublisher(Publisher, object):
return '%s://%s%s/backoffice' % (
req.get_scheme(),
req.get_server(),
urllib.quote(req.environ.get('SCRIPT_NAME')),
urllib.parse.quote(req.environ.get('SCRIPT_NAME')),
)
return 'https://%s/backoffice' % os.path.basename(self.app_dir)
@ -206,7 +205,7 @@ class QommonPublisher(Publisher, object):
self, request, original_response, exc_type, exc_value, tb
)
error_file = StringIO()
error_file = io.StringIO()
if limit is None:
if hasattr(sys, 'tracebacklimit'):
@ -356,7 +355,7 @@ class QommonPublisher(Publisher, object):
self.ngettext = translation.ngettext
def load_site_options(self):
self.site_options = ConfigParser.ConfigParser()
self.site_options = configparser.ConfigParser()
site_options_filename = os.path.join(self.app_dir, 'site-options.cfg')
if not os.path.exists(site_options_filename):
return
@ -378,9 +377,9 @@ class QommonPublisher(Publisher, object):
self.load_site_options()
try:
return self.site_options.get('options', option) == 'true'
except ConfigParser.NoSectionError:
except configparser.NoSectionError:
return defaults.get(option, False)
except ConfigParser.NoOptionError:
except configparser.NoOptionError:
return defaults.get(option, False)
def get_site_option(self, option, section='options'):
@ -394,9 +393,9 @@ class QommonPublisher(Publisher, object):
self.load_site_options()
try:
return self.site_options.get(section, option)
except ConfigParser.NoSectionError:
except configparser.NoSectionError:
return defaults.get(section, {}).get(option)
except ConfigParser.NoOptionError:
except configparser.NoOptionError:
return defaults.get(section, {}).get(option)
def get_site_storages(self):
@ -712,7 +711,7 @@ class QommonPublisher(Publisher, object):
cfg = None
def write_cfg(self):
s = cPickle.dumps(self.cfg, protocol=2)
s = pickle.dumps(self.cfg, protocol=2)
filename = os.path.join(self.app_dir, 'config.pck')
storage.atomic_write(filename, s)
@ -720,7 +719,7 @@ class QommonPublisher(Publisher, object):
filename = os.path.join(self.app_dir, 'config.pck')
try:
with open(filename, 'rb') as fd:
self.cfg = cPickle.load(fd, encoding='utf-8')
self.cfg = pickle.load(fd, encoding='utf-8')
except:
self.cfg = {}
@ -908,26 +907,26 @@ class QommonPublisher(Publisher, object):
self.load_site_options()
try:
d.update(dict(self.site_options.items('variables', raw=True)))
except ConfigParser.NoSectionError:
except configparser.NoSectionError:
pass
d['manager_homepage_url'] = d.get('portal_agent_url')
d['manager_homepage_title'] = d.get('portal_agent_title')
return d
def is_relatable_url(self, url):
parsed_url = urllib.urlparse(url)
parsed_url = urllib.parse.urlparse(url)
if not parsed_url.netloc:
return True
if parsed_url.netloc == urllib.urlparse(self.get_frontoffice_url()).netloc:
if parsed_url.netloc == urllib.parse.urlparse(self.get_frontoffice_url()).netloc:
return True
if parsed_url.netloc == urllib.urlparse(self.get_backoffice_url()).netloc:
if parsed_url.netloc == urllib.parse.urlparse(self.get_backoffice_url()).netloc:
return True
if parsed_url.netloc in [x.strip() for x in self.get_site_option('relatable-hosts').split(',')]:
return True
try:
if parsed_url.netloc in self.site_options.options('api-secrets'):
return True
except ConfigParser.NoSectionError:
except configparser.NoSectionError:
pass
return False

View File

@ -16,6 +16,7 @@
import os
import time
import urllib.parse
import sys
from xml.sax.saxutils import escape
@ -25,7 +26,6 @@ except ImportError:
lasso = None
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urlparse
from quixote import get_request, get_response, redirect, get_field, get_publisher
from quixote.http_request import parse_header
@ -186,11 +186,11 @@ class Saml2Directory(Directory):
login.msgRelayState = get_request().form.get('next')
next_url = login.msgRelayState or get_publisher().get_frontoffice_url()
parsed_url = urlparse.urlparse(next_url)
parsed_url = urllib.parse.urlparse(next_url)
request = get_request()
scheme = parsed_url.scheme or request.get_scheme()
netloc = parsed_url.netloc or request.get_server()
next_url = urlparse.urlunsplit(
next_url = urllib.parse.urlunsplit(
(scheme, netloc, parsed_url.path, parsed_url.query, parsed_url.fragment)
)
samlp_extensions = '''<samlp:Extensions
@ -372,10 +372,10 @@ class Saml2Directory(Directory):
if relay_state == 'backoffice':
after_url = get_publisher().get_backoffice_url()
elif relay_state:
parsed_url = urlparse.urlparse(relay_state)
parsed_url = urllib.parse.urlparse(relay_state)
scheme = parsed_url.scheme or request.get_scheme()
netloc = parsed_url.netloc or request.get_server()
after_url = urlparse.urlunsplit(
after_url = urllib.parse.urlunsplit(
(scheme, netloc, parsed_url.path, parsed_url.query, parsed_url.fragment)
)
if not (

View File

@ -15,8 +15,7 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import re
from django.utils.six.moves.urllib import parse as urllib
import urllib.parse
from . import N_, errors, misc
from . import get_cfg, get_logger

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import builtins
import copyreg
import errno
import operator
@ -24,11 +25,9 @@ import os.path
import shutil
import sys
import tempfile
import _thread
from django.utils import six
from django.utils.encoding import force_bytes
from django.utils.six.moves import builtins
from django.utils.six.moves import _thread
from .vendor import locket
@ -150,7 +149,7 @@ class Criteria(object):
self.typed_none = ''
if isinstance(self.value, bool):
self.typed_none = False
elif isinstance(self.value, six.integer_types + (float,)):
elif isinstance(self.value, (int, float)):
self.typed_none = -sys.maxsize
elif isinstance(self.value, time.struct_time):
self.typed_none = time.gmtime(-(10 ** 10)) # 1653

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import io
import os
import glob
import re
@ -28,8 +29,6 @@ from django.template import (
from django.template.loader import render_to_string
from django.utils.encoding import force_text, smart_text
from django.utils.safestring import SafeString, SafeText
from django.utils import six
from django.utils.six import StringIO
from quixote import get_session, get_request, get_response, get_publisher
from quixote.directory import Directory
@ -112,7 +111,7 @@ def get_theme_dict(theme_xml):
desc = force_str(tree.findtext('desc') or '')
author = force_str(tree.findtext('author') or '')
icon = None
if isinstance(theme_xml, six.string_types):
if isinstance(theme_xml, str):
icon = os.path.join(os.path.dirname(theme_xml), 'icon.png')
if not os.path.exists(icon):
icon = None
@ -412,7 +411,7 @@ def decorate(body, response):
else:
template = default_template
fd = StringIO()
fd = io.StringIO()
vars = get_decorate_vars(body, response, generate_breadcrumb=generate_breadcrumb)
template.generate(fd, vars)
@ -527,7 +526,7 @@ class Template(object):
return re.sub(r'[\uE000-\uF8FF]', '', rendered)
def ezt_render(self, context={}):
fd = StringIO()
fd = io.StringIO()
try:
self.template.generate(fd, context)
except ezt.EZTException as e:

View File

@ -36,7 +36,6 @@ except ImportError:
from django import template
from django.template import defaultfilters
from django.utils import dateparse
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.safestring import mark_safe
from django.utils.timezone import is_naive, make_aware
@ -222,7 +221,7 @@ def parse_decimal(value, do_raise=False):
# treat all booleans as 0 (contrary to Python behaviour where
# decimal(True) == 1).
value = 0
if isinstance(value, six.string_types):
if isinstance(value, str):
# replace , by . for French users comfort
value = value.replace(',', '.')
try:
@ -505,7 +504,7 @@ def divide(term1, term2):
def sum_(list_):
if hasattr(list_, 'get_value'):
list_ = list_.get_value() # unlazy
if isinstance(list_, six.string_types):
if isinstance(list_, str):
# do not consider string as iterable, to avoid misusage
return ''
try:
@ -581,7 +580,7 @@ def get_latlon(obj):
return float(obj['lat']), float(obj['lng'])
except (TypeError, ValueError):
pass
if isinstance(obj, six.string_types) and ';' in obj:
if isinstance(obj, str) and ';' in obj:
try:
return float(obj.split(';')[0]), float(obj.split(';')[1])
except ValueError:
@ -732,7 +731,7 @@ def phonenumber_fr(value, separator=' '):
if hasattr(value, 'get_value'):
value = value.get_value() # unlazy
if not value or not isinstance(value, six.string_types):
if not value or not isinstance(value, str):
return value
number = value.strip()
if not number:
@ -767,7 +766,7 @@ def phonenumber_fr(value, separator=' '):
def is_empty(value):
from wcs.variables import LazyFormDefObjectsManager, LazyList
if isinstance(value, (six.string_types, list, dict)):
if isinstance(value, (str, list, dict)):
return not value
if isinstance(value, (LazyFormDefObjectsManager, LazyList)):
return not list(value)

View File

@ -15,6 +15,7 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import base64
import io
import os
from quixote import get_publisher
@ -22,7 +23,6 @@ from quixote.http_request import Upload
from django.utils.encoding import force_text
from django.utils.module_loading import import_string
from django.utils.six import BytesIO
from .errors import ConnectionError
from .misc import json_loads, file_digest, can_thumbnail
@ -51,7 +51,7 @@ class PicklableUpload(Upload):
self.__dict__.update(dict)
if hasattr(self, 'data'):
# backward compatibility with older w.c.s. version
self.fp = BytesIO(self.data)
self.fp = io.BytesIO(self.data)
del self.data
def get_file(self):

View File

@ -21,7 +21,6 @@ import os
import subprocess
import stat
from django.utils import six
from django.utils.encoding import force_text
_openssl = 'openssl'
@ -29,7 +28,7 @@ _openssl = 'openssl'
def decapsulate_pem_file(file_or_string):
'''Remove PEM header lines'''
if not isinstance(file_or_string, six.string_types):
if not isinstance(file_or_string, str):
content = file_or_string.read()
else:
content = file_or_string

View File

@ -18,8 +18,7 @@ from importlib import import_module
import json
import os
import re
from django.utils.six.moves.urllib import parse as urllib
import urllib.parse
from quixote import get_publisher, get_response, get_session, redirect, get_session_manager, get_request
from quixote.directory import Directory
@ -113,7 +112,7 @@ class LoginDirectory(Directory):
if get_publisher().ident_methods.get(method)().is_interactive():
login_url = '../ident/%s/login' % method
if get_request().form.get('next'):
login_url += '?' + urllib.urlencode({'next': get_request().form.get('next')})
login_url += '?' + urllib.parse.urlencode({'next': get_request().form.get('next')})
return redirect(login_url)
else:
try:

View File

@ -15,6 +15,7 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import copy
import io
import psycopg2
import psycopg2.extensions
import psycopg2.extras
@ -28,9 +29,7 @@ try:
except ImportError:
import pickle
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.six import BytesIO
from quixote import get_publisher
from . import qommon
@ -84,7 +83,7 @@ SQL_TYPE_MAPPING = {
def pickle_loads(value):
if hasattr(value, 'tobytes'):
value = value.tobytes()
obj = UnpicklerClass(BytesIO(force_bytes(value)), **PICKLE_KWARGS).load()
obj = UnpicklerClass(io.BytesIO(force_bytes(value)), **PICKLE_KWARGS).load()
obj = deep_bytes2str(obj)
return obj
@ -1613,7 +1612,7 @@ class SqlMixin(object):
# turn {'poire': 2, 'abricot': 1, 'pomme': 3} into an array
value = [[force_str(x), force_str(y)] for x, y in value.items()]
elif sql_type == 'varchar':
assert isinstance(value, six.string_types)
assert isinstance(value, str)
elif sql_type == 'date':
assert type(value) is time.struct_time
value = datetime.datetime(value.tm_year, value.tm_mon, value.tm_mday)
@ -2012,7 +2011,7 @@ class SqlDataMixin(SqlMixin):
elif field.key in ('item', 'items'):
value = self.data.get('%s_display' % field.id)
if value:
if isinstance(value, six.string_types):
if isinstance(value, str):
fts_strings.append(value)
elif type(value) in (tuple, list):
fts_strings.extend(value)
@ -2322,7 +2321,7 @@ class SqlUser(SqlMixin, wcs.users.User):
elif field.key in ('item', 'items'):
value = self.form_data.get('%s_display' % field.id)
if value:
if isinstance(value, six.string_types):
if isinstance(value, str):
fts_strings.append(value)
elif type(value) in (tuple, list):
fts_strings.extend(value)

View File

@ -14,9 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import urllib.parse
import xml.etree.ElementTree as ET
from django.utils.six.moves.urllib import parse as urllib
from quixote import redirect
from ..qommon import _, N_
@ -39,7 +39,7 @@ def lookup_wf_attachment(self, filename):
else:
file_reference = None
filenames = [filename, urllib.unquote(filename)]
filenames = [filename, urllib.parse.unquote(filename)]
for p in self.formdata.iter_evolution_parts():
if not isinstance(p, AttachmentEvolutionPart):
continue
@ -69,7 +69,7 @@ def form_attachment(self):
% (
self.filled.get_url(backoffice=is_in_backoffice),
fn,
urllib.quote(p.base_filename),
urllib.parse.quote(p.base_filename),
)
)

View File

@ -16,6 +16,7 @@
import base64
import collections
import io
from xml.etree import ElementTree as ET
import zipfile
import os
@ -25,9 +26,7 @@ import tempfile
import time
import shutil
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from django.utils.six import BytesIO, StringIO
from quixote import get_response, get_request, get_publisher
from quixote.directory import Directory
@ -503,7 +502,7 @@ class ExportToModel(WorkflowStatusItem):
try:
# force ezt_only=True because an RTF file may contain {{ characters
# and would be seen as a Django template
return BytesIO(
return io.BytesIO(
force_bytes(
template_on_formdata(
formdata,
@ -629,7 +628,7 @@ class ExportToModel(WorkflowStatusItem):
node.append(child)
node.tail = current_tail
outstream = BytesIO()
outstream = io.BytesIO()
transform_opendocument(self.model_file.get_file(), outstream, process_root)
outstream.seek(0)
return outstream
@ -721,7 +720,7 @@ class ExportToModel(WorkflowStatusItem):
filename = 'export_to_model-%s-%s-%s.upload' % ids
upload = Upload(base_filename, content_type)
upload.fp = BytesIO()
upload.fp = io.BytesIO()
upload.fp.write(content)
upload.fp.seek(0)
self.model_file = UploadedFile('models', filename, upload)

View File

@ -16,6 +16,7 @@
import collections
import json
import urllib.parse
try:
from PIL import Image
@ -24,7 +25,6 @@ try:
except ImportError:
Image = None
from django.utils.six.moves.urllib import parse as urlparse
from quixote import get_publisher
from ..qommon import _, N_, force_str
@ -153,7 +153,7 @@ class GeolocateWorkflowStatusItem(WorkflowStatusItem):
url += '&'
else:
url += '?'
url += 'q=%s' % urlparse.quote(address)
url += 'q=%s' % urllib.parse.quote(address)
url += '&format=json'
url += '&accept-language=%s' % (get_publisher().get_site_language() or 'en')

View File

@ -19,8 +19,6 @@ import json
import os
import time
from django.utils import six
from quixote import get_publisher, get_request, redirect
from quixote.directory import Directory
@ -136,7 +134,7 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
def migrate(self):
changed = super(JumpWorkflowStatusItem, self).migrate()
if isinstance(self.condition, six.string_types):
if isinstance(self.condition, str):
if self.condition:
self.condition = {'type': 'python', 'value': self.condition}
else:

View File

@ -18,9 +18,9 @@ import datetime
import json
import sys
import time
import urllib.parse
import xml.etree.ElementTree as ET
from django.utils.six.moves.urllib import parse as urlparse
from quixote import get_publisher, get_response, get_request
from ..qommon import _, N_
@ -37,7 +37,7 @@ def user_ws_url(user_uuid):
idps = get_cfg('idp', {})
entity_id = list(idps.values())[0]['metadata_url']
base_url = entity_id.split('idp/saml2/metadata')[0]
url = urlparse.urljoin(base_url, '/api/users/%s/' % user_uuid)
url = urllib.parse.urljoin(base_url, '/api/users/%s/' % user_uuid)
secret, orig = get_secret_and_orig(url)
url += '?orig=%s' % orig
return sign_url(url, secret)

View File

@ -15,9 +15,7 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import sys
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six.moves.urllib import parse as urlparse
import urllib.parse
from quixote import get_request, get_publisher, get_response
@ -35,8 +33,8 @@ def roles_ws_url(role_uuid, user_uuid):
idps = get_cfg('idp', {})
entity_id = list(idps.values())[0]['metadata_url']
base_url = entity_id.split('idp/saml2/metadata')[0]
url = urlparse.urljoin(
base_url, '/api/roles/%s/members/%s/' % (urllib.quote(role_uuid), urllib.quote(user_uuid))
url = urllib.parse.urljoin(
base_url, '/api/roles/%s/members/%s/' % (urllib.parse.quote(role_uuid), urllib.parse.quote(user_uuid))
)
return url

View File

@ -15,15 +15,14 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import datetime
import io
import sys
import traceback
import xml.etree.ElementTree as ET
import collections
import mimetypes
from django.utils import six
from django.utils.encoding import force_text
from django.utils.six import BytesIO
from quixote.html import TemplateIO, htmltext
@ -463,7 +462,7 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
filename, content_type = self.get_attachment_data(response)
workflow_data['%s_content_type' % self.varname] = content_type
workflow_data['%s_length' % self.varname] = len(data)
fp_content = BytesIO(data)
fp_content = io.BytesIO(data)
attachment = AttachmentEvolutionPart(
filename, fp_content, content_type=content_type, varname=self.varname
)
@ -562,11 +561,11 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
el = ET.SubElement(xml_item, attribute)
for (key, value) in getattr(self, attribute).items():
item = ET.SubElement(el, 'item')
if isinstance(key, six.string_types):
if isinstance(key, str):
ET.SubElement(item, 'name').text = force_text(key)
else:
raise AssertionError('unknown type for key (%r)' % key)
if isinstance(value, six.string_types):
if isinstance(value, str):
ET.SubElement(item, 'value').text = force_text(value)
else:
raise AssertionError('unknown type for value (%r)' % key)

View File

@ -25,7 +25,6 @@ import sys
import time
import uuid
from django.utils import six
from django.utils.encoding import force_text
from quixote import get_request, get_response, redirect
@ -938,7 +937,7 @@ class XmlSerialisable(object):
atname = 'item'
for v in val:
ET.SubElement(el, atname).text = force_text(str(v), charset, errors='replace')
elif isinstance(val, six.string_types):
elif isinstance(val, str):
el.text = force_text(val, charset, errors='replace')
else:
el.text = str(val)
@ -970,9 +969,7 @@ class XmlSerialisable(object):
else:
if el.text is None:
setattr(self, attribute, None)
elif el.text in ('False', 'True') and not isinstance(
getattr(self, attribute), six.string_types
):
elif el.text in ('False', 'True') and not isinstance(getattr(self, attribute), str):
# booleans
setattr(self, attribute, el.text == 'True')
elif type(getattr(self, attribute)) is int:
@ -1311,7 +1308,7 @@ class WorkflowGlobalActionTimeoutTrigger(WorkflowGlobalActionTrigger):
)
elif isinstance(anchor_date, time.struct_time):
anchor_date = datetime.datetime.fromtimestamp(time.mktime(anchor_date))
elif isinstance(anchor_date, six.string_types) and anchor_date:
elif isinstance(anchor_date, str) and anchor_date:
try:
anchor_date = get_as_datetime(anchor_date)
except ValueError:
@ -2076,7 +2073,7 @@ class WorkflowStatusItem(XmlSerialisable):
formdata=None,
status_item=None,
):
if not isinstance(var, six.string_types):
if not isinstance(var, str):
return var
expression = cls.get_expression(var)
@ -2919,7 +2916,7 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
# this works around the fact that parametric workflows only support
# string values, so if we get set a string, we convert it here to an
# array.
if isinstance(self.to, six.string_types):
if isinstance(self.to, str):
self.to = [self.to]
addresses = []
@ -2935,7 +2932,7 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
if isinstance(dest, list):
addresses.extend(dest)
continue
elif isinstance(dest, six.string_types) and ',' in dest:
elif isinstance(dest, str) and ',' in dest:
# if the email contains a comma consider it as a serie of
# emails
addresses.extend([x.strip() for x in dest.split(',')])

View File

@ -17,11 +17,10 @@
import collections
import json
import sys
import urllib.parse
import xml.etree.ElementTree as ET
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six.moves.urllib import parse as urlparse
from quixote import get_publisher, get_request
@ -94,8 +93,8 @@ def call_webservice(
if qs_data: # merge qs_data into url
publisher = get_publisher()
parsed = urlparse.urlparse(url)
qs = list(urlparse.parse_qsl(parsed.query))
parsed = urllib.parse.urlparse(url)
qs = list(urllib.parse.parse_qsl(parsed.query))
for key, value in qs_data.items():
try:
value = WorkflowStatusItem.compute(value, raises=True)
@ -106,8 +105,8 @@ def call_webservice(
key = force_str(key)
value = force_str(value)
qs.append((key, value))
qs = urllib.urlencode(qs)
url = urlparse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
qs = urllib.parse.urlencode(qs)
url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
unsigned_url = url