Import py3dns_3.2.1.orig.tar.gz
[dgit import orig py3dns_3.2.1.orig.tar.gz]
This commit is contained in:
commit
2dadd3e3e6
|
@ -0,0 +1,81 @@
|
|||
3.2.1 Wed, Sep 4, 2019
|
||||
* Add support for setting timeout for convenience methods in DNS.lazy
|
||||
* Fixed DNS.req resulttype error format (LP: #1842423)
|
||||
* Use errno.EADDRINUSE instead of the hard coded Linux value for improved
|
||||
portability (LP: #1793540)
|
||||
* Update test suite to correct for use of no longer existing DNS records
|
||||
* Set timeout=1 for tests so testing with a non-responsive nameserver will
|
||||
finish in a reasonable time
|
||||
|
||||
3.2.0 Mon, 23 Jul 2018
|
||||
* Rename internal use of async since it is a reserved word in python3.7
|
||||
(LP: #1776027)
|
||||
* Switch from distutils to setuptools
|
||||
* Ship test.py in the tarball
|
||||
|
||||
3.1.1 Thu, 06 Oct 2016 22:00:13 -0400
|
||||
* Update test suite for new example.org IP addresses
|
||||
* Fix missing bits for use of ipaddr-py with python3 < 3.3 (LP: #1319611)
|
||||
- Patch thanks to Arfrever Frehtes Taifersar Arahesis
|
||||
* Correct error in _DiscoverNameServers from OS X implementation in 3.0.1
|
||||
that prevents name server discovery on windows (LP: #1442424)
|
||||
* Correct encoding issue with label length to fix issues with DNS labels
|
||||
greater than 46 characters (LP: #1502853)
|
||||
- Thanks to Petr Czepiec for the patch
|
||||
* Use full path to /usr/sbin/scutil on OS X since scutil is not always in the
|
||||
search path (LP: #1630844)
|
||||
|
||||
3.1.0 Thu Apr 24 23:52:00 EDT 2014
|
||||
* Raise DNSError when no nameservers have been found by the time a
|
||||
Base.DNSRequest object is initialized
|
||||
* Add new DNS.DnsRequest.qry function to supercede DNS.DnsRequest.req to
|
||||
allow for non-backward compatible changes to be made in .qry while
|
||||
maintaining full backward compatibility with 3.0.3 and later in the 3.0
|
||||
series in .req
|
||||
* Add options for 'resulttype' to DnsResult.qry to allow for binary, integer,
|
||||
or text data to be returned for IP addresses
|
||||
* The default result type for IPv4 and IPv6 addresses in DNS.DnsRequest.qry
|
||||
is an ipaddress object
|
||||
* The ipaddress module is used internally. This is included in python3.3 and
|
||||
the ipaddr-py module from https://code.google.com/p/ipaddr-py/ can be used
|
||||
with python3.2
|
||||
* New unittest based test suite - thanks to Diane Trout
|
||||
|
||||
3.0.4 Wed Aug 7 02:25:00 EDT 2013
|
||||
|
||||
* Fix timeouts associated with only one of several available nameservers
|
||||
being unavailable(LP: #1209071):
|
||||
- Only raise timeout error after trying all available servers
|
||||
- Stop lookups once an answer is gotten
|
||||
* Removed unmaintained spec files
|
||||
|
||||
3.0.3 Wed May 29 00:05:00 EDT 2013
|
||||
|
||||
* Revert returning IPv6 addresses from AAAA lookups as string. Causing
|
||||
incompatiblities that are deeply annoying to fix on the other end.
|
||||
|
||||
3.0.2 Thu Jan 19 10:59:00 EST 2012
|
||||
|
||||
* Add more granular exception sub classes of DNSError, see SF #3388075
|
||||
- Thanks to Julian Mehnle for the patch
|
||||
* Add AAAA record support, works like A records
|
||||
- Thanks to Shane Kerr for the patch
|
||||
|
||||
3.0.1 Mon Jul 18 19:46:30 EDT 2011
|
||||
|
||||
* Add CHANGES to document post-Python 3 port changes
|
||||
* Add LICENSE file
|
||||
* Port pydns 2.3.5 changes to py3dns
|
||||
- Handle large TCP replies (change to blocking IO with timeout)
|
||||
- Add new lazy.dnslookup function to retrieve answer data for any query
|
||||
type
|
||||
- Add large TCP reply test to tests/test.py
|
||||
* Add automatic name server discovery for OS X
|
||||
|
||||
3.0.0 Wed Feb 9 23:35:22 EST 2011
|
||||
|
||||
Ported to Python3 by Scott Kitterman <scott@kitterman.com>. This is mostly a
|
||||
minimal port to work with Python3 (tested with python3.2) plus addition of
|
||||
some of the patches that people have submitted on Sourceforge. It should be
|
||||
fully API compatible with 2.3.
|
||||
|
|
@ -0,0 +1,18 @@
|
|||
This code was originally based on the DNS library created
|
||||
by Guido van Rossum somewhere near the dawn of time.
|
||||
|
||||
Since then, as well as myself (Anthony), I have had contributions
|
||||
by:
|
||||
|
||||
Michael Ströder
|
||||
Bastian Kleineidam
|
||||
Timothy J. Miller
|
||||
Wolfgang Strobl
|
||||
Arnaud Fontaine
|
||||
Scott Kitterman
|
||||
Stuart Gathman
|
||||
Diane Trout
|
||||
|
||||
It's possible there's other people - the old RCS logs for my
|
||||
code were lost some time ago. The list above is almost certainly
|
||||
incomplete - let me know if I've forgotten you...
|
|
@ -0,0 +1,480 @@
|
|||
"""
|
||||
$Id$
|
||||
|
||||
This file is part of the py3dns project.
|
||||
Homepage: https://launchpad.net/py3dns
|
||||
|
||||
This code is covered by the standard Python License. See LICENSE for details.
|
||||
|
||||
Changes for Python3 port © 2011-14 Scott Kitterman <scott@kitterman.com>
|
||||
|
||||
Base functionality. Request and Response classes, that sort of thing.
|
||||
"""
|
||||
|
||||
import socket, string, types, time, select
|
||||
import errno
|
||||
from . import Type,Class,Opcode
|
||||
import asyncore
|
||||
#
|
||||
# This random generator is used for transaction ids and port selection. This
|
||||
# is important to prevent spurious results from lost packets, and malicious
|
||||
# cache poisoning. This doesn't matter if you are behind a caching nameserver
|
||||
# or your app is a primary DNS server only. To install your own generator,
|
||||
# replace DNS.Base.random. SystemRandom uses /dev/urandom or similar source.
|
||||
#
|
||||
try:
|
||||
from random import SystemRandom
|
||||
random = SystemRandom()
|
||||
except:
|
||||
import random
|
||||
|
||||
class DNSError(Exception): pass
|
||||
class ArgumentError(DNSError): pass
|
||||
class SocketError(DNSError): pass
|
||||
class TimeoutError(DNSError): pass
|
||||
|
||||
class ServerError(DNSError):
|
||||
def __init__(self, message, rcode):
|
||||
DNSError.__init__(self, message, rcode)
|
||||
self.message = message
|
||||
self.rcode = rcode
|
||||
|
||||
class IncompleteReplyError(DNSError): pass
|
||||
|
||||
# Lib uses some of the above exception classes, so import after defining.
|
||||
from . import Lib
|
||||
|
||||
defaults= { 'protocol':'udp', 'port':53, 'opcode':Opcode.QUERY,
|
||||
'qtype':Type.A, 'rd':1, 'timing':1, 'timeout': 30, 'server_rotate': 0,
|
||||
'server': [] }
|
||||
|
||||
def ParseResolvConf(resolv_path="/etc/resolv.conf"):
|
||||
"parses the /etc/resolv.conf file and sets defaults for name servers"
|
||||
with open(resolv_path, 'r') as stream:
|
||||
return ParseResolvConfFromIterable(stream)
|
||||
|
||||
def ParseResolvConfFromIterable(lines):
|
||||
"parses a resolv.conf formatted stream and sets defaults for name servers"
|
||||
global defaults
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line or line[0]==';' or line[0]=='#':
|
||||
continue
|
||||
fields=line.split()
|
||||
if len(fields) < 2:
|
||||
continue
|
||||
if fields[0]=='domain' and len(fields) > 1:
|
||||
defaults['domain']=fields[1]
|
||||
if fields[0]=='search':
|
||||
pass
|
||||
if fields[0]=='options':
|
||||
pass
|
||||
if fields[0]=='sortlist':
|
||||
pass
|
||||
if fields[0]=='nameserver':
|
||||
defaults['server'].append(fields[1])
|
||||
|
||||
def _DiscoverNameServers():
|
||||
import sys
|
||||
if sys.platform in ('win32', 'nt'):
|
||||
from . import win32dns
|
||||
defaults['server']=win32dns.RegistryResolve()
|
||||
elif sys.platform == 'darwin':
|
||||
ParseOSXSysConfig()
|
||||
else:
|
||||
return ParseResolvConf()
|
||||
|
||||
def DiscoverNameServers():
|
||||
"""Don't call, only here for backward compatability. We do discovery for
|
||||
you automatically.
|
||||
"""
|
||||
pass
|
||||
|
||||
class DnsRequest:
|
||||
""" high level Request object """
|
||||
def __init__(self,*name,**args):
|
||||
self.donefunc=None
|
||||
self.py3async=None
|
||||
self.defaults = {}
|
||||
self.argparse(name,args)
|
||||
self.defaults = self.args
|
||||
self.tid = 0
|
||||
self.resulttype = ''
|
||||
if len(self.defaults['server']) == 0:
|
||||
raise DNSError('No working name servers discovered')
|
||||
|
||||
def argparse(self,name,args):
|
||||
if not name and 'name' in self.defaults:
|
||||
args['name'] = self.defaults['name']
|
||||
if type(name) is bytes or type(name) is str:
|
||||
args['name']=name
|
||||
else:
|
||||
if len(name) == 1:
|
||||
if name[0]:
|
||||
args['name']=name[0]
|
||||
if defaults['server_rotate'] and \
|
||||
type(defaults['server']) == types.ListType:
|
||||
defaults['server'] = defaults['server'][1:]+defaults['server'][:1]
|
||||
for i in list(defaults.keys()):
|
||||
if i not in args:
|
||||
if i in self.defaults:
|
||||
args[i]=self.defaults[i]
|
||||
else:
|
||||
args[i]=defaults[i]
|
||||
if type(args['server']) == bytes or type(args['server']) == str:
|
||||
args['server'] = [args['server']]
|
||||
self.args=args
|
||||
|
||||
def socketInit(self,a,b):
|
||||
self.s = socket.socket(a,b)
|
||||
|
||||
def processUDPReply(self):
|
||||
if self.timeout > 0:
|
||||
r,w,e = select.select([self.s],[],[],self.timeout)
|
||||
if not len(r):
|
||||
raise TimeoutError('Timeout')
|
||||
(self.reply, self.from_address) = self.s.recvfrom(65535)
|
||||
self.time_finish=time.time()
|
||||
self.args['server']=self.ns
|
||||
return self.processReply()
|
||||
|
||||
def _readall(self,f,count):
|
||||
res = f.read(count)
|
||||
while len(res) < count:
|
||||
if self.timeout > 0:
|
||||
# should we restart timeout everytime we get a dribble of data?
|
||||
rem = self.time_start + self.timeout - time.time()
|
||||
if rem <= 0: raise DNSError('Timeout')
|
||||
self.s.settimeout(rem)
|
||||
buf = f.read(count - len(res))
|
||||
if not buf:
|
||||
raise DNSError('incomplete reply - %d of %d read' % (len(res),count))
|
||||
res += buf
|
||||
return res
|
||||
|
||||
def processTCPReply(self):
|
||||
if self.timeout > 0:
|
||||
self.s.settimeout(self.timeout)
|
||||
else:
|
||||
self.s.settimeout(None)
|
||||
f = self.s.makefile('rb')
|
||||
try:
|
||||
header = self._readall(f,2)
|
||||
count = Lib.unpack16bit(header)
|
||||
self.reply = self._readall(f,count)
|
||||
finally:
|
||||
f.close()
|
||||
self.time_finish=time.time()
|
||||
self.args['server']=self.ns
|
||||
return self.processReply()
|
||||
|
||||
def processReply(self):
|
||||
self.args['elapsed']=(self.time_finish-self.time_start)*1000
|
||||
if not self.resulttype:
|
||||
u = Lib.Munpacker(self.reply)
|
||||
elif self.resulttype == 'default':
|
||||
u = Lib.MunpackerDefault(self.reply)
|
||||
elif self.resulttype == 'binary':
|
||||
u = Lib.MunpackerBinary(self.reply)
|
||||
elif self.resulttype == 'text':
|
||||
u = Lib.MunpackerText(self.reply)
|
||||
elif self.resulttype == 'integer':
|
||||
u = Lib.MunpackerInteger(self.reply)
|
||||
else:
|
||||
raise SyntaxError('Unknown resulttype: ' + self.resulttype)
|
||||
r=Lib.DnsResult(u,self.args)
|
||||
r.args=self.args
|
||||
#self.args=None # mark this DnsRequest object as used.
|
||||
return r
|
||||
#### TODO TODO TODO ####
|
||||
# if protocol == 'tcp' and qtype == Type.AXFR:
|
||||
# while 1:
|
||||
# header = f.read(2)
|
||||
# if len(header) < 2:
|
||||
# print '========== EOF =========='
|
||||
# break
|
||||
# count = Lib.unpack16bit(header)
|
||||
# if not count:
|
||||
# print '========== ZERO COUNT =========='
|
||||
# break
|
||||
# print '========== NEXT =========='
|
||||
# reply = f.read(count)
|
||||
# if len(reply) != count:
|
||||
# print '*** Incomplete reply ***'
|
||||
# break
|
||||
# u = Lib.Munpacker(reply)
|
||||
# Lib.dumpM(u)
|
||||
|
||||
def getSource(self):
|
||||
"Pick random source port to avoid DNS cache poisoning attack."
|
||||
while True:
|
||||
try:
|
||||
source_port = random.randint(1024,65535)
|
||||
self.s.bind(('', source_port))
|
||||
break
|
||||
except socket.error as msg:
|
||||
# errno.EADDRINUSE, 'Address already in use'
|
||||
if msg.errno != errno.EADDRINUSE: raise
|
||||
|
||||
def conn(self):
|
||||
self.getSource()
|
||||
self.s.connect((self.ns,self.port))
|
||||
|
||||
def qry(self,*name,**args):
|
||||
'''
|
||||
Request function for the DnsRequest class. In addition to standard
|
||||
DNS args, the special pydns arg 'resulttype' can optionally be passed.
|
||||
Valid resulttypes are 'default', 'text', 'decimal', and 'binary'.
|
||||
|
||||
Defaults are configured to be compatible with pydns:
|
||||
AAAA: decimal
|
||||
Others: text
|
||||
'''
|
||||
" needs a refactoring "
|
||||
self.argparse(name,args)
|
||||
#if not self.args:
|
||||
# raise ArgumentError, 'reinitialize request before reuse'
|
||||
protocol = self.args['protocol']
|
||||
self.port = self.args['port']
|
||||
self.tid = random.randint(0,65535)
|
||||
self.timeout = self.args['timeout'];
|
||||
opcode = self.args['opcode']
|
||||
rd = self.args['rd']
|
||||
server=self.args['server']
|
||||
if 'resulttype' in self.args:
|
||||
self.resulttype = self.args['resulttype']
|
||||
else:
|
||||
self.resulttype = 'default'
|
||||
if type(self.args['qtype']) == bytes or type(self.args['qtype']) == str:
|
||||
try:
|
||||
qtype = getattr(Type, str(self.args['qtype'].upper()))
|
||||
except AttributeError:
|
||||
raise ArgumentError('unknown query type')
|
||||
else:
|
||||
qtype = self.args['qtype']
|
||||
if 'name' not in self.args:
|
||||
print((self.args))
|
||||
raise ArgumentError('nothing to lookup')
|
||||
qname = self.args['name']
|
||||
if qtype == Type.AXFR and protocol != 'tcp':
|
||||
print('Query type AXFR, protocol forced to TCP')
|
||||
protocol = 'tcp'
|
||||
#print('QTYPE %d(%s)' % (qtype, Type.typestr(qtype)))
|
||||
m = Lib.Mpacker()
|
||||
# jesus. keywords and default args would be good. TODO.
|
||||
m.addHeader(self.tid,
|
||||
0, opcode, 0, 0, rd, 0, 0, 0,
|
||||
1, 0, 0, 0)
|
||||
m.addQuestion(qname, qtype, Class.IN)
|
||||
self.request = m.getbuf()
|
||||
try:
|
||||
if protocol == 'udp':
|
||||
self.sendUDPRequest(server)
|
||||
else:
|
||||
self.sendTCPRequest(server)
|
||||
except socket.error as reason:
|
||||
raise SocketError(reason)
|
||||
if self.py3async:
|
||||
return None
|
||||
else:
|
||||
return self.response
|
||||
|
||||
def req(self,*name,**args):
|
||||
" needs a refactoring "
|
||||
self.argparse(name,args)
|
||||
#if not self.args:
|
||||
# raise ArgumentError, 'reinitialize request before reuse'
|
||||
try:
|
||||
if self.args['resulttype']:
|
||||
raise ArgumentError('Restulttype {0} set with DNS.req, use DNS.qry to specify result type.'.format(self.args['resulttype']))
|
||||
except:
|
||||
# resulttype isn't set and that's what we want for DNS.req
|
||||
pass
|
||||
protocol = self.args['protocol']
|
||||
self.port = self.args['port']
|
||||
self.tid = random.randint(0,65535)
|
||||
self.timeout = self.args['timeout'];
|
||||
opcode = self.args['opcode']
|
||||
rd = self.args['rd']
|
||||
server=self.args['server']
|
||||
if type(self.args['qtype']) == bytes or type(self.args['qtype']) == str:
|
||||
try:
|
||||
qtype = getattr(Type, str(self.args['qtype'].upper()))
|
||||
except AttributeError:
|
||||
raise ArgumentError('unknown query type')
|
||||
else:
|
||||
qtype = self.args['qtype']
|
||||
if 'name' not in self.args:
|
||||
print((self.args))
|
||||
raise ArgumentError('nothing to lookup')
|
||||
qname = self.args['name']
|
||||
if qtype == Type.AXFR and protocol != 'tcp':
|
||||
print('Query type AXFR, protocol forced to TCP')
|
||||
protocol = 'tcp'
|
||||
#print('QTYPE %d(%s)' % (qtype, Type.typestr(qtype)))
|
||||
m = Lib.Mpacker()
|
||||
# jesus. keywords and default args would be good. TODO.
|
||||
m.addHeader(self.tid,
|
||||
0, opcode, 0, 0, rd, 0, 0, 0,
|
||||
1, 0, 0, 0)
|
||||
m.addQuestion(qname, qtype, Class.IN)
|
||||
self.request = m.getbuf()
|
||||
try:
|
||||
if protocol == 'udp':
|
||||
self.sendUDPRequest(server)
|
||||
else:
|
||||
self.sendTCPRequest(server)
|
||||
except socket.error as reason:
|
||||
raise SocketError(reason)
|
||||
if self.py3async:
|
||||
return None
|
||||
else:
|
||||
return self.response
|
||||
|
||||
def sendUDPRequest(self, server):
|
||||
"refactor me"
|
||||
first_socket_error = None
|
||||
self.response=None
|
||||
for self.ns in server:
|
||||
try:
|
||||
if self.ns.count(':'):
|
||||
if hasattr(socket,'has_ipv6') and socket.has_ipv6:
|
||||
self.socketInit(socket.AF_INET6, socket.SOCK_DGRAM)
|
||||
else: continue
|
||||
else:
|
||||
self.socketInit(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
try:
|
||||
# TODO. Handle timeouts &c correctly (RFC)
|
||||
self.time_start=time.time()
|
||||
self.conn()
|
||||
if not self.py3async:
|
||||
self.s.send(self.request)
|
||||
r=self.processUDPReply()
|
||||
# Since we bind to the source port and connect to the
|
||||
# destination port, we don't need to check that here,
|
||||
# but do make sure it's actually a DNS request that the
|
||||
# packet is in reply to.
|
||||
while r.header['id'] != self.tid \
|
||||
or self.from_address[1] != self.port:
|
||||
r=self.processUDPReply()
|
||||
self.response = r
|
||||
# FIXME: check waiting async queries
|
||||
finally:
|
||||
if not self.py3async:
|
||||
self.s.close()
|
||||
except socket.error as e:
|
||||
# Keep trying more nameservers, but preserve the first error
|
||||
# that occurred so it can be reraised in case none of the
|
||||
# servers worked:
|
||||
first_socket_error = first_socket_error or e
|
||||
continue
|
||||
except TimeoutError as t:
|
||||
first_socket_error = first_socket_error or t
|
||||
continue
|
||||
if self.response:
|
||||
break
|
||||
if not self.response and first_socket_error:
|
||||
raise first_socket_error
|
||||
|
||||
def sendTCPRequest(self, server):
|
||||
" do the work of sending a TCP request "
|
||||
first_socket_error = None
|
||||
self.response=None
|
||||
for self.ns in server:
|
||||
#print "trying tcp",self.ns
|
||||
try:
|
||||
if self.ns.count(':'):
|
||||
if hasattr(socket,'has_ipv6') and socket.has_ipv6:
|
||||
self.socketInit(socket.AF_INET6, socket.SOCK_STREAM)
|
||||
else: continue
|
||||
else:
|
||||
self.socketInit(socket.AF_INET, socket.SOCK_STREAM)
|
||||
try:
|
||||
# TODO. Handle timeouts &c correctly (RFC)
|
||||
self.time_start=time.time()
|
||||
self.conn()
|
||||
buf = Lib.pack16bit(len(self.request))+self.request
|
||||
# Keep server from making sendall hang
|
||||
self.s.setblocking(0)
|
||||
# FIXME: throws WOULDBLOCK if request too large to fit in
|
||||
# system buffer
|
||||
self.s.sendall(buf)
|
||||
# SHUT_WR breaks blocking IO with google DNS (8.8.8.8)
|
||||
#self.s.shutdown(socket.SHUT_WR)
|
||||
r=self.processTCPReply()
|
||||
if r.header['id'] == self.tid:
|
||||
self.response = r
|
||||
break
|
||||
finally:
|
||||
self.s.close()
|
||||
except socket.error as e:
|
||||
first_socket_error = first_socket_error or e
|
||||
continue
|
||||
except TimeoutError as t:
|
||||
first_socket_error = first_socket_error or t
|
||||
continue
|
||||
if self.response:
|
||||
break
|
||||
if not self.response and first_socket_error:
|
||||
raise first_socket_error
|
||||
|
||||
#class DnsAsyncRequest(DnsRequest):
|
||||
class DnsAsyncRequest(DnsRequest,asyncore.dispatcher_with_send):
|
||||
" an asynchronous request object. out of date, probably broken "
|
||||
def __init__(self,*name,**args):
|
||||
DnsRequest.__init__(self, *name, **args)
|
||||
# XXX todo
|
||||
if 'done' in args and args['done']:
|
||||
self.donefunc=args['done']
|
||||
else:
|
||||
self.donefunc=self.showResult
|
||||
#self.realinit(name,args) # XXX todo
|
||||
self.py3async=1
|
||||
def conn(self):
|
||||
self.getSource()
|
||||
self.connect((self.ns,self.port))
|
||||
self.time_start=time.time()
|
||||
if 'start' in self.args and self.args['start']:
|
||||
asyncore.dispatcher.go(self)
|
||||
def socketInit(self,a,b):
|
||||
self.create_socket(a,b)
|
||||
asyncore.dispatcher.__init__(self)
|
||||
self.s=self
|
||||
def handle_read(self):
|
||||
if self.args['protocol'] == 'udp':
|
||||
self.response=self.processUDPReply()
|
||||
if self.donefunc:
|
||||
self.donefunc(*(self,))
|
||||
def handle_connect(self):
|
||||
self.send(self.request)
|
||||
def handle_write(self):
|
||||
pass
|
||||
def showResult(self,*s):
|
||||
self.response.show()
|
||||
|
||||
def ParseOSXSysConfig():
|
||||
"Retrieves the current Mac OS X resolver settings using the scutil(8) command."
|
||||
import os, re
|
||||
scutil = os.popen('/usr/sbin/scutil --dns', 'r')
|
||||
res_re = re.compile('^\s+nameserver[]0-9[]*\s*\:\s*(\S+)$')
|
||||
sets = [ ]
|
||||
currentset = None
|
||||
while True:
|
||||
l = scutil.readline()
|
||||
if not l:
|
||||
break
|
||||
l = l.rstrip()
|
||||
if len(l) < 1 or l[0] not in string.whitespace:
|
||||
currentset = None
|
||||
continue
|
||||
m = res_re.match(l)
|
||||
if m:
|
||||
if currentset is None:
|
||||
currentset = [ ]
|
||||
sets.append(currentset)
|
||||
currentset.append(m.group(1))
|
||||
scutil.close()
|
||||
# Someday: Figure out if we should do something other than simply concatenate the sets.
|
||||
for currentset in sets:
|
||||
defaults['server'].extend(currentset)
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
"""
|
||||
$Id$
|
||||
|
||||
This file is part of the py3dns project.
|
||||
Homepage: https://launchpad.net/py3dns
|
||||
|
||||
This code is covered by the standard Python License. See LICENSE for details.
|
||||
|
||||
CLASS values (section 3.2.4)
|
||||
"""
|
||||
|
||||
|
||||
IN = 1 # the Internet
|
||||
CS = 2 # the CSNET class (Obsolete - used only for examples in
|
||||
# some obsolete RFCs)
|
||||
CH = 3 # the CHAOS class. When someone shows me python running on
|
||||
# a Symbolics Lisp machine, I'll look at implementing this.
|
||||
HS = 4 # Hesiod [Dyer 87]
|
||||
|
||||
# QCLASS values (section 3.2.5)
|
||||
|
||||
ANY = 255 # any class
|
||||
|
||||
|
||||
# Construct reverse mapping dictionary
|
||||
|
||||
_names = dir()
|
||||
classmap = {}
|
||||
for _name in _names:
|
||||
if _name[0] != '_': classmap[eval(_name)] = _name
|
||||
|
||||
def classstr(klass):
|
||||
if klass in classmap: return classmap[klass]
|
||||
else: return repr(klass)
|
||||
|
|
@ -0,0 +1,809 @@
|
|||
# -*- encoding: utf-8 -*-
|
||||
"""
|
||||
$Id$
|
||||
|
||||
This file is part of the py3dns project.
|
||||
Homepage: https://launchpad.net/py3dns
|
||||
|
||||
This code is covered by the standard Python License. See LICENSE for details.
|
||||
|
||||
Changes for Python3 port © 2011-13 Scott Kitterman <scott@kitterman.com>
|
||||
|
||||
Library code. Largely this is packers and unpackers for various types.
|
||||
"""
|
||||
|
||||
#
|
||||
#
|
||||
# See RFC 1035:
|
||||
# ------------------------------------------------------------------------
|
||||
# Network Working Group P. Mockapetris
|
||||
# Request for Comments: 1035 ISI
|
||||
# November 1987
|
||||
# Obsoletes: RFCs 882, 883, 973
|
||||
#
|
||||
# DOMAIN NAMES - IMPLEMENTATION AND SPECIFICATION
|
||||
# ------------------------------------------------------------------------
|
||||
|
||||
|
||||
import types
|
||||
import socket
|
||||
|
||||
from . import Type
|
||||
from . import Class
|
||||
from . import Opcode
|
||||
from . import Status
|
||||
import DNS
|
||||
|
||||
from .Base import DNSError
|
||||
|
||||
try:
|
||||
import ipaddress
|
||||
except ImportError:
|
||||
import ipaddr as ipaddress
|
||||
|
||||
LABEL_UTF8 = False
|
||||
LABEL_ENCODING = 'idna'
|
||||
|
||||
class UnpackError(DNSError): pass
|
||||
class PackError(DNSError): pass
|
||||
|
||||
# Low-level 16 and 32 bit integer packing and unpacking
|
||||
|
||||
from struct import pack as struct_pack
|
||||
from struct import unpack as struct_unpack
|
||||
from socket import inet_ntoa, inet_aton, inet_ntop, AF_INET6
|
||||
|
||||
def pack16bit(n):
|
||||
return struct_pack('!H', n)
|
||||
|
||||
def pack32bit(n):
|
||||
return struct_pack('!L', n)
|
||||
|
||||
def unpack16bit(s):
|
||||
return struct_unpack('!H', s)[0]
|
||||
|
||||
def unpack32bit(s):
|
||||
return struct_unpack('!L', s)[0]
|
||||
|
||||
def addr2bin(addr):
|
||||
# Updated from pyspf
|
||||
"""Convert a string IPv4 address into an unsigned integer.
|
||||
|
||||
Examples::
|
||||
>>> addr2bin('127.0.0.1')
|
||||
2130706433
|
||||
|
||||
>>> addr2bin('127.0.0.1') == socket.INADDR_LOOPBACK
|
||||
1
|
||||
|
||||
>>> addr2bin('255.255.255.254')
|
||||
4294967294L
|
||||
|
||||
>>> addr2bin('192.168.0.1')
|
||||
3232235521L
|
||||
|
||||
Unlike old DNS.addr2bin, the n, n.n, and n.n.n forms for IP addresses
|
||||
are handled as well::
|
||||
>>> addr2bin('10.65536')
|
||||
167837696
|
||||
>>> 10 * (2 ** 24) + 65536
|
||||
167837696
|
||||
|
||||
>>> addr2bin('10.93.512')
|
||||
173867520
|
||||
>>> 10 * (2 ** 24) + 93 * (2 ** 16) + 512
|
||||
173867520
|
||||
"""
|
||||
return struct_unpack("!L", inet_aton(addr))[0]
|
||||
|
||||
def bin2addr(n):
|
||||
return inet_ntoa(struct_pack('!L', n))
|
||||
|
||||
def bin2addr6(n):
|
||||
return inet_ntop(AF_INET6, n)
|
||||
|
||||
def bin2long6(str):
|
||||
# Also from pyspf
|
||||
h, l = struct_unpack("!QQ", str)
|
||||
return h << 64 | l
|
||||
|
||||
# Packing class
|
||||
|
||||
class Packer:
|
||||
" packer base class. supports basic byte/16bit/32bit/addr/string/name "
|
||||
def __init__(self):
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
self.buf = bytes('', enc)
|
||||
self.index = {}
|
||||
def getbuf(self):
|
||||
return self.buf
|
||||
def addbyte(self, c):
|
||||
if len(c) != 1: raise TypeError('one character expected')
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
self.buf = self.buf + bytes(c,enc)
|
||||
def addbytes(self, abytes):
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
self.buf = self.buf + bytes(abytes, enc)
|
||||
def add16bit(self, n):
|
||||
self.buf = self.buf + bytes(pack16bit(n))
|
||||
def add32bit(self, n):
|
||||
self.buf = self.buf + bytes(pack32bit(n))
|
||||
def addaddr(self, addr):
|
||||
n = addr2bin(addr)
|
||||
self.buf = self.buf + bytes(pack32bit(n))
|
||||
def addstring(self, s):
|
||||
if len(s) > 255:
|
||||
raise ValueError("Can't encode string of length "+ \
|
||||
"%s (> 255)"%(len(s)))
|
||||
self.addbyte(chr(len(s)))
|
||||
self.addbytes(s)
|
||||
def addname(self, name):
|
||||
# Domain name packing (section 4.1.4)
|
||||
# Add a domain name to the buffer, possibly using pointers.
|
||||
# The case of the first occurrence of a name is preserved.
|
||||
# Redundant dots are ignored.
|
||||
nlist = []
|
||||
for label in name.split('.'):
|
||||
if not label:
|
||||
pass # Passing to ignore redundant dots per comments
|
||||
else:
|
||||
nlist.append(label)
|
||||
keys = []
|
||||
for i in range(len(nlist)):
|
||||
key = '.'.join(nlist[i:])
|
||||
key = key.upper()
|
||||
keys.append(key)
|
||||
if key in self.index:
|
||||
pointer = self.index[key]
|
||||
break
|
||||
else:
|
||||
i = len(nlist)
|
||||
pointer = None
|
||||
# Do it into temporaries first so exceptions don't
|
||||
# mess up self.index and self.buf
|
||||
offset = len(self.buf)
|
||||
index = []
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
buf = bytes('', enc)
|
||||
for j in range(i):
|
||||
label = nlist[j]
|
||||
try:
|
||||
label = label.encode(enc)
|
||||
except UnicodeEncodeError:
|
||||
if not DNS.LABEL_UTF8: raise
|
||||
if not label.startswith('\ufeff'):
|
||||
label = '\ufeff'+label
|
||||
label = label.encode(enc)
|
||||
n = len(label)
|
||||
if n > 63:
|
||||
raise PackError('label too long')
|
||||
if offset + len(buf) < 0x3FFF:
|
||||
index.append((keys[j], offset + len(buf)))
|
||||
else:
|
||||
print('DNS.Lib.Packer.addname:')
|
||||
print('warning: pointer too big')
|
||||
buf = buf + bytes([n]) + label
|
||||
if pointer:
|
||||
buf = buf + (pack16bit(pointer | 0xC000))
|
||||
else:
|
||||
buf = buf + bytes('\0', enc)
|
||||
self.buf = self.buf + buf
|
||||
for key, value in index:
|
||||
self.index[key] = value
|
||||
def dump(self):
|
||||
keys = list(self.index.keys())
|
||||
keys.sort()
|
||||
print('-'*40)
|
||||
for key in keys:
|
||||
print('%20s %3d' % (key, self.index[key]))
|
||||
print('-'*40)
|
||||
space = 1
|
||||
for i in range(0, len(self.buf)+1, 2):
|
||||
if self.buf[i:i+2] == '**':
|
||||
if not space: print()
|
||||
space = 1
|
||||
continue
|
||||
space = 0
|
||||
print('%4d' % i)
|
||||
for c in self.buf[i:i+2]:
|
||||
if ' ' < c < '\177':
|
||||
print(' %c' % c)
|
||||
else:
|
||||
print('%2d' % ord(c))
|
||||
print()
|
||||
print('-'*40)
|
||||
|
||||
|
||||
# Unpacking class
|
||||
|
||||
|
||||
class Unpacker:
|
||||
def __init__(self, buf):
|
||||
# buf should be binary in Python3
|
||||
self.buf = buf
|
||||
self.offset = 0
|
||||
def getbyte(self):
|
||||
if self.offset >= len(self.buf):
|
||||
raise UnpackError("Ran off end of data")
|
||||
c = self.buf[self.offset]
|
||||
self.offset = self.offset + 1
|
||||
return c
|
||||
def getbytes(self, n):
|
||||
s = (self.buf[self.offset : self.offset + n])
|
||||
if len(s) != n: raise UnpackError('not enough data left')
|
||||
self.offset = self.offset + n
|
||||
return s
|
||||
def get16bit(self):
|
||||
return unpack16bit(self.getbytes(2))
|
||||
def get32bit(self):
|
||||
return unpack32bit(self.getbytes(4))
|
||||
def getaddr(self):
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
return bytes(bin2addr(self.get32bit()),enc)
|
||||
def getaddr6(self):
|
||||
return (self.getbytes(16))
|
||||
def getstring(self):
|
||||
return self.getbytes(self.getbyte())
|
||||
def getname(self):
|
||||
# Domain name unpacking (section 4.1.4)
|
||||
i = self.getbyte()
|
||||
#i = ord(i)
|
||||
if i and i & 0xC0 == 0xC0:
|
||||
d = self.getbyte()
|
||||
j = d
|
||||
pointer = ((i<<8) | j) & ~0xC000
|
||||
save_offset = self.offset
|
||||
try:
|
||||
self.offset = pointer
|
||||
domain = self.getname()
|
||||
finally:
|
||||
self.offset = save_offset
|
||||
return domain
|
||||
if i == 0:
|
||||
return ''
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
domain = str(self.getbytes(i), enc)
|
||||
remains = self.getname()
|
||||
if not remains:
|
||||
return domain
|
||||
else:
|
||||
return domain + '.' + remains
|
||||
|
||||
# Test program for packin/unpacking (section 4.1.4)
|
||||
|
||||
def testpacker():
|
||||
N = 2500
|
||||
R = list(range(N))
|
||||
import timing
|
||||
# See section 4.1.4 of RFC 1035
|
||||
timing.start()
|
||||
for i in R:
|
||||
p = Packer()
|
||||
p.addaddr('192.168.0.1')
|
||||
p.addbytes('*' * 20)
|
||||
p.addname('f.ISI.ARPA')
|
||||
p.addbytes('*' * 8)
|
||||
p.addname('Foo.F.isi.arpa')
|
||||
p.addbytes('*' * 18)
|
||||
p.addname('arpa')
|
||||
p.addbytes('*' * 26)
|
||||
p.addname('')
|
||||
timing.finish()
|
||||
print(timing.milli(), "ms total for packing")
|
||||
print(round(timing.milli() / i, 4), 'ms per packing')
|
||||
#p.dump()
|
||||
u = Unpacker(p.buf)
|
||||
u.getaddr()
|
||||
u.getbytes(20)
|
||||
u.getname()
|
||||
u.getbytes(8)
|
||||
u.getname()
|
||||
u.getbytes(18)
|
||||
u.getname()
|
||||
u.getbytes(26)
|
||||
u.getname()
|
||||
timing.start()
|
||||
for i in R:
|
||||
u = Unpacker(p.buf)
|
||||
|
||||
res = (u.getaddr(),
|
||||
u.getbytes(20),
|
||||
u.getname(),
|
||||
u.getbytes(8),
|
||||
u.getname(),
|
||||
u.getbytes(18),
|
||||
u.getname(),
|
||||
u.getbytes(26),
|
||||
u.getname())
|
||||
timing.finish()
|
||||
print(timing.milli(), "ms total for unpacking")
|
||||
print(round(timing.milli() / i, 4), 'ms per unpacking')
|
||||
#for item in res: print item
|
||||
|
||||
|
||||
# Pack/unpack RR toplevel format (section 3.2.1)
|
||||
|
||||
class RRpacker(Packer):
|
||||
def __init__(self):
|
||||
Packer.__init__(self)
|
||||
self.rdstart = None
|
||||
def addRRheader(self, name, RRtype, klass, ttl, *rest):
|
||||
self.addname(name)
|
||||
self.add16bit(RRtype)
|
||||
self.add16bit(klass)
|
||||
self.add32bit(ttl)
|
||||
if rest:
|
||||
if rest[1:]: raise TypeError('too many args')
|
||||
rdlength = rest[0]
|
||||
else:
|
||||
rdlength = 0
|
||||
self.add16bit(rdlength)
|
||||
self.rdstart = len(self.buf)
|
||||
def patchrdlength(self):
|
||||
rdlength = unpack16bit(self.buf[self.rdstart-2:self.rdstart])
|
||||
if rdlength == len(self.buf) - self.rdstart:
|
||||
return
|
||||
rdata = self.buf[self.rdstart:]
|
||||
save_buf = self.buf
|
||||
ok = 0
|
||||
try:
|
||||
self.buf = self.buf[:self.rdstart-2]
|
||||
self.add16bit(len(rdata))
|
||||
self.buf = self.buf + rdata
|
||||
ok = 1
|
||||
finally:
|
||||
if not ok: self.buf = save_buf
|
||||
def endRR(self):
|
||||
if self.rdstart is not None:
|
||||
self.patchrdlength()
|
||||
self.rdstart = None
|
||||
def getbuf(self):
|
||||
if self.rdstart is not None: self.patchrdlength()
|
||||
return Packer.getbuf(self)
|
||||
# Standard RRs (section 3.3)
|
||||
def addCNAME(self, name, klass, ttl, cname):
|
||||
self.addRRheader(name, Type.CNAME, klass, ttl)
|
||||
self.addname(cname)
|
||||
self.endRR()
|
||||
def addHINFO(self, name, klass, ttl, cpu, os):
|
||||
self.addRRheader(name, Type.HINFO, klass, ttl)
|
||||
self.addstring(cpu)
|
||||
self.addstring(os)
|
||||
self.endRR()
|
||||
def addMX(self, name, klass, ttl, preference, exchange):
|
||||
self.addRRheader(name, Type.MX, klass, ttl)
|
||||
self.add16bit(preference)
|
||||
self.addname(exchange)
|
||||
self.endRR()
|
||||
def addNS(self, name, klass, ttl, nsdname):
|
||||
self.addRRheader(name, Type.NS, klass, ttl)
|
||||
self.addname(nsdname)
|
||||
self.endRR()
|
||||
def addPTR(self, name, klass, ttl, ptrdname):
|
||||
self.addRRheader(name, Type.PTR, klass, ttl)
|
||||
self.addname(ptrdname)
|
||||
self.endRR()
|
||||
def addSOA(self, name, klass, ttl,
|
||||
mname, rname, serial, refresh, retry, expire, minimum):
|
||||
self.addRRheader(name, Type.SOA, klass, ttl)
|
||||
self.addname(mname)
|
||||
self.addname(rname)
|
||||
self.add32bit(serial)
|
||||
self.add32bit(refresh)
|
||||
self.add32bit(retry)
|
||||
self.add32bit(expire)
|
||||
self.add32bit(minimum)
|
||||
self.endRR()
|
||||
def addTXT(self, name, klass, ttl, tlist):
|
||||
self.addRRheader(name, Type.TXT, klass, ttl)
|
||||
if type(tlist) is bytes or type(tlist) is str:
|
||||
tlist = [tlist]
|
||||
for txtdata in tlist:
|
||||
self.addstring(txtdata)
|
||||
self.endRR()
|
||||
def addSPF(self, name, klass, ttl, tlist):
|
||||
self.addRRheader(name, Type.TXT, klass, ttl)
|
||||
if type(tlist) is bytes or type(tlist) is str:
|
||||
tlist = [tlist]
|
||||
for txtdata in tlist:
|
||||
self.addstring(txtdata)
|
||||
self.endRR()
|
||||
# Internet specific RRs (section 3.4) -- class = IN
|
||||
def addA(self, name, klass, ttl, address):
|
||||
self.addRRheader(name, Type.A, klass, ttl)
|
||||
self.addaddr(address)
|
||||
self.endRR()
|
||||
def addWKS(self, name, ttl, address, protocol, bitmap):
|
||||
self.addRRheader(name, Type.WKS, Class.IN, ttl)
|
||||
self.addaddr(address)
|
||||
self.addbyte(chr(protocol))
|
||||
self.addbytes(bitmap)
|
||||
self.endRR()
|
||||
def addSRV(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def prettyTime(seconds):
|
||||
if seconds<60:
|
||||
return seconds,"%d seconds"%(seconds)
|
||||
if seconds<3600:
|
||||
return seconds,"%d minutes"%(seconds/60)
|
||||
if seconds<86400:
|
||||
return seconds,"%d hours"%(seconds/3600)
|
||||
if seconds<604800:
|
||||
return seconds,"%d days"%(seconds/86400)
|
||||
else:
|
||||
return seconds,"%d weeks"%(seconds/604800)
|
||||
|
||||
class RRunpacker(Unpacker):
|
||||
def __init__(self, buf):
|
||||
Unpacker.__init__(self, buf)
|
||||
self.rdend = None
|
||||
def getRRheader(self):
|
||||
name = self.getname()
|
||||
rrtype = self.get16bit()
|
||||
klass = self.get16bit()
|
||||
ttl = self.get32bit()
|
||||
rdlength = self.get16bit()
|
||||
self.rdend = self.offset + rdlength
|
||||
return (name, rrtype, klass, ttl, rdlength)
|
||||
def endRR(self):
|
||||
if self.offset != self.rdend:
|
||||
raise UnpackError('end of RR not reached')
|
||||
def getCNAMEdata(self):
|
||||
return self.getname()
|
||||
def getHINFOdata(self):
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
return str(self.getstring(), enc), str(self.getstring(),enc)
|
||||
def getMXdata(self):
|
||||
return self.get16bit(), self.getname()
|
||||
def getNSdata(self):
|
||||
return self.getname()
|
||||
def getPTRdata(self):
|
||||
return self.getname()
|
||||
def getSOAdata(self):
|
||||
return self.getname(), \
|
||||
self.getname(), \
|
||||
('serial',)+(self.get32bit(),), \
|
||||
('refresh ',)+prettyTime(self.get32bit()), \
|
||||
('retry',)+prettyTime(self.get32bit()), \
|
||||
('expire',)+prettyTime(self.get32bit()), \
|
||||
('minimum',)+prettyTime(self.get32bit())
|
||||
def getTXTdata(self):
|
||||
tlist = []
|
||||
while self.offset != self.rdend:
|
||||
tlist.append(bytes(self.getstring()))
|
||||
return tlist
|
||||
getSPFdata = getTXTdata
|
||||
def getAdata(self):
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
return self.getaddr().decode(enc)
|
||||
def getWKSdata(self):
|
||||
address = self.getaddr()
|
||||
protocol = ord(self.getbyte())
|
||||
bitmap = self.getbytes(self.rdend - self.offset)
|
||||
return address, protocol, bitmap
|
||||
def getSRVdata(self):
|
||||
"""
|
||||
_Service._Proto.Name TTL Class SRV Priority Weight Port Target
|
||||
"""
|
||||
priority = self.get16bit()
|
||||
weight = self.get16bit()
|
||||
port = self.get16bit()
|
||||
target = self.getname()
|
||||
#print '***priority, weight, port, target', priority, weight, port, target
|
||||
return priority, weight, port, target
|
||||
|
||||
class RRunpackerDefault(RRunpacker):
|
||||
# Default for DNS.qry
|
||||
def __init__(self, buf):
|
||||
RRunpacker.__init__(self, buf)
|
||||
self.rdend = None
|
||||
def getAdata(self):
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
x = socket.inet_aton(self.getaddr().decode(enc))
|
||||
return ipaddress.IPv4Address(struct_unpack("!I", x)[0])
|
||||
def getAAAAdata(self):
|
||||
return ipaddress.IPv6Address(bin2addr6(self.getaddr6()))
|
||||
|
||||
class RRunpackerText(RRunpackerDefault):
|
||||
def __init__(self, buf):
|
||||
RRunpackerDefault.__init__(self, buf)
|
||||
def getAdata(self):
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
return self.getaddr().decode(enc)
|
||||
def getAAAAdata(self):
|
||||
return bin2addr6(self.getaddr6())
|
||||
def getTXTdata(self):
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
tlist = []
|
||||
while self.offset != self.rdend:
|
||||
tlist.append(str(self.getstring(), enc))
|
||||
return tlist
|
||||
|
||||
class RRunpackerInteger(RRunpackerDefault):
|
||||
def __init__(self, buf):
|
||||
RRunpackerDefault.__init__(self, buf)
|
||||
def getAdata(self):
|
||||
if DNS.LABEL_UTF8:
|
||||
enc = 'utf8'
|
||||
else:
|
||||
enc = DNS.LABEL_ENCODING
|
||||
x = socket.inet_aton(self.getaddr().decode(enc))
|
||||
return struct_unpack("!I", x)[0]
|
||||
def getAAAAdata(self):
|
||||
return bin2long6(self.getaddr6())
|
||||
|
||||
class RRunpackerBinary(Unpacker):
|
||||
def __init__(self, buf):
|
||||
Unpacker.__init__(self, buf)
|
||||
self.rdend = None
|
||||
def getRRheader(self):
|
||||
name = self.getname()
|
||||
rrtype = self.get16bit()
|
||||
klass = self.get16bit()
|
||||
ttl = self.get32bit()
|
||||
rdlength = self.get16bit()
|
||||
self.rdlength = rdlength
|
||||
self.rdend = self.offset + rdlength
|
||||
return (name, rrtype, klass, ttl, rdlength)
|
||||
def endRR(self):
|
||||
if self.offset != self.rdend:
|
||||
raise UnpackError('end of RR not reached')
|
||||
def getTXTdata(self):
|
||||
tlist = []
|
||||
while self.offset != self.rdend:
|
||||
tlist.append(self.getbytes(self.rdlength))
|
||||
return tlist
|
||||
getSPFdata = getTXTdata
|
||||
|
||||
# Pack/unpack Message Header (section 4.1)
|
||||
|
||||
class Hpacker(Packer):
|
||||
def addHeader(self, id, qr, opcode, aa, tc, rd, ra, z, rcode,
|
||||
qdcount, ancount, nscount, arcount):
|
||||
self.add16bit(id)
|
||||
self.add16bit((qr&1)<<15 | (opcode&0xF)<<11 | (aa&1)<<10
|
||||
| (tc&1)<<9 | (rd&1)<<8 | (ra&1)<<7
|
||||
| (z&7)<<4 | (rcode&0xF))
|
||||
self.add16bit(qdcount)
|
||||
self.add16bit(ancount)
|
||||
self.add16bit(nscount)
|
||||
self.add16bit(arcount)
|
||||
|
||||
class Hunpacker(Unpacker):
|
||||
def getHeader(self):
|
||||
id = self.get16bit()
|
||||
flags = self.get16bit()
|
||||
qr, opcode, aa, tc, rd, ra, z, rcode = (
|
||||
(flags>>15)&1,
|
||||
(flags>>11)&0xF,
|
||||
(flags>>10)&1,
|
||||
(flags>>9)&1,
|
||||
(flags>>8)&1,
|
||||
(flags>>7)&1,
|
||||
(flags>>4)&7,
|
||||
(flags>>0)&0xF)
|
||||
qdcount = self.get16bit()
|
||||
ancount = self.get16bit()
|
||||
nscount = self.get16bit()
|
||||
arcount = self.get16bit()
|
||||
return (id, qr, opcode, aa, tc, rd, ra, z, rcode,
|
||||
qdcount, ancount, nscount, arcount)
|
||||
|
||||
|
||||
# Pack/unpack Question (section 4.1.2)
|
||||
|
||||
class Qpacker(Packer):
|
||||
def addQuestion(self, qname, qtype, qclass):
|
||||
self.addname(qname)
|
||||
self.add16bit(qtype)
|
||||
self.add16bit(qclass)
|
||||
|
||||
class Qunpacker(Unpacker):
|
||||
def getQuestion(self):
|
||||
return self.getname(), self.get16bit(), self.get16bit()
|
||||
|
||||
|
||||
# Pack/unpack Message(section 4)
|
||||
# NB the order of the base classes is important for __init__()!
|
||||
|
||||
class Mpacker(RRpacker, Qpacker, Hpacker):
|
||||
pass
|
||||
|
||||
class Munpacker(RRunpacker, Qunpacker, Hunpacker):
|
||||
# Default results for DNS.req
|
||||
pass
|
||||
|
||||
class MunpackerDefault(RRunpackerDefault, Qunpacker, Hunpacker):
|
||||
# Default results for DNS.qry
|
||||
pass
|
||||
|
||||
class MunpackerText(RRunpackerText, Qunpacker, Hunpacker):
|
||||
pass
|
||||
|
||||
class MunpackerBinary(RRunpackerBinary, Qunpacker, Hunpacker):
|
||||
pass
|
||||
|
||||
class MunpackerInteger(RRunpackerInteger, Qunpacker, Hunpacker):
|
||||
pass
|
||||
|
||||
# Routines to print an unpacker to stdout, for debugging.
|
||||
# These affect the unpacker's current position!
|
||||
|
||||
def dumpM(u):
|
||||
print('HEADER:')
|
||||
(id, qr, opcode, aa, tc, rd, ra, z, rcode,
|
||||
qdcount, ancount, nscount, arcount) = u.getHeader()
|
||||
print('id=%d,' % id)
|
||||
print('qr=%d, opcode=%d, aa=%d, tc=%d, rd=%d, ra=%d, z=%d, rcode=%d,' \
|
||||
% (qr, opcode, aa, tc, rd, ra, z, rcode))
|
||||
if tc: print('*** response truncated! ***')
|
||||
if rcode: print('*** nonzero error code! (%d) ***' % rcode)
|
||||
print(' qdcount=%d, ancount=%d, nscount=%d, arcount=%d' \
|
||||
% (qdcount, ancount, nscount, arcount))
|
||||
for i in range(qdcount):
|
||||
print('QUESTION %d:' % i)
|
||||
dumpQ(u)
|
||||
for i in range(ancount):
|
||||
print('ANSWER %d:' % i)
|
||||
dumpRR(u)
|
||||
for i in range(nscount):
|
||||
print('AUTHORITY RECORD %d:' % i)
|
||||
dumpRR(u)
|
||||
for i in range(arcount):
|
||||
print('ADDITIONAL RECORD %d:' % i)
|
||||
dumpRR(u)
|
||||
|
||||
class DnsResult:
|
||||
|
||||
def __init__(self,u,args):
|
||||
self.header={}
|
||||
self.questions=[]
|
||||
self.answers=[]
|
||||
self.authority=[]
|
||||
self.additional=[]
|
||||
self.args=args
|
||||
self.storeM(u)
|
||||
|
||||
def show(self):
|
||||
import time
|
||||
print('; <<>> PDG.py 1.0 <<>> %s %s'%(self.args['name'],
|
||||
self.args['qtype']))
|
||||
opt=""
|
||||
if self.args['rd']:
|
||||
opt=opt+'recurs '
|
||||
h=self.header
|
||||
print(';; options: '+opt)
|
||||
print(';; got answer:')
|
||||
print(';; ->>HEADER<<- opcode %s, status %s, id %d'%(
|
||||
h['opcode'],h['status'],h['id']))
|
||||
flags=list(filter(lambda x,h=h:h[x],('qr','aa','rd','ra','tc')))
|
||||
print(';; flags: %s; Ques: %d, Ans: %d, Auth: %d, Addit: %d'%(
|
||||
' '.join(flags),h['qdcount'],h['ancount'],h['nscount'],
|
||||
h['arcount']))
|
||||
print(';; QUESTIONS:')
|
||||
for q in self.questions:
|
||||
print(';; %s, type = %s, class = %s'%(q['qname'],q['qtypestr'],
|
||||
q['qclassstr']))
|
||||
print()
|
||||
print(';; ANSWERS:')
|
||||
for a in self.answers:
|
||||
print('%-20s %-6s %-6s %s'%(a['name'],repr(a['ttl']),a['typename'],
|
||||
a['data']))
|
||||
print()
|
||||
print(';; AUTHORITY RECORDS:')
|
||||
for a in self.authority:
|
||||
print('%-20s %-6s %-6s %s'%(a['name'],repr(a['ttl']),a['typename'],
|
||||
a['data']))
|
||||
print()
|
||||
print(';; ADDITIONAL RECORDS:')
|
||||
for a in self.additional:
|
||||
print('%-20s %-6s %-6s %s'%(a['name'],repr(a['ttl']),a['typename'],
|
||||
a['data']))
|
||||
print()
|
||||
if 'elapsed' in self.args:
|
||||
print(';; Total query time: %d msec'%self.args['elapsed'])
|
||||
print(';; To SERVER: %s'%(self.args['server']))
|
||||
print(';; WHEN: %s'%time.ctime(time.time()))
|
||||
|
||||
def storeM(self,u):
|
||||
(self.header['id'], self.header['qr'], self.header['opcode'],
|
||||
self.header['aa'], self.header['tc'], self.header['rd'],
|
||||
self.header['ra'], self.header['z'], self.header['rcode'],
|
||||
self.header['qdcount'], self.header['ancount'],
|
||||
self.header['nscount'], self.header['arcount']) = u.getHeader()
|
||||
self.header['opcodestr']=Opcode.opcodestr(self.header['opcode'])
|
||||
self.header['status']=Status.statusstr(self.header['rcode'])
|
||||
for i in range(self.header['qdcount']):
|
||||
#print 'QUESTION %d:' % i,
|
||||
self.questions.append(self.storeQ(u))
|
||||
for i in range(self.header['ancount']):
|
||||
#print 'ANSWER %d:' % i,
|
||||
self.answers.append(self.storeRR(u))
|
||||
for i in range(self.header['nscount']):
|
||||
#print 'AUTHORITY RECORD %d:' % i,
|
||||
self.authority.append(self.storeRR(u))
|
||||
for i in range(self.header['arcount']):
|
||||
#print 'ADDITIONAL RECORD %d:' % i,
|
||||
self.additional.append(self.storeRR(u))
|
||||
|
||||
def storeQ(self,u):
|
||||
q={}
|
||||
q['qname'], q['qtype'], q['qclass'] = u.getQuestion()
|
||||
q['qtypestr']=Type.typestr(q['qtype'])
|
||||
q['qclassstr']=Class.classstr(q['qclass'])
|
||||
return q
|
||||
|
||||
def storeRR(self,u):
|
||||
r={}
|
||||
r['name'],r['type'],r['class'],r['ttl'],r['rdlength'] = u.getRRheader()
|
||||
r['typename'] = Type.typestr(r['type'])
|
||||
r['classstr'] = Class.classstr(r['class'])
|
||||
#print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \
|
||||
# % (name,
|
||||
# type, typename,
|
||||
# klass, Class.classstr(class),
|
||||
# ttl)
|
||||
mname = 'get%sdata' % r['typename']
|
||||
if hasattr(u, mname):
|
||||
r['data']=getattr(u, mname)()
|
||||
else:
|
||||
r['data']=u.getbytes(r['rdlength'])
|
||||
return r
|
||||
|
||||
def dumpQ(u):
|
||||
qname, qtype, qclass = u.getQuestion()
|
||||
print('qname=%s, qtype=%d(%s), qclass=%d(%s)' \
|
||||
% (qname,
|
||||
qtype, Type.typestr(qtype),
|
||||
qclass, Class.classstr(qclass)))
|
||||
|
||||
def dumpRR(u):
|
||||
name, type, klass, ttl, rdlength = u.getRRheader()
|
||||
typename = Type.typestr(type)
|
||||
print('name=%s, type=%d(%s), class=%d(%s), ttl=%d' \
|
||||
% (name,
|
||||
type, typename,
|
||||
klass, Class.classstr(klass),
|
||||
ttl))
|
||||
mname = 'get%sdata' % typename
|
||||
if hasattr(u, mname):
|
||||
print(' formatted rdata:', getattr(u, mname)())
|
||||
else:
|
||||
print(' binary rdata:', u.getbytes(rdlength))
|
||||
|
||||
if __name__ == "__main__":
|
||||
testpacker()
|
|
@ -0,0 +1,30 @@
|
|||
"""
|
||||
$Id$
|
||||
|
||||
This file is part of the py3dns project.
|
||||
Homepage: https://launchpad.net/py3dns
|
||||
|
||||
This code is covered by the standard Python License. See LICENSE for details.
|
||||
|
||||
Opcode values in message header. RFC 1035, 1996, 2136.
|
||||
"""
|
||||
|
||||
|
||||
|
||||
QUERY = 0
|
||||
IQUERY = 1
|
||||
STATUS = 2
|
||||
NOTIFY = 4
|
||||
UPDATE = 5
|
||||
|
||||
# Construct reverse mapping dictionary
|
||||
|
||||
_names = dir()
|
||||
opcodemap = {}
|
||||
for _name in _names:
|
||||
if _name[0] != '_': opcodemap[eval(_name)] = _name
|
||||
|
||||
def opcodestr(opcode):
|
||||
if opcode in opcodemap: return opcodemap[opcode]
|
||||
else: return repr(opcode)
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
"""
|
||||
$Id$
|
||||
|
||||
This file is part of the py3dns project.
|
||||
Homepage: https://launchpad.net/py3dns
|
||||
|
||||
This code is covered by the standard Python License. See LICENSE for details.
|
||||
|
||||
Status values in message header
|
||||
"""
|
||||
|
||||
NOERROR = 0 # No Error [RFC 1035]
|
||||
FORMERR = 1 # Format Error [RFC 1035]
|
||||
SERVFAIL = 2 # Server Failure [RFC 1035]
|
||||
NXDOMAIN = 3 # Non-Existent Domain [RFC 1035]
|
||||
NOTIMP = 4 # Not Implemented [RFC 1035]
|
||||
REFUSED = 5 # Query Refused [RFC 1035]
|
||||
YXDOMAIN = 6 # Name Exists when it should not [RFC 2136]
|
||||
YXRRSET = 7 # RR Set Exists when it should not [RFC 2136]
|
||||
NXRRSET = 8 # RR Set that should exist does not [RFC 2136]
|
||||
NOTAUTH = 9 # Server Not Authoritative for zone [RFC 2136]
|
||||
NOTZONE = 10 # Name not contained in zone [RFC 2136]
|
||||
BADVERS = 16 # Bad OPT Version [RFC 2671]
|
||||
BADSIG = 16 # TSIG Signature Failure [RFC 2845]
|
||||
BADKEY = 17 # Key not recognized [RFC 2845]
|
||||
BADTIME = 18 # Signature out of time window [RFC 2845]
|
||||
BADMODE = 19 # Bad TKEY Mode [RFC 2930]
|
||||
BADNAME = 20 # Duplicate key name [RFC 2930]
|
||||
BADALG = 21 # Algorithm not supported [RFC 2930]
|
||||
|
||||
# Construct reverse mapping dictionary
|
||||
|
||||
_names = dir()
|
||||
statusmap = {}
|
||||
for _name in _names:
|
||||
if _name[0] != '_': statusmap[eval(_name)] = _name
|
||||
|
||||
def statusstr(status):
|
||||
if status in statusmap: return statusmap[status]
|
||||
else: return repr(status)
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
# -*- encoding: utf-8 -*-
|
||||
"""
|
||||
$Id$
|
||||
|
||||
This file is part of the py3dns project.
|
||||
Homepage: https://launchpad.net/py3dns
|
||||
|
||||
This code is covered by the standard Python License. See LICENSE for details.
|
||||
|
||||
TYPE values (section 3.2.2)
|
||||
"""
|
||||
|
||||
A = 1 # a host address
|
||||
NS = 2 # an authoritative name server
|
||||
MD = 3 # a mail destination (Obsolete - use MX)
|
||||
MF = 4 # a mail forwarder (Obsolete - use MX)
|
||||
CNAME = 5 # the canonical name for an alias
|
||||
SOA = 6 # marks the start of a zone of authority
|
||||
MB = 7 # a mailbox domain name (EXPERIMENTAL)
|
||||
MG = 8 # a mail group member (EXPERIMENTAL)
|
||||
MR = 9 # a mail rename domain name (EXPERIMENTAL)
|
||||
NULL = 10 # a null RR (EXPERIMENTAL)
|
||||
WKS = 11 # a well known service description
|
||||
PTR = 12 # a domain name pointer
|
||||
HINFO = 13 # host information
|
||||
MINFO = 14 # mailbox or mail list information
|
||||
MX = 15 # mail exchange
|
||||
TXT = 16 # text strings
|
||||
AAAA = 28 # IPv6 AAAA records (RFC 1886)
|
||||
SRV = 33 # DNS RR for specifying the location of services (RFC 2782)
|
||||
SPF = 99 # TXT RR for Sender Policy Framework
|
||||
|
||||
# Additional TYPE values from host.c source
|
||||
|
||||
UNAME = 110
|
||||
MP = 240
|
||||
|
||||
# QTYPE values (section 3.2.3)
|
||||
|
||||
AXFR = 252 # A request for a transfer of an entire zone
|
||||
MAILB = 253 # A request for mailbox-related records (MB, MG or MR)
|
||||
MAILA = 254 # A request for mail agent RRs (Obsolete - see MX)
|
||||
ANY = 255 # A request for all records
|
||||
|
||||
# Construct reverse mapping dictionary
|
||||
|
||||
_names = dir()
|
||||
typemap = {}
|
||||
for _name in _names:
|
||||
if _name[0] != '_': typemap[eval(_name)] = _name
|
||||
|
||||
def typestr(type):
|
||||
if type in typemap: return typemap[type]
|
||||
else: return repr(type)
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
# -*- encoding: utf-8 -*-
|
||||
# $Id$
|
||||
#
|
||||
# This file is part of the py3dns project.
|
||||
# Homepage: https://launchpad.net/py3dns
|
||||
#
|
||||
# Changes for Python3 port © 2011 Scott Kitterman <scott@kitterman.com>
|
||||
#
|
||||
# This code is covered by the standard Python License. See LICENSE for details.
|
||||
|
||||
# __init__.py for DNS class.
|
||||
|
||||
__version__ = '3.2.1'
|
||||
|
||||
try:
|
||||
import ipaddress
|
||||
except ImportError:
|
||||
try:
|
||||
import ipaddr as ipaddress
|
||||
except ImportError:
|
||||
raise Exception("py3dns 3.1 requires either ipaddress (python3.3) or ipaddr, see CHANGES for 3.1.0")
|
||||
|
||||
from . import Type
|
||||
from . import Opcode
|
||||
from . import Status
|
||||
from . import Class
|
||||
from .Base import DnsRequest
|
||||
from .Base import DNSError
|
||||
from .Lib import DnsResult
|
||||
from .Base import *
|
||||
from .Lib import *
|
||||
Error=DNSError
|
||||
from .lazy import *
|
||||
Request = DnsRequest
|
||||
Result = DnsResult
|
||||
|
||||
Base._DiscoverNameServers()
|
||||
|
|
@ -0,0 +1,83 @@
|
|||
# $Id$
|
||||
#
|
||||
# This file is part of the pydns project.
|
||||
# Homepage: http://pydns.sourceforge.net
|
||||
#
|
||||
# This code is covered by the standard Python License. See LICENSE for details.
|
||||
#
|
||||
|
||||
# routines for lazy people.
|
||||
from . import Base
|
||||
from . Base import ServerError
|
||||
|
||||
class NoDataError(IndexError): pass
|
||||
class StatusError(IndexError): pass
|
||||
|
||||
def revlookup(name,timeout=30):
|
||||
"convenience routine for doing a reverse lookup of an address"
|
||||
if Base.defaults['server'] == []: Base.DiscoverNameServers()
|
||||
names = revlookupall(name, timeout)
|
||||
if not names: return None
|
||||
return names[0] # return shortest name
|
||||
|
||||
def revlookupall(name,timeout=30):
|
||||
"convenience routine for doing a reverse lookup of an address"
|
||||
# FIXME: check for IPv6
|
||||
a = name.split('.')
|
||||
a.reverse()
|
||||
b = '.'.join(a)+'.in-addr.arpa'
|
||||
qtype='ptr'
|
||||
names = dnslookup(b, qtype, timeout)
|
||||
# this will return all records.
|
||||
names.sort(key=str.__len__)
|
||||
return names
|
||||
|
||||
def dnslookup(name,qtype,timeout=30):
|
||||
"convenience routine to return just answer data for any query type"
|
||||
if Base.defaults['server'] == []: Base.DiscoverNameServers()
|
||||
result = Base.DnsRequest(name=name, qtype=qtype).req(timeout=timeout)
|
||||
if result.header['status'] != 'NOERROR':
|
||||
raise ServerError("DNS query status: %s" % result.header['status'],
|
||||
result.header['rcode'])
|
||||
elif len(result.answers) == 0 and Base.defaults['server_rotate']:
|
||||
# check with next DNS server
|
||||
result = Base.DnsRequest(name=name, qtype=qtype).req(timeout=timeout)
|
||||
if result.header['status'] != 'NOERROR':
|
||||
raise ServerError("DNS query status: %s" % result.header['status'],
|
||||
result.header['rcode'])
|
||||
return [x['data'] for x in result.answers]
|
||||
|
||||
def mxlookup(name,timeout=30):
|
||||
"""
|
||||
convenience routine for doing an MX lookup of a name. returns a
|
||||
sorted list of (preference, mail exchanger) records
|
||||
"""
|
||||
qtype = 'mx'
|
||||
l = dnslookup(name, qtype, timeout)
|
||||
return l
|
||||
|
||||
#
|
||||
# $Log$
|
||||
# Revision 1.5.2.1.2.2 2011/03/23 01:42:07 customdesigned
|
||||
# Changes from 2.3 branch
|
||||
#
|
||||
# Revision 1.5.2.1.2.1 2011/02/18 19:35:22 customdesigned
|
||||
# Python3 updates from Scott Kitterman
|
||||
#
|
||||
# Revision 1.5.2.1 2007/05/22 20:23:38 customdesigned
|
||||
# Lazy call to DiscoverNameServers
|
||||
#
|
||||
# Revision 1.5 2002/05/06 06:14:38 anthonybaxter
|
||||
# reformat, move import to top of file.
|
||||
#
|
||||
# Revision 1.4 2002/03/19 12:41:33 anthonybaxter
|
||||
# tabnannied and reindented everything. 4 space indent, no tabs.
|
||||
# yay.
|
||||
#
|
||||
# Revision 1.3 2001/08/09 09:08:55 anthonybaxter
|
||||
# added identifying header to top of each file
|
||||
#
|
||||
# Revision 1.2 2001/07/19 06:57:07 anthony
|
||||
# cvs keywords added
|
||||
#
|
||||
#
|
|
@ -0,0 +1,13 @@
|
|||
import unittest
|
||||
import importlib
|
||||
|
||||
def test_suite():
|
||||
module_names = [
|
||||
'.testPackers',
|
||||
'.test_base'
|
||||
]
|
||||
suites = []
|
||||
for m in module_names:
|
||||
module = importlib.import_module(m, 'DNS.tests')
|
||||
suites.append(module.test_suite())
|
||||
return unittest.TestSuite(suites)
|
|
@ -0,0 +1,430 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
#
|
||||
# Tests of the packet assembler/disassembler routines.
|
||||
#
|
||||
# only tests the simple packers for now. next is to test the
|
||||
# classes: Hpacker/Hunpacker,
|
||||
# Qpacker/Unpacker, then Mpacker/Munpacker
|
||||
#
|
||||
# Start doing unpleasant tests with broken data, truncations, that
|
||||
# sort of thing.
|
||||
|
||||
import sys ; sys.path.insert(0, '..')
|
||||
import DNS
|
||||
import socket
|
||||
import unittest
|
||||
|
||||
TestCompleted = "TestCompleted" # exc.
|
||||
|
||||
class Int16Packing(unittest.TestCase):
|
||||
knownValues = ( ( 10, b'\x00\n'),
|
||||
( 500, b'\x01\xf4' ),
|
||||
( 5340, b'\x14\xdc' ),
|
||||
( 51298, b'\xc8b'),
|
||||
( 65535, b'\xff\xff'),
|
||||
)
|
||||
|
||||
def test16bitPacking(self):
|
||||
""" pack16bit should give known output for known input """
|
||||
for i,s in self.knownValues:
|
||||
result = DNS.Lib.pack16bit(i)
|
||||
self.assertEqual(s,result)
|
||||
|
||||
def test16bitUnpacking(self):
|
||||
""" unpack16bit should give known output for known input """
|
||||
for i,s in self.knownValues:
|
||||
result = DNS.Lib.unpack16bit(s)
|
||||
self.assertEqual(i,result)
|
||||
|
||||
class Int32Packing(unittest.TestCase):
|
||||
knownValues = ( ( 10, b'\x00\x00\x00\n'),
|
||||
( 500, b'\x00\x00\x01\xf4' ),
|
||||
( 5340, b'\x00\x00\x14\xdc' ),
|
||||
( 51298, b'\x00\x00\xc8b'),
|
||||
( 65535, b'\x00\x00\xff\xff'),
|
||||
( 33265535, b'\x01\xfb\x97\x7f' ),
|
||||
( 147483647, b'\x08\xcak\xff' ),
|
||||
( 2147483647, b'\x7f\xff\xff\xff' ),
|
||||
)
|
||||
def test32bitPacking(self):
|
||||
""" pack32bit should give known output for known input """
|
||||
for i,s in self.knownValues:
|
||||
result = DNS.Lib.pack32bit(i)
|
||||
self.assertEqual(s,result)
|
||||
|
||||
def test32bitUnpacking(self):
|
||||
""" unpack32bit should give known output for known input """
|
||||
for i,s in self.knownValues:
|
||||
result = DNS.Lib.unpack32bit(s)
|
||||
self.assertEqual(i,result)
|
||||
|
||||
|
||||
class IPaddrPacking(unittest.TestCase):
|
||||
knownValues = (
|
||||
('127.0.0.1', 2130706433 ),
|
||||
('10.99.23.13', 174266125 ),
|
||||
('192.35.59.45', 3223534381), # Not signed anymore - it's all long now.
|
||||
('255.255.255.255', 4294967295) # No longer -1
|
||||
)
|
||||
|
||||
def testIPaddrPacking(self):
|
||||
""" addr2bin should give known output for known input """
|
||||
for i,s in self.knownValues:
|
||||
result = DNS.Lib.addr2bin(i)
|
||||
self.assertEqual(s,result)
|
||||
|
||||
def testIPaddrUnpacking(self):
|
||||
""" bin2addr should give known output for known input """
|
||||
for i,s in self.knownValues:
|
||||
result = DNS.Lib.bin2addr(s)
|
||||
self.assertEqual(i,result)
|
||||
|
||||
class PackerClassPacking(unittest.TestCase):
|
||||
knownPackValues = [
|
||||
( ['www.ekit.com'], b'\x03www\x04ekit\x03com\x00' ),
|
||||
( ['ns1.ekorp.com', 'ns2.ekorp.com', 'ns3.ekorp.com'],
|
||||
b'\x03ns1\x05ekorp\x03com\x00\x03ns2\xc0\x04\x03ns3\xc0\x04'),
|
||||
( ['a.root-servers.net.', 'b.root-servers.net.',
|
||||
'c.root-servers.net.', 'd.root-servers.net.',
|
||||
'e.root-servers.net.', 'f.root-servers.net.'],
|
||||
b'\x01a\x0croot-servers\x03net\x00\x01b\xc0\x02\x01c\xc0'+
|
||||
b'\x02\x01d\xc0\x02\x01e\xc0\x02\x01f\xc0\x02' ),
|
||||
]
|
||||
knownUnpackValues = [
|
||||
( ['www.ekit.com'], b'\x03www\x04ekit\x03com\x00' ),
|
||||
( ['ns1.ekorp.com', 'ns2.ekorp.com', 'ns3.ekorp.com'],
|
||||
b'\x03ns1\x05ekorp\x03com\x00\x03ns2\xc0\x04\x03ns3\xc0\x04'),
|
||||
( ['a.root-servers.net', 'b.root-servers.net',
|
||||
'c.root-servers.net', 'd.root-servers.net',
|
||||
'e.root-servers.net', 'f.root-servers.net'],
|
||||
b'\x01a\x0croot-servers\x03net\x00\x01b\xc0\x02\x01c\xc0'+
|
||||
b'\x02\x01d\xc0\x02\x01e\xc0\x02\x01f\xc0\x02' ),
|
||||
]
|
||||
|
||||
def testPackNames(self):
|
||||
from DNS.Lib import Packer
|
||||
for namelist,result in self.knownPackValues:
|
||||
p = Packer()
|
||||
for n in namelist:
|
||||
p.addname(n)
|
||||
self.assertEqual(p.getbuf(),result)
|
||||
|
||||
def testUnpackNames(self):
|
||||
from DNS.Lib import Unpacker
|
||||
for namelist,result in self.knownUnpackValues:
|
||||
u = Unpacker(result)
|
||||
names = []
|
||||
for i in range(len(namelist)):
|
||||
n = u.getname()
|
||||
names.append(n)
|
||||
self.assertEqual(names, namelist)
|
||||
|
||||
""" def testUnpackerLimitCheck(self):
|
||||
# FIXME: Don't understand what this test should do. If my guess is right,
|
||||
# then the code is working ~OK.
|
||||
from DNS.Lib import Unpacker
|
||||
u=Unpacker(b'\x03ns1\x05ekorp\x03com\x00\x03ns2\xc0\x04\x03ns3\xc0\x04')
|
||||
u.getname() ; u.getname() ; u.getname()
|
||||
# 4th call should fail
|
||||
self.assertRaises(IndexError, u.getname)"""
|
||||
|
||||
class testUnpackingMangled(unittest.TestCase):
|
||||
"addA(self, name, klass, ttl, address)"
|
||||
packerCorrect = b'\x05www02\x04ekit\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x02'
|
||||
def testWithoutRR(self):
|
||||
u = DNS.Lib.RRunpacker(self.packerCorrect)
|
||||
u.getAdata()
|
||||
def testWithTwoRRs(self):
|
||||
u = DNS.Lib.RRunpacker(self.packerCorrect)
|
||||
u.getRRheader()
|
||||
self.assertRaises(DNS.Lib.UnpackError, u.getRRheader)
|
||||
def testWithNoGetData(self):
|
||||
u = DNS.Lib.RRunpacker(self.packerCorrect)
|
||||
u.getRRheader()
|
||||
self.assertRaises(DNS.Lib.UnpackError, u.endRR)
|
||||
|
||||
class PackerTestCase(unittest.TestCase):
|
||||
" base class for tests of Packing code. Laziness on my part, I know. "
|
||||
def setUp(self):
|
||||
self.RRpacker = DNS.Lib.RRpacker
|
||||
self.RRunpacker = DNS.Lib.RRunpacker
|
||||
|
||||
def testPacker(self):
|
||||
p = self.RRpacker()
|
||||
check = self.doPack(p)
|
||||
if (p is not None) and (check is not TestCompleted):
|
||||
return self.checkPackResult(p)
|
||||
|
||||
def checkPackResult(self, buf):
|
||||
if not hasattr(self, 'packerExpectedResult'):
|
||||
if self.__class__.__name__ != 'PackerTestCase':
|
||||
print("P***", self, repr(buf.getbuf())) #cheat testcase
|
||||
else:
|
||||
return self.assertEqual(buf.getbuf(),
|
||||
self.packerExpectedResult)
|
||||
|
||||
def checkUnpackResult(self, rrbits, specbits):
|
||||
if not hasattr(self, 'unpackerExpectedResult'):
|
||||
if self.__class__.__name__ != 'PackerTestCase':
|
||||
print("U***", self, repr((rrbits,specbits))) #cheat testcase
|
||||
else:
|
||||
return self.assertEqual((rrbits, specbits),
|
||||
self.unpackerExpectedResult)
|
||||
|
||||
def testUnpacker(self):
|
||||
if self.doUnpack is not None:
|
||||
if hasattr(self.__class__, 'doUnpack') \
|
||||
and hasattr(self, 'packerExpectedResult'):
|
||||
u = self.RRunpacker(self.packerExpectedResult)
|
||||
rrbits = u.getRRheader()[:4]
|
||||
specbits = self.doUnpack(u)
|
||||
try:
|
||||
u.endRR()
|
||||
except DNS.Lib.UnpackError:
|
||||
self.assertEqual(0, 'Not at end of RR!')
|
||||
return self.checkUnpackResult(rrbits, specbits)
|
||||
else:
|
||||
me = self.__class__.__name__
|
||||
if me != 'PackerTestCase':
|
||||
self.assertEquals(self.__class__.__name__,
|
||||
'Unpack NotImplemented')
|
||||
|
||||
def doPack(self, p):
|
||||
" stub. don't test the base class "
|
||||
return None
|
||||
|
||||
def doUnpack(self, p):
|
||||
" stub. don't test the base class "
|
||||
return None
|
||||
|
||||
|
||||
class testPackingOfCNAME(PackerTestCase):
|
||||
"addCNAME(self, name, klass, ttl, cname)"
|
||||
def doPack(self,p):
|
||||
p.addCNAME('www.sub.domain', DNS.Class.IN, 3600, 'realhost.sub.domain')
|
||||
def doUnpack(self, u):
|
||||
return u.getCNAMEdata()
|
||||
|
||||
unpackerExpectedResult = (('www.sub.domain', DNS.Type.CNAME, DNS.Class.IN, 3600), 'realhost.sub.domain')
|
||||
packerExpectedResult = \
|
||||
b'\x03www\x03sub\x06domain\x00\x00\x05\x00\x01\x00'+ \
|
||||
b'\x00\x0e\x10\x00\x0b\x08realhost\xc0\x04'
|
||||
|
||||
class testPackingOfCNAME2(PackerTestCase):
|
||||
"addCNAME(self, name, klass, ttl, cname)"
|
||||
def doPack(self,p):
|
||||
p.addCNAME('www.cust.com', DNS.Class.IN, 200, 'www023.big.isp.com')
|
||||
def doUnpack(self, u):
|
||||
return u.getCNAMEdata()
|
||||
unpackerExpectedResult = (('www.cust.com', DNS.Type.CNAME, DNS.Class.IN, 200), 'www023.big.isp.com')
|
||||
packerExpectedResult = \
|
||||
b'\x03www\x04cust\x03com\x00\x00\x05\x00\x01\x00'+ \
|
||||
b'\x00\x00\xc8\x00\x11\x06www023\x03big\x03isp\xc0\t'
|
||||
|
||||
class testPackingOfCNAME3(PackerTestCase):
|
||||
"addCNAME(self, name, klass, ttl, cname)"
|
||||
def doPack(self,p):
|
||||
p.addCNAME('www.fred.com', DNS.Class.IN, 86400, 'webhost.loa.com')
|
||||
def doUnpack(self, u):
|
||||
return u.getCNAMEdata()
|
||||
unpackerExpectedResult = (('www.fred.com', DNS.Type.CNAME, DNS.Class.IN, 86400), 'webhost.loa.com')
|
||||
packerExpectedResult = \
|
||||
b'\x03www\x04fred\x03com\x00\x00\x05\x00\x01\x00\x01Q'+ \
|
||||
b'\x80\x00\x0e\x07webhost\x03loa\xc0\t'
|
||||
|
||||
class testPackingOfHINFO(PackerTestCase):
|
||||
"addHINFO(self, name, klass, ttl, cpu, os)"
|
||||
def doPack(self,p):
|
||||
p.addHINFO('www.sub.domain.com', DNS.Class.IN, 3600, 'i686', 'linux')
|
||||
def doUnpack(self, u):
|
||||
return u.getHINFOdata()
|
||||
unpackerExpectedResult = (('www.sub.domain.com', 13, 1, 3600), ('i686', 'linux'))
|
||||
packerExpectedResult = \
|
||||
b'\x03www\x03sub\x06domain\x03com\x00\x00\r\x00\x01'+ \
|
||||
b'\x00\x00\x0e\x10\x00\x0b\x04i686\x05linux'
|
||||
|
||||
class testPackingOfHINFO2(PackerTestCase):
|
||||
"addHINFO(self, name, klass, ttl, cpu, os)"
|
||||
def doPack(self,p):
|
||||
p.addHINFO('core1.lax.foo.com', DNS.Class.IN, 3600, 'cisco', 'ios')
|
||||
def doUnpack(self, u):
|
||||
return u.getHINFOdata()
|
||||
unpackerExpectedResult = (('core1.lax.foo.com', 13, 1, 3600), ('cisco', 'ios'))
|
||||
packerExpectedResult = \
|
||||
b'\x05core1\x03lax\x03foo\x03com\x00\x00\r\x00\x01'+ \
|
||||
b'\x00\x00\x0e\x10\x00\n\x05cisco\x03ios'
|
||||
|
||||
class testPackingOfMX(PackerTestCase):
|
||||
"addMX(self, name, klass, ttl, preference, exchange)"
|
||||
def doPack(self, p):
|
||||
p.addMX('sub.domain.com', DNS.Class.IN, 86400, 10, 'mailhost1.isp.com')
|
||||
def doUnpack(self, u):
|
||||
return u.getMXdata()
|
||||
packerExpectedResult = \
|
||||
b'\x03sub\x06domain\x03com\x00\x00\x0f\x00\x01'+ \
|
||||
b'\x00\x01Q\x80\x00\x12\x00\n\tmailhost1\x03isp\xc0\x0b'
|
||||
unpackerExpectedResult = (('sub.domain.com', 15, 1, 86400), (10, 'mailhost1.isp.com'))
|
||||
|
||||
class testPackingOfMX2(PackerTestCase):
|
||||
"addMX(self, name, klass, ttl, preference, exchange)"
|
||||
def doPack(self, p):
|
||||
p.addMX('ekit-inc.com.', DNS.Class.IN, 86400, 10, 'mx1.ekorp.com')
|
||||
p.addMX('ekit-inc.com.', DNS.Class.IN, 86400, 20, 'mx2.ekorp.com')
|
||||
p.addMX('ekit-inc.com.', DNS.Class.IN, 86400, 30, 'mx3.ekorp.com')
|
||||
def doUnpack(self, u):
|
||||
res = [u.getMXdata(),]
|
||||
dummy = u.getRRheader()[:4]
|
||||
res += u.getMXdata()
|
||||
dummy = u.getRRheader()[:4]
|
||||
res += u.getMXdata()
|
||||
return res
|
||||
unpackerExpectedResult = (('ekit-inc.com', 15, 1, 86400), [(10, 'mx1.ekorp.com'), 20, 'mx2.ekorp.com', 30, 'mx3.ekorp.com'])
|
||||
packerExpectedResult = \
|
||||
b'\x08ekit-inc\x03com\x00\x00\x0f\x00\x01\x00\x01Q\x80\x00'+\
|
||||
b'\x0e\x00\n\x03mx1\x05ekorp\xc0\t\x00\x00\x0f\x00\x01\x00'+\
|
||||
b'\x01Q\x80\x00\x08\x00\x14\x03mx2\xc0\x1e\x00\x00\x0f\x00'+\
|
||||
b'\x01\x00\x01Q\x80\x00\x08\x00\x1e\x03mx3\xc0\x1e'
|
||||
|
||||
class testPackingOfNS(PackerTestCase):
|
||||
"addNS(self, name, klass, ttl, nsdname)"
|
||||
def doPack(self, p):
|
||||
p.addNS('ekit-inc.com', DNS.Class.IN, 86400, 'ns1.ekorp.com')
|
||||
def doUnpack(self, u):
|
||||
return u.getNSdata()
|
||||
unpackerExpectedResult = (('ekit-inc.com', 2, 1, 86400), 'ns1.ekorp.com')
|
||||
packerExpectedResult = b'\x08ekit-inc\x03com\x00\x00\x02\x00\x01\x00\x01Q\x80\x00\x0c\x03ns1\x05ekorp\xc0\t'
|
||||
|
||||
class testPackingOfPTR(PackerTestCase):
|
||||
"addPTR(self, name, klass, ttl, ptrdname)"
|
||||
def doPack(self, p):
|
||||
p.addPTR('www.ekit-inc.com', DNS.Class.IN, 3600, 'www-real01.ekorp.com')
|
||||
def doUnpack(self, u):
|
||||
return u.getPTRdata()
|
||||
unpackerExpectedResult = (('www.ekit-inc.com', 12, 1, 3600), 'www-real01.ekorp.com')
|
||||
packerExpectedResult = b'\x03www\x08ekit-inc\x03com\x00\x00\x0c\x00\x01\x00\x00\x0e\x10\x00\x13\nwww-real01\x05ekorp\xc0\r'
|
||||
|
||||
class testPackingOfSOA(PackerTestCase):
|
||||
"""addSOA(self, name, klass, ttl, mname,
|
||||
rname, serial, refresh, retry, expire, minimum)"""
|
||||
def doPack(self, p):
|
||||
p.addSOA('ekit-inc.com', DNS.Class.IN, 3600, 'ns1.ekorp.com',
|
||||
'hostmaster.ekit-inc.com', 2002020301, 100, 200, 300, 400)
|
||||
def doUnpack(self, u):
|
||||
return u.getSOAdata()
|
||||
unpackerExpectedResult = (('ekit-inc.com', 6, 1, 3600), ('ns1.ekorp.com', 'hostmaster', ('serial', 2002020301), ('refresh ', 100, '1 minutes'), ('retry', 200, '3 minutes'), ('expire', 300, '5 minutes'), ('minimum', 400, '6 minutes')))
|
||||
packerExpectedResult = b'\x08ekit-inc\x03com\x00\x00\x06\x00\x01\x00\x00\x0e\x10\x00,\x03ns1\x05ekorp\xc0\t\nhostmaster\x00wTg\xcd\x00\x00\x00d\x00\x00\x00\xc8\x00\x00\x01,\x00\x00\x01\x90'
|
||||
|
||||
|
||||
class testPackingOfA(PackerTestCase):
|
||||
"addA(self, name, klass, ttl, address)"
|
||||
def doPack(self, p):
|
||||
p.addA('www02.ekit.com', DNS.Class.IN, 86400, '192.168.10.2')
|
||||
def doUnpack(self, u):
|
||||
return u.getAdata()
|
||||
unpackerExpectedResult = (('www02.ekit.com', 1, 1, 86400), '192.168.10.2')
|
||||
packerExpectedResult = b'\x05www02\x04ekit\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x02'
|
||||
|
||||
class testPackingOfA2(PackerTestCase):
|
||||
"addA(self, name, ttl, address)"
|
||||
def doPack(self, p):
|
||||
p.addA('www.ekit.com', DNS.Class.IN, 86400, '10.98.1.0')
|
||||
def doUnpack(self, u):
|
||||
return u.getAdata()
|
||||
unpackerExpectedResult = (('www.ekit.com', 1, 1, 86400), '10.98.1.0')
|
||||
packerExpectedResult = b'\x03www\x04ekit\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\nb\x01\x00'
|
||||
|
||||
class testPackingOfA3(PackerTestCase):
|
||||
"addA(self, name, ttl, address)"
|
||||
def doPack(self, p):
|
||||
p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.4')
|
||||
p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.3')
|
||||
p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.2')
|
||||
p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.1')
|
||||
def doUnpack(self, u):
|
||||
u1,d1,u2,d2,u3,d3,u4=u.getAdata(),u.getRRheader(),u.getAdata(),u.getRRheader(),u.getAdata(),u.getRRheader(),u.getAdata()
|
||||
return u1,u2,u3,u4
|
||||
unpackerExpectedResult = (('www.zol.com', 1, 1, 86400), ('192.168.10.4', '192.168.10.3', '192.168.10.2', '192.168.10.1'))
|
||||
packerExpectedResult = b'\x03www\x03zol\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x04\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x03\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x02\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x01'
|
||||
|
||||
class testPackingOfTXT(PackerTestCase):
|
||||
"addTXT(self, name, klass, ttl, list)"
|
||||
def doPack(self, p):
|
||||
p.addTXT('ekit-inc.com', DNS.Class.IN, 3600, 'this is a text record')
|
||||
def doUnpack(self, u):
|
||||
return u.getTXTdata()
|
||||
packerExpectedResult = b'\x08ekit-inc\x03com\x00\x00\x10\x00\x01\x00\x00\x0e\x10\x00\x16\x15this is a text record'
|
||||
unpackerExpectedResult = (('ekit-inc.com', 16, 1, 3600), [b'this is a text record'])
|
||||
|
||||
# check what the maximum/minimum &c of TXT records are.
|
||||
class testPackingOfTXT2(PackerTestCase):
|
||||
"addTXT(self, name, klass, ttl, list)"
|
||||
def doPack(self, p):
|
||||
f = lambda p=p:p.addTXT('ekit-inc.com', DNS.Class.IN, 3600, 'the quick brown fox jumped over the lazy brown dog\n'*20)
|
||||
self.assertRaises(ValueError, f)
|
||||
return TestCompleted
|
||||
doUnpack = None
|
||||
|
||||
class testPackingOfAAAAText(PackerTestCase):
|
||||
"addAAAA(self, name, klass, ttl, address)"
|
||||
def setUp(self):
|
||||
self.RRpacker = DNS.Lib.RRpacker
|
||||
self.RRunpacker = DNS.Lib.RRunpackerText
|
||||
|
||||
def doPack(self, p):
|
||||
addAAAA(p, 'google.com', DNS.Class.IN, 4, '2607:f8b0:4005:802::1005')
|
||||
def doUnpack(self, u):
|
||||
r = u.getAAAAdata()
|
||||
return r
|
||||
packerExpectedResult = b'\x06google\x03com\x00\x00\x1c\x00\x01\x00\x00\x00\x04\x00\x10&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05'
|
||||
unpackerExpectedResult = (('google.com', DNS.Type.AAAA, DNS.Class.IN, 4), '2607:f8b0:4005:802::1005')
|
||||
|
||||
class testPackingOfAAAABinary(PackerTestCase):
|
||||
"addAAAA(self, name, klass, ttl, address)"
|
||||
def setUp(self):
|
||||
self.RRpacker = DNS.Lib.RRpacker
|
||||
self.RRunpacker = DNS.Lib.RRunpackerBinary
|
||||
|
||||
def doPack(self, p):
|
||||
addAAAA(p, 'google.com', DNS.Class.IN, 4, '2607:f8b0:4005:802::1005')
|
||||
def doUnpack(self, u):
|
||||
self.assertFalse(hasattr(u, "getAAAAdata"))
|
||||
r = u.getbytes(16)
|
||||
return r
|
||||
packerExpectedResult = b'\x06google\x03com\x00\x00\x1c\x00\x01\x00\x00\x00\x04\x00\x10&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05'
|
||||
unpackerExpectedResult = (('google.com', DNS.Type.AAAA, DNS.Class.IN, 4), b'&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05')
|
||||
|
||||
class testPackingOfAAAAInteger(PackerTestCase):
|
||||
"addAAAA(self, name, klass, ttl, address)"
|
||||
def setUp(self):
|
||||
self.RRpacker = DNS.Lib.RRpacker
|
||||
self.RRunpacker = DNS.Lib.RRunpackerInteger
|
||||
|
||||
def doPack(self, p):
|
||||
addAAAA(p, 'google.com', DNS.Class.IN, 4, '2607:f8b0:4005:802::1005')
|
||||
def doUnpack(self, u):
|
||||
r = u.getAAAAdata()
|
||||
return r
|
||||
packerExpectedResult = b'\x06google\x03com\x00\x00\x1c\x00\x01\x00\x00\x00\x04\x00\x10&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05'
|
||||
unpackerExpectedResult = (('google.com', DNS.Type.AAAA, DNS.Class.IN, 4), 50552053919387978162022445795852161029)
|
||||
|
||||
def addAAAA(p, name, klass, ttl, address):
|
||||
"""Add AAAA record to a packer.
|
||||
"""
|
||||
addr_buf = socket.inet_pton(socket.AF_INET6, address)
|
||||
p.addRRheader(name, DNS.Type.AAAA, klass, ttl)
|
||||
p.buf = p.buf + addr_buf
|
||||
p.endRR()
|
||||
return p
|
||||
|
||||
#class testPackingOfQuestion(PackerTestCase):
|
||||
# "addQuestion(self, qname, qtype, qclass)"
|
||||
# def doPack(self, p):
|
||||
# self.assertEquals(0,"NotImplemented")
|
||||
|
||||
def test_suite():
|
||||
from unittest import TestLoader
|
||||
return TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
|
@ -0,0 +1,260 @@
|
|||
#!/usr/bin/python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import DNS
|
||||
import unittest
|
||||
try:
|
||||
import ipaddress
|
||||
except ImportError:
|
||||
import ipaddr as ipaddress
|
||||
|
||||
def assertIsByte(b):
|
||||
assert b >= 0
|
||||
assert b <= 255
|
||||
|
||||
class TestBase(unittest.TestCase):
|
||||
def testParseResolvConf(self):
|
||||
# reset elments set by Base._DiscoverNameServers
|
||||
DNS.defaults['server'] = []
|
||||
if 'domain' in DNS.defaults:
|
||||
del DNS.defaults['domain']
|
||||
self.assertEqual(len(DNS.defaults['server']), 0)
|
||||
resolv = ['# a comment',
|
||||
'domain example.org',
|
||||
'nameserver 127.0.0.1',
|
||||
]
|
||||
DNS.ParseResolvConfFromIterable(resolv)
|
||||
self.assertEqual(len(DNS.defaults['server']), 1)
|
||||
self.assertEqual(DNS.defaults['server'][0], '127.0.0.1')
|
||||
self.assertEqual(DNS.defaults['domain'], 'example.org')
|
||||
|
||||
def testDnsRequestA(self):
|
||||
# try with asking for strings, and asking for bytes
|
||||
dnsobj = DNS.DnsRequest('example.org')
|
||||
|
||||
a_response = dnsobj.qry(qtype='A', resulttype='text', timeout=1)
|
||||
self.assertTrue(a_response.answers)
|
||||
# is the result vaguely ipv4 like?
|
||||
self.assertEqual(a_response.answers[0]['data'].count('.'), 3)
|
||||
self.assertEqual(a_response.answers[0]['data'],'93.184.216.34')
|
||||
|
||||
# Default result type for .qry object is an ipaddress object
|
||||
ad_response = dnsobj.qry(qtype='A', timeout=1)
|
||||
self.assertTrue(ad_response.answers)
|
||||
self.assertEqual(ad_response.answers[0]['data'],ipaddress.IPv4Address('93.184.216.34'))
|
||||
|
||||
ab_response = dnsobj.qry(qtype='A', resulttype='binary', timeout=1)
|
||||
self.assertTrue(ab_response.answers)
|
||||
# is the result ipv4 binary like?
|
||||
self.assertEqual(len(ab_response.answers[0]['data']), 4)
|
||||
for b in ab_response.answers[0]['data']:
|
||||
assertIsByte(b)
|
||||
self.assertEqual(ab_response.answers[0]['data'],b']\xb8\xd8\"')
|
||||
|
||||
ai_response = dnsobj.qry(qtype='A', resulttype='integer', timeout=1)
|
||||
self.assertTrue(ai_response.answers)
|
||||
self.assertEqual(ai_response.answers[0]['data'],1572395042)
|
||||
|
||||
|
||||
def testDnsRequestAAAA(self):
|
||||
dnsobj = DNS.DnsRequest('example.org')
|
||||
|
||||
aaaa_response = dnsobj.qry(qtype='AAAA', resulttype='text', timeout=1)
|
||||
self.assertTrue(aaaa_response.answers)
|
||||
# does the result look like an ipv6 address?
|
||||
self.assertTrue(':' in aaaa_response.answers[0]['data'])
|
||||
self.assertEqual(aaaa_response.answers[0]['data'],'2606:2800:220:1:248:1893:25c8:1946')
|
||||
|
||||
# default is returning ipaddress object
|
||||
aaaad_response = dnsobj.qry(qtype='AAAA', timeout=1)
|
||||
self.assertTrue(aaaad_response.answers)
|
||||
self.assertEqual(aaaad_response.answers[0]['data'],ipaddress.IPv6Address('2606:2800:220:1:248:1893:25c8:1946'))
|
||||
|
||||
aaaab_response = dnsobj.qry(qtype='AAAA', resulttype='binary', timeout=1)
|
||||
self.assertTrue(aaaab_response.answers)
|
||||
# is it ipv6 looking?
|
||||
self.assertEqual(len(aaaab_response.answers[0]['data']) , 16)
|
||||
for b in aaaab_response.answers[0]['data']:
|
||||
assertIsByte(b)
|
||||
self.assertEqual(aaaab_response.answers[0]['data'],b'&\x06(\x00\x02 \x00\x01\x02H\x18\x93%\xc8\x19F')
|
||||
# IPv6 decimal
|
||||
aaaai_response = dnsobj.qry(qtype='AAAA', resulttype='integer', timeout=1)
|
||||
self.assertTrue(aaaai_response.answers)
|
||||
self.assertEqual(aaaai_response.answers[0]['data'], 50542628918019813867414319910101719366)
|
||||
|
||||
def testDnsRequestEmptyMX(self):
|
||||
dnsobj = DNS.DnsRequest('example.org')
|
||||
|
||||
mx_empty_response = dnsobj.qry(qtype='MX', timeout=1)
|
||||
self.assertFalse(mx_empty_response.answers)
|
||||
|
||||
def testDnsRequestMX(self):
|
||||
dnsobj = DNS.DnsRequest('ietf.org')
|
||||
mx_response = dnsobj.qry(qtype='MX', timeout=1)
|
||||
self.assertTrue(mx_response.answers[0])
|
||||
# is hard coding a remote address a good idea?
|
||||
# I think it's unavoidable. - sk
|
||||
self.assertEqual(mx_response.answers[0]['data'], (0, 'mail.ietf.org'))
|
||||
|
||||
m = DNS.mxlookup('ietf.org', timeout=1)
|
||||
self.assertEqual(mx_response.answers[0]['data'], m[0])
|
||||
|
||||
def testDnsRequestSrv(self):
|
||||
dnsobj = DNS.Request(qtype='srv')
|
||||
respdef = dnsobj.qry('_ldap._tcp.openldap.org', timeout=1)
|
||||
self.assertTrue(respdef.answers)
|
||||
data = respdef.answers[0]['data']
|
||||
self.assertEqual(len(data), 4)
|
||||
self.assertEqual(data[2], 389)
|
||||
self.assertTrue('openldap.org' in data[3])
|
||||
|
||||
def testDkimRequest(self):
|
||||
q = '20161025._domainkey.google.com'
|
||||
dnsobj = DNS.Request(q, qtype='txt')
|
||||
resp = dnsobj.qry(timeout=1)
|
||||
|
||||
self.assertTrue(resp.answers)
|
||||
# should the result be bytes or a string? (Bytes, we finally settled on bytes)
|
||||
data = resp.answers[0]['data']
|
||||
self.assertFalse(isinstance(data[0], str))
|
||||
self.assertTrue(data[0].startswith(b'k=rsa'))
|
||||
|
||||
def testDNSRequestTXT(self):
|
||||
dnsobj = DNS.DnsRequest('fail.kitterman.org')
|
||||
|
||||
respdef = dnsobj.qry(qtype='TXT', timeout=1)
|
||||
self.assertTrue(respdef.answers)
|
||||
data = respdef.answers[0]['data']
|
||||
self.assertEqual(data, [b'v=spf1 -all'])
|
||||
|
||||
resptext = dnsobj.qry(qtype='TXT', resulttype='text', timeout=1)
|
||||
self.assertTrue(resptext.answers)
|
||||
data = resptext.answers[0]['data']
|
||||
self.assertEqual(data, ['v=spf1 -all'])
|
||||
|
||||
respbin = dnsobj.qry(qtype='TXT', resulttype='binary', timeout=1)
|
||||
self.assertTrue(respbin.answers)
|
||||
data = respbin.answers[0]['data']
|
||||
self.assertEqual(data, [b'\x0bv=spf1 -all'])
|
||||
|
||||
def testIDN(self):
|
||||
"""Can we lookup an internationalized domain name?"""
|
||||
dnsobj = DNS.DnsRequest('xn--bb-eka.at')
|
||||
unidnsobj = DNS.DnsRequest('öbb.at')
|
||||
a_resp = dnsobj.qry(qtype='A', resulttype='text', timeout=1)
|
||||
ua_resp = unidnsobj.qry(qtype='A', resulttype='text', timeout=1)
|
||||
self.assertTrue(a_resp.answers)
|
||||
self.assertTrue(ua_resp.answers)
|
||||
self.assertEqual(ua_resp.answers[0]['data'],
|
||||
a_resp.answers[0]['data'])
|
||||
|
||||
def testNS(self):
|
||||
"""Lookup NS record from SOA"""
|
||||
dnsob = DNS.DnsRequest('kitterman.com')
|
||||
resp = dnsob.qry(qtype='SOA', timeout=1)
|
||||
self.assertTrue(resp.answers)
|
||||
primary = resp.answers[0]['data'][0]
|
||||
self.assertEqual(primary, 'ns1.pairnic.com')
|
||||
resp = dnsob.qry(qtype='NS',server=primary,aa=1)
|
||||
nslist = [x['data'].lower() for x in resp.answers]
|
||||
nslist.sort()
|
||||
self.assertEqual(nslist, ['ns1.pairnic.com', 'ns2.pairnic.com'])
|
||||
|
||||
# Test defaults with legacy DNS.req
|
||||
|
||||
def testDnsRequestAD(self):
|
||||
# try with asking for strings, and asking for bytes
|
||||
dnsob = DNS.DnsRequest('example.org')
|
||||
|
||||
ad_response = dnsob.req(qtype='A', timeout=1)
|
||||
self.assertTrue(ad_response.answers)
|
||||
# is the result vaguely ipv4 like?
|
||||
self.assertEqual(ad_response.answers[0]['data'].count('.'), 3)
|
||||
self.assertEqual(ad_response.answers[0]['data'],'93.184.216.34')
|
||||
|
||||
def testDnsRequestAAAAD(self):
|
||||
dnsob = DNS.DnsRequest('example.org')
|
||||
|
||||
# default is returning binary instead of text
|
||||
aaaad_response = dnsob.req(qtype='AAAA', timeout=1)
|
||||
self.assertTrue(aaaad_response.answers)
|
||||
# does the result look like a binary ipv6 address?
|
||||
self.assertEqual(len(aaaad_response.answers[0]['data']) , 16)
|
||||
for b in aaaad_response.answers[0]['data']:
|
||||
assertIsByte(b)
|
||||
self.assertEqual(aaaad_response.answers[0]['data'],b'&\x06(\x00\x02 \x00\x01\x02H\x18\x93%\xc8\x19F')
|
||||
|
||||
def testDnsRequestEmptyMXD(self):
|
||||
dnsob = DNS.DnsRequest('example.org')
|
||||
|
||||
mx_empty_response = dnsob.req(qtype='MX', timeout=1)
|
||||
self.assertFalse(mx_empty_response.answers)
|
||||
|
||||
def testDnsRequestMXD(self):
|
||||
dnsob = DNS.DnsRequest('ietf.org')
|
||||
mx_response = dnsob.req(qtype='MX', timeout=1)
|
||||
self.assertTrue(mx_response.answers[0])
|
||||
# is hard coding a remote address a good idea?
|
||||
# I think it's unavoidable. - sk
|
||||
self.assertEqual(mx_response.answers[0]['data'], (0, 'mail.ietf.org'))
|
||||
|
||||
m = DNS.mxlookup('ietf.org', timeout=1)
|
||||
self.assertEqual(mx_response.answers[0]['data'], m[0])
|
||||
|
||||
def testDnsRequestSrvD(self):
|
||||
dnsob = DNS.Request(qtype='srv')
|
||||
respdef = dnsob.req('_ldap._tcp.openldap.org', timeout=1)
|
||||
self.assertTrue(respdef.answers)
|
||||
data = respdef.answers[0]['data']
|
||||
self.assertEqual(len(data), 4)
|
||||
self.assertEqual(data[2], 389)
|
||||
self.assertTrue('openldap.org' in data[3])
|
||||
|
||||
def testDkimRequestD(self):
|
||||
q = '20161025._domainkey.google.com'
|
||||
dnsob = DNS.Request(q, qtype='txt')
|
||||
resp = dnsob.req(timeout=1)
|
||||
|
||||
self.assertTrue(resp.answers)
|
||||
# should the result be bytes or a string? (Bytes, we finally settled on bytes)
|
||||
data = resp.answers[0]['data']
|
||||
self.assertFalse(isinstance(data[0], str))
|
||||
self.assertTrue(data[0].startswith(b'k=rsa'))
|
||||
|
||||
def testDNSRequestTXTD(self):
|
||||
dnsob = DNS.DnsRequest('fail.kitterman.org')
|
||||
|
||||
respdef = dnsob.req(qtype='TXT', timeout=1)
|
||||
self.assertTrue(respdef.answers)
|
||||
data = respdef.answers[0]['data']
|
||||
self.assertEqual(data, [b'v=spf1 -all'])
|
||||
|
||||
def testIDND(self):
|
||||
"""Can we lookup an internationalized domain name?"""
|
||||
dnsob = DNS.DnsRequest('xn--bb-eka.at')
|
||||
unidnsob = DNS.DnsRequest('öbb.at')
|
||||
a_resp = dnsob.req(qtype='A', resulttype='text', timeout=1)
|
||||
ua_resp = unidnsob.req(qtype='A', resulttype='text', timeout=1)
|
||||
self.assertTrue(a_resp.answers)
|
||||
self.assertTrue(ua_resp.answers)
|
||||
self.assertEqual(ua_resp.answers[0]['data'],
|
||||
a_resp.answers[0]['data'])
|
||||
|
||||
def testNSD(self):
|
||||
"""Lookup NS record from SOA"""
|
||||
dnsob = DNS.DnsRequest('kitterman.com')
|
||||
resp = dnsob.req(qtype='SOA', timeout=1)
|
||||
self.assertTrue(resp.answers)
|
||||
primary = resp.answers[0]['data'][0]
|
||||
self.assertEqual(primary, 'ns1.pairnic.com')
|
||||
resp = dnsob.req(qtype='NS',server=primary,aa=1, timeout=1)
|
||||
nslist = [x['data'].lower() for x in resp.answers]
|
||||
nslist.sort()
|
||||
self.assertEqual(nslist, ['ns1.pairnic.com', 'ns2.pairnic.com'])
|
||||
|
||||
def test_suite():
|
||||
from unittest import TestLoader
|
||||
return TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
|
@ -0,0 +1,118 @@
|
|||
"""
|
||||
$Id$
|
||||
|
||||
Extract a list of TCP/IP name servers from the registry 0.1
|
||||
0.1 Strobl 2001-07-19
|
||||
Usage:
|
||||
RegistryResolve() returns a list of ip numbers (dotted quads), by
|
||||
scouring the registry for addresses of name servers
|
||||
|
||||
Tested on Windows NT4 Server SP6a, Windows 2000 Pro SP2 and
|
||||
Whistler Pro (XP) Build 2462 and Windows ME
|
||||
... all having a different registry layout wrt name servers :-/
|
||||
|
||||
Todo:
|
||||
|
||||
Program doesn't check whether an interface is up or down
|
||||
|
||||
(c) 2001 Copyright by Wolfgang Strobl ws@mystrobl.de,
|
||||
License analog to the current Python license
|
||||
|
||||
WARNING: Python3 port completely untested on Windows.
|
||||
"""
|
||||
|
||||
import re
|
||||
import winreg
|
||||
|
||||
def binipdisplay(s):
|
||||
"convert a binary array of ip adresses to a python list"
|
||||
if len(s)%4!= 0:
|
||||
raise EnvironmentError # well ...
|
||||
ol=[]
|
||||
for i in range(len(s)/4):
|
||||
s1=s[:4]
|
||||
s=s[4:]
|
||||
ip=[]
|
||||
for j in s1:
|
||||
ip.append(str(ord(j)))
|
||||
ol.append('.'.join(ip))
|
||||
return ol
|
||||
|
||||
def stringdisplay(s):
|
||||
'''convert "d.d.d.d,d.d.d.d" to ["d.d.d.d","d.d.d.d"].
|
||||
also handle u'd.d.d.d d.d.d.d', as reporting on SF
|
||||
'''
|
||||
import re
|
||||
return list(map(str, re.split("[ ,]",s)))
|
||||
|
||||
def RegistryResolve():
|
||||
nameservers=[]
|
||||
x=winreg.ConnectRegistry(None,winreg.HKEY_LOCAL_MACHINE)
|
||||
try:
|
||||
y= winreg.OpenKey(x,
|
||||
r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters")
|
||||
except EnvironmentError: # so it isn't NT/2000/XP
|
||||
# windows ME, perhaps?
|
||||
try: # for Windows ME
|
||||
y= winreg.OpenKey(x,
|
||||
r"SYSTEM\CurrentControlSet\Services\VxD\MSTCP")
|
||||
nameserver,dummytype=winreg.QueryValueEx(y,'NameServer')
|
||||
if nameserver and not (nameserver in nameservers):
|
||||
nameservers.extend(stringdisplay(nameserver))
|
||||
except EnvironmentError:
|
||||
pass
|
||||
return nameservers # no idea
|
||||
try:
|
||||
nameserver = winreg.QueryValueEx(y, "DhcpNameServer")[0].split()
|
||||
except:
|
||||
nameserver = winreg.QueryValueEx(y, "NameServer")[0].split()
|
||||
if nameserver:
|
||||
nameservers=nameserver
|
||||
nameserver = winreg.QueryValueEx(y,"NameServer")[0]
|
||||
winreg.CloseKey(y)
|
||||
try: # for win2000
|
||||
y= winreg.OpenKey(x,
|
||||
r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\DNSRegisteredAdapters")
|
||||
for i in range(1000):
|
||||
try:
|
||||
n=winreg.EnumKey(y,i)
|
||||
z=winreg.OpenKey(y,n)
|
||||
dnscount,dnscounttype=winreg.QueryValueEx(z,
|
||||
'DNSServerAddressCount')
|
||||
dnsvalues,dnsvaluestype=winreg.QueryValueEx(z,
|
||||
'DNSServerAddresses')
|
||||
nameservers.extend(binipdisplay(dnsvalues))
|
||||
winreg.CloseKey(z)
|
||||
except EnvironmentError:
|
||||
break
|
||||
winreg.CloseKey(y)
|
||||
except EnvironmentError:
|
||||
pass
|
||||
#
|
||||
try: # for whistler
|
||||
y= winreg.OpenKey(x,
|
||||
r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\Interfaces")
|
||||
for i in range(1000):
|
||||
try:
|
||||
n=winreg.EnumKey(y,i)
|
||||
z=winreg.OpenKey(y,n)
|
||||
try:
|
||||
nameserver,dummytype=winreg.QueryValueEx(z,'NameServer')
|
||||
if nameserver and not (nameserver in nameservers):
|
||||
nameservers.extend(stringdisplay(nameserver))
|
||||
except EnvironmentError:
|
||||
pass
|
||||
winreg.CloseKey(z)
|
||||
except EnvironmentError:
|
||||
break
|
||||
winreg.CloseKey(y)
|
||||
except EnvironmentError:
|
||||
#print "Key Interfaces not found, just do nothing"
|
||||
pass
|
||||
#
|
||||
winreg.CloseKey(x)
|
||||
return nameservers
|
||||
|
||||
if __name__=="__main__":
|
||||
print("Name servers:",RegistryResolve())
|
||||
|
|
@ -0,0 +1,69 @@
|
|||
PYDNS is Copyright 2000-2014 by Guido van Rossum,
|
||||
Michael Ströder <stroeder@users.sourceforge.net>,
|
||||
Anthony Baxter <anthony@interlink.com.au>,
|
||||
Stuart Gathman <stuart@bmsi.com>,
|
||||
and Scott Kitterman <scott@kitterman.com>
|
||||
|
||||
This code is released under the following Python-style license:
|
||||
|
||||
CNRI LICENSE AGREEMENT FOR PYDNS-2.3.5
|
||||
|
||||
1. This LICENSE AGREEMENT is between the Corporation for National Research
|
||||
Initiatives, having an office at 1895 Preston White Drive, Reston, VA 20191
|
||||
(“CNRI”), and the Individual or Organization (“Licensee”) accessing and
|
||||
otherwise using pydns-2.3.5 software in source or binary form and its
|
||||
associated documentation.
|
||||
|
||||
2. Subject to the terms and conditions of this License Agreement, CNRI
|
||||
hereby grants Licensee a nonexclusive, royalty-free, world-wide license to
|
||||
reproduce, analyze, test, perform and/or display publicly, prepare derivative
|
||||
works, distribute, and otherwise use pydns-2.3.5 alone or in any derivative
|
||||
version, provided, however, that CNRI’s License Agreement and CNRI’s notice of
|
||||
copyright, i.e., “Copyright © 1995-2001 Corporation for National Research
|
||||
Initiatives; All Rights Reserved” are retained in pydns-2.3.5 alone or in any
|
||||
derivative version prepared by Licensee. Alternately, in lieu of CNRI’s License
|
||||
Agreement, Licensee may substitute the following text (omitting the quotes):
|
||||
“pydns-2.3.5 is made available subject to the terms and conditions in CNRI’s
|
||||
License Agreement. This Agreement together with pydns-2.3.5 may be located on
|
||||
the Internet using the following unique, persistent identifier (known as a
|
||||
handle): 1895.22/1013. This Agreement may also be obtained from a proxy server
|
||||
on the Internet using the following URL: http://hdl.handle.net/1895.22/1013.”
|
||||
|
||||
3. In the event Licensee prepares a derivative work that is based on or
|
||||
incorporates pydns-2.3.5 or any part thereof, and wants to make the derivative
|
||||
work available to others as provided herein, then Licensee hereby agrees to
|
||||
include in any such work a brief summary of the changes made to pydns-2.3.5.
|
||||
|
||||
4. CNRI is making pydns-2.3.5 available to Licensee on an “AS IS” basis.
|
||||
CNRI MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF
|
||||
EXAMPLE, BUT NOT LIMITATION, CNRI MAKES NO AND DISCLAIMS ANY REPRESENTATION OR
|
||||
WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE
|
||||
USE OF PYTHON 1.6.1 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS.
|
||||
|
||||
5. CNRI SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 1.6.1
|
||||
FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF
|
||||
MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 1.6.1, OR ANY DERIVATIVE
|
||||
THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
|
||||
|
||||
6. This License Agreement will automatically terminate upon a material
|
||||
breach of its terms and conditions.
|
||||
|
||||
7. This License Agreement shall be governed by the federal intellectual
|
||||
property law of the United States, including without limitation the federal
|
||||
copyright law, and, to the extent such U.S. federal law does not apply, by the
|
||||
law of the Commonwealth of Virginia, excluding Virginia’s conflict of law
|
||||
provisions. Notwithstanding the foregoing, with regard to derivative works
|
||||
based on pydns-2.3.5 that incorporate non-separable material that was
|
||||
previously distributed under the GNU General Public License (GPL), the law of
|
||||
the Commonwealth of Virginia shall govern this License Agreement only as to
|
||||
issues arising under or with respect to Paragraphs 4, 5, and 7 of this License
|
||||
Agreement. Nothing in this License Agreement shall be deemed to create any
|
||||
relationship of agency, partnership, or joint venture between CNRI and
|
||||
Licensee. This License Agreement does not grant permission to use CNRI
|
||||
trademarks or trade name in a trademark sense to endorse or promote products or
|
||||
services of Licensee, or any third party.
|
||||
|
||||
8. By clicking on the “ACCEPT” button where indicated, or by copying,
|
||||
installing or otherwise using pydns-2.3.5, Licensee agrees to be bound by the
|
||||
terms and conditions of this License Agreement.
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
recursive-include DNS *.py
|
||||
recursive-include tools *.py
|
||||
recursive-include tests *.py
|
||||
include LICENSE
|
||||
include CHANGES
|
||||
include MANIFEST.in
|
||||
include *.txt
|
||||
include setup.*
|
||||
include test.py
|
|
@ -0,0 +1,23 @@
|
|||
Metadata-Version: 1.2
|
||||
Name: py3dns
|
||||
Version: 3.2.1
|
||||
Summary: Python 3 DNS library
|
||||
Home-page: https://launchpad.net/py3dns
|
||||
Author: Anthony Baxter and others
|
||||
Author-email: py3dns-hackers@lists.launchpad.net
|
||||
Maintainer: Scott Kitterman
|
||||
Maintainer-email: scott@kitterman.com
|
||||
License: Python License
|
||||
Description: Python 3 DNS library:
|
||||
|
||||
Keywords: DNS
|
||||
Platform: UNKNOWN
|
||||
Classifier: Development Status :: 5 - Production/Stable
|
||||
Classifier: Environment :: No Input/Output (Daemon)
|
||||
Classifier: Intended Audience :: Developers
|
||||
Classifier: License :: OSI Approved :: Python License (CNRI Python License)
|
||||
Classifier: Natural Language :: English
|
||||
Classifier: Operating System :: OS Independent
|
||||
Classifier: Programming Language :: Python :: 3
|
||||
Classifier: Topic :: Internet :: Name Service (DNS)
|
||||
Classifier: Topic :: Software Development :: Libraries :: Python Modules
|
|
@ -0,0 +1,12 @@
|
|||
This directory contains a module (dnslib) that implements a DNS
|
||||
(Domain Name Server) client, plus additional modules that define some
|
||||
symbolic constants used by DNS (dnstype, dnsclass, dnsopcode).
|
||||
|
||||
Type "python dnslib.py -/" for a usage message.
|
||||
|
||||
You can also import dnslib and write your own, more sophisticated
|
||||
client code; use the test program as an example (there is currently no
|
||||
documentation :-).
|
||||
|
||||
--Guido van Rossum, CWI, Amsterdam <Guido.van.Rossum@cwi.nl>
|
||||
URL: <http://www.cwi.nl/cwi/people/Guido.van.Rossum.html>
|
|
@ -0,0 +1,146 @@
|
|||
Release 3.2.0 Mon Jul 23 2018
|
||||
|
||||
Switched from distutils to setuptools because "it's the future". It is
|
||||
unlikely to have end user impact. For python3.3+ no additional dependencies
|
||||
are required.
|
||||
|
||||
Release 3.1.0 Thu Apr 24 23:52:00 EDT 2014
|
||||
|
||||
More choices about result types are provided in 3.1.0. To specify resulttype,
|
||||
in a DnsRequest object, use the new function DnsRequest.qry
|
||||
(resulttype='binary/text/default'). DnsRequest.qry returns ipaddress objects
|
||||
for A and AAAA queries by defaults. Other defaults are the same as
|
||||
DnsRequest.req. Continue to use DnsRequest.req for exact backward
|
||||
compatibility with pydns and older py3dns defaults. TXT and SPF record data
|
||||
are returned as strings by default, this matches what dnspython3 returns.
|
||||
|
||||
The ipaddress module is used internally now. See CHANGES for details.
|
||||
|
||||
Release 3.0.3 Wed May 29 00:05:00 EDT 2013
|
||||
|
||||
There was a third, unintended incompatiblity in 3.0.2 in that IPv6 addresses
|
||||
were returned in their string format rather than their decimal format. This
|
||||
breaks pyspf queries when the connect IP is IPv6. 3.0.3 is a release strictly
|
||||
to revert this change.
|
||||
|
||||
Release 3.0.2 Thu Jan 19 01:25:00 EST 2012
|
||||
|
||||
This release introduces two potentially incompatible changes from the python
|
||||
verion of DNS (pydns). First, the data portion of DNS records of types TXT
|
||||
and SPF are returned as bytes instead of strings. Second, additional sub
|
||||
classes of DNSError have been added. Any code that catches DNSError should
|
||||
be checked to see if it needs updating to catch one of the new sub classes:
|
||||
ArgumentError, SocketError, TimeoutError, ServerError, and
|
||||
IncompleteReplyError.
|
||||
|
||||
Release 3.0 Sun Mar 2-9 23:07:22 2011 -0400
|
||||
|
||||
Ported to Python3 by Scott Kitterman <scott@kitterman.com>. This is mostly a
|
||||
minimal port to work with Python3 (tested with python3.2) plus addition of
|
||||
some of the patches that people have submitted on Sourceforge. It should be
|
||||
fully API compatible with 2.3. Note: Version 3.0.0 shipped with a new
|
||||
lazy.lookupfull function in advance of 2.3. This was incorporated in pydns
|
||||
2.3.5 as lazy.lookupalll. It has been renamed in 3.0.1 to stay API compatible
|
||||
with pydns 2.3.
|
||||
|
||||
Release 2.3 Mon May 6 16:18:02 EST 2002
|
||||
|
||||
This is a another release of the pydns code, as originally written by
|
||||
Guido van Rossum, and with a hopefully nicer API bolted over the
|
||||
top of it by Anthony Baxter <anthony@interlink.com.au>.
|
||||
|
||||
This code is released under a Python-style license.
|
||||
|
||||
I'm making this release because there hasn't been a release in a
|
||||
heck of a long time, and it probably deserves one. I'd also like to
|
||||
do a substantial refactor of some of the guts of the code, and this
|
||||
is likely to break any code that uses the existing interface. So
|
||||
this will be a release for people who are using the existing API...
|
||||
|
||||
There are several known bugs/unfinished bits
|
||||
|
||||
- processing of AXFR results is not done yet.
|
||||
- doesn't do IPv6 DNS requests (type AAAA)
|
||||
- docs, aside from this file
|
||||
- all sorts of other stuff that I've probably forgotten.
|
||||
- MacOS support for discovering nameservers
|
||||
- the API that I evolved some time ago is pretty ugly. I'm going
|
||||
to re-do it, designed this time.
|
||||
|
||||
Stuff it _does_ do:
|
||||
- processes /etc/resolv.conf - at least as far as nameserver directives go.
|
||||
- tries multiple nameservers.
|
||||
- nicer API - see below.
|
||||
- returns results in more useful format.
|
||||
- optional timing of requests.
|
||||
- default 'show' behaviour emulates 'dig' pretty closely.
|
||||
|
||||
|
||||
To use:
|
||||
|
||||
import DNS
|
||||
reqobj=DNS.Request(args)
|
||||
reqobj.req(args)
|
||||
|
||||
args can be a name, in which case it takes that as the query, and/or a series
|
||||
of keyword/value args. (see below for a list of args)
|
||||
|
||||
when calling the 'req()' method, it reuses the options specified in the
|
||||
DNS.Request() call as defaults.
|
||||
|
||||
options are applied in the following order:
|
||||
those specified in the req() call
|
||||
or, if not specified there,
|
||||
those specified in the creation of the Request() object
|
||||
or, if not specified there,
|
||||
those specified in the DNS.defaults dictionary
|
||||
|
||||
name servers can be specified in the following ways:
|
||||
- by calling DNS.DiscoverNameServers(), which will load the DNS servers
|
||||
from the system's /etc/resolv.conf file on Unix, or from the Registry
|
||||
on windows.
|
||||
- by specifying it as an option to the request
|
||||
- by manually setting DNS.defaults['server'] to a list of server IP
|
||||
addresses to try
|
||||
- XXXX It should be possible to load the DNS servers on a mac os machine,
|
||||
from where-ever they've squirrelled them away
|
||||
|
||||
name="host.do.main" # the object being looked up
|
||||
qtype="SOA" # the query type, eg SOA, A, MX, CNAME, ANY
|
||||
protocol="udp" # "udp" or "tcp" - usually you want "udp"
|
||||
server="nameserver" # the name of the nameserver. Note that you might
|
||||
# want to use an IP address here
|
||||
rd=1 # "recursion desired" - defaults to 1.
|
||||
other: opcode, port, ...
|
||||
|
||||
There's also some convenience functions, for the lazy:
|
||||
|
||||
to do a reverse lookup:
|
||||
>>> print DNS.revlookup("192.189.54.17")
|
||||
yarrina.connect.com.au
|
||||
|
||||
to look up all MX records for an entry:
|
||||
>>> print DNS.mxlookup("connect.com.au")
|
||||
[(10, 'yarrina.connect.com.au'), (100, 'warrane.connect.com.au')]
|
||||
|
||||
Documentation of the rest of the interface will have to wait for a
|
||||
later date. Note that the DnsAsyncRequest stuff is currently not
|
||||
working - I haven't looked too closely at why, yet.
|
||||
|
||||
There's some examples in the tests/ directory - including test5.py,
|
||||
which is even vaguely useful. It looks for the SOA for a domain, checks
|
||||
that the primary NS is authoritative, then checks the nameservers
|
||||
that it believes are NSs for the domain and checks that they're
|
||||
authoritative, and that the zone serial numbers match.
|
||||
|
||||
see also README.guido for the original docs.
|
||||
|
||||
py3dns is derived from pydns. The sourceforge details below refer to pydns.
|
||||
All py3dns issues/comments/etc should be reported via
|
||||
https://launchpad.net/py3dns.
|
||||
|
||||
comments to me, anthony@interlink.com.au, or to the mailing list,
|
||||
pydns-developer@lists.sourceforge.net.
|
||||
|
||||
bugs/patches to the tracker on SF -
|
||||
http://sourceforge.net/tracker/?group_id=31674
|
|
@ -0,0 +1,23 @@
|
|||
Metadata-Version: 1.2
|
||||
Name: py3dns
|
||||
Version: 3.2.1
|
||||
Summary: Python 3 DNS library
|
||||
Home-page: https://launchpad.net/py3dns
|
||||
Author: Anthony Baxter and others
|
||||
Author-email: py3dns-hackers@lists.launchpad.net
|
||||
Maintainer: Scott Kitterman
|
||||
Maintainer-email: scott@kitterman.com
|
||||
License: Python License
|
||||
Description: Python 3 DNS library:
|
||||
|
||||
Keywords: DNS
|
||||
Platform: UNKNOWN
|
||||
Classifier: Development Status :: 5 - Production/Stable
|
||||
Classifier: Environment :: No Input/Output (Daemon)
|
||||
Classifier: Intended Audience :: Developers
|
||||
Classifier: License :: OSI Approved :: Python License (CNRI Python License)
|
||||
Classifier: Natural Language :: English
|
||||
Classifier: Operating System :: OS Independent
|
||||
Classifier: Programming Language :: Python :: 3
|
||||
Classifier: Topic :: Internet :: Name Service (DNS)
|
||||
Classifier: Topic :: Software Development :: Libraries :: Python Modules
|
|
@ -0,0 +1,34 @@
|
|||
CHANGES
|
||||
CREDITS.txt
|
||||
LICENSE
|
||||
MANIFEST.in
|
||||
README-guido.txt
|
||||
README.txt
|
||||
setup.py
|
||||
test.py
|
||||
DNS/Base.py
|
||||
DNS/Class.py
|
||||
DNS/Lib.py
|
||||
DNS/Opcode.py
|
||||
DNS/Status.py
|
||||
DNS/Type.py
|
||||
DNS/__init__.py
|
||||
DNS/lazy.py
|
||||
DNS/win32dns.py
|
||||
DNS/tests/__init__.py
|
||||
DNS/tests/testPackers.py
|
||||
DNS/tests/test_base.py
|
||||
py3dns.egg-info/PKG-INFO
|
||||
py3dns.egg-info/SOURCES.txt
|
||||
py3dns.egg-info/dependency_links.txt
|
||||
py3dns.egg-info/not-zip-safe
|
||||
py3dns.egg-info/top_level.txt
|
||||
tests/test.py
|
||||
tests/test2.py
|
||||
tests/test4.py
|
||||
tests/test5.py
|
||||
tests/test6.py
|
||||
tests/test7.py
|
||||
tests/testsrv.py
|
||||
tools/caching.py
|
||||
tools/named-perf.py
|
|
@ -0,0 +1 @@
|
|||
|
|
@ -0,0 +1 @@
|
|||
|
|
@ -0,0 +1 @@
|
|||
DNS
|
|
@ -0,0 +1,36 @@
|
|||
import sys,os
|
||||
|
||||
sys.path.insert(0,os.getcwd())
|
||||
|
||||
from setuptools import setup
|
||||
|
||||
import DNS
|
||||
|
||||
setup(
|
||||
#-- Package description
|
||||
name = 'py3dns',
|
||||
license = 'Python License',
|
||||
version = DNS.__version__,
|
||||
description = 'Python 3 DNS library',
|
||||
long_description = """Python 3 DNS library:
|
||||
""",
|
||||
author = 'Anthony Baxter and others',
|
||||
author_email = 'py3dns-hackers@lists.launchpad.net ',
|
||||
maintainer="Scott Kitterman",
|
||||
maintainer_email="scott@kitterman.com",
|
||||
url = 'https://launchpad.net/py3dns',
|
||||
packages = ['DNS'], keywords = ['DNS'],
|
||||
zip_safe = False,
|
||||
classifiers = [
|
||||
'Development Status :: 5 - Production/Stable',
|
||||
'Environment :: No Input/Output (Daemon)',
|
||||
'Intended Audience :: Developers',
|
||||
'License :: OSI Approved :: Python License (CNRI Python License)',
|
||||
'Natural Language :: English',
|
||||
'Operating System :: OS Independent',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Topic :: Internet :: Name Service (DNS)',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules'
|
||||
]
|
||||
)
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
#! /usr/bin/python3
|
||||
|
||||
import unittest
|
||||
import doctest
|
||||
import DNS
|
||||
from DNS.tests import test_suite
|
||||
|
||||
unittest.TextTestRunner().run(test_suite())
|
|
@ -0,0 +1,43 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import sys ; sys.path.insert(0, '..')
|
||||
|
||||
import DNS
|
||||
# automatically load nameserver(s) from /etc/resolv.conf
|
||||
# (works on unix - on others, YMMV)
|
||||
DNS.ParseResolvConf()
|
||||
|
||||
# lets do an all-in-one request
|
||||
# set up the request object
|
||||
r = DNS.DnsRequest(name='munnari.oz.au',qtype='A')
|
||||
# do the request
|
||||
a=r.req()
|
||||
# and do a pretty-printed output
|
||||
a.show()
|
||||
|
||||
# now lets setup a reusable request object
|
||||
r = DNS.DnsRequest(qtype='ANY')
|
||||
res = r.req("a.root-servers.nex",qtype='ANY')
|
||||
res.show()
|
||||
res = r.req("proxy.connect.com.au")
|
||||
res.show()
|
||||
|
||||
# do a TCP reply
|
||||
r = DNS.DnsRequest("imsavscan.netvigator.com", qtype="A", server=['8.8.8.8'], protocol='tcp', timeout=300)
|
||||
res = r.req()
|
||||
res.show()
|
||||
|
||||
# look up a TXT record
|
||||
r = DNS.DnsRequest("kitterman.com", qtype="TXT", protocol='tcp')
|
||||
res = r.req()
|
||||
res.show()
|
||||
|
||||
# look up a AAAA record
|
||||
r = DNS.DnsRequest("mailout03.controlledmail.com", qtype="AAAA", protocol='tcp')
|
||||
res = r.req(resulttype='text')
|
||||
res.show()
|
||||
|
||||
# look up a A record set that falls over to EDNS0/TCP
|
||||
r = DNS.DnsRequest("long-a-record.tana.it", qtype="A", protocol='udp')
|
||||
res = r.req(resulttype='text')
|
||||
res.show()
|
|
@ -0,0 +1,17 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import sys ; sys.path.insert(0, '..')
|
||||
import DNS
|
||||
# automatically load nameserver(s) from /etc/resolv.conf
|
||||
# (works on unix - on others, YMMV)
|
||||
DNS.ParseResolvConf()
|
||||
|
||||
r=DNS.Request(qtype='mx')
|
||||
res = r.req('connect.com.au')
|
||||
res.show()
|
||||
|
||||
r=DNS.Request(qtype='soa')
|
||||
res = r.req('connect.com.au')
|
||||
res.show()
|
||||
|
||||
print(DNS.revlookup('192.189.54.17'))
|
|
@ -0,0 +1,10 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import sys ; sys.path.insert(0, '..')
|
||||
|
||||
import DNS
|
||||
|
||||
DNS.ParseResolvConf()
|
||||
|
||||
print(DNS.mxlookup("hotmail.com"))
|
||||
print(DNS.mxlookup("connect.com.au"))
|
|
@ -0,0 +1,62 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import sys ; sys.path.insert(0, '..')
|
||||
import DNS
|
||||
|
||||
def Error(mesg):
|
||||
import sys
|
||||
print(sys.argv[0],"ERROR:")
|
||||
print(mesg)
|
||||
sys.exit(1)
|
||||
|
||||
def main():
|
||||
import sys
|
||||
if len(sys.argv) != 2:
|
||||
Error("usage: %s somedomain.com"%sys.argv[0])
|
||||
domain = sys.argv[1]
|
||||
nslist = GetNS(domain)
|
||||
print("According to the primary, the following are nameservers for this domain")
|
||||
for ns in nslist:
|
||||
print(" ",ns)
|
||||
CheckNS(ns,domain)
|
||||
|
||||
|
||||
def GetNS(domain):
|
||||
import DNS
|
||||
# hm. this might fail if a server is off the air.
|
||||
r = DNS.Request(domain,qtype='SOA').req()
|
||||
if r.header['status'] != 'NOERROR':
|
||||
Error("received status of %s when attempting to look up SOA for domain"%
|
||||
(r.header['status']))
|
||||
if r.header['status'] == 'NXDOMAIN':
|
||||
print("SOA request was NXDOMAIN")
|
||||
primary = ''
|
||||
else:
|
||||
if r.answers:
|
||||
primary,email,serial,refresh,retry,expire,minimum = r.answers[0]['data']
|
||||
print("Primary nameserver for domain %s is: %s"%(domain,primary))
|
||||
else:
|
||||
print("No answer to SOA query")
|
||||
primary = ''
|
||||
r = DNS.Request(domain,qtype='NS',server=primary,aa=1).req()
|
||||
if r.header['status'] != 'NOERROR':
|
||||
Error("received status of %s when attempting to query %s for NSs"%
|
||||
(r.header['status']))
|
||||
if r.header['aa'] != 1 and primary is not '':
|
||||
Error("primary NS %s doesn't believe that it's authoritative!"% primary)
|
||||
nslist = [x['data'] for x in r.answers]
|
||||
print("Full list of nameservers for domain %s is: %s"%(domain,nslist))
|
||||
return nslist
|
||||
|
||||
def CheckNS(nameserver,domain):
|
||||
r = DNS.Request(domain,qtype='SOA',server=nameserver,aa=1).req()
|
||||
if r.header['status'] != 'NOERROR':
|
||||
Error("received status of %s when attempting to query %s for NS"%
|
||||
(r.header['status']))
|
||||
if r.header['aa'] != 1:
|
||||
Error("NS %s doesn't believe that it's authoritative!"% nameserver)
|
||||
primary,email,serial,refresh,retry,expire,minimum = r.answers[0]['data']
|
||||
print(" NS has serial",serial[1])
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,28 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import sys ; sys.path.insert(0, '..')
|
||||
import DNS
|
||||
req = DNS.DnsRequest('mailout03.controlledmail.com', qtype='AAAA', protocol='tcp')
|
||||
resp = req.req()
|
||||
print(resp.answers[0]['name'], resp.answers[0]['data'])
|
||||
|
||||
req1 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='AAAA', protocol='udp')
|
||||
resp1 = req1.req(resulttype='binary')
|
||||
print(resp1.answers[0]['name'], resp1.answers[0]['data'])
|
||||
|
||||
req2 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='AAAA', protocol='tcp')
|
||||
resp2 = req2.req(resulttype='text')
|
||||
print(resp2.answers[0]['name'], resp2.answers[0]['data'])
|
||||
|
||||
req3 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='A', protocol='tcp')
|
||||
resp3 = req3.req()
|
||||
print(resp3.answers[0]['name'], resp3.answers[0]['data'])
|
||||
|
||||
req4 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='A', protocol='udp', resulttype='binary')
|
||||
resp4 = req4.req(resulttype='binary')
|
||||
print(resp4.answers[0]['name'], resp4.answers[0]['data'])
|
||||
|
||||
req5 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='A', protocol='tcp')
|
||||
resp5 = req5.req(resulttype='text')
|
||||
print(resp5.answers[0]['name'], resp5.answers[0]['data'])
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import sys ; sys.path.insert(0, '..')
|
||||
import DNS
|
||||
req = DNS.DnsRequest('google.com', qtype='AAAA', protocol='tcp')
|
||||
resp = req.req()
|
||||
print(resp.answers[0]['name'], resp.answers[0]['data'])
|
||||
|
||||
req1 = DNS.DnsRequest('google.com', qtype='AAAA', protocol='udp')
|
||||
resp1 = req1.req(resulttype='binary')
|
||||
print(resp1.answers[0]['name'], resp1.answers[0]['data'])
|
||||
|
||||
req2 = DNS.DnsRequest('google.com', qtype='AAAA', protocol='tcp')
|
||||
resp2 = req2.req(resulttype='text')
|
||||
print(resp2.answers[0]['name'], resp2.answers[0]['data'])
|
||||
|
||||
req3 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='TXT', protocol='tcp')
|
||||
resp3 = req3.req()
|
||||
print(resp3.answers[0]['name'], resp3.answers[0]['data'])
|
||||
|
||||
req4 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='TXT', protocol='udp', resulttype='binary')
|
||||
resp4 = req4.req(resulttype='binary')
|
||||
print(resp4.answers[0]['name'], resp4.answers[0]['data'])
|
||||
|
||||
req5 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='TXT', protocol='tcp')
|
||||
resp5 = req5.req(resulttype='text')
|
||||
print(resp5.answers[0]['name'], resp5.answers[0]['data'])
|
||||
|
||||
req6 = DNS.DnsRequest('controlledmail.com', qtype='MX', protocol='tcp')
|
||||
resp6 = req6.req()
|
||||
print(resp6.answers[0]['name'], resp6.answers[0]['data'])
|
||||
|
||||
req7 = DNS.DnsRequest('controlledmail.com', qtype='MX', protocol='udp', resulttype='binary')
|
||||
resp7 = req6.req(resulttype='binary')
|
||||
print(resp7.answers[0]['name'], resp7.answers[0]['data'])
|
||||
|
||||
req8 = DNS.DnsRequest('controlledmail.com', qtype='MX', protocol='tcp')
|
||||
resp8 = req8.req(resulttype='text')
|
||||
print(resp8.answers[0]['name'], resp8.answers[0]['data'])
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import sys ; sys.path.insert(0, '..')
|
||||
|
||||
import DNS
|
||||
# automatically load nameserver(s) from /etc/resolv.conf
|
||||
# (works on unix - on others, YMMV)
|
||||
DNS.ParseResolvConf()
|
||||
|
||||
r=DNS.Request(qtype='srv')
|
||||
res = r.req('_ldap._tcp.openldap.org')
|
||||
res.show()
|
||||
print(res.answers)
|
|
@ -0,0 +1,55 @@
|
|||
#
|
||||
# From: KevinL <darius@bofh.net.au>
|
||||
# A simple dns answer cache - it's author notes:
|
||||
# "It's probably really bodgy code, tho - it was my early python..."
|
||||
# So don't send him abusive messages if you hate it.
|
||||
#
|
||||
class DNSCache:
|
||||
"""
|
||||
Covers the DNS object, keeps a cache of answers. Clumsy as hell.
|
||||
"""
|
||||
forCache = {}
|
||||
revCache = {}
|
||||
# cache failures for this long, in seconds
|
||||
negCache = 3600
|
||||
|
||||
def __init__(self):
|
||||
import DNS
|
||||
DNS.ParseResolvConf()
|
||||
|
||||
def lookup(self,IP = None,name = None):
|
||||
import DNS
|
||||
now = time.time()
|
||||
if (not IP) and (not name):
|
||||
return None
|
||||
if IP:
|
||||
if type(IP) != type(''):
|
||||
return None
|
||||
a = string.split(IP, '.')
|
||||
a.reverse()
|
||||
name = string.join(a, '.')+'.in-addr.arpa'
|
||||
cache = self.revCache
|
||||
qt = 'ptr'
|
||||
else:
|
||||
if type(name) != type(''):
|
||||
return None
|
||||
cache = self.forCache
|
||||
qt = 'a'
|
||||
if name in cache:
|
||||
# Check if it's timed out or not
|
||||
if cache[name][1] < now:
|
||||
del(cache[name])
|
||||
else:
|
||||
return(cache[name][0])
|
||||
x = DNS.DnsRequest(name,qtype=qt)
|
||||
try:
|
||||
x.req()
|
||||
except:
|
||||
return 'Timeout'
|
||||
if len(x.response.answers) > 0:
|
||||
cache[name] = ( x.response.answers[0]['data'], x.time_finish +
|
||||
x.response.answers[0]['ttl'])
|
||||
else:
|
||||
cache[name] = (None,now+self.negCache)
|
||||
return cache[name][0]
|
||||
|
|
@ -0,0 +1,63 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
servers = [ "192.92.129.1",
|
||||
"192.189.54.17", # yarrina
|
||||
"192.189.54.33", # warrane
|
||||
"203.8.183.1", # yalumba
|
||||
"192.189.54.65", # gnamma
|
||||
"128.250.1.21", # munnari
|
||||
|
||||
]
|
||||
|
||||
lookups = [ ( 'munnari.oz.au', 'A' ),
|
||||
( 'connect.com.au', 'SOA' ),
|
||||
( 'parc.xerox.com', 'MX' ),
|
||||
( 'bogus.example.net', 'A'),
|
||||
]
|
||||
|
||||
rpts = 5
|
||||
|
||||
def main():
|
||||
import DNS
|
||||
import socket
|
||||
import time
|
||||
res = {}
|
||||
for server in servers:
|
||||
res[server] = [100000,0,0,0] # min,max,tot,failed
|
||||
for what,querytype in lookups:
|
||||
for count in range(rpts):
|
||||
for server in servers:
|
||||
d = DNS.DnsRequest(server=server,timeout=1)
|
||||
fail = 0
|
||||
timingstart = time.time()
|
||||
try:
|
||||
r=d.req(name=what,qtype=querytype)
|
||||
except DNS.Error:
|
||||
fail = 1
|
||||
timingfinish = time.time()
|
||||
if fail:
|
||||
res[server][3] = res[server][3] + 1
|
||||
print("(failed)",res[server][3])
|
||||
if 0:
|
||||
if r.header['ancount'] == 0:
|
||||
print("WARNING: Server",server,"got no answers for", \
|
||||
what, querytype)
|
||||
t = int(1000 * (timingfinish - timingstart))
|
||||
print(server,"took",t,"ms for",what,querytype)
|
||||
res[server][0] = min(t,res[server][0])
|
||||
res[server][1] = max(t,res[server][1])
|
||||
res[server][2] = res[server][2] + t
|
||||
for server in servers:
|
||||
queries = rpts * len(lookups)
|
||||
r = res[server]
|
||||
print(server)
|
||||
print("%-30s %2d/%2d(%3.2f%%) %dms/%dms/%dms min/avg/max" % (
|
||||
socket.gethostbyaddr(server)[0],
|
||||
queries - r[3], queries,
|
||||
((queries-r[3])*100.0)/queries,
|
||||
r[0],
|
||||
r[2] / queries,
|
||||
r[1]))
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue