Skip to content

Commit 489b96e

Browse files
committed
Improve memory use in logical replication apply
Previously, the memory used by the logical replication apply worker for processing messages would never be freed, so that could end up using a lot of memory. To improve that, change the existing ApplyContext memory context to ApplyMessageContext and reset that after every message (similar to MessageContext used elsewhere). For consistency of naming, rename the ApplyCacheContext to ApplyContext. Author: Stas Kelvich <s.kelvich@postgrespro.ru>
1 parent e0bf160 commit 489b96e

File tree

3 files changed

+40
-26
lines changed

3 files changed

+40
-26
lines changed

src/backend/replication/logical/worker.c

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
101101
int attnum;
102102
} SlotErrCallbackArg;
103103

104-
static MemoryContext ApplyContext = NULL;
105-
MemoryContext ApplyCacheContext = NULL;
104+
static MemoryContext ApplyMessageContext = NULL;
105+
MemoryContext ApplyContext = NULL;
106106

107107
WalReceiverConn *wrconn = NULL;
108108

@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
145145
/*
146146
* Make sure that we started local transaction.
147147
*
148-
* Also switches to ApplyContext as necessary.
148+
* Also switches to ApplyMessageContext as necessary.
149149
*/
150150
static bool
151151
ensure_transaction(void)
152152
{
153153
if (IsTransactionState())
154154
{
155-
if (CurrentMemoryContext != ApplyContext)
156-
MemoryContextSwitchTo(ApplyContext);
155+
if (CurrentMemoryContext != ApplyMessageContext)
156+
MemoryContextSwitchTo(ApplyMessageContext);
157+
157158
return false;
158159
}
159160

@@ -162,7 +163,7 @@ ensure_transaction(void)
162163
if (!MySubscriptionValid)
163164
reread_subscription();
164165

165-
MemoryContextSwitchTo(ApplyContext);
166+
MemoryContextSwitchTo(ApplyMessageContext);
166167
return true;
167168
}
168169

@@ -961,15 +962,15 @@ store_flush_position(XLogRecPtr remote_lsn)
961962
FlushPosition *flushpos;
962963

963964
/* Need to do this in permanent context */
964-
MemoryContextSwitchTo(ApplyCacheContext);
965+
MemoryContextSwitchTo(ApplyContext);
965966

966967
/* Track commit lsn */
967968
flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
968969
flushpos->local_end = XactLastCommitEnd;
969970
flushpos->remote_end = remote_lsn;
970971

971972
dlist_push_tail(&lsn_mapping, &flushpos->node);
972-
MemoryContextSwitchTo(ApplyContext);
973+
MemoryContextSwitchTo(ApplyMessageContext);
973974
}
974975

975976

@@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
993994
static void
994995
LogicalRepApplyLoop(XLogRecPtr last_received)
995996
{
996-
/* Init the ApplyContext which we use for easier cleanup. */
997-
ApplyContext = AllocSetContextCreate(TopMemoryContext,
998-
"ApplyContext",
999-
ALLOCSET_DEFAULT_MINSIZE,
1000-
ALLOCSET_DEFAULT_INITSIZE,
1001-
ALLOCSET_DEFAULT_MAXSIZE);
997+
/*
998+
* Init the ApplyMessageContext which we clean up after each
999+
* replication protocol message.
1000+
*/
1001+
ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1002+
"ApplyMessageContext",
1003+
ALLOCSET_DEFAULT_SIZES);
10021004

10031005
/* mark as idle, before starting to loop */
10041006
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10131015
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
10141016
bool ping_sent = false;
10151017

1016-
MemoryContextSwitchTo(ApplyContext);
1018+
MemoryContextSwitchTo(ApplyMessageContext);
10171019

10181020
len = walrcv_receive(wrconn, &buf, &fd);
10191021

@@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10451047
ping_sent = false;
10461048

10471049
/* Ensure we are reading the data into our memory context. */
1048-
MemoryContextSwitchTo(ApplyContext);
1050+
MemoryContextSwitchTo(ApplyMessageContext);
10491051

10501052
s.data = buf;
10511053
s.len = len;
@@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
10911093
UpdateWorkerStats(last_received, timestamp, true);
10921094
}
10931095
/* other message types are purposefully ignored */
1096+
1097+
MemoryContextReset(ApplyMessageContext);
10941098
}
10951099

10961100
len = walrcv_receive(wrconn, &buf, &fd);
@@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
11151119
}
11161120

11171121
/* Cleanup the memory. */
1118-
MemoryContextResetAndDeleteChildren(ApplyContext);
1122+
MemoryContextResetAndDeleteChildren(ApplyMessageContext);
11191123
MemoryContextSwitchTo(TopMemoryContext);
11201124

11211125
/* Check if we need to exit the streaming loop. */
@@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
12581262

12591263
if (!reply_message)
12601264
{
1261-
MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1265+
MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
12621266
reply_message = makeStringInfo();
12631267
MemoryContextSwitchTo(oldctx);
12641268
}
@@ -1308,7 +1312,7 @@ reread_subscription(void)
13081312
}
13091313

13101314
/* Ensure allocations in permanent context. */
1311-
oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1315+
oldctx = MemoryContextSwitchTo(ApplyContext);
13121316

13131317
newsub = GetSubscription(MyLogicalRepWorker->subid, true);
13141318

@@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg)
14831487
MyLogicalRepWorker->userid);
14841488

14851489
/* Load the subscription into persistent memory context. */
1486-
CreateCacheMemoryContext();
1487-
ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
1488-
"ApplyCacheContext",
1490+
ApplyContext = AllocSetContextCreate(TopMemoryContext,
1491+
"ApplyContext",
14891492
ALLOCSET_DEFAULT_SIZES);
14901493
StartTransactionCommand();
1491-
oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1494+
oldctx = MemoryContextSwitchTo(ApplyContext);
14921495
MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
14931496
MySubscriptionValid = true;
14941497
MemoryContextSwitchTo(oldctx);
@@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg)
15331536
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
15341537

15351538
/* The slot name needs to be allocated in permanent memory context. */
1536-
oldctx = MemoryContextSwitchTo(ApplyCacheContext);
1539+
oldctx = MemoryContextSwitchTo(ApplyContext);
15371540
myslotname = pstrdup(syncslotname);
15381541
MemoryContextSwitchTo(oldctx);
15391542

src/backend/utils/mmgr/README

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees,
265265
and don't actually need any storage allocated in their private contexts.
266266

267267

268+
Logical Replication Worker Contexts
269+
-----------------------------------
270+
271+
ApplyContext --- permanent during whole lifetime of apply worker. It
272+
is possible to use TopMemoryContext here as well, but for simplicity
273+
of memory usage analysis we spin up different context.
274+
275+
ApplyMessageContext --- short-lived context that is reset after each
276+
logical replication protocol message is processed.
277+
278+
268279
Transient Contexts During Execution
269280
-----------------------------------
270281

src/include/replication/worker_internal.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ typedef struct LogicalRepWorker
5656
TimestampTz reply_time;
5757
} LogicalRepWorker;
5858

59-
/* Memory context for cached variables in apply worker. */
60-
extern MemoryContext ApplyCacheContext;
59+
/* Main memory context for apply worker. Permanent during worker lifetime. */
60+
extern MemoryContext ApplyContext;
6161

6262
/* libpqreceiver connection */
6363
extern struct WalReceiverConn *wrconn;

0 commit comments

Comments
 (0)