76
76
77
77
78
78
#include "multimaster.h"
79
+ #include "state.h"
79
80
80
81
#define MAX_ROUTES 16
81
82
#define INIT_BUFFER_SIZE 1024
@@ -189,7 +190,6 @@ static void MtmDisconnect(int node)
189
190
MtmUnregisterSocket (sockets [node ]);
190
191
pg_closesocket (sockets [node ], MtmUseRDMA );
191
192
sockets [node ] = -1 ;
192
- MtmOnNodeDisconnect (node + 1 );
193
193
}
194
194
195
195
static int MtmWaitSocket (int sd , bool forWrite , timestamp_t timeoutMsec )
@@ -316,25 +316,22 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316
316
} else {
317
317
BIT_CLEAR (Mtm -> currentLockNodeMask , resp -> node - 1 );
318
318
}
319
- if (
320
- ( BIT_CHECK (resp -> disabledNodeMask , MtmNodeId - 1 ) || Mtm -> status == MTM_IN_MINORITY )
321
- && !BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 )
322
- && Mtm -> status != MTM_RECOVERY
323
- && Mtm -> status != MTM_RECOVERED
324
- && Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime + MSEC_TO_USEC (MtmNodeDisableDelay ) < MtmGetSystemTime ())
325
- {
326
- MTM_ELOG (WARNING , "Node %d thinks that I'm dead, while I'm %s (message %s)" , resp -> node , MtmNodeStatusMnem [Mtm -> status ], MtmMessageKindMnem [resp -> code ]);
327
- BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
328
- Mtm -> nConfigChanges += 1 ;
329
- MtmSwitchClusterMode (MTM_RECOVERY );
330
- } else if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) && sockets [resp -> node - 1 ] < 0 ) {
331
- /* We receive heartbeat from disabled node.
319
+
320
+ // if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321
+ // {
322
+ // MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323
+ // }
324
+
325
+ if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) &&
326
+ sockets [resp -> node - 1 ] < 0 )
327
+ {
328
+ /* We've received a heartbeat from a disabled node.
332
329
* Looks like it is restarted.
333
330
* Try to reconnect to it.
334
331
*/
335
332
MTM_ELOG (WARNING , "Receive heartbeat from disabled node %d" , resp -> node );
336
333
BIT_SET (Mtm -> reconnectMask , resp -> node - 1 );
337
- }
334
+ }
338
335
}
339
336
340
337
static void MtmScheduleHeartbeat ()
@@ -543,17 +540,9 @@ static void MtmOpenConnections()
543
540
for (i = 0 ; i < nNodes ; i ++ ) {
544
541
if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
545
542
sockets [i ] = MtmConnectSocket (i , Mtm -> nodes [i ].con .arbiterPort );
546
- if (sockets [i ] < 0 ) {
547
- MtmOnNodeDisconnect (i + 1 );
548
- }
549
543
}
550
544
}
551
- if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) { /* no quorum */
552
- MTM_ELOG (WARNING , "Node is out of quorum: only %d nodes of %d are accessible" , Mtm -> nLiveNodes , Mtm -> nAllNodes );
553
- MtmSwitchClusterMode (MTM_IN_MINORITY );
554
- } else if (Mtm -> status == MTM_INITIALIZATION ) {
555
- MtmSwitchClusterMode (MTM_CONNECTED );
556
- }
545
+ MtmStateProcessEvent (MTM_ARBITER_RECEIVER_START );
557
546
}
558
547
559
548
@@ -586,7 +575,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
586
575
}
587
576
sockets [node ] = MtmConnectSocket (node , Mtm -> nodes [node ].con .arbiterPort );
588
577
if (sockets [node ] < 0 ) {
589
- MtmOnNodeDisconnect (node + 1 );
590
578
result = false;
591
579
break ;
592
580
}
@@ -716,16 +704,18 @@ static void MtmSender(Datum arg)
716
704
{
717
705
int nNodes = MtmMaxNodes ;
718
706
int i ;
707
+ MtmBuffer * txBuffer ;
719
708
720
709
MtmBackgroundWorker = true;
721
710
722
- MtmBuffer * txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
711
+ txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
723
712
MTM_ELOG (LOG , "Start arbiter sender %d" , MyProcPid );
724
713
InitializeTimeouts ();
725
714
726
715
pqsignal (SIGINT , SetStop );
727
716
pqsignal (SIGQUIT , SetStop );
728
717
pqsignal (SIGTERM , SetStop );
718
+ pqsignal (SIGHUP , PostgresSigHupHandler );
729
719
730
720
/* We're now ready to receive signals */
731
721
BackgroundWorkerUnblockSignals ();
@@ -744,6 +734,12 @@ static void MtmSender(Datum arg)
744
734
PGSemaphoreLock (& Mtm -> sendSemaphore );
745
735
CHECK_FOR_INTERRUPTS ();
746
736
737
+ if (ConfigReloadPending )
738
+ {
739
+ ConfigReloadPending = false;
740
+ ProcessConfigFile (PGC_SIGHUP );
741
+ }
742
+
747
743
MtmCheckHeartbeat ();
748
744
/*
749
745
* Use shared lock to improve locality,
@@ -805,6 +801,7 @@ static void MtmMonitor(Datum arg)
805
801
pqsignal (SIGINT , SetStop );
806
802
pqsignal (SIGQUIT , SetStop );
807
803
pqsignal (SIGTERM , SetStop );
804
+ pqsignal (SIGHUP , PostgresSigHupHandler );
808
805
809
806
MtmBackgroundWorker = true;
810
807
@@ -819,6 +816,13 @@ static void MtmMonitor(Datum arg)
819
816
if (rc & WL_POSTMASTER_DEATH ) {
820
817
break ;
821
818
}
819
+
820
+ if (ConfigReloadPending )
821
+ {
822
+ ConfigReloadPending = false;
823
+ ProcessConfigFile (PGC_SIGHUP );
824
+ }
825
+
822
826
MtmRefreshClusterStatus ();
823
827
}
824
828
}
@@ -844,6 +848,7 @@ static void MtmReceiver(Datum arg)
844
848
pqsignal (SIGINT , SetStop );
845
849
pqsignal (SIGQUIT , SetStop );
846
850
pqsignal (SIGTERM , SetStop );
851
+ pqsignal (SIGHUP , PostgresSigHupHandler );
847
852
848
853
MtmBackgroundWorker = true;
849
854
@@ -879,7 +884,14 @@ static void MtmReceiver(Datum arg)
879
884
for (j = 0 ; j < n ; j ++ ) {
880
885
if (events [j ].events & EPOLLIN )
881
886
#else
882
- fd_set events ;
887
+ fd_set events ;
888
+
889
+ if (ConfigReloadPending )
890
+ {
891
+ ConfigReloadPending = false;
892
+ ProcessConfigFile (PGC_SIGHUP );
893
+ }
894
+
883
895
do {
884
896
struct timeval tv ;
885
897
events = inset ;
@@ -1006,7 +1018,7 @@ static void MtmReceiver(Datum arg)
1006
1018
default :
1007
1019
break ;
1008
1020
}
1009
- if (BIT_CHECK (msg -> disabledNodeMask , node - 1 )) {
1021
+ if (BIT_CHECK (msg -> disabledNodeMask , node - 1 ) || BIT_CHECK ( Mtm -> disabledNodeMask , node - 1 ) ) {
1010
1022
MTM_ELOG (WARNING , "Ignore message from dead node %d\n" , node );
1011
1023
continue ;
1012
1024
}
@@ -1084,7 +1096,7 @@ static void MtmReceiver(Datum arg)
1084
1096
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
1085
1097
MTM_LOG1 ("Arbiter receive abort message for transaction %s (%llu) from node %d" , ts -> gid , (long64 )ts -> xid , node );
1086
1098
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1087
- ts -> aborted_by_node = node ;
1099
+ ts -> abortedByNode = node ;
1088
1100
MtmAbortTransaction (ts );
1089
1101
}
1090
1102
if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
@@ -1161,4 +1173,3 @@ static void MtmReceiver(Datum arg)
1161
1173
}
1162
1174
proc_exit (1 ); /* force restart of this bgworker */
1163
1175
}
1164
-
0 commit comments