|
11 | 11 | # Create publisher node
|
12 | 12 | my $node_publisher = get_new_node('publisher');
|
13 | 13 | $node_publisher->init(allows_streaming => 'logical');
|
| 14 | +$node_publisher->append_conf('postgresql.conf', |
| 15 | + 'autovacuum = off'); |
14 | 16 | $node_publisher->start;
|
15 | 17 |
|
16 | 18 | # Create subscriber node
|
|
35 | 37 | "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
36 | 38 | );
|
37 | 39 |
|
| 40 | +$node_publisher->wait_for_catchup('tap_sub'); |
| 41 | + |
38 | 42 | # Ensure a transactional logical decoding message shows up on the slot
|
39 | 43 | $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
|
40 | 44 |
|
41 |
| -# wait for the replication connection to drop from the publisher |
| 45 | +# wait for the replication slot to become inactive in the publisher |
42 | 46 | $node_publisher->poll_query_until('postgres',
|
43 |
| - 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0); |
| 47 | + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'", 1); |
44 | 48 |
|
45 | 49 | $node_publisher->safe_psql('postgres',
|
46 | 50 | "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
|
|
77 | 81 | $result = $node_publisher->safe_psql(
|
78 | 82 | 'postgres', qq(
|
79 | 83 | SELECT get_byte(data, 0)
|
80 |
| - FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, |
| 84 | + FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL, |
81 | 85 | 'proto_version', '1',
|
82 | 86 | 'publication_names', 'tap_pub')
|
83 | 87 | ));
|
|
88 | 92 | 'option messages defaults to false so message (M) is not available on slot'
|
89 | 93 | );
|
90 | 94 |
|
91 |
| -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); |
92 |
| -$node_publisher->wait_for_catchup('tap_sub'); |
93 |
| - |
94 |
| -# ensure a non-transactional logical decoding message shows up on the slot |
95 |
| -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); |
96 |
| - |
97 |
| -# wait for the replication connection to drop from the publisher |
98 |
| -$node_publisher->poll_query_until('postgres', |
99 |
| - 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0); |
100 |
| - |
101 | 95 | $node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
|
102 | 96 |
|
103 | 97 | my $message_lsn = $node_publisher->safe_psql('postgres',
|
|
109 | 103 | $result = $node_publisher->safe_psql(
|
110 | 104 | 'postgres', qq(
|
111 | 105 | SELECT get_byte(data, 0), get_byte(data, 1)
|
112 |
| - FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, |
| 106 | + FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL, |
113 | 107 | 'proto_version', '1',
|
114 | 108 | 'publication_names', 'tap_pub',
|
115 | 109 | 'messages', 'true')
|
|
118 | 112 |
|
119 | 113 | is($result, qq(77|0), 'non-transactional message on slot is M');
|
120 | 114 |
|
121 |
| -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); |
122 |
| -$node_publisher->wait_for_catchup('tap_sub'); |
123 |
| - |
124 |
| -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); |
125 |
| - |
126 |
| -# wait for the replication connection to drop from the publisher |
127 |
| -$node_publisher->poll_query_until('postgres', |
128 |
| - 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0); |
129 |
| - |
130 | 115 | # Ensure a non-transactional logical decoding message shows up on the slot when
|
131 | 116 | # it is emitted within an aborted transaction. The message won't emit until
|
132 | 117 | # something advances the LSN, hence, we intentionally forces the server to
|
|
0 commit comments