@@ -109,6 +109,12 @@ struct shm_mq
109
109
* locally by copying the chunks into a backend-local buffer. mqh_buffer is
110
110
* the buffer, and mqh_buflen is the number of bytes allocated for it.
111
111
*
112
+ * mqh_send_pending is the number of bytes that have been written to the
113
+ * queue but not yet updated in shared memory. We will not update it until
114
+ * the written data is 1/4th of the ring size or the tuple queue is full.
115
+ * This will prevent frequent CPU cache misses, and it will also avoid
116
+ * frequent SetLatch() calls, which are quite expensive.
117
+ *
112
118
* mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
113
119
* are used to track the state of non-blocking operations. When the caller
114
120
* attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
@@ -137,6 +143,7 @@ struct shm_mq_handle
137
143
char * mqh_buffer ;
138
144
Size mqh_buflen ;
139
145
Size mqh_consume_pending ;
146
+ Size mqh_send_pending ;
140
147
Size mqh_partial_bytes ;
141
148
Size mqh_expected_bytes ;
142
149
bool mqh_length_word_complete ;
@@ -292,6 +299,7 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
292
299
mqh -> mqh_buffer = NULL ;
293
300
mqh -> mqh_buflen = 0 ;
294
301
mqh -> mqh_consume_pending = 0 ;
302
+ mqh -> mqh_send_pending = 0 ;
295
303
mqh -> mqh_partial_bytes = 0 ;
296
304
mqh -> mqh_expected_bytes = 0 ;
297
305
mqh -> mqh_length_word_complete = false;
@@ -319,14 +327,15 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
319
327
* Write a message into a shared message queue.
320
328
*/
321
329
shm_mq_result
322
- shm_mq_send (shm_mq_handle * mqh , Size nbytes , const void * data , bool nowait )
330
+ shm_mq_send (shm_mq_handle * mqh , Size nbytes , const void * data , bool nowait ,
331
+ bool force_flush )
323
332
{
324
333
shm_mq_iovec iov ;
325
334
326
335
iov .data = data ;
327
336
iov .len = nbytes ;
328
337
329
- return shm_mq_sendv (mqh , & iov , 1 , nowait );
338
+ return shm_mq_sendv (mqh , & iov , 1 , nowait , force_flush );
330
339
}
331
340
332
341
/*
@@ -343,9 +352,15 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
343
352
* arguments, each time the process latch is set. (Once begun, the sending
344
353
* of a message cannot be aborted except by detaching from the queue; changing
345
354
* the length or payload will corrupt the queue.)
355
+ *
356
+ * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
357
+ * and notify the receiver (if it is already attached). Otherwise, we don't
358
+ * update it until we have written an amount of data greater than 1/4th of the
359
+ * ring size.
346
360
*/
347
361
shm_mq_result
348
- shm_mq_sendv (shm_mq_handle * mqh , shm_mq_iovec * iov , int iovcnt , bool nowait )
362
+ shm_mq_sendv (shm_mq_handle * mqh , shm_mq_iovec * iov , int iovcnt , bool nowait ,
363
+ bool force_flush )
349
364
{
350
365
shm_mq_result res ;
351
366
shm_mq * mq = mqh -> mqh_queue ;
@@ -518,8 +533,18 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
518
533
mqh -> mqh_counterparty_attached = true;
519
534
}
520
535
521
- /* Notify receiver of the newly-written data, and return. */
522
- SetLatch (& receiver -> procLatch );
536
+ /*
537
+ * If the caller has requested force flush or we have written more than 1/4
538
+ * of the ring size, mark it as written in shared memory and notify the
539
+ * receiver.
540
+ */
541
+ if (force_flush || mqh -> mqh_send_pending > (mq -> mq_ring_size >> 2 ))
542
+ {
543
+ shm_mq_inc_bytes_written (mq , mqh -> mqh_send_pending );
544
+ SetLatch (& receiver -> procLatch );
545
+ mqh -> mqh_send_pending = 0 ;
546
+ }
547
+
523
548
return SHM_MQ_SUCCESS ;
524
549
}
525
550
@@ -816,6 +841,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
816
841
void
817
842
shm_mq_detach (shm_mq_handle * mqh )
818
843
{
844
+ /* Before detaching, notify the receiver about any already-written data. */
845
+ if (mqh -> mqh_send_pending > 0 )
846
+ {
847
+ shm_mq_inc_bytes_written (mqh -> mqh_queue , mqh -> mqh_send_pending );
848
+ mqh -> mqh_send_pending = 0 ;
849
+ }
850
+
819
851
/* Notify counterparty that we're outta here. */
820
852
shm_mq_detach_internal (mqh -> mqh_queue );
821
853
@@ -894,7 +926,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
894
926
895
927
/* Compute number of ring buffer bytes used and available. */
896
928
rb = pg_atomic_read_u64 (& mq -> mq_bytes_read );
897
- wb = pg_atomic_read_u64 (& mq -> mq_bytes_written );
929
+ wb = pg_atomic_read_u64 (& mq -> mq_bytes_written ) + mqh -> mqh_send_pending ;
898
930
Assert (wb >= rb );
899
931
used = wb - rb ;
900
932
Assert (used <= ringsize );
@@ -951,6 +983,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
951
983
}
952
984
else if (available == 0 )
953
985
{
986
+ /* Update the pending send bytes in the shared memory. */
987
+ shm_mq_inc_bytes_written (mq , mqh -> mqh_send_pending );
988
+
954
989
/*
955
990
* Since mq->mqh_counterparty_attached is known to be true at this
956
991
* point, mq_receiver has been set, and it can't change once set.
@@ -959,6 +994,12 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
959
994
Assert (mqh -> mqh_counterparty_attached );
960
995
SetLatch (& mq -> mq_receiver -> procLatch );
961
996
997
+ /*
998
+ * We have just updated the mqh_send_pending bytes in the shared
999
+ * memory, so reset it.
1000
+ */
1001
+ mqh -> mqh_send_pending = 0 ;
1002
+
962
1003
/* Skip manipulation of our latch if nowait = true. */
963
1004
if (nowait )
964
1005
{
@@ -1009,13 +1050,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
1009
1050
* MAXIMUM_ALIGNOF, and each read is as well.
1010
1051
*/
1011
1052
Assert (sent == nbytes || sendnow == MAXALIGN (sendnow ));
1012
- shm_mq_inc_bytes_written (mq , MAXALIGN (sendnow ));
1013
1053
1014
1054
/*
1015
- * For efficiency, we don't set the reader's latch here. We'll do
1016
- * that only when the buffer fills up or after writing an entire
1017
- * message.
1055
+ * For efficiency, we don't update the bytes written in the shared
1056
+ * memory and also don't set the reader's latch here. Refer to
1057
+ * the comments atop the shm_mq_handle structure for more
1058
+ * information.
1018
1059
*/
1060
+ mqh -> mqh_send_pending += MAXALIGN (sendnow );
1019
1061
}
1020
1062
}
1021
1063
0 commit comments