passerelle/passerelle/contrib/lille_urban_card/models.py

317 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import re
from urllib.parse import urljoin
from django.db import models
from django.utils.translation import gettext_lazy as _
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.http_authenticators import HttpBearerAuth
from passerelle.utils.jsonresponse import APIError
class TokenError(APIError):
pass
class LilleUrbanCard(BaseResource):
base_url = models.URLField(
max_length=256, blank=False, verbose_name=_('Base URL'), help_text=_('API base URL')
)
username = models.CharField(max_length=128, verbose_name=_('Username'))
password = models.CharField(max_length=128, verbose_name=_('Password'))
category = 'Lille'
class Meta:
verbose_name = _('Lille Urban Card')
@classmethod
def get_verbose_name(cls):
return cls._meta.verbose_name
def check_status(self):
self.get_token()
def get_token(self):
response = self.requests.post(
urljoin(self.base_url, '/clu/ws/auth/connexion'),
json={'login': self.username, 'password': self.password},
).json()
if response.get('erreur'):
self.logger.error('error getting token (%r)', response['erreur'])
raise TokenError(response['erreur'])
return response['token']
@endpoint(description=_('List of socioprofessional categories'), perm='OPEN')
def csp(self, request, *args, **kwargs):
return {
'data': [
{'id': '2', 'text': "Commerçant·e, chef·ffe dentreprise"},
{'id': '3', 'text': "Cadre, profession libérale ou intellectuel·le"},
{'id': '4', 'text': "Profession intermédiaire"},
{'id': '5', 'text': "Employé·e"},
{'id': '6', 'text': "Ouvrier·e"},
{'id': '1', 'text': "Agriculteur·rice"},
{'id': '8', 'text': "Sans profession"},
{'id': '81', 'text': "Demandeur·se demploi"},
{'id': '82', 'text': "Enfant de 0 à 11 ans"},
{'id': '83', 'text': "Enfant de plus de 12 ans"},
{'id': '84', 'text': "Étudiant·e"},
{'id': '7', 'text': "Retraité·e"},
{'id': '99', 'text': "Ne souhaite pas se prononcer"},
]
}
def preprocess_contact_data(self, data):
if data.get('telephone'):
data['telephone'] = ''.join(re.findall(r'\d', data['telephone']))
if data.get('civilite'):
try:
data['civilite'] = int(data['civilite'])
except ValueError:
if data['civilite'] == 'Monsieur':
data['civilite'] = 1
else:
data['civilite'] = 2
if data.get('code_postal'):
data['code_postal'] = int(data['code_postal'])
if data.get('ville'):
data['ville'] = data['ville'].upper()
if data.get('photo'):
data['photo'] = data['photo']['content']
def preprocess_card_request_data(self, data):
for kind_of_optional_field in ('nom_naissance', 'telephone', 'complement_numero_voie'):
if not data.get(kind_of_optional_field):
data[kind_of_optional_field] = ''
for boolean_field in (
'recevoir_journal_senior',
'recevoir_msg_info_senior',
'acceptation_reg_int',
'acceptation_reg_int_resp_legal',
):
if data.get(boolean_field) == 'Oui':
data[boolean_field] = 1
else:
data[boolean_field] = 0
def preprocess_service_data(self, data):
services = {}
for key in sorted(data.keys()):
if key.startswith('service_'):
service = key.split('_')[1]
if key.count('_') == 1: # ex: service_zoo
if data[key] == 'Oui':
services[service] = {'service': service, 'newsletter': 0}
elif service in services: # ex: service_zoo_newsletter
attr = key.split('_')[2]
if data[key] == 'Oui':
services[service][attr] = 1
else:
services[service][attr] = 0
del data[key]
data['services'] = list(services.values())
@endpoint(perm='can_access', description=_('Card Request'), methods=['post'])
def card_request(self, request, *args, **kwargs):
data = json.loads(request.body)
self.preprocess_card_request_data(data)
self.preprocess_contact_data(data)
self.preprocess_service_data(data)
response = self.requests.post(
urljoin(self.base_url, '/clu/ws/demanderCarte'), json=data, auth=HttpBearerAuth(self.get_token())
)
response_json = response.json()
if not isinstance(response_json, dict):
self.logger.error('error requesting card (unknown response format)')
raise APIError('unknown error', data=response_json)
if response_json.get('erreur'):
self.logger.debug('error requesting card (%r)', response_json['erreur'])
response_json['status_code'] = response.status_code
raise APIError(response_json['erreur'], data=response_json)
return {'data': response_json} # {"n_demande_clu":10000005}
@endpoint(
perm='can_access',
description=_('Get status of card request'),
parameters={
'n_demande_clu': {
'description': _('Request number'),
}
},
)
def card_status(self, request, n_demande_clu):
response = self.requests.get(
urljoin(self.base_url, '/clu/ws/consulterDemande/%s' % n_demande_clu),
auth=HttpBearerAuth(self.get_token()),
).json()
return {'data': response}
@endpoint(perm='can_access', description=_('Add new subscriptions'), methods=['post'])
def add_subscriptions(self, request, *args, **kwargs):
data = json.loads(request.body)
self.preprocess_card_request_data(data)
self.preprocess_contact_data(data)
self.preprocess_service_data(data)
for attribute in ('recevoir_msg_info_senior', 'recevoir_journal_senior'):
# remove attributes that are forbidden by this API
del data[attribute]
response = self.requests.post(
urljoin(self.base_url, '/clu/ws/ajouterAbonnements'),
json=data,
auth=HttpBearerAuth(self.get_token()),
)
response_json = response.json()
if not isinstance(response_json, dict):
self.logger.error('error adding subscriptions (unknown response format)')
raise APIError('unknown error', data=response_json)
if response_json.get('erreur'):
self.logger.debug('error adding subscriptions (%r)', response_json['erreur'])
response_json['status_code'] = response.status_code
raise APIError(response_json['erreur'], data=response_json)
return {'data': response_json} # {"n_demande_clu":10000005}
@endpoint(perm='can_access', description=_('Code Change'), methods=['post'])
def code_change(self, request, *args, **kwargs):
data = json.loads(request.body)
response = self.requests.post(
urljoin(self.base_url, '/clu/ws/modifierCodeSecret'),
json=data,
auth=HttpBearerAuth(self.get_token()),
)
response_json = response.json()
if not isinstance(response_json, dict):
self.logger.error('error changing code (unknown response format)')
raise APIError('unknown error', data=response_json)
if response_json.get('erreur'):
self.logger.debug('error changing code (%r)', response_json['erreur'])
response_json['status_code'] = response.status_code
raise APIError(response_json['erreur'], data=response_json)
return {'data': response_json}
@endpoint(perm='can_access', description=_('Code check'), methods=['post'])
def code_check(self, request, *args, **kwargs):
data = json.loads(request.body)
if 'password' in data:
data['code_secret'] = data.pop('password')
response = self.requests.get(
urljoin(self.base_url, '/clu/ws/verifierMdp'), data=data, auth=HttpBearerAuth(self.get_token())
)
response_json = response.json()
if not isinstance(response_json, dict):
self.logger.error('error checking code (unknown response format)')
raise APIError('unknown error', data=response_json)
if response_json.get('message') == 'Le mot de passe est valide':
return {'data': response_json}
for error_attribute in ('erreur', 'message'):
if response_json.get(error_attribute):
self.logger.debug('error checking code (%r)', response_json[error_attribute])
response_json['status_code'] = response.status_code
raise APIError(response_json[error_attribute], data=response_json)
raise APIError('invalid response', data=response_json)
@endpoint(
perm='can_access',
description=_('Get Card Info'),
parameters={
'numero_serie': {
'description': _('Serial Number'),
}
},
)
def card_info(self, request, numero_serie, **kwargs):
if not numero_serie or numero_serie == 'None':
raise APIError('missing numero_serie')
response = self.requests.get(
urljoin(self.base_url, '/clu/ws/consulterCarte?numero_serie=%s' % numero_serie),
auth=HttpBearerAuth(self.get_token()),
)
response_json = response.json()
if not isinstance(response_json, dict):
self.logger.error('error getting card info (unknown response format)')
raise APIError('unknown error', data=response_json)
for error_attribute in ('erreur', 'message'):
if response_json.get(error_attribute):
self.logger.debug('error getting card info (%r)', response_json[error_attribute])
response_json['status_code'] = response.status_code
raise APIError(response_json[error_attribute], data=response_json)
if not response_json.get('numero_serie'):
self.logger.error('error getting card info (response missing numero_serie)')
raise APIError('response missing numero_serie', data=response_json)
return {'data': response_json}
@endpoint(perm='can_access', description=_('Card Revocation'), methods=['post'])
def card_revocation(self, request, *args, **kwargs):
data = json.loads(request.body)
self.preprocess_contact_data(data)
response = self.requests.post(
urljoin(self.base_url, '/clu/ws/revoquerCarte'), json=data, auth=HttpBearerAuth(self.get_token())
)
response_json = response.json()
if not isinstance(response_json, dict):
self.logger.error('error revoking card (unknown response format)')
raise APIError('unknown error', data=response_json)
if response_json.get('erreur'):
self.logger.debug('error revoking card (%r)', response_json['erreur'])
response_json['status_code'] = response.status_code
raise APIError(response_json['erreur'], data=response_json)
return {'data': response_json} # {"message": "La demande..."}
@endpoint(perm='can_access', description=_('Subscription Revocation'), methods=['post'])
def subscription_revocation(self, request, *args, **kwargs):
data = json.loads(request.body)
self.preprocess_contact_data(data)
self.preprocess_service_data(data)
response = self.requests.post(
urljoin(self.base_url, '/clu/ws/revoquerAbonnement'),
json=data,
auth=HttpBearerAuth(self.get_token()),
)
response_json = response.json()
if not isinstance(response_json, dict):
self.logger.error('error revoking subscription (unknown response format)')
raise APIError('unknown error', data=response_json)
if response_json.get('erreur'):
self.logger.debug('error revoking subscripton (%r)', response_json['erreur'])
response_json['status_code'] = response.status_code
raise APIError(response_json['erreur'], data=response_json)
return {'data': response_json} # {"n_demande_clu":10000005}
@endpoint(perm='can_access', description=_('Subscription Renewal'), methods=['post'])
def subscription_renewal(self, request, *args, **kwargs):
data = json.loads(request.body)
self.preprocess_contact_data(data)
response = self.requests.post(
urljoin(self.base_url, '/clu/ws/renouvelerAbonnements'),
json=data,
auth=HttpBearerAuth(self.get_token()),
)
response_json = response.json()
if not isinstance(response_json, dict):
self.logger.error('error renewing subscription (unknown response format)')
raise APIError('unknown error', data=response_json)
if response_json.get('erreur'):
self.logger.debug('error renewing subscripton (%r)', response_json['erreur'])
response_json['status_code'] = response.status_code
raise APIError(response_json['erreur'], data=response_json)
return {'data': response_json} # {"n_demande_clu":10000005}