Skip to content

Commit 6b0f6f7

Browse files
author
Amit Kapila
committed
Add tap tests for the schema publications.
This adds additional tests for commit 5a28324 ("Allow publishing the tables of schema.). This allows testing streaming of data in tables that are published via schema publications. Author: Vignesh C, Haiying Tang Reviewed-by: Greg Nancarrow, Hou Zhijie, Amit Kapila Discussion: https://www.postgresql.org/message-id/CALDaNm0OANxuJ6RXqwZsM1MSY4s19nuH3734j4a72etDwvBETQ%40mail.gmail.com
1 parent d680992 commit 6b0f6f7

File tree

1 file changed

+205
-0
lines changed

1 file changed

+205
-0
lines changed
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
2+
# Copyright (c) 2021, PostgreSQL Global Development Group
3+
4+
# Logical replication tests for schema publications
5+
use strict;
6+
use warnings;
7+
use PostgreSQL::Test::Cluster;
8+
use PostgreSQL::Test::Utils;
9+
use Test::More tests => 13;
10+
11+
# Initialize publisher node
12+
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
13+
$node_publisher->init(allows_streaming => 'logical');
14+
$node_publisher->start;
15+
16+
# Create subscriber node
17+
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
18+
$node_subscriber->init(allows_streaming => 'logical');
19+
$node_subscriber->start;
20+
21+
# Test replication with publications created using FOR ALL TABLES IN SCHEMA
22+
# option.
23+
# Create schemas and tables on publisher
24+
$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1");
25+
$node_publisher->safe_psql('postgres',
26+
"CREATE TABLE sch1.tab1 AS SELECT generate_series(1,10) AS a");
27+
$node_publisher->safe_psql('postgres',
28+
"CREATE TABLE sch1.tab2 AS SELECT generate_series(1,10) AS a");
29+
$node_publisher->safe_psql('postgres',
30+
"CREATE TABLE sch1.tab1_parent (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
31+
$node_publisher->safe_psql('postgres',
32+
"CREATE TABLE public.tab1_child1 PARTITION OF sch1.tab1_parent FOR VALUES IN (1, 2, 3)");
33+
$node_publisher->safe_psql('postgres',
34+
"CREATE TABLE public.tab1_child2 PARTITION OF sch1.tab1_parent FOR VALUES IN (4, 5, 6)");
35+
36+
$node_publisher->safe_psql('postgres',
37+
"INSERT INTO sch1.tab1_parent values (1),(4)");
38+
39+
# Create schemas and tables on subscriber
40+
$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1");
41+
$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.tab1 (a int)");
42+
$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.tab2 (a int)");
43+
$node_subscriber->safe_psql('postgres',
44+
"CREATE TABLE sch1.tab1_parent (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
45+
$node_subscriber->safe_psql('postgres',
46+
"CREATE TABLE public.tab1_child1 PARTITION OF sch1.tab1_parent FOR VALUES IN (1, 2, 3)");
47+
$node_subscriber->safe_psql('postgres',
48+
"CREATE TABLE public.tab1_child2 PARTITION OF sch1.tab1_parent FOR VALUES IN (4, 5, 6)");
49+
50+
# Setup logical replication
51+
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
52+
$node_publisher->safe_psql('postgres',
53+
"CREATE PUBLICATION tap_pub_schema FOR ALL TABLES IN SCHEMA sch1");
54+
55+
$node_subscriber->safe_psql('postgres',
56+
"CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema"
57+
);
58+
59+
$node_publisher->wait_for_catchup('tap_sub_schema');
60+
61+
# Also wait for initial table sync to finish
62+
my $synced_query =
63+
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
64+
$node_subscriber->poll_query_until('postgres', $synced_query)
65+
or die "Timed out while waiting for subscriber to synchronize data";
66+
67+
# Check the schema table data is synced up
68+
my $result = $node_subscriber->safe_psql('postgres',
69+
"SELECT count(*), min(a), max(a) FROM sch1.tab1");
70+
is($result, qq(10|1|10), 'check rows on subscriber catchup');
71+
72+
$result = $node_subscriber->safe_psql('postgres',
73+
"SELECT count(*), min(a), max(a) FROM sch1.tab2");
74+
is($result, qq(10|1|10), 'check rows on subscriber catchup');
75+
76+
$result = $node_subscriber->safe_psql('postgres',
77+
"SELECT * FROM sch1.tab1_parent order by 1");
78+
is($result, qq(1|
79+
4|), 'check rows on subscriber catchup');
80+
81+
# Insert some data into few tables and verify that inserted data is replicated
82+
$node_publisher->safe_psql('postgres',
83+
"INSERT INTO sch1.tab1 VALUES(generate_series(11,20))");
84+
85+
$node_publisher->safe_psql('postgres',
86+
"INSERT INTO sch1.tab1_parent values (2),(5)");
87+
88+
$node_publisher->wait_for_catchup('tap_sub_schema');
89+
90+
$result = $node_subscriber->safe_psql('postgres',
91+
"SELECT count(*), min(a), max(a) FROM sch1.tab1");
92+
is($result, qq(20|1|20), 'check replicated inserts on subscriber');
93+
94+
$result = $node_subscriber->safe_psql('postgres',
95+
"SELECT * FROM sch1.tab1_parent order by 1");
96+
is($result, qq(1|
97+
2|
98+
4|
99+
5|), 'check replicated inserts on subscriber');
100+
101+
# Create new table in the publication schema, verify that subscriber does not get
102+
# the new table data before refresh.
103+
$node_publisher->safe_psql('postgres',
104+
"CREATE TABLE sch1.tab3 AS SELECT generate_series(1,10) AS a");
105+
106+
$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.tab3(a int)");
107+
108+
$node_publisher->wait_for_catchup('tap_sub_schema');
109+
110+
$result =
111+
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM sch1.tab3");
112+
is($result, qq(0), 'check replicated inserts on subscriber');
113+
114+
# Table data should be reflected after refreshing the publication in
115+
# subscriber.
116+
$node_subscriber->safe_psql('postgres',
117+
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
118+
119+
# Wait for sync to finish
120+
$node_subscriber->poll_query_until('postgres', $synced_query)
121+
or die "Timed out while waiting for subscriber to synchronize data";
122+
123+
$node_publisher->safe_psql('postgres', "INSERT INTO sch1.tab3 VALUES(11)");
124+
125+
$node_publisher->wait_for_catchup('tap_sub_schema');
126+
127+
$result = $node_subscriber->safe_psql('postgres',
128+
"SELECT count(*), min(a), max(a) FROM sch1.tab3");
129+
is($result, qq(11|1|11), 'check rows on subscriber catchup');
130+
131+
# Set the schema of a publication schema table to a non publication schema and
132+
# verify that inserted data is not reflected by the subscriber.
133+
$node_publisher->safe_psql('postgres',
134+
"ALTER TABLE sch1.tab3 SET SCHEMA public");
135+
$node_publisher->safe_psql('postgres', "INSERT INTO public.tab3 VALUES(12)");
136+
137+
$node_publisher->wait_for_catchup('tap_sub_schema');
138+
139+
$result = $node_subscriber->safe_psql('postgres',
140+
"SELECT count(*), min(a), max(a) FROM sch1.tab3");
141+
is($result, qq(11|1|11), 'check replicated inserts on subscriber');
142+
143+
# Verify that the subscription relation list is updated after refresh
144+
$result = $node_subscriber->safe_psql('postgres',
145+
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
146+
);
147+
is($result, qq(5),
148+
'check subscription relation status is not yet dropped on subscriber');
149+
150+
# Ask for data sync
151+
$node_subscriber->safe_psql('postgres',
152+
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
153+
154+
# Wait for sync to finish
155+
$node_subscriber->poll_query_until('postgres', $synced_query)
156+
or die "Timed out while waiting for subscriber to synchronize data";
157+
158+
$result = $node_subscriber->safe_psql('postgres',
159+
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
160+
);
161+
is($result, qq(4),
162+
'check subscription relation status was dropped on subscriber');
163+
164+
# Drop table from the publication schema, verify that subscriber removes the
165+
# table entry after refresh.
166+
$node_publisher->safe_psql('postgres', "DROP TABLE sch1.tab2");
167+
$node_publisher->wait_for_catchup('tap_sub_schema');
168+
$result = $node_subscriber->safe_psql('postgres',
169+
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
170+
);
171+
is($result, qq(4),
172+
'check subscription relation status is not yet dropped on subscriber');
173+
174+
# Table should be removed from pg_subscription_rel after refreshing the
175+
# publication in subscriber.
176+
$node_subscriber->safe_psql('postgres',
177+
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
178+
179+
# Wait for sync to finish
180+
$node_subscriber->poll_query_until('postgres', $synced_query)
181+
or die "Timed out while waiting for subscriber to synchronize data";
182+
183+
$result = $node_subscriber->safe_psql('postgres',
184+
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
185+
);
186+
is($result, qq(3),
187+
'check subscription relation status was dropped on subscriber');
188+
189+
# Drop schema from publication, verify that the inserts are not published after
190+
# dropping the schema from publication. Here 2nd insert should not be
191+
# published.
192+
$node_publisher->safe_psql('postgres', "
193+
INSERT INTO sch1.tab1 VALUES(21);
194+
ALTER PUBLICATION tap_pub_schema DROP ALL TABLES IN SCHEMA sch1;
195+
INSERT INTO sch1.tab1 values(22);"
196+
);
197+
198+
$node_publisher->wait_for_catchup('tap_sub_schema');
199+
200+
$result = $node_subscriber->safe_psql('postgres',
201+
"SELECT count(*), min(a), max(a) FROM sch1.tab1");
202+
is($result, qq(21|1|21), 'check replicated inserts on subscriber');
203+
204+
$node_subscriber->stop('fast');
205+
$node_publisher->stop('fast');

0 commit comments

Comments
 (0)