Skip to content

Commit e8f78cc

Browse files
committed
Before suppressing stream.c
During Shuffle Join development I saw that conveyor is not work fully with synchronous message passing with delivery (we got a deadlock). We need new message passing interface. But now ,ore simplistic way for a proof-of-concept development we can use current dmq with barriers and reconnection counters.
1 parent 9a49249 commit e8f78cc

File tree

6 files changed

+485
-174
lines changed

6 files changed

+485
-174
lines changed

contrib/pg_exchange/common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
#include "storage/lock.h"
2020
#include "dmq.h"
2121

22+
typedef char NodeName[256];
2223

2324
typedef struct
2425
{
2526
Oid serverid;
2627
DmqDestinationId dest_id;
28+
NodeName node;
2729
} DMQDestinations;
2830

2931
typedef struct

contrib/pg_exchange/dmq.c

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,19 @@ dmq_sender_at_exit(int status, Datum arg)
324324
LWLockRelease(dmq_state->lock);
325325
}
326326

327+
static void
328+
switch_destination_state(DmqDestinationId dest_id, DmqConnState state)
329+
{
330+
DmqDestination *dest;
331+
332+
LWLockAcquire(dmq_state->lock, LW_EXCLUSIVE);
333+
dest = &(dmq_state->destinations[dest_id]);
334+
Assert(dest->active);
335+
336+
dest->state = state;
337+
LWLockRelease(dmq_state->lock);
338+
}
339+
327340
void
328341
dmq_sender_main(Datum main_arg)
329342
{
@@ -402,6 +415,7 @@ dmq_sender_main(Datum main_arg)
402415
conns[i] = *dest;
403416
Assert(conns[i].pgconn == NULL);
404417
conns[i].state = Idle;
418+
dest->state = Idle;
405419
prev_timer_at = 0; /* do not wait for timer event */
406420
}
407421
/* close connection to deleted destination */
@@ -443,6 +457,7 @@ dmq_sender_main(Datum main_arg)
443457
{
444458
// Assert(PQstatus(conns[conn_id].pgconn) != CONNECTION_OK);
445459
conns[conn_id].state = Idle;
460+
switch_destination_state(conn_id, Idle);
446461
// DeleteWaitEvent(set, conns[conn_id].pos);
447462

448463
mtm_log(DmqStateFinal,
@@ -532,6 +547,7 @@ dmq_sender_main(Datum main_arg)
532547
if (PQstatus(conns[conn_id].pgconn) == CONNECTION_BAD)
533548
{
534549
conns[conn_id].state = Idle;
550+
switch_destination_state(conn_id, Idle);
535551

536552
mtm_log(DmqStateIntermediate,
537553
"[DMQ] failed to start connection with %s (%s): %s",
@@ -542,6 +558,7 @@ dmq_sender_main(Datum main_arg)
542558
else
543559
{
544560
conns[conn_id].state = Connecting;
561+
switch_destination_state(conn_id, Connecting);
545562
conns[conn_id].pos = AddWaitEventToSet(set, WL_SOCKET_CONNECTED,
546563
PQsocket(conns[conn_id].pgconn),
547564
NULL, (void *) conn_id);
@@ -559,6 +576,7 @@ dmq_sender_main(Datum main_arg)
559576
if (ret < 0)
560577
{
561578
conns[conn_id].state = Idle;
579+
switch_destination_state(conn_id, Idle);
562580
// DeleteWaitEvent(set, conns[conn_id].pos);
563581
// Assert(PQstatus(conns[i].pgconn) != CONNECTION_OK);
564582

@@ -622,6 +640,7 @@ dmq_sender_main(Datum main_arg)
622640
sender_name);
623641

624642
conns[conn_id].state = Negotiating;
643+
switch_destination_state(conn_id, Negotiating);
625644
ModifyWaitEvent(set, event.pos, WL_SOCKET_READABLE, NULL);
626645
PQsendQuery(conns[conn_id].pgconn, query);
627646

@@ -632,6 +651,7 @@ dmq_sender_main(Datum main_arg)
632651
else if (status == PGRES_POLLING_FAILED)
633652
{
634653
conns[conn_id].state = Idle;
654+
switch_destination_state(conn_id, Idle);
635655
DeleteWaitEvent(set, event.pos);
636656

637657
mtm_log(DmqStateIntermediate,
@@ -655,6 +675,7 @@ dmq_sender_main(Datum main_arg)
655675
if (!PQconsumeInput(conns[conn_id].pgconn))
656676
{
657677
conns[conn_id].state = Idle;
678+
switch_destination_state(conn_id, Idle);
658679
DeleteWaitEvent(set, event.pos);
659680

660681
mtm_log(DmqStateIntermediate,
@@ -665,6 +686,7 @@ dmq_sender_main(Datum main_arg)
665686
if (!PQisBusy(conns[conn_id].pgconn))
666687
{
667688
conns[conn_id].state = Active;
689+
switch_destination_state(conn_id, Active);
668690
DeleteWaitEvent(set, event.pos);
669691

670692
mtm_log(DmqStateFinal,
@@ -679,6 +701,7 @@ dmq_sender_main(Datum main_arg)
679701
if (!PQconsumeInput(conns[conn_id].pgconn))
680702
{
681703
conns[conn_id].state = Idle;
704+
switch_destination_state(conn_id, Idle);
682705

683706
mtm_log(DmqStateFinal,
684707
"[DMQ] connection error with %s: %s",
@@ -1645,6 +1668,31 @@ dmq_destination_add(char *connstr, char *sender_name, char *receiver_name,
16451668
return dest_id;
16461669
}
16471670

1671+
/*
1672+
* Check availability of destination node.
1673+
* It is needed before sending process to prevent data loss.
1674+
*/
1675+
DmqConnState
1676+
dmq_get_destination_status(DmqDestinationId dest_id)
1677+
{
1678+
DmqConnState state;
1679+
1680+
if ((dest_id < 0) || (dest_id >= DMQ_MAX_DESTINATIONS))
1681+
return -2;
1682+
1683+
LWLockAcquire(dmq_state->lock, LW_EXCLUSIVE);
1684+
DmqDestination *dest = &(dmq_state->destinations[dest_id]);
1685+
if (!dest->active)
1686+
{
1687+
LWLockRelease(dmq_state->lock);
1688+
return -1;
1689+
}
1690+
1691+
state = dest->state;
1692+
LWLockRelease(dmq_state->lock);
1693+
return state;
1694+
}
1695+
16481696
void
16491697
dmq_destination_drop(char *receiver_name)
16501698
{

0 commit comments

Comments
 (0)