From c3c0f1dcb74684e351e8b93e8ab5711ec8942114 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Tue, 4 Dec 2018 14:23:40 +0100 Subject: [PATCH] slapd: add support for testing with TLS --- src/ldaptools/slapd.py | 83 +++++++++++++++++++++++++++++++----------- tests/conftest.py | 35 ++++++++++++++++-- tests/test_slapd.py | 24 ++++++++++++ tox.ini | 1 + 4 files changed, 119 insertions(+), 24 deletions(-) diff --git a/src/ldaptools/slapd.py b/src/ldaptools/slapd.py index eda7928..a18f54c 100644 --- a/src/ldaptools/slapd.py +++ b/src/ldaptools/slapd.py @@ -41,8 +41,9 @@ class Slapd(object): objectClass: olcGlobal cn: config olcToolThreads: 1 -olcLogLevel: stats +olcLogLevel: -1 olcLogFile: {slapd_dir}/log +{extra_config} dn: cn=module{{0}},cn=config objectClass: olcModuleList @@ -90,29 +91,52 @@ olcAccess: {{0}}to * 'schemas', '%s.ldif' % schema)).read() for schema in schemas] checkpoints = None data_dirs = None + db_index = 1 + tls = None - def create_process(self, args, pipe=True): + def create_process(self, args, pipe=True, out=None): if pipe: return subprocess.Popen(args, stdin=subprocess.PIPE, env=os.environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE) else: DEVNULL = open(os.devnull, 'w') - return subprocess.Popen(args, stdin=DEVNULL, env=os.environ, stdout=DEVNULL, - stderr=DEVNULL) + out = out or DEVNULL + return subprocess.Popen(args, stdin=DEVNULL, env=os.environ, stdout=out, + stderr=out) - def __init__(self, ldap_url=None): + def __init__(self, ldap_url=None, config_context=None, tls=None): assert has_slapd() self.checkpoints = [] self.data_dirs = [] - self.slapd_dir = tempfile.mkdtemp(prefix='a2-provision-slapd') + self.slapd_dir = tempfile.mkdtemp(prefix='slapd-server') self.config_dir = os.path.join(self.slapd_dir, 'slapd.d') os.mkdir(self.config_dir) self.socket = os.path.join(self.slapd_dir, 'socket') if not ldap_url: ldap_url = 'ldapi://%s' % self.socket.replace('/', '%2F') self.ldap_url = ldap_url - self.slapadd(self.config_ldif, context={'slapd_dir': self.slapd_dir, 'gid': os.getgid(), - 'uid': os.getuid()}) + config_context = config_context or {} + extra_config = '' + if tls: + key_file, certificate_file = tls + real_key = os.path.join(self.slapd_dir, 'key.pem') + real_cert = os.path.join(self.slapd_dir, 'cert.pem') + with open(real_key, 'w') as f, open(key_file) as g: + f.write(g.read()) + with open(real_cert, 'w') as f, open(certificate_file) as g: + f.write(g.read()) + self.tls = real_key, real_cert + + extra_config += 'olcTLSCertificateKeyFile: %s\n' % real_key + extra_config += 'olcTLSCertificateFile: %s\n' % real_cert + extra_config += 'olcSecurity: ssf=1\n' + config_context.update({ + 'slapd_dir': self.slapd_dir, + 'gid': os.getgid(), + 'uid': os.getuid(), + 'extra_config': extra_config, + }) + self.slapadd(self.config_ldif, context=config_context) for schema_ldif in self.schemas_ldif: self.slapadd(schema_ldif) self.start() @@ -127,7 +151,7 @@ o: orga def add_db(self, suffix): path = os.path.join(self.slapd_dir, suffix) os.mkdir(path) - ldif = '''dn: olcDatabase=mdb,cn=config + ldif = '''dn: olcDatabase={{{index}}}mdb,cn=config objectClass: olcDatabaseConfig objectClass: olcMdbConfig olcDatabase: mdb @@ -138,7 +162,8 @@ olcReadOnly: FALSE olcAccess: {{0}}to * by * manage ''' - self.add_ldif(ldif, context={'suffix': suffix, 'path': path}) + self.add_ldif(ldif, context={'index': self.db_index, 'suffix': suffix, 'path': path}) + self.db_index += 1 self.data_dirs.append(path) def slapadd(self, ldif, db=0, context=None): @@ -158,14 +183,19 @@ olcAccess: {{0}}to * by * manage '-d768', # put slapd in foreground '-F' + self.config_dir, '-h', self.ldap_url] - self.process = self.create_process(cmd, pipe=False) + out_file = open(os.path.join(self.slapd_dir, 'stdout'), 'w') + self.process = self.create_process(cmd, pipe=False, out=out_file) atexit.register(self.clean) + c = 0 while True: + c += 1 try: conn = self.get_connection() conn.whoami_s() - except ldap.SERVER_DOWN: + except ldap.SERVER_DOWN as e: + if c > 100: + raise time.sleep(0.1) else: break @@ -176,6 +206,7 @@ olcAccess: {{0}}to * by * manage process = self.process process.kill() + process.wait() while True: try: @@ -215,12 +246,14 @@ olcAccess: {{0}}to * by * manage def clean(self): '''Remove directory''' - if self.slapd_dir: - if os.path.exists(self.slapd_dir): - shutil.rmtree(self.slapd_dir, ignore_errors=True) - self.slapd_dir = None - if self.process: - self.stop() + try: + if self.process: + self.stop() + finally: + if self.slapd_dir: + if os.path.exists(self.slapd_dir): + shutil.rmtree(self.slapd_dir, ignore_errors=True) + self.slapd_dir = None def __enter__(self): return self @@ -238,10 +271,18 @@ olcAccess: {{0}}to * by * manage conn = self.get_connection_admin() parser.add(conn) - def get_connection(self): + def get_connection(self, tls=None): assert self.process - - return PagedLDAPObject(self.ldap_url) + conn = PagedLDAPObject(self.ldap_url) + if tls: + conn.set_option(ldap.OPT_X_TLS_KEYFILE, tls[0]) + conn.set_option(ldap.OPT_X_TLS_CERTFILE, tls[1]) + if self.tls: + conn.set_option(ldap.OPT_X_TLS_CACERTFILE, self.tls[1]) + conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0) + if not self.ldap_url.startswith('ldaps'): + conn.start_tls_s() + return conn def get_connection_admin(self): conn = self.get_connection() diff --git a/tests/conftest.py b/tests/conftest.py index 2b8fa5e..021bf0d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,23 +1,52 @@ import pytest import tempfile import os +import random from ldaptools.slapd import Slapd +base_path = os.path.dirname(__file__) +key_file = os.path.join(base_path, 'ssl-cert-snakeoil.key') +certificate_file = os.path.join(base_path, 'ssl-cert-snakeoil.pem') + @pytest.fixture def slapd(request): - return Slapd(ldap_url=getattr(request, 'param', None)) + with Slapd(ldap_url=getattr(request, 'param', None)) as s: + yield s @pytest.fixture def slapd_tcp1(request): - return Slapd(ldap_url='ldap://localhost:3389') + port = 4389 + with Slapd(ldap_url='ldap://localhost:%s' % port) as s: + yield s @pytest.fixture def slapd_tcp2(request): - return Slapd(ldap_url='ldap://localhost:4389') + port = 5389 + with Slapd(ldap_url='ldap://localhost:%s' % port) as s: + yield s + + +@pytest.fixture +def slapd_ssl(request): + port = 6389 + with Slapd(ldap_url='ldaps://localhost:%s' % port, tls=(key_file, certificate_file)) as s: + yield s + + +@pytest.fixture +def slapd_tls(request): + port = 7389 + with Slapd(ldap_url='ldap://localhost:%s' % port, tls=(key_file, certificate_file)) as s: + yield s + + +@pytest.fixture(params=['slapd_tcp1', 'slapd_ssl', 'slapd_tls']) +def any_slapd(request, slapd_tcp1, slapd_ssl, slapd_tls): + return vars().get(request.param) @pytest.fixture diff --git a/tests/test_slapd.py b/tests/test_slapd.py index 56e06ab..e4888da 100644 --- a/tests/test_slapd.py +++ b/tests/test_slapd.py @@ -26,3 +26,27 @@ sn: n slapd.start() conn = slapd.get_connection() assert len(conn.search_s('o=orga', ldap.SCOPE_SUBTREE)) == 1 + + +def test_any(any_slapd): + conn = any_slapd.get_connection() + conn.simple_bind_s('uid=admin,cn=config', 'admin') + + +def test_ssl_client_cert(slapd_ssl): + conn = slapd_ssl.get_connection_admin() + conn.modify_s('cn=config', [ + (ldap.MOD_ADD, 'olcTLSCACertificateFile', slapd_ssl.tls[1]), + (ldap.MOD_ADD, 'olcTLSVerifyClient', 'demand'), + ]) + + with pytest.raises((ldap.SERVER_DOWN, ldap.CONNECT_ERROR)): + conn = slapd_ssl.get_connection() + conn.whoami_s() + + conn = slapd_ssl.get_connection(tls=slapd_ssl.tls) + conn.whoami_s() + + +def test_tls_client_cert(slapd_tls): + test_ssl_client_cert(slapd_tls) diff --git a/tox.ini b/tox.ini index 2fbe662..063b256 100644 --- a/tox.ini +++ b/tox.ini @@ -16,6 +16,7 @@ deps = pytest pytest-cov pytest-random + python-ldap<3 commands = py.test {env:COVERAGE:} {posargs:--random tests}