Skip to content

Commit bbefe45

Browse files
committed
Change detection of duplicated transaction
1 parent d531bb5 commit bbefe45

File tree

4 files changed

+48
-29
lines changed

4 files changed

+48
-29
lines changed

contrib/mmts/multimaster.c

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1531,8 +1531,8 @@ static void MtmEnableNode(int nodeId)
15311531
void MtmRecoveryCompleted(void)
15321532
{
15331533
int i;
1534-
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, live nodes=%d",
1535-
MtmNodeId, (long long) Mtm->disabledNodeMask, (long long) Mtm->connectivityMask, Mtm->nLiveNodes);
1534+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, endLSN=%lx, live nodes=%d",
1535+
MtmNodeId, (long long) Mtm->disabledNodeMask, (long long) Mtm->connectivityMask, GetXLogInsertRecPtr(), Mtm->nLiveNodes);
15361536
MtmLock(LW_EXCLUSIVE);
15371537
Mtm->recoverySlot = 0;
15381538
Mtm->recoveredLSN = GetXLogInsertRecPtr();
@@ -1542,7 +1542,7 @@ void MtmRecoveryCompleted(void)
15421542
for (i = 0; i < Mtm->nAllNodes; i++) {
15431543
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
15441544
}
1545-
/* Mode will be changed to online once all logical reciever are connected */
1545+
/* Mode will be changed to online once all logical receivers are connected */
15461546
MtmSwitchClusterMode(MTM_CONNECTED);
15471547
MtmUnlock();
15481548
}
@@ -2131,7 +2131,6 @@ static void MtmInitialize()
21312131
Mtm->nodes[i].restartLSN = InvalidXLogRecPtr;
21322132
Mtm->nodes[i].originId = InvalidRepOriginId;
21332133
Mtm->nodes[i].timeline = 0;
2134-
Mtm->nodes[i].recoveredLSN = InvalidXLogRecPtr;
21352134
}
21362135
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
21372136
/* All transactions originating from the current node should be ignored during recovery */
@@ -2882,13 +2881,14 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28822881
{
28832882
MtmReplicationMode mode = REPLMODE_OPEN_EXISTED;
28842883

2884+
MtmLock(LW_EXCLUSIVE);
28852885
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
28862886
{
28872887
if (*shutdown)
28882888
{
2889+
MtmUnlock();
28892890
return REPLMODE_EXIT;
28902891
}
2891-
MtmLock(LW_EXCLUSIVE);
28922892
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
28932893
mode = REPLMODE_CREATE_NEW;
28942894
}
@@ -2911,6 +2911,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29112911
MtmUnlock();
29122912
/* delay opening of other slots until recovery is completed */
29132913
MtmSleep(STATUS_POLL_DELAY);
2914+
MtmLock(LW_EXCLUSIVE);
29142915
}
29152916
if (mode == REPLMODE_RECOVERED) {
29162917
MTM_LOG1("%d: Restart replication from node %d after end of recovery", MyProcPid, nodeId);
@@ -2919,6 +2920,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29192920
} else {
29202921
MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
29212922
}
2923+
MtmUnlock();
29222924
return mode;
29232925
}
29242926

@@ -3012,7 +3014,12 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30123014
}
30133015
} else if (strcmp("mtm_recovered_pos", elem->defname) == 0) {
30143016
if (elem->arg != NULL && strVal(elem->arg) != NULL) {
3015-
sscanf(strVal(elem->arg), "%lx", &Mtm->nodes[MtmReplicationNodeId-1].recoveredLSN);
3017+
XLogRecPtr recoveredLSN;
3018+
sscanf(strVal(elem->arg), "%lx", &recoveredLSN);
3019+
MTM_LOG1("Recovered position of node %d is %lx", MtmReplicationNodeId, recoveredLSN);
3020+
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
3021+
Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
3022+
}
30163023
} else {
30173024
elog(ERROR, "Recovered position is not specified");
30183025
}
@@ -3127,16 +3134,21 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
31273134
return isDistributed;
31283135
}
31293136

3137+
/*
3138+
* Filter received transactions at destination side.
3139+
* This function is executed by the receiver, so there are no race conditions and it is possible to update nodes[i].restartLSN without a lock
3140+
*/
31303141
bool MtmFilterTransaction(char* record, int size)
31313142
{
31323143
StringInfoData s;
31333144
uint8 flags;
31343145
XLogRecPtr origin_lsn;
31353146
XLogRecPtr end_lsn;
3147+
XLogRecPtr restart_lsn;
31363148
int replication_node;
31373149
int origin_node;
31383150
char const* gid = "";
3139-
bool duplicate;
3151+
bool duplicate = false;
31403152

31413153
s.data = record;
31423154
s.len = size;
@@ -3172,11 +3184,17 @@ bool MtmFilterTransaction(char* record, int size)
31723184
default:
31733185
break;
31743186
}
3187+
restart_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn;
3188+
if (Mtm->nodes[origin_node-1].restartLSN < restart_lsn) {
3189+
Mtm->nodes[origin_node-1].restartLSN = restart_lsn;
3190+
} else {
3191+
duplicate = true;
3192+
}
3193+
31753194
//duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3176-
duplicate = origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
31773195

31783196
MTM_LOG1("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3179-
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, flags, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
3197+
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
31803198
return duplicate;
31813199
}
31823200

