Skip to content

Commit b1332e9

Browse files
committed
Put the logic to decide which synchronous standby is active into a function.
This avoids duplicating the code. Michael Paquier, reviewed by Simon Riggs and me
1 parent 7afc233 commit b1332e9

File tree

3 files changed

+79
-65
lines changed

3 files changed

+79
-65
lines changed

src/backend/replication/syncrep.c

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* Synchronous replication is new as of PostgreSQL 9.1.
66
*
77
* If requested, transaction commits wait until their commit LSN is
8-
* acknowledged by the sync standby.
8+
* acknowledged by the synchronous standby.
99
*
1010
* This module contains the code for waiting and release of backends.
1111
* All code in this module executes on the primary. The core streaming
@@ -357,6 +357,60 @@ SyncRepInitConfig(void)
357357
}
358358
}
359359

360+
/*
361+
* Find the WAL sender servicing the synchronous standby with the lowest
362+
* priority value, or NULL if no synchronous standby is connected. If there
363+
* are multiple standbys with the same lowest priority value, the first one
364+
* found is selected. The caller must hold SyncRepLock.
365+
*/
366+
WalSnd *
367+
SyncRepGetSynchronousStandby(void)
368+
{
369+
WalSnd *result = NULL;
370+
int result_priority = 0;
371+
int i;
372+
373+
for (i = 0; i < max_wal_senders; i++)
374+
{
375+
/* Use volatile pointer to prevent code rearrangement */
376+
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
377+
int this_priority;
378+
379+
/* Must be active */
380+
if (walsnd->pid == 0)
381+
continue;
382+
383+
/* Must be streaming */
384+
if (walsnd->state != WALSNDSTATE_STREAMING)
385+
continue;
386+
387+
/* Must be synchronous */
388+
this_priority = walsnd->sync_standby_priority;
389+
if (this_priority == 0)
390+
continue;
391+
392+
/* Must have a lower priority value than any previous ones */
393+
if (result != NULL && result_priority <= this_priority)
394+
continue;
395+
396+
/* Must have a valid flush position */
397+
if (XLogRecPtrIsInvalid(walsnd->flush))
398+
continue;
399+
400+
result = (WalSnd *) walsnd;
401+
result_priority = this_priority;
402+
403+
/*
404+
* If priority is equal to 1, there cannot be any other WAL senders
405+
* with a lower priority, so we're done.
406+
*/
407+
if (this_priority == 1)
408+
return result;
409+
}
410+
411+
return result;
412+
}
413+
360414
/*
361415
* Update the LSNs on each queue based upon our latest state. This
362416
* implements a simple policy of first-valid-standby-releases-waiter.
@@ -368,11 +422,9 @@ void
368422
SyncRepReleaseWaiters(void)
369423
{
370424
volatile WalSndCtlData *walsndctl = WalSndCtl;
371-
volatile WalSnd *syncWalSnd = NULL;
425+
WalSnd *syncWalSnd;
372426
int numwrite = 0;
373427
int numflush = 0;
374-
int priority = 0;
375-
int i;
376428

377429
/*
378430
* If this WALSender is serving a standby that is not on the list of
@@ -387,33 +439,13 @@ SyncRepReleaseWaiters(void)
387439

388440
/*
389441
* We're a potential sync standby. Release waiters if we are the highest
390-
* priority standby. If there are multiple standbys with same priorities
391-
* then we use the first mentioned standby. If you change this, also
392-
* change pg_stat_get_wal_senders().
442+
* priority standby.
393443
*/
394444
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
445+
syncWalSnd = SyncRepGetSynchronousStandby();
395446

396-
for (i = 0; i < max_wal_senders; i++)
397-
{
398-
/* use volatile pointer to prevent code rearrangement */
399-
volatile WalSnd *walsnd = &walsndctl->walsnds[i];
400-
401-
if (walsnd->pid != 0 &&
402-
walsnd->state == WALSNDSTATE_STREAMING &&
403-
walsnd->sync_standby_priority > 0 &&
404-
(priority == 0 ||
405-
priority > walsnd->sync_standby_priority) &&
406-
!XLogRecPtrIsInvalid(walsnd->flush))
407-
{
408-
priority = walsnd->sync_standby_priority;
409-
syncWalSnd = walsnd;
410-
}
411-
}
412-
413-
/*
414-
* We should have found ourselves at least.
415-
*/
416-
Assert(syncWalSnd);
447+
/* We should have found ourselves at least */
448+
Assert(syncWalSnd != NULL);
417449

