Skip to content

Commit 7a5f6b4

Browse files
committed
Make logical decoding a part of the rmgr.
Add a new rmgr method, rm_decode, and use that rather than a switch statement. In preparation for rmgr extensibility. Reviewed-by: Julien Rouhaud Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com Discussion: https://postgr.es/m/20220118095332.6xtlcjoyxobv6cbk@jrouhaud
1 parent a3d6264 commit 7a5f6b4

File tree

8 files changed

+69
-112
lines changed

8 files changed

+69
-112
lines changed

src/backend/access/transam/rmgr.c

+3-2
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@
2424
#include "commands/dbcommands_xlog.h"
2525
#include "commands/sequence.h"
2626
#include "commands/tablespace.h"
27+
#include "replication/decode.h"
2728
#include "replication/message.h"
2829
#include "replication/origin.h"
2930
#include "storage/standby.h"
3031
#include "utils/relmapper.h"
3132

3233
/* must be kept in sync with RmgrData definition in xlog_internal.h */
33-
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
34-
{ name, redo, desc, identify, startup, cleanup, mask },
34+
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
35+
{ name, redo, desc, identify, startup, cleanup, mask, decode },
3536

3637
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
3738
#include "access/rmgrlist.h"

src/backend/replication/logical/decode.c

+21-84
Original file line numberDiff line numberDiff line change
@@ -43,21 +43,6 @@
4343
#include "replication/snapbuild.h"
4444
#include "storage/standby.h"
4545

46-
typedef struct XLogRecordBuffer
47-
{
48-
XLogRecPtr origptr;
49-
XLogRecPtr endptr;
50-
XLogReaderState *record;
51-
} XLogRecordBuffer;
52-
53-
/* RMGR Handlers */
54-
static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
55-
static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
56-
static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
57-
static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
58-
static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
59-
static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
60-
6146
/* individual record(group)'s handlers */
6247
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
6348
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
10792
{
10893
XLogRecordBuffer buf;
10994
TransactionId txid;
95+
RmgrId rmid;
11096

11197
buf.origptr = ctx->reader->ReadRecPtr;
11298
buf.endptr = ctx->reader->EndRecPtr;
@@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
127113
buf.origptr);
128114
}
129115

130-
/* cast so we get a warning when new rmgrs are added */
131-
switch ((RmgrId) XLogRecGetRmid(record))
132-
{
133-
/*
134-
* Rmgrs we care about for logical decoding. Add new rmgrs in
135-
* rmgrlist.h's order.
136-
*/
137-
case RM_XLOG_ID:
138-
DecodeXLogOp(ctx, &buf);
139-
break;
140-
141-
case RM_XACT_ID:
142-
DecodeXactOp(ctx, &buf);
143-
break;
116+
rmid = XLogRecGetRmid(record);
144117

145-
case RM_STANDBY_ID:
146-
DecodeStandbyOp(ctx, &buf);
147-
break;
148-
149-
case RM_HEAP2_ID:
150-
DecodeHeap2Op(ctx, &buf);
151-
break;
152-
153-
case RM_HEAP_ID:
154-
DecodeHeapOp(ctx, &buf);
155-
break;
156-
157-
case RM_LOGICALMSG_ID:
158-
DecodeLogicalMsgOp(ctx, &buf);
159-
break;
160-
161-
/*
162-
* Rmgrs irrelevant for logical decoding; they describe stuff not
163-
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
164-
* order.
165-
*/
166-
case RM_SMGR_ID:
167-
case RM_CLOG_ID:
168-
case RM_DBASE_ID:
169-
case RM_TBLSPC_ID:
170-
case RM_MULTIXACT_ID:
171-
case RM_RELMAP_ID:
172-
case RM_BTREE_ID:
173-
case RM_HASH_ID:
174-
case RM_GIN_ID:
175-
case RM_GIST_ID:
176-
case RM_SEQ_ID:
177-
case RM_SPGIST_ID:
178-
case RM_BRIN_ID:
179-
case RM_COMMIT_TS_ID:
180-
case RM_REPLORIGIN_ID:
181-
case RM_GENERIC_ID:
182-
/* just deal with xid, and done */
183-
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
184-
buf.origptr);
185-
break;
186-
case RM_NEXT_ID:
187-
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
118+
if (RmgrTable[rmid].rm_decode != NULL)
119+
RmgrTable[rmid].rm_decode(ctx, &buf);
120+
else
121+
{
122+
/* just deal with xid, and done */
123+
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
124+
buf.origptr);
188125
}
189126
}
190127

