@@ -108,11 +108,13 @@ typedef struct RelationSyncEntry
108
108
{
109
109
Oid relid ; /* relation oid */
110
110
111
+ bool replicate_valid ; /* overall validity flag for entry */
112
+
111
113
bool schema_sent ;
112
114
List * streamed_txns ; /* streamed toplevel transactions with this
113
115
* schema */
114
116
115
- bool replicate_valid ;
117
+ /* are we publishing this rel? */
116
118
PublicationActions pubactions ;
117
119
118
120
/*
@@ -903,7 +905,9 @@ LoadPublications(List *pubnames)
903
905
}
904
906
905
907
/*
906
- * Publication cache invalidation callback.
908
+ * Publication syscache invalidation callback.
909
+ *
910
+ * Called for invalidations on pg_publication.
907
911
*/
908
912
static void
909
913
publication_invalidation_cb (Datum arg , int cacheid , uint32 hashvalue )
@@ -1130,13 +1134,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
1130
1134
HASH_ENTER , & found );
1131
1135
Assert (entry != NULL );
1132
1136
1133
- /* Not found means schema wasn't sent */
1137
+ /* initialize entry, if it's new */
1134
1138
if (!found )
1135
1139
{
1136
- /* immediately make a new entry valid enough to satisfy callbacks */
1140
+ entry -> replicate_valid = false;
1137
1141
entry -> schema_sent = false;
1138
1142
entry -> streamed_txns = NIL ;
1139
- entry -> replicate_valid = false;
1140
1143
entry -> pubactions .pubinsert = entry -> pubactions .pubupdate =
1141
1144
entry -> pubactions .pubdelete = entry -> pubactions .pubtruncate = false;
1142
1145
entry -> publish_as_relid = InvalidOid ;
@@ -1166,13 +1169,40 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
1166
1169
{
1167
1170
oldctx = MemoryContextSwitchTo (CacheMemoryContext );
1168
1171
if (data -> publications )
1172
+ {
1169
1173
list_free_deep (data -> publications );
1170
-
1174
+ data -> publications = NIL ;
1175
+ }
1171
1176
data -> publications = LoadPublications (data -> publication_names );
1172
1177
MemoryContextSwitchTo (oldctx );
1173
1178
publications_valid = true;
1174
1179
}
1175
1180
1181
+ /*
1182
+ * Reset schema_sent status as the relation definition may have
1183
+ * changed. Also reset pubactions to empty in case rel was dropped
1184
+ * from a publication. Also free any objects that depended on the
1185
+ * earlier definition.
1186
+ */
1187
+ entry -> schema_sent = false;
1188
+ list_free (entry -> streamed_txns );
1189
+ entry -> streamed_txns = NIL ;
1190
+ entry -> pubactions .pubinsert = false;
1191
+ entry -> pubactions .pubupdate = false;
1192
+ entry -> pubactions .pubdelete = false;
1193
+ entry -> pubactions .pubtruncate = false;
1194
+ if (entry -> map )
1195
+ {
1196
+ /*
1197
+ * Must free the TupleDescs contained in the map explicitly,
1198
+ * because free_conversion_map() doesn't.
1199
+ */
1200
+ FreeTupleDesc (entry -> map -> indesc );
1201
+ FreeTupleDesc (entry -> map -> outdesc );
1202
+ free_conversion_map (entry -> map );
1203
+ }
1204
+ entry -> map = NULL ;
1205
+
1176
1206
/*
1177
1207
* Build publication cache. We can't use one provided by relcache as
1178
1208
* relcache considers all publications given relation is in, but here
@@ -1212,16 +1242,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
1212
1242
foreach (lc2 , ancestors )
1213
1243
{
1214
1244
Oid ancestor = lfirst_oid (lc2 );
1245
+ List * apubids = GetRelationPublications (ancestor );
1246
+ List * aschemaPubids = GetSchemaPublications (get_rel_namespace (ancestor ));
1215
1247
1216
- if (list_member_oid (GetRelationPublications (ancestor ),
1217
- pub -> oid ) ||
1218
- list_member_oid (GetSchemaPublications (get_rel_namespace (ancestor )),
1219
- pub -> oid ))
1248
+ if (list_member_oid (apubids , pub -> oid ) ||
1249
+ list_member_oid (aschemaPubids , pub -> oid ))
1220
1250
{
1221
1251
ancestor_published = true;
1222
1252
if (pub -> pubviaroot )
1223
1253
publish_as_relid = ancestor ;
1224
1254
}
1255
+ list_free (apubids );
1256
+ list_free (aschemaPubids );
1225
1257
}
1226
1258
}
1227
1259
@@ -1251,6 +1283,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
1251
1283
}
1252
1284
1253
1285
list_free (pubids );
1286
+ list_free (schemaPubids );
1254
1287
1255
1288
entry -> publish_as_relid = publish_as_relid ;
1256
1289
entry -> replicate_valid = true;
@@ -1322,43 +1355,40 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
1322
1355
/*
1323
1356
* Nobody keeps pointers to entries in this hash table around outside
1324
1357
* logical decoding callback calls - but invalidation events can come in
1325
- * *during* a callback if we access the relcache in the callback. Because
1326
- * of that we must mark the cache entry as invalid but not remove it from
1327
- * the hash while it could still be referenced, then prune it at a later
1328
- * safe point.
1329
- *
1330
- * Getting invalidations for relations that aren't in the table is
1331
- * entirely normal, since there's no way to unregister for an invalidation
1332
- * event. So we don't care if it's found or not.
1358
+ * *during* a callback if we do any syscache access in the callback.
1359
+ * Because of that we must mark the cache entry as invalid but not damage
1360
+ * any of its substructure here. The next get_rel_sync_entry() call will
1361
+ * rebuild it all.
1333
1362
*/
1334
- entry = (RelationSyncEntry * ) hash_search (RelationSyncCache , & relid ,
1335
- HASH_FIND , NULL );
1336
-
1337
- /*
1338
- * Reset schema sent status as the relation definition may have changed.
1339
- * Also free any objects that depended on the earlier definition.
1340
- */
1341
- if (entry != NULL )
1363
+ if (OidIsValid (relid ))
1342
1364
{
1343
- entry -> schema_sent = false;
1344
- list_free (entry -> streamed_txns );
1345
- entry -> streamed_txns = NIL ;
1346
- if (entry -> map )
1365
+ /*
1366
+ * Getting invalidations for relations that aren't in the table is
1367
+ * entirely normal. So we don't care if it's found or not.
1368
+ */
1369
+ entry = (RelationSyncEntry * ) hash_search (RelationSyncCache , & relid ,
1370
+ HASH_FIND , NULL );
1371
+ if (entry != NULL )
1372
+ entry -> replicate_valid = false;
1373
+ }
1374
+ else
1375
+ {
1376
+ /* Whole cache must be flushed. */
1377
+ HASH_SEQ_STATUS status ;
1378
+
1379
+ hash_seq_init (& status , RelationSyncCache );
1380
+ while ((entry = (RelationSyncEntry * ) hash_seq_search (& status )) != NULL )
1347
1381
{
1348
- /*
1349
- * Must free the TupleDescs contained in the map explicitly,
1350
- * because free_conversion_map() doesn't.
1351
- */
1352
- FreeTupleDesc (entry -> map -> indesc );
1353
- FreeTupleDesc (entry -> map -> outdesc );
1354
- free_conversion_map (entry -> map );
1382
+ entry -> replicate_valid = false;
1355
1383
}
1356
- entry -> map = NULL ;
1357
1384
}
1358
1385
}
1359
1386
1360
1387
/*
1361
1388
* Publication relation/schema map syscache invalidation callback
1389
+ *
1390
+ * Called for invalidations on pg_publication, pg_publication_rel, and
1391
+ * pg_publication_namespace.
1362
1392
*/
1363
1393
static void
1364
1394
rel_sync_cache_publication_cb (Datum arg , int cacheid , uint32 hashvalue )
@@ -1382,15 +1412,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
1382
1412
while ((entry = (RelationSyncEntry * ) hash_seq_search (& status )) != NULL )
1383
1413
{
1384
1414
entry -> replicate_valid = false;
1385
-
1386
- /*
1387
- * There might be some relations dropped from the publication so we
1388
- * don't need to publish the changes for them.
1389
- */
1390
- entry -> pubactions .pubinsert = false;
1391
- entry -> pubactions .pubupdate = false;
1392
- entry -> pubactions .pubdelete = false;
1393
- entry -> pubactions .pubtruncate = false;
1394
1415
}
1395
1416
}
1396
1417
0 commit comments