From a414e997ac7a76afbdd0406a35f040840d59fbee Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Thu, 16 Sep 2021 13:56:34 +0200 Subject: [PATCH 1/2] Implement support for OPT_X_TLS_PEERCERT Co-authored-by: Thomas Grainger Signed-off-by: Christian Heimes --- Doc/reference/ldap.rst | 7 ++++++- Modules/options.c | 24 +++++++++++++++++++++++- Tests/t_ldapobject.py | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 2 deletions(-) diff --git a/Doc/reference/ldap.rst b/Doc/reference/ldap.rst index 57485c7a..3029f64c 100644 --- a/Doc/reference/ldap.rst +++ b/Doc/reference/ldap.rst @@ -406,7 +406,12 @@ TLS options .. py:data:: OPT_X_TLS_PEERCERT - Get peer's certificate as binary ASN.1 data structure (not supported) + Get peer's certificate as binary ASN.1 data structure (DER) + + .. versionadded:: 3.4.0 + + .. note:: + The option leaks memory with OpenLDAP < 2.5.8. .. py:data:: OPT_X_TLS_PROTOCOL_MIN diff --git a/Modules/options.c b/Modules/options.c index db5fde3e..48ef0f36 100644 --- a/Modules/options.c +++ b/Modules/options.c @@ -5,6 +5,7 @@ #include "LDAPObject.h" #include "ldapcontrol.h" #include "options.h" +#include "berval.h" void set_timeval_from_double(struct timeval *tv, double d) @@ -58,6 +59,9 @@ LDAP_set_option(LDAPObject *self, int option, PyObject *value) case LDAP_OPT_API_FEATURE_INFO: #ifdef HAVE_SASL case LDAP_OPT_X_SASL_SSF: +#endif +#ifdef LDAP_OPT_X_TLS_PEERCERT + case LDAP_OPT_X_TLS_PEERCERT: #endif /* Read-only options */ PyErr_SetString(PyExc_ValueError, "read-only option"); @@ -257,6 +261,9 @@ LDAP_get_option(LDAPObject *self, int option) #if HAVE_SASL /* unsigned long */ ber_len_t blen; +#endif +#ifdef LDAP_OPT_X_TLS_PEERCERT + struct berval bv = {0}; #endif PyObject *extensions, *v; Py_ssize_t i, num_extensions; @@ -431,7 +438,22 @@ LDAP_get_option(LDAPObject *self, int option) v = LDAPControls_to_List(lcs); ldap_controls_free(lcs); return v; - +#ifdef LDAP_OPT_X_TLS_PEERCERT + case LDAP_OPT_X_TLS_PEERCERT: + res = LDAP_int_get_option(self, option, &bv); + if (res != LDAP_OPT_SUCCESS) { + return option_error(res, "ldap_get_option"); + } + if (bv.bv_len == 0) { + Py_RETURN_NONE; + } else { + v = LDAPberval_to_object(&bv); + /* bv_val memory is allocated with ber_memalloc_x() + * context allocation/dealloc is a private API. */ + ber_memfree(bv.bv_val); + return v; + } +#endif default: PyErr_Format(PyExc_ValueError, "unknown option %d", option); return NULL; diff --git a/Tests/t_ldapobject.py b/Tests/t_ldapobject.py index 3bcc00a2..07a78595 100644 --- a/Tests/t_ldapobject.py +++ b/Tests/t_ldapobject.py @@ -20,6 +20,11 @@ from slapdtest import requires_ldapi, requires_sasl, requires_tls from slapdtest import requires_init_fd +try: + from ssl import PEM_cert_to_DER_cert +except ImportError: + PEM_cert_to_DER_cert = None + LDIF_TEMPLATE = """dn: %(suffix)s objectClass: dcObject @@ -421,6 +426,36 @@ def test_multiple_starttls(self): l.simple_bind_s(self.server.root_dn, self.server.root_pw) self.assertEqual(l.whoami_s(), 'dn:' + self.server.root_dn) + @requires_tls() + @unittest.skipUnless( + hasattr(ldap, "OPT_X_TLS_PEERCERT"), + reason="Requires OPT_X_TLS_PEERCERT" + ) + def test_get_tls_peercert(self): + l = self.ldap_object_class(self.server.ldap_uri) + peercert = l.get_option(ldap.OPT_X_TLS_PEERCERT) + self.assertEqual(peercert, None) + with self.assertRaises(ValueError): + l.set_option(ldap.OPT_X_TLS_PEERCERT, b"") + + l.set_option(ldap.OPT_X_TLS_CACERTFILE, self.server.cafile) + l.set_option(ldap.OPT_X_TLS_NEWCTX, 0) + l.start_tls_s() + + peercert = l.get_option(ldap.OPT_X_TLS_PEERCERT) + self.assertTrue(peercert) + self.assertIsInstance(peercert, bytes) + + if PEM_cert_to_DER_cert is not None: + with open(self.server.servercert) as f: + server_pem = f.read() + # remove text + begin = server_pem.find("-----BEGIN CERTIFICATE-----") + server_pem = server_pem[begin:-1] + + server_der = PEM_cert_to_DER_cert(server_pem) + self.assertEqual(server_der, peercert) + def test_dse(self): dse = self._ldap_conn.read_rootdse_s() self.assertIsInstance(dse, dict) From e704ffa12a9c5d614e53041fdd32da3bcbf2a915 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Sat, 18 Sep 2021 17:36:44 +0200 Subject: [PATCH 2/2] Use regex to locate PEM body --- Tests/t_ldapobject.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/Tests/t_ldapobject.py b/Tests/t_ldapobject.py index 07a78595..9e4e3311 100644 --- a/Tests/t_ldapobject.py +++ b/Tests/t_ldapobject.py @@ -3,9 +3,11 @@ See https://www.python-ldap.org/ for details. """ +import base64 import errno import linecache import os +import re import socket import unittest import pickle @@ -20,10 +22,10 @@ from slapdtest import requires_ldapi, requires_sasl, requires_tls from slapdtest import requires_init_fd -try: - from ssl import PEM_cert_to_DER_cert -except ImportError: - PEM_cert_to_DER_cert = None +PEM_CERT_RE = re.compile( + b'-----BEGIN CERTIFICATE-----(.*?)-----END CERTIFICATE-----', + re.DOTALL +) LDIF_TEMPLATE = """dn: %(suffix)s @@ -446,15 +448,12 @@ def test_get_tls_peercert(self): self.assertTrue(peercert) self.assertIsInstance(peercert, bytes) - if PEM_cert_to_DER_cert is not None: - with open(self.server.servercert) as f: - server_pem = f.read() - # remove text - begin = server_pem.find("-----BEGIN CERTIFICATE-----") - server_pem = server_pem[begin:-1] + with open(self.server.servercert, "rb") as f: + server_cert = f.read() + pem_body = PEM_CERT_RE.search(server_cert).group(1) + server_der = base64.b64decode(pem_body) - server_der = PEM_cert_to_DER_cert(server_pem) - self.assertEqual(server_der, peercert) + self.assertEqual(server_der, peercert) def test_dse(self): dse = self._ldap_conn.read_rootdse_s()