@@ -746,6 +746,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
746
746
747
747
/* set output state */
748
748
ctx -> accept_writes = false;
749
+ ctx -> end_xact = false;
749
750
750
751
/* do the actual work: call callback */
751
752
ctx -> callbacks .startup_cb (ctx , opt , is_init );
@@ -773,6 +774,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
773
774
774
775
/* set output state */
775
776
ctx -> accept_writes = false;
777
+ ctx -> end_xact = false;
776
778
777
779
/* do the actual work: call callback */
778
780
ctx -> callbacks .shutdown_cb (ctx );
@@ -808,6 +810,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
808
810
ctx -> accept_writes = true;
809
811
ctx -> write_xid = txn -> xid ;
810
812
ctx -> write_location = txn -> first_lsn ;
813
+ ctx -> end_xact = false;
811
814
812
815
/* do the actual work: call callback */
813
816
ctx -> callbacks .begin_cb (ctx , txn );
@@ -839,6 +842,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
839
842
ctx -> accept_writes = true;
840
843
ctx -> write_xid = txn -> xid ;
841
844
ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
845
+ ctx -> end_xact = true;
842
846
843
847
/* do the actual work: call callback */
844
848
ctx -> callbacks .commit_cb (ctx , txn , commit_lsn );
@@ -879,6 +883,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
879
883
ctx -> accept_writes = true;
880
884
ctx -> write_xid = txn -> xid ;
881
885
ctx -> write_location = txn -> first_lsn ;
886
+ ctx -> end_xact = false;
882
887
883
888
/*
884
889
* If the plugin supports two-phase commits then begin prepare callback is
@@ -923,6 +928,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
923
928
ctx -> accept_writes = true;
924
929
ctx -> write_xid = txn -> xid ;
925
930
ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
931
+ ctx -> end_xact = true;
926
932
927
933
/*
928
934
* If the plugin supports two-phase commits then prepare callback is
@@ -967,6 +973,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
967
973
ctx -> accept_writes = true;
968
974
ctx -> write_xid = txn -> xid ;
969
975
ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
976
+ ctx -> end_xact = true;
970
977
971
978
/*
972
979
* If the plugin support two-phase commits then commit prepared callback
@@ -1012,6 +1019,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1012
1019
ctx -> accept_writes = true;
1013
1020
ctx -> write_xid = txn -> xid ;
1014
1021
ctx -> write_location = txn -> end_lsn ; /* points to the end of the record */
1022
+ ctx -> end_xact = true;
1015
1023
1016
1024
/*
1017
1025
* If the plugin support two-phase commits then rollback prepared callback
@@ -1062,6 +1070,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1062
1070
*/
1063
1071
ctx -> write_location = change -> lsn ;
1064
1072
1073
+ ctx -> end_xact = false;
1074
+
1065
1075
ctx -> callbacks .change_cb (ctx , txn , relation , change );
1066
1076
1067
1077
/* Pop the error context stack */
@@ -1102,6 +1112,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1102
1112
*/
1103
1113
ctx -> write_location = change -> lsn ;
1104
1114
1115
+ ctx -> end_xact = false;
1116
+
1105
1117
ctx -> callbacks .truncate_cb (ctx , txn , nrelations , relations , change );
1106
1118
1107
1119
/* Pop the error context stack */
@@ -1129,6 +1141,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
1129
1141
1130
1142
/* set output state */
1131
1143
ctx -> accept_writes = false;
1144
+ ctx -> end_xact = false;
1132
1145
1133
1146
/* do the actual work: call callback */
1134
1147
ret = ctx -> callbacks .filter_prepare_cb (ctx , xid , gid );
@@ -1159,6 +1172,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
1159
1172
1160
1173
/* set output state */
1161
1174
ctx -> accept_writes = false;
1175
+ ctx -> end_xact = false;
1162
1176
1163
1177
/* do the actual work: call callback */
1164
1178
ret = ctx -> callbacks .filter_by_origin_cb (ctx , origin_id );
@@ -1196,6 +1210,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1196
1210
ctx -> accept_writes = true;
1197
1211
ctx -> write_xid = txn != NULL ? txn -> xid : InvalidTransactionId ;
1198
1212
ctx -> write_location = message_lsn ;
1213
+ ctx -> end_xact = false;
1199
1214
1200
1215
/* do the actual work: call callback */
1201
1216
ctx -> callbacks .message_cb (ctx , txn , message_lsn , transactional , prefix ,
@@ -1239,6 +1254,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1239
1254
*/
1240
1255
ctx -> write_location = first_lsn ;
1241
1256
1257
+ ctx -> end_xact = false;
1258
+
1242
1259
/* in streaming mode, stream_start_cb is required */
1243
1260
if (ctx -> callbacks .stream_start_cb == NULL )
1244
1261
ereport (ERROR ,
@@ -1286,6 +1303,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1286
1303
*/
1287
1304
ctx -> write_location = last_lsn ;
1288
1305
1306
+ ctx -> end_xact = false;
1307
+
1289
1308
/* in streaming mode, stream_stop_cb is required */
1290
1309
if (ctx -> callbacks .stream_stop_cb == NULL )
1291
1310
ereport (ERROR ,
@@ -1325,6 +1344,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1325
1344
ctx -> accept_writes = true;
1326
1345
ctx -> write_xid = txn -> xid ;
1327
1346
ctx -> write_location = abort_lsn ;
1347
+ ctx -> end_xact = true;
1328
1348
1329
1349
/* in streaming mode, stream_abort_cb is required */
1330
1350
if (ctx -> callbacks .stream_abort_cb == NULL )
@@ -1369,6 +1389,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1369
1389
ctx -> accept_writes = true;
1370
1390
ctx -> write_xid = txn -> xid ;
1371
1391
ctx -> write_location = txn -> end_lsn ;
1392
+ ctx -> end_xact = true;
1372
1393
1373
1394
/* in streaming mode with two-phase commits, stream_prepare_cb is required */
1374
1395
if (ctx -> callbacks .stream_prepare_cb == NULL )
@@ -1409,6 +1430,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1409
1430
ctx -> accept_writes = true;
1410
1431
ctx -> write_xid = txn -> xid ;
1411
1432
ctx -> write_location = txn -> end_lsn ;
1433
+ ctx -> end_xact = true;
1412
1434
1413
1435
/* in streaming mode, stream_commit_cb is required */
1414
1436
if (ctx -> callbacks .stream_commit_cb == NULL )
@@ -1457,6 +1479,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1457
1479
*/
1458
1480
ctx -> write_location = change -> lsn ;
1459
1481
1482
+ ctx -> end_xact = false;
1483
+
1460
1484
/* in streaming mode, stream_change_cb is required */
1461
1485
if (ctx -> callbacks .stream_change_cb == NULL )
1462
1486
ereport (ERROR ,
@@ -1501,6 +1525,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1501
1525
ctx -> accept_writes = true;
1502
1526
ctx -> write_xid = txn != NULL ? txn -> xid : InvalidTransactionId ;
1503
1527
ctx -> write_location = message_lsn ;
1528
+ ctx -> end_xact = false;
1504
1529
1505
1530
/* do the actual work: call callback */
1506
1531
ctx -> callbacks .stream_message_cb (ctx , txn , message_lsn , transactional , prefix ,
@@ -1549,6 +1574,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1549
1574
*/
1550
1575
ctx -> write_location = change -> lsn ;
1551
1576
1577
+ ctx -> end_xact = false;
1578
+
1552
1579
ctx -> callbacks .stream_truncate_cb (ctx , txn , nrelations , relations , change );
1553
1580
1554
1581
/* Pop the error context stack */
0 commit comments