Skip to content

Commit a4a6557

Browse files
committed
Reverse started hook commits
1 parent 65b18ec commit a4a6557

File tree

10 files changed

+8
-111
lines changed

10 files changed

+8
-111
lines changed

contrib/mmts/multimaster.c

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,6 @@ static int MtmMaxRecoveryLag;
245245
static int MtmGcPeriod;
246246
static bool MtmIgnoreTablesWithoutPk;
247247
static int MtmLockCount;
248-
static int MtmSenderStarted;
249248

250249
static ExecutorStart_hook_type PreviousExecutorStartHook;
251250
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -1668,8 +1667,8 @@ void MtmRecoveryCompleted(void)
16681667
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
16691668
}
16701669
/* Mode will be changed to online once all logical receiver are connected */
1671-
elog(LOG, "Recovery completed with %d active receivers and %d started senders from %d", Mtm->nReceivers, Mtm->nSenders, Mtm->nLiveNodes-1);
1672-
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
1670+
elog(LOG, "Recovery completed with %d active receivers from %d", Mtm->nReceivers, Mtm->nLiveNodes-1);
1671+
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
16731672
MtmUnlock();
16741673
}
16751674

@@ -2199,7 +2198,6 @@ static void MtmInitialize()
21992198
Mtm->transListHead = NULL;
22002199
Mtm->transListTail = &Mtm->transListHead;
22012200
Mtm->nReceivers = 0;
2202-
Mtm->nSenders = 0;
22032201
Mtm->timeShift = 0;
22042202
Mtm->transCount = 0;
22052203
Mtm->gcCount = 0;
@@ -2908,9 +2906,11 @@ void MtmReceiverStarted(int nodeId)
29082906
MtmEnableNode(nodeId);
29092907
MtmCheckQuorum();
29102908
}
2911-
elog(LOG, "Start %d receivers and %d senders from %d cluster status %s", Mtm->nReceivers+1, Mtm->nSenders, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
2912-
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
2913-
MtmSwitchClusterMode(MTM_ONLINE);
2909+
elog(LOG, "Start %d receivers from %d cluster status %s", Mtm->nReceivers+1, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
2910+
if (++Mtm->nReceivers == Mtm->nLiveNodes-1) {
2911+
if (Mtm->status == MTM_CONNECTED) {
2912+
MtmSwitchClusterMode(MTM_ONLINE);
2913+
}
29142914
}
29152915
}
29162916
MtmUnlock();
@@ -2997,7 +2997,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29972997
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
29982998
Mtm->recoverySlot = nodeId;
29992999
Mtm->nReceivers = 0;
3000-
Mtm->nSenders = 0;
30013000
Mtm->recoveryCount += 1;
30023001
Mtm->pglogicalNodeMask = 0;
30033002
MtmUnlock();
@@ -3077,19 +3076,6 @@ MtmOnProcExit(int code, Datum arg)
30773076
}
30783077
}
30793078

