Skip to content

Commit 2e70a5a

Browse files
committed
remove hash table from twophase.c; clean it
1 parent 724b809 commit 2e70a5a

File tree

1 file changed

+45
-110
lines changed

1 file changed

+45
-110
lines changed

src/backend/access/transam/twophase.c

Lines changed: 45 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@
8484
#include "storage/proc.h"
8585
#include "storage/procarray.h"
8686
#include "storage/sinvaladt.h"
87-
#include "storage/spin.h"
8887
#include "storage/smgr.h"
8988
#include "utils/builtins.h"
9089
#include "utils/memutils.h"
@@ -138,7 +137,7 @@ typedef struct GlobalTransactionData
138137
int pgprocno; /* ID of associated dummy PGPROC */
139138
BackendId dummyBackendId; /* similar to backend id for backends */
140139
TimestampTz prepared_at; /* time of preparation */
141-
volatile slock_t spinlock; /* spinlock used to protect access to GlobalTransactionData */
140+
142141
/*
143142
* Note that we need to keep track of two LSNs for each GXACT. We keep
144143
* track of the start LSN because this is the address we must use to read
@@ -153,7 +152,6 @@ typedef struct GlobalTransactionData
153152
int locking_pid; /* backend currently working on the xact */
154153
bool valid; /* TRUE if PGPROC entry is in proc array */
155154
bool ondisk; /* TRUE if prepare state file is on disk */
156-
int prep_index; /* Index of prepXacts array */
157155
char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
158156
char state_3pc[MAX_3PC_STATE_SIZE]; /* 3PC transaction state */
159157
} GlobalTransactionData;
@@ -170,9 +168,6 @@ typedef struct TwoPhaseStateData
170168
/* Number of valid prepXacts entries. */
171169
int numPrepXacts;
172170

173-
/* Hash table for gxacts */
174-
GlobalTransaction* hashTable;
175-
176171
/* There are max_prepared_xacts items in this array */
177172
GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
178173
} TwoPhaseStateData;
@@ -259,12 +254,8 @@ static TwoPhaseStateData *TwoPhaseState;
259254
static GlobalTransaction MyLockedGxact = NULL;
260255

261256
static bool twophaseExitRegistered = false;
262-
static TransactionId cached_xid = InvalidTransactionId;
263-
static GlobalTransaction cached_gxact = NULL;
264257

265258
static char* ReadTwoPhaseFile(TransactionId xid, bool give_warnings);
266-
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
267-
268259

