passerelle/passerelle/apps/gdc/models.py

93 lines
3.0 KiB
Python

import xml.etree.ElementTree as ET
try:
import phpserialize
except ImportError:
phpserialize = None
from django.core.urlresolvers import reverse
from django.db import models
from django.utils.encoding import force_text
from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
def deep_bytes2str(obj):
if obj is None or isinstance(obj, (int, str)):
return obj
if isinstance(obj, bytes):
try:
return obj.decode('utf-8')
except UnicodeDecodeError:
return obj
if isinstance(obj, list):
return [deep_bytes2str(x) for x in obj]
if isinstance(obj, dict):
new_d = {}
for k, v in obj.items():
new_d[force_text(k)] = deep_bytes2str(v)
return new_d
return obj
def phpserialize_loads(s):
return deep_bytes2str(phpserialize.loads(s.encode('utf-8')))
class Gdc(BaseResource):
service_url = models.CharField(max_length=128, blank=False,
verbose_name=_('Service URL'),
help_text=_('GDC Web Service URL'))
category = _('Business Process Connectors')
class Meta:
verbose_name = _('GDC Web Service')
def call_soap(self, action, *args, **kwargs):
params = []
for i, arg in enumerate(args):
params.append('<v%(i)s xsi:type="xsd:string">%(value)s</v%(i)s>' % {'i': i + 1, 'value': arg})
for key, value in kwargs.items():
type_ = 'int' if isinstance(value, int) else 'string'
params.append('<%(key)s xsi:type="xsd:%(type)s">%(value)s</%(key)s>' % {
'key': key, 'type': type_, 'value': value})
data = """<?xml version="1.0" encoding="UTF-8"?>
<SOAP-ENV:Envelope
SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"
xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/"
xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"
>
<SOAP-ENV:Body>
<%(action)s SOAP-ENC:root="1">
%(params)s
</%(action)s>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>""" % {'action': action, 'params': ''.join(params)}
resp = self.requests.post(self.service_url, data=data)
return ET.ElementTree(ET.fromstring(resp.content))
@endpoint()
def communes(self, request, *args, **kwargs):
resp = self.call_soap('getListeCommune')
soap_result = phpserialize_loads(resp.findall('.//listeCommune')[0].text)
result = []
for k, v in soap_result.items():
result.append({'id': k, 'text': force_text(v, 'utf-8')})
result.sort(key=lambda x: x['id'])
return result
@endpoint()
def objets(self, request, *args, **kwargs):
resp = self.call_soap('getListeObjet')
soap_result = phpserialize_loads(resp.findall('.//listeObjet')[0].text)
result = []
for k, v in soap_result.items():
result.append({'id': k, 'text': force_text(v, 'utf-8')})
result.sort(key=lambda x: x['id'])
return result