3080-
static void
3081-
MtmReplicationStartedHook(struct PGLogicalStartedHookArgs* args)
3082-
{
3083-
MtmLock(LW_EXCLUSIVE);
3084-
MtmSenderStarted = 1;
3085-
elog(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
3086-
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
3087-
MtmSwitchClusterMode(MTM_ONLINE);
3088-
}
3089-
MtmUnlock();
3090-
}
3091-
3092-
30933079
static void
30943080
MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30953081
{
@@ -3206,9 +3192,6 @@ static void
32063192
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
32073193
{
32083194
if (MtmReplicationNodeId >= 0) {
3209-
MtmLock(LW_EXCLUSIVE);
3210-
Mtm->nSenders -= MtmSenderStarted;
3211-
MtmUnlock();
32123195
MTM_LOG1("Logical replication to node %d is stopped", MtmReplicationNodeId);
32133196
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
32143197
MtmReplicationNodeId = -1; /* defuse on_proc_exit hook */
@@ -3320,7 +3303,6 @@ bool MtmFilterTransaction(char* record, int size)
33203303
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
33213304
{
33223305
hooks->startup_hook = MtmReplicationStartupHook;
3323-
hooks->started_hook = MtmReplicationStartedHook;
33243306
hooks->shutdown_hook = MtmReplicationShutdownHook;
33253307
hooks->txn_filter_hook = MtmReplicationTxnFilterHook;
33263308
hooks->row_filter_hook = MtmReplicationRowFilterHook;

contrib/mmts/multimaster.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,7 @@ typedef struct
273273
int inject2PCError; /* Simulate error during 2PC commit at this node */
274274
int nLiveNodes; /* Number of active nodes */
275275
int nAllNodes; /* Total numbber of nodes */
276-
int nReceivers; /* Number of initialized logical receivers (used to determine moment when intialization/recovery is completed) */
277-
int nSenders; /* Number of started WAL senders (used to determine moment when recovery) */
276+
int nReceivers; /* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
278277
int nLockers; /* Number of lockers */
279278
int nActiveTransactions; /* Nunmber of active 2PC transactions */
280279
int nConfigChanges; /* Number of cluster configuration changes */

contrib/mmts/pglogical_config.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,6 @@ prepare_startup_message(PGLogicalOutputData *data)
477477
*/
478478
l = add_startup_msg_b(l, "hooks.startup_hook_enabled",
479479
data->hooks.startup_hook != NULL);
480-
l = add_startup_msg_b(l, "hooks.started_hook_enabled",
481-
data->hooks.started_hook != NULL);
482480
l = add_startup_msg_b(l, "hooks.shutdown_hook_enabled",
483481
data->hooks.shutdown_hook != NULL);
484482
l = add_startup_msg_b(l, "hooks.row_filter_enabled",

contrib/mmts/pglogical_hooks.c

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,12 @@ load_hooks(PGLogicalOutputData *data)
9696

9797
elog(DEBUG3, "pglogical_output: Loaded hooks from function %u. Hooks are: \n"
9898
"\tstartup_hook: %p\n"
99-
"\tstarted_hook: %p\n"
10099
"\tshutdown_hook: %p\n"
101100
"\trow_filter_hook: %p\n"
102101
"\ttxn_filter_hook: %p\n"
103102
"\thooks_private_data: %p\n",
104103
hooks_func,
105104
data->hooks.startup_hook,
106-
data->hooks.started_hook,
107105
data->hooks.shutdown_hook,
108106
data->hooks.row_filter_hook,
109107
data->hooks.txn_filter_hook,
@@ -120,21 +118,6 @@ load_hooks(PGLogicalOutputData *data)
120118
CommitTransactionCommand();
121119
}
122120

123-
void
124-
call_started_hook(PGLogicalOutputData *data)
125-
{
126-
struct PGLogicalStartedHookArgs args;
127-
MemoryContext old_ctxt;
128-
129-
if (data->hooks.started_hook != NULL)
130-
{
131-
args.private_data = data->hooks.hooks_private_data;
132-
old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
133-
(void) (*data->hooks.started_hook)(&args);
134-
MemoryContextSwitchTo(old_ctxt);
135-
}
136-
}
137-
138121
void
139122
call_startup_hook(PGLogicalOutputData *data, List *plugin_params)
140123
{

contrib/mmts/pglogical_hooks.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ extern void load_hooks(PGLogicalOutputData *data);
1111

1212
extern void call_startup_hook(PGLogicalOutputData *data, List *plugin_params);
1313

14-
extern void call_started_hook(PGLogicalOutputData *data);
15-
1614
extern void call_shutdown_hook(PGLogicalOutputData *data);
1715

1816
extern bool call_row_filter_hook(PGLogicalOutputData *data,

contrib/mmts/pglogical_output.c

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
5454
/* These must be available to pg_dlsym() */
5555
static void pg_decode_startup(LogicalDecodingContext * ctx,
5656
OutputPluginOptions *opt, bool is_init);
57-
static void pg_decode_started(LogicalDecodingContext * ctx);
5857
static void pg_decode_shutdown(LogicalDecodingContext * ctx);
5958
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
6059
ReorderBufferTXN *txn);
@@ -84,7 +83,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8483
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
8584

8685
cb->startup_cb = pg_decode_startup;
87-
cb->started_cb = pg_decode_started;
8886
cb->begin_cb = pg_decode_begin_txn;
8987
cb->change_cb = pg_decode_change;
9088
cb->commit_cb = pg_decode_commit_txn;
@@ -492,15 +490,6 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
492490
MemoryContextReset(data->context);
493491
}
494492

495-
static void
496-
pg_decode_started(LogicalDecodingContext * ctx)
497-
{
498-
PGLogicalOutputData *data = ctx->output_plugin_private;
499-
call_started_hook(data);
500-
}
501-
502-
503-
504493
/*
505494
* Decide if the whole transaction with specific origin should be filtered out.
506495
*/

contrib/mmts/pglogical_output/hooks.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,6 @@ struct PGLogicalStartupHookArgs
2727

2828
typedef void (*pglogical_startup_hook_fn)(struct PGLogicalStartupHookArgs *args);
2929

30-
struct PGLogicalStartedHookArgs
31-
{
32-
void *private_data;
33-
};
34-
35-
typedef void (*pglogical_started_hook_fn)(struct PGLogicalStartedHookArgs *args);
3630

3731
struct PGLogicalTxnFilterArgs
3832
{
@@ -69,7 +63,6 @@ typedef void (*pglogical_shutdown_hook_fn)(struct PGLogicalShutdownHookArgs *arg
6963
struct PGLogicalHooks
7064
{
7165
pglogical_startup_hook_fn startup_hook;
72-
pglogical_started_hook_fn started_hook;
7366
pglogical_shutdown_hook_fn shutdown_hook;
7467
pglogical_txn_filter_hook_fn txn_filter_hook;
7568
pglogical_row_filter_hook_fn row_filter_hook;

contrib/multimaster/pglogical_output.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
7676
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
7777

7878
cb->startup_cb = pg_decode_startup;
79-
cb->started_cb = pg_decode_started;
8079
cb->begin_cb = pg_decode_begin_txn;
8180
cb->change_cb = pg_decode_change;
8281
cb->commit_cb = pg_decode_commit_txn;

src/backend/replication/logical/logical.c

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ typedef struct LogicalErrorCallbackState
5454

5555
/* wrappers around output plugin callbacks */
5656
static void output_plugin_error_callback(void *arg);
57-
static void started_cb_wrapper(LogicalDecodingContext *ctx);
5857
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
5958
bool is_init);
6059
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
@@ -414,7 +413,6 @@ DecodingContextReady(LogicalDecodingContext *ctx)
414413
void
415414
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
416415
{
417-
MemoryContext old_context;
418416
XLogRecPtr startptr;
419417

420418
/* Initialize from where to start reading WAL. */
@@ -449,13 +447,6 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
449447
}
450448

451449
ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
452-
453-
old_context = MemoryContextSwitchTo(ctx->context);
454-
if (ctx->callbacks.started_cb != NULL) {
455-
elog(LOG, "Call started callback");
456-
started_cb_wrapper(ctx);
457-
}
458-
MemoryContextSwitchTo(old_context);
459450
}
460451

461452
/*
@@ -571,31 +562,6 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
571562
error_context_stack = errcallback.previous;
572563
}
573564

574-
static void
575-
started_cb_wrapper(LogicalDecodingContext *ctx)
576-
{
577-
LogicalErrorCallbackState state;
578-
ErrorContextCallback errcallback;
579-
580-
/* Push callback + info on the error context stack */
581-
state.ctx = ctx;
582-
state.callback_name = "started";
583-
state.report_location = InvalidXLogRecPtr;
584-
errcallback.callback = output_plugin_error_callback;
585-
errcallback.arg = (void *) &state;
586-
errcallback.previous = error_context_stack;
587-
error_context_stack = &errcallback;
588-
589-
/* set output state */
590-
ctx->accept_writes = false;
591-
592-
/* do the actual work: call callback */
593-
ctx->callbacks.started_cb(ctx);
594-
595-
/* Pop the error context stack */
596-
error_context_stack = errcallback.previous;
597-
}
598-
599565
static void
600566
shutdown_cb_wrapper(LogicalDecodingContext *ctx)
601567
{

src/include/replication/output_plugin.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,6 @@ typedef void (*LogicalDecodeStartupCB) (
4747
bool is_init
4848
);
4949

50-
/*
51-
* Callback that gets called when WAL-sender is started. ctx->private_data can
52-
* be set to some private data.
53-
*
54-
*/
55-
typedef void (*LogicalDecodeStartedCB) (
56-
struct LogicalDecodingContext *ctx
57-
);
58-
5950
/*
6051
* Callback called for every (explicit or implicit) BEGIN of a successful
6152
* transaction.
@@ -114,7 +105,6 @@ typedef void (*LogicalDecodeShutdownCB) (
114105
typedef struct OutputPluginCallbacks
115106
{
116107
LogicalDecodeStartupCB startup_cb;
117-
LogicalDecodeStartedCB started_cb;
118108
LogicalDecodeBeginCB begin_cb;
119109
LogicalDecodeChangeCB change_cb;
120110
LogicalDecodeCommitCB commit_cb;

0 commit comments

Comments
 (0)