# -*- coding: utf-8
# eopayment - online payment library
# Copyright (C) 2011-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import codecs
import datetime
import hashlib
import hmac
import logging
import re
import uuid
import warnings
from collections import OrderedDict
from gettext import gettext as _

import pytz
import requests
import six
from Crypto.Hash import SHA
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from six.moves.urllib import parse as urllib
from six.moves.urllib import parse as urlparse

from . import cb
from .common import (PaymentCommon, PaymentResponse, FORM, PAID, CANCELLED,
                     DENIED, ERROR, Form, ResponseError, force_text,
                     force_byte)

__all__ = ['sign', 'Payment']

# Default public key passed to verify() to check the RSA signature of
# responses.
PAYBOX_KEY = '''-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDe+hkicNP7ROHUssGNtHwiT2Ew
HFrSk/qwrcq8v5metRtTTFPE/nmzSkRnTs3GMpi57rBdxBBJW5W9cpNyGUh0jNXc
VrOSClpD5Ri2hER/GcNrxVRP7RlWOqB1C03q4QYmwjHZ+zlM4OUhCCAtSWflB4wC
Ka1g88CjFwRw/PB9kwIDAQAB
-----END PUBLIC KEY-----'''

# Human readable description of the PBX_* request variables.
VARS = {
    'PBX_SITE': 'Numéro de site (fourni par Paybox)',
    'PBX_RANG': 'Numéro de rang (fourni par Paybox)',
    'PBX_IDENTIFIANT': 'Identifiant interne (fourni par Paybox)',
    'PBX_TOTAL': 'Montant total de la transaction',
    'PBX_DEVISE': 'Devise de la transaction',
    'PBX_CMD': 'Référence commande côté commerçant',
    'PBX_PORTEUR': 'Adresse E - mail de l’acheteur',
    'PBX_RETOUR': 'Liste des variables à retourner par Paybox',
    'PBX_HASH': 'Type d’algorit hme de hachage pour le calcul de l’empreinte',
    'PBX_TIME': 'Horodatage de la transaction',
    'PBX_HMAC': 'Signature calculée avec la clé secrète',
}

# Maps the `erreur` code of a response to a message and, optionally, to an
# eopayment result; Payment.response() falls back to ERROR when no 'result'
# key is given.
PAYBOX_ERROR_CODES = {
    '00000': {'message': 'Paiement réalisé avec succés.', 'result': PAID},
    '00001': {
        'message': 'Demande annulée par l\'usager.',
        'result': CANCELLED,
    },
    '001xx': {
        'message': 'Paiement refusé par le centre d’autorisation [voir '
        '§12.112.1 Codes réponses du centre d’autorisationCodes réponses du '
        'centre d’autorisation]. En cas d’autorisation de la transaction par '
        'le centre d’autorisation de la banque ou de l’établissement financier '
        'privatif, le code erreur “00100” sera en fait remplacé directement '
        'par “00000”.'
    },
    '00003': {
        'message': 'Erreur Paybox. Dans ce cas, il est souhaitable de faire une '
        'tentative sur le site secondaire FQDN tpeweb1.paybox.com.'
    },
    '00004': {'message': 'Numéro de porteur ou cryptogramme visuel invalide.'},
    '00006': {'message': 'Accès refusé ou site/rang/identifiant incorrect.'},
    '00008': {'message': 'Date de fin de validité incorrecte.'},
    '00009': {'message': 'Erreur de création d’un abonnement.'},
    '00010': {'message': 'Devise inconnue.'},
    '00011': {'message': 'Montant incorrect.'},
    '00015': {'message': 'Paiement déjà effectué.'},
    '00016': {
        'message': 'Abonné déjà existant (inscription nouvel abonné). Valeur '
        '‘U’ de la variable PBX_RETOUR.'
    },
    '00021': {'message': 'Carte non autorisée.', 'result': DENIED},
    '00029': {
        'message': 'Carte non conforme. Code erreur renvoyé lors de la documentation de la variable « PBX_EMPREINTE ».'
    },
    '00030': {
        'message': 'Temps d’attente > 15 mn par l’internaute/acheteur au niveau de la page de paiements.'
    },
    '00031': {'message': 'Réservé'},
    '00032': {'message': 'Réservé'},
    '00033': {
        'message': 'Code pays de l’adresse IP du navigateur de l’acheteur non autorisé.',
        'result': DENIED,
    },
    '00040': {
        'message': 'Opération sans authentification 3-DSecure, bloquée par le filtre.',
        'result': DENIED,
    },
    '99999': {
        'message': 'Opération en attente de validation par l’émetteur du moyen de paiement.'
    },
}

