Skip to content

Commit c451eaf

Browse files
committed
Transfer current command counter ID to parallel workers.
Commit 924bcf4 correctly forbade parallel workers to modify the command counter while in parallel mode, but it inexplicably neglected to actually transfer the current command counter from leader to workers. This can result in the workers seeing a different set of tuples from the leader, which is bad. Repair.
1 parent 26981d2 commit c451eaf

File tree

1 file changed

+25
-21
lines changed
  • src/backend/access/transam

1 file changed

+25
-21
lines changed

src/backend/access/transam/xact.c

+25-21
Original file line numberDiff line numberDiff line change
@@ -4786,8 +4786,8 @@ Size
47864786
EstimateTransactionStateSpace(void)
47874787
{
47884788
TransactionState s;
4789-
Size nxids = 5; /* iso level, deferrable, top & current XID,
4790-
* XID count */
4789+
Size nxids = 6; /* iso level, deferrable, top & current XID,
4790+
* command counter, XID count */
47914791

47924792
for (s = CurrentTransactionState; s != NULL; s = s->parent)
47934793
{
@@ -4807,27 +4807,30 @@ EstimateTransactionStateSpace(void)
48074807
*
48084808
* We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs
48094809
* associated with this transaction. The first eight bytes of the result
4810-
* contain XactDeferrable and XactIsoLevel; the next eight bytes contain the
4811-
* XID of the top-level transaction and the XID of the current transaction
4812-
* (or, in each case, InvalidTransactionId if none). After that, the next 4
4813-
* bytes contain a count of how many additional XIDs follow; this is followed
4814-
* by all of those XIDs one after another. We emit the XIDs in sorted order
4815-
* for the convenience of the receiving process.
4810+
* contain XactDeferrable and XactIsoLevel; the next twelve bytes contain the
4811+
* XID of the top-level transaction, the XID of the current transaction
4812+
* (or, in each case, InvalidTransactionId if none), and the current command
4813+
* counter. After that, the next 4 bytes contain a count of how many
4814+
* additional XIDs follow; this is followed by all of those XIDs one after
4815+
* another. We emit the XIDs in sorted order for the convenience of the
4816+
* receiving process.
48164817
*/
48174818
void
48184819
SerializeTransactionState(Size maxsize, char *start_address)
48194820
{
48204821
TransactionState s;
48214822
Size nxids = 0;
48224823
Size i = 0;
4824+
Size c = 0;
48234825
TransactionId *workspace;
48244826
TransactionId *result = (TransactionId *) start_address;
48254827

4826-
Assert(maxsize >= 5 * sizeof(TransactionId));
4827-
result[0] = (TransactionId) XactIsoLevel;
4828-
result[1] = (TransactionId) XactDeferrable;
4829-
result[2] = XactTopTransactionId;
4830-
result[3] = CurrentTransactionState->transactionId;
4828+
result[c++] = (TransactionId) XactIsoLevel;
4829+
result[c++] = (TransactionId) XactDeferrable;
4830+
result[c++] = XactTopTransactionId;
4831+
result[c++] = CurrentTransactionState->transactionId;
4832+
result[c++] = (TransactionId) currentCommandId;
4833+
Assert(maxsize >= c * sizeof(TransactionId));
48314834

48324835
/*
48334836
* If we're running in a parallel worker and launching a parallel worker
@@ -4836,9 +4839,9 @@ SerializeTransactionState(Size maxsize, char *start_address)
48364839
*/
48374840
if (nParallelCurrentXids > 0)
48384841
{
4839-
Assert(maxsize > (nParallelCurrentXids + 4) * sizeof(TransactionId));
4840-
result[4] = nParallelCurrentXids;
4841-
memcpy(&result[5], ParallelCurrentXids,
4842+
result[c++] = nParallelCurrentXids;
4843+
Assert(maxsize >= (nParallelCurrentXids + c) * sizeof(TransactionId));
4844+
memcpy(&result[c], ParallelCurrentXids,
48424845
nParallelCurrentXids * sizeof(TransactionId));
48434846
return;
48444847
}
@@ -4853,7 +4856,7 @@ SerializeTransactionState(Size maxsize, char *start_address)
48534856
nxids = add_size(nxids, 1);
48544857
nxids = add_size(nxids, s->nChildXids);
48554858
}
4856-
Assert(nxids * sizeof(TransactionId) < maxsize);
4859+
Assert((c + 1 + nxids) * sizeof(TransactionId) <= maxsize);
48574860

48584861
/* Copy them to our scratch space. */
48594862
workspace = palloc(nxids * sizeof(TransactionId));
@@ -4871,8 +4874,8 @@ SerializeTransactionState(Size maxsize, char *start_address)
48714874
qsort(workspace, nxids, sizeof(TransactionId), xidComparator);
48724875

48734876
/* Copy data into output area. */
4874-
result[4] = (TransactionId) nxids;
4875-
memcpy(&result[5], workspace, nxids * sizeof(TransactionId));
4877+
result[c++] = (TransactionId) nxids;
4878+
memcpy(&result[c], workspace, nxids * sizeof(TransactionId));
48764879
}
48774880

48784881
/*
@@ -4892,8 +4895,9 @@ StartParallelWorkerTransaction(char *tstatespace)
48924895
XactDeferrable = (bool) tstate[1];
48934896
XactTopTransactionId = tstate[2];
48944897
CurrentTransactionState->transactionId = tstate[3];
4895-
nParallelCurrentXids = (int) tstate[4];
4896-
ParallelCurrentXids = &tstate[5];
4898+
currentCommandId = tstate[4];
4899+
nParallelCurrentXids = (int) tstate[5];
4900+
ParallelCurrentXids = &tstate[6];
48974901

48984902
CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS;
48994903
}

0 commit comments

Comments
 (0)