Skip to content

Commit 5dfd1e5

Browse files
committed
Logical decoding of TRUNCATE
Add a new WAL record type for TRUNCATE, which is only used when wal_level >= logical. (For physical replication, TRUNCATE is already replicated via SMGR records.) Add new callback for logical decoding output plugins to receive TRUNCATE actions.

Author: Simon Riggs <simon@2ndquadrant.com>
Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
1 parent b508a56 commit 5dfd1e5

File tree

15 files changed

+414
-13
lines changed

15 files changed

+414
-13
lines changed

contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ submake-test_decoding:
3939

4040
REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
4141
decoding_into_rel binary prepared replorigin time messages \
42-
spill slot
42+
spill slot truncate
4343

4444
regresscheck: | submake-regress submake-test_decoding temp-install
4545
$(pg_regress_check) \
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
2+
?column?
3+
----------
4+
init
5+
(1 row)
6+
7+
CREATE TABLE tab1 (id serial unique, data int);
8+
CREATE TABLE tab2 (a int primary key, b int);
9+
TRUNCATE tab1;
10+
TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
11+
TRUNCATE tab1, tab2;
12+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
13+
data
14+
------------------------------------------------------
15+
BEGIN
16+
table public.tab1: TRUNCATE: (no-flags)
17+
COMMIT
18+
BEGIN
19+
table public.tab1: TRUNCATE: restart_seqs cascade
20+
COMMIT
21+
BEGIN
22+
table public.tab1, public.tab2: TRUNCATE: (no-flags)
23+
COMMIT
24+
(9 rows)
25+
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
2+
3+
CREATE TABLE tab1 (id serial unique, data int);
4+
CREATE TABLE tab2 (a int primary key, b int);
5+
6+
TRUNCATE tab1;
7+
TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
8+
TRUNCATE tab1, tab2;
9+
10+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');

contrib/test_decoding/test_decoding.c

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
5252
static void pg_decode_change(LogicalDecodingContext *ctx,
5353
ReorderBufferTXN *txn, Relation rel,
5454
ReorderBufferChange *change);
55+
static void pg_decode_truncate(LogicalDecodingContext *ctx,
56+
ReorderBufferTXN *txn,
57+
int nrelations, Relation relations[],
58+
ReorderBufferChange *change);
5559
static bool pg_decode_filter(LogicalDecodingContext *ctx,
5660
RepOriginId origin_id);
5761
static void pg_decode_message(LogicalDecodingContext *ctx,
@@ -74,6 +78,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
7478
cb->startup_cb = pg_decode_startup;
7579
cb->begin_cb = pg_decode_begin_txn;
7680
cb->change_cb = pg_decode_change;
81+
cb->truncate_cb = pg_decode_truncate;
7782
cb->commit_cb = pg_decode_commit_txn;
7883
cb->filter_by_origin_cb = pg_decode_filter;
7984
cb->shutdown_cb = pg_decode_shutdown;
@@ -480,6 +485,59 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
480485
OutputPluginWrite(ctx, true);
481486
}
482487

