Skip to content

Commit 53edddb

Browse files
alvherrekelvich
authored andcommitted
pick new 2pc stuff
1 parent 5c7c9f4 commit 53edddb

File tree

4 files changed

+120
-83
lines changed

4 files changed

+120
-83
lines changed

src/backend/access/transam/twophase.c

Lines changed: 61 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,10 @@ static struct xllist
270270
static TwoPhaseStateData *TwoPhaseState;
271271

272272
/*
273-
* Global transaction entry currently locked by us, if any.
273+
* Global transaction entry currently locked by us, if any. Note that any
274+
* access to the entry pointed to by this variable must be protected by
275+
* TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
276+
* (since it's just local memory).
274277
*/
275278
static GlobalTransaction MyLockedGxact = NULL;
276279

@@ -418,18 +421,13 @@ AtAbort_Twophase(void)
418421
* resources held by the transaction yet. In those cases, the in-memory
419422
* state can be wrong, but it's too late to back out.
420423
*/
424+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
421425
if (!MyLockedGxact->valid)
422-
{
423426
RemoveGXact(MyLockedGxact);
424-
}
425427
else
426-
{
427-
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
428-
429428
MyLockedGxact->locking_pid = InvalidBackendId;
429+
LWLockRelease(TwoPhaseStateLock);
430430

431-
LWLockRelease(TwoPhaseStateLock);
432-
}
433431
MyLockedGxact = NULL;
434432
}
435433

@@ -534,6 +532,8 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
534532
PGXACT *pgxact;
535533
int i;
536534

