@@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
333
333
/* Compute GID for two_phase transactions */
334
334
static void TwoPhaseTransactionGid (Oid subid , TransactionId xid , char * gid , int szgid );
335
335
336
+ /* Common streaming function to apply all the spooled messages */
337
+ static void apply_spooled_messages (TransactionId xid , XLogRecPtr lsn );
336
338
337
339
/*
338
340
* Should this worker apply changes for given relation.
@@ -884,14 +886,47 @@ apply_handle_begin_prepare(StringInfo s)
884
886
pgstat_report_activity (STATE_RUNNING , NULL );
885
887
}
886
888
889
+ /*
890
+ * Common function to prepare the GID.
891
+ */
892
+ static void
893
+ apply_handle_prepare_internal (LogicalRepPreparedTxnData * prepare_data )
894
+ {
895
+ char gid [GIDSIZE ];
896
+
897
+ /*
898
+ * Compute unique GID for two_phase transactions. We don't use GID of
899
+ * prepared transaction sent by server as that can lead to deadlock when
900
+ * we have multiple subscriptions from same node point to publications on
901
+ * the same node. See comments atop worker.c
902
+ */
903
+ TwoPhaseTransactionGid (MySubscription -> oid , prepare_data -> xid ,
904
+ gid , sizeof (gid ));
905
+
906
+ /*
907
+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
908
+ * called within the PrepareTransactionBlock below.
909
+ */
910
+ BeginTransactionBlock ();
911
+ CommitTransactionCommand (); /* Completes the preceding Begin command. */
912
+
913
+ /*
914
+ * Update origin state so we can restart streaming from correct position
915
+ * in case of crash.
916
+ */
917
+ replorigin_session_origin_lsn = prepare_data -> end_lsn ;
918
+ replorigin_session_origin_timestamp = prepare_data -> prepare_time ;
919
+
920
+ PrepareTransactionBlock (gid );
921
+ }
922
+
887
923
/*
888
924
* Handle PREPARE message.
889
925
*/
890
926
static void
891
927
apply_handle_prepare (StringInfo s )
892
928
{
893
929
LogicalRepPreparedTxnData prepare_data ;
894
- char gid [GIDSIZE ];
895
930
896
931
logicalrep_read_prepare (s , & prepare_data );
897
932
@@ -902,15 +937,6 @@ apply_handle_prepare(StringInfo s)
902
937
LSN_FORMAT_ARGS (prepare_data .prepare_lsn ),
903
938
LSN_FORMAT_ARGS (remote_final_lsn ))));
904
939
905
- /*
906
- * Compute unique GID for two_phase transactions. We don't use GID of
907
- * prepared transaction sent by server as that can lead to deadlock when
908
- * we have multiple subscriptions from same node point to publications on
909
- * the same node. See comments atop worker.c
910
- */
911
- TwoPhaseTransactionGid (MySubscription -> oid , prepare_data .xid ,
912
- gid , sizeof (gid ));
913
-
914
940
/*
915
941
* Unlike commit, here, we always prepare the transaction even though no
916
942
* change has happened in this transaction. It is done this way because at
@@ -923,21 +949,8 @@ apply_handle_prepare(StringInfo s)
923
949
*/
924
950
begin_replication_step ();
925
951
926
- /*
927
- * BeginTransactionBlock is necessary to balance the EndTransactionBlock
928
- * called within the PrepareTransactionBlock below.
929
- */
930
- BeginTransactionBlock ();
931
- CommitTransactionCommand (); /* Completes the preceding Begin command. */
932
-
933
- /*
934
- * Update origin state so we can restart streaming from correct position
935
- * in case of crash.
936
- */
937
- replorigin_session_origin_lsn = prepare_data .end_lsn ;
938
- replorigin_session_origin_timestamp = prepare_data .prepare_time ;
952
+ apply_handle_prepare_internal (& prepare_data );
939
953
940
- PrepareTransactionBlock (gid );
941
954
end_replication_step ();
942
955
CommitTransactionCommand ();
943
956
pgstat_report_stat (false);
@@ -1256,30 +1269,19 @@ apply_handle_stream_abort(StringInfo s)
1256
1269
}
1257
1270
1258
1271
/*
1259
- * Handle STREAM COMMIT message .
1272
+ * Common spoolfile processing .
1260
1273
*/
1261
1274
static void
1262
- apply_handle_stream_commit ( StringInfo s )
1275
+ apply_spooled_messages ( TransactionId xid , XLogRecPtr lsn )
1263
1276
{
1264
- TransactionId xid ;
1265
1277
StringInfoData s2 ;
1266
1278
int nchanges ;
1267
1279
char path [MAXPGPATH ];
1268
1280
char * buffer = NULL ;
1269
- LogicalRepCommitData commit_data ;
1270
1281
StreamXidHash * ent ;
1271
1282
MemoryContext oldcxt ;
1272
1283
BufFile * fd ;
1273
1284
1274
- if (in_streamed_transaction )
1275
- ereport (ERROR ,
1276
- (errcode (ERRCODE_PROTOCOL_VIOLATION ),
1277
- errmsg_internal ("STREAM COMMIT message without STREAM STOP" )));
1278
-
1279
- xid = logicalrep_read_stream_commit (s , & commit_data );
1280
-
1281
- elog (DEBUG1 , "received commit for streamed transaction %u" , xid );
1282
-
1283
1285
/* Make sure we have an open transaction */
1284
1286
begin_replication_step ();
1285
1287
@@ -1311,7 +1313,7 @@ apply_handle_stream_commit(StringInfo s)
1311
1313
1312
1314
MemoryContextSwitchTo (oldcxt );
1313
1315
1314
- remote_final_lsn = commit_data . commit_lsn ;
1316
+ remote_final_lsn = lsn ;
1315
1317
1316
1318
/*
1317
1319
* Make sure the handle apply_dispatch methods are aware we're in a remote
@@ -1390,6 +1392,29 @@ apply_handle_stream_commit(StringInfo s)
1390
1392
elog (DEBUG1 , "replayed %d (all) changes from file \"%s\"" ,
1391
1393
nchanges , path );
1392
1394
1395
+ return ;
1396
+ }
1397
+
1398
+ /*
1399
+ * Handle STREAM COMMIT message.
1400
+ */
1401
+ static void
1402
+ apply_handle_stream_commit (StringInfo s )
1403
+ {
1404
+ TransactionId xid ;
1405
+ LogicalRepCommitData commit_data ;
1406
+
1407
+ if (in_streamed_transaction )
1408
+ ereport (ERROR ,
1409
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
1410
+ errmsg_internal ("STREAM COMMIT message without STREAM STOP" )));
1411
+
1412
+ xid = logicalrep_read_stream_commit (s , & commit_data );
1413
+
1414
+ elog (DEBUG1 , "received commit for streamed transaction %u" , xid );
1415
+
1416
+ apply_spooled_messages (xid , commit_data .commit_lsn );
1417
+
1393
1418
apply_handle_commit_internal (s , & commit_data );
1394
1419
1395
1420
/* unlink the files with serialized changes and subxact info */
0 commit comments