eopayment/eopayment/sips2.py

310 lines
12 KiB
Python

# -*- coding: utf-8 -*-
import collections
import json
from six.moves.urllib import parse as urlparse
import string
from decimal import Decimal
import uuid
import hashlib
import hmac
from gettext import gettext as _
import requests
import warnings
from .common import (PaymentCommon, FORM, Form, PaymentResponse, PAID, ERROR,
CANCELED, ResponseError, force_text)
__all__ = ['Payment']
class Payment(PaymentCommon):
'''
Payment backend module for the ATOS/SIPS system used by many French banks.
The necessary options are:
- merchant_id
- secret_key
- normal_return_url
- automatic_return_url
It was writtent using the documentation from file:
Worldline Benelux_Sips_Technical_integration_guide_Version_1.5.pdf
'''
URL = {
'test': 'https://payment-webinit.simu.sips-atos.com/paymentInit',
'prod': 'https://payment-webinit.sips-atos.com/paymentInit',
}
WS_URL = {
'test': 'https://office-server.test.sips-atos.com',
'prod': 'https://office-server.sips-atos.com',
}
INTERFACE_VERSION = 'HP_2.3'
RESPONSE_CODES = {
'00': 'Authorisation accepted',
'02': 'Authorisation request to be performed via telephone with the issuer, as the '
'card authorisation threshold has been exceeded, if the forcing is authorised for '
'the merchant',
'03': 'Invalid distance selling contract',
'05': 'Authorisation refused',
'12': 'Invalid transaction, verify the parameters transferred in the request.',
'14': 'Invalid bank details or card security code',
'17': 'Buyer cancellation',
'24': 'Operation impossible. The operation the merchant wishes to perform is not '
'compatible with the status of the transaction.',
'25': 'Transaction not found in the Sips database',
'30': 'Format error',
'34': 'Suspicion of fraud',
'40': 'Function not supported: the operation that the merchant would like to perform '
'is not part of the list of operations for which the merchant is authorised',
'51': 'Amount too high',
'54': 'Card is past expiry date',
'60': 'Transaction pending',
'63': 'Security rules not observed, transaction stopped',
'75': 'Number of attempts at entering the card number exceeded',
'90': 'Service temporarily unavailable',
'94': 'Duplicated transaction: for a given day, the TransactionReference has already been '
'used',
'97': 'Timeframe exceeded, transaction refused',
'99': 'Temporary problem at the Sips Office Server level',
}
TEST_MERCHANT_ID = '002001000000001'
description = {
'caption': 'SIPS 2',
'parameters': [
{
'name': 'platform',
'caption': _('Platform'),
'default': 'test',
'choices': ['test', 'prod'],
'required': True,
},
{
'name': 'merchant_id',
'caption': _('Merchant ID'),
'default': TEST_MERCHANT_ID,
'required': True,
},
{
'name': 'secret_key',
'caption': _('Secret Key'),
'default': '002001000000001_KEY1',
'required': True,
},
{
'name': 'key_version',
'caption': _('Key Version'),
'default': '1',
'required': True,
},
{
'name': 'normal_return_url',
'caption': _('Normal return URL'),
'default': '',
'required': True,
},
{
'name': 'automatic_return_url',
'caption': _('Automatic return URL'),
'required': False,
},
{
'name': 'currency_code',
'caption': _('Currency code'),
'default': '978',
'choices': ['978'],
'required': True,
},
{
'name': 'capture_mode',
'caption': _('Capture Mode'),
'default': 'AUTHOR_CAPTURE',
'choices': ['AUTHOR_CAPTURE', 'IMMEDIATE', 'VALIDATION'],
'required': True,
},
{
'name': 'capture_day',
'caption': _('Capture Day'),
'required': False,
},
{
'name': 'payment_means',
'caption': _('Payment Means'),
'required': False
}
],
}
def encode_data(self, data):
return u'|'.join(u'%s=%s' % (force_text(key), force_text(value))
for key, value in data.items())
def seal_data(self, data):
s = self.encode_data(data)
s += force_text(self.secret_key)
s = s.encode('utf-8')
s = hashlib.sha256(s).hexdigest()
return s
def get_data(self):
data = collections.OrderedDict()
data['merchantId'] = self.merchant_id
data['keyVersion'] = self.key_version
data['normalReturnUrl'] = self.normal_return_url
if self.automatic_return_url:
data['automaticResponseUrl'] = self.automatic_return_url
data['currencyCode'] = self.currency_code
data['captureMode'] = self.capture_mode
if self.capture_day:
data['captureDay'] = self.capture_day
if self.payment_means:
data['paymentMeanBrandList'] = self.payment_means
return data
def get_url(self):
return self.URL[self.platform]
def request(self, amount, name=None, first_name=None, last_name=None,
address=None, email=None, phone=None, orderid=None,
info1=None, info2=None, info3=None, next_url=None, **kwargs):
data = self.get_data()
transaction_id = self.transaction_id(10, string.digits, 'sips2', data['merchantId'])
data['transactionReference'] = force_text(transaction_id)
data['orderId'] = orderid or force_text(uuid.uuid4()).replace('-', '')
if info1:
data['statementReference'] = force_text(info1)
else:
data['statementReference'] = data['transactionReference']
data['amount'] = force_text(int(Decimal(amount) * 100))
if email:
data['billingContact.email'] = email
if 'captureDay' in kwargs:
data['captureDay'] == kwargs.get('captureDay')
normal_return_url = self.normal_return_url
if next_url and not normal_return_url:
warnings.warn("passing next_url to request() is deprecated, "
"set normal_return_url in options", DeprecationWarning)
normal_return_url = next_url
if normal_return_url:
data['normalReturnUrl'] = normal_return_url
form = Form(
url=self.get_url(),
method='POST',
fields=[
{
'type': 'hidden',
'name': 'Data',
'value': self.encode_data(data)
},
{
'type': 'hidden',
'name': 'Seal',
'value': self.seal_data(data),
},
{
'type': 'hidden',
'name': 'InterfaceVersion',
'value': self.INTERFACE_VERSION,
},
])
self.logger.debug('emitting request %r', data)
return transaction_id, FORM, form
def decode_data(self, data):
data = data.split('|')
data = [map(force_text, p.split('=', 1)) for p in data]
return collections.OrderedDict(data)
def check_seal(self, data, seal):
return seal == self.seal_data(data)
response_code_to_result = {
'00': PAID,
'17': CANCELED,
}
def response(self, query_string, **kwargs):
form = urlparse.parse_qs(query_string)
if not set(form) >= set(['Data', 'Seal', 'InterfaceVersion']):
raise ResponseError()
self.logger.debug('received query string %r', form)
data = self.decode_data(form['Data'][0])
seal = form['Seal'][0]
self.logger.debug('parsed response %r seal %r', data, seal)
signed = self.check_seal(data, seal)
response_code = data['responseCode']
transaction_id = data.get('transactionReference')
result = self.response_code_to_result.get(response_code, ERROR)
merchant_id = data.get('merchantId')
test = merchant_id == self.TEST_MERCHANT_ID
return PaymentResponse(
result=result,
signed=signed,
bank_data=data,
order_id=transaction_id,
transaction_id=data.get('authorisationId'),
bank_status=self.RESPONSE_CODES.get(response_code, u'unknown code - ' + response_code),
test=test)
def get_seal_for_json_ws_data(self, data):
data_to_send = []
for key in sorted(data.keys()):
if key in ('keyVersion', 'sealAlgorithm', 'seal'):
continue
data_to_send.append(force_text(data[key]))
data_to_send_str = u''.join(data_to_send).encode('utf-8')
return hmac.new(force_text(self.secret_key).encode('utf-8'), data_to_send_str, hashlib.sha256).hexdigest()
def perform_cash_management_operation(self, endpoint, data):
data['merchantId'] = self.merchant_id
data['interfaceVersion'] = 'CR_WS_2.3'
data['keyVersion'] = self.key_version
data['currencyCode'] = self.currency_code
data['seal'] = self.get_seal_for_json_ws_data(data)
url = self.WS_URL.get(self.platform) + '/rs-services/v2/cashManagement/%s' % endpoint
self.logger.debug('posting %r to %s endpoint', data, endpoint)
response = requests.post(url, data=json.dumps(data),
headers={'content-type': 'application/json',
'accept': 'application/json'})
self.logger.debug('received %r', response.content)
response.raise_for_status()
json_response = response.json()
if self.platform == 'prod':
# test environment doesn't set seal
if json_response.get('seal') != self.get_seal_for_json_ws_data(json_response):
raise ResponseError('wrong signature on response')
if json_response.get('responseCode') != '00':
raise ResponseError('non-zero response code (%s)' % json_response)
return json_response
def cancel(self, amount, bank_data, **kwargs):
data = {}
data['operationAmount'] = force_text(int(Decimal(amount) * 100))
data['transactionReference'] = bank_data.get('transactionReference')
return self.perform_cash_management_operation('cancel', data)
def validate(self, amount, bank_data, **kwargs):
data = {}
data['operationAmount'] = force_text(int(Decimal(amount) * 100))
data['transactionReference'] = bank_data.get('transactionReference')
return self.perform_cash_management_operation('validate', data)
def diagnostic(self, amount, bank_data, **kwargs):
data = {}
data['transactionReference'] = bank_data.get('transactionReference')
data['merchantId'] = self.merchant_id
data['interfaceVersion'] = 'DR_WS_2.9'
data['keyVersion'] = self.key_version
data['seal'] = self.get_seal_for_json_ws_data(data)
url = self.WS_URL.get(self.platform) + '/rs-services/v2/diagnostic/getTransactionData'
self.logger.debug('posting %r to %s endpoint', data, 'diagnostic')
response = requests.post(url, data=json.dumps(data),
headers={'content-type': 'application/json',
'accept': 'application/json'})
self.logger.debug('received %r', response.content)
response.raise_for_status()
return response.json()