@@ -176,24 +176,29 @@ ReadNextXLogRecord(XLogReaderState *xlogreader)
176
176
}
177
177
178
178
/*
179
- * Get a single WAL record info.
179
+ * Output values that make up a row describing caller's WAL record.
180
+ *
181
+ * This function leaks memory. Caller may need to use its own custom memory
182
+ * context.
183
+ *
184
+ * Keep this in sync with GetWALBlockInfo.
180
185
*/
181
186
static void
182
187
GetWALRecordInfo (XLogReaderState * record , Datum * values ,
183
188
bool * nulls , uint32 ncols )
184
189
{
185
- const char * id ;
190
+ const char * record_type ;
186
191
RmgrData desc ;
187
192
uint32 fpi_len = 0 ;
188
193
StringInfoData rec_desc ;
189
194
StringInfoData rec_blk_ref ;
190
195
int i = 0 ;
191
196
192
197
desc = GetRmgr (XLogRecGetRmid (record ));
193
- id = desc .rm_identify (XLogRecGetInfo (record ));
198
+ record_type = desc .rm_identify (XLogRecGetInfo (record ));
194
199
195
- if (id == NULL )
196
- id = psprintf ("UNKNOWN (%x)" , XLogRecGetInfo (record ) & ~XLR_INFO_MASK );
200
+ if (record_type == NULL )
201
+ record_type = psprintf ("UNKNOWN (%x)" , XLogRecGetInfo (record ) & ~XLR_INFO_MASK );
197
202
198
203
initStringInfo (& rec_desc );
199
204
desc .rm_desc (& rec_desc , record );
@@ -209,7 +214,7 @@ GetWALRecordInfo(XLogReaderState *record, Datum *values,
209
214
values [i ++ ] = LSNGetDatum (XLogRecGetPrev (record ));
210
215
values [i ++ ] = TransactionIdGetDatum (XLogRecGetXid (record ));
211
216
values [i ++ ] = CStringGetTextDatum (desc .rm_name );
212
- values [i ++ ] = CStringGetTextDatum (id );
217
+ values [i ++ ] = CStringGetTextDatum (record_type );
213
218
values [i ++ ] = UInt32GetDatum (XLogRecGetTotalLen (record ));
214
219
values [i ++ ] = UInt32GetDatum (XLogRecGetDataLen (record ));
215
220
values [i ++ ] = UInt32GetDatum (fpi_len );
@@ -229,24 +234,48 @@ GetWALRecordInfo(XLogReaderState *record, Datum *values,
229
234
230
235
231
236
/*
232
- * Store a set of block information from a single record (FPI and block
233
- * information).
237
+ * Output one or more rows in rsinfo tuple store, each describing a single
238
+ * block reference from caller's WAL record. (Should only be called with
239
+ * records that have block references.)
240
+ *
241
+ * This function leaks memory. Caller may need to use its own custom memory
242
+ * context.
243
+ *
244
+ * Keep this in sync with GetWALRecordInfo.
234
245
*/
235
246
static void
236
247
GetWALBlockInfo (FunctionCallInfo fcinfo , XLogReaderState * record )
237
248
{
238
- #define PG_GET_WAL_BLOCK_INFO_COLS 11
249
+ #define PG_GET_WAL_BLOCK_INFO_COLS 20
239
250
int block_id ;
240
251
ReturnSetInfo * rsinfo = (ReturnSetInfo * ) fcinfo -> resultinfo ;
252
+ RmgrData desc ;
253
+ const char * record_type ;
254
+ StringInfoData rec_desc ;
255
+
256
+ Assert (XLogRecHasAnyBlockRefs (record ));
257
+
258
+ desc = GetRmgr (XLogRecGetRmid (record ));
259
+ record_type = desc .rm_identify (XLogRecGetInfo (record ));
260
+
261
+ if (record_type == NULL )
262
+ record_type = psprintf ("UNKNOWN (%x)" ,
263
+ XLogRecGetInfo (record ) & ~XLR_INFO_MASK );
264
+
265
+ initStringInfo (& rec_desc );
266
+ desc .rm_desc (& rec_desc , record );
241
267
242
268
for (block_id = 0 ; block_id <= XLogRecMaxBlockId (record ); block_id ++ )
243
269
{
244
270
DecodedBkpBlock * blk ;
245
271
BlockNumber blkno ;
246
272
RelFileLocator rnode ;
247
- ForkNumber fork ;
273
+ ForkNumber forknum ;
248
274
Datum values [PG_GET_WAL_BLOCK_INFO_COLS ] = {0 };
249
275
bool nulls [PG_GET_WAL_BLOCK_INFO_COLS ] = {0 };
276
+ uint32 block_data_len = 0 ,
277
+ block_fpi_len = 0 ;
278
+ ArrayType * block_fpi_info = NULL ;
250
279
int i = 0 ;
251
280
252
281
if (!XLogRecHasBlockRef (record , block_id ))
@@ -255,99 +284,117 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record)
255
284
blk = XLogRecGetBlock (record , block_id );
256
285
257
286
(void ) XLogRecGetBlockTagExtended (record , block_id ,
258
- & rnode , & fork , & blkno , NULL );
287
+ & rnode , & forknum , & blkno , NULL );
259
288
289
+ /* Save block_data_len */
290
+ if (blk -> has_data )
291
+ block_data_len = blk -> data_len ;
292
+
293
+ if (blk -> has_image )
294
+ {
295
+ /* Block reference has an FPI, so prepare relevant output */
296
+ int bitcnt ;
297
+ int cnt = 0 ;
298
+ Datum * flags ;
299
+
300
+ /* Save block_fpi_len */
301
+ block_fpi_len = blk -> bimg_len ;
302
+
303
+ /* Construct and save block_fpi_info */
304
+ bitcnt = pg_popcount ((const char * ) & blk -> bimg_info ,
305
+ sizeof (uint8 ));
306
+ flags = (Datum * ) palloc0 (sizeof (Datum ) * bitcnt );
307
+ if ((blk -> bimg_info & BKPIMAGE_HAS_HOLE ) != 0 )
308
+ flags [cnt ++ ] = CStringGetTextDatum ("HAS_HOLE" );
309
+ if (blk -> apply_image )
310
+ flags [cnt ++ ] = CStringGetTextDatum ("APPLY" );
311
+ if ((blk -> bimg_info & BKPIMAGE_COMPRESS_PGLZ ) != 0 )
312
+ flags [cnt ++ ] = CStringGetTextDatum ("COMPRESS_PGLZ" );
313
+ if ((blk -> bimg_info & BKPIMAGE_COMPRESS_LZ4 ) != 0 )
314
+ flags [cnt ++ ] = CStringGetTextDatum ("COMPRESS_LZ4" );
315
+ if ((blk -> bimg_info & BKPIMAGE_COMPRESS_ZSTD ) != 0 )
316
+ flags [cnt ++ ] = CStringGetTextDatum ("COMPRESS_ZSTD" );
317
+
318
+ Assert (cnt <= bitcnt );
319
+ block_fpi_info = construct_array_builtin (flags , cnt , TEXTOID );
320
+ }
321
+
322
+ /* start_lsn, end_lsn, prev_lsn, and blockid outputs */
260
323
values [i ++ ] = LSNGetDatum (record -> ReadRecPtr );
324
+ values [i ++ ] = LSNGetDatum (record -> EndRecPtr );
325
+ values [i ++ ] = LSNGetDatum (XLogRecGetPrev (record ));
261
326
values [i ++ ] = Int16GetDatum (block_id );
327
+
328
+ /* relfile and block related outputs */
262
329
values [i ++ ] = ObjectIdGetDatum (blk -> rlocator .spcOid );
263
330
values [i ++ ] = ObjectIdGetDatum (blk -> rlocator .dbOid );
264
331
values [i ++ ] = ObjectIdGetDatum (blk -> rlocator .relNumber );
332
+ values [i ++ ] = Int16GetDatum (forknum );
265
333
values [i ++ ] = Int64GetDatum ((int64 ) blkno );
266
334
267
- if (fork >= 0 && fork <= MAX_FORKNUM )
268
- values [i ++ ] = CStringGetTextDatum (forkNames [fork ]);
335
+ /* xid, resource_manager, and record_type outputs */
336
+ values [i ++ ] = TransactionIdGetDatum (XLogRecGetXid (record ));
337
+ values [i ++ ] = CStringGetTextDatum (desc .rm_name );
338
+ values [i ++ ] = CStringGetTextDatum (record_type );
339
+
340
+ /*
341
+ * record_length, main_data_length, block_data_len, and
342
+ * block_fpi_length outputs
343
+ */
344
+ values [i ++ ] = UInt32GetDatum (XLogRecGetTotalLen (record ));
345
+ values [i ++ ] = UInt32GetDatum (XLogRecGetDataLen (record ));
346
+ values [i ++ ] = UInt32GetDatum (block_data_len );
347
+ values [i ++ ] = UInt32GetDatum (block_fpi_len );
348
+
349
+ /* block_fpi_info (text array) output */
350
+ if (block_fpi_info )
351
+ values [i ++ ] = PointerGetDatum (block_fpi_info );
269
352
else
270
- ereport (ERROR ,
271
- (errcode (ERRCODE_INTERNAL_ERROR ),
272
- errmsg_internal ("invalid fork number: %u" , fork )));
353
+ nulls [i ++ ] = true;
273
354
274
- /* Block data */
355
+ /* description output (describes WAL record) */
356
+ if (rec_desc .len > 0 )
357
+ values [i ++ ] = CStringGetTextDatum (rec_desc .data );
358
+ else
359
+ nulls [i ++ ] = true;
360
+
361
+ /* block_data output */
275
362
if (blk -> has_data )
276
363
{
277
- bytea * raw_data ;
364
+ bytea * block_data ;
278
365
279
- /* Initialize bytea buffer to copy the data to */
280
- raw_data = (bytea * ) palloc (blk -> data_len + VARHDRSZ );
281
- SET_VARSIZE (raw_data , blk -> data_len + VARHDRSZ );
282
-
283
- /* Copy the data */
284
- memcpy (VARDATA (raw_data ), blk -> data , blk -> data_len );
285
- values [i ++ ] = PointerGetDatum (raw_data );
366
+ block_data = (bytea * ) palloc (block_data_len + VARHDRSZ );
367
+ SET_VARSIZE (block_data , block_data_len + VARHDRSZ );
368
+ memcpy (VARDATA (block_data ), blk -> data , block_data_len );
369
+ values [i ++ ] = PointerGetDatum (block_data );
286
370
}
287
371
else
288
- {
289
- /* No data, so set this field to NULL */
290
372
nulls [i ++ ] = true;
291
- }
292
373
374
+ /* block_fpi_data output */
293
375
if (blk -> has_image )
294
376
{
295
377
PGAlignedBlock buf ;
296
378
Page page ;
297
- bytea * raw_page ;
298
- int bitcnt ;
299
- int cnt = 0 ;
300
- Datum * flags ;
301
- ArrayType * a ;
379
+ bytea * block_fpi_data ;
302
380
303
381
page = (Page ) buf .data ;
304
-
305
- /* Full page image exists, so let's save it */
306
382
if (!RestoreBlockImage (record , block_id , page ))
307
383
ereport (ERROR ,
308
384
(errcode (ERRCODE_INTERNAL_ERROR ),
309
385
errmsg_internal ("%s" , record -> errormsg_buf )));
310
386
311
- /* Initialize bytea buffer to copy the FPI to */
312
- raw_page = (bytea * ) palloc (BLCKSZ + VARHDRSZ );
313
- SET_VARSIZE (raw_page , BLCKSZ + VARHDRSZ );
314
-
315
- /* Take a verbatim copy of the FPI */
316
- memcpy (VARDATA (raw_page ), page , BLCKSZ );
317
-
318
- values [i ++ ] = PointerGetDatum (raw_page );
319
- values [i ++ ] = UInt32GetDatum (blk -> bimg_len );
320
-
321
- /* FPI flags */
322
- bitcnt = pg_popcount ((const char * ) & blk -> bimg_info ,
323
- sizeof (uint8 ));
324
- /* Build set of raw flags */
325
- flags = (Datum * ) palloc0 (sizeof (Datum ) * bitcnt );
326
-
327
- if ((blk -> bimg_info & BKPIMAGE_HAS_HOLE ) != 0 )
328
- flags [cnt ++ ] = CStringGetTextDatum ("HAS_HOLE" );
329
- if (blk -> apply_image )
330
- flags [cnt ++ ] = CStringGetTextDatum ("APPLY" );
331
- if ((blk -> bimg_info & BKPIMAGE_COMPRESS_PGLZ ) != 0 )
332
- flags [cnt ++ ] = CStringGetTextDatum ("COMPRESS_PGLZ" );
333
- if ((blk -> bimg_info & BKPIMAGE_COMPRESS_LZ4 ) != 0 )
334
- flags [cnt ++ ] = CStringGetTextDatum ("COMPRESS_LZ4" );
335
- if ((blk -> bimg_info & BKPIMAGE_COMPRESS_ZSTD ) != 0 )
336
- flags [cnt ++ ] = CStringGetTextDatum ("COMPRESS_ZSTD" );
337
-
338
- Assert (cnt <= bitcnt );
339
- a = construct_array_builtin (flags , cnt , TEXTOID );
340
- values [i ++ ] = PointerGetDatum (a );
387
+ block_fpi_data = (bytea * ) palloc (BLCKSZ + VARHDRSZ );
388
+ SET_VARSIZE (block_fpi_data , BLCKSZ + VARHDRSZ );
389
+ memcpy (VARDATA (block_fpi_data ), page , BLCKSZ );
390
+ values [i ++ ] = PointerGetDatum (block_fpi_data );
341
391
}
342
392
else
343
- {
344
- /* No full page image, so store NULLs for all its fields */
345
- memset (& nulls [i ], true, 3 * sizeof (bool ));
346
- i += 3 ;
347
- }
393
+ nulls [i ++ ] = true;
348
394
349
395
Assert (i == PG_GET_WAL_BLOCK_INFO_COLS );
350
396
397
+ /* Store a tuple for this block reference */
351
398
tuplestore_putvalues (rsinfo -> setResult , rsinfo -> setDesc ,
352
399
values , nulls );
353
400
}
@@ -356,11 +403,7 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record)
356
403
}
357
404
358
405
/*
359
- * Get information about all the blocks saved in WAL records between start
360
- * and end LSNs. This produces information about the full page images with
361
- * their relation information, and the data saved in each block associated
362
- * to a record. Decompression is applied to the full page images, if
363
- * necessary.
406
+ * Get WAL record info, unnested by block reference
364
407
*/
365
408
Datum
366
409
pg_get_wal_block_info (PG_FUNCTION_ARGS )
@@ -484,7 +527,7 @@ ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn)
484
527
}
485
528
486
529
/*
487
- * Get info and data of all WAL records between start LSN and end LSN.
530
+ * Get info of all WAL records between start LSN and end LSN.
488
531
*/
489
532
static void
490
533
GetWALRecordsInfo (FunctionCallInfo fcinfo , XLogRecPtr start_lsn ,
@@ -536,7 +579,7 @@ GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
536
579
}
537
580
538
581
/*
539
- * Get info and data of all WAL records between start LSN and end LSN.
582
+ * Get info of all WAL records between start LSN and end LSN.
540
583
*/
541
584
Datum
542
585
pg_get_wal_records_info (PG_FUNCTION_ARGS )
0 commit comments