488+
static void
489+
pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
490+
int nrelations, Relation relations[], ReorderBufferChange *change)
491+
{
492+
TestDecodingData *data;
493+
MemoryContext old;
494+
int i;
495+
496+
data = ctx->output_plugin_private;
497+
498+
/* output BEGIN if we haven't yet */
499+
if (data->skip_empty_xacts && !data->xact_wrote_changes)
500+
{
501+
pg_output_begin(ctx, data, txn, false);
502+
}
503+
data->xact_wrote_changes = true;
504+
505+
/* Avoid leaking memory by using and resetting our own context */
506+
old = MemoryContextSwitchTo(data->context);
507+
508+
OutputPluginPrepareWrite(ctx, true);
509+
510+
appendStringInfoString(ctx->out, "table ");
511+
512+
for (i = 0; i < nrelations; i++)
513+
{
514+
if (i > 0)
515+
appendStringInfoString(ctx->out, ", ");
516+
517+
appendStringInfoString(ctx->out,
518+
quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
519+
NameStr(relations[i]->rd_rel->relname)));
520+
}
521+
522+
appendStringInfoString(ctx->out, ": TRUNCATE:");
523+
524+
if (change->data.truncate.restart_seqs
525+
|| change->data.truncate.cascade)
526+
{
527+
if (change->data.truncate.restart_seqs)
528+
appendStringInfo(ctx->out, " restart_seqs");
529+
if (change->data.truncate.cascade)
530+
appendStringInfo(ctx->out, " cascade");
531+
}
532+
else
533+
appendStringInfoString(ctx->out, " (no-flags)");
534+
535+
MemoryContextSwitchTo(old);
536+
MemoryContextReset(data->context);
537+
538+
OutputPluginWrite(ctx, true);
539+
}
540+
483541
static void
484542
pg_decode_message(LogicalDecodingContext *ctx,
485543
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,

doc/src/sgml/logicaldecoding.sgml

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ typedef struct OutputPluginCallbacks
383383
LogicalDecodeStartupCB startup_cb;
384384
LogicalDecodeBeginCB begin_cb;
385385
LogicalDecodeChangeCB change_cb;
386+
LogicalDecodeTruncateCB truncate_cb;
386387
LogicalDecodeCommitCB commit_cb;
387388
LogicalDecodeMessageCB message_cb;
388389
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
@@ -394,8 +395,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
394395
The <function>begin_cb</function>, <function>change_cb</function>
395396
and <function>commit_cb</function> callbacks are required,
396397
while <function>startup_cb</function>,
397-
<function>filter_by_origin_cb</function>
398+
<function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
398399
and <function>shutdown_cb</function> are optional.
400+
If <function>truncate_cb</function> is not set but a
401+
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
399402
</para>
400403
</sect2>
401404

@@ -590,6 +593,28 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
590593
</note>
591594
</sect3>
592595

596+
<sect3 id="logicaldecoding-output-plugin-truncate">
597+
<title>Truncate Callback</title>
598+
599+
<para>
600+
The <function>truncate_cb</function> callback is called for a
601+
<command>TRUNCATE</command> command.
602+
<programlisting>
603+
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
604+
ReorderBufferTXN *txn,
605+
int nrelations,
606+
Relation relations[],
607+
ReorderBufferChange *change);
608+
</programlisting>
609+
The parameters are analogous to the <function>change_cb</function>
610+
callback. However, because <command>TRUNCATE</command> actions on
611+
tables connected by foreign keys need to be executed together, this
612+
callback receives an array of relations instead of just a single one.
613+
See the description of the <xref linkend="sql-truncate"/> statement for
614+
details.
615+
</para>
616+
</sect3>
617+
593618
<sect3 id="logicaldecoding-output-plugin-filter-origin">
594619
<title>Origin Filter Callback</title>
595620

src/backend/access/heap/heapam.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9260,6 +9260,13 @@ heap_redo(XLogReaderState *record)
92609260
case XLOG_HEAP_UPDATE:
92619261
heap_xlog_update(record, false);
92629262
break;
9263+
case XLOG_HEAP_TRUNCATE:
9264+
/*
9265+
* TRUNCATE is a no-op because the actions are already logged as
9266+
* SMGR WAL records. The TRUNCATE WAL record exists only for logical
9267+
* decoding.
9268+
*/
9269+
break;
92639270
case XLOG_HEAP_HOT_UPDATE:
92649271
heap_xlog_update(record, true);
92659272
break;

