Skip to content

Commit 0e091ce

Browse files
author
Amit Kapila
committed
Fix an oversight in 3f28b2f.
Commit 3f28b2f tried to ensure that the replication origin is not advanced in case of an ERROR in the apply worker, so that it can request the same data again after restart. However, it is possible that an ERROR was caught and handled by a function (say, a PL/pgSQL function), after which the apply worker continues to apply further changes; in that case, we shouldn't reset the replication origin. Ensure that the origin is reset only when the apply worker exits after an ERROR.

Commit 3f28b2f added a new function, geterrlevel, which we removed in HEAD as part of this commit but kept in the back branches to avoid breaking any applications. A separate case can be made to have such a function even for HEAD.

Reported-by: Shawn McCoy <shawn.the.mccoy@gmail.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: vignesh C <vignesh21@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Backpatch-through: 16, where it was introduced
Discussion: https://postgr.es/m/CALsgZNCGARa2mcYNVTSj9uoPcJo-tPuWUGECReKpNgTpo31_Pw@mail.gmail.com
1 parent 1f7878c commit 0e091ce

File tree

4 files changed

+87
-29
lines changed

4 files changed

+87
-29
lines changed

src/backend/replication/logical/worker.c

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,8 @@ static inline void reset_apply_error_context_info(void);
414414
static TransApplyAction get_transaction_apply_action(TransactionId xid,
415415
ParallelApplyWorkerInfo **winfo);
416416

417+
static void replorigin_reset(int code, Datum arg);
418+
417419
/*
418420
* Form the origin name for the subscription.
419421
*
@@ -4516,6 +4518,14 @@ start_apply(XLogRecPtr origin_startpos)
45164518
}
45174519
PG_CATCH();
45184520
{
4521+
/*
4522+
* Reset the origin state to prevent the advancement of origin
4523+
* progress if we fail to apply. Otherwise, this will result in
4524+
* transaction loss as that transaction won't be sent again by the
4525+
* server.
4526+
*/
4527+
replorigin_reset(0, (Datum) 0);
4528+
45194529
if (MySubscription->disableonerr)
45204530
DisableSubscriptionAndExit();
45214531
else
@@ -5004,23 +5014,12 @@ void
50045014
apply_error_callback(void *arg)
50055015
{
50065016
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
5007-
int elevel;
50085017

50095018
if (apply_error_callback_arg.command == 0)
50105019
return;
50115020

50125021
Assert(errarg->origin_name);
50135022

5014-
elevel = geterrlevel();
5015-
5016-
/*
5017-
* Reset the origin state to prevent the advancement of origin progress if
5018-
* we fail to apply. Otherwise, this will result in transaction loss as
5019-
* that transaction won't be sent again by the server.
5020-
*/
5021-
if (elevel >= ERROR)
5022-
replorigin_reset(0, (Datum) 0);
5023-
50245023
if (errarg->rel == NULL)
50255024
{
50265025
if (!TransactionIdIsValid(errarg->remote_xid))

src/backend/utils/error/elog.c

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1590,23 +1590,6 @@ geterrcode(void)
15901590
return edata->sqlerrcode;
15911591
}
15921592

1593-
/*
1594-
* geterrlevel --- return the currently set error level
1595-
*
1596-
* This is only intended for use in error callback subroutines, since there
1597-
* is no other place outside elog.c where the concept is meaningful.
1598-
*/
1599-
int
1600-
geterrlevel(void)
1601-
{
1602-
ErrorData *edata = &errordata[errordata_stack_depth];
1603-
1604-
/* we don't bother incrementing recursion_depth */
1605-
CHECK_STACK_DEPTH();
1606-
1607-
return edata->elevel;
1608-
}
1609-
16101593
/*
16111594
* geterrposition --- return the currently set error position (0 if none)
16121595
*

src/include/utils/elog.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ extern int internalerrquery(const char *query);
227227
extern int err_generic_string(int field, const char *str);
228228

229229
extern int geterrcode(void);
230-
extern int geterrlevel(void);
231230
extern int geterrposition(void);
232231
extern int getinternalerrposition(void);
233232

src/test/subscription/t/100_bugs.pl

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,4 +498,81 @@
498498
$node_publisher->stop('fast');
499499
$node_subscriber->stop('fast');
500500

501+
# The bug was that when an ERROR was caught and handled by a (PL/pgSQL)
502+
# function, the apply worker reset the replication origin but continued
503+
# processing subsequent changes. As a result, the replication origin was not
504+
# updated during further apply operations, so after a restart the apply worker
505+
# could request changes that had already been applied.
506+
507+
$node_publisher->rotate_logfile();
508+
$node_publisher->start();
509+
510+
$node_subscriber->rotate_logfile();
511+
$node_subscriber->start();
512+
513+
# Set up a publication with a table
514+
$node_publisher->safe_psql(
515+
'postgres', qq(
516+
CREATE TABLE t1 (a int);
517+
CREATE PUBLICATION regress_pub FOR TABLE t1;
518+
));
519+
520+
# Set up a subscription that subscribes to the publication
521+
$node_subscriber->safe_psql(
522+
'postgres', qq(
523+
CREATE TABLE t1 (a int);
524+
CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub;
525+
));
526+
527+
$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub');
528+
529+
# Create an AFTER INSERT trigger on the table that raises and subsequently
530+
# handles an exception. Subsequent insertions will trigger this exception,
531+
# causing the apply worker to invoke its error callback with an ERROR. However,
532+
# since the error is caught within the trigger, the apply worker will continue
533+
# processing changes.
534+
$node_subscriber->safe_psql(
535+
'postgres', q{
536+
CREATE FUNCTION handle_exception_trigger()
537+
RETURNS TRIGGER AS $$
538+
BEGIN
539+
BEGIN
540+
-- Raise an exception
541+
RAISE EXCEPTION 'This is a test exception';
542+
EXCEPTION
543+
WHEN OTHERS THEN
544+
RETURN NEW;
545+
END;
546+
547+
RETURN NEW;
548+
END;
549+
$$ LANGUAGE plpgsql;
550+
551+
CREATE TRIGGER silent_exception_trigger
552+
AFTER INSERT OR UPDATE ON t1
553+
FOR EACH ROW
554+
EXECUTE FUNCTION handle_exception_trigger();
555+
556+
ALTER TABLE t1 ENABLE ALWAYS TRIGGER silent_exception_trigger;
557+
});
558+
559+
# Obtain current remote_lsn value to check its advancement later
560+
my $remote_lsn = $node_subscriber->safe_psql('postgres',
561+
"SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'"
562+
);
563+
564+
# Insert a tuple to replicate changes
565+
$node_publisher->safe_psql('postgres', "INSERT INTO t1 VALUES (1);");
566+
$node_publisher->wait_for_catchup('regress_sub');
567+
568+
# Confirm that the origin can be advanced
569+
$result = $node_subscriber->safe_psql('postgres',
570+
"SELECT remote_lsn > '$remote_lsn' FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'"
571+
);
572+
is($result, 't',
573+
'remote_lsn has advanced for apply worker raising an exception');
574+
575+
$node_publisher->stop('fast');
576+
$node_subscriber->stop('fast');
577+
501578
done_testing();

0 commit comments

Comments
 (0)