Changes for Python 3: str/bytes fixes mostly.

This commit is contained in:
Neil Schemenauer 2016-03-31 20:40:32 +00:00
parent 9c12cd7b1a
commit 574b9b1088
13 changed files with 153 additions and 160 deletions

View File

@ -38,7 +38,7 @@ reference.
"""
import re
import urllib
import urllib.request, urllib.parse, urllib.error
try:
# faster C implementation
@ -103,7 +103,7 @@ def url_quote(value, fallback=None):
raise ValueError("value is None and no fallback supplied")
else:
return fallback
return urllib.quote(stringify(value))
return urllib.parse.quote(stringify(value))
_saved = None
def use_qpy():
@ -111,7 +111,7 @@ def use_qpy():
Switch to using 'qpy' as an alternative.
"""
import qpy
from qpy_templateio import qpy_TemplateIO
from .qpy_templateio import qpy_TemplateIO
global _saved, htmltext, stringify, htmlescape, TemplateIO
if not _saved:

View File

@ -3,8 +3,10 @@ TemplateIO.
"""
def _escape_string(s):
if not isinstance(s, basestring):
raise TypeError, 'string object required'
if isinstance(s, bytes):
raise TypeError('escape_string no longer accepts bytes')
if not isinstance(s, str):
raise TypeError('string object required')
s = s.replace("&", "&")
s = s.replace("<", "&lt;")
s = s.replace(">", "&gt;")
@ -16,17 +18,12 @@ def stringify(obj):
turning strings into unicode objects.
"""
tp = type(obj)
if issubclass(tp, basestring):
if isinstance(obj, (str, bytes)):
return obj
elif hasattr(tp, '__unicode__'):
s = tp.__unicode__(obj)
if not isinstance(s, basestring):
raise TypeError, '__unicode__ did not return a string'
return s
elif hasattr(tp, '__str__'):
s = tp.__str__(obj)
if not isinstance(s, basestring):
raise TypeError, '__str__ did not return a string'
if not isinstance(s, str):
raise TypeError('__str__ did not return a string')
return s
else:
return str(obj)
@ -58,8 +55,11 @@ class htmltext(object):
def __len__(self):
return len(self.s)
def __cmp__(self, other):
return cmp(self.s, other)
def __eq__(self, other):
return (self.s == other)
def __lt__(self, other):
return (self.s < other)
def __hash__(self):
return hash(self.s)
@ -156,17 +156,6 @@ class _QuoteWrapper(object):
def __getitem__(self, key):
return _wraparg(self.value[key])
class _UnicodeWrapper(unicode):
__slots__ = ['raw']
def __new__(cls, s):
result = unicode.__new__(cls, _escape_string(s))
result.raw = s
return result
def __repr__(self):
return _escape_string(`self.raw`)
def _wraparg(arg):
@ -176,7 +165,7 @@ def _wraparg(arg):
return stringify(arg)
elif isinstance(arg, str):
# again, work around PyString_Format bug
return _UnicodeWrapper(arg)
return _escape_string(arg)
elif (isinstance(arg, int) or
isinstance(arg, float)):
# ints, floats are okay

View File

