Skip to content

Commit 1260541

Browse files
committed
Use dlists instead of SHM_QUEUE for syncrep queue
Part of a series to remove SHM_QUEUE. ilist.h style lists are more widely used and have an easier to use interface. Reviewed-by: Thomas Munro <thomas.munro@gmail.com> (in an older version) Discussion: https://postgr.es/m/20221120055930.t6kl3tyivzhlrzu2@awork3.anarazel.de Discussion: https://postgr.es/m/20200211042229.msv23badgqljrdg2@alap3.anarazel.de
1 parent 5764f61 commit 1260541

File tree

5 files changed

+41
-57
lines changed

5 files changed

+41
-57
lines changed

src/backend/replication/syncrep.c

Lines changed: 36 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
182182
else
183183
mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
184184

185-
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
185+
Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
186186
Assert(WalSndCtl != NULL);
187187

188188
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
@@ -318,7 +318,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
318318
* assertions, but better safe than sorry).
319319
*/
320320
pg_read_barrier();
321-
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
321+
Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
322322
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
323323
MyProc->waitLSN = 0;
324324

@@ -339,31 +339,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
339339
static void
340340
SyncRepQueueInsert(int mode)
341341
{
342-
PGPROC *proc;
342+
dlist_head *queue;
343+
dlist_iter iter;
343344

344345
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
345-
proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
346-
&(WalSndCtl->SyncRepQueue[mode]),
347-
offsetof(PGPROC, syncRepLinks));
346+
queue = &WalSndCtl->SyncRepQueue[mode];
348347

349-
while (proc)
348+
dlist_reverse_foreach(iter, queue)
350349
{
350+
PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
351+
351352
/*
352-
* Stop at the queue element that we should after to ensure the queue
353-
* is ordered by LSN.
353+
* Stop at the queue element that we should insert after to ensure the
354+
* queue is ordered by LSN.
354355
*/
355356
if (proc->waitLSN < MyProc->waitLSN)
356-
break;
357-
358-
proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
359-
&(proc->syncRepLinks),
360-
offsetof(PGPROC, syncRepLinks));
357+
{
358+
dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
359+
return;
360+
}
361361
}
362362

363-
if (proc)
364-
SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
365-
else
366-
SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
363+
/*
364+
* If we get here, the list was either empty, or this process needs to be
365+
* at the head.
366+
*/
367+
dlist_push_head(queue, &MyProc->syncRepLinks);
367368
}
368369

369370
/*
@@ -373,8 +374,8 @@ static void
373374
SyncRepCancelWait(void)
374375
{
375376
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
376-
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
377-
SHMQueueDelete(&(MyProc->syncRepLinks));
377+
if (!dlist_node_is_detached(&MyProc->syncRepLinks))
378+
dlist_delete_thoroughly(&MyProc->syncRepLinks);
378379
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
379380
LWLockRelease(SyncRepLock);
380381
}
@@ -386,13 +387,13 @@ SyncRepCleanupAtProcExit(void)
386387
* First check if we are removed from the queue without the lock to not
387388
* slow down backend exit.
388389
*/
389-
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
390+
if (!dlist_node_is_detached(&MyProc->syncRepLinks))
390391
{
391392
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
392393

393394
/* maybe we have just been removed, so recheck */
394-
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
395-
SHMQueueDelete(&(MyProc->syncRepLinks));
395+
if (!dlist_node_is_detached(&MyProc->syncRepLinks))
396+
dlist_delete_thoroughly(&MyProc->syncRepLinks);
396397

397398
LWLockRelease(SyncRepLock);
398399
}
@@ -879,39 +880,27 @@ static int
879880
SyncRepWakeQueue(bool all, int mode)
880881
{
881882
volatile WalSndCtlData *walsndctl = WalSndCtl;
882-
PGPROC *proc = NULL;
883-
PGPROC *thisproc = NULL;
884883
int numprocs = 0;
884+
dlist_mutable_iter iter;
885885

886886
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
887887
Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
888888
Assert(SyncRepQueueIsOrderedByLSN(mode));
889889

890-
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
891-
&(WalSndCtl->SyncRepQueue[mode]),
892-
offsetof(PGPROC, syncRepLinks));
893-
894-
while (proc)
890+
dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
895891
{
892+
PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
893+
896894
/*
897895
* Assume the queue is ordered by LSN
898896
*/
899897
if (!all && walsndctl->lsn[mode] < proc->waitLSN)
900898
return numprocs;
901899

902900
/*
903-
* Move to next proc, so we can delete thisproc from the queue.
904-
* thisproc is valid, proc may be NULL after this.
905-
*/
906-
thisproc = proc;
907-
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
908-
&(proc->syncRepLinks),
909-
offsetof(PGPROC, syncRepLinks));
910-
911-
/*
912-
* Remove thisproc from queue.
901+
* Remove from queue.
913902
*/
914-
SHMQueueDelete(&(thisproc->syncRepLinks));
903+
dlist_delete_thoroughly(&proc->syncRepLinks);
915904

