Skip to content

Commit 592f00f

Browse files
author
Amit Kapila
committed
Update replication statistics after every stream/spill.
Currently, replication slot statistics are updated at prepare, commit, and rollback. Now, if the transaction is interrupted the stats might not get updated. Fixed this by updating replication statistics after every stream/spill. In passing update the docs to change the description of some of the slot stats. Author: Vignesh C, Sawada Masahiko Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
1 parent 7f2e10b commit 592f00f

File tree

4 files changed

+23
-16
lines changed

4 files changed

+23
-16
lines changed

doc/src/sgml/monitoring.sgml

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2708,10 +2708,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
27082708
<structfield>stream_bytes</structfield><type>bigint</type>
27092709
</para>
27102710
<para>
2711-
Amount of decoded in-progress transaction data streamed to the decoding
2712-
output plugin while decoding changes from WAL for this slot. This and other
2713-
streaming counters for this slot can be used to gauge the network I/O which
2714-
occurred during logical decoding and allow tuning <literal>logical_decoding_work_mem</literal>.
2711+
Amount of transaction data decoded for streaming in-progress
2712+
transactions to the decoding output plugin while decoding changes from
2713+
WAL for this slot. This and other streaming counters for this slot can
2714+
be used to tune <literal>logical_decoding_work_mem</literal>.
27152715
</para>
27162716
</entry>
27172717
</row>
@@ -2733,10 +2733,9 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
27332733
<structfield>total_bytes</structfield><type>bigint</type>
27342734
</para>
27352735
<para>
2736-
Amount of decoded transaction data sent to the decoding output plugin
2737-
while decoding the changes from WAL for this slot. This can be used to
2738-
gauge the total amount of data sent during logical decoding. Note that
2739-
this includes data that is streamed and/or spilled.
2736+
Amount of transaction data decoded for sending transactions to the
2737+
decoding output plugin while decoding changes from WAL for this slot.
2738+
Note that this includes data that is streamed and/or spilled.
27402739
</para>
27412740
</entry>
27422741
</row>

src/backend/replication/logical/decode.c

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -746,9 +746,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
746746
}
747747

748748
/*
749-
* Update the decoding stats at transaction prepare/commit/abort. It is
750-
* not clear that sending more or less frequently than this would be
751-
* better.
749+
* Update the decoding stats at transaction prepare/commit/abort.
750+
* Additionally we send the stats when we spill or stream the changes to
751+
* avoid losing them in case the decoding is interrupted. It is not clear
752+
* that sending more or less frequently than this would be better.
752753
*/
753754
UpdateDecodingStats(ctx);
754755
}
@@ -828,9 +829,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
828829
ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
829830

830831
/*
831-
* Update the decoding stats at transaction prepare/commit/abort. It is
832-
* not clear that sending more or less frequently than this would be
833-
* better.
832+
* Update the decoding stats at transaction prepare/commit/abort.
833+
* Additionally we send the stats when we spill or stream the changes to
834+
* avoid losing them in case the decoding is interrupted. It is not clear
835+
* that sending more or less frequently than this would be better.
834836
*/
835837
UpdateDecodingStats(ctx);
836838
}

src/backend/replication/logical/reorderbuffer.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3559,6 +3559,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
35593559

35603560
/* don't consider already serialized transactions */
35613561
rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3562+
3563+
/* update the decoding stats */
3564+
UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
35623565
}
35633566

35643567
Assert(spilled == txn->nentries_mem);
@@ -3928,6 +3931,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
39283931
/* Don't consider already streamed transaction. */
39293932
rb->streamTxns += (txn_is_streamed) ? 0 : 1;
39303933

3934+
/* update the decoding stats */
3935+
UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3936+
39313937
Assert(dlist_is_empty(&txn->changes));
39323938
Assert(txn->nentries == 0);
39333939
Assert(txn->nentries_mem == 0);

src/include/replication/reorderbuffer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -617,14 +617,14 @@ struct ReorderBuffer
617617
/* Statistics about transactions streamed to the decoding output plugin */
618618
int64 streamTxns; /* number of transactions streamed */
619619
int64 streamCount; /* streaming invocation counter */
620-
int64 streamBytes; /* amount of data streamed */
620+
int64 streamBytes; /* amount of data decoded */
621621

622622
/*
623623
* Statistics about all the transactions sent to the decoding output
624624
* plugin
625625
*/
626626
int64 totalTxns; /* total number of transactions sent */
627-
int64 totalBytes; /* total amount of data sent */
627+
int64 totalBytes; /* total amount of data decoded */
628628
};
629629

630630

0 commit comments

Comments
 (0)