slapd: add support for testing with TLS

This commit is contained in:
Benjamin Dauvergne 2018-12-04 14:23:40 +01:00
parent d32d58add6
commit c3c0f1dcb7
4 changed files with 119 additions and 24 deletions

View File

@ -41,8 +41,9 @@ class Slapd(object):
objectClass: olcGlobal
cn: config
olcToolThreads: 1
olcLogLevel: stats
olcLogLevel: -1
olcLogFile: {slapd_dir}/log
{extra_config}
dn: cn=module{{0}},cn=config
objectClass: olcModuleList
@ -90,29 +91,52 @@ olcAccess: {{0}}to *
'schemas', '%s.ldif' % schema)).read() for schema in schemas]
checkpoints = None
data_dirs = None
db_index = 1
tls = None
def create_process(self, args, pipe=True):
def create_process(self, args, pipe=True, out=None):
if pipe:
return subprocess.Popen(args, stdin=subprocess.PIPE, env=os.environ,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
DEVNULL = open(os.devnull, 'w')
return subprocess.Popen(args, stdin=DEVNULL, env=os.environ, stdout=DEVNULL,
stderr=DEVNULL)
out = out or DEVNULL
return subprocess.Popen(args, stdin=DEVNULL, env=os.environ, stdout=out,
stderr=out)
def __init__(self, ldap_url=None):
def __init__(self, ldap_url=None, config_context=None, tls=None):
assert has_slapd()
self.checkpoints = []
self.data_dirs = []
self.slapd_dir = tempfile.mkdtemp(prefix='a2-provision-slapd')
self.slapd_dir = tempfile.mkdtemp(prefix='slapd-server')
self.config_dir = os.path.join(self.slapd_dir, 'slapd.d')
os.mkdir(self.config_dir)
self.socket = os.path.join(self.slapd_dir, 'socket')
if not ldap_url:
ldap_url = 'ldapi://%s' % self.socket.replace('/', '%2F')
self.ldap_url = ldap_url
self.slapadd(self.config_ldif, context={'slapd_dir': self.slapd_dir, 'gid': os.getgid(),
'uid': os.getuid()})
config_context = config_context or {}
extra_config = ''
if tls:
key_file, certificate_file = tls
real_key = os.path.join(self.slapd_dir, 'key.pem')
real_cert = os.path.join(self.slapd_dir, 'cert.pem')
with open(real_key, 'w') as f, open(key_file) as g:
f.write(g.read())
with open(real_cert, 'w') as f, open(certificate_file) as g:
f.write(g.read())
self.tls = real_key, real_cert
extra_config += 'olcTLSCertificateKeyFile: %s\n' % real_key
extra_config += 'olcTLSCertificateFile: %s\n' % real_cert
extra_config += 'olcSecurity: ssf=1\n'
config_context.update({
'slapd_dir': self.slapd_dir,
'gid': os.getgid(),
'uid': os.getuid(),
'extra_config': extra_config,
})
self.slapadd(self.config_ldif, context=config_context)
for schema_ldif in self.schemas_ldif:
self.slapadd(schema_ldif)
self.start()
@ -127,7 +151,7 @@ o: orga
def add_db(self, suffix):
path = os.path.join(self.slapd_dir, suffix)
os.mkdir(path)
ldif = '''dn: olcDatabase=mdb,cn=config
ldif = '''dn: olcDatabase={{{index}}}mdb,cn=config
objectClass: olcDatabaseConfig
objectClass: olcMdbConfig
olcDatabase: mdb
@ -138,7 +162,8 @@ olcReadOnly: FALSE
olcAccess: {{0}}to * by * manage
'''
self.add_ldif(ldif, context={'suffix': suffix, 'path': path})
self.add_ldif(ldif, context={'index': self.db_index, 'suffix': suffix, 'path': path})
self.db_index += 1
self.data_dirs.append(path)
def slapadd(self, ldif, db=0, context=None):
@ -158,14 +183,19 @@ olcAccess: {{0}}to * by * manage
'-d768', # put slapd in foreground
'-F' + self.config_dir,
'-h', self.ldap_url]
self.process = self.create_process(cmd, pipe=False)
out_file = open(os.path.join(self.slapd_dir, 'stdout'), 'w')
self.process = self.create_process(cmd, pipe=False, out=out_file)
atexit.register(self.clean)
c = 0
while True:
c += 1
try:
conn = self.get_connection()
conn.whoami_s()
except ldap.SERVER_DOWN:
except ldap.SERVER_DOWN as e:
if c > 100:
raise
time.sleep(0.1)
else:
break
@ -176,6 +206,7 @@ olcAccess: {{0}}to * by * manage
process = self.process
process.kill()
process.wait()
while True:
try:
@ -215,12 +246,14 @@ olcAccess: {{0}}to * by * manage
def clean(self):
'''Remove directory'''
if self.slapd_dir:
if os.path.exists(self.slapd_dir):
shutil.rmtree(self.slapd_dir, ignore_errors=True)
self.slapd_dir = None
if self.process:
self.stop()
try:
if self.process:
self.stop()
finally:
if self.slapd_dir:
if os.path.exists(self.slapd_dir):
shutil.rmtree(self.slapd_dir, ignore_errors=True)
self.slapd_dir = None
def __enter__(self):
return self
@ -238,10 +271,18 @@ olcAccess: {{0}}to * by * manage
conn = self.get_connection_admin()
parser.add(conn)
def get_connection(self):
def get_connection(self, tls=None):
assert self.process
return PagedLDAPObject(self.ldap_url)
conn = PagedLDAPObject(self.ldap_url)
if tls:
conn.set_option(ldap.OPT_X_TLS_KEYFILE, tls[0])
conn.set_option(ldap.OPT_X_TLS_CERTFILE, tls[1])
if self.tls:
conn.set_option(ldap.OPT_X_TLS_CACERTFILE, self.tls[1])
conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
if not self.ldap_url.startswith('ldaps'):
conn.start_tls_s()
return conn
def get_connection_admin(self):
conn = self.get_connection()

View File

@ -1,23 +1,52 @@
import pytest
import tempfile
import os
import random
from ldaptools.slapd import Slapd
base_path = os.path.dirname(__file__)
key_file = os.path.join(base_path, 'ssl-cert-snakeoil.key')
certificate_file = os.path.join(base_path, 'ssl-cert-snakeoil.pem')
@pytest.fixture
def slapd(request):
return Slapd(ldap_url=getattr(request, 'param', None))
with Slapd(ldap_url=getattr(request, 'param', None)) as s:
yield s
@pytest.fixture
def slapd_tcp1(request):
return Slapd(ldap_url='ldap://localhost:3389')
port = 4389
with Slapd(ldap_url='ldap://localhost:%s' % port) as s:
yield s
@pytest.fixture
def slapd_tcp2(request):
return Slapd(ldap_url='ldap://localhost:4389')
port = 5389
with Slapd(ldap_url='ldap://localhost:%s' % port) as s:
yield s
@pytest.fixture
def slapd_ssl(request):
port = 6389
with Slapd(ldap_url='ldaps://localhost:%s' % port, tls=(key_file, certificate_file)) as s:
yield s
@pytest.fixture
def slapd_tls(request):
port = 7389
with Slapd(ldap_url='ldap://localhost:%s' % port, tls=(key_file, certificate_file)) as s:
yield s
@pytest.fixture(params=['slapd_tcp1', 'slapd_ssl', 'slapd_tls'])
def any_slapd(request, slapd_tcp1, slapd_ssl, slapd_tls):
return vars().get(request.param)
@pytest.fixture

View File

@ -26,3 +26,27 @@ sn: n
slapd.start()
conn = slapd.get_connection()
assert len(conn.search_s('o=orga', ldap.SCOPE_SUBTREE)) == 1
def test_any(any_slapd):
conn = any_slapd.get_connection()
conn.simple_bind_s('uid=admin,cn=config', 'admin')
def test_ssl_client_cert(slapd_ssl):
conn = slapd_ssl.get_connection_admin()
conn.modify_s('cn=config', [
(ldap.MOD_ADD, 'olcTLSCACertificateFile', slapd_ssl.tls[1]),
(ldap.MOD_ADD, 'olcTLSVerifyClient', 'demand'),
])
with pytest.raises((ldap.SERVER_DOWN, ldap.CONNECT_ERROR)):
conn = slapd_ssl.get_connection()
conn.whoami_s()
conn = slapd_ssl.get_connection(tls=slapd_ssl.tls)
conn.whoami_s()
def test_tls_client_cert(slapd_tls):
test_ssl_client_cert(slapd_tls)

View File

@ -16,6 +16,7 @@ deps =
pytest
pytest-cov
pytest-random
python-ldap<3
commands =
py.test {env:COVERAGE:} {posargs:--random tests}