Commit 00ef5d5

Fix race conditions in synchronous standby management.
We have repeatedly seen the buildfarm reach the Assert(false) in SyncRepGetSyncStandbysPriority. This apparently is due to failing to consider the possibility that the sync_standby_priority values in shared memory might be inconsistent; but they will be whenever only some of the walsenders have updated their values after a change in the synchronous_standby_names setting. That function is vastly too complex for what it does, anyway, so rewriting it seems better than trying to apply a band-aid fix.

Furthermore, the API of SyncRepGetSyncStandbys is broken by design: it returns a list of WalSnd array indexes, but there is nothing guaranteeing that the contents of the WalSnd array remain stable. Thus, if some walsender exits and then a new walsender process takes over that WalSnd array slot, a caller might make use of WAL position data that it should not, potentially leading to incorrect decisions about whether to release transactions that are waiting for synchronous commit.

To fix, replace SyncRepGetSyncStandbys with a new function SyncRepGetCandidateStandbys that copies all the required data from shared memory while holding the relevant mutexes. If the associated walsender process then exits, this data is still safe to make release decisions with, since we know that that much WAL *was* sent to a valid standby server. This incidentally means that we no longer need to treat sync_standby_priority as protected by the SyncRepLock rather than the per-walsender mutex.

SyncRepGetSyncStandbys is no longer used by the core code, so remove it entirely in HEAD. However, it seems possible that external code is relying on that function, so do not remove it from the back branches. Instead, just remove the known-incorrect Assert. When the bug occurs, the function will return a too-short list, which callers should treat as meaning there are not enough sync standbys, which seems like a reasonably safe fallback until the inconsistent state is resolved. Moreover, it's bug-compatible with what has been happening in non-assert builds.

We cannot do anything about the walsender-replacement race condition without an API/ABI break.

The bogus assertion exists back to 9.6, but 9.6 is sufficiently different from the later branches that the patch doesn't apply at all. I chose to just remove the bogus assertion in 9.6, feeling that the probability of a bad outcome from the walsender-replacement race condition is too low to justify rewriting the whole patch for 9.6.

Discussion: https://postgr.es/m/21519.1585272409@sss.pgh.pa.us
1 parent a375f11 commit 00ef5d5

File tree: 4 files changed, +243 -66 lines changed


src/backend/replication/syncrep.c

Lines changed: 191 additions & 54 deletions
@@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
 									   XLogRecPtr *flushPtr,
 									   XLogRecPtr *applyPtr,
-									   List *sync_standbys);
+									   SyncRepStandbyData *sync_standbys,
+									   int num_standbys);
 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
 										  XLogRecPtr *flushPtr,
 										  XLogRecPtr *applyPtr,
-										  List *sync_standbys, uint8 nth);
+										  SyncRepStandbyData *sync_standbys,
+										  int num_standbys,
+										  uint8 nth);
 static int	SyncRepGetStandbyPriority(void);
 static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
 static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int	standby_priority_comparator(const void *a, const void *b);
 static int	cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
