43
43
#include "storage/pmsignal.h"
44
44
#include "storage/proc.h"
45
45
#include "utils/syscache.h"
46
+ #include "replication/walsender.h"
46
47
#include "port/atomics.h"
47
48
48
49
#include "sockhub/sockhub.h"
@@ -757,7 +758,7 @@ DtmXactCallback(XactEvent event, void *arg)
757
758
case XACT_EVENT_START :
758
759
//XTM_INFO("%d: normal=%d, initialized=%d, replication=%d, bgw=%d, vacuum=%d\n",
759
760
// getpid(), IsNormalProcessingMode(), dtm->initialized, MMDoReplication, IsBackgroundWorker, IsAutoVacuumWorkerProcess());
760
- if (IsNormalProcessingMode () && dtm -> initialized && MMDoReplication && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess ()) {
761
+ if (IsNormalProcessingMode () && dtm -> initialized && MMDoReplication && !am_walsender && ! IsBackgroundWorker && !IsAutoVacuumWorkerProcess ()) {
761
762
MMBeginTransaction ();
762
763
}
763
764
break ;
@@ -1190,7 +1191,7 @@ static void MMExecutor(int id, void* work, size_t size)
1190
1191
{
1191
1192
TransactionId xid = * (TransactionId * )work ;
1192
1193
char * stmts = (char * )work + 4 ;
1193
- int rc = SPI_ERROR_TRANSACTION ;
1194
+ bool finished = false ;
1194
1195
1195
1196
MMJoinTransaction (xid );
1196
1197
@@ -1201,9 +1202,10 @@ static void MMExecutor(int id, void* work, size_t size)
1201
1202
1202
1203
PG_TRY ();
1203
1204
{
1204
- rc = SPI_execute (stmts , false, 0 );
1205
+ int rc = SPI_execute (stmts , false, 0 );
1205
1206
SPI_finish ();
1206
1207
PopActiveSnapshot ();
1208
+ finished = true;
1207
1209
if (rc != SPI_OK_INSERT && rc != SPI_OK_UPDATE && rc != SPI_OK_DELETE ) {
1208
1210
ereport (LOG , (errmsg ("Executor %d: failed to apply transaction %u" ,
1209
1211
id , xid )));
@@ -1215,9 +1217,11 @@ static void MMExecutor(int id, void* work, size_t size)
1215
1217
PG_CATCH ();
1216
1218
{
1217
1219
FlushErrorState ();
1218
- if (rc == SPI_ERROR_TRANSACTION ) {
1220
+ if (! finished ) {
1219
1221
SPI_finish ();
1220
- PopActiveSnapshot ();
1222
+ if (ActiveSnapshotSet ()) {
1223
+ PopActiveSnapshot ();
1224
+ }
1221
1225
}
1222
1226
AbortCurrentTransaction ();
1223
1227
}
0 commit comments