Skip to content

Commit 5a2f9b3

Browse files
committed
2 parents c857a98 + 3b2518a commit 5a2f9b3

File tree

8 files changed

+47
-27
lines changed

8 files changed

+47
-27
lines changed

contrib/mmts/arbiter.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ static void MtmSendHeartbeat()
369369
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
370370
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
371371
} else {
372-
MTM_LOG2("Send heartbeat to node %d with timestamp %ld", i+1, now);
372+
MTM_LOG4("Send heartbeat to node %d with timestamp %ld", i+1, now);
373373
}
374374
} else {
375375
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %d", i+1, (long long) busy_mask, Mtm->status);
@@ -897,7 +897,7 @@ static void MtmReceiver(Datum arg)
897897

898898
switch (msg->code) {
899899
case MSG_HEARTBEAT:
900-
MTM_LOG2("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
900+
MTM_LOG4("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
901901
node, msg->csn, USEC_TO_MSEC(MtmGetSystemTime() - msg->csn));
902902
continue;
903903
case MSG_POLL_REQUEST:

contrib/mmts/multimaster.c

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10281028
MTM_TXTRACE(x, "PostPrepareTransaction Start");
10291029

10301030
if (!x->isDistributed) {
1031+
MTM_TXTRACE(x, "not distributed?");
10311032
return;
10321033
}
10331034

@@ -1040,25 +1041,34 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10401041
Assert(ts != NULL);
10411042
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
10421043
if (!MtmIsCoordinator(ts) || Mtm->status == MTM_RECOVERY) {
1044+
MTM_TXTRACE(x, "recovery?");
10431045
Assert(x->gid[0]);
10441046
ts->votingCompleted = true;
1045-
if (Mtm->status != MTM_RECOVERY || Mtm->recoverySlot != MtmReplicationNodeId) {
1047+
MTM_TXTRACE(x, "recovery? 1");
1048+
if (Mtm->status != MTM_RECOVERY || Mtm->recoverySlot != MtmReplicationNodeId) {
1049+
MTM_TXTRACE(x, "recovery? 2");
10461050
MtmSend2PCMessage(ts, MSG_PREPARED); /* send notification to coordinator */
10471051
if (!MtmUseDtm) {
10481052
ts->status = TRANSACTION_STATUS_UNKNOWN;
10491053
}
10501054
} else {
1055+
MTM_TXTRACE(x, "recovery? 3");
10511056
ts->status = TRANSACTION_STATUS_UNKNOWN;
10521057
}
1058+
MTM_TXTRACE(x, "recovery? 4");
10531059
MtmUnlock();
1060+
MTM_TXTRACE(x, "recovery? 5");
10541061
MtmResetTransaction();
1062+
MTM_TXTRACE(x, "recovery? 6");
10551063
} else {
1064+
MTM_TXTRACE(x, "not recovery?");
10561065
Mtm2PCVoting(x, ts);
10571066
MtmUnlock();
10581067
if (x->isTwoPhase) {
10591068
MtmResetTransaction();
10601069
}
10611070
}
1071+
MTM_TXTRACE(x, "recovery? 7");
10621072
//if (x->gid[0]) MTM_LOG1("Prepared transaction %d (%s) csn=%ld at %ld: %d", x->xid, x->gid, ts->csn, MtmGetCurrentTime(), ts->status);
10631073
if (Mtm->inject2PCError == 3) {
10641074
Mtm->inject2PCError = 0;
@@ -1136,6 +1146,7 @@ MtmLogAbortLogicalMessage(int nodeId, char const* gid)
11361146
strcpy(msg.gid, gid);
11371147
msg.origin_node = nodeId;
11381148
msg.origin_lsn = replorigin_session_origin_lsn;
1149+
MTM_LOG2("[TRACE] MtmLogAbortLogicalMessage(%d, %s)", nodeId, gid);
11391150
XLogFlush(LogLogicalMessage("A", (char*)&msg, sizeof msg, false));
11401151
}
11411152

@@ -1224,6 +1235,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12241235
MtmTransactionListAppend(ts);
12251236
if (*x->gid) {
12261237
replorigin_session_origin_lsn = InvalidXLogRecPtr;
1238+
MTM_TXTRACE(x, "MtmEndTransaction/MtmLogAbortLogicalMessage");
12271239
MtmLogAbortLogicalMessage(MtmNodeId, x->gid);
12281240
}
12291241
}
@@ -2878,7 +2890,9 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28782890
CommitTransactionCommand();
28792891
MtmEndSession(nodeId, true);
28802892
} else if (status == TRANSACTION_STATUS_IN_PROGRESS) {
2893+
MtmBeginSession(nodeId);
28812894
MtmLogAbortLogicalMessage(nodeId, gid);
2895+
MtmEndSession(nodeId, true);
28822896
}
28832897
}
28842898

@@ -3045,6 +3059,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30453059
sscanf(strVal(elem->arg), "%lx", &recoveredLSN);
30463060
MTM_LOG1("Recovered position of node %d is %lx", MtmReplicationNodeId, recoveredLSN);
30473061
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
3062+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmReplicationStartupHook)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
30483063
Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
30493064
}
30503065
} else {
@@ -3210,18 +3225,20 @@ bool MtmFilterTransaction(char* record, int size)
32103225
}
32113226
restart_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn;
32123227
if (Mtm->nodes[origin_node-1].restartLSN < restart_lsn) {
3228+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, restart_lsn);
32133229
Mtm->nodes[origin_node-1].restartLSN = restart_lsn;
32143230
} else {
32153231
duplicate = true;
32163232
}
32173233

