@@ -78,13 +78,21 @@ static SlruCtlData CommitTsCtlData;
78
78
#define CommitTsCtl (&CommitTsCtlData)
79
79
80
80
/*
81
- * We keep a cache of the last value set in shared memory. This is protected
82
- * by CommitTsLock.
81
+ * We keep a cache of the last value set in shared memory.
82
+ *
83
+ * This is also good place to keep the activation status. We keep this
84
+ * separate from the GUC so that the standby can activate the module if the
85
+ * primary has it active independently of the value of the GUC.
86
+ *
87
+ * This is protected by CommitTsLock. In some places, we use commitTsActive
88
+ * without acquiring the lock; where this happens, a comment explains the
89
+ * rationale for it.
83
90
*/
84
91
typedef struct CommitTimestampShared
85
92
{
86
93
TransactionId xidLastCommit ;
87
94
CommitTimestampEntry dataLastCommit ;
95
+ bool commitTsActive ;
88
96
} CommitTimestampShared ;
89
97
90
98
CommitTimestampShared * commitTsShared ;
@@ -93,14 +101,6 @@ CommitTimestampShared *commitTsShared;
93
101
/* GUC variable */
94
102
bool track_commit_timestamp ;
95
103
96
- /*
97
- * When this is set, commit_ts is force-enabled during recovery. This is so
98
- * that a standby can replay WAL records coming from a master with the setting
99
- * enabled. (Note that this doesn't enable SQL access to the data; it's
100
- * effectively write-only until the GUC itself is enabled.)
101
- */
102
- static bool enable_during_recovery ;
103
-
104
104
static void SetXidCommitTsInPage (TransactionId xid , int nsubxids ,
105
105
TransactionId * subxids , TimestampTz ts ,
106
106
RepOriginId nodeid , int pageno );
@@ -109,7 +109,7 @@ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
109
109
static int ZeroCommitTsPage (int pageno , bool writeXlog );
110
110
static bool CommitTsPagePrecedes (int page1 , int page2 );
111
111
static void ActivateCommitTs (void );
112
- static void DeactivateCommitTs (bool do_wal );
112
+ static void DeactivateCommitTs (void );
113
113
static void WriteZeroPageXlogRec (int pageno );
114
114
static void WriteTruncateXlogRec (int pageno );
115
115
static void WriteSetTimestampXlogRec (TransactionId mainxid , int nsubxids ,
@@ -149,10 +149,14 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
149
149
TransactionId newestXact ;
150
150
151
151
/*
152
- * No-op if the module is not enabled, but allow writes in a standby
153
- * during recovery.
152
+ * No-op if the module is not active.
153
+ *
154
+ * An unlocked read here is fine, because in a standby (the only place
155
+ * where the flag can change in flight) this routine is only called by
156
+ * the recovery process, which is also the only process which can change
157
+ * the flag.
154
158
*/
155
- if (!track_commit_timestamp && ! enable_during_recovery )
159
+ if (!commitTsShared -> commitTsActive )
156
160
return ;
157
161
158
162
/*
@@ -283,30 +287,45 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
283
287
TransactionId oldestCommitTs ;
284
288
TransactionId newestCommitTs ;
285
289
290
+ /* error if the given Xid doesn't normally commit */
291
+ if (!TransactionIdIsNormal (xid ))
292
+ ereport (ERROR ,
293
+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
294
+ errmsg ("cannot retrieve commit timestamp for transaction %u" , xid )));
295
+
296
+ LWLockAcquire (CommitTsLock , LW_SHARED );
297
+
286
298
/* Error if module not enabled */
287
- if (!track_commit_timestamp )
299
+ if (!commitTsShared -> commitTsActive )
288
300
ereport (ERROR ,
289
301
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
290
302
errmsg ("could not get commit timestamp data" ),
291
303
errhint ("Make sure the configuration parameter \"%s\" is set." ,
292
304
"track_commit_timestamp" )));
293
305
294
- /* error if the given Xid doesn't normally commit */
295
- if (!TransactionIdIsNormal (xid ))
296
- ereport (ERROR ,
297
- (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
298
- errmsg ("cannot retrieve commit timestamp for transaction %u" , xid )));
299
-
300
306
/*
301
- * Return empty if the requested value is outside our valid range.
307
+ * If we're asked for the cached value, return that. Otherwise, fall
308
+ * through to read from SLRU.
302
309
*/
303
- LWLockAcquire (CommitTsLock , LW_SHARED );
310
+ if (commitTsShared -> xidLastCommit == xid )
311
+ {
312
+ * ts = commitTsShared -> dataLastCommit .time ;
313
+ if (nodeid )
314
+ * nodeid = commitTsShared -> dataLastCommit .nodeid ;
315
+
316
+ LWLockRelease (CommitTsLock );
317
+ return * ts != 0 ;
318
+ }
319
+
304
320
oldestCommitTs = ShmemVariableCache -> oldestCommitTs ;
305
321
newestCommitTs = ShmemVariableCache -> newestCommitTs ;
306
322
/* neither is invalid, or both are */
307
323
Assert (TransactionIdIsValid (oldestCommitTs ) == TransactionIdIsValid (newestCommitTs ));
308
324
LWLockRelease (CommitTsLock );
309
325
326
+ /*
327
+ * Return empty if the requested value is outside our valid range.
328
+ */
310
329
if (!TransactionIdIsValid (oldestCommitTs ) ||
311
330
TransactionIdPrecedes (xid , oldestCommitTs ) ||
312
331
TransactionIdPrecedes (newestCommitTs , xid ))
@@ -317,27 +336,6 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
317
336
return false;
318
337
}
319
338
320
- /*
321
- * Use an unlocked atomic read on our cached value in shared memory; if
322
- * it's a hit, acquire a lock and read the data, after verifying that it's
323
- * still what we initially read. Otherwise, fall through to read from
324
- * SLRU.
325
- */
326
- if (commitTsShared -> xidLastCommit == xid )
327
- {
328
- LWLockAcquire (CommitTsLock , LW_SHARED );
329
- if (commitTsShared -> xidLastCommit == xid )
330
- {
331
- * ts = commitTsShared -> dataLastCommit .time ;
332
- if (nodeid )
333
- * nodeid = commitTsShared -> dataLastCommit .nodeid ;
334
-
335
- LWLockRelease (CommitTsLock );
336
- return * ts != 0 ;
337
- }
338
- LWLockRelease (CommitTsLock );
339
- }
340
-
341
339
/* lock is acquired by SimpleLruReadPage_ReadOnly */
342
340
slotno = SimpleLruReadPage_ReadOnly (CommitTsCtl , pageno , xid );
343
341
memcpy (& entry ,
@@ -366,15 +364,16 @@ GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
366
364
{
367
365
TransactionId xid ;
368
366
367
+ LWLockAcquire (CommitTsLock , LW_SHARED );
368
+
369
369
/* Error if module not enabled */
370
- if (!track_commit_timestamp )
370
+ if (!commitTsShared -> commitTsActive )
371
371
ereport (ERROR ,
372
372
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
373
373
errmsg ("could not get commit timestamp data" ),
374
374
errhint ("Make sure the configuration parameter \"%s\" is set." ,
375
375
"track_commit_timestamp" )));
376
376
377
- LWLockAcquire (CommitTsLock , LW_SHARED );
378
377
xid = commitTsShared -> xidLastCommit ;
379
378
if (ts )
380
379
* ts = commitTsShared -> dataLastCommit .time ;
@@ -493,6 +492,7 @@ CommitTsShmemInit(void)
493
492
commitTsShared -> xidLastCommit = InvalidTransactionId ;
494
493
TIMESTAMP_NOBEGIN (commitTsShared -> dataLastCommit .time );
495
494
commitTsShared -> dataLastCommit .nodeid = InvalidRepOriginId ;
495
+ commitTsShared -> commitTsActive = false;
496
496
}
497
497
else
498
498
Assert (found );
@@ -566,7 +566,7 @@ CompleteCommitTsInitialization(void)
566
566
* any leftover data.
567
567
*/
568
568
if (!track_commit_timestamp )
569
- DeactivateCommitTs (true );
569
+ DeactivateCommitTs ();
570
570
}
571
571
572
572
/*
@@ -588,11 +588,11 @@ CommitTsParameterChange(bool newvalue, bool oldvalue)
588
588
*/
589
589
if (newvalue )
590
590
{
591
- if (!track_commit_timestamp && ! oldvalue )
591
+ if (!commitTsShared -> commitTsActive )
592
592
ActivateCommitTs ();
593
593
}
594
- else if (! track_commit_timestamp && oldvalue )
595
- DeactivateCommitTs (false );
594
+ else if (commitTsShared -> commitTsActive )
595
+ DeactivateCommitTs ();
596
596
}
597
597
598
598
/*
@@ -645,7 +645,7 @@ ActivateCommitTs(void)
645
645
}
646
646
LWLockRelease (CommitTsLock );
647
647
648
- /* Finally, create the current segment file, if necessary */
648
+ /* Create the current segment file, if necessary */
649
649
if (!SimpleLruDoesPhysicalPageExist (CommitTsCtl , pageno ))
650
650
{
651
651
int slotno ;
@@ -657,8 +657,10 @@ ActivateCommitTs(void)
657
657
LWLockRelease (CommitTsControlLock );
658
658
}
659
659
660
- /* We can now replay xlog records from this module */
661
- enable_during_recovery = true;
660
+ /* Change the activation status in shared memory. */
661
+ LWLockAcquire (CommitTsLock , LW_EXCLUSIVE );
662
+ commitTsShared -> commitTsActive = true;
663
+ LWLockRelease (CommitTsLock );
662
664
}
663
665
664
666
/*
@@ -672,21 +674,25 @@ ActivateCommitTs(void)
672
674
* possibly-invalid data; also removes segments of old data.
673
675
*/
674
676
static void
675
- DeactivateCommitTs (bool do_wal )
677
+ DeactivateCommitTs (void )
676
678
{
677
- TransactionId xid = ShmemVariableCache -> nextXid ;
678
- int pageno = TransactionIdToCTsPage (xid );
679
-
680
679
/*
681
- * Re-Initialize our idea of the latest page number.
680
+ * Cleanup the status in the shared memory.
681
+ *
682
+ * We reset everything in the commitTsShared record to prevent user from
683
+ * getting confusing data about last committed transaction on the standby
684
+ * when the module was activated repeatedly on the primary.
682
685
*/
683
- LWLockAcquire (CommitTsControlLock , LW_EXCLUSIVE );
684
- CommitTsCtl -> shared -> latest_page_number = pageno ;
685
- LWLockRelease (CommitTsControlLock );
686
-
687
686
LWLockAcquire (CommitTsLock , LW_EXCLUSIVE );
687
+
688
+ commitTsShared -> commitTsActive = false;
689
+ commitTsShared -> xidLastCommit = InvalidTransactionId ;
690
+ TIMESTAMP_NOBEGIN (commitTsShared -> dataLastCommit .time );
691
+ commitTsShared -> dataLastCommit .nodeid = InvalidRepOriginId ;
692
+
688
693
ShmemVariableCache -> oldestCommitTs = InvalidTransactionId ;
689
694
ShmemVariableCache -> newestCommitTs = InvalidTransactionId ;
695
+
690
696
LWLockRelease (CommitTsLock );
691
697
692
698
/*
@@ -697,10 +703,9 @@ DeactivateCommitTs(bool do_wal)
697
703
* be overwritten anyway when we wrap around, but it seems better to be
698
704
* tidy.)
699
705
*/
706
+ LWLockAcquire (CommitTsControlLock , LW_EXCLUSIVE );
700
707
(void ) SlruScanDirectory (CommitTsCtl , SlruScanDirCbDeleteAll , NULL );
701
-
702
- /* No longer enabled on recovery */
703
- enable_during_recovery = false;
708
+ LWLockRelease (CommitTsControlLock );
704
709
}
705
710
706
711
/*
@@ -739,8 +744,13 @@ ExtendCommitTs(TransactionId newestXact)
739
744
{
740
745
int pageno ;
741
746
742
- /* nothing to do if module not enabled */
743
- if (!track_commit_timestamp && !enable_during_recovery )
747
+ /*
748
+ * Nothing to do if module not enabled. Note we do an unlocked read of the
749
+ * flag here, which is okay because this routine is only called from
750
+ * GetNewTransactionId, which is never called in a standby.
751
+ */
752
+ Assert (!InRecovery );
753
+ if (!commitTsShared -> commitTsActive )
744
754
return ;
745
755
746
756
/*
@@ -768,7 +778,7 @@ ExtendCommitTs(TransactionId newestXact)
768
778
* Note that we don't need to flush XLOG here.
769
779
*/
770
780
void
771
- TruncateCommitTs (TransactionId oldestXact , bool do_wal )
781
+ TruncateCommitTs (TransactionId oldestXact )
772
782
{
773
783
int cutoffPage ;
774
784
@@ -784,8 +794,7 @@ TruncateCommitTs(TransactionId oldestXact, bool do_wal)
784
794
return ; /* nothing to remove */
785
795
786
796
/* Write XLOG record */
787
- if (do_wal )
788
- WriteTruncateXlogRec (cutoffPage );
797
+ WriteTruncateXlogRec (cutoffPage );
789
798
790
799
/* Now we can remove the old CommitTs segment(s) */
791
800
SimpleLruTruncate (CommitTsCtl , cutoffPage );
0 commit comments