Skip to content

Commit 713f7c4

Browse files
committed
Fix after trigger execution in logical replication
From: Petr Jelinek <petr.jelinek@2ndquadrant.com> Tested-by: Thom Brown <thom@linux.com>
1 parent 1e8a850 commit 713f7c4

File tree

2 files changed

+128
-0
lines changed

2 files changed

+128
-0
lines changed

src/backend/replication/logical/worker.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
173173
if (resultRelInfo->ri_TrigDesc)
174174
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
175175

176+
/* Prepare to catch AFTER triggers. */
177+
AfterTriggerBeginQuery();
178+
176179
return estate;
177180
}
178181

@@ -533,6 +536,10 @@ apply_handle_insert(StringInfo s)
533536
/* Cleanup. */
534537
ExecCloseIndices(estate->es_result_relation_info);
535538
PopActiveSnapshot();
539+
540+
/* Handle queued AFTER triggers. */
541+
AfterTriggerEndQuery(estate);
542+
536543
ExecResetTupleTable(estate->es_tupleTable, false);
537544
FreeExecutorState(estate);
538545

@@ -673,6 +680,10 @@ apply_handle_update(StringInfo s)
673680
/* Cleanup. */
674681
ExecCloseIndices(estate->es_result_relation_info);
675682
PopActiveSnapshot();
683+
684+
/* Handle queued AFTER triggers. */
685+
AfterTriggerEndQuery(estate);
686+
676687
EvalPlanQualEnd(&epqstate);
677688
ExecResetTupleTable(estate->es_tupleTable, false);
678689
FreeExecutorState(estate);
@@ -760,6 +771,10 @@ apply_handle_delete(StringInfo s)
760771
/* Cleanup. */
761772
ExecCloseIndices(estate->es_result_relation_info);
762773
PopActiveSnapshot();
774+
775+
/* Handle queued AFTER triggers. */
776+
AfterTriggerEndQuery(estate);
777+
763778
EvalPlanQualEnd(&epqstate);
764779
ExecResetTupleTable(estate->es_tupleTable, false);
765780
FreeExecutorState(estate);
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# Basic logical replication test
2+
use strict;
3+
use warnings;
4+
use PostgresNode;
5+
use TestLib;
6+
use Test::More tests => 4;
7+
8+
# Initialize publisher node
9+
my $node_publisher = get_new_node('publisher');
10+
$node_publisher->init(allows_streaming => 'logical');
11+
$node_publisher->start;
12+
13+
# Create subscriber node
14+
my $node_subscriber = get_new_node('subscriber');
15+
$node_subscriber->init(allows_streaming => 'logical');
16+
$node_subscriber->start;
17+
18+
# Setup structure on publisher
19+
$node_publisher->safe_psql('postgres',
20+
"CREATE TABLE tab_fk (bid int PRIMARY KEY);");
21+
$node_publisher->safe_psql('postgres',
22+
"CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));");
23+
24+
# Setup structure on subscriber
25+
$node_subscriber->safe_psql('postgres',
26+
"CREATE TABLE tab_fk (bid int PRIMARY KEY);");
27+
$node_subscriber->safe_psql('postgres',
28+
"CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));");
29+
30+
# Setup logical replication
31+
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
32+
$node_publisher->safe_psql('postgres',
33+
"CREATE PUBLICATION tap_pub FOR ALL TABLES;");
34+
35+
my $appname = 'tap_sub';
36+
$node_subscriber->safe_psql('postgres',
37+
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub;");
38+
39+
# Wait for subscriber to finish initialization
40+
my $caughtup_query =
41+
"SELECT pg_current_wal_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';";
42+
$node_publisher->poll_query_until('postgres', $caughtup_query)
43+
or die "Timed out while waiting for subscriber to catch up";
44+
45+
$node_publisher->safe_psql('postgres',
46+
"INSERT INTO tab_fk (bid) VALUES (1);");
47+
$node_publisher->safe_psql('postgres',
48+
"INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);");
49+
50+
$node_publisher->poll_query_until('postgres', $caughtup_query)
51+
or die "Timed out while waiting for subscriber to catch up";
52+
53+
# Check data on subscriber
54+
my $result =
55+
$node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk;");
56+
is($result, qq(1|1|1), 'check replicated tab_fk inserts on subscriber');
57+
58+
$result =
59+
$node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
60+
is($result, qq(1|1|1), 'check replicated tab_fk_ref inserts on subscriber');
61+
62+
# Drop the fk on publisher
63+
$node_publisher->safe_psql('postgres',
64+
"DROP TABLE tab_fk CASCADE;");
65+
66+
# Insert data
67+
$node_publisher->safe_psql('postgres',
68+
"INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);");
69+
70+
$node_publisher->poll_query_until('postgres', $caughtup_query)
71+
or die "Timed out while waiting for subscriber to catch up";
72+
73+
# FK is not enforced on subscriber
74+
$result =
75+
$node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
76+
is($result, qq(2|1|2), 'check FK ignored on subscriber');
77+
78+
# Add replica trigger
79+
$node_subscriber->safe_psql('postgres', qq{
80+
CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS \$\$
81+
BEGIN
82+
IF (TG_OP = 'INSERT') THEN
83+
IF (NEW.id < 10) THEN
84+
RETURN NEW;
85+
ELSE
86+
RETURN NULL;
87+
END IF;
88+
ELSE
89+
RAISE WARNING 'Unknown action';
90+
RETURN NULL;
91+
END IF;
92+
END;
93+
\$\$ LANGUAGE plpgsql;
94+
CREATE TRIGGER filter_basic_dml_trg
95+
BEFORE INSERT ON tab_fk_ref
96+
FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
97+
ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;
98+
});
99+
100+
# Insert data
101+
$node_publisher->safe_psql('postgres',
102+
"INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);");
103+
104+
$node_publisher->poll_query_until('postgres', $caughtup_query)
105+
or die "Timed out while waiting for subscriber to catch up";
106+
107+
# The row should be skipped on subscriber
108+
$result =
109+
$node_subscriber->safe_psql('postgres', "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
110+
is($result, qq(2|1|2), 'check replica trigger applied on subscriber');
111+
112+
$node_subscriber->stop('fast');
113+
$node_publisher->stop('fast');

0 commit comments

Comments
 (0)