From dbf100f4d7719fe829b2bdfd44b3abeced0be923 Mon Sep 17 00:00:00 2001 From: Florian Best Date: Fri, 18 Mar 2022 13:13:26 +0100 Subject: [PATCH 1/3] test(ldap.dn): Add test cases for ldap.dn.dn2str() with different format flags --- Tests/t_ldap_dn.py | 141 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 137 insertions(+), 4 deletions(-) diff --git a/Tests/t_ldap_dn.py b/Tests/t_ldap_dn.py index 86d36403..18bc90a3 100644 --- a/Tests/t_ldap_dn.py +++ b/Tests/t_ldap_dn.py @@ -57,6 +57,7 @@ def test_str2dn(self): test function str2dn() """ self.assertEqual(ldap.dn.str2dn(''), []) + self.assertEqual(ldap.dn.str2dn(None), []) self.assertEqual( ldap.dn.str2dn('uid=test42,ou=Testing,dc=example,dc=com'), [ @@ -105,7 +106,7 @@ def test_str2dn(self): self.assertEqual( ldap.dn.str2dn('cn=äöüÄÖÜß,dc=example,dc=com', flags=0), [ - [('cn', 'äöüÄÖÜß', 4)], + [('cn', 'äöüÄÖÜß', ldap.AVA_NONPRINTABLE)], [('dc', 'example', 1)], [('dc', 'com', 1)] ] @@ -113,7 +114,16 @@ def test_str2dn(self): self.assertEqual( ldap.dn.str2dn('cn=\\c3\\a4\\c3\\b6\\c3\\bc\\c3\\84\\c3\\96\\c3\\9c\\c3\\9f,dc=example,dc=com', flags=0), [ - [('cn', 'äöüÄÖÜß', 4)], + [('cn', 'äöüÄÖÜß', ldap.AVA_NONPRINTABLE)], + [('dc', 'example', 1)], + [('dc', 'com', 1)] + ] + ) + self.assertEqual( + ldap.dn.str2dn('/dc=com/dc=example/ou=Testing/uid=test42', flags=ldap.DN_FORMAT_DCE), + [ + [('uid', 'test42', 1)], + [('ou', 'Testing', 1)], [('dc', 'example', 1)], [('dc', 'com', 1)] ] @@ -123,7 +133,7 @@ def test_dn2str(self): """ test function dn2str() """ - self.assertEqual(ldap.dn.str2dn(''), []) + self.assertEqual(ldap.dn.dn2str([]), '') self.assertEqual( ldap.dn.dn2str([ [('uid', 'test42', 1)], @@ -162,12 +172,135 @@ def test_dn2str(self): ) self.assertEqual( ldap.dn.dn2str([ - [('cn', 'äöüÄÖÜß', 4)], + [('uid', 'test, 42', 1)], + [('ou', 'Testing', 1)], + [('dc', 'example', 1)], + [('dc', 'com', 1)] + ], ldap.DN_FORMAT_LDAPV3), + 'uid=test\\2C 42,ou=Testing,dc=example,dc=com' + ) + self.assertEqual( + ldap.dn.dn2str([ + [('cn', 'äöüÄÖÜß', ldap.AVA_NONPRINTABLE)], [('dc', 'example', 1)], [('dc', 'com', 1)] ]), 'cn=äöüÄÖÜß,dc=example,dc=com' ) + self.assertEqual( + ldap.dn.dn2str([ + [('cn', 'äöüÄÖÜß', ldap.AVA_NONPRINTABLE)], + [('dc', 'example', 1)], + [('dc', 'com', 1)] + ], ldap.DN_FORMAT_LDAPV3), + r'cn=\C3\A4\C3\B6\C3\BC\C3\84\C3\96\C3\9C\C3\9F,dc=example,dc=com' + ) + self.assertEqual( + ldap.dn.dn2str([ + [('uid', 'test42', 1), ('cn', 'test42', 1)], + [('ou', 'Testing', 1)], + [('dc', 'example', 1)], + [('dc', 'com', 1)] + ], ldap.DN_FORMAT_AD_CANONICAL), + 'example.com/Testing/test42,test42' + ) + self.assertEqual( + ldap.dn.dn2str([ + [('uid', 'test42', 1), ('cn', 'test42', 1)], + [('ou', 'Testing', 1)], + [('dc', 'example', 1)], + [('dc', 'com', 1)] + ], ldap.DN_FORMAT_UFN), + 'test42 + test42, Testing, example.com' + ) + self.assertEqual( + ldap.dn.dn2str([ + [('uid', 'test42', 1), ('cn', 'test42', 1)], + [('ou', 'Testing', 1)], + [('dc', 'example', 1)], + [('dc', 'com', 1)] + ], ldap.DN_FORMAT_DCE), + '/dc=com/dc=example/ou=Testing/uid=test42,cn=test42' + ) + + self.assertEqual( + ldap.dn.dn2str([ + [('cn', 'äöüÄÖÜß', ldap.AVA_BINARY)], + [('dc', 'example', 1)], + [('dc', 'com', 1)] + ], ldap.DN_FORMAT_LDAPV3), + 'cn=#C3A4C3B6C3BCC384C396C39CC39F,dc=example,dc=com' + ) + self.assertEqual( + ldap.dn.dn2str([ + [('cn', 'äöüÄÖÜß', ldap.AVA_NULL)], + [('dc', 'example', 1)], + [('dc', 'com', 1)] + ], ldap.DN_FORMAT_LDAPV3), + 'cn=\\C3\\A4\\C3\\B6\\C3\\BC\\C3\\84\\C3\\96\\C3\\9C\\C3\\9F,dc=example,dc=com' + ) + self.assertEqual( + ldap.dn.dn2str([ + [('cn', 'äöüÄÖÜß', ldap.AVA_STRING)], + [('dc', 'example', 1)], + [('dc', 'com', 1)] + ], ldap.DN_FORMAT_LDAPV3), + 'cn=\\C3\\A4\\C3\\B6\\C3\\BC\\C3\\84\\C3\\96\\C3\\9C\\C3\\9F,dc=example,dc=com' + ) + self.assertEqual( + ldap.dn.dn2str([ + [('cn', 'äöüÄÖÜß', ldap.AVA_NONPRINTABLE)], + [('dc', 'example', 1)], + [('dc', 'com', 1)] + ], ldap.DN_FORMAT_LDAPV3), + 'cn=\\C3\\A4\\C3\\B6\\C3\\BC\\C3\\84\\C3\\96\\C3\\9C\\C3\\9F,dc=example,dc=com' + ) + + def test_dn2str_errors(self): + """ + test error handling of function dn2str() + """ + with self.assertRaises(RuntimeError): + ldap.dn.dn2str([[('uid', 'test42', 1)]], 142) + + ldap_format = ldap.DN_FORMAT_LDAPV3 + + with self.assertRaises(TypeError): + ldap.dn.dn2str(None) + + with self.assertRaises(TypeError): + ldap.dn.dn2str(None, ldap_format) + + with self.assertRaises(TypeError): + ldap.dn.dn2str([1], ldap_format) + + with self.assertRaises(TypeError): + ldap.dn.dn2str([[1]], ldap_format) + + with self.assertRaises(TypeError): + ldap.dn.dn2str([[('uid', 'test42', '1')]], ldap_format) + + with self.assertRaises(TypeError): + ldap.dn.dn2str([[('uid', 'test42', 1.0)]], ldap_format) + + with self.assertRaises(TypeError): + ldap.dn.dn2str([[['uid', 'test42', 1]]], ldap_format) + + with self.assertRaises(TypeError): + ldap.dn.dn2str([ + [('uid', 'test42', 1), ('cn', 'test42', 1)], + [('ou', 'Testing', 1)], + [('dc', 'example', '1')], + [('dc', 'com', 1)] + ], ldap_format), + + with self.assertRaises(TypeError): + ldap.dn.dn2str([ + [('ou', 'Testing', 1)], + [('dc', 'example', 1)], + [('uid', 'test42', 1), ('cn', 'test42', '1')], + [('dc', 'com', 1)] + ], ldap_format), def test_explode_dn(self): """ From bf0c7019011e34d95ebfbd396c305867fa632ed5 Mon Sep 17 00:00:00 2001 From: Florian Best Date: Fri, 18 Mar 2022 13:13:26 +0100 Subject: [PATCH 2/3] feat(ldap.dn): Add support for different formats in `ldap.dn.dn2str()` via flags In C `dn2str()` supports `flags` which works by providing one of `LDAP_DN_FORMAT_UFN`, `LDAP_DN_FORMAT_AD_CANONICAL`, `LDAP_DN_FORMAT_DCE`, `LDAP_DN_FORMAT_LDAPV3`. These symbols do exist in Python, but could not be used ultimately because the Python counterpart was pure Python and did not pass to `dn2str(3)`. Fix #257 --- Lib/ldap/dn.py | 12 ++- Modules/functions.c | 217 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+), 3 deletions(-) diff --git a/Lib/ldap/dn.py b/Lib/ldap/dn.py index a9d96846..b466a442 100644 --- a/Lib/ldap/dn.py +++ b/Lib/ldap/dn.py @@ -48,12 +48,17 @@ def str2dn(dn,flags=0): return ldap.functions._ldap_function_call(None,_ldap.str2dn,dn,flags) -def dn2str(dn): +def dn2str(dn, flags=0): """ This function takes a decomposed DN as parameter and returns - a single string. It's the inverse to str2dn() but will always - return a DN in LDAPv3 format compliant to RFC 4514. + a single string. It's the inverse to str2dn() but will by default always + return a DN in LDAPv3 format compliant to RFC 4514 if not otherwise specified + via flags. + + See also the OpenLDAP man-page ldap_dn2str(3) """ + if flags: + return ldap.functions._ldap_function_call(None, _ldap.dn2str, dn, flags) return ','.join([ '+'.join([ '='.join((atype,escape_dn_chars(avalue or ''))) @@ -61,6 +66,7 @@ def dn2str(dn): for rdn in dn ]) + def explode_dn(dn, notypes=False, flags=0): """ explode_dn(dn [, notypes=False [, flags=0]]) -> list diff --git a/Modules/functions.c b/Modules/functions.c index b811708f..0a56310d 100644 --- a/Modules/functions.c +++ b/Modules/functions.c @@ -160,6 +160,222 @@ l_ldap_str2dn(PyObject *unused, PyObject *args) return result; } +/* ldap_dn2str */ + +static void +_free_dn_structure(LDAPDN dn) +{ + if (dn == NULL) + return; + + for (LDAPRDN * rdn = dn; *rdn != NULL; rdn++) { + for (LDAPAVA ** avap = *rdn; *avap != NULL; avap++) { + LDAPAVA *ava = *avap; + + if (ava->la_attr.bv_val) { + free(ava->la_attr.bv_val); + } + if (ava->la_value.bv_val) { + free(ava->la_value.bv_val); + } + free(ava); + } + free(*rdn); + } + free(dn); +} + +/* + * Convert a Python list-of-list-of-(str, str, int) into an LDAPDN and + * call ldap_dn2bv to build a DN string. + * + * Python signature: dn2str(dn: list[list[tuple[str, str, int]]], flags: int) -> str + * Returns the DN string on success, or raises TypeError or RuntimeError on error. + */ +static PyObject * +l_ldap_dn2str(PyObject *self, PyObject *args) +{ + PyObject *dn_list = NULL; + int flags = 0; + LDAPDN dn = NULL; + LDAPAVA *ava; + LDAPAVA **rdn; + BerValue str = { 0, NULL }; + PyObject *py_rdn_seq = NULL, *py_ava_item = NULL; + PyObject *py_name = NULL, *py_value = NULL, *py_encoding = NULL; + PyObject *result = NULL; + Py_ssize_t nrdns = 0, navas = 0, name_len = 0, value_len = 0; + int i = 0, j = 0; + int ldap_err; + const char *name_utf8, *value_utf8; + + const char *type_error_message = "expected list[list[tuple[str, str, int]]]"; + + if (!PyArg_ParseTuple(args, "Oi:dn2str", &dn_list, &flags)) { + return NULL; + } + + if (!PySequence_Check(dn_list)) { + PyErr_SetString(PyExc_TypeError, type_error_message); + return NULL; + } + + nrdns = PySequence_Size(dn_list); + if (nrdns < 0) { + PyErr_SetString(PyExc_TypeError, type_error_message); + return NULL; + } + + /* Allocate array of LDAPRDN pointers (+1 for NULL terminator) */ + dn = (LDAPRDN *) calloc((size_t)nrdns + 1, sizeof(LDAPRDN)); + if (dn == NULL) { + PyErr_NoMemory(); + return NULL; + } + + for (i = 0; i < nrdns; i++) { + py_rdn_seq = PySequence_GetItem(dn_list, i); /* New reference */ + if (py_rdn_seq == NULL) { + goto error_cleanup; + } + if (!PySequence_Check(py_rdn_seq)) { + PyErr_SetString(PyExc_TypeError, type_error_message); + goto error_cleanup; + } + + navas = PySequence_Size(py_rdn_seq); + if (navas < 0) { + PyErr_SetString(PyExc_TypeError, type_error_message); + goto error_cleanup; + } + + /* Allocate array of LDAPAVA* pointers (+1 for NULL terminator) */ + rdn = (LDAPAVA **)calloc((size_t)navas + 1, sizeof(LDAPAVA *)); + if (rdn == NULL) { + PyErr_NoMemory(); + goto error_cleanup; + } + + for (j = 0; j < navas; j++) { + py_ava_item = PySequence_GetItem(py_rdn_seq, j); /* New reference */ + if (py_ava_item == NULL) { + goto error_cleanup; + } + /* Expect a 3‐tuple: (name: str, value: str, encoding: int) */ + if (!PyTuple_Check(py_ava_item) || PyTuple_Size(py_ava_item) != 3) { + PyErr_SetString(PyExc_TypeError, type_error_message); + goto error_cleanup; + } + + py_name = PyTuple_GetItem(py_ava_item, 0); /* Borrowed reference */ + py_value = PyTuple_GetItem(py_ava_item, 1); /* Borrowed reference */ + py_encoding = PyTuple_GetItem(py_ava_item, 2); /* Borrowed reference */ + + if (!PyUnicode_Check(py_name) || !PyUnicode_Check(py_value) || !PyLong_Check(py_encoding)) { + PyErr_SetString(PyExc_TypeError, type_error_message); + goto error_cleanup; + } + + name_len = 0; + value_len = 0; + name_utf8 = PyUnicode_AsUTF8AndSize(py_name, &name_len); + value_utf8 = PyUnicode_AsUTF8AndSize(py_value, &value_len); + if (name_utf8 == NULL || value_utf8 == NULL) { + goto error_cleanup; + } + + ava = (LDAPAVA *) calloc(1, sizeof(LDAPAVA)); + + if (ava == NULL) { + PyErr_NoMemory(); + goto error_cleanup; + } + + ava->la_attr.bv_val = (char *)malloc((size_t)name_len + 1); + if (ava->la_attr.bv_val == NULL) { + free(ava); + PyErr_NoMemory(); + goto error_cleanup; + } + memcpy(ava->la_attr.bv_val, name_utf8, (size_t)name_len); + ava->la_attr.bv_val[name_len] = '\0'; + ava->la_attr.bv_len = (ber_len_t) name_len; + + ava->la_value.bv_val = (char *)malloc((size_t)value_len + 1); + if (ava->la_value.bv_val == NULL) { + free(ava->la_attr.bv_val); + free(ava); + PyErr_NoMemory(); + goto error_cleanup; + } + memcpy(ava->la_value.bv_val, value_utf8, (size_t)value_len); + ava->la_value.bv_val[value_len] = '\0'; + ava->la_value.bv_len = (ber_len_t) value_len; + + ava->la_flags = (int)PyLong_AsLong(py_encoding); + if (PyErr_Occurred()) { + /* Encoding conversion failed */ + free(ava->la_attr.bv_val); + free(ava->la_value.bv_val); + free(ava); + goto error_cleanup; + } + + rdn[j] = ava; + Py_DECREF(py_ava_item); + py_ava_item = NULL; + } + + /* Null‐terminate the RDN */ + rdn[navas] = NULL; + + dn[i] = rdn; + Py_DECREF(py_rdn_seq); + py_rdn_seq = NULL; + } + + /* Null‐terminate the DN */ + dn[nrdns] = NULL; + + /* Call ldap_dn2bv to build a DN string */ + ldap_err = ldap_dn2bv(dn, &str, flags); + if (ldap_err != LDAP_SUCCESS) { + PyErr_SetString(PyExc_RuntimeError, ldap_err2string(ldap_err)); + goto error_cleanup; + } + + result = PyUnicode_FromString(str.bv_val); + if (result == NULL) { + goto error_cleanup; + } + + /* Free the memory allocated by ldap_dn2bv */ + ldap_memfree(str.bv_val); + str.bv_val = NULL; + + /* Free our local DN structure */ + _free_dn_structure(dn); + dn = NULL; + + return result; + + error_cleanup: + /* Free any partially built DN structure */ + _free_dn_structure(dn); + dn = NULL; + + /* If ldap_dn2bv allocated something, free it */ + if (str.bv_val) { + ldap_memfree(str.bv_val); + str.bv_val = NULL; + } + + /* Cleanup Python temporaries */ + Py_XDECREF(py_ava_item); + Py_XDECREF(py_rdn_seq); + return NULL; +} + /* ldap_set_option (global options) */ static PyObject * @@ -196,6 +412,7 @@ static PyMethodDef methods[] = { {"initialize_fd", (PyCFunction)l_ldap_initialize_fd, METH_VARARGS}, #endif {"str2dn", (PyCFunction)l_ldap_str2dn, METH_VARARGS}, + {"dn2str", (PyCFunction)l_ldap_dn2str, METH_VARARGS}, {"set_option", (PyCFunction)l_ldap_set_option, METH_VARARGS}, {"get_option", (PyCFunction)l_ldap_get_option, METH_VARARGS}, {NULL, NULL} From 7c80d5c341612877659d3bcd605fcf65a07addbf Mon Sep 17 00:00:00 2001 From: Florian Best Date: Wed, 4 Jun 2025 08:00:13 +0200 Subject: [PATCH 3/3] feat(ldap.dn): add ldap.dn.normalize() --- Lib/ldap/dn.py | 5 +++++ Tests/t_ldap_dn.py | 30 +++++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/Lib/ldap/dn.py b/Lib/ldap/dn.py index b466a442..dd7278b6 100644 --- a/Lib/ldap/dn.py +++ b/Lib/ldap/dn.py @@ -122,3 +122,8 @@ def is_dn(s,flags=0): return False else: return True + + +def normalize(s, flags=0): + """Returns a normalized distinguished name (DN)""" + return dn2str(str2dn(s, flags), flags) diff --git a/Tests/t_ldap_dn.py b/Tests/t_ldap_dn.py index 18bc90a3..3168bc13 100644 --- a/Tests/t_ldap_dn.py +++ b/Tests/t_ldap_dn.py @@ -222,7 +222,6 @@ def test_dn2str(self): ], ldap.DN_FORMAT_DCE), '/dc=com/dc=example/ou=Testing/uid=test42,cn=test42' ) - self.assertEqual( ldap.dn.dn2str([ [('cn', 'äöüÄÖÜß', ldap.AVA_BINARY)], @@ -378,6 +377,35 @@ def test_explode_rdn(self): ['cn=äöüÄÖÜß'] ) + def test_normalize(self): + """ + test function normalize() + """ + self.assertEqual( + ldap.dn.normalize('uid = test42 , ou = Testing , dc = example , dc = com', flags=ldap.DN_FORMAT_LDAPV3), + 'uid=test42,ou=Testing,dc=example,dc=com' + ) + self.assertEqual( + ldap.dn.normalize('cn=äöüÄÖÜß,dc=example,dc=com', flags=0), + 'cn=äöüÄÖÜß,dc=example,dc=com' + ) + self.assertEqual( + ldap.dn.normalize('cn=#C3A4C3B6C3BCC384C396C39CC39F,dc=example,dc=com', flags=0), + 'cn=äöüÄÖÜß,dc=example,dc=com' + ) + self.assertEqual( + ldap.dn.normalize('cn=#C3A4C3B6C3BCC384C396C39CC39F,dc=example,dc=com', flags=ldap.DN_FORMAT_LDAPV3), + 'cn=#C3A4C3B6C3BCC384C396C39CC39F,dc=example,dc=com' + ) + self.assertEqual( + ldap.dn.normalize('cn=äöüÄÖÜß,dc=example,dc=com', flags=ldap.DN_FORMAT_LDAPV3), + r'cn=\C3\A4\C3\B6\C3\BC\C3\84\C3\96\C3\9C\C3\9F,dc=example,dc=com' + ) + self.assertEqual( + ldap.dn.normalize('/ dc = com / dc = example / ou = Testing / uid = test42 , cn = test42', flags=ldap.DN_FORMAT_DCE), + '/dc=com/dc=example/ou=Testing/uid=test42,cn=test42' + ) + if __name__ == '__main__': unittest.main()