misc: turn urlopen into a _http_request wrapper (#19437)

This commit is contained in:
Frédéric Péters 2017-11-15 14:54:38 +04:00
parent 218bc1063b
commit 0773410d02
16 changed files with 95 additions and 103 deletions

View File

@ -3650,7 +3650,7 @@ def test_settings_idp(pub):
resp = resp.forms[0].submit() # confirm delete
assert len(pub.cfg['idp']) == 0
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
idp_metadata_filename = os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')
urlopen.side_effect = lambda *args: open(idp_metadata_filename)
resp = app.get('/backoffice/settings/identification/idp/idp/')

View File

@ -1835,7 +1835,7 @@ def test_get_secret_and_orig(no_request_pub):
assert orig == 'example.net'
def test_reverse_geocoding(pub):
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO('xxx')
get_app(pub).get('/api/reverse-geocoding', status=400)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
from cStringIO import StringIO
import pytest
import os
import json
@ -295,9 +296,9 @@ def test_data_source_unicode():
data_source2 = NamedDataSource.select()[0]
assert data_source2.data_source == data_source.data_source
with mock.patch('wcs.data_sources.urllib2') as urllib2:
urllib2.urlopen.return_value.read.return_value = \
'{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}'
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}')
assert data_sources.get_items({'type': 'foobar'}) == [
('0', 'zéro', '0', {"id": 0, "text": "zéro"}),
('1', 'uné', '1', {"id": 1, "text": "uné"}),
@ -322,11 +323,11 @@ def test_data_source_signed(no_request_pub):
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json"}
data_source.store()
with mock.patch('wcs.data_sources.urllib2') as urllib2:
urllib2.urlopen.return_value.read.return_value = \
'{"data": [{"id": 0, "text": "zero"}]}'
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
signed_url = urllib2.urlopen.call_args[0][0]
signed_url = urlopen.call_args[0][0]
assert signed_url.startswith('https://api.example.com/json?')
parsed = urlparse.urlparse(signed_url)
querystring = urlparse.parse_qs(parsed.query)
@ -339,11 +340,11 @@ def test_data_source_signed(no_request_pub):
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"}
data_source.store()
with mock.patch('wcs.data_sources.urllib2') as urllib2:
urllib2.urlopen.return_value.read.return_value = \
'{"data": [{"id": 0, "text": "zero"}]}'
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
signed_url = urllib2.urlopen.call_args[0][0]
signed_url = urlopen.call_args[0][0]
assert signed_url.startswith('https://api.example.com/json?')
parsed = urlparse.urlparse(signed_url)
querystring = urlparse.parse_qs(parsed.query)
@ -356,9 +357,9 @@ def test_data_source_signed(no_request_pub):
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"}
data_source.store()
with mock.patch('wcs.data_sources.urllib2') as urllib2:
urllib2.urlopen.return_value.read.return_value = \
'{"data": [{"id": 0, "text": "zero"}]}'
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
unsigned_url = urllib2.urlopen.call_args[0][0]
unsigned_url = urlopen.call_args[0][0]
assert unsigned_url == 'https://no-secret.example.com/json'

View File

@ -3486,7 +3486,7 @@ def test_item_field_with_disabled_items(pub):
data_source=ds, display_disabled_items=True)]
formdef.store()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
@ -3499,7 +3499,7 @@ def test_item_field_with_disabled_items(pub):
formdef.data_class().wipe()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
@ -3522,7 +3522,7 @@ def test_item_field_with_disabled_items(pub):
display_disabled_items=False)]
formdef.store()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
@ -3538,7 +3538,7 @@ def test_item_field_with_disabled_items(pub):
data_source=ds, show_as_radio=True, display_disabled_items=True)]
formdef.store()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
@ -3551,7 +3551,7 @@ def test_item_field_with_disabled_items(pub):
formdef.data_class().wipe()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
@ -3578,7 +3578,7 @@ def test_items_field_with_disabled_items(pub):
data_source=ds, display_disabled_items=True)]
formdef.store()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
@ -3591,7 +3591,7 @@ def test_items_field_with_disabled_items(pub):
formdef.data_class().wipe()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
@ -3608,7 +3608,7 @@ def test_items_field_with_disabled_items(pub):
data_source=ds, display_disabled_items=False)]
formdef.store()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')

View File

