|
23 | 23 | #include "storage/procsignal.h"
|
24 | 24 | #include "storage/shm_mq.h"
|
25 | 25 | #include "storage/spin.h"
|
| 26 | +#include "utils/memutils.h" |
26 | 27 |
|
27 | 28 | /*
|
28 | 29 | * This structure represents the actual queue, stored in shared memory.
|
@@ -359,6 +360,13 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
|
359 | 360 | for (i = 0; i < iovcnt; ++i)
|
360 | 361 | nbytes += iov[i].len;
|
361 | 362 |
|
| 363 | + /* Prevent writing messages overwhelming the receiver. */ |
| 364 | + if (nbytes > MaxAllocSize) |
| 365 | + ereport(ERROR, |
| 366 | + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| 367 | + errmsg("cannot send a message of size %zu via shared memory queue", |
| 368 | + nbytes))); |
| 369 | + |
362 | 370 | /* Try to write, or finish writing, the length word into the buffer. */
|
363 | 371 | while (!mqh->mqh_length_word_complete)
|
364 | 372 | {
|
@@ -652,6 +660,17 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
|
652 | 660 | }
|
653 | 661 | nbytes = mqh->mqh_expected_bytes;
|
654 | 662 |
|
| 663 | + /* |
| 664 | + * Should be disallowed on the sending side already, but better check and |
| 665 | + * error out on the receiver side as well rather than trying to read a |
| 666 | + * prohibitively large message. |
| 667 | + */ |
| 668 | + if (nbytes > MaxAllocSize) |
| 669 | + ereport(ERROR, |
| 670 | + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| 671 | + errmsg("invalid message size %zu in shared memory queue", |
| 672 | + nbytes))); |
| 673 | + |
655 | 674 | if (mqh->mqh_partial_bytes == 0)
|
656 | 675 | {
|
657 | 676 | /*
|
@@ -680,8 +699,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
|
680 | 699 | {
|
681 | 700 | Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
|
682 | 701 |
|
| 702 | + /* |
| 703 | + * Double the buffer size until the payload fits, but limit to |
| 704 | + * MaxAllocSize. |
| 705 | + */ |
683 | 706 | while (newbuflen < nbytes)
|
684 | 707 | newbuflen *= 2;
|
| 708 | + newbuflen = Min(newbuflen, MaxAllocSize); |
685 | 709 |
|
686 | 710 | if (mqh->mqh_buffer != NULL)
|
687 | 711 | {
|
|
0 commit comments