Skip to content

Commit 7f481b8

Browse files
author
Amit Kapila
committed
Improve invalidation handling in pgoutput.c.
Fix the following issues in pgoutput.c: * rel_sync_cache_relation_cb does the wrong thing when called for a cache flush (i.e., relid == 0). Instead of invalidating all RelationSyncCache entries as it should, it does nothing. * When rel_sync_cache_relation_cb does invalidate an entry, it immediately zaps the entry->map structure, even though that might still be in use. We instead just mark the entry as invalid and rebuild it at a later safe point. * Similarly, rel_sync_cache_publication_cb is way too eager to reset the pubactions flags, which would likely lead to failing to transmit changes that we should transmit. In this case also, we just mark the entry as invalid and rebuild it at a later safe point. Author: Tom Lane Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/885288.1641420714@sss.pgh.pa.us
1 parent 00c360a commit 7f481b8

File tree

1 file changed

+68
-47
lines changed

1 file changed

+68
-47
lines changed

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 68 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,13 @@ typedef struct RelationSyncEntry
108108
{
109109
Oid relid; /* relation oid */
110110

111+
bool replicate_valid; /* overall validity flag for entry */
112+
111113
bool schema_sent;
112114
List *streamed_txns; /* streamed toplevel transactions with this
113115
* schema */
114116

115-
bool replicate_valid;
117+
/* are we publishing this rel? */
116118
PublicationActions pubactions;
117119

118120
/*
@@ -903,7 +905,9 @@ LoadPublications(List *pubnames)
903905
}
904906

905907
/*
906-
* Publication cache invalidation callback.
908+
* Publication syscache invalidation callback.
909+
*
910+
* Called for invalidations on pg_publication.
907911
*/
908912
static void
909913
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -1130,13 +1134,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
11301134
HASH_ENTER, &found);
11311135
Assert(entry != NULL);
11321136

