Skip to content

Commit ea55160

Browse files
committed
In walsender, don't sleep if there's outstanding WAL waiting to be sent,
otherwise we effectively rate-limit the streaming as pointed out by Simon Riggs. Also, send the WAL in smaller chunks, to respond to signals more promptly.
1 parent 4ed4b6c commit ea55160

File tree

1 file changed

+96
-85
lines changed

1 file changed

+96
-85
lines changed

src/backend/replication/walsender.c

Lines changed: 96 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
*
3131
*
3232
* IDENTIFICATION
33-
* $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.20 2010/05/09 18:11:55 tgl Exp $
33+
* $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.21 2010/05/26 22:21:33 heikki Exp $
3434
*
3535
*-------------------------------------------------------------------------
3636
*/
@@ -100,13 +100,19 @@ static void InitWalSnd(void);
100100
static void WalSndHandshake(void);
101101
static void WalSndKill(int code, Datum arg);
102102
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
103-
static bool XLogSend(StringInfo outMsg);
103+
static bool XLogSend(StringInfo outMsg, bool *caughtup);
104104
static void CheckClosedConnection(void);
105105

106106
/*
107107
* How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
108+
*
109+
* We don't have a good idea of what a good value would be; there's some
110+
* overhead per message in both walsender and walreceiver, but on the other
111+
* hand sending large batches makes walsender less responsive to signals
112+
* because signals are checked only between messages. 128kB seems like
113+
* a reasonable guess for now.
108114
*/
109-
#define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
115+
#define MAX_SEND_SIZE (128 * 1024)
110116

111117
/* Main entry point for walsender process */
112118
int
@@ -360,6 +366,7 @@ static int
360366
WalSndLoop(void)
361367
{
362368
StringInfoData output_message;
369+
bool caughtup = false;
363370

364371
initStringInfo(&output_message);
365372

@@ -387,7 +394,7 @@ WalSndLoop(void)
387394
*/
388395
if (ready_to_stop)
389396
{
390-
XLogSend(&output_message);
397+
XLogSend(&output_message, &caughtup);
391398
shutdown_requested = true;
392399
}
393400

@@ -402,31 +409,32 @@ WalSndLoop(void)
402409
}
403410

404411
/*
405-
* Nap for the configured time or until a message arrives.
412+
* If we had sent all accumulated WAL in last round, nap for the
413+
* configured time before retrying.
406414
*
407415
* On some platforms, signals won't interrupt the sleep. To ensure we
408416
* respond reasonably promptly when someone signals us, break down the
409417
* sleep into NAPTIME_PER_CYCLE increments, and check for
410418
* interrupts after each nap.
411419
*/
412-
remain = WalSndDelay * 1000L;
413-
while (remain > 0)
420+
if (caughtup)
414421
{
415-
if (got_SIGHUP || shutdown_requested || ready_to_stop)
416-
break;
422+
remain = WalSndDelay * 1000L;
423+
while (remain > 0)
424+
{
425+
/* Check for interrupts */
426+
if (got_SIGHUP || shutdown_requested || ready_to_stop)
427+
break;
417428

418-
/*
419-
* Check to see whether a message from the standby or an interrupt
420-
* from other processes has arrived.
421-
*/
422-
pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
423-
CheckClosedConnection();
429+
/* Sleep and check that the connection is still alive */
430+
pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
431+
CheckClosedConnection();
424432

425-
remain -= NAPTIME_PER_CYCLE;
433+
remain -= NAPTIME_PER_CYCLE;
434+
}
426435
}
427-
428436
/* Attempt to send the log once every loop */
429-
if (!XLogSend(&output_message))
437+
if (!XLogSend(&output_message, &caughtup))
430438
goto eof;
431439
}
432440

@@ -623,15 +631,20 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
623631
}
624632

