Skip to content

Commit 85c0a49

Browse files
committed
VacuumProcArray
1 parent 9328e85 commit 85c0a49

File tree

6 files changed

+92
-7
lines changed

6 files changed

+92
-7
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "access/xtm.h"
2323
#include "access/transam.h"
2424
#include "access/xlog.h"
25+
#include "storage/proc.h"
2526
#include "storage/procarray.h"
2627
#include "access/twophase.h"
2728
#include <utils/guc.h>
@@ -40,6 +41,8 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot);
4041
static void DtmCopySnapshot(Snapshot dst, Snapshot src);
4142
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
4243
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
44+
static bool TransactionIsInDtmSnapshot(TransactionId xid);
45+
4346
static NodeId DtmNodeId;
4447

4548
static DTMConn DtmConn;
@@ -80,7 +83,9 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
8083
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
8184
{
8285
XidStatus status = CLOGTransactionIdGetStatus(xid, lsn);
83-
if (status == TRANSACTION_STATUS_IN_PROGRESS && DtmHasSnapshot && !TransactionIdIsInProgress(xid)) {
86+
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
87+
#if 0
88+
&& DtmHasSnapshot && !TransactionIdIsInProgress(xid)) {
8489
unsigned delay = 1000;
8590
while (true) {
8691
DtmEnsureConnection();
@@ -94,17 +99,31 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
9499
break;
95100
}
96101
}
97-
CLOGTransactionIdSetTreeStatus(xid, 0, NULL, status, InvalidXLogRecPtr);
102+
#endif
103+
status = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid);
104+
if (status != TRANSACTION_STATUS_IN_PROGRESS) {
105+
CLOGTransactionIdSetTreeStatus(xid, 0, NULL, status, InvalidXLogRecPtr);
106+
}
98107
}
99108
return status;
100109
}
101110

111+
static bool TransactionIsInDtmSnapshot(TransactionId xid)
112+
{
113+
return bsearch(&xid, DtmSnapshot.xip, DtmSnapshot.xcnt,
114+
sizeof(TransactionId), xidComparator) != NULL;
115+
}
116+
117+
102118

