Skip to content

Commit 03a11dd

Browse files
committed
DMQ Wrapper + SELECT * from partition works
1 parent 2ffbe84 commit 03a11dd

23 files changed

+984
-781
lines changed

contrib/Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ SUBDIRS = \
3131
passwordcheck \
3232
pg_buffercache \
3333
pg_exchange \
34-
pg_execplan \
3534
pg_freespacemap \
3635
pg_prewarm \
3736
pg_standby \

contrib/pg_exchange/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ EXTENSION = pg_exchange
55
EXTVERSION = 0.1
66
PGFILEDESC = "pg_exchange - an exchange custom node and rules for the planner"
77
MODULES = pg_exchange
8-
OBJS = pg_exchange.o exchange.o hooks.o common.o nodeDummyscan.o nodeDistPlanExec.o dmq.o $(WIN32RES)
8+
OBJS = pg_exchange.o exchange.o hooks.o common.o nodeDummyscan.o nodeDistPlanExec.o dmq.o stream.o $(WIN32RES)
99

1010
fdw_srcdir = $(top_srcdir)/contrib/postgres_fdw/
1111

contrib/pg_exchange/common.c

Lines changed: 1 addition & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -23,96 +23,4 @@
2323
#include "common.h"
2424

2525

26-
/* GUC variables */
27-
int node_number;
28-
int nodes_at_cluster;
29-
char *pargres_hosts_string = NULL;
30-
char *pargres_ports_string = NULL;
31-
int eports_pool_size = 100;
32-
33-
int CoordNode = -1;
34-
bool PargresInitialized = false;
35-
PortStack *PORTS;
36-
37-
38-
Oid
39-
get_pargres_schema(void)
40-
{
41-
Oid result;
42-
Relation rel;
43-
SysScanDesc scandesc;
44-
HeapTuple tuple;
45-
ScanKeyData entry[1];
46-
Oid ext_oid;
47-
48-
/* It's impossible to fetch pg_pathman's schema now */
49-
if (!IsTransactionState())
50-
return InvalidOid;
51-
52-
ext_oid = get_extension_oid("pargres", true);
53-
if (ext_oid == InvalidOid)
54-
return InvalidOid; /* exit if pg_pathman does not exist */
55-
56-
ScanKeyInit(&entry[0],
57-
ObjectIdAttributeNumber,
58-
BTEqualStrategyNumber, F_OIDEQ,
59-
ObjectIdGetDatum(ext_oid));
60-
61-
rel = heap_open(ExtensionRelationId, AccessShareLock);
62-
scandesc = systable_beginscan(rel, ExtensionOidIndexId, true,
63-
NULL, 1, entry);
64-
65-
tuple = systable_getnext(scandesc);
66-
67-
/* We assume that there can be at most one matching tuple */
68-
if (HeapTupleIsValid(tuple))
69-
result = ((Form_pg_extension) GETSTRUCT(tuple))->extnamespace;
70-
else
71-
result = InvalidOid;
72-
73-
systable_endscan(scandesc);
74-
75-
heap_close(rel, AccessShareLock);
76-
77-
return result;
78-
}
79-
80-
void
81-
STACK_Init(PortStack *stack, int range_min, int size)
82-
{
83-
int i;
84-
85-
LWLockAcquire(&stack->lock, LW_EXCLUSIVE);
86-
87-
stack->size = size;
88-
stack->index = 0;
89-
for (i = 0; i < stack->size; i++)
90-
stack->values[i] = range_min + i;
91-
92-
LWLockRelease(&stack->lock);
93-
}
94-
95-
int
96-
STACK_Pop(PortStack *stack)
97-
{
98-
int value;
99-
100-
LWLockAcquire(&stack->lock, LW_EXCLUSIVE);
101-
102-
Assert(stack->index < stack->size);
103-
value = stack->values[stack->index++];
104-
105-
LWLockRelease(&stack->lock);
106-
return value;
107-
}
108-
109-
void
110-
STACK_Push(PortStack *stack, int value)
111-
{
112-
LWLockAcquire(&stack->lock, LW_EXCLUSIVE);
113-
114-
Assert(stack->index > 0);
115-
stack->values[--stack->index] = value;
116-
117-
LWLockRelease(&stack->lock);
118-
}
26+
ExchangeSharedState *ExchShmem = NULL;

contrib/pg_exchange/common.h

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,28 @@
1717

1818
#include "nodes/pg_list.h"
1919
#include "storage/lock.h"
20+
#include "dmq.h"
2021

2122

2223
typedef struct
2324
{
24-
LWLock lock;
25-
int size;
26-
int index;
27-
int values[FLEXIBLE_ARRAY_MEMBER];
28-
} PortStack;
25+
Oid serverid;
26+
DmqDestinationId dest_id;
27+
} DMQDestinations;
2928

