eopayment/eopayment/sips2.py

355 lines
13 KiB
Python

# -*- coding: utf-8 -*-
# eopayment - online payment library
# Copyright (C) 2011-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
from decimal import Decimal
from gettext import gettext as _
import collections
import hashlib
import hmac
import json
import string
import uuid
import warnings
from six.moves.urllib import parse as urlparse
import requests
import pytz
from .common import (PaymentCommon, FORM, Form, PaymentResponse, PAID, ERROR,
CANCELED, ResponseError, force_text)
__all__ = ['Payment']
class Payment(PaymentCommon):
'''
Payment backend module for the ATOS/SIPS system used by many French banks.
The necessary options are:
- merchant_id
- secret_key
- normal_return_url
- automatic_return_url
It was writtent using the documentation from file:
Worldline Benelux_Sips_Technical_integration_guide_Version_1.5.pdf
'''
URL = {
'test': 'https://payment-webinit.simu.sips-atos.com/paymentInit',
'prod': 'https://payment-webinit.sips-atos.com/paymentInit',
}
WS_URL = {
'test': 'https://office-server.test.sips-atos.com',
'prod': 'https://office-server.sips-atos.com',
}
INTERFACE_VERSION = 'HP_2.3'
RESPONSE_CODES = {
'00': 'Authorisation accepted',
'02': 'Authorisation request to be performed via telephone with the issuer, as the '
'card authorisation threshold has been exceeded, if the forcing is authorised for '
'the merchant',
'03': 'Invalid distance selling contract',
'05': 'Authorisation refused',
'12': 'Invalid transaction, verify the parameters transferred in the request.',
'14': 'Invalid bank details or card security code',
'17': 'Buyer cancellation',
'24': 'Operation impossible. The operation the merchant wishes to perform is not '
'compatible with the status of the transaction.',
'25': 'Transaction not found in the Sips database',
'30': 'Format error',
'34': 'Suspicion of fraud',
'40': 'Function not supported: the operation that the merchant would like to perform '
'is not part of the list of operations for which the merchant is authorised',
'51': 'Amount too high',
'54': 'Card is past expiry date',
'60': 'Transaction pending',
'63': 'Security rules not observed, transaction stopped',
'75': 'Number of attempts at entering the card number exceeded',
'90': 'Service temporarily unavailable',
'94': 'Duplicated transaction: for a given day, the TransactionReference has already been '
'used',
'97': 'Timeframe exceeded, transaction refused',
'99': 'Temporary problem at the Sips Office Server level',
}
TEST_MERCHANT_ID = '002001000000001'
description = {
'caption': 'SIPS 2',
'parameters': [
{
'name': 'platform',
'caption': _('Platform'),
'default': 'test',
'choices': ['test', 'prod'],
'required': True,
},
{
'name': 'merchant_id',
'caption': _('Merchant ID'),
'default': TEST_MERCHANT_ID,
'required': True,
},
{
'name': 'secret_key',
'caption': _('Secret Key'),
'default': '002001000000001_KEY1',
'required': True,
},
{
'name': 'key_version',
'caption': _('Key Version'),
'default': '1',
'required': True,
},
{
'name': 'normal_return_url',
'caption': _('Normal return URL'),
'default': '',
'required': True,
},
{
'name': 'automatic_return_url',
'caption': _('Automatic return URL'),
'required': False,
},
{
'name': 'currency_code',
'caption': _('Currency code'),
'default': '978',
'choices': ['978'],
'required': True,
},
{
'name': 'capture_mode',
'caption': _('Capture Mode'),
'default': 'AUTHOR_CAPTURE',
'choices': ['AUTHOR_CAPTURE', 'IMMEDIATE', 'VALIDATION'],
'required': True,
},
{
'name': 'capture_day',
'caption': _('Capture Day'),
'required': False,
},
{
'name': 'payment_means',
'caption': _('Payment Means'),
'required': False
},
{
'name': 'timezone',
'caption': _('SIPS server Timezone'),
'default': 'Europe/Paris',
'required': False,
}
],
}
def encode_data(self, data):
return u'|'.join(u'%s=%s' % (force_text(key), force_text(value))
for key, value in data.items())
def seal_data(self, data):
s = self.encode_data(data)
s += force_text(self.secret_key)
s = s.encode('utf-8')
s = hashlib.sha256(s).hexdigest()
return s
def get_data(self):
data = collections.OrderedDict()
data['merchantId'] = self.merchant_id
data['keyVersion'] = self.key_version
data['normalReturnUrl'] = self.normal_return_url
if self.automatic_return_url:
data['automaticResponseUrl'] = self.automatic_return_url
data['currencyCode'] = self.currency_code
data['captureMode'] = self.capture_mode
if self.capture_day:
data['captureDay'] = self.capture_day
if self.payment_means:
data['paymentMeanBrandList'] = self.payment_means
return data
def get_url(self):
return self.URL[self.platform]
def request(self, amount, name=None, first_name=None, last_name=None,
address=None, email=None, phone=None, orderid=None,
info1=None, info2=None, info3=None, next_url=None, **kwargs):
data = self.get_data()
transaction_id = self.transaction_id(10, string.digits, 'sips2', data['merchantId'])
data['transactionReference'] = force_text(transaction_id)
data['orderId'] = orderid or force_text(uuid.uuid4()).replace('-', '')
if info1:
data['statementReference'] = force_text(info1)
else:
data['statementReference'] = data['transactionReference']
data['amount'] = self.clean_amount(amount)
if email:
data['billingContact.email'] = email
if 'capture_day' in kwargs:
data['captureDay'] = kwargs.get('capture_day')
normal_return_url = self.normal_return_url
if next_url and not normal_return_url:
warnings.warn("passing next_url to request() is deprecated, "
"set normal_return_url in options", DeprecationWarning)
normal_return_url = next_url
if normal_return_url:
data['normalReturnUrl'] = normal_return_url
form = Form(
url=self.get_url(),
method='POST',
fields=[
{
'type': 'hidden',
'name': 'Data',
'value': self.encode_data(data)
},
{
'type': 'hidden',
'name': 'Seal',
'value': self.seal_data(data),
},
{
'type': 'hidden',
'name': 'InterfaceVersion',
'value': self.INTERFACE_VERSION,
},
])
self.logger.debug('emitting request %r', data)
return transaction_id, FORM, form
def decode_data(self, data):
data = data.split('|')
data = [map(force_text, p.split('=', 1)) for p in data]
return collections.OrderedDict(data)
def check_seal(self, data, seal):
return seal == self.seal_data(data)
response_code_to_result = {
'00': PAID,
'17': CANCELED,
}
def response(self, query_string, **kwargs):
form = urlparse.parse_qs(query_string)
if not set(form) >= set(['Data', 'Seal', 'InterfaceVersion']):
raise ResponseError('missing Data, Seal or InterfaceVersion')
self.logger.debug('received query string %r', form)
data = self.decode_data(form['Data'][0])
seal = form['Seal'][0]
self.logger.debug('parsed response %r seal %r', data, seal)
signed = self.check_seal(data, seal)
response_code = data['responseCode']
transaction_id = data.get('transactionReference')
result = self.response_code_to_result.get(response_code, ERROR)
merchant_id = data.get('merchantId')
test = merchant_id == self.TEST_MERCHANT_ID
transaction_date = None
if 'transactionDateTime' in data:
try:
transaction_date = datetime.datetime.strptime(data['transactionDateTime'], '%Y-%m-%d %H:%M:%S')
except (ValueError, TypeError):
pass
else:
sips_tz = pytz.timezone(self.timezone)
transaction_date = sips_tz.localize(transaction_date)
return PaymentResponse(
result=result,
signed=signed,
bank_data=data,
order_id=transaction_id,
transaction_id=data.get('authorisationId'),
bank_status=self.RESPONSE_CODES.get(response_code, u'unknown code - ' + response_code),
test=test,
transaction_date=transaction_date)
def get_seal_for_json_ws_data(self, data):
data_to_send = []
for key in sorted(data.keys()):
if key in ('keyVersion', 'sealAlgorithm', 'seal'):
continue
data_to_send.append(force_text(data[key]))
data_to_send_str = u''.join(data_to_send).encode('utf-8')
return hmac.new(force_text(self.secret_key).encode('utf-8'), data_to_send_str, hashlib.sha256).hexdigest()
def perform_cash_management_operation(self, endpoint, data):
data['merchantId'] = self.merchant_id
data['interfaceVersion'] = 'CR_WS_2.3'
data['keyVersion'] = self.key_version
data['currencyCode'] = self.currency_code
data['seal'] = self.get_seal_for_json_ws_data(data)
url = self.WS_URL.get(self.platform) + '/rs-services/v2/cashManagement/%s' % endpoint
self.logger.debug('posting %r to %s endpoint', data, endpoint)
response = requests.post(
url,
data=json.dumps(data),
headers={
'content-type': 'application/json',
'accept': 'application/json'
})
self.logger.debug('received %r', response.content)
response.raise_for_status()
json_response = response.json()
if self.platform == 'prod':
# test environment doesn't set seal
if json_response.get('seal') != self.get_seal_for_json_ws_data(json_response):
raise ResponseError('wrong signature on response')
if json_response.get('responseCode') != '00':
raise ResponseError('non-zero response code (%s)' % json_response)
return json_response
def cancel(self, amount, bank_data, **kwargs):
data = {}
data['operationAmount'] = self.clean_amount(amount)
data['transactionReference'] = bank_data.get('transactionReference')
return self.perform_cash_management_operation('cancel', data)
def validate(self, amount, bank_data, **kwargs):
data = {}
data['operationAmount'] = self.clean_amount(amount)
data['transactionReference'] = bank_data.get('transactionReference')
return self.perform_cash_management_operation('validate', data)
def diagnostic(self, amount, bank_data, **kwargs):
data = {}
data['transactionReference'] = bank_data.get('transactionReference')
data['merchantId'] = self.merchant_id
data['interfaceVersion'] = 'DR_WS_2.9'
data['keyVersion'] = self.key_version
data['seal'] = self.get_seal_for_json_ws_data(data)
url = self.WS_URL.get(self.platform) + '/rs-services/v2/diagnostic/getTransactionData'
self.logger.debug('posting %r to %s endpoint', data, 'diagnostic')
response = requests.post(
url,
data=json.dumps(data),
headers={
'content-type': 'application/json',
'accept': 'application/json',
})
self.logger.debug('received %r', response.content)
response.raise_for_status()
return response.json()