passerelle/passerelle/contrib/dpark/models.py

641 lines
24 KiB
Python

# Copyright (C) 2018 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import json
import pytz
from django.conf import settings
from django.core.cache import cache
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from zeep.helpers import serialize_object
from passerelle.base.models import BaseResource
from passerelle.utils.api import endpoint
from passerelle.utils.conversion import any2bool, to_pdf
from passerelle.utils.jsonresponse import APIError
from passerelle.views import WrongParameter
class Error:
    """Falsy result object carrying an error code and an optional message.

    Instances always evaluate to ``False`` so that a caller can test the
    result with ``if not result:`` and then read ``result.code`` and
    ``result.msg``.
    """

    def __init__(self, code, msg=None):
        self.code, self.msg = code, msg

    def __bool__(self):
        return False

    # Python 2 spelling of the truthiness hook, kept as an alias.
    __nonzero__ = __bool__
# Default mapping from a document slug to the document type code.
# get_document_codes() returns a copy of this mapping, optionally updated
# with settings.DPARK_DOCUMENT_CODES.
# NOTE(review): the last two codes are integers while all the others are
# strings -- confirm whether this difference is intended.
DOCUMENT_CODES = {
'justif_domicile': '2',
'assurance': '4',
'cartegrise': '6',
'carte_medecin': '8',
'immat_artisanat_d1': '9',
'agrement': '10',
'attest_employeur': '11',
'taxe_habitat': '13',
'abt_transport': '14',
'contrat_locvehicule': '16',
'impot_revenu': '17',
'carteverte': '18',
'decla_impot': '19',
'assurance_pro': '20',
'cotis_urssaf': '21',
'ordonnance': '22',
'attest_employeurpro': '25',
'attest_honneur': '26',
'attest_cpam': '27',
'cartegrise_qe': '28',
'contrat_livraison': '29',
'immat_artisanat': '30',
'immat_artisanat_ape': '31',
'contrat_amarrage': 35,
'facture_amarrage': 36,
}
# Address fields as (SOAP field name, JSON key, required flag) triples.
ADDRESS_SOAP_MAPPING = [
('Adresse_EtageEscalierAppartement', 'address_complement1', False),
('Adresse_ImmeubleBatimentResidence', 'address_complement2', False),
('Adresse_NumeroVoie', 'address_streetno', False),
('Adresse_Extension', 'address_streetext', False),
('Adresse_NomVoie', 'address_streetname', False),
('Adresse_CodeSTI', 'address_sticode', True),
('Adresse_BoitePostaleLieudit', 'address_place', False),
('Adresse_CodePostal', 'address_zipcode', True),
('Adresse_Localite', 'address_locality', True),
('Adresse_Quartier', 'address_district', False),
]
# SOAP field name -> JSON key lookup derived from the triples above (the
# required flag is dropped).
ADDRESS_XML_TO_JSON_MAPPING = {key: value for key, value, required in ADDRESS_SOAP_MAPPING}
def get_document_codes():
    """Return the document slug -> code mapping in effect.

    The result is a copy of ``DOCUMENT_CODES`` updated with the optional
    ``DPARK_DOCUMENT_CODES`` setting when that setting is present and
    non-empty.
    """
    codes = DOCUMENT_CODES.copy()
    overrides = getattr(settings, 'DPARK_DOCUMENT_CODES', {})
    if overrides:
        codes.update(overrides)
    return codes
def is_erroneous(data, expected_keys, silent=False):
    """Check that every expected key is in ``data`` with a truthy value.

    By default the first offending key raises ``WrongParameter``. With
    ``silent=True`` every offending key is collected instead and the list of
    error messages is returned. ``None`` is returned when nothing is wrong.
    """
    messages = []
    for name in expected_keys:
        if name in data and data[name]:
            continue
        if silent:
            messages.append(_('<%s> is either absent or has an invalid value') % name)
        else:
            raise WrongParameter([name], [])
    return messages or None
