62
62
#define DMQ_CONNSTR_MAX_LEN 1024
63
63
64
64
#define DMQ_MAX_SUBS_PER_BACKEND 100
65
- #define DMQ_MAX_DESTINATIONS 100
65
+ #define DMQ_MAX_DESTINATIONS 127
66
66
#define DMQ_MAX_RECEIVERS 100
67
67
68
68
typedef enum
@@ -118,7 +118,7 @@ struct DmqSharedState
118
118
119
119
120
120
/* Backend-local i/o queues. */
121
- struct
121
+ static struct
122
122
{
123
123
shm_mq_handle * mq_outh ;
124
124
int n_inhandles ;
@@ -294,14 +294,6 @@ dmq_toc_size()
294
294
*
295
295
*****************************************************************************/
296
296
297
- // static void
298
- // fe_close(PGconn *conn)
299
- // {
300
- // PQputCopyEnd(conn, NULL);
301
- // PQflush(conn);
302
- // PQfinish(conn);
303
- // }
304
-
305
297
static int
306
298
fe_send (PGconn * conn , char * msg , size_t len )
307
299
{
@@ -435,12 +427,12 @@ dmq_sender_main(Datum main_arg)
435
427
res = shm_mq_receive (mq_handles [i ], & len , & data , true);
436
428
if (res == SHM_MQ_SUCCESS )
437
429
{
438
- int conn_id ;
430
+ DmqDestinationId conn_id ;
439
431
440
432
/* first byte is connection_id */
441
- conn_id = * (char * ) data ;
442
- data = (char * ) data + 1 ;
443
- len -= 1 ;
433
+ conn_id = * (DmqDestinationId * ) data ;
434
+ data = (char * ) data + sizeof ( DmqDestinationId ) ;
435
+ len -= sizeof ( DmqDestinationId ) ;
444
436
Assert (0 <= conn_id && conn_id < DMQ_MAX_DESTINATIONS );
445
437
446
438
if (conns [conn_id ].state == Active )
@@ -724,7 +716,9 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
724
716
{
725
717
const char * stream_name ;
726
718
const char * body ;
719
+ const char * msgptr ;
727
720
int body_len ;
721
+ int msg_len ;
728
722
bool found ;
729
723
DmqStreamSubscription * sub ;
730
724
shm_mq_result res ;
@@ -734,9 +728,11 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
734
728
* as message body with unknown format that we are going to send down to
735
729
* the subscribed backend.
736
730
*/
737
- stream_name = pq_getmsgrawstring (msg );
738
- body_len = msg -> len - msg -> cursor ;
739
- body = pq_getmsgbytes (msg , body_len );
731
+ msg_len = msg -> len - msg -> cursor ;
732
+ msgptr = pq_getmsgbytes (msg , msg_len );
733
+ stream_name = msgptr ;
734
+ body = msgptr + strlen (stream_name ) + 1 ;
735
+ body_len = msg_len - (body - msgptr );
740
736
pq_getmsgend (msg );
741
737
742
738
/*
@@ -773,7 +769,7 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
773
769
sub -> procno );
774
770
775
771
/* and send it */
776
- res = shm_mq_send (mq_handles [sub -> procno ], body_len , body , false);
772
+ res = shm_mq_send (mq_handles [sub -> procno ], msg_len , msgptr , false);
777
773
if (res != SHM_MQ_SUCCESS )
778
774
{
779
775
mtm_log (WARNING , "[DMQ] can't send to queue %d" , sub -> procno );
@@ -1345,11 +1341,18 @@ dmq_reattach_shm_mq(int handle_id)
1345
1341
}
1346
1342
1347
1343
DmqSenderId
1348
- dmq_attach_receiver (char * sender_name , int mask_pos )
1344
+ dmq_attach_receiver (const char * sender_name , int mask_pos )
1349
1345
{
1350
1346
int i ;
1351
1347
int handle_id ;
1352
1348
1349
+ /* Search for existed receiver. */
1350
+ for (i = 0 ; i < dmq_local .n_inhandles ; i ++ )
1351
+ {
1352
+ if (strcmp (sender_name , dmq_local .inhandles [i ].name ) == 0 )
1353
+ return i ;
1354
+ }
1355
+
1353
1356
for (i = 0 ; i < DMQ_MAX_RECEIVERS ; i ++ )
1354
1357
{
1355
1358
if (dmq_local .inhandles [i ].name [0 ] == '\0' )
@@ -1375,7 +1378,7 @@ dmq_attach_receiver(char *sender_name, int mask_pos)
1375
1378
}
1376
1379
1377
1380
void
1378
- dmq_detach_receiver (char * sender_name )
1381
+ dmq_detach_receiver (const char * sender_name )
1379
1382
{
1380
1383
int i ;
1381
1384
int handle_id = -1 ;
@@ -1440,6 +1443,36 @@ dmq_stream_unsubscribe(const char *stream_name)
1440
1443
Assert (found );
1441
1444
}
1442
1445
1446
+ const char *
1447
+ dmq_sender_name (DmqSenderId id )
1448
+ {
1449
+ Assert ((id >= 0 ) && (id < dmq_local .n_inhandles ));
1450
+
1451
+ if (dmq_local .inhandles [id ].name [0 ] == '\0' )
1452
+ return NULL ;
1453
+ return dmq_local .inhandles [id ].name ;
1454
+ }
1455
+
1456
+ DmqDestinationId
1457
+ dmq_remote_id (const char * name )
1458
+ {
1459
+ DmqDestinationId i ;
1460
+
1461
+ LWLockAcquire (dmq_state -> lock , LW_SHARED );
1462
+ for (i = 0 ; i < DMQ_MAX_DESTINATIONS ; i ++ )
1463
+ {
1464
+ DmqDestination * dest = & (dmq_state -> destinations [i ]);
1465
+ if (strcmp (name , dest -> receiver_name ) == 0 )
1466
+ break ;
1467
+ }
1468
+ LWLockRelease (dmq_state -> lock );
1469
+
1470
+ if (i == DMQ_MAX_DESTINATIONS )
1471
+ return -1 ;
1472
+
1473
+ return i ;
1474
+ }
1475
+
1443
1476
/*
1444
1477
* Get a message from input queue. Execution blocking until message will not
1445
1478
* received. Returns false, if an error is occured.
@@ -1448,10 +1481,12 @@ dmq_stream_unsubscribe(const char *stream_name)
1448
1481
* msg - buffer that contains received message.
1449
1482
* len - size of received message.
1450
1483
*/
1451
- bool
1452
- dmq_pop (DmqSenderId * sender_id , void * * msg , Size * len , uint64 mask )
1484
+ const char *
1485
+ dmq_pop (DmqSenderId * sender_id , void * * msg , Size * len , uint64 mask ,
1486
+ bool waitMsg )
1453
1487
{
1454
1488
shm_mq_result res ;
1489
+ const char * stream ;
1455
1490
1456
1491
Assert (msg && len );
1457
1492
@@ -1477,13 +1512,19 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
1477
1512
1478
1513
if (res == SHM_MQ_SUCCESS )
1479
1514
{
1480
- * msg = data ;
1515
+ /*
1516
+ * Stream name is first null-terminated string in
1517
+ * the message buffer.
1518
+ */
1519
+ stream = data ;
1520
+ * msg = (void * ) ((char * )data + strlen (stream ) + 1 );
1521
+ * len -= (char * )(* msg ) - (char * )data ;
1481
1522
* sender_id = i ;
1482
1523
1483
1524
mtm_log (DmqTraceIncoming ,
1484
1525
"[DMQ] dmq_pop: got message %s from %s" ,
1485
1526
(char * ) data , dmq_local .inhandles [i ].name );
1486
- return true ;
1527
+ return stream ;
1487
1528
}
1488
1529
else if (res == SHM_MQ_DETACHED )
1489
1530
{
@@ -1498,13 +1539,15 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
1498
1539
else
1499
1540
{
1500
1541
* sender_id = i ;
1501
- return false ;
1542
+ return NULL ;
1502
1543
}
1503
1544
}
1504
1545
}
1505
1546
1506
- if (nowait )
1547
+ if (nowait && waitMsg )
1507
1548
continue ;
1549
+ if (!waitMsg )
1550
+ return NULL ;
1508
1551
1509
1552
// XXX cache that
1510
1553
rc = WaitLatch (MyLatch , WL_LATCH_SET | WL_TIMEOUT , 10.0 ,
@@ -1516,6 +1559,7 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
1516
1559
if (rc & WL_LATCH_SET )
1517
1560
ResetLatch (MyLatch );
1518
1561
}
1562
+ return NULL ;
1519
1563
}
1520
1564
1521
1565
bool
@@ -1566,8 +1610,7 @@ dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
1566
1610
* sender_name - a symbolic name of the sender. Remote backend will attach
1567
1611
* to this channel by sender name.
1568
1612
* See dmq_attach_receiver() routine for details.
1569
- * Call this function after shared memory initialization. For example,
1570
- * extensions may create channels during 'CREATE EXTENSION' command execution.
1613
+ * Call this function after shared memory initialization.
1571
1614
*/
1572
1615
DmqDestinationId
1573
1616
dmq_destination_add (char * connstr , char * sender_name , char * receiver_name ,
0 commit comments