Skip to content

Commit cfd6cbc

Browse files
committed
Fix memory leak in pgoutput with publication list cache
The pgoutput module caches publication names in a list and frees it upon invalidation. However, the code forgot to free the actual publication names within the list elements, as publication names are pstrdup()'d in GetPublication(). This would cause memory to leak in CacheMemoryContext, bloating it over time as this context is not cleaned. This is a problem for WAL senders running for a long time, as an accumulation of invalidation requests would bloat its cache memory usage. A second case, where this leak is easier to see, involves a backend calling SQL functions like pg_logical_slot_{get,peek}_changes() which create a new decoding context with each execution. More publications create more bloat. To address this, this commit adds a new memory context within the logical decoding context and resets it each time the publication names cache is invalidated, based on a suggestion from Amit Kapila. This ensures that the lifespan of the publication names aligns with that of the logical decoding context. Contrary to the HEAD-only commit f0c569d that has changed PGOutputData to track this new child memory context, the context is tracked with a static variable whose state is reset with a MemoryContext reset callback attached to PGOutputData->context, so as ABI compatibility is preserved in stable branches. This approach is based on an suggestion from Amit Kapila. Analyzed-by: Michael Paquier, Jeff Davis Author: Masahiko Sawada Reviewed-by: Amit Kapila, Michael Paquier, Euler Taveira, Hou Zhijie Discussion: https://postgr.es/m/Z0khf9EVMVLOc_YY@paquier.xyz Backpatch-through: 13
1 parent 41eafbb commit cfd6cbc

File tree

1 file changed

+36
-5
lines changed

1 file changed

+36
-5
lines changed

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
6565
static bool publications_valid;
6666
static bool in_streaming;
6767

68+
/*
69+
* Private memory context for publication data, created in
70+
* PGOutputData->context when starting pgoutput, and set to NULL when its
71+
* parent context is reset via a dedicated MemoryContextCallback.
72+
*/
73+
static MemoryContext pubctx = NULL;
74+
6875
static List *LoadPublications(List *pubnames);
6976
static void publication_invalidation_cb(Datum arg, int cacheid,
7077
uint32 hashvalue);
@@ -252,6 +259,15 @@ parse_output_parameters(List *options, PGOutputData *data)
252259
}
253260
}
254261

262+
/*
263+
* Callback of PGOutputData->context in charge of cleaning pubctx.
264+
*/
265+
static void
266+
pgoutput_pubctx_reset_callback(void *arg)
267+
{
268+
pubctx = NULL;
269+
}
270+
255271
/*
256272
* Initialize this plugin
257273
*/
@@ -261,12 +277,22 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
261277
{
262278
PGOutputData *data = palloc0(sizeof(PGOutputData));
263279
static bool publication_callback_registered = false;
280+
MemoryContextCallback *mcallback;
264281

265282
/* Create our memory context for private allocations. */
266283
data->context = AllocSetContextCreate(ctx->context,
267284
"logical replication output context",
268285
ALLOCSET_DEFAULT_SIZES);
269286

287+
Assert(pubctx == NULL);
288+
pubctx = AllocSetContextCreate(ctx->context,
289+
"logical replication publication list context",
290+
ALLOCSET_SMALL_SIZES);
291+
292+
mcallback = palloc0(sizeof(MemoryContextCallback));
293+
mcallback->func = pgoutput_pubctx_reset_callback;
294+
MemoryContextRegisterResetCallback(ctx->context, mcallback);
295+
270296
ctx->output_plugin_private = data;
271297

272298
/* This plugin uses binary protocol. */
@@ -786,8 +812,9 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
786812
/*
787813
* Shutdown the output plugin.
788814
*
789-
* Note, we don't need to clean the data->context as it's child context
790-
* of the ctx->context so it will be cleaned up by logical decoding machinery.
815+
* Note, we don't need to clean the data->context and pubctx as they are
816+
* child contexts of the ctx->context so they will be cleaned up by logical
817+
* decoding machinery.
791818
*/
792819
static void
793820
pgoutput_shutdown(LogicalDecodingContext *ctx)
@@ -797,6 +824,9 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
797824
hash_destroy(RelationSyncCache);
798825
RelationSyncCache = NULL;
799826
}
827+
828+
/* Better safe than sorry */
829+
pubctx = NULL;
800830
}
801831

802832
/*
@@ -1071,9 +1101,10 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
10711101
/* Reload publications if needed before use. */
10721102
if (!publications_valid)
10731103
{
1074-
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1075-
if (data->publications)
1076-
list_free_deep(data->publications);
1104+
Assert(pubctx);
1105+
1106+
MemoryContextReset(pubctx);
1107+
oldctx = MemoryContextSwitchTo(pubctx);
10771108

10781109
data->publications = LoadPublications(data->publication_names);
10791110
MemoryContextSwitchTo(oldctx);

0 commit comments

Comments
 (0)