29+
typedef struct
30+
{
31+
int nservers;
32+
DMQDestinations *dests;
33+
int coordinator_num;
34+
} DMQDestCont;
3035

31-
/* GUC variables */
32-
extern char *pargres_hosts_string;
33-
extern char *pargres_ports_string;
34-
extern int eports_pool_size;
35-
36-
extern PortStack *PORTS;
37-
extern int CoordNode;
38-
extern bool PargresInitialized;
39-
36+
typedef struct
37+
{
38+
LWLock *lock;
39+
HTAB *htab;
40+
} ExchangeSharedState;
4041

41-
extern Oid get_pargres_schema(void);
42-
extern void STACK_Init(PortStack *stack, int range_min, int size);
43-
int STACK_Pop(PortStack *stack);
44-
void STACK_Push(PortStack *stack, int value);
42+
extern ExchangeSharedState *ExchShmem;
4543

4644
#endif /* COMMON_H_ */

contrib/pg_exchange/dmq.c

Lines changed: 71 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
#define DMQ_CONNSTR_MAX_LEN 1024
6363

6464
#define DMQ_MAX_SUBS_PER_BACKEND 100
65-
#define DMQ_MAX_DESTINATIONS 100
65+
#define DMQ_MAX_DESTINATIONS 127
6666
#define DMQ_MAX_RECEIVERS 100
6767

6868
typedef enum
@@ -118,7 +118,7 @@ struct DmqSharedState
118118

119119

