Skip to content

Add high level LDAPObject.set_tls_options() #350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions Doc/reference/ldap.rst
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ SASL options
TLS options
:::::::::::

The method :py:meth:`LDAPObject.set_tls_options` provides a high-level API
to configure TLS options.

.. warning::

libldap does not materialize all TLS settings immediately. You must use
Expand Down Expand Up @@ -1339,6 +1342,51 @@ Connection-specific LDAP options
specified by *option* to *invalue*.


.. py:method:: LDAPObject.set_tls_options(cacertfile=None, cacertdir=None, require_cert=None, protocol_min=None, cipher_suite=None, certfile=None, keyfile=None, crlfile=None, crlcheck=None, start_tls=True) -> None

The method provides a high-level API to set TLS related options. It
avoids most common pitfalls and some catches errors early, e.g.
missing :py:const:`OPT_X_TLS_NEWCTX`. The method is available for OpenSSL
and GnuTLS backends. It raises :py:exc:`ValueError` for unsupported
backends, when libldap does not have TLS support, or TLS layer is already
installed.

*cacertfile* is a path to a PEM bundle file containing root CA certs.
Raises :py:exc:`OSError` when file is not found.

*cacertdir* is a path to a directory that contains hashed CA cert files.
Raises :py:exc:`OSError` when the directory does not exist.

*require_cert* set the cert validation strategy. Value must be one of
:py:const:`OPT_X_TLS_NEVER`, :py:const:`OPT_X_TLS_DEMAND`,
or :py:const:`OPT_X_TLS_HARD`. Hard and demand have the same meaning.
Raises :py:exc:`ValueError` for unsupported values.

*protocol_min* sets the minimum TLS protocol version. Value must one of
``0x303`` (TLS 1.2) or ``0x304`` (TLS 1.3). Raises :py:exc:`ValueError`
for unsupported values.

*cipher_suite* cipher suite string, see OpenSSL documentation for more
details.

*certfile* and *keyfile* set paths to certificate and key for client
cert authentication. Raises :py:exc:`ValueError` when only one option
is given and :py:exc:`OSError` when any file does not exist.

*crlfile* is path to a CRL file. Raises :py:exc:`OSError` when file is
not found.

*crlcheck* sets the CRL verification strategy. Value must be one of
:py:const:`OPT_X_TLS_CRL_NONE`, :py:const:`OPT_X_TLS_CRL_PEER`, or
:py:const:`OPT_X_TLS_CRL_ALL`. Raises :py:exc:`ValueError` for unsupported
values.

When *start_tls* is set then :py:meth:`LDAPObject.start_tls_s` is
automatically called for ``ldap://`` URIs. The argument is ignored
for ``ldaps://`` URIs.

.. versionadded:: 3.3

Object attributes
-----------------

Expand Down
3 changes: 3 additions & 0 deletions Doc/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ attrtype
authzId
automagically
backend
backends
behaviour
BER
bindname
Expand Down Expand Up @@ -56,6 +57,7 @@ filterstr
filterStr
formatOID
func
GnuTLS
GPG
Heimdal
hostport
Expand Down Expand Up @@ -144,6 +146,7 @@ subtree
syncrepl
syntaxes
timelimit
TLS
tracebacks
tuple
tuples
Expand Down
128 changes: 128 additions & 0 deletions Lib/ldap/ldapobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
See https://www.python-ldap.org/ for details.
"""
from os import strerror
import os.path

from ldap.pkginfo import __version__, __author__, __license__

Expand Down Expand Up @@ -697,6 +698,133 @@ def set_option(self,option,invalue):
invalue = RequestControlTuples(invalue)
return self._ldap_call(self._l.set_option,option,invalue)

def set_tls_options(self, cacertfile=None, cacertdir=None,
require_cert=None, protocol_min=None,
cipher_suite=None, certfile=None, keyfile=None,
crlfile=None, crlcheck=None, start_tls=True):
"""Set TLS/SSL options