32183234
if (duplicate) {
3219-
MTM_LOG1("Ignore transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3220-
gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
3235+
MTM_LOG1("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = (origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx, origin_lsn=%lx",
3236+
gid, replication_node, flags, Mtm->nodes[origin_node-1].restartLSN, origin_node, MtmReplicationNodeId, end_lsn, origin_lsn);
32213237
} else {
32223238
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
32233239
gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
32243240
}
3241+
32253242
return duplicate;
32263243
}
32273244

@@ -4127,16 +4144,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
41274144

41284145
case T_VacuumStmt:
41294146
skipCommand = true;
4130-
if (context == PROCESS_UTILITY_TOPLEVEL) {
4131-
MtmProcessDDLCommand(queryString, false, true);
4132-
MtmTx.isDistributed = false;
4133-
} else if (MtmApplyContext != NULL) {
4134-
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4135-
Assert(oldContext != MtmApplyContext);
4136-
MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4137-
MemoryContextSwitchTo(oldContext);
4138-
return;
4139-
}
4147+
// if (context == PROCESS_UTILITY_TOPLEVEL) {
4148+
// MtmProcessDDLCommand(queryString, false, true);
4149+
// MtmTx.isDistributed = false;
4150+
// } else if (MtmApplyContext != NULL) {
4151+
// MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4152+
// Assert(oldContext != MtmApplyContext);
4153+
// MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4154+
// MemoryContextSwitchTo(oldContext);
4155+
// return;
4156+
// }
41404157
break;
41414158

