Skip to content

Commit 66b84fa

Browse files
committed
Receive invalidation messages correctly in tablesync worker
We didn't accept any invalidation messages until the whole sync process had finished (because it flattens all the remote transactions in the single one). So the sync worker didn't learn about subscription changes/drop until it has finished. This could lead to "orphaned" sync workers. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com> Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
1 parent 3c9bc21 commit 66b84fa

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

src/backend/replication/logical/worker.c

+15-8
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
118118

119119
static void store_flush_position(XLogRecPtr remote_lsn);
120120

121-
static void reread_subscription(void);
121+
static void maybe_reread_subscription(void);
122122

123123
/* Flags set by signal handlers */
124124
static volatile sig_atomic_t got_SIGHUP = false;
@@ -165,8 +165,7 @@ ensure_transaction(void)
165165

166166
StartTransactionCommand();
167167

168-
if (!MySubscriptionValid)
169-
reread_subscription();
168+
maybe_reread_subscription();
170169

171170
MemoryContextSwitchTo(ApplyMessageContext);
172171
return true;
@@ -463,6 +462,12 @@ apply_handle_commit(StringInfo s)
463462

464463
store_flush_position(commit_data.end_lsn);
465464
}
465+
else
466+
{
467+
/* Process any invalidation messages that might have accumulated. */
468+
AcceptInvalidationMessages();
469+
maybe_reread_subscription();
470+
}
466471

467472
in_remote_transaction = false;
468473

@@ -1119,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
11191124
* now.
11201125
*/
11211126
AcceptInvalidationMessages();
1122-
if (!MySubscriptionValid)
1123-
reread_subscription();
1127+
maybe_reread_subscription();
11241128

11251129
/* Process any table synchronization changes. */
11261130
process_syncing_tables(last_received);
@@ -1302,17 +1306,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
13021306
last_flushpos = flushpos;
13031307
}
13041308

1305-
13061309
/*
1307-
* Reread subscription info and exit on change.
1310+
* Reread subscription info if needed. Most changes will be exit.
13081311
*/
13091312
static void
1310-
reread_subscription(void)
1313+
maybe_reread_subscription(void)
13111314
{
13121315
MemoryContext oldctx;
13131316
Subscription *newsub;
13141317
bool started_tx = false;
13151318

1319+
/* When cache state is valid there is nothing to do here. */
1320+
if (MySubscriptionValid)
1321+
return;
1322+
13161323
/* This function might be called inside or outside of transaction. */
13171324
if (!IsTransactionState())
13181325
{

0 commit comments

Comments
 (0)