191128
/*
192129
* Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
193130
*/
194-
static void
195-
DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
131+
void
132+
xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
196133
{
197134
SnapBuild *builder = ctx->snapshot_builder;
198135
uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
@@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
234171
/*
235172
* Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
236173
*/
237-
static void
238-
DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
174+
void
175+
xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
239176
{
240177
SnapBuild *builder = ctx->snapshot_builder;
241178
ReorderBuffer *reorder = ctx->reorder;
@@ -391,8 +328,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
391328
/*
392329
* Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
393330
*/
394-
static void
395-
DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
331+
void
332+
standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
396333
{
397334
SnapBuild *builder = ctx->snapshot_builder;
398335
XLogReaderState *r = buf->record;
@@ -437,8 +374,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
437374
/*
438375
* Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
439376
*/
440-
static void
441-
DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
377+
void
378+
heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
442379
{
443380
uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
444381
TransactionId xid = XLogRecGetXid(buf->record);
@@ -497,8 +434,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
497434
/*
498435
* Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
499436
*/
500-
static void
501-
DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
437+
void
438+
heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
502439
{
503440
uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
504441
TransactionId xid = XLogRecGetXid(buf->record);
@@ -619,8 +556,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
619556
/*
620557
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
621558
*/
622-
static void
623-
DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
559+
void
560+
logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
624561
{
625562
SnapBuild *builder = ctx->snapshot_builder;
626563
XLogReaderState *r = buf->record;

src/bin/pg_rewind/parsexlog.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* RmgrNames is an array of resource manager names, to make error messages
2929
* a bit nicer.
3030
*/
31-
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
31+
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
3232
name,
3333

3434
static const char *RmgrNames[RM_MAX_ID + 1] = {

src/bin/pg_waldump/rmgrdesc.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
#include "storage/standbydefs.h"
3333
#include "utils/relmapper.h"
3434

35-
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
35+
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
3636
{ name, desc, identify},
3737

3838
const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {

src/include/access/rmgr.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
1919
* Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
2020
* file format.
2121
*/
22-
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
22+
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
2323
symname,
2424

2525
typedef enum RmgrIds

src/include/access/rmgrlist.h

+22-22
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,25 @@
2525
*/
2626

2727
/* symbol name, textual name, redo, desc, identify, startup, cleanup */
28-
PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
29-
PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
30-
PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
31-
PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
32-
PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
33-
PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
34-
PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
35-
PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
36-
PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
37-
PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
38-
PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
39-
PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask)
40-
PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask)
41-
PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
42-
PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
43-
PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
44-
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
45-
PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
46-
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
47-
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
48-
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
49-
PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
28+
PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
29+
PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
30+
PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
31+
PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
32+
PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
33+
PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
34+
PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
35+
PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
36+
PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
37+
PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
38+
PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
39+
PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
40+
PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
41+
PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
42+
PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
43+
PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
44+
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
45+
PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
46+
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
47+
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
48+
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
49+
PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)

src/include/access/xlog_internal.h

+5
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,9 @@ typedef enum
287287
RECOVERY_TARGET_ACTION_SHUTDOWN
288288
} RecoveryTargetAction;
289289

290+
struct LogicalDecodingContext;
291+
struct XLogRecordBuffer;
292+
290293
/*
291294
* Method table for resource managers.
292295
*
@@ -312,6 +315,8 @@ typedef struct RmgrData
312315
void (*rm_startup) (void);
313316
void (*rm_cleanup) (void);
314317
void (*rm_mask) (char *pagedata, BlockNumber blkno);
318+
void (*rm_decode) (struct LogicalDecodingContext *ctx,
319+
struct XLogRecordBuffer *buf);
315320
} RmgrData;
316321

317322
extern const RmgrData RmgrTable[];

src/include/replication/decode.h

+15-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,21 @@
1414
#include "replication/logical.h"
1515
#include "replication/reorderbuffer.h"
1616

17-
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
17+
typedef struct XLogRecordBuffer
18+
{
19+
XLogRecPtr origptr;
20+
XLogRecPtr endptr;
21+
XLogReaderState *record;
22+
} XLogRecordBuffer;
23+
24+
extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
25+
extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
26+
extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
27+
extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
28+
extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
29+
extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
30+
31+
extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
1832
XLogReaderState *record);
1933

2034
#endif

0 commit comments

Comments
 (0)