def date_to_isoformat(idate):
    """Convert a ``YYYYMMDD`` string to ISO format (``YYYY-MM-DD``).

    Returns ``None`` for a falsy input. A value that cannot be parsed is
    returned unchanged; this includes non-string values, for which
    ``strptime`` raises ``TypeError`` rather than ``ValueError``.
    """
    if not idate:
        return None
    try:
        parsed = datetime.datetime.strptime(idate, '%Y%m%d')
    except (TypeError, ValueError):
        return idate
    return parsed.date().isoformat()
def date_or_datetime_to_local_date(value):
    """Parse ``value`` as a datetime or a plain date and return a date.

    Two formats are accepted:

    * ``YYYY-MM-DDTHH:MM:SS``: treated as UTC and converted to the
      Europe/Paris timezone before the date part is taken;
    * ``YYYYMMDD``: returned as the corresponding date.

    Returns ``None`` when ``value`` matches neither format, including when it
    is not a string at all (``strptime`` raises ``TypeError`` in that case).
    """
    try:
        naive = datetime.datetime.strptime(value, '%Y-%m-%dT%H:%M:%S')
    except (TypeError, ValueError):
        naive = None
    if naive is not None:
        paris = pytz.timezone('Europe/Paris')
        return pytz.utc.localize(naive).astimezone(paris).date()
    try:
        return datetime.datetime.strptime(value, '%Y%m%d').date()
    except (TypeError, ValueError):
        return None
def normalize_reply(reply):
    """Turn a SOAP reply into a plain dict with lower-cased keys.

    The ``CodeRetour`` and ``MessageRetour`` entries are dropped. When the
    reply carries a file number, ``id`` and ``text`` entries are added, and
    the subscription start/end dates, when present, are passed through
    ``date_to_isoformat``.
    """
    skipped = ('CodeRetour', 'MessageRetour')
    data = {}
    for name, value in serialize_object(reply).items():
        if name not in skipped:
            data[name.lower()] = value

    file_number = data.get('demande_numerodossier')
    if file_number:
        data['id'] = str(file_number)
        label = (
            '%(demande_numerodossier)s - %(demandeur_nomusuel)s %(demandeur_prenom)s - %(demande_immatvehicule1)s'
            % data
        )
        second_plate = data.get('demande_immatvehicule2')
        if second_plate:
            label += '/%s' % second_plate
        data['text'] = label

    for date_key in ('demande_datedebutabo', 'demande_datefinabo'):
        if date_key in data:
            data[date_key] = date_to_isoformat(data[date_key])
    return data
def make_subscriber_params(data):
    """Build the subscriber parameter tuple from ``data``.

    Returns ``(lastname, firstnames, filenumber, badgenumber, cardnumber)``.
    The first three keys are mandatory; a missing or falsy badge or card
    number is replaced by ``0``.
    """
    identity = tuple(data[field] for field in ('lastname', 'firstnames', 'filenumber'))
    badge, card = (data.get(field) or 0 for field in ('badgenumber', 'cardnumber'))
    return identity + (badge, card)
def address_to_json(reply):
    """Extract the address attributes of ``reply`` into a dict of JSON keys.

    Each SOAP field name of ``ADDRESS_XML_TO_JSON_MAPPING`` is read as an
    attribute of ``reply``; a missing attribute yields ``None``.
    """
    return {
        json_key: getattr(reply, xml_key, None)
        for xml_key, json_key in ADDRESS_XML_TO_JSON_MAPPING.items()
    }