:param cacertfile: path to a PEM bundle file containing root CA certs
:param cacertdir: path to a directory with hashed CA certificates
:param require_cert: cert validation strategy, one of
ldap.OPT_X_TLS_NEVER, OPT_X_TLS_DEMAND, OPT_X_TLS_HARD. Hard and
demand have the same meaning for client side sockets.
:param protocol_min: minimum protocol version, one of 0x303 (TLS 1.2)
or 0x304 (TLS 1.3).
:param cipher_suite: cipher suite string
:param certfile: path to cert file for client cert authentication
:param keyfile: path to key file for client cert authentication
:param crlfile: path to a CRL file
:param crlcheck: CRL verification strategy, one of
ldap.OPT_X_TLS_CRL_NONE, ldap.OPT_X_TLS_CRL_PEER, or
ldap.OPT_X_TLS_CRL_ALL
:param start_tls: automatically perform StartTLS for ldap:// connections
"""
if not hasattr(ldap, "OPT_X_TLS_NEWCTX"):
raise ValueError("libldap does not have TLS support")

# OpenSSL and GnuTLS support these options
tls_pkg = self.get_option(ldap.OPT_X_TLS_PACKAGE)
if tls_pkg not in {"OpenSSL", "GnuTLS"}:
raise ValueError("Unsupport TLS package '{}'.".format(tls_pkg))

# block ldapi ('in' because libldap supports multiple URIs)
if "ldapi://" in self._uri:
raise ValueError("IPC (ldapi) does not support TLS.")

# Check that TLS layer is not inplace yet
if self._ldap_call(self._l.tls_inplace):
raise ValueError("TLS connection already established")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly the exception types... I think this one will fit a bit better - EnvironmentError (instead of ValueError).


def _checkfile(option, filename):
# check that the file exists and is readable.
# libldap doesn't verify paths until it establishes a connection
if not os.access(filename, os.R_OK):
raise OSError(
f"{option} '{filename}' does not exist or is not readable"
)

if cacertfile is not None:
_checkfile("certfile", certfile)
self.set_option(ldap.OPT_X_TLS_CACERTFILE, cacertfile)

if cacertdir is not None:
if not os.path.isdir(cacertdir):
raise OSError(
"'{}' does not exist or is not a directory".format(cacertdir)
)
self.set_option(ldap.OPT_X_TLS_CACERTDIR, cacertdir)

if require_cert is not None:
supported = {
ldap.OPT_X_TLS_NEVER,
# ALLOW is a server-side setting
# ldap.OPT_X_TLS_ALLOW,
ldap.OPT_X_TLS_DEMAND,
ldap.OPT_X_TLS_HARD
}
if require_cert not in supported:
raise ValueError("Unsupported value for require_cert")
self.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, require_cert)

if protocol_min is not None:
# let's not support TLS 1.0 and 1.1
supported = {0x303, 0x304}
if protocol_min not in supported:
raise ValueError("Unsupported value for protocol_min")
self.set_option(ldap.OPT_X_TLS_PROTOCOL_MIN, protocol_min)

if cipher_suite is not None:
self.set_option(ldap.OPT_X_TLS_CIPHER_SUITE, cipher_suite)

if certfile is not None:
if keyfile is None:
raise ValueError("certfile option requires keyfile option")
_checkfile("certfile", certfile)
self.set_option(ldap.OPT_X_TLS_CERTFILE, certfile)

if keyfile is not None:
if certfile is None:
raise ValueError("keyfile option requires certfile option")
_checkfile("keyfile", keyfile)
self.set_option(ldap.OPT_X_TLS_KEYFILE, keyfile)

if crlfile is not None:
_checkfile("crlfile", crlfile)
self.set_option(ldap.OPT_X_TLS_CRLFILE, crlfile)

if crlcheck is not None:
# no check for crlfile. OpenSSL supports CRLs in CACERTDIR, too.
supported = {
ldap.OPT_X_TLS_CRL_NONE,
ldap.OPT_X_TLS_CRL_PEER,
ldap.OPT_X_TLS_CRL_ALL
}
if crlcheck not in supported:
raise ValueError("Unsupported value for crlcheck")
self.set_option(ldap.OPT_X_TLS_CRLCHECK, crlcheck)

# materialize settings
# 0 means client-side socket
try:
self.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
except ValueError as e:
# libldap doesn't return better error message here, global debug log
# may contain more information.
raise ValueError(
"libldap or {} does not support one or more options: {}".format(
tls_pkg, e
)
)

# Cannot use OPT_X_TLS with OPT_X_TLS_HARD to enforce StartTLS.
# libldap ldap_int_open_connection() calls ldap_int_tls_start() when
# mode is HARD, but it does not send LDAP_EXOP_START_TLS first.
if start_tls and "ldap://" in self._uri:
if self.protocol_version != ldap.VERSION3:
self.protocol_version = ldap.VERSION3
self.start_tls_s()

def search_subschemasubentry_s(self,dn=None):
"""
Returns the distinguished name of the sub schema sub entry
Expand Down
10 changes: 10 additions & 0 deletions Modules/LDAPObject.c
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,15 @@ l_ldap_start_tls_s(LDAPObject *self, PyObject *args)
return Py_None;
}

