Skip to content

Commit 288462c

Browse files
committed
Reset memory context after each iteration of pglogical apply
1 parent cc596b6 commit 288462c

File tree

4 files changed

+52
-48
lines changed

4 files changed

+52
-48
lines changed

contrib/mmts/arbiter.c

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -724,9 +724,9 @@ static void MtmSender(Datum arg)
724724
MTM_ELOG(LOG, "Start arbiter sender %d", MyProcPid);
725725
InitializeTimeouts();
726726

727-
signal(SIGINT, SetStop);
728-
signal(SIGQUIT, SetStop);
729-
signal(SIGTERM, SetStop);
727+
pqsignal(SIGINT, SetStop);
728+
pqsignal(SIGQUIT, SetStop);
729+
pqsignal(SIGTERM, SetStop);
730730

731731
/* We're now ready to receive signals */
732732
BackgroundWorkerUnblockSignals();
@@ -803,9 +803,9 @@ static bool MtmRecovery()
803803

804804
static void MtmMonitor(Datum arg)
805805
{
806-
signal(SIGINT, SetStop);
807-
signal(SIGQUIT, SetStop);
808-
signal(SIGTERM, SetStop);
806+
pqsignal(SIGINT, SetStop);
807+
pqsignal(SIGQUIT, SetStop);
808+
pqsignal(SIGTERM, SetStop);
809809

810810
/* We're now ready to receive signals */
811811
BackgroundWorkerUnblockSignals();
@@ -840,9 +840,9 @@ static void MtmReceiver(Datum arg)
840840
max_fd = 0;
841841
#endif
842842

843-
signal(SIGINT, SetStop);
844-
signal(SIGQUIT, SetStop);
845-
signal(SIGTERM, SetStop);
843+
pqsignal(SIGINT, SetStop);
844+
pqsignal(SIGQUIT, SetStop);
845+
pqsignal(SIGTERM, SetStop);
846846

847847
/* We're now ready to receive signals */
848848
BackgroundWorkerUnblockSignals();

contrib/mmts/bgwpool.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ static void BgwPoolMainLoop(BgwPool* pool)
3535
MtmIsLogicalReceiver = true;
3636
MtmPool = pool;
3737

38-
signal(SIGINT, BgwShutdownWorker);
39-
signal(SIGQUIT, BgwShutdownWorker);
40-
signal(SIGTERM, BgwShutdownWorker);
38+
pqsignal(SIGINT, BgwShutdownWorker);
39+
pqsignal(SIGQUIT, BgwShutdownWorker);
40+
pqsignal(SIGTERM, BgwShutdownWorker);
4141

4242
BackgroundWorkerUnblockSignals();
4343
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser);

