@@ -405,16 +405,13 @@ process_remote_message(StringInfo s)
405
405
if (MtmVacuumStmt != NULL ) {
406
406
ExecVacuum (MtmVacuumStmt , 1 );
407
407
} else if (MtmIndexStmt != NULL ) {
408
- MemoryContext oldContext = MemoryContextSwitchTo (MtmApplyContext );
409
408
Oid relid = RangeVarGetRelidExtended (MtmIndexStmt -> relation , ShareUpdateExclusiveLock ,
410
409
false, false,
411
410
NULL ,
412
411
NULL );
413
412
/* Run parse analysis ... */
414
413
MtmIndexStmt = transformIndexStmt (relid , MtmIndexStmt , messageBody );
415
414
416
- MemoryContextSwitchTo (oldContext );
417
-
418
415
DefineIndex (relid , /* OID of heap relation */
419
416
MtmIndexStmt ,
420
417
InvalidOid , /* no predefined OID */
@@ -599,6 +596,7 @@ read_rel(StringInfo s, LOCKMODE mode)
599
596
RangeVar * rv ;
600
597
Oid remote_relid = pq_getmsgint (s , 4 );
601
598
Oid local_relid ;
599
+ MemoryContext old_context ;
602
600
603
601
local_relid = pglogical_relid_map_get (remote_relid );
604
602
if (local_relid == InvalidOid ) {
@@ -611,7 +609,9 @@ read_rel(StringInfo s, LOCKMODE mode)
611
609
rv -> relname = (char * ) pq_getmsgbytes (s , relnamelen );
612
610
613
611
local_relid = RangeVarGetRelidExtended (rv , mode , false, false, NULL , NULL );
612
+ old_context = MemoryContextSwitchTo (TopMemoryContext );
614
613
pglogical_relid_map_put (remote_relid , local_relid );
614
+ MemoryContextSwitchTo (old_context );
615
615
} else {
616
616
nspnamelen = pq_getmsgbyte (s );
617
617
s -> cursor += nspnamelen ;
@@ -1041,7 +1041,8 @@ void MtmExecutor(void* work, size_t size)
1041
1041
int spill_file = -1 ;
1042
1042
int save_cursor = 0 ;
1043
1043
int save_len = 0 ;
1044
- MemoryContext topContext ;
1044
+ MemoryContext old_context ;
1045
+ MemoryContext top_context ;
1045
1046
1046
1047
s .data = work ;
1047
1048
s .len = size ;
@@ -1055,13 +1056,15 @@ void MtmExecutor(void* work, size_t size)
1055
1056
ALLOCSET_DEFAULT_INITSIZE ,
1056
1057
ALLOCSET_DEFAULT_MAXSIZE );
1057
1058
}
1058
- topContext = MemoryContextSwitchTo (MtmApplyContext );
1059
-
1059
+ top_context = MemoryContextSwitchTo (MtmApplyContext );
1060
1060
replorigin_session_origin = InvalidRepOriginId ;
1061
1061
PG_TRY ();
1062
1062
{
1063
- while (true) {
1063
+ bool inside_transaction = true;
1064
+ do {
1064
1065
char action = pq_getmsgbyte (& s );
1066
+ old_context = MemoryContextSwitchTo (MtmApplyContext );
1067
+
1065
1068
MTM_LOG2 ("%d: REMOTE process action %c" , MyProcPid , action );
1066
1069
#if 0
1067
1070
if (Mtm -> status == MTM_RECOVERY ) {
@@ -1072,84 +1075,81 @@ void MtmExecutor(void* work, size_t size)
1072
1075
switch (action ) {
1073
1076
/* BEGIN */
1074
1077
case 'B' :
1075
- if (process_remote_begin (& s )) {
1076
- continue ;
1077
- } else {
1078
- break ;
1079
- }
1078
+ inside_transaction = process_remote_begin (& s );
1079
+ break ;
1080
1080
/* COMMIT */
1081
1081
case 'C' :
1082
1082
close_rel (rel );
1083
1083
process_remote_commit (& s );
1084
+ inside_transaction = false;
1084
1085
break ;
1085
1086
/* INSERT */
1086
1087
case 'I' :
1087
- process_remote_insert (& s , rel );
1088
- continue ;
1088
+ process_remote_insert (& s , rel );
1089
+ break ;
1089
1090
/* UPDATE */
1090
1091
case 'U' :
1091
1092
process_remote_update (& s , rel );
1092
- continue ;
1093
+ break ;
1093
1094
/* DELETE */
1094
1095
case 'D' :
1095
1096
process_remote_delete (& s , rel );
1096
- continue ;
1097
+ break ;
1097
1098
case 'R' :
1098
1099
close_rel (rel );
1099
1100
rel = read_rel (& s , RowExclusiveLock );
1100
- continue ;
1101
+ break ;
1101
1102
case 'F' :
1102
1103
{
1103
1104
int node_id = pq_getmsgint (& s , 4 );
1104
1105
int file_id = pq_getmsgint (& s , 4 );
1105
1106
Assert (spill_file < 0 );
1106
1107
spill_file = MtmOpenSpillFile (node_id , file_id );
1107
- continue ;
1108
+ break ;
1108
1109
}
1109
1110
case '(' :
1110
1111
{
1111
1112
size_t size = pq_getmsgint (& s , 4 );
1112
- s .data = palloc ( size );
1113
+ s .data = MemoryContextAlloc ( TopMemoryContext , size );
1113
1114
save_cursor = s .cursor ;
1114
1115
save_len = s .len ;
1115
1116
s .cursor = 0 ;
1116
1117
s .len = size ;
1117
1118
MtmReadSpillFile (spill_file , s .data , size );
1118
- continue ;
1119
+ break ;
1119
1120
}
1120
1121
case ')' :
1121
1122
{
1122
1123
pfree (s .data );
1123
1124
s .data = work ;
1124
1125
s .cursor = save_cursor ;
1125
1126
s .len = save_len ;
1126
- continue ;
1127
+ break ;
1127
1128
}
1128
1129
case 'M' :
1129
1130
{
1130
- if (process_remote_message (& s )) {
1131
- break ;
1132
- }
1133
- continue ;
1131
+ inside_transaction = !process_remote_message (& s );
1132
+ break ;
1134
1133
}
1135
1134
case 'Z' :
1136
1135
{
1137
1136
MtmRecoveryCompleted ();
1137
+ inside_transaction = false;
1138
1138
break ;
1139
1139
}
1140
1140
default :
1141
1141
MTM_ELOG (ERROR , "unknown action of type %c" , action );
1142
1142
}
1143
- break ;
1144
- }
1143
+ MemoryContextSwitchTo (old_context );
1144
+ MemoryContextResetAndDeleteChildren (MtmApplyContext );
1145
+ } while (inside_transaction );
1145
1146
}
1146
1147
PG_CATCH ();
1147
1148
{
1148
- MemoryContext oldcontext ;
1149
1149
MtmReleaseLock ();
1150
- oldcontext = MemoryContextSwitchTo (MtmApplyContext );
1150
+ old_context = MemoryContextSwitchTo (MtmApplyContext );
1151
1151
MtmHandleApplyError ();
1152
- MemoryContextSwitchTo (oldcontext );
1152
+ MemoryContextSwitchTo (old_context );
1153
1153
EmitErrorReport ();
1154
1154
FlushErrorState ();
1155
1155
MTM_LOG1 ("%d: REMOTE begin abort transaction %llu" , MyProcPid , (long64 )MtmGetCurrentTransactionId ());
@@ -1159,12 +1159,15 @@ void MtmExecutor(void* work, size_t size)
1159
1159
MTM_LOG2 ("%d: REMOTE end abort transaction %llu" , MyProcPid , (long64 )MtmGetCurrentTransactionId ());
1160
1160
}
1161
1161
PG_END_TRY ();
1162
+ if (s .data != work ) {
1163
+ pfree (s .data );
1164
+ }
1162
1165
#if 0 /* spill file is expecrted to be closed by tranaction commit or rollback */
1163
1166
if (spill_file >= 0 ) {
1164
1167
MtmCloseSpillFile (spill_file );
1165
1168
}
1166
1169
#endif
1170
+ MemoryContextSwitchTo (top_context );
1167
1171
MemoryContextResetAndDeleteChildren (MtmApplyContext );
1168
- MemoryContextSwitchTo (topContext );
1169
1172
}
1170
1173
0 commit comments