Skip to content

Commit fb788a8

Browse files
committed
Add wait 2pc timeout
1 parent 95daee4 commit fb788a8

File tree

5 files changed

+75
-31
lines changed

5 files changed

+75
-31
lines changed

contrib/mmts/arbiter.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -720,14 +720,24 @@ static void MtmTransReceiver(Datum arg)
720720
MtmAbortTransaction(ts);
721721
}
722722

723+
if (!MtmUseDtm && msg->csn > ts->csn) {
724+
ts->csn = msg->csn;
725+
MtmSyncClock(ts->csn);
726+
}
727+
723728
if (++ts->nVotes == Mtm->nNodes) {
724729
/* All nodes have finished their transactions */
725-
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
730+
if (ts->status == TRANSACTION_STATUS_ABORTED) {
731+
MtmWakeUpBackend(ts);
732+
} else if (MtmUseDtm) {
733+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
726734
ts->nVotes = 1; /* I voted myself */
727735
MtmSendNotificationMessage(ts, MSG_PREPARE);
728736
} else {
729-
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
730-
MtmWakeUpBackend(ts);
737+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
738+
ts->csn = MtmAssignCSN();
739+
ts->status = TRANSACTION_STATUS_UNKNOWN;
740+
MtmWakeUpBackend(ts);
731741
}
732742
}
733743
break;

contrib/mmts/multimaster.c

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ static int MtmWorkers;
191191
static int MtmVacuumDelay;
192192
static int MtmMinRecoveryLag;
193193
static int MtmMaxRecoveryLag;
194+
static int Mtm2PCPrepareRatio;
195+
static int Mtm2PCMinTimeout;
194196
static bool MtmIgnoreTablesWithoutPk;
195197

196198
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -766,8 +768,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
766768

767769
}
768770

769-
static time_t maxWakeupTime;
770-
771771
static void
772772
MtmPostPrepareTransaction(MtmCurrentTrans* x)
773773
{
@@ -783,25 +783,32 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
783783
tm->state = ts;
784784
ts->votingCompleted = true;
785785
if (Mtm->status != MTM_RECOVERY) {
786-
MtmSendNotificationMessage(ts, MtmUseDtm ? MSG_READY : MSG_PREPARED); /* send notification to coordinator */
786+
if (MtmUseDtm) {
787+
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
788+
} else {
789+
ts->csn = MtmAssignCSN();
790+
MtmSendNotificationMessage(ts, MSG_PREPARED); /* send notification to coordinator */
791+
ts->status = TRANSACTION_STATUS_UNKNOWN;
792+
}
787793
} else {
788794
ts->status = TRANSACTION_STATUS_UNKNOWN;
789795
}
790796
MtmUnlock();
791797
MtmResetTransaction(x);
792798
} else {
793-
time_t wakeupTime;
799+
time_t timeout = Max(Mtm2PCMinTimeout, (ts->csn - ts->snapshot)*Mtm2PCPrepareRatio/100000); /* usec->msec and percents */
800+
int result = 0;
794801
/* wait votes from all nodes */
795-
while (!ts->votingCompleted) {
802+
while (!ts->votingCompleted && !(result & WL_TIMEOUT)) {
796803
MtmUnlock();
797-
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
804+
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, timeout);
798805
ResetLatch(&MyProc->procLatch);
799-
wakeupTime = MtmGetCurrentTime() - ts->wakeupTime;
800-
if (wakeupTime > maxWakeupTime) {
801-
maxWakeupTime = wakeupTime;
802-
}
803806
MtmLock(LW_SHARED);
804807
}
808+
if (!ts->votingCompleted) {
809+
ts->status = TRANSACTION_STATUS_ABORTED;
810+
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration", (int)timeout);
811+
}
805812
x->status = ts->status;
806813
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
807814
MtmUnlock();
@@ -989,11 +996,12 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
989996
}
990997

