@@ -324,6 +324,19 @@ dmq_sender_at_exit(int status, Datum arg)
324
324
LWLockRelease (dmq_state -> lock );
325
325
}
326
326
327
+ static void
328
+ switch_destination_state (DmqDestinationId dest_id , DmqConnState state )
329
+ {
330
+ DmqDestination * dest ;
331
+
332
+ LWLockAcquire (dmq_state -> lock , LW_EXCLUSIVE );
333
+ dest = & (dmq_state -> destinations [dest_id ]);
334
+ Assert (dest -> active );
335
+
336
+ dest -> state = state ;
337
+ LWLockRelease (dmq_state -> lock );
338
+ }
339
+
327
340
void
328
341
dmq_sender_main (Datum main_arg )
329
342
{
@@ -402,6 +415,7 @@ dmq_sender_main(Datum main_arg)
402
415
conns [i ] = * dest ;
403
416
Assert (conns [i ].pgconn == NULL );
404
417
conns [i ].state = Idle ;
418
+ dest -> state = Idle ;
405
419
prev_timer_at = 0 ; /* do not wait for timer event */
406
420
}
407
421
/* close connection to deleted destination */
@@ -443,6 +457,7 @@ dmq_sender_main(Datum main_arg)
443
457
{
444
458
// Assert(PQstatus(conns[conn_id].pgconn) != CONNECTION_OK);
445
459
conns [conn_id ].state = Idle ;
460
+ switch_destination_state (conn_id , Idle );
446
461
// DeleteWaitEvent(set, conns[conn_id].pos);
447
462
448
463
mtm_log (DmqStateFinal ,
@@ -532,6 +547,7 @@ dmq_sender_main(Datum main_arg)
532
547
if (PQstatus (conns [conn_id ].pgconn ) == CONNECTION_BAD )
533
548
{
534
549
conns [conn_id ].state = Idle ;
550
+ switch_destination_state (conn_id , Idle );
535
551
536
552
mtm_log (DmqStateIntermediate ,
537
553
"[DMQ] failed to start connection with %s (%s): %s" ,
@@ -542,6 +558,7 @@ dmq_sender_main(Datum main_arg)
542
558
else
543
559
{
544
560
conns [conn_id ].state = Connecting ;
561
+ switch_destination_state (conn_id , Connecting );
545
562
conns [conn_id ].pos = AddWaitEventToSet (set , WL_SOCKET_CONNECTED ,
546
563
PQsocket (conns [conn_id ].pgconn ),
547
564
NULL , (void * ) conn_id );
@@ -559,6 +576,7 @@ dmq_sender_main(Datum main_arg)
559
576
if (ret < 0 )
560
577
{
561
578
conns [conn_id ].state = Idle ;
579
+ switch_destination_state (conn_id , Idle );
562
580
// DeleteWaitEvent(set, conns[conn_id].pos);
563
581
// Assert(PQstatus(conns[i].pgconn) != CONNECTION_OK);
564
582
@@ -622,6 +640,7 @@ dmq_sender_main(Datum main_arg)
622
640
sender_name );
623
641
624
642
conns [conn_id ].state = Negotiating ;
643
+ switch_destination_state (conn_id , Negotiating );
625
644
ModifyWaitEvent (set , event .pos , WL_SOCKET_READABLE , NULL );
626
645
PQsendQuery (conns [conn_id ].pgconn , query );
627
646
@@ -632,6 +651,7 @@ dmq_sender_main(Datum main_arg)
632
651
else if (status == PGRES_POLLING_FAILED )
633
652
{
634
653
conns [conn_id ].state = Idle ;
654
+ switch_destination_state (conn_id , Idle );
635
655
DeleteWaitEvent (set , event .pos );
636
656
637
657
mtm_log (DmqStateIntermediate ,
@@ -655,6 +675,7 @@ dmq_sender_main(Datum main_arg)
655
675
if (!PQconsumeInput (conns [conn_id ].pgconn ))
656
676
{
657
677
conns [conn_id ].state = Idle ;
678
+ switch_destination_state (conn_id , Idle );
658
679
DeleteWaitEvent (set , event .pos );
659
680
660
681
mtm_log (DmqStateIntermediate ,
@@ -665,6 +686,7 @@ dmq_sender_main(Datum main_arg)
665
686
if (!PQisBusy (conns [conn_id ].pgconn ))
666
687
{
667
688
conns [conn_id ].state = Active ;
689
+ switch_destination_state (conn_id , Active );
668
690
DeleteWaitEvent (set , event .pos );
669
691
670
692
mtm_log (DmqStateFinal ,
@@ -679,6 +701,7 @@ dmq_sender_main(Datum main_arg)
679
701
if (!PQconsumeInput (conns [conn_id ].pgconn ))
680
702
{
681
703
conns [conn_id ].state = Idle ;
704
+ switch_destination_state (conn_id , Idle );
682
705
683
706
mtm_log (DmqStateFinal ,
684
707
"[DMQ] connection error with %s: %s" ,
@@ -1645,6 +1668,31 @@ dmq_destination_add(char *connstr, char *sender_name, char *receiver_name,
1645
1668
return dest_id ;
1646
1669
}
1647
1670
1671
+ /*
1672
+ * Check availability of destination node.
1673
+ * It is needed before sending process to prevent data loss.
1674
+ */
1675
+ DmqConnState
1676
+ dmq_get_destination_status (DmqDestinationId dest_id )
1677
+ {
1678
+ DmqConnState state ;
1679
+
1680
+ if ((dest_id < 0 ) || (dest_id >= DMQ_MAX_DESTINATIONS ))
1681
+ return -2 ;
1682
+
1683
+ LWLockAcquire (dmq_state -> lock , LW_EXCLUSIVE );
1684
+ DmqDestination * dest = & (dmq_state -> destinations [dest_id ]);
1685
+ if (!dest -> active )
1686
+ {
1687
+ LWLockRelease (dmq_state -> lock );
1688
+ return -1 ;
1689
+ }
1690
+
1691
+ state = dest -> state ;
1692
+ LWLockRelease (dmq_state -> lock );
1693
+ return state ;
1694
+ }
1695
+
1648
1696
void
1649
1697
dmq_destination_drop (char * receiver_name )
1650
1698
{
0 commit comments