misc: remove usage of "six" module (#51517)
This commit is contained in:
parent
0d53aa7cfa
commit
e7292f6f3f
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import io
|
||||
import json
|
||||
import mock
|
||||
import os
|
||||
|
@ -10,8 +11,6 @@ try:
|
|||
except ImportError:
|
||||
lasso = None
|
||||
|
||||
from django.utils.six import StringIO
|
||||
|
||||
import pytest
|
||||
from webtest import Upload
|
||||
|
||||
|
@ -502,7 +501,7 @@ def test_data_sources_export(pub):
|
|||
resp = resp.click(href='export')
|
||||
xml_export = resp.text
|
||||
|
||||
ds = StringIO(xml_export)
|
||||
ds = io.StringIO(xml_export)
|
||||
data_source2 = NamedDataSource.import_from_xml(ds)
|
||||
assert data_source2.name == 'foobar'
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import datetime
|
||||
import io
|
||||
import mock
|
||||
import os
|
||||
import re
|
||||
|
@ -11,8 +12,6 @@ import xml.etree.ElementTree as ET
|
|||
import pytest
|
||||
from webtest import Upload
|
||||
|
||||
from django.utils.six import StringIO, BytesIO
|
||||
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.qommon.errors import ConnectionError
|
||||
from wcs.categories import Category
|
||||
|
@ -962,7 +961,7 @@ def test_form_export(pub):
|
|||
resp = resp.click(href='export')
|
||||
xml_export = resp.text
|
||||
|
||||
fd = StringIO(xml_export)
|
||||
fd = io.StringIO(xml_export)
|
||||
formdef2 = FormDef.import_from_xml(fd)
|
||||
assert formdef2.name == 'form title'
|
||||
|
||||
|
@ -1064,7 +1063,7 @@ def test_form_import_from_url(pub):
|
|||
resp.form['url'] = 'http://remote.invalid/test.wcs'
|
||||
resp = resp.form.submit()
|
||||
assert 'Error loading form' in resp
|
||||
urlopen.side_effect = lambda *args: StringIO(formdef_xml.decode())
|
||||
urlopen.side_effect = lambda *args: io.StringIO(formdef_xml.decode())
|
||||
resp.form['url'] = 'http://remote.invalid/test.wcs'
|
||||
resp = resp.form.submit()
|
||||
assert FormDef.count() == 1
|
||||
|
@ -2368,7 +2367,7 @@ def test_form_archive(pub):
|
|||
resp = resp.click(href='archive')
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.content_type == 'application/x-wcs-archive'
|
||||
tf = tarfile.open(fileobj=BytesIO(resp.body))
|
||||
tf = tarfile.open(fileobj=io.BytesIO(resp.body))
|
||||
assert 'formdef' in [x.name for x in tf.getmembers()]
|
||||
assert len(tf.getmembers()) == 8 # 7 formdata + 1 formdef
|
||||
|
||||
|
@ -2377,7 +2376,7 @@ def test_form_archive(pub):
|
|||
resp = resp.click(href='archive')
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.content_type == 'application/x-wcs-archive'
|
||||
tf = tarfile.open(fileobj=BytesIO(resp.body))
|
||||
tf = tarfile.open(fileobj=io.BytesIO(resp.body))
|
||||
assert 'formdef' in [x.name for x in tf.getmembers()]
|
||||
assert len(tf.getmembers()) == 1 # 0 formdata + 1 formdef
|
||||
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import urllib.parse
|
||||
import zipfile
|
||||
|
||||
try:
|
||||
|
@ -13,9 +15,6 @@ import pytest
|
|||
from webtest import Upload
|
||||
import mock
|
||||
|
||||
from django.utils.six import BytesIO
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from quixote.http_request import Upload as QuixoteUpload
|
||||
|
||||
from wcs.qommon.form import UploadedFile
|
||||
|
@ -113,11 +112,11 @@ def test_settings_export_import(pub):
|
|||
resp = app.get('/backoffice/settings/export')
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.location.startswith('http://example.net/backoffice/processing?job=')
|
||||
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
|
||||
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
|
||||
resp = resp.follow()
|
||||
assert 'completed' in resp.text
|
||||
resp = resp.click('Download Export')
|
||||
zip_content = BytesIO(resp.body)
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
zipf = zipfile.ZipFile(zip_content, 'a')
|
||||
filelist = zipf.namelist()
|
||||
assert len(filelist) == 0
|
||||
|
@ -147,7 +146,7 @@ def test_settings_export_import(pub):
|
|||
export_to.label = 'test'
|
||||
upload = QuixoteUpload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
|
||||
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00'''
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(file_content)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile('models', 'export_to_model-1.upload', upload)
|
||||
|
@ -164,10 +163,10 @@ def test_settings_export_import(pub):
|
|||
resp = app.get('/backoffice/settings/export')
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.location.startswith('http://example.net/backoffice/processing?job=')
|
||||
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
|
||||
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
|
||||
resp = resp.follow()
|
||||
resp = resp.click('Download Export')
|
||||
zip_content = BytesIO(resp.body)
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
zipf = zipfile.ZipFile(zip_content, 'a')
|
||||
filelist = zipf.namelist()
|
||||
assert 'formdefs/1' not in filelist
|
||||
|
@ -243,10 +242,10 @@ def test_settings_export_import(pub):
|
|||
resp.form['wscalls'] = False
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.location.startswith('http://example.net/backoffice/processing?job=')
|
||||
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
|
||||
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
|
||||
resp = resp.follow()
|
||||
resp = resp.click('Download Export')
|
||||
zip_content = BytesIO(resp.body)
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
zipf = zipfile.ZipFile(zip_content, 'a')
|
||||
filelist = zipf.namelist()
|
||||
assert 'formdefs_xml/%s' % formdef.id in filelist
|
||||
|
@ -277,10 +276,10 @@ def test_settings_export_import(pub):
|
|||
resp = app.get('/backoffice/settings/export')
|
||||
resp = resp.form.submit('submit')
|
||||
assert resp.location.startswith('http://example.net/backoffice/processing?job=')
|
||||
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
|
||||
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
|
||||
resp = resp.follow()
|
||||
resp = resp.click('Download Export')
|
||||
zip_content = BytesIO(resp.body)
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
zipf = zipfile.ZipFile(zip_content, 'a')
|
||||
filelist = zipf.namelist()
|
||||
assert len([x for x in filelist if 'roles/' in x]) == 0
|
||||
|
@ -810,7 +809,7 @@ def test_settings_theme_download_upload(pub):
|
|||
resp = app.get('/backoffice/settings/themes')
|
||||
resp = resp.click('download', index=0)
|
||||
assert resp.headers['content-type'] == 'application/zip'
|
||||
zip_content = BytesIO(resp.body)
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
zipf = zipfile.ZipFile(zip_content, 'a')
|
||||
filelist = zipf.namelist()
|
||||
assert 'alto/icon.png' in filelist
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from io import StringIO
|
||||
import io
|
||||
import mock
|
||||
import os
|
||||
import re
|
||||
|
@ -358,7 +358,7 @@ def test_workflows_import_from_url(pub):
|
|||
resp.form['url'] = 'http://remote.invalid/test.wcs'
|
||||
resp = resp.form.submit()
|
||||
assert 'Error loading form' in resp
|
||||
urlopen.side_effect = lambda *args: StringIO(wf_export.decode())
|
||||
urlopen.side_effect = lambda *args: io.StringIO(wf_export.decode())
|
||||
resp.form['url'] = 'http://remote.invalid/test.wcs'
|
||||
resp = resp.form.submit()
|
||||
assert Workflow.count() == 2
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import io
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.utils.six import StringIO
|
||||
|
||||
import pytest
|
||||
from webtest import Upload
|
||||
|
||||
|
@ -159,7 +158,7 @@ def test_wscalls_export(pub, wscall):
|
|||
resp = resp.click(href='export')
|
||||
xml_export = resp.text
|
||||
|
||||
ds = StringIO(xml_export)
|
||||
ds = io.StringIO(xml_export)
|
||||
wscall2 = NamedWsCall.import_from_xml(ds)
|
||||
assert wscall2.name == 'xxx'
|
||||
|
||||
|
|
|
@ -6,10 +6,10 @@ import hashlib
|
|||
import hmac
|
||||
import os
|
||||
import shutil
|
||||
import urllib.parse
|
||||
|
||||
import pytest
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from quixote import get_publisher
|
||||
from utilities import clean_temporary_pub, create_temporary_pub, get_app, login
|
||||
|
||||
|
@ -145,7 +145,7 @@ def test_get_user_from_api_query_string_error_invalid_signature(pub):
|
|||
|
||||
|
||||
def test_get_user_from_api_query_string_error_missing_timestamp(pub):
|
||||
signature = urllib.quote(
|
||||
signature = urllib.parse.quote(
|
||||
base64.b64encode(hmac.new(b'1234', b'format=json&orig=coucou&algo=sha1', hashlib.sha1).digest())
|
||||
)
|
||||
output = get_app(pub).get(
|
||||
|
@ -157,7 +157,9 @@ def test_get_user_from_api_query_string_error_missing_timestamp(pub):
|
|||
def test_get_user_from_api_query_string_error_missing_email(pub):
|
||||
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
|
||||
query = 'format=json&orig=coucou&algo=sha1×tamp=' + timestamp
|
||||
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
|
||||
signature = urllib.parse.quote(
|
||||
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
|
||||
)
|
||||
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
|
||||
assert output.json['err_desc'] == 'no user specified'
|
||||
|
||||
|
@ -165,7 +167,9 @@ def test_get_user_from_api_query_string_error_missing_email(pub):
|
|||
def test_get_user_from_api_query_string_error_unknown_nameid(pub):
|
||||
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
|
||||
query = 'format=json&orig=coucou&algo=sha1&NameID=xxx×tamp=' + timestamp
|
||||
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
|
||||
signature = urllib.parse.quote(
|
||||
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
|
||||
)
|
||||
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
|
||||
assert output.json['err_desc'] == 'unknown NameID'
|
||||
|
||||
|
@ -175,7 +179,9 @@ def test_get_user_from_api_query_string_error_missing_email_valid_endpoint(pub):
|
|||
# works fine without user.
|
||||
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
|
||||
query = 'format=json&orig=coucou&algo=sha1×tamp=' + timestamp
|
||||
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
|
||||
signature = urllib.parse.quote(
|
||||
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
|
||||
)
|
||||
output = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
|
||||
assert output.json == {'data': []}
|
||||
output = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
|
||||
|
@ -186,7 +192,9 @@ def test_get_user_from_api_query_string_error_unknown_nameid_valid_endpoint(pub)
|
|||
# check the categories and forms endpoints accept an unknown NameID
|
||||
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
|
||||
query = 'format=json&NameID=xxx&orig=coucou&algo=sha1×tamp=' + timestamp
|
||||
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
|
||||
signature = urllib.parse.quote(
|
||||
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
|
||||
)
|
||||
output = get_app(pub).get('/categories?%s&signature=%s' % (query, signature))
|
||||
assert output.json == {'data': []}
|
||||
output = get_app(pub).get('/json?%s&signature=%s' % (query, signature))
|
||||
|
@ -197,11 +205,13 @@ def test_get_user_from_api_query_string_error_success_sha1(pub, local_user):
|
|||
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
|
||||
query = (
|
||||
'format=json&orig=coucou&algo=sha1&email='
|
||||
+ urllib.quote(local_user.email)
|
||||
+ urllib.parse.quote(local_user.email)
|
||||
+ '×tamp='
|
||||
+ timestamp
|
||||
)
|
||||
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
|
||||
signature = urllib.parse.quote(
|
||||
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
|
||||
)
|
||||
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
|
||||
assert output.json['user_display_name'] == u'Jean Darmette'
|
||||
|
||||
|
@ -210,11 +220,13 @@ def test_get_user_from_api_query_string_error_invalid_signature_algo_mismatch(pu
|
|||
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
|
||||
query = (
|
||||
'format=json&orig=coucou&algo=sha256&email='
|
||||
+ urllib.quote(local_user.email)
|
||||
+ urllib.parse.quote(local_user.email)
|
||||
+ '×tamp='
|
||||
+ timestamp
|
||||
)
|
||||
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()))
|
||||
signature = urllib.parse.quote(
|
||||
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())
|
||||
)
|
||||
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403)
|
||||
assert output.json['err_desc'] == 'invalid signature'
|
||||
|
||||
|
@ -223,18 +235,21 @@ def test_get_user_from_api_query_string_error_success_sha256(pub, local_user):
|
|||
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
|
||||
query = (
|
||||
'format=json&orig=coucou&algo=sha256&email='
|
||||
+ urllib.quote(local_user.email)
|
||||
+ urllib.parse.quote(local_user.email)
|
||||
+ '×tamp='
|
||||
+ timestamp
|
||||
)
|
||||
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest()))
|
||||
signature = urllib.parse.quote(
|
||||
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest())
|
||||
)
|
||||
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature))
|
||||
assert output.json['user_display_name'] == u'Jean Darmette'
|
||||
|
||||
|
||||
def test_sign_url(pub, local_user):
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
'http://example.net/api/user/?format=json&orig=coucou&email=%s'
|
||||
% urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
url = signed_url[len('http://example.net') :]
|
||||
|
@ -245,7 +260,8 @@ def test_sign_url(pub, local_user):
|
|||
get_app(pub).get('%s&foo=bar' % url, status=403)
|
||||
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
'http://example.net/api/user/?format=json&orig=coucou&email=%s'
|
||||
% urllib.parse.quote(local_user.email),
|
||||
'12345',
|
||||
)
|
||||
url = signed_url[len('http://example.net') :]
|
||||
|
@ -259,7 +275,8 @@ def test_get_user(pub, local_user):
|
|||
local_user.roles = [role.id]
|
||||
local_user.store()
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
'http://example.net/api/user/?format=json&orig=coucou&email=%s'
|
||||
% urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
url = signed_url[len('http://example.net') :]
|
||||
|
@ -284,7 +301,7 @@ def test_api_access_from_xml_storable_object(pub, local_user, admin_user):
|
|||
local_user.store()
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/user/?format=json&orig=UNKNOWN_ACCESS&email=%s'
|
||||
% (urllib.quote(local_user.email)),
|
||||
% (urllib.parse.quote(local_user.email)),
|
||||
'5678',
|
||||
)
|
||||
url = signed_url[len('http://example.net') :]
|
||||
|
@ -292,7 +309,8 @@ def test_api_access_from_xml_storable_object(pub, local_user, admin_user):
|
|||
assert output.json['err_desc'] == 'invalid orig'
|
||||
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/user/?format=json&orig=salut&email=%s' % (urllib.quote(local_user.email)),
|
||||
'http://example.net/api/user/?format=json&orig=salut&email=%s'
|
||||
% (urllib.parse.quote(local_user.email)),
|
||||
'5678',
|
||||
)
|
||||
url = signed_url[len('http://example.net') :]
|
||||
|
@ -313,7 +331,7 @@ def test_is_url_signed_check_nonce(pub, local_user, freezer):
|
|||
pub.clean_nonces(now=0)
|
||||
nonce_dir = os.path.join(pub.app_dir, 'nonces')
|
||||
assert not os.path.exists(nonce_dir) or not os.listdir(nonce_dir)
|
||||
signed_url = sign_url('?format=json&orig=%s&email=%s' % (ORIG, urllib.quote(local_user.email)), KEY)
|
||||
signed_url = sign_url('?format=json&orig=%s&email=%s' % (ORIG, urllib.parse.quote(local_user.email)), KEY)
|
||||
req = HTTPRequest(
|
||||
None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net', 'QUERY_STRING': signed_url[1:]}
|
||||
)
|
||||
|
@ -343,7 +361,8 @@ def test_is_url_signed_check_nonce(pub, local_user, freezer):
|
|||
|
||||
def test_get_user_compat_endpoint(pub, local_user):
|
||||
signed_url = sign_url(
|
||||
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234'
|
||||
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
url = signed_url[len('http://example.net') :]
|
||||
output = get_app(pub).get(url)
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
from django.utils.six import StringIO
|
||||
from quixote import get_publisher
|
||||
from utilities import clean_temporary_pub, create_temporary_pub, get_app
|
||||
|
||||
|
@ -130,7 +130,7 @@ def test_validate_condition(pub):
|
|||
|
||||
def test_reverse_geocoding(pub):
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps({'address': 'xxx'}))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'address': 'xxx'}))
|
||||
get_app(pub).get('/api/reverse-geocoding', status=400)
|
||||
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
|
||||
assert resp.content_type == 'application/json'
|
||||
|
@ -170,7 +170,7 @@ def test_reverse_geocoding(pub):
|
|||
|
||||
def test_geocoding(pub):
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps([{'lat': 0, 'lon': 0}]))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps([{'lat': 0, 'lon': 0}]))
|
||||
get_app(pub).get('/api/geocoding', status=400)
|
||||
resp = get_app(pub).get('/api/geocoding?q=test')
|
||||
assert resp.content_type == 'application/json'
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import base64
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six import StringIO
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from quixote import get_publisher
|
||||
from utilities import clean_temporary_pub, create_temporary_pub, get_app
|
||||
|
||||
|
@ -225,7 +225,7 @@ def test_card_submit(pub, local_user):
|
|||
def url():
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/cards/test/submit'
|
||||
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
return signed_url[len('http://example.net') :]
|
||||
|
@ -301,7 +301,7 @@ def test_carddef_submit_with_varname(pub, local_user):
|
|||
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/cards/test/submit'
|
||||
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
url = signed_url[len('http://example.net') :]
|
||||
|
@ -429,7 +429,8 @@ def test_carddef_submit_from_wscall(pub, local_user):
|
|||
|
||||
def url():
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/cards/test/submit?orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
'http://example.net/api/cards/test/submit?orig=coucou&email=%s'
|
||||
% urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
return signed_url[len('http://example.net') :]
|
||||
|
@ -516,13 +517,13 @@ def test_formdef_submit_structured(pub, local_user):
|
|||
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/cards/test/submit'
|
||||
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
'?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
url = signed_url[len('http://example.net') :]
|
||||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO(
|
||||
urlopen.side_effect = lambda *args: io.StringIO(
|
||||
'''\
|
||||
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
|
||||
{"id": 1, "text": "uné", "foo": "bar1"}, \
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import io
|
||||
import os
|
||||
import xml.etree.ElementTree as ET
|
||||
import zipfile
|
||||
|
||||
import pytest
|
||||
from django.utils.six import BytesIO
|
||||
from quixote import get_publisher
|
||||
from utilities import clean_temporary_pub, create_temporary_pub, get_app
|
||||
|
||||
|
@ -245,7 +245,7 @@ def test_api_ods_formdata_custom_view(pub, local_user):
|
|||
|
||||
# check it now gets the data
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
|
||||
zipf = zipfile.ZipFile(BytesIO(resp.body))
|
||||
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
|
||||
ods_sheet = ET.parse(zipf.open('content.xml'))
|
||||
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 11
|
||||
|
||||
|
@ -259,7 +259,7 @@ def test_api_ods_formdata_custom_view(pub, local_user):
|
|||
custom_view.store()
|
||||
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/ods/custom-view', user=local_user))
|
||||
zipf = zipfile.ZipFile(BytesIO(resp.body))
|
||||
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
|
||||
ods_sheet = ET.parse(zipf.open('content.xml'))
|
||||
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 21
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
import base64
|
||||
import datetime
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
|
@ -10,7 +11,6 @@ import zipfile
|
|||
|
||||
import pytest
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six import BytesIO
|
||||
from quixote import get_publisher
|
||||
from utilities import clean_temporary_pub, create_temporary_pub, get_app, login
|
||||
|
||||
|
@ -900,7 +900,7 @@ def test_api_ods_formdata(pub, local_user):
|
|||
|
||||
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
|
||||
assert resp.content_type == 'application/vnd.oasis.opendocument.spreadsheet'
|
||||
zipf = zipfile.ZipFile(BytesIO(resp.body))
|
||||
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
|
||||
ods_sheet = ET.parse(zipf.open('content.xml'))
|
||||
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 311
|
||||
|
||||
|
|
|
@ -2,15 +2,15 @@
|
|||
|
||||
import base64
|
||||
import datetime
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six import StringIO
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from quixote import get_publisher
|
||||
from utilities import clean_temporary_pub, create_temporary_pub, get_app
|
||||
|
||||
|
@ -352,7 +352,7 @@ def test_formdef_schema(pub):
|
|||
formdef.store()
|
||||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO(
|
||||
urlopen.side_effect = lambda *args: io.StringIO(
|
||||
'''\
|
||||
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
|
||||
{"id": 1, "text": "uné", "foo": "bar1"}, \
|
||||
|
@ -465,7 +465,7 @@ def test_formdef_submit(pub, local_user):
|
|||
def url():
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/formdefs/test/submit'
|
||||
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
return signed_url[len('http://example.net') :]
|
||||
|
@ -554,7 +554,7 @@ def test_formdef_submit_only_one(pub, local_user):
|
|||
def url():
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/formdefs/test/submit'
|
||||
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
return signed_url[len('http://example.net') :]
|
||||
|
@ -609,7 +609,7 @@ def test_formdef_submit_with_varname(pub, local_user):
|
|||
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/formdefs/test/submit'
|
||||
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
url = signed_url[len('http://example.net') :]
|
||||
|
@ -793,13 +793,13 @@ def test_formdef_submit_structured(pub, local_user):
|
|||
def url():
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/formdefs/test/submit'
|
||||
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
||||
'?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
||||
'1234',
|
||||
)
|
||||
return signed_url[len('http://example.net') :]
|
||||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO(
|
||||
urlopen.side_effect = lambda *args: io.StringIO(
|
||||
'''\
|
||||
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
|
||||
{"id": 1, "text": "uné", "foo": "bar1"}, \
|
||||
|
|
|
@ -4,23 +4,22 @@ import base64
|
|||
import datetime
|
||||
import hashlib
|
||||
import hmac
|
||||
import urllib.parse
|
||||
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
|
||||
def sign_uri(uri, user=None, format='json'):
|
||||
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z'
|
||||
scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
|
||||
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(uri)
|
||||
if query:
|
||||
query += '&'
|
||||
if format:
|
||||
query += 'format=%s&' % format
|
||||
query += 'orig=coucou&algo=sha256×tamp=' + timestamp
|
||||
if user:
|
||||
query += '&email=' + urllib.quote(user.email)
|
||||
query += '&signature=%s' % urllib.quote(
|
||||
query += '&email=' + urllib.parse.quote(user.email)
|
||||
query += '&signature=%s' % urllib.parse.quote(
|
||||
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest())
|
||||
)
|
||||
return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
|
||||
return urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import datetime
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
@ -10,8 +12,6 @@ import zipfile
|
|||
import mock
|
||||
import pytest
|
||||
|
||||
from django.utils.six import StringIO, BytesIO
|
||||
|
||||
from quixote import get_publisher
|
||||
from quixote.http_request import Upload as QuixoteUpload
|
||||
from wcs.blocks import BlockDef
|
||||
|
@ -2082,7 +2082,7 @@ def test_backoffice_download_as_zip(pub):
|
|||
formdef.store()
|
||||
resp = app.get('/backoffice/management/form-title/%s/' % number31.id)
|
||||
resp = resp.click('Download all files as .zip')
|
||||
zip_content = BytesIO(resp.body)
|
||||
zip_content = io.BytesIO(resp.body)
|
||||
zipf = zipfile.ZipFile(zip_content, 'a')
|
||||
filelist = zipf.namelist()
|
||||
assert set(filelist) == {'1_bar', '2_bar'}
|
||||
|
@ -2725,7 +2725,7 @@ def test_backoffice_wfedit_and_data_source_with_user_info(pub):
|
|||
|
||||
def side_effect(url, *args):
|
||||
assert '?name_id=admin' in url
|
||||
return StringIO(json.dumps(data))
|
||||
return io.StringIO(json.dumps(data))
|
||||
|
||||
urlopen.side_effect = side_effect
|
||||
|
||||
|
@ -2779,7 +2779,7 @@ def test_backoffice_wfedit_and_workflow_data(pub):
|
|||
|
||||
def side_effect(url, *args):
|
||||
assert '?test=foobar' in url
|
||||
return StringIO(json.dumps(data))
|
||||
return io.StringIO(json.dumps(data))
|
||||
|
||||
urlopen.side_effect = side_effect
|
||||
|
||||
|
@ -2835,7 +2835,7 @@ def test_backoffice_wfedit_and_data_source_with_field_info(pub):
|
|||
|
||||
def side_effect(url, *args):
|
||||
assert '?xxx=FOO BAR 30' in url
|
||||
return StringIO(json.dumps(data))
|
||||
return io.StringIO(json.dumps(data))
|
||||
|
||||
urlopen.side_effect = side_effect
|
||||
|
||||
|
@ -4212,9 +4212,9 @@ def test_backoffice_workflow_form_with_live_data_source(pub):
|
|||
|
||||
def side_effect(url, *args):
|
||||
if 'toto' not in url:
|
||||
return StringIO(json.dumps(data1))
|
||||
return io.StringIO(json.dumps(data1))
|
||||
else:
|
||||
return StringIO(json.dumps(data2))
|
||||
return io.StringIO(json.dumps(data2))
|
||||
|
||||
urlopen.side_effect = side_effect
|
||||
|
||||
|
@ -5560,7 +5560,7 @@ def test_workflow_inspect_page(pub):
|
|||
export_to.convert_to_pdf = False
|
||||
export_to.label = 'create doc'
|
||||
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(b'HELLO WORLD')
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import datetime
|
||||
import io
|
||||
import os
|
||||
import time
|
||||
import urllib.parse
|
||||
import xml.etree.ElementTree as ET
|
||||
import zipfile
|
||||
|
||||
import pytest
|
||||
|
||||
from django.utils.six import BytesIO
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from wcs.qommon import ods
|
||||
from wcs.blocks import BlockDef
|
||||
from wcs.qommon.form import PicklableUpload
|
||||
|
@ -200,7 +200,7 @@ def test_backoffice_export_long_listings(pub, threshold):
|
|||
resp = app.get('/backoffice/management/form-title/')
|
||||
resp = resp.click('Export a Spreadsheet')
|
||||
assert resp.location.startswith('http://example.net/backoffice/processing?job=')
|
||||
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
|
||||
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
|
||||
resp = resp.follow()
|
||||
assert 'completed' in resp.text
|
||||
resp = resp.click('Download Export')
|
||||
|
@ -451,7 +451,7 @@ def test_backoffice_ods(pub):
|
|||
assert 'filename=form-title.ods' in resp.headers['content-disposition']
|
||||
assert resp.body[:2] == b'PK' # ods has a zip container
|
||||
|
||||
zipf = zipfile.ZipFile(BytesIO(resp.body))
|
||||
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
|
||||
ods_sheet = ET.parse(zipf.open('content.xml'))
|
||||
# check the ods contains a link to the document
|
||||
elem = ods_sheet.findall('.//{%s}a' % ods.NS['text'])[0]
|
||||
|
|
|
@ -3,9 +3,9 @@ import datetime
|
|||
import os
|
||||
import re
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
import pytest
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from utilities import clean_temporary_pub, create_temporary_pub, get_app, login
|
||||
|
||||
from wcs import fields
|
||||
|
@ -290,7 +290,7 @@ def test_backoffice_submission_initiated_from_welco(pub, welco_url):
|
|||
def post_formdata():
|
||||
signed_url = sign_url(
|
||||
'http://example.net/api/formdefs/form-title/submit'
|
||||
'?format=json&orig=coucou&email=%s' % urllib.quote(user.email),
|
||||
'?format=json&orig=coucou&email=%s' % urllib.parse.quote(user.email),
|
||||
'1234',
|
||||
)
|
||||
url = signed_url[len('http://example.net') :]
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import configparser
|
||||
import os
|
||||
from django.utils.six.moves import configparser as ConfigParser
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -7,7 +7,7 @@ from utilities import EmailsMocking, SMSMocking, HttpRequestsMocking
|
|||
|
||||
|
||||
def site_options(request, pub, section, variable, value):
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
path = os.path.join(pub.app_dir, 'site-options.cfg')
|
||||
if os.path.exists(path):
|
||||
config.read([path])
|
||||
|
@ -18,7 +18,7 @@ def site_options(request, pub, section, variable, value):
|
|||
config.write(site_option)
|
||||
|
||||
def fin():
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
if os.path.exists(path):
|
||||
config.read([path])
|
||||
config.remove_option(section, variable)
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import io
|
||||
import json
|
||||
import pytest
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import urllib.parse
|
||||
import zipfile
|
||||
|
||||
from webtest import Upload, Hidden, Radio
|
||||
import mock
|
||||
import xml.etree.ElementTree as ET
|
||||
|
@ -16,9 +19,6 @@ try:
|
|||
except ImportError:
|
||||
Image = None
|
||||
|
||||
from django.utils.six import StringIO, BytesIO
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
|
||||
from wcs.qommon import force_str
|
||||
|
@ -2627,7 +2627,7 @@ def test_form_edit_autocomplete_list(pub):
|
|||
{'id': '2', 'text': 'world', 'extra': 'bar'},
|
||||
]
|
||||
}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = app.get('/test/')
|
||||
assert 'data-select2-url=' in resp.text
|
||||
# simulate select2 mode, with qommon.forms.js adding an extra hidden widget
|
||||
|
@ -2771,13 +2771,13 @@ def test_form_item_data_source_field_submit(pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
|
||||
|
||||
# numeric identifiers
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': 1, 'text': 'un'}, {'id': 2, 'text': 'deux'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
|
||||
|
||||
|
||||
|
@ -2954,7 +2954,7 @@ def test_form_page_session_var_prefill(pub):
|
|||
|
||||
# check it's not set if it's not whitelisted
|
||||
resp = get_app(pub).get('/?session_var_foo=hello')
|
||||
assert urlparse.urlparse(resp.location).path == '/'
|
||||
assert urllib.parse.urlparse(resp.location).path == '/'
|
||||
resp = resp.follow()
|
||||
resp = resp.click('test')
|
||||
assert resp.forms[0]['f0'].value == ''
|
||||
|
@ -2967,14 +2967,14 @@ query_string_allowed_vars = foo,bar
|
|||
)
|
||||
|
||||
resp = get_app(pub).get('/?session_var_foo=hello')
|
||||
assert urlparse.urlparse(resp.location).path == '/'
|
||||
assert urllib.parse.urlparse(resp.location).path == '/'
|
||||
resp = resp.follow()
|
||||
resp = resp.click('test')
|
||||
assert resp.forms[0]['f0'].value == 'hello'
|
||||
|
||||
# check it survives a login
|
||||
resp = get_app(pub).get('/?session_var_foo=hello2')
|
||||
assert urlparse.urlparse(resp.location).path == '/'
|
||||
assert urllib.parse.urlparse(resp.location).path == '/'
|
||||
resp = resp.follow()
|
||||
resp = resp.click('Login')
|
||||
resp = resp.follow()
|
||||
|
@ -2987,15 +2987,15 @@ query_string_allowed_vars = foo,bar
|
|||
|
||||
# check repeated options are ignored
|
||||
resp = get_app(pub).get('/?session_var_foo=hello&session_var_foo=hello2')
|
||||
assert urlparse.urlparse(resp.location).path == '/'
|
||||
assert urllib.parse.urlparse(resp.location).path == '/'
|
||||
resp = resp.follow()
|
||||
resp = resp.click('test')
|
||||
assert resp.forms[0]['f0'].value == ''
|
||||
|
||||
# check extra query string parameters are not lost
|
||||
resp = get_app(pub).get('/?session_var_foo=hello&foo=bar')
|
||||
assert urlparse.urlparse(resp.location).path == '/'
|
||||
assert urlparse.urlparse(resp.location).query == 'foo=bar'
|
||||
assert urllib.parse.urlparse(resp.location).path == '/'
|
||||
assert urllib.parse.urlparse(resp.location).query == 'foo=bar'
|
||||
|
||||
os.unlink(os.path.join(pub.app_dir, 'site-options.cfg'))
|
||||
|
||||
|
@ -3156,7 +3156,7 @@ def test_form_page_formula_prefill_items_field(pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert not resp.form['f0$element1'].checked
|
||||
assert resp.form['f0$element2'].checked
|
||||
|
@ -3358,7 +3358,7 @@ def test_form_file_field_with_fargo(pub, fargo_url):
|
|||
fargo_resp = app.get('/fargo/pick') # display file picker
|
||||
assert fargo_resp.location == 'http://fargo.example.net/pick/?pick=http%3A//example.net/fargo/pick'
|
||||
with mock.patch('wcs.portfolio.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: BytesIO(b'...')
|
||||
urlopen.side_effect = lambda *args: io.BytesIO(b'...')
|
||||
fargo_resp = app.get('/fargo/pick?url=http://www.example.org/...')
|
||||
assert 'window.top.document.fargo_set_token' in fargo_resp.text
|
||||
resp.form['f0$file'] = None
|
||||
|
@ -5511,7 +5511,7 @@ def test_item_field_with_disabled_items(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
resp.form['f0'] = '1'
|
||||
resp.form['f0'] = '2'
|
||||
|
@ -5524,7 +5524,7 @@ def test_item_field_with_disabled_items(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
pq = resp.pyquery.remove_namespaces()
|
||||
assert pq('option[disabled=disabled][value="1"]').text() == 'hello'
|
||||
|
@ -5550,7 +5550,7 @@ def test_item_field_with_disabled_items(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
pq = resp.pyquery.remove_namespaces()
|
||||
assert len(pq('option[disabled=disabled][value="1"]')) == 0
|
||||
|
@ -5575,7 +5575,7 @@ def test_item_field_with_disabled_items(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
resp.form['f0'] = '1'
|
||||
resp.form['f0'] = '2'
|
||||
|
@ -5588,7 +5588,7 @@ def test_item_field_with_disabled_items(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
pq = resp.pyquery.remove_namespaces()
|
||||
assert len(pq('input[name="f0"][disabled=disabled][value="1"]')) == 1
|
||||
|
@ -5619,7 +5619,7 @@ def test_items_field_with_disabled_items(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
resp.form['f0$element1'].checked = True
|
||||
resp.form['f0$element2'].checked = True
|
||||
|
@ -5632,7 +5632,7 @@ def test_items_field_with_disabled_items(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert 'disabled' in resp.form['f0$element1'].attrs
|
||||
resp.form['f0$element1'].checked = True
|
||||
|
@ -5650,7 +5650,7 @@ def test_items_field_with_disabled_items(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert not 'f0$element1' in resp.form.fields
|
||||
resp.form['f0$element2'].checked = True
|
||||
|
@ -5688,7 +5688,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
|
|||
{'id': '2', 'text': 'world', 'extra': 'bar'},
|
||||
]
|
||||
}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert 'data-autocomplete="true"' in resp.text
|
||||
assert resp.form['f0'].value == '1'
|
||||
|
@ -5709,7 +5709,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
|
|||
{'id': '2', 'text': 'world', 'extra': 'bar'},
|
||||
]
|
||||
}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert 'data-autocomplete="true"' in resp.text
|
||||
assert 'data-hint="help text"' in resp.text
|
||||
|
@ -5733,7 +5733,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
|
|||
{'id': '2', 'text': 'world', 'extra': 'bar'},
|
||||
]
|
||||
}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = app.get('/test/')
|
||||
assert urlopen.call_count == 0
|
||||
pq = resp.pyquery.remove_namespaces()
|
||||
|
@ -5741,7 +5741,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp2 = app.get(select2_url + '?q=hell')
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?q=hell'
|
||||
|
@ -5757,7 +5757,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = resp.form.submit('submit') # -> validation page
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?id=1'
|
||||
|
@ -5766,7 +5766,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = resp.form.submit('submit') # -> submit
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?id=1'
|
||||
|
@ -5784,7 +5784,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
|
|||
data = {
|
||||
'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}, {'id': 2, 'text': 'world', 'extra': 'bar'}]
|
||||
}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = app.get('/test/')
|
||||
assert urlopen.call_count == 0
|
||||
pq = resp.pyquery.remove_namespaces()
|
||||
|
@ -5792,7 +5792,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp2 = app.get(select2_url + '?q=hell')
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?q=hell'
|
||||
|
@ -5808,7 +5808,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = resp.form.submit('submit') # -> validation page
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?id=1'
|
||||
|
@ -5817,7 +5817,7 @@ def test_item_field_autocomplete_json_source(http_requests, pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = resp.form.submit('submit') # -> submit
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?id=1'
|
||||
|
@ -5844,7 +5844,7 @@ remote.example.net = 1234
|
|||
{'id': '2', 'text': 'world', 'extra': 'bar'},
|
||||
]
|
||||
}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = app.get('/test/')
|
||||
assert urlopen.call_count == 0
|
||||
pq = resp.pyquery.remove_namespaces()
|
||||
|
@ -5852,7 +5852,7 @@ remote.example.net = 1234
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp2 = app.get(select2_url + '?q=hell')
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?q=hell&orig=example.net&')
|
||||
|
@ -5865,7 +5865,7 @@ remote.example.net = 1234
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = resp.form.submit('submit') # -> validation page
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?id=1&orig=example.net&')
|
||||
|
@ -5874,7 +5874,7 @@ remote.example.net = 1234
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = resp.form.submit('submit') # -> submit
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?id=1&orig=example.net&')
|
||||
|
@ -5895,7 +5895,7 @@ remote.example.net = 1234
|
|||
{'id': '2', 'text': 'world', 'extra': 'bar'},
|
||||
]
|
||||
}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = app.get('/test/')
|
||||
pq = resp.pyquery.remove_namespaces()
|
||||
select2_url = pq('select').attr['data-select2-url']
|
||||
|
@ -5919,7 +5919,7 @@ remote.example.net = 1234
|
|||
{'id': '2', 'text': 'world', 'extra': 'bar'},
|
||||
]
|
||||
}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp = app.get('/test/')
|
||||
assert urlopen.call_count == 0
|
||||
pq = resp.pyquery.remove_namespaces()
|
||||
|
@ -5927,7 +5927,7 @@ remote.example.net = 1234
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
resp2 = app.get(select2_url + '?q=hell', status=403)
|
||||
assert urlopen.call_count == 0
|
||||
|
||||
|
@ -8332,7 +8332,7 @@ def test_create_formdata_empty_item_ds_with_id_parameter(pub, create_formdata):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': create_formdata['data']}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
|
||||
app = get_app(create_formdata['pub'])
|
||||
resp = app.get('/source-form/')
|
||||
|
@ -8710,7 +8710,7 @@ def test_form_item_timetable_data_source(pub, http_requests):
|
|||
{"id": "3", "datetime": "2021-01-14 10:40:00", "text": "event 3"},
|
||||
]
|
||||
}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
|
||||
resp = app.get('/test/')
|
||||
assert 'data-date="2021-01-12"' in resp and 'data-time="10:00"' in resp
|
||||
|
@ -8764,7 +8764,7 @@ def test_form_item_timetable_data_source_with_date_alignment(pub, http_requests)
|
|||
{"id": "3", "datetime": "2021-01-14 10:40:00", "text": "event 3"},
|
||||
]
|
||||
}
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps(data))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
||||
|
||||
resp = app.get('/test/')
|
||||
resp.form['f2'] = '2021-01-14'
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import io
|
||||
import json
|
||||
import mock
|
||||
|
||||
from django.utils.six import StringIO
|
||||
|
||||
import pytest
|
||||
from webtest import Upload
|
||||
|
||||
|
@ -1255,7 +1255,7 @@ def test_block_with_dynamic_item_field(mock_urlopen, pub, blocks_feature):
|
|||
payload = [{'id': '1', 'text': 'foo'}]
|
||||
elif query == 'bar':
|
||||
payload = [{'id': '2', 'text': 'bar'}]
|
||||
return StringIO(json.dumps({'data': payload}))
|
||||
return io.StringIO(json.dumps({'data': payload}))
|
||||
|
||||
mock_urlopen.side_effect = lambda url: data_source(url)
|
||||
|
||||
|
|
|
@ -1,16 +1,17 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import base64
|
||||
import io
|
||||
import json
|
||||
import locale
|
||||
import os
|
||||
import urllib.parse
|
||||
import xml.etree.ElementTree as ET
|
||||
import zipfile
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six import BytesIO, StringIO
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from quixote.http_request import Upload as QuixoteUpload
|
||||
from webtest import Upload, Hidden
|
||||
|
||||
|
@ -332,7 +333,7 @@ def test_formdata_generated_document_download(pub):
|
|||
export_to.convert_to_pdf = False
|
||||
export_to.label = 'create doc'
|
||||
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(b'HELLO WORLD')
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -382,7 +383,7 @@ def test_formdata_generated_document_download(pub):
|
|||
# change export model to now be a RTF file, do the action again on the same form and
|
||||
# check that both the old .odt file and the new .rtf file are there and valid.
|
||||
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(b'HELLO NEW WORLD')
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -398,7 +399,7 @@ def test_formdata_generated_document_download(pub):
|
|||
|
||||
# use substitution variables on rtf: only ezt format is accepted
|
||||
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(b'HELLO {{DJANGO}} WORLD [form_name]')
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -427,7 +428,7 @@ def test_formdata_generated_document_odt_download(pub, odt_template):
|
|||
template_filename = os.path.join(os.path.dirname(__file__), '..', odt_template)
|
||||
template = open(template_filename, 'rb').read()
|
||||
upload = QuixoteUpload('/foo/' + odt_template, content_type='application/octet-stream')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(template)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -460,7 +461,7 @@ def test_formdata_generated_document_odt_download(pub, odt_template):
|
|||
resp = resp.follow() # $form/$id/create_doc
|
||||
resp = resp.follow() # $form/$id/create_doc/
|
||||
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
|
||||
assert_equal_zip(BytesIO(resp.body), f)
|
||||
assert_equal_zip(io.BytesIO(resp.body), f)
|
||||
|
||||
resp = login(get_app(pub), username='foo', password='foo').get(form_location)
|
||||
resp = resp.form.submit('button_export_to')
|
||||
|
@ -487,11 +488,11 @@ def test_formdata_generated_document_odt_download(pub, odt_template):
|
|||
resp = resp.follow()
|
||||
assert resp.content_type == 'application/octet-stream'
|
||||
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
|
||||
assert_equal_zip(BytesIO(resp.body), f)
|
||||
assert_equal_zip(io.BytesIO(resp.body), f)
|
||||
|
||||
# change file content, same name
|
||||
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(b'HELLO NEW WORLD')
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -504,7 +505,7 @@ def test_formdata_generated_document_odt_download(pub, odt_template):
|
|||
|
||||
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
|
||||
body = resp.click(odt_template, index=0).follow().body
|
||||
assert_equal_zip(BytesIO(body), f)
|
||||
assert_equal_zip(io.BytesIO(body), f)
|
||||
assert resp.click('test.rtf', index=0).follow().body == b'HELLO NEW WORLD'
|
||||
|
||||
|
||||
|
@ -519,7 +520,7 @@ def test_formdata_generated_document_odt_download_with_substitution_variable(pub
|
|||
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt')
|
||||
template = open(template_filename, 'rb').read()
|
||||
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(template)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -552,7 +553,7 @@ def test_formdata_generated_document_odt_download_with_substitution_variable(pub
|
|||
resp = resp.follow() # $form/$id/create_doc
|
||||
resp = resp.follow() # $form/$id/create_doc/
|
||||
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
|
||||
assert_equal_zip(BytesIO(resp.body), f)
|
||||
assert_equal_zip(io.BytesIO(resp.body), f)
|
||||
|
||||
export_to.attach_to_history = True
|
||||
wf.store()
|
||||
|
@ -567,11 +568,11 @@ def test_formdata_generated_document_odt_download_with_substitution_variable(pub
|
|||
response1 = resp = resp.follow()
|
||||
assert resp.content_type == 'application/octet-stream'
|
||||
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
|
||||
assert_equal_zip(BytesIO(resp.body), f)
|
||||
assert_equal_zip(io.BytesIO(resp.body), f)
|
||||
|
||||
# change file content, same name
|
||||
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(b'HELLO NEW WORLD')
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -584,7 +585,7 @@ def test_formdata_generated_document_odt_download_with_substitution_variable(pub
|
|||
|
||||
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
|
||||
body = resp.click('template.odt', index=0).follow().body
|
||||
assert_equal_zip(BytesIO(body), f)
|
||||
assert_equal_zip(io.BytesIO(body), f)
|
||||
response2 = resp.click('test.rtf', index=0).follow()
|
||||
assert response2.body == b'HELLO NEW WORLD'
|
||||
# Test attachment substitution variables
|
||||
|
@ -640,7 +641,7 @@ def test_formdata_generated_document_odt_to_pdf_download(pub):
|
|||
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt')
|
||||
template = open(template_filename, 'rb').read()
|
||||
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(template)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -711,7 +712,7 @@ def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(
|
|||
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt')
|
||||
template = open(template_filename, 'rb').read()
|
||||
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(template)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -804,7 +805,7 @@ def test_formdata_generated_document_non_interactive(pub):
|
|||
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt')
|
||||
template = open(template_filename, 'rb').read()
|
||||
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(template)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -844,7 +845,7 @@ def test_formdata_generated_document_non_interactive(pub):
|
|||
resp = resp.follow()
|
||||
assert resp.content_type == 'application/octet-stream'
|
||||
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
|
||||
assert_equal_zip(BytesIO(resp.body), f)
|
||||
assert_equal_zip(io.BytesIO(resp.body), f)
|
||||
|
||||
assert formdef.data_class().count() == 1
|
||||
assert formdef.data_class().select()[0].status == 'wf-st2'
|
||||
|
@ -866,7 +867,7 @@ def test_formdata_generated_document_to_backoffice_field(pub):
|
|||
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt')
|
||||
template = open(template_filename, 'rb').read()
|
||||
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(template)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -911,7 +912,7 @@ def test_formdata_generated_document_to_backoffice_field(pub):
|
|||
resp = resp.follow()
|
||||
assert resp.content_type == 'application/octet-stream'
|
||||
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f:
|
||||
assert_equal_zip(BytesIO(resp.body), f)
|
||||
assert_equal_zip(io.BytesIO(resp.body), f)
|
||||
|
||||
assert formdef.data_class().count() == 1
|
||||
assert formdef.data_class().select()[0].status == 'wf-st2'
|
||||
|
@ -933,7 +934,7 @@ def test_formdata_generated_document_in_private_history(pub):
|
|||
export_to = ExportToModel()
|
||||
export_to.label = 'create doc'
|
||||
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(b'HELLO WORLD')
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -1087,7 +1088,7 @@ def test_formdata_form_file_download(pub):
|
|||
formdata = formdef.data_class().select()[0]
|
||||
assert 'xxx_var_yyy_raw' in formdata.workflow_data
|
||||
|
||||
download = resp.test_app.get(urlparse.urljoin(resp.location, 'files/form-xxx-yyy/test.txt'))
|
||||
download = resp.test_app.get(urllib.parse.urljoin(resp.location, 'files/form-xxx-yyy/test.txt'))
|
||||
assert download.content_type == 'text/plain'
|
||||
assert download.body == b'foobar'
|
||||
|
||||
|
@ -1292,7 +1293,7 @@ def test_formdata_workflow_form_prefill_autocomplete(pub):
|
|||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
|
||||
def side_effect(url, *args):
|
||||
return StringIO(json.dumps(data))
|
||||
return io.StringIO(json.dumps(data))
|
||||
|
||||
urlopen.side_effect = side_effect
|
||||
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
import io
|
||||
import pytest
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.utils.six import BytesIO
|
||||
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.qommon.misc import indent_xml as indent
|
||||
from wcs.qommon.template import Template
|
||||
|
@ -141,7 +140,7 @@ def test_xml_export_import(pub):
|
|||
carddef.data_class().wipe()
|
||||
pub.custom_view_class.wipe()
|
||||
|
||||
carddef2 = CardDef.import_from_xml(BytesIO(ET.tostring(carddef_xml)))
|
||||
carddef2 = CardDef.import_from_xml(io.BytesIO(ET.tostring(carddef_xml)))
|
||||
assert carddef2.name == 'foo'
|
||||
assert carddef2.fields[1].data_source == {'type': 'carddef:foo'}
|
||||
assert carddef2._custom_views
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import io
|
||||
import os
|
||||
import pickle
|
||||
import shutil
|
||||
|
||||
import pytest
|
||||
|
||||
from django.utils.six import BytesIO
|
||||
from quixote import cleanup
|
||||
|
||||
from wcs.categories import Category, CardDefCategory
|
||||
|
@ -108,7 +108,7 @@ def test_xml_import(category_class):
|
|||
test.store()
|
||||
test = category_class.get(1)
|
||||
|
||||
fd = BytesIO(test.export_to_xml_string(include_id=True))
|
||||
fd = io.BytesIO(test.export_to_xml_string(include_id=True))
|
||||
test2 = category_class.import_from_xml(fd, include_id=True)
|
||||
assert test.id == test2.id
|
||||
assert test.name == test2.name
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import codecs
|
||||
import io
|
||||
import pytest
|
||||
import os
|
||||
import json
|
||||
import shutil
|
||||
|
||||
from django.utils.six import StringIO
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
import urllib.parse
|
||||
|
||||
from quixote import cleanup
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
|
@ -889,7 +888,7 @@ def test_data_source_unicode():
|
|||
data_source2 = NamedDataSource.select()[0]
|
||||
assert data_source2.data_source == data_source.data_source
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO(
|
||||
urlopen.side_effect = lambda *args: io.StringIO(
|
||||
'{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}'
|
||||
)
|
||||
assert data_sources.get_items({'type': 'foobar'}) == [
|
||||
|
@ -906,12 +905,12 @@ def test_data_source_signed(no_request_pub):
|
|||
data_source.store()
|
||||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO('{"data": [{"id": 0, "text": "zero"}]}')
|
||||
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
|
||||
assert len(data_sources.get_items({'type': 'foobar'})) == 1
|
||||
signed_url = urlopen.call_args[0][0]
|
||||
assert signed_url.startswith('https://api.example.com/json?')
|
||||
parsed = urlparse.urlparse(signed_url)
|
||||
querystring = urlparse.parse_qs(parsed.query)
|
||||
parsed = urllib.parse.urlparse(signed_url)
|
||||
querystring = urllib.parse.parse_qs(parsed.query)
|
||||
# stupid simple (but sufficient) signature test:
|
||||
assert querystring['algo'] == ['sha256']
|
||||
assert querystring['orig'] == ['example.net']
|
||||
|
@ -922,12 +921,12 @@ def test_data_source_signed(no_request_pub):
|
|||
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"}
|
||||
data_source.store()
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO('{"data": [{"id": 0, "text": "zero"}]}')
|
||||
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
|
||||
assert len(data_sources.get_items({'type': 'foobar'})) == 1
|
||||
signed_url = urlopen.call_args[0][0]
|
||||
assert signed_url.startswith('https://api.example.com/json?')
|
||||
parsed = urlparse.urlparse(signed_url)
|
||||
querystring = urlparse.parse_qs(parsed.query)
|
||||
parsed = urllib.parse.urlparse(signed_url)
|
||||
querystring = urllib.parse.parse_qs(parsed.query)
|
||||
assert querystring['algo'] == ['sha256']
|
||||
assert querystring['orig'] == ['example.net']
|
||||
assert querystring['nonce'][0]
|
||||
|
@ -938,7 +937,7 @@ def test_data_source_signed(no_request_pub):
|
|||
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"}
|
||||
data_source.store()
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO('{"data": [{"id": 0, "text": "zero"}]}')
|
||||
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
|
||||
assert len(data_sources.get_items({'type': 'foobar'})) == 1
|
||||
unsigned_url = urlopen.call_args[0][0]
|
||||
assert unsigned_url == 'https://no-secret.example.com/json'
|
||||
|
@ -951,7 +950,7 @@ def test_named_datasource_json_cache(requests_pub):
|
|||
datasource.store()
|
||||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO(
|
||||
urlopen.side_effect = lambda *args: io.StringIO(
|
||||
json.dumps({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]})
|
||||
)
|
||||
|
||||
|
@ -997,7 +996,7 @@ def test_named_datasource_id_parameter(requests_pub):
|
|||
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
value = [{'id': '1', 'text': 'foo'}]
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
|
||||
assert datasource.get_structured_value('1') == value[0]
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0] == 'http://whatever/?id=1'
|
||||
|
@ -1009,27 +1008,27 @@ def test_named_datasource_id_parameter(requests_pub):
|
|||
get_request().datasources_cache = {}
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
|
||||
assert datasource.get_structured_value('1') == value[0]
|
||||
assert urlopen.call_count == 1
|
||||
|
||||
get_request().datasources_cache = {}
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': []})) # empty list
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': []})) # empty list
|
||||
assert datasource.get_structured_value('1') is None
|
||||
assert urlopen.call_count == 1
|
||||
|
||||
get_request().datasources_cache = {}
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
value = [{'id': '1', 'text': 'foo'}]
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value, 'err': 0}))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value, 'err': 0}))
|
||||
assert datasource.get_structured_value('1') == value[0]
|
||||
assert urlopen.call_count == 1
|
||||
|
||||
get_request().datasources_cache = {}
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
value = [{'id': '1', 'text': 'foo'}]
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value, 'err': 1}))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value, 'err': 1}))
|
||||
assert datasource.get_structured_value('1') is None
|
||||
assert urlopen.call_count == 1
|
||||
# no cache for errors
|
||||
|
@ -1039,13 +1038,13 @@ def test_named_datasource_id_parameter(requests_pub):
|
|||
get_request().datasources_cache = {}
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
value = {'id': '1', 'text': 'foo'} # not a list
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
|
||||
assert datasource.get_structured_value('1') is None
|
||||
assert urlopen.call_count == 1
|
||||
|
||||
get_request().datasources_cache = {}
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
urlopen.side_effect = lambda *args: StringIO('not json')
|
||||
urlopen.side_effect = lambda *args: io.StringIO('not json')
|
||||
assert datasource.get_structured_value('1') is None
|
||||
assert urlopen.call_count == 1
|
||||
|
||||
|
@ -1053,7 +1052,7 @@ def test_named_datasource_id_parameter(requests_pub):
|
|||
get_request().datasources_cache = {}
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
|
||||
assert datasource.get_structured_value('2') == value[1]
|
||||
assert urlopen.call_count == 1
|
||||
# try again, get from request.datasources_cache
|
||||
|
@ -1062,7 +1061,7 @@ def test_named_datasource_id_parameter(requests_pub):
|
|||
get_request().datasources_cache = {}
|
||||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||||
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
|
||||
assert datasource.get_structured_value('3') is None
|
||||
assert urlopen.call_count == 1
|
||||
# try again, get from request.datasources_cache
|
||||
|
|
|
@ -1,11 +1,10 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import io
|
||||
import pytest
|
||||
import json
|
||||
import shutil
|
||||
|
||||
from django.utils.six import StringIO
|
||||
|
||||
from quixote import cleanup
|
||||
from wcs import fields
|
||||
from wcs.data_sources import NamedDataSource, build_agenda_datasources, collect_agenda_data
|
||||
|
@ -112,7 +111,7 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
|
|||
pub.load_site_options()
|
||||
NamedDataSource.wipe()
|
||||
|
||||
urlopen.side_effect = lambda *args: StringIO('{"data": []}')
|
||||
urlopen.side_effect = lambda *args: io.StringIO('{"data": []}')
|
||||
assert collect_agenda_data(pub) == []
|
||||
assert urlopen.call_args_list == [mock.call('http://chrono.example.net/api/agenda/')]
|
||||
|
||||
|
@ -122,7 +121,7 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
|
|||
assert urlopen.call_args_list == [mock.call('http://chrono.example.net/api/agenda/')]
|
||||
|
||||
# events agenda
|
||||
urlopen.side_effect = lambda *args: StringIO(json.dumps({"data": AGENDA_EVENTS_DATA}))
|
||||
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({"data": AGENDA_EVENTS_DATA}))
|
||||
urlopen.reset_mock()
|
||||
assert collect_agenda_data(pub) == [
|
||||
{'text': 'Events A', 'url': 'http://chrono.example.net/api/agenda/events-A/datetimes/'},
|
||||
|
@ -132,9 +131,9 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
|
|||
|
||||
# meetings agenda
|
||||
urlopen.side_effect = [
|
||||
StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
|
||||
StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
|
||||
StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['virtual-B']})),
|
||||
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
|
||||
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
|
||||
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['virtual-B']})),
|
||||
]
|
||||
urlopen.reset_mock()
|
||||
assert collect_agenda_data(pub) == [
|
||||
|
@ -164,8 +163,8 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
|
|||
|
||||
# if meeting types could not be collected
|
||||
urlopen.side_effect = [
|
||||
StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
|
||||
StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
|
||||
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
|
||||
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
|
||||
ConnectionError,
|
||||
]
|
||||
urlopen.reset_mock()
|
||||
|
@ -177,7 +176,7 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
|
|||
]
|
||||
|
||||
urlopen.side_effect = [
|
||||
StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
|
||||
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
|
||||
ConnectionError,
|
||||
]
|
||||
urlopen.reset_mock()
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import datetime
|
||||
import io
|
||||
import pytest
|
||||
import os
|
||||
|
||||
from django.utils.six import StringIO
|
||||
from quixote import cleanup
|
||||
from wcs.qommon.ezt import (
|
||||
Template,
|
||||
|
@ -36,7 +36,7 @@ def pub(request):
|
|||
def test_simple_qualifier():
|
||||
template = Template()
|
||||
template.parse('<p>[foo]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': 'bar'})
|
||||
assert output.getvalue() == '<p>bar</p>'
|
||||
|
||||
|
@ -44,7 +44,7 @@ def test_simple_qualifier():
|
|||
def test_simple_qualifier_missing_variable():
|
||||
template = Template()
|
||||
template.parse('<p>[foo]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {})
|
||||
assert output.getvalue() == '<p>[foo]</p>'
|
||||
|
||||
|
@ -54,17 +54,17 @@ def test_if_any():
|
|||
template.parse('<p>[if-any foo]bar[end]</p>')
|
||||
|
||||
# boolean
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': True})
|
||||
assert output.getvalue() == '<p>bar</p>'
|
||||
|
||||
# no value
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {})
|
||||
assert output.getvalue() == '<p></p>'
|
||||
|
||||
# defined but evaluating to False
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': False})
|
||||
assert output.getvalue() == '<p>bar</p>'
|
||||
|
||||
|
@ -73,11 +73,11 @@ def test_if_any_else():
|
|||
template = Template()
|
||||
template.parse('<p>[if-any foo]bar[else]baz[end]</p>')
|
||||
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': True})
|
||||
assert output.getvalue() == '<p>bar</p>'
|
||||
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {})
|
||||
assert output.getvalue() == '<p>baz</p>'
|
||||
|
||||
|
@ -87,17 +87,17 @@ def test_is():
|
|||
template.parse('<p>[is foo "bar"]bar[end]</p>')
|
||||
|
||||
# no value
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {})
|
||||
assert output.getvalue() == '<p></p>'
|
||||
|
||||
# defined but other value
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': 'baz'})
|
||||
assert output.getvalue() == '<p></p>'
|
||||
|
||||
# defined with correct value
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': 'bar'})
|
||||
assert output.getvalue() == '<p>bar</p>'
|
||||
|
||||
|
@ -105,7 +105,7 @@ def test_is():
|
|||
def test_callable_qualifier():
|
||||
template = Template()
|
||||
template.parse('<p>[foo]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': lambda x: x.write('bar')})
|
||||
assert output.getvalue() == '<p>bar</p>'
|
||||
|
||||
|
@ -113,13 +113,13 @@ def test_callable_qualifier():
|
|||
def test_date_qualifier(pub):
|
||||
template = Template()
|
||||
template.parse('<p>[foo]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': datetime.date(2019, 1, 2)})
|
||||
assert output.getvalue() == '<p>2019-01-02</p>'
|
||||
|
||||
pub.cfg['language'] = {'language': 'fr'}
|
||||
pub.write_cfg()
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': datetime.date(2019, 1, 2)})
|
||||
assert output.getvalue() == '<p>02/01/2019</p>'
|
||||
|
||||
|
@ -127,13 +127,13 @@ def test_date_qualifier(pub):
|
|||
def test_datetime_qualifier(pub):
|
||||
template = Template()
|
||||
template.parse('<p>[foo]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': datetime.datetime(2019, 1, 2, 14, 4)})
|
||||
assert output.getvalue() == '<p>2019-01-02 14:04</p>'
|
||||
|
||||
pub.cfg['language'] = {'language': 'fr'}
|
||||
pub.write_cfg()
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': datetime.datetime(2019, 1, 2, 14, 4)})
|
||||
assert output.getvalue() == '<p>02/01/2019 14:04</p>'
|
||||
|
||||
|
@ -181,13 +181,13 @@ def test_missing_is_arg():
|
|||
def test_array_index():
|
||||
template = Template()
|
||||
template.parse('<p>[foo.0]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': ['bar']})
|
||||
assert output.getvalue() == '<p>bar</p>'
|
||||
|
||||
template = Template()
|
||||
template.parse('<p>[foo.bar]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': ['bar']})
|
||||
assert output.getvalue() == '<p>[foo.bar]</p>'
|
||||
|
||||
|
@ -195,7 +195,7 @@ def test_array_index():
|
|||
def test_array_subindex():
|
||||
template = Template()
|
||||
template.parse('<p>[foo.0.1]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': [['bar', 'baz']]})
|
||||
assert output.getvalue() == '<p>baz</p>'
|
||||
|
||||
|
@ -203,13 +203,13 @@ def test_array_subindex():
|
|||
def test_dict_index():
|
||||
template = Template()
|
||||
template.parse('<p>[foo.a]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': {'a': 'bar'}})
|
||||
assert output.getvalue() == '<p>bar</p>'
|
||||
|
||||
template = Template()
|
||||
template.parse('<p>[foo.b]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, {'foo': {'a': 'bar'}})
|
||||
assert output.getvalue() == '<p>[foo.b]</p>'
|
||||
|
||||
|
@ -223,14 +223,14 @@ def test_ezt_script(pub):
|
|||
vars = {'script': ScriptsSubstitutionProxy()}
|
||||
template = Template()
|
||||
template.parse('<p>[script.hello_world]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, vars)
|
||||
assert output.getvalue() == '<p>Hello world</p>'
|
||||
|
||||
vars = {'script': ScriptsSubstitutionProxy()}
|
||||
template = Template()
|
||||
template.parse('<p>[script.hello_world "fred"]</p>')
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
template.generate(output, vars)
|
||||
assert output.getvalue() == '<p>Hello fred</p>'
|
||||
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
import base64
|
||||
import json
|
||||
import urllib.parse
|
||||
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from quixote import cleanup, get_session_manager
|
||||
|
||||
from utilities import get_app, create_temporary_pub
|
||||
|
@ -126,7 +125,7 @@ def test_fc_login_page(caplog):
|
|||
resp = app.get('/login/')
|
||||
assert resp.status_int == 302
|
||||
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
|
||||
qs = urlparse.parse_qs(resp.location.split('?')[1])
|
||||
qs = urllib.parse.parse_qs(resp.location.split('?')[1])
|
||||
nonce = qs['nonce'][0]
|
||||
state = qs['state'][0]
|
||||
|
||||
|
@ -152,7 +151,7 @@ def test_fc_login_page(caplog):
|
|||
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
|
||||
resp = app.get(
|
||||
'/ident/fc/callback?%s'
|
||||
% urllib.urlencode(
|
||||
% urllib.parse.urlencode(
|
||||
{
|
||||
'code': '1234',
|
||||
'state': state,
|
||||
|
@ -176,19 +175,19 @@ def test_fc_login_page(caplog):
|
|||
assert session.extra_user_variables['fc_sub'] == 'ymca'
|
||||
|
||||
resp = app.get('/logout')
|
||||
splitted = urlparse.urlsplit(resp.location)
|
||||
splitted = urllib.parse.urlsplit(resp.location)
|
||||
assert (
|
||||
urlparse.urlunsplit((splitted.scheme, splitted.netloc, splitted.path, '', ''))
|
||||
urllib.parse.urlunsplit((splitted.scheme, splitted.netloc, splitted.path, '', ''))
|
||||
== 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout'
|
||||
)
|
||||
assert urlparse.parse_qs(splitted.query)['post_logout_redirect_uri'] == ['http://example.net']
|
||||
assert urlparse.parse_qs(splitted.query)['id_token_hint']
|
||||
assert urllib.parse.parse_qs(splitted.query)['post_logout_redirect_uri'] == ['http://example.net']
|
||||
assert urllib.parse.parse_qs(splitted.query)['id_token_hint']
|
||||
assert not get_session(app)
|
||||
|
||||
# Test error handling path
|
||||
resp = app.get(
|
||||
'/ident/fc/callback?%s'
|
||||
% urllib.urlencode(
|
||||
% urllib.parse.urlencode(
|
||||
{
|
||||
'state': state,
|
||||
'error': 'access_denied',
|
||||
|
@ -198,7 +197,7 @@ def test_fc_login_page(caplog):
|
|||
assert 'user did not authorize login' in caplog.records[-1].message
|
||||
resp = app.get(
|
||||
'/ident/fc/callback?%s'
|
||||
% urllib.urlencode(
|
||||
% urllib.parse.urlencode(
|
||||
{
|
||||
'state': state,
|
||||
'error': 'whatever',
|
||||
|
@ -212,7 +211,7 @@ def test_fc_login_page(caplog):
|
|||
resp = app.get(login_url)
|
||||
assert resp.status_int == 302
|
||||
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
|
||||
qs = urlparse.parse_qs(resp.location.split('?')[1])
|
||||
qs = urllib.parse.parse_qs(resp.location.split('?')[1])
|
||||
state = qs['state'][0]
|
||||
id_token['nonce'] = qs['nonce'][0]
|
||||
token_result['id_token'] = '.%s.' % force_text(base64url_encode(json.dumps(id_token)))
|
||||
|
@ -224,7 +223,7 @@ def test_fc_login_page(caplog):
|
|||
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
|
||||
resp = app.get(
|
||||
'/ident/fc/callback?%s'
|
||||
% urllib.urlencode(
|
||||
% urllib.parse.urlencode(
|
||||
{
|
||||
'code': '1234',
|
||||
'state': state,
|
||||
|
@ -263,7 +262,7 @@ def test_fc_login_page(caplog):
|
|||
resp = app.get('/login/')
|
||||
assert resp.status_int == 302
|
||||
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
|
||||
qs = urlparse.parse_qs(resp.location.split('?')[1])
|
||||
qs = urllib.parse.parse_qs(resp.location.split('?')[1])
|
||||
state = qs['state'][0]
|
||||
id_token['nonce'] = qs['nonce'][0]
|
||||
token_result['id_token'] = '.%s.' % force_text(base64url_encode(json.dumps(id_token)))
|
||||
|
@ -280,7 +279,7 @@ def test_fc_login_page(caplog):
|
|||
http_get_page.return_value = (None, 200, json.dumps(bad_user_info_result), None)
|
||||
resp = app.get(
|
||||
'/ident/fc/callback?%s'
|
||||
% urllib.urlencode(
|
||||
% urllib.parse.urlencode(
|
||||
{
|
||||
'code': '1234',
|
||||
'state': state,
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import datetime
|
||||
import glob
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
|
@ -9,9 +10,7 @@ import time
|
|||
|
||||
import pytest
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six import BytesIO
|
||||
from quixote import cleanup
|
||||
from wcs import fields
|
||||
from wcs.blocks import BlockDef
|
||||
|
@ -438,7 +437,7 @@ def test_unused_file_removal_job(pub):
|
|||
formdata.store()
|
||||
|
||||
formdata.evolution[-1].parts = [
|
||||
AttachmentEvolutionPart('hello.txt', fp=BytesIO(b'hello world'), varname='testfile')
|
||||
AttachmentEvolutionPart('hello.txt', fp=io.BytesIO(b'hello world'), varname='testfile')
|
||||
]
|
||||
formdata.store()
|
||||
assert len(glob.glob(os.path.join(pub.app_dir, 'attachments', '*/*'))) == 1
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import io
|
||||
import pytest
|
||||
import shutil
|
||||
import time
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.utils.six import BytesIO, StringIO
|
||||
from quixote import cleanup
|
||||
|
||||
from wcs.categories import Category
|
||||
|
@ -52,7 +52,7 @@ def assert_xml_import_export_works(formdef, include_id=False):
|
|||
|
||||
def assert_json_import_export_works(formdef, include_id=False):
|
||||
formdef2 = FormDef.import_from_json(
|
||||
StringIO(formdef.export_to_json(include_id=include_id)), include_id=include_id
|
||||
io.StringIO(formdef.export_to_json(include_id=include_id)), include_id=include_id
|
||||
)
|
||||
assert_compare_formdef(formdef, formdef2, include_id=include_id)
|
||||
return formdef2
|
||||
|
@ -216,7 +216,7 @@ def test_workflow_options_with_file():
|
|||
|
||||
upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
|
||||
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00'''
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(file_content)
|
||||
upload.fp.seek(0)
|
||||
model_file = UploadedFile(pub.APP_DIR, None, upload)
|
||||
|
@ -339,7 +339,7 @@ def test_invalid_field_type():
|
|||
formdef.fields = [fields.StringField(id='1', type='XXX')]
|
||||
export = ET.tostring(export_to_indented_xml(formdef))
|
||||
with pytest.raises(FormdefImportError):
|
||||
FormDef.import_from_xml(BytesIO(export), include_id=True)
|
||||
FormDef.import_from_xml(io.BytesIO(export), include_id=True)
|
||||
|
||||
|
||||
def test_invalid_data_source():
|
||||
|
@ -352,11 +352,11 @@ def test_invalid_data_source():
|
|||
export = export.replace(
|
||||
b'<data_source><type>xxx</type></data_source>', b'<data_source><type/></data_source>'
|
||||
)
|
||||
formdef2 = FormDef.import_from_xml(BytesIO(export))
|
||||
formdef2 = FormDef.import_from_xml(io.BytesIO(export))
|
||||
assert formdef2.fields[0].data_source == {}
|
||||
|
||||
export = export.replace(b'<data_source><type/></data_source>', b'<data_source> </data_source>')
|
||||
formdef2 = FormDef.import_from_xml(BytesIO(export))
|
||||
formdef2 = FormDef.import_from_xml(io.BytesIO(export))
|
||||
assert formdef2.fields[0].data_source == {}
|
||||
|
||||
|
||||
|
@ -368,12 +368,12 @@ def test_unknown_data_source():
|
|||
]
|
||||
export = ET.tostring(export_to_indented_xml(formdef))
|
||||
|
||||
FormDef.import_from_xml(BytesIO(export))
|
||||
FormDef.import_from_xml(io.BytesIO(export))
|
||||
|
||||
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'foobar'})]
|
||||
export = ET.tostring(export_to_indented_xml(formdef))
|
||||
with pytest.raises(FormdefImportError, match='Unknown datasources'):
|
||||
FormDef.import_from_xml(BytesIO(export))
|
||||
FormDef.import_from_xml(io.BytesIO(export))
|
||||
|
||||
# carddef as datasource
|
||||
carddef = CardDef()
|
||||
|
@ -385,12 +385,12 @@ def test_unknown_data_source():
|
|||
|
||||
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'carddef:foo'})]
|
||||
export = ET.tostring(export_to_indented_xml(formdef))
|
||||
FormDef.import_from_xml(BytesIO(export))
|
||||
FormDef.import_from_xml(io.BytesIO(export))
|
||||
|
||||
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'carddef:unknown'})]
|
||||
export = ET.tostring(export_to_indented_xml(formdef))
|
||||
with pytest.raises(FormdefImportError):
|
||||
FormDef.import_from_xml(BytesIO(export))
|
||||
FormDef.import_from_xml(io.BytesIO(export))
|
||||
|
||||
# carddef custom view as datasource
|
||||
pub.custom_view_class.wipe()
|
||||
|
@ -406,12 +406,12 @@ def test_unknown_data_source():
|
|||
fields.StringField(id='1', type='string', data_source={'type': 'carddef:foo:card-view'})
|
||||
]
|
||||
export = ET.tostring(export_to_indented_xml(formdef))
|
||||
FormDef.import_from_xml(BytesIO(export))
|
||||
FormDef.import_from_xml(io.BytesIO(export))
|
||||
|
||||
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'carddef:foo:unknown'})]
|
||||
export = ET.tostring(export_to_indented_xml(formdef))
|
||||
with pytest.raises(FormdefImportError):
|
||||
FormDef.import_from_xml(BytesIO(export))
|
||||
FormDef.import_from_xml(io.BytesIO(export))
|
||||
|
||||
|
||||
def test_duplicated_field_ids():
|
||||
|
@ -425,12 +425,12 @@ def test_duplicated_field_ids():
|
|||
export = ET.tostring(export_to_indented_xml(formdef, include_id=True))
|
||||
|
||||
with pytest.raises(FormdefImportError):
|
||||
FormDef.import_from_xml(BytesIO(export))
|
||||
FormDef.import_from_xml(io.BytesIO(export))
|
||||
|
||||
with pytest.raises(FormdefImportError):
|
||||
FormDef.import_from_xml(BytesIO(export), include_id=True)
|
||||
FormDef.import_from_xml(io.BytesIO(export), include_id=True)
|
||||
|
||||
formdef2 = FormDef.import_from_xml(BytesIO(export), fix_on_error=True)
|
||||
formdef2 = FormDef.import_from_xml(io.BytesIO(export), fix_on_error=True)
|
||||
assert formdef2.fields[0].id == '1'
|
||||
assert formdef2.fields[1].id == '2'
|
||||
assert formdef2.fields[2].id == '3'
|
||||
|
@ -446,7 +446,7 @@ def test_wrong_max_field_id():
|
|||
formdef.max_field_id = 1
|
||||
export = ET.tostring(export_to_indented_xml(formdef, include_id=True))
|
||||
|
||||
formdef2 = FormDef.import_from_xml(BytesIO(export), include_id=True)
|
||||
formdef2 = FormDef.import_from_xml(io.BytesIO(export), include_id=True)
|
||||
assert formdef2.max_field_id == 2
|
||||
|
||||
|
||||
|
@ -639,7 +639,7 @@ def test_field_validation():
|
|||
old_format = ET.tostring(formdef_xml).replace(
|
||||
b'<validation><type>regex</type><value>\\d</value></validation>', b'<validation>\\d</validation>'
|
||||
)
|
||||
f2 = FormDef.import_from_xml(BytesIO(old_format))
|
||||
f2 = FormDef.import_from_xml(io.BytesIO(old_format))
|
||||
assert len(f2.fields) == len(formdef.fields)
|
||||
assert f2.fields[0].validation == {'type': 'regex', 'value': '\\d'}
|
||||
|
||||
|
@ -745,7 +745,7 @@ def test_custom_views():
|
|||
formdef.data_class().wipe()
|
||||
pub.custom_view_class.wipe()
|
||||
|
||||
formdef2 = FormDef.import_from_xml(BytesIO(ET.tostring(formdef_xml)))
|
||||
formdef2 = FormDef.import_from_xml(io.BytesIO(ET.tostring(formdef_xml)))
|
||||
assert formdef2.name == 'foo'
|
||||
assert formdef2._custom_views
|
||||
|
||||
|
|
|
@ -8,12 +8,12 @@ import pytest
|
|||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import urllib.parse
|
||||
|
||||
import mock
|
||||
|
||||
from utilities import create_temporary_pub, clean_temporary_pub
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from quixote import cleanup
|
||||
|
||||
from wcs.qommon import force_str
|
||||
|
@ -243,7 +243,7 @@ def test_configure_site_options():
|
|||
key = '109fca71e7dc8ec49708a08fa7c02795de13f34f7d29d27bd150f203b3e0ab40'
|
||||
assert pub.get_site_option('authentic.example.net', 'api-secrets') == key
|
||||
assert pub.get_site_option('authentic.example.net', 'wscall-secrets') == key
|
||||
self_domain = urlparse.urlsplit(service.get('base_url')).netloc
|
||||
self_domain = urllib.parse.urlsplit(service.get('base_url')).netloc
|
||||
assert pub.get_site_option(self_domain, 'wscall-secrets') != '0'
|
||||
|
||||
service['variables']['xxx'] = None
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import base64
|
||||
import io
|
||||
import os
|
||||
import pytest
|
||||
from webtest import Upload
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six import StringIO
|
||||
from quixote import cleanup
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.fields import FileField
|
||||
|
@ -325,7 +325,7 @@ def test_mail_templates_export(pub, superuser, mail_template):
|
|||
resp = resp.click(href='export')
|
||||
xml_export = resp.text
|
||||
|
||||
ds = StringIO(xml_export)
|
||||
ds = io.StringIO(xml_export)
|
||||
mail_template2 = MailTemplate.import_from_xml(ds)
|
||||
assert mail_template2.name == 'test MT'
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import io
|
||||
import json
|
||||
import re
|
||||
import stat
|
||||
|
@ -17,9 +18,7 @@ from django.core.management import call_command
|
|||
from django.core.management.base import CommandError
|
||||
from django.http import Http404
|
||||
from django.test import override_settings
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six import BytesIO, StringIO
|
||||
from quixote import cleanup
|
||||
from wcs.qommon import get_publisher_class
|
||||
from wcs.qommon.afterjobs import AfterJob
|
||||
|
@ -115,7 +114,7 @@ def test_finish_failed_request():
|
|||
|
||||
def test_finish_interrupted_request():
|
||||
req = HTTPRequest(
|
||||
StringIO(''),
|
||||
io.StringIO(''),
|
||||
{
|
||||
'SERVER_NAME': 'example.net',
|
||||
'SCRIPT_NAME': '',
|
||||
|
@ -125,7 +124,7 @@ def test_finish_interrupted_request():
|
|||
response = pub.process_request(req)
|
||||
assert b'invalid content-length header' in response.getvalue()
|
||||
req = HTTPRequest(
|
||||
StringIO(''),
|
||||
io.StringIO(''),
|
||||
{
|
||||
'SERVER_NAME': 'example.net',
|
||||
'SCRIPT_NAME': '',
|
||||
|
@ -136,7 +135,7 @@ def test_finish_interrupted_request():
|
|||
response = pub.process_request(req)
|
||||
assert b'Invalid request: unexpected end of request body' in response.getvalue()
|
||||
req = HTTPRequest(
|
||||
StringIO(''),
|
||||
io.StringIO(''),
|
||||
{
|
||||
'SERVER_NAME': 'example.net',
|
||||
'SCRIPT_NAME': '',
|
||||
|
@ -148,7 +147,7 @@ def test_finish_interrupted_request():
|
|||
assert b'Invalid request: multipart/form-data missing boundary' in response.getvalue()
|
||||
with pytest.raises(Http404):
|
||||
req = HTTPRequest(
|
||||
StringIO(''),
|
||||
io.StringIO(''),
|
||||
{
|
||||
'SERVER_NAME': 'example.net',
|
||||
'SCRIPT_NAME': '',
|
||||
|
@ -184,7 +183,7 @@ def test_import_config_zip():
|
|||
pub.cfg['sp'] = {'what': 'ever'}
|
||||
pub.write_cfg()
|
||||
|
||||
c = BytesIO()
|
||||
c = io.BytesIO()
|
||||
z = zipfile.ZipFile(c, 'w')
|
||||
z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
|
||||
z.close()
|
||||
|
@ -195,7 +194,7 @@ def test_import_config_zip():
|
|||
assert pub.cfg['whatever'] == ['a', 'b', 'c']
|
||||
assert pub.cfg['sp'] == {'what': 'ever'}
|
||||
|
||||
c = BytesIO()
|
||||
c = io.BytesIO()
|
||||
z = zipfile.ZipFile(c, 'w')
|
||||
z.writestr(
|
||||
'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
import shutil
|
||||
|
||||
from django.utils import six
|
||||
|
||||
from quixote import cleanup
|
||||
|
||||
from wcs.qommon import x509utils
|
||||
|
@ -34,6 +32,6 @@ def test_metadata_generation():
|
|||
)
|
||||
assert meta != None
|
||||
content = meta.get_saml2_metadata(pkey, '', True, True)
|
||||
assert isinstance(content, six.string_types) and content != ''
|
||||
assert isinstance(content, str) and content != ''
|
||||
assert 'EntityDescriptor' in content
|
||||
assert 'SPSSODescriptor' in content
|
||||
|
|
|
@ -2,6 +2,7 @@ import datetime
|
|||
import os
|
||||
import sys
|
||||
import shutil
|
||||
import urllib.parse
|
||||
|
||||
try:
|
||||
import lasso
|
||||
|
@ -10,7 +11,6 @@ except ImportError:
|
|||
|
||||
import pytest
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from quixote import cleanup
|
||||
from quixote import get_session, get_session_manager
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
|
@ -350,7 +350,7 @@ def test_saml_login_page(pub):
|
|||
assert resp.status_int == 302
|
||||
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')
|
||||
request = lasso.Samlp2AuthnRequest()
|
||||
request.initFromQuery(urlparse.urlparse(resp.location).query)
|
||||
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
||||
assert request.forceAuthn is False
|
||||
|
||||
|
||||
|
@ -359,7 +359,7 @@ def test_saml_login_page_force_authn(pub):
|
|||
assert resp.status_int == 302
|
||||
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')
|
||||
request = lasso.Samlp2AuthnRequest()
|
||||
request.initFromQuery(urlparse.urlparse(resp.location).query)
|
||||
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
||||
assert request.forceAuthn is True
|
||||
|
||||
|
||||
|
@ -380,13 +380,13 @@ def test_saml_backoffice_redirect(pub):
|
|||
assert resp.location.startswith('http://example.net/login/?next=')
|
||||
resp = resp.follow()
|
||||
assert resp.location.startswith('http://sso.example.net/saml2/sso')
|
||||
assert urlparse.parse_qs(urlparse.urlparse(resp.location).query)['SAMLRequest']
|
||||
assert urlparse.parse_qs(urlparse.urlparse(resp.location).query)['RelayState'] == [
|
||||
assert urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['SAMLRequest']
|
||||
assert urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['RelayState'] == [
|
||||
'http://example.net/backoffice/'
|
||||
]
|
||||
|
||||
request = lasso.Samlp2AuthnRequest()
|
||||
request.initFromQuery(urlparse.urlparse(resp.location).query)
|
||||
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
||||
assert ':next_url>http://example.net/backoffice/<' in request.getOriginalXmlnode()
|
||||
|
||||
|
||||
|
@ -395,7 +395,7 @@ def test_saml_login_hint(pub):
|
|||
assert resp.status_int == 302
|
||||
assert resp.location.startswith('http://sso.example.net/saml2/sso')
|
||||
request = lasso.Samlp2AuthnRequest()
|
||||
request.initFromQuery(urlparse.urlparse(resp.location).query)
|
||||
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
||||
assert 'login-hint' not in request.getOriginalXmlnode()
|
||||
|
||||
resp = get_app(pub).get('/backoffice/')
|
||||
|
@ -404,12 +404,12 @@ def test_saml_login_hint(pub):
|
|||
resp = resp.follow()
|
||||
assert resp.location.startswith('http://sso.example.net/saml2/sso')
|
||||
request = lasso.Samlp2AuthnRequest()
|
||||
request.initFromQuery(urlparse.urlparse(resp.location).query)
|
||||
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
||||
assert ':login-hint>backoffice<' in request.getOriginalXmlnode()
|
||||
|
||||
resp = get_app(pub).get('http://example.net/login/?next=/backoffice/')
|
||||
request = lasso.Samlp2AuthnRequest()
|
||||
request.initFromQuery(urlparse.urlparse(resp.location).query)
|
||||
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
||||
assert ':login-hint>backoffice<' in request.getOriginalXmlnode()
|
||||
|
||||
|
||||
|
@ -507,7 +507,7 @@ def test_saml_idp_logout(pub):
|
|||
logout.buildRequestMsg()
|
||||
|
||||
# process logout message
|
||||
saml2.slo_idp(urlparse.urlparse(logout.msgUrl).query)
|
||||
saml2.slo_idp(urllib.parse.urlparse(logout.msgUrl).query)
|
||||
assert req.response.headers['location'].startswith(
|
||||
'http://sso.example.net/saml2/slo_return?SAMLResponse='
|
||||
)
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import io
|
||||
import os
|
||||
import shutil
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
import pytest
|
||||
|
||||
from django.utils.six import BytesIO
|
||||
from quixote.http_request import Upload
|
||||
|
||||
from wcs.blocks import BlockDef
|
||||
|
@ -410,7 +410,7 @@ def test_workflow_with_model_snapshot_browse(pub):
|
|||
export_to.label = 'test'
|
||||
upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
|
||||
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00'''
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(file_content)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile('models', 'tmp', upload)
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import io
|
||||
import pytest
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from quixote.http_request import Upload
|
||||
|
||||
from django.utils.six import BytesIO
|
||||
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.mail_templates import MailTemplate
|
||||
|
@ -202,7 +201,7 @@ def test_status_actions_named_existing_role(pub):
|
|||
b'<item role_id="2">Test Role named existing role</item>',
|
||||
b'<item>Test Role named existing role</item>',
|
||||
)
|
||||
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)))
|
||||
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
|
||||
assert wf3.possible_status[0].items[0].by == ['2']
|
||||
|
||||
|
||||
|
@ -235,7 +234,7 @@ def test_status_actions_named_missing_role(pub):
|
|||
xml_export = xml_export_orig.replace(
|
||||
b'<item role_id="3">Test Role A</item>', b'<item role_id="4">Test Role A</item>'
|
||||
)
|
||||
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)))
|
||||
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
|
||||
assert wf3.possible_status[0].items[0].by == ['3']
|
||||
|
||||
# check that it creates a new role if there's no match on id and name
|
||||
|
@ -243,7 +242,7 @@ def test_status_actions_named_missing_role(pub):
|
|||
b'<item role_id="3">Test Role A</item>', b'<item role_id="999">foobar</item>'
|
||||
)
|
||||
nb_roles = Role.count()
|
||||
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)))
|
||||
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
|
||||
assert Role.count() == nb_roles + 1
|
||||
|
||||
# check that it doesn't fallback on the id if there's no match on the
|
||||
|
@ -252,12 +251,12 @@ def test_status_actions_named_missing_role(pub):
|
|||
xml_export = xml_export_orig.replace(
|
||||
b'<item role_id="3">Test Role A</item>', b'<item role_id="3">Test Role C</item>'
|
||||
)
|
||||
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)))
|
||||
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
|
||||
assert wf3.possible_status[0].items[0].by != ['3']
|
||||
assert Role.count() == nb_roles + 1
|
||||
|
||||
# on the other hand, check that it uses the id when included_id is True
|
||||
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)), include_id=True)
|
||||
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)), include_id=True)
|
||||
assert wf3.possible_status[0].items[0].by == ['3']
|
||||
|
||||
|
||||
|
@ -291,7 +290,7 @@ def test_export_to_model_action(pub):
|
|||
export_to.label = 'test'
|
||||
upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
|
||||
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00'''
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(file_content)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.APP_DIR, None, upload)
|
||||
|
@ -308,7 +307,7 @@ def test_export_to_model_action(pub):
|
|||
export_to.label = 'test'
|
||||
upload = Upload('/foo/bar', content_type='text/rtf')
|
||||
file_content = b''
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(file_content)
|
||||
upload.fp.seek(0)
|
||||
export_to.model_file = UploadedFile(pub.APP_DIR, None, upload)
|
||||
|
@ -397,7 +396,7 @@ def test_commentable_action(pub):
|
|||
assert b'<required>True</required>' in xml_export
|
||||
xml_export = xml_export.replace(b'<required>True</required>', b'')
|
||||
assert b'<required>True</required>' not in xml_export
|
||||
wf2 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)))
|
||||
wf2 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
|
||||
assert wf2.possible_status[0].items[0].required is False
|
||||
|
||||
commentable.button_label = 'button label'
|
||||
|
@ -930,7 +929,7 @@ def test_unknown_data_source(pub):
|
|||
for wf in [wf1, wf2, wf3]:
|
||||
export = ET.tostring(export_to_indented_xml(wf))
|
||||
with pytest.raises(WorkflowImportError, match='Unknown datasources'):
|
||||
Workflow.import_from_xml(BytesIO(export))
|
||||
Workflow.import_from_xml(io.BytesIO(export))
|
||||
|
||||
# carddef as datasource
|
||||
CardDef.wipe()
|
||||
|
@ -945,7 +944,7 @@ def test_unknown_data_source(pub):
|
|||
|
||||
for wf in [wf1, wf2, wf3]:
|
||||
export = ET.tostring(export_to_indented_xml(wf))
|
||||
Workflow.import_from_xml(BytesIO(export))
|
||||
Workflow.import_from_xml(io.BytesIO(export))
|
||||
|
||||
display_form.formdef.fields[0].data_source = {'type': 'carddef:unknown'}
|
||||
wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:unknown'}
|
||||
|
@ -954,7 +953,7 @@ def test_unknown_data_source(pub):
|
|||
for wf in [wf1, wf2, wf3]:
|
||||
export = ET.tostring(export_to_indented_xml(wf))
|
||||
with pytest.raises(WorkflowImportError, match='Unknown datasources'):
|
||||
Workflow.import_from_xml(BytesIO(export))
|
||||
Workflow.import_from_xml(io.BytesIO(export))
|
||||
|
||||
# carddef custom view as datasource
|
||||
pub.custom_view_class.wipe()
|
||||
|
@ -972,7 +971,7 @@ def test_unknown_data_source(pub):
|
|||
|
||||
for wf in [wf1, wf2, wf3]:
|
||||
export = ET.tostring(export_to_indented_xml(wf))
|
||||
Workflow.import_from_xml(BytesIO(export))
|
||||
Workflow.import_from_xml(io.BytesIO(export))
|
||||
|
||||
display_form.formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'}
|
||||
wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'}
|
||||
|
@ -981,4 +980,4 @@ def test_unknown_data_source(pub):
|
|||
for wf in [wf1, wf2, wf3]:
|
||||
export = ET.tostring(export_to_indented_xml(wf))
|
||||
with pytest.raises(WorkflowImportError, match='Unknown datasources'):
|
||||
Workflow.import_from_xml(BytesIO(export))
|
||||
Workflow.import_from_xml(io.BytesIO(export))
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
import base64
|
||||
import io
|
||||
import json
|
||||
import datetime
|
||||
import os
|
||||
import pytest
|
||||
import shutil
|
||||
import time
|
||||
import urllib.parse
|
||||
import zipfile
|
||||
|
||||
import mock
|
||||
|
@ -14,10 +16,7 @@ try:
|
|||
except ImportError:
|
||||
Image = None
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.six import BytesIO, StringIO
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from quixote import cleanup, get_response
|
||||
from wcs.qommon.errors import ConnectionError
|
||||
|
@ -1008,7 +1007,7 @@ def test_register_comment_attachment(pub):
|
|||
shutil.rmtree(os.path.join(get_publisher().app_dir, 'attachments'))
|
||||
|
||||
formdata.evolution[-1].parts = [
|
||||
AttachmentEvolutionPart('hello.txt', fp=BytesIO(b'hello world'), varname='testfile')
|
||||
AttachmentEvolutionPart('hello.txt', fp=io.BytesIO(b'hello world'), varname='testfile')
|
||||
]
|
||||
formdata.store()
|
||||
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
|
||||
|
@ -1961,7 +1960,7 @@ def test_webservice_call(http_requests, pub):
|
|||
pub.substitutions.feed(formdata)
|
||||
item.perform(formdata)
|
||||
assert http_requests.get_last('method') == 'GET'
|
||||
qs = urlparse.parse_qs(http_requests.get_last('url').split('?')[1])
|
||||
qs = urllib.parse.parse_qs(http_requests.get_last('url').split('?')[1])
|
||||
assert set(qs.keys()) == set(['in_url', 'str', 'one', 'evalme', 'django', 'ezt'])
|
||||
assert qs['in_url'] == ['1', '2']
|
||||
assert qs['one'] == ['1']
|
||||
|
@ -3188,7 +3187,7 @@ def test_geolocate_address(pub):
|
|||
)
|
||||
item.perform(formdata)
|
||||
assert 'https://nominatim.entrouvert.org/search' in http_get_page.call_args[0][0]
|
||||
assert urlparse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
|
||||
assert urllib.parse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
|
||||
assert int(formdata.geolocations['base']['lat']) == 48
|
||||
assert int(formdata.geolocations['base']['lon']) == 2
|
||||
|
||||
|
@ -3203,7 +3202,7 @@ def test_geolocate_address(pub):
|
|||
)
|
||||
item.perform(formdata)
|
||||
assert 'https://nominatim.entrouvert.org/search' in http_get_page.call_args[0][0]
|
||||
assert urlparse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
|
||||
assert urllib.parse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
|
||||
assert 'key=KEY' in http_get_page.call_args[0][0]
|
||||
assert int(formdata.geolocations['base']['lat']) == 48
|
||||
assert int(formdata.geolocations['base']['lon']) == 2
|
||||
|
@ -3435,7 +3434,7 @@ def test_export_to_model_image(pub):
|
|||
template_filename = os.path.join(os.path.dirname(__file__), 'template-with-image.odt')
|
||||
template = open(template_filename, 'rb').read()
|
||||
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(template)
|
||||
upload.fp.seek(0)
|
||||
item.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -3499,7 +3498,7 @@ def test_export_to_model_backoffice_field(pub):
|
|||
template_filename = os.path.join(os.path.dirname(__file__), 'template.odt')
|
||||
template = open(template_filename, 'rb').read()
|
||||
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(template)
|
||||
upload.fp.seek(0)
|
||||
item.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -3557,7 +3556,7 @@ def test_export_to_model_django_template(pub):
|
|||
template_filename = os.path.join(os.path.dirname(__file__), 'template-django.odt')
|
||||
template = open(template_filename, 'rb').read()
|
||||
upload = QuixoteUpload('/foo/template-django.odt', content_type='application/octet-stream')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(template)
|
||||
upload.fp.seek(0)
|
||||
item.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
@ -3629,7 +3628,7 @@ def test_export_to_model_form_details_section(pub, filename):
|
|||
template_filename = os.path.join(os.path.dirname(__file__), filename)
|
||||
template = open(template_filename, 'rb').read()
|
||||
upload = QuixoteUpload(filename, content_type='application/octet-stream')
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(template)
|
||||
upload.fp.seek(0)
|
||||
item.model_file = UploadedFile(pub.app_dir, None, upload)
|
||||
|
|
|
@ -7,6 +7,7 @@ import random
|
|||
import psycopg2
|
||||
import shutil
|
||||
import sys
|
||||
import urllib.parse
|
||||
|
||||
from wcs import sql, sessions, custom_views
|
||||
|
||||
|
@ -14,7 +15,6 @@ from webtest import TestApp
|
|||
from quixote import cleanup, get_publisher
|
||||
from django.conf import settings
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from wcs.qommon import force_str
|
||||
import wcs
|
||||
|
@ -330,8 +330,8 @@ class HttpRequestsMocking(object):
|
|||
{'url': url, 'method': method, 'body': body, 'headers': headers, 'timeout': timeout}
|
||||
)
|
||||
|
||||
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
|
||||
base_url = urlparse.urlunparse((scheme, netloc, path, '', '', ''))
|
||||
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
|
||||
base_url = urllib.parse.urlunparse((scheme, netloc, path, '', '', ''))
|
||||
|
||||
with open(os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')) as fd:
|
||||
metadata = fd.read()
|
||||
|
|
|
@ -16,14 +16,13 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import io
|
||||
import xml.etree.ElementTree as ET
|
||||
import datetime
|
||||
import difflib
|
||||
import tarfile
|
||||
import time
|
||||
|
||||
from django.utils.six import BytesIO, StringIO
|
||||
|
||||
from quixote import get_publisher, get_response, redirect
|
||||
from quixote.directory import Directory, AccessControlled
|
||||
from quixote.html import TemplateIO, htmltext
|
||||
|
@ -1247,7 +1246,7 @@ class FormDefPage(Directory):
|
|||
if form.get_widget('file').parse():
|
||||
fp = form.get_widget('file').parse().fp
|
||||
elif form.get_widget('new_formdef').parse():
|
||||
fp = StringIO(form.get_widget('new_formdef').parse())
|
||||
fp = io.StringIO(form.get_widget('new_formdef').parse())
|
||||
elif form.get_widget('url').parse():
|
||||
url = form.get_widget('url').parse()
|
||||
try:
|
||||
|
@ -1513,7 +1512,7 @@ class FormDefPage(Directory):
|
|||
date = time.strptime(date, misc.date_format())
|
||||
all_forms = [x for x in all_forms if x.last_update_time < date]
|
||||
|
||||
self.fd = BytesIO()
|
||||
self.fd = io.BytesIO()
|
||||
t = tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd)
|
||||
t.add(self.formdef.get_object_filename(), 'formdef')
|
||||
for formdata in all_forms:
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import copy
|
||||
import hashlib
|
||||
import io
|
||||
import mimetypes
|
||||
import os
|
||||
|
||||
|
@ -29,7 +30,6 @@ import shutil
|
|||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six import BytesIO, StringIO
|
||||
|
||||
from quixote import get_publisher, get_request, get_response, redirect
|
||||
from quixote.directory import Directory
|
||||
|
@ -814,7 +814,7 @@ class SettingsDirectory(QommonSettingsDirectory):
|
|||
return redirect('themes')
|
||||
|
||||
parent_theme_directory = os.path.dirname(theme_directory)
|
||||
c = BytesIO()
|
||||
c = io.BytesIO()
|
||||
z = zipfile.ZipFile(c, 'w')
|
||||
for base, dirnames, filenames in os.walk(theme_directory):
|
||||
basetheme = base[len(parent_theme_directory) + 1 :]
|
||||
|
@ -877,7 +877,7 @@ class SettingsDirectory(QommonSettingsDirectory):
|
|||
get_session().message = ('error', _('Theme is missing a desc.xml file.'))
|
||||
return redirect('themes')
|
||||
desc_xml = z.read('%s/desc.xml' % theme_name)
|
||||
theme_dict = template.get_theme_dict(StringIO(force_text(desc_xml)))
|
||||
theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
|
||||
if theme_dict.get('name') != theme_name:
|
||||
get_session().message = ('error', _('desc.xml is missing a name attribute.'))
|
||||
return redirect('themes')
|
||||
|
@ -901,7 +901,7 @@ class SettingsDirectory(QommonSettingsDirectory):
|
|||
get_session().message = ('error', _('Error loading theme (%s).') % str(e))
|
||||
return redirect('themes')
|
||||
|
||||
return self.install_theme_from_file(StringIO(fp.read()))
|
||||
return self.install_theme_from_file(io.StringIO(fp.read()))
|
||||
|
||||
def template(self):
|
||||
from wcs.qommon.template import get_default_ezt_template
|
||||
|
@ -994,7 +994,7 @@ class SettingsDirectory(QommonSettingsDirectory):
|
|||
self.settings = settings
|
||||
|
||||
def export(self, job):
|
||||
c = BytesIO()
|
||||
c = io.BytesIO()
|
||||
z = zipfile.ZipFile(c, 'w')
|
||||
for d in self.dirs:
|
||||
if d not in (
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.utils import six
|
||||
|
||||
from quixote import get_publisher, get_response, get_request, get_session, redirect
|
||||
from quixote.directory import Directory
|
||||
from quixote.html import TemplateIO, htmltext
|
||||
|
@ -327,7 +325,7 @@ class UsersDirectory(Directory):
|
|||
checked_roles = None
|
||||
if get_request().form.get('filter'):
|
||||
checked_roles = get_request().form.get('role', [])
|
||||
if isinstance(checked_roles, six.string_types):
|
||||
if isinstance(checked_roles, str):
|
||||
checked_roles = [checked_roles]
|
||||
|
||||
if checked_roles:
|
||||
|
|
|
@ -18,14 +18,13 @@
|
|||
|
||||
from __future__ import print_function
|
||||
|
||||
import io
|
||||
import time
|
||||
from subprocess import Popen, PIPE
|
||||
import textwrap
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six import StringIO
|
||||
|
||||
from quixote import redirect, get_publisher, get_response
|
||||
from quixote.directory import Directory
|
||||
|
@ -132,7 +131,7 @@ def graphviz_post_treatment(content, colours, include=False):
|
|||
|
||||
|
||||
def graphviz(workflow, url_prefix='', select=None, svg=True, include=False):
|
||||
out = StringIO()
|
||||
out = io.StringIO()
|
||||
# a list of colours known to graphviz, they will serve as key to get back
|
||||
# to the colours defined in wcs, they are used as color attributes in
|
||||
# graphviz (<= 2.38) then as class attribute on node elements for 2.40 and
|
||||
|
|
|
@ -18,13 +18,13 @@ import datetime
|
|||
import json
|
||||
import re
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
from quixote import get_request, get_publisher, get_response, get_session
|
||||
from quixote.errors import MethodNotAllowedError
|
||||
from quixote.directory import Directory
|
||||
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse
|
||||
|
||||
from .qommon import _
|
||||
|
@ -1019,7 +1019,7 @@ class AutocompleteDirectory(Directory):
|
|||
|
||||
if 'url' in info:
|
||||
url = info['url']
|
||||
url += urllib.quote(get_request().form['q'])
|
||||
url += urllib.parse.quote(get_request().form['q'])
|
||||
url = sign_url_auto_orig(url)
|
||||
get_response().set_content_type('application/json')
|
||||
return misc.urlopen(url).read()
|
||||
|
@ -1113,7 +1113,7 @@ def geocoding(request, *args, **kwargs):
|
|||
url += '&'
|
||||
else:
|
||||
url += '?'
|
||||
url += 'format=json&q=%s' % urllib.quote(q.encode('utf-8'))
|
||||
url += 'format=json&q=%s' % urllib.parse.quote(q.encode('utf-8'))
|
||||
url += '&accept-language=%s' % (get_publisher().get_site_language() or 'en')
|
||||
return HttpResponse(misc.urlopen(url).read(), content_type='application/json')
|
||||
|
||||
|
|
|
@ -20,13 +20,11 @@ import hashlib
|
|||
import datetime
|
||||
import random
|
||||
import os
|
||||
import urllib.parse
|
||||
import errno
|
||||
import calendar
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from quixote import get_request, get_publisher
|
||||
from .api_access import ApiAccess
|
||||
|
@ -43,18 +41,18 @@ def is_url_signed(utcnow=None, duration=DEFAULT_DURATION):
|
|||
if not query_string:
|
||||
return False
|
||||
signature = get_request().form.get('signature')
|
||||
if not isinstance(signature, six.string_types):
|
||||
if not isinstance(signature, str):
|
||||
return False
|
||||
signature = force_bytes(signature)
|
||||
# verify signature
|
||||
orig = get_request().form.get('orig')
|
||||
if not isinstance(orig, six.string_types):
|
||||
if not isinstance(orig, str):
|
||||
raise AccessForbiddenError('missing/multiple orig field')
|
||||
key = ApiAccess.get_access_key(orig) or get_publisher().get_site_option(orig, 'api-secrets')
|
||||
if not key:
|
||||
raise AccessForbiddenError('invalid orig')
|
||||
algo = get_request().form.get('algo')
|
||||
if not isinstance(algo, six.string_types):
|
||||
if not isinstance(algo, str):
|
||||
raise AccessForbiddenError('missing/multiple algo field')
|
||||
if algo not in hashlib.algorithms_guaranteed:
|
||||
raise AccessForbiddenError('invalid algo')
|
||||
|
@ -69,7 +67,7 @@ def is_url_signed(utcnow=None, duration=DEFAULT_DURATION):
|
|||
):
|
||||
raise AccessForbiddenError('invalid signature')
|
||||
timestamp = get_request().form.get('timestamp')
|
||||
if not isinstance(timestamp, six.string_types):
|
||||
if not isinstance(timestamp, str):
|
||||
raise AccessForbiddenError('missing/multiple timestamp field')
|
||||
try:
|
||||
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
|
||||
|
@ -133,7 +131,7 @@ def get_user_from_api_query_string(api_name=None):
|
|||
user = None
|
||||
if get_request().form.get('email'):
|
||||
email = get_request().form.get('email')
|
||||
if not isinstance(email, six.string_types):
|
||||
if not isinstance(email, str):
|
||||
raise AccessForbiddenError('multiple email field')
|
||||
users = list(get_publisher().user_class.get_users_with_email(email))
|
||||
if users:
|
||||
|
@ -142,7 +140,7 @@ def get_user_from_api_query_string(api_name=None):
|
|||
raise AccessForbiddenError('unknown email')
|
||||
elif get_request().form.get('NameID'):
|
||||
ni = get_request().form.get('NameID')
|
||||
if not isinstance(ni, six.string_types):
|
||||
if not isinstance(ni, str):
|
||||
raise AccessForbiddenError('multiple NameID field')
|
||||
users = list(get_publisher().user_class.get_users_with_name_identifier(ni))
|
||||
if users:
|
||||
|
@ -158,9 +156,9 @@ def get_user_from_api_query_string(api_name=None):
|
|||
|
||||
|
||||
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
|
||||
parsed = urlparse.urlparse(url)
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
|
||||
return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
|
||||
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
|
||||
|
||||
|
||||
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
|
||||
|
@ -173,9 +171,9 @@ def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
|
|||
new_query = query
|
||||
if new_query:
|
||||
new_query += '&'
|
||||
new_query += urllib.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
|
||||
new_query += urllib.parse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
|
||||
signature = base64.b64encode(sign_string(new_query, key, algo=algo))
|
||||
new_query += '&signature=' + urllib.quote(signature)
|
||||
new_query += '&signature=' + urllib.parse.quote(signature)
|
||||
return new_query
|
||||
|
||||
|
||||
|
@ -191,8 +189,8 @@ class MissingSecret(Exception):
|
|||
|
||||
def get_secret_and_orig(url):
|
||||
frontoffice_url = get_publisher().get_frontoffice_url()
|
||||
orig = urlparse.urlparse(frontoffice_url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]
|
||||
target_orig = urlparse.urlparse(url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]
|
||||
orig = urllib.parse.urlparse(frontoffice_url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]
|
||||
target_orig = urllib.parse.urlparse(url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]
|
||||
secret = get_publisher().get_site_option(target_orig, 'wscall-secrets')
|
||||
if not secret:
|
||||
raise MissingSecret()
|
||||
|
@ -204,11 +202,11 @@ def sign_url_auto_orig(url):
|
|||
signature_key, orig = get_secret_and_orig(url)
|
||||
except MissingSecret:
|
||||
return url
|
||||
parsed = urlparse.urlparse(url)
|
||||
querystring = urlparse.parse_qsl(parsed.query)
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
querystring = urllib.parse.parse_qsl(parsed.query)
|
||||
querystring.append(('orig', orig))
|
||||
querystring = urllib.urlencode(querystring)
|
||||
url = urlparse.urlunparse(parsed[:4] + (querystring,) + parsed[5:6])
|
||||
querystring = urllib.parse.urlencode(querystring)
|
||||
url = urllib.parse.urlunparse(parsed[:4] + (querystring,) + parsed[5:6])
|
||||
return sign_url(url, signature_key)
|
||||
|
||||
|
||||
|
|
|
@ -16,12 +16,12 @@
|
|||
|
||||
import csv
|
||||
import datetime
|
||||
import io
|
||||
|
||||
from quixote import get_publisher, get_request, get_response, redirect
|
||||
from quixote.html import TemplateIO, htmltext, htmlescape
|
||||
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six import StringIO
|
||||
|
||||
|
||||
from ..qommon import _, N_
|
||||
|
@ -169,7 +169,7 @@ class CardPage(FormPage):
|
|||
|
||||
def data_sample_csv(self):
|
||||
carddef_fields = self.get_import_csv_fields()
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
csv_output = csv.writer(output)
|
||||
csv_output.writerow([f.label for f in carddef_fields])
|
||||
sample_line = []
|
||||
|
|
|
@ -16,18 +16,17 @@
|
|||
|
||||
import csv
|
||||
import datetime
|
||||
import io
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
import types
|
||||
import urllib.parse
|
||||
import vobject
|
||||
import zipfile
|
||||
|
||||
from django.conf import settings
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.utils.six import BytesIO, StringIO
|
||||
|
||||
from quixote import get_session, get_publisher, get_request, get_response, redirect
|
||||
from quixote.directory import Directory
|
||||
|
@ -904,7 +903,7 @@ class ManagementDirectory(Directory):
|
|||
total_count = sql.AnyFormData.count(criterias)
|
||||
if offset > total_count:
|
||||
get_request().form['offset'] = '0'
|
||||
return redirect('listing?' + urllib.urlencode(get_request().form))
|
||||
return redirect('listing?' + urllib.parse.urlencode(get_request().form))
|
||||
formdatas = sql.AnyFormData.select(criterias, order_by=order_by, limit=limit, offset=offset)
|
||||
include_submission_channel = bool(get_publisher().get_site_option('welco_url', 'variables'))
|
||||
|
||||
|
@ -2937,7 +2936,7 @@ class FormBackOfficeStatusPage(FormStatusPage):
|
|||
|
||||
def download_as_zip(self):
|
||||
formdata = self.filled
|
||||
zip_content = BytesIO()
|
||||
zip_content = io.BytesIO()
|
||||
zip_file = zipfile.ZipFile(zip_content, 'w')
|
||||
counter = {'value': 0}
|
||||
|
||||
|
@ -3281,7 +3280,7 @@ class FormBackOfficeStatusPage(FormStatusPage):
|
|||
else:
|
||||
r += htmltext('<li><code title="%s">%s</code>') % (k, k)
|
||||
r += htmltext(' <div class="value"><span>%s</span>') % ellipsize(safe(v), 10000)
|
||||
if not isinstance(v, six.string_types):
|
||||
if not isinstance(v, str):
|
||||
r += htmltext(' <span class="type">(%r)</span>') % type(v)
|
||||
r += htmltext('</div></li>')
|
||||
r += htmltext('</div>')
|
||||
|
@ -3706,7 +3705,7 @@ class CsvExportAfterJob(AfterJob):
|
|||
return self.create_export(formdef, fields, items, total_count)
|
||||
|
||||
def create_export(self, formdef, fields, items, total_count):
|
||||
output = StringIO()
|
||||
output = io.StringIO()
|
||||
csv_output = csv.writer(output)
|
||||
|
||||
csv_output.writerow(self.csv_tuple_heading(fields))
|
||||
|
@ -3752,7 +3751,7 @@ class OdsExportAfterJob(CsvExportAfterJob):
|
|||
native_value=item['native_value'],
|
||||
)
|
||||
|
||||
output = BytesIO()
|
||||
output = io.BytesIO()
|
||||
workbook.save(output)
|
||||
|
||||
self.file_content = output.getvalue()
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.utils import six
|
||||
|
||||
from wcs.formdata import FormData
|
||||
|
||||
|
||||
|
@ -43,7 +41,7 @@ class CardData(FormData):
|
|||
if not field.varname:
|
||||
continue
|
||||
value = self.data and self.data.get(field.id)
|
||||
if isinstance(value, six.string_types):
|
||||
if isinstance(value, str):
|
||||
item[field.varname] = value
|
||||
return item
|
||||
|
||||
|
|
|
@ -16,17 +16,16 @@
|
|||
|
||||
from __future__ import print_function
|
||||
|
||||
import configparser
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
import tempfile
|
||||
import hashlib
|
||||
import urllib.parse
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.six.moves import configparser as ConfigParser
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from quixote import cleanup
|
||||
from wcs.qommon import force_str
|
||||
|
@ -365,7 +364,7 @@ class CmdCheckHobos(Command):
|
|||
pub.write_cfg()
|
||||
|
||||
def get_instance_path(self, service):
|
||||
parsed_url = urlparse.urlsplit(service.get('base_url'))
|
||||
parsed_url = urllib.parse.urlsplit(service.get('base_url'))
|
||||
instance_path = parsed_url.netloc
|
||||
if parsed_url.path:
|
||||
instance_path += '+%s' % parsed_url.path.replace('/', '+')
|
||||
|
@ -373,7 +372,7 @@ class CmdCheckHobos(Command):
|
|||
|
||||
def configure_site_options(self, current_service, pub, ignore_timestamp=False):
|
||||
# configure site-options.cfg
|
||||
config = ConfigParser.RawConfigParser()
|
||||
config = configparser.RawConfigParser()
|
||||
site_options_filepath = os.path.join(pub.app_dir, 'site-options.cfg')
|
||||
if os.path.exists(site_options_filepath):
|
||||
config.read(site_options_filepath)
|
||||
|
@ -382,7 +381,7 @@ class CmdCheckHobos(Command):
|
|||
try:
|
||||
if config.get('hobo', 'timestamp') == self.all_services.get('timestamp'):
|
||||
raise NoChange()
|
||||
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
|
||||
except (configparser.NoOptionError, configparser.NoSectionError):
|
||||
pass
|
||||
|
||||
if not 'hobo' in config.sections():
|
||||
|
@ -404,7 +403,7 @@ class CmdCheckHobos(Command):
|
|||
if not service.get('secret_key'):
|
||||
continue
|
||||
|
||||
domain = urlparse.urlparse(service_url).netloc.split(':')[0]
|
||||
domain = urllib.parse.urlparse(service_url).netloc.split(':')[0]
|
||||
if service is current_service:
|
||||
if config.has_option('api-secrets', domain):
|
||||
api_secrets[domain] = config.get('api-secrets', domain)
|
||||
|
@ -455,7 +454,7 @@ class CmdCheckHobos(Command):
|
|||
if value is None:
|
||||
config.remove_option('variables', key)
|
||||
continue
|
||||
if not isinstance(value, six.string_types):
|
||||
if not isinstance(value, str):
|
||||
value = str(value)
|
||||
value = force_str(value)
|
||||
config.set('variables', key, value)
|
||||
|
@ -480,7 +479,7 @@ class CmdCheckHobos(Command):
|
|||
|
||||
try:
|
||||
portal_agent_url = config.get('variables', 'portal_agent_url')
|
||||
except ConfigParser.NoOptionError:
|
||||
except configparser.NoOptionError:
|
||||
pass
|
||||
else:
|
||||
if portal_agent_url.endswith('/'):
|
||||
|
@ -531,7 +530,7 @@ class CmdCheckHobos(Command):
|
|||
if not createdb_cfg:
|
||||
createdb_cfg = {}
|
||||
for k, v in pub.cfg['postgresql'].items():
|
||||
if v and isinstance(v, six.string_types):
|
||||
if v and isinstance(v, str):
|
||||
createdb_cfg[k] = v
|
||||
|
||||
try:
|
||||
|
|
|
@ -23,8 +23,6 @@ import psycopg2.errorcodes
|
|||
from datetime import datetime
|
||||
from shutil import rmtree
|
||||
|
||||
from django.utils import six
|
||||
|
||||
from ..qommon.ctl import Command, make_option
|
||||
|
||||
|
||||
|
@ -61,7 +59,7 @@ class CmdDeleteTenant(Command):
|
|||
if pub.is_using_postgresql():
|
||||
postgresql_cfg = {}
|
||||
for k, v in pub.cfg['postgresql'].items():
|
||||
if v and isinstance(v, six.string_types):
|
||||
if v and isinstance(v, str):
|
||||
postgresql_cfg[k] = v
|
||||
|
||||
# if there's a createdb-connection-params, we can do a DROP DATABASE with
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import psycopg2
|
||||
|
@ -22,7 +23,6 @@ import traceback
|
|||
from django.utils.encoding import force_bytes
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.core.management.base import CommandError
|
||||
from django.utils.six import StringIO
|
||||
|
||||
from wcs.qommon.publisher import get_publisher_class
|
||||
|
||||
|
@ -82,7 +82,7 @@ class Command(BaseCommand):
|
|||
self.publisher.site_options.add_section('options')
|
||||
self.publisher.site_options.set('options', 'postgresql', 'true')
|
||||
options_file = os.path.join(self.publisher.app_dir, 'site-options.cfg')
|
||||
stringio = StringIO()
|
||||
stringio = io.StringIO()
|
||||
self.publisher.site_options.write(stringio)
|
||||
atomic_write(options_file, force_bytes(stringio.getvalue()))
|
||||
|
||||
|
|
|
@ -14,9 +14,9 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import urllib.parse
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from django.utils.encoding import force_text
|
||||
from quixote import get_publisher
|
||||
|
||||
|
@ -73,7 +73,7 @@ class CustomView(StorableObject):
|
|||
return True
|
||||
|
||||
def set_from_qs(self, qs):
|
||||
parsed_qs = urlparse.parse_qsl(qs)
|
||||
parsed_qs = urllib.parse.parse_qsl(qs)
|
||||
self.columns = {
|
||||
'list': [
|
||||
{'id': key} for (key, value) in parsed_qs if value == 'on' and not key.startswith('filter-')
|
||||
|
|
|
@ -16,13 +16,11 @@
|
|||
|
||||
import collections
|
||||
import hashlib
|
||||
import urllib.parse
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.template import TemplateSyntaxError, VariableDoesNotExist
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text, force_bytes
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from quixote import get_publisher, get_request, get_session
|
||||
from quixote.html import TemplateIO
|
||||
|
@ -288,7 +286,7 @@ def get_structured_items(data_source, mode=None):
|
|||
elif len(value[0]) == 1:
|
||||
return [{'id': x[0], 'text': x[0]} for x in value]
|
||||
return value
|
||||
elif isinstance(value[0], six.string_types):
|
||||
elif isinstance(value[0], str):
|
||||
return [{'id': x, 'text': x} for x in value]
|
||||
return value
|
||||
except:
|
||||
|
@ -619,7 +617,7 @@ class NamedDataSource(XmlStorableObject):
|
|||
url += '?'
|
||||
else:
|
||||
url += '&'
|
||||
url += param_name + '=' + urllib.quote(param_value)
|
||||
url += param_name + '=' + urllib.parse.quote(param_value)
|
||||
|
||||
def find_item(items, name, value):
|
||||
for item in items:
|
||||
|
@ -769,7 +767,7 @@ def has_chrono(publisher):
|
|||
|
||||
def chrono_url(publisher, url):
|
||||
chrono_url = publisher.get_site_option('chrono_url')
|
||||
return urlparse.urljoin(chrono_url, url)
|
||||
return urllib.parse.urljoin(chrono_url, url)
|
||||
|
||||
|
||||
def collect_agenda_data(publisher):
|
||||
|
|
|
@ -31,7 +31,6 @@ import collections
|
|||
from quixote import get_request, get_publisher
|
||||
from quixote.html import htmltag, htmltext, TemplateIO
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes, force_text, smart_text
|
||||
from django.utils.formats import date_format as django_date_format
|
||||
from django.utils.html import urlize
|
||||
|
@ -313,7 +312,7 @@ class Field(object):
|
|||
atname = 'item'
|
||||
for v in val:
|
||||
ET.SubElement(el, atname).text = force_text(v, charset, errors='replace')
|
||||
elif isinstance(val, six.string_types):
|
||||
elif isinstance(val, str):
|
||||
el.text = force_text(val, charset, errors='replace')
|
||||
else:
|
||||
el.text = str(val)
|
||||
|
@ -1039,7 +1038,7 @@ class StringField(WidgetField):
|
|||
|
||||
def migrate(self):
|
||||
changed = super(StringField, self).migrate()
|
||||
if isinstance(self.validation, six.string_types):
|
||||
if isinstance(self.validation, str):
|
||||
self.validation = {'type': 'regex', 'value': self.validation}
|
||||
changed = True
|
||||
return changed
|
||||
|
@ -2498,7 +2497,7 @@ class PageField(Field):
|
|||
|
||||
def migrate(self):
|
||||
changed = super(PageField, self).migrate()
|
||||
if isinstance(self.condition, six.string_types):
|
||||
if isinstance(self.condition, str):
|
||||
if self.condition:
|
||||
self.condition = {'type': 'python', 'value': self.condition}
|
||||
else:
|
||||
|
@ -2506,7 +2505,7 @@ class PageField(Field):
|
|||
changed = True
|
||||
for post_condition in self.post_conditions or []:
|
||||
condition = post_condition.get('condition')
|
||||
if isinstance(condition, six.string_types):
|
||||
if isinstance(condition, str):
|
||||
if condition:
|
||||
post_condition['condition'] = {'type': 'python', 'value': condition}
|
||||
else:
|
||||
|
|
|
@ -22,8 +22,6 @@ import re
|
|||
import sys
|
||||
import time
|
||||
|
||||
from django.utils import six
|
||||
|
||||
from quixote import get_request, get_publisher, get_session
|
||||
from quixote.http_request import Upload
|
||||
|
||||
|
@ -438,11 +436,7 @@ class FormData(StorableObject):
|
|||
if field.prefill and field.prefill.get('type') == 'user':
|
||||
form_user_data[field.prefill['value']] = self.data.get(field.id)
|
||||
user_label = ' '.join(
|
||||
[
|
||||
form_user_data.get(x)
|
||||
for x in field_name_values
|
||||
if isinstance(form_user_data.get(x), six.string_types)
|
||||
]
|
||||
[form_user_data.get(x) for x in field_name_values if isinstance(form_user_data.get(x), str)]
|
||||
)
|
||||
if user_label != self.user_label:
|
||||
self.user_label = user_label
|
||||
|
|
|
@ -26,7 +26,6 @@ import json
|
|||
import xml.etree.ElementTree as ET
|
||||
import datetime
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
|
||||
from quixote import get_request, get_publisher
|
||||
|
@ -839,7 +838,7 @@ class FormDef(StorableObject):
|
|||
return dict([(unicode2str(k), unicode2str(v)) for k, v in v.items()])
|
||||
elif isinstance(v, list):
|
||||
return [unicode2str(x) for x in v]
|
||||
elif isinstance(v, six.string_types):
|
||||
elif isinstance(v, str):
|
||||
return force_str(v)
|
||||
else:
|
||||
return v
|
||||
|
@ -869,7 +868,7 @@ class FormDef(StorableObject):
|
|||
):
|
||||
formdef.workflow_id = value['workflow'].get('id')
|
||||
elif 'workflow' in value:
|
||||
if isinstance(value['workflow'], six.string_types):
|
||||
if isinstance(value['workflow'], str):
|
||||
workflow = value.get('workflow')
|
||||
else:
|
||||
workflow = value['workflow'].get('name')
|
||||
|
@ -1009,7 +1008,7 @@ class FormDef(StorableObject):
|
|||
element = ET.SubElement(options, 'option')
|
||||
element.attrib['varname'] = option
|
||||
option_value = self.workflow_options.get(option)
|
||||
if isinstance(option_value, six.string_types):
|
||||
if isinstance(option_value, str):
|
||||
element.text = force_text(self.workflow_options.get(option, ''), charset)
|
||||
elif hasattr(option_value, 'base_filename'):
|
||||
ET.SubElement(element, 'filename').text = option_value.base_filename
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
import urllib.parse
|
||||
|
||||
from quixote import get_request, get_publisher, get_session, redirect
|
||||
from quixote.html import htmltext, TemplateIO
|
||||
|
@ -63,7 +63,7 @@ class FormDefUI(object):
|
|||
|
||||
if offset > total_count:
|
||||
get_request().form['offset'] = '0'
|
||||
return redirect('?' + urllib.urlencode(get_request().form))
|
||||
return redirect('?' + urllib.parse.urlencode(get_request().form))
|
||||
|
||||
r = TemplateIO(html=True)
|
||||
|
||||
|
|
|
@ -15,8 +15,7 @@
|
|||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import time
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
import urllib.parse
|
||||
|
||||
from quixote import get_publisher, get_request, get_response, get_session, redirect
|
||||
from quixote.directory import Directory
|
||||
|
@ -73,7 +72,7 @@ class FileDirectory(Directory):
|
|||
# no such file
|
||||
raise errors.TraversalError()
|
||||
|
||||
if component and component not in (file.base_filename, urllib.quote(file.base_filename)):
|
||||
if component and component not in (file.base_filename, urllib.parse.quote(file.base_filename)):
|
||||
raise errors.TraversalError()
|
||||
|
||||
if file.has_redirect_url():
|
||||
|
@ -688,7 +687,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
|
|||
raise errors.TraversalError()
|
||||
|
||||
if getattr(file, 'base_filename'):
|
||||
file_url += urllib.quote(file.base_filename)
|
||||
file_url += urllib.parse.quote(file.base_filename)
|
||||
return redirect(file_url)
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import copy
|
||||
import io
|
||||
import json
|
||||
import time
|
||||
|
||||
|
@ -23,9 +24,7 @@ try:
|
|||
except ImportError:
|
||||
qrcode = None
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.http import quote
|
||||
from django.utils.six import BytesIO
|
||||
from django.utils.safestring import mark_safe
|
||||
|
||||
import ratelimit.utils
|
||||
|
@ -1164,7 +1163,7 @@ class FormPage(Directory, FormTemplateMixin):
|
|||
continue
|
||||
v, locked = field.get_prefill_value(user=prefill_user)
|
||||
if locked:
|
||||
if not isinstance(v, six.string_types) and field.convert_value_to_str:
|
||||
if not isinstance(v, str) and field.convert_value_to_str:
|
||||
# convert structured data to strings as if they were
|
||||
# submitted by the browser.
|
||||
v = field.convert_value_to_str(v)
|
||||
|
@ -1522,7 +1521,7 @@ class FormPage(Directory, FormTemplateMixin):
|
|||
|
||||
def qrcode(self):
|
||||
img = qrcode.make(self.formdef.get_url())
|
||||
s = BytesIO()
|
||||
s = io.BytesIO()
|
||||
img.save(s)
|
||||
if get_request().get_query() == 'download':
|
||||
get_response().set_header(
|
||||
|
|
|
@ -17,10 +17,10 @@
|
|||
import json
|
||||
import threading
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
from django.http import HttpResponseBadRequest, HttpResponseRedirect
|
||||
from django.utils.deprecation import MiddlewareMixin
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
|
||||
from quixote import get_publisher
|
||||
from quixote.errors import RequestError
|
||||
|
@ -80,7 +80,7 @@ class PublisherInitialisationMiddleware(MiddlewareMixin):
|
|||
pub.finish_successful_request() # commits session
|
||||
new_query_string = ''
|
||||
if compat_request.form:
|
||||
new_query_string = '?' + urllib.urlencode(compat_request.form)
|
||||
new_query_string = '?' + urllib.parse.urlencode(compat_request.form)
|
||||
response = HttpResponseRedirect(compat_request.get_path() + new_query_string)
|
||||
for name, value in compat_request.response.generate_headers():
|
||||
if name == 'Content-Length':
|
||||
|
|
|
@ -16,11 +16,10 @@
|
|||
|
||||
import threading
|
||||
import types
|
||||
import urllib.parse
|
||||
|
||||
import django.template.base
|
||||
import django.template.defaulttags
|
||||
from django.utils import six
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
import quixote
|
||||
import quixote.publish
|
||||
|
@ -78,7 +77,7 @@ def redirect(location, permanent=False):
|
|||
not honor the redirect).
|
||||
"""
|
||||
request = _thread_local.publisher.get_request()
|
||||
location = urlparse.urljoin(request.get_url(), str(location))
|
||||
location = urllib.parse.urljoin(request.get_url(), str(location))
|
||||
return request.response.redirect(location, permanent)
|
||||
|
||||
|
||||
|
@ -105,7 +104,7 @@ def cleanup():
|
|||
|
||||
|
||||
for key, value in list(locals().items()):
|
||||
if type(value) in (types.FunctionType,) + six.class_types:
|
||||
if type(value) in (types.FunctionType, type):
|
||||
setattr(quixote, key, value)
|
||||
setattr(quixote.publish, key, value)
|
||||
|
||||
|
|
|
@ -17,10 +17,9 @@
|
|||
import json
|
||||
import hashlib
|
||||
import base64
|
||||
import urllib.parse
|
||||
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from .qommon import N_, get_logger
|
||||
from .qommon.misc import http_get_page, json_loads, http_post_request, urlopen
|
||||
|
@ -37,7 +36,7 @@ def has_portfolio():
|
|||
|
||||
def fargo_url(url):
|
||||
fargo_url = get_publisher().get_site_option('fargo_url')
|
||||
url = urlparse.urljoin(fargo_url, url)
|
||||
url = urllib.parse.urljoin(fargo_url, url)
|
||||
secret, orig = get_secret_and_orig(url)
|
||||
if '?' in url:
|
||||
url += '&orig=%s' % orig
|
||||
|
@ -70,7 +69,7 @@ def push_document(user, filename, stream):
|
|||
payload['user_nameid'] = force_text(user.name_identifiers[0], 'ascii')
|
||||
elif user.email:
|
||||
payload['user_email'] = force_text(user.email, 'ascii')
|
||||
payload['origin'] = urlparse.urlparse(get_publisher().get_frontoffice_url()).netloc
|
||||
payload['origin'] = urllib.parse.urlparse(get_publisher().get_frontoffice_url()).netloc
|
||||
payload['file_name'] = force_text(filename, charset)
|
||||
stream.seek(0)
|
||||
payload['file_b64_content'] = force_text(base64.b64encode(stream.read()))
|
||||
|
@ -108,9 +107,9 @@ class FargoDirectory(Directory):
|
|||
# FIXME: handle error cases
|
||||
url = request.form['url']
|
||||
document = urlopen(request.form['url']).read()
|
||||
scheme, netloc, path, qs, frag = urlparse.urlsplit(url)
|
||||
scheme, netloc, path, qs, frag = urllib.parse.urlsplit(url)
|
||||
path = path.split('/')
|
||||
name = urllib.unquote(path[-1])
|
||||
name = urllib.parse.unquote(path[-1])
|
||||
from .qommon.form import PicklableUpload
|
||||
|
||||
download = PicklableUpload(name, content_type='application/pdf')
|
||||
|
@ -122,7 +121,7 @@ class FargoDirectory(Directory):
|
|||
frontoffice_url = get_publisher().get_frontoffice_url()
|
||||
self_url = frontoffice_url
|
||||
self_url += '/fargo/pick'
|
||||
return redirect('%spick/?pick=%s' % (self.fargo_url, urllib.quote(self_url)))
|
||||
return redirect('%spick/?pick=%s' % (self.fargo_url, urllib.parse.quote(self_url)))
|
||||
|
||||
def set_token(self, token, title):
|
||||
get_response().add_javascript(['jquery.js'])
|
||||
|
|
|
@ -18,6 +18,7 @@ from contextlib import contextmanager
|
|||
import decimal
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
|
@ -25,10 +26,8 @@ import sys
|
|||
import traceback
|
||||
import zipfile
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.formats import localize
|
||||
from django.utils.six.moves import cPickle
|
||||
|
||||
from .Defaults import *
|
||||
|
||||
|
@ -194,7 +193,7 @@ class WcsPublisher(StubWcsPublisher):
|
|||
def _decode_list(data):
|
||||
rv = []
|
||||
for item in data:
|
||||
if isinstance(item, six.string_types):
|
||||
if isinstance(item, str):
|
||||
item = force_str(item)
|
||||
elif isinstance(item, list):
|
||||
item = _decode_list(item)
|
||||
|
@ -207,7 +206,7 @@ class WcsPublisher(StubWcsPublisher):
|
|||
rv = {}
|
||||
for key, value in data.items():
|
||||
key = force_str(key)
|
||||
if isinstance(value, six.string_types):
|
||||
if isinstance(value, str):
|
||||
value = force_str(value)
|
||||
elif isinstance(value, list):
|
||||
value = _decode_list(value)
|
||||
|
@ -231,7 +230,7 @@ class WcsPublisher(StubWcsPublisher):
|
|||
if f in ('config.pck', 'config.json'):
|
||||
results['settings'] = 1
|
||||
if f == 'config.pck':
|
||||
d = cPickle.loads(data)
|
||||
d = pickle.loads(data)
|
||||
else:
|
||||
d = json.loads(force_text(data), object_hook=_decode_dict)
|
||||
if 'sp' in self.cfg:
|
||||
|
|
|
@ -14,14 +14,13 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import configparser
|
||||
import copy
|
||||
import threading
|
||||
|
||||
import django.apps
|
||||
from django.conf import settings
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six.moves import configparser as ConfigParser
|
||||
|
||||
from quixote import get_publisher
|
||||
|
||||
|
@ -153,7 +152,7 @@ class AppConfig(django.apps.AppConfig):
|
|||
name = 'wcs.qommon'
|
||||
|
||||
def ready(self):
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
if settings.WCS_LEGACY_CONFIG_FILE:
|
||||
config.read(settings.WCS_LEGACY_CONFIG_FILE)
|
||||
if hasattr(settings, 'WCS_EXTRA_MODULES') and settings.WCS_EXTRA_MODULES:
|
||||
|
|
|
@ -14,7 +14,8 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
import urllib.parse
|
||||
|
||||
from quixote.html import htmltext, TemplateIO
|
||||
from quixote import get_request, get_response, get_publisher
|
||||
|
||||
|
@ -41,7 +42,7 @@ def pagination_links(offset, limit, total_count, load_js=True):
|
|||
query['limit'] = limit
|
||||
r += htmltext(
|
||||
'<a class="previous-page" data-limit="%s" data-offset="%s" href="?%s"><!--%s--></a>'
|
||||
) % (limit, query['offset'], urllib.urlencode(query, doseq=1), _('Previous Page'))
|
||||
) % (limit, query['offset'], urllib.parse.urlencode(query, doseq=1), _('Previous Page'))
|
||||
else:
|
||||
r += htmltext('<span class="previous-page"><!--%s--></span>') % _('Previous Page')
|
||||
|
||||
|
@ -77,7 +78,7 @@ def pagination_links(offset, limit, total_count, load_js=True):
|
|||
klass,
|
||||
limit,
|
||||
query['offset'],
|
||||
urllib.urlencode(query, doseq=1),
|
||||
urllib.parse.urlencode(query, doseq=1),
|
||||
page_number,
|
||||
)
|
||||
r += htmltext('</span>') # <!-- .pages -->
|
||||
|
@ -91,7 +92,7 @@ def pagination_links(offset, limit, total_count, load_js=True):
|
|||
r += htmltext('<a class="next-page" data-limit="%s" data-offset="%s" href="?%s"><!--%s--></a>') % (
|
||||
limit,
|
||||
query['offset'],
|
||||
urllib.urlencode(query, doseq=1),
|
||||
urllib.parse.urlencode(query, doseq=1),
|
||||
_('Next Page'),
|
||||
)
|
||||
else:
|
||||
|
@ -112,7 +113,7 @@ def pagination_links(offset, limit, total_count, load_js=True):
|
|||
else:
|
||||
r += htmltext('<a data-limit="%s" data-offset="0"" href="?%s">%s</a>') % (
|
||||
page_size,
|
||||
urllib.urlencode(query, doseq=1),
|
||||
urllib.parse.urlencode(query, doseq=1),
|
||||
page_size,
|
||||
)
|
||||
if page_size >= total_count:
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
from __future__ import print_function
|
||||
|
||||
import configparser
|
||||
import optparse
|
||||
from optparse import make_option
|
||||
import sys
|
||||
|
@ -25,8 +26,6 @@ __all__ = [
|
|||
'Command',
|
||||
]
|
||||
|
||||
from django.utils.six.moves import configparser as ConfigParser
|
||||
|
||||
from wcs import qommon
|
||||
from . import _
|
||||
|
||||
|
@ -39,7 +38,7 @@ class Command(object):
|
|||
usage_args = '[ options ... ]'
|
||||
|
||||
def __init__(self, options=[]):
|
||||
self.config = ConfigParser.ConfigParser()
|
||||
self.config = configparser.ConfigParser()
|
||||
self.options = options + [
|
||||
make_option('--extra', metavar='DIR', action='append', dest='extra', default=[]),
|
||||
make_option('--app-dir', metavar='DIR', action='store', dest='app_dir', default=None),
|
||||
|
@ -53,7 +52,7 @@ class Command(object):
|
|||
sys.exit(1)
|
||||
try:
|
||||
self.config.read(base_options.configfile)
|
||||
except ConfigParser.ParsingError as e:
|
||||
except configparser.ParsingError as e:
|
||||
print('Invalid configuration file %s' % base_options.configfile, file=sys.stderr)
|
||||
print(e, file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
import urllib.parse
|
||||
|
||||
from quixote import get_publisher
|
||||
import quixote
|
||||
|
@ -61,7 +61,7 @@ class AccessUnauthorizedError(AccessForbiddenError):
|
|||
if self.public_msg:
|
||||
session.message = ('error', self.public_msg)
|
||||
login_url = get_publisher().get_root_url() + 'login/'
|
||||
login_url += '?' + urllib.urlencode({'next': request.get_frontoffice_url()})
|
||||
login_url += '?' + urllib.parse.urlencode({'next': request.get_frontoffice_url()})
|
||||
return quixote.redirect(login_url)
|
||||
|
||||
|
||||
|
|
|
@ -21,10 +21,10 @@ get_global_eval_dict.
|
|||
"""
|
||||
import base64
|
||||
import datetime
|
||||
import io
|
||||
import time
|
||||
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six import BytesIO
|
||||
|
||||
from wcs.qommon import force_str
|
||||
from wcs.qommon.upload_storage import UploadStorage
|
||||
|
@ -155,13 +155,13 @@ def attachment(content, filename='', content_type=None, strip_metadata=False):
|
|||
|
||||
if strip_metadata and Image:
|
||||
try:
|
||||
image = Image.open(BytesIO(upload.get_content()))
|
||||
image = Image.open(io.BytesIO(upload.get_content()))
|
||||
except OSError:
|
||||
pass
|
||||
else:
|
||||
image_without_exif = Image.new(image.mode, image.size)
|
||||
image_without_exif.putdata(image.getdata())
|
||||
content = BytesIO()
|
||||
content = io.BytesIO()
|
||||
image_without_exif.save(content, image.format)
|
||||
upload = FileField.convert_value_from_anything(
|
||||
{
|
||||
|
|
|
@ -225,10 +225,9 @@ Directives
|
|||
|
||||
import datetime
|
||||
import html
|
||||
import io
|
||||
import re
|
||||
import os
|
||||
from django.utils import six
|
||||
from django.utils.six import StringIO as cStringIO
|
||||
|
||||
#
|
||||
# Formatting types
|
||||
|
@ -472,7 +471,7 @@ class Template:
|
|||
to the file object 'fp' and functions are called.
|
||||
"""
|
||||
for step in program:
|
||||
if isinstance(step, six.string_types):
|
||||
if isinstance(step, str):
|
||||
fp.write(step)
|
||||
else:
|
||||
step[0](step[1], fp, ctx)
|
||||
|
@ -562,7 +561,7 @@ class Template:
|
|||
list = _get_value(valref, ctx)
|
||||
except UnknownReference:
|
||||
return
|
||||
if isinstance(list, six.string_types):
|
||||
if isinstance(list, str):
|
||||
raise NeedSequenceError()
|
||||
refname = valref[0]
|
||||
ctx.for_index[refname] = idx = [list, 0]
|
||||
|
@ -573,7 +572,7 @@ class Template:
|
|||
|
||||
def _cmd_define(self, args, fp, ctx):
|
||||
((name,), unused, section) = args
|
||||
valfp = cStringIO.StringIO()
|
||||
valfp = io.StringIO()
|
||||
if section is not None:
|
||||
self._execute(section, valfp, ctx)
|
||||
ctx.defines[name] = valfp.getvalue()
|
||||
|
@ -673,7 +672,7 @@ def _get_value(value_ref, ctx):
|
|||
raise UnknownReference(refname)
|
||||
|
||||
# make sure we return a string instead of some various Python types
|
||||
if isinstance(ob, six.integer_types + (float,)):
|
||||
if isinstance(ob, (int, float)):
|
||||
return str(ob)
|
||||
if ob is None:
|
||||
return ''
|
||||
|
|
|
@ -64,7 +64,6 @@ from quixote.http_request import Upload
|
|||
from quixote.util import randbytes
|
||||
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils import six
|
||||
|
||||
from django.conf import settings
|
||||
from django.utils.safestring import mark_safe
|
||||
|
|
|
@ -19,7 +19,6 @@ import copy
|
|||
import re
|
||||
import time
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from quixote import get_session, get_publisher
|
||||
import quixote.http_request
|
||||
|
|
|
@ -17,10 +17,10 @@
|
|||
import base64
|
||||
import hashlib
|
||||
import sys
|
||||
import urllib.parse
|
||||
import uuid
|
||||
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
|
||||
from quixote import redirect, get_session, get_publisher, get_request, get_session_manager
|
||||
from quixote.directory import Directory
|
||||
|
@ -313,7 +313,7 @@ class FCAuthMethod(AuthMethod):
|
|||
|
||||
nonce = hashlib.sha256(force_bytes(session.id)).hexdigest()
|
||||
fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback'
|
||||
qs = urllib.urlencode(
|
||||
qs = urllib.parse.urlencode(
|
||||
{
|
||||
'response_type': 'code',
|
||||
'client_id': client_id,
|
||||
|
@ -345,7 +345,7 @@ class FCAuthMethod(AuthMethod):
|
|||
}
|
||||
response, status, data, auth_header = http_post_request(
|
||||
self.get_token_url(),
|
||||
urllib.urlencode(body),
|
||||
urllib.parse.urlencode(body),
|
||||
headers={
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
},
|
||||
|
@ -519,7 +519,7 @@ class FCAuthMethod(AuthMethod):
|
|||
get_session_manager().expire_session()
|
||||
logout_url = self.get_logout_url()
|
||||
post_logout_redirect_uri = get_publisher().get_frontoffice_url()
|
||||
logout_url += '?' + urllib.urlencode(
|
||||
logout_url += '?' + urllib.parse.urlencode(
|
||||
{
|
||||
'id_token_hint': id_token,
|
||||
'post_logout_redirect_uri': post_logout_redirect_uri,
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import re
|
||||
import urllib.parse
|
||||
|
||||
try:
|
||||
import lasso
|
||||
|
@ -22,8 +23,6 @@ except ImportError:
|
|||
lasso = None
|
||||
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from quixote.directory import Directory
|
||||
from quixote import redirect, get_session, get_response, get_publisher
|
||||
|
@ -159,7 +158,7 @@ class MethodDirectory(Directory):
|
|||
login_url = get_publisher().get_root_url() + 'login/idp/'
|
||||
else:
|
||||
login_url = get_publisher().get_root_url() + 'login/'
|
||||
login_url += '?' + urllib.urlencode({'next': get_request().get_frontoffice_url()})
|
||||
login_url += '?' + urllib.parse.urlencode({'next': get_request().get_frontoffice_url()})
|
||||
return redirect(login_url)
|
||||
|
||||
if get_request().user:
|
||||
|
@ -1066,7 +1065,7 @@ class MethodAdminDirectory(Directory):
|
|||
new_domain = None
|
||||
|
||||
if old_common_domain_getter_url:
|
||||
old_domain = urlparse.urlparse(old_common_domain_getter_url)[1]
|
||||
old_domain = urllib.parse.urlparse(old_common_domain_getter_url)[1]
|
||||
if ':' in old_domain:
|
||||
old_domain = old_domain.split(':')[0]
|
||||
old_domain_dir = os.path.normpath(os.path.join(dir, '..', old_domain))
|
||||
|
@ -1077,7 +1076,7 @@ class MethodAdminDirectory(Directory):
|
|||
# bad luck, but ignore this
|
||||
pass
|
||||
if new_common_domain_getter_url:
|
||||
new_domain = urlparse.urlparse(new_common_domain_getter_url)[1]
|
||||
new_domain = urllib.parse.urlparse(new_common_domain_getter_url)[1]
|
||||
if ':' in new_domain:
|
||||
new_domain = new_domain.split(':')[0]
|
||||
new_domain_dir = os.path.normpath(os.path.join(dir, '..', new_domain))
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
import logging
|
||||
import os
|
||||
|
||||
from django.utils import six
|
||||
|
||||
from quixote import get_publisher, get_session, get_request
|
||||
from quixote.logger import DefaultLogger
|
||||
|
||||
|
@ -78,7 +76,7 @@ class Formatter(logging.Formatter):
|
|||
|
||||
user = request.user
|
||||
if user:
|
||||
if isinstance(user, six.string_types + six.integer_types):
|
||||
if isinstance(user, (str, int)):
|
||||
user_id = user
|
||||
else:
|
||||
user_id = user.id
|
||||
|
|
|
@ -18,9 +18,11 @@ import datetime
|
|||
import decimal
|
||||
import calendar
|
||||
import html
|
||||
import io
|
||||
import re
|
||||
import os
|
||||
import time
|
||||
import urllib.parse
|
||||
import base64
|
||||
import json
|
||||
import subprocess
|
||||
|
@ -37,13 +39,10 @@ except ImportError:
|
|||
|
||||
from django.conf import settings
|
||||
from django.utils import datetime_safe
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.formats import localize
|
||||
from django.utils.html import strip_tags
|
||||
from django.template import TemplateSyntaxError, VariableDoesNotExist
|
||||
from django.utils.six.moves.urllib.parse import quote, urlencode
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from django.utils.text import Truncator
|
||||
|
||||
from quixote import get_publisher, get_response, get_request, redirect
|
||||
|
@ -54,8 +53,6 @@ from . import get_cfg, get_logger, ezt
|
|||
from .errors import ConnectionError, RequestError
|
||||
from .template import Template
|
||||
|
||||
from django.utils.six import BytesIO, StringIO
|
||||
|
||||
try:
|
||||
subprocess.check_call(['which', 'pdftoppm'], stdout=subprocess.DEVNULL)
|
||||
HAS_PDFTOPPM = True
|
||||
|
@ -167,7 +164,7 @@ def get_provider_key(provider_id):
|
|||
def simplify(s, space='-'):
|
||||
if s is None:
|
||||
return ''
|
||||
if not isinstance(s, six.text_type):
|
||||
if not isinstance(s, str):
|
||||
if get_publisher() and get_publisher().site_charset:
|
||||
s = force_text('%s' % s, get_publisher().site_charset, errors='ignore')
|
||||
else:
|
||||
|
@ -269,7 +266,7 @@ def site_encode(s):
|
|||
return None
|
||||
if isinstance(s, str):
|
||||
return s
|
||||
if not isinstance(s, six.string_types):
|
||||
if not isinstance(s, str):
|
||||
s = force_text(s)
|
||||
return s.encode(get_publisher().site_charset)
|
||||
|
||||
|
@ -346,7 +343,7 @@ def _http_request(
|
|||
):
|
||||
get_publisher().reload_cfg()
|
||||
|
||||
splitted_url = urlparse.urlsplit(url)
|
||||
splitted_url = urllib.parse.urlsplit(url)
|
||||
if splitted_url.scheme not in ('http', 'https'):
|
||||
raise ConnectionError('invalid scheme in URL %s' % url)
|
||||
|
||||
|
@ -391,7 +388,7 @@ def urlopen(url, data=None):
|
|||
response, status, data, auth_header = _http_request(
|
||||
url, 'GET' if data is None else 'POST', body=data, raise_on_http_errors=True
|
||||
)
|
||||
return BytesIO(data)
|
||||
return io.BytesIO(data)
|
||||
|
||||
|
||||
def http_get_page(url, **kwargs):
|
||||
|
@ -418,14 +415,14 @@ def get_variadic_url(url, variables, encode_query=True):
|
|||
if '{{' in url or '{%' in url:
|
||||
try:
|
||||
url = Template(url).render(variables)
|
||||
p = urlparse.urlsplit(url)
|
||||
p = urllib.parse.urlsplit(url)
|
||||
scheme, netloc, path, query, fragment = (p.scheme, p.netloc, p.path, p.query, p.fragment)
|
||||
if path.startswith('//'):
|
||||
# don't let double slash happen at the root of the URL, this
|
||||
# happens when a template such as {{url}}/path is used (with
|
||||
# {{url}} already ending with a slash).
|
||||
path = path[1:]
|
||||
return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
|
||||
return urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
|
||||
except (TemplateSyntaxError, VariableDoesNotExist):
|
||||
return url
|
||||
|
||||
|
@ -433,16 +430,16 @@ def get_variadic_url(url, variables, encode_query=True):
|
|||
def ezt_substitute(template, variables):
|
||||
tmpl = ezt.Template()
|
||||
tmpl.parse(template)
|
||||
fd = StringIO()
|
||||
fd = io.StringIO()
|
||||
tmpl.generate(fd, variables)
|
||||
return fd.getvalue()
|
||||
|
||||
def partial_quote(string):
|
||||
# unquote brackets, as there may be further processing that needs them
|
||||
# intact.
|
||||
return quote(string).replace('%5B', '[').replace('%5D', ']')
|
||||
return urllib.parse.quote(string).replace('%5B', '[').replace('%5D', ']')
|
||||
|
||||
p = urlparse.urlsplit(url)
|
||||
p = urllib.parse.urlsplit(url)
|
||||
scheme, netloc, path, query, fragment = p.scheme, p.netloc, p.path, p.query, p.fragment
|
||||
if netloc and '[' in netloc:
|
||||
netloc = ezt_substitute(netloc, variables)
|
||||
|
@ -456,7 +453,7 @@ def get_variadic_url(url, variables, encode_query=True):
|
|||
# there were no / in the original path (the two / comes from
|
||||
# the scheme/netloc separation, this means there is no path)
|
||||
before_path = ezt_substitute(path, variables)
|
||||
p2 = urlparse.urlsplit(before_path)
|
||||
p2 = urllib.parse.urlsplit(before_path)
|
||||
scheme, netloc, path = p2.scheme, p2.netloc, p2.path
|
||||
else:
|
||||
# there is a path, we need to get back to the original URL and
|
||||
|
@ -467,7 +464,7 @@ def get_variadic_url(url, variables, encode_query=True):
|
|||
else:
|
||||
before_path, path = path, ''
|
||||
before_path = ezt_substitute(before_path, variables)
|
||||
p2 = urlparse.urlsplit(before_path)
|
||||
p2 = urllib.parse.urlsplit(before_path)
|
||||
scheme, netloc = p2.scheme, p2.netloc
|
||||
if p2.path:
|
||||
if not path:
|
||||
|
@ -487,7 +484,7 @@ def get_variadic_url(url, variables, encode_query=True):
|
|||
if fragment and '[' in fragment:
|
||||
fragment = partial_quote(ezt_substitute(fragment, variables))
|
||||
if query and '[' in query:
|
||||
p_qs = urlparse.parse_qsl(query)
|
||||
p_qs = urllib.parse.parse_qsl(query)
|
||||
if len(p_qs) == 0:
|
||||
# this happened because the query string has no key/values,
|
||||
# probably because it's a single substitution variable (ex:
|
||||
|
@ -502,10 +499,10 @@ def get_variadic_url(url, variables, encode_query=True):
|
|||
v = ezt_substitute(v, variables)
|
||||
query.append((k, v))
|
||||
if encode_query:
|
||||
query = urlencode(query)
|
||||
query = urllib.parse.urlencode(query)
|
||||
else:
|
||||
query = '&'.join('%s=%s' % (k, v) for (k, v) in query)
|
||||
return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
|
||||
return urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
|
||||
|
||||
|
||||
def get_foreground_colour(background_colour):
|
||||
|
@ -662,7 +659,7 @@ def get_thumbnail(filepath, content_type=None):
|
|||
|
||||
if content_type == 'application/pdf':
|
||||
try:
|
||||
fp = BytesIO(
|
||||
fp = io.BytesIO(
|
||||
subprocess.check_output(
|
||||
['pdftoppm', '-png', '-scale-to-x', '500', '-scale-to-y', '-1', filepath]
|
||||
)
|
||||
|
@ -705,7 +702,7 @@ def get_thumbnail(filepath, content_type=None):
|
|||
# * File "PIL/PngImagePlugin.py", line 119, in read
|
||||
# * raise SyntaxError("broken PNG file (chunk %s)" % repr(cid))
|
||||
raise IOError
|
||||
image_thumb_fp = BytesIO()
|
||||
image_thumb_fp = io.BytesIO()
|
||||
image.save(image_thumb_fp, "PNG")
|
||||
except IOError:
|
||||
# failed to create thumbnail.
|
||||
|
|
|
@ -18,14 +18,15 @@ from __future__ import print_function
|
|||
|
||||
import codecs
|
||||
import collections
|
||||
from django.utils.six.moves import builtins
|
||||
from django.utils.six.moves import configparser as ConfigParser
|
||||
from django.utils.six.moves import cPickle
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
import builtins
|
||||
import configparser
|
||||
import pickle
|
||||
import urllib.parse
|
||||
import datetime
|
||||
from decimal import Decimal
|
||||
import importlib
|
||||
import inspect
|
||||
import io
|
||||
import os
|
||||
import fcntl
|
||||
import hashlib
|
||||
|
@ -43,10 +44,8 @@ import xml.etree.ElementTree as ET
|
|||
|
||||
from django.conf import settings
|
||||
from django.http import Http404
|
||||
from django.utils import six
|
||||
from django.utils import translation
|
||||
from django.utils.encoding import force_text, force_bytes
|
||||
from django.utils.six import StringIO
|
||||
|
||||
from quixote.publish import Publisher, get_request, get_response, get_publisher, redirect
|
||||
from .http_request import HTTPRequest
|
||||
|
@ -128,7 +127,7 @@ class QommonPublisher(Publisher, object):
|
|||
return '%s://%s%s' % (
|
||||
req.get_scheme(),
|
||||
req.get_server(),
|
||||
urllib.quote(req.environ.get('SCRIPT_NAME')),
|
||||
urllib.parse.quote(req.environ.get('SCRIPT_NAME')),
|
||||
)
|
||||
return 'https://%s' % os.path.basename(get_publisher().app_dir)
|
||||
|
||||
|
@ -141,7 +140,7 @@ class QommonPublisher(Publisher, object):
|
|||
return '%s://%s%s/backoffice' % (
|
||||
req.get_scheme(),
|
||||
req.get_server(),
|
||||
urllib.quote(req.environ.get('SCRIPT_NAME')),
|
||||
urllib.parse.quote(req.environ.get('SCRIPT_NAME')),
|
||||
)
|
||||
return 'https://%s/backoffice' % os.path.basename(self.app_dir)
|
||||
|
||||
|
@ -206,7 +205,7 @@ class QommonPublisher(Publisher, object):
|
|||
self, request, original_response, exc_type, exc_value, tb
|
||||
)
|
||||
|
||||
error_file = StringIO()
|
||||
error_file = io.StringIO()
|
||||
|
||||
if limit is None:
|
||||
if hasattr(sys, 'tracebacklimit'):
|
||||
|
@ -356,7 +355,7 @@ class QommonPublisher(Publisher, object):
|
|||
self.ngettext = translation.ngettext
|
||||
|
||||
def load_site_options(self):
|
||||
self.site_options = ConfigParser.ConfigParser()
|
||||
self.site_options = configparser.ConfigParser()
|
||||
site_options_filename = os.path.join(self.app_dir, 'site-options.cfg')
|
||||
if not os.path.exists(site_options_filename):
|
||||
return
|
||||
|
@ -378,9 +377,9 @@ class QommonPublisher(Publisher, object):
|
|||
self.load_site_options()
|
||||
try:
|
||||
return self.site_options.get('options', option) == 'true'
|
||||
except ConfigParser.NoSectionError:
|
||||
except configparser.NoSectionError:
|
||||
return defaults.get(option, False)
|
||||
except ConfigParser.NoOptionError:
|
||||
except configparser.NoOptionError:
|
||||
return defaults.get(option, False)
|
||||
|
||||
def get_site_option(self, option, section='options'):
|
||||
|
@ -394,9 +393,9 @@ class QommonPublisher(Publisher, object):
|
|||
self.load_site_options()
|
||||
try:
|
||||
return self.site_options.get(section, option)
|
||||
except ConfigParser.NoSectionError:
|
||||
except configparser.NoSectionError:
|
||||
return defaults.get(section, {}).get(option)
|
||||
except ConfigParser.NoOptionError:
|
||||
except configparser.NoOptionError:
|
||||
return defaults.get(section, {}).get(option)
|
||||
|
||||
def get_site_storages(self):
|
||||
|
@ -712,7 +711,7 @@ class QommonPublisher(Publisher, object):
|
|||
cfg = None
|
||||
|
||||
def write_cfg(self):
|
||||
s = cPickle.dumps(self.cfg, protocol=2)
|
||||
s = pickle.dumps(self.cfg, protocol=2)
|
||||
filename = os.path.join(self.app_dir, 'config.pck')
|
||||
storage.atomic_write(filename, s)
|
||||
|
||||
|
@ -720,7 +719,7 @@ class QommonPublisher(Publisher, object):
|
|||
filename = os.path.join(self.app_dir, 'config.pck')
|
||||
try:
|
||||
with open(filename, 'rb') as fd:
|
||||
self.cfg = cPickle.load(fd, encoding='utf-8')
|
||||
self.cfg = pickle.load(fd, encoding='utf-8')
|
||||
except:
|
||||
self.cfg = {}
|
||||
|
||||
|
@ -908,26 +907,26 @@ class QommonPublisher(Publisher, object):
|
|||
self.load_site_options()
|
||||
try:
|
||||
d.update(dict(self.site_options.items('variables', raw=True)))
|
||||
except ConfigParser.NoSectionError:
|
||||
except configparser.NoSectionError:
|
||||
pass
|
||||
d['manager_homepage_url'] = d.get('portal_agent_url')
|
||||
d['manager_homepage_title'] = d.get('portal_agent_title')
|
||||
return d
|
||||
|
||||
def is_relatable_url(self, url):
|
||||
parsed_url = urllib.urlparse(url)
|
||||
parsed_url = urllib.parse.urlparse(url)
|
||||
if not parsed_url.netloc:
|
||||
return True
|
||||
if parsed_url.netloc == urllib.urlparse(self.get_frontoffice_url()).netloc:
|
||||
if parsed_url.netloc == urllib.parse.urlparse(self.get_frontoffice_url()).netloc:
|
||||
return True
|
||||
if parsed_url.netloc == urllib.urlparse(self.get_backoffice_url()).netloc:
|
||||
if parsed_url.netloc == urllib.parse.urlparse(self.get_backoffice_url()).netloc:
|
||||
return True
|
||||
if parsed_url.netloc in [x.strip() for x in self.get_site_option('relatable-hosts').split(',')]:
|
||||
return True
|
||||
try:
|
||||
if parsed_url.netloc in self.site_options.options('api-secrets'):
|
||||
return True
|
||||
except ConfigParser.NoSectionError:
|
||||
except configparser.NoSectionError:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import os
|
||||
import time
|
||||
import urllib.parse
|
||||
import sys
|
||||
from xml.sax.saxutils import escape
|
||||
|
||||
|
@ -25,7 +26,6 @@ except ImportError:
|
|||
lasso = None
|
||||
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from quixote import get_request, get_response, redirect, get_field, get_publisher
|
||||
from quixote.http_request import parse_header
|
||||
|
@ -186,11 +186,11 @@ class Saml2Directory(Directory):
|
|||
login.msgRelayState = get_request().form.get('next')
|
||||
|
||||
next_url = login.msgRelayState or get_publisher().get_frontoffice_url()
|
||||
parsed_url = urlparse.urlparse(next_url)
|
||||
parsed_url = urllib.parse.urlparse(next_url)
|
||||
request = get_request()
|
||||
scheme = parsed_url.scheme or request.get_scheme()
|
||||
netloc = parsed_url.netloc or request.get_server()
|
||||
next_url = urlparse.urlunsplit(
|
||||
next_url = urllib.parse.urlunsplit(
|
||||
(scheme, netloc, parsed_url.path, parsed_url.query, parsed_url.fragment)
|
||||
)
|
||||
samlp_extensions = '''<samlp:Extensions
|
||||
|
@ -372,10 +372,10 @@ class Saml2Directory(Directory):
|
|||
if relay_state == 'backoffice':
|
||||
after_url = get_publisher().get_backoffice_url()
|
||||
elif relay_state:
|
||||
parsed_url = urlparse.urlparse(relay_state)
|
||||
parsed_url = urllib.parse.urlparse(relay_state)
|
||||
scheme = parsed_url.scheme or request.get_scheme()
|
||||
netloc = parsed_url.netloc or request.get_server()
|
||||
after_url = urlparse.urlunsplit(
|
||||
after_url = urllib.parse.urlunsplit(
|
||||
(scheme, netloc, parsed_url.path, parsed_url.query, parsed_url.fragment)
|
||||
)
|
||||
if not (
|
||||
|
|
|
@ -15,8 +15,7 @@
|
|||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import re
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
import urllib.parse
|
||||
|
||||
from . import N_, errors, misc
|
||||
from . import get_cfg, get_logger
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import builtins
|
||||
import copyreg
|
||||
import errno
|
||||
import operator
|
||||
|
@ -24,11 +25,9 @@ import os.path
|
|||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import _thread
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes
|
||||
from django.utils.six.moves import builtins
|
||||
from django.utils.six.moves import _thread
|
||||
|
||||
from .vendor import locket
|
||||
|
||||
|
@ -150,7 +149,7 @@ class Criteria(object):
|
|||
self.typed_none = ''
|
||||
if isinstance(self.value, bool):
|
||||
self.typed_none = False
|
||||
elif isinstance(self.value, six.integer_types + (float,)):
|
||||
elif isinstance(self.value, (int, float)):
|
||||
self.typed_none = -sys.maxsize
|
||||
elif isinstance(self.value, time.struct_time):
|
||||
self.typed_none = time.gmtime(-(10 ** 10)) # 1653
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import io
|
||||
import os
|
||||
import glob
|
||||
import re
|
||||
|
@ -28,8 +29,6 @@ from django.template import (
|
|||
from django.template.loader import render_to_string
|
||||
from django.utils.encoding import force_text, smart_text
|
||||
from django.utils.safestring import SafeString, SafeText
|
||||
from django.utils import six
|
||||
from django.utils.six import StringIO
|
||||
|
||||
from quixote import get_session, get_request, get_response, get_publisher
|
||||
from quixote.directory import Directory
|
||||
|
@ -112,7 +111,7 @@ def get_theme_dict(theme_xml):
|
|||
desc = force_str(tree.findtext('desc') or '')
|
||||
author = force_str(tree.findtext('author') or '')
|
||||
icon = None
|
||||
if isinstance(theme_xml, six.string_types):
|
||||
if isinstance(theme_xml, str):
|
||||
icon = os.path.join(os.path.dirname(theme_xml), 'icon.png')
|
||||
if not os.path.exists(icon):
|
||||
icon = None
|
||||
|
@ -412,7 +411,7 @@ def decorate(body, response):
|
|||
else:
|
||||
template = default_template
|
||||
|
||||
fd = StringIO()
|
||||
fd = io.StringIO()
|
||||
vars = get_decorate_vars(body, response, generate_breadcrumb=generate_breadcrumb)
|
||||
|
||||
template.generate(fd, vars)
|
||||
|
@ -527,7 +526,7 @@ class Template(object):
|
|||
return re.sub(r'[\uE000-\uF8FF]', '', rendered)
|
||||
|
||||
def ezt_render(self, context={}):
|
||||
fd = StringIO()
|
||||
fd = io.StringIO()
|
||||
try:
|
||||
self.template.generate(fd, context)
|
||||
except ezt.EZTException as e:
|
||||
|
|
|
@ -36,7 +36,6 @@ except ImportError:
|
|||
from django import template
|
||||
from django.template import defaultfilters
|
||||
from django.utils import dateparse
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.safestring import mark_safe
|
||||
from django.utils.timezone import is_naive, make_aware
|
||||
|
@ -222,7 +221,7 @@ def parse_decimal(value, do_raise=False):
|
|||
# treat all booleans as 0 (contrary to Python behaviour where
|
||||
# decimal(True) == 1).
|
||||
value = 0
|
||||
if isinstance(value, six.string_types):
|
||||
if isinstance(value, str):
|
||||
# replace , by . for French users comfort
|
||||
value = value.replace(',', '.')
|
||||
try:
|
||||
|
@ -505,7 +504,7 @@ def divide(term1, term2):
|
|||
def sum_(list_):
|
||||
if hasattr(list_, 'get_value'):
|
||||
list_ = list_.get_value() # unlazy
|
||||
if isinstance(list_, six.string_types):
|
||||
if isinstance(list_, str):
|
||||
# do not consider string as iterable, to avoid misusage
|
||||
return ''
|
||||
try:
|
||||
|
@ -581,7 +580,7 @@ def get_latlon(obj):
|
|||
return float(obj['lat']), float(obj['lng'])
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
if isinstance(obj, six.string_types) and ';' in obj:
|
||||
if isinstance(obj, str) and ';' in obj:
|
||||
try:
|
||||
return float(obj.split(';')[0]), float(obj.split(';')[1])
|
||||
except ValueError:
|
||||
|
@ -732,7 +731,7 @@ def phonenumber_fr(value, separator=' '):
|
|||
|
||||
if hasattr(value, 'get_value'):
|
||||
value = value.get_value() # unlazy
|
||||
if not value or not isinstance(value, six.string_types):
|
||||
if not value or not isinstance(value, str):
|
||||
return value
|
||||
number = value.strip()
|
||||
if not number:
|
||||
|
@ -767,7 +766,7 @@ def phonenumber_fr(value, separator=' '):
|
|||
def is_empty(value):
|
||||
from wcs.variables import LazyFormDefObjectsManager, LazyList
|
||||
|
||||
if isinstance(value, (six.string_types, list, dict)):
|
||||
if isinstance(value, (str, list, dict)):
|
||||
return not value
|
||||
if isinstance(value, (LazyFormDefObjectsManager, LazyList)):
|
||||
return not list(value)
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import base64
|
||||
import io
|
||||
import os
|
||||
|
||||
from quixote import get_publisher
|
||||
|
@ -22,7 +23,6 @@ from quixote.http_request import Upload
|
|||
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.module_loading import import_string
|
||||
from django.utils.six import BytesIO
|
||||
|
||||
from .errors import ConnectionError
|
||||
from .misc import json_loads, file_digest, can_thumbnail
|
||||
|
@ -51,7 +51,7 @@ class PicklableUpload(Upload):
|
|||
self.__dict__.update(dict)
|
||||
if hasattr(self, 'data'):
|
||||
# backward compatibility with older w.c.s. version
|
||||
self.fp = BytesIO(self.data)
|
||||
self.fp = io.BytesIO(self.data)
|
||||
del self.data
|
||||
|
||||
def get_file(self):
|
||||
|
|
|
@ -21,7 +21,6 @@ import os
|
|||
import subprocess
|
||||
import stat
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text
|
||||
|
||||
_openssl = 'openssl'
|
||||
|
@ -29,7 +28,7 @@ _openssl = 'openssl'
|
|||
|
||||
def decapsulate_pem_file(file_or_string):
|
||||
'''Remove PEM header lines'''
|
||||
if not isinstance(file_or_string, six.string_types):
|
||||
if not isinstance(file_or_string, str):
|
||||
content = file_or_string.read()
|
||||
else:
|
||||
content = file_or_string
|
||||
|
|
|
@ -18,8 +18,7 @@ from importlib import import_module
|
|||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
import urllib.parse
|
||||
|
||||
from quixote import get_publisher, get_response, get_session, redirect, get_session_manager, get_request
|
||||
from quixote.directory import Directory
|
||||
|
@ -113,7 +112,7 @@ class LoginDirectory(Directory):
|
|||
if get_publisher().ident_methods.get(method)().is_interactive():
|
||||
login_url = '../ident/%s/login' % method
|
||||
if get_request().form.get('next'):
|
||||
login_url += '?' + urllib.urlencode({'next': get_request().form.get('next')})
|
||||
login_url += '?' + urllib.parse.urlencode({'next': get_request().form.get('next')})
|
||||
return redirect(login_url)
|
||||
else:
|
||||
try:
|
||||
|
|
11
wcs/sql.py
11
wcs/sql.py
|
@ -15,6 +15,7 @@
|
|||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import copy
|
||||
import io
|
||||
import psycopg2
|
||||
import psycopg2.extensions
|
||||
import psycopg2.extras
|
||||
|
@ -28,9 +29,7 @@ try:
|
|||
except ImportError:
|
||||
import pickle
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.six import BytesIO
|
||||
|
||||
from quixote import get_publisher
|
||||
from . import qommon
|
||||
|
@ -84,7 +83,7 @@ SQL_TYPE_MAPPING = {
|
|||
def pickle_loads(value):
|
||||
if hasattr(value, 'tobytes'):
|
||||
value = value.tobytes()
|
||||
obj = UnpicklerClass(BytesIO(force_bytes(value)), **PICKLE_KWARGS).load()
|
||||
obj = UnpicklerClass(io.BytesIO(force_bytes(value)), **PICKLE_KWARGS).load()
|
||||
obj = deep_bytes2str(obj)
|
||||
return obj
|
||||
|
||||
|
@ -1613,7 +1612,7 @@ class SqlMixin(object):
|
|||
# turn {'poire': 2, 'abricot': 1, 'pomme': 3} into an array
|
||||
value = [[force_str(x), force_str(y)] for x, y in value.items()]
|
||||
elif sql_type == 'varchar':
|
||||
assert isinstance(value, six.string_types)
|
||||
assert isinstance(value, str)
|
||||
elif sql_type == 'date':
|
||||
assert type(value) is time.struct_time
|
||||
value = datetime.datetime(value.tm_year, value.tm_mon, value.tm_mday)
|
||||
|
@ -2012,7 +2011,7 @@ class SqlDataMixin(SqlMixin):
|
|||
elif field.key in ('item', 'items'):
|
||||
value = self.data.get('%s_display' % field.id)
|
||||
if value:
|
||||
if isinstance(value, six.string_types):
|
||||
if isinstance(value, str):
|
||||
fts_strings.append(value)
|
||||
elif type(value) in (tuple, list):
|
||||
fts_strings.extend(value)
|
||||
|
@ -2322,7 +2321,7 @@ class SqlUser(SqlMixin, wcs.users.User):
|
|||
elif field.key in ('item', 'items'):
|
||||
value = self.form_data.get('%s_display' % field.id)
|
||||
if value:
|
||||
if isinstance(value, six.string_types):
|
||||
if isinstance(value, str):
|
||||
fts_strings.append(value)
|
||||
elif type(value) in (tuple, list):
|
||||
fts_strings.extend(value)
|
||||
|
|
|
@ -14,9 +14,9 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import urllib.parse
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from quixote import redirect
|
||||
|
||||
from ..qommon import _, N_
|
||||
|
@ -39,7 +39,7 @@ def lookup_wf_attachment(self, filename):
|
|||
else:
|
||||
file_reference = None
|
||||
|
||||
filenames = [filename, urllib.unquote(filename)]
|
||||
filenames = [filename, urllib.parse.unquote(filename)]
|
||||
for p in self.formdata.iter_evolution_parts():
|
||||
if not isinstance(p, AttachmentEvolutionPart):
|
||||
continue
|
||||
|
@ -69,7 +69,7 @@ def form_attachment(self):
|
|||
% (
|
||||
self.filled.get_url(backoffice=is_in_backoffice),
|
||||
fn,
|
||||
urllib.quote(p.base_filename),
|
||||
urllib.parse.quote(p.base_filename),
|
||||
)
|
||||
)
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import base64
|
||||
import collections
|
||||
import io
|
||||
from xml.etree import ElementTree as ET
|
||||
import zipfile
|
||||
import os
|
||||
|
@ -25,9 +26,7 @@ import tempfile
|
|||
import time
|
||||
import shutil
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_bytes, force_text
|
||||
from django.utils.six import BytesIO, StringIO
|
||||
|
||||
from quixote import get_response, get_request, get_publisher
|
||||
from quixote.directory import Directory
|
||||
|
@ -503,7 +502,7 @@ class ExportToModel(WorkflowStatusItem):
|
|||
try:
|
||||
# force ezt_only=True because an RTF file may contain {{ characters
|
||||
# and would be seen as a Django template
|
||||
return BytesIO(
|
||||
return io.BytesIO(
|
||||
force_bytes(
|
||||
template_on_formdata(
|
||||
formdata,
|
||||
|
@ -629,7 +628,7 @@ class ExportToModel(WorkflowStatusItem):
|
|||
node.append(child)
|
||||
node.tail = current_tail
|
||||
|
||||
outstream = BytesIO()
|
||||
outstream = io.BytesIO()
|
||||
transform_opendocument(self.model_file.get_file(), outstream, process_root)
|
||||
outstream.seek(0)
|
||||
return outstream
|
||||
|
@ -721,7 +720,7 @@ class ExportToModel(WorkflowStatusItem):
|
|||
filename = 'export_to_model-%s-%s-%s.upload' % ids
|
||||
|
||||
upload = Upload(base_filename, content_type)
|
||||
upload.fp = BytesIO()
|
||||
upload.fp = io.BytesIO()
|
||||
upload.fp.write(content)
|
||||
upload.fp.seek(0)
|
||||
self.model_file = UploadedFile('models', filename, upload)
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import collections
|
||||
import json
|
||||
import urllib.parse
|
||||
|
||||
try:
|
||||
from PIL import Image
|
||||
|
@ -24,7 +25,6 @@ try:
|
|||
except ImportError:
|
||||
Image = None
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from quixote import get_publisher
|
||||
|
||||
from ..qommon import _, N_, force_str
|
||||
|
@ -153,7 +153,7 @@ class GeolocateWorkflowStatusItem(WorkflowStatusItem):
|
|||
url += '&'
|
||||
else:
|
||||
url += '?'
|
||||
url += 'q=%s' % urlparse.quote(address)
|
||||
url += 'q=%s' % urllib.parse.quote(address)
|
||||
url += '&format=json'
|
||||
url += '&accept-language=%s' % (get_publisher().get_site_language() or 'en')
|
||||
|
||||
|
|
|
@ -19,8 +19,6 @@ import json
|
|||
import os
|
||||
import time
|
||||
|
||||
from django.utils import six
|
||||
|
||||
from quixote import get_publisher, get_request, redirect
|
||||
|
||||
from quixote.directory import Directory
|
||||
|
@ -136,7 +134,7 @@ class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
|
|||
|
||||
def migrate(self):
|
||||
changed = super(JumpWorkflowStatusItem, self).migrate()
|
||||
if isinstance(self.condition, six.string_types):
|
||||
if isinstance(self.condition, str):
|
||||
if self.condition:
|
||||
self.condition = {'type': 'python', 'value': self.condition}
|
||||
else:
|
||||
|
|
|
@ -18,9 +18,9 @@ import datetime
|
|||
import json
|
||||
import sys
|
||||
import time
|
||||
import urllib.parse
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from quixote import get_publisher, get_response, get_request
|
||||
|
||||
from ..qommon import _, N_
|
||||
|
@ -37,7 +37,7 @@ def user_ws_url(user_uuid):
|
|||
idps = get_cfg('idp', {})
|
||||
entity_id = list(idps.values())[0]['metadata_url']
|
||||
base_url = entity_id.split('idp/saml2/metadata')[0]
|
||||
url = urlparse.urljoin(base_url, '/api/users/%s/' % user_uuid)
|
||||
url = urllib.parse.urljoin(base_url, '/api/users/%s/' % user_uuid)
|
||||
secret, orig = get_secret_and_orig(url)
|
||||
url += '?orig=%s' % orig
|
||||
return sign_url(url, secret)
|
||||
|
|
|
@ -15,9 +15,7 @@
|
|||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
import urllib.parse
|
||||
|
||||
from quixote import get_request, get_publisher, get_response
|
||||
|
||||
|
@ -35,8 +33,8 @@ def roles_ws_url(role_uuid, user_uuid):
|
|||
idps = get_cfg('idp', {})
|
||||
entity_id = list(idps.values())[0]['metadata_url']
|
||||
base_url = entity_id.split('idp/saml2/metadata')[0]
|
||||
url = urlparse.urljoin(
|
||||
base_url, '/api/roles/%s/members/%s/' % (urllib.quote(role_uuid), urllib.quote(user_uuid))
|
||||
url = urllib.parse.urljoin(
|
||||
base_url, '/api/roles/%s/members/%s/' % (urllib.parse.quote(role_uuid), urllib.parse.quote(user_uuid))
|
||||
)
|
||||
return url
|
||||
|
||||
|
|
|
@ -15,15 +15,14 @@
|
|||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import datetime
|
||||
import io
|
||||
import sys
|
||||
import traceback
|
||||
import xml.etree.ElementTree as ET
|
||||
import collections
|
||||
import mimetypes
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six import BytesIO
|
||||
|
||||
from quixote.html import TemplateIO, htmltext
|
||||
|
||||
|
@ -463,7 +462,7 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
|
|||
filename, content_type = self.get_attachment_data(response)
|
||||
workflow_data['%s_content_type' % self.varname] = content_type
|
||||
workflow_data['%s_length' % self.varname] = len(data)
|
||||
fp_content = BytesIO(data)
|
||||
fp_content = io.BytesIO(data)
|
||||
attachment = AttachmentEvolutionPart(
|
||||
filename, fp_content, content_type=content_type, varname=self.varname
|
||||
)
|
||||
|
@ -562,11 +561,11 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
|
|||
el = ET.SubElement(xml_item, attribute)
|
||||
for (key, value) in getattr(self, attribute).items():
|
||||
item = ET.SubElement(el, 'item')
|
||||
if isinstance(key, six.string_types):
|
||||
if isinstance(key, str):
|
||||
ET.SubElement(item, 'name').text = force_text(key)
|
||||
else:
|
||||
raise AssertionError('unknown type for key (%r)' % key)
|
||||
if isinstance(value, six.string_types):
|
||||
if isinstance(value, str):
|
||||
ET.SubElement(item, 'value').text = force_text(value)
|
||||
else:
|
||||
raise AssertionError('unknown type for value (%r)' % key)
|
||||
|
|
|
@ -25,7 +25,6 @@ import sys
|
|||
import time
|
||||
import uuid
|
||||
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text
|
||||
|
||||
from quixote import get_request, get_response, redirect
|
||||
|
@ -938,7 +937,7 @@ class XmlSerialisable(object):
|
|||
atname = 'item'
|
||||
for v in val:
|
||||
ET.SubElement(el, atname).text = force_text(str(v), charset, errors='replace')
|
||||
elif isinstance(val, six.string_types):
|
||||
elif isinstance(val, str):
|
||||
el.text = force_text(val, charset, errors='replace')
|
||||
else:
|
||||
el.text = str(val)
|
||||
|
@ -970,9 +969,7 @@ class XmlSerialisable(object):
|
|||
else:
|
||||
if el.text is None:
|
||||
setattr(self, attribute, None)
|
||||
elif el.text in ('False', 'True') and not isinstance(
|
||||
getattr(self, attribute), six.string_types
|
||||
):
|
||||
elif el.text in ('False', 'True') and not isinstance(getattr(self, attribute), str):
|
||||
# booleans
|
||||
setattr(self, attribute, el.text == 'True')
|
||||
elif type(getattr(self, attribute)) is int:
|
||||
|
@ -1311,7 +1308,7 @@ class WorkflowGlobalActionTimeoutTrigger(WorkflowGlobalActionTrigger):
|
|||
)
|
||||
elif isinstance(anchor_date, time.struct_time):
|
||||
anchor_date = datetime.datetime.fromtimestamp(time.mktime(anchor_date))
|
||||
elif isinstance(anchor_date, six.string_types) and anchor_date:
|
||||
elif isinstance(anchor_date, str) and anchor_date:
|
||||
try:
|
||||
anchor_date = get_as_datetime(anchor_date)
|
||||
except ValueError:
|
||||
|
@ -2076,7 +2073,7 @@ class WorkflowStatusItem(XmlSerialisable):
|
|||
formdata=None,
|
||||
status_item=None,
|
||||
):
|
||||
if not isinstance(var, six.string_types):
|
||||
if not isinstance(var, str):
|
||||
return var
|
||||
|
||||
expression = cls.get_expression(var)
|
||||
|
@ -2919,7 +2916,7 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
|
|||
# this works around the fact that parametric workflows only support
|
||||
# string values, so if we get set a string, we convert it here to an
|
||||
# array.
|
||||
if isinstance(self.to, six.string_types):
|
||||
if isinstance(self.to, str):
|
||||
self.to = [self.to]
|
||||
|
||||
addresses = []
|
||||
|
@ -2935,7 +2932,7 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem):
|
|||
if isinstance(dest, list):
|
||||
addresses.extend(dest)
|
||||
continue
|
||||
elif isinstance(dest, six.string_types) and ',' in dest:
|
||||
elif isinstance(dest, str) and ',' in dest:
|
||||
# if the email contains a comma consider it as a serie of
|
||||
# emails
|
||||
addresses.extend([x.strip() for x in dest.split(',')])
|
||||
|
|
|
@ -17,11 +17,10 @@
|
|||
import collections
|
||||
import json
|
||||
import sys
|
||||
import urllib.parse
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from quixote import get_publisher, get_request
|
||||
|
||||
|
@ -94,8 +93,8 @@ def call_webservice(
|
|||
|
||||
if qs_data: # merge qs_data into url
|
||||
publisher = get_publisher()
|
||||
parsed = urlparse.urlparse(url)
|
||||
qs = list(urlparse.parse_qsl(parsed.query))
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
qs = list(urllib.parse.parse_qsl(parsed.query))
|
||||
for key, value in qs_data.items():
|
||||
try:
|
||||
value = WorkflowStatusItem.compute(value, raises=True)
|
||||
|
@ -106,8 +105,8 @@ def call_webservice(
|
|||
key = force_str(key)
|
||||
value = force_str(value)
|
||||
qs.append((key, value))
|
||||
qs = urllib.urlencode(qs)
|
||||
url = urlparse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
|
||||
qs = urllib.parse.urlencode(qs)
|
||||
url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
|
||||
|
||||
unsigned_url = url
|
||||
|
||||
|
|
Loading…
Reference in New Issue