418450
/*
419451
* If we aren't managing the highest priority standby then just leave.

src/backend/replication/walsender.c

Lines changed: 15 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2741,9 +2741,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
27412741
Tuplestorestate *tupstore;
27422742
MemoryContext per_query_ctx;
27432743
MemoryContext oldcontext;
2744-
int *sync_priority;
2745-
int priority = 0;
2746-
int sync_standby = -1;
2744+
WalSnd *sync_standby;
27472745
int i;
27482746

27492747
/* check to see if caller supports us returning a tuplestore */
@@ -2772,38 +2770,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
27722770
MemoryContextSwitchTo(oldcontext);
27732771

27742772
/*
2775-
* Get the priorities of sync standbys all in one go, to minimise lock
2776-
* acquisitions and to allow us to evaluate who is the current sync
2777-
* standby. This code must match the code in SyncRepReleaseWaiters().
2773+
* Get the currently active synchronous standby.
27782774
*/
2779-
sync_priority = palloc(sizeof(int) * max_wal_senders);
27802775
LWLockAcquire(SyncRepLock, LW_SHARED);
2781-
for (i = 0; i < max_wal_senders; i++)
2782-
{
2783-
/* use volatile pointer to prevent code rearrangement */
2784-
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
2785-
2786-
if (walsnd->pid != 0)
2787-
{
2788-
/*
2789-
* Treat a standby such as a pg_basebackup background process
2790-
* which always returns an invalid flush location, as an
2791-
* asynchronous standby.
2792-
*/
2793-
sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
2794-
0 : walsnd->sync_standby_priority;
2795-
2796-
if (walsnd->state == WALSNDSTATE_STREAMING &&
2797-
walsnd->sync_standby_priority > 0 &&
2798-
(priority == 0 ||
2799-
priority > walsnd->sync_standby_priority) &&
2800-
!XLogRecPtrIsInvalid(walsnd->flush))
2801-
{
2802-
priority = walsnd->sync_standby_priority;
2803-
sync_standby = i;
2804-
}
2805-
}
2806-
}
2776+
sync_standby = SyncRepGetSynchronousStandby();
28072777
LWLockRelease(SyncRepLock);
28082778

28092779
for (i = 0; i < max_wal_senders; i++)
@@ -2814,6 +2784,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
28142784
XLogRecPtr write;
28152785
XLogRecPtr flush;
28162786
XLogRecPtr apply;
2787+
int priority;
28172788
WalSndState state;
28182789
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
28192790
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2827,6 +2798,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
28272798
write = walsnd->write;
28282799
flush = walsnd->flush;
28292800
apply = walsnd->apply;
2801+
priority = walsnd->sync_standby_priority;
28302802
SpinLockRelease(&walsnd->mutex);
28312803

28322804
memset(nulls, 0, sizeof(nulls));
@@ -2857,23 +2829,29 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
28572829
nulls[5] = true;
28582830
values[5] = LSNGetDatum(apply);
28592831

2860-
values[6] = Int32GetDatum(sync_priority[i]);
2832+
/*
2833+
* Treat a standby such as a pg_basebackup background process
2834+
* which always returns an invalid flush location, as an
2835+
* asynchronous standby.
2836+
*/
2837+
priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
2838+
2839+
values[6] = Int32GetDatum(priority);
28612840

28622841
/*
28632842
* More easily understood version of standby state. This is purely
28642843
* informational, not different from priority.
28652844
*/
2866-
if (sync_priority[i] == 0)
2845+
if (priority == 0)
28672846
values[7] = CStringGetTextDatum("async");
2868-
else if (i == sync_standby)
2847+
else if (walsnd == sync_standby)
28692848
values[7] = CStringGetTextDatum("sync");
28702849
else
28712850
values[7] = CStringGetTextDatum("potential");
28722851
}
28732852

28742853
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
28752854
}
2876-
pfree(sync_priority);
28772855

28782856
/* clean up and return the tuplestore */
28792857
tuplestore_donestoring(tupstore);

src/include/replication/syncrep.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
5050
/* called by various procs */
5151
extern int SyncRepWakeQueue(bool all, int mode);
5252

53+
/* forward declaration to avoid pulling in walsender_private.h */
54+
struct WalSnd;
55+
extern struct WalSnd *SyncRepGetSynchronousStandby(void);
56+
5357
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
5458
extern void assign_synchronous_commit(int newval, void *extra);
5559

0 commit comments

Comments
 (0)