
Commit 305bb17

Improve deadlock detection algorithm by taking into account hidden dependencies between transactions caused by lack of vacant workers in the apply pool.

1 parent 82d2e2d, commit 305bb17
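In short: the apply worker pool now records the moment it last became saturated (every worker busy with work still queued, or the producer blocked on a full queue), clears that mark as soon as a worker frees up, and the deadlock detector reports a deadlock when the pool has stayed saturated longer than deadlock_timeout even though the wait-for graph contains no cycle. The toy program below is a minimal, self-contained sketch of that idea only, not the mmts code; every name in it (toy_pool, toy_now_usec, toy_pool_update_peak, toy_pool_suspect_deadlock) is made up for illustration, whereas the real implementation in the diff uses BgwPool, MtmGetSystemTime() and BgwGetLastPeekTime().

/* Minimal sketch of the saturation-timestamp idea (illustrative names only). */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/time.h>

typedef uint64_t usec_t;

typedef struct {
    size_t nWorkers;      /* total apply workers                 */
    size_t active;        /* workers currently applying          */
    size_t pending;       /* queued transactions                 */
    usec_t lastPeakTime;  /* 0 = pool is not currently saturated */
} toy_pool;

static usec_t toy_now_usec(void)
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return (usec_t)tv.tv_sec * 1000000 + tv.tv_usec;
}

/* Call whenever active/pending change: start the clock the moment every
 * worker is busy while work is still waiting, stop it otherwise. */
static void toy_pool_update_peak(toy_pool *pool)
{
    if (pool->active == pool->nWorkers && pool->pending != 0) {
        if (pool->lastPeakTime == 0)
            pool->lastPeakTime = toy_now_usec();
    } else {
        pool->lastPeakTime = 0;
    }
}

/* Deadlock heuristic: saturation that outlives the timeout is reported even
 * though no cycle is visible in the lock graph. */
static bool toy_pool_suspect_deadlock(const toy_pool *pool, usec_t timeout_usec)
{
    return pool->lastPeakTime != 0 &&
           toy_now_usec() - pool->lastPeakTime >= timeout_usec;
}

int main(void)
{
    toy_pool pool = { .nWorkers = 4, .active = 4, .pending = 2 };
    toy_pool_update_peak(&pool);
    printf("suspect deadlock now: %d\n",
           toy_pool_suspect_deadlock(&pool, 1000000)); /* prints 0: just saturated */
    return 0;
}

The real code additionally takes the pool spinlock around these updates and handles the producer-blocked case separately, as the bgwpool.c hunks below show.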

4 files changed: 69 additions, 26 deletions


contrib/mmts/bgwpool.c

Lines changed: 20 additions & 2 deletions
@@ -35,6 +35,9 @@ static void BgwPoolMainLoop(Datum arg)
         work = malloc(size);
         pool->pending -= 1;
         pool->active += 1;
+        if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0) {
+            pool->lastPeakTime = MtmGetSystemTime();
+        }
         if (pool->head + size + 4 > pool->size) {
             memcpy(work, pool->queue, size);
             pool->head = INTALIGN(size);
@@ -48,17 +51,19 @@ static void BgwPoolMainLoop(Datum arg)
         if (pool->producerBlocked) {
             pool->producerBlocked = false;
             PGSemaphoreUnlock(&pool->overflow);
+            pool->lastPeakTime = 0;
         }
         SpinLockRelease(&pool->lock);
         pool->executor(id, work, size);
         free(work);
         SpinLockAcquire(&pool->lock);
         pool->active -= 1;
+        pool->lastPeakTime = 0;
         SpinLockRelease(&pool->lock);
     }
 }
 
-void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize)
+void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize, size_t nWorkers)
 {
     pool->queue = (char*)ShmemAlloc(queueSize);
     pool->executor = executor;
@@ -73,8 +78,15 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, si
     pool->size = queueSize;
     pool->active = 0;
     pool->pending = 0;
+    pool->nWorkers = nWorkers;
+    pool->lastPeakTime = 0;
     strcpy(pool->dbname, dbname);
 }
+
+timestamp_t BgwGetLastPeekTime(BgwPool* pool)
+{
+    return pool->lastPeakTime;
+}
 
 void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
 {
@@ -123,12 +135,18 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
         if ((pool->head <= pool->tail && pool->size - pool->tail < size + 4 && pool->head < size)
             || (pool->head > pool->tail && pool->head - pool->tail < size + 4))
         {
-            pool->producerBlocked = true;
+            if (pool->lastPeakTime == 0) {
+                pool->lastPeakTime = MtmGetSystemTime();
+            }
+            pool->producerBlocked = true;
             SpinLockRelease(&pool->lock);
             PGSemaphoreLock(&pool->overflow);
             SpinLockAcquire(&pool->lock);
         } else {
             pool->pending += 1;
+            if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0) {
+                pool->lastPeakTime = MtmGetSystemTime();
+            }
             *(int*)&pool->queue[pool->tail] = size;
             if (pool->size - pool->tail >= size + 4) {
                 memcpy(&pool->queue[pool->tail+4], work, size);
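Taken together, the bgwpool.c changes arm lastPeakTime in three situations: when a worker dequeues work and, as a result, every worker is busy while the queue is still non-empty; when the producer is about to block on a full queue; and when the producer enqueues work while all workers are already busy. The mark is cleared as soon as the blocked producer is released or any worker finishes a job, so a non-zero lastPeakTime means the pool has been continuously saturated since that timestamp; this is the signal the deadlock detector reads via BgwGetLastPeekTime() in multimaster.c below.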

contrib/mmts/bgwpool.h

Lines changed: 7 additions & 1 deletion
@@ -7,6 +7,8 @@
 
 typedef void(*BgwPoolExecutor)(int id, void* work, size_t size);
 
+typedef uint64 timestamp_t;
+
 #define MAX_DBNAME_LEN 30
 #define MULTIMASTER_BGW_RESTART_TIMEOUT 1 /* seconds */
 
@@ -21,6 +23,8 @@ typedef struct
     size_t size;
     size_t active;
     size_t pending;
+    size_t nWorkers;
+    time_t lastPeakTime;
     bool producerBlocked;
     char dbname[MAX_DBNAME_LEN];
     char* queue;
@@ -30,10 +34,12 @@ typedef BgwPool*(*BgwPoolConstructor)(void);
 
 extern void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor);
 
-extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize);
+extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize, size_t nWorkers);
 
 extern void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
 
 extern size_t BgwPoolGetQueueSize(BgwPool* pool);
 
+extern timestamp_t BgwGetLastPeekTime(BgwPool* pool);
+
 #endif

contrib/mmts/multimaster.c

Lines changed: 33 additions & 16 deletions
@@ -256,13 +256,18 @@ void MtmUnlockNode(int nodeId)
 */
 
 
-timestamp_t MtmGetCurrentTime(void)
+timestamp_t MtmGetSystemTime(void)
 {
     struct timeval tv;
     gettimeofday(&tv, NULL);
     return (timestamp_t)tv.tv_sec*USEC + tv.tv_usec + Mtm->timeShift;
 }
 
+timestamp_t MtmGetCurrentTime(void)
+{
+    return MtmGetSystemTime() + Mtm->timeShift;
+}
+
 void MtmSleep(timestamp_t interval)
 {
     struct timespec ts;
@@ -1046,7 +1051,7 @@ void MtmRecoveryCompleted(void)
     MtmLock(LW_EXCLUSIVE);
     Mtm->recoverySlot = 0;
     BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
-    Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = time(NULL);
+    Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
     /* Mode will be changed to online once all locagical reciever are connected */
     MtmSwitchClusterMode(MTM_CONNECTED);
     MtmUnlock();
@@ -1135,7 +1140,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
             /* We are lucky: caugth-up without locking cluster! */
         }
         BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
-        Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
+        Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
         Mtm->nNodes += 1;
         caughtUp = true;
     } else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
@@ -1280,15 +1285,15 @@ bool MtmRefreshClusterStatus(bool nowait)
         if (mask & 1) {
             Mtm->nNodes -= 1;
             BIT_SET(Mtm->disabledNodeMask, i);
-            Mtm->nodes[i].lastStatusChangeTime = time(NULL);
+            Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
         }
     }
     mask = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
     for (i = 0; mask != 0; i++, mask >>= 1) {
         if (mask & 1) {
             Mtm->nNodes += 1;
             BIT_CLEAR(Mtm->disabledNodeMask, i);
-            Mtm->nodes[i].lastStatusChangeTime = time(NULL);
+            Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
         }
     }
     MtmCheckQuorum();
@@ -1328,7 +1333,7 @@ void MtmOnNodeDisconnect(int nodeId)
 {
     MtmTransState *ts;
 
-    if (Mtm->nodes[nodeId-1].lastStatusChangeTime + MtmNodeDisableDelay > time(NULL)) {
+    if (Mtm->nodes[nodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) > MtmGetSystemTime()) {
         /* Avoid false detection of node failure and prevent node status blinking */
         return;
     }
@@ -1343,7 +1348,7 @@ void MtmOnNodeDisconnect(int nodeId)
     if (!MtmRefreshClusterStatus(false)) {
         MtmLock(LW_EXCLUSIVE);
         if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
-            Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
+            Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
             BIT_SET(Mtm->disabledNodeMask, nodeId-1);
             Mtm->nNodes -= 1;
             MtmCheckQuorum();
@@ -1511,14 +1516,14 @@ static void MtmInitialize()
     for (i = 0; i < MtmNodes; i++) {
         Mtm->nodes[i].oldestSnapshot = 0;
         Mtm->nodes[i].transDelay = 0;
-        Mtm->nodes[i].lastStatusChangeTime = time(NULL);
+        Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
         Mtm->nodes[i].con = MtmConnections[i];
        Mtm->nodes[i].flushPos = 0;
     }
     PGSemaphoreCreate(&Mtm->votingSemaphore);
     PGSemaphoreReset(&Mtm->votingSemaphore);
     SpinLockInit(&Mtm->spinlock);
-    BgwPoolInit(&Mtm->pool, MtmExecutor, MtmDatabaseName, MtmQueueSize);
+    BgwPoolInit(&Mtm->pool, MtmExecutor, MtmDatabaseName, MtmQueueSize, MtmWorkers);
     RegisterXactCallback(MtmXactCallback, NULL);
     MtmTx.snapshot = INVALID_CSN;
     MtmTx.xid = InvalidTransactionId;
@@ -1682,10 +1687,10 @@ _PG_init(void)
 
     DefineCustomIntVariable(
         "multimaster.node_disable_delay",
-        "Minamal amount of time (sec) between node status change",
+        "Minamal amount of time (msec) between node status change",
         "This delay is used to avoid false detection of node failure and to prevent blinking of node status node",
         &MtmNodeDisableDelay,
-        1,
+        1000,
         1,
         INT_MAX,
         PGC_BACKEND,
@@ -2033,7 +2038,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
     {
         elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nNodes);
     }
-    Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
+    Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
     BIT_SET(Mtm->disabledNodeMask, nodeId-1);
     Mtm->nNodes -= 1;
     MtmCheckQuorum();
@@ -2084,15 +2089,15 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
     if (MtmIsRecoverySession) {
         MTM_LOG1("%d: Node %d start recovery of node %d", MyProcPid, MtmNodeId, MtmReplicationNodeId);
         if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
-            Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = time(NULL);
+            Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
             BIT_SET(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
             Mtm->nNodes -= 1;
             MtmCheckQuorum();
         }
     } else if (BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
         if (recoveryCompleted) {
             MTM_LOG1("Node %d consider that recovery of node %d is completed: start normal replication", MtmNodeId, MtmReplicationNodeId);
-            Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = time(NULL);
+            Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
             BIT_CLEAR(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
             Mtm->nNodes += 1;
             MtmCheckQuorum();
@@ -2239,7 +2244,7 @@ mtm_poll_node(PG_FUNCTION_ARGS)
     }
     if (!nowait) {
         /* Just wait some time until logical repication channels will be reestablished */
-        MtmSleep(MtmNodeDisableDelay);
+        MtmSleep(MSEC_TO_USEC(MtmNodeDisableDelay));
     }
     PG_RETURN_BOOL(online);
 }
@@ -2298,7 +2303,7 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
     usrfctx->values[4] = Int64GetDatum(lag);
     usrfctx->nulls[4] = lag < 0;
     usrfctx->values[5] = Int64GetDatum(Mtm->transCount ? Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount : 0);
-    usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime));
+    usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime/USEC));
     usrfctx->values[7] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
     usrfctx->nodeId += 1;
 
@@ -3061,6 +3066,18 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
         MtmGetGtid(pgxact->xid, &gtid);
         hasDeadlock = MtmGraphFindLoop(&graph, &gtid);
         elog(WARNING, "Distributed deadlock check for %u:%u = %d", gtid.node, gtid.xid, hasDeadlock);
+        if (!hasDeadlock) {
+            /* There is no deadlock loop in the graph, but a deadlock can still be caused by lack of apply workers:
+             * if all of them are busy, some transactions cannot be applied simply because there is no vacant worker,
+             * which creates an additional dependency between transactions that is not reflected in the lock graph.
+             */
+            timestamp_t lastPeekTime = BgwGetLastPeekTime(&Mtm->pool);
+            if (lastPeekTime != 0 && MtmGetSystemTime() - lastPeekTime >= MSEC_TO_USEC(DeadlockTimeout)) {
+                hasDeadlock = true;
+                elog(WARNING, "Apply workers were blocked more than %d msec",
+                     (int)USEC_TO_MSEC(MtmGetSystemTime() - lastPeekTime));
+            }
+        }
     }
     return hasDeadlock;
 }
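DeadlockTimeout in the new check is the backend variable behind PostgreSQL's deadlock_timeout setting and is kept in milliseconds, while lastPeekTime comes from MtmGetSystemTime() and is in microseconds; the MSEC_TO_USEC/USEC_TO_MSEC macros added in multimaster.h bridge the two units. A small, hedged illustration of that arithmetic follows; the macro bodies are copied from the diff, but the variable names and the 1.5-second figure are made up for the example.

/* Standalone illustration of the unit conversion used by the new deadlock check. */
#include <assert.h>
#include <stdint.h>

#define USEC_TO_MSEC(t) ((t)/1000)
#define MSEC_TO_USEC(t) ((t)*1000)

int main(void)
{
    uint64_t deadlock_timeout_msec = 1000;     /* default deadlock_timeout, in msec          */
    uint64_t blocked_usec = 1500 * 1000ULL;    /* pool saturated for 1.5 s, in usec           */

    /* Same comparison the new code performs on MtmGetSystemTime() - lastPeekTime. */
    assert(blocked_usec >= MSEC_TO_USEC(deadlock_timeout_msec));
    assert(USEC_TO_MSEC(blocked_usec) == 1500);
    return 0;
}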

contrib/mmts/multimaster.h

Lines changed: 9 additions & 7 deletions
@@ -48,6 +48,9 @@
 
 #define USEC 1000000
 
+#define USEC_TO_MSEC(t) ((t)/1000)
+#define MSEC_TO_USEC(t) ((t)*1000)
+
 #define Natts_mtm_ddl_log 2
 #define Anum_mtm_ddl_log_issued 1
 #define Anum_mtm_ddl_log_query 2
@@ -72,8 +75,6 @@ typedef uint64 csn_t; /* commit serial number */
 #define PGLOGICAL_CAUGHT_UP 0x04
 
 
-typedef uint64 timestamp_t;
-
 /* Identifier of global transaction */
 typedef struct
 {
@@ -122,9 +123,9 @@ typedef struct
 typedef struct
 {
     MtmConnectionInfo con;
-    time_t transDelay;
-    time_t lastStatusChangeTime;
-    XLogRecPtr flushPos;
+    timestamp_t transDelay;
+    timestamp_t lastStatusChangeTime;
+    XLogRecPtr flushPos;
     csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
 } MtmNodeInfo;
 
@@ -232,8 +233,9 @@ extern void MtmRecoverNode(int nodeId);
 extern void MtmOnNodeDisconnect(int nodeId);
 extern void MtmOnNodeConnect(int nodeId);
 extern void MtmWakeUpBackend(MtmTransState* ts);
-extern timestamp_t MtmGetCurrentTime(void);
-extern void MtmSleep(timestamp_t interval);
+extern timestamp_t MtmGetSystemTime(void); /* non-adjusted current system time */
+extern timestamp_t MtmGetCurrentTime(void); /* adjusted current system time */
+extern void MtmSleep(timestamp_t interval);
 extern void MtmAbortTransaction(MtmTransState* ts);
 extern void MtmSetCurrentTransactionGID(char const* gid);
 extern csn_t MtmGetTransactionCSN(TransactionId xid);
