Skip to content

Commit e709596

Browse files
author
Amit Kapila
committed
Add macros for ReorderBufferTXN toptxn.
Currently, there are quite a few places in reorderbuffer.c that tries to access top-transaction for a subtransaction. This makes the code to access top-transaction consistent and easier to follow. Author: Peter Smith Reviewed-by: Vignesh C, Sawada Masahiko Discussion: https://postgr.es/m/CAHut+PuCznOyTqBQwjRUu-ibG-=KHyCv-0FTcWQtZUdR88umfg@mail.gmail.com
1 parent eb7d043 commit e709596

File tree

4 files changed

+41
-33
lines changed

4 files changed

+41
-33
lines changed

contrib/test_decoding/test_decoding.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -815,11 +815,11 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
815815
* maintain the output_plugin_private only under the toptxn so if this is
816816
* not the toptxn then fetch the toptxn.
817817
*/
818-
ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
818+
ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
819819
TestDecodingTxnData *txndata = toptxn->output_plugin_private;
820820
bool xact_wrote_changes = txndata->xact_wrote_changes;
821821

822-
if (txn->toptxn == NULL)
822+
if (rbtxn_is_toptxn(txn))
823823
{
824824
Assert(txn->output_plugin_private != NULL);
825825
pfree(txndata);

src/backend/replication/logical/reorderbuffer.c

+18-28
Original file line numberDiff line numberDiff line change
@@ -717,10 +717,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
717717
return;
718718

719719
/* Get the top transaction. */
720-
if (txn->toptxn != NULL)
721-
toptxn = txn->toptxn;
722-
else
723-
toptxn = txn;
720+
toptxn = rbtxn_get_toptxn(txn);
724721

725722
/*
726723
* Indicate a partial change for toast inserts. The change will be
@@ -809,13 +806,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
809806
change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
810807
change->action == REORDER_BUFFER_CHANGE_MESSAGE)
811808
{
812-
ReorderBufferTXN *toptxn;
813-
814-
/* get the top transaction */
815-
if (txn->toptxn != NULL)
816-
toptxn = txn->toptxn;
817-
else
818-
toptxn = txn;
809+
ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
819810

820811
toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
821812
}
@@ -1655,9 +1646,9 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
16551646
/*
16561647
* Mark the transaction as streamed.
16571648
*
1658-
* The toplevel transaction, identified by (toptxn==NULL), is marked as
1659-
* streamed always, even if it does not contain any changes (that is, when
1660-
* all the changes are in subtransactions).
1649+
* The top-level transaction, is marked as streamed always, even if it
1650+
* does not contain any changes (that is, when all the changes are in
1651+
* subtransactions).
16611652
*
16621653
* For subtransactions, we only mark them as streamed when there are
16631654
* changes in them.
@@ -1667,7 +1658,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
16671658
* about the toplevel xact (we send the XID in all messages), but we never
16681659
* stream XIDs of empty subxacts.
16691660
*/
1670-
if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
1661+
if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
16711662
txn->txn_flags |= RBTXN_IS_STREAMED;
16721663

16731664
if (txn_prepared)
@@ -3207,10 +3198,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
32073198
* Update the total size in top level as well. This is later used to
32083199
* compute the decoding stats.
32093200
*/
3210-
if (txn->toptxn != NULL)
3211-
toptxn = txn->toptxn;
3212-
else
3213-
toptxn = txn;
3201+
toptxn = rbtxn_get_toptxn(txn);
32143202

32153203
if (addition)
32163204
{
@@ -3295,8 +3283,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
32953283
* so that we can execute them all together. See comments atop this
32963284
* function.
32973285
*/
3298-
if (txn->toptxn)
3299-
txn = txn->toptxn;
3286+
txn = rbtxn_get_toptxn(txn);
33003287

33013288
Assert(nmsgs > 0);
33023289

@@ -3354,7 +3341,6 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
33543341
XLogRecPtr lsn)
33553342
{
33563343
ReorderBufferTXN *txn;
3357-
ReorderBufferTXN *toptxn;
33583344

33593345
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
33603346

@@ -3370,11 +3356,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
33703356
* conveniently check just top-level transaction and decide whether to
33713357
* build the hash table or not.
33723358
*/
3373-
toptxn = txn->toptxn;
3374-
if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
3359+
if (rbtxn_is_subtxn(txn))
33753360
{
3376-
toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3377-
dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3361+
ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3362+
3363+
if (!rbtxn_has_catalog_changes(toptxn))
3364+
{
3365+
toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3366+
dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3367+
}
33783368
}
33793369
}
33803370

@@ -3619,7 +3609,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
36193609
(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
36203610
{
36213611
/* we know there has to be one, because the size is not zero */
3622-
Assert(txn && !txn->toptxn);
3612+
Assert(txn && rbtxn_is_toptxn(txn));
36233613
Assert(txn->total_size > 0);
36243614
Assert(rb->size >= txn->total_size);
36253615

@@ -4007,7 +3997,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
40073997
bool txn_is_streamed;
40083998

40093999
/* We can never reach here for a subtransaction. */
4010-
Assert(txn->toptxn == NULL);
4000+
Assert(rbtxn_is_toptxn(txn));
40114001

40124002
/*
40134003
* We can't make any assumptions about base snapshot here, similar to what

src/backend/replication/pgoutput/pgoutput.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -694,8 +694,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
694694
if (in_streaming)
695695
xid = change->txn->xid;
696696

697-
if (change->txn->toptxn)
698-
topxid = change->txn->toptxn->xid;
697+
if (rbtxn_is_subtxn(change->txn))
698+
topxid = rbtxn_get_toptxn(change->txn)->xid;
699699
else
700700
topxid = xid;
701701

@@ -1879,7 +1879,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
18791879
Assert(!in_streaming);
18801880

18811881
/* determine the toplevel transaction */
1882-
toptxn = (txn->toptxn) ? txn->toptxn : txn;
1882+
toptxn = rbtxn_get_toptxn(txn);
18831883

18841884
Assert(rbtxn_is_streamed(toptxn));
18851885

src/include/replication/reorderbuffer.h

+18
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,24 @@ typedef struct ReorderBufferChange
249249
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
250250
)
251251

252+
/* Is this a top-level transaction? */
253+
#define rbtxn_is_toptxn(txn) \
254+
( \
255+
(txn)->toptxn == NULL \
256+
)
257+
258+
/* Is this a subtransaction? */
259+
#define rbtxn_is_subtxn(txn) \
260+
( \
261+
(txn)->toptxn != NULL \
262+
)
263+
264+
/* Get the top-level transaction of this (sub)transaction. */
265+
#define rbtxn_get_toptxn(txn) \
266+
( \
267+
rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \
268+
)
269+
252270
typedef struct ReorderBufferTXN
253271
{
254272
/* See above */

0 commit comments

Comments
 (0)