@@ -256,13 +256,18 @@ void MtmUnlockNode(int nodeId)
256
256
*/
257
257
258
258
259
- timestamp_t MtmGetCurrentTime (void )
259
+ timestamp_t MtmGetSystemTime (void )
260
260
{
261
261
struct timeval tv ;
262
262
gettimeofday (& tv , NULL );
263
263
return (timestamp_t )tv .tv_sec * USEC + tv .tv_usec + Mtm -> timeShift ;
264
264
}
265
265
266
+ timestamp_t MtmGetCurrentTime (void )
267
+ {
268
+ return MtmGetSystemTime () + Mtm -> timeShift ;
269
+ }
270
+
266
271
void MtmSleep (timestamp_t interval )
267
272
{
268
273
struct timespec ts ;
@@ -1046,7 +1051,7 @@ void MtmRecoveryCompleted(void)
1046
1051
MtmLock (LW_EXCLUSIVE );
1047
1052
Mtm -> recoverySlot = 0 ;
1048
1053
BIT_CLEAR (Mtm -> disabledNodeMask , MtmNodeId - 1 );
1049
- Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime = time ( NULL );
1054
+ Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
1050
1055
/* Mode will be changed to online once all logical receivers are connected */
1051
1056
MtmSwitchClusterMode (MTM_CONNECTED );
1052
1057
MtmUnlock ();
@@ -1135,7 +1140,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
1135
1140
/* We are lucky: caught up without locking cluster! */
1136
1141
}
1137
1142
BIT_CLEAR (Mtm -> disabledNodeMask , nodeId - 1 );
1138
- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time ( NULL );
1143
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
1139
1144
Mtm -> nNodes += 1 ;
1140
1145
caughtUp = true;
1141
1146
} else if (!BIT_CHECK (Mtm -> nodeLockerMask , nodeId - 1 )
@@ -1280,15 +1285,15 @@ bool MtmRefreshClusterStatus(bool nowait)
1280
1285
if (mask & 1 ) {
1281
1286
Mtm -> nNodes -= 1 ;
1282
1287
BIT_SET (Mtm -> disabledNodeMask , i );
1283
- Mtm -> nodes [i ].lastStatusChangeTime = time ( NULL );
1288
+ Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ( );
1284
1289
}
1285
1290
}
1286
1291
mask = clique & Mtm -> disabledNodeMask ; /* new enabled nodes mask */
1287
1292
for (i = 0 ; mask != 0 ; i ++ , mask >>= 1 ) {
1288
1293
if (mask & 1 ) {
1289
1294
Mtm -> nNodes += 1 ;
1290
1295
BIT_CLEAR (Mtm -> disabledNodeMask , i );
1291
- Mtm -> nodes [i ].lastStatusChangeTime = time ( NULL );
1296
+ Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ( );
1292
1297
}
1293
1298
}
1294
1299
MtmCheckQuorum ();
@@ -1328,7 +1333,7 @@ void MtmOnNodeDisconnect(int nodeId)
1328
1333
{
1329
1334
MtmTransState * ts ;
1330
1335
1331
- if (Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime + MtmNodeDisableDelay > time ( NULL )) {
1336
+ if (Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime + MSEC_TO_USEC ( MtmNodeDisableDelay ) > MtmGetSystemTime ( )) {
1332
1337
/* Avoid false detection of node failure and prevent node status blinking */
1333
1338
return ;
1334
1339
}
@@ -1343,7 +1348,7 @@ void MtmOnNodeDisconnect(int nodeId)
1343
1348
if (!MtmRefreshClusterStatus (false)) {
1344
1349
MtmLock (LW_EXCLUSIVE );
1345
1350
if (!BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
1346
- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time ( NULL );
1351
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
1347
1352
BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
1348
1353
Mtm -> nNodes -= 1 ;
1349
1354
MtmCheckQuorum ();
@@ -1511,14 +1516,14 @@ static void MtmInitialize()
1511
1516
for (i = 0 ; i < MtmNodes ; i ++ ) {
1512
1517
Mtm -> nodes [i ].oldestSnapshot = 0 ;
1513
1518
Mtm -> nodes [i ].transDelay = 0 ;
1514
- Mtm -> nodes [i ].lastStatusChangeTime = time ( NULL );
1519
+ Mtm -> nodes [i ].lastStatusChangeTime = MtmGetSystemTime ( );
1515
1520
Mtm -> nodes [i ].con = MtmConnections [i ];
1516
1521
Mtm -> nodes [i ].flushPos = 0 ;
1517
1522
}
1518
1523
PGSemaphoreCreate (& Mtm -> votingSemaphore );
1519
1524
PGSemaphoreReset (& Mtm -> votingSemaphore );
1520
1525
SpinLockInit (& Mtm -> spinlock );
1521
- BgwPoolInit (& Mtm -> pool , MtmExecutor , MtmDatabaseName , MtmQueueSize );
1526
+ BgwPoolInit (& Mtm -> pool , MtmExecutor , MtmDatabaseName , MtmQueueSize , MtmWorkers );
1522
1527
RegisterXactCallback (MtmXactCallback , NULL );
1523
1528
MtmTx .snapshot = INVALID_CSN ;
1524
1529
MtmTx .xid = InvalidTransactionId ;
@@ -1682,10 +1687,10 @@ _PG_init(void)
1682
1687
1683
1688
DefineCustomIntVariable (
1684
1689
"multimaster.node_disable_delay" ,
1685
- "Minamal amount of time (sec ) between node status change" ,
1690
+ "Minamal amount of time (msec ) between node status change" ,
1686
1691
"This delay is used to avoid false detection of node failure and to prevent blinking of node status node" ,
1687
1692
& MtmNodeDisableDelay ,
1688
- 1 ,
1693
+ 1000 ,
1689
1694
1 ,
1690
1695
INT_MAX ,
1691
1696
PGC_BACKEND ,
@@ -2033,7 +2038,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
2033
2038
{
2034
2039
elog (ERROR , "NodeID %d is out of range [1,%d]" , nodeId , Mtm -> nNodes );
2035
2040
}
2036
- Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = time ( NULL );
2041
+ Mtm -> nodes [nodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
2037
2042
BIT_SET (Mtm -> disabledNodeMask , nodeId - 1 );
2038
2043
Mtm -> nNodes -= 1 ;
2039
2044
MtmCheckQuorum ();
@@ -2084,15 +2089,15 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
2084
2089
if (MtmIsRecoverySession ) {
2085
2090
MTM_LOG1 ("%d: Node %d start recovery of node %d" , MyProcPid , MtmNodeId , MtmReplicationNodeId );
2086
2091
if (!BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
2087
- Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = time ( NULL );
2092
+ Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
2088
2093
BIT_SET (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
2089
2094
Mtm -> nNodes -= 1 ;
2090
2095
MtmCheckQuorum ();
2091
2096
}
2092
2097
} else if (BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
2093
2098
if (recoveryCompleted ) {
2094
2099
MTM_LOG1 ("Node %d consider that recovery of node %d is completed: start normal replication" , MtmNodeId , MtmReplicationNodeId );
2095
- Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = time ( NULL );
2100
+ Mtm -> nodes [MtmReplicationNodeId - 1 ].lastStatusChangeTime = MtmGetSystemTime ( );
2096
2101
BIT_CLEAR (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 );
2097
2102
Mtm -> nNodes += 1 ;
2098
2103
MtmCheckQuorum ();
@@ -2239,7 +2244,7 @@ mtm_poll_node(PG_FUNCTION_ARGS)
2239
2244
}
2240
2245
if (!nowait ) {
2241
2246
/* Just wait some time until logical replication channels are reestablished */
2242
- MtmSleep (MtmNodeDisableDelay );
2247
+ MtmSleep (MSEC_TO_USEC ( MtmNodeDisableDelay ) );
2243
2248
}
2244
2249
PG_RETURN_BOOL (online );
2245
2250
}
@@ -2298,7 +2303,7 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
2298
2303
usrfctx -> values [4 ] = Int64GetDatum (lag );
2299
2304
usrfctx -> nulls [4 ] = lag < 0 ;
2300
2305
usrfctx -> values [5 ] = Int64GetDatum (Mtm -> transCount ? Mtm -> nodes [usrfctx -> nodeId - 1 ].transDelay /Mtm -> transCount : 0 );
2301
- usrfctx -> values [6 ] = TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime ));
2306
+ usrfctx -> values [6 ] = TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime / USEC ));
2302
2307
usrfctx -> values [7 ] = CStringGetTextDatum (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
2303
2308
usrfctx -> nodeId += 1 ;
2304
2309
@@ -3061,6 +3066,18 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
3061
3066
MtmGetGtid (pgxact -> xid , & gtid );
3062
3067
hasDeadlock = MtmGraphFindLoop (& graph , & gtid );
3063
3068
elog (WARNING , "Distributed deadlock check for %u:%u = %d" , gtid .node , gtid .xid , hasDeadlock );
3069
+ if (!hasDeadlock ) {
3070
+ /* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
3071
+ * cannot be applied just because there are no vacant workers, and this causes an additional dependency between transactions which is not
3072
+ * reflected in the lock graph
3073
+ */
3074
+ timestamp_t lastPeekTime = BgwGetLastPeekTime (& Mtm -> pool );
3075
+ if (lastPeekTime != 0 && MtmGetSystemTime () - lastPeekTime >= MSEC_TO_USEC (DeadlockTimeout )) {
3076
+ hasDeadlock = true;
3077
+ elog (WARNING , "Apply workers were blocked more than %d msec" ,
3078
+ (int )USEC_TO_MSEC (MtmGetSystemTime () - lastPeekTime ));
3079
+ }
3080
+ }
3064
3081
}
3065
3082
return hasDeadlock ;
3066
3083
}
0 commit comments