Commit 0da92dc

Logical decoding of sequences

This extends the logical decoding to also decode sequence increments.
We differentiate between sequences created in the current (in-progress)
transaction, and sequences created earlier. This mixed behavior is
necessary because while sequences are not transactional (increments are
not subject to ROLLBACK), relfilenode changes are. So we do this:

* Changes for sequences created in the same top-level transaction are
  treated as transactional, i.e. just like any other change from that
  transaction, and discarded in case of a rollback.

* Changes for sequences created earlier are applied immediately, as if
  performed outside any transaction. This applies also after ALTER
  SEQUENCE, which may create a new relfilenode.

Moreover, if we ever get support for DDL replication, the sequence
won't exist until the transaction gets applied.

Sequences created in the current transaction are tracked in a simple
hash table, identified by a relfilenode. That means a sequence may
already exist, but if a transaction does ALTER SEQUENCE then the
increments for the new relfilenode will be treated as transactional.

For each relfilenode we track the XID of (sub)transaction that created
it, which is needed for cleanup at transaction end. We don't need to
check the XID to decide if an increment is transactional - if we find a
match in the hash table, it has to be the same transaction.

This requires two minor changes to WAL-logging. Firstly, we need to
ensure the sequence record has a valid XID - until now the increment
might have XID 0 if it was the first change in a subxact. But the
sequence might have been created in the same top-level transaction. So
we ensure the XID is assigned when WAL-logging increments.

The other change is addition of "created" flag, marking increments for
newly created relfilenodes. This makes it easier to maintain the hash
table of sequences that need transactional handling.

Note: This is needed because of subxacts. A XID 0 might still have the
sequence created in a different subxact of the same top-level xact.

This does not include any changes to test_decoding and/or the built-in
replication - those will be committed in separate patches.

A patch adding decoding of sequences was originally submitted by Cary
Huang. This commit reworks various important aspects (e.g. the WAL
logging and transactional/non-transactional handling). However, the
original patch and reviews were very useful.

Author: Tomas Vondra, Cary Huang
Reviewed-by: Peter Eisentraut, Hannu Krosing, Andres Freund
Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com
Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca
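
The tracking rule described in the commit message can be illustrated with a small standalone sketch. This is not the PostgreSQL code: every type and function name below is hypothetical, and a fixed-size array with a linear scan stands in for the hash table. It only shows the decision: a change is transactional if its relfilenode was created by a still-tracked transaction, and tracked entries are dropped when the creating (sub)transaction ends.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; not the PostgreSQL types. */
typedef uint32_t Xid;
typedef uint32_t RelFileNodeId;

#define MAX_TRACKED 64

typedef struct
{
    bool          used;
    RelFileNodeId node;         /* relfilenode created in a running xact */
    Xid           creator_xid;  /* (sub)transaction that created it */
} TrackedSeq;

typedef struct
{
    TrackedSeq slots[MAX_TRACKED];
} SeqTracker;

/* Linear scan used here in place of a real hash table lookup. */
static TrackedSeq *
tracker_find(SeqTracker *t, RelFileNodeId node)
{
    for (size_t i = 0; i < MAX_TRACKED; i++)
        if (t->slots[i].used && t->slots[i].node == node)
            return &t->slots[i];
    return NULL;
}

/*
 * Decide whether a sequence change must be queued with its transaction.
 * A record carrying the "created" flag registers the relfilenode; later
 * changes are transactional only if the relfilenode is still tracked.
 * Toy limitation: if the array is full, the entry is not remembered.
 */
static bool
seq_change_is_transactional(SeqTracker *t, RelFileNodeId node,
                            Xid xid, bool created)
{
    if (created)
    {
        if (tracker_find(t, node) == NULL)
        {
            for (size_t i = 0; i < MAX_TRACKED; i++)
            {
                if (!t->slots[i].used)
                {
                    t->slots[i].used = true;
                    t->slots[i].node = node;
                    t->slots[i].creator_xid = xid;
                    break;
                }
            }
        }
        return true;
    }

    /* No XID comparison needed: a match implies the same transaction. */
    return tracker_find(t, node) != NULL;
}

/* Cleanup at (sub)transaction end: forget relfilenodes it created. */
static void
tracker_forget_xid(SeqTracker *t, Xid xid)
{
    for (size_t i = 0; i < MAX_TRACKED; i++)
        if (t->slots[i].used && t->slots[i].creator_xid == xid)
            t->slots[i].used = false;
}
```

In this sketch the creator XID is stored only so that cleanup can find the entries, which mirrors the reasoning in the message that the lookup itself does not need to compare XIDs.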
1 parent 0d4513b commit 0da92dc

10 files changed: +786 -7 lines changed

doc/src/sgml/logicaldecoding.sgml

Lines changed: 60 additions & 5 deletions
@@ -458,6 +458,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeTruncateCB truncate_cb;
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
+    LogicalDecodeSequenceCB sequence_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
     LogicalDecodeFilterPrepareCB filter_prepare_cb;
@@ -472,6 +473,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeStreamCommitCB stream_commit_cb;
     LogicalDecodeStreamChangeCB stream_change_cb;
     LogicalDecodeStreamMessageCB stream_message_cb;
+    LogicalDecodeStreamSequenceCB stream_sequence_cb;
     LogicalDecodeStreamTruncateCB stream_truncate_cb;
 } OutputPluginCallbacks;

@@ -481,9 +483,11 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>,
      <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
-     and <function>shutdown_cb</function> are optional.
-     If <function>truncate_cb</function> is not set but a
+     <function>sequence_cb</function>, and <function>shutdown_cb</function> are
+     optional. If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
+     Similarly, if <function>sequence_cb</function> is not set and a sequence
+     change is to be decoded, the action will be ignored.
     </para>
 
     <para>
@@ -492,7 +496,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
      <function>stream_commit_cb</function>, <function>stream_change_cb</function>,
      and <function>stream_prepare_cb</function>
-     are required, while <function>stream_message_cb</function> and
+     are required, while <function>stream_message_cb</function>,
+     <function>stream_sequence_cb</function>, and
      <function>stream_truncate_cb</function> are optional.
     </para>

@@ -808,6 +813,35 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-sequence">
+     <title>Sequence Callback</title>
+
+     <para>
+      The optional <function>sequence_cb</function> callback is called for
+      actions that update a sequence value.
+<programlisting>
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+                                         ReorderBufferTXN *txn,
+                                         XLogRecPtr sequence_lsn,
+                                         Relation rel,
+                                         bool transactional,
+                                         int64 last_value,
+                                         int64 log_cnt,
+                                         bool is_called);
+</programlisting>
+      The <parameter>txn</parameter> parameter contains meta information about
+      the transaction the sequence change is part of. Note however that for
+      non-transactional increments, the transaction may be either NULL or not
+      NULL, depending on if the transaction already has XID assigned.
+      The <parameter>sequence_lsn</parameter> has WAL location of the sequence
+      update. The <parameter>transactional</parameter> says if the sequence has
+      to be replayed as part of the transaction or directly.
+
+      The <parameter>last_value</parameter>, <parameter>log_cnt</parameter> and
+      <parameter>is_called</parameter> parameters describe the sequence change.
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-filter-prepare">
      <title>Prepare Filter Callback</title>

@@ -1017,6 +1051,26 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-stream-sequence">
+     <title>Stream Sequence Callback</title>
+     <para>
+      The optional <function>stream_sequence_cb</function> callback is called
+      for actions that change a sequence in a block of streamed changes
+      (demarcated by <function>stream_start_cb</function> and
+      <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx,
+                                               ReorderBufferTXN *txn,
+                                               XLogRecPtr sequence_lsn,
+                                               Relation rel,
+                                               bool transactional,
+                                               int64 last_value,
+                                               int64 log_cnt,
+                                               bool is_called);
+</programlisting>
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-stream-truncate">
      <title>Stream Truncate Callback</title>
      <para>
@@ -1197,8 +1251,9 @@ OutputPluginWrite(ctx, true);
     in-progress transactions. There are multiple required streaming callbacks
     (<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
     <function>stream_abort_cb</function>, <function>stream_commit_cb</function>
-    and <function>stream_change_cb</function>) and two optional callbacks
-    (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
+    and <function>stream_change_cb</function>) and multiple optional callbacks
+    (<function>stream_message_cb</function>, <function>stream_sequence_cb</function>,
+    and <function>stream_truncate_cb</function>).
     Also, if streaming of two-phase commands is to be supported, then additional
     callbacks must be provided. (See <xref linkend="logicaldecoding-two-phase-commits"/>
     for details).
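
To make the documented callback parameters more concrete, here is a minimal standalone sketch of how an output plugin might render the values passed to a sequence callback. It deliberately avoids the PostgreSQL headers: the function name, the sequence-name argument, and the output format are all assumptions made for illustration. A real `sequence_cb` would receive `ctx`, `txn`, `sequence_lsn` and `rel` as shown in the typedef above and write its output through the plugin's output functions.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: format the values a sequence callback receives.
 * "seqname" stands in for whatever name a plugin derives from the Relation.
 * Returns the snprintf() result (number of characters that would be written).
 */
static int
format_sequence_change(char *out, size_t outlen, const char *seqname,
                       bool transactional, int64_t last_value,
                       int64_t log_cnt, bool is_called)
{
    assert(out != NULL && seqname != NULL);

    return snprintf(out, outlen,
                    "sequence %s: transactional:%d last_value: %" PRId64
                    " log_cnt: %" PRId64 " is_called:%d",
                    seqname,
                    transactional ? 1 : 0,
                    last_value,
                    log_cnt,
                    is_called ? 1 : 0);
}
```

Since the documentation notes that `txn` may be NULL for non-transactional increments, a real callback would presumably need to check it before dereferencing; the sketch sidesteps this by not taking a transaction argument at all.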

src/backend/commands/sequence.c

Lines changed: 32 additions & 0 deletions
@@ -378,8 +378,13 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 
     /* check the comment above nextval_internal()'s equivalent call. */
     if (RelationNeedsWAL(rel))
+    {
         GetTopTransactionId();
 
+        if (XLogLogicalInfoActive())
+            GetCurrentTransactionId();
+    }
+
     START_CRIT_SECTION();
 
     MarkBufferDirty(buf);
@@ -399,6 +404,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
         XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
         xlrec.node = rel->rd_node;
+        xlrec.created = true;
 
         XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
         XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -764,10 +770,28 @@ nextval_internal(Oid relid, bool check_permissions)
      * It's sufficient to ensure the toplevel transaction has an xid, no need
      * to assign xids subxacts, that'll already trigger an appropriate wait.
      * (Have to do that here, so we're outside the critical section)
+     *
+     * We have to ensure we have a proper XID, which will be included in
+     * the XLOG record by XLogRecordAssemble. Otherwise the first nextval()
+     * in a subxact (without any preceding changes) would get XID 0, and it
+     * would then be impossible to decide which top xact it belongs to.
+     * It'd also trigger assert in DecodeSequence. We only do that with
+     * wal_level=logical, though.
+     *
+     * XXX This might seem unnecessary, because if there's no XID the xact
+     * couldn't have done anything important yet, e.g. it could not have
+     * created a sequence. But that's incorrect, because of subxacts. The
+     * current subtransaction might not have done anything yet (thus no XID),
+     * but an earlier one might have created the sequence.
      */
     if (logit && RelationNeedsWAL(seqrel))
+    {
         GetTopTransactionId();
 
+        if (XLogLogicalInfoActive())
+            GetCurrentTransactionId();
+    }
+
     /* ready to change the on-disk (or really, in-buffer) tuple */
     START_CRIT_SECTION();

@@ -803,6 +827,7 @@ nextval_internal(Oid relid, bool check_permissions)
         seq->log_cnt = 0;
 
         xlrec.node = seqrel->rd_node;
+        xlrec.created = false;
 
         XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
         XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -977,8 +1002,13 @@ do_setval(Oid relid, int64 next, bool iscalled)
 
     /* check the comment above nextval_internal()'s equivalent call. */
     if (RelationNeedsWAL(seqrel))
+    {
         GetTopTransactionId();
 
+        if (XLogLogicalInfoActive())
+            GetCurrentTransactionId();
+    }
+
     /* ready to change the on-disk (or really, in-buffer) tuple */
     START_CRIT_SECTION();

@@ -999,6 +1029,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
         XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
         xlrec.node = seqrel->rd_node;
+        xlrec.created = false;
+
         XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
         XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
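
The subtransaction problem that the added `GetCurrentTransactionId()` calls address can be shown with a toy model. Nothing here is PostgreSQL code: `ToyXact`, `toy_assign_xid` and the other names are hypothetical, and the model assumes only what the comment above states, namely that XIDs are assigned lazily and that a WAL record carries the XID of the current (sub)transaction. Without forcing assignment, a sequence change logged from a fresh subtransaction is stamped with XID 0 even though the top-level transaction already has an XID.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;
#define INVALID_XID 0

/* Toy transaction node; parent is NULL for the top-level transaction. */
typedef struct ToyXact
{
    struct ToyXact *parent;
    Xid             xid;    /* INVALID_XID until lazily assigned */
} ToyXact;

static Xid next_xid = 100;

/* Assign an XID on demand; parents are assigned first in this model. */
static Xid
toy_assign_xid(ToyXact *x)
{
    if (x->xid == INVALID_XID)
    {
        if (x->parent != NULL)
            toy_assign_xid(x->parent);
        x->xid = next_xid++;
    }
    return x->xid;
}

/* Make sure the top-level transaction has an XID. */
static Xid
toy_top_xid(ToyXact *x)
{
    while (x->parent != NULL)
        x = x->parent;
    return toy_assign_xid(x);
}

/*
 * Return the XID that would be stamped on the WAL record for a sequence
 * change made in "cur". With logical == false only the top-level XID is
 * forced, so a fresh subtransaction still reports INVALID_XID.
 */
static Xid
toy_log_sequence_change(ToyXact *cur, bool logical)
{
    toy_top_xid(cur);           /* role of GetTopTransactionId() */
    if (logical)
        toy_assign_xid(cur);    /* role of GetCurrentTransactionId() */
    return cur->xid;
}
```

In this model, a decoder that sees XID 0 has no way to map the record to its top-level transaction, which is the situation the comment in `nextval_internal()` describes.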

src/backend/replication/logical/decode.c

Lines changed: 129 additions & 0 deletions
@@ -42,6 +42,7 @@
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
+#include "commands/sequence.h"
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -63,6 +64,7 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
 
 /* helper functions for decoding transactions */
 static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -1250,3 +1252,130 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
             (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
             ctx->fast_forward || FilterByOrigin(ctx, origin_id));
 }
+
+/*
+ * DecodeSeqTuple
+ *      decode tuple describing the sequence increment
+ *
+ * Sequences are represented as a table with a single row, which gets updated
+ * by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we
+ * simply copy it into the tuplebuf (similar to seq_redo).
+ */
+static void
+DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+    int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
+
+    Assert(datalen >= 0);
+
+    tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+
+    ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+    tuple->tuple.t_tableOid = InvalidOid;
+
+    memcpy(((char *) tuple->tuple.t_data),
+           data + sizeof(xl_seq_rec),
+           SizeofHeapTupleHeader);
+
+    memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+           data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
+           datalen);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding sequences is a bit tricky, because while most sequence actions
+ * are non-transactional (not subject to rollback), some need to be handled
+ * as transactional.
+ *
+ * By default, a sequence increment is non-transactional - we must not queue
+ * it in a transaction as other changes, because the transaction might get
+ * rolled back and we'd discard the increment. The downstream would not be
+ * notified about the increment, which is wrong.
+ *
+ * On the other hand, the sequence may be created in a transaction. In this
+ * case we *should* queue the change as other changes in the transaction,
+ * because we don't want to send the increments for unknown sequence to the
+ * plugin - it might get confused about which sequence it's related to etc.
+ */
+void
+sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+    SnapBuild *builder = ctx->snapshot_builder;
+    ReorderBufferTupleBuf *tuplebuf;
+    RelFileNode target_node;
+    XLogReaderState *r = buf->record;
+    char *tupledata = NULL;
+    Size tuplelen;
+    Size datalen = 0;
+    TransactionId xid = XLogRecGetXid(r);
+    uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+    xl_seq_rec *xlrec;
+    Snapshot snapshot;
+    RepOriginId origin_id = XLogRecGetOrigin(r);
+    bool transactional;
+
+    /* only decode changes flagged with XLOG_SEQ_LOG */
+    if (info != XLOG_SEQ_LOG)
+        elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
+
+    /*
+     * If we don't have snapshot or we are just fast-forwarding, there is no
+     * point in decoding messages.
+     */
+    if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+        ctx->fast_forward)
+        return;
+
+    /* only interested in our database */
+    XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+    if (target_node.dbNode != ctx->slot->data.database)
+        return;
+
+    /* output plugin doesn't look for this origin, no need to queue */
+    if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+        return;
+
+    tupledata = XLogRecGetData(r);
+    datalen = XLogRecGetDataLen(r);
+    tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
+
+    /* extract the WAL record, with "created" flag */
+    xlrec = (xl_seq_rec *) XLogRecGetData(r);
+
+    /* XXX how could we have sequence change without data? */
+    if(!datalen || !tupledata)
+        return;
+
+    tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+    DecodeSeqTuple(tupledata, datalen, tuplebuf);
+
+    /*
+     * Should we handle the sequence increment as transactional or not?
+     *
+     * If the sequence was created in a still-running transaction, treat
+     * it as transactional and queue the increments. Otherwise it needs
+     * to be treated as non-transactional, in which case we send it to
+     * the plugin right away.
+     */
+    transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
+                                                         target_node,
+                                                         xlrec->created);
+
+    /* Skip the change if already processed (per the snapshot). */
+    if (transactional &&
+        !SnapBuildProcessChange(builder, xid, buf->origptr))
+        return;
+    else if (!transactional &&
+             (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+              SnapBuildXactNeedsSkip(builder, buf->origptr)))
+        return;
+
+    /* Queue the increment (or send immediately if not transactional). */
+    snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+    ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
+                               origin_id, target_node, transactional,
+                               xlrec->created, tuplebuf);
+}
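
The record layout that `DecodeSeqTuple` relies on (a fixed record header, then a tuple header, then the tuple data) can be sketched in isolation. The struct and constant below are hypothetical stand-ins with made-up sizes, not `xl_seq_rec` or `SizeofHeapTupleHeader`. Unlike the committed code, which computes the length into an `int` and asserts it is non-negative, this sketch checks the length before subtracting, because the subtraction is done on unsigned values here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the fixed WAL record header (xl_seq_rec). */
typedef struct
{
    uint32_t node[3];
    uint8_t  created;
} ToySeqRec;

/* Hypothetical stand-in for the tuple header size. */
#define TOY_TUPLE_HEADER_SIZE 23

/*
 * Copy the tuple (header plus data) that follows the fixed record header
 * into tuple_out. Returns the length of the tuple data alone, or -1 if
 * the record is too short or the output buffer is too small.
 */
static long
toy_split_seq_record(const char *data, size_t len,
                     char *tuple_out, size_t tuple_cap)
{
    size_t fixed = sizeof(ToySeqRec) + TOY_TUPLE_HEADER_SIZE;
    size_t tuple_len;

    if (len < fixed)
        return -1;              /* guard against unsigned wrap-around */

    tuple_len = len - sizeof(ToySeqRec);
    if (tuple_len > tuple_cap)
        return -1;

    /* Header and data are contiguous, so a single copy is enough here. */
    memcpy(tuple_out, data + sizeof(ToySeqRec), tuple_len);

    return (long) (len - fixed);
}
```

The committed function performs the copy in two `memcpy` calls (header, then data); the single copy above is a simplification that is equivalent only because both source and destination are contiguous in this toy layout.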
