Skip to content

Commit c981d91

Browse files
author
Amit Kapila
committed
Improve the code to decide and process the apply action.
The code that decides the apply action missed to handle non-transactional messages and we didn't catch it in our testing as currently such messages are simply ignored by the apply worker. This was introduced by changes in commit 216a784. While testing this, I noticed that we forgot to reset stream_xid after processing the stream stop message which could also result in the wrong apply action after the fix for non-transactional messages. In passing, change assert to elog for unexpected apply action in some of the routines so as to catch the problems in the production environment, if any. Reported-by: Tomas Vondra Author: Amit Kapila Reviewed-by: Tomas Vondra, Sawada Masahiko, Hou Zhijie Discussion: https://postgr.es/m/984ff689-adde-9977-affe-cd6029e850be@enterprisedb.com Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
1 parent 4f985ab commit c981d91

File tree

1 file changed

+47
-26
lines changed

1 file changed

+47
-26
lines changed

src/backend/replication/logical/worker.c

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,10 @@ typedef struct ApplyErrorCallbackArg
247247
* The action to be taken for the changes in the transaction.
248248
*
249249
* TRANS_LEADER_APPLY:
250-
* This action means that we are in the leader apply worker and changes of the
251-
* transaction are applied directly by the worker.
250+
* This action means that we are in the leader apply worker or table sync
251+
* worker. The changes of the transaction are either directly applied or
252+
* are read from temporary files (for streaming transactions) and then
253+
* applied by the worker.
252254
*
253255
* TRANS_LEADER_SERIALIZE:
254256
* This action means that we are in the leader apply worker or table sync
@@ -1004,6 +1006,9 @@ apply_handle_begin(StringInfo s)
10041006
{
10051007
LogicalRepBeginData begin_data;
10061008

1009+
/* There must not be an active streaming transaction. */
1010+
Assert(!TransactionIdIsValid(stream_xid));
1011+
10071012
logicalrep_read_begin(s, &begin_data);
10081013
set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
10091014

@@ -1058,6 +1063,9 @@ apply_handle_begin_prepare(StringInfo s)
10581063
(errcode(ERRCODE_PROTOCOL_VIOLATION),
10591064
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
10601065

1066+
/* There must not be an active streaming transaction. */
1067+
Assert(!TransactionIdIsValid(stream_xid));
1068+
10611069
logicalrep_read_begin_prepare(s, &begin_data);
10621070
set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
10631071

@@ -1301,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s)
13011309

13021310
switch (apply_action)
13031311
{
1304-
case TRANS_LEADER_SERIALIZE:
1312+
case TRANS_LEADER_APPLY:
13051313

13061314
/*
13071315
* The transaction has been serialized to file, so replay all the
@@ -1384,7 +1392,7 @@ apply_handle_stream_prepare(StringInfo s)
13841392
break;
13851393

13861394
default:
1387-
Assert(false);
1395+
elog(ERROR, "unexpected apply action: %d", (int) apply_action);
13881396
break;
13891397
}
13901398

@@ -1484,6 +1492,9 @@ apply_handle_stream_start(StringInfo s)
14841492
(errcode(ERRCODE_PROTOCOL_VIOLATION),
14851493
errmsg_internal("duplicate STREAM START message")));
14861494

1495+
/* There must not be an active streaming transaction. */
1496+
Assert(!TransactionIdIsValid(stream_xid));
1497+
14871498
/* notify handle methods we're processing a remote transaction */
14881499
in_streamed_transaction = true;
14891500

@@ -1589,7 +1600,7 @@ apply_handle_stream_start(StringInfo s)
15891600
break;
15901601

15911602
default:
1592-
Assert(false);
1603+
elog(ERROR, "unexpected apply action: %d", (int) apply_action);
15931604
break;
15941605
}
15951606

@@ -1705,11 +1716,12 @@ apply_handle_stream_stop(StringInfo s)
17051716
break;
17061717

17071718
default:
1708-
Assert(false);
1719+
elog(ERROR, "unexpected apply action: %d", (int) apply_action);
17091720
break;
17101721
}
17111722

17121723
in_streamed_transaction = false;
1724+
stream_xid = InvalidTransactionId;
17131725

