Skip to content

Commit 02c1b64

Browse files
author
Amit Kapila
committed
Refactor to split Apply and Tablesync Workers code.
Both apply and tablesync workers were using ApplyWorkerMain() as entry point. As the name implies, ApplyWorkerMain() should be considered as the main function for apply workers. Tablesync worker's path was hidden and does not have enough in common to share the same main function with apply worker. Also, most of the code shared by both worker types is already combined in LogicalRepApplyLoop(). There is no need to combine the rest in ApplyWorkerMain() anymore. This patch introduces TablesyncWorkerMain() as a new entry point for tablesync workers. This aims to increase code readability and would help with future improvements like the reuse of tablesync workers in the initial synchronization. Author: Melih Mutlu based on suggestions by Melanie Plageman Reviewed-by: Peter Smith, Kuroda Hayato, Amit Kapila Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com
1 parent 0125c4e commit 02c1b64

File tree

7 files changed

+299
-224
lines changed

7 files changed

+299
-224
lines changed

src/backend/postmaster/bgworker.c

+3
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ static const struct
131131
},
132132
{
133133
"ParallelApplyWorkerMain", ParallelApplyWorkerMain
134+
},
135+
{
136+
"TablesyncWorkerMain", TablesyncWorkerMain
134137
}
135138
};
136139

src/backend/replication/logical/applyparallelworker.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,7 @@ ParallelApplyWorkerMain(Datum main_arg)
942942
MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
943943
MyLogicalRepWorker->reply_time = 0;
944944

945-
InitializeApplyWorker();
945+
InitializeLogRepWorker();
946946

947947
InitializingApplyWorker = false;
948948

src/backend/replication/logical/launcher.c

+19-13
Original file line numberDiff line numberDiff line change
@@ -459,24 +459,30 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
459459
snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
460460

461461
if (is_parallel_apply_worker)
462+
{
462463
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
463-
else
464-
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
465-
466-
if (OidIsValid(relid))
467464
snprintf(bgw.bgw_name, BGW_MAXLEN,
468-
"logical replication worker for subscription %u sync %u", subid, relid);
469-
else if (is_parallel_apply_worker)
465+
"logical replication parallel apply worker for subscription %u",
466+
subid);
467+
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
468+
}
469+
else if (OidIsValid(relid))
470+
{
471+
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
470472
snprintf(bgw.bgw_name, BGW_MAXLEN,
471-
"logical replication parallel apply worker for subscription %u", subid);
473+
"logical replication tablesync worker for subscription %u sync %u",
474+
subid,
475+
relid);
476+
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
477+
}
472478
else
479+
{
480+
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
473481
snprintf(bgw.bgw_name, BGW_MAXLEN,
474-
"logical replication apply worker for subscription %u", subid);
475-
476-
if (is_parallel_apply_worker)
477-
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
478-
else
479-
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
482+
"logical replication apply worker for subscription %u",
483+
subid);
484+
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
485+
}
480486

481487
bgw.bgw_restart_time = BGW_NEVER_RESTART;
482488
bgw.bgw_notify_pid = MyProcPid;

src/backend/replication/logical/tablesync.c

+90-1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
#include "pgstat.h"
107107
#include "replication/logicallauncher.h"
108108
#include "replication/logicalrelation.h"
109+
#include "replication/logicalworker.h"
109110
#include "replication/walreceiver.h"
110111
#include "replication/worker_internal.h"
111112
#include "replication/slot.h"
@@ -1241,7 +1242,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
12411242
*
12421243
* The returned slot name is palloc'ed in current memory context.
12431244
*/
1244-
char *
1245+
static char *
12451246
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
12461247
{
12471248
char *slotname;
@@ -1584,6 +1585,94 @@ FetchTableStates(bool *started_tx)
15841585
return has_subrels;
15851586
}
15861587

1588+
/*
1589+
* Execute the initial sync with error handling. Disable the subscription,
1590+
* if it's required.
1591+
*
1592+
* Allocate the slot name in long-lived context on return. Note that we don't
1593+
* handle FATAL errors which are probably because of system resource error and
1594+
* are not repeatable.
1595+
*/
1596+
static void
1597+
start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1598+
{
1599+
char *sync_slotname = NULL;
1600+
1601+
Assert(am_tablesync_worker());
1602+
1603+
PG_TRY();
1604+
{
1605+
/* Call initial sync. */
1606+
sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1607+
}
1608+
PG_CATCH();
1609+
{
1610+
if (MySubscription->disableonerr)
1611+
DisableSubscriptionAndExit();
1612+
else
1613+
{
1614+
/*
1615+
* Report the worker failed during table synchronization. Abort
1616+
* the current transaction so that the stats message is sent in an
1617+
* idle state.
1618+
*/
1619+
AbortOutOfAnyTransaction();
1620+
pgstat_report_subscription_error(MySubscription->oid, false);
1621+
1622+
PG_RE_THROW();
1623+
}
1624+
}
1625+
PG_END_TRY();
1626+
1627+
/* allocate slot name in long-lived context */
1628+
*slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1629+
pfree(sync_slotname);
1630+
}
1631+
1632+
/*
1633+
* Runs the tablesync worker.
1634+
*
1635+
* It starts syncing tables. After a successful sync, sets streaming options
1636+
* and starts streaming to catchup with apply worker.
1637+
*/
1638+
static void
1639+
run_tablesync_worker()
1640+
{
1641+
char originname[NAMEDATALEN];
1642+
XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1643+
char *slotname = NULL;
1644+
WalRcvStreamOptions options;
1645+
1646+
start_table_sync(&origin_startpos, &slotname);
1647+
1648+
ReplicationOriginNameForLogicalRep(MySubscription->oid,
1649+
MyLogicalRepWorker->relid,
1650+
originname,
1651+
sizeof(originname));
1652+
1653+
set_apply_error_context_origin(originname);
1654+
1655+
set_stream_options(&options, slotname, &origin_startpos);
1656+
1657+
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1658+
1659+
/* Apply the changes till we catchup with the apply worker. */
1660+
start_apply(origin_startpos);
1661+
}
1662+
1663+
/* Logical Replication Tablesync worker entry point */
1664+
void
1665+
TablesyncWorkerMain(Datum main_arg)
1666+
{
1667+
int worker_slot = DatumGetInt32(main_arg);
1668+
1669+
SetupApplyOrSyncWorker(worker_slot);
1670+
1671+
run_tablesync_worker();
1672+
1673+
finish_sync_worker();
1674+
}
1675+
15871676
/*
15881677
* If the subscription has no tables then return false.
15891678
*

0 commit comments

Comments
 (0)