slapd: add support for testing with TLS
This commit is contained in:
parent
d32d58add6
commit
c3c0f1dcb7
|
@ -41,8 +41,9 @@ class Slapd(object):
|
|||
objectClass: olcGlobal
|
||||
cn: config
|
||||
olcToolThreads: 1
|
||||
olcLogLevel: stats
|
||||
olcLogLevel: -1
|
||||
olcLogFile: {slapd_dir}/log
|
||||
{extra_config}
|
||||
|
||||
dn: cn=module{{0}},cn=config
|
||||
objectClass: olcModuleList
|
||||
|
@ -90,29 +91,52 @@ olcAccess: {{0}}to *
|
|||
'schemas', '%s.ldif' % schema)).read() for schema in schemas]
|
||||
checkpoints = None
|
||||
data_dirs = None
|
||||
db_index = 1
|
||||
tls = None
|
||||
|
||||
def create_process(self, args, pipe=True):
|
||||
def create_process(self, args, pipe=True, out=None):
|
||||
if pipe:
|
||||
return subprocess.Popen(args, stdin=subprocess.PIPE, env=os.environ,
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
else:
|
||||
DEVNULL = open(os.devnull, 'w')
|
||||
return subprocess.Popen(args, stdin=DEVNULL, env=os.environ, stdout=DEVNULL,
|
||||
stderr=DEVNULL)
|
||||
out = out or DEVNULL
|
||||
return subprocess.Popen(args, stdin=DEVNULL, env=os.environ, stdout=out,
|
||||
stderr=out)
|
||||
|
||||
def __init__(self, ldap_url=None):
|
||||
def __init__(self, ldap_url=None, config_context=None, tls=None):
|
||||
assert has_slapd()
|
||||
self.checkpoints = []
|
||||
self.data_dirs = []
|
||||
self.slapd_dir = tempfile.mkdtemp(prefix='a2-provision-slapd')
|
||||
self.slapd_dir = tempfile.mkdtemp(prefix='slapd-server')
|
||||
self.config_dir = os.path.join(self.slapd_dir, 'slapd.d')
|
||||
os.mkdir(self.config_dir)
|
||||
self.socket = os.path.join(self.slapd_dir, 'socket')
|
||||
if not ldap_url:
|
||||
ldap_url = 'ldapi://%s' % self.socket.replace('/', '%2F')
|
||||
self.ldap_url = ldap_url
|
||||
self.slapadd(self.config_ldif, context={'slapd_dir': self.slapd_dir, 'gid': os.getgid(),
|
||||
'uid': os.getuid()})
|
||||
config_context = config_context or {}
|
||||
extra_config = ''
|
||||
if tls:
|
||||
key_file, certificate_file = tls
|
||||
real_key = os.path.join(self.slapd_dir, 'key.pem')
|
||||
real_cert = os.path.join(self.slapd_dir, 'cert.pem')
|
||||
with open(real_key, 'w') as f, open(key_file) as g:
|
||||
f.write(g.read())
|
||||
with open(real_cert, 'w') as f, open(certificate_file) as g:
|
||||
f.write(g.read())
|
||||
self.tls = real_key, real_cert
|
||||
|
||||
extra_config += 'olcTLSCertificateKeyFile: %s\n' % real_key
|
||||
extra_config += 'olcTLSCertificateFile: %s\n' % real_cert
|
||||
extra_config += 'olcSecurity: ssf=1\n'
|
||||
config_context.update({
|
||||
'slapd_dir': self.slapd_dir,
|
||||
'gid': os.getgid(),
|
||||
'uid': os.getuid(),
|
||||
'extra_config': extra_config,
|
||||
})
|
||||
self.slapadd(self.config_ldif, context=config_context)
|
||||
for schema_ldif in self.schemas_ldif:
|
||||
self.slapadd(schema_ldif)
|
||||
self.start()
|
||||
|
@ -127,7 +151,7 @@ o: orga
|
|||
def add_db(self, suffix):
|
||||
path = os.path.join(self.slapd_dir, suffix)
|
||||
os.mkdir(path)
|
||||
ldif = '''dn: olcDatabase=mdb,cn=config
|
||||
ldif = '''dn: olcDatabase={{{index}}}mdb,cn=config
|
||||
objectClass: olcDatabaseConfig
|
||||
objectClass: olcMdbConfig
|
||||
olcDatabase: mdb
|
||||
|
@ -138,7 +162,8 @@ olcReadOnly: FALSE
|
|||
olcAccess: {{0}}to * by * manage
|
||||
|
||||
'''
|
||||
self.add_ldif(ldif, context={'suffix': suffix, 'path': path})
|
||||
self.add_ldif(ldif, context={'index': self.db_index, 'suffix': suffix, 'path': path})
|
||||
self.db_index += 1
|
||||
self.data_dirs.append(path)
|
||||
|
||||
def slapadd(self, ldif, db=0, context=None):
|
||||
|
@ -158,14 +183,19 @@ olcAccess: {{0}}to * by * manage
|
|||
'-d768', # put slapd in foreground
|
||||
'-F' + self.config_dir,
|
||||
'-h', self.ldap_url]
|
||||
self.process = self.create_process(cmd, pipe=False)
|
||||
out_file = open(os.path.join(self.slapd_dir, 'stdout'), 'w')
|
||||
self.process = self.create_process(cmd, pipe=False, out=out_file)
|
||||
atexit.register(self.clean)
|
||||
|
||||
c = 0
|
||||
while True:
|
||||
c += 1
|
||||
try:
|
||||
conn = self.get_connection()
|
||||
conn.whoami_s()
|
||||
except ldap.SERVER_DOWN:
|
||||
except ldap.SERVER_DOWN as e:
|
||||
if c > 100:
|
||||
raise
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
break
|
||||
|
@ -176,6 +206,7 @@ olcAccess: {{0}}to * by * manage
|
|||
|
||||
process = self.process
|
||||
process.kill()
|
||||
process.wait()
|
||||
|
||||
while True:
|
||||
try:
|
||||
|
@ -215,12 +246,14 @@ olcAccess: {{0}}to * by * manage
|
|||
|
||||
def clean(self):
|
||||
'''Remove directory'''
|
||||
if self.slapd_dir:
|
||||
if os.path.exists(self.slapd_dir):
|
||||
shutil.rmtree(self.slapd_dir, ignore_errors=True)
|
||||
self.slapd_dir = None
|
||||
if self.process:
|
||||
self.stop()
|
||||
try:
|
||||
if self.process:
|
||||
self.stop()
|
||||
finally:
|
||||
if self.slapd_dir:
|
||||
if os.path.exists(self.slapd_dir):
|
||||
shutil.rmtree(self.slapd_dir, ignore_errors=True)
|
||||
self.slapd_dir = None
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
@ -238,10 +271,18 @@ olcAccess: {{0}}to * by * manage
|
|||
conn = self.get_connection_admin()
|
||||
parser.add(conn)
|
||||
|
||||
def get_connection(self):
|
||||
def get_connection(self, tls=None):
|
||||
assert self.process
|
||||
|
||||
return PagedLDAPObject(self.ldap_url)
|
||||
conn = PagedLDAPObject(self.ldap_url)
|
||||
if tls:
|
||||
conn.set_option(ldap.OPT_X_TLS_KEYFILE, tls[0])
|
||||
conn.set_option(ldap.OPT_X_TLS_CERTFILE, tls[1])
|
||||
if self.tls:
|
||||
conn.set_option(ldap.OPT_X_TLS_CACERTFILE, self.tls[1])
|
||||
conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
|
||||
if not self.ldap_url.startswith('ldaps'):
|
||||
conn.start_tls_s()
|
||||
return conn
|
||||
|
||||
def get_connection_admin(self):
|
||||
conn = self.get_connection()
|
||||
|
|
|
@ -1,23 +1,52 @@
|
|||
import pytest
|
||||
import tempfile
|
||||
import os
|
||||
import random
|
||||
|
||||
from ldaptools.slapd import Slapd
|
||||
|
||||
base_path = os.path.dirname(__file__)
|
||||
key_file = os.path.join(base_path, 'ssl-cert-snakeoil.key')
|
||||
certificate_file = os.path.join(base_path, 'ssl-cert-snakeoil.pem')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def slapd(request):
|
||||
return Slapd(ldap_url=getattr(request, 'param', None))
|
||||
with Slapd(ldap_url=getattr(request, 'param', None)) as s:
|
||||
yield s
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def slapd_tcp1(request):
|
||||
return Slapd(ldap_url='ldap://localhost:3389')
|
||||
port = 4389
|
||||
with Slapd(ldap_url='ldap://localhost:%s' % port) as s:
|
||||
yield s
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def slapd_tcp2(request):
|
||||
return Slapd(ldap_url='ldap://localhost:4389')
|
||||
port = 5389
|
||||
with Slapd(ldap_url='ldap://localhost:%s' % port) as s:
|
||||
yield s
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def slapd_ssl(request):
|
||||
port = 6389
|
||||
with Slapd(ldap_url='ldaps://localhost:%s' % port, tls=(key_file, certificate_file)) as s:
|
||||
yield s
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def slapd_tls(request):
|
||||
port = 7389
|
||||
with Slapd(ldap_url='ldap://localhost:%s' % port, tls=(key_file, certificate_file)) as s:
|
||||
yield s
|
||||
|
||||
|
||||
@pytest.fixture(params=['slapd_tcp1', 'slapd_ssl', 'slapd_tls'])
|
||||
def any_slapd(request, slapd_tcp1, slapd_ssl, slapd_tls):
|
||||
return vars().get(request.param)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
|
@ -26,3 +26,27 @@ sn: n
|
|||
slapd.start()
|
||||
conn = slapd.get_connection()
|
||||
assert len(conn.search_s('o=orga', ldap.SCOPE_SUBTREE)) == 1
|
||||
|
||||
|
||||
def test_any(any_slapd):
|
||||
conn = any_slapd.get_connection()
|
||||
conn.simple_bind_s('uid=admin,cn=config', 'admin')
|
||||
|
||||
|
||||
def test_ssl_client_cert(slapd_ssl):
|
||||
conn = slapd_ssl.get_connection_admin()
|
||||
conn.modify_s('cn=config', [
|
||||
(ldap.MOD_ADD, 'olcTLSCACertificateFile', slapd_ssl.tls[1]),
|
||||
(ldap.MOD_ADD, 'olcTLSVerifyClient', 'demand'),
|
||||
])
|
||||
|
||||
with pytest.raises((ldap.SERVER_DOWN, ldap.CONNECT_ERROR)):
|
||||
conn = slapd_ssl.get_connection()
|
||||
conn.whoami_s()
|
||||
|
||||
conn = slapd_ssl.get_connection(tls=slapd_ssl.tls)
|
||||
conn.whoami_s()
|
||||
|
||||
|
||||
def test_tls_client_cert(slapd_tls):
|
||||
test_ssl_client_cert(slapd_tls)
|
||||
|
|
Loading…
Reference in New Issue