@@ -399,9 +403,10 @@ SyncRepInitConfig(void)
 	priority = SyncRepGetStandbyPriority();
 	if (MyWalSnd->sync_standby_priority != priority)
 	{
-		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+		SpinLockAcquire(&MyWalSnd->mutex);
 		MyWalSnd->sync_standby_priority = priority;
-		LWLockRelease(SyncRepLock);
+		SpinLockRelease(&MyWalSnd->mutex);
+
 		ereport(DEBUG1,
 				(errmsg("standby \"%s\" now has synchronous standby priority %u",
 						application_name, priority)));
@@ -452,7 +457,11 @@ SyncRepReleaseWaiters(void)
 
 	/*
 	 * Check whether we are a sync standby or not, and calculate the synced
-	 * positions among all sync standbys.
+	 * positions among all sync standbys.  (Note: although this step does not
+	 * of itself require holding SyncRepLock, it seems like a good idea to do
+	 * it after acquiring the lock.  This ensures that the WAL pointers we use
+	 * to release waiters are newer than any previous execution of this
+	 * routine used.)
 	 */
 	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
 
@@ -527,25 +536,41 @@ static bool
 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 					 XLogRecPtr *applyPtr, bool *am_sync)
 {
-	List	   *sync_standbys;
+	SyncRepStandbyData *sync_standbys;
+	int			num_standbys;
+	int			i;
 
+	/* Initialize default results */
 	*writePtr = InvalidXLogRecPtr;
 	*flushPtr = InvalidXLogRecPtr;
 	*applyPtr = InvalidXLogRecPtr;
 	*am_sync = false;
 
+	/* Quick out if not even configured to be synchronous */
+	if (SyncRepConfig == NULL)
+		return false;
+
 	/* Get standbys that are considered as synchronous at this moment */
-	sync_standbys = SyncRepGetSyncStandbys(am_sync);
+	num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
+
+	/* Am I among the candidate sync standbys? */
+	for (i = 0; i < num_standbys; i++)
+	{
+		if (sync_standbys[i].is_me)
+		{
+			*am_sync = true;
+			break;
+		}
+	}
 
 	/*
-	 * Quick exit if we are not managing a sync standby or there are not
-	 * enough synchronous standbys.
+	 * Nothing more to do if we are not managing a sync standby or there are
+	 * not enough synchronous standbys.
 	 */
 	if (!(*am_sync) ||
-		SyncRepConfig == NULL ||
-		list_length(sync_standbys) < SyncRepConfig->num_sync)
+		num_standbys < SyncRepConfig->num_sync)
 	{
-		list_free(sync_standbys);
+		pfree(sync_standbys);
 		return false;
 	}
 
@@ -565,43 +590,41 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
 	{
 		SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-								   sync_standbys);
+								   sync_standbys, num_standbys);
 	}
 	else
 	{
 		SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-									  sync_standbys, SyncRepConfig->num_sync);
+									  sync_standbys, num_standbys,
+									  SyncRepConfig->num_sync);
 	}
 
-	list_free(sync_standbys);
+	pfree(sync_standbys);
 	return true;
 }
 
 /*
  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
  */
 static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-						   XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+						   XLogRecPtr *flushPtr,
+						   XLogRecPtr *applyPtr,
+						   SyncRepStandbyData *sync_standbys,
+						   int num_standbys)
 {
-	ListCell   *cell;
+	int			i;
 
 	/*
 	 * Scan through all sync standbys and calculate the oldest Write, Flush
-	 * and Apply positions.
+	 * and Apply positions.  We assume *writePtr et al were initialized to
+	 * InvalidXLogRecPtr.
 	 */
-	foreach(cell, sync_standbys)
+	for (i = 0; i < num_standbys; i++)
 	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-		XLogRecPtr	write;
-		XLogRecPtr	flush;
-		XLogRecPtr	apply;
-
-		SpinLockAcquire(&walsnd->mutex);
-		write = walsnd->write;
-		flush = walsnd->flush;
-		apply = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
+		XLogRecPtr	write = sync_standbys[i].write;
+		XLogRecPtr	flush = sync_standbys[i].flush;
+		XLogRecPtr	apply = sync_standbys[i].apply;
 
 		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
 			*writePtr = write;
@@ -617,38 +640,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * standbys.
  */
 static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-							  XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+							  XLogRecPtr *flushPtr,
+							  XLogRecPtr *applyPtr,
+							  SyncRepStandbyData *sync_standbys,
+							  int num_standbys,
+							  uint8 nth)
 {
-	ListCell   *cell;
 	XLogRecPtr *write_array;
 	XLogRecPtr *flush_array;
 	XLogRecPtr *apply_array;
-	int			len;
-	int			i = 0;
-
-	len = list_length(sync_standbys);
-	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+	int			i;
 
-	foreach(cell, sync_standbys)
-	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+	/* Should have enough candidates, or somebody messed up */
+	Assert(nth > 0 && nth <= num_standbys);
 
-		SpinLockAcquire(&walsnd->mutex);
-		write_array[i] = walsnd->write;
-		flush_array[i] = walsnd->flush;
-		apply_array[i] = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
+	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
 
-		i++;
+	for (i = 0; i < num_standbys; i++)
+	{
+		write_array[i] = sync_standbys[i].write;
+		flush_array[i] = sync_standbys[i].flush;
+		apply_array[i] = sync_standbys[i].apply;
 	}
 
 	/* Sort each array in descending order */
-	qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
-	qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
-	qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
 
 	/* Get Nth latest Write, Flush, Apply positions */
 	*writePtr = write_array[nth - 1];
@@ -677,13 +698,122 @@ cmp_lsn(const void *a, const void *b)
 		return 1;
 }
 
