Skip to content

Commit f003d9f

Browse files
committed
Add circular WAL decoding buffer.
Teach xlogreader.c to decode its output into a circular buffer, to support optimizations based on looking ahead.

* XLogReadRecord() works as before, consuming records one by one, and allowing them to be examined via the traditional XLogRecGetXXX() macros.

* An alternative new interface XLogNextRecord() is added that returns pointers to DecodedXLogRecord structs that can be examined directly.

* XLogReadAhead() provides a second cursor that lets you see further ahead, as long as data is available and there is enough space in the decoding buffer. This returns DecodedXLogRecord pointers to the caller, but also adds them to a queue of records that will later be consumed by XLogNextRecord()/XLogReadRecord().

The buffer's size is controlled with wal_decode_buffer_size. The buffer could potentially be placed into shared memory, for future projects. Large records that don't fit in the circular buffer are called "oversized" and allocated separately with palloc().

Discussion: https://postgr.es/m/CA+hUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq=AovOddfHpA@mail.gmail.com
1 parent 323cbe7 commit f003d9f

File tree

8 files changed

+734
-200
lines changed

8 files changed

+734
-200
lines changed

src/backend/access/transam/generic_xlog.c

+3-3
Original file line number | Diff line number | Diff line change
@@ -482,10 +482,10 @@ generic_redo(XLogReaderState *record)
482482
uint8 block_id;
483483

484484
/* Protect limited size of buffers[] array */
485-
Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
485+
Assert(XLogRecMaxBlockId(record) < MAX_GENERIC_XLOG_PAGES);
486486

487487
/* Iterate over blocks */
488-
for (block_id = 0; block_id <= record->max_block_id; block_id++)
488+
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
489489
{
490490
XLogRedoAction action;
491491

@@ -525,7 +525,7 @@ generic_redo(XLogReaderState *record)
525525
}
526526

527527
/* Changes are done: unlock and release all buffers */
528-
for (block_id = 0; block_id <= record->max_block_id; block_id++)
528+
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
529529
{
530530
if (BufferIsValid(buffers[block_id]))
531531
UnlockReleaseBuffer(buffers[block_id]);

src/backend/access/transam/xlog.c

+23-5
Original file line number | Diff line number | Diff line change
@@ -1209,6 +1209,7 @@ XLogInsertRecord(XLogRecData *rdata,
12091209
StringInfoData recordBuf;
12101210
char *errormsg = NULL;
12111211
MemoryContext oldCxt;
1212+
DecodedXLogRecord *decoded;
12121213

12131214
oldCxt = MemoryContextSwitchTo(walDebugCxt);
12141215

@@ -1224,14 +1225,19 @@ XLogInsertRecord(XLogRecData *rdata,
12241225
for (; rdata != NULL; rdata = rdata->next)
12251226
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
12261227

1228+
/* How much space would it take to decode this record? */
1229+
decoded = palloc(DecodeXLogRecordRequiredSpace(recordBuf.len));
1230+
12271231
if (!debug_reader)
12281232
debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
12291233

12301234
if (!debug_reader)
12311235
{
12321236
appendStringInfoString(&buf, "error decoding record: out of memory");
12331237
}
1234-
else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data,
1238+
else if (!DecodeXLogRecord(debug_reader, decoded,
1239+
(XLogRecord *) recordBuf.data,
1240+
EndPos,
12351241
&errormsg))
12361242
{
12371243
appendStringInfo(&buf, "error decoding record: %s",
@@ -1240,10 +1246,17 @@ XLogInsertRecord(XLogRecData *rdata,
12401246
else
12411247
{
12421248
appendStringInfoString(&buf, " - ");
1249+
/*
1250+
* Temporarily make this decoded record the current record for
1251+
* XLogRecGetXXX() macros.
1252+
*/
1253+
debug_reader->record = decoded;
12431254
xlog_outdesc(&buf, debug_reader);
1255+
debug_reader->record = NULL;
12441256
}
12451257
elog(LOG, "%s", buf.data);
12461258

1259+
pfree(decoded);
12471260
pfree(buf.data);
12481261
pfree(recordBuf.data);
12491262
MemoryContextSwitchTo(oldCxt);
@@ -1417,7 +1430,7 @@ checkXLogConsistency(XLogReaderState *record)
14171430

14181431
Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0);
14191432

1420-
for (block_id = 0; block_id <= record->max_block_id; block_id++)
1433+
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
14211434
{
14221435
Buffer buf;
14231436
Page page;
@@ -4383,6 +4396,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
43834396

43844397
ReadRecPtr = xlogreader->ReadRecPtr;
43854398
EndRecPtr = xlogreader->EndRecPtr;
4399+
43864400
if (record == NULL)
43874401
{
43884402
if (readFile >= 0)
@@ -10300,7 +10314,7 @@ xlog_redo(XLogReaderState *record)
1030010314
* XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info
1030110315
* code just to distinguish them for statistics purposes.
1030210316
*/
10303-
for (uint8 block_id = 0; block_id <= record->max_block_id; block_id++)
10317+
for (uint8 block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
1030410318
{
1030510319
Buffer buffer;
1030610320

@@ -10435,7 +10449,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record)
1043510449
int block_id;
1043610450

1043710451
/* decode block references */
10438-
for (block_id = 0; block_id <= record->max_block_id; block_id++)
10452+
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
1043910453
{
1044010454
RelFileNode rnode;
1044110455
ForkNumber forknum;
@@ -12104,7 +12118,7 @@ XLogPageRead(XLogReaderState *state,
1210412118
XLogRecPtr targetPagePtr = state->readPagePtr;
1210512119
int reqLen = state->reqLen;
1210612120
int readLen = 0;
12107-
XLogRecPtr targetRecPtr = state->ReadRecPtr;
12121+
XLogRecPtr targetRecPtr = state->DecodeRecPtr;
1210812122
uint32 targetPageOff;
1210912123
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
1211012124
int r;
@@ -12122,6 +12136,9 @@ XLogPageRead(XLogReaderState *state,
1212212136
/*
1212312137
* Request a restartpoint if we've replayed too much xlog since the
1212412138
* last one.
12139+
*
12140+
* XXX Why is this here? Move it to recovery loop, since it's based
12141+
* on replay position, not read position?
1212512142
*/
1212612143
if (bgwriterLaunched)
1212712144
{
@@ -12613,6 +12630,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
1261312630
* be updated on each cycle. When we are behind,
1261412631
* XLogReceiptTime will not advance, so the grace time
1261512632
* allotted to conflicting queries will decrease.
12633+
*
1261612634
*/
1261712635
if (RecPtr < flushedUpto)
1261812636
havedata = true;

0 commit comments

Comments
 (0)