535+
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
536+
537537
Assert(gxact != NULL);
538538
proc = &ProcGlobal->allProcs[gxact->pgprocno];
539539
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
@@ -611,15 +611,19 @@ GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
611611
/*
612612
* MarkAsPrepared
613613
* Mark the GXACT as fully valid, and enter it into the global ProcArray.
614+
*
615+
* lock_held indicates whether caller already holds TwoPhaseStateLock.
614616
*/
615617
static void
616-
MarkAsPrepared(GlobalTransaction gxact)
618+
MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
617619
{
618620
/* Lock here may be overkill, but I'm not convinced of that ... */
619-
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
621+
if (!lock_held)
622+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
620623
Assert(!gxact->valid);
621624
gxact->valid = true;
622-
LWLockRelease(TwoPhaseStateLock);
625+
if (!lock_held)
626+
LWLockRelease(TwoPhaseStateLock);
623627

624628
/*
625629
* Put it into the global ProcArray so TransactionIdIsInProgress considers
@@ -714,7 +718,7 @@ RemoveGXact(GlobalTransaction gxact)
714718
{
715719
int i;
716720

717-
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
721+
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
718722

719723
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
720724
{
@@ -728,15 +732,10 @@ RemoveGXact(GlobalTransaction gxact)
728732
gxact->next = TwoPhaseState->freeGXacts;
729733
TwoPhaseState->freeGXacts = gxact;
730734

731-
gxact->locking_pid = InvalidBackendId;
732-
LWLockRelease(TwoPhaseStateLock);
733-
734735
return;
735736
}
736737
}
737738

738-
LWLockRelease(TwoPhaseStateLock);
739-
740739
elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
741740
}
742741

@@ -1261,7 +1260,7 @@ EndPrepare(GlobalTransaction gxact)
12611260
* the xact crashed. Instead we have a window where the same XID appears
12621261
* twice in ProcArray, which is OK.
12631262
*/
1264-
MarkAsPrepared(gxact);
1263+
MarkAsPrepared(gxact, false);
12651264

12661265
/*
12671266
* Now we can mark ourselves as out of the commit critical section: a
@@ -1688,7 +1687,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
16881687
if (gxact->ondisk)
16891688
RemoveTwoPhaseFile(xid, true);
16901689

1690+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
16911691
RemoveGXact(gxact);
1692+
LWLockRelease(TwoPhaseStateLock);
16921693
MyLockedGxact = NULL;
16931694

16941695
pfree(buf);
@@ -1917,6 +1918,7 @@ restoreTwoPhaseData(void)
19171918
struct dirent *clde;
19181919

19191920
cldir = AllocateDir(TWOPHASE_DIR);
1921+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
19201922
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
19211923
{
19221924
if (strlen(clde->d_name) == 2*sizeof(TransactionId) &&
@@ -1936,6 +1938,7 @@ restoreTwoPhaseData(void)
19361938
PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
19371939
}
19381940
}
1941+
LWLockRelease(TwoPhaseStateLock);
19391942
FreeDir(cldir);
19401943
}
19411944

@@ -1977,7 +1980,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
19771980
int allocsize = 0;
19781981
int i;
19791982

1980-
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1983+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
19811984
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
19821985
{
19831986
TransactionId xid;
@@ -2055,7 +2058,7 @@ StandbyRecoverPreparedTransactions(bool overwriteOK)
20552058
{
20562059
int i;
20572060

2058-
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2061+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
20592062
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
20602063
{
20612064
TransactionId xid;
@@ -2082,16 +2085,22 @@ StandbyRecoverPreparedTransactions(bool overwriteOK)
20822085
* Scan the shared memory entries of TwoPhaseState and reload the state for
20832086
* each prepared transaction (reacquire locks, etc).
20842087
*
2085-
* This is run during database startup.
2088+
* This is run at the end of recovery, but before we allow backends to write
2089+
* WAL.
2090+
*
2091+
* At the end of recovery the way we take snapshots will change. We now need
2092+
* to mark all running transactions with their full SubTransSetParent() info
2093+
* to allow normal snapshots to work correctly if snapshots overflow.
2094+
* We do this here because by definition prepared transactions are the only
2095+
* type of write transaction still running, so this is necessary and
2096+
* complete.
20862097
*/
20872098
void
20882099
RecoverPreparedTransactions(void)
20892100
{
20902101
int i;
20912102

2092-
/*
2093-
* Don't need a lock in the recovery phase.
2094-
*/
2103+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
20952104
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
20962105
{
20972106
TransactionId xid;
@@ -2114,7 +2123,7 @@ RecoverPreparedTransactions(void)
21142123
continue;
21152124

21162125
ereport(LOG,
2117-
(errmsg("recovering prepared transaction %u from shared memory", xid)));
2126+
(errmsg("recovering prepared transaction " XID_FMT " from shared memory", xid)));
21182127

21192128
hdr = (TwoPhaseFileHeader *) buf;
21202129
Assert(TransactionIdEquals(hdr->xid, xid));
@@ -2151,21 +2160,20 @@ RecoverPreparedTransactions(void)
21512160
* it was added in redo and already has a shmem entry for
21522161
* it.
21532162
*/
2154-
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
21552163
MarkAsPreparingGuts(gxact, xid, gid,
21562164
hdr->prepared_at,
21572165
hdr->owner, hdr->database);
21582166

21592167
/* recovered, so reset the flag for entries generated by redo */
21602168
gxact->inredo = false;
21612169

2162-
LWLockRelease(TwoPhaseStateLock);
2163-
21642170
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2165-
MarkAsPrepared(gxact);
2171+
MarkAsPrepared(gxact, true);
2172+
2173+
LWLockRelease(TwoPhaseStateLock);
21662174

21672175
/*
2168-
* Recover other state (notably locks) using resource managers
2176+
* Recover other state (notably locks) using resource managers.
21692177
*/
21702178
ProcessRecords(bufptr, xid, twophase_recover_callbacks);
21712179

@@ -2185,7 +2193,11 @@ RecoverPreparedTransactions(void)
21852193
PostPrepare_Twophase();
21862194

21872195
pfree(buf);
2196+
2197+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
21882198
}
2199+
2200+
LWLockRelease(TwoPhaseStateLock);
21892201
}
21902202

