@@ -1763,21 +1763,33 @@ ProcessStandbyReplyMessage(void)
1763
1763
applyLag ;
1764
1764
bool clearLagTimes ;
1765
1765
TimestampTz now ;
1766
+ TimestampTz replyTime ;
1766
1767
1767
1768
static bool fullyAppliedLastTime = false;
1768
1769
1769
1770
/* the caller already consumed the msgtype byte */
1770
1771
writePtr = pq_getmsgint64 (& reply_message );
1771
1772
flushPtr = pq_getmsgint64 (& reply_message );
1772
1773
applyPtr = pq_getmsgint64 (& reply_message );
1773
- ( void ) pq_getmsgint64 (& reply_message ); /* sendTime; not used ATM */
1774
+ replyTime = pq_getmsgint64 (& reply_message );
1774
1775
replyRequested = pq_getmsgbyte (& reply_message );
1775
1776
1776
- elog (DEBUG2 , "write %X/%X flush %X/%X apply %X/%X%s" ,
1777
- (uint32 ) (writePtr >> 32 ), (uint32 ) writePtr ,
1778
- (uint32 ) (flushPtr >> 32 ), (uint32 ) flushPtr ,
1779
- (uint32 ) (applyPtr >> 32 ), (uint32 ) applyPtr ,
1780
- replyRequested ? " (reply requested)" : "" );
1777
+ if (log_min_messages <= DEBUG2 )
1778
+ {
1779
+ char * replyTimeStr ;
1780
+
1781
+ /* Copy because timestamptz_to_str returns a static buffer */
1782
+ replyTimeStr = pstrdup (timestamptz_to_str (replyTime ));
1783
+
1784
+ elog (DEBUG2 , "write %X/%X flush %X/%X apply %X/%X%s reply_time %s" ,
1785
+ (uint32 ) (writePtr >> 32 ), (uint32 ) writePtr ,
1786
+ (uint32 ) (flushPtr >> 32 ), (uint32 ) flushPtr ,
1787
+ (uint32 ) (applyPtr >> 32 ), (uint32 ) applyPtr ,
1788
+ replyRequested ? " (reply requested)" : "" ,
1789
+ replyTimeStr );
1790
+
1791
+ pfree (replyTimeStr );
1792
+ }
1781
1793
1782
1794
/* See if we can compute the round-trip lag for these positions. */
1783
1795
now = GetCurrentTimestamp ();
@@ -1824,6 +1836,7 @@ ProcessStandbyReplyMessage(void)
1824
1836
walsnd -> flushLag = flushLag ;
1825
1837
if (applyLag != -1 || clearLagTimes )
1826
1838
walsnd -> applyLag = applyLag ;
1839
+ walsnd -> replyTime = replyTime ;
1827
1840
SpinLockRelease (& walsnd -> mutex );
1828
1841
}
1829
1842
@@ -1927,23 +1940,47 @@ ProcessStandbyHSFeedbackMessage(void)
1927
1940
uint32 feedbackEpoch ;
1928
1941
TransactionId feedbackCatalogXmin ;
1929
1942
uint32 feedbackCatalogEpoch ;
1943
+ TimestampTz replyTime ;
1930
1944
1931
1945
/*
1932
1946
* Decipher the reply message. The caller already consumed the msgtype
1933
1947
* byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1934
1948
* of this message.
1935
1949
*/
1936
- ( void ) pq_getmsgint64 (& reply_message ); /* sendTime; not used ATM */
1950
+ replyTime = pq_getmsgint64 (& reply_message );
1937
1951
feedbackXmin = pq_getmsgint (& reply_message , 4 );
1938
1952
feedbackEpoch = pq_getmsgint (& reply_message , 4 );
1939
1953
feedbackCatalogXmin = pq_getmsgint (& reply_message , 4 );
1940
1954
feedbackCatalogEpoch = pq_getmsgint (& reply_message , 4 );
1941
1955
1942
- elog (DEBUG2 , "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u" ,
1943
- feedbackXmin ,
1944
- feedbackEpoch ,
1945
- feedbackCatalogXmin ,
1946
- feedbackCatalogEpoch );
1956
+ if (log_min_messages <= DEBUG2 )
1957
+ {
1958
+ char * replyTimeStr ;
1959
+
1960
+ /* Copy because timestamptz_to_str returns a static buffer */
1961
+ replyTimeStr = pstrdup (timestamptz_to_str (replyTime ));
1962
+
1963
+ elog (DEBUG2 , "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s" ,
1964
+ feedbackXmin ,
1965
+ feedbackEpoch ,
1966
+ feedbackCatalogXmin ,
1967
+ feedbackCatalogEpoch ,
1968
+ replyTimeStr );
1969
+
1970
+ pfree (replyTimeStr );
1971
+ }
1972
+
1973
+ /*
1974
+ * Update shared state for this WalSender process based on reply data from
1975
+ * standby.
1976
+ */
1977
+ {
1978
+ WalSnd * walsnd = MyWalSnd ;
1979
+
1980
+ SpinLockAcquire (& walsnd -> mutex );
1981
+ walsnd -> replyTime = replyTime ;
1982
+ SpinLockRelease (& walsnd -> mutex );
1983
+ }
1947
1984
1948
1985
/*
1949
1986
* Unset WalSender's xmins if the feedback message values are invalid.
@@ -2265,6 +2302,7 @@ InitWalSenderSlot(void)
2265
2302
walsnd -> applyLag = -1 ;
2266
2303
walsnd -> state = WALSNDSTATE_STARTUP ;
2267
2304
walsnd -> latch = & MyProc -> procLatch ;
2305
+ walsnd -> replyTime = 0 ;
2268
2306
SpinLockRelease (& walsnd -> mutex );
2269
2307
/* don't need the lock anymore */
2270
2308
MyWalSnd = (WalSnd * ) walsnd ;
@@ -3179,7 +3217,7 @@ offset_to_interval(TimeOffset offset)
3179
3217
Datum
3180
3218
pg_stat_get_wal_senders (PG_FUNCTION_ARGS )
3181
3219
{
3182
- #define PG_STAT_GET_WAL_SENDERS_COLS 11
3220
+ #define PG_STAT_GET_WAL_SENDERS_COLS 12
3183
3221
ReturnSetInfo * rsinfo = (ReturnSetInfo * ) fcinfo -> resultinfo ;
3184
3222
TupleDesc tupdesc ;
3185
3223
Tuplestorestate * tupstore ;
@@ -3233,6 +3271,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3233
3271
int priority ;
3234
3272
int pid ;
3235
3273
WalSndState state ;
3274
+ TimestampTz replyTime ;
3236
3275
Datum values [PG_STAT_GET_WAL_SENDERS_COLS ];
3237
3276
bool nulls [PG_STAT_GET_WAL_SENDERS_COLS ];
3238
3277
@@ -3252,6 +3291,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3252
3291
flushLag = walsnd -> flushLag ;
3253
3292
applyLag = walsnd -> applyLag ;
3254
3293
priority = walsnd -> sync_standby_priority ;
3294
+ replyTime = walsnd -> replyTime ;
3255
3295
SpinLockRelease (& walsnd -> mutex );
3256
3296
3257
3297
memset (nulls , 0 , sizeof (nulls ));
@@ -3328,6 +3368,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3328
3368
CStringGetTextDatum ("sync" ) : CStringGetTextDatum ("quorum" );
3329
3369
else
3330
3370
values [10 ] = CStringGetTextDatum ("potential" );
3371
+
3372
+ if (replyTime == 0 )
3373
+ nulls [11 ] = true;
3374
+ else
3375
+ values [11 ] = TimestampTzGetDatum (replyTime );
3331
3376
}
3332
3377
3333
3378
tuplestore_putvalues (tupstore , tupdesc , values , nulls );
0 commit comments