@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
101
101
int attnum ;
102
102
} SlotErrCallbackArg ;
103
103
104
- static MemoryContext ApplyContext = NULL ;
105
- MemoryContext ApplyCacheContext = NULL ;
104
+ static MemoryContext ApplyMessageContext = NULL ;
105
+ MemoryContext ApplyContext = NULL ;
106
106
107
107
WalReceiverConn * wrconn = NULL ;
108
108
@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
145
145
/*
146
146
* Make sure that we started local transaction.
147
147
*
148
- * Also switches to ApplyContext as necessary.
148
+ * Also switches to ApplyMessageContext as necessary.
149
149
*/
150
150
static bool
151
151
ensure_transaction (void )
152
152
{
153
153
if (IsTransactionState ())
154
154
{
155
- if (CurrentMemoryContext != ApplyContext )
156
- MemoryContextSwitchTo (ApplyContext );
155
+ if (CurrentMemoryContext != ApplyMessageContext )
156
+ MemoryContextSwitchTo (ApplyMessageContext );
157
+
157
158
return false;
158
159
}
159
160
@@ -162,7 +163,7 @@ ensure_transaction(void)
162
163
if (!MySubscriptionValid )
163
164
reread_subscription ();
164
165
165
- MemoryContextSwitchTo (ApplyContext );
166
+ MemoryContextSwitchTo (ApplyMessageContext );
166
167
return true;
167
168
}
168
169
@@ -961,15 +962,15 @@ store_flush_position(XLogRecPtr remote_lsn)
961
962
FlushPosition * flushpos ;
962
963
963
964
/* Need to do this in permanent context */
964
- MemoryContextSwitchTo (ApplyCacheContext );
965
+ MemoryContextSwitchTo (ApplyContext );
965
966
966
967
/* Track commit lsn */
967
968
flushpos = (FlushPosition * ) palloc (sizeof (FlushPosition ));
968
969
flushpos -> local_end = XactLastCommitEnd ;
969
970
flushpos -> remote_end = remote_lsn ;
970
971
971
972
dlist_push_tail (& lsn_mapping , & flushpos -> node );
972
- MemoryContextSwitchTo (ApplyContext );
973
+ MemoryContextSwitchTo (ApplyMessageContext );
973
974
}
974
975
975
976
@@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
993
994
static void
994
995
LogicalRepApplyLoop (XLogRecPtr last_received )
995
996
{
996
- /* Init the ApplyContext which we use for easier cleanup. */
997
- ApplyContext = AllocSetContextCreate (TopMemoryContext ,
998
- "ApplyContext" ,
999
- ALLOCSET_DEFAULT_MINSIZE ,
1000
- ALLOCSET_DEFAULT_INITSIZE ,
1001
- ALLOCSET_DEFAULT_MAXSIZE );
997
+ /*
998
+ * Init the ApplyMessageContext which we clean up after each
999
+ * replication protocol message.
1000
+ */
1001
+ ApplyMessageContext = AllocSetContextCreate (ApplyContext ,
1002
+ "ApplyMessageContext" ,
1003
+ ALLOCSET_DEFAULT_SIZES );
1002
1004
1003
1005
/* mark as idle, before starting to loop */
1004
1006
pgstat_report_activity (STATE_IDLE , NULL );
@@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
1013
1015
TimestampTz last_recv_timestamp = GetCurrentTimestamp ();
1014
1016
bool ping_sent = false;
1015
1017
1016
- MemoryContextSwitchTo (ApplyContext );
1018
+ MemoryContextSwitchTo (ApplyMessageContext );
1017
1019
1018
1020
len = walrcv_receive (wrconn , & buf , & fd );
1019
1021
@@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
1045
1047
ping_sent = false;
1046
1048
1047
1049
/* Ensure we are reading the data into our memory context. */
1048
- MemoryContextSwitchTo (ApplyContext );
1050
+ MemoryContextSwitchTo (ApplyMessageContext );
1049
1051
1050
1052
s .data = buf ;
1051
1053
s .len = len ;
@@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
1091
1093
UpdateWorkerStats (last_received , timestamp , true);
1092
1094
}
1093
1095
/* other message types are purposefully ignored */
1096
+
1097
+ MemoryContextReset (ApplyMessageContext );
1094
1098
}
1095
1099
1096
1100
len = walrcv_receive (wrconn , & buf , & fd );
@@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
1115
1119
}
1116
1120
1117
1121
/* Cleanup the memory. */
1118
- MemoryContextResetAndDeleteChildren (ApplyContext );
1122
+ MemoryContextResetAndDeleteChildren (ApplyMessageContext );
1119
1123
MemoryContextSwitchTo (TopMemoryContext );
1120
1124
1121
1125
/* Check if we need to exit the streaming loop. */
@@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1258
1262
1259
1263
if (!reply_message )
1260
1264
{
1261
- MemoryContext oldctx = MemoryContextSwitchTo (ApplyCacheContext );
1265
+ MemoryContext oldctx = MemoryContextSwitchTo (ApplyContext );
1262
1266
reply_message = makeStringInfo ();
1263
1267
MemoryContextSwitchTo (oldctx );
1264
1268
}
@@ -1308,7 +1312,7 @@ reread_subscription(void)
1308
1312
}
1309
1313
1310
1314
/* Ensure allocations in permanent context. */
1311
- oldctx = MemoryContextSwitchTo (ApplyCacheContext );
1315
+ oldctx = MemoryContextSwitchTo (ApplyContext );
1312
1316
1313
1317
newsub = GetSubscription (MyLogicalRepWorker -> subid , true);
1314
1318
@@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg)
1483
1487
MyLogicalRepWorker -> userid );
1484
1488
1485
1489
/* Load the subscription into persistent memory context. */
1486
- CreateCacheMemoryContext ();
1487
- ApplyCacheContext = AllocSetContextCreate (CacheMemoryContext ,
1488
- "ApplyCacheContext" ,
1490
+ ApplyContext = AllocSetContextCreate (TopMemoryContext ,
1491
+ "ApplyContext" ,
1489
1492
ALLOCSET_DEFAULT_SIZES );
1490
1493
StartTransactionCommand ();
1491
- oldctx = MemoryContextSwitchTo (ApplyCacheContext );
1494
+ oldctx = MemoryContextSwitchTo (ApplyContext );
1492
1495
MySubscription = GetSubscription (MyLogicalRepWorker -> subid , false);
1493
1496
MySubscriptionValid = true;
1494
1497
MemoryContextSwitchTo (oldctx );
@@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg)
1533
1536
syncslotname = LogicalRepSyncTableStart (& origin_startpos );
1534
1537
1535
1538
/* The slot name needs to be allocated in permanent memory context. */
1536
- oldctx = MemoryContextSwitchTo (ApplyCacheContext );
1539
+ oldctx = MemoryContextSwitchTo (ApplyContext );
1537
1540
myslotname = pstrdup (syncslotname );
1538
1541
MemoryContextSwitchTo (oldctx );
1539
1542
0 commit comments