@ -344,14 +344,8 @@ def test_configure_authentication_methods():
pub.cfg['idp'] = {}
service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
# with an error retrieving metadata
hobo_cmd.configure_authentication_methods(service, pub)
# with real metadata
with mock.patch('urllib2.urlopen') as urlopen:
idp_metadata_filename = os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')
urlopen.side_effect = lambda *args: open(idp_metadata_filename)
hobo_cmd.configure_authentication_methods(service, pub)
hobo_cmd.configure_authentication_methods(service, pub)
assert len(pub.cfg['idp'].keys()) == 1
assert pub.cfg['saml_identities']['registration-url']

View File

@ -230,7 +230,8 @@ class HttpRequestsMocking(object):
wcs.qommon.misc._http_request = self.http_request
qommon.misc._http_request = self.http_request
def http_request(self, url, method='GET', body=None, headers={}, timeout=None):
def http_request(self, url, method='GET', body=None, headers={},
cert_file=None, timeout=None, raise_on_http_errors=False):
self.requests.append(
{'url': url,
'method': method,
@ -262,8 +263,16 @@ class HttpRequestsMocking(object):
'http://remote.example.net/xml-errheader': (200, '<?xml version="1.0"><foo/>',
{'content-type': 'text/xml', 'x-error-code': '1'}),
'http://remote.example.net/connection-error': (None, None, None),
'http://authentic.example.net/idp/saml2/metadata': (
200, open(os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')).read(), None),
}.get(base_url, (200, '', {}))
if url.startswith('file://'):
try:
status, data = 200, open(url[7:]).read()
except IOError:
status = 404
class FakeResponse(object):
def __init__(self, status, data, headers):
self.status = status
@ -274,6 +283,10 @@ class HttpRequestsMocking(object):
if status is None:
raise ConnectionError('error')
if raise_on_http_errors and not (200 <= status < 300):
raise ConnectionError('error in HTTP request to (status: %s)' % status)
return FakeResponse(status, data, headers), status, data, None
def get_last(self, attribute):

View File

@ -19,7 +19,6 @@ import datetime
import difflib
import tarfile
import time
import urllib2
from cStringIO import StringIO
from quixote import get_response, redirect
@ -904,11 +903,8 @@ class FormDefPage(Directory):
elif form.get_widget('url').parse():
url = form.get_widget('url').parse()
try:
fp = urllib2.urlopen(url)
except urllib2.HTTPError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
except urllib2.URLError as e:
fp = misc.urlopen(url)
except misc.ConnectionError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
else:
@ -1540,11 +1536,8 @@ class FormsDirectory(AccessControlled, Directory):
elif form.get_widget('url').parse():
url = form.get_widget('url').parse()
try:
fp = urllib2.urlopen(url)
except urllib2.HTTPError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
except urllib2.URLError as e:
fp = misc.urlopen(url)
except misc.ConnectionError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
else:

View File

@ -19,7 +19,6 @@ import cStringIO
import hashlib
import mimetypes
import os
import urllib2
try:
import lasso
except ImportError:
@ -36,6 +35,7 @@ from quixote.html import TemplateIO, htmltext
from qommon import _
from qommon import get_cfg
from qommon import errors
from qommon import misc
from qommon.form import *
from qommon.sms import SMS
@ -728,11 +728,8 @@ class SettingsDirectory(QommonSettingsDirectory):
def install_theme_from_url(self, url):
try:
fp = urllib2.urlopen(url)
except urllib2.HTTPError as e:
get_session().message = ('error', _('Error loading theme (%s).') % str(e))
return redirect('themes')
except urllib2.URLError as e:
fp = misc.urlopen(url)
except misc.ConnectionError as e:
get_session().message = ('error', _('Error loading theme (%s).') % str(e))
return redirect('themes')

View File

@ -21,7 +21,6 @@ from StringIO import StringIO
from subprocess import Popen, PIPE
import textwrap
import xml.etree.ElementTree as ET
import urllib2
from quixote import redirect, get_publisher
from quixote.directory import Directory
@ -1873,11 +1872,8 @@ class WorkflowsDirectory(Directory):
elif form.get_widget('url').parse():
url = form.get_widget('url').parse()
try:
fp = urllib2.urlopen(url)
except urllib2.HTTPError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
except urllib2.URLError as e:
fp = misc.urlopen(url)
except misc.ConnectionError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
else:

View File

@ -17,7 +17,6 @@
import json
import re
import time
import urllib2
import sys
from quixote import get_request, get_publisher, get_response
@ -672,7 +671,7 @@ class ApiDirectory(Directory):
url += '&accept-language=%s' % (get_publisher().get_site_language() or 'en')
if get_publisher().get_site_option('nominatim_key'):
url += '&key=' + get_publisher().get_site_option('nominatim_key')
return urllib2.urlopen(url).read()
return misc.urlopen(url).read()
def roles(self):
get_response().set_content_type('application/json')

View File

@ -26,6 +26,7 @@ import urlparse
import hashlib
from quixote import cleanup
from qommon import misc
from qommon.ctl import Command, make_option
from qommon.storage import atomic_write
@ -292,11 +293,8 @@ class CmdCheckHobos(Command):
idp['base_url'] = idp['base_url'] + '/'
metadata_url = '%sidp/saml2/metadata' % idp['base_url']
try:
rfd = urllib2.urlopen(metadata_url)
except (urllib2.HTTPError, urllib2.URLError), e:
print >> sys.stderr, 'failed to get metadata URL', metadata_url, e
continue
except Exception, e:
rfd = misc.urlopen(metadata_url)
except misc.ConnectionError as e:
print >> sys.stderr, 'failed to get metadata URL', metadata_url, e
continue

View File

@ -15,7 +15,6 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import collections
import urllib2
import urllib
import urlparse
import xml.etree.ElementTree as ET
@ -163,7 +162,7 @@ def get_structured_items(data_source):
url = urlparse.urlunparse(parsed[:4] + (querystring,) + parsed[5:6])
url = sign_url(url, signature_key)
try:
entries = qommon.misc.json_loads(urllib2.urlopen(url).read())
entries = qommon.misc.json_loads(qommon.misc.urlopen(url).read())
if type(entries) is not dict:
raise ValueError('not a json dict')
if type(entries.get('data')) is not list:
@ -177,9 +176,7 @@ def get_structured_items(data_source):
item['text'] = item['id']
items.append(item)
return items
except urllib2.HTTPError as e:
get_logger().warn('Error loading JSON data source (%s)' % str(e))
except urllib2.URLError as e:
except qommon.misc.ConnectionError as e:
get_logger().warn('Error loading JSON data source (%s)' % str(e))
except ValueError as e:
get_logger().warn('Error reading JSON data source output (%s)' % str(e))

View File

@ -21,7 +21,7 @@ import urllib
import base64
from qommon import get_logger
from qommon.misc import http_get_page, json_loads, http_post_request
from qommon.misc import http_get_page, json_loads, http_post_request, urlopen
import qommon.form
from quixote import get_publisher, get_request, get_response, redirect, get_session
from quixote.directory import Directory
@ -132,7 +132,7 @@ class FargoDirectory(Directory):
# Download file
# FIXME: handle error cases
url = request.form['url']
document = urllib.urlopen(request.form['url']).read()
document = urlopen(request.form['url']).read()
scheme, netloc, path, qs, frag = urlparse.urlsplit(url)
path = map(None, path.split('/'))
name = urllib.unquote(path[-1])

View File

@ -20,7 +20,6 @@ except ImportError:
lasso = None
import urllib
import urllib2
import urlparse
from quixote.directory import Directory
@ -406,12 +405,9 @@ class AdminIDPDir(Directory):
metadata_url = form.get_widget('metadata_url').parse()
publickey_url = None
try:
rfd = urllib2.urlopen(metadata_url)
except (urllib2.HTTPError, urllib2.URLError), e:
if e.code == 404:
form.set_error('metadata_url', _('Resource not found'))
else:
form.set_error('metadata_url', _('HTTP error on retrieval: %s') % e.code)
rfd = misc.urlopen(metadata_url)
except misc.ConnectionError as e:
form.set_error('metadata_url', _('Failed to retrieve file (%s)') % e)
except:
form.set_error('metadata_url', _('Failed to retrieve file'))
else:
@ -434,12 +430,9 @@ class AdminIDPDir(Directory):
form.add(StringWidget, 'publickey_url', title = _('URL to public key'),
required=True, size = 60)
try:
rfd = urllib2.urlopen(publickey_url)
except (urllib2.HTTPError, urllib2.URLError), e:
if e.code == 404:
form.set_error('publickey_url', _('Resource not found'))
else:
form.set_error('publickey_url', _('HTTP error on retrieval: %s') % e.code)
rfd = misc.urlopen(publickey_url)
except misc.ConnectionError as e:
form.set_error('metadata_url', _('Failed to retrieve file (%s)') % e)
except:
form.set_error('publickey_url', _('Failed to retrieve file'))
else:
@ -654,16 +647,16 @@ class AdminIDPUI(Directory):
metadata_url = self.idp.get('metadata_url')
try:
metadata_fd = urllib2.urlopen(metadata_url)
except (urllib2.HTTPError, urllib2.URLError), e:
metadata_fd = misc.urlopen(metadata_url)
except misc.ConnectionError:
return template.error_page('failed to download')
metadata = metadata_fd.read()
publickey_url = self.idp.get('publickey_url')
if publickey_url:
try:
publickey_fd = urllib2.urlopen(publickey_url)
except (urllib2.HTTPError, urllib2.URLError), e:
publickey_fd = misc.urlopen(publickey_url)
except misc.ConnectionError:
return template.error_page('failed to download')
publickey = publickey_fd.read()
else:

View File

@ -268,7 +268,8 @@ def format_time(datetime, formatstring, gmtime = False):
return formatstring % locals()
def _http_request(url, method='GET', body=None, headers={}, cert_file=None, timeout=None):
def _http_request(url, method='GET', body=None, headers={}, cert_file=None, timeout=None,
raise_on_http_errors=False):
get_publisher().reload_cfg()
if url.startswith('http://'):
@ -292,14 +293,25 @@ def _http_request(url, method='GET', body=None, headers={}, cert_file=None, time
except requests.Timeout:
raise ConnectionError('connection timed out while fetching the page')
except requests.RequestException as err:
raise ConnectionError('error in http request to to %s (%s)' % (hostname, err))
raise ConnectionError('error in HTTP request to %s (%s)' % (hostname, err))
else:
data = response.content
status = response.status_code
auth_header = response.headers.get('WWW-Authenticate')
if raise_on_http_errors and not (200 <= status < 300):
raise ConnectionError('error in HTTP request to (status: %s)' % status)
return response, status, data, auth_header
def urlopen(url, data=None):
response, status, data, auth_header = _http_request(
url, 'GET' if data is None else 'POST',
body=data,
raise_on_http_errors=True)
return StringIO(data)
def http_get_page(url, **kwargs):
return _http_request(url, **kwargs)

View File

@ -15,10 +15,9 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import re
import urllib2
import urllib
from qommon import errors
from qommon import errors, misc
from qommon import get_cfg, get_logger
from qommon.form import StringWidget, PasswordWidget
from wcs.wscalls import call_webservice
@ -52,9 +51,9 @@ class MobytSMS(object):
'qty': quality
})
try:
r = urllib2.urlopen("http://multilevel.mobyt.fr/sms/batch.php", params)
except Exception, e:
raise errors.SMSError("urlopen mobyt.fr failed (%s)" % e)
r = misc.urlopen("http://multilevel.mobyt.fr/sms/batch.php", params)
except misc.ConnectionError as e:
raise errors.SMSError("failed to POST to mobyt.fr (%s)" % e)
answer = r.read()
r.close()
if answer[:2] == "KO":
@ -67,9 +66,9 @@ class MobytSMS(object):
'type': type
})
try:
r = urllib2.urlopen("http://multilevel.mobyt.fr/sms/credit.php", params)
except Exception, e:
raise errors.SMSError("urlopen mobyt.fr failed (%s)" % e)
r = misc.urlopen("http://multilevel.mobyt.fr/sms/credit.php", params)
except misc.ConnectionError as e:
raise errors.SMSError("failed to POST to mobyt.fr (%s)" % e)
answer = r.read()
r.close()
if answer[:2] == "KO":
@ -134,10 +133,10 @@ class OxydSMS(object):
'flash': '0'
})
try:
r = urllib2.urlopen('http://sms.oxyd.fr/send.php', params)
except Exception, e:
r = misc.urlopen('http://sms.oxyd.fr/send.php', params)
except misc.ConnectionError as e:
# XXX: add proper handling of errors
raise errors.SMSError('urlopen oxyd.fr failed (%s)' % e)
raise errors.SMSError("failed to POST to oxyd.fr (%s)" % e)
r.close()
def get_sms_left(self, type='standard'):
@ -185,10 +184,10 @@ class ChoositSMS(object):
'content': text[:160],
})
try:
r = urllib2.urlopen('http://sms.choosit.com/webservice', params)
except Exception, e:
r = misc.urlopen('http://sms.choosit.com/webservice', params)
except misc.ConnectionError as e:
# XXX: add proper handling of errors
raise errors.SMSError('urlopen choosit.com failed (%s)' % e)
raise errors.SMSError("failed to POST to choosit.com (%s)" % e)
r.close()
def get_sms_left(self, type='standard'):