def get_address_params(data):
    """Build the SOAP address structure from the ``address_*`` keys of ``data``.

    ``address_sticode``, ``address_zipcode`` and ``address_locality`` are
    mandatory (a missing one raises ``KeyError``); every other field defaults
    to an empty string.
    """
    # (SOAP field name, key in data, mandatory?)
    fields = (
        ('Adresse_EtageEscalierAppartement', 'address_complement1', False),
        ('Adresse_ImmeubleBatimentResidence', 'address_complement2', False),
        ('Adresse_NumeroVoie', 'address_streetno', False),
        ('Adresse_Extension', 'address_streetext', False),
        ('Adresse_NomVoie', 'address_streetname', False),
        ('Adresse_CodeSTI', 'address_sticode', True),
        ('Adresse_BoitePostaleLieudit', 'address_place', False),
        ('Adresse_CodePostal', 'address_zipcode', True),
        ('Adresse_Localite', 'address_locality', True),
        ('Adresse_Quartier', 'address_district', False),
    )
    params = {}
    for soap_name, data_key, mandatory in fields:
        # empty values must be ''
        params[soap_name] = data[data_key] if mandatory else data.get(data_key, '')
    return params
def get_service(instance):
    """Create a SOAP service proxy bound to ``instance.operation_url``.

    The binding used is the one of the first port of the first service
    listed in the WSDL of the client returned by ``instance.soap_client``.
    """
    client = instance.soap_client(api_error=True)
    services = client.wsdl.services
    first_service = services[list(services)[0]]
    ports = first_service.ports
    first_port = ports[list(ports)[0]]
    return client.create_service(str(first_port.binding.name), instance.operation_url)