625633
/*
626-
* Read all WAL that's been written (and flushed) since last cycle, and send
627-
* it to client.
634+
* Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
635+
* but not yet sent to the client, and send it. If there is no unsent WAL,
636+
* *caughtup is set to true and nothing is sent, otherwise *caughtup is set
637+
* to false.
628638
*
629639
* Returns true if OK, false if trouble.
630640
*/
631641
static bool
632-
XLogSend(StringInfo outMsg)
642+
XLogSend(StringInfo outMsg, bool *caughtup)
633643
{
634644
XLogRecPtr SendRqstPtr;
645+
XLogRecPtr startptr;
646+
XLogRecPtr endptr;
647+
Size nbytes;
635648
char activitymsg[50];
636649

637650
/* use volatile pointer to prevent code rearrangement */
@@ -642,84 +655,82 @@ XLogSend(StringInfo outMsg)
642655

643656
/* Quick exit if nothing to do */
644657
if (!XLByteLT(sentPtr, SendRqstPtr))
658+
{
659+
*caughtup = true;
645660
return true;
661+
}
662+
/*
663+
* Otherwise let the caller know that we're not fully caught up. Unless
664+
* there's a huge backlog, we'll be caught up to the current WriteRecPtr
665+
* after we've sent everything below, but more WAL could accumulate while
666+
* we're busy sending.
667+
*/
668+
*caughtup = false;
646669

647670
/*
648-
* We gather multiple records together by issuing just one XLogRead() of a
649-
* suitable size, and send them as one CopyData message. Repeat until
650-
* we've sent everything we can.
671+
* Figure out how much to send in one message. If there's less than
672+
* MAX_SEND_SIZE bytes to send, send everything. Otherwise send
673+
* MAX_SEND_SIZE bytes, but round to page boundary.
674+
*
675+
* The rounding is not only for performance reasons. Walreceiver
676+
* relies on the fact that we never split a WAL record across two
677+
* messages. Since a long WAL record is split at page boundary into
678+
* continuation records, page boundary is always a safe cut-off point.
679+
* We also assume that SendRqstPtr never points in the middle of a WAL
680+
* record.
651681
*/
652-
while (XLByteLT(sentPtr, SendRqstPtr))
682+
startptr = sentPtr;
683+
if (startptr.xrecoff >= XLogFileSize)
653684
{
654-
XLogRecPtr startptr;
655-
XLogRecPtr endptr;
656-
Size nbytes;
657-
658685
/*
659-
* Figure out how much to send in one message. If there's less than
660-
* MAX_SEND_SIZE bytes to send, send everything. Otherwise send
661-
* MAX_SEND_SIZE bytes, but round to page boundary.
662-
*
663-
* The rounding is not only for performance reasons. Walreceiver
664-
* relies on the fact that we never split a WAL record across two
665-
* messages. Since a long WAL record is split at page boundary into
666-
* continuation records, page boundary is always a safe cut-off point.
667-
* We also assume that SendRqstPtr never points in the middle of a WAL
668-
* record.
686+
* crossing a logid boundary, skip the non-existent last log
687+
* segment in previous logical log file.
669688
*/
670-
startptr = sentPtr;
671-
if (startptr.xrecoff >= XLogFileSize)
672-
{
673-
/*
674-
* crossing a logid boundary, skip the non-existent last log
675-
* segment in previous logical log file.
676-
*/
677-
startptr.xlogid += 1;
678-
startptr.xrecoff = 0;
679-
}
689+
startptr.xlogid += 1;
690+
startptr.xrecoff = 0;
691+
}
680692

681-
endptr = startptr;
682-
XLByteAdvance(endptr, MAX_SEND_SIZE);
683-
/* round down to page boundary. */
684-
endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
685-
/* if we went beyond SendRqstPtr, back off */
686-
if (XLByteLT(SendRqstPtr, endptr))
687-
endptr = SendRqstPtr;
693+
endptr = startptr;
694+
XLByteAdvance(endptr, MAX_SEND_SIZE);
695+
/* round down to page boundary. */
696+
endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
697+
/* if we went beyond SendRqstPtr, back off */
698+
if (XLByteLT(SendRqstPtr, endptr))
699+
endptr = SendRqstPtr;
688700

689-
/*
690-
* OK to read and send the slice.
691-
*
692-
* We don't need to convert the xlogid/xrecoff from host byte order to
693-
* network byte order because the both server can be expected to have
694-
* the same byte order. If they have different byte order, we don't
695-
* reach here.
696-
*/
697-
pq_sendbyte(outMsg, 'w');
698-
pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
701+
/*
702+
* OK to read and send the slice.
703+
*
704+
* We don't need to convert the xlogid/xrecoff from host byte order to
705+
* network byte order because the both server can be expected to have
706+
* the same byte order. If they have different byte order, we don't
707+
* reach here.
708+
*/
709+
pq_sendbyte(outMsg, 'w');
710+
pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
699711

700-
if (endptr.xlogid != startptr.xlogid)
701-
{
702-
Assert(endptr.xlogid == startptr.xlogid + 1);
703-
nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
704-
}
705-
else
706-
nbytes = endptr.xrecoff - startptr.xrecoff;
712+
if (endptr.xlogid != startptr.xlogid)
713+
{
714+
Assert(endptr.xlogid == startptr.xlogid + 1);
715+
nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
716+
}
717+
else
718+
nbytes = endptr.xrecoff - startptr.xrecoff;
707719

708-
sentPtr = endptr;
720+
sentPtr = endptr;
709721

710-
/*
711-
* Read the log directly into the output buffer to prevent extra
712-
* memcpy calls.
713-
*/
714-
enlargeStringInfo(outMsg, nbytes);
722+
/*
723+
* Read the log directly into the output buffer to prevent extra
724+
* memcpy calls.
725+
*/
726+
enlargeStringInfo(outMsg, nbytes);
715727

716-
XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
717-
outMsg->len += nbytes;
718-
outMsg->data[outMsg->len] = '\0';
728+
XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
729+
outMsg->len += nbytes;
730+
outMsg->data[outMsg->len] = '\0';
719731

720-
pq_putmessage('d', outMsg->data, outMsg->len);
721-
resetStringInfo(outMsg);
722-
}
732+
pq_putmessage('d', outMsg->data, outMsg->len);
733+
resetStringInfo(outMsg);
723734

724735
/* Update shared memory status */
725736
SpinLockAcquire(&walsnd->mutex);

0 commit comments

Comments
 (0)