tests: use django-webtest instead of webtest.TestApp (#19881)
This commit is contained in:
parent
9d8e995aad
commit
3c8df18794
|
@ -2,12 +2,10 @@ import re
|
|||
import sys
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.wsgi import get_wsgi_application
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
import pytest
|
||||
from webtest import TestApp
|
||||
|
||||
from passerelle.base import signature
|
||||
from passerelle.base.models import ApiUser, AccessRight
|
||||
|
@ -15,21 +13,19 @@ from oxyd.models import OxydSMSGateway
|
|||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def setup():
|
||||
app = TestApp(get_wsgi_application())
|
||||
oxyd = OxydSMSGateway.objects.create(title='eservices',
|
||||
def oxyd(db):
|
||||
return OxydSMSGateway.objects.create(title='eservices',
|
||||
slug='eservices',
|
||||
username='user',
|
||||
description='oxyd',
|
||||
password='secret')
|
||||
return app, oxyd
|
||||
|
||||
def test_anonymous_access(setup):
|
||||
app, oxyd = setup
|
||||
def test_anonymous_access(app, oxyd):
|
||||
endpoint_url = reverse('generic-endpoint',
|
||||
kwargs={'connector': 'oxyd', 'slug': oxyd.slug, 'endpoint': 'send'})
|
||||
resp = app.post_json(endpoint_url, {}, status=403)
|
||||
resp = app.post_json(endpoint_url, params={}, status=403)
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_class'] == 'django.core.exceptions.PermissionDenied'
|
||||
|
||||
|
@ -43,14 +39,13 @@ def test_anonymous_access(setup):
|
|||
resource_type=obj_type,
|
||||
resource_pk=oxyd.pk,
|
||||
)
|
||||
resp = app.post_json(endpoint_url, {})
|
||||
resp = app.post_json(endpoint_url, params={})
|
||||
# for empty payload the connector returns an APIError with
|
||||
# {"err_desc": "missing \"message\" in JSON payload"}
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_desc'] == 'Payload error: missing "message" in JSON payload'
|
||||
|
||||
def test_access_with_signature(setup):
|
||||
app, oxyd = setup
|
||||
def test_access_with_signature(app, oxyd):
|
||||
api = ApiUser.objects.create(username='eservices',
|
||||
fullname='Eservices User',
|
||||
description='eservices',
|
||||
|
@ -68,29 +63,28 @@ def test_access_with_signature(setup):
|
|||
url = signature.sign_url(endpoint_url + '?orig=eservices', '12345')
|
||||
# for empty payload the connector returns an APIError with
|
||||
# {"err_desc": "missing \"message\" in JSON payload"}
|
||||
resp = app.post_json(url, {})
|
||||
resp = app.post_json(url, params={})
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_desc'] == 'Payload error: missing "message" in JSON payload'
|
||||
# bad key
|
||||
url = signature.sign_url(endpoint_url + '?orig=eservices', 'notmykey')
|
||||
resp = app.post_json(url, {}, status=403)
|
||||
resp = app.post_json(url, params={}, status=403)
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_class'] == 'django.core.exceptions.PermissionDenied'
|
||||
|
||||
# trusted user (from settings.KNOWN_SERVICES)
|
||||
url = signature.sign_url(endpoint_url + '?orig=wcs1', 'abcde')
|
||||
resp = app.post_json(url, {})
|
||||
resp = app.post_json(url, params={})
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_desc'] == 'Payload error: missing "message" in JSON payload'
|
||||
# bad key
|
||||
url = signature.sign_url(endpoint_url + '?orig=wcs1', 'notmykey')
|
||||
resp = app.post_json(url, {}, status=403)
|
||||
resp = app.post_json(url, params={}, status=403)
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_class'] == 'django.core.exceptions.PermissionDenied'
|
||||
|
||||
|
||||
def test_access_http_auth(setup):
|
||||
app, oxyd = setup
|
||||
def test_access_http_auth(app, oxyd):
|
||||
username = 'apiuser'
|
||||
password = '12345'
|
||||
api = ApiUser.objects.create(username=username,
|
||||
|
@ -108,12 +102,11 @@ def test_access_http_auth(setup):
|
|||
app.authorization = ('Basic', (username, password))
|
||||
endpoint_url = reverse('generic-endpoint',
|
||||
kwargs={'connector': 'oxyd', 'slug': oxyd.slug, 'endpoint': 'send'})
|
||||
resp = app.post_json(endpoint_url, {})
|
||||
resp = app.post_json(endpoint_url, params={})
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_desc'] == 'Payload error: missing "message" in JSON payload'
|
||||
|
||||
def test_access_apikey(setup):
|
||||
app, oxyd = setup
|
||||
def test_access_apikey(app, oxyd):
|
||||
password = 'apiuser_12345'
|
||||
api = ApiUser.objects.create(username='apiuser',
|
||||
fullname='Api User',
|
||||
|
@ -130,15 +123,14 @@ def test_access_apikey(setup):
|
|||
params = {'message': 'test'}
|
||||
endpoint_url = reverse('generic-endpoint',
|
||||
kwargs={'connector': 'oxyd', 'slug': oxyd.slug, 'endpoint': 'send'})
|
||||
resp = app.post_json(endpoint_url + '?apikey=' + password , params)
|
||||
resp = app.post_json(endpoint_url + '?apikey=' + password , params=params)
|
||||
resp.json['err'] == 1
|
||||
assert resp.json['err_desc'] == 'Payload error: missing "from" in JSON payload'
|
||||
resp = app.post_json(endpoint_url + '?apikey=' + password[:3] , params, status=403)
|
||||
resp = app.post_json(endpoint_url + '?apikey=' + password[:3] , params=params, status=403)
|
||||
resp.json['err'] == 1
|
||||
assert resp.json['err_class'] == 'django.core.exceptions.PermissionDenied'
|
||||
|
||||
def test_access_apiuser_with_no_key(setup):
|
||||
app, oxyd = setup
|
||||
def test_access_apiuser_with_no_key(app, oxyd):
|
||||
api = ApiUser.objects.create(username='apiuser',
|
||||
fullname='Api User',
|
||||
description='api')
|
||||
|
@ -152,12 +144,11 @@ def test_access_apiuser_with_no_key(setup):
|
|||
params = {'message': 'test', 'from': 'test api'}
|
||||
endpoint_url = reverse('generic-endpoint',
|
||||
kwargs={'connector': 'oxyd', 'slug': oxyd.slug, 'endpoint': 'send'})
|
||||
resp = app.post_json(endpoint_url, params)
|
||||
resp = app.post_json(endpoint_url, params=params)
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_desc'] == 'Payload error: missing "to" in JSON payload'
|
||||
|
||||
def test_access_apiuser_with_ip_restriction(setup):
|
||||
app, oxyd = setup
|
||||
def test_access_apiuser_with_ip_restriction(app, oxyd):
|
||||
authorized_ip = '176.31.123.109'
|
||||
api = ApiUser.objects.create(username='apiuser',
|
||||
fullname='Api User',
|
||||
|
@ -173,14 +164,14 @@ def test_access_apiuser_with_ip_restriction(setup):
|
|||
)
|
||||
endpoint_url = reverse('generic-endpoint',
|
||||
kwargs={'connector': 'oxyd', 'slug': oxyd.slug, 'endpoint': 'send'})
|
||||
resp = app.post_json(endpoint_url, {}, extra_environ=[('REMOTE_ADDR', '127.0.0.1')],
|
||||
resp = app.post_json(endpoint_url, params={}, extra_environ={'REMOTE_ADDR': '127.0.0.1'},
|
||||
status=403)
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_class'] == 'django.core.exceptions.PermissionDenied'
|
||||
|
||||
endpoint_url = reverse('generic-endpoint',
|
||||
kwargs={'connector': 'oxyd', 'slug': oxyd.slug, 'endpoint': 'send'})
|
||||
resp = app.post_json(endpoint_url, {},
|
||||
extra_environ=[('REMOTE_ADDR', authorized_ip)])
|
||||
resp = app.post_json(endpoint_url, params={},
|
||||
extra_environ={'REMOTE_ADDR': authorized_ip})
|
||||
assert resp.json['err'] == 1
|
||||
assert resp.json['err_desc'] == 'Payload error: missing "message" in JSON payload'
|
||||
|
|
|
@ -4,8 +4,6 @@ import os
|
|||
import pytest
|
||||
from StringIO import StringIO
|
||||
|
||||
from django.core.wsgi import get_wsgi_application
|
||||
from webtest import TestApp
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.files import File
|
||||
from django.core.urlresolvers import reverse
|
||||
|
@ -86,9 +84,9 @@ def test_default_column_keynames(setup, filetype):
|
|||
assert 'id' in csvdata.columns_keynames
|
||||
assert 'text' in csvdata.columns_keynames
|
||||
|
||||
def test_sheet_name_error(setup, filetype, admin_user):
|
||||
def test_sheet_name_error(setup, app, filetype, admin_user):
|
||||
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
|
||||
app = login(TestApp(get_wsgi_application()))
|
||||
app = login(app)
|
||||
resp = app.get('/manage/csvdatasource/test/edit')
|
||||
edit_form = resp.forms[0]
|
||||
edit_form['sheet_name'] = ''
|
||||
|
@ -546,9 +544,9 @@ def test_edit_connector_queries(admin_user, app, setup, filetype):
|
|||
url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})
|
||||
resp = app.get(url)
|
||||
assert not 'New Query' in resp.body
|
||||
resp = app.get(reverse('csv-new-query', kwargs={'connector_slug': csvdata.slug}),
|
||||
status=302)
|
||||
assert resp.location.startswith('http://testserver/login/')
|
||||
new_query_url = reverse('csv-new-query', kwargs={'connector_slug': csvdata.slug})
|
||||
resp = app.get(new_query_url, status=302)
|
||||
assert resp.location.endswith('/login/?next=%s' % new_query_url)
|
||||
|
||||
app = login(app)
|
||||
resp = app.get(url)
|
||||
|
@ -574,11 +572,10 @@ def test_edit_connector_queries(admin_user, app, setup, filetype):
|
|||
assert len(resp.json['data']) == 1
|
||||
assert resp.json['data'][0]['prenom'] == 'Lucie'
|
||||
|
||||
def test_download_file(setup, filetype, admin_user):
|
||||
def test_download_file(app, setup, filetype, admin_user):
|
||||
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
|
||||
app = TestApp(get_wsgi_application())
|
||||
assert '/login' in app.get('/manage/csvdatasource/test/download/').location
|
||||
app = login(TestApp(get_wsgi_application()))
|
||||
app = login(app)
|
||||
resp = app.get('/manage/csvdatasource/test/download/', status=200)
|
||||
if filetype == 'data.csv':
|
||||
assert resp.headers['Content-Type'] == 'text/csv; charset=utf-8'
|
||||
|
|
|
@ -2,14 +2,11 @@
|
|||
|
||||
import json
|
||||
|
||||
from django.core.wsgi import get_wsgi_application
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
import pytest
|
||||
from passerelle.contrib.fake_family.models import FakeFamily
|
||||
|
||||
from webtest import TestApp
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
def test_init_fake_family():
|
||||
|
@ -25,18 +22,14 @@ def test_init_fake_family():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def setup():
|
||||
app = TestApp(get_wsgi_application())
|
||||
fakefam = FakeFamily.objects.create(title='fakefam', slug='fakefam')
|
||||
return app, fakefam
|
||||
def fakefam(db):
|
||||
return FakeFamily.objects.create(title='fakefam', slug='fakefam')
|
||||
|
||||
def test_fake_family_dump(setup):
|
||||
app, fakefam = setup
|
||||
def test_fake_family_dump(app, fakefam):
|
||||
resp = app.get(reverse('fake-family-dump', kwargs={'slug': fakefam.slug}))
|
||||
assert resp.json == fakefam.jsondatabase
|
||||
|
||||
def test_fake_family_urls(setup):
|
||||
app, fakefam = setup
|
||||
def test_fake_family_urls(app, fakefam):
|
||||
name_id = '__test_name_id__'
|
||||
family = fakefam.jsondatabase['families']['1']
|
||||
adult_id = family['adults'][0]
|
||||
|
@ -68,8 +61,7 @@ def test_fake_family_urls(setup):
|
|||
assert resp.json['err'] == 0
|
||||
assert resp.json['data'] == 'ok (but there was no links)'
|
||||
|
||||
def test_fake_family_bad_login(setup):
|
||||
app, fakefam = setup
|
||||
def test_fake_family_bad_login(app, fakefam):
|
||||
name_id = '__test_name_id__'
|
||||
family = fakefam.jsondatabase['families']['1']
|
||||
adult_id = family['adults'][0]
|
||||
|
|
|
@ -9,7 +9,6 @@ import shutil
|
|||
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.files import File
|
||||
from django.core.wsgi import get_wsgi_application
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.utils import timezone
|
||||
from django.utils.http import urlencode
|
||||
|
@ -32,19 +31,16 @@ nameid = 'foobarnameid'
|
|||
API_KEY = 'family'
|
||||
|
||||
@pytest.fixture
|
||||
def setup():
|
||||
from webtest import TestApp
|
||||
def resource(db):
|
||||
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip')
|
||||
resource = GenericFamily.objects.create(title='test', slug='test', archive=filepath)
|
||||
api = ApiUser.objects.create(username='family', keytype='API', key=API_KEY)
|
||||
obj_type = ContentType.objects.get_for_model(GenericFamily)
|
||||
AccessRight.objects.create(codename='can_access', apiuser=api,
|
||||
resource_type=obj_type, resource_pk=resource.pk)
|
||||
app = TestApp(get_wsgi_application())
|
||||
return app, resource
|
||||
return resource
|
||||
|
||||
def test_link_to_family(setup):
|
||||
app, resource = setup
|
||||
def test_link_to_family(app, resource):
|
||||
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
||||
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
||||
params={'NameID': nameid, 'login': '9407',
|
||||
|
@ -52,8 +48,7 @@ def test_link_to_family(setup):
|
|||
assert r.json['data']
|
||||
assert FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()
|
||||
|
||||
def test_unlink_from_family(setup):
|
||||
app, resource = setup
|
||||
def test_unlink_from_family(app, resource):
|
||||
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
||||
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
||||
params={'NameID': nameid, 'login': '1364',
|
||||
|
@ -65,8 +60,7 @@ def test_unlink_from_family(setup):
|
|||
assert r.json['data']
|
||||
assert not FamilyLink.objects.filter(resource=resource, name_id=nameid).exists()
|
||||
|
||||
def test_family_infos(setup):
|
||||
app, resource = setup
|
||||
def test_family_infos(app, resource):
|
||||
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
||||
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
||||
params={'NameID': nameid, 'login': '9407',
|
||||
|
@ -87,8 +81,7 @@ def test_family_infos(setup):
|
|||
assert data['address'] is not None
|
||||
assert data['quotient'] == '1370.50'
|
||||
|
||||
def test_family_members(setup):
|
||||
app, resource = setup
|
||||
def test_family_members(app, resource):
|
||||
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
||||
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
||||
params={'NameID': nameid, 'login': '23762',
|
||||
|
@ -112,8 +105,7 @@ def test_family_members(setup):
|
|||
assert person['city'] is not None
|
||||
assert person['address'] is not None
|
||||
|
||||
def test_get_family_invoices(setup):
|
||||
app, resource = setup
|
||||
def test_get_family_invoices(app, resource):
|
||||
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
||||
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
||||
params={'NameID': nameid, 'login': '19184',
|
||||
|
@ -134,8 +126,7 @@ def test_get_family_invoices(setup):
|
|||
assert i['paid']
|
||||
assert i['has_pdf']
|
||||
|
||||
def test_get_family_invoice(setup):
|
||||
app, resource = setup
|
||||
def test_get_family_invoice(app, resource):
|
||||
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
||||
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
||||
params={'NameID': nameid, 'login': '19184',
|
||||
|
@ -155,8 +146,7 @@ def test_get_family_invoice(setup):
|
|||
for field in ('label', 'amount', 'paid', 'created', 'pay_limit_date'):
|
||||
assert r.json['data'][field] == invoice[field]
|
||||
|
||||
def test_get_invoice_pdf(setup):
|
||||
app, resource = setup
|
||||
def test_get_invoice_pdf(app, resource):
|
||||
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
||||
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
||||
params={'NameID': nameid, 'login': '11959',
|
||||
|
@ -176,8 +166,7 @@ def test_get_invoice_pdf(setup):
|
|||
assert r.headers['Content-Type'] == 'application/pdf'
|
||||
assert r.headers['Content-Disposition'] == 'attachment; filename=%s.pdf' % invoice['label']
|
||||
|
||||
def test_pay_invoice(setup):
|
||||
app, resource = setup
|
||||
def test_pay_invoice(app, resource):
|
||||
r = app.get(reverse('generic-endpoint', kwargs={'connector': 'family',
|
||||
'slug': resource.slug, 'endpoint': 'family', 'rest': 'link/'}),
|
||||
params={'NameID': nameid, 'login': '19184',
|
||||
|
@ -191,7 +180,7 @@ def test_pay_invoice(setup):
|
|||
'slug': resource.slug, 'endpoint': 'regie',
|
||||
'rest': 'invoice/%s/pay/' % invoice['label']})
|
||||
r = app.post_json(payment_url + '?' + urlencode({'apikey': API_KEY}),
|
||||
{'transaction_id': str(uuid4()),
|
||||
params={'transaction_id': str(uuid4()),
|
||||
'transaction_date': timezone.now().strftime(DATETIME_FORMAT)}
|
||||
)
|
||||
assert r.json['data']
|
||||
|
|
|
@ -9,7 +9,6 @@ import xml.etree.ElementTree as ET
|
|||
from dateutil import parser
|
||||
|
||||
from requests.exceptions import ConnectionError
|
||||
from django.core.wsgi import get_wsgi_application
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.utils import timezone
|
||||
|
@ -28,12 +27,10 @@ SOAP_NAMESPACES = {'soap': 'http://schemas.xmlsoap.org/soap/envelope/',
|
|||
}
|
||||
|
||||
@pytest.fixture
|
||||
def setup():
|
||||
from webtest import TestApp
|
||||
def conn():
|
||||
api = ApiUser.objects.create(username='iparapheur',
|
||||
keytype='API',
|
||||
key=API_KEY)
|
||||
app = TestApp(get_wsgi_application())
|
||||
conn = IParapheur.objects.create(title='parapheur', slug='parapheur',
|
||||
wsdl_url=WSDL_URL, username='test',
|
||||
password='secret')
|
||||
|
@ -41,7 +38,7 @@ def setup():
|
|||
AccessRight.objects.create(codename='can_access',
|
||||
apiuser=api, resource_type=obj_type,
|
||||
resource_pk=conn.pk)
|
||||
return app, conn
|
||||
return conn
|
||||
|
||||
@pytest.fixture
|
||||
def xmlmime():
|
||||
|
@ -52,8 +49,7 @@ def wsdl_file():
|
|||
return os.path.join(os.path.dirname(__file__), 'data','iparapheur.wsdl')
|
||||
|
||||
@mock.patch('passerelle.contrib.iparapheur.models.soap_get_client')
|
||||
def test_call_ping(soap_client, setup):
|
||||
app, conn = setup
|
||||
def test_call_ping(soap_client, app, conn):
|
||||
service = mock.Mock()
|
||||
service.echo.return_value = 'pong'
|
||||
mocked_client = mock.Mock(service=service)
|
||||
|
@ -68,8 +64,7 @@ def test_call_ping(soap_client, setup):
|
|||
|
||||
@mock.patch('passerelle.contrib.iparapheur.models.soap_get_client',
|
||||
side_effect=ConnectionError('mocked error'))
|
||||
def test_call_ping_connectionerror(soap_client, setup):
|
||||
app, conn = setup
|
||||
def test_call_ping_connectionerror(soap_client, app, conn):
|
||||
url = reverse('generic-endpoint', kwargs={'connector': 'iparapheur',
|
||||
'endpoint': 'ping', 'slug': conn.slug})
|
||||
resp = app.get(url, status=403)
|
||||
|
@ -82,8 +77,7 @@ def test_call_ping_connectionerror(soap_client, setup):
|
|||
@mock.patch('passerelle.utils.LoggedRequest.get')
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
@mock.patch('passerelle.contrib.iparapheur.soap.HttpAuthenticated.open')
|
||||
def test_create_file(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file):
|
||||
app, conn = setup
|
||||
def test_create_file(http_open, mocked_post, mocked_get, app, conn, xmlmime, wsdl_file):
|
||||
file_id = str(uuid.uuid4())
|
||||
typ = 'Courrier'
|
||||
subtyp = 'maire'
|
||||
|
@ -108,9 +102,9 @@ def test_create_file(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_fi
|
|||
|
||||
url = reverse('generic-endpoint', kwargs={'connector': 'iparapheur',
|
||||
'endpoint': 'create-file', 'slug': conn.slug})
|
||||
resp = app.post_json(url, data, status=403)
|
||||
resp = app.post_json(url, params=data, status=403)
|
||||
url += '?apikey=%s' % API_KEY
|
||||
resp = app.post_json(url, data)
|
||||
resp = app.post_json(url, params=data)
|
||||
# check output call args
|
||||
assert (BASE_URL,) == mocked_post.call_args[0]
|
||||
xml = ET.fromstring(mocked_post.call_args[1].get('data'))
|
||||
|
@ -126,9 +120,7 @@ def test_create_file(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_fi
|
|||
@mock.patch('passerelle.utils.LoggedRequest.get')
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
@mock.patch('passerelle.contrib.iparapheur.soap.HttpAuthenticated.open')
|
||||
def get_get_files(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file):
|
||||
app, conn = setup
|
||||
|
||||
def get_get_files(http_open, mocked_post, mocked_get, app, conn, xmlmime, wsdl_file):
|
||||
soap_response = """
|
||||
<?xml version='1.0' encoding='UTF-8'?><S:Envelope xmlns:S="http://schemas.xmlsoap.org/soap/envelope/"><S:Body><RechercherDossiersResponse xmlns="http://www.adullact.org/spring-ws/iparapheur/1.0" xmlns:xmime="http://www.w3.org/2005/05/xmlmime"><LogDossier><timestamp>2015-10-29T15:42:08.732+01:00</timestamp><nom>test</nom><status>Lu</status><annotation></annotation><accessible>KO</accessible></LogDossier><LogDossier><timestamp>2015-10-29T15:52:42.167+01:00</timestamp><nom>Test2</nom><status>RejetSignataire</status><annotation></annotation><accessible>OK</accessible></LogDossier><LogDossier><timestamp>2015-11-25T12:13:30.830+01:00</timestamp><nom>6ceecfb7-67ee-4388-8943-35911c640031</nom><status>NonLu</status><annotation></annotation><accessible>Recuperable</accessible></LogDossier></RechercherDossiersResponse></S:Body></S:Envelope>
|
||||
"""
|
||||
|
@ -151,8 +143,7 @@ def get_get_files(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file)
|
|||
@mock.patch('passerelle.utils.LoggedRequest.get')
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
@mock.patch('passerelle.contrib.iparapheur.soap.HttpAuthenticated.open')
|
||||
def test_get_file_status(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file):
|
||||
app, conn = setup
|
||||
def test_get_file_status(http_open, mocked_post, mocked_get, app, conn, xmlmime, wsdl_file):
|
||||
file_id = str(uuid.uuid4())
|
||||
|
||||
soap_response = """
|
||||
|
@ -183,8 +174,7 @@ def test_get_file_status(http_open, mocked_post, mocked_get, setup, xmlmime, wsd
|
|||
@mock.patch('passerelle.utils.LoggedRequest.get')
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
@mock.patch('passerelle.contrib.iparapheur.soap.HttpAuthenticated.open')
|
||||
def test_get_file(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file):
|
||||
app, conn = setup
|
||||
def test_get_file(http_open, mocked_post, mocked_get, app, conn, xmlmime, wsdl_file):
|
||||
file_id = str(uuid.uuid4())
|
||||
|
||||
soap_response = file(os.path.join(os.path.dirname(__file__),
|
||||
|
@ -208,8 +198,7 @@ def test_get_file(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file)
|
|||
@mock.patch('passerelle.utils.LoggedRequest.get')
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
@mock.patch('passerelle.contrib.iparapheur.soap.HttpAuthenticated.open')
|
||||
def test_get_file_invalid_appendix(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file):
|
||||
app, conn = setup
|
||||
def test_get_file_invalid_appendix(http_open, mocked_post, mocked_get, app, conn, xmlmime, wsdl_file):
|
||||
file_id = str(uuid.uuid4())
|
||||
|
||||
soap_response = file(os.path.join(os.path.dirname(__file__),
|
||||
|
@ -231,8 +220,7 @@ def test_get_file_invalid_appendix(http_open, mocked_post, mocked_get, setup, xm
|
|||
@mock.patch('passerelle.utils.LoggedRequest.get')
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
@mock.patch('passerelle.contrib.iparapheur.soap.HttpAuthenticated.open')
|
||||
def test_get_file_not_found_appendix(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file):
|
||||
app, conn = setup
|
||||
def test_get_file_not_found_appendix(http_open, mocked_post, mocked_get, app, conn, xmlmime, wsdl_file):
|
||||
file_id = str(uuid.uuid4())
|
||||
|
||||
soap_response = file(os.path.join(os.path.dirname(__file__),
|
||||
|
@ -254,8 +242,7 @@ def test_get_file_not_found_appendix(http_open, mocked_post, mocked_get, setup,
|
|||
@mock.patch('passerelle.utils.LoggedRequest.get')
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
@mock.patch('passerelle.contrib.iparapheur.soap.HttpAuthenticated.open')
|
||||
def test_get_file_appendix(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file):
|
||||
app, conn = setup
|
||||
def test_get_file_appendix(http_open, mocked_post, mocked_get, app, conn, xmlmime, wsdl_file):
|
||||
file_id = str(uuid.uuid4())
|
||||
|
||||
soap_response = file(os.path.join(os.path.dirname(__file__),
|
||||
|
@ -280,8 +267,7 @@ def test_get_file_appendix(http_open, mocked_post, mocked_get, setup, xmlmime, w
|
|||
@mock.patch('passerelle.utils.LoggedRequest.get')
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
@mock.patch('passerelle.contrib.iparapheur.soap.HttpAuthenticated.open')
|
||||
def test_invalid_response(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file):
|
||||
app, conn = setup
|
||||
def test_invalid_response(http_open, mocked_post, mocked_get, app, conn, xmlmime, wsdl_file):
|
||||
file_id = str(uuid.uuid4())
|
||||
|
||||
http_open.return_value = file(xmlmime)
|
||||
|
@ -299,8 +285,7 @@ def test_invalid_response(http_open, mocked_post, mocked_get, setup, xmlmime, ws
|
|||
@mock.patch('passerelle.utils.LoggedRequest.get')
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
@mock.patch('passerelle.contrib.iparapheur.soap.HttpAuthenticated.open')
|
||||
def test_webfault_response(http_open, mocked_post, mocked_get, setup, xmlmime, wsdl_file):
|
||||
app, conn = setup
|
||||
def test_webfault_response(http_open, mocked_post, mocked_get, app, conn, xmlmime, wsdl_file):
|
||||
file_id = str(uuid.uuid4())
|
||||
url = reverse('generic-endpoint', kwargs={'slug': conn.slug,
|
||||
'connector': 'iparapheur', 'endpoint': 'get-file-status', 'rest': file_id})
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
import re
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.wsgi import get_wsgi_application
|
||||
import pytest
|
||||
from webtest import TestApp
|
||||
|
||||
from passerelle.base.models import ApiUser
|
||||
|
||||
|
@ -34,31 +32,29 @@ def login(app, username='admin', password='admin'):
|
|||
assert resp.status_int == 302
|
||||
return app
|
||||
|
||||
def test_homepage_redirect():
|
||||
app = TestApp(get_wsgi_application())
|
||||
assert app.get('/', status=302).location == 'http://localhost:80/manage/'
|
||||
def test_homepage_redirect(app):
|
||||
assert app.get('/', status=302).location.endswith('/manage/')
|
||||
|
||||
def test_unlogged_access():
|
||||
def test_unlogged_access(app):
|
||||
# connect while not being logged in
|
||||
app = TestApp(get_wsgi_application())
|
||||
assert app.get('/manage/', status=302).location == 'http://localhost:80/login/?next=/manage/'
|
||||
assert app.get('/manage/', status=302).location.endswith('/login/?next=/manage/')
|
||||
|
||||
def test_simple_user_access(simple_user):
|
||||
def test_simple_user_access(app, simple_user):
|
||||
# connect while being logged as a simple user
|
||||
app = login(TestApp(get_wsgi_application()), username='user', password='user')
|
||||
app = login(app, username='user', password='user')
|
||||
assert app.get('/manage/', status=403)
|
||||
assert app.get('/manage/add', status=403)
|
||||
assert app.get('/manage/ovh/add', status=403)
|
||||
assert app.get('/manage/access/', status=403)
|
||||
|
||||
def test_access(admin_user):
|
||||
app = login(TestApp(get_wsgi_application()))
|
||||
def test_access(app, admin_user):
|
||||
app = login(app)
|
||||
resp = app.get('/manage/', status=200)
|
||||
assert 'Add Connector' in resp.body
|
||||
assert app.get('/manage/access/', status=200)
|
||||
|
||||
def test_add_connector(admin_user):
|
||||
app = login(TestApp(get_wsgi_application()))
|
||||
def test_add_connector(app, admin_user):
|
||||
app = login(app)
|
||||
resp = app.get('/manage/', status=200)
|
||||
resp = resp.click('Add Connector')
|
||||
assert 'Business Process Connectors' in resp.body
|
||||
|
@ -69,23 +65,23 @@ def test_add_connector(admin_user):
|
|||
resp.forms[0]['service_url'] = 'https://api-adresse.data.gouv.fr/'
|
||||
resp = resp.forms[0].submit()
|
||||
assert resp.status_int == 302
|
||||
assert resp.location == 'http://localhost:80/base-adresse/test-connector/'
|
||||
assert resp.location.endswith('/base-adresse/test-connector/')
|
||||
resp = resp.follow()
|
||||
assert 'Base Adresse Web Service - Test Connector' in resp.body
|
||||
|
||||
resp = app.get('/manage/', status=200)
|
||||
assert 'Test Connector' in resp.body
|
||||
|
||||
def test_visit_connectors(admin_user):
|
||||
app = login(TestApp(get_wsgi_application()))
|
||||
def test_visit_connectors(app, admin_user):
|
||||
app = login(app)
|
||||
resp = app.get('/manage/', status=200)
|
||||
resp = resp.click('Add Connector')
|
||||
for link in re.findall('href="(/manage.*add)"', resp.body):
|
||||
resp = app.get(link, status=200)
|
||||
|
||||
def test_access_management(admin_user):
|
||||
def test_access_management(app, admin_user):
|
||||
assert ApiUser.objects.count() == 0
|
||||
app = login(TestApp(get_wsgi_application()))
|
||||
app = login(app)
|
||||
resp = app.get('/manage/', status=200)
|
||||
resp = resp.click('Access Management')
|
||||
resp = resp.click('Add API User')
|
||||
|
|
|
@ -4,7 +4,6 @@ import pytest
|
|||
import mock
|
||||
import requests
|
||||
|
||||
from django.core.wsgi import get_wsgi_application
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.utils.http import urlencode
|
||||
|
@ -81,22 +80,19 @@ def get_subscriptions_connection_error(*args, **kwargs):
|
|||
raise requests.ConnectionError
|
||||
|
||||
@pytest.fixture
|
||||
def setup():
|
||||
from webtest import TestApp
|
||||
def conn():
|
||||
api = ApiUser.objects.create(username='local', keytype='API', key=API_KEY)
|
||||
app = TestApp(get_wsgi_application())
|
||||
conn = MeyzieuNewsletters.objects.create(title='test', slug='push_mail',
|
||||
url=WS_URL, apikey=WS_API_KEY)
|
||||
obj_type = ContentType.objects.get_for_model(MeyzieuNewsletters)
|
||||
AccessRight.objects.create(codename='can_access',
|
||||
apiuser=api, resource_type=obj_type,
|
||||
resource_pk=conn.pk)
|
||||
return app, conn
|
||||
return conn
|
||||
|
||||
@mock.patch('passerelle.contrib.meyzieu_newsletters.models.requests.get',
|
||||
side_effect=mocked_requests_get)
|
||||
def test_get_newsletters(mock_get, setup):
|
||||
app, conn = setup
|
||||
def test_get_newsletters(mock_get, app, conn):
|
||||
resp = app.get(reverse('meyzieu-newsletters-types', kwargs={'slug': conn.slug}),
|
||||
params={'apikey': API_KEY}, status=200)
|
||||
newsletters_ids = []
|
||||
|
@ -118,8 +114,7 @@ def test_get_newsletters(mock_get, setup):
|
|||
|
||||
@mock.patch('passerelle.contrib.meyzieu_newsletters.models.requests.get',
|
||||
side_effect=mocked_requests_get)
|
||||
def test_get_subscriptions_to_non_existing_newsletters(mock_get, setup):
|
||||
app, conn = setup
|
||||
def test_get_subscriptions_to_non_existing_newsletters(mock_get, app, conn):
|
||||
resp = app.get(reverse('meyzieu-newsletters-subscriptions', kwargs={'slug': conn.slug}),
|
||||
params={'apikey': API_KEY, 'email': TEST_USER}, status=200)
|
||||
subscriptions = WS_SUBSCRIPTIONS_RETURN['abonnement']
|
||||
|
@ -138,8 +133,7 @@ def test_get_subscriptions_to_non_existing_newsletters(mock_get, setup):
|
|||
|
||||
@mock.patch('passerelle.contrib.meyzieu_newsletters.models.requests.get',
|
||||
side_effect=mocked_requests_get)
|
||||
def test_get_subscriptions(mock_get, setup):
|
||||
app, conn = setup
|
||||
def test_get_subscriptions(mock_get, app, conn):
|
||||
resp = app.get(reverse('meyzieu-newsletters-subscriptions', kwargs={'slug': conn.slug}),
|
||||
params={'apikey': API_KEY, 'email': TEST_USER}, status=200)
|
||||
subscriptions = WS_SUBSCRIPTIONS_RETURN['abonnement']
|
||||
|
@ -157,8 +151,7 @@ def test_get_subscriptions(mock_get, setup):
|
|||
|
||||
@mock.patch('passerelle.contrib.meyzieu_newsletters.models.requests.get',
|
||||
side_effect=get_subscriptions_connection_error)
|
||||
def test_get_subscriptions_with_connection_error(mock_get, setup):
|
||||
app, conn = setup
|
||||
def test_get_subscriptions_with_connection_error(mock_get, app, conn):
|
||||
resp = app.get(reverse('meyzieu-newsletters-subscriptions', kwargs={'slug': conn.slug}),
|
||||
params={'apikey': API_KEY, 'email': TEST_USER}, status=503)
|
||||
assert resp.json['err']
|
||||
|
@ -168,36 +161,32 @@ def test_get_subscriptions_with_connection_error(mock_get, setup):
|
|||
|
||||
@mock.patch('passerelle.contrib.meyzieu_newsletters.models.requests.get',
|
||||
side_effect=mocked_requests_get)
|
||||
def test_set_subscriptions(mock_get, setup):
|
||||
app, conn = setup
|
||||
def test_set_subscriptions(mock_get, app, conn):
|
||||
url = reverse('meyzieu-newsletters-subscriptions', kwargs={'slug': conn.slug})
|
||||
url += '?' + urlencode({'apikey': API_KEY, 'email': TEST_USER})
|
||||
resp = app.post_json(url, POST_SUBSCRIPTIONS, status=200)
|
||||
resp = app.post_json(url, params=POST_SUBSCRIPTIONS, status=200)
|
||||
assert resp.json['data']
|
||||
|
||||
@mock.patch('passerelle.contrib.meyzieu_newsletters.models.requests.get')
|
||||
def test_update_empty_subscriptions(mock_get, setup):
|
||||
app, conn = setup
|
||||
def test_update_empty_subscriptions(mock_get, app, conn):
|
||||
response = mock.Mock()
|
||||
response.json.return_value = {}
|
||||
mock_get.return_value = response
|
||||
url = reverse('meyzieu-newsletters-subscriptions', kwargs={'slug': conn.slug})
|
||||
url += '?' + urlencode({'apikey': API_KEY, 'email': TEST_USER})
|
||||
resp = app.post_json(url, POST_SUBSCRIPTIONS, status=200)
|
||||
resp = app.post_json(url, params=POST_SUBSCRIPTIONS, status=200)
|
||||
assert resp.json['data']
|
||||
|
||||
@mock.patch('passerelle.contrib.meyzieu_newsletters.models.requests.get',
|
||||
side_effect=mocked_requests_get)
|
||||
def test_delete_subscriptions(mock_get, setup):
|
||||
app, conn = setup
|
||||
def test_delete_subscriptions(mock_get, app, conn):
|
||||
url = reverse('meyzieu-newsletters-subscriptions', kwargs={'slug': conn.slug})
|
||||
url += '?' + urlencode({'apikey': API_KEY, 'email': TEST_USER})
|
||||
resp = app.delete(url, status=200)
|
||||
assert resp.json['data']
|
||||
|
||||
@mock.patch('passerelle.contrib.meyzieu_newsletters.models.requests.get')
|
||||
def test_delete_empty_subscriptions(mock_get, setup):
|
||||
app, conn = setup
|
||||
def test_delete_empty_subscriptions(mock_get, app, conn):
|
||||
response = mock.Mock()
|
||||
response.json.return_value = {}
|
||||
mock_get.return_value = response
|
||||
|
@ -208,11 +197,10 @@ def test_delete_empty_subscriptions(mock_get, setup):
|
|||
|
||||
@mock.patch('passerelle.contrib.meyzieu_newsletters.models.requests.get',
|
||||
side_effect=set_subscriptions_connection_error)
|
||||
def test_update_subscriptions_with_connection_error(mock_get, setup):
|
||||
app, conn = setup
|
||||
def test_update_subscriptions_with_connection_error(mock_get, app, conn):
|
||||
url = reverse('meyzieu-newsletters-subscriptions', kwargs={'slug': conn.slug})
|
||||
url += '?' + urlencode({'apikey': API_KEY, 'email': TEST_USER})
|
||||
resp = app.post_json(url, POST_SUBSCRIPTIONS, status=503)
|
||||
resp = app.post_json(url, params=POST_SUBSCRIPTIONS, status=503)
|
||||
assert resp.json['err']
|
||||
assert resp.json['err_class'] == '%s.%s' % (SubscriptionsSetError.__module__,
|
||||
SubscriptionsSetError.__name__)
|
||||
|
|
|
@ -6,7 +6,6 @@ import mock
|
|||
import pytest
|
||||
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.core.wsgi import get_wsgi_application
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
from passerelle.base.models import ApiUser, AccessRight
|
||||
|
@ -24,7 +23,6 @@ def json_get_data(filename):
|
|||
|
||||
@pytest.fixture
|
||||
def setup(db):
|
||||
from webtest import TestApp
|
||||
api = ApiUser.objects.create(username='all',
|
||||
keytype='', key='')
|
||||
solis = SolisAPA.objects.create(base_url='https://whateever.com/rec/',
|
||||
|
@ -32,8 +30,6 @@ def setup(db):
|
|||
obj_type = ContentType.objects.get_for_model(solis)
|
||||
AccessRight.objects.create(codename='can_access', apiuser=api,
|
||||
resource_type=obj_type, resource_pk=solis.pk)
|
||||
return TestApp(get_wsgi_application())
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def url():
|
||||
|
@ -50,15 +46,14 @@ def homonymie(request):
|
|||
|
||||
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
def test_instegration_demande_apa_domicile(mocked_post, setup, url):
|
||||
app = setup
|
||||
def test_instegration_demande_apa_domicile(mocked_post, setup, app, url):
|
||||
fake_response = '{"ImportIdResults":{"Items":[{"key":"indexDossier","value":359043},{"key":"indexBeneficiaire","value":458238},{"key":"indexDemande","value":221155}]}}'
|
||||
|
||||
mocked_post.return_value = mock.Mock(status_code=200, content=fake_response,
|
||||
json=lambda: json.loads(fake_response))
|
||||
|
||||
resp = app.post_json(url('integration'),
|
||||
json_get_data('premiere_demande_apa_domicile.json'), status=200)
|
||||
params=json_get_data('premiere_demande_apa_domicile.json'), status=200)
|
||||
|
||||
resp.json['data']['indexDossier'] == 359043
|
||||
resp.json['data']['indexBeneficiaire'] == 458238
|
||||
|
@ -66,15 +61,14 @@ def test_instegration_demande_apa_domicile(mocked_post, setup, url):
|
|||
|
||||
|
||||
@mock.patch('passerelle.utils.LoggedRequest.post')
|
||||
def test_integration_demande_apa_etablissement(mocked_post, setup, url):
|
||||
app = setup
|
||||
def test_integration_demande_apa_etablissement(mocked_post, setup, app, url):
|
||||
fake_response = '{"ImportIdResults":{"Items":[{"key":"indexDossier","value":359043},{"key":"indexBeneficiaire","value":458238},{"key":"indexDemande","value":221155}]}}'
|
||||
|
||||
mocked_post.return_value = mock.Mock(status_code=200, content=fake_response,
|
||||
json=lambda: json.loads(fake_response))
|
||||
|
||||
resp = app.post_json(url('integration'),
|
||||
json_get_data('premiere_demande_apa_etablissement.json'), status=200)
|
||||
params=json_get_data('premiere_demande_apa_etablissement.json'), status=200)
|
||||
|
||||
resp.json['data']['indexDossier'] == 359043
|
||||
resp.json['data']['indexBeneficiaire'] == 458238
|
||||
|
@ -85,7 +79,7 @@ def test_get_conjoint(setup, url, homonymie):
|
|||
data = integration.build_message(homonymie)
|
||||
if homonymie['display_id'] == '25-3':
|
||||
assert data['Conjoint']['indexIndividu'] == '458107'
|
||||
assert data['Conjoint']['bParticipeRevenus'] == False
|
||||
assert data['Conjoint']['bParticipeRevenus'] is False
|
||||
assert data['Conjoint']['dateNaissance'] == '1930-06-11'
|
||||
assert data['Conjoint']['nom'] == 'MCBEAL'
|
||||
assert data['Conjoint']['prenom'] == 'ALLY'
|
||||
|
|
Loading…
Reference in New Issue