class DPark(BaseResource):
    """Connector exposing D-Park SOAP operations as HTTP endpoints.

    Every endpoint translates its JSON or query-string input into a SOAP call
    made through ``call()``. Links between a user (``nameid``) and a
    subscription file are stored in ``Pairing`` rows.
    """

    log_requests_errors = False
    category = _('Business Process Connectors')

    wsdl_url = models.URLField(
        max_length=512,
        blank=False,
        verbose_name=_('SOAP wsdl endpoint'),
        help_text=_('URL of the SOAP wsdl endpoint'),
    )
    operation_url = models.URLField(
        max_length=512,
        blank=False,
        verbose_name=_('SOAP operation endpoint'),
        help_text=_('URL of the SOAP operation endpoint'),
    )

    class Meta:
        verbose_name = _('D-Park connector')

    def __str__(self):
        return '%s - %s' % (self.slug, self.wsdl_url)

    @staticmethod
    def _amount_to_cents(value):
        """Convert an amount (number or numeric string) to an integer of cents.

        Decimal arithmetic is used so that fractional amounts such as
        ``'12.50'`` or ``12.5`` are neither rejected nor truncated.

        Raises ``APIError`` when ``value`` is not a finite number.
        """
        from decimal import ROUND_HALF_UP, Decimal, InvalidOperation

        try:
            cents = (Decimal(str(value)) * 100).to_integral_value(rounding=ROUND_HALF_UP)
            return int(cents)
        except (InvalidOperation, ValueError, OverflowError):
            raise APIError(_('Invalid value for total amount'))

    def call(self, operation, *args, **kwargs):
        """Invoke the SOAP ``operation`` and return its reply.

        The reply code is read from ``CodeRetour`` or, failing that,
        ``Code_Retour``. Unless ``bypass_erroneous_reply=True`` is passed, any
        code other than ``'01'`` raises ``APIError`` with the reply message.
        All other arguments are forwarded to the SOAP operation.
        """
        service = get_service(self)
        bypass_erroneous_reply = kwargs.pop('bypass_erroneous_reply', False)
        reply = getattr(service, operation)(*args, **kwargs)
        reply_code = getattr(reply, 'CodeRetour', None) or getattr(reply, 'Code_Retour', None)
        reply_message = getattr(reply, 'MessageRetour', None) or getattr(reply, 'Lib_Retour', None)
        if reply_code != '01' and not bypass_erroneous_reply:
            raise APIError(reply_message)
        return reply

    def check_file_exists(self, data):
        """Ask the backend whether the described subscription file exists.

        Returns ``True`` on success, otherwise a falsy ``Error`` whose
        ``code`` names the failure and whose ``msg`` carries the backend
        message.
        """
        is_erroneous(data, ('firstnames', 'lastname', 'filenumber'))
        params = make_subscriber_params(data)
        reply = self.call('PLS_EXIST', *params, bypass_erroneous_reply=True)
        code_retour = reply.CodeRetour
        msg_retour = reply.MessageRetour
        if code_retour == '01':
            return True
        errors = {
            '02': 'unknown-file',
            '03': 'not-requester-file',
            '04': 'support-number-unknown',
            '05': 'badge-number-unknown',
        }
        if code_retour in errors:
            return Error(errors[code_retour], msg=msg_retour)
        return Error('other-error', msg='code="%s" msg="%s"' % (code_retour, msg_retour))

    @endpoint(perm='can_access', description=_('Check service availibity'))
    def ping(self, request, *args, **kwargs):
        """Check service availability by trying to find a plate number."""
        self.call('FPS_Rech_Immat', 'AA-000-BB', timezone.now().isoformat())
        return {'data': True}

    @endpoint(perm='can_access', description=_('Search user subscription'))
    def search(self, request, *args, **kwargs):
        """Check that the file described by the query string exists."""
        data = request.GET
        result = self.check_file_exists(data)
        if result:
            return {}
        return {'err': 1, 'code': result.code, 'msg': result.msg}

    @endpoint(perm='can_access', pattern=r'^(?P<nameid>\w+)/$', description=_('Get subscriber information'))
    def infos(self, request, nameid, *args, **kwargs):
        """Return the subscription information of every file paired to ``nameid``.

        An optional ``filenumber`` query parameter narrows the result to that
        file.
        """
        pairings = Pairing.objects.filter(resource=self, nameid=nameid)
        filenumber = request.GET.get('filenumber')
        if filenumber:
            # Narrow the existing queryset: rebuilding it from Pairing.objects
            # would drop the resource/nameid scope and expose other users'
            # files.
            pairings = pairings.filter(filenumber=filenumber)
        infos = [pairing.get_info() for pairing in pairings]
        # get_info() can return None
        infos = [info for info in infos if info]
        return {'data': infos}

    @endpoint(perm='can_access', methods=['post'], description=_('Register a subscription application'))
    def register(self, request, *args, **kwargs):
        """Register a subscription application (creation or renewal).

        The JSON payload must contain the keys listed in ``required_fields``
        below, plus ``id_contexte`` and ``id_produit`` when ``double_plaque``
        is true. The ``application_*`` keys read with ``data[...]`` are
        mandatory too and raise ``KeyError`` when absent.
        """
        data = json.loads(request.body)
        double_plaque = any2bool(data.get('double_plaque'))
        required_fields = [
            'application_id',
            'applicant_title',
            'applicant_lastname',
            'applicant_firstnames',
            'applicant_email',
            'address_sticode',
            'address_zipcode',
            'address_locality',
            'address_district',
        ]
        if double_plaque:
            required_fields.extend(['id_contexte', 'id_produit'])
        is_erroneous(data, required_fields)
        application_id = data['application_id']
        applicant = {
            'Demandeur_Civilite': data['applicant_title'],
            'Demandeur_NomUsuel': data['applicant_lastname'],
            'Demandeur_Prenom': data['applicant_firstnames'],
            'Demandeur_TelephoneFixe': data.get('applicant_phone', ''),
            'Demandeur_TelephonePortable': data.get('applicant_mobilephone', ''),
            'Demandeur_Email': data['applicant_email'],
        }
        address = get_address_params(data)
        filenumber = data.get('filenumber', '')
        renewal = 1 if filenumber else 2
        # The second bank address line was historically read from the
        # misspelled key 'application_bank_addresss2'; it is still accepted
        # as a fallback for existing callers.
        bank_address2 = data.get('application_bank_address2', data.get('application_bank_addresss2', ''))
        application = {
            'Demande_NumeroDossier': filenumber,
            'Demande_TypeDemande': data['application_type'],
            'Demande_Renouvellement': renewal,
            'Demande_CasDerogatoire_Vehicule1': data['application_car1_exemption'],
            'Demande_ImmatVehicule1': data['application_car1_plate'],
            'Demande_ModeleVehicule1': data['application_car1_model'],
            'Demande_MarqueVehicule1': data['application_car1_brand'],
            'Demande_ImmatVehicule2': data.get('application_car2_plate', ''),
            'Demande_ModeleVehicule2': data.get('application_car2_model', ''),
            'Demande_MarqueVehicule2': data.get('application_car2_brand', ''),
            'Demande_CasDerogatoire_Vehicule2': data.get('application_car2_exemption', ''),
            'Demande_AbonnementTiers': data['application_thirdparty_subscription'],
            'Demande_TypePaiement': data['application_payment_type'],
            'Demande_NomBanque': data.get('application_bank_name', ''),
            'Demande_Address1Banque': data.get('application_bank_address1', ''),
            'Demande_Address2Banque': bank_address2,
            'Demande_CodePostalBanque': data.get('application_bank_zipcode', ''),
            'Demande_VilleBanque': data.get('application_bank_city', ''),
            'Demande_IBAN': data.get('application_bank_iban', ''),
            'Demande_BIC': data.get('application_bank_bic', ''),
        }
        if double_plaque:
            reply = self.call(
                'PLS_ENREG2',
                application_id,
                applicant,
                address,
                application,
                data['id_produit'],
                data['id_contexte'],
            )
        else:
            reply = self.call('PLS_ENREG', application_id, applicant, address, application)
        if filenumber:
            # Drop the cached information of every pairing on that file.
            for pairing in Pairing.objects.filter(filenumber=filenumber):
                pairing.clear_cache()
        return {'data': normalize_reply(reply)}

    @endpoint(perm='can_access', methods=['post'], description=_('Link user to a subscription'))
    def link(self, request, *args, **kwargs):
        """Pair ``nameid`` with a subscription file once the backend confirms it."""
        data = json.loads(request.body)
        if 'nameid' not in data:
            raise WrongParameter(['nameid'], [])
        result = self.check_file_exists(data)
        if not result:
            return {'err': 1, 'code': result.code, 'msg': result.msg}
        # Store every truthy value as a stripped string.
        for key in data:
            if data[key]:
                data[key] = str(data[key]).strip()
        Pairing.objects.get_or_create(
            resource=self,
            nameid=data['nameid'],
            lastname=data['lastname'],
            firstnames=data['firstnames'],
            filenumber=data['filenumber'],
            badgenumber=data.get('badgenumber', 0),
            cardnumber=data.get('cardnumber', 0),
        )
        return {}

    @endpoint(perm='can_access', methods=['post'], description=_('Unlink user to subscription'))
    def unlink(self, request, *args, **kwargs):
        """Delete the pairings of this connector matching the JSON payload.

        The payload keys are used as filter arguments; ``nameid`` is
        mandatory. Raises ``APIError`` when no pairing matches.
        """
        data = json.loads(request.body)
        is_erroneous(data, ('nameid',))
        # Always scope the deletion to this connector, whatever the payload
        # contains.
        filters = dict(data)
        filters['resource'] = self
        pairings = Pairing.objects.filter(**filters)
        if not pairings.exists():
            raise APIError(_('No pairing exists'))
        for pairing in pairings:
            pairing.clear_cache()
        pairings.delete()
        return {'data': True}

    @endpoint(name='address-eligibility', perm='can_access', description=_('Check if address is eligible'))
    def address_eligibility(self, request, *args, **kwargs):
        """Tell whether the address given in the query string is eligible."""
        data = request.GET
        is_erroneous(data, ('address_sticode', 'address_zipcode', 'address_locality'))
        params = get_address_params(data)
        # Bypass the error check, as the other check endpoints do: otherwise
        # a non-'01' reply raises APIError and 'data' could never be False.
        reply = self.call('PLS_ELIGADR', params, bypass_erroneous_reply=True)
        return {
            'data': reply.CodeRetour == '01',
            'address': address_to_json(reply),
            'desc': reply.MessageRetour,
        }

    @endpoint(
        name='check-renewal-time', perm='can_access', description=_('Check if renewal time has not expired')
    )
    def check_renewal_time(self, request, *args, **kwargs):
        """Report the result of the backend renewal-time check for a file."""
        data = request.GET
        is_erroneous(data, ('firstnames', 'lastname', 'filenumber'))
        params = make_subscriber_params(data)
        reply = self.call('PLS_CTRLDELAIS', *params, bypass_erroneous_reply=True)
        return {'data': reply.CodeRetour == '01', 'desc': reply.MessageRetour}

    @endpoint(
        name='check-renewal-duplicate',
        perm='can_access',
        description=_('Check if renewal request is not a duplicate'),
    )
    def check_renewal_duplicate(self, request, *args, **kwargs):
        """Report the result of the backend renewal-duplicate check for a file."""
        data = request.GET
        is_erroneous(data, ('firstnames', 'lastname', 'filenumber'))
        params = make_subscriber_params(data)
        reply = self.call('PLS_CTRLDOUBLRENOUV', *params, bypass_erroneous_reply=True)
        return {'data': reply.CodeRetour == '01', 'desc': reply.MessageRetour}

    @endpoint(
        name='check-creation-duplicate',
        perm='can_access',
        description=_('Check if creation request is not a duplicate'),
    )
    def check_creation_duplicate(self, request, *args, **kwargs):
        """Report the result of the backend creation-duplicate check."""
        data = request.GET
        is_erroneous(
            data,
            (
                'applicant_firstnames',
                'applicant_lastname',
                'address_sticode',
                'address_zipcode',
                'address_locality',
            ),
        )
        lastname, firstnames = data['applicant_lastname'], data['applicant_firstnames']
        address = get_address_params(data)
        reply = self.call('PLS_CTRLDOUBLCREA', lastname, firstnames, address, bypass_erroneous_reply=True)
        return {'data': reply.CodeRetour == '01', 'desc': reply.MessageRetour}

    @endpoint(
        name='check-creation-not-renewal',
        perm='can_access',
        description=_('Check if creation request is not a renewal request'),
    )
    def check_creation_is_not_renewal(self, request, *args, **kwargs):
        """Report the result of the backend known-user check for a creation."""
        data = request.GET
        is_erroneous(
            data,
            (
                'applicant_firstnames',
                'applicant_lastname',
                'address_sticode',
                'address_zipcode',
                'address_locality',
            ),
        )
        lastname, firstnames = data['applicant_lastname'], data['applicant_firstnames']
        address = get_address_params(data)
        reply = self.call(
            'PLS_CTRLUSAGERCONNUCREA', lastname, firstnames, address, bypass_erroneous_reply=True
        )
        return {'data': reply.CodeRetour == '01', 'desc': reply.MessageRetour}

    @endpoint(
        name='payment-info',
        perm='can_access',
        pattern=r'^(?P<nameid>\w+)/$',
        description=_('Get payment information'),
    )
    def payment_info(self, request, nameid, *args, **kwargs):
        """Return the payment information of every file paired to ``nameid``.

        An optional ``filenumber`` query parameter narrows the result to that
        file. Raises ``APIError`` when ``nameid`` has no pairing at all.
        """
        pairings = Pairing.objects.filter(resource=self, nameid=nameid)
        if not pairings:
            raise APIError(_('No pairing exists'))
        if request.GET.get('filenumber'):
            pairings = pairings.filter(filenumber=request.GET.get('filenumber'))
        payments = [pairing.get_payment_info() for pairing in pairings]
        # get_payment_info() can return None
        payments = [payment for payment in payments if payment]
        return {'data': payments}

    @endpoint(name='notify-payment', perm='can_access', methods=['post'], description=_('Notify a payment'))
    def payment_notification(self, request, *args, **kwargs):
        """Notify the backend of a payment made for a paired file.

        Raises ``APIError`` when the transaction datetime or the amount is
        invalid, or when no pairing exists for ``nameid`` and ``filenumber``.
        """
        data = json.loads(request.body)
        is_erroneous(
            data,
            (
                'nameid',
                'filenumber',
                'transaction_id',
                'application_id',
                'transaction_datetime',
                'total_amount',
                'application_external_id',
            ),
        )
        # We accept a simple date or a datetime using UTC, we convert it to Europe/Paris timezone on exit
        transaction_date = date_or_datetime_to_local_date(data['transaction_datetime'])
        if transaction_date is None:
            raise APIError(_('Invalid value for transaction datetime'))
        pairings = Pairing.objects.filter(resource=self, nameid=data['nameid'], filenumber=data['filenumber'])
        total_amount = self._amount_to_cents(data['total_amount'])  # in cents
        if not pairings:
            raise APIError(_('No pairing exists'))
        double_plaque = any2bool(data.get('double_plaque'))
        # The payment type was historically read from the misspelled key
        # 'applicaiton_payment_type'; it is still accepted as a fallback.
        payment_type = data.get('application_payment_type', data.get('applicaiton_payment_type', 10))
        self.call(
            'PLS_NOTIFCB2' if double_plaque else 'PLS_NOTIFCB',
            data['application_external_id'],
            data['filenumber'],
            data['application_id'],
            payment_type,
            total_amount,
            transaction_date.strftime('%Y%m%d'),
            data['transaction_id'],
        )
        for pairing in pairings:
            pairing.clear_cache()
        return {'data': True}

    @endpoint(
        name='send-files', perm='can_access', methods=['post'], description=_('Send supporting documents')
    )
    def send_files(self, request, *args, **kwargs):
        """Send the supporting documents of an application to the backend.

        Besides ``application_external_id`` and ``application_id``, each
        non-empty payload entry must be a dict with ``filename`` and
        ``content`` (base64) keys, stored under a key whose part before the
        first comma is a known document slug. All problems are collected and
        reported together in a single ``APIError``.
        """
        try:
            data = json.loads(request.body)
        except ValueError as exc:
            raise APIError(str(exc), http_status=400)
        is_erroneous(data, ('application_external_id', 'application_id'))
        application_external_id = data.pop('application_external_id')
        application_id = data.pop('application_id')
        document_codes = get_document_codes()
        attached_files = []
        errors = []
        for key, value in sorted(data.items()):
            if not value:
                continue
            if not isinstance(value, dict):
                errors.append(_('<%s> value is not a dict') % key)
                continue
            # Only the part of the key before the first comma names the
            # document type (presumably so that several files of the same
            # type can be sent -- confirm with callers).
            doc_slug = key.split(',')[0]
            try:
                doc_id = document_codes[doc_slug]
            except KeyError:
                errors.append(_('Invalid document type: <%s>') % key)
                continue
            value_errors = is_erroneous(value, ('filename', 'content'), silent=True)
            if value_errors:
                value_errors = ["<%s> " % key + error for error in value_errors]
                errors.extend(value_errors)
                continue
            # Use raw content as zeep encode the content in base64
            try:
                file_raw_content = base64.b64decode(value['content'])
            except (TypeError, ValueError) as e:
                # binascii.Error is a ValueError subclass; report the bad
                # payload like any other per-file problem.
                errors.append('<%s> content is not valid base64: %s' % (key, e))
                continue
            try:
                pdf_content = to_pdf(file_raw_content)
            except ValueError as e:
                errors.append('<%s> cannot be converted to PDF: %s' % (key, e))
                continue
            filename = value['filename']
            if not filename.lower().endswith('.pdf'):
                filename += '.pdf'
            attached_files.append({'TypeDocument': doc_id, 'NomFichier': filename, 'Fichier': pdf_content})
        if errors:
            raise APIError(errors)
        # deduce the number of files
        number = len(attached_files)
        self.call('PLS_ENVOIPJ', application_external_id, application_id, number, attached_files)
        return {'data': True}
