@@ -230,6 +230,51 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
230
230
231
231
232
232
233
+ static void MtmSetSocketOptions (int sd )
234
+ {
235
+ #ifdef TCP_NODELAY
236
+ int optval = 1 ;
237
+ if (setsockopt (sd , IPPROTO_TCP , TCP_NODELAY , (char const * )& optval , sizeof (optval )) < 0 ) {
238
+ elog (WARNING , "Failed to set TCP_NODELAY: %m" );
239
+ }
240
+ #endif
241
+ if (tcp_keepalives_idle ) {
242
+ #ifdef TCP_KEEPIDLE
243
+ if (setsockopt (sd , IPPROTO_TCP , TCP_KEEPIDLE ,
244
+ (char * ) & tcp_keepalives_idle , sizeof (tcp_keepalives_idle )) < 0 )
245
+ {
246
+ elog (WARNING , "Failed to set TCP_KEEPIDLE: %m" );
247
+ }
248
+ #else
249
+ #ifdef TCP_KEEPALIVE
250
+ if (setsockopt (sd , IPPROTO_TCP , TCP_KEEPALIVE ,
251
+ (char * ) & tcp_keepalives_idle , sizeof (tcp_keepalives_idle )) < 0 )
252
+ {
253
+ elog (WARNING , "Failed to set TCP_KEEPALIVE: %m" );
254
+ }
255
+ #endif
256
+ #endif
257
+ }
258
+ #ifdef TCP_KEEPINTVL
259
+ if (tcp_keepalives_interval ) {
260
+ if (setsockopt (sd , IPPROTO_TCP , TCP_KEEPINTVL ,
261
+ (char * ) & tcp_keepalives_interval , sizeof (tcp_keepalives_interval )) < 0 )
262
+ {
263
+ elog (WARNING , "Failed to set TCP_KEEPINTVL: %m" );
264
+ }
265
+ }
266
+ #endif
267
+ #ifdef TCP_KEEPCNT
268
+ if (tcp_keepalives_count ) {
269
+ if (setsockopt (sd , IPPROTO_TCP , TCP_KEEPCNT ,
270
+ (char * ) & tcp_keepalives_count , sizeof (tcp_keepalives_count )) < 0 )
271
+ {
272
+ elog (WARNING , "Failed to set TCP_KEEPCNT: %m" );
273
+ }
274
+ }
275
+ #endif
276
+ }
277
+
233
278
static int MtmConnectSocket (char const * host , int port , int max_attempts )
234
279
{
235
280
struct sockaddr_in sock_inet ;
@@ -274,12 +319,9 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
274
319
}
275
320
continue ;
276
321
} else {
277
- int optval = 1 ;
278
322
MtmHandshakeMessage req ;
279
323
MtmArbiterMessage resp ;
280
- setsockopt (sd , IPPROTO_TCP , TCP_NODELAY , (char const * )& optval , sizeof (optval ));
281
- setsockopt (sd , SOL_SOCKET , SO_KEEPALIVE , (char const * )& optval , sizeof (optval ));
282
-
324
+ MtmSetSocketOptions (sd );
283
325
req .hdr .code = MSG_HANDSHAKE ;
284
326
req .hdr .node = MtmNodeId ;
285
327
req .hdr .dxid = HANDSHAKE_MAGIC ;
@@ -306,10 +348,9 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
306
348
/* Some node considered that I am dead, so switch to recovery mode */
307
349
if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
308
350
elog (WARNING , "Node %d think that I am dead" , resp .node );
351
+ BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
309
352
MtmSwitchClusterMode (MTM_RECOVERY );
310
353
}
311
- /* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
312
- Mtm -> disabledNodeMask |= resp .disabledNodeMask ;
313
354
return sd ;
314
355
}
315
356
}
@@ -335,7 +376,7 @@ static void MtmOpenConnections()
335
376
}
336
377
if (Mtm -> nNodes < MtmNodes /2 + 1 ) { /* no quorum */
337
378
elog (WARNING , "Node is out of quorum: only %d nodes from %d are accssible" , Mtm -> nNodes , MtmNodes );
338
- Mtm -> status = MTM_OFFLINE ;
379
+ Mtm -> status = MTM_IN_MINORITY ;
339
380
} else if (Mtm -> status == MTM_INITIALIZATION ) {
340
381
MtmSwitchClusterMode (MTM_CONNECTED );
341
382
}
@@ -389,6 +430,7 @@ static void MtmAcceptOneConnection()
389
430
resp .dxid = HANDSHAKE_MAGIC ;
390
431
resp .sxid = ShmemVariableCache -> nextXid ;
391
432
resp .csn = MtmGetCurrentTime ();
433
+ resp .node = MtmNodeId ;
392
434
MtmUpdateNodeConnectionInfo (& Mtm -> nodes [req .hdr .node - 1 ].con , req .connStr );
393
435
if (!MtmWriteSocket (fd , & resp , sizeof resp )) {
394
436
elog (WARNING , "Arbiter failed to write response for handshake message to node %d" , resp .node );
@@ -605,7 +647,7 @@ static void MtmTransReceiver(Datum arg)
605
647
} while (n < 0 && errno == EINTR );
606
648
} while (n < 0 && MtmRecovery ());
607
649
608
- if (rc < 0 ) {
650
+ if (n < 0 ) {
609
651
elog (ERROR , "Arbiter failed to select sockets: %d" , errno );
610
652
}
611
653
for (i = 0 ; i < nNodes ; i ++ ) {
0 commit comments