@@ -724,6 +724,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
724
724
725
725
/* set output state */
726
726
ctx -> accept_writes = false;
727
+ ctx -> end_xact = false;
727
728
728
729
/* do the actual work: call callback */
729
730
ctx -> callbacks .startup_cb (ctx , opt , is_init );
@@ -751,6 +752,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
751
752
752
753
/* set output state */
753
754
ctx -> accept_writes = false;
755
+ ctx -> end_xact = false;
754
756
755
757
/* do the actual work: call callback */
756
758
ctx -> callbacks .shutdown_cb (ctx );
@@ -786,6 +788,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
786
788
ctx -> accept_writes = true;
787
789
ctx -> write_xid = txn -> xid ;
788
790
ctx -> write_location = txn -> first_lsn ;
791
+ ctx -> end_xact = false;
789
792
790
793
/* do the actual work: call callback */
791
794
ctx -> callbacks .begin_cb (ctx , txn );
@@ -817,6 +820,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
817
820
ctx -> accept_writes = true;
818
821
ctx -> write_xid = txn -> xid ;
819
822
ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
823
+ ctx -> end_xact = true;
820
824
821
825
/* do the actual work: call callback */
822
826
ctx -> callbacks .commit_cb (ctx , txn , commit_lsn );
@@ -857,6 +861,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
857
861
ctx -> accept_writes = true;
858
862
ctx -> write_xid = txn -> xid ;
859
863
ctx -> write_location = txn -> first_lsn ;
864
+ ctx -> end_xact = false;
860
865
861
866
/*
862
867
* If the plugin supports two-phase commits then begin prepare callback is
@@ -901,6 +906,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
901
906
ctx -> accept_writes = true;
902
907
ctx -> write_xid = txn -> xid ;
903
908
ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
909
+ ctx -> end_xact = true;
904
910
905
911
/*
906
912
* If the plugin supports two-phase commits then prepare callback is
@@ -945,6 +951,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
945
951
ctx -> accept_writes = true;
946
952
ctx -> write_xid = txn -> xid ;
947
953
ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
954
+ ctx -> end_xact = true;
948
955
949
956
/*
950
957
* If the plugin support two-phase commits then commit prepared callback
@@ -990,6 +997,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
990
997
ctx -> accept_writes = true;
991
998
ctx -> write_xid = txn -> xid ;
992
999
ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
1000
+ ctx -> end_xact = true;
993
1001
994
1002
/*
995
1003
* If the plugin support two-phase commits then rollback prepared callback
@@ -1040,6 +1048,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1040
1048
*/
1041
1049
ctx -> write_location = change -> lsn ;
1042
1050
1051
+ ctx -> end_xact = false;
1052
+
1043
1053
ctx -> callbacks .change_cb (ctx , txn , relation , change );
1044
1054
1045
1055
/* Pop the error context stack */
@@ -1080,6 +1090,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1080
1090
*/
1081
1091
ctx -> write_location = change -> lsn ;
1082
1092
1093
+ ctx -> end_xact = false;
1094
+
1083
1095
ctx -> callbacks .truncate_cb (ctx , txn , nrelations , relations , change );
1084
1096
1085
1097
/* Pop the error context stack */
@@ -1107,6 +1119,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
1107
1119
1108
1120
/* set output state */
1109
1121
ctx -> accept_writes = false;
1122
+ ctx -> end_xact = false;
1110
1123
1111
1124
/* do the actual work: call callback */
1112
1125
ret = ctx -> callbacks .filter_prepare_cb (ctx , xid , gid );
@@ -1137,6 +1150,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
1137
1150
1138
1151
/* set output state */
1139
1152
ctx -> accept_writes = false;
1153
+ ctx -> end_xact = false;
1140
1154
1141
1155
/* do the actual work: call callback */
1142
1156
ret = ctx -> callbacks .filter_by_origin_cb (ctx , origin_id );
@@ -1174,6 +1188,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1174
1188
ctx -> accept_writes = true;
1175
1189
ctx -> write_xid = txn != NULL ? txn -> xid : InvalidTransactionId ;
1176
1190
ctx -> write_location = message_lsn ;
1191
+ ctx -> end_xact = false;
1177
1192
1178
1193
/* do the actual work: call callback */
1179
1194
ctx -> callbacks .message_cb (ctx , txn , message_lsn , transactional , prefix ,
@@ -1217,6 +1232,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1217
1232
*/
1218
1233
ctx -> write_location = first_lsn ;
1219
1234
1235
+ ctx -> end_xact = false;
1236
+
1220
1237
/* in streaming mode, stream_start_cb is required */
1221
1238
if (ctx -> callbacks .stream_start_cb == NULL )
1222
1239
ereport (ERROR ,
@@ -1264,6 +1281,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1264
1281
*/
1265
1282
ctx -> write_location = last_lsn ;
1266
1283
1284
+ ctx -> end_xact = false;
1285
+
1267
1286
/* in streaming mode, stream_stop_cb is required */
1268
1287
if (ctx -> callbacks .stream_stop_cb == NULL )
1269
1288
ereport (ERROR ,
@@ -1303,6 +1322,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1303
1322
ctx -> accept_writes = true;
1304
1323
ctx -> write_xid = txn -> xid ;
1305
1324
ctx -> write_location = abort_lsn ;
1325
+ ctx -> end_xact = true;
1306
1326
1307
1327
/* in streaming mode, stream_abort_cb is required */
1308
1328
if (ctx -> callbacks .stream_abort_cb == NULL )
@@ -1347,6 +1367,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1347
1367
ctx -> accept_writes = true;
1348
1368
ctx -> write_xid = txn -> xid ;
1349
1369
ctx -> write_location = txn -> end_lsn ;
1370
+ ctx -> end_xact = true;
1350
1371
1351
1372
/* in streaming mode with two-phase commits, stream_prepare_cb is required */
1352
1373
if (ctx -> callbacks .stream_prepare_cb == NULL )
@@ -1387,6 +1408,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1387
1408
ctx -> accept_writes = true;
1388
1409
ctx -> write_xid = txn -> xid ;
1389
1410
ctx -> write_location = txn -> end_lsn ;
1411
+ ctx -> end_xact = true;
1390
1412
1391
1413
/* in streaming mode, stream_commit_cb is required */
1392
1414
if (ctx -> callbacks .stream_commit_cb == NULL )
@@ -1435,6 +1457,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1435
1457
*/
1436
1458
ctx -> write_location = change -> lsn ;
1437
1459
1460
+ ctx -> end_xact = false;
1461
+
1438
1462
/* in streaming mode, stream_change_cb is required */
1439
1463
if (ctx -> callbacks .stream_change_cb == NULL )
1440
1464
ereport (ERROR ,
@@ -1479,6 +1503,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1479
1503
ctx -> accept_writes = true;
1480
1504
ctx -> write_xid = txn != NULL ? txn -> xid : InvalidTransactionId ;
1481
1505
ctx -> write_location = message_lsn ;
1506
+ ctx -> end_xact = false;
1482
1507
1483
1508
/* do the actual work: call callback */
1484
1509
ctx -> callbacks .stream_message_cb (ctx , txn , message_lsn , transactional , prefix ,
@@ -1527,6 +1552,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1527
1552
*/
1528
1553
ctx -> write_location = change -> lsn ;
1529
1554
1555
+ ctx -> end_xact = false;
1556
+
1530
1557
ctx -> callbacks .stream_truncate_cb (ctx , txn , nrelations , relations , change );
1531
1558
1532
1559
/* Pop the error context stack */
0 commit comments