@ -5,10 +5,13 @@ from quixote.html import href, url_with_query, url_quote, nl2br
markupchars = '<>&"'
quotedchars = '&lt;&gt;&amp;&quot;'
if sys.hexversion >= 0x20400a2:
unicodechars = u'\u1234'
else:
unicodechars = 'x' # lie, Python <= 2.3 is broken
unicodechars = '\\u1234'
# Byte types...
markupbytes = b'<>&"'
quotedbytes = b'&lt;&gt;&amp;&quot;'
bytebytes = b'\u1234'
class Wrapper:
def __init__(self, s):
@ -68,8 +71,8 @@ class HTMLTextTest (UTest):
def _check_init(self):
assert str(htmltext('foo')) == 'foo'
assert str(htmltext(markupchars)) == markupchars
assert unicode(htmltext(unicodechars)) == unicodechars
assert str(htmltext(unicode(markupchars))) == markupchars
assert str(htmltext(unicodechars)) == unicodechars
assert str(htmltext(str(markupchars))) == markupchars
assert str(htmltext(None)) == 'None'
assert str(htmltext(1)) == '1'
try:
@ -84,19 +87,31 @@ class HTMLTextTest (UTest):
assert stringify(Wrapper(markupchars)) is markupchars
assert stringify(Wrapper) == str(Wrapper)
assert stringify(None) == str(None)
assert stringify(markupbytes) is markupbytes
def check_escape(self):
assert htmlescape(markupchars) == quotedchars
assert isinstance(htmlescape(markupchars), htmltext)
assert escape(markupchars) == quotedchars
assert escape(unicodechars) == unicodechars
assert escape(unicode(markupchars)) == quotedchars
assert isinstance(escape(markupchars), basestring)
assert type(escape(markupchars)) == type(markupchars)
assert isinstance(escape(markupchars), str)
assert htmlescape(htmlescape(markupchars)) == quotedchars
# Now try to pass bytes...
try:
escape(markupbytes)
assert 0
except TypeError:
pass
# ...and now not a string at all
try:
escape(1)
assert 0
except TypeError: pass
except TypeError:
pass
def check_cmp(self):
s = htmltext("foo")
@ -158,39 +173,27 @@ class HTMLTextTest (UTest):
except TypeError: pass
def check_format(self):
s_fmt = htmltext('%s')
u_fmt = htmltext(u'%s')
assert s_fmt % 'foo' == "foo"
assert u_fmt % 'foo' == u"foo"
assert isinstance(s_fmt % 'foo', htmltext)
assert isinstance(u_fmt % 'foo', htmltext)
assert s_fmt % markupchars == quotedchars
u_fmt = htmltext('%s')
assert u_fmt % 'fooble' == "fooble"
assert isinstance(u_fmt % 'wibblefoo', htmltext)
assert u_fmt % markupchars == quotedchars
assert s_fmt % None == "None"
assert u_fmt % None == "None"
assert s_fmt % unicodechars == unicodechars
assert u_fmt % unicodechars == unicodechars
assert s_fmt % htmltext(unicodechars) == unicodechars
assert u_fmt % htmltext(unicodechars) == unicodechars
assert htmltext('%r') % Wrapper(markupchars) == quotedchars
assert htmltext('%r') % unicodechars == `unicodechars`
assert htmltext('%r') % unicodechars == repr(unicodechars)
assert htmltext('%s%s') % ('foo', htmltext(markupchars)) \
== ("foo" + markupchars)
assert htmltext('%d') % 10 == "10"
assert htmltext('%.1f') % 10 == "10.0"
try:
s_fmt % Broken()
assert 0
except BrokenError: pass
try:
htmltext('%r') % Broken()
assert 0
except BrokenError: pass
try:
s_fmt % (1, 2)
assert 0
except TypeError: pass
assert htmltext('%d') % 12300000000000000000L == "12300000000000000000"
assert htmltext('%d') % 12300000000000000000 == "12300000000000000000"
def check_dict_format(self):
args = {'a': 'foo&', 'b': htmltext('bar&')}
@ -232,8 +235,8 @@ class HTMLTextTest (UTest):
assert htmltext(' ').join([htmltext(markupchars), 'bar']) == \
markupchars + " bar"
assert isinstance(htmltext('').join([]), htmltext)
assert htmltext(u' ').join([unicodechars]) == unicodechars
assert htmltext(u' ').join(['']) == u''
assert htmltext(' ').join([unicodechars]) == unicodechars
assert htmltext(' ').join(['']) == ''
try:
htmltext('').join(1)
assert 0
@ -307,8 +310,8 @@ class TemplateTest (UTest):
assert t.getvalue() == 'abcd'
t += 123
assert t.getvalue() == 'abcd123'
t += u'\u1234'
assert t.getvalue() == u'abcd123\u1234'
t += '\\u1234'
assert t.getvalue() == 'abcd123\\u1234'
try:
t += Broken(); t.getvalue()
assert 0

View File

