Skip to content

Commit f672df5

Browse files
author
Amit Kapila
committed
Remove the unnecessary PrepareWrite in pgoutput.
This issue exists from the inception of this code (PG-10) but got exposed by the recent commit ce0fdbf where we are using origins in tablesync workers. The problem was that we were sometimes sending the prepare_write ('w') message but then the actual message was not being sent and on the subscriber side, we always expect a message after prepare_write message which led to this bug. I refrained from backpatching this because there is no way in the core code to hit this prior to commit ce0fdbf and we haven't received any complaints so far. Reported-by: Erik Rijkers Author: Amit Kapila and Vignesh C Tested-by: Erik Rijkers Discussion: https://postgr.es/m/1295168140.139428.1613133237154@webmailclassic.xs4all.nl
1 parent 8001cb7 commit f672df5

File tree

2 files changed

+80
-8
lines changed

2 files changed

+80
-8
lines changed

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -342,10 +342,6 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
342342
{
343343
char *origin;
344344

345-
/* Message boundary */
346-
OutputPluginWrite(ctx, false);
347-
OutputPluginPrepareWrite(ctx, true);
348-
349345
/*----------
350346
* XXX: which behaviour do we want here?
351347
*
@@ -357,7 +353,13 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
357353
*----------
358354
*/
359355
if (replorigin_by_oid(txn->origin_id, true, &origin))
356+
{
357+
/* Message boundary */
358+
OutputPluginWrite(ctx, false);
359+
OutputPluginPrepareWrite(ctx, true);
360360
logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
361+
}
362+
361363
}
362364

363365
OutputPluginWrite(ctx, true);
@@ -780,12 +782,13 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
780782
{
781783
char *origin;
782784

783-
/* Message boundary */
784-
OutputPluginWrite(ctx, false);
785-
OutputPluginPrepareWrite(ctx, true);
786-
787785
if (replorigin_by_oid(txn->origin_id, true, &origin))
786+
{
787+
/* Message boundary */
788+
OutputPluginWrite(ctx, false);
789+
OutputPluginPrepareWrite(ctx, true);
788790
logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
791+
}
789792
}
790793

791794
OutputPluginWrite(ctx, true);

src/test/subscription/t/100_bugs.pl

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,72 @@
153153
$rows * 2, "2x$rows rows in t");
154154
is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"),
155155
$rows * 2, "2x$rows rows in t2");
156+
157+
# Verify table data is synced with cascaded replication setup. This is mainly
158+
# to test whether the data written by tablesync worker gets replicated.
159+
my $node_pub = get_new_node('testpublisher1');
160+
$node_pub->init(allows_streaming => 'logical');
161+
$node_pub->start;
162+
163+
my $node_pub_sub = get_new_node('testpublisher_subscriber');
164+
$node_pub_sub->init(allows_streaming => 'logical');
165+
$node_pub_sub->start;
166+
167+
my $node_sub = get_new_node('testsubscriber1');
168+
$node_sub->init(allows_streaming => 'logical');
169+
$node_sub->start;
170+
171+
# Create the tables in all nodes.
172+
$node_pub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
173+
$node_pub_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
174+
$node_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
175+
176+
# Create a cascaded replication setup like:
177+
# N1 - Create publication testpub1.
178+
# N2 - Create publication testpub2 and also include subscriber which subscribes
179+
# to testpub1.
180+
# N3 - Create subscription testsub2 subscribes to testpub2.
181+
#
182+
# Note that subscription on N3 needs to be created before subscription on N2 to
183+
# test whether the data written by tablesync worker of N2 gets replicated.
184+
$node_pub->safe_psql('postgres',
185+
"CREATE PUBLICATION testpub1 FOR TABLE tab1");
186+
187+
$node_pub_sub->safe_psql('postgres',
188+
"CREATE PUBLICATION testpub2 FOR TABLE tab1");
189+
190+
my $publisher1_connstr = $node_pub->connstr . ' dbname=postgres';
191+
my $publisher2_connstr = $node_pub_sub->connstr . ' dbname=postgres';
192+
193+
$node_sub->safe_psql('postgres',
194+
"CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher2_connstr' PUBLICATION testpub2"
195+
);
196+
197+
$node_pub_sub->safe_psql('postgres',
198+
"CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher1_connstr' PUBLICATION testpub1"
199+
);
200+
201+
$node_pub->safe_psql('postgres',
202+
"INSERT INTO tab1 values(generate_series(1,10))");
203+
204+
# Verify that the data is cascaded from testpub1 to testsub1 and further from
205+
# testpub2 (which had testsub1) to testsub2.
206+
$node_pub->wait_for_catchup('testsub1');
207+
$node_pub_sub->wait_for_catchup('testsub2');
208+
209+
# Drop subscriptions as we don't need them anymore
210+
$node_pub_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub1");
211+
$node_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub2");
212+
213+
# Drop publications as we don't need them anymore
214+
$node_pub->safe_psql('postgres', "DROP PUBLICATION testpub1");
215+
$node_pub_sub->safe_psql('postgres', "DROP PUBLICATION testpub2");
216+
217+
# Clean up the tables on both publisher and subscriber as we don't need them
218+
$node_pub->safe_psql('postgres', "DROP TABLE tab1");
219+
$node_pub_sub->safe_psql('postgres', "DROP TABLE tab1");
220+
$node_sub->safe_psql('postgres', "DROP TABLE tab1");
221+
222+
$node_pub->stop('fast');
223+
$node_pub_sub->stop('fast');
224+
$node_sub->stop('fast');

0 commit comments

Comments
 (0)