passerelle/passerelle/apps/bdp/models.py

61 lines
2.1 KiB
Python

import json
import requests
from requests.auth import HTTPBasicAuth
from django.urls import reverse
from django.db import models
from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource
class Bdp(BaseResource):
    """Connector resource storing how to reach a BDP web service.

    Holds the service base URL, optional HTTP basic credentials, an optional
    PEM keystore used as TLS client certificate, and thin helpers that call
    endpoints located under ``<service_url>/api/``.
    """

    # Default timeout (seconds) handed to ``requests`` so that an unresponsive
    # server cannot block the calling worker forever.
    REQUEST_TIMEOUT = 30

    service_url = models.CharField(
        max_length=128, blank=False, verbose_name=_('Service URL'), help_text=_('BDP Web Service URL')
    )
    username = models.CharField(max_length=128, blank=True, verbose_name=_('Username'))
    password = models.CharField(max_length=128, blank=True, verbose_name=_('Password'))
    verify_cert = models.BooleanField(default=True, verbose_name=_('Check HTTPS Certificate validity'))
    keystore = models.FileField(
        upload_to='bdp',
        blank=True,
        null=True,
        verbose_name=_('Keystore'),
        help_text=_('Certificate and private key in PEM format'),
    )

    category = _('Business Process Connectors')

    class Meta:
        verbose_name = _('BDP Web Service')

    def requests_options(self):
        """Build the keyword arguments shared by every ``requests`` call.

        Returns:
            dict: may contain ``cert`` (when a keystore is set), ``verify``
            (set to ``False`` only when certificate checking is disabled) and
            ``auth`` (HTTP basic auth, when a username is set).
        """
        options = {}
        if self.keystore:
            # The same PEM file is passed as both certificate and private key.
            options['cert'] = (self.keystore.path, self.keystore.path)
        if not self.verify_cert:
            options['verify'] = False
        if self.username:
            options['auth'] = HTTPBasicAuth(self.username, self.password)
        return options

    def _api_url(self, endpoint):
        """Return the full URL of ``endpoint`` under the service ``/api/`` root."""
        # Strip trailing slashes so a base URL configured as "https://host/"
        # does not produce "https://host//api/...".
        return self.service_url.rstrip('/') + '/api/' + endpoint

    def get_api(self, endpoint, **params):
        """GET ``endpoint`` and return the decoded JSON body.

        Args:
            endpoint: path appended after ``/api/``.
            **params: sent as query-string parameters.

        Returns:
            The value decoded from the JSON response body.

        Raises:
            ValueError: if the response body is not valid JSON.
            requests.exceptions.RequestException: on transport errors,
                including timeouts.
        """
        options = self.requests_options()
        # setdefault: keep any timeout already supplied by requests_options().
        options.setdefault('timeout', self.REQUEST_TIMEOUT)
        response = requests.get(self._api_url(endpoint), params=params, **options)
        return response.json()

    def post_api(self, endpoint, obj):
        """POST ``obj`` as JSON to ``endpoint`` and summarise the response.

        Args:
            endpoint: path appended after ``/api/``.
            obj: JSON-serialisable payload.

        Returns:
            dict: ``status_code``, ``x_request_id`` and ``x_status_reason``
            taken from the response, merged with the response body when it is
            a JSON object (body keys override the former on collision).
            When the body is not a JSON object, its raw text is stored under
            ``content`` instead.

        Raises:
            requests.exceptions.RequestException: on transport errors,
                including timeouts.
        """
        payload = json.dumps(obj)
        headers = {'Content-Type': 'application/json'}
        options = self.requests_options()
        # setdefault: keep any timeout already supplied by requests_options().
        options.setdefault('timeout', self.REQUEST_TIMEOUT)
        response = requests.post(self._api_url(endpoint), data=payload, headers=headers, **options)
        result = {
            'status_code': response.status_code,
            'x_request_id': response.headers.get('x-request-id'),
            'x_status_reason': response.headers.get('x-status-reason'),
        }
        try:
            body = response.json()
        except ValueError:
            body = None
        if isinstance(body, dict):
            result.update(body)
        else:
            # Unparsable body, or valid JSON that is not an object (list,
            # string, number...): dict.update() could not merge it safely.
            result['content'] = response.text
        return result