@ -8,9 +8,9 @@ import re
import string
import os
import tempfile
import urllib
import rfc822
from StringIO import StringIO
import urllib.request, urllib.parse, urllib.error
import email
import io
import quixote
from quixote.http_response import HTTPResponse
@ -48,10 +48,6 @@ def get_content_type(environ):
return None
def _decode_string(s, charset):
if charset == 'iso-8859-1' == quixote.DEFAULT_CHARSET:
# To avoid breaking applications that are not Unicode-safe, return
# a str instance in this case.
return s
try:
return s.decode(charset)
except LookupError:
@ -65,7 +61,9 @@ def parse_header(line):
Return the main content-type and a dictionary of options.
"""
plist = map(lambda x: x.strip(), line.split(';'))
if isinstance(line, email.header.Header): # file upload
line = ''.join(val for val, charset in line._chunks)
plist = [val.strip() for val in line.split(';')]
key = plist.pop(0).lower()
pdict = {}
for p in plist:
@ -100,8 +98,8 @@ def parse_query(qs, charset):
value = ''
else:
name, value = chunk.split('=', 1)
name = urllib.unquote(name.replace('+', ' '))
value = urllib.unquote(value.replace('+', ' '))
name = urllib.parse.unquote_plus(name)
value = urllib.parse.unquote_plus(value)
name = _decode_string(name, charset)
value = _decode_string(value, charset)
_add_field_value(fields, name, value)
@ -205,7 +203,7 @@ class HTTPRequest:
# middleware expect that. We cannot rely on the application to
# read it completely (e.g. if there is some PublishError raised).
if length < 20000:
fp = StringIO()
fp = io.BytesIO()
else:
fp = tempfile.TemporaryFile("w+b")
remaining = length
@ -226,6 +224,8 @@ class HTTPRequest:
# Use the declared charset if it's provided (most browser's don't
# provide it to avoid breaking old HTTP servers).
charset = params.get('charset', self.charset)
# should contain only ASCII characters but parse as iso-8859-1
query = query.decode('iso-8859-1')
self.form.update(parse_query(query, charset))
def _process_multipart(self, length, params):
@ -243,14 +243,14 @@ class HTTPRequest:
raise RequestError('unexpected end of multipart/form-data')
def _process_multipart_body(self, mimeinput, charset):
headers = StringIO()
headers = io.BytesIO()
lines = mimeinput.readpart()
for line in lines:
headers.write(line)
if line == '\r\n':
if line == b'\r\n':
break
headers.seek(0)
headers = rfc822.Message(headers)
headers = email.message_from_binary_file(headers)
ctype, ctype_params = parse_header(headers.get('content-type', ''))
if ctype and 'charset' in ctype_params:
charset = ctype_params['charset']
@ -272,7 +272,7 @@ class HTTPRequest:
upload.receive(lines)
_add_field_value(self.form, name, upload)
else:
value = _decode_string(''.join(lines), charset or self.charset)
value = _decode_string(b''.join(lines), charset or self.charset)
_add_field_value(self.form, name, value)
def get_header(self, name, default=None):
@ -408,7 +408,7 @@ class HTTPRequest:
any).
"""
return "%s://%s%s" % (self.get_scheme(), self.get_server(),
urllib.quote(self.get_path(n)))
urllib.parse.quote(self.get_path(n)))
def get_environ(self, key, default=None):
"""get_environ(key : string) -> string
@ -616,7 +616,7 @@ def parse_cookies(text):
result[name] = value
return result
SAFE_CHARS = string.letters + string.digits + "-@&+=_., "
SAFE_CHARS = string.ascii_letters + string.digits + "-@&+=_., "
_safe_trans = None
def make_safe_filename(s):
@ -737,7 +737,7 @@ class LineInput:
def __init__(self, fp, length):
self.fp = fp
self.length = length
self.buf = ''
self.buf = b''
def readline(self, maxlength=4096):
# fill buffer
@ -751,17 +751,17 @@ class LineInput:
self.buf += chunk
# split into lines
buf = self.buf
i = buf.find('\r\n')
i = buf.find(b'\r\n')
if i >= 0:
i += 2
self.buf = buf[i:]
return buf[:i]
elif buf.endswith('\r'):
elif buf.endswith(b'\r'):
# avoid splitting CR LF pairs
self.buf = '\r'
self.buf = b'\r'
return buf[:-1]
else:
self.buf = ''
self.buf = b''
return buf
class MIMEInput:
@ -772,7 +772,7 @@ class MIMEInput:
def __init__(self, fp, boundary, length):
self.lineinput = LineInput(fp, length)
self.pat = re.compile(r'--%s(--)?' % re.escape(boundary))
self.pat = b'--' + boundary.encode('iso-8859-1')
self.done = False
def moreparts(self):
@ -783,20 +783,20 @@ class MIMEInput:
"""Generate all the lines up to a MIME boundary. Note that you
must exhaust the generator before calling this function again."""
assert not self.done
last_line = ''
last_line = b''
while 1:
line = self.lineinput.readline()
if not line:
# Hit EOF -- nothing more to read. This should *not* happen
# in a well-formed MIME message.
raise EOFError('MIME boundary not found (end of input)')
if last_line.endswith('\r\n') or last_line == '':
m = self.pat.match(line)
if m:
# FIXME: check this
if last_line.endswith(b'\r\n') or last_line == b'':
if line.startswith(self.pat):
# If we hit the boundary line, return now. Forget
# the current line *and* the CRLF ending of the
# previous line.
if m.group(1):
if line.startswith(self.pat + b'--'):
# hit final boundary
self.done = True
yield last_line[:-2]

View File

@ -249,12 +249,12 @@ class HTTPResponse:
return chunk
def _compress_body(self, body):
"""(body: str) -> str
"""(body: bytes) -> bytes
"""
n = len(body)
compressed_body = ''.join(self._generate_compressed([body]))
compressed_body = b''.join(self._generate_compressed([body]))
ratio = float(n) / len(compressed_body)
#print "gzip original size %d, ratio %.1f" % (n, ratio)
#print("gzip original size %d, ratio %.1f" % (n, ratio))
if ratio > 1.0:
# only compress if we save space
self.set_header("Content-Encoding", "gzip")
@ -509,8 +509,8 @@ class HTTPResponse:
# The stream is terminated by a zero length chunk.
for chunk in stream:
if chunk:
yield ''.join(['%x\r\n' % len(chunk), chunk, '\r\n'])
yield '0\r\n\r\n'
yield b''.join([('%x\r\n' % len(chunk)).encode(), chunk, b'\r\n'])
yield b'0\r\n\r\n'
def generate_body_chunks(self):
stream = self._generate_encoded_body()
@ -534,11 +534,13 @@ class HTTPResponse:
flush_output = not self.buffered and hasattr(output, 'flush')
if include_status:
# "Status" header must come first.
output.write("Status: %03d %s\r\n" % (self.status_code,
self.reason_phrase))
s = 'Status: %03d %s\r\n' % (self.status_code, self.reason_phrase)
output.write(s.encode('utf-8'))
for name, value in self.generate_headers():
output.write("%s: %s\r\n" % (name, value))
output.write("\r\n")
s = "%s: %s\r\n" % (name, value))
output.write(s.encode('utf-8'))
output.write(b"\r\n")
if flush_output:
output.flush()
if not include_body:

