33
33
#include "utils/array.h"
34
34
#include "utils/builtins.h"
35
35
#include "utils/memutils.h"
36
+ #include "commands/dbcommands.h"
37
+ #include "miscadmin.h"
38
+ #include "postmaster/autovacuum.h"
39
+ #include "storage/pmsignal.h"
40
+ #include "storage/proc.h"
41
+ #include "utils/syscache.h"
36
42
37
43
#include "libdtm.h"
38
44
@@ -61,6 +67,7 @@ static void DtmUpdateRecentXmin(void);
61
67
static void DtmInitialize (void );
62
68
static void DtmXactCallback (XactEvent event , void * arg );
63
69
static TransactionId DtmGetNextXid (void );
70
+ static TransactionId DtmGetNewTransactionId (bool isSubXact );
64
71
65
72
static bool TransactionIdIsInDtmSnapshot (TransactionId xid );
66
73
static bool TransactionIdIsInDoubt (TransactionId xid );
@@ -77,7 +84,7 @@ static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
77
84
static bool DtmHasGlobalSnapshot ;
78
85
static bool DtmIsGlobalTransaction ;
79
86
static int DtmLocalXidReserve ;
80
- static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNextXid };
87
+ static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNewTransactionId };
81
88
82
89
83
90
#define XTM_TRACE (fmt , ...)
@@ -190,7 +197,7 @@ static void DtmUpdateRecentXmin(void)
190
197
191
198
if (TransactionIdIsValid (xmin )) {
192
199
xmin -= vacuum_defer_cleanup_age ;
193
- xmin = FirstNormalTransactionId ;
200
+ // xmin = FirstNormalTransactionId;
194
201
if (!TransactionIdIsNormal (xmin )) {
195
202
xmin = FirstNormalTransactionId ;
196
203
}
@@ -245,6 +252,211 @@ static TransactionId DtmGetNextXid()
245
252
return xid ;
246
253
}
247
254
255
+ TransactionId
256
+ DtmGetNewTransactionId (bool isSubXact )
257
+ {
258
+ TransactionId xid ;
259
+
260
+ /*
261
+ * Workers synchronize transaction state at the beginning of each parallel
262
+ * operation, so we can't account for new XIDs after that point.
263
+ */
264
+ if (IsInParallelMode ())
265
+ elog (ERROR , "cannot assign TransactionIds during a parallel operation" );
266
+
267
+ /*
268
+ * During bootstrap initialization, we return the special bootstrap
269
+ * transaction id.
270
+ */
271
+ if (IsBootstrapProcessingMode ())
272
+ {
273
+ Assert (!isSubXact );
274
+ MyPgXact -> xid = BootstrapTransactionId ;
275
+ return BootstrapTransactionId ;
276
+ }
277
+
278
+ /* safety check, we should never get this far in a HS slave */
279
+ if (RecoveryInProgress ())
280
+ elog (ERROR , "cannot assign TransactionIds during recovery" );
281
+
282
+ LWLockAcquire (XidGenLock , LW_EXCLUSIVE );
283
+ xid = DtmGetNextXid ();
284
+
285
+ /*----------
286
+ * Check to see if it's safe to assign another XID. This protects against
287
+ * catastrophic data loss due to XID wraparound. The basic rules are:
288
+ *
289
+ * If we're past xidVacLimit, start trying to force autovacuum cycles.
290
+ * If we're past xidWarnLimit, start issuing warnings.
291
+ * If we're past xidStopLimit, refuse to execute transactions, unless
292
+ * we are running in single-user mode (which gives an escape hatch
293
+ * to the DBA who somehow got past the earlier defenses).
294
+ *
295
+ * Note that this coding also appears in GetNewMultiXactId.
296
+ *----------
297
+ */
298
+ if (TransactionIdFollowsOrEquals (xid , ShmemVariableCache -> xidVacLimit ))
299
+ {
300
+ /*
301
+ * For safety's sake, we release XidGenLock while sending signals,
302
+ * warnings, etc. This is not so much because we care about
303
+ * preserving concurrency in this situation, as to avoid any
304
+ * possibility of deadlock while doing get_database_name(). First,
305
+ * copy all the shared values we'll need in this path.
306
+ */
307
+ TransactionId xidWarnLimit = ShmemVariableCache -> xidWarnLimit ;
308
+ TransactionId xidStopLimit = ShmemVariableCache -> xidStopLimit ;
309
+ TransactionId xidWrapLimit = ShmemVariableCache -> xidWrapLimit ;
310
+ Oid oldest_datoid = ShmemVariableCache -> oldestXidDB ;
311
+
312
+ LWLockRelease (XidGenLock );
313
+
314
+ /*
315
+ * To avoid swamping the postmaster with signals, we issue the autovac
316
+ * request only once per 64K transaction starts. This still gives
317
+ * plenty of chances before we get into real trouble.
318
+ */
319
+ if (IsUnderPostmaster && (xid % 65536 ) == 0 )
320
+ SendPostmasterSignal (PMSIGNAL_START_AUTOVAC_LAUNCHER );
321
+
322
+ if (IsUnderPostmaster &&
323
+ TransactionIdFollowsOrEquals (xid , xidStopLimit ))
324
+ {
325
+ char * oldest_datname = get_database_name (oldest_datoid );
326
+
327
+ /* complain even if that DB has disappeared */
328
+ if (oldest_datname )
329
+ ereport (ERROR ,
330
+ (errcode (ERRCODE_PROGRAM_LIMIT_EXCEEDED ),
331
+ errmsg ("database is not accepting commands to avoid wraparound data loss in database \"%s\"" ,
332
+ oldest_datname ),
333
+ errhint ("Stop the postmaster and vacuum that database in single-user mode.\n"
334
+ "You might also need to commit or roll back old prepared transactions." )));
335
+ else
336
+ ereport (ERROR ,
337
+ (errcode (ERRCODE_PROGRAM_LIMIT_EXCEEDED ),
338
+ errmsg ("database is not accepting commands to avoid wraparound data loss in database with OID %u" ,
339
+ oldest_datoid ),
340
+ errhint ("Stop the postmaster and vacuum that database in single-user mode.\n"
341
+ "You might also need to commit or roll back old prepared transactions." )));
342
+ }
343
+ else if (TransactionIdFollowsOrEquals (xid , xidWarnLimit ))
344
+ {
345
+ char * oldest_datname = get_database_name (oldest_datoid );
346
+
347
+ /* complain even if that DB has disappeared */
348
+ if (oldest_datname )
349
+ ereport (WARNING ,
350
+ (errmsg ("database \"%s\" must be vacuumed within %u transactions" ,
351
+ oldest_datname ,
352
+ xidWrapLimit - xid ),
353
+ errhint ("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
354
+ "You might also need to commit or roll back old prepared transactions." )));
355
+ else
356
+ ereport (WARNING ,
357
+ (errmsg ("database with OID %u must be vacuumed within %u transactions" ,
358
+ oldest_datoid ,
359
+ xidWrapLimit - xid ),
360
+ errhint ("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
361
+ "You might also need to commit or roll back old prepared transactions." )));
362
+ }
363
+
364
+ /* Re-acquire lock and start over */
365
+ LWLockAcquire (XidGenLock , LW_EXCLUSIVE );
366
+ xid = DtmGetNextXid ();
367
+ }
368
+
369
+ /*
370
+ * If we are allocating the first XID of a new page of the commit log,
371
+ * zero out that commit-log page before returning. We must do this while
372
+ * holding XidGenLock, else another xact could acquire and commit a later
373
+ * XID before we zero the page. Fortunately, a page of the commit log
374
+ * holds 32K or more transactions, so we don't have to do this very often.
375
+ *
376
+ * Extend pg_subtrans and pg_commit_ts too.
377
+ */
378
+ if (TransactionIdFollowsOrEquals (xid , ShmemVariableCache -> nextXid )) {
379
+ fprintf (stderr , "Extend CLOG to %d\n" , xid );
380
+ ExtendCLOG (xid );
381
+ ExtendCommitTs (xid );
382
+ ExtendSUBTRANS (xid );
383
+ }
384
+ /*
385
+ * Now advance the nextXid counter. This must not happen until after we
386
+ * have successfully completed ExtendCLOG() --- if that routine fails, we
387
+ * want the next incoming transaction to try it again. We cannot assign
388
+ * more XIDs until there is CLOG space for them.
389
+ */
390
+ if (xid == ShmemVariableCache -> nextXid ) {
391
+ TransactionIdAdvance (ShmemVariableCache -> nextXid );
392
+ } else {
393
+ Assert (TransactionIdPrecedes (xid , ShmemVariableCache -> nextXid ));
394
+ }
395
+
396
+ /*
397
+ * We must store the new XID into the shared ProcArray before releasing
398
+ * XidGenLock. This ensures that every active XID older than
399
+ * latestCompletedXid is present in the ProcArray, which is essential for
400
+ * correct OldestXmin tracking; see src/backend/access/transam/README.
401
+ *
402
+ * XXX by storing xid into MyPgXact without acquiring ProcArrayLock, we
403
+ * are relying on fetch/store of an xid to be atomic, else other backends
404
+ * might see a partially-set xid here. But holding both locks at once
405
+ * would be a nasty concurrency hit. So for now, assume atomicity.
406
+ *
407
+ * Note that readers of PGXACT xid fields should be careful to fetch the
408
+ * value only once, rather than assume they can read a value multiple
409
+ * times and get the same answer each time.
410
+ *
411
+ * The same comments apply to the subxact xid count and overflow fields.
412
+ *
413
+ * A solution to the atomic-store problem would be to give each PGXACT its
414
+ * own spinlock used only for fetching/storing that PGXACT's xid and
415
+ * related fields.
416
+ *
417
+ * If there's no room to fit a subtransaction XID into PGPROC, set the
418
+ * cache-overflowed flag instead. This forces readers to look in
419
+ * pg_subtrans to map subtransaction XIDs up to top-level XIDs. There is a
420
+ * race-condition window, in that the new XID will not appear as running
421
+ * until its parent link has been placed into pg_subtrans. However, that
422
+ * will happen before anyone could possibly have a reason to inquire about
423
+ * the status of the XID, so it seems OK. (Snapshots taken during this
424
+ * window *will* include the parent XID, so they will deliver the correct
425
+ * answer later on when someone does have a reason to inquire.)
426
+ */
427
+ {
428
+ /*
429
+ * Use volatile pointer to prevent code rearrangement; other backends
430
+ * could be examining my subxids info concurrently, and we don't want
431
+ * them to see an invalid intermediate state, such as incrementing
432
+ * nxids before filling the array entry. Note we are assuming that
433
+ * TransactionId and int fetch/store are atomic.
434
+ */
435
+ volatile PGPROC * myproc = MyProc ;
436
+ volatile PGXACT * mypgxact = MyPgXact ;
437
+
438
+ if (!isSubXact )
439
+ mypgxact -> xid = xid ;
440
+ else
441
+ {
442
+ int nxids = mypgxact -> nxids ;
443
+
444
+ if (nxids < PGPROC_MAX_CACHED_SUBXIDS )
445
+ {
446
+ myproc -> subxids .xids [nxids ] = xid ;
447
+ mypgxact -> nxids = nxids + 1 ;
448
+ }
449
+ else
450
+ mypgxact -> overflowed = true;
451
+ }
452
+ }
453
+
454
+ LWLockRelease (XidGenLock );
455
+
456
+ return xid ;
457
+ }
458
+
459
+
248
460
static Snapshot DtmGetSnapshot (Snapshot snapshot )
249
461
{
250
462
0 commit comments