Skip to content

Commit 4684643

Browse files
committed
shm_mq: Update mq_bytes_written less often.
Do not update shm_mq's mq_bytes_written until we have written an amount of data greater than 1/4th of the ring size, unless the caller of shm_mq_send(v) requests a flush at the end of the message. This reduces the number of calls to SetLatch(), and also the number of CPU cache misses, considerably, and thus makes shm_mq significantly faster. Dilip Kumar, reviewed by Zhihong Yu and Tomas Vondra. Some minor cosmetic changes by me. Discussion: http://postgr.es/m/CAFiTN-tVXqn_OG7tHNeSkBbN+iiCZTiQ83uakax43y1sQb2OBA@mail.gmail.com
1 parent 7821a0b commit 4684643

File tree

6 files changed

+69
-19
lines changed

6 files changed

+69
-19
lines changed

src/backend/executor/tqueue.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
6060

6161
/* Send the tuple itself. */
6262
tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
63-
result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
63+
result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
6464

6565
if (should_free)
6666
pfree(tuple);

src/backend/libpq/pqmq.c

+6-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,12 @@ mq_putmessage(char msgtype, const char *s, size_t len)
154154

155155
for (;;)
156156
{
157-
result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
157+
/*
158+
* Immediately notify the receiver by passing force_flush as true so
159+
* that the shared memory value is updated before we send the parallel
160+
* message signal right after this.
161+
*/
162+
result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
158163

159164
if (pq_mq_parallel_leader_pid != 0)
160165
SendProcSignal(pq_mq_parallel_leader_pid,

src/backend/storage/ipc/shm_mq.c

+52-10
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ struct shm_mq
109109
* locally by copying the chunks into a backend-local buffer. mqh_buffer is
110110
* the buffer, and mqh_buflen is the number of bytes allocated for it.
111111
*
112+
* mqh_send_pending is the number of bytes that have been written to the queue but not
113+
* yet updated in the shared memory. We will not update it until the written
114+
* data exceeds 1/4th of the ring size or the queue is full. This will
115+
* prevent frequent CPU cache misses, and it will also avoid frequent
116+
* SetLatch() calls, which are quite expensive.
117+
*
112118
* mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
113119
* are used to track the state of non-blocking operations. When the caller
114120
* attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
@@ -137,6 +143,7 @@ struct shm_mq_handle
137143
char *mqh_buffer;
138144
Size mqh_buflen;
139145
Size mqh_consume_pending;
146+
Size mqh_send_pending;
140147
Size mqh_partial_bytes;
141148
Size mqh_expected_bytes;
142149
bool mqh_length_word_complete;
@@ -292,6 +299,7 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
292299
mqh->mqh_buffer = NULL;
293300
mqh->mqh_buflen = 0;
294301
mqh->mqh_consume_pending = 0;
302+
mqh->mqh_send_pending = 0;
295303
mqh->mqh_partial_bytes = 0;
296304
mqh->mqh_expected_bytes = 0;
297305
mqh->mqh_length_word_complete = false;
@@ -319,14 +327,15 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
319327
* Write a message into a shared message queue.
320328
*/
321329
shm_mq_result
322-
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
330+
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
331+
bool force_flush)
323332
{
324333
shm_mq_iovec iov;
325334

326335
iov.data = data;
327336
iov.len = nbytes;
328337

329-
return shm_mq_sendv(mqh, &iov, 1, nowait);
338+
return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
330339
}
331340

332341
/*
@@ -343,9 +352,15 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
343352
* arguments, each time the process latch is set. (Once begun, the sending
344353
* of a message cannot be aborted except by detaching from the queue; changing
345354
* the length or payload will corrupt the queue.)
355+
*
356+
* When force_flush = true, we immediately update the shm_mq's mq_bytes_written
357+
* and notify the receiver (if it is already attached). Otherwise, we don't
358+
* update it until we have written an amount of data greater than 1/4th of the
359+
* ring size.
346360
*/
347361
shm_mq_result
348-
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
362+
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
363+
bool force_flush)
349364
{
350365
shm_mq_result res;
351366
shm_mq *mq = mqh->mqh_queue;
@@ -518,8 +533,18 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
518533
mqh->mqh_counterparty_attached = true;
519534
}
520535

521-
/* Notify receiver of the newly-written data, and return. */
522-
SetLatch(&receiver->procLatch);
536+
/*
537+
* If the caller has requested force flush or we have written more than 1/4
538+
* of the ring size, mark it as written in shared memory and notify the
539+
* receiver.
540+
*/
541+
if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
542+
{
543+
shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
544+
SetLatch(&receiver->procLatch);
545+
mqh->mqh_send_pending = 0;
546+
}
547+
523548
return SHM_MQ_SUCCESS;
524549
}
525550

