@@ -247,8 +247,10 @@ typedef struct ApplyErrorCallbackArg
247
247
* The action to be taken for the changes in the transaction.
248
248
*
249
249
* TRANS_LEADER_APPLY:
250
- * This action means that we are in the leader apply worker and changes of the
251
- * transaction are applied directly by the worker.
250
+ * This action means that we are in the leader apply worker or table sync
251
+ * worker. The changes of the transaction are either directly applied or
252
+ * are read from temporary files (for streaming transactions) and then
253
+ * applied by the worker.
252
254
*
253
255
* TRANS_LEADER_SERIALIZE:
254
256
* This action means that we are in the leader apply worker or table sync
@@ -1004,6 +1006,9 @@ apply_handle_begin(StringInfo s)
1004
1006
{
1005
1007
LogicalRepBeginData begin_data ;
1006
1008
1009
+ /* There must not be an active streaming transaction. */
1010
+ Assert (!TransactionIdIsValid (stream_xid ));
1011
+
1007
1012
logicalrep_read_begin (s , & begin_data );
1008
1013
set_apply_error_context_xact (begin_data .xid , begin_data .final_lsn );
1009
1014
@@ -1058,6 +1063,9 @@ apply_handle_begin_prepare(StringInfo s)
1058
1063
(errcode (ERRCODE_PROTOCOL_VIOLATION ),
1059
1064
errmsg_internal ("tablesync worker received a BEGIN PREPARE message" )));
1060
1065
1066
+ /* There must not be an active streaming transaction. */
1067
+ Assert (!TransactionIdIsValid (stream_xid ));
1068
+
1061
1069
logicalrep_read_begin_prepare (s , & begin_data );
1062
1070
set_apply_error_context_xact (begin_data .xid , begin_data .prepare_lsn );
1063
1071
@@ -1301,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s)
1301
1309
1302
1310
switch (apply_action )
1303
1311
{
1304
- case TRANS_LEADER_SERIALIZE :
1312
+ case TRANS_LEADER_APPLY :
1305
1313
1306
1314
/*
1307
1315
* The transaction has been serialized to file, so replay all the
@@ -1384,7 +1392,7 @@ apply_handle_stream_prepare(StringInfo s)
1384
1392
break ;
1385
1393
1386
1394
default :
1387
- Assert (false );
1395
+ elog ( ERROR , "unexpected apply action: %d" , ( int ) apply_action );
1388
1396
break ;
1389
1397
}
1390
1398
@@ -1484,6 +1492,9 @@ apply_handle_stream_start(StringInfo s)
1484
1492
(errcode (ERRCODE_PROTOCOL_VIOLATION ),
1485
1493
errmsg_internal ("duplicate STREAM START message" )));
1486
1494
1495
+ /* There must not be an active streaming transaction. */
1496
+ Assert (!TransactionIdIsValid (stream_xid ));
1497
+
1487
1498
/* notify handle methods we're processing a remote transaction */
1488
1499
in_streamed_transaction = true;
1489
1500
@@ -1589,7 +1600,7 @@ apply_handle_stream_start(StringInfo s)
1589
1600
break ;
1590
1601
1591
1602
default :
1592
- Assert (false );
1603
+ elog ( ERROR , "unexpected apply action: %d" , ( int ) apply_action );
1593
1604
break ;
1594
1605
}
1595
1606
@@ -1705,11 +1716,12 @@ apply_handle_stream_stop(StringInfo s)
1705
1716
break ;
1706
1717
1707
1718
default :
1708
- Assert (false );
1719
+ elog ( ERROR , "unexpected apply action: %d" , ( int ) apply_action );
1709
1720
break ;
1710
1721
}
1711
1722
1712
1723
in_streamed_transaction = false;
1724
+ stream_xid = InvalidTransactionId ;
1713
1725
1714
1726
/*
1715
1727
* The parallel apply worker could be in a transaction in which case we
@@ -1842,7 +1854,7 @@ apply_handle_stream_abort(StringInfo s)
1842
1854
1843
1855
switch (apply_action )
1844
1856
{
1845
- case TRANS_LEADER_SERIALIZE :
1857
+ case TRANS_LEADER_APPLY :
1846
1858
1847
1859
/*
1848
1860
* We are in the leader apply worker and the transaction has been
@@ -1957,7 +1969,7 @@ apply_handle_stream_abort(StringInfo s)
1957
1969
break ;
1958
1970
1959
1971
default :
1960
- Assert (false );
1972
+ elog ( ERROR , "unexpected apply action: %d" , ( int ) apply_action );
1961
1973
break ;
1962
1974
}
1963
1975
@@ -2154,7 +2166,7 @@ apply_handle_stream_commit(StringInfo s)
2154
2166
2155
2167
switch (apply_action )
2156
2168
{
2157
- case TRANS_LEADER_SERIALIZE :
2169
+ case TRANS_LEADER_APPLY :
2158
2170
2159
2171
/*
2160
2172
* The transaction has been serialized to file, so replay all the
@@ -2226,7 +2238,7 @@ apply_handle_stream_commit(StringInfo s)
2226
2238
break ;
2227
2239
2228
2240
default :
2229
- Assert (false );
2241
+ elog ( ERROR , "unexpected apply action: %d" , ( int ) apply_action );
2230
2242
break ;
2231
2243
}
2232
2244
@@ -4204,7 +4216,6 @@ stream_close_file(void)
4204
4216
4205
4217
BufFileClose (stream_fd );
4206
4218
4207
- stream_xid = InvalidTransactionId ;
4208
4219
stream_fd = NULL ;
4209
4220
}
4210
4221
@@ -4977,10 +4988,12 @@ set_apply_error_context_origin(char *originname)
4977
4988
}
4978
4989
4979
4990
/*
4980
- * Return the action to be taken for the given transaction. *winfo is
4981
- * assigned to the destination parallel worker info when the leader apply
4982
- * worker has to pass all the transaction's changes to the parallel apply
4983
- * worker.
4991
+ * Return the action to be taken for the given transaction. See
4992
+ * TransApplyAction for information on each of the actions.
4993
+ *
4994
+ * *winfo is assigned to the destination parallel worker info when the leader
4995
+ * apply worker has to pass all the transaction's changes to the parallel
4996
+ * apply worker.
4984
4997
*/
4985
4998
static TransApplyAction
4986
4999
get_transaction_apply_action (TransactionId xid , ParallelApplyWorkerInfo * * winfo )
@@ -4991,27 +5004,35 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
4991
5004
{
4992
5005
return TRANS_PARALLEL_APPLY ;
4993
5006
}
4994
- else if (in_remote_transaction )
4995
- {
4996
- return TRANS_LEADER_APPLY ;
4997
- }
4998
5007
4999
5008
/*
5000
- * Check if we are processing this transaction using a parallel apply
5001
- * worker.
5009
+ * If we are processing this transaction using a parallel apply worker then
5010
+ * either we send the changes to the parallel worker or if the worker is busy
5011
+ * then serialize the changes to the file which will later be processed by
5012
+ * the parallel worker.
5002
5013
*/
5003
5014
* winfo = pa_find_worker (xid );
5004
5015
5005
- if (! * winfo )
5016
+ if (* winfo && ( * winfo ) -> serialize_changes )
5006
5017
{
5007
- return TRANS_LEADER_SERIALIZE ;
5018
+ return TRANS_LEADER_PARTIAL_SERIALIZE ;
5008
5019
}
5009
- else if (( * winfo ) -> serialize_changes )
5020
+ else if (* winfo )
5010
5021
{
5011
- return TRANS_LEADER_PARTIAL_SERIALIZE ;
5022
+ return TRANS_LEADER_SEND_TO_PARALLEL ;
5023
+ }
5024
+
5025
+ /*
5026
+ * If there is no parallel worker involved to process this transaction then
5027
+ * we either directly apply the change or serialize it to a file which will
5028
+ * later be applied when the transaction finish message is processed.
5029
+ */
5030
+ else if (in_streamed_transaction )
5031
+ {
5032
+ return TRANS_LEADER_SERIALIZE ;
5012
5033
}
5013
5034
else
5014
5035
{
5015
- return TRANS_LEADER_SEND_TO_PARALLEL ;
5036
+ return TRANS_LEADER_APPLY ;
5016
5037
}
5017
5038
}
0 commit comments