Skip to content

Add OpenLDAPSyncreplCookie class #571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Doc/reference/ldap-syncrepl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ This module defines the following classes:

.. autoclass:: ldap.syncrepl.SyncreplConsumer
:members:

.. autoclass:: ldap.syncrepl.OpenLDAPSyncreplCookie
:members:
70 changes: 70 additions & 0 deletions Lib/ldap/syncrepl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
See https://www.python-ldap.org/ for project details.
"""

from typing import AnyStr, Dict, List, Tuple, Union
from uuid import UUID

# Imports from pyasn1
Expand Down Expand Up @@ -535,3 +536,72 @@ def syncrepl_refreshdone(self):
follows.
"""
pass


class OpenLDAPSyncreplCookie:
"""
OpenLDAPSyncreplCookie - allows a consumer to track a cookie across a
refreshAndPersist syncrepl session against a multi-provider OpenLDAP cluster
"""

rid: int = 0
sid: int = 0
_csnset: Dict[int, str]

def __init__(self, cookie: AnyStr = "") -> None:
self._csnset = {}

if cookie:
self.update(cookie)

def _parse_csn(self, csn: str) -> Tuple[str, str, str, str]:
time, order, sid, other = csn.split('#', 3)
return (time, order, sid, other)

def _parse_cookie(self, cookie: AnyStr) -> Dict[str, Union[str, List[str]]]:
if isinstance(cookie, bytes):
cookie = cookie.decode()

result = {}
parts = cookie.split(',')
for part in parts:
if part.startswith('rid='):
result['rid'] = part[4:]
elif part.startswith('sid='):
result['sid'] = part[4:]
elif part.startswith('csn='):
result['csn'] = part[4:].split(';')
elif part.startswith('delcsn='):
result['delcsn'] = part[7:]
else:
# Did not recognise this cookie part
pass
return result

def update(self, cookie: str):
"""
Update the CSN set based on a cookie we just received, use in
syncrepl_set_cookie() to track the session state.
"""
components = self._parse_cookie(cookie)
for csn in components.get('csn', []):
_, _, sid, _ = self._parse_csn(csn)
if sid not in self._csnset or self._csnset[sid] < csn:
self._csnset[sid] = csn

return self

def unparse(self) -> str:
"""
Return the cookie as a string, use in syncrepl_get_cookie() or when
storing the state for later use.
"""
cookie = 'rid={:03},sid={:03x}'.format(self.rid or 0, self.sid or 0)
if self._csnset:
cookie += ',csn='
cookie += ';'.join(csn
for sid, csn in sorted(self._csnset.items()))
return cookie

def __str__(self):
return self.unparse()
5 changes: 5 additions & 0 deletions Lib/slapdtest/_slapdtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
cn: module
olcModuleLoad: back_%(database)s

dn: olcDatabase=config,cn=config
objectClass: olcDatabaseConfig
olcDatabase: config
olcRootDN: %(rootdn)s

dn: olcDatabase=%(database)s,cn=config
objectClass: olcDatabaseConfig
objectClass: olcMdbConfig
Expand Down
175 changes: 174 additions & 1 deletion Tests/t_ldap_syncrepl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

import ldap
from ldap.ldapobject import SimpleLDAPObject
from ldap.syncrepl import SyncreplConsumer, SyncInfoMessage
from ldap.syncrepl import SyncreplConsumer, SyncInfoMessage, \
OpenLDAPSyncreplCookie

from slapdtest import SlapdObject, SlapdTestCase

Expand All @@ -37,6 +38,10 @@
olcModuleLoad: back_%(database)s
olcModuleLoad: syncprov

dn: olcDatabase=config,cn=config
objectClass: olcDatabaseConfig
olcRootDN: %(rootdn)s

dn: olcDatabase=%(database)s,cn=config
objectClass: olcDatabaseConfig
objectClass: olcMdbConfig
Expand Down Expand Up @@ -442,6 +447,174 @@ def setUp(self):
self.suffix = self.server.suffix