src/backend/access/rmgrdesc/heapdesc.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ heap_desc(StringInfo buf, XLogReaderState *record)
7575
xlrec->new_offnum,
7676
xlrec->new_xmax);
7777
}
78+
else if (info == XLOG_HEAP_TRUNCATE)
79+
{
80+
xl_heap_truncate *xlrec = (xl_heap_truncate *) rec;
81+
int i;
82+
83+
if (xlrec->flags & XLH_TRUNCATE_CASCADE)
84+
appendStringInfo(buf, "cascade ");
85+
if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
86+
appendStringInfo(buf, "restart_seqs ");
87+
appendStringInfo(buf, "nrelids %u relids", xlrec->nrelids);
88+
for (i = 0; i < xlrec->nrelids; i++)
89+
appendStringInfo(buf, " %u", xlrec->relids[i]);
90+
}
7891
else if (info == XLOG_HEAP_CONFIRM)
7992
{
8093
xl_heap_confirm *xlrec = (xl_heap_confirm *) rec;
@@ -186,6 +199,9 @@ heap_identify(uint8 info)
186199
case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE:
187200
id = "HOT_UPDATE+INIT";
188201
break;
202+
case XLOG_HEAP_TRUNCATE:
203+
id = "TRUNCATE";
204+
break;
189205
case XLOG_HEAP_CONFIRM:
190206
id = "HEAP_CONFIRM";
191207
break;

src/backend/commands/tablecmds.c

Lines changed: 95 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "access/genam.h"
1818
#include "access/heapam.h"
19+
#include "access/heapam_xlog.h"
1920
#include "access/multixact.h"
2021
#include "access/reloptions.h"
2122
#include "access/relscan.h"
@@ -1322,11 +1323,7 @@ ExecuteTruncate(TruncateStmt *stmt)
13221323
{
13231324
List *rels = NIL;
13241325
List *relids = NIL;
1325-
List *seq_relids = NIL;
1326-
EState *estate;
1327-
ResultRelInfo *resultRelInfos;
1328-
ResultRelInfo *resultRelInfo;
1329-
SubTransactionId mySubid;
1326+
List *relids_logged = NIL;
13301327
ListCell *cell;
13311328

13321329
/*
@@ -1350,6 +1347,9 @@ ExecuteTruncate(TruncateStmt *stmt)
13501347
truncate_check_rel(rel);
13511348
rels = lappend(rels, rel);
13521349
relids = lappend_oid(relids, myrelid);
1350+
/* Log this relation only if needed for logical decoding */
1351+
if (RelationIsLogicallyLogged(rel))
1352+
relids_logged = lappend_oid(relids_logged, myrelid);
13531353

13541354
if (recurse)
13551355
{
@@ -1370,6 +1370,9 @@ ExecuteTruncate(TruncateStmt *stmt)
13701370
truncate_check_rel(rel);
13711371
rels = lappend(rels, rel);
13721372
relids = lappend_oid(relids, childrelid);
1373+
/* Log this relation only if needed for logical decoding */
1374+
if (RelationIsLogicallyLogged(rel))
1375+
relids_logged = lappend_oid(relids_logged, childrelid);
13731376
}
13741377
}
13751378
else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
@@ -1379,15 +1382,56 @@ ExecuteTruncate(TruncateStmt *stmt)
13791382
errhint("Do not specify the ONLY keyword, or use TRUNCATE ONLY on the partitions directly.")));
13801383
}
13811384

1385+
ExecuteTruncateGuts(rels, relids, relids_logged,
1386+
stmt->behavior, stmt->restart_seqs);
1387+
1388+
/* And close the rels */
1389+
foreach(cell, rels)
1390+
{
1391+
Relation rel = (Relation) lfirst(cell);
1392+
1393+
heap_close(rel, NoLock);
1394+
}
1395+
}
1396+
1397+
/*
1398+
* ExecuteTruncateGuts
1399+
*
1400+
* Internal implementation of TRUNCATE. This is called by the actual TRUNCATE
1401+
* command (see above) as well as replication subscribers that execute a
1402+
* replicated TRUNCATE action.
1403+
*
1404+
* explicit_rels is the list of Relations to truncate that the command
1405+
* specified. relids is the list of Oids corresponding to explicit_rels.
1406+
* relids_logged is the list of Oids (a subset of relids) that require
1407+
* WAL-logging. This is all a bit redundant, but the existing callers have
1408+
* this information handy in this form.
1409+
*/
1410+
void
1411+
ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
1412+
DropBehavior behavior, bool restart_seqs)
1413+
{
1414+
List *rels;
1415+
List *seq_relids = NIL;
1416+
EState *estate;
1417+
ResultRelInfo *resultRelInfos;
1418+
ResultRelInfo *resultRelInfo;
1419+
SubTransactionId mySubid;
1420+
ListCell *cell;
1421+
Oid *logrelids;
1422+
13821423
/*
1424+
* The explicitly-specified relations are passed in by the caller as explicit_rels.
1425+
*
13831426
* In CASCADE mode, suck in all referencing relations as well. This
13841427
* requires multiple iterations to find indirectly-dependent relations. At
13851428
* each phase, we need to exclusive-lock new rels before looking for their
13861429
* dependencies, else we might miss something. Also, we check each rel as
13871430
* soon as we open it, to avoid a faux pas such as holding lock for a long
13881431
* time on a rel we have no permissions for.
13891432
*/
1390-
if (stmt->behavior == DROP_CASCADE)
1433+
rels = list_copy(explicit_rels);
1434+
if (behavior == DROP_CASCADE)
13911435
{
13921436
for (;;)
13931437
{
@@ -1409,6 +1453,9 @@ ExecuteTruncate(TruncateStmt *stmt)
14091453
truncate_check_rel(rel);
14101454
rels = lappend(rels, rel);
14111455
relids = lappend_oid(relids, relid);
1456+
/* Log this relation only if needed for logical decoding */
1457+
if (RelationIsLogicallyLogged(rel))
1458+
relids_logged = lappend_oid(relids_logged, relid);
14121459
}
14131460
}
14141461
}
@@ -1421,7 +1468,7 @@ ExecuteTruncate(TruncateStmt *stmt)
14211468
#ifdef USE_ASSERT_CHECKING
14221469
heap_truncate_check_FKs(rels, false);
14231470
#else
1424-
if (stmt->behavior == DROP_RESTRICT)
1471+
if (behavior == DROP_RESTRICT)
14251472
heap_truncate_check_FKs(rels, false);
14261473
#endif
14271474

