
Commit 6af1793

Add info in WAL records in preparation for logical slot conflict handling
This commit only implements one prerequisite part for allowing logical decoding
on standbys. The commit message contains an explanation of the overall design,
which later commits will refer back to.

Overall design:

1. We want to enable logical decoding on standbys, but replay of WAL from the
   primary might remove data that is needed by logical decoding, causing errors
   on the standby. To prevent those errors, a new replication conflict scenario
   needs to be addressed, much as hot standby already does for existing
   conflict types.

2. Our chosen strategy for dealing with this type of replication slot is to
   invalidate logical slots for which needed data has been removed.

3. To do this we need the latestRemovedXid for each change, just as we do for
   physical replication conflicts, but we also need to know whether any
   particular change was to data that logical replication might access. That
   way, during WAL replay, we know when there is a risk of conflict and, if so,
   whether there actually is a conflict.

4. We can't rely on the standby's relcache entries for this purpose in any way,
   because the startup process can't access catalog contents.

5. Therefore every WAL record that potentially removes data from the index or
   heap must carry a flag indicating whether or not it is one that might be
   accessed during logical decoding.

Why do we need this for logical decoding on standby?

First, let's forget about logical decoding on standby and recall that on a
primary database, any catalog rows that may be needed by a logical decoding
replication slot are not removed. This is done thanks to the catalog_xmin
associated with the logical replication slot.

But with logical decoding on standby, in the following cases:

- hot_standby_feedback is off, or
- hot_standby_feedback is on but there is no physical slot between the primary
  and the standby (hot_standby_feedback then works, but only while the
  connection is alive; a node restart, for example, would break it),

the primary may delete system catalog rows that could be needed by logical
decoding on the standby, as it does not know about the catalog_xmin on the
standby. So it is mandatory to identify those rows and, if any slots may need
them, invalidate those slots. Identifying those rows is the purpose of this
commit.

Implementation:

When WAL replay on a standby indicates that a catalog table tuple is to be
deleted by an xid that is greater than a logical slot's catalog_xmin, the
slot's catalog_xmin conflicts with that xid, and we need to handle the
conflict. While subsequent commits will do the actual conflict handling, this
commit adds a new field isCatalogRel to such WAL records (and a new bit in the
xl_heap_visible flags field) that is true for catalog tables, so as to arrange
for conflict handling (an illustrative sketch of that future check follows
the author list below).

The affected WAL records are the ones that already contain the
snapshotConflictHorizon field, namely:

- gistxlogDelete
- gistxlogPageReuse
- xl_hash_vacuum_one_page
- xl_heap_prune
- xl_heap_freeze_page
- xl_heap_visible
- xl_btree_reuse_page
- xl_btree_delete
- spgxlogVacuumRedirect

Because of the new field, xl_hash_vacuum_one_page and gistxlogDelete now carry
the offsets to be deleted as a FLEXIBLE_ARRAY_MEMBER. This is needed to ensure
correct alignment; it is not needed in the other structs to which isCatalogRel
has been added (see the alignment sketch after the gistxlog.h hunk below).

This commit just introduces the WAL format changes mentioned above. Handling
the actual conflicts will follow in future commits.

Bumps XLOG_PAGE_MAGIC as several WAL records are changed.
Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Author: Andres Freund <andres@anarazel.de> (in an older version) Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version) Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Robert Haas <robertmhaas@gmail.com> Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com> Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Parent: ab73291 · Commit: 6af1793


15 files changed (+61 lines, -29 lines)


src/backend/access/gist/gistxlog.c (+4, -8)

@@ -177,6 +177,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
 	gistxlogDelete *xldata = (gistxlogDelete *) XLogRecGetData(record);
 	Buffer		buffer;
 	Page		page;
+	OffsetNumber *toDelete = xldata->offsets;
 
 	/*
 	 * If we have any conflict processing to do, it must happen before we
@@ -203,14 +204,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
 	{
 		page = (Page) BufferGetPage(buffer);
 
-		if (XLogRecGetDataLen(record) > SizeOfGistxlogDelete)
-		{
-			OffsetNumber *todelete;
-
-			todelete = (OffsetNumber *) ((char *) xldata + SizeOfGistxlogDelete);
-
-			PageIndexMultiDelete(page, todelete, xldata->ntodelete);
-		}
+		PageIndexMultiDelete(page, toDelete, xldata->ntodelete);
 
 		GistClearPageHasGarbage(page);
 		GistMarkTuplesDeleted(page);
@@ -609,6 +603,7 @@ gistXLogPageReuse(Relation rel, Relation heaprel,
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
 	xlrec_reuse.locator = rel->rd_locator;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.snapshotConflictHorizon = deleteXid;
@@ -678,6 +673,7 @@ gistXLogDelete(Buffer buffer, OffsetNumber *todelete, int ntodelete,
 	gistxlogDelete xlrec;
 	XLogRecPtr	recptr;
 
+	xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
 	xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
 	xlrec.ntodelete = ntodelete;
 

src/backend/access/hash/hash_xlog.c (+3, -8)

@@ -980,8 +980,10 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 	Page		page;
 	XLogRedoAction action;
 	HashPageOpaque pageopaque;
+	OffsetNumber *toDelete;
 
 	xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
+	toDelete = xldata->offsets;
 
 	/*
 	 * If we have any conflict processing to do, it must happen before we
@@ -1010,14 +1012,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 	{
 		page = (Page) BufferGetPage(buffer);
 
-		if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage)
-		{
-			OffsetNumber *unused;
-
-			unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage);
-
-			PageIndexMultiDelete(page, unused, xldata->ntuples);
-		}
+		PageIndexMultiDelete(page, toDelete, xldata->ntuples);
 
 		/*
 		 * Mark the page as not containing any LP_DEAD items. See comments in

src/backend/access/hash/hashinsert.c (+1)

@@ -432,6 +432,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
 	xl_hash_vacuum_one_page xlrec;
 	XLogRecPtr	recptr;
 
+	xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(hrel);
 	xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
 	xlrec.ntuples = ndeletable;
 

src/backend/access/heap/heapam.c (+10, -1)

@@ -6698,6 +6698,7 @@ heap_freeze_execute_prepared(Relation rel, Buffer buffer,
 	nplans = heap_log_freeze_plan(tuples, ntuples, plans, offsets);
 
 	xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
+	xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(rel);
 	xlrec.nplans = nplans;
 
 	XLogBeginInsert();
@@ -8280,6 +8281,8 @@ log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
 
 	xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
 	xlrec.flags = vmflags;
+	if (RelationIsAccessibleInLogicalDecoding(rel))
+		xlrec.flags |= VISIBILITYMAP_XLOG_CATALOG_REL;
 	XLogBeginInsert();
 	XLogRegisterData((char *) &xlrec, SizeOfHeapVisible);
 
@@ -8870,6 +8873,8 @@ heap_xlog_visible(XLogReaderState *record)
 	BlockNumber blkno;
 	XLogRedoAction action;
 
+	Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
+
 	XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
 
 	/*
@@ -8956,11 +8961,15 @@ heap_xlog_visible(XLogReaderState *record)
 	{
 		Page		vmpage = BufferGetPage(vmbuffer);
 		Relation	reln;
+		uint8		vmbits;
 
 		/* initialize the page if it was read as zeros */
 		if (PageIsNew(vmpage))
 			PageInit(vmpage, BLCKSZ, 0);
 
+		/* remove VISIBILITYMAP_XLOG_* */
+		vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
+
 		/*
 		 * XLogReadBufferForRedoExtended locked the buffer. But
 		 * visibilitymap_set will handle locking itself.
@@ -8971,7 +8980,7 @@ heap_xlog_visible(XLogReaderState *record)
 		visibilitymap_pin(reln, blkno, &vmbuffer);
 
 		visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
-						  xlrec->snapshotConflictHorizon, xlrec->flags);
+						  xlrec->snapshotConflictHorizon, vmbits);
 
 		ReleaseBuffer(vmbuffer);
 		FreeFakeRelcacheEntry(reln);
src/backend/access/heap/pruneheap.c

+1
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ heap_page_prune(Relation relation, Buffer buffer,
418418
xl_heap_prune xlrec;
419419
XLogRecPtr recptr;
420420

421+
xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(relation);
421422
xlrec.snapshotConflictHorizon = prstate.snapshotConflictHorizon;
422423
xlrec.nredirected = prstate.nredirected;
423424
xlrec.ndead = prstate.ndead;

src/backend/access/nbtree/nbtpage.c (+2)

@@ -836,6 +836,7 @@ _bt_log_reuse_page(Relation rel, Relation heaprel, BlockNumber blkno,
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
 	xlrec_reuse.locator = rel->rd_locator;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.snapshotConflictHorizon = safexid;
@@ -1358,6 +1359,7 @@ _bt_delitems_delete(Relation rel, Relation heaprel, Buffer buf,
 	XLogRecPtr	recptr;
 	xl_btree_delete xlrec_delete;
 
+	xlrec_delete.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
 	xlrec_delete.snapshotConflictHorizon = snapshotConflictHorizon;
 	xlrec_delete.ndeleted = ndeletable;
 	xlrec_delete.nupdated = nupdatable;

src/backend/access/spgist/spgvacuum.c (+1)

@@ -503,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Relation heaprel, Buffer buffer)
 	spgxlogVacuumRedirect xlrec;
 	GlobalVisState *vistest;
 
+	xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
 	xlrec.nToPlaceholder = 0;
 	xlrec.snapshotConflictHorizon = InvalidTransactionId;
 

src/include/access/gistxlog.h (+8, -3)

@@ -51,11 +51,14 @@ typedef struct gistxlogDelete
 {
 	TransactionId snapshotConflictHorizon;
 	uint16		ntodelete;		/* number of deleted offsets */
+	bool		isCatalogRel;	/* to handle recovery conflict during logical
+								 * decoding on standby */
 
-	/* TODELETE OFFSET NUMBER ARRAY FOLLOWS */
+	/* TODELETE OFFSET NUMBERS */
+	OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
 } gistxlogDelete;
 
-#define SizeOfGistxlogDelete	(offsetof(gistxlogDelete, ntodelete) + sizeof(uint16))
+#define SizeOfGistxlogDelete	offsetof(gistxlogDelete, offsets)
 
 /*
  * Backup Blk 0: If this operation completes a page split, by inserting a
@@ -98,9 +101,11 @@ typedef struct gistxlogPageReuse
 	RelFileLocator locator;
 	BlockNumber block;
 	FullTransactionId snapshotConflictHorizon;
+	bool		isCatalogRel;	/* to handle recovery conflict during logical
+								 * decoding on standby */
 } gistxlogPageReuse;
 
-#define SizeOfGistxlogPageReuse	(offsetof(gistxlogPageReuse, snapshotConflictHorizon) + sizeof(FullTransactionId))
+#define SizeOfGistxlogPageReuse	(offsetof(gistxlogPageReuse, isCatalogRel) + sizeof(bool))
 
 extern void gist_redo(XLogReaderState *record);
 extern void gist_desc(StringInfo buf, XLogReaderState *record);

src/include/access/hash_xlog.h (+6, -4)

@@ -251,13 +251,15 @@ typedef struct xl_hash_init_bitmap_page
 typedef struct xl_hash_vacuum_one_page
 {
 	TransactionId snapshotConflictHorizon;
-	uint16	ntuples;
+	uint16		ntuples;
+	bool		isCatalogRel;	/* to handle recovery conflict during logical
+								 * decoding on standby */
 
-	/* TARGET OFFSET NUMBERS FOLLOW AT THE END */
+	/* TARGET OFFSET NUMBERS */
+	OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
 } xl_hash_vacuum_one_page;
 
-#define SizeOfHashVacuumOnePage \
-	(offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(uint16))
+#define SizeOfHashVacuumOnePage offsetof(xl_hash_vacuum_one_page, offsets)
 
 extern void hash_redo(XLogReaderState *record);
 extern void hash_desc(StringInfo buf, XLogReaderState *record);

src/include/access/heapam_xlog.h (+6, -2)

@@ -245,10 +245,12 @@ typedef struct xl_heap_prune
 	TransactionId snapshotConflictHorizon;
 	uint16		nredirected;
 	uint16		ndead;
+	bool		isCatalogRel;	/* to handle recovery conflict during logical
+								 * decoding on standby */
 	/* OFFSET NUMBERS are in the block reference 0 */
 } xl_heap_prune;
 
-#define SizeOfHeapPrune (offsetof(xl_heap_prune, ndead) + sizeof(uint16))
+#define SizeOfHeapPrune (offsetof(xl_heap_prune, isCatalogRel) + sizeof(bool))
 
 /*
  * The vacuum page record is similar to the prune record, but can only mark
@@ -344,13 +346,15 @@ typedef struct xl_heap_freeze_page
 {
 	TransactionId snapshotConflictHorizon;
 	uint16		nplans;
+	bool		isCatalogRel;	/* to handle recovery conflict during logical
+								 * decoding on standby */
 
 	/*
 	 * In payload of blk 0 : FREEZE PLANS and OFFSET NUMBER ARRAY
 	 */
 } xl_heap_freeze_page;
 
-#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page, nplans) + sizeof(uint16))
+#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page, isCatalogRel) + sizeof(bool))
 
 /*
  * This is what we need to know about setting a visibility map bit

src/include/access/nbtxlog.h (+6, -2)

@@ -188,9 +188,11 @@ typedef struct xl_btree_reuse_page
 	RelFileLocator locator;
 	BlockNumber block;
 	FullTransactionId snapshotConflictHorizon;
+	bool		isCatalogRel;	/* to handle recovery conflict during logical
+								 * decoding on standby */
 } xl_btree_reuse_page;
 
-#define SizeOfBtreeReusePage (sizeof(xl_btree_reuse_page))
+#define SizeOfBtreeReusePage (offsetof(xl_btree_reuse_page, isCatalogRel) + sizeof(bool))
 
 /*
  * xl_btree_vacuum and xl_btree_delete records describe deletion of index
@@ -235,6 +237,8 @@
 	TransactionId snapshotConflictHorizon;
 	uint16		ndeleted;
 	uint16		nupdated;
+	bool		isCatalogRel;	/* to handle recovery conflict during logical
+								 * decoding on standby */
 
 	/*----
 	 * In payload of blk 0 :
@@ -245,7 +249,7 @@
 	 */
 } xl_btree_delete;
 
-#define SizeOfBtreeDelete (offsetof(xl_btree_delete, nupdated) + sizeof(uint16))
+#define SizeOfBtreeDelete (offsetof(xl_btree_delete, isCatalogRel) + sizeof(bool))
 
 /*
  * The offsets that appear in xl_btree_update metadata are offsets into the

src/include/access/spgxlog.h (+2)

@@ -240,6 +240,8 @@ typedef struct spgxlogVacuumRedirect
 	uint16		nToPlaceholder; /* number of redirects to make placeholders */
 	OffsetNumber firstPlaceholder;	/* first placeholder tuple to remove */
 	TransactionId snapshotConflictHorizon; /* newest XID of removed redirects */
+	bool		isCatalogRel;	/* to handle recovery conflict during logical
+								 * decoding on standby */
 
 	/* offsets of redirect tuples to make placeholders follow */
 	OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];

src/include/access/visibilitymapdefs.h (+9)

@@ -21,5 +21,14 @@
 #define VISIBILITYMAP_ALL_FROZEN	0x02
 #define VISIBILITYMAP_VALID_BITS	0x03	/* OR of all valid visibilitymap
 											 * flags bits */
+/*
+ * To detect recovery conflicts during logical decoding on a standby, we need
+ * to know if a table is a user catalog table. For that we add an additional
+ * bit into xl_heap_visible.flags, in addition to the above.
+ *
+ * NB: VISIBILITYMAP_XLOG_* may not be passed to visibilitymap_set().
+ */
+#define VISIBILITYMAP_XLOG_CATALOG_REL	0x04
+#define VISIBILITYMAP_XLOG_VALID_BITS	(VISIBILITYMAP_VALID_BITS | VISIBILITYMAP_XLOG_CATALOG_REL)
 
 #endif							/* VISIBILITYMAPDEFS_H */
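As a condensed view of the heapam.c changes above (nothing here goes beyond
them), the XLOG-only bit travels in xl_heap_visible.flags on the emitting side
and is masked back out during redo, so visibilitymap_set() never sees it:

/* WAL emission (log_heap_visible): record the catalog hint in the flags. */
xlrec.flags = vmflags;
if (RelationIsAccessibleInLogicalDecoding(rel))
    xlrec.flags |= VISIBILITYMAP_XLOG_CATALOG_REL;

/* Redo (heap_xlog_visible): strip the XLOG-only bit before touching the VM. */
Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
                  xlrec->snapshotConflictHorizon, vmbits);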

src/include/access/xlog_internal.h (+1, -1)

@@ -31,7 +31,7 @@
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD112	/* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD113	/* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {

src/include/utils/rel.h (+1)

@@ -16,6 +16,7 @@
 
 #include "access/tupdesc.h"
 #include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
 #include "catalog/pg_publication.h"
