@@ -74,7 +74,8 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
74
74
/*
75
75
* Entry in the map used to remember which relation schemas we sent.
76
76
*
77
- * The schema_sent flag determines if the current schema record was already
77
+ * The schema_sent flag determines if the current schema record for the
78
+ * relation (and for its ancestor if publish_as_relid is set) was already
78
79
* sent to the subscriber (in which case we don't need to send it again).
79
80
*
80
81
* The schema cache on downstream is however updated only at commit time,
@@ -92,10 +93,6 @@ typedef struct RelationSyncEntry
92
93
{
93
94
Oid relid ; /* relation oid */
94
95
95
- /*
96
- * Did we send the schema? If ancestor relid is set, its schema must also
97
- * have been sent for this to be true.
98
- */
99
96
bool schema_sent ;
100
97
List * streamed_txns ; /* streamed toplevel transactions with this
101
98
* schema */
@@ -437,10 +434,17 @@ maybe_send_schema(LogicalDecodingContext *ctx,
437
434
else
438
435
schema_sent = relentry -> schema_sent ;
439
436
437
+ /* Nothing to do if we already sent the schema. */
440
438
if (schema_sent )
441
439
return ;
442
440
443
- /* If needed, send the ancestor's schema first. */
441
+ /*
442
+ * Nope, so send the schema. If the changes will be published using an
443
+ * ancestor's schema, not the relation's own, send that ancestor's schema
444
+ * before sending relation's own (XXX - maybe sending only the former
445
+ * suffices?). This is also a good place to set the map that will be used
446
+ * to convert the relation's tuples into the ancestor's format, if needed.
447
+ */
444
448
if (relentry -> publish_as_relid != RelationGetRelid (relation ))
445
449
{
446
450
Relation ancestor = RelationIdGetRelation (relentry -> publish_as_relid );
@@ -450,8 +454,21 @@ maybe_send_schema(LogicalDecodingContext *ctx,
450
454
451
455
/* Map must live as long as the session does. */
452
456
oldctx = MemoryContextSwitchTo (CacheMemoryContext );
453
- relentry -> map = convert_tuples_by_name (CreateTupleDescCopy (indesc ),
454
- CreateTupleDescCopy (outdesc ));
457
+
458
+ /*
459
+ * Make copies of the TupleDescs that will live as long as the map
460
+ * does before putting into the map.
461
+ */
462
+ indesc = CreateTupleDescCopy (indesc );
463
+ outdesc = CreateTupleDescCopy (outdesc );
464
+ relentry -> map = convert_tuples_by_name (indesc , outdesc );
465
+ if (relentry -> map == NULL )
466
+ {
467
+ /* Map not necessary, so free the TupleDescs too. */
468
+ FreeTupleDesc (indesc );
469
+ FreeTupleDesc (outdesc );
470
+ }
471
+
455
472
MemoryContextSwitchTo (oldctx );
456
473
send_relation_and_attrs (ancestor , xid , ctx );
457
474
RelationClose (ancestor );
@@ -1011,6 +1028,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
1011
1028
entry -> pubactions .pubinsert = entry -> pubactions .pubupdate =
1012
1029
entry -> pubactions .pubdelete = entry -> pubactions .pubtruncate = false;
1013
1030
entry -> publish_as_relid = InvalidOid ;
1031
+ entry -> map = NULL ; /* will be set by maybe_send_schema() if needed */
1014
1032
}
1015
1033
1016
1034
/* Validate the entry */
@@ -1191,12 +1209,24 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
1191
1209
1192
1210
/*
1193
1211
* Reset schema sent status as the relation definition may have changed.
1212
+ * Also free any objects that depended on the earlier definition.
1194
1213
*/
1195
1214
if (entry != NULL )
1196
1215
{
1197
1216
entry -> schema_sent = false;
1198
1217
list_free (entry -> streamed_txns );
1199
1218
entry -> streamed_txns = NIL ;
1219
+ if (entry -> map )
1220
+ {
1221
+ /*
1222
+ * Must free the TupleDescs contained in the map explicitly,
1223
+ * because free_conversion_map() doesn't.
1224
+ */
1225
+ FreeTupleDesc (entry -> map -> indesc );
1226
+ FreeTupleDesc (entry -> map -> outdesc );
1227
+ free_conversion_map (entry -> map );
1228
+ }
1229
+ entry -> map = NULL ;
1200
1230
}
1201
1231
}
1202
1232
0 commit comments