17141726
/*
17151727
* The parallel apply worker could be in a transaction in which case we
@@ -1842,7 +1854,7 @@ apply_handle_stream_abort(StringInfo s)
18421854

18431855
switch (apply_action)
18441856
{
1845-
case TRANS_LEADER_SERIALIZE:
1857+
case TRANS_LEADER_APPLY:
18461858

18471859
/*
18481860
* We are in the leader apply worker and the transaction has been
@@ -1957,7 +1969,7 @@ apply_handle_stream_abort(StringInfo s)
19571969
break;
19581970

19591971
default:
1960-
Assert(false);
1972+
elog(ERROR, "unexpected apply action: %d", (int) apply_action);
19611973
break;
19621974
}
19631975

@@ -2154,7 +2166,7 @@ apply_handle_stream_commit(StringInfo s)
21542166

21552167
switch (apply_action)
21562168
{
2157-
case TRANS_LEADER_SERIALIZE:
2169+
case TRANS_LEADER_APPLY:
21582170

21592171
/*
21602172
* The transaction has been serialized to file, so replay all the
@@ -2226,7 +2238,7 @@ apply_handle_stream_commit(StringInfo s)
22262238
break;
22272239

22282240
default:
2229-
Assert(false);
2241+
elog(ERROR, "unexpected apply action: %d", (int) apply_action);
22302242
break;
22312243
}
22322244

@@ -4204,7 +4216,6 @@ stream_close_file(void)
42044216

42054217
BufFileClose(stream_fd);
42064218

4207-
stream_xid = InvalidTransactionId;
42084219
stream_fd = NULL;
42094220
}
42104221

@@ -4977,10 +4988,12 @@ set_apply_error_context_origin(char *originname)
49774988
}
49784989

49794990
/*
4980-
* Return the action to be taken for the given transaction. *winfo is
4981-
* assigned to the destination parallel worker info when the leader apply
4982-
* worker has to pass all the transaction's changes to the parallel apply
4983-
* worker.
4991+
* Return the action to be taken for the given transaction. See
4992+
* TransApplyAction for information on each of the actions.
4993+
*
4994+
* *winfo is assigned to the destination parallel worker info when the leader
4995+
* apply worker has to pass all the transaction's changes to the parallel
4996+
* apply worker.
49844997
*/
49854998
static TransApplyAction
49864999
get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
@@ -4991,27 +5004,35 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
49915004
{
49925005
return TRANS_PARALLEL_APPLY;
49935006
}
4994-
else if (in_remote_transaction)
4995-
{
4996-
return TRANS_LEADER_APPLY;
4997-
}
49985007

49995008
/*
5000-
* Check if we are processing this transaction using a parallel apply
5001-
* worker.
5009+
* If we are processing this transaction using a parallel apply worker then
5010+
* either we send the changes to the parallel worker or if the worker is busy
5011+
* then serialize the changes to the file which will later be processed by
5012+
* the parallel worker.
50025013
*/
50035014
*winfo = pa_find_worker(xid);
50045015

5005-
if (!*winfo)
5016+
if (*winfo && (*winfo)->serialize_changes)
50065017
{
5007-
return TRANS_LEADER_SERIALIZE;
5018+
return TRANS_LEADER_PARTIAL_SERIALIZE;
50085019
}
5009-
else if ((*winfo)->serialize_changes)
5020+
else if (*winfo)
50105021
{
5011-
return TRANS_LEADER_PARTIAL_SERIALIZE;
5022+
return TRANS_LEADER_SEND_TO_PARALLEL;
5023+
}
5024+
5025+
/*
5026+
* If there is no parallel worker involved to process this transaction then
5027+
* we either directly apply the change or serialize it to a file which will
5028+
* later be applied when the transaction finish message is processed.
5029+
*/
5030+
else if (in_streamed_transaction)
5031+
{
5032+
return TRANS_LEADER_SERIALIZE;
50125033
}
50135034
else
50145035
{
5015-
return TRANS_LEADER_SEND_TO_PARALLEL;
5036+
return TRANS_LEADER_APPLY;
50165037
}
50175038
}

0 commit comments

Comments
 (0)