contrib/mmts/multimaster.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -526,8 +526,8 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
526526
if (ts != NULL /*&& ts->status != TRANSACTION_STATUS_IN_PROGRESS*/)
527527
{
528528
if (ts->csn > MtmTx.snapshot) {
529-
MTM_LOG4("%d: tuple with xid=%d(csn=%lld) is invisible in snapshot %lld",
530-
MyProcPid, xid, ts->csn, MtmTx.snapshot);
529+
MTM_LOG4("%d: tuple with xid=%lld(csn=%lld) is invisible in snapshot %lld",
530+
MyProcPid, (long64)xid, ts->csn, MtmTx.snapshot);
531531
if (MtmGetSystemTime() - start > USECS_PER_SEC) {
532532
MTM_ELOG(WARNING, "Backend %d waits for transaction %s (%llu) status %lld usecs", MyProcPid, ts->gid, (long64)xid, MtmGetSystemTime() - start);
533533
}
@@ -567,8 +567,8 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
567567
else
568568
{
569569
bool invisible = ts->status != TRANSACTION_STATUS_COMMITTED;
570-
MTM_LOG4("%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld",
571-
MyProcPid, xid, ts->csn, invisible ? "rollbacked" : "committed", MtmTx.snapshot);
570+
MTM_LOG4("%d: tuple with xid=%lld(csn= %lld) is %s in snapshot %lld",
571+
MyProcPid, (long64)xid, ts->csn, invisible ? "rollbacked" : "committed", MtmTx.snapshot);
572572
MtmUnlock();
573573
if (MtmGetSystemTime() - start > USECS_PER_SEC) {
574574
MTM_ELOG(WARNING, "Backend %d waits for %s transaction %s (%llu) %lld usecs", MyProcPid, invisible ? "rollbacked" : "committed",
@@ -579,7 +579,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
579579
}
580580
else
581581
{
582-
MTM_LOG4("%d: visibility check is skipped for transaction %u in snapshot %llu", MyProcPid, xid, MtmTx.snapshot);
582+
MTM_LOG4("%d: visibility check is skipped for transaction %llu in snapshot %llu", MyProcPid, (long64)xid, MtmTx.snapshot);
583583
MtmUnlock();
584584
return PgXidInMVCCSnapshot(xid, snapshot);
585585
}
@@ -4894,6 +4894,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
48944894
break;
48954895

48964896
case T_DropStmt:
4897+
case T_TruncateStmt:
48974898
{
48984899
DropStmt *stmt = (DropStmt *) parsetree;
48994900
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent)

contrib/mmts/pglogical_apply.c

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -405,16 +405,13 @@ process_remote_message(StringInfo s)
405405
if (MtmVacuumStmt != NULL) {
406406
ExecVacuum(MtmVacuumStmt, 1);
407407
} else if (MtmIndexStmt != NULL) {
408-
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
409408
Oid relid = RangeVarGetRelidExtended(MtmIndexStmt->relation, ShareUpdateExclusiveLock,
410409
false, false,
411410
NULL,
412411
NULL);
413412
/* Run parse analysis ... */
414413
MtmIndexStmt = transformIndexStmt(relid, MtmIndexStmt, messageBody);
415414

416-
MemoryContextSwitchTo(oldContext);
417-
418415
DefineIndex(relid, /* OID of heap relation */
419416
MtmIndexStmt,
420417
InvalidOid, /* no predefined OID */
@@ -599,6 +596,7 @@ read_rel(StringInfo s, LOCKMODE mode)
599596
RangeVar* rv;
600597
Oid remote_relid = pq_getmsgint(s, 4);
601598
Oid local_relid;
599+
MemoryContext old_context;
602600

603601
local_relid = pglogical_relid_map_get(remote_relid);
604602
if (local_relid == InvalidOid) {
@@ -611,7 +609,9 @@ read_rel(StringInfo s, LOCKMODE mode)
611609
rv->relname = (char *) pq_getmsgbytes(s, relnamelen);
612610

613611
local_relid = RangeVarGetRelidExtended(rv, mode, false, false, NULL, NULL);
612+
old_context = MemoryContextSwitchTo(TopMemoryContext);
614613
pglogical_relid_map_put(remote_relid, local_relid);
614+
MemoryContextSwitchTo(old_context);
615615
} else {
616616
nspnamelen = pq_getmsgbyte(s);
617617
s->cursor += nspnamelen;
@@ -1041,7 +1041,8 @@ void MtmExecutor(void* work, size_t size)
10411041
int spill_file = -1;
10421042
int save_cursor = 0;
10431043
int save_len = 0;
1044-
MemoryContext topContext;
1044+
MemoryContext old_context;
1045+
MemoryContext top_context;
10451046

10461047
s.data = work;
10471048
s.len = size;
@@ -1055,13 +1056,15 @@ void MtmExecutor(void* work, size_t size)
10551056
ALLOCSET_DEFAULT_INITSIZE,
10561057
ALLOCSET_DEFAULT_MAXSIZE);
10571058
}
1058-
topContext = MemoryContextSwitchTo(MtmApplyContext);
1059-
1059+
top_context = MemoryContextSwitchTo(MtmApplyContext);
10601060
replorigin_session_origin = InvalidRepOriginId;
10611061
PG_TRY();
10621062
{
1063-
while (true) {
1063+
bool inside_transaction = true;
1064+
do {
10641065
char action = pq_getmsgbyte(&s);
1066+
old_context = MemoryContextSwitchTo(MtmApplyContext);
1067+
10651068
MTM_LOG2("%d: REMOTE process action %c", MyProcPid, action);
10661069
#if 0
10671070
if (Mtm->status == MTM_RECOVERY) {
@@ -1072,84 +1075,81 @@ void MtmExecutor(void* work, size_t size)
10721075
switch (action) {
10731076
/* BEGIN */
10741077
case 'B':
1075-
if (process_remote_begin(&s)) {
1076-
continue;
1077-
} else {
1078-
break;
1079-
}
1078+
inside_transaction = process_remote_begin(&s);
1079+
break;
10801080
/* COMMIT */
10811081
case 'C':
10821082
close_rel(rel);
10831083
process_remote_commit(&s);
1084+
inside_transaction = false;
10841085
break;
10851086
/* INSERT */
10861087
case 'I':
1087-
process_remote_insert(&s, rel);
1088-
continue;
1088+
process_remote_insert(&s, rel);
1089+
break;
10891090
/* UPDATE */
10901091
case 'U':
10911092
process_remote_update(&s, rel);
1092-
continue;
1093+
break;
10931094
/* DELETE */
10941095
case 'D':
10951096
process_remote_delete(&s, rel);
1096-
continue;
1097+
break;
10971098
case 'R':
10981099
close_rel(rel);
10991100
rel = read_rel(&s, RowExclusiveLock);
1100-
continue;
1101+
break;
11011102
case 'F':
11021103
{
11031104
int node_id = pq_getmsgint(&s, 4);
11041105
int file_id = pq_getmsgint(&s, 4);
11051106
Assert(spill_file < 0);
11061107
spill_file = MtmOpenSpillFile(node_id, file_id);
1107-
continue;
1108+
break;
11081109
}
11091110
case '(':
11101111
{
11111112
size_t size = pq_getmsgint(&s, 4);
1112-
s.data = palloc(size);
1113+
s.data = MemoryContextAlloc(TopMemoryContext, size);
11131114
save_cursor = s.cursor;
11141115
save_len = s.len;
11151116
s.cursor = 0;
11161117
s.len = size;
11171118
MtmReadSpillFile(spill_file, s.data, size);
1118-
continue;
1119+
break;
11191120
}
11201121
case ')':
11211122
{
11221123
pfree(s.data);
11231124
s.data = work;
11241125
s.cursor = save_cursor;
11251126
s.len = save_len;
1126-
continue;
1127+
break;
11271128
}
11281129
case 'M':
11291130
{
1130-
if (process_remote_message(&s)) {
1131-
break;
1132-
}
1133-
continue;
1131+
inside_transaction = !process_remote_message(&s);
1132+
break;
11341133
}
11351134
case 'Z':
11361135
{
11371136
MtmRecoveryCompleted();
1137+
inside_transaction = false;
11381138
break;
11391139
}
11401140
default:
11411141
MTM_ELOG(ERROR, "unknown action of type %c", action);
11421142
}
1143-
break;
1144-
}
1143+
MemoryContextSwitchTo(old_context);
1144+
MemoryContextResetAndDeleteChildren(MtmApplyContext);
1145+
} while (inside_transaction);
11451146
}
11461147
PG_CATCH();
11471148
{
1148-
MemoryContext oldcontext;
11491149
MtmReleaseLock();
1150-
oldcontext = MemoryContextSwitchTo(MtmApplyContext);
1150+
old_context = MemoryContextSwitchTo(MtmApplyContext);
11511151
MtmHandleApplyError();
1152-
MemoryContextSwitchTo(oldcontext);
1152+
MemoryContextSwitchTo(old_context);
11531153
EmitErrorReport();
11541154
FlushErrorState();
11551155
MTM_LOG1("%d: REMOTE begin abort transaction %llu", MyProcPid, (long64)MtmGetCurrentTransactionId());
@@ -1159,12 +1159,15 @@ void MtmExecutor(void* work, size_t size)
11591159
MTM_LOG2("%d: REMOTE end abort transaction %llu", MyProcPid, (long64)MtmGetCurrentTransactionId());
11601160
}
11611161
PG_END_TRY();
1162+
if (s.data != work) {
1163+
pfree(s.data);
1164+
}
11621165
#if 0 /* spill file is expecrted to be closed by tranaction commit or rollback */
11631166
if (spill_file >= 0) {
11641167
MtmCloseSpillFile(spill_file);
11651168
}
11661169
#endif
1170+
MemoryContextSwitchTo(top_context);
11671171
MemoryContextResetAndDeleteChildren(MtmApplyContext);
1168-
MemoryContextSwitchTo(topContext);
11691172
}
11701173

0 commit comments

Comments
 (0)