269260
static void RecordTransactionCommitPrepared(TransactionId xid,
270261
int nchildren,
@@ -297,7 +288,7 @@ TwoPhaseShmemSize(void)
297288

298289
/* Need the fixed struct, the array of pointers, and the GTD structs */
299290
size = offsetof(TwoPhaseStateData, prepXacts);
300-
size = add_size(size, mul_size(max_prepared_xacts*2,
291+
size = add_size(size, mul_size(max_prepared_xacts,
301292
sizeof(GlobalTransaction)));
302293
size = MAXALIGN(size);
303294
size = add_size(size, mul_size(max_prepared_xacts,
@@ -329,18 +320,13 @@ TwoPhaseShmemInit(void)
329320
gxacts = (GlobalTransaction)
330321
((char *) TwoPhaseState +
331322
MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
332-
sizeof(GlobalTransaction) * 2 * max_prepared_xacts));
333-
334-
TwoPhaseState->hashTable = &TwoPhaseState->prepXacts[max_prepared_xacts];
335-
323+
sizeof(GlobalTransaction) * max_prepared_xacts));
336324
for (i = 0; i < max_prepared_xacts; i++)
337325
{
338326
/* insert into linked list */
339327
gxacts[i].next = TwoPhaseState->freeGXacts;
340328
TwoPhaseState->freeGXacts = &gxacts[i];
341329

342-
TwoPhaseState->hashTable[i] = NULL;
343-
344330
/* associate it with a PGPROC assigned by InitProcGlobal */
345331
gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
346332

@@ -357,8 +343,6 @@ TwoPhaseShmemInit(void)
357343
* technique.
358344
*/
359345
gxacts[i].dummyBackendId = MaxBackends + 1 + i;
360-
SpinLockInit(&gxacts[i].spinlock);
361-
gxacts[i].locking_pid = -1;
362346
}
363347
}
364348
else
@@ -383,7 +367,7 @@ AtAbort_Twophase(void)
383367
{
384368
if (MyLockedGxact == NULL)
385369
return;
386-
Assert(MyLockedGxact->locking_pid >= 0);
370+
387371
/*
388372
* What to do with the locked global transaction entry? If we were in the
389373
* process of preparing the transaction, but haven't written the WAL
@@ -410,8 +394,11 @@ AtAbort_Twophase(void)
410394
}
411395
else
412396
{
413-
MyLockedGxact->locking_pid = -1;
414-
SpinLockRelease(&MyLockedGxact->spinlock);
397+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
398+
399+
MyLockedGxact->locking_pid = InvalidBackendId;
400+
401+
LWLockRelease(TwoPhaseStateLock);
415402
}
416403
MyLockedGxact = NULL;
417404
}
@@ -423,9 +410,10 @@ AtAbort_Twophase(void)
423410
void
424411
PostPrepare_Twophase(void)
425412
{
426-
Assert(MyLockedGxact->locking_pid >= 0);
427-
MyLockedGxact->locking_pid = -1;
428-
SpinLockRelease(&MyLockedGxact->spinlock);
413+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
414+
MyLockedGxact->locking_pid = InvalidBackendId;
415+
LWLockRelease(TwoPhaseStateLock);
416+
429417
MyLockedGxact = NULL;
430418
}
431419

@@ -470,12 +458,11 @@ MarkAsPreparing(TransactionId xid, const char *gid,
470458
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
471459

472460
/* Check for conflicting GID */
473-
i = string_hash(gid, 0) % max_prepared_xacts;
474-
for (gxact = TwoPhaseState->hashTable[i]; gxact != NULL; gxact = gxact->next)
461+
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
475462
{
463+
gxact = TwoPhaseState->prepXacts[i];
476464
if (strcmp(gxact->gid, gid) == 0)
477465
{
478-
LWLockRelease(TwoPhaseStateLock);
479466
ereport(ERROR,
480467
(errcode(ERRCODE_DUPLICATE_OBJECT),
481468
errmsg("transaction identifier \"%s\" is already in use",
@@ -485,28 +472,14 @@ MarkAsPreparing(TransactionId xid, const char *gid,
485472

486473
/* Get a free gxact from the freelist */
487474
if (TwoPhaseState->freeGXacts == NULL)
488-
{
489-
LWLockRelease(TwoPhaseStateLock);
490475
ereport(ERROR,
491476
(errcode(ERRCODE_OUT_OF_MEMORY),
492477
errmsg("maximum number of prepared transactions reached"),
493478
errhint("Increase max_prepared_transactions (currently %d).",
494479
max_prepared_xacts)));
495-
}
496480
gxact = TwoPhaseState->freeGXacts;
497481
TwoPhaseState->freeGXacts = gxact->next;
498482

499-
/* Lock gxact usnig spinlock. We have to release TwoPhaseStateLock LWLock to avoid deadlock and reobtain it after holding spinlock */
500-
LWLockRelease(TwoPhaseStateLock);
501-
SpinLockAcquire(&gxact->spinlock);
502-
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
503-
504-
Assert(gxact->locking_pid < 0);
505-
506-
/* Include in collision chain */
507-
gxact->next = TwoPhaseState->hashTable[i];
508-
TwoPhaseState->hashTable[i] = gxact;
509-
510483
proc = &ProcGlobal->allProcs[gxact->pgprocno];
511484
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
512485

@@ -530,10 +503,6 @@ MarkAsPreparing(TransactionId xid, const char *gid,
530503
proc->lwWaitMode = 0;
531504
proc->waitLock = NULL;
532505
proc->waitProcLock = NULL;
533-
534-
cached_xid = xid;
535-
cached_gxact = gxact;
536-
537506
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
538507
SHMQueueInit(&(proc->myProcLocks[i]));
539508
/* subxid data must be filled later by GXactLoadSubxactData */
@@ -548,7 +517,6 @@ MarkAsPreparing(TransactionId xid, const char *gid,
548517
gxact->locking_pid = MyProcPid;
549518
gxact->valid = false;
550519
gxact->ondisk = false;
551-
gxact->prep_index = TwoPhaseState->numPrepXacts;
552520
strcpy(gxact->gid, gid);
553521
*gxact->state_3pc = '\0';
554522

@@ -615,7 +583,6 @@ MarkAsPrepared(GlobalTransaction gxact)
615583
ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
616584
}
617585

618-
619586
/*
620587
* LockGXact
621588
* Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
@@ -624,7 +591,6 @@ static GlobalTransaction
624591
LockGXact(const char *gid, Oid user)
625592
{
626593
int i;
627-
GlobalTransaction gxact;
628594

629595
/* on first call, register the exit hook */
630596
if (!twophaseExitRegistered)
@@ -633,78 +599,55 @@ LockGXact(const char *gid, Oid user)
633599
twophaseExitRegistered = true;
634600
}
635601

636-
/* here we know in advance that there are no prepared transactions */
637-
if (max_prepared_xacts == 0)
638-
ereport(ERROR,
639-
(errcode(ERRCODE_UNDEFINED_OBJECT),
640-
errmsg("prepared transaction with identifier \"%s\" does not exist",
641-
gid)));
642-
643-
MyLockedGxact = NULL;
644-
i = string_hash(gid, 0) % max_prepared_xacts;
645-
Retry:
646602
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
647-
for (gxact = TwoPhaseState->hashTable[i]; gxact != NULL; gxact = gxact->next)
648-
{
649-
if (strcmp(gxact->gid, gid) == 0)
650-
{
651-
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
652-
653-
/* Lock gxact. We have to release TwoPhaseStateLock LW-Lock to avoid deadlock */
654603

655-
LWLockRelease(TwoPhaseStateLock);
604+
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
605+
{
606+
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
607+
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
656608

657-
if (MyLockedGxact != gxact) {
658-
if (MyLockedGxact != NULL) {
659-
SpinLockRelease(&MyLockedGxact->spinlock);
660-
}
661-
MyLockedGxact = gxact;
662-
SpinLockAcquire(&gxact->spinlock);
663-
goto Retry;
664-
}
609+
/* Ignore not-yet-valid GIDs */
610+
if (!gxact->valid)
611+
continue;
612+
if (strcmp(gxact->gid, gid) != 0)
613+
continue;
665614

666-
/* Ignore not-yet-valid GIDs */
667-
if (!gxact->valid) {
615+
/* Found it, but has someone else got it locked? */
616+
if (gxact->locking_pid != InvalidBackendId)
668617
ereport(ERROR,
669618
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
670-
errmsg("prepared transaction with identifier \"%s\" is not valid",
619+
errmsg("prepared transaction with identifier \"%s\" is busy",
671620
gid)));
672-
}
673621

674-
if (user != gxact->owner && !superuser_arg(user)) {
622+
if (user != gxact->owner && !superuser_arg(user))
675623
ereport(ERROR,
676624
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
677625
errmsg("permission denied to finish prepared transaction"),
678626
errhint("Must be superuser or the user that prepared the transaction.")));
679-
}
680627

681628
/*
682629
* Note: it probably would be possible to allow committing from
683630
* another database; but at the moment NOTIFY is known not to work and
684631
* there may be some other issues as well. Hence disallow until
685632
* someone gets motivated to make it work.
686633
*/
687-
if (MyDatabaseId != proc->databaseId) {
634+
if (MyDatabaseId != proc->databaseId)
688635
ereport(ERROR,
689636
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
690-
errmsg("prepared transaction belongs to another database"),
637+
errmsg("prepared transaction belongs to another database"),
691638
errhint("Connect to the database where the transaction was prepared to finish it.")));
692-
}
693-
694639

695640
/* OK for me to lock it */
696641
Assert(gxact->locking_pid < 0);
697642
gxact->locking_pid = MyProcPid;
643+
MyLockedGxact = gxact;
644+
645+
LWLockRelease(TwoPhaseStateLock);
698646

699647
return gxact;
700-
}
701-
}
648+
}
702649

703650
LWLockRelease(TwoPhaseStateLock);
704-
if (MyLockedGxact != NULL) {
705-
SpinLockRelease(&MyLockedGxact->spinlock);
706-
MyLockedGxact = NULL;
707-
}
708651

709652
ereport(ERROR,
710653
(errcode(ERRCODE_UNDEFINED_OBJECT),
@@ -725,32 +668,23 @@ static void
725668
RemoveGXact(GlobalTransaction gxact)
726669
{
727670
int i;
728-
GlobalTransaction* prev;
729671

730672
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
731673

732-
i = string_hash(gxact->gid, 0) % max_prepared_xacts;
733-
734-
for (prev = &TwoPhaseState->hashTable[i]; *prev != NULL; prev = &(*prev)->next)
674+
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
735675
{
736-
if (gxact == *prev)
676+
if (gxact == TwoPhaseState->prepXacts[i])
737677
{
738678
/* remove from the active array */
739679
TwoPhaseState->numPrepXacts--;
740-
TwoPhaseState->prepXacts[gxact->prep_index] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
741-
TwoPhaseState->prepXacts[gxact->prep_index]->prep_index = gxact->prep_index;
742-
743-
/* remove from collision list */
744-
*prev = gxact->next;
680+
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
745681

746682
/* and put it back in the freelist */
747683
gxact->next = TwoPhaseState->freeGXacts;
748684
TwoPhaseState->freeGXacts = gxact;
749685

750-
gxact->locking_pid = -1;
751-
686+
gxact->locking_pid = InvalidBackendId;
752687
LWLockRelease(TwoPhaseStateLock);
753-
SpinLockRelease(&gxact->spinlock);
754688

755689
return;
756690
}
@@ -809,9 +743,9 @@ bool GetPreparedTransactionState(char const* gid, char* state)
809743
bool result = false;
810744

811745
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
812-
i = string_hash(gid, 0) % max_prepared_xacts;
813-
for (gxact = TwoPhaseState->hashTable[i]; gxact != NULL; gxact = gxact->next)
746+
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
814747
{
748+
gxact = TwoPhaseState->prepXacts[i];
815749
if (strcmp(gxact->gid, gid) == 0)
816750
{
817751
strcpy(state, gxact->state_3pc);
@@ -1011,6 +945,9 @@ TwoPhaseGetGXact(TransactionId xid)
1011945
GlobalTransaction result = NULL;
1012946
int i;
1013947

948+
static TransactionId cached_xid = InvalidTransactionId;
949+
static GlobalTransaction cached_gxact = NULL;
950+
1014951
/*
1015952
* During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
1016953
* repeatedly for the same XID. We can save work with a simple cache.
@@ -1194,7 +1131,6 @@ EndPrepare(GlobalTransaction gxact)
11941131
{
11951132
TwoPhaseFileHeader *hdr;
11961133
StateFileChunk *record;
1197-
uint8 info = XLOG_XACT_PREPARE;
11981134
bool replorigin;
11991135

12001136
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
@@ -1244,6 +1180,7 @@ EndPrepare(GlobalTransaction gxact)
12441180
XLogEnsureRecordSpace(0, records.num_chunks);
12451181

12461182
START_CRIT_SECTION();
1183+
12471184
MyPgXact->delayChkpt = true;
12481185

12491186
XLogBeginInsert();
@@ -1253,7 +1190,7 @@ EndPrepare(GlobalTransaction gxact)
12531190
if (replorigin)
12541191
XLogIncludeOrigin();
12551192

1256-
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, info);
1193+
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
12571194

12581195
if (replorigin)
12591196
/* Move LSNs forward for this replication origin */
@@ -1381,7 +1318,6 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
13811318
stat.st_size > MaxAllocSize)
13821319
{
13831320
CloseTransientFile(fd);
1384-
fprintf(stderr, "wrong size of two-phase file \"%s\"\n", path);
13851321
return NULL;
13861322
}
13871323

@@ -1427,7 +1363,6 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
14271363
if (!EQ_CRC32C(calc_crc, file_crc))
14281364
{
14291365
pfree(buf);
1430-
fprintf(stderr, "wrong crc32 in two-phase file \"%s\"\n", path);
14311366
return NULL;
14321367
}
14331368

0 commit comments

Comments
 (0)