View File

@ -1,9 +1,9 @@
"""Logic for publishing modules and objects on the Web.
"""
import sys, traceback, StringIO
import sys, traceback, io
import time
import urlparse
import urllib.parse
import cgitb
from quixote.errors import PublishError, MethodNotAllowedError, \
@ -186,7 +186,7 @@ class Publisher:
def _generate_plaintext_error(self, request, original_response,
exc_type, exc_value, tb):
error_file = StringIO.StringIO()
error_file = io.StringIO()
# format the traceback
traceback.print_exception(exc_type, exc_value, tb, file=error_file)
@ -201,14 +201,23 @@ class Publisher:
def _generate_cgitb_error(self, request, original_response,
exc_type, exc_value, tb):
error_file = StringIO.StringIO()
# let cgitb.Hook have the type it wants...
error_file = io.StringIO()
hook = cgitb.Hook(file=error_file)
hook(exc_type, exc_value, tb)
error_file.write('<h2>Original Request</h2>')
error_file.write(str(util.dump_request(request)))
error_file.write('<h2>Original Response</h2><pre>')
original_response.write(error_file)
error_file.write('</pre>')
byte_error_file = io.BytesIO()
byte_error_file.write(b'<h2>Original Request</h2>')
# dump_request returns an HTMLText object
s = str(util.dump_request(request))
byte_error_file.write(s.encode('latin-1', 'strict'))
byte_error_file.write(b'<h2>Original Response</h2><pre>')
original_response.write(byte_error_file)
byte_error_file.write(b'</pre>')
# Now we push the bytes to the "real" error file...
s = byte_error_file.getvalue().decode('latin-1')
error_file.write(s)
return error_file.getvalue()
@ -332,7 +341,7 @@ def redirect(location, permanent=False):
not honor the redirect).
"""
request = _publisher.get_request()
location = urlparse.urljoin(request.get_url(), str(location))
location = urllib.parse.urljoin(request.get_url(), str(location))
return request.response.redirect(location, permanent)
def get_session():

View File

@ -1,9 +1,11 @@
#!/usr/bin/env python
#!/usr/bin/env python3
"""An HTTP handler for Medusa that publishes a Quixote application.
"""
import asyncore, rfc822, socket, urllib
from StringIO import StringIO
import asyncore
import email
import socket, urllib.request, urllib.parse, urllib.error
from io import StringIO
from medusa import http_server, xmlrpc_handler
import quixote
@ -14,7 +16,7 @@ class StreamProducer:
def more(self):
try:
return self.chunks.next()
return next(self.chunks)
except StopIteration:
return ''
@ -29,7 +31,7 @@ class QuixoteHandler:
return True
def handle_request(self, request):
msg = rfc822.Message(StringIO('\n'.join(request.header)))
msg = email.Message(StringIO('\n'.join(request.header)))
length = int(msg.get('Content-Length', '0'))
if length:
request.collector = xmlrpc_handler.collector(self, request)
@ -37,7 +39,7 @@ class QuixoteHandler:
self.continue_request('', request)
def continue_request(self, data, request):
msg = rfc822.Message(StringIO('\n'.join(request.header)))
msg = email.Message(StringIO('\n'.join(request.header)))
remote_addr, remote_port = request.channel.addr
if '#' in request.uri:
# MSIE is buggy and sometimes includes fragments in URLs
@ -48,7 +50,7 @@ class QuixoteHandler:
path = request.uri
query_string = ''
path = urllib.unquote(path)
path = urllib.parse.unquote(path)
server_port = str(self.server.port)
http_host = msg.get("Host")
if http_host:

View File

@ -11,8 +11,8 @@ class QuixoteHandler(scgi_server.SCGIHandler):
self.script_name = script_name
def handle_connection(self, conn):
input = conn.makefile("r")
output = conn.makefile("w")
input = conn.makefile("rb")
output = conn.makefile("wb")
env = self.read_env(input)
if self.script_name is not None:

View File

@ -1,9 +1,9 @@
#!/usr/bin/env python
#!/usr/bin/env python3
"""A simple, single threaded, synchronous HTTP server.
"""
import sys
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import urllib
from http.server import BaseHTTPRequestHandler, HTTPServer
import urllib.request, urllib.parse, urllib.error
import quixote
from quixote import get_publisher
from quixote.util import import_object
@ -28,12 +28,9 @@ class HTTPRequestHandler(BaseHTTPRequestHandler):
env['PATH_INFO'], env['QUERY_STRING'] = self.path.split('?', 1)
else:
env['PATH_INFO'] = self.path
env['PATH_INFO'] = urllib.unquote(env['PATH_INFO'])
if self.headers.typeheader is None:
env['CONTENT_TYPE'] = self.headers.type
else:
env['CONTENT_TYPE'] = self.headers.typeheader
env['CONTENT_LENGTH'] = self.headers.getheader('content-length') or "0"
env['PATH_INFO'] = urllib.parse.unquote(env['PATH_INFO'])
env['CONTENT_TYPE'] = self.headers.get('content-type')
env['CONTENT_LENGTH'] = self.headers.get('content-length') or "0"
for name, value in self.headers.items():
header_name = 'HTTP_' + name.upper().replace('-', '_')
env[header_name] = value
@ -56,12 +53,11 @@ class HTTPRequestHandler(BaseHTTPRequestHandler):
# single threaded server, persistent connections will block others
response.set_header('connection', 'close')
try:
self.send_response(response.get_status_code(),
response.get_reason_phrase())
response.write(self.wfile, include_status=False,
include_body=include_body)
except IOError, err:
print "IOError while sending response ignored: %s" % err
self.send_response(response.get_status_code(), response.get_reason_phrase())
self.flush_headers()
response.write(self.wfile, include_status=False, include_body=include_body)
except IOError as err:
print("IOError while sending response ignored: %s" % err)
def do_POST(self):
return self.process(self.get_cgi_env('POST'))
@ -78,14 +74,7 @@ class HTTPRequestHandler(BaseHTTPRequestHandler):
that adds the 'Date' header is removed to avoid duplicating the one
that Quixote adds and the log_request() call has been removed.
"""
if message is None:
if code in self.responses:
message = self.responses[code][0]
else:
message = ''
if self.request_version != 'HTTP/0.9':
self.wfile.write("%s %d %s\r\n" %
(self.protocol_version, code, message))
self.send_response_only(code, message)
self.send_header('Server', self.version_string())
def run(create_publisher, host='', port=80, https=False):