991998
void MtmWakeUpBackend(MtmTransState* ts)
992-
{
993-
MTM_LOG3("Wakeup backed procno=%d, pid=%d", ts->procno, ProcGlobal->allProcs[ts->procno].pid);
994-
ts->votingCompleted = true;
995-
ts->wakeupTime = MtmGetCurrentTime();
996-
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
999+
{
1000+
if (!ts->votingCompleted) {
1001+
MTM_LOG3("Wakeup backed procno=%d, pid=%d", ts->procno, ProcGlobal->allProcs[ts->procno].pid);
1002+
ts->votingCompleted = true;
1003+
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
1004+
}
9971005
}
9981006

9991007
void MtmAbortTransaction(MtmTransState* ts)
@@ -1599,6 +1607,38 @@ _PG_init(void)
15991607
if (!process_shared_preload_libraries_in_progress)
16001608
return;
16011609

1610+
DefineCustomIntVariable(
1611+
"multimaster.2pc_min_timeout",
1612+
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
1613+
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
1614+
&Mtm2PCMinTimeout,
1615+
10000,
1616+
0,
1617+
INT_MAX,
1618+
PGC_BACKEND,
1619+
0,
1620+
NULL,
1621+
NULL,
1622+
NULL
1623+
);
1624+
1625+
DefineCustomIntVariable(
1626+
"multimaster.2pc_prepare_ratio",
1627+
"Percent of prepare time for maximal time of second phase of two-pahse commit",
1628+
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
1629+
&Mtm2PCPrepareRatio,
1630+
100,
1631+
0,
1632+
INT_MAX,
1633+
PGC_BACKEND,
1634+
0,
1635+
NULL,
1636+
NULL,
1637+
NULL
1638+
);
1639+
1640+
1641+
16021642
DefineCustomIntVariable(
16031643
"multimaster.node_disable_delay",
16041644
"Minamal amount of time (sec) between node status change",

contrib/mmts/multimaster.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ typedef struct MtmTransState
135135
int procno; /* pgprocno of transaction coordinator waiting for responses from replicas,
136136
used to notify coordinator by arbiter */
137137
int nSubxids; /* Number of subtransactions */
138-
time_t wakeupTime;
139138
MtmMessageCode cmd; /* Notification message to be sent */
140139
struct MtmTransState* nextVoting; /* Next element in L1-list of voting transactions. */
141140
struct MtmTransState* next; /* Next element in L1 list of all finished transaction present in xid2state hash */

contrib/mmts/pglogical_receiver.c

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,13 @@ static volatile sig_atomic_t got_sighup = false;
5353

5454
/* GUC variables */
5555
static int receiver_idle_time = 0;
56-
static bool receiver_sync_mode = true;
56+
static bool receiver_sync_mode = false;
5757

5858
/* Worker name */
59-
char worker_proc[BGW_MAXLEN];
59+
static char worker_proc[BGW_MAXLEN];
6060

6161
/* Lastly written positions */
6262
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
63-
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
64-
static XLogRecPtr output_applied_lsn = InvalidXLogRecPtr;
6563

6664
/* Stream functions */
6765
static void fe_sendint64(int64 i, char *buf);
@@ -91,16 +89,17 @@ receiver_raw_sighup(SIGNAL_ARGS)
9189
* Send a Standby Status Update message to server.
9290
*/
9391
static bool
94-
sendFeedback(PGconn *conn, int64 now)
92+
sendFeedback(PGconn *conn, int64 now, RepOriginId originId)
9593
{
9694
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
9795
int len = 0;
96+
XLogRecPtr output_applied_lsn = replorigin_get_progress(originId, true);
9897

9998
replybuf[len] = 'r';
10099
len += 1;
101100
fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
102101
len += 8;
103-
fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
102+
fe_sendint64(output_applied_lsn, &replybuf[len]); /* flush */
104103
len += 8;
105104
fe_sendint64(output_applied_lsn, &replybuf[len]); /* apply */
106105
len += 8;
@@ -409,8 +408,6 @@ pglogical_receiver_main(Datum main_arg)
409408

410409
/* Update written position */
411410
output_written_lsn = Max(walEnd, output_written_lsn);
412-
output_fsync_lsn = output_written_lsn;
413-
output_applied_lsn = output_written_lsn;
414411

415412
/*
416413
* If the server requested an immediate reply, send one.
@@ -424,7 +421,7 @@ pglogical_receiver_main(Datum main_arg)
424421
int64 now = feGetCurrentTimestamp();
425422

426423
/* Leave if feedback is not sent properly */
427-
if (!sendFeedback(conn, now))
424+
if (!sendFeedback(conn, now, originId))
428425
proc_exit(1);
429426
}
430427
continue;
@@ -482,8 +479,6 @@ pglogical_receiver_main(Datum main_arg)
482479
}
483480
/* Update written position */
484481
output_written_lsn = Max(walEnd, output_written_lsn);
485-
output_fsync_lsn = output_written_lsn;
486-
output_applied_lsn = output_written_lsn;
487482
}
488483

489484
/* No data, move to next loop */

contrib/mmts/tests/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void* reader(void* arg)
129129
result r = txn.exec("select sum(v) from t");
130130
int64_t sum = r[0][0].as(int64_t());
131131
if (sum != prevSum) {
132-
r = txn.exec("select mtm_get_snapshot()");
132+
r = txn.exec("select mtm.get_snapshot()");
133133
printf("Total=%ld, snapshot=%ld\n", sum, r[0][0].as(int64_t()));
134134
prevSum = sum;
135135
}

0 commit comments

Comments
 (0)