contrib/mmts/multimaster.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ typedef struct
202202
void* lockGraphData;
203203
int lockGraphAllocated;
204204
int lockGraphUsed;
205-
XLogRecPtr recoveredLSN;
206205
} MtmNodeInfo;
207206

208207
typedef struct MtmTransState

contrib/mmts/pglogical_apply.c

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -430,11 +430,17 @@ process_remote_message(StringInfo s)
430430
MtmAbortLogicalMessage* msg = (MtmAbortLogicalMessage*)messageBody;
431431
int origin_node = msg->origin_node;
432432
Assert(messageSize == sizeof(MtmAbortLogicalMessage));
433+
/* This function is called directly by receiver, so there is no race condition and we can update
434+
* restartLSN without locks
435+
*/
433436
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
434437
Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
438+
replorigin_session_origin_lsn = msg->origin_lsn;
439+
MtmRollbackPreparedTransaction(origin_node, msg->gid);
440+
} else {
441+
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %lx <= %lx",
442+
msg->gid, origin_node, msg->origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
435443
}
436-
replorigin_session_origin_lsn = msg->origin_lsn;
437-
MtmRollbackPreparedTransaction(origin_node, msg->gid);
438444
standalone = true;
439445
break;
440446
}
@@ -611,9 +617,6 @@ process_remote_commit(StringInfo in)
611617
origin_lsn = pq_getmsgint64(in);
612618

613619
replorigin_session_origin_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn;
614-
if (Mtm->nodes[origin_node-1].restartLSN < replorigin_session_origin_lsn) {
615-
Mtm->nodes[origin_node-1].restartLSN = replorigin_session_origin_lsn;
616-
}
617620
Assert(replorigin_session_origin == InvalidRepOriginId);
618621

619622
switch(PGLOGICAL_XACT_EVENT(flags))

contrib/mmts/pglogical_receiver.c

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,8 @@ pglogical_receiver_main(Datum main_arg)
260260
{
261261
int count;
262262
ConnStatusType status;
263-
XLogRecPtr originStartPos = InvalidXLogRecPtr;
263+
XLogRecPtr originStartPos = Mtm->nodes[nodeId-1].restartLSN;
264264
int timeline;
265-
bool newTimeline = false;
266265

267266
/*
268267
* Determine when and how we should open replication slot.
@@ -291,12 +290,12 @@ pglogical_receiver_main(Datum main_arg)
291290
if ((mode == REPLMODE_OPEN_EXISTED && timeline != Mtm->nodes[nodeId-1].timeline)
292291
|| mode == REPLMODE_CREATE_NEW)
293292
{ /* recreate slot */
293+
elog(LOG, "Recreate replication slot %s", slotName);
294294
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
295295
res = PQexec(conn, query->data);
296296
PQclear(res);
297297
resetPQExpBuffer(query);
298298
timeline = Mtm->nodes[nodeId-1].timeline;
299-
newTimeline = true;
300299
}
301300
/* My original assumption was that we can perform recovery only from an existing slot,
302301
* but unfortunately it looks like slots can "disappear" together with WAL-sender.
@@ -322,11 +321,7 @@ pglogical_receiver_main(Datum main_arg)
322321
}
323322

324323
/* Start logical replication at specified position */
325-
if (mode == REPLMODE_RECOVERED) {
326-
originStartPos = Mtm->nodes[nodeId-1].restartLSN;
327-
MTM_LOG1("Restart replication from node %d from position %lx", nodeId, originStartPos);
328-
}
329-
if (originStartPos == InvalidXLogRecPtr && !newTimeline) {
324+
if (originStartPos == InvalidXLogRecPtr) {
330325
StartTransactionCommand();
331326
originName = psprintf(MULTIMASTER_SLOT_PATTERN, nodeId);
332327
originId = replorigin_by_name(originName, true);
@@ -349,10 +344,11 @@ pglogical_receiver_main(Datum main_arg)
349344
}
350345
Mtm->nodes[nodeId-1].originId = originId;
351346
CommitTransactionCommand();
352-
} else if (mode == REPLMODE_CREATE_NEW) {
353-
originStartPos = Mtm->nodes[nodeId-1].recoveredLSN;
354-
}
347+
}
355348

349+
MTM_LOG1("Start replication on slot %s from node %d at position %lx, mode %s, recovered lsn %lx",
350+
slotName, nodeId, originStartPos, MtmReplicationModeName[mode], Mtm->recoveredLSN);
351+
356352
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx', \"mtm_recovered_pos\" '%lx')",
357353
slotName,
358354
(uint32) (originStartPos >> 32),
@@ -409,13 +405,16 @@ pglogical_receiver_main(Datum main_arg)
409405
ereport(LOG, (errmsg("%s: restart WAL receiver because node was switched to %s mode", worker_proc, MtmNodeStatusMnem[Mtm->status])));
410406
break;
411407
}
412-
if (count != Mtm->recoveryCount) {
413-
408+
if (count != Mtm->recoveryCount) {
414409
ereport(LOG, (errmsg("%s: restart WAL receiver because node was recovered", worker_proc)));
415410
break;
416411
}
417412

418-
413+
if (timeline != Mtm->nodes[nodeId-1].timeline) {
414+
ereport(LOG, (errmsg("%s: restart WAL receiver because node %d timeline is changed", worker_proc, nodeId)));
415+
break;
416+
}
417+
419418
/*
420419
* Receive data.
421420
*/

0 commit comments

Comments
 (0)