57
57
#include "replication/walsender.h"
58
58
#include "replication/syncrep.h"
59
59
#include "storage/fd.h"
60
+ #include "storage/ipc.h"
60
61
#include "storage/predicate.h"
61
62
#include "storage/procarray.h"
62
63
#include "storage/sinvaladt.h"
@@ -80,25 +81,25 @@ int max_prepared_xacts = 0;
80
81
*
81
82
* The lifecycle of a global transaction is:
82
83
*
83
- * 1. After checking that the requested GID is not in use, set up an
84
- * entry in the TwoPhaseState->prepXacts array with the correct XID and GID ,
85
- * with locking_xid = my own XID and valid = false .
84
+ * 1. After checking that the requested GID is not in use, set up an entry in
85
+ * the TwoPhaseState->prepXacts array with the correct GID and valid = false ,
86
+ * and mark it as locked by my backend .
86
87
*
87
88
* 2. After successfully completing prepare, set valid = true and enter the
88
89
* referenced PGPROC into the global ProcArray.
89
90
*
90
- * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
91
- * is valid and its locking_xid is no longer active, then store my current
92
- * XID into locking_xid . This prevents concurrent attempts to commit or
93
- * rollback the same prepared xact.
91
+ * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
92
+ * valid and not locked, then mark the entry as locked by storing my current
93
+ * backend ID into locking_backend . This prevents concurrent attempts to
94
+ * commit or rollback the same prepared xact.
94
95
*
95
96
* 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
96
97
* from the ProcArray and the TwoPhaseState->prepXacts array and return it to
97
98
* the freelist.
98
99
*
99
100
* Note that if the preparing transaction fails between steps 1 and 2, the
100
- * entry will remain in prepXacts until recycled. We can detect recyclable
101
- * entries by checking for valid = false and locking_xid no longer active .
101
+ * entry must be removed so that the GID and the GlobalTransaction struct
102
+ * can be reused. See AtAbort_Twophase() .
102
103
*
103
104
* typedef struct GlobalTransactionData *GlobalTransaction appears in
104
105
* twophase.h
@@ -113,8 +114,8 @@ typedef struct GlobalTransactionData
113
114
TimestampTz prepared_at ; /* time of preparation */
114
115
XLogRecPtr prepare_lsn ; /* XLOG offset of prepare record */
115
116
Oid owner ; /* ID of user that executed the xact */
116
- TransactionId locking_xid ; /* top-level XID of backend working on xact */
117
- bool valid ; /* TRUE if fully prepared */
117
+ BackendId locking_backend ; /* backend currently working on the xact */
118
+ bool valid ; /* TRUE if PGPROC entry is in proc array */
118
119
char gid [GIDSIZE ]; /* The GID assigned to the prepared xact */
119
120
} GlobalTransactionData ;
120
121
@@ -139,6 +140,12 @@ typedef struct TwoPhaseStateData
139
140
140
141
static TwoPhaseStateData * TwoPhaseState ;
141
142
143
+ /*
144
+ * Global transaction entry currently locked by us, if any.
145
+ */
146
+ static GlobalTransaction MyLockedGxact = NULL ;
147
+
148
+ static bool twophaseExitRegistered = false;
142
149
143
150
static void RecordTransactionCommitPrepared (TransactionId xid ,
144
151
int nchildren ,
@@ -155,6 +162,7 @@ static void RecordTransactionAbortPrepared(TransactionId xid,
155
162
RelFileNode * rels );
156
163
static void ProcessRecords (char * bufptr , TransactionId xid ,
157
164
const TwoPhaseCallback callbacks []);
165
+ static void RemoveGXact (GlobalTransaction gxact );
158
166
159
167
160
168
/*
@@ -228,6 +236,74 @@ TwoPhaseShmemInit(void)
228
236
Assert (found );
229
237
}
230
238
239
+ /*
240
+ * Exit hook to unlock the global transaction entry we're working on.
241
+ */
242
+ static void
243
+ AtProcExit_Twophase (int code , Datum arg )
244
+ {
245
+ /* same logic as abort */
246
+ AtAbort_Twophase ();
247
+ }
248
+
249
+ /*
250
+ * Abort hook to unlock the global transaction entry we're working on.
251
+ */
252
+ void
253
+ AtAbort_Twophase (void )
254
+ {
255
+ if (MyLockedGxact == NULL )
256
+ return ;
257
+
258
+ /*
259
+ * What to do with the locked global transaction entry? If we were in
260
+ * the process of preparing the transaction, but haven't written the WAL
261
+ * record and state file yet, the transaction must not be considered as
262
+ * prepared. Likewise, if we are in the process of finishing an
263
+ * already-prepared transaction, and fail after having already written
264
+ * the 2nd phase commit or rollback record to the WAL, the transaction
265
+ * should not be considered as prepared anymore. In those cases, just
266
+ * remove the entry from shared memory.
267
+ *
268
+ * Otherwise, the entry must be left in place so that the transaction
269
+ * can be finished later, so just unlock it.
270
+ *
271
+ * If we abort during prepare, after having written the WAL record, we
272
+ * might not have transfered all locks and other state to the prepared
273
+ * transaction yet. Likewise, if we abort during commit or rollback,
274
+ * after having written the WAL record, we might not have released
275
+ * all the resources held by the transaction yet. In those cases, the
276
+ * in-memory state can be wrong, but it's too late to back out.
277
+ */
278
+ if (!MyLockedGxact -> valid )
279
+ {
280
+ RemoveGXact (MyLockedGxact );
281
+ }
282
+ else
283
+ {
284
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
285
+
286
+ MyLockedGxact -> locking_backend = InvalidBackendId ;
287
+
288
+ LWLockRelease (TwoPhaseStateLock );
289
+ }
290
+ MyLockedGxact = NULL ;
291
+ }
292
+
293
+ /*
294
+ * This is called after we have finished transfering state to the prepared
295
+ * PGXACT entry.
296
+ */
297
+ void
298
+ PostPrepare_Twophase ()
299
+ {
300
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
301
+ MyLockedGxact -> locking_backend = InvalidBackendId ;
302
+ LWLockRelease (TwoPhaseStateLock );
303
+
304
+ MyLockedGxact = NULL ;
305
+ }
306
+
231
307
232
308
/*
233
309
* MarkAsPreparing
@@ -259,29 +335,15 @@ MarkAsPreparing(TransactionId xid, const char *gid,
259
335
errmsg ("prepared transactions are disabled" ),
260
336
errhint ("Set max_prepared_transactions to a nonzero value." )));
261
337
262
- LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
263
-
264
- /*
265
- * First, find and recycle any gxacts that failed during prepare. We do
266
- * this partly to ensure we don't mistakenly say their GIDs are still
267
- * reserved, and partly so we don't fail on out-of-slots unnecessarily.
268
- */
269
- for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
338
+ /* on first call, register the exit hook */
339
+ if (!twophaseExitRegistered )
270
340
{
271
- gxact = TwoPhaseState -> prepXacts [i ];
272
- if (!gxact -> valid && !TransactionIdIsActive (gxact -> locking_xid ))
273
- {
274
- /* It's dead Jim ... remove from the active array */
275
- TwoPhaseState -> numPrepXacts -- ;
276
- TwoPhaseState -> prepXacts [i ] = TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ];
277
- /* and put it back in the freelist */
278
- gxact -> next = TwoPhaseState -> freeGXacts ;
279
- TwoPhaseState -> freeGXacts = gxact ;
280
- /* Back up index count too, so we don't miss scanning one */
281
- i -- ;
282
- }
341
+ on_shmem_exit (AtProcExit_Twophase , 0 );
342
+ twophaseExitRegistered = true;
283
343
}
284
344
345
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
346
+
285
347
/* Check for conflicting GID */
286
348
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
287
349
{
@@ -339,14 +401,20 @@ MarkAsPreparing(TransactionId xid, const char *gid,
339
401
gxact -> prepare_lsn .xlogid = 0 ;
340
402
gxact -> prepare_lsn .xrecoff = 0 ;
341
403
gxact -> owner = owner ;
342
- gxact -> locking_xid = xid ;
404
+ gxact -> locking_backend = MyBackendId ;
343
405
gxact -> valid = false;
344
406
strcpy (gxact -> gid , gid );
345
407
346
408
/* And insert it into the active array */
347
409
Assert (TwoPhaseState -> numPrepXacts < max_prepared_xacts );
348
410
TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ++ ] = gxact ;
349
411
412
+ /*
413
+ * Remember that we have this GlobalTransaction entry locked for us.
414
+ * If we abort after this, we must release it.
415
+ */
416
+ MyLockedGxact = gxact ;
417
+
350
418
LWLockRelease (TwoPhaseStateLock );
351
419
352
420
return gxact ;
@@ -409,6 +477,13 @@ LockGXact(const char *gid, Oid user)
409
477
{
410
478
int i ;
411
479
480
+ /* on first call, register the exit hook */
481
+ if (!twophaseExitRegistered )
482
+ {
483
+ on_shmem_exit (AtProcExit_Twophase , 0 );
484
+ twophaseExitRegistered = true;
485
+ }
486
+
412
487
LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
413
488
414
489
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
@@ -423,15 +498,11 @@ LockGXact(const char *gid, Oid user)
423
498
continue ;
424
499
425
500
/* Found it, but has someone else got it locked? */
426
- if (TransactionIdIsValid (gxact -> locking_xid ))
427
- {
428
- if (TransactionIdIsActive (gxact -> locking_xid ))
429
- ereport (ERROR ,
430
- (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
431
- errmsg ("prepared transaction with identifier \"%s\" is busy" ,
432
- gid )));
433
- gxact -> locking_xid = InvalidTransactionId ;
434
- }
501
+ if (gxact -> locking_backend != InvalidBackendId )
502
+ ereport (ERROR ,
503
+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
504
+ errmsg ("prepared transaction with identifier \"%s\" is busy" ,
505
+ gid )));
435
506
436
507
if (user != gxact -> owner && !superuser_arg (user ))
437
508
ereport (ERROR ,
@@ -452,7 +523,8 @@ LockGXact(const char *gid, Oid user)
452
523
errhint ("Connect to the database where the transaction was prepared to finish it." )));
453
524
454
525
/* OK for me to lock it */
455
- gxact -> locking_xid = GetTopTransactionId ();
526
+ gxact -> locking_backend = MyBackendId ;
527
+ MyLockedGxact = gxact ;
456
528
457
529
LWLockRelease (TwoPhaseStateLock );
458
530
@@ -1100,6 +1172,13 @@ EndPrepare(GlobalTransaction gxact)
1100
1172
*/
1101
1173
MyPgXact -> inCommit = false;
1102
1174
1175
+ /*
1176
+ * Remember that we have this GlobalTransaction entry locked for us. If
1177
+ * we crash after this point, it's too late to abort, but we must unlock
1178
+ * it so that the prepared transaction can be committed or rolled back.
1179
+ */
1180
+ MyLockedGxact = gxact ;
1181
+
1103
1182
END_CRIT_SECTION ();
1104
1183
1105
1184
/*
@@ -1346,8 +1425,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1346
1425
1347
1426
/*
1348
1427
* In case we fail while running the callbacks, mark the gxact invalid so
1349
- * no one else will try to commit/rollback, and so it can be recycled
1350
- * properly later. It is still locked by our XID so it won't go away yet.
1428
+ * no one else will try to commit/rollback, and so it will be recycled
1429
+ * if we fail after this point. It is still locked by our backend so it
1430
+ * won't go away yet.
1351
1431
*
1352
1432
* (We assume it's safe to do this without taking TwoPhaseStateLock.)
1353
1433
*/
@@ -1407,6 +1487,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1407
1487
RemoveTwoPhaseFile (xid , true);
1408
1488
1409
1489
RemoveGXact (gxact );
1490
+ MyLockedGxact = NULL ;
1410
1491
1411
1492
pfree (buf );
1412
1493
}
0 commit comments