+/*
+ * Return data about walsenders that are candidates to be sync standbys.
+ *
+ * *standbys is set to a palloc'd array of structs of per-walsender data,
+ * and the number of valid entries (candidate sync senders) is returned.
+ * (This might be more or fewer than num_sync; caller must check.)
+ */
+int
+SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
+{
+	int			i;
+	int			n;
+
+	/* Create result array */
+	*standbys = (SyncRepStandbyData *)
+		palloc(max_wal_senders * sizeof(SyncRepStandbyData));
+
+	/* Quick exit if sync replication is not requested */
+	if (SyncRepConfig == NULL)
+		return 0;
+
+	/* Collect raw data from shared memory */
+	n = 0;
+	for (i = 0; i < max_wal_senders; i++)
+	{
+		volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
+									 * rearrangement */
+		SyncRepStandbyData *stby;
+		WalSndState state;		/* not included in SyncRepStandbyData */
+
+		walsnd = &WalSndCtl->walsnds[i];
+		stby = *standbys + n;
+
+		SpinLockAcquire(&walsnd->mutex);
+		stby->pid = walsnd->pid;
+		state = walsnd->state;
+		stby->write = walsnd->write;
+		stby->flush = walsnd->flush;
+		stby->apply = walsnd->apply;
+		stby->sync_standby_priority = walsnd->sync_standby_priority;
+		SpinLockRelease(&walsnd->mutex);
+
+		/* Must be active */
+		if (stby->pid == 0)
+			continue;
+
+		/* Must be streaming or stopping */
+		if (state != WALSNDSTATE_STREAMING &&
+			state != WALSNDSTATE_STOPPING)
+			continue;
+
+		/* Must be synchronous */
+		if (stby->sync_standby_priority == 0)
+			continue;
+
+		/* Must have a valid flush position */
+		if (XLogRecPtrIsInvalid(stby->flush))
+			continue;
+
+		/* OK, it's a candidate */
+		stby->walsnd_index = i;
+		stby->is_me = (walsnd == MyWalSnd);
+		n++;
+	}
+
+	/*
+	 * In quorum mode, we return all the candidates.  In priority mode, if we
+	 * have too many candidates then return only the num_sync ones of highest
+	 * priority.
+	 */
+	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
+		n > SyncRepConfig->num_sync)
+	{
+		/* Sort by priority ... */
+		qsort(*standbys, n, sizeof(SyncRepStandbyData),
+			  standby_priority_comparator);
+		/* ... then report just the first num_sync ones */
+		n = SyncRepConfig->num_sync;
+	}
+
+	return n;
+}
+
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+	const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+	const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
+
+	/* First, sort by increasing priority value */
+	if (sa->sync_standby_priority != sb->sync_standby_priority)
+		return sa->sync_standby_priority - sb->sync_standby_priority;
+
+	/*
+	 * We might have equal priority values; arbitrarily break ties by position
+	 * in the WALSnd array.  (This is utterly bogus, since that is arrival
+	 * order dependent, but there are regression tests that rely on it.)
+	 */
+	return sa->walsnd_index - sb->walsnd_index;
+}
+
+
 /*
  * Return the list of sync standbys, or NIL if no sync standby is connected.
  *
  * The caller must hold SyncRepLock.
  *
  * On return, *am_sync is set to true if this walsender is connecting to
  * sync standby. Otherwise it's set to false.
+ *
+ * XXX This function is BROKEN and should not be used in new code.  It has
+ * an inherent race condition, since the returned list of integer indexes
+ * might no longer correspond to reality.
  */
 List *
 SyncRepGetSyncStandbys(bool *am_sync)
@@ -943,8 +1073,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
 		priority = next_highest_priority;
 	}
 
-	/* never reached, but keep compiler quiet */
-	Assert(false);
+	/*
+	 * We might get here if the set of sync_standby_priority values in shared
+	 * memory is inconsistent, as can happen transiently after a change in the
+	 * synchronous_standby_names setting.  In that case, just return the
+	 * incomplete list we have so far.  That will cause the caller to decide
+	 * there aren't enough synchronous candidates, which should be a safe
+	 * choice until the priority values become consistent again.
+	 */
+	list_free(pending);
 	return result;
 }
 