You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Add information of total data processed to replication slot stats.
This adds the statistics about total transactions count and total
transaction data logically sent to the decoding output plugin from
ReorderBuffer. Users can query the pg_stat_replication_slots view to check
these stats.
Suggested-by: Andres Freund
Author: Vignesh C and Amit Kapila
Reviewed-by: Sawada Masahiko, Amit Kapila
Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
Copy file name to clipboardExpand all lines: contrib/test_decoding/expected/stats.out
+56-23
Original file line number
Diff line number
Diff line change
@@ -8,20 +8,33 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
8
8
9
9
CREATE TABLE stats_test(data text);
10
10
-- function to wait for counters to advance
11
-
CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
11
+
CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
12
12
DECLARE
13
13
start_time timestamptz := clock_timestamp();
14
14
updated bool;
15
15
BEGIN
16
16
-- we don't want to wait forever; loop will exit after 30 seconds
17
17
FOR i IN 1 .. 300 LOOP
18
18
19
-
-- check to see if all updates have been reset/updated
20
-
SELECT CASE WHEN check_reset THEN (spill_txns = 0)
21
-
ELSE (spill_txns > 0)
22
-
END
23
-
INTO updated
24
-
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
19
+
IF check_spill_txns THEN
20
+
21
+
-- check to see if all updates have been reset/updated
22
+
SELECT CASE WHEN check_reset THEN (spill_txns = 0)
23
+
ELSE (spill_txns > 0)
24
+
END
25
+
INTO updated
26
+
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
27
+
28
+
ELSE
29
+
30
+
-- check to see if all updates have been reset/updated
31
+
SELECT CASE WHEN check_reset THEN (total_txns = 0)
32
+
ELSE (total_txns > 0)
33
+
END
34
+
INTO updated
35
+
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
36
+
37
+
END IF;
25
38
26
39
exit WHEN updated;
27
40
@@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
51
64
-- Check stats, wait for the stats collector to update. We can't test the
52
65
-- exact stats count as that can vary if any background transaction (say by
53
66
-- autovacuum) happens in parallel to the main transaction.
54
-
SELECT wait_for_decode_stats(false);
67
+
SELECT wait_for_decode_stats(false, true);
55
68
wait_for_decode_stats
56
69
-----------------------
57
70
58
71
(1 row)
59
72
60
-
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
61
-
slot_name | spill_txns | spill_count
62
-
-----------------+------------+-------------
63
-
regression_slot | t | t
73
+
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
@@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
89
102
5002
90
103
(1 row)
91
104
92
-
SELECT wait_for_decode_stats(false);
105
+
SELECT wait_for_decode_stats(false, true);
106
+
wait_for_decode_stats
107
+
-----------------------
108
+
109
+
(1 row)
110
+
111
+
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
INSERT INTO stats_test values(generate_series(1, 10));
125
+
SELECT wait_for_decode_stats(false, false);
93
126
wait_for_decode_stats
94
127
-----------------------
95
128
96
129
(1 row)
97
130
98
-
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
99
-
slot_name | spill_txns | spill_count
100
-
-----------------+------------+-------------
101
-
regression_slot | t | t
131
+
SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
0 commit comments