Skip to content

Commit f5fc2f5

Browse files
author
Amit Kapila
committed
Add information of total data processed to replication slot stats.
This adds the statistics about total transactions count and total transaction data logically sent to the decoding output plugin from ReorderBuffer. Users can query the pg_stat_replication_slots view to check these stats. Suggested-by: Andres Freund Author: Vignesh C and Amit Kapila Reviewed-by: Sawada Masahiko, Amit Kapila Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
1 parent 1bf946b commit f5fc2f5

File tree

15 files changed

+256
-52
lines changed

15 files changed

+256
-52
lines changed

contrib/test_decoding/Makefile

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
1717
# typical installcheck users do not have (e.g. buildfarm clients).
1818
NO_INSTALLCHECK = 1
1919

20+
TAP_TESTS = 1
21+
2022
ifdef USE_PGXS
2123
PG_CONFIG = pg_config
2224
PGXS := $(shell $(PG_CONFIG) --pgxs)

contrib/test_decoding/expected/stats.out

+56-23
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,33 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
88

99
CREATE TABLE stats_test(data text);
1010
-- function to wait for counters to advance
11-
CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
11+
CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
1212
DECLARE
1313
start_time timestamptz := clock_timestamp();
1414
updated bool;
1515
BEGIN
1616
-- we don't want to wait forever; loop will exit after 30 seconds
1717
FOR i IN 1 .. 300 LOOP
1818

19-
-- check to see if all updates have been reset/updated
20-
SELECT CASE WHEN check_reset THEN (spill_txns = 0)
21-
ELSE (spill_txns > 0)
22-
END
23-
INTO updated
24-
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
19+
IF check_spill_txns THEN
20+
21+
-- check to see if all updates have been reset/updated
22+
SELECT CASE WHEN check_reset THEN (spill_txns = 0)
23+
ELSE (spill_txns > 0)
24+
END
25+
INTO updated
26+
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
27+
28+
ELSE
29+
30+
-- check to see if all updates have been reset/updated
31+
SELECT CASE WHEN check_reset THEN (total_txns = 0)
32+
ELSE (total_txns > 0)
33+
END
34+
INTO updated
35+
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
36+
37+
END IF;
2538

2639
exit WHEN updated;
2740

@@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
5164
-- Check stats, wait for the stats collector to update. We can't test the
5265
-- exact stats count as that can vary if any background transaction (say by
5366
-- autovacuum) happens in parallel to the main transaction.
54-
SELECT wait_for_decode_stats(false);
67+
SELECT wait_for_decode_stats(false, true);
5568
wait_for_decode_stats
5669
-----------------------
5770

5871
(1 row)
5972

60-
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
61-
slot_name | spill_txns | spill_count
62-
-----------------+------------+-------------
63-
regression_slot | t | t
73+
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
74+
slot_name | spill_txns | spill_count | total_txns | total_bytes
75+
-----------------+------------+-------------+------------+-------------
76+
regression_slot | t | t | t | t
6477
(1 row)
6578

6679
-- reset the slot stats, and wait for stats collector to reset
@@ -70,16 +83,16 @@ SELECT pg_stat_reset_replication_slot('regression_slot');
7083

7184
(1 row)
7285

73-
SELECT wait_for_decode_stats(true);
86+
SELECT wait_for_decode_stats(true, true);
7487
wait_for_decode_stats
7588
-----------------------
7689

7790
(1 row)
7891

79-
SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
80-
slot_name | spill_txns | spill_count
81-
-----------------+------------+-------------
82-
regression_slot | 0 | 0
92+
SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
93+
slot_name | spill_txns | spill_count | total_txns | total_bytes
94+
-----------------+------------+-------------+------------+-------------
95+
regression_slot | 0 | 0 | 0 | 0
8396
(1 row)
8497

8598
-- decode and check stats again.
@@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
89102
5002
90103
(1 row)
91104

92-
SELECT wait_for_decode_stats(false);
105+
SELECT wait_for_decode_stats(false, true);
106+
wait_for_decode_stats
107+
-----------------------
108+
109+
(1 row)
110+
111+
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
112+
slot_name | spill_txns | spill_count | total_txns | total_bytes
113+
-----------------+------------+-------------+------------+-------------
114+
regression_slot | t | t | t | t
115+
(1 row)
116+
117+
SELECT pg_stat_reset_replication_slot('regression_slot');
118+
pg_stat_reset_replication_slot
119+
--------------------------------
120+
121+
(1 row)
122+
123+
-- non-spilled xact
124+
INSERT INTO stats_test values(generate_series(1, 10));
125+
SELECT wait_for_decode_stats(false, false);
93126
wait_for_decode_stats
94127
-----------------------
95128

96129
(1 row)
97130

