Skip to content

Commit 8fccf75

Browse files
author
Amit Kapila
committed
Add tests for logical replication spilled stats.
Commit 9868167 added a mechanism to track statistics corresponding to the spilling of changes from ReorderBuffer but didn't add any tests. Author: Amit Kapila and Sawada Masahiko Discussion: https://postgr.es/m/CA+fd4k5_pPAYRTDrO2PbtTOe0eHQpBvuqmCr8ic39uTNmR49Eg@mail.gmail.com
1 parent 371668a commit 8fccf75

File tree

3 files changed

+172
-1
lines changed

3 files changed

+172
-1
lines changed

contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
55

66
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
77
decoding_into_rel binary prepared replorigin time messages \
8-
spill slot truncate stream
8+
spill slot truncate stream stats
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
1010
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
1111

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
-- predictability
2+
SET synchronous_commit = on;
3+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
4+
?column?
5+
----------
6+
init
7+
(1 row)
8+
9+
CREATE TABLE stats_test(data text);
10+
-- function to wait for counters to advance
11+
CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
12+
DECLARE
13+
start_time timestamptz := clock_timestamp();
14+
updated bool;
15+
BEGIN
16+
-- we don't want to wait forever; loop will exit after 30 seconds
17+
FOR i IN 1 .. 300 LOOP
18+
19+
-- check to see if all updates have been reset/updated
20+
SELECT CASE WHEN check_reset THEN (spill_txns = 0)
21+
ELSE (spill_txns > 0)
22+
END
23+
INTO updated
24+
FROM pg_stat_replication_slots WHERE name='regression_slot';
25+
26+
exit WHEN updated;
27+
28+
-- wait a little
29+
perform pg_sleep_for('100 milliseconds');
30+
31+
-- reset stats snapshot so we can test again
32+
perform pg_stat_clear_snapshot();
33+
34+
END LOOP;
35+
36+
-- report time waited in postmaster log (where it won't change test output)
37+
RAISE LOG 'wait_for_decode_stats delayed % seconds',
38+
extract(epoch from clock_timestamp() - start_time);
39+
END
40+
$$ LANGUAGE plpgsql;
41+
-- spilling the xact
42+
BEGIN;
43+
INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
44+
COMMIT;
45+
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL);
46+
count
47+
-------
48+
5006
49+
(1 row)
50+
51+
-- check stats, wait for stats collector to update.
52+
SELECT wait_for_decode_stats(false);
53+
wait_for_decode_stats
54+
-----------------------
55+
56+
(1 row)
57+
58+
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
59+
name | spill_txns | spill_count
60+
-----------------+------------+-------------
61+
regression_slot | 1 | 12
62+
(1 row)
63+
64+
-- reset the slot stats, and wait for stats collector to reset
65+
SELECT pg_stat_reset_replication_slot('regression_slot');
66+
pg_stat_reset_replication_slot
67+
--------------------------------
68+
69+
(1 row)
70+
71+
SELECT wait_for_decode_stats(true);
72+
wait_for_decode_stats
73+
-----------------------
74+
75+
(1 row)
76+
77+
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
78+
name | spill_txns | spill_count
79+
-----------------+------------+-------------
80+
regression_slot | 0 | 0
81+
(1 row)
82+
83+
-- decode and check stats again.
84+
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL);
85+
count
86+
-------
87+
5006
88+
(1 row)
89+
90+
SELECT wait_for_decode_stats(false);
91+
wait_for_decode_stats
92+
-----------------------
93+
94+
(1 row)
95+
96+
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
97+
name | spill_txns | spill_count
98+
-----------------+------------+-------------
99+
regression_slot | 1 | 12
100+
(1 row)
101+
102+
DROP FUNCTION wait_for_decode_stats(bool);
103+
DROP TABLE stats_test;
104+
SELECT pg_drop_replication_slot('regression_slot');
105+
pg_drop_replication_slot
106+
--------------------------
107+
108+
(1 row)
109+

contrib/test_decoding/sql/stats.sql

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
-- predictability
2+
SET synchronous_commit = on;
3+
4+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
5+
6+
CREATE TABLE stats_test(data text);
7+
8+
-- function to wait for counters to advance
9+
CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
10+
DECLARE
11+
start_time timestamptz := clock_timestamp();
12+
updated bool;
13+
BEGIN
14+
-- we don't want to wait forever; loop will exit after 30 seconds
15+
FOR i IN 1 .. 300 LOOP
16+
17+
-- check to see if all updates have been reset/updated
18+
SELECT CASE WHEN check_reset THEN (spill_txns = 0)
19+
ELSE (spill_txns > 0)
20+
END
21+
INTO updated
22+
FROM pg_stat_replication_slots WHERE name='regression_slot';
23+
24+
exit WHEN updated;
25+
26+
-- wait a little
27+
perform pg_sleep_for('100 milliseconds');
28+
29+
-- reset stats snapshot so we can test again
30+
perform pg_stat_clear_snapshot();
31+
32+
END LOOP;
33+
34+
-- report time waited in postmaster log (where it won't change test output)
35+
RAISE LOG 'wait_for_decode_stats delayed % seconds',
36+
extract(epoch from clock_timestamp() - start_time);
37+
END
38+
$$ LANGUAGE plpgsql;
39+
40+
-- spilling the xact
41+
BEGIN;
42+
INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
43+
COMMIT;
44+
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL);
45+
46+
-- check stats, wait for stats collector to update.
47+
SELECT wait_for_decode_stats(false);
48+
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
49+
50+
-- reset the slot stats, and wait for stats collector to reset
51+
SELECT pg_stat_reset_replication_slot('regression_slot');
52+
SELECT wait_for_decode_stats(true);
53+
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
54+
55+
-- decode and check stats again.
56+
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL);
57+
SELECT wait_for_decode_stats(false);
58+
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
59+
60+
DROP FUNCTION wait_for_decode_stats(bool);
61+
DROP TABLE stats_test;
62+
SELECT pg_drop_replication_slot('regression_slot');

0 commit comments

Comments
 (0)