class Pairing(models.Model):
    """Link between a user (``nameid``) and a subscription file of a connector.

    Subscriber and payment information fetched from the backend is cached
    per pairing.
    """

    # Cache lifetimes, in seconds.
    INFO_CACHE_DURATION = 5 * 60
    PAYMENT_INFO_CACHE_DURATION = 20 * 60
    # Labels of the known payment type codes.
    PAYMENT_TYPES = {5: 'Prélèvement mensualisé', 10: 'Carte Bancaire via Internet'}

    resource = models.ForeignKey(DPark, on_delete=models.CASCADE)
    nameid = models.CharField(blank=False, max_length=256)
    lastname = models.CharField(blank=False, max_length=128)
    firstnames = models.CharField(blank=False, max_length=128)
    filenumber = models.CharField(blank=False, max_length=128)
    badgenumber = models.CharField(blank=False, max_length=128)
    cardnumber = models.CharField(blank=False, max_length=128)

    class Meta:
        ordering = ['filenumber']

    def __str__(self):
        return '%s - %s - %s' % (self.resource, self.nameid, self.filenumber)

    @property
    def info_cache_key(self):
        """Cache key of the subscriber information of this pairing."""
        return 'dpark-pairing-info-%s-%s' % (self.resource.id, self.id)

    @property
    def payment_info_cache_key(self):
        """Cache key of the payment information of this pairing."""
        return 'dpark-pairing-payment-info-%s-%s' % (self.resource.id, self.id)

    def clear_cache(self):
        """Drop both cached entries of this pairing."""
        cache.delete(self.info_cache_key)
        cache.delete(self.payment_info_cache_key)

    def get_info(self):
        """Return the normalized subscriber information.

        The cached value is used when present. Returns ``None`` when the
        backend call raises ``APIError``.
        """
        info = cache.get(self.info_cache_key)
        if info:
            return info
        data = {
            'firstnames': self.firstnames,
            'lastname': self.lastname,
            'filenumber': self.filenumber,
            'badgenumber': self.badgenumber,
            'cardnumber': self.cardnumber,
        }
        params = make_subscriber_params(data)
        try:
            reply = self.resource.call('PLS_RECUPD', *params)
        except APIError:
            return None
        info = normalize_reply(reply)
        cache.set(self.info_cache_key, info, self.INFO_CACHE_DURATION)
        return info

    def get_payment_info(self):
        """Return the normalized payment information.

        The cached value is used when present. Returns ``None`` when the
        backend call raises ``APIError``. The amount is converted from cents
        and a ``typepaiement_text`` label is added; it is an empty string
        when the payment type code is not a known one.
        """
        payment = cache.get(self.payment_info_cache_key)
        if payment:
            return payment
        try:
            reply = self.resource.call('PLS_RECUPAIEM', self.filenumber, self.lastname, self.firstnames)
        except APIError:
            return None
        payment = normalize_reply(reply)
        payment['montant'] = payment['montant'] / 100  # received amount is in cents
        # PAYMENT_TYPES is keyed by int: coerce the code for the lookup only,
        # and tolerate unknown codes instead of failing with KeyError.
        type_code = payment['typepaiement']
        try:
            type_code = int(type_code)
        except (TypeError, ValueError):
            pass
        payment['typepaiement_text'] = self.PAYMENT_TYPES.get(type_code, '')
        cache.set(self.payment_info_cache_key, payment, self.PAYMENT_INFO_CACHE_DURATION)
        return payment