solis: add http auth, ssl and proxy attributes (#20789)
This commit is contained in:
parent
cb5000d54d
commit
b5452c2d53
|
@ -0,0 +1,55 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
import passerelle.apps.solis.models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('solis', '0002_solisapalink_text'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RenameField(
|
||||
model_name='solis',
|
||||
old_name='password',
|
||||
new_name='basic_auth_password',
|
||||
),
|
||||
migrations.RenameField(
|
||||
model_name='solis',
|
||||
old_name='username',
|
||||
new_name='basic_auth_username',
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name='solis',
|
||||
name='basic_auth_password',
|
||||
field=models.CharField(max_length=128, verbose_name='HTTP Basic Auth password', blank=True),
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name='solis',
|
||||
name='basic_auth_username',
|
||||
field=models.CharField(max_length=128, verbose_name='HTTP Basic Auth username', blank=True),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='solis',
|
||||
name='client_certificate',
|
||||
field=models.FileField(help_text='Client certificate and private key (PEM format)', upload_to=passerelle.apps.solis.models.keystore_upload_to, null=True, verbose_name='Client certificate', blank=True),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='solis',
|
||||
name='http_proxy',
|
||||
field=models.CharField(max_length=128, verbose_name='Proxy URL', blank=True),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='solis',
|
||||
name='trusted_certificate_authorities',
|
||||
field=models.FileField(help_text='Trusted CAs (PEM format)', upload_to=passerelle.apps.solis.models.trusted_cas_upload_to, null=True, verbose_name='Trusted CAs', blank=True),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='solis',
|
||||
name='verify_cert',
|
||||
field=models.BooleanField(default=True, verbose_name='Check HTTPS Certificate validity'),
|
||||
),
|
||||
]
|
|
@ -25,11 +25,37 @@ from passerelle.utils.api import endpoint
|
|||
from passerelle.utils.jsonresponse import APIError
|
||||
|
||||
|
||||
def keystore_upload_to(instance, filename):
|
||||
return '%s/%s/keystore/%s' % (instance.get_connector_slug(), instance.id, filename)
|
||||
|
||||
|
||||
def trusted_cas_upload_to(instance, filename):
|
||||
return '%s/%s/trusted_ca/%s' % (instance.get_connector_slug(), instance.id, filename)
|
||||
|
||||
|
||||
class Solis(BaseResource):
|
||||
service_url = models.URLField(max_length=256, blank=False, verbose_name=_('Service URL'),
|
||||
service_url = models.URLField(max_length=256, blank=False,
|
||||
verbose_name=_('Service URL'),
|
||||
help_text=_('Solis API base URL'))
|
||||
username = models.CharField(max_length=128, blank=False, verbose_name=_('Username'))
|
||||
password = models.CharField(max_length=128, blank=False, verbose_name=_('Password'))
|
||||
|
||||
basic_auth_username = models.CharField(max_length=128, blank=True,
|
||||
verbose_name=_('HTTP Basic Auth username'))
|
||||
basic_auth_password = models.CharField(max_length=128, blank=True,
|
||||
verbose_name=_('HTTP Basic Auth password'))
|
||||
|
||||
client_certificate = models.FileField(upload_to=keystore_upload_to,
|
||||
null=True, blank=True,
|
||||
verbose_name=_('Client certificate'),
|
||||
help_text=_('Client certificate and private key (PEM format)'))
|
||||
verify_cert = models.BooleanField(default=True,
|
||||
verbose_name=_('Check HTTPS Certificate validity'))
|
||||
trusted_certificate_authorities = models.FileField(upload_to=trusted_cas_upload_to,
|
||||
null=True, blank=True,
|
||||
verbose_name=_('Trusted CAs'),
|
||||
help_text=_('Trusted CAs (PEM format)'))
|
||||
|
||||
http_proxy = models.CharField(max_length=128, blank=True,
|
||||
verbose_name=_('Proxy URL'))
|
||||
|
||||
text_template_name = 'solis/apa_user_text.txt'
|
||||
|
||||
|
@ -40,9 +66,7 @@ class Solis(BaseResource):
|
|||
|
||||
@endpoint(name='ping', description=_('Check Solis API availability'))
|
||||
def ping(self, request):
|
||||
message = self.requests.get(
|
||||
self.service_url + 'main/isAlive',
|
||||
auth=(self.username, self.password)).content
|
||||
message = self.requests.get(self.service_url + 'main/isAlive').content
|
||||
if message.startswith('Solis API'):
|
||||
return {'data': 'pong', 'message': message}
|
||||
else:
|
||||
|
@ -50,11 +74,10 @@ class Solis(BaseResource):
|
|||
|
||||
def request(self, endpoint, data=None):
|
||||
url = self.service_url + endpoint
|
||||
auth = (self.username, self.password)
|
||||
headers = {'Accept': 'application/json'}
|
||||
if data is None:
|
||||
return self.requests.get(url, auth=auth, headers=headers)
|
||||
return self.requests.post(url, json=data, auth=auth, headers=headers)
|
||||
return self.requests.get(url, headers=headers)
|
||||
return self.requests.post(url, json=data, headers=headers)
|
||||
|
||||
def apa_token(self, user_id, code):
|
||||
response = self.request('asg/apa/generationJeton', data={
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
import pytest
|
||||
import mock
|
||||
import utils
|
||||
from StringIO import StringIO
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.files import File
|
||||
|
||||
from passerelle.apps.solis.models import Solis, SolisAPALink
|
||||
from passerelle.base.models import ApiUser, AccessRight
|
||||
|
@ -22,8 +24,8 @@ APAINFOS = {
|
|||
def solis(db):
|
||||
return Solis.objects.create(slug='test',
|
||||
service_url='https://solis.example.net/solisapi/',
|
||||
username='usertest',
|
||||
password='userpass')
|
||||
basic_auth_username='usertest',
|
||||
basic_auth_password='userpass')
|
||||
|
||||
|
||||
def test_solis_restricted_access(app, solis):
|
||||
|
@ -51,6 +53,70 @@ def test_solis_restricted_access(app, solis):
|
|||
assert requests_post.call_count == 0
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ping_response():
|
||||
response_request = mock.Mock(headers={'Accept': '*/*'}, body=None)
|
||||
return utils.FakedResponse(headers={'Content-Type': 'text/plain'}, status_code=200,
|
||||
request=response_request)
|
||||
|
||||
def test_solis_ping(app, solis, ping_response):
|
||||
# full opened access
|
||||
api = ApiUser.objects.create(username='all', keytype='', key='')
|
||||
obj_type = ContentType.objects.get_for_model(solis)
|
||||
AccessRight.objects.create(codename='can_access', apiuser=api, resource_type=obj_type,
|
||||
resource_pk=solis.pk)
|
||||
|
||||
endpoint = utils.generic_endpoint_url('solis', 'ping', slug=solis.slug)
|
||||
with mock.patch('passerelle.utils.RequestSession.request') as requests_get:
|
||||
ping_response.content = 'error'
|
||||
requests_get.return_value = ping_response
|
||||
resp = app.get(endpoint, status=200)
|
||||
assert resp.json['err'] == 1
|
||||
|
||||
ping_response.content = 'Solis API OK'
|
||||
requests_get.return_value = ping_response
|
||||
resp = app.get(endpoint, status=200)
|
||||
assert resp.json['err'] == 0
|
||||
assert resp.json['data'] == 'pong'
|
||||
assert resp.json['message'] == 'Solis API OK'
|
||||
|
||||
assert requests_get.call_args[1]['auth'] == ('usertest', 'userpass')
|
||||
assert requests_get.call_args[1]['verify'] is True
|
||||
assert 'cert' not in requests_get.call_args[1]
|
||||
assert 'proxies' not in requests_get.call_args[1]
|
||||
|
||||
# try certificates parameters
|
||||
solis.verify_cert = False
|
||||
solis.save()
|
||||
resp = app.get(endpoint, status=200)
|
||||
assert requests_get.call_args[1]['verify'] is False
|
||||
|
||||
solis.trusted_certificate_authorities = File(StringIO('CA'), 'ca.pem')
|
||||
solis.save()
|
||||
resp = app.get(endpoint, status=200)
|
||||
assert requests_get.call_args[1]['verify'] == solis.trusted_certificate_authorities.path
|
||||
assert 'cert' not in requests_get.call_args[1]
|
||||
|
||||
solis.client_certificate = File(StringIO('KS'), 'crt.pem')
|
||||
solis.save()
|
||||
resp = app.get(endpoint, status=200)
|
||||
assert requests_get.call_args[1]['cert'] == solis.client_certificate.path
|
||||
assert requests_get.call_args[1]['verify'] == solis.trusted_certificate_authorities.path
|
||||
|
||||
solis.trusted_certificate_authorities = None
|
||||
solis.save()
|
||||
resp = app.get(endpoint, status=200)
|
||||
assert requests_get.call_args[1]['verify'] is False
|
||||
assert requests_get.call_args[1]['cert'] == solis.client_certificate.path
|
||||
|
||||
# try proxy parameter
|
||||
solis.http_proxy = 'http://proxy:3128/'
|
||||
solis.save()
|
||||
resp = app.get(endpoint, status=200)
|
||||
assert requests_get.call_args[1]['proxies'] == {'http': 'http://proxy:3128/',
|
||||
'https': 'http://proxy:3128/'}
|
||||
|
||||
|
||||
def test_solis_link_infos_unlink(app, solis):
|
||||
# full opened access
|
||||
api = ApiUser.objects.create(username='all', keytype='', key='')
|
||||
|
|
Loading…
Reference in New Issue