
Commit 93db6cb

Committed by Amit Kapila
Add a new slot sync worker to synchronize logical slots.
By enabling slot synchronization, all the failover logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. The slot sync worker on the standby server pings the primary server at regular intervals to get the necessary failover logical slots information and create/update the slots locally. The slots that no longer require synchronization are automatically dropped by the worker.

The nap time of the worker is tuned according to the activity on the primary. The slot sync worker waits for some time before the next synchronization, with the duration varying based on whether any slots were updated during the last cycle.

A new parameter sync_replication_slots enables or disables this new process.

On promotion, the slot sync worker is shut down by the startup process to drop any temporary slots acquired by the slot sync worker and to prevent the worker from trying to fetch the failover slots.

A functionality to allow logical walsenders to wait for the physical will be done in a subsequent commit.

Author: Shveta Malik, Hou Zhijie based on design inputs by Masahiko Sawada and Amit Kapila
Reviewed-by: Masahiko Sawada, Bertrand Drouvot, Peter Smith, Dilip Kumar, Ajin Cherian, Nisha Moond, Kuroda Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
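The nap-time tuning described in the commit message can be pictured with a small sketch. It is not the code from this commit: the function name, the two bounds, and the doubling policy are assumptions chosen for illustration. The only idea taken from the message is that the wait depends on whether any slot was updated in the last cycle.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical bounds; the values used by the real worker are not shown here. */
#define MIN_NAPTIME_MS 200L
#define MAX_NAPTIME_MS 30000L

/*
 * Compute the next nap time for a polling worker.
 *
 * If the last cycle updated at least one slot, the primary is active, so
 * poll again soon. Otherwise back off by doubling the wait, capped at the
 * maximum.
 */
static long
next_naptime_ms(long current_ms, bool some_slot_updated)
{
    if (some_slot_updated)
        return MIN_NAPTIME_MS;

    /* Compare against half the cap first so the doubling cannot overshoot. */
    if (current_ms >= MAX_NAPTIME_MS / 2)
        return MAX_NAPTIME_MS;

    return current_ms * 2;
}
```

With these assumed bounds, an idle primary is polled at 200 ms, then 400 ms, and so on up to 30 s, and the first cycle that updates a slot resets the wait to 200 ms.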
1 parent 3d47b75 commit 93db6cb

19 files changed, +966 -76 lines changed


doc/src/sgml/config.sgml

+18
@@ -4943,6 +4943,24 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>

+     <varlistentry id="guc-sync-replication-slots" xreflabel="sync_replication_slots">
+      <term><varname>sync_replication_slots</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>sync_replication_slots</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        It enables a physical standby to synchronize logical failover slots
+        from the primary server so that logical subscribers can resume
+        replication from the new primary server after failover.
+       </para>
+       <para>
+        It is disabled by default. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command line.
+       </para>
+      </listitem>
+     </varlistentry>
      </variablelist>
     </sect2>
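As an illustration of the new parameter, a standby's configuration might look like the fragment below. Only sync_replication_slots and primary_slot_name are named in this diff. The slot name, host, and database name are hypothetical, and the hot_standby_feedback and dbname entries are assumptions based on the released PostgreSQL documentation rather than on the hunks shown here.

```conf
# postgresql.conf on the physical standby (illustrative values only)
sync_replication_slots = on             # parameter added by this commit; off by default
primary_slot_name = 'standby1_slot'     # hypothetical name of the physical slot on the primary
hot_standby_feedback = on               # assumption: required for slot sync, not shown in this diff
primary_conninfo = 'host=primary.example.com dbname=postgres'   # hypothetical; dbname assumed to be required
```

Because the documentation text says the parameter can only be set in postgresql.conf or on the server command line, a change to it is picked up on a configuration reload rather than per session.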

doc/src/sgml/logicaldecoding.sgml

+4-1
@@ -373,7 +373,10 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      <command>CREATE SUBSCRIPTION</command> during slot creation, and then calling
      <link linkend="pg-sync-replication-slots">
      <function>pg_sync_replication_slots</function></link>
-     on the standby. For the synchronization to work, it is mandatory to
+     on the standby. By setting <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link>
+     on the standby, the failover slots can be synchronized periodically in
+     the slotsync worker. For the synchronization to work, it is mandatory to
      have a physical replication slot between the primary and the standby aka
      <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
      should be configured on the standby, and

src/backend/access/transam/xlogrecovery.c

+15
@@ -49,6 +49,7 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/startup.h"
 #include "replication/slot.h"
+#include "replication/slotsync.h"
 #include "replication/walreceiver.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -1467,6 +1468,20 @@ FinishWalRecovery(void)
      */
     XLogShutdownWalRcv();

+    /*
+     * Shutdown the slot sync worker to drop any temporary slots acquired by
+     * it and to prevent it from keep trying to fetch the failover slots.
+     *
+     * We do not update the 'synced' column from true to false here, as any
+     * failed update could leave 'synced' column false for some slots. This
+     * could cause issues during slot sync after restarting the server as a
+     * standby. While updating the 'synced' column after switching to the new
+     * timeline is an option, it does not simplify the handling for the
+     * 'synced' column. Therefore, we retain the 'synced' column as true after
+     * promotion as it may provide useful information about the slot origin.
+     */
+    ShutDownSlotSync();
+
     /*
      * We are now done reading the xlog from stream. Turn off streaming
      * recovery to force fetching the files (which would be required at end of

src/backend/postmaster/postmaster.c

+78-15
@@ -115,6 +115,7 @@
 #include "postmaster/syslogger.h"
 #include "postmaster/walsummarizer.h"
 #include "replication/logicallauncher.h"
+#include "replication/slotsync.h"
 #include "replication/walsender.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -167,11 +168,11 @@
  * they will never become live backends. dead_end children are not assigned a
  * PMChildSlot. dead_end children have bkend_type NORMAL.
  *
- * "Special" children such as the startup, bgwriter and autovacuum launcher
- * tasks are not in this list. They are tracked via StartupPID and other
- * pid_t variables below. (Thus, there can't be more than one of any given
- * "special" child process type. We use BackendList entries for any child
- * process there can be more than one of.)
+ * "Special" children such as the startup, bgwriter, autovacuum launcher, and
+ * slot sync worker tasks are not in this list. They are tracked via StartupPID
+ * and other pid_t variables below. (Thus, there can't be more than one of any
+ * given "special" child process type. We use BackendList entries for any
+ * child process there can be more than one of.)
  */
 typedef struct bkend
 {
@@ -254,7 +255,8 @@ static pid_t StartupPID = 0,
             WalSummarizerPID = 0,
             AutoVacPID = 0,
             PgArchPID = 0,
-            SysLoggerPID = 0;
+            SysLoggerPID = 0,
+            SlotSyncWorkerPID = 0;

 /* Startup process's status */
 typedef enum
@@ -445,6 +447,7 @@ static void StartAutovacuumWorker(void);
 static void MaybeStartWalReceiver(void);
 static void MaybeStartWalSummarizer(void);
 static void InitPostmasterDeathWatchHandle(void);
+static void MaybeStartSlotSyncWorker(void);

 /*
  * Archiver is allowed to start up at the current postmaster state?
@@ -1822,6 +1825,9 @@ ServerLoop(void)
         if (PgArchPID == 0 && PgArchStartupAllowed())
             PgArchPID = StartChildProcess(ArchiverProcess);

+        /* If we need to start a slot sync worker, try to do that now */
+        MaybeStartSlotSyncWorker();
+
         /* If we need to signal the autovacuum launcher, do so now */
         if (avlauncher_needs_signal)
         {
@@ -2661,6 +2667,8 @@ process_pm_reload_request(void)
             signal_child(PgArchPID, SIGHUP);
         if (SysLoggerPID != 0)
             signal_child(SysLoggerPID, SIGHUP);
+        if (SlotSyncWorkerPID != 0)
+            signal_child(SlotSyncWorkerPID, SIGHUP);

         /* Reload authentication config files too */
         if (!load_hba())
@@ -3010,6 +3018,7 @@ process_pm_child_exit(void)
                 AutoVacPID = StartAutoVacLauncher();
             if (PgArchStartupAllowed() && PgArchPID == 0)
                 PgArchPID = StartChildProcess(ArchiverProcess);
+            MaybeStartSlotSyncWorker();

             /* workers may be scheduled to start now */
             maybe_start_bgworkers();
@@ -3180,6 +3189,22 @@ process_pm_child_exit(void)
             continue;
         }

+        /*
+         * Was it the slot sync worker? Normal exit or FATAL exit can be
+         * ignored (FATAL can be caused by libpqwalreceiver on receiving
+         * shutdown request by the startup process during promotion); we'll
+         * start a new one at the next iteration of the postmaster's main
+         * loop, if necessary. Any other exit condition is treated as a crash.
+         */
+        if (pid == SlotSyncWorkerPID)
+        {
+            SlotSyncWorkerPID = 0;
+            if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
+                HandleChildCrash(pid, exitstatus,
+                                 _("slot sync worker process"));
+            continue;
+        }
+
         /* Was it one of our background workers? */
         if (CleanupBackgroundWorker(pid, exitstatus))
         {
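The exit-status rule in the hunk above (a normal exit or a FATAL exit is ignored, anything else is a crash) can be sketched with plain POSIX wait-status macros. This is a standalone illustration, not PostgreSQL code: `worker_exit_is_benign` is a hypothetical name, and the two tests inside it are assumed stand-ins for `EXIT_STATUS_0` and `EXIT_STATUS_1`, whose definitions do not appear in this diff.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/wait.h>

/*
 * Return true when a child's wait status needs no crash handling:
 *   - status 0: the process exited normally with code 0;
 *   - exit code 1: assumed here to be what a FATAL error produces.
 * Any other status (another exit code, or death by signal) is a crash.
 */
static bool
worker_exit_is_benign(int exitstatus)
{
    bool clean_exit = (exitstatus == 0);
    bool fatal_exit = WIFEXITED(exitstatus) && WEXITSTATUS(exitstatus) == 1;

    return clean_exit || fatal_exit;
}
```

A caller would pass the status obtained from `waitpid()` and run the crash path only when the function returns false.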
@@ -3384,7 +3409,7 @@ CleanupBackend(int pid,

 /*
  * HandleChildCrash -- cleanup after failed backend, bgwriter, checkpointer,
- * walwriter, autovacuum, archiver or background worker.
+ * walwriter, autovacuum, archiver, slot sync worker, or background worker.
  *
  * The objectives here are to clean up our local state about the child
  * process, and to signal all other remaining children to quickdie.
@@ -3546,6 +3571,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
     else if (PgArchPID != 0 && take_action)
         sigquit_child(PgArchPID);

+    /* Take care of the slot sync worker too */
+    if (pid == SlotSyncWorkerPID)
+        SlotSyncWorkerPID = 0;
+    else if (SlotSyncWorkerPID != 0 && take_action)
+        sigquit_child(SlotSyncWorkerPID);
+
     /* We do NOT restart the syslogger */

     if (Shutdown != ImmediateShutdown)
@@ -3686,6 +3717,8 @@ PostmasterStateMachine(void)
             signal_child(WalReceiverPID, SIGTERM);
         if (WalSummarizerPID != 0)
             signal_child(WalSummarizerPID, SIGTERM);
+        if (SlotSyncWorkerPID != 0)
+            signal_child(SlotSyncWorkerPID, SIGTERM);
         /* checkpointer, archiver, stats, and syslogger may continue for now */

         /* Now transition to PM_WAIT_BACKENDS state to wait for them to die */
@@ -3701,13 +3734,13 @@ PostmasterStateMachine(void)
         /*
          * PM_WAIT_BACKENDS state ends when we have no regular backends
          * (including autovac workers), no bgworkers (including unconnected
-         * ones), and no walwriter, autovac launcher or bgwriter. If we are
-         * doing crash recovery or an immediate shutdown then we expect the
-         * checkpointer to exit as well, otherwise not. The stats and
-         * syslogger processes are disregarded since they are not connected to
-         * shared memory; we also disregard dead_end children here. Walsenders
-         * and archiver are also disregarded, they will be terminated later
-         * after writing the checkpoint record.
+         * ones), and no walwriter, autovac launcher, bgwriter or slot sync
+         * worker. If we are doing crash recovery or an immediate shutdown
+         * then we expect the checkpointer to exit as well, otherwise not. The
+         * stats and syslogger processes are disregarded since they are not
+         * connected to shared memory; we also disregard dead_end children
+         * here. Walsenders and archiver are also disregarded, they will be
+         * terminated later after writing the checkpoint record.
          */
         if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 &&
             StartupPID == 0 &&
@@ -3717,7 +3750,8 @@ PostmasterStateMachine(void)
             (CheckpointerPID == 0 ||
              (!FatalError && Shutdown < ImmediateShutdown)) &&
             WalWriterPID == 0 &&
-            AutoVacPID == 0)
+            AutoVacPID == 0 &&
+            SlotSyncWorkerPID == 0)
         {
             if (Shutdown >= ImmediateShutdown || FatalError)
             {
@@ -3815,6 +3849,7 @@ PostmasterStateMachine(void)
             Assert(CheckpointerPID == 0);
             Assert(WalWriterPID == 0);
             Assert(AutoVacPID == 0);
+            Assert(SlotSyncWorkerPID == 0);
             /* syslogger is not considered here */
             pmState = PM_NO_CHILDREN;
         }
@@ -4038,6 +4073,8 @@ TerminateChildren(int signal)
         signal_child(AutoVacPID, signal);
     if (PgArchPID != 0)
         signal_child(PgArchPID, signal);
+    if (SlotSyncWorkerPID != 0)
+        signal_child(SlotSyncWorkerPID, signal);
 }

 /*
@@ -4850,6 +4887,7 @@ SubPostmasterMain(int argc, char *argv[])
      */
     if (strcmp(argv[1], "--forkbackend") == 0 ||
         strcmp(argv[1], "--forkavlauncher") == 0 ||
+        strcmp(argv[1], "--forkssworker") == 0 ||
         strcmp(argv[1], "--forkavworker") == 0 ||
         strcmp(argv[1], "--forkaux") == 0 ||
         strcmp(argv[1], "--forkbgworker") == 0)
@@ -4953,6 +4991,13 @@ SubPostmasterMain(int argc, char *argv[])

         AutoVacWorkerMain(argc - 2, argv + 2);  /* does not return */
     }
+    if (strcmp(argv[1], "--forkssworker") == 0)
+    {
+        /* Restore basic shared memory pointers */
+        InitShmemAccess(UsedShmemSegAddr);
+
+        ReplSlotSyncWorkerMain(argc - 2, argv + 2); /* does not return */
+    }
     if (strcmp(argv[1], "--forkbgworker") == 0)
     {
         /* do this as early as possible; in particular, before InitProcess() */
@@ -5498,6 +5543,24 @@ MaybeStartWalSummarizer(void)
 }


+/*
+ * MaybeStartSlotSyncWorker
+ *      Start the slot sync worker, if not running and our state allows.
+ *
+ * We allow to start the slot sync worker when we are on a hot standby,
+ * fast or immediate shutdown is not in progress, slot sync parameters
+ * are configured correctly, and it is the first time of worker's launch,
+ * or enough time has passed since the worker was launched last.
+ */
+static void
+MaybeStartSlotSyncWorker(void)
+{
+    if (SlotSyncWorkerPID == 0 && pmState == PM_HOT_STANDBY &&
+        Shutdown <= SmartShutdown && sync_replication_slots &&
+        ValidateSlotSyncParams(LOG) && SlotSyncWorkerCanRestart())
+        SlotSyncWorkerPID = StartSlotSyncWorker();
+}
+
 /*
  * Create the opts file
  */
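The last condition in the comment above ("first time of worker's launch, or enough time has passed since the worker was launched last") is a restart throttle. The body of `SlotSyncWorkerCanRestart()` is not part of the hunks shown here, so the following is only a sketch of how such a check could work. The function name, the 10-second interval, and the explicit `now` argument are assumptions made for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical minimum gap between two launches of the worker. */
#define RESTART_INTERVAL_SEC 10

/* Time of the most recent launch; 0 means the worker was never launched. */
static time_t last_start_time = 0;

/*
 * Return true if the worker may be (re)started at time 'now', and record
 * 'now' as the new launch time in that case.
 */
static bool
worker_can_restart(time_t now)
{
    if (last_start_time != 0 &&
        (now - last_start_time) < RESTART_INTERVAL_SEC)
        return false;           /* launched too recently, try again later */

    last_start_time = now;
    return true;
}
```

Taking the current time as a parameter keeps the sketch deterministic. A caller in a main loop would pass `time(NULL)`, so a worker that keeps failing is relaunched at most once per interval instead of on every loop iteration.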

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

+3
@@ -6,6 +6,9 @@
  * loaded as a dynamic module to avoid linking the main server binary with
  * libpq.
  *
+ * Apart from walreceiver, the libpq-specific routines are now being used by
+ * logical replication workers and slot synchronization.
+ *
  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
  *
  *
