Skip to content

Commit 7fee252

Browse files
committed
Add timestamp of last received message from standby to pg_stat_replication
The timestamp generated by the standby at message transmission has been included in the protocol since its introduction for both the status update message and hot standby feedback message, but it has never appeared in pg_stat_replication. Seeing this timestamp does not matter much with a cluster which has a lot of activity, but on a mostly-idle cluster, this makes monitoring able to react faster than the configured timeouts. Author: MyungKyu LIM Reviewed-by: Michael Paquier, Masahiko Sawada Discussion: https://postgr.es/m/1657809367.407321.1533027417725.JavaMail.jboss@ep2ml404
1 parent 1f66c65 commit 7fee252

File tree

7 files changed

+77
-20
lines changed

7 files changed

+77
-20
lines changed

doc/src/sgml/monitoring.sgml

+5
Original file line numberDiff line numberDiff line change
@@ -1920,6 +1920,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
19201920
</itemizedlist>
19211921
</entry>
19221922
</row>
1923+
<row>
1924+
<entry><structfield>reply_time</structfield></entry>
1925+
<entry><type>timestamp with time zone</type></entry>
1926+
<entry>Send time of last reply message received from standby server</entry>
1927+
</row>
19231928
</tbody>
19241929
</tgroup>
19251930
</table>

src/backend/catalog/system_views.sql

+2-1
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS
734734
W.flush_lag,
735735
W.replay_lag,
736736
W.sync_priority,
737-
W.sync_state
737+
W.sync_state,
738+
W.reply_time
738739
FROM pg_stat_get_activity(NULL) AS S
739740
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
740741
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);

src/backend/replication/walsender.c

+58-13
Original file line numberDiff line numberDiff line change
@@ -1763,21 +1763,33 @@ ProcessStandbyReplyMessage(void)
17631763
applyLag;
17641764
bool clearLagTimes;
17651765
TimestampTz now;
1766+
TimestampTz replyTime;
17661767

17671768
static bool fullyAppliedLastTime = false;
17681769

17691770
/* the caller already consumed the msgtype byte */
17701771
writePtr = pq_getmsgint64(&reply_message);
17711772
flushPtr = pq_getmsgint64(&reply_message);
17721773
applyPtr = pq_getmsgint64(&reply_message);
1773-
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1774+
replyTime = pq_getmsgint64(&reply_message);
17741775
replyRequested = pq_getmsgbyte(&reply_message);
17751776

1776-
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1777-
(uint32) (writePtr >> 32), (uint32) writePtr,
1778-
(uint32) (flushPtr >> 32), (uint32) flushPtr,
1779-
(uint32) (applyPtr >> 32), (uint32) applyPtr,
1780-
replyRequested ? " (reply requested)" : "");
1777+
if (log_min_messages <= DEBUG2)
1778+
{
1779+
char *replyTimeStr;
1780+
1781+
/* Copy because timestamptz_to_str returns a static buffer */
1782+
replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1783+
1784+
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1785+
(uint32) (writePtr >> 32), (uint32) writePtr,
1786+
(uint32) (flushPtr >> 32), (uint32) flushPtr,
1787+
(uint32) (applyPtr >> 32), (uint32) applyPtr,
1788+
replyRequested ? " (reply requested)" : "",
1789+
replyTimeStr);
1790+
1791+
pfree(replyTimeStr);
1792+
}
17811793

17821794
/* See if we can compute the round-trip lag for these positions. */
17831795
now = GetCurrentTimestamp();
@@ -1824,6 +1836,7 @@ ProcessStandbyReplyMessage(void)
18241836
walsnd->flushLag = flushLag;
18251837
if (applyLag != -1 || clearLagTimes)
18261838
walsnd->applyLag = applyLag;
1839+
walsnd->replyTime = replyTime;
18271840
SpinLockRelease(&walsnd->mutex);
18281841
}
18291842

@@ -1927,23 +1940,47 @@ ProcessStandbyHSFeedbackMessage(void)
19271940
uint32 feedbackEpoch;
19281941
TransactionId feedbackCatalogXmin;
19291942
uint32 feedbackCatalogEpoch;
1943+
TimestampTz replyTime;
19301944

19311945
/*
19321946
* Decipher the reply message. The caller already consumed the msgtype
19331947
* byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
19341948
* of this message.
19351949
*/
1936-
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1950+
replyTime = pq_getmsgint64(&reply_message);
19371951
feedbackXmin = pq_getmsgint(&reply_message, 4);
19381952
feedbackEpoch = pq_getmsgint(&reply_message, 4);
19391953
feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
19401954
feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
19411955

