Skip to content

Commit 9c13b68

Browse files
author
Amit Kapila
committed
Reset the logical worker type while cleaning up other worker info.
Commit 2a8b40e introduces the worker type field for logical replication workers, but forgot to reset the type when the worker exits. This can lead to recognizing a stopped worker as a valid logical replication worker. Fix it by resetting the worker type and additionally adding the safeguard to not use LogicalRepWorker until ->in_use is verified. Reported-by: Thomas Munro based on cfbot reports. Author: Hou Zhijie, Alvaro Herrera Reviewed-by: Amit Kapila Discussion: http://postgr.es/m/CA+hUKGK2RQh4LifVgBmkHsCYChP-65UwGXOmnCzYVa5aAt4GWg@mail.gmail.com
1 parent 252dcb3 commit 9c13b68

File tree

2 files changed

+9
-4
lines changed

2 files changed

+9
-4
lines changed

src/backend/replication/logical/launcher.c

+3-2
Original file line numberDiff line numberDiff line change
@@ -793,6 +793,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
793793
{
794794
Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
795795

796+
worker->type = WORKERTYPE_UNKNOWN;
796797
worker->in_use = false;
797798
worker->proc = NULL;
798799
worker->dbid = InvalidOid;
@@ -862,7 +863,7 @@ logicalrep_sync_worker_count(Oid subid)
862863
{
863864
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
864865

865-
if (w->subid == subid && isTablesyncWorker(w))
866+
if (isTablesyncWorker(w) && w->subid == subid)
866867
res++;
867868
}
868869

@@ -889,7 +890,7 @@ logicalrep_pa_worker_count(Oid subid)
889890
{
890891
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
891892

892-
if (w->subid == subid && isParallelApplyWorker(w))
893+
if (isParallelApplyWorker(w) && w->subid == subid)
893894
res++;
894895
}
895896

src/include/replication/worker_internal.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,10 @@ extern void pa_decr_and_wait_stream_block(void);
327327
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
328328
XLogRecPtr remote_lsn);
329329

330-
#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
331-
#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
330+
#define isParallelApplyWorker(worker) ((worker)->in_use && \
331+
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
332+
#define isTablesyncWorker(worker) ((worker)->in_use && \
333+
(worker)->type == WORKERTYPE_TABLESYNC)
332334

333335
static inline bool
334336
am_tablesync_worker(void)
@@ -339,12 +341,14 @@ am_tablesync_worker(void)
339341
static inline bool
340342
am_leader_apply_worker(void)
341343
{
344+
Assert(MyLogicalRepWorker->in_use);
342345
return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
343346
}
344347

345348
static inline bool
346349
am_parallel_apply_worker(void)
347350
{
351+
Assert(MyLogicalRepWorker->in_use);
348352
return isParallelApplyWorker(MyLogicalRepWorker);
349353
}
350354

0 commit comments

Comments
 (0)