@@ -816,6 +841,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
816841
void
817842
shm_mq_detach(shm_mq_handle *mqh)
818843
{
844+
/* Before detaching, notify the receiver about any already-written data. */
845+
if (mqh->mqh_send_pending > 0)
846+
{
847+
shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
848+
mqh->mqh_send_pending = 0;
849+
}
850+
819851
/* Notify counterparty that we're outta here. */
820852
shm_mq_detach_internal(mqh->mqh_queue);
821853

@@ -894,7 +926,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
894926

895927
/* Compute number of ring buffer bytes used and available. */
896928
rb = pg_atomic_read_u64(&mq->mq_bytes_read);
897-
wb = pg_atomic_read_u64(&mq->mq_bytes_written);
929+
wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
898930
Assert(wb >= rb);
899931
used = wb - rb;
900932
Assert(used <= ringsize);
@@ -951,6 +983,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
951983
}
952984
else if (available == 0)
953985
{
986+
/* Update the pending send bytes in the shared memory. */
987+
shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
988+
954989
/*
955990
* Since mq->mqh_counterparty_attached is known to be true at this
956991
* point, mq_receiver has been set, and it can't change once set.
@@ -959,6 +994,12 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
959994
Assert(mqh->mqh_counterparty_attached);
960995
SetLatch(&mq->mq_receiver->procLatch);
961996

997+
/*
998+
* We have just updated the mqh_send_pending bytes in the shared
999+
* memory so reset it.
1000+
*/
1001+
mqh->mqh_send_pending = 0;
1002+
9621003
/* Skip manipulation of our latch if nowait = true. */
9631004
if (nowait)
9641005
{
@@ -1009,13 +1050,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
10091050
* MAXIMUM_ALIGNOF, and each read is as well.
10101051
*/
10111052
Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1012-
shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
10131053

10141054
/*
1015-
* For efficiency, we don't set the reader's latch here. We'll do
1016-
* that only when the buffer fills up or after writing an entire
1017-
* message.
1055+
* For efficiency, we don't update the bytes written in the shared
1056+
* memory and also don't set the reader's latch here. Refer to
1057+
* the comments atop the shm_mq_handle structure for more
1058+
* information.
10181059
*/
1060+
mqh->mqh_send_pending += MAXALIGN(sendnow);
10191061
}
10201062
}
10211063

src/include/storage/shm_mq.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,13 @@ extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
7070

7171
/* Send or receive messages. */
7272
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
73-
Size nbytes, const void *data, bool nowait);
74-
extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
75-
shm_mq_iovec *iov, int iovcnt, bool nowait);
73+
Size nbytes, const void *data, bool nowait,
74+
bool force_flush);
75+
extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov,
76+
int iovcnt, bool nowait, bool force_flush);
7677
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
7778
Size *nbytesp, void **datap, bool nowait);
79+
extern void shm_mq_flush(shm_mq_handle *mqh);
7880

7981
/* Wait for our counterparty to attach to the queue. */
8082
extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);

src/test/modules/test_shm_mq/test.c

+4-3
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
7373
test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
7474

7575
/* Send the initial message. */
76-
res = shm_mq_send(outqh, message_size, message_contents, false);
76+
res = shm_mq_send(outqh, message_size, message_contents, false, true);
7777
if (res != SHM_MQ_SUCCESS)
7878
ereport(ERROR,
7979
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -97,7 +97,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
9797
break;
9898

9999
/* Send it back out. */
100-
res = shm_mq_send(outqh, len, data, false);
100+
res = shm_mq_send(outqh, len, data, false, true);
101101
if (res != SHM_MQ_SUCCESS)
102102
ereport(ERROR,
103103
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -177,7 +177,8 @@ test_shm_mq_pipelined(PG_FUNCTION_ARGS)
177177
*/
178178
if (send_count < loop_count)
179179
{
180-
res = shm_mq_send(outqh, message_size, message_contents, true);
180+
res = shm_mq_send(outqh, message_size, message_contents, true,
181+
true);
181182
if (res == SHM_MQ_SUCCESS)
182183
{
183184
++send_count;

src/test/modules/test_shm_mq/worker.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
190190
break;
191191

192192
/* Send it back out. */
193-
res = shm_mq_send(outqh, len, data, false);
193+
res = shm_mq_send(outqh, len, data, false, true);
194194
if (res != SHM_MQ_SUCCESS)
195195
break;
196196
}

0 commit comments

Comments
 (0)