103119
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
104120
{
105121
if (DtmHasSnapshot) {
106122
/* Already should be IN_PROGRESS */
107123
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
124+
if (status == TRANSACTION_STATUS_COMMITTED) {
125+
ProcArrayAdd(&ProcGlobal->allProcs[MyProc->pgprocno]);
126+
}
108127
DtmHasSnapshot = false;
109128
DtmEnsureConnection();
110129
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status) && status != TRANSACTION_STATUS_ABORTED) {
@@ -120,10 +139,13 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
120139
* ***************************************************************************
121140
*/
122141

142+
extern bool (*TransactionIsInCurrentSnapshot)(TransactionId xid);
143+
123144
void
124145
_PG_init(void)
125146
{
126147
TM = &DtmTM;
148+
// TransactionIsInCurrentSnapshot = TransactionIsInDtmSnapshot;
127149

128150
DefineCustomIntVariable("dtm.node_id",
129151
"Identifier of node in distributed cluster for DTM",
@@ -170,12 +192,21 @@ dtm_global_transaction(PG_FUNCTION_ARGS)
170192
PG_RETURN_VOID();
171193
}
172194

195+
196+
197+
198+
199+
200+
173201
Datum
174202
dtm_get_snapshot(PG_FUNCTION_ARGS)
175203
{
176204
TransactionId xmin;
177205
DtmEnsureConnection();
178206
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
207+
208+
VacuumProcArray(&DtmSnapshot);
209+
179210
/* Move it to DtmGlobalGetSnapshot? */
180211
xmin = DtmSnapshot.xmin;
181212
if (xmin != InvalidTransactionId) {

src/backend/executor/execMain.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2239,8 +2239,11 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
22392239
}
22402240

22412241
/* otherwise xmin should not be dirty... */
2242-
if (TransactionIdIsValid(SnapshotDirty.xmin))
2242+
#if 0
2243+
if (TransactionIdIsValid(SnapshotDirty.xmin)) {
22432244
elog(ERROR, "t_xmin is uncommitted in tuple to be updated");
2245+
}
2246+
#endif
22442247

22452248
/*
22462249
* If tuple is being updated by other transaction then we have to

src/backend/storage/ipc/procarray.c

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@
6262
#include "utils/tqual.h"
6363
#include "utils/snapmgr.h"
6464

65+
static bool
66+
TransactionIdIsRunning(TransactionId xid);
6567

6668
/* Our shared memory area */
6769
typedef struct ProcArrayStruct
@@ -946,6 +948,14 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
946948
LWLockRelease(ProcArrayLock);
947949
}
948950

951+
bool (*TransactionIsInCurrentSnapshot)(TransactionId xid);
952+
953+
bool
954+
TransactionIdIsInProgress(TransactionId xid)
955+
{
956+
return TransactionIdIsRunning(xid) || (TransactionIsInCurrentSnapshot && TransactionIsInCurrentSnapshot(xid));
957+
}
958+
949959
/*
950960
* TransactionIdIsInProgress -- is given transaction running in some backend
951961
*
@@ -973,7 +983,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
973983
* PGXACT again anyway; see GetNewTransactionId).
974984
*/
975985
bool
976-
TransactionIdIsInProgress(TransactionId xid)
986+
TransactionIdIsRunning(TransactionId xid)
977987
{
978988
static TransactionId *xids = NULL;
979989
int nxids = 0;
@@ -3867,3 +3877,41 @@ KnownAssignedXidsReset(void)
38673877

38683878
LWLockRelease(ProcArrayLock);
38693879
}
3880+
3881+
static bool TransactionIsStillInProgress(TransactionId xid, Snapshot snapshot)
3882+
{
3883+
return (bsearch(&xid, snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator) != NULL) || (xid > snapshot->xmax);
3884+
}
3885+
3886+
3887+
void VacuumProcArray(Snapshot snapshot)
3888+
{
3889+
int i;
3890+
int nInProgress = 0;
3891+
int nCompleted = 0;
3892+
ProcArrayStruct *arrayP = procArray;
3893+
3894+
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3895+
for (i = arrayP->numProcs; --i >= 0;)
3896+
{
3897+
int pgprocno = arrayP->pgprocnos[i];
3898+
PGXACT *pgxact = &allPgXact[pgprocno];
3899+
TransactionId pxid = pgxact->xid;
3900+
3901+
if (!TransactionIdIsValid(pxid)) {
3902+
continue;
3903+
}
3904+
if (TransactionIsStillInProgress(pxid, snapshot)) {
3905+
elog(WARNING, "ProcArray: %d is in progress\n", pxid);
3906+
nInProgress += 1;
3907+
continue;
3908+
}
3909+
nCompleted += 1;
3910+
memmove(&arrayP->pgprocnos[i], &arrayP->pgprocnos[i + 1],
3911+
(arrayP->numProcs - i - 1) * sizeof(int));
3912+
arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
3913+
arrayP->numProcs--;
3914+
}
3915+
LWLockRelease(ProcArrayLock);
3916+
elog(WARNING, "ProcArray: %d in progress, %d completed, %d total\n", nInProgress, nCompleted, arrayP->numProcs);
3917+
}

src/backend/storage/lmgr/lmgr.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -542,9 +542,8 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
542542
error_context_stack = &callback;
543543
}
544544

545-
for (;;)
546-
{
547-
Assert(TransactionIdIsValid(xid));
545+
while (TransactionIdIsValid(xid))
546+
{
548547
Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
549548

550549
SET_LOCKTAG_TRANSACTION(tag, xid);

src/backend/utils/time/tqual.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,7 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
848848
}
849849

850850
snapshot->xmin = HeapTupleHeaderGetRawXmin(tuple);
851+
snapshot->xmax = HeapTupleHeaderGetRawXmax(tuple);
851852
/* XXX shouldn't we fall through to look at xmax? */
852853
return true; /* in insertion by other */
853854
}
@@ -1800,3 +1801,4 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
18001801
else
18011802
return true;
18021803
}
1804+

src/include/storage/procarray.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,6 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
8888
extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
8989
TransactionId *catalog_xmin);
9090

91+
extern void VacuumProcArray(Snapshot snapshot);
92+
9193
#endif /* PROCARRAY_H */

0 commit comments

Comments
 (0)