84
84
#include "storage/proc.h"
85
85
#include "storage/procarray.h"
86
86
#include "storage/sinvaladt.h"
87
- #include "storage/spin.h"
88
87
#include "storage/smgr.h"
89
88
#include "utils/builtins.h"
90
89
#include "utils/memutils.h"
@@ -138,7 +137,7 @@ typedef struct GlobalTransactionData
138
137
int pgprocno ; /* ID of associated dummy PGPROC */
139
138
BackendId dummyBackendId ; /* similar to backend id for backends */
140
139
TimestampTz prepared_at ; /* time of preparation */
141
- volatile slock_t spinlock ; /* spinlock used to protect access to GlobalTransactionData */
140
+
142
141
/*
143
142
* Note that we need to keep track of two LSNs for each GXACT. We keep
144
143
* track of the start LSN because this is the address we must use to read
@@ -153,7 +152,6 @@ typedef struct GlobalTransactionData
153
152
int locking_pid ; /* backend currently working on the xact */
154
153
bool valid ; /* TRUE if PGPROC entry is in proc array */
155
154
bool ondisk ; /* TRUE if prepare state file is on disk */
156
- int prep_index ; /* Index of prepXacts array */
157
155
char gid [GIDSIZE ]; /* The GID assigned to the prepared xact */
158
156
char state_3pc [MAX_3PC_STATE_SIZE ]; /* 3PC transaction state */
159
157
} GlobalTransactionData ;
@@ -170,9 +168,6 @@ typedef struct TwoPhaseStateData
170
168
/* Number of valid prepXacts entries. */
171
169
int numPrepXacts ;
172
170
173
- /* Hash table for gxacts */
174
- GlobalTransaction * hashTable ;
175
-
176
171
/* There are max_prepared_xacts items in this array */
177
172
GlobalTransaction prepXacts [FLEXIBLE_ARRAY_MEMBER ];
178
173
} TwoPhaseStateData ;
@@ -259,12 +254,8 @@ static TwoPhaseStateData *TwoPhaseState;
259
254
static GlobalTransaction MyLockedGxact = NULL ;
260
255
261
256
static bool twophaseExitRegistered = false;
262
- static TransactionId cached_xid = InvalidTransactionId ;
263
- static GlobalTransaction cached_gxact = NULL ;
264
257
265
258
static char * ReadTwoPhaseFile (TransactionId xid , bool give_warnings );
266
- static void XlogReadTwoPhaseData (XLogRecPtr lsn , char * * buf , int * len );
267
-
268
259
269
260
static void RecordTransactionCommitPrepared (TransactionId xid ,
270
261
int nchildren ,
@@ -297,7 +288,7 @@ TwoPhaseShmemSize(void)
297
288
298
289
/* Need the fixed struct, the array of pointers, and the GTD structs */
299
290
size = offsetof(TwoPhaseStateData , prepXacts );
300
- size = add_size (size , mul_size (max_prepared_xacts * 2 ,
291
+ size = add_size (size , mul_size (max_prepared_xacts ,
301
292
sizeof (GlobalTransaction )));
302
293
size = MAXALIGN (size );
303
294
size = add_size (size , mul_size (max_prepared_xacts ,
@@ -329,18 +320,13 @@ TwoPhaseShmemInit(void)
329
320
gxacts = (GlobalTransaction )
330
321
((char * ) TwoPhaseState +
331
322
MAXALIGN (offsetof(TwoPhaseStateData , prepXacts ) +
332
- sizeof (GlobalTransaction ) * 2 * max_prepared_xacts ));
333
-
334
- TwoPhaseState -> hashTable = & TwoPhaseState -> prepXacts [max_prepared_xacts ];
335
-
323
+ sizeof (GlobalTransaction ) * max_prepared_xacts ));
336
324
for (i = 0 ; i < max_prepared_xacts ; i ++ )
337
325
{
338
326
/* insert into linked list */
339
327
gxacts [i ].next = TwoPhaseState -> freeGXacts ;
340
328
TwoPhaseState -> freeGXacts = & gxacts [i ];
341
329
342
- TwoPhaseState -> hashTable [i ] = NULL ;
343
-
344
330
/* associate it with a PGPROC assigned by InitProcGlobal */
345
331
gxacts [i ].pgprocno = PreparedXactProcs [i ].pgprocno ;
346
332
@@ -357,8 +343,6 @@ TwoPhaseShmemInit(void)
357
343
* technique.
358
344
*/
359
345
gxacts [i ].dummyBackendId = MaxBackends + 1 + i ;
360
- SpinLockInit (& gxacts [i ].spinlock );
361
- gxacts [i ].locking_pid = -1 ;
362
346
}
363
347
}
364
348
else
@@ -383,7 +367,7 @@ AtAbort_Twophase(void)
383
367
{
384
368
if (MyLockedGxact == NULL )
385
369
return ;
386
- Assert ( MyLockedGxact -> locking_pid >= 0 );
370
+
387
371
/*
388
372
* What to do with the locked global transaction entry? If we were in the
389
373
* process of preparing the transaction, but haven't written the WAL
@@ -410,8 +394,11 @@ AtAbort_Twophase(void)
410
394
}
411
395
else
412
396
{
413
- MyLockedGxact -> locking_pid = -1 ;
414
- SpinLockRelease (& MyLockedGxact -> spinlock );
397
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
398
+
399
+ MyLockedGxact -> locking_pid = InvalidBackendId ;
400
+
401
+ LWLockRelease (TwoPhaseStateLock );
415
402
}
416
403
MyLockedGxact = NULL ;
417
404
}
@@ -423,9 +410,10 @@ AtAbort_Twophase(void)
423
410
void
424
411
PostPrepare_Twophase (void )
425
412
{
426
- Assert (MyLockedGxact -> locking_pid >= 0 );
427
- MyLockedGxact -> locking_pid = -1 ;
428
- SpinLockRelease (& MyLockedGxact -> spinlock );
413
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
414
+ MyLockedGxact -> locking_pid = InvalidBackendId ;
415
+ LWLockRelease (TwoPhaseStateLock );
416
+
429
417
MyLockedGxact = NULL ;
430
418
}
431
419
@@ -470,12 +458,11 @@ MarkAsPreparing(TransactionId xid, const char *gid,
470
458
LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
471
459
472
460
/* Check for conflicting GID */
473
- i = string_hash (gid , 0 ) % max_prepared_xacts ;
474
- for (gxact = TwoPhaseState -> hashTable [i ]; gxact != NULL ; gxact = gxact -> next )
461
+ for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
475
462
{
463
+ gxact = TwoPhaseState -> prepXacts [i ];
476
464
if (strcmp (gxact -> gid , gid ) == 0 )
477
465
{
478
- LWLockRelease (TwoPhaseStateLock );
479
466
ereport (ERROR ,
480
467
(errcode (ERRCODE_DUPLICATE_OBJECT ),
481
468
errmsg ("transaction identifier \"%s\" is already in use" ,
@@ -485,28 +472,14 @@ MarkAsPreparing(TransactionId xid, const char *gid,
485
472
486
473
/* Get a free gxact from the freelist */
487
474
if (TwoPhaseState -> freeGXacts == NULL )
488
- {
489
- LWLockRelease (TwoPhaseStateLock );
490
475
ereport (ERROR ,
491
476
(errcode (ERRCODE_OUT_OF_MEMORY ),
492
477
errmsg ("maximum number of prepared transactions reached" ),
493
478
errhint ("Increase max_prepared_transactions (currently %d)." ,
494
479
max_prepared_xacts )));
495
- }
496
480
gxact = TwoPhaseState -> freeGXacts ;
497
481
TwoPhaseState -> freeGXacts = gxact -> next ;
498
482
499
- /* Lock gxact usnig spinlock. We have to release TwoPhaseStateLock LWLock to avoid deadlock and reobtain it after holding spinlock */
500
- LWLockRelease (TwoPhaseStateLock );
501
- SpinLockAcquire (& gxact -> spinlock );
502
- LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
503
-
504
- Assert (gxact -> locking_pid < 0 );
505
-
506
- /* Include in collision chain */
507
- gxact -> next = TwoPhaseState -> hashTable [i ];
508
- TwoPhaseState -> hashTable [i ] = gxact ;
509
-
510
483
proc = & ProcGlobal -> allProcs [gxact -> pgprocno ];
511
484
pgxact = & ProcGlobal -> allPgXact [gxact -> pgprocno ];
512
485
@@ -530,10 +503,6 @@ MarkAsPreparing(TransactionId xid, const char *gid,
530
503
proc -> lwWaitMode = 0 ;
531
504
proc -> waitLock = NULL ;
532
505
proc -> waitProcLock = NULL ;
533
-
534
- cached_xid = xid ;
535
- cached_gxact = gxact ;
536
-
537
506
for (i = 0 ; i < NUM_LOCK_PARTITIONS ; i ++ )
538
507
SHMQueueInit (& (proc -> myProcLocks [i ]));
539
508
/* subxid data must be filled later by GXactLoadSubxactData */
@@ -548,7 +517,6 @@ MarkAsPreparing(TransactionId xid, const char *gid,
548
517
gxact -> locking_pid = MyProcPid ;
549
518
gxact -> valid = false;
550
519
gxact -> ondisk = false;
551
- gxact -> prep_index = TwoPhaseState -> numPrepXacts ;
552
520
strcpy (gxact -> gid , gid );
553
521
* gxact -> state_3pc = '\0' ;
554
522
@@ -615,7 +583,6 @@ MarkAsPrepared(GlobalTransaction gxact)
615
583
ProcArrayAdd (& ProcGlobal -> allProcs [gxact -> pgprocno ]);
616
584
}
617
585
618
-
619
586
/*
620
587
* LockGXact
621
588
* Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
@@ -624,7 +591,6 @@ static GlobalTransaction
624
591
LockGXact (const char * gid , Oid user )
625
592
{
626
593
int i ;
627
- GlobalTransaction gxact ;
628
594
629
595
/* on first call, register the exit hook */
630
596
if (!twophaseExitRegistered )
@@ -633,78 +599,55 @@ LockGXact(const char *gid, Oid user)
633
599
twophaseExitRegistered = true;
634
600
}
635
601
636
- /* here we know in advance that there are no prepared transactions */
637
- if (max_prepared_xacts == 0 )
638
- ereport (ERROR ,
639
- (errcode (ERRCODE_UNDEFINED_OBJECT ),
640
- errmsg ("prepared transaction with identifier \"%s\" does not exist" ,
641
- gid )));
642
-
643
- MyLockedGxact = NULL ;
644
- i = string_hash (gid , 0 ) % max_prepared_xacts ;
645
- Retry :
646
602
LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
647
- for (gxact = TwoPhaseState -> hashTable [i ]; gxact != NULL ; gxact = gxact -> next )
648
- {
649
- if (strcmp (gxact -> gid , gid ) == 0 )
650
- {
651
- PGPROC * proc = & ProcGlobal -> allProcs [gxact -> pgprocno ];
652
-
653
- /* Lock gxact. We have to release TwoPhaseStateLock LW-Lock to avoid deadlock */
654
603
655
- LWLockRelease (TwoPhaseStateLock );
604
+ for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
605
+ {
606
+ GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
607
+ PGPROC * proc = & ProcGlobal -> allProcs [gxact -> pgprocno ];
656
608
657
- if (MyLockedGxact != gxact ) {
658
- if (MyLockedGxact != NULL ) {
659
- SpinLockRelease (& MyLockedGxact -> spinlock );
660
- }
661
- MyLockedGxact = gxact ;
662
- SpinLockAcquire (& gxact -> spinlock );
663
- goto Retry ;
664
- }
609
+ /* Ignore not-yet-valid GIDs */
610
+ if (!gxact -> valid )
611
+ continue ;
612
+ if (strcmp (gxact -> gid , gid ) != 0 )
613
+ continue ;
665
614
666
- /* Ignore not-yet-valid GIDs */
667
- if (! gxact -> valid ) {
615
+ /* Found it, but has someone else got it locked? */
616
+ if (gxact -> locking_pid != InvalidBackendId )
668
617
ereport (ERROR ,
669
618
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
670
- errmsg ("prepared transaction with identifier \"%s\" is not valid " ,
619
+ errmsg ("prepared transaction with identifier \"%s\" is busy " ,
671
620
gid )));
672
- }
673
621
674
- if (user != gxact -> owner && !superuser_arg (user )) {
622
+ if (user != gxact -> owner && !superuser_arg (user ))
675
623
ereport (ERROR ,
676
624
(errcode (ERRCODE_INSUFFICIENT_PRIVILEGE ),
677
625
errmsg ("permission denied to finish prepared transaction" ),
678
626
errhint ("Must be superuser or the user that prepared the transaction." )));
679
- }
680
627
681
628
/*
682
629
* Note: it probably would be possible to allow committing from
683
630
* another database; but at the moment NOTIFY is known not to work and
684
631
* there may be some other issues as well. Hence disallow until
685
632
* someone gets motivated to make it work.
686
633
*/
687
- if (MyDatabaseId != proc -> databaseId ) {
634
+ if (MyDatabaseId != proc -> databaseId )
688
635
ereport (ERROR ,
689
636
(errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
690
- errmsg ("prepared transaction belongs to another database" ),
637
+ errmsg ("prepared transaction belongs to another database" ),
691
638
errhint ("Connect to the database where the transaction was prepared to finish it." )));
692
- }
693
-
694
639
695
640
/* OK for me to lock it */
696
641
Assert (gxact -> locking_pid < 0 );
697
642
gxact -> locking_pid = MyProcPid ;
643
+ MyLockedGxact = gxact ;
644
+
645
+ LWLockRelease (TwoPhaseStateLock );
698
646
699
647
return gxact ;
700
- }
701
- }
648
+ }
702
649
703
650
LWLockRelease (TwoPhaseStateLock );
704
- if (MyLockedGxact != NULL ) {
705
- SpinLockRelease (& MyLockedGxact -> spinlock );
706
- MyLockedGxact = NULL ;
707
- }
708
651
709
652
ereport (ERROR ,
710
653
(errcode (ERRCODE_UNDEFINED_OBJECT ),
@@ -725,32 +668,23 @@ static void
725
668
RemoveGXact (GlobalTransaction gxact )
726
669
{
727
670
int i ;
728
- GlobalTransaction * prev ;
729
671
730
672
LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
731
673
732
- i = string_hash (gxact -> gid , 0 ) % max_prepared_xacts ;
733
-
734
- for (prev = & TwoPhaseState -> hashTable [i ]; * prev != NULL ; prev = & (* prev )-> next )
674
+ for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
735
675
{
736
- if (gxact == * prev )
676
+ if (gxact == TwoPhaseState -> prepXacts [ i ] )
737
677
{
738
678
/* remove from the active array */
739
679
TwoPhaseState -> numPrepXacts -- ;
740
- TwoPhaseState -> prepXacts [gxact -> prep_index ] = TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ];
741
- TwoPhaseState -> prepXacts [gxact -> prep_index ]-> prep_index = gxact -> prep_index ;
742
-
743
- /* remove from collision list */
744
- * prev = gxact -> next ;
680
+ TwoPhaseState -> prepXacts [i ] = TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ];
745
681
746
682
/* and put it back in the freelist */
747
683
gxact -> next = TwoPhaseState -> freeGXacts ;
748
684
TwoPhaseState -> freeGXacts = gxact ;
749
685
750
- gxact -> locking_pid = -1 ;
751
-
686
+ gxact -> locking_pid = InvalidBackendId ;
752
687
LWLockRelease (TwoPhaseStateLock );
753
- SpinLockRelease (& gxact -> spinlock );
754
688
755
689
return ;
756
690
}
@@ -809,9 +743,9 @@ bool GetPreparedTransactionState(char const* gid, char* state)
809
743
bool result = false;
810
744
811
745
LWLockAcquire (TwoPhaseStateLock , LW_SHARED );
812
- i = string_hash (gid , 0 ) % max_prepared_xacts ;
813
- for (gxact = TwoPhaseState -> hashTable [i ]; gxact != NULL ; gxact = gxact -> next )
746
+ for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
814
747
{
748
+ gxact = TwoPhaseState -> prepXacts [i ];
815
749
if (strcmp (gxact -> gid , gid ) == 0 )
816
750
{
817
751
strcpy (state , gxact -> state_3pc );
@@ -1011,6 +945,9 @@ TwoPhaseGetGXact(TransactionId xid)
1011
945
GlobalTransaction result = NULL ;
1012
946
int i ;
1013
947
948
+ static TransactionId cached_xid = InvalidTransactionId ;
949
+ static GlobalTransaction cached_gxact = NULL ;
950
+
1014
951
/*
1015
952
* During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
1016
953
* repeatedly for the same XID. We can save work with a simple cache.
@@ -1194,7 +1131,6 @@ EndPrepare(GlobalTransaction gxact)
1194
1131
{
1195
1132
TwoPhaseFileHeader * hdr ;
1196
1133
StateFileChunk * record ;
1197
- uint8 info = XLOG_XACT_PREPARE ;
1198
1134
bool replorigin ;
1199
1135
1200
1136
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
@@ -1244,6 +1180,7 @@ EndPrepare(GlobalTransaction gxact)
1244
1180
XLogEnsureRecordSpace (0 , records .num_chunks );
1245
1181
1246
1182
START_CRIT_SECTION ();
1183
+
1247
1184
MyPgXact -> delayChkpt = true;
1248
1185
1249
1186
XLogBeginInsert ();
@@ -1253,7 +1190,7 @@ EndPrepare(GlobalTransaction gxact)
1253
1190
if (replorigin )
1254
1191
XLogIncludeOrigin ();
1255
1192
1256
- gxact -> prepare_end_lsn = XLogInsert (RM_XACT_ID , info );
1193
+ gxact -> prepare_end_lsn = XLogInsert (RM_XACT_ID , XLOG_XACT_PREPARE );
1257
1194
1258
1195
if (replorigin )
1259
1196
/* Move LSNs forward for this replication origin */
@@ -1381,7 +1318,6 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
1381
1318
stat .st_size > MaxAllocSize )
1382
1319
{
1383
1320
CloseTransientFile (fd );
1384
- fprintf (stderr , "wrong size of two-phase file \"%s\"\n" , path );
1385
1321
return NULL ;
1386
1322
}
1387
1323
@@ -1427,7 +1363,6 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
1427
1363
if (!EQ_CRC32C (calc_crc , file_crc ))
1428
1364
{
1429
1365
pfree (buf );
1430
- fprintf (stderr , "wrong crc32 in two-phase file \"%s\"\n" , path );
1431
1366
return NULL ;
1432
1367
}
1433
1368
0 commit comments