76
76
77
77
78
78
#include "multimaster.h"
79
+ #include "state.h"
79
80
80
81
#define MAX_ROUTES 16
81
82
#define INIT_BUFFER_SIZE 1024
@@ -189,7 +190,6 @@ static void MtmDisconnect(int node)
189
190
MtmUnregisterSocket (sockets [node ]);
190
191
pg_closesocket (sockets [node ], MtmUseRDMA );
191
192
sockets [node ] = -1 ;
192
- MtmOnNodeDisconnect (node + 1 );
193
193
}
194
194
195
195
static int MtmWaitSocket (int sd , bool forWrite , timestamp_t timeoutMsec )
@@ -316,25 +316,22 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316
316
} else {
317
317
BIT_CLEAR (Mtm -> currentLockNodeMask , resp -> node - 1 );
318
318
}
319
- if (
320
- ( BIT_CHECK (resp -> disabledNodeMask , MtmNodeId - 1 ) || Mtm -> status == MTM_IN_MINORITY )
321
- && !BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 )
322
- && Mtm -> status != MTM_RECOVERY
323
- && Mtm -> status != MTM_RECOVERED
324
- && Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime + MSEC_TO_USEC (MtmNodeDisableDelay ) < MtmGetSystemTime ())
325
- {
326
- MTM_ELOG (WARNING , "Node %d thinks that I'm dead, while I'm %s (message %s)" , resp -> node , MtmNodeStatusMnem [Mtm -> status ], MtmMessageKindMnem [resp -> code ]);
327
- BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
328
- Mtm -> nConfigChanges += 1 ;
329
- MtmSwitchClusterMode (MTM_RECOVERY );
330
- } else if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) && sockets [resp -> node - 1 ] < 0 ) {
331
- /* We receive heartbeat from disabled node.
319
+
320
+ // if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321
+ // {
322
+ // MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323
+ // }
324
+
325
+ if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) &&
326
+ sockets [resp -> node - 1 ] < 0 )
327
+ {
328
+ /* We've received a heartbeat from a disabled node.
332
329
* Looks like it has been restarted.
333
330
* Try to reconnect to it.
334
331
*/
335
332
MTM_ELOG (WARNING , "Receive heartbeat from disabled node %d" , resp -> node );
336
333
BIT_SET (Mtm -> reconnectMask , resp -> node - 1 );
337
- }
334
+ }
338
335
}
339
336
340
337
static void MtmScheduleHeartbeat ()
@@ -543,17 +540,9 @@ static void MtmOpenConnections()
543
540
for (i = 0 ; i < nNodes ; i ++ ) {
544
541
if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
545
542
sockets [i ] = MtmConnectSocket (i , Mtm -> nodes [i ].con .arbiterPort );
546
- if (sockets [i ] < 0 ) {
547
- MtmOnNodeDisconnect (i + 1 );
548
- }
549
543
}
550
544
}
551
- if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) { /* no quorum */
552
- MTM_ELOG (WARNING , "Node is out of quorum: only %d nodes of %d are accessible" , Mtm -> nLiveNodes , Mtm -> nAllNodes );
553
- MtmSwitchClusterMode (MTM_IN_MINORITY );
554
- } else if (Mtm -> status == MTM_INITIALIZATION ) {
555
- MtmSwitchClusterMode (MTM_CONNECTED );
556
- }
545
+ MtmStateProcessEvent (MTM_ARBITER_RECEIVER_START );
557
546
}
558
547
559
548
@@ -586,7 +575,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
586
575
}
587
576
sockets [node ] = MtmConnectSocket (node , Mtm -> nodes [node ].con .arbiterPort );
588
577
if (sockets [node ] < 0 ) {
589
- MtmOnNodeDisconnect (node + 1 );
590
578
result = false;
591
579
break ;
592
580
}
@@ -727,6 +715,7 @@ static void MtmSender(Datum arg)
727
715
pqsignal (SIGINT , SetStop );
728
716
pqsignal (SIGQUIT , SetStop );
729
717
pqsignal (SIGTERM , SetStop );
718
+ pqsignal (SIGHUP , PostgresSigHupHandler );
730
719
731
720
/* We're now ready to receive signals */
732
721
BackgroundWorkerUnblockSignals ();
@@ -745,6 +734,12 @@ static void MtmSender(Datum arg)
745
734
PGSemaphoreLock (& Mtm -> sendSemaphore );
746
735
CHECK_FOR_INTERRUPTS ();
747
736
737
+ if (ConfigReloadPending )
738
+ {
739
+ ConfigReloadPending = false;
740
+ ProcessConfigFile (PGC_SIGHUP );
741
+ }
742
+
748
743
MtmCheckHeartbeat ();
749
744
/*
750
745
* Use shared lock to improve locality,
@@ -806,6 +801,7 @@ static void MtmMonitor(Datum arg)
806
801
pqsignal (SIGINT , SetStop );
807
802
pqsignal (SIGQUIT , SetStop );
808
803
pqsignal (SIGTERM , SetStop );
804
+ pqsignal (SIGHUP , PostgresSigHupHandler );
809
805
810
806
MtmBackgroundWorker = true;
811
807
@@ -820,6 +816,13 @@ static void MtmMonitor(Datum arg)
820
816
if (rc & WL_POSTMASTER_DEATH ) {
821
817
break ;
822
818
}
819
+
820
+ if (ConfigReloadPending )
821
+ {
822
+ ConfigReloadPending = false;
823
+ ProcessConfigFile (PGC_SIGHUP );
824
+ }
825
+
823
826
MtmRefreshClusterStatus ();
824
827
}
825
828
}
@@ -845,6 +848,7 @@ static void MtmReceiver(Datum arg)
845
848
pqsignal (SIGINT , SetStop );
846
849
pqsignal (SIGQUIT , SetStop );
847
850
pqsignal (SIGTERM , SetStop );
851
+ pqsignal (SIGHUP , PostgresSigHupHandler );
848
852
849
853
MtmBackgroundWorker = true;
850
854
@@ -880,7 +884,14 @@ static void MtmReceiver(Datum arg)
880
884
for (j = 0 ; j < n ; j ++ ) {
881
885
if (events [j ].events & EPOLLIN )
882
886
#else
883
- fd_set events ;
887
+ fd_set events ;
888
+
889
+ if (ConfigReloadPending )
890
+ {
891
+ ConfigReloadPending = false;
892
+ ProcessConfigFile (PGC_SIGHUP );
893
+ }
894
+
884
895
do {
885
896
struct timeval tv ;
886
897
events = inset ;
@@ -1007,7 +1018,7 @@ static void MtmReceiver(Datum arg)
1007
1018
default :
1008
1019
break ;
1009
1020
}
1010
- if (BIT_CHECK (msg -> disabledNodeMask , node - 1 )) {
1021
+ if (BIT_CHECK (msg -> disabledNodeMask , node - 1 ) || BIT_CHECK ( Mtm -> disabledNodeMask , node - 1 ) ) {
1011
1022
MTM_ELOG (WARNING , "Ignore message from dead node %d\n" , node );
1012
1023
continue ;
1013
1024
}
@@ -1085,7 +1096,7 @@ static void MtmReceiver(Datum arg)
1085
1096
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1086
1097
MTM_LOG1 ("Arbiter receive abort message for transaction %s (%llu) from node %d" , ts -> gid , (long64 )ts -> xid , node );
1087
1098
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1088
- ts -> aborted_by_node = node ;
1099
+ ts -> abortedByNode = node ;
1089
1100
MtmAbortTransaction (ts );
1090
1101
}
1091
1102
if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
0 commit comments