@@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
108
108
static void SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,
109
109
XLogRecPtr * flushPtr ,
110
110
XLogRecPtr * applyPtr ,
111
- List * sync_standbys );
111
+ SyncRepStandbyData * sync_standbys ,
112
+ int num_standbys );
112
113
static void SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,
113
114
XLogRecPtr * flushPtr ,
114
115
XLogRecPtr * applyPtr ,
115
- List * sync_standbys , uint8 nth );
116
+ SyncRepStandbyData * sync_standbys ,
117
+ int num_standbys ,
118
+ uint8 nth );
116
119
static int SyncRepGetStandbyPriority (void );
117
120
static List * SyncRepGetSyncStandbysPriority (bool * am_sync );
118
121
static List * SyncRepGetSyncStandbysQuorum (bool * am_sync );
122
+ static int standby_priority_comparator (const void * a , const void * b );
119
123
static int cmp_lsn (const void * a , const void * b );
120
124
121
125
#ifdef USE_ASSERT_CHECKING
@@ -399,9 +403,10 @@ SyncRepInitConfig(void)
399
403
priority = SyncRepGetStandbyPriority ();
400
404
if (MyWalSnd -> sync_standby_priority != priority )
401
405
{
402
- LWLockAcquire ( SyncRepLock , LW_EXCLUSIVE );
406
+ SpinLockAcquire ( & MyWalSnd -> mutex );
403
407
MyWalSnd -> sync_standby_priority = priority ;
404
- LWLockRelease (SyncRepLock );
408
+ SpinLockRelease (& MyWalSnd -> mutex );
409
+
405
410
ereport (DEBUG1 ,
406
411
(errmsg ("standby \"%s\" now has synchronous standby priority %u" ,
407
412
application_name , priority )));
@@ -452,7 +457,11 @@ SyncRepReleaseWaiters(void)
452
457
453
458
/*
454
459
* Check whether we are a sync standby or not, and calculate the synced
455
- * positions among all sync standbys.
460
+ * positions among all sync standbys. (Note: although this step does not
461
+ * of itself require holding SyncRepLock, it seems like a good idea to do
462
+ * it after acquiring the lock. This ensures that the WAL pointers we use
463
+ * to release waiters are newer than any previous execution of this
464
+ * routine used.)
456
465
*/
457
466
got_recptr = SyncRepGetSyncRecPtr (& writePtr , & flushPtr , & applyPtr , & am_sync );
458
467
@@ -527,25 +536,41 @@ static bool
527
536
SyncRepGetSyncRecPtr (XLogRecPtr * writePtr , XLogRecPtr * flushPtr ,
528
537
XLogRecPtr * applyPtr , bool * am_sync )
529
538
{
530
- List * sync_standbys ;
539
+ SyncRepStandbyData * sync_standbys ;
540
+ int num_standbys ;
541
+ int i ;
531
542
543
+ /* Initialize default results */
532
544
* writePtr = InvalidXLogRecPtr ;
533
545
* flushPtr = InvalidXLogRecPtr ;
534
546
* applyPtr = InvalidXLogRecPtr ;
535
547
* am_sync = false;
536
548
549
+ /* Quick out if not even configured to be synchronous */
550
+ if (SyncRepConfig == NULL )
551
+ return false;
552
+
537
553
/* Get standbys that are considered as synchronous at this moment */
538
- sync_standbys = SyncRepGetSyncStandbys (am_sync );
554
+ num_standbys = SyncRepGetCandidateStandbys (& sync_standbys );
555
+
556
+ /* Am I among the candidate sync standbys? */
557
+ for (i = 0 ; i < num_standbys ; i ++ )
558
+ {
559
+ if (sync_standbys [i ].is_me )
560
+ {
561
+ * am_sync = true;
562
+ break ;
563
+ }
564
+ }
539
565
540
566
/*
541
- * Quick exit if we are not managing a sync standby or there are not
542
- * enough synchronous standbys.
567
+ * Nothing more to do if we are not managing a sync standby or there are
568
+ * not enough synchronous standbys.
543
569
*/
544
570
if (!(* am_sync ) ||
545
- SyncRepConfig == NULL ||
546
- list_length (sync_standbys ) < SyncRepConfig -> num_sync )
571
+ num_standbys < SyncRepConfig -> num_sync )
547
572
{
548
- list_free (sync_standbys );
573
+ pfree (sync_standbys );
549
574
return false;
550
575
}
551
576
@@ -565,43 +590,41 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
565
590
if (SyncRepConfig -> syncrep_method == SYNC_REP_PRIORITY )
566
591
{
567
592
SyncRepGetOldestSyncRecPtr (writePtr , flushPtr , applyPtr ,
568
- sync_standbys );
593
+ sync_standbys , num_standbys );
569
594
}
570
595
else
571
596
{
572
597
SyncRepGetNthLatestSyncRecPtr (writePtr , flushPtr , applyPtr ,
573
- sync_standbys , SyncRepConfig -> num_sync );
598
+ sync_standbys , num_standbys ,
599
+ SyncRepConfig -> num_sync );
574
600
}
575
601
576
- list_free (sync_standbys );
602
+ pfree (sync_standbys );
577
603
return true;
578
604
}
579
605
580
606
/*
581
607
* Calculate the oldest Write, Flush and Apply positions among sync standbys.
582
608
*/
583
609
static void
584
- SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr , XLogRecPtr * flushPtr ,
585
- XLogRecPtr * applyPtr , List * sync_standbys )
610
+ SyncRepGetOldestSyncRecPtr (XLogRecPtr * writePtr ,
611
+ XLogRecPtr * flushPtr ,
612
+ XLogRecPtr * applyPtr ,
613
+ SyncRepStandbyData * sync_standbys ,
614
+ int num_standbys )
586
615
{
587
- ListCell * cell ;
616
+ int i ;
588
617
589
618
/*
590
619
* Scan through all sync standbys and calculate the oldest Write, Flush
591
- * and Apply positions.
620
+ * and Apply positions. We assume *writePtr et al were initialized to
621
+ * InvalidXLogRecPtr.
592
622
*/
593
- foreach ( cell , sync_standbys )
623
+ for ( i = 0 ; i < num_standbys ; i ++ )
594
624
{
595
- WalSnd * walsnd = & WalSndCtl -> walsnds [lfirst_int (cell )];
596
- XLogRecPtr write ;
597
- XLogRecPtr flush ;
598
- XLogRecPtr apply ;
599
-
600
- SpinLockAcquire (& walsnd -> mutex );
601
- write = walsnd -> write ;
602
- flush = walsnd -> flush ;
603
- apply = walsnd -> apply ;
604
- SpinLockRelease (& walsnd -> mutex );
625
+ XLogRecPtr write = sync_standbys [i ].write ;
626
+ XLogRecPtr flush = sync_standbys [i ].flush ;
627
+ XLogRecPtr apply = sync_standbys [i ].apply ;
605
628
606
629
if (XLogRecPtrIsInvalid (* writePtr ) || * writePtr > write )
607
630
* writePtr = write ;
@@ -617,38 +640,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
617
640
* standbys.
618
641
*/
619
642
static void
620
- SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr , XLogRecPtr * flushPtr ,
621
- XLogRecPtr * applyPtr , List * sync_standbys , uint8 nth )
643
+ SyncRepGetNthLatestSyncRecPtr (XLogRecPtr * writePtr ,
644
+ XLogRecPtr * flushPtr ,
645
+ XLogRecPtr * applyPtr ,
646
+ SyncRepStandbyData * sync_standbys ,
647
+ int num_standbys ,
648
+ uint8 nth )
622
649
{
623
- ListCell * cell ;
624
650
XLogRecPtr * write_array ;
625
651
XLogRecPtr * flush_array ;
626
652
XLogRecPtr * apply_array ;
627
- int len ;
628
- int i = 0 ;
629
-
630
- len = list_length (sync_standbys );
631
- write_array = (XLogRecPtr * ) palloc (sizeof (XLogRecPtr ) * len );
632
- flush_array = (XLogRecPtr * ) palloc (sizeof (XLogRecPtr ) * len );
633
- apply_array = (XLogRecPtr * ) palloc (sizeof (XLogRecPtr ) * len );
653
+ int i ;
634
654
635
- foreach (cell , sync_standbys )
636
- {
637
- WalSnd * walsnd = & WalSndCtl -> walsnds [lfirst_int (cell )];
655
+ /* Should have enough candidates, or somebody messed up */
656
+ Assert (nth > 0 && nth <= num_standbys );
638
657
639
- SpinLockAcquire (& walsnd -> mutex );
640
- write_array [i ] = walsnd -> write ;
641
- flush_array [i ] = walsnd -> flush ;
642
- apply_array [i ] = walsnd -> apply ;
643
- SpinLockRelease (& walsnd -> mutex );
658
+ write_array = (XLogRecPtr * ) palloc (sizeof (XLogRecPtr ) * num_standbys );
659
+ flush_array = (XLogRecPtr * ) palloc (sizeof (XLogRecPtr ) * num_standbys );
660
+ apply_array = (XLogRecPtr * ) palloc (sizeof (XLogRecPtr ) * num_standbys );
644
661
645
- i ++ ;
662
+ for (i = 0 ; i < num_standbys ; i ++ )
663
+ {
664
+ write_array [i ] = sync_standbys [i ].write ;
665
+ flush_array [i ] = sync_standbys [i ].flush ;
666
+ apply_array [i ] = sync_standbys [i ].apply ;
646
667
}
647
668
648
669
/* Sort each array in descending order */
649
- qsort (write_array , len , sizeof (XLogRecPtr ), cmp_lsn );
650
- qsort (flush_array , len , sizeof (XLogRecPtr ), cmp_lsn );
651
- qsort (apply_array , len , sizeof (XLogRecPtr ), cmp_lsn );
670
+ qsort (write_array , num_standbys , sizeof (XLogRecPtr ), cmp_lsn );
671
+ qsort (flush_array , num_standbys , sizeof (XLogRecPtr ), cmp_lsn );
672
+ qsort (apply_array , num_standbys , sizeof (XLogRecPtr ), cmp_lsn );
652
673
653
674
/* Get Nth latest Write, Flush, Apply positions */
654
675
* writePtr = write_array [nth - 1 ];
@@ -677,13 +698,122 @@ cmp_lsn(const void *a, const void *b)
677
698
return 1 ;
678
699
}
679
700
701
+ /*
702
+ * Return data about walsenders that are candidates to be sync standbys.
703
+ *
704
+ * *standbys is set to a palloc'd array of structs of per-walsender data,
705
+ * and the number of valid entries (candidate sync senders) is returned.
706
+ * (This might be more or fewer than num_sync; caller must check.)
707
+ */
708
+ int
709
+ SyncRepGetCandidateStandbys (SyncRepStandbyData * * standbys )
710
+ {
711
+ int i ;
712
+ int n ;
713
+
714
+ /* Create result array */
715
+ * standbys = (SyncRepStandbyData * )
716
+ palloc (max_wal_senders * sizeof (SyncRepStandbyData ));
717
+
718
+ /* Quick exit if sync replication is not requested */
719
+ if (SyncRepConfig == NULL )
720
+ return 0 ;
721
+
722
+ /* Collect raw data from shared memory */
723
+ n = 0 ;
724
+ for (i = 0 ; i < max_wal_senders ; i ++ )
725
+ {
726
+ volatile WalSnd * walsnd ; /* Use volatile pointer to prevent code
727
+ * rearrangement */
728
+ SyncRepStandbyData * stby ;
729
+ WalSndState state ; /* not included in SyncRepStandbyData */
730
+
731
+ walsnd = & WalSndCtl -> walsnds [i ];
732
+ stby = * standbys + n ;
733
+
734
+ SpinLockAcquire (& walsnd -> mutex );
735
+ stby -> pid = walsnd -> pid ;
736
+ state = walsnd -> state ;
737
+ stby -> write = walsnd -> write ;
738
+ stby -> flush = walsnd -> flush ;
739
+ stby -> apply = walsnd -> apply ;
740
+ stby -> sync_standby_priority = walsnd -> sync_standby_priority ;
741
+ SpinLockRelease (& walsnd -> mutex );
742
+
743
+ /* Must be active */
744
+ if (stby -> pid == 0 )
745
+ continue ;
746
+
747
+ /* Must be streaming or stopping */
748
+ if (state != WALSNDSTATE_STREAMING &&
749
+ state != WALSNDSTATE_STOPPING )
750
+ continue ;
751
+
752
+ /* Must be synchronous */
753
+ if (stby -> sync_standby_priority == 0 )
754
+ continue ;
755
+
756
+ /* Must have a valid flush position */
757
+ if (XLogRecPtrIsInvalid (stby -> flush ))
758
+ continue ;
759
+
760
+ /* OK, it's a candidate */
761
+ stby -> walsnd_index = i ;
762
+ stby -> is_me = (walsnd == MyWalSnd );
763
+ n ++ ;
764
+ }
765
+
766
+ /*
767
+ * In quorum mode, we return all the candidates. In priority mode, if we
768
+ * have too many candidates then return only the num_sync ones of highest
769
+ * priority.
770
+ */
771
+ if (SyncRepConfig -> syncrep_method == SYNC_REP_PRIORITY &&
772
+ n > SyncRepConfig -> num_sync )
773
+ {
774
+ /* Sort by priority ... */
775
+ qsort (* standbys , n , sizeof (SyncRepStandbyData ),
776
+ standby_priority_comparator );
777
+ /* ... then report just the first num_sync ones */
778
+ n = SyncRepConfig -> num_sync ;
779
+ }
780
+
781
+ return n ;
782
+ }
783
+
784
+ /*
785
+ * qsort comparator to sort SyncRepStandbyData entries by priority
786
+ */
787
+ static int
788
+ standby_priority_comparator (const void * a , const void * b )
789
+ {
790
+ const SyncRepStandbyData * sa = (const SyncRepStandbyData * ) a ;
791
+ const SyncRepStandbyData * sb = (const SyncRepStandbyData * ) b ;
792
+
793
+ /* First, sort by increasing priority value */
794
+ if (sa -> sync_standby_priority != sb -> sync_standby_priority )
795
+ return sa -> sync_standby_priority - sb -> sync_standby_priority ;
796
+
797
+ /*
798
+ * We might have equal priority values; arbitrarily break ties by position
799
+ * in the WALSnd array. (This is utterly bogus, since that is arrival
800
+ * order dependent, but there are regression tests that rely on it.)
801
+ */
802
+ return sa -> walsnd_index - sb -> walsnd_index ;
803
+ }
804
+
805
+
680
806
/*
681
807
* Return the list of sync standbys, or NIL if no sync standby is connected.
682
808
*
683
809
* The caller must hold SyncRepLock.
684
810
*
685
811
* On return, *am_sync is set to true if this walsender is connecting to
686
812
* sync standby. Otherwise it's set to false.
813
+ *
814
+ * XXX This function is BROKEN and should not be used in new code. It has
815
+ * an inherent race condition, since the returned list of integer indexes
816
+ * might no longer correspond to reality.
687
817
*/
688
818
List *
689
819
SyncRepGetSyncStandbys (bool * am_sync )
@@ -943,8 +1073,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
943
1073
priority = next_highest_priority ;
944
1074
}
945
1075
946
- /* never reached, but keep compiler quiet */
947
- Assert (false);
1076
+ /*
1077
+ * We might get here if the set of sync_standby_priority values in shared
1078
+ * memory is inconsistent, as can happen transiently after a change in the
1079
+ * synchronous_standby_names setting. In that case, just return the
1080
+ * incomplete list we have so far. That will cause the caller to decide
1081
+ * there aren't enough synchronous candidates, which should be a safe
1082
+ * choice until the priority values become consistent again.
1083
+ */
1084
+ list_free (pending );
948
1085
return result ;
949
1086
}
950
1087
0 commit comments