|
23 | 23 | #include "storage/procsignal.h"
|
24 | 24 | #include "storage/shm_mq.h"
|
25 | 25 | #include "storage/spin.h"
|
| 26 | +#include "utils/memutils.h" |
26 | 27 |
|
27 | 28 | /*
|
28 | 29 | * This structure represents the actual queue, stored in shared memory.
|
@@ -359,6 +360,13 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
|
359 | 360 | for (i = 0; i < iovcnt; ++i)
|
360 | 361 | nbytes += iov[i].len;
|
361 | 362 |
|
| 363 | + /* Prevent writing messages overwhelming the receiver. */ |
| 364 | + if (nbytes > MaxAllocSize) |
| 365 | + ereport(ERROR, |
| 366 | + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| 367 | + errmsg("cannot send a message of size %zu via shared memory queue", |
| 368 | + nbytes))); |
| 369 | + |
362 | 370 | /* Try to write, or finish writing, the length word into the buffer. */
|
363 | 371 | while (!mqh->mqh_length_word_complete)
|
364 | 372 | {
|
@@ -624,6 +632,17 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
|
624 | 632 | }
|
625 | 633 | nbytes = mqh->mqh_expected_bytes;
|
626 | 634 |
|
| 635 | + /* |
| 636 | + * Should be disallowed on the sending side already, but better check and |
| 637 | + * error out on the receiver side as well rather than trying to read a |
| 638 | + * prohibitively large message. |
| 639 | + */ |
| 640 | + if (nbytes > MaxAllocSize) |
| 641 | + ereport(ERROR, |
| 642 | + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| 643 | + errmsg("invalid message size %zu in shared memory queue", |
| 644 | + nbytes))); |
| 645 | + |
627 | 646 | if (mqh->mqh_partial_bytes == 0)
|
628 | 647 | {
|
629 | 648 | /*
|
@@ -652,8 +671,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
|
652 | 671 | {
|
653 | 672 | Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
|
654 | 673 |
|
| 674 | + /* |
| 675 | + * Double the buffer size until the payload fits, but limit to |
| 676 | + * MaxAllocSize. |
| 677 | + */ |
655 | 678 | while (newbuflen < nbytes)
|
656 | 679 | newbuflen *= 2;
|
| 680 | + newbuflen = Min(newbuflen, MaxAllocSize); |
657 | 681 |
|
658 | 682 | if (mqh->mqh_buffer != NULL)
|
659 | 683 | {
|
|
0 commit comments