1942-
elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1943-
feedbackXmin,
1944-
feedbackEpoch,
1945-
feedbackCatalogXmin,
1946-
feedbackCatalogEpoch);
1956+
if (log_min_messages <= DEBUG2)
1957+
{
1958+
char *replyTimeStr;
1959+
1960+
/* Copy because timestamptz_to_str returns a static buffer */
1961+
replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1962+
1963+
elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
1964+
feedbackXmin,
1965+
feedbackEpoch,
1966+
feedbackCatalogXmin,
1967+
feedbackCatalogEpoch,
1968+
replyTimeStr);
1969+
1970+
pfree(replyTimeStr);
1971+
}
1972+
1973+
/*
1974+
* Update shared state for this WalSender process based on reply data from
1975+
* standby.
1976+
*/
1977+
{
1978+
WalSnd *walsnd = MyWalSnd;
1979+
1980+
SpinLockAcquire(&walsnd->mutex);
1981+
walsnd->replyTime = replyTime;
1982+
SpinLockRelease(&walsnd->mutex);
1983+
}
19471984

19481985
/*
19491986
* Unset WalSender's xmins if the feedback message values are invalid.
@@ -2265,6 +2302,7 @@ InitWalSenderSlot(void)
22652302
walsnd->applyLag = -1;
22662303
walsnd->state = WALSNDSTATE_STARTUP;
22672304
walsnd->latch = &MyProc->procLatch;
2305+
walsnd->replyTime = 0;
22682306
SpinLockRelease(&walsnd->mutex);
22692307
/* don't need the lock anymore */
22702308
MyWalSnd = (WalSnd *) walsnd;
@@ -3179,7 +3217,7 @@ offset_to_interval(TimeOffset offset)
31793217
Datum
31803218
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
31813219
{
3182-
#define PG_STAT_GET_WAL_SENDERS_COLS 11
3220+
#define PG_STAT_GET_WAL_SENDERS_COLS 12
31833221
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
31843222
TupleDesc tupdesc;
31853223
Tuplestorestate *tupstore;
@@ -3233,6 +3271,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
32333271
int priority;
32343272
int pid;
32353273
WalSndState state;
3274+
TimestampTz replyTime;
32363275
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
32373276
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
32383277

@@ -3252,6 +3291,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
32523291
flushLag = walsnd->flushLag;
32533292
applyLag = walsnd->applyLag;
32543293
priority = walsnd->sync_standby_priority;
3294+
replyTime = walsnd->replyTime;
32553295
SpinLockRelease(&walsnd->mutex);
32563296

32573297
memset(nulls, 0, sizeof(nulls));
@@ -3328,6 +3368,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
33283368
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
33293369
else
33303370
values[10] = CStringGetTextDatum("potential");
3371+
3372+
if (replyTime == 0)
3373+
nulls[11] = true;
3374+
else
3375+
values[11] = TimestampTzGetDatum(replyTime);
33313376
}
33323377

33333378
tuplestore_putvalues(tupstore, tupdesc, values, nulls);

src/include/catalog/catversion.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@
5353
*/
5454

5555
/* yyyymmddN */
56-
#define CATALOG_VERSION_NO 201811201
56+
#define CATALOG_VERSION_NO 201812091
5757

5858
#endif

src/include/catalog/pg_proc.dat

+3-3
Original file line numberDiff line numberDiff line change
@@ -5023,9 +5023,9 @@
50235023
proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
50245024
proretset => 't', provolatile => 's', proparallel => 'r',
50255025
prorettype => 'record', proargtypes => '',
5026-
proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}',
5027-
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
5028-
proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}',
5026+
proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
5027+
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
5028+
proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
50295029
prosrc => 'pg_stat_get_wal_senders' },
50305030
{ oid => '3317', descr => 'statistics: information about WAL receiver',
50315031
proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',

src/include/replication/walsender_private.h

+5
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ typedef struct WalSnd
7575
* SyncRepLock.
7676
*/
7777
int sync_standby_priority;
78+
79+
/*
80+
* Timestamp of the last message received from standby.
81+
*/
82+
TimestampTz replyTime;
7883
} WalSnd;
7984

8085
extern WalSnd *MyWalSnd;

src/test/regress/expected/rules.out

+3-2
Original file line numberDiff line numberDiff line change
@@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid,
18611861
w.flush_lag,
18621862
w.replay_lag,
18631863
w.sync_priority,
1864-
w.sync_state
1864+
w.sync_state,
1865+
w.reply_time
18651866
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
1866-
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
1867+
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
18671868
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
18681869
pg_stat_ssl| SELECT s.pid,
18691870
s.ssl,

0 commit comments

Comments
 (0)