Skip to content

Commit 57891c2

Browse files
author
Amit Kapila
committed
Add STREAM_START/STREAM_STOP for transactional messages during decoding.
In test_decoding module, when skip_empty_xacts option was specified, add stream_start/stop for streaming transactional messages. This makes the handling of transactional messages stream consistent irrespective of whether skip_empty_xacts option was specified. Commit 26dd028 made a similar change for non-streaming messages but forgot to update the streaming cases. Author: Peter Smith Reviewed-by: Amit Kapila Discussion: http://postgr.es/m/OS0PR01MB5716AEBD2988F8F5E9D5985794DFA@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent 675fed4 commit 57891c2

File tree

3 files changed

+25
-3
lines changed

3 files changed

+25
-3
lines changed

contrib/test_decoding/expected/stream.out

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ COMMIT;
2929
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
3030
data
3131
----------------------------------------------------------
32+
opening a streamed block for transaction
3233
streaming message: transactional: 1 prefix: test, sz: 50
34+
closing a streamed block for transaction
35+
aborting streamed (sub)transaction
3336
opening a streamed block for transaction
3437
streaming change for transaction
3538
streaming change for transaction
@@ -53,7 +56,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
5356
streaming change for transaction
5457
closing a streamed block for transaction
5558
committing streamed transaction
56-
(24 rows)
59+
(27 rows)
5760

5861
-- streaming test for toast changes
5962
ALTER TABLE stream_test ALTER COLUMN data set storage external;

contrib/test_decoding/expected/twophase_stream.out

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ PREPARE TRANSACTION 'test1';
3131
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
3232
data
3333
----------------------------------------------------------
34+
opening a streamed block for transaction
3435
streaming message: transactional: 1 prefix: test, sz: 50
36+
closing a streamed block for transaction
37+
aborting streamed (sub)transaction
3538
opening a streamed block for transaction
3639
streaming change for transaction
3740
streaming change for transaction
@@ -55,7 +58,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
5558
streaming change for transaction
5659
closing a streamed block for transaction
5760
preparing streamed transaction 'test1'
58-
(24 rows)
61+
(27 rows)
5962

6063
COMMIT PREPARED 'test1';
6164
--should show the COMMIT PREPARED and the other changes in the transaction
@@ -84,8 +87,11 @@ PREPARE TRANSACTION 'test1_nodecode';
8487
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
8588
data
8689
----------------------------------------------------------
90+
opening a streamed block for transaction
8791
streaming message: transactional: 1 prefix: test, sz: 50
88-
(1 row)
92+
closing a streamed block for transaction
93+
aborting streamed (sub)transaction
94+
(4 rows)
8995

9096
COMMIT PREPARED 'test1_nodecode';
9197
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT

contrib/test_decoding/test_decoding.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,19 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
944944
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
945945
const char *prefix, Size sz, const char *message)
946946
{
947+
/* Output stream start if we haven't yet for transactional messages. */
948+
if (transactional)
949+
{
950+
TestDecodingData *data = ctx->output_plugin_private;
951+
TestDecodingTxnData *txndata = txn->output_plugin_private;
952+
953+
if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
954+
{
955+
pg_output_stream_start(ctx, data, txn, false);
956+
}
957+
txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
958+
}
959+
947960
OutputPluginPrepareWrite(ctx, true);
948961

949962
if (transactional)

0 commit comments

Comments
 (0)