add a method to guess transaction_id and backend from an HTTP response (#32224)

This commit is contained in:
Benjamin Dauvergne 2022-06-24 13:04:41 +02:00
parent fe27aeb4e5
commit 7b73275f70
10 changed files with 283 additions and 1 deletions

View File

@ -33,6 +33,7 @@ from .common import ( # noqa: F401
RECEIVED,
URL,
WAITING,
BackendNotFound,
PaymentException,
ResponseError,
force_text,
@ -284,3 +285,38 @@ class Payment:
def get_maximal_amount(self):
return getattr(self.backend, 'maximal_amount', None)
@property
def has_guess(self):
return hasattr(self.backend.__class__, 'guess')
@classmethod
def guess(self, *, method=None, query_string=None, body=None, headers=None, backends=(), **kwargs):
'''Try to guess the kind of backend and the transaction_id given part of an HTTP response.
method CAN be GET or POST.
query_string is the URL encoded query-string as a regular string.
body is the bytes content of the response.
headers can eventually give access to the response headers.
backends is to limit the accepted kind of backends if the possible backends are known.
'''
last_exception = None
for kind, backend in get_backends().items():
if not hasattr(backend, 'guess'):
continue
if backends and kind not in backends:
continue
try:
transaction_id = backend.guess(
method=method, query_string=query_string, body=body, headers=headers
)
except Exception as e:
last_exception = e
continue
if transaction_id:
return kind, transaction_id
if last_exception is not None:
raise last_exception
raise BackendNotFound

View File

@ -89,6 +89,10 @@ class ResponseError(PaymentException):
pass
class BackendNotFound(PaymentException):
pass
class PaymentResponse:
"""Holds a generic view on the result of payment transaction response.

View File

@ -162,3 +162,17 @@ class Payment(PaymentCommon):
'%s error on endpoint "%s": %s "%s"' % (method, endpoint, e, result.get('detail', result))
)
return result
@classmethod
def guess(self, *, method=None, query_string=None, body=None, headers=None, backends=(), **kwargs):
for content in [query_string, body]:
if isinstance(content, bytes):
try:
content = content.decode()
except UnicodeDecodeError:
pass
if isinstance(content, str):
fields = parse_qs(content)
if set(fields) == {'id'}:
return fields['id'][0]
return None

View File

@ -645,3 +645,22 @@ class Payment(PaymentCommon):
order_id=complus or orderid,
transaction_id=transaction_id,
)
@classmethod
def guess(self, *, method=None, query_string=None, body=None, headers=None, backends=(), **kwargs):
for content in [query_string, body]:
if isinstance(content, bytes):
try:
content = content.decode()
except UnicodeDecodeError:
pass
if isinstance(content, str):
fields = {key.upper(): values for key, values in urlparse.parse_qs(content).items()}
if not set(fields) >= {'ORDERID', 'PAYID', 'STATUS', 'NCERROR'}:
continue
orderid = fields.get('ORDERID')
complus = fields.get('COMPLUS')
if complus:
return complus[0]
return orderid[0]
return None

View File

@ -455,3 +455,17 @@ class Payment(PaymentCommon):
def cancel(self, amount, bank_data, **kwargs):
return self.perform(amount, bank_data, PAYBOX_DIRECT_CANCEL_OPERATION)
@classmethod
def guess(self, *, method=None, query_string=None, body=None, headers=None, backends=(), **kwargs):
for content in [query_string, body]:
if isinstance(content, bytes):
try:
content = content.decode()
except UnicodeDecodeError:
pass
if isinstance(content, str):
fields = urlparse.parse_qs(content)
if 'erreur' in fields and 'reference' in fields:
return fields['reference'][0]
return None

View File

@ -374,6 +374,20 @@ class Payment(PaymentCommon):
test=response.saisie == 'T',
)
@classmethod
def guess(self, *, method=None, query_string=None, body=None, headers=None, backends=(), **kwargs):
for content in [query_string, body]:
if isinstance(content, bytes):
try:
content = content.decode()
except UnicodeDecodeError:
pass
if isinstance(content, str):
fields = parse_qs(content)
if set(fields) == {'idOp'}:
return fields['idOp'][0]
return None
if __name__ == '__main__':
import click

View File

@ -256,7 +256,8 @@ class Payment(PaymentCommon):
self.logger.debug('emitting request %r', data)
return transactionReference, FORM, form
def decode_data(self, data):
@classmethod
def decode_data(cls, data):
data = data.split('|')
data = [map(force_text, p.split('=', 1)) for p in data]
return collections.OrderedDict(data)
@ -372,3 +373,20 @@ class Payment(PaymentCommon):
self.logger.debug('received %r', response.content)
response.raise_for_status()
return response.json()
@classmethod
def guess(self, *, method=None, query_string=None, body=None, headers=None, backends=(), **kwargs):
for content in [query_string, body]:
if isinstance(content, bytes):
try:
content = content.decode()
except UnicodeDecodeError:
pass
if isinstance(content, str):
fields = urlparse.parse_qs(content)
if not set(fields) >= {'Data', 'Seal', 'InterfaceVersion'}:
continue
data = self.decode_data(fields['Data'][0])
if 'transactionReference' in data:
return data['transactionReference']
return None

View File

@ -620,3 +620,24 @@ class Payment(PaymentCommon):
sign = sign_method(secret, signed_data)
self.logger.debug('signature «%s»', sign)
return force_text(sign)
@classmethod
def guess(self, *, method=None, query_string=None, body=None, headers=None, backends=(), **kwargs):
for content in [query_string, body]:
if isinstance(content, bytes):
try:
content = content.decode()
except UnicodeDecodeError:
pass
if isinstance(content, str):
fields = urlparse.parse_qs(content)
if not set(fields) >= {SIGNATURE, VADS_CTX_MODE, VADS_AUTH_RESULT}:
continue
vads_eopayment_trans_id = fields.get(VADS_EOPAYMENT_TRANS_ID)
vads_trans_date = fields.get(VADS_TRANS_DATE)
vads_trans_id = fields.get(VADS_TRANS_ID)
if vads_eopayment_trans_id:
return vads_eopayment_trans_id[0]
elif vads_trans_date and vads_trans_id:
return vads_trans_date[0] + '_' + vads_trans_id[0]
return None

View File

@ -202,3 +202,17 @@ class Payment(PaymentCommon):
transaction_id=refdet,
test=test,
)
@classmethod
def guess(self, *, method=None, query_string=None, body=None, headers=None, backends=(), **kwargs):
for content in [query_string, body]:
if isinstance(content, bytes):
try:
content = content.decode()
except UnicodeDecodeError:
pass
if isinstance(content, str):
fields = parse_qs(content)
if 'refdet' in fields and 'resultrans' in fields:
return fields['refdet'][0]
return None

View File

@ -14,9 +14,137 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import eopayment
def test_get_backends():
assert len(eopayment.get_backends()) > 1
GUESS_TEST_VECTORS = [
{
'name': 'tipi',
'kwargs': {
'query_string': 'objet=tout+a+fait&montant=12312&saisie=T&mel=info%40entrouvert.com'
'&numcli=12345&exer=9999&refdet=999900000000999999&resultrans=P',
},
'result': ['tipi', '999900000000999999'],
},
{
'name': 'payfip_ws',
'kwargs': {
'query_string': 'idOp=1234',
},
'result': ['payfip_ws', '1234'],
},
{
'name': 'systempayv2-old-transaction-id',
'kwargs': {
'query_string': 'vads_amount=1042&vads_auth_mode=FULL&vads_auth_number=3feadf'
'&vads_auth_result=00&vads_capture_delay=0&vads_card_brand=CB'
'&vads_result=00'
'&vads_card_number=497010XXXXXX0000'
'&vads_payment_certificate=582ba2b725057618706d7a06e9e59acdbf69ff53'
'&vads_ctx_mode=TEST&vads_currency=978&vads_effective_amount=1042'
'&vads_site_id=70168983&vads_trans_date=20161013101355'
'&vads_trans_id=226787&vads_trans_uuid=4b5053b3b1fe4b02a07753e7a'
'&vads_effective_creation_date=20200330162530'
'&signature=c17fab393f94dc027dc029510c85d5fc46c4710f',
},
'result': ['systempayv2', '20161013101355_226787'],
},
{
'name': 'systempayv2-eo-trans-id',
'kwargs': {
'query_string': 'vads_amount=1042&vads_auth_mode=FULL&vads_auth_number=3feadf'
'&vads_auth_result=00&vads_capture_delay=0&vads_card_brand=CB'
'&vads_result=00'
'&vads_card_number=497010XXXXXX0000'
'&vads_payment_certificate=582ba2b725057618706d7a06e9e59acdbf69ff53'
'&vads_ctx_mode=TEST&vads_currency=978&vads_effective_amount=1042'
'&vads_site_id=70168983&vads_trans_date=20161013101355'
'&vads_trans_id=226787&vads_trans_uuid=4b5053b3b1fe4b02a07753e7a'
'&vads_effective_creation_date=20200330162530'
'&signature=c17fab393f94dc027dc029510c85d5fc46c4710f'
'&vads_ext_info_eopayment_trans_id=123456',
},
'result': ['systempayv2', '123456'],
},
{
'name': 'paybox',
'kwargs': {
'query_string': 'montant=4242&reference=abcdef&code_autorisation=A'
'&erreur=00000&date_transaction=20200101&heure_transaction=01%3A01%3A01',
},
'result': ['paybox', 'abcdef'],
},
{
'name': 'ogone-no-complus',
'kwargs': {
'query_string': 'orderid=myorder&status=9&payid=3011229363&cn=Us%C3%A9r&ncerror=0'
'&trxdate=10%2F24%2F16&acceptance=test123&currency=eur&amount=7.5',
},
'result': ['ogone', 'myorder'],
},
{
'name': 'ogone-with-complus',
'kwargs': {
'query_string': 'complus=neworder&orderid=myorder&status=9&payid=3011229363&cn=Us%C3%A9r'
'&ncerror=0&trxdate=10%2F24%2F16&acceptance=test123&currency=eur&amount=7.5',
},
'result': ['ogone', 'neworder'],
},
{
'name': 'mollie',
'kwargs': {
'body': b'id=tr_7UhSN1zuXS',
},
'result': ['mollie', 'tr_7UhSN1zuXS'],
},
{
'name': 'sips2',
'kwargs': {
'body': (
b'Data=captureDay%3D0%7CcaptureMode%3DAUTHOR_CAPTURE%7CcurrencyCode%3D978%7CmerchantId%3D002001000000001%7CorderChannel%3D'
b'INTERNET%7CresponseCode%3D00%7CtransactionDateTime%3D2016-02-01T17%3A44%3A20%2B01%3A00%7C'
b'transactionReference%3D668930%7CkeyVersion%3D1%7CacquirerResponseCode%3D00%7Camou'
b'nt%3D1200%7CauthorisationId%3D12345%7CcardCSCResultCode%3D4E%7CpanExpiryDate%3D201605%7Cpay'
b'mentMeanBrand%3DMASTERCARD%7CpaymentMeanType%3DCARD%7CcustomerIpAddress%3D82.244.203.243%7CmaskedPan'
b'%3D5100%23%23%23%23%23%23%23%23%23%23%23%2300%7CorderId%3Dd4903de7027f4d56ac01634fd7ab9526%7CholderAuthentRelegation'
b'%3DN%7CholderAuthentStatus%3D3D_ERROR%7CtransactionOrigin%3DINTERNET%7CpaymentPattern%3D'
b'ONE_SHOT&Seal=6ca3247765a19b45d25ad54ef4076483e7d55583166bd5ac9c64357aac097602&InterfaceVersion=HP_2.0&Encode='
),
},
'result': ['sips2', '668930'],
},
{
'name': 'notfound',
'kwargs': {},
'exception': eopayment.BackendNotFound,
},
{
'name': 'notfound-2',
'kwargs': {'query_string': None, 'body': [12323], 'headers': {b'1': '2'}},
'exception': eopayment.BackendNotFound,
},
{
'name': 'backends-limitation',
'kwargs': {
'body': b'id=tr_7UhSN1zuXS',
'backends': ['payfips_ws'],
},
'exception': eopayment.BackendNotFound,
},
]
@pytest.mark.parametrize('test_vector', GUESS_TEST_VECTORS, ids=lambda tv: tv['name'])
def test_guess(test_vector):
kwargs, result, exception = test_vector['kwargs'], test_vector.get('result'), test_vector.get('exception')
if exception is not None:
with pytest.raises(exception):
eopayment.Payment.guess(**kwargs)
else:
assert list(eopayment.Payment.guess(**kwargs)) == result