@@ -182,7 +182,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
182
182
else
183
183
mode = Min (SyncRepWaitMode , SYNC_REP_WAIT_FLUSH );
184
184
185
- Assert (SHMQueueIsDetached ( & ( MyProc -> syncRepLinks ) ));
185
+ Assert (dlist_node_is_detached ( & MyProc -> syncRepLinks ));
186
186
Assert (WalSndCtl != NULL );
187
187
188
188
LWLockAcquire (SyncRepLock , LW_EXCLUSIVE );
@@ -318,7 +318,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
318
318
* assertions, but better safe than sorry).
319
319
*/
320
320
pg_read_barrier ();
321
- Assert (SHMQueueIsDetached ( & ( MyProc -> syncRepLinks ) ));
321
+ Assert (dlist_node_is_detached ( & MyProc -> syncRepLinks ));
322
322
MyProc -> syncRepState = SYNC_REP_NOT_WAITING ;
323
323
MyProc -> waitLSN = 0 ;
324
324
@@ -339,31 +339,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
339
339
static void
340
340
SyncRepQueueInsert (int mode )
341
341
{
342
- PGPROC * proc ;
342
+ dlist_head * queue ;
343
+ dlist_iter iter ;
343
344
344
345
Assert (mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE );
345
- proc = (PGPROC * ) SHMQueuePrev (& (WalSndCtl -> SyncRepQueue [mode ]),
346
- & (WalSndCtl -> SyncRepQueue [mode ]),
347
- offsetof(PGPROC , syncRepLinks ));
346
+ queue = & WalSndCtl -> SyncRepQueue [mode ];
348
347
349
- while ( proc )
348
+ dlist_reverse_foreach ( iter , queue )
350
349
{
350
+ PGPROC * proc = dlist_container (PGPROC , syncRepLinks , iter .cur );
351
+
351
352
/*
352
- * Stop at the queue element that we should after to ensure the queue
353
- * is ordered by LSN.
353
+ * Stop at the queue element that we should insert after to ensure the
354
+ * queue is ordered by LSN.
354
355
*/
355
356
if (proc -> waitLSN < MyProc -> waitLSN )
356
- break ;
357
-
358
- proc = (PGPROC * ) SHMQueuePrev (& (WalSndCtl -> SyncRepQueue [mode ]),
359
- & (proc -> syncRepLinks ),
360
- offsetof(PGPROC , syncRepLinks ));
357
+ {
358
+ dlist_insert_after (& proc -> syncRepLinks , & MyProc -> syncRepLinks );
359
+ return ;
360
+ }
361
361
}
362
362
363
- if (proc )
364
- SHMQueueInsertAfter (& (proc -> syncRepLinks ), & (MyProc -> syncRepLinks ));
365
- else
366
- SHMQueueInsertAfter (& (WalSndCtl -> SyncRepQueue [mode ]), & (MyProc -> syncRepLinks ));
363
+ /*
364
+ * If we get here, the list was either empty, or this process needs to be
365
+ * at the head.
366
+ */
367
+ dlist_push_head (queue , & MyProc -> syncRepLinks );
367
368
}
368
369
369
370
/*
@@ -373,8 +374,8 @@ static void
373
374
SyncRepCancelWait (void )
374
375
{
375
376
LWLockAcquire (SyncRepLock , LW_EXCLUSIVE );
376
- if (!SHMQueueIsDetached ( & ( MyProc -> syncRepLinks ) ))
377
- SHMQueueDelete ( & ( MyProc -> syncRepLinks ) );
377
+ if (!dlist_node_is_detached ( & MyProc -> syncRepLinks ))
378
+ dlist_delete_thoroughly ( & MyProc -> syncRepLinks );
378
379
MyProc -> syncRepState = SYNC_REP_NOT_WAITING ;
379
380
LWLockRelease (SyncRepLock );
380
381
}
@@ -386,13 +387,13 @@ SyncRepCleanupAtProcExit(void)
386
387
* First check if we are removed from the queue without the lock to not
387
388
* slow down backend exit.
388
389
*/
389
- if (!SHMQueueIsDetached ( & ( MyProc -> syncRepLinks ) ))
390
+ if (!dlist_node_is_detached ( & MyProc -> syncRepLinks ))
390
391
{
391
392
LWLockAcquire (SyncRepLock , LW_EXCLUSIVE );
392
393
393
394
/* maybe we have just been removed, so recheck */
394
- if (!SHMQueueIsDetached ( & ( MyProc -> syncRepLinks ) ))
395
- SHMQueueDelete ( & ( MyProc -> syncRepLinks ) );
395
+ if (!dlist_node_is_detached ( & MyProc -> syncRepLinks ))
396
+ dlist_delete_thoroughly ( & MyProc -> syncRepLinks );
396
397
397
398
LWLockRelease (SyncRepLock );
398
399
}
@@ -879,39 +880,27 @@ static int
879
880
SyncRepWakeQueue (bool all , int mode )
880
881
{
881
882
volatile WalSndCtlData * walsndctl = WalSndCtl ;
882
- PGPROC * proc = NULL ;
883
- PGPROC * thisproc = NULL ;
884
883
int numprocs = 0 ;
884
+ dlist_mutable_iter iter ;
885
885
886
886
Assert (mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE );
887
887
Assert (LWLockHeldByMeInMode (SyncRepLock , LW_EXCLUSIVE ));
888
888
Assert (SyncRepQueueIsOrderedByLSN (mode ));
889
889
890
- proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue [mode ]),
891
- & (WalSndCtl -> SyncRepQueue [mode ]),
892
- offsetof(PGPROC , syncRepLinks ));
893
-
894
- while (proc )
890
+ dlist_foreach_modify (iter , & WalSndCtl -> SyncRepQueue [mode ])
895
891
{
892
+ PGPROC * proc = dlist_container (PGPROC , syncRepLinks , iter .cur );
893
+
896
894
/*
897
895
* Assume the queue is ordered by LSN
898
896
*/
899
897
if (!all && walsndctl -> lsn [mode ] < proc -> waitLSN )
900
898
return numprocs ;
901
899
902
900
/*
903
- * Move to next proc, so we can delete thisproc from the queue.
904
- * thisproc is valid, proc may be NULL after this.
905
- */
906
- thisproc = proc ;
907
- proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue [mode ]),
908
- & (proc -> syncRepLinks ),
909
- offsetof(PGPROC , syncRepLinks ));
910
-
911
- /*
912
- * Remove thisproc from queue.
901
+ * Remove from queue.
913
902
*/
914
- SHMQueueDelete ( & ( thisproc -> syncRepLinks ) );
903
+ dlist_delete_thoroughly ( & proc -> syncRepLinks );
915
904
916
905
/*
917
906
* SyncRepWaitForLSN() reads syncRepState without holding the lock, so
@@ -924,12 +913,12 @@ SyncRepWakeQueue(bool all, int mode)
924
913
* Set state to complete; see SyncRepWaitForLSN() for discussion of
925
914
* the various states.
926
915
*/
927
- thisproc -> syncRepState = SYNC_REP_WAIT_COMPLETE ;
916
+ proc -> syncRepState = SYNC_REP_WAIT_COMPLETE ;
928
917
929
918
/*
930
919
* Wake only when we have set state and removed from queue.
931
920
*/
932
- SetLatch (& (thisproc -> procLatch ));
921
+ SetLatch (& (proc -> procLatch ));
933
922
934
923
numprocs ++ ;
935
924
}
@@ -983,19 +972,17 @@ SyncRepUpdateSyncStandbysDefined(void)
983
972
static bool
984
973
SyncRepQueueIsOrderedByLSN (int mode )
985
974
{
986
- PGPROC * proc = NULL ;
987
975
XLogRecPtr lastLSN ;
976
+ dlist_iter iter ;
988
977
989
978
Assert (mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE );
990
979
991
980
lastLSN = 0 ;
992
981
993
- proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue [mode ]),
994
- & (WalSndCtl -> SyncRepQueue [mode ]),
995
- offsetof(PGPROC , syncRepLinks ));
996
-
997
- while (proc )
982
+ dlist_foreach (iter , & WalSndCtl -> SyncRepQueue [mode ])
998
983
{
984
+ PGPROC * proc = dlist_container (PGPROC , syncRepLinks , iter .cur );
985
+
999
986
/*
1000
987
* Check the queue is ordered by LSN and that multiple procs don't
1001
988
* have matching LSNs
@@ -1004,10 +991,6 @@ SyncRepQueueIsOrderedByLSN(int mode)
1004
991
return false;
1005
992
1006
993
lastLSN = proc -> waitLSN ;
1007
-
1008
- proc = (PGPROC * ) SHMQueueNext (& (WalSndCtl -> SyncRepQueue [mode ]),
1009
- & (proc -> syncRepLinks ),
1010
- offsetof(PGPROC , syncRepLinks ));
1011
994
}
1012
995
1013
996
return true;
0 commit comments