Skip to content

Commit 04dc7fe

Browse files
committed
Patch for stable execute at 3-node cluster
1 parent f84be93 commit 04dc7fe

File tree

12 files changed

+291
-269
lines changed

12 files changed

+291
-269
lines changed

contrib/pg_exchange/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ EXTENSION = pg_exchange
55
EXTVERSION = 0.1
66
PGFILEDESC = "pg_exchange - an exchange custom node and rules for the planner"
77
MODULES = pg_exchange
8-
OBJS = pg_exchange.o exchange.o hooks.o common.o nodeDummyscan.o nodeDistPlanExec.o dmq.o stream.o partutils.o $(WIN32RES)
8+
OBJS = common.o dmq.o exchange.o expath.o hooks.o nodeDistPlanExec.o \
9+
nodeDummyscan.o partutils.o pg_exchange.o stream.o $(WIN32RES)
910

1011
fdw_srcdir = $(top_srcdir)/contrib/postgres_fdw/
1112

contrib/pg_exchange/dmq.c

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,27 +1239,44 @@ dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg)
12391239
resetStringInfo(&buf);
12401240
}
12411241

1242+
static bool push_state = false;
1243+
static StringInfoData buf;
12421244

1243-
void
1244-
dmq_push_buffer(DmqDestinationId dest_id, char *stream_name, const void *payload, size_t len)
1245+
bool
1246+
dmq_push_buffer(DmqDestinationId dest_id, char *stream_name,
1247+
const void *payload, size_t len, bool nowait)
12451248
{
1246-
StringInfoData buf;
12471249
shm_mq_result res;
12481250

1249-
ensure_outq_handle();
1251+
if (!push_state)
1252+
{
1253+
ensure_outq_handle();
12501254

1251-
initStringInfo(&buf);
1252-
pq_sendbyte(&buf, dest_id);
1253-
pq_send_ascii_string(&buf, stream_name);
1254-
pq_sendbytes(&buf, payload, len);
1255+
initStringInfo(&buf);
1256+
pq_sendbyte(&buf, dest_id);
1257+
pq_send_ascii_string(&buf, stream_name);
1258+
pq_sendbytes(&buf, payload, len);
12551259

1256-
mtm_log(DmqTraceOutgoing, "[DMQ] pushing l=%d '%.*s'",
1257-
buf.len, buf.len, buf.data);
1260+
mtm_log(DmqTraceOutgoing, "[DMQ] pushing l=%d '%.*s'",
1261+
buf.len, buf.len, buf.data);
1262+
}
12581263

12591264
// XXX: use sendv instead
1260-
res = shm_mq_send(dmq_local.mq_outh, buf.len, buf.data, false);
1265+
res = shm_mq_send(dmq_local.mq_outh, buf.len, buf.data, nowait);
1266+
1267+
if (res == SHM_MQ_WOULD_BLOCK)
1268+
{
1269+
Assert(nowait == true);
1270+
push_state = true;
1271+
/* Report on full queue. */
1272+
return false;
1273+
}
1274+
12611275
if (res != SHM_MQ_SUCCESS)
12621276
mtm_log(WARNING, "[DMQ] dmq_push: can't send to queue");
1277+
1278+
push_state = false;
1279+
return true;
12631280
}
12641281

12651282
static bool
@@ -1467,6 +1484,17 @@ dmq_sender_name(DmqSenderId id)
14671484
return dmq_local.inhandles[id].name;
14681485
}
14691486

1487+
char *
1488+
dmq_receiver_name(DmqDestinationId dest_id)
1489+
{
1490+
char *recvName;
1491+
1492+
LWLockAcquire(dmq_state->lock, LW_SHARED);
1493+
recvName = pstrdup(dmq_state->destinations[dest_id].receiver_name);
1494+
LWLockRelease(dmq_state->lock);
1495+
return recvName;
1496+
}
1497+
14701498
DmqDestinationId
14711499
dmq_remote_id(const char *name)
14721500
{
@@ -1492,7 +1520,7 @@ dmq_remote_id(const char *name)
14921520
* received. Returns false, if an error is occured.
14931521
*
14941522
* sender_id - identifier of the received message sender.
1495-
* msg - buffer that contains received message.
1523+
* msg - pointer to local buffer that contains received message.
14961524
* len - size of received message.
14971525
*/
14981526
const char *

contrib/pg_exchange/dmq.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ typedef enum
1515
typedef int8 DmqDestinationId;
1616
typedef int8 DmqSenderId;
1717

18-
#define DMQ_NAME_MAXLEN 32
18+
#define DMQ_NAME_MAXLEN 64
1919
#define DMQ_MAX_DESTINATIONS 127
2020
#define DMQ_MAX_RECEIVERS 100
2121

@@ -38,6 +38,7 @@ extern void dmq_terminate_receiver(char *name);
3838
extern void dmq_stream_subscribe(const char *stream_name);
3939
extern void dmq_stream_unsubscribe(const char *stream_name);
4040
extern const char *dmq_sender_name(DmqSenderId id);
41+
extern char *dmq_receiver_name(DmqDestinationId dest_id);
4142
extern DmqDestinationId dmq_remote_id(const char *name);
4243

4344
extern const char *
@@ -46,8 +47,8 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask,
4647
extern bool dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
4748

4849
extern void dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg);
49-
extern void dmq_push_buffer(DmqDestinationId dest_id, char *stream_name,
50-
const void *buffer, size_t len);
50+
extern bool dmq_push_buffer(DmqDestinationId dest_id, char *stream_name,
51+
const void *buffer, size_t len, bool nowait);
5152

5253
typedef void (*dmq_receiver_hook_type) (const char *);
5354
extern dmq_receiver_hook_type dmq_receiver_start_hook;

0 commit comments

Comments
 (0)