Import py3dns_3.2.1.orig.tar.gz

[dgit import orig py3dns_3.2.1.orig.tar.gz]
This commit is contained in:
Scott Kitterman 2019-09-04 14:33:10 +02:00
commit 2dadd3e3e6
36 changed files with 3189 additions and 0 deletions

81
CHANGES Normal file
View File

@ -0,0 +1,81 @@
3.2.1 Wed, Sep 4, 2019
* Add support for setting timeout for convenience methods in DNS.lazy
* Fixed DNS.req resulttype error format (LP: #1842423)
* Use errno.EADDRINUSE instead of the hard coded Linux value for improved
portability (LP: #1793540)
* Update test suite to correct for use of no longer existing DNS records
* Set timeout=1 for tests so testing with a non-responsive nameserver will
finish in a reasonable time
3.2.0 Mon, 23 Jul 2018
* Rename internal use of async since it is a reserved word in python3.7
(LP: #1776027)
* Switch from distutils to setuptools
* Ship test.py in the tarball
3.1.1 Thu, 06 Oct 2016 22:00:13 -0400
* Update test suite for new example.org IP addresses
* Fix missing bits for use of ipaddr-py with python3 < 3.3 (LP: #1319611)
- Patch thanks to Arfrever Frehtes Taifersar Arahesis
* Correct error in _DiscoverNameServers from OS X implementation in 3.0.1
that prevents name server discovery on windows (LP: #1442424)
* Correct encoding issue with label length to fix issues with DNS labels
greater than 46 characters (LP: #1502853)
- Thanks to Petr Czepiec for the patch
* Use full path to /usr/sbin/scutil on OS X since scutil is not always in the
search path (LP: #1630844)
3.1.0 Thu Apr 24 23:52:00 EDT 2014
* Raise DNSError when no nameservers have been found by the time a
Base.DNSRequest object is initialized
* Add new DNS.DnsRequest.qry function to supercede DNS.DnsRequest.req to
allow for non-backward compatible changes to be made in .qry while
maintaining full backward compatibility with 3.0.3 and later in the 3.0
series in .req
* Add options for 'resulttype' to DnsResult.qry to allow for binary, integer,
or text data to be returned for IP addresses
* The default result type for IPv4 and IPv6 addresses in DNS.DnsRequest.qry
is an ipaddress object
* The ipaddress module is used internally. This is included in python3.3 and
the ipaddr-py module from https://code.google.com/p/ipaddr-py/ can be used
with python3.2
* New unittest based test suite - thanks to Diane Trout
3.0.4 Wed Aug 7 02:25:00 EDT 2013
* Fix timeouts associated with only one of several available nameservers
being unavailable(LP: #1209071):
- Only raise timeout error after trying all available servers
- Stop lookups once an answer is gotten
* Removed unmaintained spec files
3.0.3 Wed May 29 00:05:00 EDT 2013
* Revert returning IPv6 addresses from AAAA lookups as string. Causing
incompatiblities that are deeply annoying to fix on the other end.
3.0.2 Thu Jan 19 10:59:00 EST 2012
* Add more granular exception sub classes of DNSError, see SF #3388075
- Thanks to Julian Mehnle for the patch
* Add AAAA record support, works like A records
- Thanks to Shane Kerr for the patch
3.0.1 Mon Jul 18 19:46:30 EDT 2011
* Add CHANGES to document post-Python 3 port changes
* Add LICENSE file
* Port pydns 2.3.5 changes to py3dns
- Handle large TCP replies (change to blocking IO with timeout)
- Add new lazy.dnslookup function to retrieve answer data for any query
type
- Add large TCP reply test to tests/test.py
* Add automatic name server discovery for OS X
3.0.0 Wed Feb 9 23:35:22 EST 2011
Ported to Python3 by Scott Kitterman <scott@kitterman.com>. This is mostly a
minimal port to work with Python3 (tested with python3.2) plus addition of
some of the patches that people have submitted on Sourceforge. It should be
fully API compatible with 2.3.

18
CREDITS.txt Normal file
View File

@ -0,0 +1,18 @@
This code was originally based on the DNS library created
by Guido van Rossum somewhere near the dawn of time.
Since then, as well as myself (Anthony), I have had contributions
by:
Michael Ströder
Bastian Kleineidam
Timothy J. Miller
Wolfgang Strobl
Arnaud Fontaine
Scott Kitterman
Stuart Gathman
Diane Trout
It's possible there's other people - the old RCS logs for my
code were lost some time ago. The list above is almost certainly
incomplete - let me know if I've forgotten you...

480
DNS/Base.py Normal file
View File

@ -0,0 +1,480 @@
"""
$Id$
This file is part of the py3dns project.
Homepage: https://launchpad.net/py3dns
This code is covered by the standard Python License. See LICENSE for details.
Changes for Python3 port © 2011-14 Scott Kitterman <scott@kitterman.com>
Base functionality. Request and Response classes, that sort of thing.
"""
import socket, string, types, time, select
import errno
from . import Type,Class,Opcode
import asyncore
#
# This random generator is used for transaction ids and port selection. This
# is important to prevent spurious results from lost packets, and malicious
# cache poisoning. This doesn't matter if you are behind a caching nameserver
# or your app is a primary DNS server only. To install your own generator,
# replace DNS.Base.random. SystemRandom uses /dev/urandom or similar source.
#
try:
from random import SystemRandom
random = SystemRandom()
except:
import random
class DNSError(Exception): pass
class ArgumentError(DNSError): pass
class SocketError(DNSError): pass
class TimeoutError(DNSError): pass
class ServerError(DNSError):
def __init__(self, message, rcode):
DNSError.__init__(self, message, rcode)
self.message = message
self.rcode = rcode
class IncompleteReplyError(DNSError): pass
# Lib uses some of the above exception classes, so import after defining.
from . import Lib
defaults= { 'protocol':'udp', 'port':53, 'opcode':Opcode.QUERY,
'qtype':Type.A, 'rd':1, 'timing':1, 'timeout': 30, 'server_rotate': 0,
'server': [] }
def ParseResolvConf(resolv_path="/etc/resolv.conf"):
"parses the /etc/resolv.conf file and sets defaults for name servers"
with open(resolv_path, 'r') as stream:
return ParseResolvConfFromIterable(stream)
def ParseResolvConfFromIterable(lines):
"parses a resolv.conf formatted stream and sets defaults for name servers"
global defaults
for line in lines:
line = line.strip()
if not line or line[0]==';' or line[0]=='#':
continue
fields=line.split()
if len(fields) < 2:
continue
if fields[0]=='domain' and len(fields) > 1:
defaults['domain']=fields[1]
if fields[0]=='search':
pass
if fields[0]=='options':
pass
if fields[0]=='sortlist':
pass
if fields[0]=='nameserver':
defaults['server'].append(fields[1])
def _DiscoverNameServers():
import sys
if sys.platform in ('win32', 'nt'):
from . import win32dns
defaults['server']=win32dns.RegistryResolve()
elif sys.platform == 'darwin':
ParseOSXSysConfig()
else:
return ParseResolvConf()
def DiscoverNameServers():
"""Don't call, only here for backward compatability. We do discovery for
you automatically.
"""
pass
class DnsRequest:
""" high level Request object """
def __init__(self,*name,**args):
self.donefunc=None
self.py3async=None
self.defaults = {}
self.argparse(name,args)
self.defaults = self.args
self.tid = 0
self.resulttype = ''
if len(self.defaults['server']) == 0:
raise DNSError('No working name servers discovered')
def argparse(self,name,args):
if not name and 'name' in self.defaults:
args['name'] = self.defaults['name']
if type(name) is bytes or type(name) is str:
args['name']=name
else:
if len(name) == 1:
if name[0]:
args['name']=name[0]
if defaults['server_rotate'] and \
type(defaults['server']) == types.ListType:
defaults['server'] = defaults['server'][1:]+defaults['server'][:1]
for i in list(defaults.keys()):
if i not in args:
if i in self.defaults:
args[i]=self.defaults[i]
else:
args[i]=defaults[i]
if type(args['server']) == bytes or type(args['server']) == str:
args['server'] = [args['server']]
self.args=args
def socketInit(self,a,b):
self.s = socket.socket(a,b)
def processUDPReply(self):
if self.timeout > 0:
r,w,e = select.select([self.s],[],[],self.timeout)
if not len(r):
raise TimeoutError('Timeout')
(self.reply, self.from_address) = self.s.recvfrom(65535)
self.time_finish=time.time()
self.args['server']=self.ns
return self.processReply()
def _readall(self,f,count):
res = f.read(count)
while len(res) < count:
if self.timeout > 0:
# should we restart timeout everytime we get a dribble of data?
rem = self.time_start + self.timeout - time.time()
if rem <= 0: raise DNSError('Timeout')
self.s.settimeout(rem)
buf = f.read(count - len(res))
if not buf:
raise DNSError('incomplete reply - %d of %d read' % (len(res),count))
res += buf
return res
def processTCPReply(self):
if self.timeout > 0:
self.s.settimeout(self.timeout)
else:
self.s.settimeout(None)
f = self.s.makefile('rb')
try:
header = self._readall(f,2)
count = Lib.unpack16bit(header)
self.reply = self._readall(f,count)
finally:
f.close()
self.time_finish=time.time()
self.args['server']=self.ns
return self.processReply()
def processReply(self):
self.args['elapsed']=(self.time_finish-self.time_start)*1000
if not self.resulttype:
u = Lib.Munpacker(self.reply)
elif self.resulttype == 'default':
u = Lib.MunpackerDefault(self.reply)
elif self.resulttype == 'binary':
u = Lib.MunpackerBinary(self.reply)
elif self.resulttype == 'text':
u = Lib.MunpackerText(self.reply)
elif self.resulttype == 'integer':
u = Lib.MunpackerInteger(self.reply)
else:
raise SyntaxError('Unknown resulttype: ' + self.resulttype)
r=Lib.DnsResult(u,self.args)
r.args=self.args
#self.args=None # mark this DnsRequest object as used.
return r
#### TODO TODO TODO ####
# if protocol == 'tcp' and qtype == Type.AXFR:
# while 1:
# header = f.read(2)
# if len(header) < 2:
# print '========== EOF =========='
# break
# count = Lib.unpack16bit(header)
# if not count:
# print '========== ZERO COUNT =========='
# break
# print '========== NEXT =========='
# reply = f.read(count)
# if len(reply) != count:
# print '*** Incomplete reply ***'
# break
# u = Lib.Munpacker(reply)
# Lib.dumpM(u)
def getSource(self):
"Pick random source port to avoid DNS cache poisoning attack."
while True:
try:
source_port = random.randint(1024,65535)
self.s.bind(('', source_port))
break
except socket.error as msg:
# errno.EADDRINUSE, 'Address already in use'
if msg.errno != errno.EADDRINUSE: raise
def conn(self):
self.getSource()
self.s.connect((self.ns,self.port))
def qry(self,*name,**args):
'''
Request function for the DnsRequest class. In addition to standard
DNS args, the special pydns arg 'resulttype' can optionally be passed.
Valid resulttypes are 'default', 'text', 'decimal', and 'binary'.
Defaults are configured to be compatible with pydns:
AAAA: decimal
Others: text
'''
" needs a refactoring "
self.argparse(name,args)
#if not self.args:
# raise ArgumentError, 'reinitialize request before reuse'
protocol = self.args['protocol']
self.port = self.args['port']
self.tid = random.randint(0,65535)
self.timeout = self.args['timeout'];
opcode = self.args['opcode']
rd = self.args['rd']
server=self.args['server']
if 'resulttype' in self.args:
self.resulttype = self.args['resulttype']
else:
self.resulttype = 'default'
if type(self.args['qtype']) == bytes or type(self.args['qtype']) == str:
try:
qtype = getattr(Type, str(self.args['qtype'].upper()))
except AttributeError:
raise ArgumentError('unknown query type')
else:
qtype = self.args['qtype']
if 'name' not in self.args:
print((self.args))
raise ArgumentError('nothing to lookup')
qname = self.args['name']
if qtype == Type.AXFR and protocol != 'tcp':
print('Query type AXFR, protocol forced to TCP')
protocol = 'tcp'
#print('QTYPE %d(%s)' % (qtype, Type.typestr(qtype)))
m = Lib.Mpacker()
# jesus. keywords and default args would be good. TODO.
m.addHeader(self.tid,
0, opcode, 0, 0, rd, 0, 0, 0,
1, 0, 0, 0)
m.addQuestion(qname, qtype, Class.IN)
self.request = m.getbuf()
try:
if protocol == 'udp':
self.sendUDPRequest(server)
else:
self.sendTCPRequest(server)
except socket.error as reason:
raise SocketError(reason)
if self.py3async:
return None
else:
return self.response
def req(self,*name,**args):
" needs a refactoring "
self.argparse(name,args)
#if not self.args:
# raise ArgumentError, 'reinitialize request before reuse'
try:
if self.args['resulttype']:
raise ArgumentError('Restulttype {0} set with DNS.req, use DNS.qry to specify result type.'.format(self.args['resulttype']))
except:
# resulttype isn't set and that's what we want for DNS.req
pass
protocol = self.args['protocol']
self.port = self.args['port']
self.tid = random.randint(0,65535)
self.timeout = self.args['timeout'];
opcode = self.args['opcode']
rd = self.args['rd']
server=self.args['server']
if type(self.args['qtype']) == bytes or type(self.args['qtype']) == str:
try:
qtype = getattr(Type, str(self.args['qtype'].upper()))
except AttributeError:
raise ArgumentError('unknown query type')
else:
qtype = self.args['qtype']
if 'name' not in self.args:
print((self.args))
raise ArgumentError('nothing to lookup')
qname = self.args['name']
if qtype == Type.AXFR and protocol != 'tcp':
print('Query type AXFR, protocol forced to TCP')
protocol = 'tcp'
#print('QTYPE %d(%s)' % (qtype, Type.typestr(qtype)))
m = Lib.Mpacker()
# jesus. keywords and default args would be good. TODO.
m.addHeader(self.tid,
0, opcode, 0, 0, rd, 0, 0, 0,
1, 0, 0, 0)
m.addQuestion(qname, qtype, Class.IN)
self.request = m.getbuf()
try:
if protocol == 'udp':
self.sendUDPRequest(server)
else:
self.sendTCPRequest(server)
except socket.error as reason:
raise SocketError(reason)
if self.py3async:
return None
else:
return self.response
def sendUDPRequest(self, server):
"refactor me"
first_socket_error = None
self.response=None
for self.ns in server:
try:
if self.ns.count(':'):
if hasattr(socket,'has_ipv6') and socket.has_ipv6:
self.socketInit(socket.AF_INET6, socket.SOCK_DGRAM)
else: continue
else:
self.socketInit(socket.AF_INET, socket.SOCK_DGRAM)
try:
# TODO. Handle timeouts &c correctly (RFC)
self.time_start=time.time()
self.conn()
if not self.py3async:
self.s.send(self.request)
r=self.processUDPReply()
# Since we bind to the source port and connect to the
# destination port, we don't need to check that here,
# but do make sure it's actually a DNS request that the
# packet is in reply to.
while r.header['id'] != self.tid \
or self.from_address[1] != self.port:
r=self.processUDPReply()
self.response = r
# FIXME: check waiting async queries
finally:
if not self.py3async:
self.s.close()
except socket.error as e:
# Keep trying more nameservers, but preserve the first error
# that occurred so it can be reraised in case none of the
# servers worked:
first_socket_error = first_socket_error or e
continue
except TimeoutError as t:
first_socket_error = first_socket_error or t
continue
if self.response:
break
if not self.response and first_socket_error:
raise first_socket_error
def sendTCPRequest(self, server):
" do the work of sending a TCP request "
first_socket_error = None
self.response=None
for self.ns in server:
#print "trying tcp",self.ns
try:
if self.ns.count(':'):
if hasattr(socket,'has_ipv6') and socket.has_ipv6:
self.socketInit(socket.AF_INET6, socket.SOCK_STREAM)
else: continue
else:
self.socketInit(socket.AF_INET, socket.SOCK_STREAM)
try:
# TODO. Handle timeouts &c correctly (RFC)
self.time_start=time.time()
self.conn()
buf = Lib.pack16bit(len(self.request))+self.request
# Keep server from making sendall hang
self.s.setblocking(0)
# FIXME: throws WOULDBLOCK if request too large to fit in
# system buffer
self.s.sendall(buf)
# SHUT_WR breaks blocking IO with google DNS (8.8.8.8)
#self.s.shutdown(socket.SHUT_WR)
r=self.processTCPReply()
if r.header['id'] == self.tid:
self.response = r
break
finally:
self.s.close()
except socket.error as e:
first_socket_error = first_socket_error or e
continue
except TimeoutError as t:
first_socket_error = first_socket_error or t
continue
if self.response:
break
if not self.response and first_socket_error:
raise first_socket_error
#class DnsAsyncRequest(DnsRequest):
class DnsAsyncRequest(DnsRequest,asyncore.dispatcher_with_send):
" an asynchronous request object. out of date, probably broken "
def __init__(self,*name,**args):
DnsRequest.__init__(self, *name, **args)
# XXX todo
if 'done' in args and args['done']:
self.donefunc=args['done']
else:
self.donefunc=self.showResult
#self.realinit(name,args) # XXX todo
self.py3async=1
def conn(self):
self.getSource()
self.connect((self.ns,self.port))
self.time_start=time.time()
if 'start' in self.args and self.args['start']:
asyncore.dispatcher.go(self)
def socketInit(self,a,b):
self.create_socket(a,b)
asyncore.dispatcher.__init__(self)
self.s=self
def handle_read(self):
if self.args['protocol'] == 'udp':
self.response=self.processUDPReply()
if self.donefunc:
self.donefunc(*(self,))
def handle_connect(self):
self.send(self.request)
def handle_write(self):
pass
def showResult(self,*s):
self.response.show()
def ParseOSXSysConfig():
"Retrieves the current Mac OS X resolver settings using the scutil(8) command."
import os, re
scutil = os.popen('/usr/sbin/scutil --dns', 'r')
res_re = re.compile('^\s+nameserver[]0-9[]*\s*\:\s*(\S+)$')
sets = [ ]
currentset = None
while True:
l = scutil.readline()
if not l:
break
l = l.rstrip()
if len(l) < 1 or l[0] not in string.whitespace:
currentset = None
continue
m = res_re.match(l)
if m:
if currentset is None:
currentset = [ ]
sets.append(currentset)
currentset.append(m.group(1))
scutil.close()
# Someday: Figure out if we should do something other than simply concatenate the sets.
for currentset in sets:
defaults['server'].extend(currentset)

35
DNS/Class.py Normal file
View File

@ -0,0 +1,35 @@
"""
$Id$
This file is part of the py3dns project.
Homepage: https://launchpad.net/py3dns
This code is covered by the standard Python License. See LICENSE for details.
CLASS values (section 3.2.4)
"""
IN = 1 # the Internet
CS = 2 # the CSNET class (Obsolete - used only for examples in
# some obsolete RFCs)
CH = 3 # the CHAOS class. When someone shows me python running on
# a Symbolics Lisp machine, I'll look at implementing this.
HS = 4 # Hesiod [Dyer 87]
# QCLASS values (section 3.2.5)
ANY = 255 # any class
# Construct reverse mapping dictionary
_names = dir()
classmap = {}
for _name in _names:
if _name[0] != '_': classmap[eval(_name)] = _name
def classstr(klass):
if klass in classmap: return classmap[klass]
else: return repr(klass)

809
DNS/Lib.py Normal file
View File

@ -0,0 +1,809 @@
# -*- encoding: utf-8 -*-
"""
$Id$
This file is part of the py3dns project.
Homepage: https://launchpad.net/py3dns
This code is covered by the standard Python License. See LICENSE for details.
Changes for Python3 port © 2011-13 Scott Kitterman <scott@kitterman.com>
Library code. Largely this is packers and unpackers for various types.
"""
#
#
# See RFC 1035:
# ------------------------------------------------------------------------
# Network Working Group P. Mockapetris
# Request for Comments: 1035 ISI
# November 1987
# Obsoletes: RFCs 882, 883, 973
#
# DOMAIN NAMES - IMPLEMENTATION AND SPECIFICATION
# ------------------------------------------------------------------------
import types
import socket
from . import Type
from . import Class
from . import Opcode
from . import Status
import DNS
from .Base import DNSError
try:
import ipaddress
except ImportError:
import ipaddr as ipaddress
LABEL_UTF8 = False
LABEL_ENCODING = 'idna'
class UnpackError(DNSError): pass
class PackError(DNSError): pass
# Low-level 16 and 32 bit integer packing and unpacking
from struct import pack as struct_pack
from struct import unpack as struct_unpack
from socket import inet_ntoa, inet_aton, inet_ntop, AF_INET6
def pack16bit(n):
return struct_pack('!H', n)
def pack32bit(n):
return struct_pack('!L', n)
def unpack16bit(s):
return struct_unpack('!H', s)[0]
def unpack32bit(s):
return struct_unpack('!L', s)[0]
def addr2bin(addr):
# Updated from pyspf
"""Convert a string IPv4 address into an unsigned integer.
Examples::
>>> addr2bin('127.0.0.1')
2130706433
>>> addr2bin('127.0.0.1') == socket.INADDR_LOOPBACK
1
>>> addr2bin('255.255.255.254')
4294967294L
>>> addr2bin('192.168.0.1')
3232235521L
Unlike old DNS.addr2bin, the n, n.n, and n.n.n forms for IP addresses
are handled as well::
>>> addr2bin('10.65536')
167837696
>>> 10 * (2 ** 24) + 65536
167837696
>>> addr2bin('10.93.512')
173867520
>>> 10 * (2 ** 24) + 93 * (2 ** 16) + 512
173867520
"""
return struct_unpack("!L", inet_aton(addr))[0]
def bin2addr(n):
return inet_ntoa(struct_pack('!L', n))
def bin2addr6(n):
return inet_ntop(AF_INET6, n)
def bin2long6(str):
# Also from pyspf
h, l = struct_unpack("!QQ", str)
return h << 64 | l
# Packing class
class Packer:
" packer base class. supports basic byte/16bit/32bit/addr/string/name "
def __init__(self):
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
self.buf = bytes('', enc)
self.index = {}
def getbuf(self):
return self.buf
def addbyte(self, c):
if len(c) != 1: raise TypeError('one character expected')
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
self.buf = self.buf + bytes(c,enc)
def addbytes(self, abytes):
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
self.buf = self.buf + bytes(abytes, enc)
def add16bit(self, n):
self.buf = self.buf + bytes(pack16bit(n))
def add32bit(self, n):
self.buf = self.buf + bytes(pack32bit(n))
def addaddr(self, addr):
n = addr2bin(addr)
self.buf = self.buf + bytes(pack32bit(n))
def addstring(self, s):
if len(s) > 255:
raise ValueError("Can't encode string of length "+ \
"%s (> 255)"%(len(s)))
self.addbyte(chr(len(s)))
self.addbytes(s)
def addname(self, name):
# Domain name packing (section 4.1.4)
# Add a domain name to the buffer, possibly using pointers.
# The case of the first occurrence of a name is preserved.
# Redundant dots are ignored.
nlist = []
for label in name.split('.'):
if not label:
pass # Passing to ignore redundant dots per comments
else:
nlist.append(label)
keys = []
for i in range(len(nlist)):
key = '.'.join(nlist[i:])
key = key.upper()
keys.append(key)
if key in self.index:
pointer = self.index[key]
break
else:
i = len(nlist)
pointer = None
# Do it into temporaries first so exceptions don't
# mess up self.index and self.buf
offset = len(self.buf)
index = []
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
buf = bytes('', enc)
for j in range(i):
label = nlist[j]
try:
label = label.encode(enc)
except UnicodeEncodeError:
if not DNS.LABEL_UTF8: raise
if not label.startswith('\ufeff'):
label = '\ufeff'+label
label = label.encode(enc)
n = len(label)
if n > 63:
raise PackError('label too long')
if offset + len(buf) < 0x3FFF:
index.append((keys[j], offset + len(buf)))
else:
print('DNS.Lib.Packer.addname:')
print('warning: pointer too big')
buf = buf + bytes([n]) + label
if pointer:
buf = buf + (pack16bit(pointer | 0xC000))
else:
buf = buf + bytes('\0', enc)
self.buf = self.buf + buf
for key, value in index:
self.index[key] = value
def dump(self):
keys = list(self.index.keys())
keys.sort()
print('-'*40)
for key in keys:
print('%20s %3d' % (key, self.index[key]))
print('-'*40)
space = 1
for i in range(0, len(self.buf)+1, 2):
if self.buf[i:i+2] == '**':
if not space: print()
space = 1
continue
space = 0
print('%4d' % i)
for c in self.buf[i:i+2]:
if ' ' < c < '\177':
print(' %c' % c)
else:
print('%2d' % ord(c))
print()
print('-'*40)
# Unpacking class
class Unpacker:
def __init__(self, buf):
# buf should be binary in Python3
self.buf = buf
self.offset = 0
def getbyte(self):
if self.offset >= len(self.buf):
raise UnpackError("Ran off end of data")
c = self.buf[self.offset]
self.offset = self.offset + 1
return c
def getbytes(self, n):
s = (self.buf[self.offset : self.offset + n])
if len(s) != n: raise UnpackError('not enough data left')
self.offset = self.offset + n
return s
def get16bit(self):
return unpack16bit(self.getbytes(2))
def get32bit(self):
return unpack32bit(self.getbytes(4))
def getaddr(self):
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
return bytes(bin2addr(self.get32bit()),enc)
def getaddr6(self):
return (self.getbytes(16))
def getstring(self):
return self.getbytes(self.getbyte())
def getname(self):
# Domain name unpacking (section 4.1.4)
i = self.getbyte()
#i = ord(i)
if i and i & 0xC0 == 0xC0:
d = self.getbyte()
j = d
pointer = ((i<<8) | j) & ~0xC000
save_offset = self.offset
try:
self.offset = pointer
domain = self.getname()
finally:
self.offset = save_offset
return domain
if i == 0:
return ''
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
domain = str(self.getbytes(i), enc)
remains = self.getname()
if not remains:
return domain
else:
return domain + '.' + remains
# Test program for packin/unpacking (section 4.1.4)
def testpacker():
N = 2500
R = list(range(N))
import timing
# See section 4.1.4 of RFC 1035
timing.start()
for i in R:
p = Packer()
p.addaddr('192.168.0.1')
p.addbytes('*' * 20)
p.addname('f.ISI.ARPA')
p.addbytes('*' * 8)
p.addname('Foo.F.isi.arpa')
p.addbytes('*' * 18)
p.addname('arpa')
p.addbytes('*' * 26)
p.addname('')
timing.finish()
print(timing.milli(), "ms total for packing")
print(round(timing.milli() / i, 4), 'ms per packing')
#p.dump()
u = Unpacker(p.buf)
u.getaddr()
u.getbytes(20)
u.getname()
u.getbytes(8)
u.getname()
u.getbytes(18)
u.getname()
u.getbytes(26)
u.getname()
timing.start()
for i in R:
u = Unpacker(p.buf)
res = (u.getaddr(),
u.getbytes(20),
u.getname(),
u.getbytes(8),
u.getname(),
u.getbytes(18),
u.getname(),
u.getbytes(26),
u.getname())
timing.finish()
print(timing.milli(), "ms total for unpacking")
print(round(timing.milli() / i, 4), 'ms per unpacking')
#for item in res: print item
# Pack/unpack RR toplevel format (section 3.2.1)
class RRpacker(Packer):
def __init__(self):
Packer.__init__(self)
self.rdstart = None
def addRRheader(self, name, RRtype, klass, ttl, *rest):
self.addname(name)
self.add16bit(RRtype)
self.add16bit(klass)
self.add32bit(ttl)
if rest:
if rest[1:]: raise TypeError('too many args')
rdlength = rest[0]
else:
rdlength = 0
self.add16bit(rdlength)
self.rdstart = len(self.buf)
def patchrdlength(self):
rdlength = unpack16bit(self.buf[self.rdstart-2:self.rdstart])
if rdlength == len(self.buf) - self.rdstart:
return
rdata = self.buf[self.rdstart:]
save_buf = self.buf
ok = 0
try:
self.buf = self.buf[:self.rdstart-2]
self.add16bit(len(rdata))
self.buf = self.buf + rdata
ok = 1
finally:
if not ok: self.buf = save_buf
def endRR(self):
if self.rdstart is not None:
self.patchrdlength()
self.rdstart = None
def getbuf(self):
if self.rdstart is not None: self.patchrdlength()
return Packer.getbuf(self)
# Standard RRs (section 3.3)
def addCNAME(self, name, klass, ttl, cname):
self.addRRheader(name, Type.CNAME, klass, ttl)
self.addname(cname)
self.endRR()
def addHINFO(self, name, klass, ttl, cpu, os):
self.addRRheader(name, Type.HINFO, klass, ttl)
self.addstring(cpu)
self.addstring(os)
self.endRR()
def addMX(self, name, klass, ttl, preference, exchange):
self.addRRheader(name, Type.MX, klass, ttl)
self.add16bit(preference)
self.addname(exchange)
self.endRR()
def addNS(self, name, klass, ttl, nsdname):
self.addRRheader(name, Type.NS, klass, ttl)
self.addname(nsdname)
self.endRR()
def addPTR(self, name, klass, ttl, ptrdname):
self.addRRheader(name, Type.PTR, klass, ttl)
self.addname(ptrdname)
self.endRR()
def addSOA(self, name, klass, ttl,
mname, rname, serial, refresh, retry, expire, minimum):
self.addRRheader(name, Type.SOA, klass, ttl)
self.addname(mname)
self.addname(rname)
self.add32bit(serial)
self.add32bit(refresh)
self.add32bit(retry)
self.add32bit(expire)
self.add32bit(minimum)
self.endRR()
def addTXT(self, name, klass, ttl, tlist):
self.addRRheader(name, Type.TXT, klass, ttl)
if type(tlist) is bytes or type(tlist) is str:
tlist = [tlist]
for txtdata in tlist:
self.addstring(txtdata)
self.endRR()
def addSPF(self, name, klass, ttl, tlist):
self.addRRheader(name, Type.TXT, klass, ttl)
if type(tlist) is bytes or type(tlist) is str:
tlist = [tlist]
for txtdata in tlist:
self.addstring(txtdata)
self.endRR()
# Internet specific RRs (section 3.4) -- class = IN
def addA(self, name, klass, ttl, address):
self.addRRheader(name, Type.A, klass, ttl)
self.addaddr(address)
self.endRR()
def addWKS(self, name, ttl, address, protocol, bitmap):
self.addRRheader(name, Type.WKS, Class.IN, ttl)
self.addaddr(address)
self.addbyte(chr(protocol))
self.addbytes(bitmap)
self.endRR()
def addSRV(self):
raise NotImplementedError
def prettyTime(seconds):
if seconds<60:
return seconds,"%d seconds"%(seconds)
if seconds<3600:
return seconds,"%d minutes"%(seconds/60)
if seconds<86400:
return seconds,"%d hours"%(seconds/3600)
if seconds<604800:
return seconds,"%d days"%(seconds/86400)
else:
return seconds,"%d weeks"%(seconds/604800)
class RRunpacker(Unpacker):
def __init__(self, buf):
Unpacker.__init__(self, buf)
self.rdend = None
def getRRheader(self):
name = self.getname()
rrtype = self.get16bit()
klass = self.get16bit()
ttl = self.get32bit()
rdlength = self.get16bit()
self.rdend = self.offset + rdlength
return (name, rrtype, klass, ttl, rdlength)
def endRR(self):
if self.offset != self.rdend:
raise UnpackError('end of RR not reached')
def getCNAMEdata(self):
return self.getname()
def getHINFOdata(self):
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
return str(self.getstring(), enc), str(self.getstring(),enc)
def getMXdata(self):
return self.get16bit(), self.getname()
def getNSdata(self):
return self.getname()
def getPTRdata(self):
return self.getname()
def getSOAdata(self):
return self.getname(), \
self.getname(), \
('serial',)+(self.get32bit(),), \
('refresh ',)+prettyTime(self.get32bit()), \
('retry',)+prettyTime(self.get32bit()), \
('expire',)+prettyTime(self.get32bit()), \
('minimum',)+prettyTime(self.get32bit())
def getTXTdata(self):
tlist = []
while self.offset != self.rdend:
tlist.append(bytes(self.getstring()))
return tlist
getSPFdata = getTXTdata
def getAdata(self):
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
return self.getaddr().decode(enc)
def getWKSdata(self):
address = self.getaddr()
protocol = ord(self.getbyte())
bitmap = self.getbytes(self.rdend - self.offset)
return address, protocol, bitmap
def getSRVdata(self):
"""
_Service._Proto.Name TTL Class SRV Priority Weight Port Target
"""
priority = self.get16bit()
weight = self.get16bit()
port = self.get16bit()
target = self.getname()
#print '***priority, weight, port, target', priority, weight, port, target
return priority, weight, port, target
class RRunpackerDefault(RRunpacker):
# Default for DNS.qry
def __init__(self, buf):
RRunpacker.__init__(self, buf)
self.rdend = None
def getAdata(self):
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
x = socket.inet_aton(self.getaddr().decode(enc))
return ipaddress.IPv4Address(struct_unpack("!I", x)[0])
def getAAAAdata(self):
return ipaddress.IPv6Address(bin2addr6(self.getaddr6()))
class RRunpackerText(RRunpackerDefault):
def __init__(self, buf):
RRunpackerDefault.__init__(self, buf)
def getAdata(self):
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
return self.getaddr().decode(enc)
def getAAAAdata(self):
return bin2addr6(self.getaddr6())
def getTXTdata(self):
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
tlist = []
while self.offset != self.rdend:
tlist.append(str(self.getstring(), enc))
return tlist
class RRunpackerInteger(RRunpackerDefault):
def __init__(self, buf):
RRunpackerDefault.__init__(self, buf)
def getAdata(self):
if DNS.LABEL_UTF8:
enc = 'utf8'
else:
enc = DNS.LABEL_ENCODING
x = socket.inet_aton(self.getaddr().decode(enc))
return struct_unpack("!I", x)[0]
def getAAAAdata(self):
return bin2long6(self.getaddr6())
class RRunpackerBinary(Unpacker):
def __init__(self, buf):
Unpacker.__init__(self, buf)
self.rdend = None
def getRRheader(self):
name = self.getname()
rrtype = self.get16bit()
klass = self.get16bit()
ttl = self.get32bit()
rdlength = self.get16bit()
self.rdlength = rdlength
self.rdend = self.offset + rdlength
return (name, rrtype, klass, ttl, rdlength)
def endRR(self):
if self.offset != self.rdend:
raise UnpackError('end of RR not reached')
def getTXTdata(self):
tlist = []
while self.offset != self.rdend:
tlist.append(self.getbytes(self.rdlength))
return tlist
getSPFdata = getTXTdata
# Pack/unpack Message Header (section 4.1)
class Hpacker(Packer):
def addHeader(self, id, qr, opcode, aa, tc, rd, ra, z, rcode,
qdcount, ancount, nscount, arcount):
self.add16bit(id)
self.add16bit((qr&1)<<15 | (opcode&0xF)<<11 | (aa&1)<<10
| (tc&1)<<9 | (rd&1)<<8 | (ra&1)<<7
| (z&7)<<4 | (rcode&0xF))
self.add16bit(qdcount)
self.add16bit(ancount)
self.add16bit(nscount)
self.add16bit(arcount)
class Hunpacker(Unpacker):
def getHeader(self):
id = self.get16bit()
flags = self.get16bit()
qr, opcode, aa, tc, rd, ra, z, rcode = (
(flags>>15)&1,
(flags>>11)&0xF,
(flags>>10)&1,
(flags>>9)&1,
(flags>>8)&1,
(flags>>7)&1,
(flags>>4)&7,
(flags>>0)&0xF)
qdcount = self.get16bit()
ancount = self.get16bit()
nscount = self.get16bit()
arcount = self.get16bit()
return (id, qr, opcode, aa, tc, rd, ra, z, rcode,
qdcount, ancount, nscount, arcount)
# Pack/unpack Question (section 4.1.2)
class Qpacker(Packer):
def addQuestion(self, qname, qtype, qclass):
self.addname(qname)
self.add16bit(qtype)
self.add16bit(qclass)
class Qunpacker(Unpacker):
def getQuestion(self):
return self.getname(), self.get16bit(), self.get16bit()
# Pack/unpack Message(section 4)
# NB the order of the base classes is important for __init__()!
class Mpacker(RRpacker, Qpacker, Hpacker):
pass
class Munpacker(RRunpacker, Qunpacker, Hunpacker):
# Default results for DNS.req
pass
class MunpackerDefault(RRunpackerDefault, Qunpacker, Hunpacker):
# Default results for DNS.qry
pass
class MunpackerText(RRunpackerText, Qunpacker, Hunpacker):
pass
class MunpackerBinary(RRunpackerBinary, Qunpacker, Hunpacker):
pass
class MunpackerInteger(RRunpackerInteger, Qunpacker, Hunpacker):
pass
# Routines to print an unpacker to stdout, for debugging.
# These affect the unpacker's current position!
def dumpM(u):
print('HEADER:')
(id, qr, opcode, aa, tc, rd, ra, z, rcode,
qdcount, ancount, nscount, arcount) = u.getHeader()
print('id=%d,' % id)
print('qr=%d, opcode=%d, aa=%d, tc=%d, rd=%d, ra=%d, z=%d, rcode=%d,' \
% (qr, opcode, aa, tc, rd, ra, z, rcode))
if tc: print('*** response truncated! ***')
if rcode: print('*** nonzero error code! (%d) ***' % rcode)
print(' qdcount=%d, ancount=%d, nscount=%d, arcount=%d' \
% (qdcount, ancount, nscount, arcount))
for i in range(qdcount):
print('QUESTION %d:' % i)
dumpQ(u)
for i in range(ancount):
print('ANSWER %d:' % i)
dumpRR(u)
for i in range(nscount):
print('AUTHORITY RECORD %d:' % i)
dumpRR(u)
for i in range(arcount):
print('ADDITIONAL RECORD %d:' % i)
dumpRR(u)
class DnsResult:
def __init__(self,u,args):
self.header={}
self.questions=[]
self.answers=[]
self.authority=[]
self.additional=[]
self.args=args
self.storeM(u)
def show(self):
import time
print('; <<>> PDG.py 1.0 <<>> %s %s'%(self.args['name'],
self.args['qtype']))
opt=""
if self.args['rd']:
opt=opt+'recurs '
h=self.header
print(';; options: '+opt)
print(';; got answer:')
print(';; ->>HEADER<<- opcode %s, status %s, id %d'%(
h['opcode'],h['status'],h['id']))
flags=list(filter(lambda x,h=h:h[x],('qr','aa','rd','ra','tc')))
print(';; flags: %s; Ques: %d, Ans: %d, Auth: %d, Addit: %d'%(
' '.join(flags),h['qdcount'],h['ancount'],h['nscount'],
h['arcount']))
print(';; QUESTIONS:')
for q in self.questions:
print(';; %s, type = %s, class = %s'%(q['qname'],q['qtypestr'],
q['qclassstr']))
print()
print(';; ANSWERS:')
for a in self.answers:
print('%-20s %-6s %-6s %s'%(a['name'],repr(a['ttl']),a['typename'],
a['data']))
print()
print(';; AUTHORITY RECORDS:')
for a in self.authority:
print('%-20s %-6s %-6s %s'%(a['name'],repr(a['ttl']),a['typename'],
a['data']))
print()
print(';; ADDITIONAL RECORDS:')
for a in self.additional:
print('%-20s %-6s %-6s %s'%(a['name'],repr(a['ttl']),a['typename'],
a['data']))
print()
if 'elapsed' in self.args:
print(';; Total query time: %d msec'%self.args['elapsed'])
print(';; To SERVER: %s'%(self.args['server']))
print(';; WHEN: %s'%time.ctime(time.time()))
def storeM(self,u):
(self.header['id'], self.header['qr'], self.header['opcode'],
self.header['aa'], self.header['tc'], self.header['rd'],
self.header['ra'], self.header['z'], self.header['rcode'],
self.header['qdcount'], self.header['ancount'],
self.header['nscount'], self.header['arcount']) = u.getHeader()
self.header['opcodestr']=Opcode.opcodestr(self.header['opcode'])
self.header['status']=Status.statusstr(self.header['rcode'])
for i in range(self.header['qdcount']):
#print 'QUESTION %d:' % i,
self.questions.append(self.storeQ(u))
for i in range(self.header['ancount']):
#print 'ANSWER %d:' % i,
self.answers.append(self.storeRR(u))
for i in range(self.header['nscount']):
#print 'AUTHORITY RECORD %d:' % i,
self.authority.append(self.storeRR(u))
for i in range(self.header['arcount']):
#print 'ADDITIONAL RECORD %d:' % i,
self.additional.append(self.storeRR(u))
def storeQ(self,u):
q={}
q['qname'], q['qtype'], q['qclass'] = u.getQuestion()
q['qtypestr']=Type.typestr(q['qtype'])
q['qclassstr']=Class.classstr(q['qclass'])
return q
def storeRR(self,u):
r={}
r['name'],r['type'],r['class'],r['ttl'],r['rdlength'] = u.getRRheader()
r['typename'] = Type.typestr(r['type'])
r['classstr'] = Class.classstr(r['class'])
#print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \
# % (name,
# type, typename,
# klass, Class.classstr(class),
# ttl)
mname = 'get%sdata' % r['typename']
if hasattr(u, mname):
r['data']=getattr(u, mname)()
else:
r['data']=u.getbytes(r['rdlength'])
return r
def dumpQ(u):
qname, qtype, qclass = u.getQuestion()
print('qname=%s, qtype=%d(%s), qclass=%d(%s)' \
% (qname,
qtype, Type.typestr(qtype),
qclass, Class.classstr(qclass)))
def dumpRR(u):
name, type, klass, ttl, rdlength = u.getRRheader()
typename = Type.typestr(type)
print('name=%s, type=%d(%s), class=%d(%s), ttl=%d' \
% (name,
type, typename,
klass, Class.classstr(klass),
ttl))
mname = 'get%sdata' % typename
if hasattr(u, mname):
print(' formatted rdata:', getattr(u, mname)())
else:
print(' binary rdata:', u.getbytes(rdlength))
if __name__ == "__main__":
testpacker()

30
DNS/Opcode.py Normal file
View File

@ -0,0 +1,30 @@
"""
$Id$
This file is part of the py3dns project.
Homepage: https://launchpad.net/py3dns
This code is covered by the standard Python License. See LICENSE for details.
Opcode values in message header. RFC 1035, 1996, 2136.
"""
QUERY = 0
IQUERY = 1
STATUS = 2
NOTIFY = 4
UPDATE = 5
# Construct reverse mapping dictionary
_names = dir()
opcodemap = {}
for _name in _names:
if _name[0] != '_': opcodemap[eval(_name)] = _name
def opcodestr(opcode):
if opcode in opcodemap: return opcodemap[opcode]
else: return repr(opcode)

41
DNS/Status.py Normal file
View File

@ -0,0 +1,41 @@
"""
$Id$
This file is part of the py3dns project.
Homepage: https://launchpad.net/py3dns
This code is covered by the standard Python License. See LICENSE for details.
Status values in message header
"""
NOERROR = 0 # No Error [RFC 1035]
FORMERR = 1 # Format Error [RFC 1035]
SERVFAIL = 2 # Server Failure [RFC 1035]
NXDOMAIN = 3 # Non-Existent Domain [RFC 1035]
NOTIMP = 4 # Not Implemented [RFC 1035]
REFUSED = 5 # Query Refused [RFC 1035]
YXDOMAIN = 6 # Name Exists when it should not [RFC 2136]
YXRRSET = 7 # RR Set Exists when it should not [RFC 2136]
NXRRSET = 8 # RR Set that should exist does not [RFC 2136]
NOTAUTH = 9 # Server Not Authoritative for zone [RFC 2136]
NOTZONE = 10 # Name not contained in zone [RFC 2136]
BADVERS = 16 # Bad OPT Version [RFC 2671]
BADSIG = 16 # TSIG Signature Failure [RFC 2845]
BADKEY = 17 # Key not recognized [RFC 2845]
BADTIME = 18 # Signature out of time window [RFC 2845]
BADMODE = 19 # Bad TKEY Mode [RFC 2930]
BADNAME = 20 # Duplicate key name [RFC 2930]
BADALG = 21 # Algorithm not supported [RFC 2930]
# Construct reverse mapping dictionary
_names = dir()
statusmap = {}
for _name in _names:
if _name[0] != '_': statusmap[eval(_name)] = _name
def statusstr(status):
if status in statusmap: return statusmap[status]
else: return repr(status)

55
DNS/Type.py Normal file
View File

@ -0,0 +1,55 @@
# -*- encoding: utf-8 -*-
"""
$Id$
This file is part of the py3dns project.
Homepage: https://launchpad.net/py3dns
This code is covered by the standard Python License. See LICENSE for details.
TYPE values (section 3.2.2)
"""
A = 1 # a host address
NS = 2 # an authoritative name server
MD = 3 # a mail destination (Obsolete - use MX)
MF = 4 # a mail forwarder (Obsolete - use MX)
CNAME = 5 # the canonical name for an alias
SOA = 6 # marks the start of a zone of authority
MB = 7 # a mailbox domain name (EXPERIMENTAL)
MG = 8 # a mail group member (EXPERIMENTAL)
MR = 9 # a mail rename domain name (EXPERIMENTAL)
NULL = 10 # a null RR (EXPERIMENTAL)
WKS = 11 # a well known service description
PTR = 12 # a domain name pointer
HINFO = 13 # host information
MINFO = 14 # mailbox or mail list information
MX = 15 # mail exchange
TXT = 16 # text strings
AAAA = 28 # IPv6 AAAA records (RFC 1886)
SRV = 33 # DNS RR for specifying the location of services (RFC 2782)
SPF = 99 # TXT RR for Sender Policy Framework
# Additional TYPE values from host.c source
UNAME = 110
MP = 240
# QTYPE values (section 3.2.3)
AXFR = 252 # A request for a transfer of an entire zone
MAILB = 253 # A request for mailbox-related records (MB, MG or MR)
MAILA = 254 # A request for mail agent RRs (Obsolete - see MX)
ANY = 255 # A request for all records
# Construct reverse mapping dictionary
_names = dir()
typemap = {}
for _name in _names:
if _name[0] != '_': typemap[eval(_name)] = _name
def typestr(type):
if type in typemap: return typemap[type]
else: return repr(type)

38
DNS/__init__.py Normal file
View File

@ -0,0 +1,38 @@
# -*- encoding: utf-8 -*-
# $Id$
#
# This file is part of the py3dns project.
# Homepage: https://launchpad.net/py3dns
#
# Changes for Python3 port © 2011 Scott Kitterman <scott@kitterman.com>
#
# This code is covered by the standard Python License. See LICENSE for details.
# __init__.py for DNS class.
__version__ = '3.2.1'
try:
import ipaddress
except ImportError:
try:
import ipaddr as ipaddress
except ImportError:
raise Exception("py3dns 3.1 requires either ipaddress (python3.3) or ipaddr, see CHANGES for 3.1.0")
from . import Type
from . import Opcode
from . import Status
from . import Class
from .Base import DnsRequest
from .Base import DNSError
from .Lib import DnsResult
from .Base import *
from .Lib import *
Error=DNSError
from .lazy import *
Request = DnsRequest
Result = DnsResult
Base._DiscoverNameServers()

83
DNS/lazy.py Normal file
View File

@ -0,0 +1,83 @@
# $Id$
#
# This file is part of the pydns project.
# Homepage: http://pydns.sourceforge.net
#
# This code is covered by the standard Python License. See LICENSE for details.
#
# routines for lazy people.
from . import Base
from . Base import ServerError
class NoDataError(IndexError): pass
class StatusError(IndexError): pass
def revlookup(name,timeout=30):
"convenience routine for doing a reverse lookup of an address"
if Base.defaults['server'] == []: Base.DiscoverNameServers()
names = revlookupall(name, timeout)
if not names: return None
return names[0] # return shortest name
def revlookupall(name,timeout=30):
"convenience routine for doing a reverse lookup of an address"
# FIXME: check for IPv6
a = name.split('.')
a.reverse()
b = '.'.join(a)+'.in-addr.arpa'
qtype='ptr'
names = dnslookup(b, qtype, timeout)
# this will return all records.
names.sort(key=str.__len__)
return names
def dnslookup(name,qtype,timeout=30):
"convenience routine to return just answer data for any query type"
if Base.defaults['server'] == []: Base.DiscoverNameServers()
result = Base.DnsRequest(name=name, qtype=qtype).req(timeout=timeout)
if result.header['status'] != 'NOERROR':
raise ServerError("DNS query status: %s" % result.header['status'],
result.header['rcode'])
elif len(result.answers) == 0 and Base.defaults['server_rotate']:
# check with next DNS server
result = Base.DnsRequest(name=name, qtype=qtype).req(timeout=timeout)
if result.header['status'] != 'NOERROR':
raise ServerError("DNS query status: %s" % result.header['status'],
result.header['rcode'])
return [x['data'] for x in result.answers]
def mxlookup(name,timeout=30):
"""
convenience routine for doing an MX lookup of a name. returns a
sorted list of (preference, mail exchanger) records
"""
qtype = 'mx'
l = dnslookup(name, qtype, timeout)
return l
#
# $Log$
# Revision 1.5.2.1.2.2 2011/03/23 01:42:07 customdesigned
# Changes from 2.3 branch
#
# Revision 1.5.2.1.2.1 2011/02/18 19:35:22 customdesigned
# Python3 updates from Scott Kitterman
#
# Revision 1.5.2.1 2007/05/22 20:23:38 customdesigned
# Lazy call to DiscoverNameServers
#
# Revision 1.5 2002/05/06 06:14:38 anthonybaxter
# reformat, move import to top of file.
#
# Revision 1.4 2002/03/19 12:41:33 anthonybaxter
# tabnannied and reindented everything. 4 space indent, no tabs.
# yay.
#
# Revision 1.3 2001/08/09 09:08:55 anthonybaxter
# added identifying header to top of each file
#
# Revision 1.2 2001/07/19 06:57:07 anthony
# cvs keywords added
#
#

13
DNS/tests/__init__.py Normal file
View File

@ -0,0 +1,13 @@
import unittest
import importlib
def test_suite():
module_names = [
'.testPackers',
'.test_base'
]
suites = []
for m in module_names:
module = importlib.import_module(m, 'DNS.tests')
suites.append(module.test_suite())
return unittest.TestSuite(suites)

430
DNS/tests/testPackers.py Executable file
View File

@ -0,0 +1,430 @@
#!/usr/bin/python3
#
# Tests of the packet assembler/disassembler routines.
#
# only tests the simple packers for now. next is to test the
# classes: Hpacker/Hunpacker,
# Qpacker/Unpacker, then Mpacker/Munpacker
#
# Start doing unpleasant tests with broken data, truncations, that
# sort of thing.
import sys ; sys.path.insert(0, '..')
import DNS
import socket
import unittest
TestCompleted = "TestCompleted" # exc.
class Int16Packing(unittest.TestCase):
knownValues = ( ( 10, b'\x00\n'),
( 500, b'\x01\xf4' ),
( 5340, b'\x14\xdc' ),
( 51298, b'\xc8b'),
( 65535, b'\xff\xff'),
)
def test16bitPacking(self):
""" pack16bit should give known output for known input """
for i,s in self.knownValues:
result = DNS.Lib.pack16bit(i)
self.assertEqual(s,result)
def test16bitUnpacking(self):
""" unpack16bit should give known output for known input """
for i,s in self.knownValues:
result = DNS.Lib.unpack16bit(s)
self.assertEqual(i,result)
class Int32Packing(unittest.TestCase):
knownValues = ( ( 10, b'\x00\x00\x00\n'),
( 500, b'\x00\x00\x01\xf4' ),
( 5340, b'\x00\x00\x14\xdc' ),
( 51298, b'\x00\x00\xc8b'),
( 65535, b'\x00\x00\xff\xff'),
( 33265535, b'\x01\xfb\x97\x7f' ),
( 147483647, b'\x08\xcak\xff' ),
( 2147483647, b'\x7f\xff\xff\xff' ),
)
def test32bitPacking(self):
""" pack32bit should give known output for known input """
for i,s in self.knownValues:
result = DNS.Lib.pack32bit(i)
self.assertEqual(s,result)
def test32bitUnpacking(self):
""" unpack32bit should give known output for known input """
for i,s in self.knownValues:
result = DNS.Lib.unpack32bit(s)
self.assertEqual(i,result)
class IPaddrPacking(unittest.TestCase):
knownValues = (
('127.0.0.1', 2130706433 ),
('10.99.23.13', 174266125 ),
('192.35.59.45', 3223534381), # Not signed anymore - it's all long now.
('255.255.255.255', 4294967295) # No longer -1
)
def testIPaddrPacking(self):
""" addr2bin should give known output for known input """
for i,s in self.knownValues:
result = DNS.Lib.addr2bin(i)
self.assertEqual(s,result)
def testIPaddrUnpacking(self):
""" bin2addr should give known output for known input """
for i,s in self.knownValues:
result = DNS.Lib.bin2addr(s)
self.assertEqual(i,result)
class PackerClassPacking(unittest.TestCase):
knownPackValues = [
( ['www.ekit.com'], b'\x03www\x04ekit\x03com\x00' ),
( ['ns1.ekorp.com', 'ns2.ekorp.com', 'ns3.ekorp.com'],
b'\x03ns1\x05ekorp\x03com\x00\x03ns2\xc0\x04\x03ns3\xc0\x04'),
( ['a.root-servers.net.', 'b.root-servers.net.',
'c.root-servers.net.', 'd.root-servers.net.',
'e.root-servers.net.', 'f.root-servers.net.'],
b'\x01a\x0croot-servers\x03net\x00\x01b\xc0\x02\x01c\xc0'+
b'\x02\x01d\xc0\x02\x01e\xc0\x02\x01f\xc0\x02' ),
]
knownUnpackValues = [
( ['www.ekit.com'], b'\x03www\x04ekit\x03com\x00' ),
( ['ns1.ekorp.com', 'ns2.ekorp.com', 'ns3.ekorp.com'],
b'\x03ns1\x05ekorp\x03com\x00\x03ns2\xc0\x04\x03ns3\xc0\x04'),
( ['a.root-servers.net', 'b.root-servers.net',
'c.root-servers.net', 'd.root-servers.net',
'e.root-servers.net', 'f.root-servers.net'],
b'\x01a\x0croot-servers\x03net\x00\x01b\xc0\x02\x01c\xc0'+
b'\x02\x01d\xc0\x02\x01e\xc0\x02\x01f\xc0\x02' ),
]
def testPackNames(self):
from DNS.Lib import Packer
for namelist,result in self.knownPackValues:
p = Packer()
for n in namelist:
p.addname(n)
self.assertEqual(p.getbuf(),result)
def testUnpackNames(self):
from DNS.Lib import Unpacker
for namelist,result in self.knownUnpackValues:
u = Unpacker(result)
names = []
for i in range(len(namelist)):
n = u.getname()
names.append(n)
self.assertEqual(names, namelist)
""" def testUnpackerLimitCheck(self):
# FIXME: Don't understand what this test should do. If my guess is right,
# then the code is working ~OK.
from DNS.Lib import Unpacker
u=Unpacker(b'\x03ns1\x05ekorp\x03com\x00\x03ns2\xc0\x04\x03ns3\xc0\x04')
u.getname() ; u.getname() ; u.getname()
# 4th call should fail
self.assertRaises(IndexError, u.getname)"""
class testUnpackingMangled(unittest.TestCase):
"addA(self, name, klass, ttl, address)"
packerCorrect = b'\x05www02\x04ekit\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x02'
def testWithoutRR(self):
u = DNS.Lib.RRunpacker(self.packerCorrect)
u.getAdata()
def testWithTwoRRs(self):
u = DNS.Lib.RRunpacker(self.packerCorrect)
u.getRRheader()
self.assertRaises(DNS.Lib.UnpackError, u.getRRheader)
def testWithNoGetData(self):
u = DNS.Lib.RRunpacker(self.packerCorrect)
u.getRRheader()
self.assertRaises(DNS.Lib.UnpackError, u.endRR)
class PackerTestCase(unittest.TestCase):
" base class for tests of Packing code. Laziness on my part, I know. "
def setUp(self):
self.RRpacker = DNS.Lib.RRpacker
self.RRunpacker = DNS.Lib.RRunpacker
def testPacker(self):
p = self.RRpacker()
check = self.doPack(p)
if (p is not None) and (check is not TestCompleted):
return self.checkPackResult(p)
def checkPackResult(self, buf):
if not hasattr(self, 'packerExpectedResult'):
if self.__class__.__name__ != 'PackerTestCase':
print("P***", self, repr(buf.getbuf())) #cheat testcase
else:
return self.assertEqual(buf.getbuf(),
self.packerExpectedResult)
def checkUnpackResult(self, rrbits, specbits):
if not hasattr(self, 'unpackerExpectedResult'):
if self.__class__.__name__ != 'PackerTestCase':
print("U***", self, repr((rrbits,specbits))) #cheat testcase
else:
return self.assertEqual((rrbits, specbits),
self.unpackerExpectedResult)
def testUnpacker(self):
if self.doUnpack is not None:
if hasattr(self.__class__, 'doUnpack') \
and hasattr(self, 'packerExpectedResult'):
u = self.RRunpacker(self.packerExpectedResult)
rrbits = u.getRRheader()[:4]
specbits = self.doUnpack(u)
try:
u.endRR()
except DNS.Lib.UnpackError:
self.assertEqual(0, 'Not at end of RR!')
return self.checkUnpackResult(rrbits, specbits)
else:
me = self.__class__.__name__
if me != 'PackerTestCase':
self.assertEquals(self.__class__.__name__,
'Unpack NotImplemented')
def doPack(self, p):
" stub. don't test the base class "
return None
def doUnpack(self, p):
" stub. don't test the base class "
return None
class testPackingOfCNAME(PackerTestCase):
"addCNAME(self, name, klass, ttl, cname)"
def doPack(self,p):
p.addCNAME('www.sub.domain', DNS.Class.IN, 3600, 'realhost.sub.domain')
def doUnpack(self, u):
return u.getCNAMEdata()
unpackerExpectedResult = (('www.sub.domain', DNS.Type.CNAME, DNS.Class.IN, 3600), 'realhost.sub.domain')
packerExpectedResult = \
b'\x03www\x03sub\x06domain\x00\x00\x05\x00\x01\x00'+ \
b'\x00\x0e\x10\x00\x0b\x08realhost\xc0\x04'
class testPackingOfCNAME2(PackerTestCase):
"addCNAME(self, name, klass, ttl, cname)"
def doPack(self,p):
p.addCNAME('www.cust.com', DNS.Class.IN, 200, 'www023.big.isp.com')
def doUnpack(self, u):
return u.getCNAMEdata()
unpackerExpectedResult = (('www.cust.com', DNS.Type.CNAME, DNS.Class.IN, 200), 'www023.big.isp.com')
packerExpectedResult = \
b'\x03www\x04cust\x03com\x00\x00\x05\x00\x01\x00'+ \
b'\x00\x00\xc8\x00\x11\x06www023\x03big\x03isp\xc0\t'
class testPackingOfCNAME3(PackerTestCase):
"addCNAME(self, name, klass, ttl, cname)"
def doPack(self,p):
p.addCNAME('www.fred.com', DNS.Class.IN, 86400, 'webhost.loa.com')
def doUnpack(self, u):
return u.getCNAMEdata()
unpackerExpectedResult = (('www.fred.com', DNS.Type.CNAME, DNS.Class.IN, 86400), 'webhost.loa.com')
packerExpectedResult = \
b'\x03www\x04fred\x03com\x00\x00\x05\x00\x01\x00\x01Q'+ \
b'\x80\x00\x0e\x07webhost\x03loa\xc0\t'
class testPackingOfHINFO(PackerTestCase):
"addHINFO(self, name, klass, ttl, cpu, os)"
def doPack(self,p):
p.addHINFO('www.sub.domain.com', DNS.Class.IN, 3600, 'i686', 'linux')
def doUnpack(self, u):
return u.getHINFOdata()
unpackerExpectedResult = (('www.sub.domain.com', 13, 1, 3600), ('i686', 'linux'))
packerExpectedResult = \
b'\x03www\x03sub\x06domain\x03com\x00\x00\r\x00\x01'+ \
b'\x00\x00\x0e\x10\x00\x0b\x04i686\x05linux'
class testPackingOfHINFO2(PackerTestCase):
"addHINFO(self, name, klass, ttl, cpu, os)"
def doPack(self,p):
p.addHINFO('core1.lax.foo.com', DNS.Class.IN, 3600, 'cisco', 'ios')
def doUnpack(self, u):
return u.getHINFOdata()
unpackerExpectedResult = (('core1.lax.foo.com', 13, 1, 3600), ('cisco', 'ios'))
packerExpectedResult = \
b'\x05core1\x03lax\x03foo\x03com\x00\x00\r\x00\x01'+ \
b'\x00\x00\x0e\x10\x00\n\x05cisco\x03ios'
class testPackingOfMX(PackerTestCase):
"addMX(self, name, klass, ttl, preference, exchange)"
def doPack(self, p):
p.addMX('sub.domain.com', DNS.Class.IN, 86400, 10, 'mailhost1.isp.com')
def doUnpack(self, u):
return u.getMXdata()
packerExpectedResult = \
b'\x03sub\x06domain\x03com\x00\x00\x0f\x00\x01'+ \
b'\x00\x01Q\x80\x00\x12\x00\n\tmailhost1\x03isp\xc0\x0b'
unpackerExpectedResult = (('sub.domain.com', 15, 1, 86400), (10, 'mailhost1.isp.com'))
class testPackingOfMX2(PackerTestCase):
"addMX(self, name, klass, ttl, preference, exchange)"
def doPack(self, p):
p.addMX('ekit-inc.com.', DNS.Class.IN, 86400, 10, 'mx1.ekorp.com')
p.addMX('ekit-inc.com.', DNS.Class.IN, 86400, 20, 'mx2.ekorp.com')
p.addMX('ekit-inc.com.', DNS.Class.IN, 86400, 30, 'mx3.ekorp.com')
def doUnpack(self, u):
res = [u.getMXdata(),]
dummy = u.getRRheader()[:4]
res += u.getMXdata()
dummy = u.getRRheader()[:4]
res += u.getMXdata()
return res
unpackerExpectedResult = (('ekit-inc.com', 15, 1, 86400), [(10, 'mx1.ekorp.com'), 20, 'mx2.ekorp.com', 30, 'mx3.ekorp.com'])
packerExpectedResult = \
b'\x08ekit-inc\x03com\x00\x00\x0f\x00\x01\x00\x01Q\x80\x00'+\
b'\x0e\x00\n\x03mx1\x05ekorp\xc0\t\x00\x00\x0f\x00\x01\x00'+\
b'\x01Q\x80\x00\x08\x00\x14\x03mx2\xc0\x1e\x00\x00\x0f\x00'+\
b'\x01\x00\x01Q\x80\x00\x08\x00\x1e\x03mx3\xc0\x1e'
class testPackingOfNS(PackerTestCase):
"addNS(self, name, klass, ttl, nsdname)"
def doPack(self, p):
p.addNS('ekit-inc.com', DNS.Class.IN, 86400, 'ns1.ekorp.com')
def doUnpack(self, u):
return u.getNSdata()
unpackerExpectedResult = (('ekit-inc.com', 2, 1, 86400), 'ns1.ekorp.com')
packerExpectedResult = b'\x08ekit-inc\x03com\x00\x00\x02\x00\x01\x00\x01Q\x80\x00\x0c\x03ns1\x05ekorp\xc0\t'
class testPackingOfPTR(PackerTestCase):
"addPTR(self, name, klass, ttl, ptrdname)"
def doPack(self, p):
p.addPTR('www.ekit-inc.com', DNS.Class.IN, 3600, 'www-real01.ekorp.com')
def doUnpack(self, u):
return u.getPTRdata()
unpackerExpectedResult = (('www.ekit-inc.com', 12, 1, 3600), 'www-real01.ekorp.com')
packerExpectedResult = b'\x03www\x08ekit-inc\x03com\x00\x00\x0c\x00\x01\x00\x00\x0e\x10\x00\x13\nwww-real01\x05ekorp\xc0\r'
class testPackingOfSOA(PackerTestCase):
"""addSOA(self, name, klass, ttl, mname,
rname, serial, refresh, retry, expire, minimum)"""
def doPack(self, p):
p.addSOA('ekit-inc.com', DNS.Class.IN, 3600, 'ns1.ekorp.com',
'hostmaster.ekit-inc.com', 2002020301, 100, 200, 300, 400)
def doUnpack(self, u):
return u.getSOAdata()
unpackerExpectedResult = (('ekit-inc.com', 6, 1, 3600), ('ns1.ekorp.com', 'hostmaster', ('serial', 2002020301), ('refresh ', 100, '1 minutes'), ('retry', 200, '3 minutes'), ('expire', 300, '5 minutes'), ('minimum', 400, '6 minutes')))
packerExpectedResult = b'\x08ekit-inc\x03com\x00\x00\x06\x00\x01\x00\x00\x0e\x10\x00,\x03ns1\x05ekorp\xc0\t\nhostmaster\x00wTg\xcd\x00\x00\x00d\x00\x00\x00\xc8\x00\x00\x01,\x00\x00\x01\x90'
class testPackingOfA(PackerTestCase):
"addA(self, name, klass, ttl, address)"
def doPack(self, p):
p.addA('www02.ekit.com', DNS.Class.IN, 86400, '192.168.10.2')
def doUnpack(self, u):
return u.getAdata()
unpackerExpectedResult = (('www02.ekit.com', 1, 1, 86400), '192.168.10.2')
packerExpectedResult = b'\x05www02\x04ekit\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x02'
class testPackingOfA2(PackerTestCase):
"addA(self, name, ttl, address)"
def doPack(self, p):
p.addA('www.ekit.com', DNS.Class.IN, 86400, '10.98.1.0')
def doUnpack(self, u):
return u.getAdata()
unpackerExpectedResult = (('www.ekit.com', 1, 1, 86400), '10.98.1.0')
packerExpectedResult = b'\x03www\x04ekit\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\nb\x01\x00'
class testPackingOfA3(PackerTestCase):
"addA(self, name, ttl, address)"
def doPack(self, p):
p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.4')
p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.3')
p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.2')
p.addA('www.zol.com', DNS.Class.IN, 86400, '192.168.10.1')
def doUnpack(self, u):
u1,d1,u2,d2,u3,d3,u4=u.getAdata(),u.getRRheader(),u.getAdata(),u.getRRheader(),u.getAdata(),u.getRRheader(),u.getAdata()
return u1,u2,u3,u4
unpackerExpectedResult = (('www.zol.com', 1, 1, 86400), ('192.168.10.4', '192.168.10.3', '192.168.10.2', '192.168.10.1'))
packerExpectedResult = b'\x03www\x03zol\x03com\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x04\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x03\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x02\x00\x00\x01\x00\x01\x00\x01Q\x80\x00\x04\xc0\xa8\n\x01'
class testPackingOfTXT(PackerTestCase):
"addTXT(self, name, klass, ttl, list)"
def doPack(self, p):
p.addTXT('ekit-inc.com', DNS.Class.IN, 3600, 'this is a text record')
def doUnpack(self, u):
return u.getTXTdata()
packerExpectedResult = b'\x08ekit-inc\x03com\x00\x00\x10\x00\x01\x00\x00\x0e\x10\x00\x16\x15this is a text record'
unpackerExpectedResult = (('ekit-inc.com', 16, 1, 3600), [b'this is a text record'])
# check what the maximum/minimum &c of TXT records are.
class testPackingOfTXT2(PackerTestCase):
"addTXT(self, name, klass, ttl, list)"
def doPack(self, p):
f = lambda p=p:p.addTXT('ekit-inc.com', DNS.Class.IN, 3600, 'the quick brown fox jumped over the lazy brown dog\n'*20)
self.assertRaises(ValueError, f)
return TestCompleted
doUnpack = None
class testPackingOfAAAAText(PackerTestCase):
"addAAAA(self, name, klass, ttl, address)"
def setUp(self):
self.RRpacker = DNS.Lib.RRpacker
self.RRunpacker = DNS.Lib.RRunpackerText
def doPack(self, p):
addAAAA(p, 'google.com', DNS.Class.IN, 4, '2607:f8b0:4005:802::1005')
def doUnpack(self, u):
r = u.getAAAAdata()
return r
packerExpectedResult = b'\x06google\x03com\x00\x00\x1c\x00\x01\x00\x00\x00\x04\x00\x10&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05'
unpackerExpectedResult = (('google.com', DNS.Type.AAAA, DNS.Class.IN, 4), '2607:f8b0:4005:802::1005')
class testPackingOfAAAABinary(PackerTestCase):
"addAAAA(self, name, klass, ttl, address)"
def setUp(self):
self.RRpacker = DNS.Lib.RRpacker
self.RRunpacker = DNS.Lib.RRunpackerBinary
def doPack(self, p):
addAAAA(p, 'google.com', DNS.Class.IN, 4, '2607:f8b0:4005:802::1005')
def doUnpack(self, u):
self.assertFalse(hasattr(u, "getAAAAdata"))
r = u.getbytes(16)
return r
packerExpectedResult = b'\x06google\x03com\x00\x00\x1c\x00\x01\x00\x00\x00\x04\x00\x10&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05'
unpackerExpectedResult = (('google.com', DNS.Type.AAAA, DNS.Class.IN, 4), b'&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05')
class testPackingOfAAAAInteger(PackerTestCase):
"addAAAA(self, name, klass, ttl, address)"
def setUp(self):
self.RRpacker = DNS.Lib.RRpacker
self.RRunpacker = DNS.Lib.RRunpackerInteger
def doPack(self, p):
addAAAA(p, 'google.com', DNS.Class.IN, 4, '2607:f8b0:4005:802::1005')
def doUnpack(self, u):
r = u.getAAAAdata()
return r
packerExpectedResult = b'\x06google\x03com\x00\x00\x1c\x00\x01\x00\x00\x00\x04\x00\x10&\x07\xf8\xb0@\x05\x08\x02\x00\x00\x00\x00\x00\x00\x10\x05'
unpackerExpectedResult = (('google.com', DNS.Type.AAAA, DNS.Class.IN, 4), 50552053919387978162022445795852161029)
def addAAAA(p, name, klass, ttl, address):
"""Add AAAA record to a packer.
"""
addr_buf = socket.inet_pton(socket.AF_INET6, address)
p.addRRheader(name, DNS.Type.AAAA, klass, ttl)
p.buf = p.buf + addr_buf
p.endRR()
return p
#class testPackingOfQuestion(PackerTestCase):
# "addQuestion(self, qname, qtype, qclass)"
# def doPack(self, p):
# self.assertEquals(0,"NotImplemented")
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()

260
DNS/tests/test_base.py Normal file
View File

@ -0,0 +1,260 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import DNS
import unittest
try:
import ipaddress
except ImportError:
import ipaddr as ipaddress
def assertIsByte(b):
assert b >= 0
assert b <= 255
class TestBase(unittest.TestCase):
def testParseResolvConf(self):
# reset elments set by Base._DiscoverNameServers
DNS.defaults['server'] = []
if 'domain' in DNS.defaults:
del DNS.defaults['domain']
self.assertEqual(len(DNS.defaults['server']), 0)
resolv = ['# a comment',
'domain example.org',
'nameserver 127.0.0.1',
]
DNS.ParseResolvConfFromIterable(resolv)
self.assertEqual(len(DNS.defaults['server']), 1)
self.assertEqual(DNS.defaults['server'][0], '127.0.0.1')
self.assertEqual(DNS.defaults['domain'], 'example.org')
def testDnsRequestA(self):
# try with asking for strings, and asking for bytes
dnsobj = DNS.DnsRequest('example.org')
a_response = dnsobj.qry(qtype='A', resulttype='text', timeout=1)
self.assertTrue(a_response.answers)
# is the result vaguely ipv4 like?
self.assertEqual(a_response.answers[0]['data'].count('.'), 3)
self.assertEqual(a_response.answers[0]['data'],'93.184.216.34')
# Default result type for .qry object is an ipaddress object
ad_response = dnsobj.qry(qtype='A', timeout=1)
self.assertTrue(ad_response.answers)
self.assertEqual(ad_response.answers[0]['data'],ipaddress.IPv4Address('93.184.216.34'))
ab_response = dnsobj.qry(qtype='A', resulttype='binary', timeout=1)
self.assertTrue(ab_response.answers)
# is the result ipv4 binary like?
self.assertEqual(len(ab_response.answers[0]['data']), 4)
for b in ab_response.answers[0]['data']:
assertIsByte(b)
self.assertEqual(ab_response.answers[0]['data'],b']\xb8\xd8\"')
ai_response = dnsobj.qry(qtype='A', resulttype='integer', timeout=1)
self.assertTrue(ai_response.answers)
self.assertEqual(ai_response.answers[0]['data'],1572395042)
def testDnsRequestAAAA(self):
dnsobj = DNS.DnsRequest('example.org')
aaaa_response = dnsobj.qry(qtype='AAAA', resulttype='text', timeout=1)
self.assertTrue(aaaa_response.answers)
# does the result look like an ipv6 address?
self.assertTrue(':' in aaaa_response.answers[0]['data'])
self.assertEqual(aaaa_response.answers[0]['data'],'2606:2800:220:1:248:1893:25c8:1946')
# default is returning ipaddress object
aaaad_response = dnsobj.qry(qtype='AAAA', timeout=1)
self.assertTrue(aaaad_response.answers)
self.assertEqual(aaaad_response.answers[0]['data'],ipaddress.IPv6Address('2606:2800:220:1:248:1893:25c8:1946'))
aaaab_response = dnsobj.qry(qtype='AAAA', resulttype='binary', timeout=1)
self.assertTrue(aaaab_response.answers)
# is it ipv6 looking?
self.assertEqual(len(aaaab_response.answers[0]['data']) , 16)
for b in aaaab_response.answers[0]['data']:
assertIsByte(b)
self.assertEqual(aaaab_response.answers[0]['data'],b'&\x06(\x00\x02 \x00\x01\x02H\x18\x93%\xc8\x19F')
# IPv6 decimal
aaaai_response = dnsobj.qry(qtype='AAAA', resulttype='integer', timeout=1)
self.assertTrue(aaaai_response.answers)
self.assertEqual(aaaai_response.answers[0]['data'], 50542628918019813867414319910101719366)
def testDnsRequestEmptyMX(self):
dnsobj = DNS.DnsRequest('example.org')
mx_empty_response = dnsobj.qry(qtype='MX', timeout=1)
self.assertFalse(mx_empty_response.answers)
def testDnsRequestMX(self):
dnsobj = DNS.DnsRequest('ietf.org')
mx_response = dnsobj.qry(qtype='MX', timeout=1)
self.assertTrue(mx_response.answers[0])
# is hard coding a remote address a good idea?
# I think it's unavoidable. - sk
self.assertEqual(mx_response.answers[0]['data'], (0, 'mail.ietf.org'))
m = DNS.mxlookup('ietf.org', timeout=1)
self.assertEqual(mx_response.answers[0]['data'], m[0])
def testDnsRequestSrv(self):
dnsobj = DNS.Request(qtype='srv')
respdef = dnsobj.qry('_ldap._tcp.openldap.org', timeout=1)
self.assertTrue(respdef.answers)
data = respdef.answers[0]['data']
self.assertEqual(len(data), 4)
self.assertEqual(data[2], 389)
self.assertTrue('openldap.org' in data[3])
def testDkimRequest(self):
q = '20161025._domainkey.google.com'
dnsobj = DNS.Request(q, qtype='txt')
resp = dnsobj.qry(timeout=1)
self.assertTrue(resp.answers)
# should the result be bytes or a string? (Bytes, we finally settled on bytes)
data = resp.answers[0]['data']
self.assertFalse(isinstance(data[0], str))
self.assertTrue(data[0].startswith(b'k=rsa'))
def testDNSRequestTXT(self):
dnsobj = DNS.DnsRequest('fail.kitterman.org')
respdef = dnsobj.qry(qtype='TXT', timeout=1)
self.assertTrue(respdef.answers)
data = respdef.answers[0]['data']
self.assertEqual(data, [b'v=spf1 -all'])
resptext = dnsobj.qry(qtype='TXT', resulttype='text', timeout=1)
self.assertTrue(resptext.answers)
data = resptext.answers[0]['data']
self.assertEqual(data, ['v=spf1 -all'])
respbin = dnsobj.qry(qtype='TXT', resulttype='binary', timeout=1)
self.assertTrue(respbin.answers)
data = respbin.answers[0]['data']
self.assertEqual(data, [b'\x0bv=spf1 -all'])
def testIDN(self):
"""Can we lookup an internationalized domain name?"""
dnsobj = DNS.DnsRequest('xn--bb-eka.at')
unidnsobj = DNS.DnsRequest('öbb.at')
a_resp = dnsobj.qry(qtype='A', resulttype='text', timeout=1)
ua_resp = unidnsobj.qry(qtype='A', resulttype='text', timeout=1)
self.assertTrue(a_resp.answers)
self.assertTrue(ua_resp.answers)
self.assertEqual(ua_resp.answers[0]['data'],
a_resp.answers[0]['data'])
def testNS(self):
"""Lookup NS record from SOA"""
dnsob = DNS.DnsRequest('kitterman.com')
resp = dnsob.qry(qtype='SOA', timeout=1)
self.assertTrue(resp.answers)
primary = resp.answers[0]['data'][0]
self.assertEqual(primary, 'ns1.pairnic.com')
resp = dnsob.qry(qtype='NS',server=primary,aa=1)
nslist = [x['data'].lower() for x in resp.answers]
nslist.sort()
self.assertEqual(nslist, ['ns1.pairnic.com', 'ns2.pairnic.com'])
# Test defaults with legacy DNS.req
def testDnsRequestAD(self):
# try with asking for strings, and asking for bytes
dnsob = DNS.DnsRequest('example.org')
ad_response = dnsob.req(qtype='A', timeout=1)
self.assertTrue(ad_response.answers)
# is the result vaguely ipv4 like?
self.assertEqual(ad_response.answers[0]['data'].count('.'), 3)
self.assertEqual(ad_response.answers[0]['data'],'93.184.216.34')
def testDnsRequestAAAAD(self):
dnsob = DNS.DnsRequest('example.org')
# default is returning binary instead of text
aaaad_response = dnsob.req(qtype='AAAA', timeout=1)
self.assertTrue(aaaad_response.answers)
# does the result look like a binary ipv6 address?
self.assertEqual(len(aaaad_response.answers[0]['data']) , 16)
for b in aaaad_response.answers[0]['data']:
assertIsByte(b)
self.assertEqual(aaaad_response.answers[0]['data'],b'&\x06(\x00\x02 \x00\x01\x02H\x18\x93%\xc8\x19F')
def testDnsRequestEmptyMXD(self):
dnsob = DNS.DnsRequest('example.org')
mx_empty_response = dnsob.req(qtype='MX', timeout=1)
self.assertFalse(mx_empty_response.answers)
def testDnsRequestMXD(self):
dnsob = DNS.DnsRequest('ietf.org')
mx_response = dnsob.req(qtype='MX', timeout=1)
self.assertTrue(mx_response.answers[0])
# is hard coding a remote address a good idea?
# I think it's unavoidable. - sk
self.assertEqual(mx_response.answers[0]['data'], (0, 'mail.ietf.org'))
m = DNS.mxlookup('ietf.org', timeout=1)
self.assertEqual(mx_response.answers[0]['data'], m[0])
def testDnsRequestSrvD(self):
dnsob = DNS.Request(qtype='srv')
respdef = dnsob.req('_ldap._tcp.openldap.org', timeout=1)
self.assertTrue(respdef.answers)
data = respdef.answers[0]['data']
self.assertEqual(len(data), 4)
self.assertEqual(data[2], 389)
self.assertTrue('openldap.org' in data[3])
def testDkimRequestD(self):
q = '20161025._domainkey.google.com'
dnsob = DNS.Request(q, qtype='txt')
resp = dnsob.req(timeout=1)
self.assertTrue(resp.answers)
# should the result be bytes or a string? (Bytes, we finally settled on bytes)
data = resp.answers[0]['data']
self.assertFalse(isinstance(data[0], str))
self.assertTrue(data[0].startswith(b'k=rsa'))
def testDNSRequestTXTD(self):
dnsob = DNS.DnsRequest('fail.kitterman.org')
respdef = dnsob.req(qtype='TXT', timeout=1)
self.assertTrue(respdef.answers)
data = respdef.answers[0]['data']
self.assertEqual(data, [b'v=spf1 -all'])
def testIDND(self):
"""Can we lookup an internationalized domain name?"""
dnsob = DNS.DnsRequest('xn--bb-eka.at')
unidnsob = DNS.DnsRequest('öbb.at')
a_resp = dnsob.req(qtype='A', resulttype='text', timeout=1)
ua_resp = unidnsob.req(qtype='A', resulttype='text', timeout=1)
self.assertTrue(a_resp.answers)
self.assertTrue(ua_resp.answers)
self.assertEqual(ua_resp.answers[0]['data'],
a_resp.answers[0]['data'])
def testNSD(self):
"""Lookup NS record from SOA"""
dnsob = DNS.DnsRequest('kitterman.com')
resp = dnsob.req(qtype='SOA', timeout=1)
self.assertTrue(resp.answers)
primary = resp.answers[0]['data'][0]
self.assertEqual(primary, 'ns1.pairnic.com')
resp = dnsob.req(qtype='NS',server=primary,aa=1, timeout=1)
nslist = [x['data'].lower() for x in resp.answers]
nslist.sort()
self.assertEqual(nslist, ['ns1.pairnic.com', 'ns2.pairnic.com'])
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()

118
DNS/win32dns.py Normal file
View File

@ -0,0 +1,118 @@
"""
$Id$
Extract a list of TCP/IP name servers from the registry 0.1
0.1 Strobl 2001-07-19
Usage:
RegistryResolve() returns a list of ip numbers (dotted quads), by
scouring the registry for addresses of name servers
Tested on Windows NT4 Server SP6a, Windows 2000 Pro SP2 and
Whistler Pro (XP) Build 2462 and Windows ME
... all having a different registry layout wrt name servers :-/
Todo:
Program doesn't check whether an interface is up or down
(c) 2001 Copyright by Wolfgang Strobl ws@mystrobl.de,
License analog to the current Python license
WARNING: Python3 port completely untested on Windows.
"""
import re
import winreg
def binipdisplay(s):
"convert a binary array of ip adresses to a python list"
if len(s)%4!= 0:
raise EnvironmentError # well ...
ol=[]
for i in range(len(s)/4):
s1=s[:4]
s=s[4:]
ip=[]
for j in s1:
ip.append(str(ord(j)))
ol.append('.'.join(ip))
return ol
def stringdisplay(s):
'''convert "d.d.d.d,d.d.d.d" to ["d.d.d.d","d.d.d.d"].
also handle u'd.d.d.d d.d.d.d', as reporting on SF
'''
import re
return list(map(str, re.split("[ ,]",s)))
def RegistryResolve():
nameservers=[]
x=winreg.ConnectRegistry(None,winreg.HKEY_LOCAL_MACHINE)
try:
y= winreg.OpenKey(x,
r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters")
except EnvironmentError: # so it isn't NT/2000/XP
# windows ME, perhaps?
try: # for Windows ME
y= winreg.OpenKey(x,
r"SYSTEM\CurrentControlSet\Services\VxD\MSTCP")
nameserver,dummytype=winreg.QueryValueEx(y,'NameServer')
if nameserver and not (nameserver in nameservers):
nameservers.extend(stringdisplay(nameserver))
except EnvironmentError:
pass
return nameservers # no idea
try:
nameserver = winreg.QueryValueEx(y, "DhcpNameServer")[0].split()
except:
nameserver = winreg.QueryValueEx(y, "NameServer")[0].split()
if nameserver:
nameservers=nameserver
nameserver = winreg.QueryValueEx(y,"NameServer")[0]
winreg.CloseKey(y)
try: # for win2000
y= winreg.OpenKey(x,
r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\DNSRegisteredAdapters")
for i in range(1000):
try:
n=winreg.EnumKey(y,i)
z=winreg.OpenKey(y,n)
dnscount,dnscounttype=winreg.QueryValueEx(z,
'DNSServerAddressCount')
dnsvalues,dnsvaluestype=winreg.QueryValueEx(z,
'DNSServerAddresses')
nameservers.extend(binipdisplay(dnsvalues))
winreg.CloseKey(z)
except EnvironmentError:
break
winreg.CloseKey(y)
except EnvironmentError:
pass
#
try: # for whistler
y= winreg.OpenKey(x,
r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\Interfaces")
for i in range(1000):
try:
n=winreg.EnumKey(y,i)
z=winreg.OpenKey(y,n)
try:
nameserver,dummytype=winreg.QueryValueEx(z,'NameServer')
if nameserver and not (nameserver in nameservers):
nameservers.extend(stringdisplay(nameserver))
except EnvironmentError:
pass
winreg.CloseKey(z)
except EnvironmentError:
break
winreg.CloseKey(y)
except EnvironmentError:
#print "Key Interfaces not found, just do nothing"
pass
#
winreg.CloseKey(x)
return nameservers
if __name__=="__main__":
print("Name servers:",RegistryResolve())

69
LICENSE Normal file
View File

@ -0,0 +1,69 @@
PYDNS is Copyright 2000-2014 by Guido van Rossum,
Michael Ströder <stroeder@users.sourceforge.net>,
Anthony Baxter <anthony@interlink.com.au>,
Stuart Gathman <stuart@bmsi.com>,
and Scott Kitterman <scott@kitterman.com>
This code is released under the following Python-style license:
CNRI LICENSE AGREEMENT FOR PYDNS-2.3.5
1. This LICENSE AGREEMENT is between the Corporation for National Research
Initiatives, having an office at 1895 Preston White Drive, Reston, VA 20191
(“CNRI”), and the Individual or Organization (“Licensee”) accessing and
otherwise using pydns-2.3.5 software in source or binary form and its
associated documentation.
2. Subject to the terms and conditions of this License Agreement, CNRI
hereby grants Licensee a nonexclusive, royalty-free, world-wide license to
reproduce, analyze, test, perform and/or display publicly, prepare derivative
works, distribute, and otherwise use pydns-2.3.5 alone or in any derivative
version, provided, however, that CNRIs License Agreement and CNRIs notice of
copyright, i.e., “Copyright © 1995-2001 Corporation for National Research
Initiatives; All Rights Reserved” are retained in pydns-2.3.5 alone or in any
derivative version prepared by Licensee. Alternately, in lieu of CNRIs License
Agreement, Licensee may substitute the following text (omitting the quotes):
“pydns-2.3.5 is made available subject to the terms and conditions in CNRIs
License Agreement. This Agreement together with pydns-2.3.5 may be located on
the Internet using the following unique, persistent identifier (known as a
handle): 1895.22/1013. This Agreement may also be obtained from a proxy server
on the Internet using the following URL: http://hdl.handle.net/1895.22/1013.”
3. In the event Licensee prepares a derivative work that is based on or
incorporates pydns-2.3.5 or any part thereof, and wants to make the derivative
work available to others as provided herein, then Licensee hereby agrees to
include in any such work a brief summary of the changes made to pydns-2.3.5.
4. CNRI is making pydns-2.3.5 available to Licensee on an “AS IS” basis.
CNRI MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF
EXAMPLE, BUT NOT LIMITATION, CNRI MAKES NO AND DISCLAIMS ANY REPRESENTATION OR
WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE
USE OF PYTHON 1.6.1 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS.
5. CNRI SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 1.6.1
FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF
MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 1.6.1, OR ANY DERIVATIVE
THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
6. This License Agreement will automatically terminate upon a material
breach of its terms and conditions.
7. This License Agreement shall be governed by the federal intellectual
property law of the United States, including without limitation the federal
copyright law, and, to the extent such U.S. federal law does not apply, by the
law of the Commonwealth of Virginia, excluding Virginias conflict of law
provisions. Notwithstanding the foregoing, with regard to derivative works
based on pydns-2.3.5 that incorporate non-separable material that was
previously distributed under the GNU General Public License (GPL), the law of
the Commonwealth of Virginia shall govern this License Agreement only as to
issues arising under or with respect to Paragraphs 4, 5, and 7 of this License
Agreement. Nothing in this License Agreement shall be deemed to create any
relationship of agency, partnership, or joint venture between CNRI and
Licensee. This License Agreement does not grant permission to use CNRI
trademarks or trade name in a trademark sense to endorse or promote products or
services of Licensee, or any third party.
8. By clicking on the “ACCEPT” button where indicated, or by copying,
installing or otherwise using pydns-2.3.5, Licensee agrees to be bound by the
terms and conditions of this License Agreement.

9
MANIFEST.in Normal file
View File

@ -0,0 +1,9 @@
recursive-include DNS *.py
recursive-include tools *.py
recursive-include tests *.py
include LICENSE
include CHANGES
include MANIFEST.in
include *.txt
include setup.*
include test.py

23
PKG-INFO Normal file
View File

@ -0,0 +1,23 @@
Metadata-Version: 1.2
Name: py3dns
Version: 3.2.1
Summary: Python 3 DNS library
Home-page: https://launchpad.net/py3dns
Author: Anthony Baxter and others
Author-email: py3dns-hackers@lists.launchpad.net
Maintainer: Scott Kitterman
Maintainer-email: scott@kitterman.com
License: Python License
Description: Python 3 DNS library:
Keywords: DNS
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: No Input/Output (Daemon)
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Python License (CNRI Python License)
Classifier: Natural Language :: English
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Topic :: Internet :: Name Service (DNS)
Classifier: Topic :: Software Development :: Libraries :: Python Modules

12
README-guido.txt Normal file
View File

@ -0,0 +1,12 @@
This directory contains a module (dnslib) that implements a DNS
(Domain Name Server) client, plus additional modules that define some
symbolic constants used by DNS (dnstype, dnsclass, dnsopcode).
Type "python dnslib.py -/" for a usage message.
You can also import dnslib and write your own, more sophisticated
client code; use the test program as an example (there is currently no
documentation :-).
--Guido van Rossum, CWI, Amsterdam <Guido.van.Rossum@cwi.nl>
URL: <http://www.cwi.nl/cwi/people/Guido.van.Rossum.html>

146
README.txt Normal file
View File

@ -0,0 +1,146 @@
Release 3.2.0 Mon Jul 23 2018
Switched from distutils to setuptools because "it's the future". It is
unlikely to have end user impact. For python3.3+ no additional dependencies
are required.
Release 3.1.0 Thu Apr 24 23:52:00 EDT 2014
More choices about result types are provided in 3.1.0. To specify resulttype,
in a DnsRequest object, use the new function DnsRequest.qry
(resulttype='binary/text/default'). DnsRequest.qry returns ipaddress objects
for A and AAAA queries by defaults. Other defaults are the same as
DnsRequest.req. Continue to use DnsRequest.req for exact backward
compatibility with pydns and older py3dns defaults. TXT and SPF record data
are returned as strings by default, this matches what dnspython3 returns.
The ipaddress module is used internally now. See CHANGES for details.
Release 3.0.3 Wed May 29 00:05:00 EDT 2013
There was a third, unintended incompatiblity in 3.0.2 in that IPv6 addresses
were returned in their string format rather than their decimal format. This
breaks pyspf queries when the connect IP is IPv6. 3.0.3 is a release strictly
to revert this change.
Release 3.0.2 Thu Jan 19 01:25:00 EST 2012
This release introduces two potentially incompatible changes from the python
verion of DNS (pydns). First, the data portion of DNS records of types TXT
and SPF are returned as bytes instead of strings. Second, additional sub
classes of DNSError have been added. Any code that catches DNSError should
be checked to see if it needs updating to catch one of the new sub classes:
ArgumentError, SocketError, TimeoutError, ServerError, and
IncompleteReplyError.
Release 3.0 Sun Mar 2-9 23:07:22 2011 -0400
Ported to Python3 by Scott Kitterman <scott@kitterman.com>. This is mostly a
minimal port to work with Python3 (tested with python3.2) plus addition of
some of the patches that people have submitted on Sourceforge. It should be
fully API compatible with 2.3. Note: Version 3.0.0 shipped with a new
lazy.lookupfull function in advance of 2.3. This was incorporated in pydns
2.3.5 as lazy.lookupalll. It has been renamed in 3.0.1 to stay API compatible
with pydns 2.3.
Release 2.3 Mon May 6 16:18:02 EST 2002
This is a another release of the pydns code, as originally written by
Guido van Rossum, and with a hopefully nicer API bolted over the
top of it by Anthony Baxter <anthony@interlink.com.au>.
This code is released under a Python-style license.
I'm making this release because there hasn't been a release in a
heck of a long time, and it probably deserves one. I'd also like to
do a substantial refactor of some of the guts of the code, and this
is likely to break any code that uses the existing interface. So
this will be a release for people who are using the existing API...
There are several known bugs/unfinished bits
- processing of AXFR results is not done yet.
- doesn't do IPv6 DNS requests (type AAAA)
- docs, aside from this file
- all sorts of other stuff that I've probably forgotten.
- MacOS support for discovering nameservers
- the API that I evolved some time ago is pretty ugly. I'm going
to re-do it, designed this time.
Stuff it _does_ do:
- processes /etc/resolv.conf - at least as far as nameserver directives go.
- tries multiple nameservers.
- nicer API - see below.
- returns results in more useful format.
- optional timing of requests.
- default 'show' behaviour emulates 'dig' pretty closely.
To use:
import DNS
reqobj=DNS.Request(args)
reqobj.req(args)
args can be a name, in which case it takes that as the query, and/or a series
of keyword/value args. (see below for a list of args)
when calling the 'req()' method, it reuses the options specified in the
DNS.Request() call as defaults.
options are applied in the following order:
those specified in the req() call
or, if not specified there,
those specified in the creation of the Request() object
or, if not specified there,
those specified in the DNS.defaults dictionary
name servers can be specified in the following ways:
- by calling DNS.DiscoverNameServers(), which will load the DNS servers
from the system's /etc/resolv.conf file on Unix, or from the Registry
on windows.
- by specifying it as an option to the request
- by manually setting DNS.defaults['server'] to a list of server IP
addresses to try
- XXXX It should be possible to load the DNS servers on a mac os machine,
from where-ever they've squirrelled them away
name="host.do.main" # the object being looked up
qtype="SOA" # the query type, eg SOA, A, MX, CNAME, ANY
protocol="udp" # "udp" or "tcp" - usually you want "udp"
server="nameserver" # the name of the nameserver. Note that you might
# want to use an IP address here
rd=1 # "recursion desired" - defaults to 1.
other: opcode, port, ...
There's also some convenience functions, for the lazy:
to do a reverse lookup:
>>> print DNS.revlookup("192.189.54.17")
yarrina.connect.com.au
to look up all MX records for an entry:
>>> print DNS.mxlookup("connect.com.au")
[(10, 'yarrina.connect.com.au'), (100, 'warrane.connect.com.au')]
Documentation of the rest of the interface will have to wait for a
later date. Note that the DnsAsyncRequest stuff is currently not
working - I haven't looked too closely at why, yet.
There's some examples in the tests/ directory - including test5.py,
which is even vaguely useful. It looks for the SOA for a domain, checks
that the primary NS is authoritative, then checks the nameservers
that it believes are NSs for the domain and checks that they're
authoritative, and that the zone serial numbers match.
see also README.guido for the original docs.
py3dns is derived from pydns. The sourceforge details below refer to pydns.
All py3dns issues/comments/etc should be reported via
https://launchpad.net/py3dns.
comments to me, anthony@interlink.com.au, or to the mailing list,
pydns-developer@lists.sourceforge.net.
bugs/patches to the tracker on SF -
http://sourceforge.net/tracker/?group_id=31674

23
py3dns.egg-info/PKG-INFO Normal file
View File

@ -0,0 +1,23 @@
Metadata-Version: 1.2
Name: py3dns
Version: 3.2.1
Summary: Python 3 DNS library
Home-page: https://launchpad.net/py3dns
Author: Anthony Baxter and others
Author-email: py3dns-hackers@lists.launchpad.net
Maintainer: Scott Kitterman
Maintainer-email: scott@kitterman.com
License: Python License
Description: Python 3 DNS library:
Keywords: DNS
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: No Input/Output (Daemon)
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Python License (CNRI Python License)
Classifier: Natural Language :: English
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Topic :: Internet :: Name Service (DNS)
Classifier: Topic :: Software Development :: Libraries :: Python Modules

View File

@ -0,0 +1,34 @@
CHANGES
CREDITS.txt
LICENSE
MANIFEST.in
README-guido.txt
README.txt
setup.py
test.py
DNS/Base.py
DNS/Class.py
DNS/Lib.py
DNS/Opcode.py
DNS/Status.py
DNS/Type.py
DNS/__init__.py
DNS/lazy.py
DNS/win32dns.py
DNS/tests/__init__.py
DNS/tests/testPackers.py
DNS/tests/test_base.py
py3dns.egg-info/PKG-INFO
py3dns.egg-info/SOURCES.txt
py3dns.egg-info/dependency_links.txt
py3dns.egg-info/not-zip-safe
py3dns.egg-info/top_level.txt
tests/test.py
tests/test2.py
tests/test4.py
tests/test5.py
tests/test6.py
tests/test7.py
tests/testsrv.py
tools/caching.py
tools/named-perf.py

View File

@ -0,0 +1 @@

View File

@ -0,0 +1 @@

View File

@ -0,0 +1 @@
DNS

4
setup.cfg Normal file
View File

@ -0,0 +1,4 @@
[egg_info]
tag_build =
tag_date = 0

36
setup.py Normal file
View File

@ -0,0 +1,36 @@
import sys,os
sys.path.insert(0,os.getcwd())
from setuptools import setup
import DNS
setup(
#-- Package description
name = 'py3dns',
license = 'Python License',
version = DNS.__version__,
description = 'Python 3 DNS library',
long_description = """Python 3 DNS library:
""",
author = 'Anthony Baxter and others',
author_email = 'py3dns-hackers@lists.launchpad.net ',
maintainer="Scott Kitterman",
maintainer_email="scott@kitterman.com",
url = 'https://launchpad.net/py3dns',
packages = ['DNS'], keywords = ['DNS'],
zip_safe = False,
classifiers = [
'Development Status :: 5 - Production/Stable',
'Environment :: No Input/Output (Daemon)',
'Intended Audience :: Developers',
'License :: OSI Approved :: Python License (CNRI Python License)',
'Natural Language :: English',
'Operating System :: OS Independent',
'Programming Language :: Python :: 3',
'Topic :: Internet :: Name Service (DNS)',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

8
test.py Executable file
View File

@ -0,0 +1,8 @@
#! /usr/bin/python3
import unittest
import doctest
import DNS
from DNS.tests import test_suite
unittest.TextTestRunner().run(test_suite())

43
tests/test.py Executable file
View File

@ -0,0 +1,43 @@
#!/usr/bin/python3
import sys ; sys.path.insert(0, '..')
import DNS
# automatically load nameserver(s) from /etc/resolv.conf
# (works on unix - on others, YMMV)
DNS.ParseResolvConf()
# lets do an all-in-one request
# set up the request object
r = DNS.DnsRequest(name='munnari.oz.au',qtype='A')
# do the request
a=r.req()
# and do a pretty-printed output
a.show()
# now lets setup a reusable request object
r = DNS.DnsRequest(qtype='ANY')
res = r.req("a.root-servers.nex",qtype='ANY')
res.show()
res = r.req("proxy.connect.com.au")
res.show()
# do a TCP reply
r = DNS.DnsRequest("imsavscan.netvigator.com", qtype="A", server=['8.8.8.8'], protocol='tcp', timeout=300)
res = r.req()
res.show()
# look up a TXT record
r = DNS.DnsRequest("kitterman.com", qtype="TXT", protocol='tcp')
res = r.req()
res.show()
# look up a AAAA record
r = DNS.DnsRequest("mailout03.controlledmail.com", qtype="AAAA", protocol='tcp')
res = r.req(resulttype='text')
res.show()
# look up a A record set that falls over to EDNS0/TCP
r = DNS.DnsRequest("long-a-record.tana.it", qtype="A", protocol='udp')
res = r.req(resulttype='text')
res.show()

17
tests/test2.py Executable file
View File

@ -0,0 +1,17 @@
#!/usr/bin/python3
import sys ; sys.path.insert(0, '..')
import DNS
# automatically load nameserver(s) from /etc/resolv.conf
# (works on unix - on others, YMMV)
DNS.ParseResolvConf()
r=DNS.Request(qtype='mx')
res = r.req('connect.com.au')
res.show()
r=DNS.Request(qtype='soa')
res = r.req('connect.com.au')
res.show()
print(DNS.revlookup('192.189.54.17'))

10
tests/test4.py Executable file
View File

@ -0,0 +1,10 @@
#!/usr/bin/python3
import sys ; sys.path.insert(0, '..')
import DNS
DNS.ParseResolvConf()
print(DNS.mxlookup("hotmail.com"))
print(DNS.mxlookup("connect.com.au"))

62
tests/test5.py Executable file
View File

@ -0,0 +1,62 @@
#!/usr/bin/python3
import sys ; sys.path.insert(0, '..')
import DNS
def Error(mesg):
import sys
print(sys.argv[0],"ERROR:")
print(mesg)
sys.exit(1)
def main():
import sys
if len(sys.argv) != 2:
Error("usage: %s somedomain.com"%sys.argv[0])
domain = sys.argv[1]
nslist = GetNS(domain)
print("According to the primary, the following are nameservers for this domain")
for ns in nslist:
print(" ",ns)
CheckNS(ns,domain)
def GetNS(domain):
import DNS
# hm. this might fail if a server is off the air.
r = DNS.Request(domain,qtype='SOA').req()
if r.header['status'] != 'NOERROR':
Error("received status of %s when attempting to look up SOA for domain"%
(r.header['status']))
if r.header['status'] == 'NXDOMAIN':
print("SOA request was NXDOMAIN")
primary = ''
else:
if r.answers:
primary,email,serial,refresh,retry,expire,minimum = r.answers[0]['data']
print("Primary nameserver for domain %s is: %s"%(domain,primary))
else:
print("No answer to SOA query")
primary = ''
r = DNS.Request(domain,qtype='NS',server=primary,aa=1).req()
if r.header['status'] != 'NOERROR':
Error("received status of %s when attempting to query %s for NSs"%
(r.header['status']))
if r.header['aa'] != 1 and primary is not '':
Error("primary NS %s doesn't believe that it's authoritative!"% primary)
nslist = [x['data'] for x in r.answers]
print("Full list of nameservers for domain %s is: %s"%(domain,nslist))
return nslist
def CheckNS(nameserver,domain):
r = DNS.Request(domain,qtype='SOA',server=nameserver,aa=1).req()
if r.header['status'] != 'NOERROR':
Error("received status of %s when attempting to query %s for NS"%
(r.header['status']))
if r.header['aa'] != 1:
Error("NS %s doesn't believe that it's authoritative!"% nameserver)
primary,email,serial,refresh,retry,expire,minimum = r.answers[0]['data']
print(" NS has serial",serial[1])
if __name__ == "__main__":
main()

28
tests/test6.py Executable file
View File

@ -0,0 +1,28 @@
#!/usr/bin/python3
import sys ; sys.path.insert(0, '..')
import DNS
req = DNS.DnsRequest('mailout03.controlledmail.com', qtype='AAAA', protocol='tcp')
resp = req.req()
print(resp.answers[0]['name'], resp.answers[0]['data'])
req1 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='AAAA', protocol='udp')
resp1 = req1.req(resulttype='binary')
print(resp1.answers[0]['name'], resp1.answers[0]['data'])
req2 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='AAAA', protocol='tcp')
resp2 = req2.req(resulttype='text')
print(resp2.answers[0]['name'], resp2.answers[0]['data'])
req3 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='A', protocol='tcp')
resp3 = req3.req()
print(resp3.answers[0]['name'], resp3.answers[0]['data'])
req4 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='A', protocol='udp', resulttype='binary')
resp4 = req4.req(resulttype='binary')
print(resp4.answers[0]['name'], resp4.answers[0]['data'])
req5 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='A', protocol='tcp')
resp5 = req5.req(resulttype='text')
print(resp5.answers[0]['name'], resp5.answers[0]['data'])

40
tests/test7.py Executable file
View File

@ -0,0 +1,40 @@
#!/usr/bin/python3
import sys ; sys.path.insert(0, '..')
import DNS
req = DNS.DnsRequest('google.com', qtype='AAAA', protocol='tcp')
resp = req.req()
print(resp.answers[0]['name'], resp.answers[0]['data'])
req1 = DNS.DnsRequest('google.com', qtype='AAAA', protocol='udp')
resp1 = req1.req(resulttype='binary')
print(resp1.answers[0]['name'], resp1.answers[0]['data'])
req2 = DNS.DnsRequest('google.com', qtype='AAAA', protocol='tcp')
resp2 = req2.req(resulttype='text')
print(resp2.answers[0]['name'], resp2.answers[0]['data'])
req3 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='TXT', protocol='tcp')
resp3 = req3.req()
print(resp3.answers[0]['name'], resp3.answers[0]['data'])
req4 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='TXT', protocol='udp', resulttype='binary')
resp4 = req4.req(resulttype='binary')
print(resp4.answers[0]['name'], resp4.answers[0]['data'])
req5 = DNS.DnsRequest('mailout03.controlledmail.com', qtype='TXT', protocol='tcp')
resp5 = req5.req(resulttype='text')
print(resp5.answers[0]['name'], resp5.answers[0]['data'])
req6 = DNS.DnsRequest('controlledmail.com', qtype='MX', protocol='tcp')
resp6 = req6.req()
print(resp6.answers[0]['name'], resp6.answers[0]['data'])
req7 = DNS.DnsRequest('controlledmail.com', qtype='MX', protocol='udp', resulttype='binary')
resp7 = req6.req(resulttype='binary')
print(resp7.answers[0]['name'], resp7.answers[0]['data'])
req8 = DNS.DnsRequest('controlledmail.com', qtype='MX', protocol='tcp')
resp8 = req8.req(resulttype='text')
print(resp8.answers[0]['name'], resp8.answers[0]['data'])

13
tests/testsrv.py Executable file
View File

@ -0,0 +1,13 @@
#!/usr/bin/python3
import sys ; sys.path.insert(0, '..')
import DNS
# automatically load nameserver(s) from /etc/resolv.conf
# (works on unix - on others, YMMV)
DNS.ParseResolvConf()
r=DNS.Request(qtype='srv')
res = r.req('_ldap._tcp.openldap.org')
res.show()
print(res.answers)

55
tools/caching.py Normal file
View File

@ -0,0 +1,55 @@
#
# From: KevinL <darius@bofh.net.au>
# A simple dns answer cache - it's author notes:
# "It's probably really bodgy code, tho - it was my early python..."
# So don't send him abusive messages if you hate it.
#
class DNSCache:
"""
Covers the DNS object, keeps a cache of answers. Clumsy as hell.
"""
forCache = {}
revCache = {}
# cache failures for this long, in seconds
negCache = 3600
def __init__(self):
import DNS
DNS.ParseResolvConf()
def lookup(self,IP = None,name = None):
import DNS
now = time.time()
if (not IP) and (not name):
return None
if IP:
if type(IP) != type(''):
return None
a = string.split(IP, '.')
a.reverse()
name = string.join(a, '.')+'.in-addr.arpa'
cache = self.revCache
qt = 'ptr'
else:
if type(name) != type(''):
return None
cache = self.forCache
qt = 'a'
if name in cache:
# Check if it's timed out or not
if cache[name][1] < now:
del(cache[name])
else:
return(cache[name][0])
x = DNS.DnsRequest(name,qtype=qt)
try:
x.req()
except:
return 'Timeout'
if len(x.response.answers) > 0:
cache[name] = ( x.response.answers[0]['data'], x.time_finish +
x.response.answers[0]['ttl'])
else:
cache[name] = (None,now+self.negCache)
return cache[name][0]

63
tools/named-perf.py Executable file
View File

@ -0,0 +1,63 @@
#!/usr/bin/python3
servers = [ "192.92.129.1",
"192.189.54.17", # yarrina
"192.189.54.33", # warrane
"203.8.183.1", # yalumba
"192.189.54.65", # gnamma
"128.250.1.21", # munnari
]
lookups = [ ( 'munnari.oz.au', 'A' ),
( 'connect.com.au', 'SOA' ),
( 'parc.xerox.com', 'MX' ),
( 'bogus.example.net', 'A'),
]
rpts = 5
def main():
import DNS
import socket
import time
res = {}
for server in servers:
res[server] = [100000,0,0,0] # min,max,tot,failed
for what,querytype in lookups:
for count in range(rpts):
for server in servers:
d = DNS.DnsRequest(server=server,timeout=1)
fail = 0
timingstart = time.time()
try:
r=d.req(name=what,qtype=querytype)
except DNS.Error:
fail = 1
timingfinish = time.time()
if fail:
res[server][3] = res[server][3] + 1
print("(failed)",res[server][3])
if 0:
if r.header['ancount'] == 0:
print("WARNING: Server",server,"got no answers for", \
what, querytype)
t = int(1000 * (timingfinish - timingstart))
print(server,"took",t,"ms for",what,querytype)
res[server][0] = min(t,res[server][0])
res[server][1] = max(t,res[server][1])
res[server][2] = res[server][2] + t
for server in servers:
queries = rpts * len(lookups)
r = res[server]
print(server)
print("%-30s %2d/%2d(%3.2f%%) %dms/%dms/%dms min/avg/max" % (
socket.gethostbyaddr(server)[0],
queries - r[3], queries,
((queries-r[3])*100.0)/queries,
r[0],
r[2] / queries,
r[1]))
if __name__ == "__main__":
main()