
Commit 7259736

Committed by: Amit Kapila
Implement streaming mode in ReorderBuffer.
Instead of serializing the transaction to disk after reaching the logical_decoding_work_mem limit in memory, we consume the changes we have in memory and invoke the stream API methods added by commit 45fdc97. However, if we have an incomplete toast or speculative insert, we spill to disk because we can't generate the complete tuple to stream; as soon as we get the complete tuple, we stream the transaction, including the serialized changes.

We can do this incremental processing thanks to having assignments (associating subxacts with toplevel xacts) in WAL right away, and thanks to logging the invalidation messages at each command end. These features were added by commits 0bead9a and c55040c respectively.

Now that we can stream in-progress transactions, concurrent aborts may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by returning the ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from the system table scan APIs to the backend or WALSender decoding a specific uncommitted transaction. On receipt of such a sqlerrcode, the decoding logic aborts the decoding of the current transaction and continues with the decoding of other transactions.

We have a ReorderBufferTXN pointer in each ReorderBufferChange by which we know which xact it belongs to. The output plugin can use this to decide which changes to discard in case of stream_abort_cb (e.g. when a subxact gets discarded).

We also provide a new option via the SQL APIs to fetch the changes being streamed.

Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke
Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent 0a7d771 commit 7259736
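The memory-limit policy the commit message describes can be sketched as follows. This is an illustrative Python model, not PostgreSQL source: the names `handle_memory_limit` and `has_incomplete_change` are invented for the sketch, and the real ReorderBuffer logic (in reorderbuffer.c) has more cases than shown here. The core idea: when decoded changes exceed logical_decoding_work_mem, pick the largest transaction and stream it to the output plugin, unless it currently holds an incomplete change (a partial toast chunk or an unconfirmed speculative insert), in which case it is spilled to disk and streamed later, once the tuple is complete.

```python
# Illustrative model of the stream-vs-spill decision at the memory limit.
# Not PostgreSQL source; names and structure are simplified assumptions.

def handle_memory_limit(transactions, used_bytes, work_mem_bytes,
                        streaming_enabled):
    """Return (action, txn) chosen when decoded changes exceed the limit."""
    if used_bytes <= work_mem_bytes:
        return ("none", None)
    # Pick the transaction using the most memory.
    txn = max(transactions, key=lambda t: t["size"])
    if streaming_enabled and not txn["has_incomplete_change"]:
        # Consume the in-memory changes via the stream API callbacks.
        return ("stream", txn)
    # Incomplete toast/speculative insert (or streaming off): serialize to disk.
    return ("spill", txn)

txns = [
    {"xid": 501, "size": 10_000, "has_incomplete_change": False},
    {"xid": 502, "size": 90_000, "has_incomplete_change": True},
]
# Largest txn holds an incomplete change, so it is spilled rather than streamed.
print(handle_memory_limit(txns, 100_000, 64_000, streaming_enabled=True))
```

Once the incomplete tuple is completed, the real code streams the transaction including the previously serialized changes, which is what makes the processing incremental.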

File tree

21 files changed: +1331 −106 lines

contrib/test_decoding/Makefile

+1-1
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill slot truncate
+	spill slot truncate stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top

contrib/test_decoding/expected/stream.out

+94

@@ -0,0 +1,94 @@
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE TABLE stream_test(data text);
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+-- streaming test with sub-transaction
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+ ?column?
+----------
+ msg5
+(1 row)
+
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+                           data
+----------------------------------------------------------
+ opening a streamed block for transaction
+ streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ committing streamed transaction
+(27 rows)
+
+-- streaming test for toast changes
+ALTER TABLE stream_test ALTER COLUMN data set storage external;
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+                   data
+------------------------------------------
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ committing streamed transaction
+(13 rows)
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
contrib/test_decoding/expected/truncate.out

+6

@@ -25,3 +25,9 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 COMMIT
 (9 rows)
 
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+

contrib/test_decoding/sql/stream.sql

+30

@@ -0,0 +1,30 @@
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE stream_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- streaming test with sub-transaction
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+
+-- streaming test for toast changes
+ALTER TABLE stream_test ALTER COLUMN data set storage external;
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');

contrib/test_decoding/sql/truncate.sql

+1

@@ -11,3 +11,4 @@ TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
 TRUNCATE tab1, tab2;
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');

contrib/test_decoding/test_decoding.c

+13

@@ -122,6 +122,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 {
 	ListCell   *option;
 	TestDecodingData *data;
+	bool		enable_streaming = false;
 
 	data = palloc0(sizeof(TestDecodingData));
 	data->context = AllocSetContextCreate(ctx->context,
@@ -212,6 +213,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "stream-changes") == 0)
+		{
+			if (elem->arg == NULL)
+				continue;
+			else if (!parse_bool(strVal(elem->arg), &enable_streaming))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -221,6 +232,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 							elem->arg ? strVal(elem->arg) : "(null)")));
 		}
 	}
+
+	ctx->streaming &= enable_streaming;
}

/* cleanup this plugin's resources */
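The effect of the final `ctx->streaming &= enable_streaming;` line in the hunk above is that streaming stays enabled only when both sides agree: the context's `streaming` flag (set elsewhere, when the plugin registers all stream callbacks) and the user's `stream-changes` option. A small Python model of that gating, with a rough stand-in for PostgreSQL's `parse_bool` (the function names here are invented for illustration, not PostgreSQL APIs):

```python
# Illustrative model of test_decoding's stream-changes option handling.

def parse_bool(value: str) -> bool:
    """Rough analogue of PostgreSQL's parse_bool for common spellings."""
    v = value.strip().lower()
    if v in {"true", "on", "yes", "1", "t"}:
        return True
    if v in {"false", "off", "no", "0", "f"}:
        return False
    raise ValueError(f'could not parse value "{value}"')

def decode_startup(plugin_supports_streaming: bool, options: dict) -> bool:
    """Return whether streaming ends up enabled for this decoding session."""
    enable_streaming = False
    for name, arg in options.items():
        if name == "stream-changes":
            if arg is None:
                continue  # option given with no value: keep the default
            enable_streaming = parse_bool(arg)
    # ctx->streaming &= enable_streaming: both sides must agree.
    return plugin_supports_streaming and enable_streaming

print(decode_startup(True, {"stream-changes": "1"}))   # streaming on
print(decode_startup(True, {}))                        # off: option not given
print(decode_startup(False, {"stream-changes": "1"}))  # off: no stream callbacks
```

This is why streaming is opt-in per call in the SQL interface: omitting `stream-changes` leaves `enable_streaming` false even when the plugin itself is streaming-capable.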

doc/src/sgml/logicaldecoding.sgml

+6 −3

@@ -433,9 +433,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
 ALTER TABLE user_catalog_table SET (user_catalog_table = true);
 CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
 </programlisting>
-     Any actions leading to transaction ID assignment are prohibited. That, among others,
-     includes writing to tables, performing DDL changes, and
-     calling <literal>pg_current_xact_id()</literal>.
+     Note that access to user catalog tables or regular system catalog tables
+     in the output plugins has to be done via the <literal>systable_*</literal>
+     scan APIs only. Access via the <literal>heap_*</literal> scan APIs will
+     error out. Additionally, any actions leading to transaction ID assignment
+     are prohibited. That, among others, includes writing to tables, performing
+     DDL changes, and calling <literal>pg_current_xact_id()</literal>.
    </para>
   </sect2>

doc/src/sgml/test-decoding.sgml

+22

@@ -39,4 +39,26 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'i
 </programlisting>
  </para>
 
+ <para>
+  We can also get the changes of an in-progress transaction, and the typical
+  output might be:
+
+<programlisting>
+postgres[33712]=#* SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'stream-changes', '1');
+    lsn    | xid |                       data
+-----------+-----+--------------------------------------------------
+ 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503
+ 0/16B21F8 | 503 | streaming change for TXN 503
+ 0/16B2300 | 503 | streaming change for TXN 503
+ 0/16B2408 | 503 | streaming change for TXN 503
+ 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503
+ 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503
+ 0/16BECA8 | 503 | streaming change for TXN 503
+ 0/16BEDB0 | 503 | streaming change for TXN 503
+ 0/16BEEB8 | 503 | streaming change for TXN 503
+ 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503
+(10 rows)
+</programlisting>
+ </para>
+
 </sect1>

src/backend/access/heap/heapam.c

+13

@@ -1299,6 +1299,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction)
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg_internal("only heap AM is supported")));
 
+	/*
+	 * We don't expect direct calls to heap_getnext with valid CheckXidAlive
+	 * for catalog or regular tables.  See detailed comments in xact.c where
+	 * these variables are declared.  Normally we have such a check at tableam
+	 * level API but this is called from many places so we need to ensure it
+	 * here.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+		elog(ERROR, "unexpected heap_getnext call during logical decoding");
+
 	/* Note: no locking manipulations needed */
 
 	if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE)
@@ -1956,6 +1966,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 	{
 		xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
 		bufflags |= REGBUF_KEEP_DATA;
+
+		if (IsToastRelation(relation))
+			xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION;
 	}
 
 	XLogBeginInsert();
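The `CheckXidAlive`/`bsysscan` guard added to `heap_getnext` above can be modeled as follows. This is an illustrative Python sketch, not PostgreSQL source (the class and function names are invented): while decoding an in-progress transaction, `CheckXidAlive` holds that transaction's xid, and catalog access must go through the `systable_*` wrappers, which set `bsysscan` around the underlying heap access. A raw heap scan with the flag unset would bypass the concurrent-abort detection the commit message describes.

```python
# Illustrative model of the CheckXidAlive guard in heap_getnext.

class DecodingState:
    def __init__(self):
        self.check_xid_alive = None  # xid being decoded, or None
        self.bsysscan = False        # currently inside a systable_* scan?

def heap_getnext_guard(state: DecodingState) -> None:
    """Reject direct heap scans while decoding an uncommitted transaction."""
    if state.check_xid_alive is not None and not state.bsysscan:
        raise RuntimeError(
            "unexpected heap_getnext call during logical decoding")

def systable_scan(state: DecodingState, body):
    """systable_* scans flip bsysscan around the underlying heap access."""
    state.bsysscan = True
    try:
        return body()
    finally:
        state.bsysscan = False

state = DecodingState()
state.check_xid_alive = 503                              # decoding xid 503
systable_scan(state, lambda: heap_getnext_guard(state))  # allowed
try:
    heap_getnext_guard(state)                            # direct scan: rejected
except RuntimeError as e:
    print(e)
```

The real `systable_*` APIs additionally check, after the scan, whether the transaction being decoded has aborted concurrently, and report ERRCODE_TRANSACTION_ROLLBACK if so; the guard here only enforces that all catalog access takes that checked path.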

src/backend/access/heap/heapam_visibility.c

+37 −5

@@ -1571,8 +1571,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
 											   htup, buffer,
 											   &cmin, &cmax);
 
+		/*
+		 * If we haven't resolved the combocid to cmin/cmax, that means we
+		 * have not decoded the combocid yet.  That means the cmin is
+		 * definitely in the future, and we're not supposed to see the tuple
+		 * yet.
+		 *
+		 * XXX This only applies to decoding of in-progress transactions.  In
+		 * regular logical decoding we only execute this code at commit time,
+		 * at which point we should have seen all relevant combocids.  So
+		 * ideally, we should error out in this case but in practice, this
+		 * won't happen.  If we are too worried about this then we can add an
+		 * elog inside ResolveCminCmaxDuringDecoding.
+		 *
+		 * XXX For the streaming case, we can track the largest combocid
+		 * assigned, and error out based on this (when unable to resolve
+		 * combocid below that observed maximum value).
+		 */
 		if (!resolved)
-			elog(ERROR, "could not resolve cmin/cmax of catalog tuple");
+			return false;
 
 		Assert(cmin != InvalidCommandId);
 
@@ -1642,10 +1659,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
 											   htup, buffer,
 											   &cmin, &cmax);
 
-		if (!resolved)
-			elog(ERROR, "could not resolve combocid to cmax");
-
-		Assert(cmax != InvalidCommandId);
+		/*
+		 * If we haven't resolved the combocid to cmin/cmax, that means we
+		 * have not decoded the combocid yet.  That means the cmax is
+		 * definitely in the future, and we're still supposed to see the
+		 * tuple.
+		 *
+		 * XXX This only applies to decoding of in-progress transactions.  In
+		 * regular logical decoding we only execute this code at commit time,
+		 * at which point we should have seen all relevant combocids.  So
+		 * ideally, we should error out in this case but in practice, this
+		 * won't happen.  If we are too worried about this then we can add an
+		 * elog inside ResolveCminCmaxDuringDecoding.
+		 *
+		 * XXX For the streaming case, we can track the largest combocid
+		 * assigned, and error out based on this (when unable to resolve
+		 * combocid below that observed maximum value).
+		 */
+		if (!resolved || cmax == InvalidCommandId)
+			return true;
 
 		if (cmax >= snapshot->curcid)
 			return true;		/* deleted after scan started */
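The relaxed combocid handling in the hunk above boils down to two symmetric decisions, which this illustrative Python model captures. It is not PostgreSQL source (the function names are invented); "unresolved" stands for a combocid that has not been decoded yet, meaning the command that assigned it lies in the snapshot's future:

```python
# Illustrative model of the combocid decisions in HeapTupleSatisfiesHistoricMVCC
# when decoding an in-progress transaction.

INVALID_COMMAND_ID = None

def visible_after_insert(cmin_resolved: bool, cmin, curcid) -> bool:
    """Tuple inserted by the transaction being decoded: is it visible?"""
    if not cmin_resolved:
        return False        # cmin is in the future: not supposed to see it yet
    return cmin < curcid    # visible only if inserted before the current command

def visible_after_delete(cmax_resolved: bool, cmax, curcid) -> bool:
    """Tuple deleted by the transaction being decoded: is it still visible?"""
    if not cmax_resolved or cmax is INVALID_COMMAND_ID:
        return True         # cmax is in the future: still supposed to see it
    return cmax >= curcid   # deleted after scan started: still visible

assert visible_after_insert(False, None, curcid=5) is False
assert visible_after_insert(True, 2, curcid=5) is True
assert visible_after_delete(False, None, curcid=5) is True
assert visible_after_delete(True, 3, curcid=5) is False
```

Returning a visibility verdict instead of erroring is what lets the snapshot code tolerate combocids produced by commands that the streamed portion of the transaction has not decoded yet, at the cost of the weaker checking the XXX comments discuss.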