1133-
/* Not found means schema wasn't sent */
1137+
/* initialize entry, if it's new */
11341138
if (!found)
11351139
{
1136-
/* immediately make a new entry valid enough to satisfy callbacks */
1140+
entry->replicate_valid = false;
11371141
entry->schema_sent = false;
11381142
entry->streamed_txns = NIL;
1139-
entry->replicate_valid = false;
11401143
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
11411144
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
11421145
entry->publish_as_relid = InvalidOid;
@@ -1166,13 +1169,40 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
11661169
{
11671170
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
11681171
if (data->publications)
1172+
{
11691173
list_free_deep(data->publications);
1170-
1174+
data->publications = NIL;
1175+
}
11711176
data->publications = LoadPublications(data->publication_names);
11721177
MemoryContextSwitchTo(oldctx);
11731178
publications_valid = true;
11741179
}
11751180

1181+
/*
1182+
* Reset schema_sent status as the relation definition may have
1183+
* changed. Also reset pubactions to empty in case rel was dropped
1184+
* from a publication. Also free any objects that depended on the
1185+
* earlier definition.
1186+
*/
1187+
entry->schema_sent = false;
1188+
list_free(entry->streamed_txns);
1189+
entry->streamed_txns = NIL;
1190+
entry->pubactions.pubinsert = false;
1191+
entry->pubactions.pubupdate = false;
1192+
entry->pubactions.pubdelete = false;
1193+
entry->pubactions.pubtruncate = false;
1194+
if (entry->map)
1195+
{
1196+
/*
1197+
* Must free the TupleDescs contained in the map explicitly,
1198+
* because free_conversion_map() doesn't.
1199+
*/
1200+
FreeTupleDesc(entry->map->indesc);
1201+
FreeTupleDesc(entry->map->outdesc);
1202+
free_conversion_map(entry->map);
1203+
}
1204+
entry->map = NULL;
1205+
11761206
/*
11771207
* Build publication cache. We can't use one provided by relcache as
11781208
* relcache considers all publications given relation is in, but here
@@ -1212,16 +1242,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
12121242
foreach(lc2, ancestors)
12131243
{
12141244
Oid ancestor = lfirst_oid(lc2);
1245+
List *apubids = GetRelationPublications(ancestor);
1246+
List *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
12151247

1216-
if (list_member_oid(GetRelationPublications(ancestor),
1217-
pub->oid) ||
1218-
list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)),
1219-
pub->oid))
1248+
if (list_member_oid(apubids, pub->oid) ||
1249+
list_member_oid(aschemaPubids, pub->oid))
12201250
{
12211251
ancestor_published = true;
12221252
if (pub->pubviaroot)
12231253
publish_as_relid = ancestor;
12241254
}
1255+
list_free(apubids);
1256+
list_free(aschemaPubids);
12251257
}
12261258
}
12271259

@@ -1251,6 +1283,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
12511283
}
12521284

12531285
list_free(pubids);
1286+
list_free(schemaPubids);
12541287

12551288
entry->publish_as_relid = publish_as_relid;
12561289
entry->replicate_valid = true;
@@ -1322,43 +1355,40 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
13221355
/*
13231356
* Nobody keeps pointers to entries in this hash table around outside
13241357
* logical decoding callback calls - but invalidation events can come in
1325-
* *during* a callback if we access the relcache in the callback. Because
1326-
* of that we must mark the cache entry as invalid but not remove it from
1327-
* the hash while it could still be referenced, then prune it at a later
1328-
* safe point.
1329-
*
1330-
* Getting invalidations for relations that aren't in the table is
1331-
* entirely normal, since there's no way to unregister for an invalidation
1332-
* event. So we don't care if it's found or not.
1358+
* *during* a callback if we do any syscache access in the callback.
1359+
* Because of that we must mark the cache entry as invalid but not damage
1360+
* any of its substructure here. The next get_rel_sync_entry() call will
1361+
* rebuild it all.
13331362
*/
1334-
entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
1335-
HASH_FIND, NULL);
1336-
1337-
/*
1338-
* Reset schema sent status as the relation definition may have changed.
1339-
* Also free any objects that depended on the earlier definition.
1340-
*/
1341-
if (entry != NULL)
1363+
if (OidIsValid(relid))
13421364
{
1343-
entry->schema_sent = false;
1344-
list_free(entry->streamed_txns);
1345-
entry->streamed_txns = NIL;
1346-
if (entry->map)
1365+
/*
1366+
* Getting invalidations for relations that aren't in the table is
1367+
* entirely normal. So we don't care if it's found or not.
1368+
*/
1369+
entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
1370+
HASH_FIND, NULL);
1371+
if (entry != NULL)
1372+
entry->replicate_valid = false;
1373+
}
1374+
else
1375+
{
1376+
/* Whole cache must be flushed. */
1377+
HASH_SEQ_STATUS status;
1378+
1379+
hash_seq_init(&status, RelationSyncCache);
1380+
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
13471381
{
1348-
/*
1349-
* Must free the TupleDescs contained in the map explicitly,
1350-
* because free_conversion_map() doesn't.
1351-
*/
1352-
FreeTupleDesc(entry->map->indesc);
1353-
FreeTupleDesc(entry->map->outdesc);
1354-
free_conversion_map(entry->map);
1382+
entry->replicate_valid = false;
13551383
}
1356-
entry->map = NULL;
13571384
}
13581385
}
13591386

13601387
/*
13611388
* Publication relation/schema map syscache invalidation callback
1389+
*
1390+
* Called for invalidations on pg_publication, pg_publication_rel, and
1391+
* pg_publication_namespace.
13621392
*/
13631393
static void
13641394
rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -1382,15 +1412,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
13821412
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
13831413
{
13841414
entry->replicate_valid = false;
1385-
1386-
/*
1387-
* There might be some relations dropped from the publication so we
1388-
* don't need to publish the changes for them.
1389-
*/
1390-
entry->pubactions.pubinsert = false;
1391-
entry->pubactions.pubupdate = false;
1392-
entry->pubactions.pubdelete = false;
1393-
entry->pubactions.pubtruncate = false;
13941415
}
13951416
}
13961417

0 commit comments

Comments
 (0)