Skip to content

Commit 0de091a

Browse files
author
Amit Kapila
committed
Fix an oversight in 3f28b2f.
Commit 3f28b2f tried to ensure that the replication origin shouldn't be advanced in case of an ERROR in the apply worker, so that it can request the same data again after restart. However, it is possible that an ERROR was caught and handled by a (say PL/pgSQL) function, and the apply worker continues to apply further changes, in which case, we shouldn't reset the replication origin. Ensure to reset the origin only when the apply worker exits after an ERROR. Commit 3f28b2f added new function geterrlevel, which we removed in HEAD as part of this commit, but kept it in backbranches to avoid breaking any applications. A separate case can be made to have such a function even for HEAD. Reported-by: Shawn McCoy <shawn.the.mccoy@gmail.com> Author: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: vignesh C <vignesh21@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Backpatch-through: 16, where it was introduced Discussion: https://postgr.es/m/CALsgZNCGARa2mcYNVTSj9uoPcJo-tPuWUGECReKpNgTpo31_Pw@mail.gmail.com
1 parent e9ab867 commit 0de091a

File tree

2 files changed

+87
-11
lines changed

2 files changed

+87
-11
lines changed

src/backend/replication/logical/worker.c

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,8 @@ static inline void reset_apply_error_context_info(void);
436436
static TransApplyAction get_transaction_apply_action(TransactionId xid,
437437
ParallelApplyWorkerInfo **winfo);
438438

439+
static void replorigin_reset(int code, Datum arg);
440+
439441
/*
440442
* Form the origin name for the subscription.
441443
*
@@ -4431,6 +4433,14 @@ start_apply(XLogRecPtr origin_startpos)
44314433
}
44324434
PG_CATCH();
44334435
{
4436+
/*
4437+
* Reset the origin state to prevent the advancement of origin
4438+
* progress if we fail to apply. Otherwise, this will result in
4439+
* transaction loss as that transaction won't be sent again by the
4440+
* server.
4441+
*/
4442+
replorigin_reset(0, (Datum) 0);
4443+
44344444
if (MySubscription->disableonerr)
44354445
DisableSubscriptionAndExit();
44364446
else
@@ -4940,23 +4950,12 @@ void
49404950
apply_error_callback(void *arg)
49414951
{
49424952
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
4943-
int elevel;
49444953

49454954
if (apply_error_callback_arg.command == 0)
49464955
return;
49474956

49484957
Assert(errarg->origin_name);
49494958

4950-
elevel = geterrlevel();
4951-
4952-
/*
4953-
* Reset the origin state to prevent the advancement of origin progress if
4954-
* we fail to apply. Otherwise, this will result in transaction loss as
4955-
* that transaction won't be sent again by the server.
4956-
*/
4957-
if (elevel >= ERROR)
4958-
replorigin_reset(0, (Datum) 0);
4959-
49604959
if (errarg->rel == NULL)
49614960
{
49624961
if (!TransactionIdIsValid(errarg->remote_xid))

src/test/subscription/t/100_bugs.pl

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,4 +491,81 @@
491491
$node_publisher->stop('fast');
492492
$node_subscriber->stop('fast');
493493

494+
# The bug was that when an ERROR was caught and handled by a (PL/pgSQL)
495+
# function, the apply worker reset the replication origin but continued
496+
# processing subsequent changes. So, we fail to update the replication origin
497+
# during further apply operations. This can lead to the apply worker requesting
498+
# the changes that have been applied again after restarting.
499+
500+
$node_publisher->rotate_logfile();
501+
$node_publisher->start();
502+
503+
$node_subscriber->rotate_logfile();
504+
$node_subscriber->start();
505+
506+
# Set up a publication with a table
507+
$node_publisher->safe_psql(
508+
'postgres', qq(
509+
CREATE TABLE t1 (a int);
510+
CREATE PUBLICATION regress_pub FOR TABLE t1;
511+
));
512+
513+
# Set up a subscription which subscribes the publication
514+
$node_subscriber->safe_psql(
515+
'postgres', qq(
516+
CREATE TABLE t1 (a int);
517+
CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub;
518+
));
519+
520+
$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub');
521+
522+
# Create an AFTER INSERT trigger on the table that raises and subsequently
523+
# handles an exception. Subsequent insertions will trigger this exception,
524+
# causing the apply worker to invoke its error callback with an ERROR. However,
525+
# since the error is caught within the trigger, the apply worker will continue
526+
# processing changes.
527+
$node_subscriber->safe_psql(
528+
'postgres', q{
529+
CREATE FUNCTION handle_exception_trigger()
530+
RETURNS TRIGGER AS $$
531+
BEGIN
532+
BEGIN
533+
-- Raise an exception
534+
RAISE EXCEPTION 'This is a test exception';
535+
EXCEPTION
536+
WHEN OTHERS THEN
537+
RETURN NEW;
538+
END;
539+
540+
RETURN NEW;
541+
END;
542+
$$ LANGUAGE plpgsql;
543+
544+
CREATE TRIGGER silent_exception_trigger
545+
AFTER INSERT OR UPDATE ON t1
546+
FOR EACH ROW
547+
EXECUTE FUNCTION handle_exception_trigger();
548+
549+
ALTER TABLE t1 ENABLE ALWAYS TRIGGER silent_exception_trigger;
550+
});
551+
552+
# Obtain current remote_lsn value to check its advancement later
553+
my $remote_lsn = $node_subscriber->safe_psql('postgres',
554+
"SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'"
555+
);
556+
557+
# Insert a tuple to replicate changes
558+
$node_publisher->safe_psql('postgres', "INSERT INTO t1 VALUES (1);");
559+
$node_publisher->wait_for_catchup('regress_sub');
560+
561+
# Confirms the origin can be advanced
562+
$result = $node_subscriber->safe_psql('postgres',
563+
"SELECT remote_lsn > '$remote_lsn' FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'"
564+
);
565+
is($result, 't',
566+
'remote_lsn has advanced for apply worker raising an exception');
567+
568+
$node_publisher->stop('fast');
569+
$node_subscriber->stop('fast');
570+
494571
done_testing();

0 commit comments

Comments
 (0)