Skip to content

Commit 66fd0ad

Browse files
committed
Have logical replication subscriber fire column triggers
The logical replication apply worker did not fire per-column update triggers because the updatedCols bitmap in the RTE was not populated. This fixes that. Reviewed-by: Euler Taveira <euler@timbira.com.br> Discussion: https://www.postgresql.org/message-id/flat/21673e2d-597c-6afe-637e-e8b10425b240%402ndquadrant.com
1 parent 91595d1 commit 66fd0ad

File tree

2 files changed

+48
-4
lines changed

2 files changed

+48
-4
lines changed

src/backend/replication/logical/worker.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "pgstat.h"
2828
#include "funcapi.h"
2929

30+
#include "access/sysattr.h"
3031
#include "access/xact.h"
3132
#include "access/xlog_internal.h"
3233

@@ -703,6 +704,8 @@ apply_handle_update(StringInfo s)
703704
bool has_oldtup;
704705
TupleTableSlot *localslot;
705706
TupleTableSlot *remoteslot;
707+
RangeTblEntry *target_rte;
708+
int i;
706709
bool found;
707710
MemoryContext oldctx;
708711

@@ -732,6 +735,21 @@ apply_handle_update(StringInfo s)
732735
ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
733736
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
734737

738+
/*
739+
* Populate updatedCols so that per-column triggers can fire. This could
740+
* include more columns than were actually changed on the publisher
741+
* because the logical replication protocol doesn't contain that
742+
* information. But it would for example exclude columns that only exist
743+
* on the subscriber, since we are not touching those.
744+
*/
745+
target_rte = list_nth(estate->es_range_table, 0);
746+
for (i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
747+
{
748+
if (newtup.changed[i])
749+
target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
750+
i + 1 - FirstLowInvalidHeapAttributeNumber);
751+
}
752+
735753
PushActiveSnapshot(GetTransactionSnapshot());
736754
ExecOpenIndices(estate->es_result_relation_info, false);
737755

src/test/subscription/t/003_constraints.pl

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use warnings;
44
use PostgresNode;
55
use TestLib;
6-
use Test::More tests => 4;
6+
use Test::More tests => 6;
77

88
# Initialize publisher node
99
my $node_publisher = get_new_node('publisher');
@@ -88,14 +88,16 @@ BEGIN
8888
ELSE
8989
RETURN NULL;
9090
END IF;
91+
ELSIF (TG_OP = 'UPDATE') THEN
92+
RETURN NULL;
9193
ELSE
9294
RAISE WARNING 'Unknown action';
9395
RETURN NULL;
9496
END IF;
9597
END;
9698
\$\$ LANGUAGE plpgsql;
9799
CREATE TRIGGER filter_basic_dml_trg
98-
BEFORE INSERT ON tab_fk_ref
100+
BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref
99101
FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
100102
ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;
101103
});
@@ -107,10 +109,34 @@ BEGIN
107109
$node_publisher->poll_query_until('postgres', $caughtup_query)
108110
or die "Timed out while waiting for subscriber to catch up";
109111

110-
# The row should be skipped on subscriber
112+
# The trigger should cause the insert to be skipped on subscriber
113+
$result = $node_subscriber->safe_psql('postgres',
114+
"SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
115+
is($result, qq(2|1|2), 'check replica insert trigger applied on subscriber');
116+
117+
# Update data
118+
$node_publisher->safe_psql('postgres',
119+
"UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;");
120+
121+
$node_publisher->poll_query_until('postgres', $caughtup_query)
122+
or die "Timed out while waiting for subscriber to catch up";
123+
124+
# The trigger should cause the update to be skipped on subscriber
111125
$result = $node_subscriber->safe_psql('postgres',
112126
"SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
113-
is($result, qq(2|1|2), 'check replica trigger applied on subscriber');
127+
is($result, qq(2|1|2), 'check replica update column trigger applied on subscriber');
128+
129+
# Update on a column not specified in the trigger, but it will trigger
130+
# anyway because logical replication ships all columns in an update.
131+
$node_publisher->safe_psql('postgres',
132+
"UPDATE tab_fk_ref SET id = 6 WHERE id = 1;");
133+
134+
$node_publisher->poll_query_until('postgres', $caughtup_query)
135+
or die "Timed out while waiting for subscriber to catch up";
136+
137+
$result = $node_subscriber->safe_psql('postgres',
138+
"SELECT count(*), min(id), max(id) FROM tab_fk_ref;");
139+
is($result, qq(2|1|2), 'check column trigger applied on even for other column');
114140

115141
$node_subscriber->stop('fast');
116142
$node_publisher->stop('fast');

0 commit comments

Comments
 (0)