summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorScott Kitterman <scott@kitterman.com>2019-09-04 12:33:10 (GMT)
committerScott Kitterman <scott@kitterman.com>2019-09-04 12:33:10 (GMT)
commit2dadd3e3e60e0e0235f774861c3ecfdf3b3a3df5 (patch)
tree411478e67380082e1959b8ff7f679dcc2ef303a0
downloadpy3dns-2dadd3e3e60e0e0235f774861c3ecfdf3b3a3df5.zip
py3dns-2dadd3e3e60e0e0235f774861c3ecfdf3b3a3df5.tar.gz
py3dns-2dadd3e3e60e0e0235f774861c3ecfdf3b3a3df5.tar.bz2
Import py3dns_3.2.1.orig.tar.gz
[dgit import orig py3dns_3.2.1.orig.tar.gz]
-rw-r--r--CHANGES81
-rw-r--r--CREDITS.txt18
-rw-r--r--DNS/Base.py480
-rw-r--r--DNS/Class.py35
-rw-r--r--DNS/Lib.py809
-rw-r--r--DNS/Opcode.py30
-rw-r--r--DNS/Status.py41
-rw-r--r--DNS/Type.py55
-rw-r--r--DNS/__init__.py38
-rw-r--r--DNS/lazy.py83
-rw-r--r--DNS/tests/__init__.py13
-rwxr-xr-xDNS/tests/testPackers.py430
-rw-r--r--DNS/tests/test_base.py260
-rw-r--r--DNS/win32dns.py118
-rw-r--r--LICENSE69
-rw-r--r--MANIFEST.in9
-rw-r--r--PKG-INFO23
-rw-r--r--README-guido.txt12
-rw-r--r--README.txt146
-rw-r--r--py3dns.egg-info/PKG-INFO23
-rw-r--r--py3dns.egg-info/SOURCES.txt34
-rw-r--r--py3dns.egg-info/dependency_links.txt1
-rw-r--r--py3dns.egg-info/not-zip-safe1
-rw-r--r--py3dns.egg-info/top_level.txt1
-rw-r--r--setup.cfg4
-rw-r--r--setup.py36
-rwxr-xr-xtest.py8
-rwxr-xr-xtests/test.py43
-rwxr-xr-xtests/test2.py17
-rwxr-xr-xtests/test4.py10
-rwxr-xr-xtests/test5.py62
-rwxr-xr-xtests/test6.py28
-rwxr-xr-xtests/test7.py40
-rwxr-xr-xtests/testsrv.py13
-rw-r--r--tools/caching.py55
-rwxr-xr-xtools/named-perf.py63
36 files changed, 3189 insertions, 0 deletions
diff --git a/CHANGES b/CHANGES
new file mode 100644
index 0000000..8518c10
--- /dev/null
+++ b/CHANGES
@@ -0,0 +1,81 @@
+3.2.1 Wed, Sep 4, 2019
+ * Add support for setting timeout for convenience methods in DNS.lazy
+ * Fixed DNS.req resulttype error format (LP: #1842423)
+ * Use errno.EADDRINUSE instead of the hard coded Linux value for improved
+ portability (LP: #1793540)
+ * Update test suite to correct for use of no longer existing DNS records
+ * Set timeout=1 for tests so testing with a non-responsive nameserver will
+ finish in a reasonable time
+
+3.2.0 Mon, 23 Jul 2018
+ * Rename internal use of async since it is a reserved word in python3.7
+ (LP: #1776027)
+ * Switch from distutils to setuptools
+ * Ship test.py in the tarball
+
+3.1.1 Thu, 06 Oct 2016 22:00:13 -0400
+ * Update test suite for new example.org IP addresses
+ * Fix missing bits for use of ipaddr-py with python3 < 3.3 (LP: #1319611)
+ - Patch thanks to Arfrever Frehtes Taifersar Arahesis
+ * Correct error in _DiscoverNameServers from OS X implementation in 3.0.1
+ that prevents name server discovery on windows (LP: #1442424)
+ * Correct encoding issue with label length to fix issues with DNS labels
+ greater than 46 characters (LP: #1502853)
+ - Thanks to Petr Czepiec for the patch
+ * Use full path to /usr/sbin/scutil on OS X since scutil is not always in the
+ search path (LP: #1630844)
+
+3.1.0 Thu Apr 24 23:52:00 EDT 2014
+ * Raise DNSError when no nameservers have been found by the time a
+ Base.DNSRequest object is initialized
+ * Add new DNS.DnsRequest.qry function to supercede DNS.DnsRequest.req to
+ allow for non-backward compatible changes to be made in .qry while
+ maintaining full backward compatibility with 3.0.3 and later in the 3.0
+ series in .req
+ * Add options for 'resulttype' to DnsResult.qry to allow for binary, integer,
+ or text data to be returned for IP addresses
+ * The default result type for IPv4 and IPv6 addresses in DNS.DnsRequest.qry
+ is an ipaddress object
+ * The ipaddress module is used internally. This is included in python3.3 and
+ the ipaddr-py module from https://code.google.com/p/ipaddr-py/ can be used
+ with python3.2
+ * New unittest based test suite - thanks to Diane Trout
+
+3.0.4 Wed Aug 7 02:25:00 EDT 2013
+
+ * Fix timeouts associated with only one of several available nameservers
+ being unavailable(LP: #1209071):
+ - Only raise timeout error after trying all available servers
+ - Stop lookups once an answer is gotten
+ * Removed unmaintained spec files
+
+3.0.3 Wed May 29 00:05:00 EDT 2013
+
+ * Revert returning IPv6 addresses from AAAA lookups as string. Causing
+ incompatiblities that are deeply annoying to fix on the other end.
+
+3.0.2 Thu Jan 19 10:59:00 EST 2012
+
+ * Add more granular exception sub classes of DNSError, see SF #3388075
+ - Thanks to Julian Mehnle for the patch
+ * Add AAAA record support, works like A records
+ - Thanks to Shane Kerr for the patch
+
+3.0.1 Mon Jul 18 19:46:30 EDT 2011
+
+ * Add CHANGES to document post-Python 3 port changes
+ * Add LICENSE file
+ * Port pydns 2.3.5 changes to py3dns
+ - Handle large TCP replies (change to blocking IO with timeout)
+ - Add new lazy.dnslookup function to retrieve answer data for any query
+ type
+ - Add large TCP reply test to tests/test.py
+ * Add automatic name server discovery for OS X
+
+3.0.0 Wed Feb 9 23:35:22 EST 2011
+
+Ported to Python3 by Scott Kitterman <scott@kitterman.com>. This is mostly a
+minimal port to work with Python3 (tested with python3.2) plus addition of
+some of the patches that people have submitted on Sourceforge. It should be
+fully API compatible with 2.3.
+
diff --git a/CREDITS.txt b/CREDITS.txt
new file mode 100644
index 0000000..498f2ce
--- /dev/null
+++ b/CREDITS.txt
@@ -0,0 +1,18 @@
+This code was originally based on the DNS library created
+by Guido van Rossum somewhere near the dawn of time.
+
+Since then, as well as myself (Anthony), I have had contributions
+by:
+
+ Michael Ströder
+ Bastian Kleineidam
+ Timothy J. Miller
+ Wolfgang Strobl
+ Arnaud Fontaine
+ Scott Kitterman
+ Stuart Gathman
+ Diane Trout
+
+It's possible there's other people - the old RCS logs for my
+code were lost some time ago. The list above is almost certainly
+incomplete - let me know if I've forgotten you...
diff --git a/DNS/Base.py b/DNS/Base.py
new file mode 100644
index 0000000..34a6da7
--- /dev/null
+++ b/DNS/Base.py
@@ -0,0 +1,480 @@
+"""
+$Id$
+
+This file is part of the py3dns project.
+Homepage: https://launchpad.net/py3dns
+
+This code is covered by the standard Python License. See LICENSE for details.
+
+Changes for Python3 port © 2011-14 Scott Kitterman <scott@kitterman.com>
+
+ Base functionality. Request and Response classes, that sort of thing.
+"""
+
+import socket, string, types, time, select
+import errno
+from . import Type,Class,Opcode
+import asyncore
+#
+# This random generator is used for transaction ids and port selection. This
+# is important to prevent spurious results from lost packets, and malicious
+# cache poisoning. This doesn't matter if you are behind a caching nameserver
+# or your app is a primary DNS server only. To install your own generator,
+# replace DNS.Base.random. SystemRandom uses /dev/urandom or similar source.
+#
+try:
+ from random import SystemRandom
+ random = SystemRandom()
+except:
+ import random
+
+class DNSError(Exception): pass
+class ArgumentError(DNSError): pass
+class SocketError(DNSError): pass
+class TimeoutError(DNSError): pass
+
+class ServerError(DNSError):
+ def __init__(self, message, rcode):
+ DNSError.__init__(self, message, rcode)
+ self.message = message
+ self.rcode = rcode
+
+class IncompleteReplyError(DNSError): pass
+
+# Lib uses some of the above exception classes, so import after defining.
+from . import Lib
+
+defaults= { 'protocol':'udp', 'port':53, 'opcode':Opcode.QUERY,
+ 'qtype':Type.A, 'rd':1, 'timing':1, 'timeout': 30, 'server_rotate': 0,
+ 'server': [] }
+
+def ParseResolvConf(resolv_path="/etc/resolv.conf"):
+ "parses the /etc/resolv.conf file and sets defaults for name servers"
+ with open(resolv_path, 'r') as stream:
+ return ParseResolvConfFromIterable(stream)
+
+def ParseResolvConfFromIterable(lines):
+ "parses a resolv.conf formatted stream and sets defaults for name servers"
+ global defaults
+ for line in lines:
+ line = line.strip()
+ if not line or line[0]==';' or line[0]=='#':
+ continue
+ fields=line.split()
+ if len(fields) < 2:
+ continue
+ if fields[0]=='domain' and len(fields) > 1:
+ defaults['domain']=fields[1]
+ if fields[0]=='search':
+ pass
+ if fields[0]=='options':
+ pass
+ if fields[0]=='sortlist':
+ pass
+ if fields[0]=='nameserver':
+ defaults['server'].append(fields[1])
+
+def _DiscoverNameServers():
+ import sys
+ if sys.platform in ('win32', 'nt'):
+ from . import win32dns
+ defaults['server']=win32dns.RegistryResolve()
+ elif sys.platform == 'darwin':
+ ParseOSXSysConfig()
+ else:
+ return ParseResolvConf()
+
+def DiscoverNameServers():
+ """Don't call, only here for backward compatability. We do discovery for
+ you automatically.
+ """
+ pass
+
+class DnsRequest:
+ """ high level Request object """
+ def __init__(self,*name,**args):
+ self.donefunc=None
+ self.py3async=None
+ self.defaults = {}
+ self.argparse(name,args)
+ self.defaults = self.args
+ self.tid = 0
+ self.resulttype = ''
+ if len(self.defaults['server']) == 0:
+ raise DNSError('No working name servers discovered')
+
+ def argparse(self,name,args):
+ if not name and 'name' in self.defaults:
+ args['name'] = self.defaults['name']
+ if type(name) is bytes or type(name) is str:
+ args['name']=name
+ else:
+ if len(name) == 1:
+ if name[0]:
+ args['name']=name[0]
+ if defaults['server_rotate'] and \
+ type(defaults['server']) == types.ListType:
+ defaults['server'] = defaults['server'][1:]+defaults['server'][:1]
+ for i in list(defaults.keys()):
+ if i not in args:
+ if i in self.defaults:
+ args[i]=self.defaults[i]
+ else:
+ args[i]=defaults[i]
+ if type(args['server']) == bytes or type(args['server']) == str:
+ args['server'] = [args['server']]
+ self.args=args
+
+ def socketInit(self,a,b):
+ self.s = socket.socket(a,b)
+
+ def processUDPReply(self):
+ if self.timeout > 0:
+ r,w,e = select.select([self.s],[],[],self.timeout)
+ if not len(r):
+ raise TimeoutError('Timeout')
+ (self.reply, self.from_address) = self.s.recvfrom(65535)
+ self.time_finish=time.time()
+ self.args['server']=self.ns
+ return self.processReply()
+
+ def _readall(self,f,count):
+ res = f.read(count)
+ while len(res) < count:
+ if self.timeout > 0:
+ # should we restart timeout everytime we get a dribble of data?
+ rem = self.time_start + self.timeout - time.time()
+ if rem <= 0: raise DNSError('Timeout')
+ self.s.settimeout(rem)
+ buf = f.read(count - len(res))
+ if not buf:
+ raise DNSError('incomplete reply - %d of %d read' % (len(res),count))
+ res += buf
+ return res
+
+ def processTCPReply(self):
+ if self.timeout > 0:
+ self.s.settimeout(self.timeout)
+ else:
+ self.s.settimeout(None)
+ f = self.s.makefile('rb')
+ try:
+ header = self._readall(f,2)
+ count = Lib.unpack16bit(header)
+ self.reply = self._readall(f,count)
+ finally:
+ f.close()
+ self.time_finish=time.time()
+ self.args['server']=self.ns
+ return self.processReply()
+
+ def processReply(self):
+ self.args['elapsed']=(self.time_finish-self.time_start)*1000
+ if not self.resulttype:
+ u = Lib.Munpacker(self.reply)
+ elif self.resulttype == 'default':
+ u = Lib.MunpackerDefault(self.reply)
+ elif self.resulttype == 'binary':
+ u = Lib.MunpackerBinary(self.reply)
+ elif self.resulttype == 'text':
+ u = Lib.MunpackerText(self.reply)
+ elif self.resulttype == 'integer':
+ u = Lib.MunpackerInteger(self.reply)
+ else:
+ raise SyntaxError('Unknown resulttype: ' + self.resulttype)
+ r=Lib.DnsResult(u,self.args)
+ r.args=self.args
+ #self.args=None # mark this DnsRequest object as used.
+ return r
+ #### TODO TODO TODO ####
+# if protocol == 'tcp' and qtype == Type.AXFR:
+# while 1:
+# header = f.read(2)
+# if len(header) < 2:
+# print '========== EOF =========='
+# break
+# count = Lib.unpack16bit(header)
+# if not count:
+# print '========== ZERO COUNT =========='
+# break
+# print '========== NEXT =========='
+# reply = f.read(count)
+# if len(reply) != count:
+# print '*** Incomplete reply ***'
+# break
+# u = Lib.Munpacker(reply)
+# Lib.dumpM(u)
+
+ def getSource(self):
+ "Pick random source port to avoid DNS cache poisoning attack."
+ while True:
+ try:
+ source_port = random.randint(1024,65535)
+ self.s.bind(('', source_port))
+ break
+ except socket.error as msg:
+ # errno.EADDRINUSE, 'Address already in use'
+ if msg.errno != errno.EADDRINUSE: raise
+
+ def conn(self):
+ self.getSource()
+ self.s.connect((self.ns,self.port))
+
+ def qry(self,*name,**args):
+ '''
+ Request function for the DnsRequest class. In addition to standard
+ DNS args, the special pydns arg 'resulttype' can optionally be passed.
+ Valid resulttypes are 'default', 'text', 'decimal', and 'binary'.
+
+ Defaults are configured to be compatible with pydns:
+ AAAA: decimal
+ Others: text
+ '''
+ " needs a refactoring "
+ self.argparse(name,args)
+ #if not self.args:
+ # raise ArgumentError, 'reinitialize request before reuse'
+ protocol = self.args['protocol']
+ self.port = self.args['port']
+ self.tid = random.randint(0,65535)
+ self.timeout = self.args['timeout'];
+ opcode = self.args['opcode']
+ rd = self.args['rd']
+ server=self.args['server']
+ if 'resulttype' in self.args:
+ self.resulttype = self.args['resulttype']
+ else:
+ self.resulttype = 'default'
+ if type(self.args['qtype']) == bytes or type(self.args['qtype']) == str:
+ try:
+ qtype = getattr(Type, str(self.args['qtype'].upper()))
+ except AttributeError:
+ raise ArgumentError('unknown query type')
+ else:
+ qtype = self.args['qtype']
+ if 'name' not in self.args:
+ print((self.args))
+ raise ArgumentError('nothing to lookup')
+ qname = self.args['name']
+ if qtype == Type.AXFR and protocol != 'tcp':
+ print('Query type AXFR, protocol forced to TCP')
+ protocol = 'tcp'
+ #print('QTYPE %d(%s)' % (qtype, Type.typestr(qtype)))
+ m = Lib.Mpacker()
+ # jesus. keywords and default args would be good. TODO.
+ m.addHeader(self.tid,
+ 0, opcode, 0, 0, rd, 0, 0, 0,
+ 1, 0, 0, 0)
+ m.addQuestion(qname, qtype, Class.IN)
+ self.request = m.getbuf()
+ try:
+ if protocol == 'udp':
+ self.sendUDPRequest(server)
+ else:
+ self.sendTCPRequest(server)
+ except socket.error as reason:
+ raise SocketError(reason)
+ if self.py3async:
+ return None
+ else:
+ return self.response
+
+ def req(self,*name,**args):
+ " needs a refactoring "
+ self.argparse(name,args)
+ #if not self.args:
+ # raise ArgumentError, 'reinitialize request before reuse'
+ try:
+ if self.args['resulttype']:
+ raise ArgumentError('Restulttype {0} set with DNS.req, use DNS.qry to specify result type.'.format(self.args['resulttype']))
+ except:
+ # resulttype isn't set and that's what we want for DNS.req
+ pass
+ protocol = self.args['protocol']
+ self.port = self.args['port']
+ self.tid = random.randint(0,65535)
+ self.timeout = self.args['timeout'];
+ opcode = self.args['opcode']
+ rd = self.args['rd']
+ server=self.args['server']
+ if type(self.args['qtype']) == bytes or type(self.args['qtype']) == str:
+ try:
+ qtype = getattr(Type, str(self.args['qtype'].upper()))
+ except AttributeError:
+ raise ArgumentError('unknown query type')
+ else:
+ qtype = self.args['qtype']
+ if 'name' not in self.args:
+ print((self.args))
+ raise ArgumentError('nothing to lookup')
+ qname = self.args['name']
+ if qtype == Type.AXFR and protocol != 'tcp':
+ print('Query type AXFR, protocol forced to TCP')
+ protocol = 'tcp'
+ #print('QTYPE %d(%s)' % (qtype, Type.typestr(qtype)))
+ m = Lib.Mpacker()
+ # jesus. keywords and default args would be good. TODO.
+ m.addHeader(self.tid,
+ 0, opcode, 0, 0, rd, 0, 0, 0,
+ 1, 0, 0, 0)
+ m.addQuestion(qname, qtype, Class.IN)
+ self.request = m.getbuf()
+ try:
+ if protocol == 'udp':
+ self.sendUDPRequest(server)
+ else:
+ self.sendTCPRequest(server)
+ except socket.error as reason:
+ raise SocketError(reason)
+ if self.py3async:
+ return None
+ else:
+ return self.response
+
+ def sendUDPRequest(self, server):
+ "refactor me"
+ first_socket_error = None
+ self.response=None
+ for self.ns in server:
+ try:
+ if self.ns.count(':'):
+ if hasattr(socket,'has_ipv6') and socket.has_ipv6:
+ self.socketInit(socket.AF_INET6, socket.SOCK_DGRAM)
+ else: continue
+ else:
+ self.socketInit(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ # TODO. Handle timeouts &c correctly (RFC)
+ self.time_start=time.time()
+ self.conn()
+ if not self.py3async:
+ self.s.send(self.request)
+ r=self.processUDPReply()
+ # Since we bind to the source port and connect to the
+ # destination port, we don't need to check that here,
+ # but do make sure it's actually a DNS request that the
+ # packet is in reply to.
+ while r.header['id'] != self.tid \
+ or self.from_address[1] != self.port:
+ r=self.processUDPReply()
+ self.response = r
+ # FIXME: check waiting async queries
+ finally:
+ if not self.py3async:
+ self.s.close()
+ except socket.error as e:
+ # Keep trying more nameservers, but preserve the first error
+ # that occurred so it can be reraised in case none of the
+ # servers worked:
+ first_socket_error = first_socket_error or e
+ continue
+ except TimeoutError as t:
+ first_socket_error = first_socket_error or t
+ continue
+ if self.response:
+ break
+ if not self.response and first_socket_error:
+ raise first_socket_error
+
+ def sendTCPRequest(self, server):
+ " do the work of sending a TCP request "
+ first_socket_error = None
+ self.response=None
+ for self.ns in server:
+ #print "trying tcp",self.ns
+ try:
+ if self.ns.count(':'):
+ if hasattr(socket,'has_ipv6') and socket.has_ipv6:
+ self.socketInit(socket.AF_INET6, socket.SOCK_STREAM)
+ else: continue
+ else:
+ self.socketInit(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ # TODO. Handle timeouts &c correctly (RFC)
+ self.time_start=time.time()
+ self.conn()
+ buf = Lib.pack16bit(len(self.request))+self.request
+ # Keep server from making sendall hang
+ self.s.setblocking(0)
+ # FIXME: throws WOULDBLOCK if request too large to fit in
+ # system buffer
+ self.s.sendall(buf)
+ # SHUT_WR breaks blocking IO with google DNS (8.8.8.8)
+ #self.s.shutdown(socket.SHUT_WR)
+ r=self.processTCPReply()
+ if r.header['id'] == self.tid:
+ self.response = r
+ break
+ finally:
+ self.s.close()
+ except socket.error as e:
+ first_socket_error = first_socket_error or e
+ continue
+ except TimeoutError as t:
+ first_socket_error = first_socket_error or t
+ continue
+ if self.response:
+ break
+ if not self.response and first_socket_error:
+ raise first_socket_error
+
+#class DnsAsyncRequest(DnsRequest):
+class DnsAsyncRequest(DnsRequest,asyncore.dispatcher_with_send):
+ " an asynchronous request object. out of date, probably broken "
+ def __init__(self,*name,**args):
+ DnsRequest.__init__(self, *name, **args)
+ # XXX todo
+ if 'done' in args and args['done']:
+ self.donefunc=args['done']
+ else:
+ self.donefunc=self.showResult
+ #self.realinit(name,args) # XXX todo
+ self.py3async=1
+ def conn(self):
+ self.getSource()
+ self.connect((self.ns,self.port))
+ self.time_start=time.time()
+ if 'start' in self.args and self.args['start']:
+ asyncore.dispatcher.go(self)
+ def socketInit(self,a,b):
+ self.create_socket(a,b)
+ asyncore.dispatcher.__init__(self)
+ self.s=self
+ def handle_read(self):
+ if self.args['protocol'] == 'udp':
+ self.response=self.processUDPReply()
+ if self.donefunc:
+ self.donefunc(*(self,))
+ def handle_connect(self):
+ self.send(self.request)
+ def handle_write(self):
+ pass
+ def showResult(self,*s):
+ self.response.show()
+
+def ParseOSXSysConfig():
+ "Retrieves the current Mac OS X resolver settings using the scutil(8) command."
+ import os, re
+ scutil = os.popen('/usr/sbin/scutil --dns', 'r')
+ res_re = re.compile('^\s+nameserver[]0-9[]*\s*\:\s*(\S+)$')
+ sets = [ ]
+ currentset = None
+ while True:
+ l = scutil.readline()
+ if not l:
+ break
+ l = l.rstrip()
+ if len(l) < 1 or l[0] not in string.whitespace:
+ currentset = None
+ continue
+ m = res_re.match(l)
+ if m:
+ if currentset is None:
+ currentset = [ ]
+ sets.append(currentset)
+ currentset.append(m.group(1))
+ scutil.close()
+ # Someday: Figure out if we should do something other than simply concatenate the sets.
+ for currentset in sets:
+ defaults['server'].extend(currentset)
+
diff --git a/DNS/Class.py b/DNS/Class.py
new file mode 100644
index 0000000..3412cc9
--- /dev/null
+++ b/DNS/Class.py
@@ -0,0 +1,35 @@
+"""
+$Id$
+
+ This file is part of the py3dns project.
+ Homepage: https://launchpad.net/py3dns
+
+ This code is covered by the standard Python License. See LICENSE for details.
+
+ CLASS values (section 3.2.4)
+"""
+
+
+IN = 1 # the Internet
+CS = 2 # the CSNET class (Obsolete - used only for examples in
+ # some obsolete RFCs)
+CH = 3 # the CHAOS class. When someone shows me python running on
+ # a Symbolics Lisp machine, I'll look at implementing this.
+HS = 4 # Hesiod [Dyer 87]
+
+# QCLASS values (section 3.2.5)
+
+ANY = 255 # any class
+
+
+# Construct reverse mapping dictionary
+
+_names = dir()
+classmap = {}
+for _name in _names:
+ if _name[0] != '_': classmap[eval(_name)] = _name
+
+def classstr(klass):
+ if klass in classmap: return classmap[klass]
+ else: return repr(klass)
+
diff --git a/DNS/Lib.py b/DNS/Lib.py
new file mode 100644
index 0000000..67622f5
--- /dev/null
+++ b/DNS/Lib.py
@@ -0,0 +1,809 @@
+# -*- encoding: utf-8 -*-
+"""
+ $Id$
+
+ This file is part of the py3dns project.
+ Homepage: https://launchpad.net/py3dns
+
+ This code is covered by the standard Python License. See LICENSE for details.
+
+Changes for Python3 port © 2011-13 Scott Kitterman <scott@kitterman.com>
+
+ Library code. Largely this is packers and unpackers for various types.
+"""
+
+#
+#
+# See RFC 1035:
+# ------------------------------------------------------------------------
+# Network Working Group P. Mockapetris
+# Request for Comments: 1035 ISI
+# November 1987
+# Obsoletes: RFCs 882, 883, 973
+#
+# DOMAIN NAMES - IMPLEMENTATION AND SPECIFICATION
+# ------------------------------------------------------------------------
+
+
+import types
+import socket
+
+from . import Type
+from . import Class
+from . import Opcode
+from . import Status
+import DNS
+
+from .Base import DNSError
+
+try:
+ import ipaddress
+except ImportError:
+ import ipaddr as ipaddress
+
+LABEL_UTF8 = False
+LABEL_ENCODING = 'idna'
+
+class UnpackError(DNSError): pass
+class PackError(DNSError): pass
+
+# Low-level 16 and 32 bit integer packing and unpacking
+
+from struct import pack as struct_pack
+from struct import unpack as struct_unpack
+from socket import inet_ntoa, inet_aton, inet_ntop, AF_INET6
+
+def pack16bit(n):
+ return struct_pack('!H', n)
+
+def pack32bit(n):
+ return struct_pack('!L', n)
+
+def unpack16bit(s):
+ return struct_unpack('!H', s)[0]
+
+def unpack32bit(s):
+ return struct_unpack('!L', s)[0]
+
+def addr2bin(addr):
+ # Updated from pyspf
+ """Convert a string IPv4 address into an unsigned integer.
+
+ Examples::
+ >>> addr2bin('127.0.0.1')
+ 2130706433
+
+ >>> addr2bin('127.0.0.1') == socket.INADDR_LOOPBACK
+ 1
+
+ >>> addr2bin('255.255.255.254')
+ 4294967294L
+
+ >>> addr2bin('192.168.0.1')
+ 3232235521L
+
+ Unlike old DNS.addr2bin, the n, n.n, and n.n.n forms for IP addresses
+ are handled as well::
+ >>> addr2bin('10.65536')
+ 167837696
+ >>> 10 * (2 ** 24) + 65536
+ 167837696
+
+ >>> addr2bin('10.93.512')
+ 173867520
+ >>> 10 * (2 ** 24) + 93 * (2 ** 16) + 512
+ 173867520
+ """
+ return struct_unpack("!L", inet_aton(addr))[0]
+
+def bin2addr(n):
+ return inet_ntoa(struct_pack('!L', n))
+
+def bin2addr6(n):
+ return inet_ntop(AF_INET6, n)
+
+def bin2long6(str):
+ # Also from pyspf
+ h, l = struct_unpack("!QQ", str)
+ return h << 64 | l
+
+# Packing class
+
+class Packer:
+ " packer base class. supports basic byte/16bit/32bit/addr/string/name "
+ def __init__(self):
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ self.buf = bytes('', enc)
+ self.index = {}
+ def getbuf(self):
+ return self.buf
+ def addbyte(self, c):
+ if len(c) != 1: raise TypeError('one character expected')
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ self.buf = self.buf + bytes(c,enc)
+ def addbytes(self, abytes):
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ self.buf = self.buf + bytes(abytes, enc)
+ def add16bit(self, n):
+ self.buf = self.buf + bytes(pack16bit(n))
+ def add32bit(self, n):
+ self.buf = self.buf + bytes(pack32bit(n))
+ def addaddr(self, addr):
+ n = addr2bin(addr)
+ self.buf = self.buf + bytes(pack32bit(n))
+ def addstring(self, s):
+ if len(s) > 255:
+ raise ValueError("Can't encode string of length "+ \
+ "%s (> 255)"%(len(s)))
+ self.addbyte(chr(len(s)))
+ self.addbytes(s)
+ def addname(self, name):
+ # Domain name packing (section 4.1.4)
+ # Add a domain name to the buffer, possibly using pointers.
+ # The case of the first occurrence of a name is preserved.
+ # Redundant dots are ignored.
+ nlist = []
+ for label in name.split('.'):
+ if not label:
+ pass # Passing to ignore redundant dots per comments
+ else:
+ nlist.append(label)
+ keys = []
+ for i in range(len(nlist)):
+ key = '.'.join(nlist[i:])
+ key = key.upper()
+ keys.append(key)
+ if key in self.index:
+ pointer = self.index[key]
+ break
+ else:
+ i = len(nlist)
+ pointer = None
+ # Do it into temporaries first so exceptions don't
+ # mess up self.index and self.buf
+ offset = len(self.buf)
+ index = []
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ buf = bytes('', enc)
+ for j in range(i):
+ label = nlist[j]
+ try:
+ label = label.encode(enc)
+ except UnicodeEncodeError:
+ if not DNS.LABEL_UTF8: raise
+ if not label.startswith('\ufeff'):
+ label = '\ufeff'+label
+ label = label.encode(enc)
+ n = len(label)
+ if n > 63:
+ raise PackError('label too long')
+ if offset + len(buf) < 0x3FFF:
+ index.append((keys[j], offset + len(buf)))
+ else:
+ print('DNS.Lib.Packer.addname:')
+ print('warning: pointer too big')
+ buf = buf + bytes([n]) + label
+ if pointer:
+ buf = buf + (pack16bit(pointer | 0xC000))
+ else:
+ buf = buf + bytes('\0', enc)
+ self.buf = self.buf + buf
+ for key, value in index:
+ self.index[key] = value
+ def dump(self):
+ keys = list(self.index.keys())
+ keys.sort()
+ print('-'*40)
+ for key in keys:
+ print('%20s %3d' % (key, self.index[key]))
+ print('-'*40)
+ space = 1
+ for i in range(0, len(self.buf)+1, 2):
+ if self.buf[i:i+2] == '**':
+ if not space: print()
+ space = 1
+ continue
+ space = 0
+ print('%4d' % i)
+ for c in self.buf[i:i+2]:
+ if ' ' < c < '\177':
+ print(' %c' % c)
+ else:
+ print('%2d' % ord(c))
+ print()
+ print('-'*40)
+
+
+# Unpacking class
+
+
+class Unpacker:
+ def __init__(self, buf):
+ # buf should be binary in Python3
+ self.buf = buf
+ self.offset = 0
+ def getbyte(self):
+ if self.offset >= len(self.buf):
+ raise UnpackError("Ran off end of data")
+ c = self.buf[self.offset]
+ self.offset = self.offset + 1
+ return c
+ def getbytes(self, n):
+ s = (self.buf[self.offset : self.offset + n])
+ if len(s) != n: raise UnpackError('not enough data left')
+ self.offset = self.offset + n
+ return s
+ def get16bit(self):
+ return unpack16bit(self.getbytes(2))
+ def get32bit(self):
+ return unpack32bit(self.getbytes(4))
+ def getaddr(self):
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ return bytes(bin2addr(self.get32bit()),enc)
+ def getaddr6(self):
+ return (self.getbytes(16))
+ def getstring(self):
+ return self.getbytes(self.getbyte())
+ def getname(self):
+ # Domain name unpacking (section 4.1.4)
+ i = self.getbyte()
+ #i = ord(i)
+ if i and i & 0xC0 == 0xC0:
+ d = self.getbyte()
+ j = d
+ pointer = ((i<<8) | j) & ~0xC000
+ save_offset = self.offset
+ try:
+ self.offset = pointer
+ domain = self.getname()
+ finally:
+ self.offset = save_offset
+ return domain
+ if i == 0:
+ return ''
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ domain = str(self.getbytes(i), enc)
+ remains = self.getname()
+ if not remains:
+ return domain
+ else:
+ return domain + '.' + remains
+
+# Test program for packin/unpacking (section 4.1.4)
+
+def testpacker():
+ N = 2500
+ R = list(range(N))
+ import timing
+ # See section 4.1.4 of RFC 1035
+ timing.start()
+ for i in R:
+ p = Packer()
+ p.addaddr('192.168.0.1')
+ p.addbytes('*' * 20)
+ p.addname('f.ISI.ARPA')
+ p.addbytes('*' * 8)
+ p.addname('Foo.F.isi.arpa')
+ p.addbytes('*' * 18)
+ p.addname('arpa')
+ p.addbytes('*' * 26)
+ p.addname('')
+ timing.finish()
+ print(timing.milli(), "ms total for packing")
+ print(round(timing.milli() / i, 4), 'ms per packing')
+ #p.dump()
+ u = Unpacker(p.buf)
+ u.getaddr()
+ u.getbytes(20)
+ u.getname()
+ u.getbytes(8)
+ u.getname()
+ u.getbytes(18)
+ u.getname()
+ u.getbytes(26)
+ u.getname()
+ timing.start()
+ for i in R:
+ u = Unpacker(p.buf)
+
+ res = (u.getaddr(),
+ u.getbytes(20),
+ u.getname(),
+ u.getbytes(8),
+ u.getname(),
+ u.getbytes(18),
+ u.getname(),
+ u.getbytes(26),
+ u.getname())
+ timing.finish()
+ print(timing.milli(), "ms total for unpacking")
+ print(round(timing.milli() / i, 4), 'ms per unpacking')
+ #for item in res: print item
+
+
+# Pack/unpack RR toplevel format (section 3.2.1)
+
+class RRpacker(Packer):
+ def __init__(self):
+ Packer.__init__(self)
+ self.rdstart = None
+ def addRRheader(self, name, RRtype, klass, ttl, *rest):
+ self.addname(name)
+ self.add16bit(RRtype)
+ self.add16bit(klass)
+ self.add32bit(ttl)
+ if rest:
+ if rest[1:]: raise TypeError('too many args')
+ rdlength = rest[0]
+ else:
+ rdlength = 0
+ self.add16bit(rdlength)
+ self.rdstart = len(self.buf)
+ def patchrdlength(self):
+ rdlength = unpack16bit(self.buf[self.rdstart-2:self.rdstart])
+ if rdlength == len(self.buf) - self.rdstart:
+ return
+ rdata = self.buf[self.rdstart:]
+ save_buf = self.buf
+ ok = 0
+ try:
+ self.buf = self.buf[:self.rdstart-2]
+ self.add16bit(len(rdata))
+ self.buf = self.buf + rdata
+ ok = 1
+ finally:
+ if not ok: self.buf = save_buf
+ def endRR(self):
+ if self.rdstart is not None:
+ self.patchrdlength()
+ self.rdstart = None
+ def getbuf(self):
+ if self.rdstart is not None: self.patchrdlength()
+ return Packer.getbuf(self)
+ # Standard RRs (section 3.3)
+ def addCNAME(self, name, klass, ttl, cname):
+ self.addRRheader(name, Type.CNAME, klass, ttl)
+ self.addname(cname)
+ self.endRR()
+ def addHINFO(self, name, klass, ttl, cpu, os):
+ self.addRRheader(name, Type.HINFO, klass, ttl)
+ self.addstring(cpu)
+ self.addstring(os)
+ self.endRR()
+ def addMX(self, name, klass, ttl, preference, exchange):
+ self.addRRheader(name, Type.MX, klass, ttl)
+ self.add16bit(preference)
+ self.addname(exchange)
+ self.endRR()
+ def addNS(self, name, klass, ttl, nsdname):
+ self.addRRheader(name, Type.NS, klass, ttl)
+ self.addname(nsdname)
+ self.endRR()
+ def addPTR(self, name, klass, ttl, ptrdname):
+ self.addRRheader(name, Type.PTR, klass, ttl)
+ self.addname(ptrdname)
+ self.endRR()
+ def addSOA(self, name, klass, ttl,
+ mname, rname, serial, refresh, retry, expire, minimum):
+ self.addRRheader(name, Type.SOA, klass, ttl)
+ self.addname(mname)
+ self.addname(rname)
+ self.add32bit(serial)
+ self.add32bit(refresh)
+ self.add32bit(retry)
+ self.add32bit(expire)
+ self.add32bit(minimum)
+ self.endRR()
+ def addTXT(self, name, klass, ttl, tlist):
+ self.addRRheader(name, Type.TXT, klass, ttl)
+ if type(tlist) is bytes or type(tlist) is str:
+ tlist = [tlist]
+ for txtdata in tlist:
+ self.addstring(txtdata)
+ self.endRR()
+ def addSPF(self, name, klass, ttl, tlist):
+ self.addRRheader(name, Type.TXT, klass, ttl)
+ if type(tlist) is bytes or type(tlist) is str:
+ tlist = [tlist]
+ for txtdata in tlist:
+ self.addstring(txtdata)
+ self.endRR()
+ # Internet specific RRs (section 3.4) -- class = IN
+ def addA(self, name, klass, ttl, address):
+ self.addRRheader(name, Type.A, klass, ttl)
+ self.addaddr(address)
+ self.endRR()
+ def addWKS(self, name, ttl, address, protocol, bitmap):
+ self.addRRheader(name, Type.WKS, Class.IN, ttl)
+ self.addaddr(address)
+ self.addbyte(chr(protocol))
+ self.addbytes(bitmap)
+ self.endRR()
+ def addSRV(self):
+ raise NotImplementedError
+
+def prettyTime(seconds):
+ if seconds<60:
+ return seconds,"%d seconds"%(seconds)
+ if seconds<3600:
+ return seconds,"%d minutes"%(seconds/60)
+ if seconds<86400:
+ return seconds,"%d hours"%(seconds/3600)
+ if seconds<604800:
+ return seconds,"%d days"%(seconds/86400)
+ else:
+ return seconds,"%d weeks"%(seconds/604800)
+
+class RRunpacker(Unpacker):
+ def __init__(self, buf):
+ Unpacker.__init__(self, buf)
+ self.rdend = None
+ def getRRheader(self):
+ name = self.getname()
+ rrtype = self.get16bit()
+ klass = self.get16bit()
+ ttl = self.get32bit()
+ rdlength = self.get16bit()
+ self.rdend = self.offset + rdlength
+ return (name, rrtype, klass, ttl, rdlength)
+ def endRR(self):
+ if self.offset != self.rdend:
+ raise UnpackError('end of RR not reached')
+ def getCNAMEdata(self):
+ return self.getname()
+ def getHINFOdata(self):
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ return str(self.getstring(), enc), str(self.getstring(),enc)
+ def getMXdata(self):
+ return self.get16bit(), self.getname()
+ def getNSdata(self):
+ return self.getname()
+ def getPTRdata(self):
+ return self.getname()
+ def getSOAdata(self):
+ return self.getname(), \
+ self.getname(), \
+ ('serial',)+(self.get32bit(),), \
+ ('refresh ',)+prettyTime(self.get32bit()), \
+ ('retry',)+prettyTime(self.get32bit()), \
+ ('expire',)+prettyTime(self.get32bit()), \
+ ('minimum',)+prettyTime(self.get32bit())
+ def getTXTdata(self):
+ tlist = []
+ while self.offset != self.rdend:
+ tlist.append(bytes(self.getstring()))
+ return tlist
+ getSPFdata = getTXTdata
+ def getAdata(self):
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ return self.getaddr().decode(enc)
+ def getWKSdata(self):
+ address = self.getaddr()
+ protocol = ord(self.getbyte())
+ bitmap = self.getbytes(self.rdend - self.offset)
+ return address, protocol, bitmap
+ def getSRVdata(self):
+ """
+ _Service._Proto.Name TTL Class SRV Priority Weight Port Target
+ """
+ priority = self.get16bit()
+ weight = self.get16bit()
+ port = self.get16bit()
+ target = self.getname()
+ #print '***priority, weight, port, target', priority, weight, port, target
+ return priority, weight, port, target
+
+class RRunpackerDefault(RRunpacker):
+ # Default for DNS.qry
+ def __init__(self, buf):
+ RRunpacker.__init__(self, buf)
+ self.rdend = None
+ def getAdata(self):
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ x = socket.inet_aton(self.getaddr().decode(enc))
+ return ipaddress.IPv4Address(struct_unpack("!I", x)[0])
+ def getAAAAdata(self):
+ return ipaddress.IPv6Address(bin2addr6(self.getaddr6()))
+
+class RRunpackerText(RRunpackerDefault):
+ def __init__(self, buf):
+ RRunpackerDefault.__init__(self, buf)
+ def getAdata(self):
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ return self.getaddr().decode(enc)
+ def getAAAAdata(self):
+ return bin2addr6(self.getaddr6())
+ def getTXTdata(self):
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ tlist = []
+ while self.offset != self.rdend:
+ tlist.append(str(self.getstring(), enc))
+ return tlist
+
+class RRunpackerInteger(RRunpackerDefault):
+ def __init__(self, buf):
+ RRunpackerDefault.__init__(self, buf)
+ def getAdata(self):
+ if DNS.LABEL_UTF8:
+ enc = 'utf8'
+ else:
+ enc = DNS.LABEL_ENCODING
+ x = socket.inet_aton(self.getaddr().decode(enc))
+ return struct_unpack("!I", x)[0]
+ def getAAAAdata(self):
+ return bin2long6(self.getaddr6())
+
+class RRunpackerBinary(Unpacker):
+ def __init__(self, buf):
+ Unpacker.__init__(self, buf)
+ self.rdend = None
+ def getRRheader(self):
+ name = self.getname()
+ rrtype = self.get16bit()
+ klass = self.get16bit()
+ ttl = self.get32bit()
+ rdlength = self.get16bit()
+ self.rdlength = rdlength
+ self.rdend = self.offset + rdlength
+ return (name, rrtype, klass, ttl, rdlength)
+ def endRR(self):
+ if self.offset != self.rdend:
+ raise UnpackError('end of RR not reached')
+ def getTXTdata(self):
+ tlist = []
+ while self.offset != self.rdend:
+ tlist.append(self.getbytes(self.rdlength))
+ return tlist
+ getSPFdata = getTXTdata
+
+# Pack/unpack Message Header (section 4.1)
+
+class Hpacker(Packer):
+ def addHeader(self, id, qr, opcode, aa, tc, rd, ra, z, rcode,
+ qdcount, ancount, nscount, arcount):
+ self.add16bit(id)
+ self.add16bit((qr&1)<<15 | (opcode&0xF)<<11 | (aa&1)<<10
+ | (tc&1)<<9 | (rd&1)<<8 | (ra&1)<<7
+ | (z&7)<<4 | (rcode&0xF))
+ self.add16bit(qdcount)
+ self.add16bit(ancount)
+ self.add16bit(nscount)
+ self.add16bit(arcount)
+
+class Hunpacker(Unpacker):
+ def getHeader(self):
+ id = self.get16bit()
+ flags = self.get16bit()
+ qr, opcode, aa, tc, rd, ra, z, rcode = (
+ (flags>>15)&1,
+ (flags>>11)&0xF,
+ (flags>>10)&1,
+ (flags>>9)&1,
+ (flags>>8)&1,
+ (flags>>7)&1,
+ (flags>>4)&7,
+ (flags>>0)&0xF)
+ qdcount = self.get16bit()
+ ancount = self.get16bit()
+ nscount = self.get16bit()
+ arcount = self.get16bit()
+ return (id, qr, opcode, aa, tc, rd, ra, z, rcode,
+ qdcount, ancount, nscount, arcount)
+
+
+# Pack/unpack Question (section 4.1.2)
+
+class Qpacker(Packer):
+ def addQuestion(self, qname, qtype, qclass):
+ self.addname(qname)
+ self.add16bit(qtype)
+ self.add16bit(qclass)
+
+class Qunpacker(Unpacker):
+ def getQuestion(self):
+ return self.getname(), self.get16bit(), self.get16bit()
+
+
+# Pack/unpack Message(section 4)
+# NB the order of the base classes is important for __init__()!
+
+class Mpacker(RRpacker, Qpacker, Hpacker):
+ pass
+
+class Munpacker(RRunpacker, Qunpacker, Hunpacker):
+ # Default results for DNS.req
+ pass
+
+class MunpackerDefault(RRunpackerDefault, Qunpacker, Hunpacker):
+ # Default results for DNS.qry
+ pass
+
+class MunpackerText(RRunpackerText, Qunpacker, Hunpacker):
+ pass
+
+class MunpackerBinary(RRunpackerBinary, Qunpacker, Hunpacker):
+ pass
+
+class MunpackerInteger(RRunpackerInteger, Qunpacker, Hunpacker):
+ pass
+
+# Routines to print an unpacker to stdout, for debugging.
+# These affect the unpacker's current position!
+
+def dumpM(u):
+ print('HEADER:')
+ (id, qr, opcode, aa, tc, rd, ra, z, rcode,
+ qdcount, ancount, nscount, arcount) = u.getHeader()
+ print('id=%d,' % id)
+ print('qr=%d, opcode=%d, aa=%d, tc=%d, rd=%d, ra=%d, z=%d, rcode=%d,' \
+ % (qr, opcode, aa, tc, rd, ra, z, rcode))
+ if tc: print('*** response truncated! ***')
+ if rcode: print('*** nonzero error code! (%d) ***' % rcode)
+ print(' qdcount=%d, ancount=%d, nscount=%d, arcount=%d' \
+ % (qdcount, ancount, nscount, arcount))
+ for i in range(qdcount):
+ print('QUESTION %d:' % i)
+ dumpQ(u)
+ for i in range(ancount):
+ print('ANSWER %d:' % i)
+ dumpRR(u)
+ for i in range(nscount):
+ print('AUTHORITY RECORD %d:' % i)
+ dumpRR(u)
+ for i in range(arcount):
+ print('ADDITIONAL RECORD %d:' % i)
+ dumpRR(u)
+
+class DnsResult:
+
+ def __init__(self,u,args):
+ self.header={}
+ self.questions=[]
+ self.answers=[]
+ self.authority=[]
+ self.additional=[]
+ self.args=args
+ self.storeM(u)
+
+ def show(self):
+ import time
+ print('; <<>> PDG.py 1.0 <<>> %s %s'%(self.args['name'],
+ self.args['qtype']))
+ opt=""
+ if self.args['rd']:
+ opt=opt+'recurs '
+ h=self.header
+ print(';; options: '+opt)
+ print(';; got answer:')
+ print(';; ->>HEADER<<- opcode %s, status %s, id %d'%(
+ h['opcode'],h['status'],h['id']))
+ flags=list(filter(lambda x,h=h:h[x],('qr','aa','rd','ra','tc')))
+ print(';; flags: %s; Ques: %d, Ans: %d, Auth: %d, Addit: %d'%(
+ ' '.join(flags),h['qdcount'],h['ancount'],h['nscount'],
+ h['arcount']))
+ print(';; QUESTIONS:')
+ for q in self.questions:
+ print(';; %s, type = %s, class = %s'%(q['qname'],q['qtypestr'],
+ q['qclassstr']))
+ print()
+ print(';; ANSWERS:')
+ for a in self.answers:
+ print('%-20s %-6s %-6s %s'%(a['name'],repr(a['ttl']),a['typename'],
+ a['data']))
+ print()
+ print(';; AUTHORITY RECORDS:')
+ for a in self.authority:
+ print('%-20s %-6s %-6s %s'%(a['name'],repr(a['ttl']),a['typename'],
+ a['data']))
+ print()
+ print(';; ADDITIONAL RECORDS:')
+ for a in self.additional:
+ print('%-20s %-6s %-6s %s'%(a['name'],repr(a['ttl']),a['typename'],
+ a['data']))
+ print()
+ if 'elapsed' in self.args:
+ print(';; Total query time: %d msec'%self.args['elapsed'])
+ print(';; To SERVER: %s'%(self.args['server']))
+ print(';; WHEN: %s'%time.ctime(time.time()))
+
+ def storeM(self,u):
+ (self.header['id'], self.header['qr'], self.header['opcode'],
+ self.header['aa'], self.header['tc'], self.header['rd'],
+ self.header['ra'], self.header['z'], self.header['rcode'],
+ self.header['qdcount'], self.header['ancount'],
+ self.header['nscount'], self.header['arcount']) = u.getHeader()
+ self.header['opcodestr']=Opcode.opcodestr(self.header['opcode'])
+ self.header['status']=Status.statusstr(self.header['rcode'])
+ for i in range(self.header['qdcount']):
+ #print 'QUESTION %d:' % i,
+ self.questions.append(self.storeQ(u))
+ for i in range(self.header['ancount']):
+ #print 'ANSWER %d:' % i,
+ self.answers.append(self.storeRR(u))
+ for i in range(self.header['nscount']):
+ #print 'AUTHORITY RECORD %d:' % i,
+ self.authority.append(self.storeRR(u))
+ for i in range(self.header['arcount']):
+ #print 'ADDITIONAL RECORD %d:' % i,
+ self.additional.append(self.storeRR(u))
+
+ def storeQ(self,u):
+ q={}
+ q['qname'], q['qtype'], q['qclass'] = u.getQuestion()
+ q['qtypestr']=Type.typestr(q['qtype'])
+ q['qclassstr']=Class.classstr(q['qclass'])
+ return q
+
+ def storeRR(self,u):
+ r={}
+ r['name'],r['type'],r['class'],r['ttl'],r['rdlength'] = u.getRRheader()
+ r['typename'] = Type.typestr(r['type'])
+ r['classstr'] = Class.classstr(r['class'])
+ #print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \
+ # % (name,
+ # type, typename,
+ # klass, Class.classstr(class),
+ # ttl)
+ mname = 'get%sdata' % r['typename']
+ if hasattr(u, mname):
+ r['data']=getattr(u, mname)()
+ else:
+ r['data']=u.getbytes(r['rdlength'])
+ return r
+
+def dumpQ(u):
+ qname, qtype, qclass = u.getQuestion()
+ print('qname=%s, qtype=%d(%s), qclass=%d(%s)' \
+ % (qname,
+ qtype, Type.typestr(qtype),
+ qclass, Class.classstr(qclass)))
+
+def dumpRR(u):
+ name, type, klass, ttl, rdlength = u.getRRheader()
+ typename = Type.typestr(type)
+ print('name=%s, type=%d(%s), class=%d(%s), ttl=%d' \
+ % (name,
+ type, typename,
+ klass, Class.classstr(klass),
+ ttl))
+ mname = 'get%sdata' % typename
+ if hasattr(u, mname):
+ print(' formatted rdata:', getattr(u, mname)())
+ else:
+ print(' binary rdata:', u.getbytes(rdlength))
+
+if __name__ == "__main__":
+ testpacker()
diff --git a/DNS/Opcode.py b/DNS/Opcode.py
new file mode 100644
index 0000000..2762bc1
--- /dev/null
+++ b/DNS/Opcode.py
@@ -0,0 +1,30 @@
+"""
+ $Id$
+
+ This file is part of the py3dns project.
+ Homepage: https://launchpad.net/py3dns
+
+ This code is covered by the standard Python License. See LICENSE for details.
+
+ Opcode values in message header. RFC 1035, 1996, 2136.
+"""
+
+
+
+QUERY = 0
+IQUERY = 1
+STATUS = 2
+NOTIFY = 4
+UPDATE = 5
+
+# Construct reverse mapping dictionary
+
+_names = dir()
+opcodemap = {}
+for _name in _names:
+ if _name[0] != '_': opcodemap[eval(_name)] = _name
+
+def opcodestr(opcode):
+ if opcode in opcodemap: return opcodemap[opcode]
+ else: return repr(opcode)
+
diff --git a/DNS/Status.py b/DNS/Status.py
new file mode 100644
index 0000000..7df1c4d
--- /dev/null
+++ b/DNS/Status.py
@@ -0,0 +1,41 @@
+"""
+ $Id$
+
+ This file is part of the py3dns project.
+ Homepage: https://launchpad.net/py3dns
+
+ This code is covered by the standard Python License. See LICENSE for details.
+
+ Status values in message header
+"""
+
+NOERROR = 0 # No Error [RFC 1035]
+FORMERR = 1 # Format Error [RFC 1035]
+SERVFAIL = 2 # Server Failure [RFC 1035]
+NXDOMAIN = 3 # Non-Existent Domain [RFC 1035]
+NOTIMP = 4 # Not Implemented [RFC 1035]
+REFUSED = 5 # Query Refused [RFC 1035]
+YXDOMAIN = 6 # Name Exists when it should not [RFC 2136]
+YXRRSET = 7 # RR Set Exists when it should not [RFC 2136]
+NXRRSET = 8 # RR Set that should exist does not [RFC 2136]
+NOTAUTH = 9 # Server Not Authoritative for zone [RFC 2136]
+NOTZONE = 10 # Name not contained in zone [RFC 2136]
+BADVERS = 16 # Bad OPT Version [RFC 2671]
+BADSIG = 16 # TSIG Signature Failure [RFC 2845]
+BADKEY = 17 # Key not recognized [RFC 2845]
+BADTIME = 18 # Signature out of time window [RFC 2845]
+BADMODE = 19 # Bad TKEY Mode [RFC 2930]
+BADNAME = 20 # Duplicate key name [RFC 2930]
+BADALG = 21 # Algorithm not supported [RFC 2930]
+
+# Construct reverse mapping dictionary
+
+_names = dir()
+statusmap = {}
+for _name in _names:
+ if _name[0] != '_': statusmap[eval(_name)] = _name
+
+def statusstr(status):
+ if status in statusmap: return statusmap[status]
+ else: return repr(status)
+
diff --git a/DNS/Type.py b/DNS/Type.py
new file mode 100644
index 0000000..758a775
--- /dev/null
+++ b/DNS/Type.py
@@ -0,0 +1,55 @@
+# -*- encoding: utf-8 -*-
+"""
+ $Id$
+
+ This file is part of the py3dns project.
+ Homepage: https://launchpad.net/py3dns
+
+ This code is covered by the standard Python License. See LICENSE for details.
+
+ TYPE values (section 3.2.2)
+"""
+
+A = 1 # a host address
+NS = 2 # an authoritative name server
+MD = 3 # a mail destination (Obsolete - use MX)
+MF = 4 # a mail forwarder (Obsolete - use MX)
+CNAME = 5 # the canonical name for an alias
+SOA = 6 # marks the start of a zone of authority
+MB = 7 # a mailbox domain name (EXPERIMENTAL)
+MG = 8 # a mail group member (EXPERIMENTAL)
+MR = 9 # a mail rename domain name (EXPERIMENTAL)
+NULL = 10 # a null RR (EXPERIMENTAL)
+WKS = 11 # a well known service description
+PTR = 12 # a domain name pointer
+HINFO = 13 # host information
+MINFO = 14 # mailbox or mail list information
+MX = 15 # mail exchange
+TXT = 16 # text strings
+AAAA = 28 # IPv6 AAAA records (RFC 1886)
+SRV = 33 # DNS RR for specifying the location of services (RFC 2782)
+SPF = 99 # TXT RR for Sender Policy Framework
+
+# Additional TYPE values from host.c source
+
+UNAME = 110
+MP = 240
+
+# QTYPE values (section 3.2.3)
+
+AXFR = 252 # A request for a transfer of an entire zone
+MAILB = 253 # A request for mailbox-related records (MB, MG or MR)
+MAILA = 254 # A request for mail agent RRs (Obsolete - see MX)
+ANY = 255 # A request for all records
+
+# Construct reverse mapping dictionary
+
+_names = dir()
+typemap = {}
+for _name in _names:
+ if _name[0] != '_': typemap[eval(_name)] = _name
+
+def typestr(type):
+ if type in typemap: return typemap[type]
+ else: return repr(type)
+
diff --git a/DNS/__init__.py b/DNS/__init__.py
new file mode 100644
index 0000000..dcde728
--- /dev/null
+++ b/DNS/__init__.py
@@ -0,0 +1,38 @@
+# -*- encoding: utf-8 -*-
+# $Id$
+#
+# This file is part of the py3dns project.
+# Homepage: https://launchpad.net/py3dns
+#
+# Changes for Python3 port © 2011 Scott Kitterman <scott@kitterman.com>
+#
+# This code is covered by the standard Python License. See LICENSE for details.
+
+# __init__.py for DNS class.
+
+__version__ = '3.2.1'
+
+try:
+ import ipaddress
+except ImportError:
+ try:
+ import ipaddr as ipaddress
+ except ImportError:
+ raise Exception("py3dns 3.1 requires either ipaddress (python3.3) or ipaddr, see CHANGES for 3.1.0")
+
+from . import Type
+from . import Opcode
+from . import Status
+from . import Class
+from .Base import DnsRequest
+from .Base import DNSError
+from .Lib import DnsResult
+from .Base import *
+from .Lib import *
+Error=DNSError
+from .lazy import *
+Request = DnsRequest
+Result = DnsResult
+
+Base._DiscoverNameServers()
+
diff --git a/DNS/lazy.py b/DNS/lazy.py
new file mode 100644
index 0000000..8aa5151
--- /dev/null
+++ b/DNS/lazy.py
@@ -0,0 +1,83 @@
+# $Id$
+#
+# This file is part of the pydns project.
+# Homepage: http://pydns.sourceforge.net
+#
+# This code is covered by the standard Python License. See LICENSE for details.
+#
+
+# routines for lazy people.
+from . import Base
+from . Base import ServerError
+
+class NoDataError(IndexError): pass
+class StatusError(IndexError): pass
+
+def revlookup(name,timeout=30):
+ "convenience routine for doing a reverse lookup of an address"
+ if Base.defaults['server'] == []: Base.DiscoverNameServers()
+ names = revlookupall(name, timeout)
+ if not names: return None
+ return names[0] # return shortest name
+
+def revlookupall(name,timeout=30):
+ "convenience routine for doing a reverse lookup of an address"
+ # FIXME: check for IPv6
+ a = name.split('.')
+ a.reverse()
+ b = '.'.join(a)+'.in-addr.arpa'
+ qtype='ptr'
+ names = dnslookup(b, qtype, timeout)
+ # this will return all records.
+ names.sort(key=str.__len__)
+ return names
+
+def dnslookup(name,qtype,timeout=30):
+ "convenience routine to return just answer data for any query type"
+ if Base.defaults['server'] == []: Base.DiscoverNameServers()
+ result = Base.DnsRequest(name=name, qtype=qtype).req(timeout=timeout)
+ if result.header['status'] != 'NOERROR':
+ raise ServerError("DNS query status: %s" % result.header['status'],
+ result.header['rcode'])
+ elif len(result.answers) == 0 and Base.defaults['server_rotate']:
+ # check with next DNS server
+ result = Base.DnsRequest(name=name, qtype=qtype).req(timeout=timeout)
+ if result.header['status'] != 'NOERROR':
+ raise ServerError("DNS query status: %s" % result.header['status'],
+ result.header['rcode'])
+ return [x['data'] for x in result.answers]
+
+def mxlookup(name,timeout=30):
+ """
+ convenience routine for doing an MX lookup of a name. returns a
+ sorted list of (preference, mail exchanger) records
+ """
+ qtype = 'mx'
+ l = dnslookup(name, qtype, timeout)
+ return l
+
+#
+# $Log$
+# Revision 1.5.2.1.2.2 2011/03/23 01:42:07 customdesigned
+# Changes from 2.3 branch
+#
+# Revision 1.5.2.1.2.1 2011/02/18 19:35:22 customdesigned
+# Python3 updates from Scott Kitterman
+#
+# Revision 1.5.2.1 2007/05/22 20:23:38 customdesigned
+# Lazy call to DiscoverNameServers
+#
+# Revision 1.5 2002/05/06 06:14:38 anthonybaxter
+# reformat, move import to top of file.
+#
+# Revision 1.4 2002/03/19 12:41:33 anthonybaxter
+# tabnannied and reindented everything. 4 space indent, no tabs.
+# yay.
+#
+# Revision 1.3 2001/08/09 09:08:55 anthonybaxter
+# added identifying header to top of each file
+#
+# Revision 1.2 2001/07/19 06:57:07 anthony
+# cvs keywords added
+#
+#
diff --git a/DNS/tests/__init__.py b/DNS/tests/__init__.py
new file mode 100644
index 0000000..5c87d47
--- /dev/null
+++ b/DNS/tests/__init__.py
@@ -0,0 +1,13 @@
+import unittest
+import importlib
+
+def test_suite():
+ module_names = [
+ '.testPackers',
+ '.test_base'
+ ]
+ suites = []
+ for m in module_names:
+ module = importlib.import_module(m, 'DNS.tests')
+ suites.append(module.test_suite())
+ return unittest.TestSuite(suites)
diff --git a/DNS/tests/testPackers.py b/DNS/tests/testPackers.py
new file mode 100755
index 0000000..a27725e
--- /dev/null
+++ b/DNS/tests/testPackers.py
@@ -0,0 +1,430 @@
+#!/usr/bin/python3
+
+#
+# Tests of the packet assembler/disassembler routines.
+#
+# only tests the simple packers for now. next is to test the
+# classes: Hpacker/Hunpacker,
+# Qpacker/Unpacker, then Mpacker/Munpacker
+#
+# Start doing unpleasant tests with broken data, truncations, that
+# sort of thing.
+
+import sys ; sys.path.insert(0, '..')
+import DNS
+import socket
+import unittest
+
+TestCompleted = "TestCompleted" # exc.
+
+class Int16Packing(unittest.TestCase):
+ knownValues = ( ( 10, b'\x00\n'),
+ ( 500, b'\x01\xf4' ),
+ ( 5340, b'\x14\xdc' ),
+ ( 51298, b'\xc8b'),
+ ( 65535, b'\xff\xff'),
+ )
+
+ def test16bitPacking(self):
+ """ pack16bit should give known output for known input """
+ for i,s in self.knownValues:
+ result = DNS.Lib.pack16bit(i)
+ self.assertEqual(s,result)
+
+ def test16bitUnpacking(self):
+ """ unpack16bit should give known output for known input """
+ for i,s in self.knownValues:
+ result = DNS.Lib.unpack16bit(s)
+ self.assertEqual(i,result)
+
+class Int32Packing(unittest.TestCase):
+ knownValues = ( ( 10, b'\x00\x00\x00\n'),
+ ( 500, b'\x00\x00\x01\xf4' ),
+ ( 5340, b'\x00\x00\x14\xdc' ),
+ ( 51298, b'\x00\x00\xc8b'),
+ ( 65535, b'\x00\x00\xff\xff'),
+ ( 33265535, b'\x01\xfb\x97\x7f' ),
+ ( 147483647, b'\x08\xcak\xff' ),
+ ( 2147483647, b'\x7f\xff\xff\xff' ),
+ )
+ def test32bitPacking(self):
+ """ pack32bit should give known output for known input """
+ for i,s in self.knownValues:
+ result = DNS.Lib.pack32bit(i)
+ self.assertEqual(s,result)
+
+ def test32bitUnpacking(self):
+ """ unpack32bit should give known output for known input """
+ for i,s in self.knownValues:
+ result = DNS.Lib.unpack32bit(s)
+ self.assertEqual(i,result)
+
+
+class IPaddrPacking(unittest.TestCase):
+ knownValues = (
+ ('127.0.0.1', 2130706433 ),
+ ('10.99.23.13', 174266125 ),
+ ('192.35.59.45', 3223534381), # Not signed anymore - it's all long now.
+ ('255.255.255.255', 4294967295) # No longer -1
+ )
+
+ def testIPaddrPacking(self):
+ """ addr2bin should give known output for known input """
+ for i,s in self.knownValues:
+ result = DNS.Lib.addr2bin(i)
+ self.assertEqual(s,result)
+
+ def testIPaddrUnpacking(self):
+ """ bin2addr should give known output for known input """
+ for i,s in self.knownValues:
+ result = DNS.Lib.bin2addr(s)
+ self.assertEqual(i,result)
+
+class PackerClassPacking(unittest.TestCase):
+ knownPackValues = [
+ ( ['www.ekit.com'], b'\x03www\x04ekit\x03com\x00' ),
+ ( ['ns1.ekorp.com', 'ns2.ekorp.com', 'ns3.ekorp.com'],
+ b'\x03ns1\x05ekorp\x03com\x00\x03ns2\xc0\x04\x03ns3\xc0\x04'),
+ ( ['a.root-servers.net.', 'b.root-servers.net.',
+ 'c.root-servers.net.', 'd.root-servers.net.',
+ 'e.root-servers.net.', 'f.root-servers.net.'],
+ b'\x01a\x0croot-servers\x03net\x00\x01b\xc0\x02\x01c\xc0'+
+ b'\x02\x01d\xc0\x02\x01e\xc0\x02\x01f\xc0\x02' ),
+ ]
+ knownUnpackValues = [
+ ( ['www.ekit.com'], b'\x03www\x04ekit\x03com\x00' ),
+ ( ['ns1.ekorp.com', 'ns2.ekorp.com', 'ns3.ekorp.com'],
+ b'\x03ns1\x05ekorp\x03com\x00\x03ns2\xc0\x04\x03ns3\xc0\x04'),
+ ( ['a.root-servers.net', 'b.root-servers.net',
+ 'c.root-servers.net', 'd.root-servers.net',
+ 'e.root-servers.net', 'f.root-servers.net'],
+ b'\x01a\x0croot-servers\x03net\x00\x01b\xc0\x02\x01c\xc0'+
+ b'\x02\x01d\xc0\x02\x01e\xc0\x02\x01f\xc0\x02' ),
+ ]
+
+ def testPackNames(self):
+ from DNS.Lib import Packer
+ for namelist,result in self.knownPackValues:
+ p = Packer()
+ for n in namelist:
+ p.addname(n)
+ self.assertEqual(p.getbuf(),result)
+
+ def testUnpackNames(self):
+ from DNS.Lib import Unpacker
+ for namelist,result in self.knownUnpackValues:
+ u = Unpacker(result)
+ names = []
+ for i in range(len(namelist)):
+ n = u.getname()
+ names.append(n)
+ self.assertEqual(names, namelist)
+
+""" def testUnpackerLimitCheck(self):
+ # FIXME: Don't understand what this test should do. If my guess is right,
+ # then the code is working ~OK.
+ from DNS.Lib import Unpacker
+ u=Unpacker(b'\x03ns1\x05ekorp\x03com\x00\x03ns2\xc0\x04\x03ns3\xc0\x04')
+ u.getname() ; u.getname() ; u.getname()
+ # 4th call should fail
+ self.assertRaises(IndexError, u.getname)"""
+
+class testUnpackingMangled(unittest.TestCase):
+ "addA(self, name, klass, ttl, address)"
+ packerCorrect = b'\x05www02\x04ekit\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x02'
+ def testWithoutRR(self):
+ u = DNS.Lib.RRunpacker(self.packerCorrect)
+ u.getAdata()
+ def testWithTwoRRs(self):
+ u = DNS.Lib.RRunpacker(self.packerCorrect)
+ u.getRRheader()
+ self.assertRaises(DNS.Lib.UnpackError, u.getRRheader)
+ def testWithNoGetData(self):
+ u = DNS.Lib.RRunpacker(self.packerCorrect)
+ u.getRRheader()
+ self.assertRaises(DNS.Lib.UnpackError, u.endRR)
+
+class PackerTestCase(unittest.TestCase):
+ " base class for tests of Packing code. Laziness on my part, I know. "
+ def setUp(self):
+ self.RRpacker = DNS.Lib.RRpacker
+ self.RRunpacker = DNS.Lib.RRunpacker
+
+ def testPacker(self):
+ p = self.RRpacker()
+ check = self.doPack(p)
+ if (p is not None) and (check is not TestCompleted):
+ return self.checkPackResult(p)
+
+ def checkPackResult(self, buf):
+ if not hasattr(self, 'packerExpectedResult'):
+ if self.__class__.__name__ != 'PackerTestCase':
+ print("P***", self, repr(buf.getbuf())) #cheat testcase
+ else:
+ return self.assertEqual(buf.getbuf(),
+ self.packerExpectedResult)
+
+ def checkUnpackResult(self, rrbits, specbits):
+ if not hasattr(self, 'unpackerExpectedResult'):
+ if self.__class__.__name__ != 'PackerTestCase':
+ print("U***", self, repr((rrbits,specbits))) #cheat testcase
+ else:
+ return self.assertEqual((rrbits, specbits),
+ self.unpackerExpectedResult)
+
+ def testUnpacker(self):
+ if self.doUnpack is not None:
+ if hasattr(self.__class__, 'doUnpack') \
+ and hasattr(self, 'packerExpectedResult'):
+ u = self.RRunpacker(self.packerExpectedResult)
+ rrbits = u.getRRheader()[:4]
+ specbits = self.doUnpack(u)
+ try:
+ u.endRR()
+ except DNS.Lib.UnpackError:
+ self.assertEqual(0, 'Not at end of RR!')
+ return self.checkUnpackResult(rrbits, specbits)
+ else:
+ me = self.__class__.__name__
+ if me != 'PackerTestCase':
+ self.assertEquals(self.__class__.__name__,
+ 'Unpack NotImplemented')
+
+ def doPack(self, p):
+ " stub. don't test the base class "
+ return None
+
+ def doUnpack(self, p):
+ " stub. don't test the base class "
+ return None
+
+
+class testPackingOfCNAME(PackerTestCase):
+ "addCNAME(self, name, klass, ttl, cname)"
+ def doPack(self,p):
+ p.addCNAME('www.sub.domain', DNS.Class.IN, 3600, 'realhost.sub.domain')
+ def doUnpack(self, u):
+ return u.getCNAMEdata()
+
+ unpackerExpectedResult = (('www.sub.domain', DNS.Type.CNAME, DNS.Class.IN, 3600), 'realhost.sub.domain')
+ packerExpectedResult = \
+ b'\x03www\x03sub\x06domain\x00\x00\x05\x00\x01\x00'+ \
+ b'\x00\x0e\x10\x00\x0b\x08realhost\xc0\x04'
+
+class testPackingOfCNAME2(PackerTestCase):
+ "addCNAME(self, name, klass, ttl, cname)"
+ def doPack(self,p):
+ p.addCNAME('www.cust.com', DNS.Class.IN, 200, 'www023.big.isp.com')
+ def doUnpack(self, u):
+ return u.getCNAMEdata()
+ unpackerExpectedResult = (('www.cust.com', DNS.Type.CNAME, DNS.Class.IN, 200), 'www023.big.isp.com')
+ packerExpectedResult = \
+ b'\x03www\x04cust\x03com\x00\x00\x05\x00\x01\x00'+ \
+ b'\x00\x00\xc8\x00\x11\x06www023\x03big\x03isp\xc0\t'
+
+class testPackingOfCNAME3(PackerTestCase):
+ "addCNAME(self, name, klass, ttl, cname)"
+ def doPack(self,p):
+ p.addCNAME('www.fred.com', DNS.Class.IN, 86400, 'webhost.loa.com')
+ def doUnpack(self, u):
+ return u.getCNAMEdata()
+ unpackerExpectedResult = (('www.fred.com', DNS.Type.CNAME, DNS.Class.IN, 86400), 'webhost.loa.com')
+ packerExpectedResult = \
+ b'\x03www\x04fred\x03com\x00\x00\x05\x00\x01\x00\x01Q'+ \
+ b'\x80\x00\x0e\x07webhost\x03loa\xc0\t'
+
+class testPackingOfHINFO(PackerTestCase):
+ "addHINFO(self, name, klass, ttl, cpu, os)"
+ def doPack(self,p):
+ p.addHINFO('www.sub.domain.com', DNS.Class.IN, 3600, 'i686', 'linux')
+ def doUnpack(self, u):
+ return u.getHINFOdata()
+ unpackerExpectedResult = (('www.sub.domain.com', 13, 1, 3600), ('i686', 'linux'))
+ packerExpectedResult = \
+ b'\x03www\x03sub\x06domain\x03com\x00\x00\r\x00\x01'+ \
+ b'\x00\x00\x0e\x10\x00\x0b\x04i686\x05linux'
+
+class testPackingOfHINFO2(PackerTestCase):
+ "addHINFO(self, name, klass, ttl, cpu, os)"
+ def doPack(self,p):
+ p.addHINFO('core1.lax.foo.com', DNS.Class.IN, 3600, 'cisco', 'ios')
+ def doUnpack(self, u):
+ return u.getHINFOdata()
+ unpackerExpectedResult = (('core1.lax.foo.com', 13, 1, 3600), ('cisco', 'ios'))
+ packerExpectedResult = \
+ b'\x05core1\x03lax\x03foo\x03com\x00\x00\r\x00\x01'+ \
+ b'\x00\x00\x0e\x10\x00\n\x05cisco\x03ios'
+
+class testPackingOfMX(PackerTestCase):
+ "addMX(self, name, klass, ttl, preference, exchange)"
+ def doPack(self, p):
+ p.addMX('sub.domain.com', DNS.Class.IN, 86400, 10, 'mailhost1.isp.com')
+ def doUnpack(self, u):
+ return u.getMXdata()
+ packerExpectedResult = \
+ b'\x03sub\x06domain\x03com\x00\x00\x0f\x00\x01'+ \
+ b'\x00\x01Q\x80\x00\x12\x00\n\tmailhost1\x03isp\xc0\x0b'
+ unpackerExpectedResult = (('sub.domain.com', 15, 1, 86400), (10, 'mailhost1.isp.com'))
+
+class testPackingOfMX2(PackerTestCase):
+ "addMX(self, name, klass, ttl, preference, exchange)"
+ def doPack(self, p):
+ p.addMX('ekit-inc.com.', DNS.Class.IN, 86400, 10, 'mx1.ekorp.com')
+ p.addMX('ekit-inc.com.', DNS.Class.IN, 86400, 20, 'mx2.ekorp.com')
+ p.addMX('ekit-inc.com.', DNS.Class.IN, 86400, 30, 'mx3.ekorp.com')
+ def doUnpack(self, u):
+ res = [u.getMXdata(),]
+ dummy = u.getRRheader()[:4]
+ res += u.getMXdata()
+ dummy = u.getRRheader()[:4]
+ res += u.getMXdata()
+ return res
+ unpackerExpectedResult = (('ekit-inc.com', 15, 1, 86400), [(10, 'mx1.ekorp.com'), 20, 'mx2.ekorp.com', 30, 'mx3.ekorp.com'])
+ packerExpectedResult = \
+ b'\x08ekit-inc\x03com\x00\x00\x0f\x00\x01\x00\x01Q\x80\x00'+\
+ b'\x0e\x00\n\x03mx1\x05ekorp\xc0\t\x00\x00\x0f\x00\x01\x00'+\
+ b'\x01Q\x80\x00\x08\x00\x14\x03mx2\xc0\x1e\x00\x00\x0f\x00'+\
+ b'\x01\x00\x01Q\x80\x00\x08\x00\x1e\x03mx3\xc0\x1e'
+
+class testPackingOfNS(PackerTestCase):
+ "addNS(self, name, klass, ttl, nsdname)"
+ def doPack(self, p):
+ p.addNS('ekit-inc.com', DNS.Class.IN, 86400, 'ns1.ekorp.com')
+ def doUnpack(self, u):
+ return u.getNSdata()
+ unpackerExpectedResult = (('ekit-inc.com', 2, 1, 86400), 'ns1.ekorp.com')
+ packerExpectedResult = b'\x08ekit-inc\x03com\x00\x00\x02\x00\x01\x00\x01Q\x80\x00\x0c\x03ns1\x05ekorp\xc0\t'
+
+class testPackingOfPTR(PackerTestCase):
+ "addPTR(self, name, klass, ttl, ptrdname)"
+ def doPack(self, p):
+ p.addPTR('www.ekit-inc.com', DNS.Class.IN, 3600, 'www-real01.ekorp.com')
+ def doUnpack(self, u):
+ return u.getPTRdata()
+ unpackerExpectedResult = (('www.ekit-inc.com', 12, 1, 3600), 'www-real01.ekorp.com')
+ packerExpectedResult = b'\x03www\x08ekit-inc\x03com\x00\x00\x0c\x00\x01\x00\x00\x0e\x10\x00\x13\nwww-real01\x05ekorp\xc0\r'
+
+class testPackingOfSOA(PackerTestCase):
+ """addSOA(self, name, klass, ttl, mname,
+ rname, serial, refresh, retry, expire, minimum)"""
+ def doPack(self, p):
+ p.addSOA('ekit-inc.com', DNS.Class.IN, 3600, 'ns1.ekorp.com',
+ 'hostmaster.ekit-inc.com', 2002020301, 100, 200, 300, 400)
+ def doUnpack(self, u):
+ return u.getSOAdata()
+ unpackerExpectedResult = (('ekit-inc.com', 6, 1, 3600), ('ns1.ekorp.com', 'hostmaster', ('serial', 2002020301), ('refresh ', 100, '1 minutes'), ('retry', 200, '3 minutes'), ('expire', 300, '5 minutes'), ('minimum', 400, '6 minutes')))
+ packerExpectedResult = b'\x08ekit-inc\x03com\x00\x00\x06\x00\x01\x00\x00\x0e\x10\x00,\x03ns1\x05ekorp\xc0\t\nhostmaster\x00wTg\xcd\x00\x00\x00d\x00\x00\x00\xc8\x00\x00\x01,\x00\x00\x01\x90'
+
+
+class testPackingOfA(PackerTestCase):
+ "addA(self, name, klass, ttl, address)"
+ def doPack(self, p):
+ p.addA('www02.ekit.com', DNS.Class.IN, 86400, '192.168.10.2')
+ def doUnpack(self, u):
+ return u.getAdata()
+ unpackerExpectedResult = (('www02.ekit.com', 1, 1, 86400), '192.168.10.2')
+ packerExpectedResult = b'\x05www02\x04ekit\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x02'
+
+class testPackingOfA2(PackerTestCase):
+ "addA(self, name, ttl, address)"
+ def doPack(self, p):
+ p.addA('www.ekit.com', DNS.Class.IN, 86400, '10.98.1.0')
+ def doUnpack(self, u):
+ return u.getAdata()
+ unpackerExpectedResult = (('www.ekit.com', 1, 1, 86400), '10.98.1.0')
+ packerExpectedResult = b'\x03www\x04ekit\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\nb\x01\x00'
+
+class testPackingOfA3(PackerTestCase):
+ "addA(self, name, ttl, address)"
+ def doPack(self, p):
+ p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.4')
+ p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.3')
+ p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.2')
+ p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.1')
+ def doUnpack(self, u):
+ u1,d1,u2,d2,u3,d3,u4=u.getAdata(),u.getRRheader(),u.getAdata(),u.getRRheader(),u.getAdata(),u.getRRheader(),u.getAdata()
+ return u1,u2,u3,u4
+ unpackerExpectedResult = (('www.zol.com', 1, 1, 86400), ('192.168.10.4', '192.168.10.3', '192.168.10.2', '192.168.10.1'))
+ packerExpectedResult = b'\x03www\x03zol\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x04\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x03\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x02\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x01'
+
+class testPackingOfTXT(PackerTestCase):
+ "addTXT(self, name, klass, ttl, list)"
+ def doPack(self, p):
+ p.addTXT('ekit-inc.com', DNS.Class.IN, 3600, 'this is a text record')
+ def doUnpack(self, u):
+ return u.getTXTdata()
+ packerExpectedResult = b'\x08ekit-inc\x03com\x00\x00\x10\x00\x01\x00\x00\x0e\x10\x00\x16\x15this is a text record'
+ unpackerExpectedResult = (('ekit-inc.com', 16, 1, 3600), [b'this is a text record'])
+
+# check what the maximum/minimum &c of TXT records are.
+class testPackingOfTXT2(PackerTestCase):
+ "addTXT(self, name, klass, ttl, list)"
+ def doPack(self, p):
+ f = lambda p=p:p.addTXT('ekit-inc.com', DNS.Class.IN, 3600, 'the quick brown fox jumped over the lazy brown dog\n'*20)
+ self.assertRaises(ValueError, f)
+ return TestCompleted
+ doUnpack = None
+
+class testPackingOfAAAAText(PackerTestCase):
+ "addAAAA(self, name, klass, ttl, address)"
+ def setUp(self):
+ self.RRpacker = DNS.Lib.RRpacker
+ self.RRunpacker = DNS.Lib.RRunpackerText
+
+ def doPack(self, p):
+ addAAAA(p, 'google.com', DNS.Class.IN, 4, '2607:f8b0:4005:802::1005')
+ def doUnpack(self, u):
+ r = u.getAAAAdata()
+ return r
+ packerExpectedResult = b'\x06google\x03com\x00\x00\x1c\x00\x01\x00\x00\x00\x04\x00\x10&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05'
+ unpackerExpectedResult = (('google.com', DNS.Type.AAAA, DNS.Class.IN, 4), '2607:f8b0:4005:802::1005')
+
+class testPackingOfAAAABinary(PackerTestCase):
+ "addAAAA(self, name, klass, ttl, address)"
+ def setUp(self):
+ self.RRpacker = DNS.Lib.RRpacker
+ self.RRunpacker = DNS.Lib.RRunpackerBinary
+
+ def doPack(self, p):
+ addAAAA(p, 'google.com', DNS.Class.IN, 4, '2607:f8b0:4005:802::1005')
+ def doUnpack(self, u):
+ self.assertFalse(hasattr(u, "getAAAAdata"))
+ r = u.getbytes(16)
+ return r
+ packerExpectedResult = b'\x06google\x03com\x00\x00\x1c\x00\x01\x00\x00\x00\x04\x00\x10&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05'
+ unpackerExpectedResult = (('google.com', DNS.Type.AAAA, DNS.Class.IN, 4), b'&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05')
+
+class testPackingOfAAAAInteger(PackerTestCase):
+ "addAAAA(self, name, klass, ttl, address)"
+ def setUp(self):
+ self.RRpacker = DNS.Lib.RRpacker
+ self.RRunpacker = DNS.Lib.RRunpackerInteger
+
+ def doPack(self, p):
+ addAAAA(p, 'google.com', DNS.Class.IN, 4, '2607:f8b0:4005:802::1005')
+ def doUnpack(self, u):
+ r = u.getAAAAdata()
+ return r
+ packerExpectedResult = b'\x06google\x03com\x00\x00\x1c\x00\x01\x00\x00\x00\x04\x00\x10&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05'
+ unpackerExpectedResult = (('google.com', DNS.Type.AAAA, DNS.Class.IN, 4), 50552053919387978162022445795852161029)
+
+def addAAAA(p, name, klass, ttl, address):
+ """Add AAAA record to a packer.
+ """
+ addr_buf = socket.inet_pton(socket.AF_INET6, address)
+ p.addRRheader(name, DNS.Type.AAAA, klass, ttl)
+ p.buf = p.buf + addr_buf
+ p.endRR()
+ return p
+
+#class testPackingOfQuestion(PackerTestCase):
+# "addQuestion(self, qname, qtype, qclass)"
+# def doPack(self, p):
+# self.assertEquals(0,"NotImplemented")
+
+def test_suite():
+ from unittest import TestLoader
+ return TestLoader().loadTestsFromName(__name__)
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/DNS/tests/test_base.py b/DNS/tests/test_base.py
new file mode 100644
index 0000000..e070b58
--- /dev/null
+++ b/DNS/tests/test_base.py
@@ -0,0 +1,260 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+import DNS
+import unittest
+try:
+ import ipaddress
+except ImportError:
+ import ipaddr as ipaddress
+
+def assertIsByte(b):
+ assert b >= 0
+ assert b <= 255
+
+class TestBase(unittest.TestCase):
+ def testParseResolvConf(self):
+ # reset elments set by Base._DiscoverNameServers
+ DNS.defaults['server'] = []
+ if 'domain' in DNS.defaults:
+ del DNS.defaults['domain']
+ self.assertEqual(len(DNS.defaults['server']), 0)
+ resolv = ['# a comment',
+ 'domain example.org',
+ 'nameserver 127.0.0.1',
+ ]
+ DNS.ParseResolvConfFromIterable(resolv)
+ self.assertEqual(len(DNS.defaults['server']), 1)
+ self.assertEqual(DNS.defaults['server'][0], '127.0.0.1')
+ self.assertEqual(DNS.defaults['domain'], 'example.org')
+
+ def testDnsRequestA(self):
+ # try with asking for strings, and asking for bytes
+ dnsobj = DNS.DnsRequest('example.org')
+
+ a_response = dnsobj.qry(qtype='A', resulttype='text', timeout=1)
+ self.assertTrue(a_response.answers)
+ # is the result vaguely ipv4 like?
+ self.assertEqual(a_response.answers[0]['data'].count('.'), 3)
+ self.assertEqual(a_response.answers[0]['data'],'93.184.216.34')
+
+ # Default result type for .qry object is an ipaddress object
+ ad_response = dnsobj.qry(qtype='A', timeout=1)
+ self.assertTrue(ad_response.answers)
+ self.assertEqual(ad_response.answers[0]['data'],ipaddress.IPv4Address('93.184.216.34'))
+
+ ab_response = dnsobj.qry(qtype='A', resulttype='binary', timeout=1)
+ self.assertTrue(ab_response.answers)
+ # is the result ipv4 binary like?
+ self.assertEqual(len(ab_response.answers[0]['data']), 4)
+ for b in ab_response.answers[0]['data']:
+ assertIsByte(b)
+ self.assertEqual(ab_response.answers[0]['data'],b']\xb8\xd8\"')
+
+ ai_response = dnsobj.qry(qtype='A', resulttype='integer', timeout=1)
+ self.assertTrue(ai_response.answers)
+ self.assertEqual(ai_response.answers[0]['data'],1572395042)
+
+
+ def testDnsRequestAAAA(self):
+ dnsobj = DNS.DnsRequest('example.org')
+
+ aaaa_response = dnsobj.qry(qtype='AAAA', resulttype='text', timeout=1)
+ self.assertTrue(aaaa_response.answers)
+ # does the result look like an ipv6 address?
+ self.assertTrue(':' in aaaa_response.answers[0]['data'])
+ self.assertEqual(aaaa_response.answers[0]['data'],'2606:2800:220:1:248:1893:25c8:1946')
+
+ # default is returning ipaddress object
+ aaaad_response = dnsobj.qry(qtype='AAAA', timeout=1)
+ self.assertTrue(aaaad_response.answers)
+ self.assertEqual(aaaad_response.answers[0]['data'],ipaddress.IPv6Address('2606:2800:220:1:248:1893:25c8:1946'))
+
+ aaaab_response = dnsobj.qry(qtype='AAAA', resulttype='binary', timeout=1)
+ self.assertTrue(aaaab_response.answers)
+ # is it ipv6 looking?
+ self.assertEqual(len(aaaab_response.answers[0]['data']) , 16)
+ for b in aaaab_response.answers[0]['data']:
+ assertIsByte(b)
+ self.assertEqual(aaaab_response.answers[0]['data'],b'&\x06(\x00\x02 \x00\x01\x02H\x18\x93%\xc8\x19F')
+ # IPv6 decimal
+ aaaai_response = dnsobj.qry(qtype='AAAA', resulttype='integer', timeout=1)
+ self.assertTrue(aaaai_response.answers)
+ self.assertEqual(aaaai_response.answers[0]['data'], 50542628918019813867414319910101719366)
+
+ def testDnsRequestEmptyMX(self):
+ dnsobj = DNS.DnsRequest('example.org')
+
+ mx_empty_response = dnsobj.qry(qtype='MX', timeout=1)
+ self.assertFalse(mx_empty_response.answers)
+
+ def testDnsRequestMX(self):
+ dnsobj = DNS.DnsRequest('ietf.org')
+ mx_response = dnsobj.qry(qtype='MX', timeout=1)
+ self.assertTrue(mx_response.answers[0])
+ # is hard coding a remote address a good idea?
+ # I think it's unavoidable. - sk
+ self.assertEqual(mx_response.answers[0]['data'], (0, 'mail.ietf.org'))
+
+ m = DNS.mxlookup('ietf.org', timeout=1)
+ self.assertEqual(mx_response.answers[0]['data'], m[0])
+
+ def testDnsRequestSrv(self):
+ dnsobj = DNS.Request(qtype='srv')
+ respdef = dnsobj.qry('_ldap._tcp.openldap.org', timeout=1)
+ self.assertTrue(respdef.answers)
+ data = respdef.answers[0]['data']
+ self.assertEqual(len(data), 4)
+ self.assertEqual(data[2], 389)
+ self.assertTrue('openldap.org' in data[3])
+
+ def testDkimRequest(self):
+ q = '20161025._domainkey.google.com'
+ dnsobj = DNS.Request(q, qtype='txt')
+ resp = dnsobj.qry(timeout=1)
+
+ self.assertTrue(resp.answers)
+ # should the result be bytes or a string? (Bytes, we finally settled on bytes)
+ data = resp.answers[0]['data']
+ self.assertFalse(isinstance(data[0], str))
+ self.assertTrue(data[0].startswith(b'k=rsa'))
+
+ def testDNSRequestTXT(self):
+ dnsobj = DNS.DnsRequest('fail.kitterman.org')
+
+ respdef = dnsobj.qry(qtype='TXT', timeout=1)
+ self.assertTrue(respdef.answers)
+ data = respdef.answers[0]['data']
+ self.assertEqual(data, [b'v=spf1 -all'])
+
+ resptext = dnsobj.qry(qtype='TXT', resulttype='text', timeout=1)
+ self.assertTrue(resptext.answers)
+ data = resptext.answers[0]['data']
+ self.assertEqual(data, ['v=spf1 -all'])
+
+ respbin = dnsobj.qry(qtype='TXT', resulttype='binary', timeout=1)
+ self.assertTrue(respbin.answers)
+ data = respbin.answers[0]['data']
+ self.assertEqual(data, [b'\x0bv=spf1 -all'])
+
+ def testIDN(self):
+ """Can we lookup an internationalized domain name?"""
+ dnsobj = DNS.DnsRequest('xn--bb-eka.at')
+ unidnsobj = DNS.DnsRequest('öbb.at')
+ a_resp = dnsobj.qry(qtype='A', resulttype='text', timeout=1)
+ ua_resp = unidnsobj.qry(qtype='A', resulttype='text', timeout=1)
+ self.assertTrue(a_resp.answers)
+ self.assertTrue(ua_resp.answers)
+ self.assertEqual(ua_resp.answers[0]['data'],
+ a_resp.answers[0]['data'])
+
+ def testNS(self):
+ """Lookup NS record from SOA"""
+ dnsob = DNS.DnsRequest('kitterman.com')
+ resp = dnsob.qry(qtype='SOA', timeout=1)
+ self.assertTrue(resp.answers)
+ primary = resp.answers[0]['data'][0]
+ self.assertEqual(primary, 'ns1.pairnic.com')
+ resp = dnsob.qry(qtype='NS',server=primary,aa=1)
+ nslist = [x['data'].lower() for x in resp.answers]
+ nslist.sort()
+ self.assertEqual(nslist, ['ns1.pairnic.com', 'ns2.pairnic.com'])
+
+ # Test defaults with legacy DNS.req
+
+ def testDnsRequestAD(self):
+ # try with asking for strings, and asking for bytes
+ dnsob = DNS.DnsRequest('example.org')
+
+ ad_response = dnsob.req(qtype='A', timeout=1)
+ self.assertTrue(ad_response.answers)
+ # is the result vaguely ipv4 like?
+ self.assertEqual(ad_response.answers[0]['data'].count('.'), 3)
+ self.assertEqual(ad_response.answers[0]['data'],'93.184.216.34')
+
+ def testDnsRequestAAAAD(self):
+ dnsob = DNS.DnsRequest('example.org')
+
+ # default is returning binary instead of text
+ aaaad_response = dnsob.req(qtype='AAAA', timeout=1)
+ self.assertTrue(aaaad_response.answers)
+ # does the result look like a binary ipv6 address?
+ self.assertEqual(len(aaaad_response.answers[0]['data']) , 16)
+ for b in aaaad_response.answers[0]['data']:
+ assertIsByte(b)
+ self.assertEqual(aaaad_response.answers[0]['data'],b'&\x06(\x00\x02 \x00\x01\x02H\x18\x93%\xc8\x19F')
+
+ def testDnsRequestEmptyMXD(self):
+ dnsob = DNS.DnsRequest('example.org')
+
+ mx_empty_response = dnsob.req(qtype='MX', timeout=1)
+ self.assertFalse(mx_empty_response.answers)
+
+ def testDnsRequestMXD(self):
+ dnsob = DNS.DnsRequest('ietf.org')
+ mx_response = dnsob.req(qtype='MX', timeout=1)
+ self.assertTrue(mx_response.answers[0])
+ # is hard coding a remote address a good idea?
+ # I think it's unavoidable. - sk
+ self.assertEqual(mx_response.answers[0]['data'], (0, 'mail.ietf.org'))
+
+ m = DNS.mxlookup('ietf.org', timeout=1)
+ self.assertEqual(mx_response.answers[0]['data'], m[0])
+
+ def testDnsRequestSrvD(self):
+ dnsob = DNS.Request(qtype='srv')
+ respdef = dnsob.req('_ldap._tcp.openldap.org', timeout=1)
+ self.assertTrue(respdef.answers)
+ data = respdef.answers[0]['data']
+ self.assertEqual(len(data), 4)
+ self.assertEqual(data[2], 389)
+ self.assertTrue('openldap.org' in data[3])
+
+ def testDkimRequestD(self):
+ q = '20161025._domainkey.google.com'
+ dnsob = DNS.Request(q, qtype='txt')
+ resp = dnsob.req(timeout=1)
+
+ self.assertTrue(resp.answers)
+ # should the result be bytes or a string? (Bytes, we finally settled on bytes)
+ data = resp.answers[0]['data']
+ self.assertFalse(isinstance(data[0], str))
+ self.assertTrue(data[0].startswith(b'k=rsa'))
+
+ def testDNSRequestTXTD(self):
+ dnsob = DNS.DnsRequest('fail.kitterman.org')
+
+ respdef = dnsob.req(qtype='TXT', timeout=1)
+ self.assertTrue(respdef.answers)
+ data = respdef.answers[0]['data']
+ self.assertEqual(data, [b'v=spf1 -all'])
+
+ def testIDND(self):
+ """Can we lookup an internationalized domain name?"""
+ dnsob = DNS.DnsRequest('xn--bb-eka.at')
+ unidnsob = DNS.DnsRequest('öbb.at')
+ a_resp = dnsob.req(qtype='A', resulttype='text', timeout=1)
+ ua_resp = unidnsob.req(qtype='A', resulttype='text', timeout=1)
+ self.assertTrue(a_resp.answers)
+ self.assertTrue(ua_resp.answers)
+ self.assertEqual(ua_resp.answers[0]['data'],
+ a_resp.answers[0]['data'])
+
+ def testNSD(self):
+ """Lookup NS record from SOA"""
+ dnsob = DNS.DnsRequest('kitterman.com')
+ resp = dnsob.req(qtype='SOA', timeout=1)
+ self.assertTrue(resp.answers)
+ primary = resp.answers[0]['data'][0]
+ self.assertEqual(primary, 'ns1.pairnic.com')
+ resp = dnsob.req(qtype='NS',server=primary,aa=1, timeout=1)
+ nslist = [x['data'].lower() for x in resp.answers]
+ nslist.sort()
+ self.assertEqual(nslist, ['ns1.pairnic.com', 'ns2.pairnic.com'])
+
+def test_suite():
+ from unittest import TestLoader
+ return TestLoader().loadTestsFromName(__name__)
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/DNS/win32dns.py b/DNS/win32dns.py
new file mode 100644
index 0000000..a74296b
--- /dev/null
+++ b/DNS/win32dns.py
@@ -0,0 +1,118 @@
+"""
+ $Id$
+
+ Extract a list of TCP/IP name servers from the registry 0.1
+ 0.1 Strobl 2001-07-19
+ Usage:
+ RegistryResolve() returns a list of ip numbers (dotted quads), by
+ scouring the registry for addresses of name servers
+
+ Tested on Windows NT4 Server SP6a, Windows 2000 Pro SP2 and
+ Whistler Pro (XP) Build 2462 and Windows ME
+ ... all having a different registry layout wrt name servers :-/
+
+ Todo:
+
+ Program doesn't check whether an interface is up or down
+
+ (c) 2001 Copyright by Wolfgang Strobl ws@mystrobl.de,
+ License analog to the current Python license
+
+ WARNING: Python3 port completely untested on Windows.
+"""
+
+import re
+import winreg
+
+def binipdisplay(s):
+ "convert a binary array of ip adresses to a python list"
+ if len(s)%4!= 0:
+ raise EnvironmentError # well ...
+ ol=[]
+ for i in range(len(s)/4):
+ s1=s[:4]
+ s=s[4:]
+ ip=[]
+ for j in s1:
+ ip.append(str(ord(j)))
+ ol.append('.'.join(ip))
+ return ol
+
+def stringdisplay(s):
+ '''convert "d.d.d.d,d.d.d.d" to ["d.d.d.d","d.d.d.d"].
+ also handle u'd.d.d.d d.d.d.d', as reporting on SF
+ '''
+ import re
+ return list(map(str, re.split("[ ,]",s)))
+
+def RegistryResolve():
+ nameservers=[]
+ x=winreg.ConnectRegistry(None,winreg.HKEY_LOCAL_MACHINE)
+ try:
+ y= winreg.OpenKey(x,
+ r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters")
+ except EnvironmentError: # so it isn't NT/2000/XP
+ # windows ME, perhaps?
+ try: # for Windows ME
+ y= winreg.OpenKey(x,
+ r"SYSTEM\CurrentControlSet\Services\VxD\MSTCP")
+ nameserver,dummytype=winreg.QueryValueEx(y,'NameServer')
+ if nameserver and not (nameserver in nameservers):
+ nameservers.extend(stringdisplay(nameserver))
+ except EnvironmentError:
+ pass
+ return nameservers # no idea
+ try:
+ nameserver = winreg.QueryValueEx(y, "DhcpNameServer")[0].split()
+ except:
+ nameserver = winreg.QueryValueEx(y, "NameServer")[0].split()
+ if nameserver:
+ nameservers=nameserver
+ nameserver = winreg.QueryValueEx(y,"NameServer")[0]
+ winreg.CloseKey(y)
+ try: # for win2000
+ y= winreg.OpenKey(x,
+ r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\DNSRegisteredAdapters")
+ for i in range(1000):
+ try:
+ n=winreg.EnumKey(y,i)
+ z=winreg.OpenKey(y,n)
+ dnscount,dnscounttype=winreg.QueryValueEx(z,
+ 'DNSServerAddressCount')
+ dnsvalues,dnsvaluestype=winreg.QueryValueEx(z,
+ 'DNSServerAddresses')
+ nameservers.extend(binipdisplay(dnsvalues))
+ winreg.CloseKey(z)
+ except EnvironmentError:
+ break
+ winreg.CloseKey(y)
+ except EnvironmentError:
+ pass
+#
+ try: # for whistler
+ y= winreg.OpenKey(x,
+ r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\Interfaces")
+ for i in range(1000):
+ try:
+ n=winreg.EnumKey(y,i)
+ z=winreg.OpenKey(y,n)
+ try:
+ nameserver,dummytype=winreg.QueryValueEx(z,'NameServer')
+ if nameserver and not (nameserver in nameservers):
+ nameservers.extend(stringdisplay(nameserver))
+ except EnvironmentError:
+ pass
+ winreg.CloseKey(z)
+ except EnvironmentError:
+ break
+ winreg.CloseKey(y)
+ except EnvironmentError:
+ #print "Key Interfaces not found, just do nothing"
+ pass
+#
+ winreg.CloseKey(x)
+ return nameservers
+
+if __name__=="__main__":
+ print("Name servers:",RegistryResolve())
+
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..1af024b
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,69 @@
+PYDNS is Copyright 2000-2014 by Guido van Rossum,
+Michael Ströder <stroeder@users.sourceforge.net>,
+Anthony Baxter <anthony@interlink.com.au>,
+Stuart Gathman <stuart@bmsi.com>,
+and Scott Kitterman <scott@kitterman.com>
+
+This code is released under the following Python-style license:
+
+CNRI LICENSE AGREEMENT FOR PYDNS-2.3.5
+
+ 1. This LICENSE AGREEMENT is between the Corporation for National Research
+Initiatives, having an office at 1895 Preston White Drive, Reston, VA 20191
+(“CNRI”), and the Individual or Organization (“Licensee”) accessing and
+otherwise using pydns-2.3.5 software in source or binary form and its
+associated documentation.
+
+ 2. Subject to the terms and conditions of this License Agreement, CNRI
+hereby grants Licensee a nonexclusive, royalty-free, world-wide license to
+reproduce, analyze, test, perform and/or display publicly, prepare derivative
+works, distribute, and otherwise use pydns-2.3.5 alone or in any derivative
+version, provided, however, that CNRI’s License Agreement and CNRI’s notice of
+copyright, i.e., “Copyright © 1995-2001 Corporation for National Research
+Initiatives; All Rights Reserved” are retained in pydns-2.3.5 alone or in any
+derivative version prepared by Licensee. Alternately, in lieu of CNRI’s License
+Agreement, Licensee may substitute the following text (omitting the quotes):
+“pydns-2.3.5 is made available subject to the terms and conditions in CNRI’s
+License Agreement. This Agreement together with pydns-2.3.5 may be located on
+the Internet using the following unique, persistent identifier (known as a
+handle): 1895.22/1013. This Agreement may also be obtained from a proxy server
+on the Internet using the following URL: http://hdl.handle.net/1895.22/1013.”
+
+ 3. In the event Licensee prepares a derivative work that is based on or
+incorporates pydns-2.3.5 or any part thereof, and wants to make the derivative
+work available to others as provided herein, then Licensee hereby agrees to
+include in any such work a brief summary of the changes made to pydns-2.3.5.
+
+ 4. CNRI is making pydns-2.3.5 available to Licensee on an “AS IS” basis.
+CNRI MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF
+EXAMPLE, BUT NOT LIMITATION, CNRI MAKES NO AND DISCLAIMS ANY REPRESENTATION OR
+WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE
+USE OF PYTHON 1.6.1 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS.
+
+ 5. CNRI SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 1.6.1
+FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF
+MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 1.6.1, OR ANY DERIVATIVE
+THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+ 6. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+ 7. This License Agreement shall be governed by the federal intellectual
+property law of the United States, including without limitation the federal
+copyright law, and, to the extent such U.S. federal law does not apply, by the
+law of the Commonwealth of Virginia, excluding Virginia’s conflict of law
+provisions. Notwithstanding the foregoing, with regard to derivative works
+based on pydns-2.3.5 that incorporate non-separable material that was
+previously distributed under the GNU General Public License (GPL), the law of
+the Commonwealth of Virginia shall govern this License Agreement only as to
+issues arising under or with respect to Paragraphs 4, 5, and 7 of this License
+Agreement. Nothing in this License Agreement shall be deemed to create any
+relationship of agency, partnership, or joint venture between CNRI and
+Licensee. This License Agreement does not grant permission to use CNRI
+trademarks or trade name in a trademark sense to endorse or promote products or
+services of Licensee, or any third party.
+
+ 8. By clicking on the “ACCEPT” button where indicated, or by copying,
+installing or otherwise using pydns-2.3.5, Licensee agrees to be bound by the
+terms and conditions of this License Agreement.
+
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..1727efd
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,9 @@
+recursive-include DNS *.py
+recursive-include tools *.py
+recursive-include tests *.py
+include LICENSE
+include CHANGES
+include MANIFEST.in
+include *.txt
+include setup.*
+include test.py
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..efff2cd
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,23 @@
+Metadata-Version: 1.2
+Name: py3dns
+Version: 3.2.1
+Summary: Python 3 DNS library
+Home-page: https://launchpad.net/py3dns
+Author: Anthony Baxter and others
+Author-email: py3dns-hackers@lists.launchpad.net
+Maintainer: Scott Kitterman
+Maintainer-email: scott@kitterman.com
+License: Python License
+Description: Python 3 DNS library:
+
+Keywords: DNS
+Platform: UNKNOWN
+Classifier: Development Status :: 5 - Production/Stable
+Classifier: Environment :: No Input/Output (Daemon)
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: Python License (CNRI Python License)
+Classifier: Natural Language :: English
+Classifier: Operating System :: OS Independent
+Classifier: Programming Language :: Python :: 3
+Classifier: Topic :: Internet :: Name Service (DNS)
+Classifier: Topic :: Software Development :: Libraries :: Python Modules
diff --git a/README-guido.txt b/README-guido.txt
new file mode 100644
index 0000000..09ebe8f
--- /dev/null
+++ b/README-guido.txt
@@ -0,0 +1,12 @@
+This directory contains a module (dnslib) that implements a DNS
+(Domain Name Server) client, plus additional modules that define some
+symbolic constants used by DNS (dnstype, dnsclass, dnsopcode).
+
+Type "python dnslib.py -/" for a usage message.
+
+You can also import dnslib and write your own, more sophisticated
+client code; use the test program as an example (there is currently no
+documentation :-).
+
+--Guido van Rossum, CWI, Amsterdam <Guido.van.Rossum@cwi.nl>
+URL: <http://www.cwi.nl/cwi/people/Guido.van.Rossum.html>
diff --git a/README.txt b/README.txt
new file mode 100644
index 0000000..b511b68
--- /dev/null
+++ b/README.txt
@@ -0,0 +1,146 @@
+Release 3.2.0 Mon Jul 23 2018
+
+Switched from distutils to setuptools because "it's the future". It is
+unlikely to have end user impact. For python3.3+ no additional dependencies
+are required.
+
+Release 3.1.0 Thu Apr 24 23:52:00 EDT 2014
+
+More choices about result types are provided in 3.1.0. To specify resulttype,
+in a DnsRequest object, use the new function DnsRequest.qry
+(resulttype='binary/text/default'). DnsRequest.qry returns ipaddress objects
+for A and AAAA queries by defaults. Other defaults are the same as
+DnsRequest.req. Continue to use DnsRequest.req for exact backward
+compatibility with pydns and older py3dns defaults. TXT and SPF record data
+are returned as strings by default, this matches what dnspython3 returns.
+
+The ipaddress module is used internally now. See CHANGES for details.
+
+Release 3.0.3 Wed May 29 00:05:00 EDT 2013
+
+There was a third, unintended incompatiblity in 3.0.2 in that IPv6 addresses
+were returned in their string format rather than their decimal format. This
+breaks pyspf queries when the connect IP is IPv6. 3.0.3 is a release strictly
+to revert this change.
+
+Release 3.0.2 Thu Jan 19 01:25:00 EST 2012
+
+This release introduces two potentially incompatible changes from the python
+verion of DNS (pydns). First, the data portion of DNS records of types TXT
+and SPF are returned as bytes instead of strings. Second, additional sub
+classes of DNSError have been added. Any code that catches DNSError should
+be checked to see if it needs updating to catch one of the new sub classes:
+ArgumentError, SocketError, TimeoutError, ServerError, and
+IncompleteReplyError.
+
+Release 3.0 Sun Mar 2-9 23:07:22 2011 -0400
+
+Ported to Python3 by Scott Kitterman <scott@kitterman.com>. This is mostly a
+minimal port to work with Python3 (tested with python3.2) plus addition of
+some of the patches that people have submitted on Sourceforge. It should be
+fully API compatible with 2.3. Note: Version 3.0.0 shipped with a new
+lazy.lookupfull function in advance of 2.3. This was incorporated in pydns
+2.3.5 as lazy.lookupalll. It has been renamed in 3.0.1 to stay API compatible
+with pydns 2.3.
+
+Release 2.3 Mon May 6 16:18:02 EST 2002
+
+This is a another release of the pydns code, as originally written by
+Guido van Rossum, and with a hopefully nicer API bolted over the
+top of it by Anthony Baxter <anthony@interlink.com.au>.
+
+This code is released under a Python-style license.
+
+I'm making this release because there hasn't been a release in a
+heck of a long time, and it probably deserves one. I'd also like to
+do a substantial refactor of some of the guts of the code, and this
+is likely to break any code that uses the existing interface. So
+this will be a release for people who are using the existing API...
+
+There are several known bugs/unfinished bits
+
+- processing of AXFR results is not done yet.
+- doesn't do IPv6 DNS requests (type AAAA)
+- docs, aside from this file
+- all sorts of other stuff that I've probably forgotten.
+- MacOS support for discovering nameservers
+- the API that I evolved some time ago is pretty ugly. I'm going
+ to re-do it, designed this time.
+
+Stuff it _does_ do:
+- processes /etc/resolv.conf - at least as far as nameserver directives go.
+- tries multiple nameservers.
+- nicer API - see below.
+- returns results in more useful format.
+- optional timing of requests.
+- default 'show' behaviour emulates 'dig' pretty closely.
+
+
+To use:
+
+import DNS
+reqobj=DNS.Request(args)
+reqobj.req(args)
+
+args can be a name, in which case it takes that as the query, and/or a series
+of keyword/value args. (see below for a list of args)
+
+when calling the 'req()' method, it reuses the options specified in the
+DNS.Request() call as defaults.
+
+options are applied in the following order:
+ those specified in the req() call
+ or, if not specified there,
+ those specified in the creation of the Request() object
+ or, if not specified there,
+ those specified in the DNS.defaults dictionary
+
+name servers can be specified in the following ways:
+- by calling DNS.DiscoverNameServers(), which will load the DNS servers
+ from the system's /etc/resolv.conf file on Unix, or from the Registry
+ on windows.
+- by specifying it as an option to the request
+- by manually setting DNS.defaults['server'] to a list of server IP
+ addresses to try
+- XXXX It should be possible to load the DNS servers on a mac os machine,
+ from where-ever they've squirrelled them away
+
+name="host.do.main" # the object being looked up
+qtype="SOA" # the query type, eg SOA, A, MX, CNAME, ANY
+protocol="udp" # "udp" or "tcp" - usually you want "udp"
+server="nameserver" # the name of the nameserver. Note that you might
+ # want to use an IP address here
+rd=1 # "recursion desired" - defaults to 1.
+other: opcode, port, ...
+
+There's also some convenience functions, for the lazy:
+
+to do a reverse lookup:
+>>> print DNS.revlookup("192.189.54.17")
+yarrina.connect.com.au
+
+to look up all MX records for an entry:
+>>> print DNS.mxlookup("connect.com.au")
+[(10, 'yarrina.connect.com.au'), (100, 'warrane.connect.com.au')]
+
+Documentation of the rest of the interface will have to wait for a
+later date. Note that the DnsAsyncRequest stuff is currently not
+working - I haven't looked too closely at why, yet.
+
+There's some examples in the tests/ directory - including test5.py,
+which is even vaguely useful. It looks for the SOA for a domain, checks
+that the primary NS is authoritative, then checks the nameservers
+that it believes are NSs for the domain and checks that they're
+authoritative, and that the zone serial numbers match.
+
+see also README.guido for the original docs.
+
+py3dns is derived from pydns. The sourceforge details below refer to pydns.
+All py3dns issues/comments/etc should be reported via
+https://launchpad.net/py3dns.
+
+comments to me, anthony@interlink.com.au, or to the mailing list,
+pydns-developer@lists.sourceforge.net.
+
+bugs/patches to the tracker on SF -
+ http://sourceforge.net/tracker/?group_id=31674
diff --git a/py3dns.egg-info/PKG-INFO b/py3dns.egg-info/PKG-INFO
new file mode 100644
index 0000000..efff2cd
--- /dev/null
+++ b/py3dns.egg-info/PKG-INFO
@@ -0,0 +1,23 @@
+Metadata-Version: 1.2
+Name: py3dns
+Version: 3.2.1
+Summary: Python 3 DNS library
+Home-page: https://launchpad.net/py3dns
+Author: Anthony Baxter and others
+Author-email: py3dns-hackers@lists.launchpad.net
+Maintainer: Scott Kitterman
+Maintainer-email: scott@kitterman.com
+License: Python License
+Description: Python 3 DNS library:
+
+Keywords: DNS
+Platform: UNKNOWN
+Classifier: Development Status :: 5 - Production/Stable
+Classifier: Environment :: No Input/Output (Daemon)
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: Python License (CNRI Python License)
+Classifier: Natural Language :: English
+Classifier: Operating System :: OS Independent
+Classifier: Programming Language :: Python :: 3
+Classifier: Topic :: Internet :: Name Service (DNS)
+Classifier: Topic :: Software Development :: Libraries :: Python Modules
diff --git a/py3dns.egg-info/SOURCES.txt b/py3dns.egg-info/SOURCES.txt
new file mode 100644
index 0000000..0abd6b4
--- /dev/null
+++ b/py3dns.egg-info/SOURCES.txt
@@ -0,0 +1,34 @@
+CHANGES
+CREDITS.txt
+LICENSE
+MANIFEST.in
+README-guido.txt
+README.txt
+setup.py
+test.py
+DNS/Base.py
+DNS/Class.py
+DNS/Lib.py
+DNS/Opcode.py
+DNS/Status.py
+DNS/Type.py
+DNS/__init__.py
+DNS/lazy.py
+DNS/win32dns.py
+DNS/tests/__init__.py
+DNS/tests/testPackers.py
+DNS/tests/test_base.py
+py3dns.egg-info/PKG-INFO
+py3dns.egg-info/SOURCES.txt
+py3dns.egg-info/dependency_links.txt
+py3dns.egg-info/not-zip-safe
+py3dns.egg-info/top_level.txt
+tests/test.py
+tests/test2.py
+tests/test4.py
+tests/test5.py
+tests/test6.py
+tests/test7.py
+tests/testsrv.py
+tools/caching.py
+tools/named-perf.py \ No newline at end of file
diff --git a/py3dns.egg-info/dependency_links.txt b/py3dns.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/py3dns.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/py3dns.egg-info/not-zip-safe b/py3dns.egg-info/not-zip-safe
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/py3dns.egg-info/not-zip-safe
@@ -0,0 +1 @@
+
diff --git a/py3dns.egg-info/top_level.txt b/py3dns.egg-info/top_level.txt
new file mode 100644
index 0000000..a42182f
--- /dev/null
+++ b/py3dns.egg-info/top_level.txt
@@ -0,0 +1 @@
+DNS
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..8bfd5a1
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,4 @@
+[egg_info]
+tag_build =
+tag_date = 0
+
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..519ae7a
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,36 @@
+import sys,os
+
+sys.path.insert(0,os.getcwd())
+
+from setuptools import setup
+
+import DNS
+
+setup(
+ #-- Package description
+ name = 'py3dns',
+ license = 'Python License',
+ version = DNS.__version__,
+ description = 'Python 3 DNS library',
+ long_description = """Python 3 DNS library:
+""",
+ author = 'Anthony Baxter and others',
+ author_email = 'py3dns-hackers@lists.launchpad.net ',
+ maintainer="Scott Kitterman",
+ maintainer_email="scott@kitterman.com",
+ url = 'https://launchpad.net/py3dns',
+ packages = ['DNS'], keywords = ['DNS'],
+ zip_safe = False,
+ classifiers = [
+ 'Development Status :: 5 - Production/Stable',
+ 'Environment :: No Input/Output (Daemon)',
+ 'Intended Audience :: Developers',
+ 'License :: OSI Approved :: Python License (CNRI Python License)',
+ 'Natural Language :: English',
+ 'Operating System :: OS Independent',
+ 'Programming Language :: Python :: 3',
+ 'Topic :: Internet :: Name Service (DNS)',
+ 'Topic :: Software Development :: Libraries :: Python Modules'
+ ]
+)
+
diff --git a/test.py b/test.py
new file mode 100755
index 0000000..ab85e03
--- /dev/null
+++ b/test.py
@@ -0,0 +1,8 @@
+#! /usr/bin/python3
+
+import unittest
+import doctest
+import DNS
+from DNS.tests import test_suite
+
+unittest.TextTestRunner().run(test_suite())
diff --git a/tests/test.py b/tests/test.py
new file mode 100755
index 0000000..772f3a3
--- /dev/null
+++ b/tests/test.py
@@ -0,0 +1,43 @@
+#!/usr/bin/python3
+
+import sys ; sys.path.insert(0, '..')
+
+import DNS
+# automatically load nameserver(s) from /etc/resolv.conf
+# (works on unix - on others, YMMV)
+DNS.ParseResolvConf()
+
+# lets do an all-in-one request
+# set up the request object
+r = DNS.DnsRequest(name='munnari.oz.au',qtype='A')
+# do the request
+a=r.req()
+# and do a pretty-printed output
+a.show()
+
+# now lets setup a reusable request object
+r = DNS.DnsRequest(qtype='ANY')
+res = r.req("a.root-servers.nex",qtype='ANY')
+res.show()
+res = r.req("proxy.connect.com.au")
+res.show()
+
+# do a TCP reply
+r = DNS.DnsRequest("imsavscan.netvigator.com", qtype="A", server=['8.8.8.8'], protocol='tcp', timeout=300)
+res = r.req()
+res.show()
+
+# look up a TXT record
+r = DNS.DnsRequest("kitterman.com", qtype="TXT", protocol='tcp')
+res = r.req()
+res.show()
+
+# look up a AAAA record
+r = DNS.DnsRequest("mailout03.controlledmail.com", qtype="AAAA", protocol='tcp')
+res = r.req(resulttype='text')
+res.show()
+
+# look up a A record set that falls over to EDNS0/TCP
+r = DNS.DnsRequest("long-a-record.tana.it", qtype="A", protocol='udp')
+res = r.req(resulttype='text')
+res.show()
diff --git a/tests/test2.py b/tests/test2.py
new file mode 100755
index 0000000..d6d64f6
--- /dev/null
+++ b/tests/test2.py
@@ -0,0 +1,17 @@
+#!/usr/bin/python3
+
+import sys ; sys.path.insert(0, '..')
+import DNS
+# automatically load nameserver(s) from /etc/resolv.conf
+# (works on unix - on others, YMMV)
+DNS.ParseResolvConf()
+
+r=DNS.Request(qtype='mx')
+res = r.req('connect.com.au')
+res.show()
+
+r=DNS.Request(qtype='soa')
+res = r.req('connect.com.au')
+res.show()
+
+print(DNS.revlookup('192.189.54.17'))
diff --git a/tests/test4.py b/tests/test4.py
new file mode 100755
index 0000000..50a328d
--- /dev/null
+++ b/tests/test4.py
@@ -0,0 +1,10 @@
+#!/usr/bin/python3
+
+import sys ; sys.path.insert(0, '..')
+
+import DNS
+
+DNS.ParseResolvConf()
+
+print(DNS.mxlookup("hotmail.com"))
+print(DNS.mxlookup("connect.com.au"))
diff --git a/tests/test5.py b/tests/test5.py
new file mode 100755
index 0000000..9115500
--- /dev/null
+++ b/tests/test5.py
@@ -0,0 +1,62 @@
+#!/usr/bin/python3
+
+import sys ; sys.path.insert(0, '..')
+import DNS
+
+def Error(mesg):
+ import sys
+ print(sys.argv[0],"ERROR:")
+ print(mesg)
+ sys.exit(1)
+
+def main():
+ import sys
+ if len(sys.argv) != 2:
+ Error("usage: %s somedomain.com"%sys.argv[0])
+ domain = sys.argv[1]
+ nslist = GetNS(domain)
+ print("According to the primary, the following are nameservers for this domain")
+ for ns in nslist:
+ print(" ",ns)
+ CheckNS(ns,domain)
+
+
+def GetNS(domain):
+ import DNS
+ # hm. this might fail if a server is off the air.
+ r = DNS.Request(domain,qtype='SOA').req()
+ if r.header['status'] != 'NOERROR':
+ Error("received status of %s when attempting to look up SOA for domain"%
+ (r.header['status']))
+ if r.header['status'] == 'NXDOMAIN':
+ print("SOA request was NXDOMAIN")
+ primary = ''
+ else:
+ if r.answers:
+ primary,email,serial,refresh,retry,expire,minimum = r.answers[0]['data']
+ print("Primary nameserver for domain %s is: %s"%(domain,primary))
+ else:
+ print("No answer to SOA query")
+ primary = ''
+ r = DNS.Request(domain,qtype='NS',server=primary,aa=1).req()
+ if r.header['status'] != 'NOERROR':
+ Error("received status of %s when attempting to query %s for NSs"%
+ (r.header['status']))
+ if r.header['aa'] != 1 and primary is not '':
+ Error("primary NS %s doesn't believe that it's authoritative!"% primary)
+ nslist = [x['data'] for x in r.answers]
+ print("Full list of nameservers for domain %s is: %s"%(domain,nslist))
+ return nslist
+
+def CheckNS(nameserver,domain):
+ r = DNS.Request(domain,qtype='SOA',server=nameserver,aa=1).req()
+ if r.header['status'] != 'NOERROR':
+ Error("received status of %s when attempting to query %s for NS"%
+ (r.header['status']))
+ if r.header['aa'] != 1:
+ Error("NS %s doesn't believe that it's authoritative!"% nameserver)
+ primary,email,serial,refresh,retry,expire,minimum = r.answers[0]['data']
+ print(" NS has serial",serial[1])
+
+if __name__ == "__main__":
+ main()
diff --git a/tests/test6.py b/tests/test6.py
new file mode 100755
index 0000000..8c0a1e9
--- /dev/null
+++ b/tests/test6.py
@@ -0,0 +1,28 @@
+#!/usr/bin/python3
+
+import sys ; sys.path.insert(0, '..')
+import DNS
+req = DNS.DnsRequest('mailout03.controlledmail.com', qtype='AAAA', protocol='tcp')
+resp = req.req()
+print(resp.answers[0]['name'], resp.answers[0]['data'])
+
+req1 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='AAAA', protocol='udp')
+resp1 = req1.req(resulttype='binary')
+print(resp1.answers[0]['name'], resp1.answers[0]['data'])
+
+req2 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='AAAA', protocol='tcp')
+resp2 = req2.req(resulttype='text')
+print(resp2.answers[0]['name'], resp2.answers[0]['data'])
+
+req3 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='A', protocol='tcp')
+resp3 = req3.req()
+print(resp3.answers[0]['name'], resp3.answers[0]['data'])
+
+req4 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='A', protocol='udp', resulttype='binary')
+resp4 = req4.req(resulttype='binary')
+print(resp4.answers[0]['name'], resp4.answers[0]['data'])
+
+req5 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='A', protocol='tcp')
+resp5 = req5.req(resulttype='text')
+print(resp5.answers[0]['name'], resp5.answers[0]['data'])
+
diff --git a/tests/test7.py b/tests/test7.py
new file mode 100755
index 0000000..c040cd3
--- /dev/null
+++ b/tests/test7.py
@@ -0,0 +1,40 @@
+#!/usr/bin/python3
+
+import sys ; sys.path.insert(0, '..')
+import DNS
+req = DNS.DnsRequest('google.com', qtype='AAAA', protocol='tcp')
+resp = req.req()
+print(resp.answers[0]['name'], resp.answers[0]['data'])
+
+req1 = DNS.DnsRequest('google.com', qtype='AAAA', protocol='udp')
+resp1 = req1.req(resulttype='binary')
+print(resp1.answers[0]['name'], resp1.answers[0]['data'])
+
+req2 = DNS.DnsRequest('google.com', qtype='AAAA', protocol='tcp')
+resp2 = req2.req(resulttype='text')
+print(resp2.answers[0]['name'], resp2.answers[0]['data'])
+
+req3 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='TXT', protocol='tcp')
+resp3 = req3.req()
+print(resp3.answers[0]['name'], resp3.answers[0]['data'])
+
+req4 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='TXT', protocol='udp', resulttype='binary')
+resp4 = req4.req(resulttype='binary')
+print(resp4.answers[0]['name'], resp4.answers[0]['data'])
+
+req5 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='TXT', protocol='tcp')
+resp5 = req5.req(resulttype='text')
+print(resp5.answers[0]['name'], resp5.answers[0]['data'])
+
+req6 = DNS.DnsRequest('controlledmail.com', qtype='MX', protocol='tcp')
+resp6 = req6.req()
+print(resp6.answers[0]['name'], resp6.answers[0]['data'])
+
+req7 = DNS.DnsRequest('controlledmail.com', qtype='MX', protocol='udp', resulttype='binary')
+resp7 = req6.req(resulttype='binary')
+print(resp7.answers[0]['name'], resp7.answers[0]['data'])
+
+req8 = DNS.DnsRequest('controlledmail.com', qtype='MX', protocol='tcp')
+resp8 = req8.req(resulttype='text')
+print(resp8.answers[0]['name'], resp8.answers[0]['data'])
+
diff --git a/tests/testsrv.py b/tests/testsrv.py
new file mode 100755
index 0000000..09cb2c6
--- /dev/null
+++ b/tests/testsrv.py
@@ -0,0 +1,13 @@
+#!/usr/bin/python3
+
+import sys ; sys.path.insert(0, '..')
+
+import DNS
+# automatically load nameserver(s) from /etc/resolv.conf
+# (works on unix - on others, YMMV)
+DNS.ParseResolvConf()
+
+r=DNS.Request(qtype='srv')
+res = r.req('_ldap._tcp.openldap.org')
+res.show()
+print(res.answers)
diff --git a/tools/caching.py b/tools/caching.py
new file mode 100644
index 0000000..9d02583
--- /dev/null
+++ b/tools/caching.py
@@ -0,0 +1,55 @@
+#
+# From: KevinL <darius@bofh.net.au>
+# A simple dns answer cache - it's author notes:
+# "It's probably really bodgy code, tho - it was my early python..."
+# So don't send him abusive messages if you hate it.
+#
+class DNSCache:
+ """
+ Covers the DNS object, keeps a cache of answers. Clumsy as hell.
+ """
+ forCache = {}
+ revCache = {}
+ # cache failures for this long, in seconds
+ negCache = 3600
+
+ def __init__(self):
+ import DNS
+ DNS.ParseResolvConf()
+
+ def lookup(self,IP = None,name = None):
+ import DNS
+ now = time.time()
+ if (not IP) and (not name):
+ return None
+ if IP:
+ if type(IP) != type(''):
+ return None
+ a = string.split(IP, '.')
+ a.reverse()
+ name = string.join(a, '.')+'.in-addr.arpa'
+ cache = self.revCache
+ qt = 'ptr'
+ else:
+ if type(name) != type(''):
+ return None
+ cache = self.forCache
+ qt = 'a'
+ if name in cache:
+ # Check if it's timed out or not
+ if cache[name][1] < now:
+ del(cache[name])
+ else:
+ return(cache[name][0])
+ x = DNS.DnsRequest(name,qtype=qt)
+ try:
+ x.req()
+ except:
+ return 'Timeout'
+ if len(x.response.answers) > 0:
+ cache[name] = ( x.response.answers[0]['data'], x.time_finish +
+ x.response.answers[0]['ttl'])
+ else:
+ cache[name] = (None,now+self.negCache)
+ return cache[name][0]
+
diff --git a/tools/named-perf.py b/tools/named-perf.py
new file mode 100755
index 0000000..46c370b
--- /dev/null
+++ b/tools/named-perf.py
@@ -0,0 +1,63 @@
+#!/usr/bin/python3
+
+servers = [ "192.92.129.1",
+ "192.189.54.17", # yarrina
+ "192.189.54.33", # warrane
+ "203.8.183.1", # yalumba
+ "192.189.54.65", # gnamma
+ "128.250.1.21", # munnari
+
+ ]
+
+lookups = [ ( 'munnari.oz.au', 'A' ),
+ ( 'connect.com.au', 'SOA' ),
+ ( 'parc.xerox.com', 'MX' ),
+ ( 'bogus.example.net', 'A'),
+ ]
+
+rpts = 5
+
+def main():
+ import DNS
+ import socket
+ import time
+ res = {}
+ for server in servers:
+ res[server] = [100000,0,0,0] # min,max,tot,failed
+ for what,querytype in lookups:
+ for count in range(rpts):
+ for server in servers:
+ d = DNS.DnsRequest(server=server,timeout=1)
+ fail = 0
+ timingstart = time.time()
+ try:
+ r=d.req(name=what,qtype=querytype)
+ except DNS.Error:
+ fail = 1
+ timingfinish = time.time()
+ if fail:
+ res[server][3] = res[server][3] + 1
+ print("(failed)",res[server][3])
+ if 0:
+ if r.header['ancount'] == 0:
+ print("WARNING: Server",server,"got no answers for", \
+ what, querytype)
+ t = int(1000 * (timingfinish - timingstart))
+ print(server,"took",t,"ms for",what,querytype)
+ res[server][0] = min(t,res[server][0])
+ res[server][1] = max(t,res[server][1])
+ res[server][2] = res[server][2] + t
+ for server in servers:
+ queries = rpts * len(lookups)
+ r = res[server]
+ print(server)
+ print("%-30s %2d/%2d(%3.2f%%) %dms/%dms/%dms min/avg/max" % (
+ socket.gethostbyaddr(server)[0],
+ queries - r[3], queries,
+ ((queries-r[3])*100.0)/queries,
+ r[0],
+ r[2] / queries,
+ r[1]))
+
+if __name__ == "__main__":
+ main()