Skip to content

Commit 69bd606

Browse files
author
Amit Kapila
committed
Fix initialization of RelationSyncEntry for streaming transactions.
In commit 4648243, for each RelationSyncEntry we maintained the list of xids (streamed_txns) for which we have already sent the schema. This helps us to track when to send the schema to the downstream node for replication of streaming transactions. Before this list got initialized, we were processing invalidation messages which access this list and led to an assertion failure. In passing, clean up the nearby code: * Initialize the list of xids with NIL instead of NULL which is our usual coding practice. * Remove the MemoryContext switch for creating a RelationSyncEntry in dynahash. Diagnosed-by: Amit Kapila and Tom Lane Author: Amit Kapila Reviewed-by: Tom Lane and Dilip Kumar Discussion: https://postgr.es/m/904373.1600033123@sss.pgh.pa.us
1 parent 19c60ad commit 69bd606

File tree

1 file changed

+15
-14
lines changed

1 file changed

+15
-14
lines changed

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -945,16 +945,26 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
945945

946946
Assert(RelationSyncCache != NULL);
947947

948-
/* Find cached function info, creating if not found */
949-
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
948+
/* Find cached relation info, creating if not found */
950949
entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
951950
(void *) &relid,
952951
HASH_ENTER, &found);
953-
MemoryContextSwitchTo(oldctx);
954952
Assert(entry != NULL);
955953

956954
/* Not found means schema wasn't sent */
957-
if (!found || !entry->replicate_valid)
955+
if (!found)
956+
{
957+
/* immediately make a new entry valid enough to satisfy callbacks */
958+
entry->schema_sent = false;
959+
entry->streamed_txns = NIL;
960+
entry->replicate_valid = false;
961+
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
962+
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
963+
entry->publish_as_relid = InvalidOid;
964+
}
965+
966+
/* Validate the entry */
967+
if (!entry->replicate_valid)
958968
{
959969
List *pubids = GetRelationPublications(relid);
960970
ListCell *lc;
@@ -977,9 +987,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
977987
* relcache considers all publications given relation is in, but here
978988
* we only need to consider ones that the subscriber requested.
979989
*/
980-
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
981-
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
982-
983990
foreach(lc, data->publications)
984991
{
985992
Publication *pub = lfirst(lc);
@@ -1054,12 +1061,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
10541061
entry->replicate_valid = true;
10551062
}
10561063

1057-
if (!found)
1058-
{
1059-
entry->schema_sent = false;
1060-
entry->streamed_txns = NULL;
1061-
}
1062-
10631064
return entry;
10641065
}
10651066

@@ -1145,7 +1146,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
11451146
{
11461147
entry->schema_sent = false;
11471148
list_free(entry->streamed_txns);
1148-
entry->streamed_txns = NULL;
1149+
entry->streamed_txns = NIL;
11491150
}
11501151
}
11511152

0 commit comments

Comments
 (0)