120120
/* Backend-local i/o queues. */
121-
struct
121+
static struct
122122
{
123123
shm_mq_handle *mq_outh;
124124
int n_inhandles;
@@ -294,14 +294,6 @@ dmq_toc_size()
294294
*
295295
*****************************************************************************/
296296

297-
// static void
298-
// fe_close(PGconn *conn)
299-
// {
300-
// PQputCopyEnd(conn, NULL);
301-
// PQflush(conn);
302-
// PQfinish(conn);
303-
// }
304-
305297
static int
306298
fe_send(PGconn *conn, char *msg, size_t len)
307299
{
@@ -435,12 +427,12 @@ dmq_sender_main(Datum main_arg)
435427
res = shm_mq_receive(mq_handles[i], &len, &data, true);
436428
if (res == SHM_MQ_SUCCESS)
437429
{
438-
int conn_id;
430+
DmqDestinationId conn_id;
439431

440432
/* first byte is connection_id */
441-
conn_id = * (char *) data;
442-
data = (char *) data + 1;
443-
len -= 1;
433+
conn_id = * (DmqDestinationId *) data;
434+
data = (char *) data + sizeof(DmqDestinationId);
435+
len -= sizeof(DmqDestinationId);
444436
Assert(0 <= conn_id && conn_id < DMQ_MAX_DESTINATIONS);
445437

446438
if (conns[conn_id].state == Active)
@@ -724,7 +716,9 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
724716
{
725717
const char *stream_name;
726718
const char *body;
719+
const char *msgptr;
727720
int body_len;
721+
int msg_len;
728722
bool found;
729723
DmqStreamSubscription *sub;
730724
shm_mq_result res;
@@ -734,9 +728,11 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
734728
* as message body with unknown format that we are going to send down to
735729
* the subscribed backend.
736730
*/
737-
stream_name = pq_getmsgrawstring(msg);
738-
body_len = msg->len - msg->cursor;
739-
body = pq_getmsgbytes(msg, body_len);
731+
msg_len = msg->len - msg->cursor;
732+
msgptr = pq_getmsgbytes(msg, msg_len);
733+
stream_name = msgptr;
734+
body = msgptr + strlen(stream_name) + 1;
735+
body_len = msg_len - (body - msgptr);
740736
pq_getmsgend(msg);
741737

742738
/*
@@ -773,7 +769,7 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
773769
sub->procno);
774770

775771
/* and send it */
776-
res = shm_mq_send(mq_handles[sub->procno], body_len, body, false);
772+
res = shm_mq_send(mq_handles[sub->procno], msg_len, msgptr, false);
777773
if (res != SHM_MQ_SUCCESS)
778774
{
779775
mtm_log(WARNING, "[DMQ] can't send to queue %d", sub->procno);
@@ -1345,11 +1341,18 @@ dmq_reattach_shm_mq(int handle_id)
13451341
}
13461342

13471343
DmqSenderId
1348-
dmq_attach_receiver(char *sender_name, int mask_pos)
1344+
dmq_attach_receiver(const char *sender_name, int mask_pos)
13491345
{
13501346
int i;
13511347
int handle_id;
13521348

1349+
/* Search for existed receiver. */
1350+
for (i = 0; i < dmq_local.n_inhandles; i++)
1351+
{
1352+
if (strcmp(sender_name, dmq_local.inhandles[i].name) == 0)
1353+
return i;
1354+
}
1355+
13531356
for (i = 0; i < DMQ_MAX_RECEIVERS; i++)
13541357
{
13551358
if (dmq_local.inhandles[i].name[0] == '\0')
@@ -1375,7 +1378,7 @@ dmq_attach_receiver(char *sender_name, int mask_pos)
13751378
}
13761379

13771380
void
1378-
dmq_detach_receiver(char *sender_name)
1381+
dmq_detach_receiver(const char *sender_name)
13791382
{
13801383
int i;
13811384
int handle_id = -1;
@@ -1440,6 +1443,36 @@ dmq_stream_unsubscribe(const char *stream_name)
14401443
Assert(found);
14411444
}
14421445

1446+
const char *
1447+
dmq_sender_name(DmqSenderId id)
1448+
{
1449+
Assert((id >= 0) && (id < dmq_local.n_inhandles));
1450+
1451+
if (dmq_local.inhandles[id].name[0] == '\0')
1452+
return NULL;
1453+
return dmq_local.inhandles[id].name;
1454+
}
1455+
1456+
DmqDestinationId
1457+
dmq_remote_id(const char *name)
1458+
{
1459+
DmqDestinationId i;
1460+
1461+
LWLockAcquire(dmq_state->lock, LW_SHARED);
1462+
for (i = 0; i < DMQ_MAX_DESTINATIONS; i++)
1463+
{
1464+
DmqDestination *dest = &(dmq_state->destinations[i]);
1465+
if (strcmp(name, dest->receiver_name) == 0)
1466+
break;
1467+
}
1468+
LWLockRelease(dmq_state->lock);
1469+
1470+
if (i == DMQ_MAX_DESTINATIONS)
1471+
return -1;
1472+
1473+
return i;
1474+
}
1475+
14431476
/*
14441477
* Get a message from input queue. Execution blocking until message will not
14451478
* received. Returns false, if an error is occured.
@@ -1448,10 +1481,12 @@ dmq_stream_unsubscribe(const char *stream_name)
14481481
* msg - buffer that contains received message.
14491482
* len - size of received message.
14501483
*/
1451-
bool
1452-
dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
1484+
const char *
1485+
dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask,
1486+
bool waitMsg)
14531487
{
14541488
shm_mq_result res;
1489+
const char *stream;
14551490

14561491
Assert(msg && len);
14571492

@@ -1477,13 +1512,19 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
14771512

14781513
if (res == SHM_MQ_SUCCESS)
14791514
{
1480-
*msg = data;
1515+
/*
1516+
* Stream name is first null-terminated string in
1517+
* the message buffer.
1518+
*/
1519+
stream = data;
1520+
*msg = (void *) ((char *)data + strlen(stream) + 1);
1521+
*len -= (char *)(*msg) - (char *)data;
14811522
*sender_id = i;
14821523

14831524
mtm_log(DmqTraceIncoming,
14841525
"[DMQ] dmq_pop: got message %s from %s",
14851526
(char *) data, dmq_local.inhandles[i].name);
1486-
return true;
1527+
return stream;
14871528
}
14881529
else if (res == SHM_MQ_DETACHED)
14891530
{
@@ -1498,13 +1539,15 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
14981539
else
14991540
{
15001541
*sender_id = i;
1501-
return false;
1542+
return NULL;
15021543
}
15031544
}
15041545
}
15051546

1506-
if (nowait)
1547+
if (nowait && waitMsg)
15071548
continue;
1549+
if (!waitMsg)
1550+
return NULL;
15081551

15091552
// XXX cache that
15101553
rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, 10.0,
@@ -1516,6 +1559,7 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
15161559
if (rc & WL_LATCH_SET)
15171560
ResetLatch(MyLatch);
15181561
}
1562+
return NULL;
15191563
}
15201564

15211565
bool
@@ -1566,8 +1610,7 @@ dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
15661610
* sender_name - a symbolic name of the sender. Remote backend will attach
15671611
* to this channel by sender name.
15681612
* See dmq_attach_receiver() routine for details.
1569-
* Call this function after shared memory initialization. For example,
1570-
* extensions may create channels during 'CREATE EXTENSION' command execution.
1613+
* Call this function after shared memory initialization.
15711614
*/
15721615
DmqDestinationId
15731616
dmq_destination_add(char *connstr, char *sender_name, char *receiver_name,

0 commit comments

Comments
 (0)