# -*- coding: utf-8 -*- # passerelle-grandlyon-cartads-cs # support for Cart@DS CS connector in Grand Lyon environment # Copyright (C) 2019 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import base64 import os from django.core.cache import cache from django.db import models from django.utils.translation import ugettext_lazy as _ from passerelle.apps.cartads_cs.models import AbstractCartaDSCS from passerelle.utils import SOAPTransport class Transport(SOAPTransport): def post(self, address, message, headers): headers['Authorization'] = self.resource.get_api_manager_token() response = super(Transport, self).post(address, message, headers) if response.status_code == 401: # ask for a new token and retry headers['Authorization'] = self.resource.get_api_manager_token(renew=True) response = super(Transport, self).post(address, message, headers) return response class GLCartaDSCS(AbstractCartaDSCS): category = 'Grand Lyon' soap_transport_class = Transport token_url = models.URLField(_('Token URL'), max_length=256) token_authorization = models.CharField(_('Token Authorization'), max_length=128) sendfile_ws_url = models.URLField( _('Sendfile Webservice URL'), max_length=256, blank=True) sendfile_ws_dirname = models.CharField( _('Sendfile Webservice Directory Name'), max_length=256, blank=True) verify_cert = models.BooleanField(default=True, verbose_name=_('Check HTTPS Certificate validity')) class Meta: verbose_name = 'Cart@DS CS (@ Grand Lyon)' def soap_client(self, **kwargs): # use original BaseResource soap_client as cart@ds wsdl files cannot be # served directly and have to be copied to a different server :/ return super(AbstractCartaDSCS, self).soap_client(**kwargs) def get_api_manager_token(self, renew=False): cache_key = 'cartads-%s-token' % self.id if not renew: token = cache.get(cache_key) if token: return token headers = {'Authorization': 'Basic %s' % self.token_authorization} resp = self.requests.post( self.token_url, headers=headers, data={'grant_type': 'client_credentials'}, verify=self.verify_cert).json() token = '%s %s' % (resp.get('token_type'), resp.get('access_token')) timeout = int(resp.get('expires_in')) cache.set(cache_key, token, timeout) self.logger.debug('new token: %s (timeout %ss)', token, timeout) return token def upload_zip(self, zip_filename): b64_zip = base64.b64encode(open(zip_filename).read()) chunk_size = 16777216 # 16MB for n in range(0, len(b64_zip), chunk_size): resp = self.requests.post(self.sendfile_ws_url, data={ 'fileName': self.sendfile_ws_dirname + os.path.basename(zip_filename), 'b64_fileContent': b64_zip[n:n+chunk_size], } ) resp.raise_for_status()