916905
/*
917906
* SyncRepWaitForLSN() reads syncRepState without holding the lock, so
@@ -924,12 +913,12 @@ SyncRepWakeQueue(bool all, int mode)
924913
* Set state to complete; see SyncRepWaitForLSN() for discussion of
925914
* the various states.
926915
*/
927-
thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
916+
proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
928917

929918
/*
930919
* Wake only when we have set state and removed from queue.
931920
*/
932-
SetLatch(&(thisproc->procLatch));
921+
SetLatch(&(proc->procLatch));
933922

934923
numprocs++;
935924
}
@@ -983,19 +972,17 @@ SyncRepUpdateSyncStandbysDefined(void)
983972
static bool
984973
SyncRepQueueIsOrderedByLSN(int mode)
985974
{
986-
PGPROC *proc = NULL;
987975
XLogRecPtr lastLSN;
976+
dlist_iter iter;
988977

989978
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
990979

991980
lastLSN = 0;
992981

993-
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
994-
&(WalSndCtl->SyncRepQueue[mode]),
995-
offsetof(PGPROC, syncRepLinks));
996-
997-
while (proc)
982+
dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
998983
{
984+
PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
985+
999986
/*
1000987
* Check the queue is ordered by LSN and that multiple procs don't
1001988
* have matching LSNs
@@ -1004,10 +991,6 @@ SyncRepQueueIsOrderedByLSN(int mode)
1004991
return false;
1005992

1006993
lastLSN = proc->waitLSN;
1007-
1008-
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
1009-
&(proc->syncRepLinks),
1010-
offsetof(PGPROC, syncRepLinks));
1011994
}
1012995

1013996
return true;

src/backend/replication/walsender.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3275,7 +3275,7 @@ WalSndShmemInit(void)
32753275
MemSet(WalSndCtl, 0, WalSndShmemSize());
32763276

32773277
for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3278-
SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3278+
dlist_init(&(WalSndCtl->SyncRepQueue[i]));
32793279

32803280
for (i = 0; i < max_wal_senders; i++)
32813281
{

src/backend/storage/lmgr/proc.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ InitProcess(void)
410410
/* Initialize fields for sync rep */
411411
MyProc->waitLSN = 0;
412412
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
413-
SHMQueueElemInit(&(MyProc->syncRepLinks));
413+
dlist_node_init(&MyProc->syncRepLinks);
414414

415415
/* Initialize fields for group XID clearing. */
416416
MyProc->procArrayGroupMember = false;

src/include/replication/walsender_private.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#define _WALSENDER_PRIVATE_H
1414

1515
#include "access/xlog.h"
16+
#include "lib/ilist.h"
1617
#include "nodes/nodes.h"
1718
#include "replication/syncrep.h"
1819
#include "storage/latch.h"
@@ -89,7 +90,7 @@ typedef struct
8990
* Synchronous replication queue with one queue per request type.
9091
* Protected by SyncRepLock.
9192
*/
92-
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
93+
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
9394

9495
/*
9596
* Current location of the head of the queue. All waiters should have a

src/include/storage/proc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ struct PGPROC
248248
*/
249249
XLogRecPtr waitLSN; /* waiting for this LSN or higher */
250250
int syncRepState; /* wait state for sync rep */
251-
SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */
251+
dlist_node syncRepLinks; /* list link if process is in syncrep queue */
252252

253253
/*
254254
* All PROCLOCK objects for locks held or awaited by this backend are

0 commit comments

Comments
 (0)