21912203
/*
@@ -2216,6 +2228,8 @@ ProcessTwoPhaseBuffer(TransactionId xid,
22162228
TwoPhaseFileHeader *hdr;
22172229
int i;
22182230

2231+
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2232+
22192233
if (!fromdisk)
22202234
Assert(prepare_start_lsn != InvalidXLogRecPtr);
22212235

@@ -2230,15 +2244,15 @@ ProcessTwoPhaseBuffer(TransactionId xid,
22302244
if (fromdisk)
22312245
{
22322246
ereport(WARNING,
2233-
(errmsg("removing stale two-phase state file for \"%u\"",
2247+
(errmsg("removing stale two-phase state file for \"" XID_FMT "\"",
22342248
xid)));
22352249
RemoveTwoPhaseFile(xid, true);
22362250
}
22372251
else
22382252
{
22392253
ereport(WARNING,
2240-
(errmsg("removing stale two-phase state from"
2241-
" shared memory for \"%u\"", xid)));
2254+
(errmsg("removing stale two-phase state from shared memory for \"" XID_FMT "\"",
2255+
xid)));
22422256
PrepareRedoRemove(xid, true);
22432257
}
22442258
return NULL;
@@ -2250,14 +2264,14 @@ ProcessTwoPhaseBuffer(TransactionId xid,
22502264
if (fromdisk)
22512265
{
22522266
ereport(WARNING,
2253-
(errmsg("removing future two-phase state file for \"%u\"",
2267+
(errmsg("removing future two-phase state file for \"" XID_FMT "\"",
22542268
xid)));
22552269
RemoveTwoPhaseFile(xid, true);
22562270
}
22572271
else
22582272
{
22592273
ereport(WARNING,
2260-
(errmsg("removing future two-phase state from memory for \"%u\"",
2274+
(errmsg("removing future two-phase state from memory for \"" XID_FMT "\"",
22612275
xid)));
22622276
PrepareRedoRemove(xid, true);
22632277
}
@@ -2271,7 +2285,7 @@ ProcessTwoPhaseBuffer(TransactionId xid,
22712285
if (buf == NULL)
22722286
{
22732287
ereport(WARNING,
2274-
(errmsg("removing corrupt two-phase state file for \"%u\"",
2288+
(errmsg("removing corrupt two-phase state file for \"" XID_FMT "\"",
22752289
xid)));
22762290
RemoveTwoPhaseFile(xid, true);
22772291
return NULL;
@@ -2290,14 +2304,14 @@ ProcessTwoPhaseBuffer(TransactionId xid,
22902304
if (fromdisk)
22912305
{
22922306
ereport(WARNING,
2293-
(errmsg("removing corrupt two-phase state file for \"%u\"",
2307+
(errmsg("removing corrupt two-phase state file for \"" XID_FMT "\"",
22942308
xid)));
22952309
RemoveTwoPhaseFile(xid, true);
22962310
}
22972311
else
22982312
{
22992313
ereport(WARNING,
2300-
(errmsg("removing corrupt two-phase state from memory for \"%u\"",
2314+
(errmsg("removing corrupt two-phase state from memory for \"" XID_FMT "\"",
23012315
xid)));
23022316
PrepareRedoRemove(xid, true);
23032317
}
@@ -2565,6 +2579,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
25652579
const char *gid;
25662580
GlobalTransaction gxact;
25672581

2582+
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
25682583
Assert(RecoveryInProgress());
25692584

25702585
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
@@ -2581,7 +2596,6 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
25812596
* that it got added in the redo phase
25822597
*/
25832598

2584-
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
25852599
/* Get a free gxact from the freelist */
25862600
if (TwoPhaseState->freeGXacts == NULL)
25872601
ereport(ERROR,
@@ -2607,27 +2621,27 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
26072621
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
26082622
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
26092623

2610-
LWLockRelease(TwoPhaseStateLock);
2611-
2612-
elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid);
2624+
elog(DEBUG2, "added 2PC data in shared memory for transaction " XID_FMT, gxact->xid);
26132625
}
26142626

26152627
/*
26162628
* PrepareRedoRemove
26172629
*
2618-
* Remove the corresponding gxact entry from TwoPhaseState. Also
2619-
* remove the 2PC file if a prepared transaction was saved via
2620-
* an earlier checkpoint.
2630+
* Remove the corresponding gxact entry from TwoPhaseState. Also remove
2631+
* the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2632+
*
2633+
* Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2634+
* is updated.
26212635
*/
26222636
void
26232637
PrepareRedoRemove(TransactionId xid, bool giveWarning)
26242638
{
26252639
GlobalTransaction gxact = NULL;
26262640
int i;
26272641

2642+
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
26282643
Assert(RecoveryInProgress());
26292644

2630-
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
26312645
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
26322646
{
26332647
gxact = TwoPhaseState->prepXacts[i];
@@ -2638,7 +2652,6 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
26382652
break;
26392653
}
26402654
}
2641-
LWLockRelease(TwoPhaseStateLock);
26422655

26432656
/*
26442657
* Just leave if there is nothing, this is expected during WAL replay.
@@ -2649,7 +2662,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
26492662
/*
26502663
* And now we can clean up any files we may have left.
26512664
*/
2652-
elog(DEBUG2, "Removing 2PC data from shared memory %u", xid);
2665+
elog(DEBUG2, "removing 2PC data for transaction " XID_FMT, xid);
26532666
if (gxact->ondisk)
26542667
RemoveTwoPhaseFile(xid, giveWarning);
26552668
RemoveGXact(gxact);

0 commit comments

Comments
 (0)