98-
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
99-
slot_name | spill_txns | spill_count
100-
-----------------+------------+-------------
101-
regression_slot | t | t
131+
SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
132+
slot_name | spill_txns | spill_count | total_txns | total_bytes
133+
-----------------+------------+-------------+------------+-------------
134+
regression_slot | f | f | t | t
102135
(1 row)
103136

104137
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
@@ -117,7 +150,7 @@ SELECT slot_name FROM pg_stat_replication_slots;
117150
(1 row)
118151

119152
COMMIT;
120-
DROP FUNCTION wait_for_decode_stats(bool);
153+
DROP FUNCTION wait_for_decode_stats(bool, bool);
121154
DROP TABLE stats_test;
122155
SELECT pg_drop_replication_slot('regression_slot');
123156
pg_drop_replication_slot

contrib/test_decoding/sql/stats.sql

+34-14
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,33 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
66
CREATE TABLE stats_test(data text);
77

88
-- function to wait for counters to advance
9-
CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
9+
CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
1010
DECLARE
1111
start_time timestamptz := clock_timestamp();
1212
updated bool;
1313
BEGIN
1414
-- we don't want to wait forever; loop will exit after 30 seconds
1515
FOR i IN 1 .. 300 LOOP
1616

17-
-- check to see if all updates have been reset/updated
18-
SELECT CASE WHEN check_reset THEN (spill_txns = 0)
19-
ELSE (spill_txns > 0)
20-
END
21-
INTO updated
22-
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
17+
IF check_spill_txns THEN
18+
19+
-- check to see if all updates have been reset/updated
20+
SELECT CASE WHEN check_reset THEN (spill_txns = 0)
21+
ELSE (spill_txns > 0)
22+
END
23+
INTO updated
24+
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
25+
26+
ELSE
27+
28+
-- check to see if all updates have been reset/updated
29+
SELECT CASE WHEN check_reset THEN (total_txns = 0)
30+
ELSE (total_txns > 0)
31+
END
32+
INTO updated
33+
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
34+
35+
END IF;
2336

2437
exit WHEN updated;
2538

@@ -46,18 +59,25 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
4659
-- Check stats, wait for the stats collector to update. We can't test the
4760
-- exact stats count as that can vary if any background transaction (say by
4861
-- autovacuum) happens in parallel to the main transaction.
49-
SELECT wait_for_decode_stats(false);
50-
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
62+
SELECT wait_for_decode_stats(false, true);
63+
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
5164

5265
-- reset the slot stats, and wait for stats collector to reset
5366
SELECT pg_stat_reset_replication_slot('regression_slot');
54-
SELECT wait_for_decode_stats(true);
55-
SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
67+
SELECT wait_for_decode_stats(true, true);
68+
SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
5669

5770
-- decode and check stats again.
5871
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');
59-
SELECT wait_for_decode_stats(false);
60-
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
72+
SELECT wait_for_decode_stats(false, true);
73+
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
74+
75+
SELECT pg_stat_reset_replication_slot('regression_slot');
76+
77+
-- non-spilled xact
78+
INSERT INTO stats_test values(generate_series(1, 10));
79+
SELECT wait_for_decode_stats(false, false);
80+
SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
6181

6282
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
6383
-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
@@ -66,6 +86,6 @@ SELECT slot_name FROM pg_stat_replication_slots;
6686
SELECT slot_name FROM pg_stat_replication_slots;
6787
COMMIT;
6888

