76
76
77
77
78
78
#include "multimaster.h"
79
- #include "state.h"
80
79
81
80
#define MAX_ROUTES 16
82
81
#define INIT_BUFFER_SIZE 1024
@@ -190,6 +189,7 @@ static void MtmDisconnect(int node)
190
189
MtmUnregisterSocket (sockets [node ]);
191
190
pg_closesocket (sockets [node ], MtmUseRDMA );
192
191
sockets [node ] = -1 ;
192
+ MtmOnNodeDisconnect (node + 1 );
193
193
}
194
194
195
195
static int MtmWaitSocket (int sd , bool forWrite , timestamp_t timeoutMsec )
@@ -316,22 +316,25 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316
316
} else {
317
317
BIT_CLEAR (Mtm -> currentLockNodeMask , resp -> node - 1 );
318
318
}
319
-
320
- // if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321
- // {
322
- // MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323
- // }
324
-
325
- if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) &&
326
- sockets [resp -> node - 1 ] < 0 )
327
- {
328
- /* We've received heartbeat from disabled node.
319
+ if (
320
+ ( BIT_CHECK (resp -> disabledNodeMask , MtmNodeId - 1 ) || Mtm -> status == MTM_IN_MINORITY )
321
+ && !BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 )
322
+ && Mtm -> status != MTM_RECOVERY
323
+ && Mtm -> status != MTM_RECOVERED
324
+ && Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime + MSEC_TO_USEC (MtmNodeDisableDelay ) < MtmGetSystemTime ())
325
+ {
326
+ MTM_ELOG (WARNING , "Node %d thinks that I'm dead, while I'm %s (message %s)" , resp -> node , MtmNodeStatusMnem [Mtm -> status ], MtmMessageKindMnem [resp -> code ]);
327
+ BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
328
+ Mtm -> nConfigChanges += 1 ;
329
+ MtmSwitchClusterMode (MTM_RECOVERY );
330
+ } else if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) && sockets [resp -> node - 1 ] < 0 ) {
331
+ /* We received a heartbeat from a disabled node.
329
332
* Looks like it is restarted.
330
333
* Try to reconnect to it.
331
334
*/
332
335
MTM_ELOG (WARNING , "Receive heartbeat from disabled node %d" , resp -> node );
333
336
BIT_SET (Mtm -> reconnectMask , resp -> node - 1 );
334
- }
337
+ }
335
338
}
336
339
337
340
static void MtmScheduleHeartbeat ()
@@ -540,9 +543,17 @@ static void MtmOpenConnections()
540
543
for (i = 0 ; i < nNodes ; i ++ ) {
541
544
if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
542
545
sockets [i ] = MtmConnectSocket (i , Mtm -> nodes [i ].con .arbiterPort );
546
+ if (sockets [i ] < 0 ) {
547
+ MtmOnNodeDisconnect (i + 1 );
548
+ }
543
549
}
544
550
}
545
- MtmStateProcessEvent (MTM_ARBITER_RECEIVER_START );
551
+ if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) { /* no quorum */
552
+ MTM_ELOG (WARNING , "Node is out of quorum: only %d nodes of %d are accessible" , Mtm -> nLiveNodes , Mtm -> nAllNodes );
553
+ MtmSwitchClusterMode (MTM_IN_MINORITY );
554
+ } else if (Mtm -> status == MTM_INITIALIZATION ) {
555
+ MtmSwitchClusterMode (MTM_CONNECTED );
556
+ }
546
557
}
547
558
548
559
@@ -575,6 +586,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
575
586
}
576
587
sockets [node ] = MtmConnectSocket (node , Mtm -> nodes [node ].con .arbiterPort );
577
588
if (sockets [node ] < 0 ) {
589
+ MtmOnNodeDisconnect (node + 1 );
578
590
result = false;
579
591
break ;
580
592
}
@@ -704,18 +716,16 @@ static void MtmSender(Datum arg)
704
716
{
705
717
int nNodes = MtmMaxNodes ;
706
718
int i ;
707
- MtmBuffer * txBuffer ;
708
719
709
720
MtmBackgroundWorker = true;
710
721
711
- txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
722
+ MtmBuffer * txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
712
723
MTM_ELOG (LOG , "Start arbiter sender %d" , MyProcPid );
713
724
InitializeTimeouts ();
714
725
715
726
pqsignal (SIGINT , SetStop );
716
727
pqsignal (SIGQUIT , SetStop );
717
728
pqsignal (SIGTERM , SetStop );
718
- pqsignal (SIGHUP , PostgresSigHupHandler );
719
729
720
730
/* We're now ready to receive signals */
721
731
BackgroundWorkerUnblockSignals ();
@@ -734,12 +744,6 @@ static void MtmSender(Datum arg)
734
744
PGSemaphoreLock (& Mtm -> sendSemaphore );
735
745
CHECK_FOR_INTERRUPTS ();
736
746
737
- if (ConfigReloadPending )
738
- {
739
- ConfigReloadPending = false;
740
- ProcessConfigFile (PGC_SIGHUP );
741
- }
742
-
743
747
MtmCheckHeartbeat ();
744
748
/*
745
749
* Use shared lock to improve locality,
@@ -801,7 +805,6 @@ static void MtmMonitor(Datum arg)
801
805
pqsignal (SIGINT , SetStop );
802
806
pqsignal (SIGQUIT , SetStop );
803
807
pqsignal (SIGTERM , SetStop );
804
- pqsignal (SIGHUP , PostgresSigHupHandler );
805
808
806
809
MtmBackgroundWorker = true;
807
810
@@ -816,13 +819,6 @@ static void MtmMonitor(Datum arg)
816
819
if (rc & WL_POSTMASTER_DEATH ) {
817
820
break ;
818
821
}
819
-
820
- if (ConfigReloadPending )
821
- {
822
- ConfigReloadPending = false;
823
- ProcessConfigFile (PGC_SIGHUP );
824
- }
825
-
826
822
MtmRefreshClusterStatus ();
827
823
}
828
824
}
@@ -848,7 +844,6 @@ static void MtmReceiver(Datum arg)
848
844
pqsignal (SIGINT , SetStop );
849
845
pqsignal (SIGQUIT , SetStop );
850
846
pqsignal (SIGTERM , SetStop );
851
- pqsignal (SIGHUP , PostgresSigHupHandler );
852
847
853
848
MtmBackgroundWorker = true;
854
849
@@ -884,14 +879,7 @@ static void MtmReceiver(Datum arg)
884
879
for (j = 0 ; j < n ; j ++ ) {
885
880
if (events [j ].events & EPOLLIN )
886
881
#else
887
- fd_set events ;
888
-
889
- if (ConfigReloadPending )
890
- {
891
- ConfigReloadPending = false;
892
- ProcessConfigFile (PGC_SIGHUP );
893
- }
894
-
882
+ fd_set events ;
895
883
do {
896
884
struct timeval tv ;
897
885
events = inset ;
@@ -1018,7 +1006,7 @@ static void MtmReceiver(Datum arg)
1018
1006
default :
1019
1007
break ;
1020
1008
}
1021
- if (BIT_CHECK (msg -> disabledNodeMask , node - 1 ) || BIT_CHECK ( Mtm -> disabledNodeMask , node - 1 ) ) {
1009
+ if (BIT_CHECK (msg -> disabledNodeMask , node - 1 )) {
1022
1010
MTM_ELOG (WARNING , "Ignore message from dead node %d\n" , node );
1023
1011
continue ;
1024
1012
}
@@ -1096,7 +1084,7 @@ static void MtmReceiver(Datum arg)
1096
1084
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1097
1085
MTM_LOG1 ("Arbiter receive abort message for transaction %s (%llu) from node %d" , ts -> gid , (long64 )ts -> xid , node );
1098
1086
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1099
- ts -> abortedByNode = node ;
1087
+ ts -> aborted_by_node = node ;
1100
1088
MtmAbortTransaction (ts );
1101
1089
}
1102
1090
if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
@@ -1173,3 +1161,4 @@ static void MtmReceiver(Datum arg)
1173
1161
}
1174
1162
proc_exit (1 ); /* force restart of this bgworker */
1175
1163
}
1164
+
0 commit comments