tests: use more responses module to mock HTTP responses (#68527)

Replace every mock.patch('http_related') pattern.
This commit is contained in:
Emmanuel Cazenave 2022-08-30 17:05:39 +02:00 committed by Frédéric Péters
parent 74f0c9e8f0
commit 5522a3f7b3
16 changed files with 518 additions and 551 deletions

View File

@ -3,9 +3,9 @@ import os
import re
import time
import xml.etree.ElementTree as ET
from unittest import mock
import pytest
import responses
from webtest import Upload
from wcs import fields
@ -1354,14 +1354,15 @@ def test_form_import_from_url(pub):
resp = resp.form.submit()
assert 'You have to enter a file or a URL' in resp
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = ConnectionError('...')
with responses.RequestsMock() as rsps:
rsps.get('http://remote.invalid/test.wcs', body=ConnectionError('...'))
resp.form['url'] = 'http://remote.invalid/test.wcs'
resp = resp.form.submit()
assert 'Error loading form' in resp
urlopen.side_effect = lambda *args: io.StringIO(formdef_xml.decode())
rsps.get('http://remote.invalid/test.wcs', body=formdef_xml.decode())
resp.form['url'] = 'http://remote.invalid/test.wcs'
resp = resp.form.submit()
assert FormDef.count() == 1

View File

@ -8,9 +8,8 @@ try:
except ImportError:
lasso = None
from unittest import mock
import pytest
import responses
from quixote.http_request import Upload as QuixoteUpload
from webtest import Upload
@ -620,20 +619,20 @@ def test_settings_idp(pub):
resp = resp.forms[0].submit() # confirm delete
assert len(pub.cfg['idp']) == 0
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
idp_metadata_filename = os.path.join(os.path.dirname(__file__), '..', 'idp_metadata.xml')
# pylint: disable=consider-using-with
urlopen.side_effect = lambda *args: open(idp_metadata_filename, 'rb')
with open(idp_metadata_filename, 'rb') as body:
rsps.get('http://authentic.example.net/idp/saml2/metadata', body=body.read())
resp = app.get('/backoffice/settings/identification/idp/idp/')
resp = resp.click('Create new from remote URL')
resp.form['metadata_url'] = 'http://authentic.example.net/idp/saml2/metadata'
resp = resp.form.submit('submit')
resp = resp.follow()
assert 'http://authentic.example.net/idp/saml2/metadata' in resp.text
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
resp = resp.click('View')
resp = resp.click('Update from remote URL')
assert urlopen.call_count == 2
assert len(rsps.calls) == 2
def test_settings_auth_password(pub):

View File

@ -2,9 +2,9 @@ import io
import os
import re
import xml.etree.ElementTree as ET
from unittest import mock
import pytest
import responses
from quixote.http_request import Upload as QuixoteUpload
from webtest import Radio, Upload
@ -710,12 +710,12 @@ def test_workflows_import_from_url(pub):
resp = resp.form.submit()
assert 'You have to enter a file or a URL' in resp
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = ConnectionError('...')
with responses.RequestsMock() as rsps:
rsps.get('http://remote.invalid/test.wcs', body=ConnectionError('...'))
resp.form['url'] = 'http://remote.invalid/test.wcs'
resp = resp.form.submit()
assert 'Error loading form' in resp
urlopen.side_effect = lambda *args: io.StringIO(wf_export.decode())
rsps.get('http://remote.invalid/test.wcs', body=wf_export.decode())
resp.form['url'] = 'http://remote.invalid/test.wcs'
resp = resp.form.submit()
assert Workflow.count() == 2

View File

@ -1,9 +1,8 @@
import io
import json
import os
from unittest import mock
import pytest
import responses
from quixote import get_publisher
from wcs.api_utils import sign_url
@ -131,14 +130,14 @@ def test_validate_condition(pub):
def test_reverse_geocoding(pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'address': 'xxx'}))
with responses.RequestsMock() as rsps:
rsps.get('https://nominatim.entrouvert.org/reverse', json={'address': 'xxx'})
get_app(pub).get('/api/reverse-geocoding', status=400)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert resp.content_type == 'application/json'
assert resp.text == json.dumps({'address': 'xxx'})
assert (
urlopen.call_args[0][0]
rsps.calls[-1].request.url
== 'https://nominatim.entrouvert.org/reverse?zoom=18&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
)
@ -148,7 +147,7 @@ def test_reverse_geocoding(pub):
pub.site_options.write(fd)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert (
urlopen.call_args[0][0]
rsps.calls[-1].request.url
== 'https://nominatim.entrouvert.org/reverse?zoom=16&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
)
@ -157,31 +156,32 @@ def test_reverse_geocoding(pub):
pub.site_options.write(fd)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert (
urlopen.call_args[0][0]
rsps.calls[-1].request.url
== 'https://nominatim.entrouvert.org/reverse?zoom=16&key=KEY&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
)
pub.site_options.set(
'options', 'reverse_geocoding_service_url', 'http://reverse.example.net/?param=value'
)
rsps.get('http://reverse.example.net/', json={'address': 'xxx'})
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert (
urlopen.call_args[0][0]
rsps.calls[-1].request.url
== 'http://reverse.example.net/?param=value&format=json&addressdetails=1&lat=0&lon=0&accept-language=en'
)
def test_geocoding(pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(json.dumps([{'lat': 0, 'lon': 0}]))
with responses.RequestsMock() as rsps:
rsps.get('https://nominatim.entrouvert.org/search', json=[{'lat': 0, 'lon': 0}])
get_app(pub).get('/api/geocoding', status=400)
resp = get_app(pub).get('/api/geocoding?q=test')
assert resp.content_type == 'application/json'
assert resp.text == json.dumps([{'lat': 0, 'lon': 0}])
assert (
urlopen.call_args[0][0]
rsps.calls[-1].request.url
== 'https://nominatim.entrouvert.org/search?format=json&q=test&accept-language=en'
)
@ -191,7 +191,7 @@ def test_geocoding(pub):
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/geocoding?q=test')
assert urlopen.call_args[0][0] == (
assert rsps.calls[-1].request.url == (
'https://nominatim.entrouvert.org/search?viewbox=2.34,1.23,3.45,2.34&bounded=1&'
'format=json&q=test&accept-language=en'
)
@ -203,7 +203,7 @@ def test_geocoding(pub):
pub.site_options.write(fd)
resp = get_app(pub).get('/api/geocoding?q=test')
assert (
urlopen.call_args[0][0]
rsps.calls[-1].request.url
== 'https://nominatim.entrouvert.org/search?key=KEY&format=json&q=test&accept-language=en'
)
@ -212,7 +212,7 @@ def test_geocoding(pub):
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/geocoding?q=test')
assert urlopen.call_args[0][0] == (
assert rsps.calls[-1].request.url == (
'https://nominatim.entrouvert.org/search?key=KEY&viewbox=2.34,1.23,3.45,2.34&bounded=1&'
'format=json&q=test&accept-language=en'
)
@ -220,9 +220,10 @@ def test_geocoding(pub):
pub.site_options.set('options', 'geocoding_service_url', 'http://reverse.example.net/?param=value')
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
rsps.get('http://reverse.example.net/', json=[{'lat': 0, 'lon': 0}])
resp = get_app(pub).get('/api/geocoding?q=test')
assert (
urlopen.call_args[0][0]
rsps.calls[-1].request.url
== 'http://reverse.example.net/?param=value&format=json&q=test&accept-language=en'
)

View File

@ -1,12 +1,11 @@
import base64
import io
import json
import os
import time
import urllib.parse
from unittest import mock
import pytest
import responses
from django.utils.encoding import force_text
from quixote import get_publisher
@ -1156,12 +1155,16 @@ def test_formdef_submit_structured(pub, local_user):
)
url = signed_url[len('http://example.net') :]
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(
'''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}'''
with responses.RequestsMock() as rsps:
rsps.get(
'http://datasource.com',
json={
"data": [
{"id": 0, "text": "zéro", "foo": "bar"},
{"id": 1, "text": "uné", "foo": "bar1"},
{"id": 2, "text": "deux", "foo": "bar2"},
]
},
)
resp = get_app(pub).post_json(url, {'data': post_data})

View File

@ -1,14 +1,13 @@
import base64
import datetime
import io
import json
import os
import time
import urllib.parse
from functools import partial
from unittest import mock
import pytest
import responses
from django.utils.encoding import force_text
from quixote import get_publisher
@ -445,12 +444,16 @@ def test_formdef_schema(pub, access):
formdef.tracking_code_verify_fields = ['0']
formdef.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(
'''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}'''
with responses.RequestsMock() as rsps:
rsps.get(
'http://datasource.com',
json={
"data": [
{"id": 0, "text": "zéro", "foo": "bar"},
{"id": 1, "text": "uné", "foo": "bar1"},
{"id": 2, "text": "deux", "foo": "bar2"},
]
},
)
# fails for unauthenticated users
get_app(pub).get('/api/formdefs/test/schema', status=403)
@ -944,13 +947,16 @@ def test_formdef_submit_structured(pub, local_user):
)
return signed_url[len('http://example.net') :]
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(
'''\
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
{"id": 1, "text": "uné", "foo": "bar1"}, \
{"id": 2, "text": "deux", "foo": "bar2"}]}'''
)
with responses.RequestsMock() as rsps:
json_data = {
"data": [
{"id": 0, "text": "zéro", "foo": "bar"},
{"id": 1, "text": "uné", "foo": "bar1"},
{"id": 2, "text": "deux", "foo": "bar2"},
]
}
rsps.get('http://datasource.com', json=json_data)
rsps.get('http://datasource.com/bar', json=json_data)
resp = get_app(pub).post_json(
url(),
{
@ -961,9 +967,9 @@ def test_formdef_submit_structured(pub, local_user):
}
},
)
assert len(urlopen.mock_calls) == 2
assert urlopen.call_args_list[0][0] == ('http://datasource.com',)
assert urlopen.call_args_list[1][0] == ('http://datasource.com/bar',)
assert len(rsps.calls) == 2
assert rsps.calls[0].request.url == 'http://datasource.com/'
assert rsps.calls[1].request.url == 'http://datasource.com/bar'
formdata = data_class.get(resp.json['data']['id'])
assert formdata.status == 'wf-new'

View File

@ -1,14 +1,13 @@
import datetime
import io
import json
import os
import random
import re
import time
import zipfile
from unittest import mock
import pytest
import responses
import wcs.qommon.storage as st
from wcs import fields
@ -1745,22 +1744,21 @@ def test_backoffice_wfedit_and_data_source_with_user_info(pub):
app = login(get_app(pub))
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': 'A', 'text': 'hello'}, {'id': 'B', 'text': 'world'}]}
def side_effect(url, *args):
assert '?name_id=admin' in url
return io.StringIO(json.dumps(data))
urlopen.side_effect = side_effect
with responses.RequestsMock() as rsps:
rsps.get(
'https://www.example.invalid/',
json={'data': [{'id': 'A', 'text': 'hello'}, {'id': 'B', 'text': 'world'}]},
)
resp = app.get('/backoffice/management/form-title/%s/' % number31.id)
resp = resp.form.submit('button_wfedit')
resp = resp.follow()
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
assert '?name_id=admin' in rsps.calls[-1].request.url
resp.form['f3'].value = 'A'
resp = resp.form.submit('submit')
assert urlopen.call_count == 2
assert len(rsps.calls) == 2
assert '?name_id=admin' in rsps.calls[-1].request.url
resp = resp.follow()
@ -1796,22 +1794,21 @@ def test_backoffice_wfedit_and_workflow_data(pub):
app = login(get_app(pub))
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': 'A', 'text': 'hello'}, {'id': 'B', 'text': 'world'}]}
def side_effect(url, *args):
assert '?test=foobar' in url
return io.StringIO(json.dumps(data))
urlopen.side_effect = side_effect
with responses.RequestsMock() as rsps:
rsps.get(
'https://www.example.invalid/',
json={'data': [{'id': 'A', 'text': 'hello'}, {'id': 'B', 'text': 'world'}]},
)
resp = app.get('/backoffice/management/form-title/%s/' % number31.id)
resp = resp.form.submit('button_wfedit')
resp = resp.follow()
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
assert '?test=foobar' in rsps.calls[-1].request.url
resp.form['f3'].value = 'A'
resp = resp.form.submit('submit')
assert urlopen.call_count == 2
assert len(rsps.calls) == 2
assert '?test=foobar' in rsps.calls[-1].request.url
resp = resp.follow()
@ -1849,24 +1846,23 @@ def test_backoffice_wfedit_and_data_source_with_field_info(pub):
app = login(get_app(pub))
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': 'DD', 'text': 'DD'}, {'id': 'EE', 'text': 'EE'}]}
def side_effect(url, *args):
assert '?xxx=FOO BAR 30' in url
return io.StringIO(json.dumps(data))
urlopen.side_effect = side_effect
with responses.RequestsMock() as rsps:
rsps.get(
'https://www.example.invalid/',
json={'data': [{'id': 'DD', 'text': 'DD'}, {'id': 'EE', 'text': 'EE'}]},
)
resp = app.get('/backoffice/management/form-title/%s/' % number31.id)
resp = resp.form.submit('button_wfedit')
resp = resp.follow()
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
assert '?xxx=FOO%20BAR%2030' in rsps.calls[-1].request.url
assert 'invalid value' not in resp
assert resp.form['f3'].value == 'EE'
resp.form['f3'].value = 'DD'
resp = resp.form.submit('submit')
assert urlopen.call_count == 2
assert len(rsps.calls) == 2
assert '?xxx=FOO%20BAR%2030' in rsps.calls[-1].request.url
resp = resp.follow()
@ -3252,17 +3248,15 @@ def test_backoffice_workflow_form_with_live_data_source(pub):
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data1 = {'data': [{'id': 'A', 'text': 'hello'}, {'id': 'B', 'text': 'world'}]}
data2 = {'data': [{'id': 'C', 'text': 'hello'}, {'id': 'D', 'text': 'world'}]}
def side_effect(url, *args):
if 'toto' not in url:
return io.StringIO(json.dumps(data1))
else:
return io.StringIO(json.dumps(data2))
urlopen.side_effect = side_effect
with responses.RequestsMock() as rsps:
rsps.get(
'https://www.example.invalid/',
json={'data': [{'id': 'A', 'text': 'hello'}, {'id': 'B', 'text': 'world'}]},
)
rsps.get(
'https://www.example.invalid/toto',
json={'data': [{'id': 'C', 'text': 'hello'}, {'id': 'D', 'text': 'world'}]},
)
resp = login(app).get(formdata.get_url(backoffice=True))
assert 'fblah_1' in resp.form.fields

View File

@ -11,6 +11,7 @@ import zipfile
from unittest import mock
import pytest
import responses
from django.utils.encoding import force_bytes, force_text
from webtest import Hidden, Radio, Upload
@ -2898,14 +2899,15 @@ def test_form_edit_autocomplete_list(pub):
app = get_app(pub)
login(app, username='foo', password='foo')
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {
'data': [
{'id': '1', 'text': 'hello', 'extra': 'foo'},
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
resp = app.get('/test/')
assert 'data-select2-url=' in resp.text
# simulate select2 mode, with qommon.forms.js adding an extra hidden widget
@ -3058,24 +3060,26 @@ def test_form_item_data_source_field_submit(pub):
'value': 'http://www.example.net/plop',
}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://www.example.net/plop',
json={'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]},
)
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
# numeric identifiers
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': 1, 'text': 'un'}, {'id': 2, 'text': 'deux'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://www.example.net/plop', json={'data': [{'id': 1, 'text': 'un'}, {'id': 2, 'text': 'deux'}]}
)
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
@pytest.mark.parametrize('fail_after_count_page', range(2, 8))
@pytest.mark.parametrize('fail_after_count_validation', range(0, 2))
@mock.patch('wcs.qommon.misc.urlopen')
def test_form_item_data_source_error(
urlopen, pub, monkeypatch, fail_after_count_page, fail_after_count_validation
):
@responses.activate
def test_form_item_data_source_error(pub, monkeypatch, fail_after_count_page, fail_after_count_validation):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'}
data_source.id_parameter = 'id'
@ -3106,8 +3110,9 @@ def test_form_item_data_source_error(
return f
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
responses.get(
'http://www.example.net/plop', json={'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
)
formdef = create_formdef()
formdef.fields = [
@ -3153,8 +3158,9 @@ def test_form_item_data_source_error(
@pytest.mark.parametrize('fail_after_count_page', range(2, 8))
@mock.patch('wcs.qommon.misc.urlopen')
def test_form_item_data_source_error_no_confirmation(urlopen, pub, monkeypatch, fail_after_count_page):
@responses.activate
def test_form_item_data_source_error_no_confirmation(pub, monkeypatch, fail_after_count_page):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'}
data_source.id_parameter = 'id'
@ -3185,8 +3191,9 @@ def test_form_item_data_source_error_no_confirmation(urlopen, pub, monkeypatch,
return f
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
responses.get(
'http://www.example.net/plop', json={'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
)
formdef = create_formdef()
formdef.confirmation = False
@ -3601,12 +3608,9 @@ def test_form_page_formula_prefill_items_field(pub):
)
]
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
assert not resp.form['f0$element1'].checked
assert resp.form['f0$element2'].checked
resp = get_app(pub).get('/test/')
assert not resp.form['f0$element1'].checked
assert resp.form['f0$element2'].checked
def test_form_page_checkbox_prefill(pub):
@ -3696,15 +3700,18 @@ def test_form_page_template_prefill_items_field(pub):
)
formdef.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {
'data': [
{'id': 'foo', 'text': 'hello'},
{'id': 'bar', 'text': 'world'},
{'id': 'baz', 'text': '!'},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://remote.example.net/json',
json={
'data': [
{'id': 'foo', 'text': 'hello'},
{'id': 'bar', 'text': 'world'},
{'id': 'baz', 'text': '!'},
]
},
)
resp = get_app(pub).get('/test/')
assert resp.form['f0$elementfoo'].checked
assert not resp.form['f0$elementbar'].checked
@ -4261,16 +4268,11 @@ def test_form_map_field_prefill_address(pub):
resp = get_app(pub).get('/test/')
formdef.data_class().wipe()
resp.form['f1'] = '169 rue du chateau, paris'
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (
None,
200,
json.dumps([{'lat': '48.8337085', 'lon': '2.3233693'}]),
None,
)
with responses.RequestsMock() as rsps:
rsps.get('https://nominatim.entrouvert.org/search', json=[{'lat': '48.8337085', 'lon': '2.3233693'}])
resp = resp.form.submit('submit')
assert resp.form['f3$latlng'].value == '48.8337085;2.3233693'
assert 'chateau' in http_get_page.call_args[0][0]
assert 'chateau' in rsps.calls[0].request.url
def test_form_map_field_prefill_coords(pub):
@ -4903,8 +4905,8 @@ def test_form_autosave_with_invalid_data(pub):
assert resp.forms[1]['f1'].value == 'foobar' # not a valid email
@mock.patch('wcs.qommon.misc.urlopen')
def test_form_autosave_item_field_data_source_error(urlopen, pub):
@responses.activate
def test_form_autosave_item_field_data_source_error(pub):
ds = {'type': 'json', 'value': 'http://www.example.net/plop'}
formdef = create_formdef()
formdef.fields = [
@ -4913,8 +4915,9 @@ def test_form_autosave_item_field_data_source_error(urlopen, pub):
formdef.enable_tracking_codes = True
formdef.store()
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
responses.get(
'http://www.example.net/plop', json={'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
)
formdef.data_class().wipe()
app = get_app(pub)
@ -6255,9 +6258,11 @@ def test_item_field_with_disabled_items(http_requests, pub):
]
formdef.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://remote.example.net/json',
json={'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]},
)
resp = get_app(pub).get('/test/')
resp.form['f0'] = '1'
resp.form['f0'] = '2'
@ -6268,9 +6273,11 @@ def test_item_field_with_disabled_items(http_requests, pub):
formdef.data_class().wipe()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://remote.example.net/json',
json={'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]},
)
resp = get_app(pub).get('/test/')
pq = resp.pyquery.remove_namespaces()
assert pq('option[disabled=disabled][value="1"]').text() == 'hello'
@ -6294,9 +6301,11 @@ def test_item_field_with_disabled_items(http_requests, pub):
]
formdef.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://remote.example.net/json',
json={'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]},
)
resp = get_app(pub).get('/test/')
pq = resp.pyquery.remove_namespaces()
assert len(pq('option[disabled=disabled][value="1"]')) == 0
@ -6319,9 +6328,11 @@ def test_item_field_with_disabled_items(http_requests, pub):
]
formdef.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://remote.example.net/json',
json={'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]},
)
resp = get_app(pub).get('/test/')
resp.form['f0'] = '1'
resp.form['f0'] = '2'
@ -6332,9 +6343,11 @@ def test_item_field_with_disabled_items(http_requests, pub):
formdef.data_class().wipe()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://remote.example.net/json',
json={'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]},
)
resp = get_app(pub).get('/test/')
pq = resp.pyquery.remove_namespaces()
assert len(pq('input[name="f0"][disabled=disabled][value="1"]')) == 1
@ -6363,9 +6376,11 @@ def test_items_field_with_disabled_items(http_requests, pub):
]
formdef.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://remote.example.net/json',
json={'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]},
)
resp = get_app(pub).get('/test/')
resp.form['f0$element1'].checked = True
resp.form['f0$element2'].checked = True
@ -6376,9 +6391,11 @@ def test_items_field_with_disabled_items(http_requests, pub):
formdef.data_class().wipe()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://remote.example.net/json',
json={'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]},
)
resp = get_app(pub).get('/test/')
assert 'disabled' in resp.form['f0$element1'].attrs
resp.form['f0$element1'].checked = True
@ -6394,9 +6411,11 @@ def test_items_field_with_disabled_items(http_requests, pub):
]
formdef.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
rsps.get(
'http://remote.example.net/json',
json={'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]},
)
resp = get_app(pub).get('/test/')
assert 'f0$element1' not in resp.form.fields
resp.form['f0$element2'].checked = True
@ -6427,14 +6446,14 @@ def test_item_field_autocomplete_json_source(http_requests, pub, error_email, em
]
formdef.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {
'data': [
{'id': '1', 'text': 'hello', 'extra': 'foo'},
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
resp = get_app(pub).get('/test/')
assert 'data-autocomplete="true"' in resp.text
assert resp.form['f0'].value == '1'
@ -6448,14 +6467,14 @@ def test_item_field_autocomplete_json_source(http_requests, pub, error_email, em
# check hint is displayed within
formdef.fields[0].hint = 'help text'
formdef.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {
'data': [
{'id': '1', 'text': 'hello', 'extra': 'foo'},
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
resp = get_app(pub).get('/test/')
assert 'data-autocomplete="true"' in resp.text
assert 'data-hint="help text"' in resp.text
@ -6472,25 +6491,24 @@ def test_item_field_autocomplete_json_source(http_requests, pub, error_email, em
formdef.data_class().wipe()
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {
'data': [
{'id': '1', 'text': 'hello', 'extra': 'foo'},
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
resp = app.get('/test/')
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
pq = resp.pyquery.remove_namespaces()
select2_url = pq('select').attr['data-select2-url']
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
resp2 = app.get(select2_url + '?q=hell')
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?q=hell'
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url == 'http://remote.example.net/json?q=hell'
assert resp2.json == data
# check unauthorized access
@ -6500,17 +6518,17 @@ def test_item_field_autocomplete_json_source(http_requests, pub, error_email, em
formdef.data_class().wipe()
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = ConnectionError('...')
with responses.RequestsMock() as rsps:
rsps.get('http://remote.example.net/json', body=ConnectionError('...'))
resp = app.get('/test/')
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
pq = resp.pyquery.remove_namespaces()
select2_url = pq('select').attr['data-select2-url']
assert emails.count() == 0
resp2 = app.get(select2_url + '?q=hell')
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?q=hell'
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url == 'http://remote.example.net/json?q=hell'
assert resp2.json == {'data': [], 'err': '1'}
assert emails.count() == 0
@ -6533,21 +6551,21 @@ def test_item_field_autocomplete_json_source(http_requests, pub, error_email, em
resp.form['f0'].force_value('1')
resp.form.fields['f0_display'].force_value('hello')
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
resp = resp.form.submit('submit') # -> validation page
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?id=1'
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url == 'http://remote.example.net/json?id=1'
assert resp.form['f0'].value == '1'
assert resp.form['f0_label'].value == 'hello'
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
resp = resp.form.submit('submit') # -> submit
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?id=1'
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url == 'http://remote.example.net/json?id=1'
assert formdef.data_class().select()[0].data['0'] == '1'
assert formdef.data_class().select()[0].data['0_display'] == 'hello'
assert formdef.data_class().select()[0].data['0_structured'] == data['data'][0]
@ -6558,22 +6576,18 @@ def test_item_field_autocomplete_json_source(http_requests, pub, error_email, em
data_source.store()
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {
'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}, {'id': 2, 'text': 'world', 'extra': 'bar'}]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
resp = app.get('/test/')
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
pq = resp.pyquery.remove_namespaces()
select2_url = pq('select').attr['data-select2-url']
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json-numeric-id', json=data)
resp2 = app.get(select2_url + '?q=hell')
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?q=hell'
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url == 'http://remote.example.net/json-numeric-id?q=hell'
assert resp2.json == data
# check unauthorized access
@ -6584,21 +6598,21 @@ def test_item_field_autocomplete_json_source(http_requests, pub, error_email, em
resp.form['f0'].force_value('1')
resp.form.fields['f0_display'].force_value('hello')
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json-numeric-id', json=data)
resp = resp.form.submit('submit') # -> validation page
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?id=1'
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url == 'http://remote.example.net/json-numeric-id?id=1'
assert resp.form['f0'].value == '1'
assert resp.form['f0_label'].value == 'hello'
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json-numeric-id', json=data)
resp = resp.form.submit('submit') # -> submit
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?id=1'
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url == 'http://remote.example.net/json-numeric-id?id=1'
assert formdef.data_class().select()[0].data['0'] == '1'
assert formdef.data_class().select()[0].data['0_display'] == 'hello'
assert formdef.data_class().select()[0].data['0_structured'] == data['data'][0]
@ -6616,25 +6630,20 @@ remote.example.net = 1234
)
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {
'data': [
{'id': '1', 'text': 'hello', 'extra': 'foo'},
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
resp = app.get('/test/')
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
pq = resp.pyquery.remove_namespaces()
select2_url = pq('select').attr['data-select2-url']
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
resp2 = app.get(select2_url + '?q=hell')
assert urlopen.call_count == 1
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?q=hell&orig=example.net&')
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url.startswith(
'http://remote.example.net/json?q=hell&orig=example.net&'
)
assert resp2.json == data
# simulate select2 mode, with qommon.forms.js adding an extra hidden widget
@ -6642,21 +6651,21 @@ remote.example.net = 1234
resp.form['f0'].force_value('1')
resp.form.fields['f0_display'].force_value('hello')
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
resp = resp.form.submit('submit') # -> validation page
assert urlopen.call_count == 1
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?id=1&orig=example.net&')
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url.startswith('http://remote.example.net/json?id=1&orig=example.net&')
assert resp.form['f0'].value == '1'
assert resp.form['f0_label'].value == 'hello'
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
resp = resp.form.submit('submit') # -> submit
assert urlopen.call_count == 1
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?id=1&orig=example.net&')
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url.startswith('http://remote.example.net/json?id=1&orig=example.net&')
assert formdef.data_class().select()[0].data['0'] == '1'
assert formdef.data_class().select()[0].data['0_display'] == 'hello'
assert formdef.data_class().select()[0].data['0_structured'] == data['data'][0]
@ -6667,17 +6676,18 @@ remote.example.net = 1234
formdef.store()
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {
'data': [
{'id': '1', 'text': 'hello', 'extra': 'foo'},
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
resp = app.get('/test/')
pq = resp.pyquery.remove_namespaces()
select2_url = pq('select').attr['data-select2-url']
rsps.reset()
resp = resp.form.submit('submit') # -> validation page
assert resp.form['f0'].value == ''
@ -6691,24 +6701,15 @@ remote.example.net = 1234
data_source.store()
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {
'data': [
{'id': '1', 'text': 'hello', 'extra': 'foo'},
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
resp = app.get('/test/')
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
pq = resp.pyquery.remove_namespaces()
select2_url = pq('select').attr['data-select2-url']
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
with responses.RequestsMock() as rsps:
resp2 = app.get(select2_url + '?q=hell', status=403)
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
def test_item_field_autocomplete_jsonp_source(http_requests, pub):
@ -6733,9 +6734,8 @@ def test_item_field_autocomplete_jsonp_source(http_requests, pub):
formdef.store()
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
resp = app.get('/test/')
assert urlopen.call_count == 0
pq = resp.pyquery.remove_namespaces()
select2_url = pq('select').attr['data-select2-url']
assert select2_url == 'http://remote.example.net/jsonp'
@ -6746,15 +6746,15 @@ def test_item_field_autocomplete_jsonp_source(http_requests, pub):
resp.form['f0'].force_value('1')
resp.form['f0_display'].force_value('hello')
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
resp = resp.form.submit('submit') # -> validation page
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
assert resp.form['f0'].value == '1'
assert resp.form['f0_label'].value == 'hello'
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
resp = resp.form.submit('submit') # -> submit
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
assert formdef.data_class().select()[0].data['0'] == '1'
assert formdef.data_class().select()[0].data['0_display'] == 'hello'
# no _structured data for pure jsonp sources
@ -6831,9 +6831,9 @@ def test_item_field_autocomplete_ezt_variable_jsonp(http_requests, pub):
formdef.store()
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
resp = app.get('/test/')
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
pq = resp.pyquery.remove_namespaces()
select2_url = pq('select').attr['data-select2-url']
assert select2_url == 'http://example.net/foo-jsonp'
@ -6844,9 +6844,9 @@ def test_item_field_autocomplete_ezt_variable_jsonp(http_requests, pub):
resp.form['f1'].force_value('1')
resp.form['f1_display'].force_value('hello')
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
resp = resp.form.submit('submit') # -> 2nd page
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
assert resp.pyquery('select').attr['data-select2-url'] == 'http://example.net/foo-jsonp?a=1'
# simulate select2 mode, with qommon.forms.js adding an extra hidden widget
@ -6855,10 +6855,10 @@ def test_item_field_autocomplete_ezt_variable_jsonp(http_requests, pub):
resp.form['f3'].force_value('2')
resp.form['f3_display'].force_value('hello2')
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
resp = resp.form.submit('submit') # -> validation
resp = resp.form.submit('submit') # -> submit
assert urlopen.call_count == 0
assert len(rsps.calls) == 0
assert formdef.data_class().select()[0].data['1'] == '1'
assert formdef.data_class().select()[0].data['1_display'] == 'hello'
assert formdef.data_class().select()[0].data['3'] == '2'
@ -8306,25 +8306,25 @@ def test_frontoffice_workflow_form_with_dynamic_list(pub):
app = login(get_app(pub), username='foo', password='foo')
resp = app.get(formdata.get_url(backoffice=False))
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
def side_effect(url, *args):
print('url:', url)
if url == 'http://example.org/10':
data = [
with responses.RequestsMock() as rsps:
rsps.get(
'http://example.org/10',
json={
'data': [
{'id': '1', 'text': 'hello', 'extra': 'foo'},
{'id': '2', 'text': 'world', 'extra': 'bar'},
]
elif url == 'http://example.org/20':
data = [
},
)
rsps.get(
'http://example.org/20',
json={
'data': [
{'id': '11', 'text': 'hello2', 'extra': 'foo'},
{'id': '21', 'text': 'world2', 'extra': 'bar'},
]
else:
data = []
return io.StringIO(json.dumps({'data': data}))
urlopen.side_effect = side_effect
},
)
live_url = resp.html.find('form').attrs['data-live-url']
live_resp = app.post(live_url + '?modified_field_id=init', params=resp.form.submit_fields())
@ -8530,16 +8530,17 @@ def test_frontoffice_workflow_form_with_attachment_and_python_datasource(pub):
resp = resp.form.submit('submit')
@mock.patch('wcs.qommon.misc.urlopen')
def test_frontoffice_workflow_form_with_disappearing_option(urlopen, pub, monkeypatch):
@responses.activate
def test_frontoffice_workflow_form_with_disappearing_option(pub, monkeypatch):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'}
data_source.id_parameter = 'id'
data_source.store()
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
responses.get(
'http://www.example.net/plop', json={'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
)
user = create_user(pub)
wf = Workflow(name='select')
@ -8875,9 +8876,9 @@ def test_create_formdata_empty_item_ds_with_id_parameter(pub, create_formdata):
create_formdata['target_formdef'].fields[2].data_source = {'type': 'foobar'}
create_formdata['target_formdef'].store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': create_formdata['data']}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/json', json=data)
app = get_app(create_formdata['pub'])
resp = app.get('/source-form/')
@ -9260,7 +9261,7 @@ def test_form_item_timetable_data_source(pub, http_requests):
formdef.data_class().wipe()
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {
"data": [
{"id": "1", "datetime": "2021-01-12 10:00:00", "text": "event 1"},
@ -9268,7 +9269,7 @@ def test_form_item_timetable_data_source(pub, http_requests):
{"id": "3", "datetime": "2021-01-14 10:40:00", "text": "event 3"},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/api/datetimes', json=data)
resp = app.get('/test/')
assert 'data-date="2021-01-12"' in resp and 'data-time="10:00"' in resp
@ -9314,7 +9315,7 @@ def test_form_item_timetable_data_source_with_date_alignment(pub, http_requests)
formdef.data_class().wipe()
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {
"data": [
{"id": "1", "datetime": "2021-01-12 10:00:00", "text": "event 1"},
@ -9322,7 +9323,7 @@ def test_form_item_timetable_data_source_with_date_alignment(pub, http_requests)
{"id": "3", "datetime": "2021-01-14 10:40:00", "text": "event 3"},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/api/datetimes', json=data)
resp = app.get('/test/')
resp.form['f2'] = '2021-01-14'

View File

@ -1,8 +1,5 @@
import io
import json
from unittest import mock
import pytest
import responses
from webtest import Upload
from wcs import fields
@ -1420,18 +1417,10 @@ def test_block_post_condition_on_2nd_page(pub):
resp = resp.follow()
@mock.patch('wcs.qommon.misc.urlopen')
def test_block_with_dynamic_item_field(mock_urlopen, pub):
def data_source(url):
url, query = url.split('?q=')
payload = []
if query == 'foo':
payload = [{'id': '1', 'text': 'foo'}]
elif query == 'bar':
payload = [{'id': '2', 'text': 'bar'}]
return io.StringIO(json.dumps({'data': payload}))
mock_urlopen.side_effect = data_source
@responses.activate
def test_block_with_dynamic_item_field(pub):
responses.get('http://whatever/data-source?q=foo', json={'data': [{'id': '1', 'text': 'foo'}]})
responses.get('http://whatever/data-source?q=bar', json={'data': [{'id': '2', 'text': 'bar'}]})
FormDef.wipe()
BlockDef.wipe()

View File

@ -1,9 +1,8 @@
import io
import os
import re
from unittest import mock
import pytest
import responses
from webtest import Upload
try:
@ -75,11 +74,11 @@ def test_form_file_field_with_fargo(pub, fargo_url):
assert fargo_resp.location == 'http://fargo.example.net/pick/?pick=http%3A//example.net/fargo/pick'
# check loading a random URL doesn't work
fargo_resp = app.get('/fargo/pick?url=http://www.example.org/whatever', status=403)
with mock.patch('wcs.portfolio.urlopen') as urlopen:
urlopen.side_effect = ConnectionError('plop')
with responses.RequestsMock() as rsps:
rsps.get('http://fargo.example.net/...', body=ConnectionError('plop'))
fargo_resp = app.get('/fargo/pick?url=http://fargo.example.net/...', status=404)
with mock.patch('wcs.portfolio.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.BytesIO(b'...')
with responses.RequestsMock() as rsps:
rsps.get('http://fargo.example.net/...', body=b'...')
fargo_resp = app.get('/fargo/pick?url=http://fargo.example.net/...')
assert 'window.top.document.fargo_set_token' in fargo_resp.text
resp.form['f0$file'] = None

View File

@ -8,6 +8,7 @@ import zipfile
from unittest import mock
import pytest
import responses
from django.utils.encoding import force_bytes
from quixote.http_request import Upload as QuixoteUpload
from webtest import Hidden, Upload
@ -812,10 +813,10 @@ def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(
resp = resp.follow()
assert 'The form has been recorded' in resp.text
with mock.patch('wcs.portfolio.http_post_request') as http_post_request:
http_post_request.return_value = None, 200, 'null', None
with responses.RequestsMock() as rsps:
rsps.post('http://fargo.example.net/api/documents/push/', body='null')
resp = resp.form.submit('button_export_to')
assert http_post_request.call_count == 1
assert len(rsps.calls) == 1
resp = resp.follow() # $form/$id/create_doc
resp = resp.follow() # $form/$id/create_doc/
@ -823,10 +824,12 @@ def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(
assert b'PDF' in resp.body
resp = login(get_app(pub), username='foo', password='foo').get(form_location)
with mock.patch('wcs.portfolio.http_post_request') as http_post_request:
http_post_request.return_value = None, 400, '{"code": "document-exists"}', None # fail
with responses.RequestsMock() as rsps:
rsps.post(
'http://fargo.example.net/api/documents/push/', status=400, json={"code": "document-exists"}
)
resp = resp.form.submit('button_export_to')
assert http_post_request.call_count == 1
assert len(rsps.calls) == 1
error = pub.loggederror_class.select()[0]
assert error.summary.startswith("file 'template.pdf' failed to be pushed to portfolio of 'Foo")
assert 'status: 400' in error.summary
@ -842,12 +845,12 @@ def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(
wf.store()
resp = login(get_app(pub), username='foo', password='foo').get(form_location)
with mock.patch('wcs.portfolio.http_post_request') as http_post_request:
http_post_request.return_value = None, 200, 'null', None
with responses.RequestsMock() as rsps:
rsps.post('http://fargo.example.net/api/documents/push/', body='null')
resp = resp.form.submit('button_export_to')
assert http_post_request.call_count == 1
assert http_post_request.call_args[0][0].startswith('http://fargo.example.net/api/documents/push/')
payload = json.loads(http_post_request.call_args[0][1])
assert len(rsps.calls) == 1
assert rsps.calls[0].request.url.startswith('http://fargo.example.net/api/documents/push/')
payload = json.loads(rsps.calls[0].request.body)
assert payload['file_name'] == 'template.pdf'
assert payload['user_email'] == 'foo@localhost'
assert payload['origin'] == 'example.net'
@ -1318,13 +1321,9 @@ def test_formdata_workflow_form_prefill_autocomplete(pub):
formdef.data_class().wipe()
resp = login(get_app(pub), username='foo', password='foo').get('/test/')
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
def side_effect(url, *args):
return io.StringIO(json.dumps(data))
urlopen.side_effect = side_effect
rsps.get('http://local-mock/test', json=data)
assert 'data-select2-url=' in resp
# simulate select2
@ -1544,14 +1543,14 @@ def test_formdata_named_wscall_in_comment(pub):
formdef.store()
formdef.data_class().wipe()
with mock.patch('wcs.qommon.misc._http_request') as mocked_http_request:
with responses.RequestsMock() as rsps:
data = {'foo': 'bar'}
mocked_http_request.return_value = (mock.Mock(headers={}), 200, json.dumps(data), 'headers')
rsps.get('http://remote.example.net/json', json=data, headers={'WWW-Authenticate': 'headers'})
resp = get_app(pub).get('/test/')
assert 'Hello XbarY.' in resp.text
with mock.patch('wcs.qommon.misc._http_request') as mocked_http_request:
mocked_http_request.side_effect = ConnectionError('...')
with responses.RequestsMock() as rsps:
rsps.get('http://remote.example.net/json', body=ConnectionError('...'))
resp = get_app(pub).get('/test/')
assert 'Hello XY.' in resp.text
if pub.loggederror_class:

View File

@ -1,10 +1,9 @@
import datetime
import io
import itertools
import json
from unittest import mock
import pytest
import responses
from webtest import Checkbox, Hidden
from wcs import fields
@ -767,7 +766,7 @@ def test_field_live_timetable_select(pub, http_requests):
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
data = {
"data": [
{"id": "1", "datetime": "2021-01-12 10:00:00", "text": "event 1", "api": {}},
@ -781,7 +780,7 @@ def test_field_live_timetable_select(pub, http_requests):
},
]
}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
rsps.get('http://remote.example.net/api/datetimes', json=data)
resp = app.get('/foo/')
assert resp.html.find('div', {'data-field-id': '2'}).attrs['data-live-source'] == 'true'
@ -1120,10 +1119,10 @@ def test_field_live_item_datasource_carddef_prefill(pub, http_requests):
assert live_resp.json['result']['2'] == {'visible': True}
@mock.patch('wcs.qommon.misc.urlopen')
def test_field_live_item_datasource_prefill_with_request(urlopen, pub):
@responses.activate
def test_field_live_item_datasource_prefill_with_request(pub):
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux', 'x': 'bye'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
responses.get('http://remote.example.net/plop', json=data)
ds = {'type': 'json', 'value': 'http://remote.example.net/plop'}
FormDef.wipe()
@ -1159,8 +1158,8 @@ def test_field_live_item_datasource_prefill_with_request(urlopen, pub):
assert live_resp.json['result'] == {'2': {'visible': True, 'content': '2'}}
@mock.patch('wcs.qommon.misc.urlopen')
def test_field_live_item_datasource_prefill_with_request_with_q(urlopen, pub):
@responses.activate
def test_field_live_item_datasource_prefill_with_request_with_q(pub):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {
@ -1172,7 +1171,7 @@ def test_field_live_item_datasource_prefill_with_request_with_q(urlopen, pub):
data_source.store()
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux', 'x': 'bye'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
responses.get('http://remote.example.net/json', json=data)
ds = {'type': 'foobar'}
FormDef.wipe()
@ -1207,9 +1206,9 @@ def test_field_live_item_datasource_prefill_with_request_with_q(urlopen, pub):
)
assert live_resp.json['result'] == {'2': {'visible': True, 'content': '2'}}
# check it has ?q=
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?plop=&q=deux'
assert responses.calls[-1].request.url == 'http://remote.example.net/json?plop=&q=deux'
urlopen.side_effect = misc.ConnectionError('...')
responses.get('http://remote.example.net/json', body=misc.ConnectionError('...'))
# no error
app.post(
'/foo/live?modified_field_id=init&prefilled_1=on&prefilled_2=on',
@ -1872,8 +1871,8 @@ def test_item_field_from_cards_check_lazy_live(pub):
assert 'live value: attr1' in resp
@mock.patch('wcs.qommon.misc.urlopen')
def test_field_live_condition_data_source_error(urlopen, pub):
@responses.activate
def test_field_live_condition_data_source_error(pub):
ds = {'type': 'json', 'value': 'http://www.example.net/plop'}
FormDef.wipe()
@ -1892,7 +1891,7 @@ def test_field_live_condition_data_source_error(urlopen, pub):
formdef.store()
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux', 'x': 'bye'}]}
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
responses.get('http://www.example.net/plop', json=data)
app = get_app(pub)
resp = app.get('/foo/')

View File

@ -1,10 +1,7 @@
import codecs
import io
import json
import os
import urllib.parse
import xml.etree.ElementTree as ET
from unittest import mock
import pytest
import responses
@ -556,11 +553,10 @@ def test_json_datasource_bad_qs_data(pub, error_email, emails, notify, record):
'notify_on_errors': notify,
'record_on_errors': record,
}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': [{'id': '1', 'text': 'foo'}]}))
with responses.RequestsMock() as rsps:
rsps.get('https://whatever.com/json', json={'data': [{'id': '1', 'text': 'foo'}]})
assert data_sources.get_items(datasource) == [('1', 'foo', '1', {'id': '1', 'text': 'foo'})]
url = urlopen.call_args[0][0]
assert url == 'https://whatever.com/json?bar='
assert rsps.calls[-1].request.url == 'https://whatever.com/json?bar='
message = '[DATASOURCE] Failed to compute value "{% for invalid %}" for "foo" query parameter'
if notify:
assert emails.count() == 1
@ -1097,9 +1093,10 @@ def test_data_source_unicode(pub):
data_source2 = NamedDataSource.select()[0]
assert data_source2.data_source == data_source.data_source
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(
'{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}'
with responses.RequestsMock() as rsps:
rsps.get(
'https://whatever.com/json',
json={"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]},
)
assert data_sources.get_items({'type': 'foobar'}) == [
('0', 'zéro', '0', {"id": 0, "text": "zéro"}),
@ -1116,10 +1113,10 @@ def test_data_source_signed(no_request_pub, qs_data):
data_source.qs_data = qs_data
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
with responses.RequestsMock() as rsps:
rsps.get('https://api.example.com/json', json={"data": [{"id": 0, "text": "zero"}]})
assert len(data_sources.get_items({'type': 'foobar'})) == 1
signed_url = urlopen.call_args[0][0]
signed_url = rsps.calls[-1].request.url
assert signed_url.startswith('https://api.example.com/json?')
parsed = urllib.parse.urlparse(signed_url)
querystring = urllib.parse.parse_qs(parsed.query)
@ -1135,10 +1132,10 @@ def test_data_source_signed(no_request_pub, qs_data):
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"}
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
with responses.RequestsMock() as rsps:
rsps.get('https://api.example.com/json', json={"data": [{"id": 0, "text": "zero"}]})
assert len(data_sources.get_items({'type': 'foobar'})) == 1
signed_url = urlopen.call_args[0][0]
signed_url = rsps.calls[-1].request.url
assert signed_url.startswith('https://api.example.com/json?')
parsed = urllib.parse.urlparse(signed_url)
querystring = urllib.parse.parse_qs(parsed.query)
@ -1154,10 +1151,10 @@ def test_data_source_signed(no_request_pub, qs_data):
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"}
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
with responses.RequestsMock() as rsps:
rsps.get('https://no-secret.example.com/json', json={"data": [{"id": 0, "text": "zero"}]})
assert len(data_sources.get_items({'type': 'foobar'})) == 1
unsigned_url = urlopen.call_args[0][0]
unsigned_url = rsps.calls[-1].request.url
if qs_data:
assert unsigned_url == 'https://no-secret.example.com/json?arg1=val1&arg2=val2'
else:
@ -1165,10 +1162,10 @@ def test_data_source_signed(no_request_pub, qs_data):
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json?foo=bar"}
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
with responses.RequestsMock() as rsps:
rsps.get('https://no-secret.example.com/json', json={"data": [{"id": 0, "text": "zero"}]})
assert len(data_sources.get_items({'type': 'foobar'})) == 1
unsigned_url = urlopen.call_args[0][0]
unsigned_url = rsps.calls[-1].request.url
if qs_data:
assert unsigned_url == 'https://no-secret.example.com/json?foo=bar&arg1=val1&arg2=val2'
else:
@ -1181,23 +1178,21 @@ def test_named_datasource_json_cache(requests_pub):
datasource.data_source = {'type': 'json', 'value': 'http://whatever/'}
datasource.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(
json.dumps({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]})
)
with responses.RequestsMock() as rsps:
rsps.get('http://whatever/', json={'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]})
assert data_sources.get_structured_items({'type': 'foobar'}) == [
{'id': '1', 'text': 'foo'},
{'id': '2', 'text': 'bar'},
]
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
get_request().datasources_cache = {}
assert data_sources.get_structured_items({'type': 'foobar'}) == [
{'id': '1', 'text': 'foo'},
{'id': '2', 'text': 'bar'},
]
assert urlopen.call_count == 2
assert len(rsps.calls) == 2
datasource.cache_duration = '60'
datasource.store()
@ -1208,7 +1203,7 @@ def test_named_datasource_json_cache(requests_pub):
{'id': '1', 'text': 'foo'},
{'id': '2', 'text': 'bar'},
]
assert urlopen.call_count == 3
assert len(rsps.calls) == 3
# will get from cache
get_request().datasources_cache = {}
@ -1216,7 +1211,7 @@ def test_named_datasource_json_cache(requests_pub):
{'id': '1', 'text': 'foo'},
{'id': '2', 'text': 'bar'},
]
assert urlopen.call_count == 3
assert len(rsps.calls) == 3
def test_named_datasource_id_parameter(requests_pub):
@ -1226,79 +1221,81 @@ def test_named_datasource_id_parameter(requests_pub):
datasource.id_parameter = 'id'
datasource.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
value = [{'id': '1', 'text': 'foo'}]
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
rsps.get('http://whatever/', json={'data': value})
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://whatever/?id=1'
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url == 'http://whatever/?id=1'
# try again, get from request.datasources_cache
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1 # no new call
assert len(rsps.calls) == 1 # no new call
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
rsps.get('http://whatever/', json={'data': value})
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': []})) # empty list
with responses.RequestsMock() as rsps:
rsps.get('http://whatever/', json={'data': []}) # empty list
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
value = [{'id': '1', 'text': 'foo'}]
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value, 'err': 0}))
rsps.get('http://whatever/', json={'data': value})
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
value = [{'id': '1', 'text': 'foo'}]
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value, 'err': 1}))
rsps.get('http://whatever/', json={'data': value, 'err': 1})
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
# no cache for errors
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 2 # called again
assert len(rsps.calls) == 2 # called again
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
value = {'id': '1', 'text': 'foo'} # not a list
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
rsps.get('http://whatever/', json={'data': value})
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO('not json')
with responses.RequestsMock() as rsps:
rsps.get('http://whatever/', body='not json')
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
# ws badly configured, return all items
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
rsps.get('http://whatever/', json={'data': value})
assert datasource.get_structured_value('2') == value[1]
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
# try again, get from request.datasources_cache
assert datasource.get_structured_value('2') == value[1]
assert urlopen.call_count == 1 # no new call
assert len(rsps.calls) == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
with responses.RequestsMock() as rsps:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
rsps.get('http://whatever/', json={'data': value})
assert datasource.get_structured_value('3') is None
assert urlopen.call_count == 1
assert len(rsps.calls) == 1
# try again, get from request.datasources_cache
assert datasource.get_structured_value('3') is None
assert urlopen.call_count == 1 # no new call
assert len(rsps.calls) == 1 # no new call
def test_named_datasource_in_formdef(pub):
@ -1343,16 +1340,19 @@ def test_data_source_in_template(pub):
data_source.store()
with pub.complex_data():
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: io.StringIO(
'{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}'
with responses.RequestsMock() as rsps:
rsps.get(
'https://example.invalid/json',
json={
"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]
},
)
assert (
WorkflowStatusItem.compute('{{ data_source.foobar|first|get:"text" }}', allow_complex=True)
== 'zéro'
)
assert urlopen.call_args[0][0] == 'https://example.invalid/json?t=hello'
assert rsps.calls[-1].request.url == 'https://example.invalid/json?t=hello'
def export_to_indented_xml(data_source, include_id=False):

View File

@ -1,8 +1,7 @@
import io
import json
from unittest import mock
import pytest
import responses
from wcs import fields
from wcs.data_sources import NamedDataSource, build_agenda_datasources, collect_agenda_data
@ -98,23 +97,26 @@ AGENDA_MEETING_TYPES_DATA = {
}
@mock.patch('wcs.qommon.misc.urlopen')
def test_collect_agenda_data(urlopen, pub, chrono_url):
@responses.activate
def test_collect_agenda_data(pub, chrono_url):
pub.load_site_options()
NamedDataSource.wipe()
urlopen.side_effect = lambda *args: io.StringIO('{"data": []}')
responses.get('http://chrono.example.net/api/agenda/', json={"data": []})
assert collect_agenda_data(pub) == []
assert urlopen.call_args_list == [mock.call('http://chrono.example.net/api/agenda/')]
assert len(responses.calls) == 1
assert responses.calls[-1].request.url == 'http://chrono.example.net/api/agenda/'
urlopen.side_effect = ConnectionError
urlopen.reset_mock()
responses.reset()
responses.get('http://chrono.example.net/api/agenda/', body=ConnectionError('...'))
assert collect_agenda_data(pub) is None
assert urlopen.call_args_list == [mock.call('http://chrono.example.net/api/agenda/')]
assert len(responses.calls) == 1
assert responses.calls[-1].request.url == 'http://chrono.example.net/api/agenda/'
# events agenda
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({"data": AGENDA_EVENTS_DATA}))
urlopen.reset_mock()
responses.reset()
responses.get('http://chrono.example.net/api/agenda/', json={"data": AGENDA_EVENTS_DATA})
assert collect_agenda_data(pub) == [
{
'slug': 'agenda-events-events-A',
@ -127,15 +129,21 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
'url': 'http://chrono.example.net/api/agenda/events-B/datetimes/',
},
]
assert urlopen.call_args_list == [mock.call('http://chrono.example.net/api/agenda/')]
assert len(responses.calls) == 1
assert responses.calls[-1].request.url == 'http://chrono.example.net/api/agenda/'
# meetings agenda
urlopen.side_effect = [
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['virtual-B']})),
]
urlopen.reset_mock()
responses.reset()
responses.get('http://chrono.example.net/api/agenda/', json={"data": AGENDA_MEETINGS_DATA})
responses.get(
'http://chrono.example.net/api/agenda/meetings-A/meetings/',
json={"data": AGENDA_MEETING_TYPES_DATA['meetings-A']},
)
responses.get(
'http://chrono.example.net/api/agenda/virtual-B/meetings/',
json={"data": AGENDA_MEETING_TYPES_DATA['virtual-B']},
)
assert collect_agenda_data(pub) == [
{
'slug': 'agenda-meetings-meetings-A-meetingtypes',
@ -163,36 +171,33 @@ def test_collect_agenda_data(urlopen, pub, chrono_url):
'url': 'http://chrono.example.net/api/agenda/virtual-B/meetings/mt-3/datetimes/',
},
]
assert urlopen.call_args_list == [
mock.call('http://chrono.example.net/api/agenda/'),
mock.call('http://chrono.example.net/api/agenda/meetings-A/meetings/'),
mock.call('http://chrono.example.net/api/agenda/virtual-B/meetings/'),
]
assert len(responses.calls) == 3
assert responses.calls[0].request.url == 'http://chrono.example.net/api/agenda/'
assert responses.calls[1].request.url == 'http://chrono.example.net/api/agenda/meetings-A/meetings/'
assert responses.calls[2].request.url == 'http://chrono.example.net/api/agenda/virtual-B/meetings/'
# if meeting types could not be collected
urlopen.side_effect = [
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
ConnectionError,
]
urlopen.reset_mock()
assert collect_agenda_data(pub) is None
assert urlopen.call_args_list == [
mock.call('http://chrono.example.net/api/agenda/'),
mock.call('http://chrono.example.net/api/agenda/meetings-A/meetings/'),
mock.call('http://chrono.example.net/api/agenda/virtual-B/meetings/'),
]
responses.reset()
responses.get('http://chrono.example.net/api/agenda/', json={"data": AGENDA_MEETINGS_DATA})
responses.get(
'http://chrono.example.net/api/agenda/meetings-A/meetings/',
json={"data": AGENDA_MEETING_TYPES_DATA['meetings-A']},
)
responses.get('http://chrono.example.net/api/agenda/virtual-B/meetings/', body=ConnectionError('...'))
urlopen.side_effect = [
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
ConnectionError,
]
urlopen.reset_mock()
assert collect_agenda_data(pub) is None
assert urlopen.call_args_list == [
mock.call('http://chrono.example.net/api/agenda/'),
mock.call('http://chrono.example.net/api/agenda/meetings-A/meetings/'),
]
assert len(responses.calls) == 3
assert responses.calls[0].request.url == 'http://chrono.example.net/api/agenda/'
assert responses.calls[1].request.url == 'http://chrono.example.net/api/agenda/meetings-A/meetings/'
assert responses.calls[2].request.url == 'http://chrono.example.net/api/agenda/virtual-B/meetings/'
responses.reset()
responses.get('http://chrono.example.net/api/agenda/', json={"data": AGENDA_MEETINGS_DATA})
responses.get('http://chrono.example.net/api/agenda/meetings-A/meetings/', body=ConnectionError('...'))
assert collect_agenda_data(pub) is None
assert len(responses.calls) == 2
assert responses.calls[0].request.url == 'http://chrono.example.net/api/agenda/'
assert responses.calls[1].request.url == 'http://chrono.example.net/api/agenda/meetings-A/meetings/'
@mock.patch('wcs.data_sources.collect_agenda_data')

View File

@ -1,8 +1,8 @@
import base64
import json
import urllib.parse
from unittest import mock
import responses
from django.utils.encoding import force_bytes, force_text
from quixote import cleanup, get_session_manager
@ -143,11 +143,9 @@ def test_fc_login_page(caplog):
}
assert pub.user_class.count() == 0
with mock.patch('wcs.qommon.ident.franceconnect.http_post_request') as http_post_request, mock.patch(
'wcs.qommon.ident.franceconnect.http_get_page'
) as http_get_page:
http_post_request.return_value = (None, 200, json.dumps(token_result), None)
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
with responses.RequestsMock() as rsps:
rsps.post('https://fcp.integ01.dev-franceconnect.fr/api/v1/token', json=token_result)
rsps.get('https://fcp.integ01.dev-franceconnect.fr/api/v1/userinfo', json=user_info_result)
resp = app.get(
'/ident/fc/callback?%s'
% urllib.parse.urlencode(
@ -215,11 +213,9 @@ def test_fc_login_page(caplog):
id_token['nonce'] = qs['nonce'][0]
token_result['id_token'] = '.%s.' % force_text(base64url_encode(json.dumps(id_token)))
with mock.patch('wcs.qommon.ident.franceconnect.http_post_request') as http_post_request, mock.patch(
'wcs.qommon.ident.franceconnect.http_get_page'
) as http_get_page:
http_post_request.return_value = (None, 200, json.dumps(token_result), None)
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
with responses.RequestsMock() as rsps:
rsps.post('https://fcp.integ01.dev-franceconnect.fr/api/v1/token', json=token_result)
rsps.get('https://fcp.integ01.dev-franceconnect.fr/api/v1/userinfo', json=user_info_result)
resp = app.get(
'/ident/fc/callback?%s'
% urllib.parse.urlencode(
@ -271,11 +267,10 @@ def test_fc_login_page(caplog):
'family_name': 'Deux',
# 'email': 'john.deux@example.com', # missing
}
with mock.patch('wcs.qommon.ident.franceconnect.http_post_request') as http_post_request, mock.patch(
'wcs.qommon.ident.franceconnect.http_get_page'
) as http_get_page:
http_post_request.return_value = (None, 200, json.dumps(token_result), None)
http_get_page.return_value = (None, 200, json.dumps(bad_user_info_result), None)
with responses.RequestsMock() as rsps:
rsps.post('https://fcp.integ01.dev-franceconnect.fr/api/v1/token', json=token_result)
rsps.get('https://fcp.integ01.dev-franceconnect.fr/api/v1/userinfo', json=bad_user_info_result)
resp = app.get(
'/ident/fc/callback?%s'
% urllib.parse.urlencode(

View File

@ -10,13 +10,14 @@ import zipfile
from unittest import mock
import pytest
import responses
try:
from PIL import Image
except ImportError:
Image = None
from django.utils.encoding import force_bytes, force_text
from django.utils.encoding import force_text
from pyzbar.pyzbar import ZBarSymbol
from pyzbar.pyzbar import decode as zbar_decode_qrcode
from quixote import cleanup, get_publisher, get_response
@ -903,22 +904,19 @@ def test_roles_idp(pub):
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
http_post_request.return_value = (None, 201, '', None)
with responses.RequestsMock() as rsps:
get_response().process_after_jobs()
assert http_post_request.call_count == 0
assert len(rsps.calls) == 0
item.role_id = role.id
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role.id]
with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
http_post_request.return_value = (None, 201, '', None)
with responses.RequestsMock() as rsps:
rsps.post('http://idp.example.net/api/roles/bar1/members/xxx/', body=None, status=201)
get_response().process_after_jobs()
assert http_post_request.call_count == 1
assert http_post_request.call_args[0][0].startswith(
'http://idp.example.net/api/roles/bar1/members/xxx/'
)
assert 'signature=' in http_post_request.call_args[0][0]
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url.startswith('http://idp.example.net/api/roles/bar1/members/xxx/')
assert 'signature=' in rsps.calls[-1].request.url
user.roles = None
user.store()
@ -927,34 +925,31 @@ def test_roles_idp(pub):
item2.perform(formdata)
assert not pub.user_class.get(user.id).roles
with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
http_delete_request.return_value = (None, 200, '', None)
with responses.RequestsMock() as rsps:
get_response().process_after_jobs()
assert http_delete_request.call_count == 0
assert len(rsps.calls) == 0
item2.role_id = role.id
user.roles = [role.id]
user.store()
item2.perform(formdata)
assert not pub.user_class.get(user.id).roles
with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
http_delete_request.return_value = (None, 200, '', None)
with responses.RequestsMock() as rsps:
rsps.delete('http://idp.example.net/api/roles/bar1/members/xxx/')
get_response().process_after_jobs()
assert http_delete_request.call_count == 1
assert http_delete_request.call_args[0][0].startswith(
'http://idp.example.net/api/roles/bar1/members/xxx/'
)
assert 'signature=' in http_delete_request.call_args[0][0]
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.url.startswith('http://idp.example.net/api/roles/bar1/members/xxx/')
assert 'signature=' in rsps.calls[-1].request.url
# out of http request/response cycle
pub._set_request(None)
with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
http_post_request.return_value = (None, 201, '', None)
with responses.RequestsMock() as rsps:
rsps.post('http://idp.example.net/api/roles/bar1/members/xxx/', body=None, status=201)
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role.id]
with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
http_delete_request.return_value = (None, 200, '', None)
with responses.RequestsMock() as rsps:
rsps.delete('http://idp.example.net/api/roles/bar1/members/xxx/')
item2.perform(formdata)
assert pub.user_class.get(user.id).roles == []
@ -2768,6 +2763,7 @@ def test_sms_with_passerelle(pub):
'passerelle_url': 'http://passerelle.example.com/send?nostop=1',
'sender': 'Passerelle',
}
pub.write_cfg()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
@ -2786,11 +2782,11 @@ def test_sms_with_passerelle(pub):
item.body = 'my "message" {{ form_var_quotes }}'
with mock.patch('wcs.wscalls.get_secret_and_orig') as mocked_secret_and_orig:
mocked_secret_and_orig.return_value = ('secret', 'localhost')
with mock.patch('wcs.qommon.misc._http_request') as mocked_http_post:
mocked_http_post.return_value = (mock.Mock(headers={}), 200, 'data', 'headers')
with responses.RequestsMock() as rsps:
rsps.post('http://passerelle.example.com/send', body='data')
item.perform(formdata)
url = mocked_http_post.call_args[1]['url']
payload = mocked_http_post.call_args[1]['body']
url = rsps.calls[-1].request.url
payload = rsps.calls[-1].request.body
assert 'http://passerelle.example.com' in url
assert '?nostop=1' in url
assert 'orig=localhost' in url
@ -2804,8 +2800,8 @@ def test_sms_with_passerelle(pub):
pub.loggederror_class.wipe()
with mock.patch('wcs.wscalls.get_secret_and_orig') as mocked_secret_and_orig:
mocked_secret_and_orig.return_value = ('secret', 'localhost')
with mock.patch('wcs.qommon.misc._http_request') as mocked_http_post:
mocked_http_post.return_value = (mock.Mock(headers={}), 400, '{"err": 1}', 'headers')
with responses.RequestsMock() as rsps:
rsps.post('http://passerelle.example.com/send', status=400, json={"err": 1})
item.perform(formdata)
assert pub.loggederror_class.count() == 1
assert pub.loggederror_class.select()[0].summary == 'Could not send SMS'
@ -3344,57 +3340,37 @@ def test_geolocate_address(pub):
item.method = 'address_string'
item.address_string = '[form_var_string], paris, france'
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (
None,
200,
force_bytes(json.dumps([{'lat': '48.8337085', 'lon': '2.3233693'}])),
None,
)
with responses.RequestsMock() as rsps:
rsps.get('https://nominatim.entrouvert.org/search', json=[{'lat': '48.8337085', 'lon': '2.3233693'}])
item.perform(formdata)
assert 'https://nominatim.entrouvert.org/search' in http_get_page.call_args[0][0]
assert urllib.parse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert 'https://nominatim.entrouvert.org/search' in rsps.calls[-1].request.url
assert urllib.parse.quote('169 rue du chateau, paris') in rsps.calls[-1].request.url
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
pub.load_site_options()
pub.site_options.set('options', 'nominatim_key', 'KEY')
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (
None,
200,
force_bytes(json.dumps([{'lat': '48.8337085', 'lon': '2.3233693'}])),
None,
)
with responses.RequestsMock() as rsps:
rsps.get('https://nominatim.entrouvert.org/search', json=[{'lat': '48.8337085', 'lon': '2.3233693'}])
item.perform(formdata)
assert 'https://nominatim.entrouvert.org/search' in http_get_page.call_args[0][0]
assert urllib.parse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert 'key=KEY' in http_get_page.call_args[0][0]
assert 'https://nominatim.entrouvert.org/search' in rsps.calls[-1].request.url
assert urllib.parse.quote('169 rue du chateau, paris') in rsps.calls[-1].request.url
assert 'key=KEY' in rsps.calls[-1].request.url
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
pub.load_site_options()
pub.site_options.set('options', 'geocoding_service_url', 'http://example.net/')
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (
None,
200,
force_bytes(json.dumps([{'lat': '48.8337085', 'lon': '2.3233693'}])),
None,
)
with responses.RequestsMock() as rsps:
rsps.get('http://example.net/', json=[{'lat': '48.8337085', 'lon': '2.3233693'}])
item.perform(formdata)
assert 'http://example.net/?q=' in http_get_page.call_args[0][0]
assert 'http://example.net/?q=' in rsps.calls[-1].request.url
pub.site_options.set('options', 'geocoding_service_url', 'http://example.net/?param=value')
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (
None,
200,
force_bytes(json.dumps([{'lat': '48.8337085', 'lon': '2.3233693'}])),
None,
)
with responses.RequestsMock() as rsps:
rsps.get('http://example.net/', json=[{'lat': '48.8337085', 'lon': '2.3233693'}])
item.perform(formdata)
assert 'http://example.net/?param=value&' in http_get_page.call_args[0][0]
assert 'http://example.net/?param=value&' in rsps.calls[-1].request.url
# check for invalid ezt
item.address_string = '[if-any], paris, france'
@ -3424,22 +3400,22 @@ def test_geolocate_address(pub):
# check for nominatim returning an empty result set
item.address_string = '[form_var_string], paris, france'
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200, force_bytes(json.dumps([])), None)
with responses.RequestsMock() as rsps:
rsps.get('http://example.net/', json=[])
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim bad json
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200, b'bad json', None)
with responses.RequestsMock() as rsps:
rsps.get('http://example.net/', body=b'bad json')
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim connection error
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.side_effect = ConnectionError('some error')
with responses.RequestsMock() as rsps:
rsps.get('http://example.net/', body=ConnectionError('some error'))
item.perform(formdata)
assert formdata.geolocations == {}
assert pub.loggederror_class.count() == 1
@ -4388,11 +4364,11 @@ def test_profile(pub):
assert User.get(user.id).form_data.get('3') == 'Plop'
assert User.get(user.id).form_data.get('4').tm_year == 2018
with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
http_patch_request.return_value = (None, 200, '', None)
with responses.RequestsMock() as rsps:
rsps.patch('http://idp.example.net/api/users/xyz/')
get_response().process_after_jobs()
assert http_patch_request.call_count == 1
assert http_patch_request.call_args[0][1] == '{"bar": "2018-03-20"}'
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.body == '{"bar": "2018-03-20"}'
for date_value in ('baddate', '', {}, [], None):
# reset date to a known value
@ -4407,24 +4383,24 @@ def test_profile(pub):
else: # empty value : empty field
assert User.get(user.id).form_data.get('4') is None
with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
http_patch_request.return_value = (None, 200, '', None)
with responses.RequestsMock() as rsps:
rsps.patch('http://idp.example.net/api/users/xyz/')
get_response().process_after_jobs()
assert http_patch_request.call_count == 1
assert len(rsps.calls) == 1
if date_value not in (None, ''): # bad value : do nothing
assert http_patch_request.call_args[0][1] == '{}'
assert rsps.calls[-1].request.body == '{}'
else: # empty value : null field
assert http_patch_request.call_args[0][1] == '{"bar": null}'
assert rsps.calls[-1].request.body == '{"bar": null}'
# out of http request/response cycle (cron, after_job)
pub._set_request(None)
item.fields = [{'field_id': 'bar', 'value': '01/01/2020'}]
with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
http_patch_request.return_value = (None, 200, '', None)
with responses.RequestsMock() as rsps:
rsps.patch('http://idp.example.net/api/users/xyz/')
item.perform(formdata)
assert User.get(user.id).form_data.get('4').tm_year == 2020
assert http_patch_request.call_count == 1
assert http_patch_request.call_args[0][1] == '{"bar": "2020-01-01"}'
assert len(rsps.calls) == 1
assert rsps.calls[-1].request.body == '{"bar": "2020-01-01"}'
def test_set_backoffice_field(http_requests, pub):