View File

@ -2,8 +2,9 @@
"""An HTTP server for Twisted that publishes a Quixote application.
"""
import urllib
from twisted.web import http, server
import urllib.request, urllib.parse, urllib.error
from twisted.protocols import http
from twisted.web import server
from twisted.python import threadable
from twisted.internet import reactor
@ -45,7 +46,7 @@ class QuixoteRequest(server.Request):
"""
# Twisted doesn't decode the path for us, so let's do it here.
if '%' in self.path:
self.path = urllib.unquote(self.path)
self.path = urllib.parse.unquote(self.path)
serverName = self.getRequestHostname().split(':')[0]
env = {"SERVER_SOFTWARE": server.version,
@ -109,7 +110,7 @@ class QuixoteProducer:
def resumeProducing(self):
if self.request:
try:
chunk = self.stream.next()
chunk = next(self.stream)
except StopIteration:
self.request.unregisterProducer()
self.request.finish()

View File

@ -18,9 +18,10 @@ import os
import time
import base64
import mimetypes
import urllib
import xmlrpclib
from rfc822 import formatdate
import urllib.request, urllib.parse, urllib.error
import xmlrpc.client
from email.utils import formatdate
import quixote
from quixote import errors
from quixote.directory import Directory
@ -68,20 +69,20 @@ def xmlrpc(request, func):
data = request.stdin.read(length)
# Parse arguments
params, method = xmlrpclib.loads(data)
params, method = xmlrpc.client.loads(data)
try:
result = func(method, params)
except xmlrpclib.Fault, exc:
except xmlrpc.client.Fault as exc:
result = exc
except:
# report exception back to client
result = xmlrpclib.dumps(
xmlrpclib.Fault(1, "%s:%s" % (sys.exc_type, sys.exc_value))
result = xmlrpc.client.dumps(
xmlrpc.client.Fault(1, "%s:%s" % (sys.exc_info()[0], sys.exc_info()[1]))
)
else:
result = (result,)
result = xmlrpclib.dumps(result, methodresponse=1)
result = xmlrpc.client.dumps(result, methodresponse=1)
request.response.set_content_type('text/xml')
return result
@ -98,7 +99,7 @@ class FileStream(Stream):
def __iter__(self):
return self
def next(self):
def __next__(self):
chunk = self.fp.read(self.CHUNK_SIZE)
if not chunk:
raise StopIteration
@ -249,7 +250,7 @@ class StaticDirectory(Directory):
for filename in files:
filepath = os.path.join(self.path, filename)
marker = os.path.isdir(filepath) and "/" or ""
r += template % (urllib.quote(filename), filename, marker)
r += template % (urllib.parse.quote(filename), filename, marker)
r += htmltext('</pre>')
body = r.getvalue()
else:

View File

@ -9,10 +9,8 @@ To create an application object, execute
Authors: Mike Orr <mso@oz.net> and Titus Brown <titus@caltech.edu>.
Last updated 2005-05-03.
"""
import sys
from http_request import HTTPRequest
from StringIO import StringIO
from .http_request import HTTPRequest
###### QWIP: WSGI COMPATIBILITY WRAPPER FOR QUIXOTE #####################

View File

@ -2,10 +2,9 @@ import sys, subprocess
import quixote
from quixote.server.simple_server import run
from StringIO import StringIO
from io import StringIO
import os
import socket
import urllib
import urllib.request, urllib.parse, urllib.error
_server_url = None
@ -74,7 +73,7 @@ def kill_server():
global _server_url
if _server_url != None:
try:
fp = urllib.urlopen('%sexit' % (_server_url,))
fp = urllib.request.urlopen('%sexit' % (_server_url,))
except:
pass