# Hash constructors accepted as PBX_HASH values by sign().
ALGOS = {
    'SHA512': hashlib.sha512,
    'SHA256': hashlib.sha256,
    'SHA384': hashlib.sha384,
    'SHA224': hashlib.sha224,
}

# Payment page endpoints, indexed by the `platform` option.
URLS = {
    'test': 'https://preprod-tpeweb.paybox.com/cgi/MYchoix_pagepaiement.cgi',
    'prod': 'https://tpeweb.paybox.com/cgi/MYchoix_pagepaiement.cgi',
    'backup': 'https://tpeweb1.paybox.com/cgi/MYchoix_pagepaiement.cgi',
}

# Server-to-server endpoints used by Payment.perform(), indexed by the
# `platform` option.
PAYBOX_DIRECT_URLS = {
    'test': 'https://preprod-ppps.paybox.com/PPPS.php',
    'prod': 'https://ppps.paybox.com/PPPS.php',
    'backup': 'https://ppps1.paybox.com/PPPS.php'
}

PAYBOX_DIRECT_CANCEL_OPERATION = '00055'
PAYBOX_DIRECT_VALIDATE_OPERATION = '00002'
PAYBOX_DIRECT_VERSION_NUMBER = '00103'
PAYBOX_DIRECT_SUCCESS_RESPONSE_CODE = '00000'

# payment modes
PAYMENT_MODES = {'AUTHOR_CAPTURE': 'O', 'IMMEDIATE': 'N'}


def sign(data, key):
    '''Sign request fields with an HMAC and append the PBX_HMAC field.

       The string to sign is the '&'-joined list of `name=value` pairs, in
       the order given, encoded as UTF-8.  The hash function is selected by
       the value of the PBX_HASH pair, which must be a key of ALGOS.

       data -- iterable of (name, value) pairs
       key -- HMAC key, as bytes

       Returns a tuple made of the given pairs followed by
       ('PBX_HMAC', <uppercase hexadecimal digest>).

       Raises AssertionError if PBX_HASH is missing or is not a supported
       algorithm.
    '''
    logger = logging.getLogger(__name__)
    # Materialize once: the pairs are walked several times below, which
    # would silently exhaust a one-shot iterator.
    data = tuple(data)
    # The secret key is deliberately not logged.
    logger.debug('signed data %r', data)
    algo = None
    for k, v in data:
        if k == 'PBX_HASH' and v in ALGOS:
            algo = ALGOS[v]
            break
    if algo is None:
        # Raised explicitly rather than through an `assert` statement so the
        # check survives `python -O`; the exception type is kept for
        # backward compatibility.
        raise AssertionError('Missing or invalid PBX_HASH')
    tosign = '&'.join('%s=%s' % (k, force_text(v)) for k, v in data)
    logger.debug('signed string %r', tosign)
    signature = hmac.new(key, tosign.encode('utf-8'), algo)
    return data + (('PBX_HMAC', signature.hexdigest().upper()),)


def verify(data, signature, key=PAYBOX_KEY):
    '''Verify signature using SHA1withRSA by Paybox

       data -- the signed content, converted to bytes with force_byte()
       signature -- the raw (already base64-decoded) signature
       key -- RSA public key in a format accepted by RSA.importKey(),
              defaults to PAYBOX_KEY

       Returns the result of the PKCS#1 v1.5 verification.
    '''
    key = RSA.importKey(key)
    h = SHA.new(force_byte(data))
    verifier = PKCS1_v1_5.new(key)
    return verifier.verify(h, signature)