41424159
case T_CreateDomainStmt:
@@ -4231,7 +4248,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
42314248
if (indexStmt->concurrent)
42324249
{
42334250
if (context == PROCESS_UTILITY_TOPLEVEL) {
4234-
MtmProcessDDLCommand(queryString, false, true);
4251+
// MtmProcessDDLCommand(queryString, false, true);
42354252
MtmTx.isDistributed = false;
42364253
skipCommand = true;
42374254
/*
@@ -4258,7 +4275,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
42584275
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent)
42594276
{
42604277
if (context == PROCESS_UTILITY_TOPLEVEL) {
4261-
MtmProcessDDLCommand(queryString, false, true);
4278+
// MtmProcessDDLCommand(queryString, false, true);
42624279
MtmTx.isDistributed = false;
42634280
skipCommand = true;
42644281
} else if (MtmApplyContext != NULL) {

contrib/mmts/multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
#define MTM_TXTRACE(tx, event)
3939
#else
4040
#define MTM_TXTRACE(tx, event) \
41-
fprintf(stderr, "[MTM_TXTRACE], %s, %lld, %s\n", tx->gid, (long long)MtmGetSystemTime(), event)
41+
fprintf(stderr, "[MTM_TXTRACE], %s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, getpid())
4242
#endif
4343

4444
#define MULTIMASTER_NAME "multimaster"

contrib/mmts/pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ process_remote_message(StringInfo s)
434434
* restartLSN without locks
435435
*/
436436
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
437+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)", origin_node, Mtm->nodes[origin_node-1].restartLSN, msg->origin_lsn);
437438
Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
438439
replorigin_session_origin_lsn = msg->origin_lsn;
439440
MtmRollbackPreparedTransaction(origin_node, msg->gid);

contrib/mmts/pglogical_receiver.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ pglogical_receiver_main(Datum main_arg)
338338
} else {
339339
originStartPos = replorigin_get_progress(originId, false);
340340
if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
341+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (pglogical_receiver_mains)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);
341342
Mtm->nodes[nodeId-1].restartLSN = originStartPos;
342343
}
343344
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
@@ -533,7 +534,7 @@ pglogical_receiver_main(Datum main_arg)
533534
MtmSpillToFile(spill_file, buf.data, buf.used);
534535
ByteBufferReset(&buf);
535536
}
536-
if (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'C' || stmt[1] == 'A')) {
537+
if (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'A')) {
537538
MTM_LOG3("Process '%c' message from %d", stmt[1], nodeId);
538539
MtmExecutor(stmt, rc - hdr_len);
539540
} else {

contrib/mmts/tests/reinit-mm.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ do
5353
multimaster.use_raftable = false
5454
multimaster.queue_size=52857600
5555
multimaster.ignore_tables_without_pk = 1
56-
multimaster.heartbeat_recv_timeout = 1000
56+
multimaster.heartbeat_recv_timeout = 2000
5757
multimaster.heartbeat_send_timeout = 250
58-
multimaster.twopc_min_timeout = 400000
59-
multimaster.min_2pc_timeout = 400000
58+
multimaster.twopc_min_timeout = 40000000
59+
multimaster.min_2pc_timeout = 40000000
6060
multimaster.volkswagen_mode = 1
6161
multimaster.conn_strings = '$conn_str'
6262
multimaster.node_id = $i

contrib/mmts/tests2/docker-entrypoint.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ if [ "$1" = 'postgres' ]; then
6767
multimaster.conn_strings = '$CONNSTRS'
6868
multimaster.heartbeat_recv_timeout = 1100
6969
multimaster.heartbeat_send_timeout = 250
70-
multimaster.twopc_min_timeout = 200000
70+
multimaster.twopc_min_timeout = 20000
71+
multimaster.min_2pc_timeout = 10000
7172
EOF
7273

7374
cat $PGDATA/postgresql.conf

src/test/regress/pg_regress.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -549,10 +549,10 @@ convert_sourcefiles_in(char *source_subdir, char *dest_dir, char *dest_subdir, c
549549
}
550550
while (fgets(line, sizeof(line), infile))
551551
{
552-
// replace_string(line, "@abs_srcdir@", inputdir);
553-
replace_string(line, "@abs_srcdir@", "/pg/src/src/test/regress");
554-
// replace_string(line, "@abs_builddir@", outputdir);
555-
replace_string(line, "@abs_builddir@", "/pg/src/src/test/regress");
552+
replace_string(line, "@abs_srcdir@", inputdir);
553+
// replace_string(line, "@abs_srcdir@", "/pg/src/src/test/regress");
554+
replace_string(line, "@abs_builddir@", outputdir);
555+
// replace_string(line, "@abs_builddir@", "/pg/src/src/test/regress");
556556
replace_string(line, "@testtablespace@", testtablespace);
557557
replace_string(line, "@libdir@", dlpath);
558558
replace_string(line, "@DLSUFFIX@", DLSUFFIX);

0 commit comments

Comments
 (0)