api: add nonce parameter
This commit is contained in:
parent
71c9d4e73c
commit
13a8032419
|
@ -52,7 +52,7 @@ def get_timestamp(tst):
|
|||
except PyAsn1Error, e:
|
||||
raise ValueError('not a valid TimeStampToken', e)
|
||||
|
||||
def check_timestamp(tst, certificate, data=None, digest=None, hashname=None):
|
||||
def check_timestamp(tst, certificate, data=None, digest=None, hashname=None, nonce=None):
|
||||
hashname = hashname or 'sha1'
|
||||
hashobj = hashlib.new(hashname)
|
||||
if digest is None:
|
||||
|
@ -73,6 +73,8 @@ def check_timestamp(tst, certificate, data=None, digest=None, hashname=None):
|
|||
certificate = X509.load_cert_string(certificate, X509.FORMAT_DER)
|
||||
else:
|
||||
return False, "missing certificate"
|
||||
if nonce is not None and int(tst.tst_info['nonce']) != int(nonce):
|
||||
return False, 'nonce is different or missing'
|
||||
# check message imprint with respect to locally computed digest
|
||||
message_imprint = tst.tst_info.message_imprint
|
||||
if message_imprint.hash_algorithm[0] != get_hash_oid(hashname) or \
|
||||
|
@ -142,14 +144,16 @@ class RemoteTimestamper(object):
|
|||
self.hashobj = hashlib.new(hashname or 'sha1')
|
||||
self.include_tsa_certificate = include_tsa_certificate
|
||||
|
||||
def check_response(self, response, digest):
|
||||
def check_response(self, response, digest, nonce=None):
|
||||
'''
|
||||
Check validity of a TimeStampResponse
|
||||
'''
|
||||
tst = response.time_stamp_token
|
||||
return check_timestamp(tst, digest=digest, certificate=self.certificate, hashname=self.hashobj.name)
|
||||
return check_timestamp(tst, digest=digest,
|
||||
certificate=self.certificate, hashname=self.hashobj.name,
|
||||
nonce=nonce)
|
||||
|
||||
def __call__(self, data=None, digest=None, include_tsa_certificate=None):
|
||||
def __call__(self, data=None, digest=None, include_tsa_certificate=None, nonce=None):
|
||||
algorithm_identifier = rfc2459.AlgorithmIdentifier()
|
||||
algorithm_identifier.setComponentByPosition(0, get_hash_oid(self.hashobj.name))
|
||||
message_imprint = rfc3161.MessageImprint()
|
||||
|
@ -165,6 +169,8 @@ class RemoteTimestamper(object):
|
|||
request = rfc3161.TimeStampReq()
|
||||
request.setComponentByPosition(0, 'v1')
|
||||
request.setComponentByPosition(1, message_imprint)
|
||||
if nonce is not None:
|
||||
request.setComponentByPosition(3, int(nonce))
|
||||
request.setComponentByPosition(4, include_tsa_certificate if include_tsa_certificate is not None else self.include_tsa_certificate)
|
||||
binary_request = encoder.encode(request)
|
||||
http_request = urllib2.Request(self.url, binary_request,
|
||||
|
@ -180,7 +186,7 @@ class RemoteTimestamper(object):
|
|||
tst_response, substrate = decoder.decode(response, asn1Spec=rfc3161.TimeStampResp())
|
||||
if substrate:
|
||||
return False, 'Extra data returned'
|
||||
result, message = self.check_response(tst_response, digest)
|
||||
result, message = self.check_response(tst_response, digest, nonce=nonce)
|
||||
if result:
|
||||
return encoder.encode(tst_response.time_stamp_token), ''
|
||||
else:
|
||||
|
|
14
tests/api.py
14
tests/api.py
|
@ -35,3 +35,17 @@ class Rfc3161(unittest.TestCase):
|
|||
self.assertNotEqual(value, None)
|
||||
self.assertEqual(substrate, '')
|
||||
|
||||
def test_teszt_e_szigno_hu_with_nonce(self):
|
||||
PUBLIC_TSA_SERVER = 'https://teszt.e-szigno.hu:440/tsa'
|
||||
USERNAME = 'teszt'
|
||||
PASSWORD = 'teszt'
|
||||
CERTIFICATE = os.path.join(os.path.dirname(__file__),
|
||||
'../data/e_szigno_test_tsa2.crt')
|
||||
data = 'xx'
|
||||
certificate = file(CERTIFICATE).read()
|
||||
value, substrate = rfc3161.RemoteTimestamper(
|
||||
PUBLIC_TSA_SERVER, certificate=certificate, username=USERNAME,
|
||||
password=PASSWORD, hashname='sha256')(data=data, nonce=2)
|
||||
self.assertIsInstance(rfc3161.get_timestamp(value), datetime.datetime)
|
||||
self.assertNotEqual(value, None)
|
||||
self.assertEqual(substrate, '')
|
||||
|
|
Reference in New Issue