class TestMPRSyncrepl(BaseSyncreplTests, SlapdTestCase):
class MPRClient(SyncreplClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cookie = OpenLDAPSyncreplCookie()

def syncrepl_set_cookie(self, cookie):
self.cookie.update(cookie)
super().syncrepl_set_cookie(self.cookie.unparse())

def setUp(self):
super().setUp()
self.tester = self.MPRClient(
self.server.ldap_uri,
self.server.root_dn,
self.server.root_pw,
bytes_mode=False
)
self.suffix = self.server.suffix

# An active MPR should not have a sid=000 server in it
if self.server.server_id == 0:
self.skip("Server got serverid 0 assigned")

def test_mpr_refresh_and_persist(self):
"""
Make sure we process cookie updates from a live MPR cluster correctly
"""
# Assumes that server_id is not used before the call to start()
self.server2 = self.server_class()
if self.server.server_id == self.server2.server_id:
self.server2.server_id += 1
if self.server2.server_id % 4096 == 0:
self.server2.server_id = 1

with self.server2 as server2:
tester2 = self.MPRClient(
self.server2.ldap_uri,
self.server2.root_dn,
self.server2.root_pw,
bytes_mode=False
)
self.addCleanup(tester2.unbind_s)

self.tester.search(
self.suffix,
'refreshAndPersist',
)

# Run a quick refresh, that shouldn't have any changes.
while self.tester.refresh_done is not True:
poll_result = self.tester.poll(
all=0,
timeout=None
)
self.assertTrue(poll_result)

# Again, server data should not have changed.
self.assertEqual(self.tester.dn_attrs, LDAP_ENTRIES)

# set up replication between both
coords = [(1, self.server.ldap_uri, self.suffix,
self.server.root_dn, self.server.root_pw),
(2, self.server2.ldap_uri, self.suffix,
self.server2.root_dn, self.server2.root_pw),
]
modifications = [
(ldap.MOD_ADD, "olcSyncrepl", [
('rid=%d provider=%s searchbase="%s" type=refreshAndPersist '
'bindmethod=simple binddn="%s" credentials="%s" '
'retry="1 +"' % coord).encode() for coord in coords]),
# do we still support 2.4.x? Change to olcMultiProvider if not
(ldap.MOD_REPLACE, "olcMirrorMode", [b"TRUE"]),
]

self.tester.modify_s(
"olcDatabase={1}%s,cn=config" % (self.server.database),
modifications)
tester2.modify_s(
"olcDatabase={1}%s,cn=config" % (self.server.database),
modifications)

tester2.search(
self.suffix,
'refreshAndPersist',
)

# Wait till server2 catches up
while tester2.refresh_done is not True or \
tester2.cookie.unparse() != self.tester.cookie.unparse():
try:
poll_result = tester2.poll(
all=0,
timeout=None
)
self.assertTrue(poll_result)
except ldap.NO_SUCH_OBJECT:
# 2.6+ Allows a refreshAndPersist against an empty DB, but
# with older ones we need to retry until there's at least
# one entry
tester2.search(
self.suffix,
'refreshAndPersist',
)

# Again, server data should not have changed.
self.assertEqual(tester2.dn_attrs, LDAP_ENTRIES)

# From here on, things get little hairy, server1 might not have
# finished its refresh from 2 and we can't easily confirm this
# without cn=monitor. We just read back our CSNs and make sure
# we've seen both.

# send some mods to both
modification = [('objectClass', [b'device'])]
self.tester.add_s("cn=server1,%s" % self.suffix, modification)

csn1 = self.tester.read_s("cn=server1,%s" % self.suffix,
attrlist=['entryCSN']
)['entryCSN'][0].decode('utf8')

tester2.add_s("cn=server2,%s" % self.suffix, modification)
csn2 = tester2.read_s("cn=server2,%s" % self.suffix,
attrlist=['entryCSN']
)['entryCSN'][0].decode('utf8')

new_state = LDAP_ENTRIES.copy()
new_state["cn=server1,%s" % self.suffix] = {
"objectClass": [b"device"],
"cn": [b"server1"],
}
new_state["cn=server2,%s" % self.suffix] = {
"objectClass": [b"device"],
"cn": [b"server2"],
}

# Wait for the cookie to sync up, a failure would be that this
# doesn't happen, so impose a timeout
while csn1 not in self.tester.cookie.unparse() or \
csn2 not in self.tester.cookie.unparse() or \
csn1 not in tester2.cookie.unparse() or \
csn2 not in tester2.cookie.unparse():
if csn1 not in self.tester.cookie.unparse() or \
csn2 not in self.tester.cookie.unparse():
poll_result = self.tester.poll(
all=0,
timeout=5
)
self.assertTrue(poll_result)
if csn1 not in tester2.cookie.unparse() or \
csn2 not in tester2.cookie.unparse():
poll_result = tester2.poll(
all=0,
timeout=5
)
self.assertTrue(poll_result)

self.assertEqual(self.tester.cookie.unparse(),
tester2.cookie.unparse())
self.assertEqual(self.tester.dn_attrs, new_state)
self.assertEqual(tester2.dn_attrs, new_state)

# self.tester seems to have been unbound by the time
# self.addCleanup callbacks get called? Cleanup manually...
self.tester.delete_s("cn=server1,%s" % self.suffix)
self.tester.delete_s("cn=server2,%s" % self.suffix)


class DecodeSyncreplProtoTests(unittest.TestCase):
"""
Tests of the ASN.1 decoder for tricky cases or past issues to ensure that
Expand Down
Loading