class Payment(PaymentCommon):
    '''Paybox backend for eopayment.

       If you want to handle Instant Payment Notification, you must provide
       an automatic_return_url option specifying the URL of the callback
       endpoint.

       Email is mandatory to emit payment requests with paybox.

       IP addresses to authorize:

                        IN               OUT
          test          195.101.99.73    195.101.99.76
          production    194.2.160.66     194.2.122.158
          backup        195.25.7.146     195.25.7.166
    '''
    # Deprecated alias of automatic_return_url, see request().
    callback = None

    description = {
        'caption': _('Paybox'),
        'parameters': [
            {
                'name': 'normal_return_url',
                'caption': _('Normal return URL'),
                'default': '',
                'required': False,
            },
            {
                'name': 'automatic_return_url',
                'caption': _('Automatic return URL'),
                'required': False,
            },
            {
                'name': 'platform',
                'caption': _('Plateforme cible'),
                'default': 'test',
                'choices': (
                    ('test', 'Test'),
                    ('backup', 'Backup'),
                    ('prod', 'Production'),
                )
            },
            {
                'name': 'site',
                'caption': _('Numéro de site'),
                'required': True,
                'validation': lambda x: isinstance(x, six.string_types)
                and x.isdigit() and len(x) == 7,
            },
            {
                'name': 'cle',
                'caption': _('Site key'),
                'required': False,
                'validation': lambda x: isinstance(x, six.string_types),
            },
            {
                'name': 'rang',
                'caption': _('Numéro de rang'),
                'required': True,
                'validation': lambda x: isinstance(x, six.string_types)
                and x.isdigit() and len(x) == 3,
            },
            {
                'name': 'identifiant',
                'caption': _('Identifiant'),
                'required': True,
                'validation': lambda x: isinstance(x, six.string_types)
                and x.isdigit() and (0 < len(x) < 10),
            },
            {
                'name': 'shared_secret',
                'caption': _('Secret partagé (clé HMAC)'),
                # six.string_types, like every other validator here, so that
                # text strings are also accepted on Python 2.
                'validation': lambda x: isinstance(x, six.string_types)
                and all(a.lower() in '0123456789abcdef' for a in x),
                'required': True,
            },
            {
                'name': 'devise',
                'caption': _('Devise'),
                'default': '978',
                'choices': (
                    ('978', 'Euro'),
                ),
            },
            {
                'name': 'callback',
                'caption': _('Callback URL'),
                'deprecated': True,
            },
            {
                'name': 'capture_day',
                'caption': _('Nombre de jours pour un paiement différé'),
                'default': '',
                'required': False,
                'validation': lambda x: isinstance(x, six.string_types)
                and x.isdigit() and (1 <= len(x) <= 2)
            },
            {
                'name': 'capture_mode',
                'caption': _('Capture Mode'),
                'default': 'IMMEDIATE',
                'required': False,
                'choices': list(PAYMENT_MODES)
            },
            {
                'name': 'manual_validation',
                'caption': 'Validation manuelle',
                'type': bool,
                'default': False,
                'scope': 'transaction'
            },
            {
                'name': 'timezone',
                'caption': _('Default Timezone'),
                'default': 'Europe/Paris',
                'required': False,
            }
        ]
    }

    def make_pbx_cmd(self, guid, orderid=None, transaction_id=None):
        '''Build the value of the PBX_CMD field.

           When no transaction_id is given, one is generated from the
           current time in the configured timezone and guid.  If orderid is
           given it is appended after a '!' separator.
        '''
        if not transaction_id:
            date = datetime.datetime.now(pytz.timezone(self.timezone)).strftime('%Y-%m-%dT%H%M%S')
            transaction_id = '%s_%s' % (date, guid)
        pbx_cmd = transaction_id
        if orderid:
            pbx_cmd += '!' + orderid
        return pbx_cmd

    def request(self, amount, email, name=None, orderid=None, manual_validation=None, **kwargs):
        '''Build the signed form to POST to the Paybox payment page.

           amount -- amount of the transaction, passed to clean_amount()
           email -- email of the payer, sent as PBX_PORTEUR
           orderid -- optional order identifier, appended to PBX_CMD and
                      used as PBX_ARCHIVAGE
           manual_validation -- if true, force authorization-only mode

           Recognized keyword arguments: transaction_id, time and
           capture_day.

           Returns a tuple (transaction_id, FORM, form).
        '''
        # Fields are signed in insertion order, so the order below matters.
        d = OrderedDict()
        d['PBX_SITE'] = force_text(self.site)
        d['PBX_RANG'] = force_text(self.rang).strip()[-3:]
        d['PBX_IDENTIFIANT'] = force_text(self.identifiant)
        d['PBX_TOTAL'] = self.clean_amount(amount)
        d['PBX_DEVISE'] = force_text(self.devise)
        guid = str(uuid.uuid4().hex)
        transaction_id = d['PBX_CMD'] = self.make_pbx_cmd(
            guid=guid,
            transaction_id=kwargs.get('transaction_id'),
            orderid=orderid)
        d['PBX_PORTEUR'] = force_text(email)
        # `signature:K` is last: response() verifies the signature over the
        # fields preceding it.
        d['PBX_RETOUR'] = (
            'montant:M;reference:R;code_autorisation:A;erreur:E;numero_appel:T;'
            'numero_transaction:S;'
            'date_transaction:W;heure_transaction:Q;'
            'signature:K'
        )
        d['PBX_HASH'] = 'SHA512'
        d['PBX_TIME'] = kwargs.get('time') or (
            force_text(datetime.datetime.utcnow().isoformat('T')).split('.')[0]
            + '+00:00')
        d['PBX_ARCHIVAGE'] = orderid or guid
        if self.normal_return_url:
            d['PBX_EFFECTUE'] = self.normal_return_url
            d['PBX_REFUSE'] = self.normal_return_url
            d['PBX_ANNULE'] = self.normal_return_url
            d['PBX_ATTENTE'] = self.normal_return_url
        automatic_return_url = self.automatic_return_url
        if not automatic_return_url and self.callback:
            warnings.warn("callback option is deprecated, "
                          "use automatic_return_url", DeprecationWarning)
            automatic_return_url = self.callback
        capture_day = kwargs.get('capture_day', self.capture_day)
        if capture_day:
            d['PBX_DIFF'] = capture_day.zfill(2)
        d['PBX_AUTOSEULE'] = PAYMENT_MODES[self.capture_mode]
        if manual_validation:
            d['PBX_AUTOSEULE'] = PAYMENT_MODES['AUTHOR_CAPTURE']
        if automatic_return_url:
            d['PBX_REPONDRE_A'] = force_text(automatic_return_url)

        # The shared secret is configured as an hexadecimal string; the HMAC
        # key is its binary value.
        if six.PY3:
            shared_secret = codecs.decode(bytes(self.shared_secret, 'ascii'), 'hex')
        else:
            shared_secret = codecs.decode(bytes(self.shared_secret), 'hex')
        signed_fields = sign(d.items(), shared_secret)

        url = URLS[self.platform]
        fields = [
            {
                'type': u'hidden',
                'name': force_text(k),
                'value': force_text(v),
            }
            for k, v in signed_fields
        ]
        form = Form(url, 'POST', fields, submit_name=None,
                    submit_value=u'Envoyer', encoding='utf-8')
        return transaction_id, FORM, form

    def response(self, query_string, callback=False, **kwargs):
        '''Parse a return or callback query string sent by Paybox.

           query_string -- the raw query string
           callback -- True when handling the automatic (server to server)
                       notification; the signed content is then rebuilt from
                       a fixed list of fields instead of being taken from the
                       query string as received

           Returns a PaymentResponse; its `signed` attribute is False when
           there is no signature or when it cannot be decoded.

           Raises ResponseError if `erreur` or `reference` is missing, or if
           a signed callback lacks one of the signed fields.
        '''
        logger = logging.getLogger(__name__)
        d = urlparse.parse_qs(query_string, True, False)
        if not set(d) >= set(['erreur', 'reference']):
            raise ResponseError('missing erreur or reference')

        signed = False
        if 'signature' in d:
            if callback:
                signed_keys = ('montant', 'reference', 'code_autorisation',
                               'erreur', 'numero_appel', 'numero_transaction',
                               'date_transaction', 'heure_transaction')
                # The query string is untrusted input: report missing fields
                # with the library's own exception rather than a KeyError.
                missing = [key for key in signed_keys if key not in d]
                if missing:
                    raise ResponseError('missing %s' % ', '.join(missing))
                data = ['%s=%s' % (key, urllib.quote(d[key][0]))
                        for key in signed_keys]
            else:
                # Everything before the `signature` field is signed.
                data = []
                for key, value in urlparse.parse_qsl(query_string, True, True):
                    if key == 'signature':
                        break
                    data.append('%s=%s' % (key, urllib.quote(value)))
            try:
                sig = base64.b64decode(d['signature'][0])
            except (TypeError, ValueError):
                # Undecodable signature: it cannot be verified, the response
                # is reported as unsigned.
                logger.warning('paybox: signature is not valid base64')
            else:
                signed = verify('&'.join(data), sig)

        erreur = d['erreur'][0]
        if re.match(r'^001[0-9][0-9]$', erreur):
            # 001xx: the last two digits are passed to the cb module for
            # translation.
            cb_error_code = erreur[3:5]
            message, result = cb.translate_cb_error_code(cb_error_code)
        elif erreur in PAYBOX_ERROR_CODES:
            message = PAYBOX_ERROR_CODES[erreur]['message']
            result = PAYBOX_ERROR_CODES[erreur].get('result', ERROR)
        else:
            message = 'Code erreur inconnu %s' % erreur
            result = ERROR
        pbx_cmd = d['reference'][0]

        transaction_date = None
        if 'date_transaction' in d and 'heure_transaction' in d:
            try:
                full_date_string = '%sT%s' % (d['date_transaction'][0],
                                              d['heure_transaction'][0])
                # NOTE(review): confirm this date layout against the Paybox
                # documentation of the `W` return variable.
                transaction_date = datetime.datetime.strptime(
                    full_date_string, '%Y%m%dT%H:%M:%S')
            except ValueError:
                # Best effort: an unparsable date is reported as None.
                pass
            else:
                # The date is assumed to be expressed in the configured
                # timezone.
                tz = pytz.timezone(self.timezone)
                transaction_date = tz.localize(transaction_date)
        return PaymentResponse(
            order_id=pbx_cmd,
            signed=signed,
            bank_data=d,
            result=result,
            bank_status=message,
            transaction_date=transaction_date)

    def perform(self, amount, bank_data, operation):
        '''Send an operation on an existing transaction to Paybox Direct.

           amount -- amount of the operation, passed to clean_amount()
           bank_data -- the bank_data of the PaymentResponse of the
                        transaction; `numero_transaction`, `numero_appel`
                        and `reference` are read from it
           operation -- operation code sent as TYPE

           Returns the parsed response as a dict.

           Raises ResponseError if CODEREPONSE is not the success code, and
           whatever response.raise_for_status() raises on HTTP errors.
        '''
        logger = logging.getLogger(__name__)
        url = PAYBOX_DIRECT_URLS[self.platform]
        transaction_number = bank_data['numero_transaction'][0]
        params = {
            'VERSION': PAYBOX_DIRECT_VERSION_NUMBER,
            'TYPE': operation,
            'SITE': force_text(self.site),
            'RANG': force_text(self.rang).strip(),
            'CLE': force_text(self.cle),
            'NUMQUESTION': transaction_number.zfill(10),
            'MONTANT': self.clean_amount(amount),
            'DEVISE': force_text(self.devise),
            'NUMTRANS': transaction_number,  # paybox transaction number
            'NUMAPPEL': bank_data['numero_appel'][0],
            'REFERENCE': bank_data['reference'][0],
            'DATEQ': datetime.datetime.now().strftime('%d%m%Y%H%M%S'),
        }
        # NOTE(review): no timeout is given, this call can block
        # indefinitely if the server never answers.
        response = requests.post(url, params)
        response.raise_for_status()
        logger.debug('received %r', response.text)
        data = dict(urlparse.parse_qsl(response.text, True, True))
        if data.get('CODEREPONSE') != PAYBOX_DIRECT_SUCCESS_RESPONSE_CODE:
            comment = data.get('COMMENTAIRE')
            if comment is None:
                # Do not hide the failure behind a KeyError when the server
                # gives no comment.
                comment = 'Paybox Direct error, CODEREPONSE=%s' % data.get('CODEREPONSE')
            if six.PY2:
                raise ResponseError(comment.encode('utf-8'))
            raise ResponseError(comment)
        return data

    def validate(self, amount, bank_data, **kwargs):
        '''Perform the Paybox Direct validate operation on a transaction.'''
        return self.perform(amount, bank_data, PAYBOX_DIRECT_VALIDATE_OPERATION)

    def cancel(self, amount, bank_data, **kwargs):
        '''Perform the Paybox Direct cancel operation on a transaction.'''
        return self.perform(amount, bank_data, PAYBOX_DIRECT_CANCEL_OPERATION)