@@ -1431,7 +1478,7 @@ ExecuteTruncate(TruncateStmt *stmt)
14311478
* We want to do this early since it's pointless to do all the truncation
14321479
* work only to fail on sequence permissions.
14331480
*/
1434-
if (stmt->restart_seqs)
1481+
if (restart_seqs)
14351482
{
14361483
foreach(cell, rels)
14371484
{
@@ -1586,6 +1633,41 @@ ExecuteTruncate(TruncateStmt *stmt)
15861633
ResetSequence(seq_relid);
15871634
}
15881635

1636+
/*
1637+
* Write a WAL record to allow this set of actions to be logically decoded.
1638+
*
1639+
* Assemble an array of relids so we can write a single WAL record for the
1640+
* whole action.
1641+
*/
1642+
if (list_length(relids_logged) > 0)
1643+
{
1644+
xl_heap_truncate xlrec;
1645+
int i = 0;
1646+
1647+
/* should only get here if wal_level >= logical */
1648+
Assert(XLogLogicalInfoActive());
1649+
1650+
logrelids = palloc(list_length(relids_logged) * sizeof(Oid));
1651+
foreach (cell, relids_logged)
1652+
logrelids[i++] = lfirst_oid(cell);
1653+
1654+
xlrec.dbId = MyDatabaseId;
1655+
xlrec.nrelids = list_length(relids_logged);
1656+
xlrec.flags = 0;
1657+
if (behavior == DROP_CASCADE)
1658+
xlrec.flags |= XLH_TRUNCATE_CASCADE;
1659+
if (restart_seqs)
1660+
xlrec.flags |= XLH_TRUNCATE_RESTART_SEQS;
1661+
1662+
XLogBeginInsert();
1663+
XLogRegisterData((char *) &xlrec, SizeOfHeapTruncate);
1664+
XLogRegisterData((char *) logrelids, list_length(relids_logged) * sizeof(Oid));
1665+
1666+
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1667+
1668+
(void) XLogInsert(RM_HEAP_ID, XLOG_HEAP_TRUNCATE);
1669+
}
1670+
15891671
/*
15901672
* Process all AFTER STATEMENT TRUNCATE triggers.
15911673
*/
@@ -1603,7 +1685,11 @@ ExecuteTruncate(TruncateStmt *stmt)
16031685
/* We can clean up the EState now */
16041686
FreeExecutorState(estate);
16051687

1606-
/* And close the rels (can't do this while EState still holds refs) */
1688+
/*
1689+
* Close any rels opened by CASCADE (can't do this while EState still
1690+
* holds refs)
1691+
*/
1692+
rels = list_difference_ptr(rels, explicit_rels);
16071693
foreach(cell, rels)
16081694
{
16091695
Relation rel = (Relation) lfirst(cell);

0 commit comments

Comments
 (0)