static PyObject *
l_ldap_tls_inplace(LDAPObject *self)
{
if (not_valid(self))
return NULL;

return PyBool_FromLong(ldap_tls_inplace(self->ldap));
}

#endif

/* ldap_set_option */
Expand Down Expand Up @@ -1525,6 +1534,7 @@ static PyMethodDef methods[] = {
{"search_ext", (PyCFunction)l_ldap_search_ext, METH_VARARGS},
#ifdef HAVE_TLS
{"start_tls_s", (PyCFunction)l_ldap_start_tls_s, METH_VARARGS},
{"tls_inplace", (PyCFunction)l_ldap_tls_inplace, METH_NOARGS},
#endif
{"whoami_s", (PyCFunction)l_ldap_whoami_s, METH_VARARGS},
{"passwd", (PyCFunction)l_ldap_passwd, METH_VARARGS},
Expand Down
56 changes: 56 additions & 0 deletions Tests/t_ldapobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,62 @@ def test_multiple_starttls(self):
l.simple_bind_s(self.server.root_dn, self.server.root_pw)
self.assertEqual(l.whoami_s(), 'dn:' + self.server.root_dn)

def assert_option_equal(self, conn, option, value):
self.assertEqual(conn.get_option(option), value)

@requires_tls()
def test_set_tls_options_ldap(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the coverage report has failed.
I think it's important to test all the parts before merging...

If you haven't started on the test suite expansion yet, I can take a look later and add a few tests so it will cover the rest of the code. :)

# just any directory will do
certdir = os.path.dirname(__file__)
conn = self.ldap_object_class(self.server.ldap_uri)
conn.set_tls_options(
cacertfile=self.server.cafile,
# just any directory
cacertdir=certdir,
require_cert=ldap.OPT_X_TLS_DEMAND,
protocol_min=0x303,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a very minor nitpick.
I understand that it is common knowledge but can we mention here in a comment that 0x303 is TLS 1.2? For friendliness and explicitness:)

# libldap on Travis CI doesn't like cipher_suite
# cipher_suite="ALL",
certfile=self.server.clientcert,
keyfile=self.server.clientkey,
# libldap on TravisCI doesn't like CRL options
# crlfile=None,
# crlcheck=ldap.OPT_X_TLS_CRL_PEER,
start_tls=False
)
self.assert_option_equal(
conn, ldap.OPT_X_TLS_CACERTFILE, self.server.cafile
)
self.assert_option_equal(
conn, ldap.OPT_X_TLS_CACERTDIR, certdir
)
self.assert_option_equal(
conn, ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND
)
# cipher_suite depends on OpenSSL version and system settings
self.assert_option_equal(
conn, ldap.OPT_X_TLS_PROTOCOL_MIN, 0x303
)
self.assert_option_equal(
conn, ldap.OPT_X_TLS_CERTFILE, self.server.clientcert
)
self.assert_option_equal(
conn, ldap.OPT_X_TLS_KEYFILE, self.server.clientkey,
)
# self.assert_option_equal(
# conn, ldap.OPT_X_TLS_CRLFILE, crlfile
# )
# self.assert_option_equal(
# conn, ldap.OPT_X_TLS_CRLCHECK, ldap.OPT_X_TLS_CRL_PEER
# )

# run again, this time with default start_tls.
conn.set_tls_options()
# second call should fail
with self.assertRaises(ValueError) as e:
conn.set_tls_options()
self.assertIn("TLS connection already established", str(e.exception))

def test_dse(self):
dse = self._ldap_conn.read_rootdse_s()
self.assertIsInstance(dse, dict)
Expand Down