69-
DROP FUNCTION wait_for_decode_stats(bool);
89+
DROP FUNCTION wait_for_decode_stats(bool, bool);
7090
DROP TABLE stats_test;
7191
SELECT pg_drop_replication_slot('regression_slot');
+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Test replication statistics data in pg_stat_replication_slots is sane after
2+
# drop replication slot and restart.
3+
use strict;
4+
use warnings;
5+
use PostgresNode;
6+
use TestLib;
7+
use Test::More tests => 1;
8+
9+
# Test set-up
10+
my $node = get_new_node('test');
11+
$node->init(allows_streaming => 'logical');
12+
$node->append_conf('postgresql.conf', 'synchronous_commit = on');
13+
$node->start;
14+
15+
# Create table.
16+
$node->safe_psql('postgres',
17+
"CREATE TABLE test_repl_stat(col1 int)");
18+
19+
# Create replication slots.
20+
$node->safe_psql(
21+
'postgres', qq[
22+
SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding');
23+
SELECT pg_create_logical_replication_slot('regression_slot2', 'test_decoding');
24+
SELECT pg_create_logical_replication_slot('regression_slot3', 'test_decoding');
25+
SELECT pg_create_logical_replication_slot('regression_slot4', 'test_decoding');
26+
]);
27+
28+
# Insert some data.
29+
$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
30+
31+
$node->safe_psql(
32+
'postgres', qq[
33+
SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL,
34+
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
35+
SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL,
36+
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
37+
SELECT data FROM pg_logical_slot_get_changes('regression_slot3', NULL,
38+
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
39+
SELECT data FROM pg_logical_slot_get_changes('regression_slot4', NULL,
40+
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
41+
]);
42+
43+
# Wait for the statistics to be updated.
44+
$node->poll_query_until(
45+
'postgres', qq[
46+
SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots
47+
WHERE slot_name ~ 'regression_slot'
48+
AND total_txns > 0 AND total_bytes > 0;
49+
]) or die "Timed out while waiting for statistics to be updated";
50+
51+
# Test to drop one of the replication slot and verify replication statistics data is
52+
# fine after restart.
53+
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
54+
55+
$node->stop;
56+
$node->start;
57+
58+
# Verify statistics data present in pg_stat_replication_slots are sane after
59+
# restart.
60+
my $result = $node->safe_psql('postgres',
61+
"SELECT slot_name, total_txns > 0 AS total_txn,
62+
total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
63+
ORDER BY slot_name"
64+
);
65+
is($result, qq(regression_slot1|t|t
66+
regression_slot2|t|t
67+
regression_slot3|t|t), 'check replication statistics are updated');
68+
69+
# cleanup
70+
$node->safe_psql('postgres', "DROP TABLE test_repl_stat");
71+
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
72+
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
73+
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
74+
75+
# shutdown
76+
$node->stop;

doc/src/sgml/monitoring.sgml

+25
Original file line numberDiff line numberDiff line change
@@ -2716,6 +2716,31 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
27162716
</entry>
27172717
</row>
27182718

2719+
<row>
2720+
<entry role="catalog_table_entry"><para role="column_definition">
2721+
<structfield>total_txns</structfield> <type>bigint</type>
2722+
</para>
2723+
<para>
2724+
Number of decoded transactions sent to the decoding output plugin for
2725+
this slot. This counter is used to maintain the top level transactions,
2726+
so the counter is not incremented for subtransactions. Note that this
2727+
includes the transactions that are streamed and/or spilled.
2728+
</para></entry>
2729+
</row>
2730+
2731+
<row>
2732+
<entry role="catalog_table_entry"><para role="column_definition">
2733+
<structfield>total_bytes</structfield><type>bigint</type>
2734+
</para>
2735+
<para>
2736+
Amount of decoded transactions data sent to the decoding output plugin
2737+
while decoding the changes from WAL for this slot. This can be used to
2738+
gauge the total amount of data sent during logical decoding. Note that
2739+
this includes the data that is streamed and/or spilled.
2740+
</para>
2741+
</entry>
2742+
</row>
2743+
27192744
<row>
27202745
<entry role="catalog_table_entry"><para role="column_definition">
27212746
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>

src/backend/catalog/system_views.sql

+2
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,8 @@ CREATE VIEW pg_stat_replication_slots AS
875875
s.stream_txns,
876876
s.stream_count,
877877
s.stream_bytes,
878+
s.total_txns,
879+
s.total_bytes,
878880
s.stats_reset
879881
FROM pg_stat_get_replication_slots() AS s;
880882

src/backend/postmaster/pgstat.c

+6
Original file line numberDiff line numberDiff line change
@@ -1829,6 +1829,8 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
18291829
msg.m_stream_txns = repSlotStat->stream_txns;
18301830
msg.m_stream_count = repSlotStat->stream_count;
18311831
msg.m_stream_bytes = repSlotStat->stream_bytes;
1832+
msg.m_total_txns = repSlotStat->total_txns;
1833+
msg.m_total_bytes = repSlotStat->total_bytes;
18321834
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
18331835
}
18341836

@@ -5568,6 +5570,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
55685570
replSlotStats[idx].stream_txns += msg->m_stream_txns;
55695571
replSlotStats[idx].stream_count += msg->m_stream_count;
55705572
replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
5573+
replSlotStats[idx].total_txns += msg->m_total_txns;
5574+
replSlotStats[idx].total_bytes += msg->m_total_bytes;
55715575
}
55725576
}
55735577

@@ -5795,6 +5799,8 @@ pgstat_reset_replslot(int i, TimestampTz ts)
57955799
replSlotStats[i].stream_txns = 0;
57965800
replSlotStats[i].stream_count = 0;
57975801
replSlotStats[i].stream_bytes = 0;
5802+
replSlotStats[i].total_txns = 0;
5803+
replSlotStats[i].total_bytes = 0;
57985804
replSlotStats[i].stat_reset_timestamp = ts;
57995805
}
58005806

0 commit comments

Comments
 (0)