Skip to content

Commit 3d6eaab

Browse files
committed
Obtain XID from DTMD
1 parent dc55677 commit 3d6eaab

File tree

8 files changed

+131
-99
lines changed

8 files changed

+131
-99
lines changed

contrib/pg_xtm/libdtm.h

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,42 +8,28 @@
88

99
#define INVALID_XID 0
1010

11-
typedef int NodeId;
12-
typedef unsigned long long xid_t;
13-
14-
typedef struct DTMConnData *DTMConn;
15-
16-
// Connects to the specified DTM.
17-
DTMConn DtmConnect(char *host, int port);
18-
19-
// Disconnects from the DTM. Do not use the 'dtm' pointer after this call, or
20-
// bad things will happen.
21-
void DtmDisconnect(DTMConn dtm);
22-
2311
void DtmInitSnapshot(Snapshot snapshot);
2412

25-
typedef struct {
26-
TransactionId* xids;
27-
NodeId* nodes;
28-
int nNodes;
29-
} GlobalTransactionId;
13+
// Starts new global transaction
14+
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot shaposhot);
3015

31-
// Creates an entry for a new global transaction. Returns 'true' on success, or
32-
// 'false' otherwise.
33-
bool DtmGlobalStartTransaction(DTMConn dtm, GlobalTransactionId* gtid);
16+
// Asks DTM for a fresh snapshot.
17+
void DtmGlobalNewSnapshot(TransactionId xid, Snapshot snapshot);
3418

35-
// Asks DTM for a fresh snapshot. Returns 'true' on success, or 'false'
36-
// otherwise.
37-
bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapshot snapshot);
19+
// Get existed DTM snapshot.
20+
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot);
3821

3922
// Commits transaction only once all participants have called this function,
40-
// does not change CLOG otherwise. Returns 'true' on success, 'false' if
41-
// something failed on the daemon side.
42-
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status, bool wait);
23+
// does not change CLOG otherwise.
24+
void DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait);
4325

4426
// Gets the status of the transaction identified by 'xid'. Returns the status
4527
// on success, or -1 otherwise. If 'wait' is true, then it does not return
4628
// until the transaction is finished.
4729
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, bool wait);
4830

31+
// Reserve XIDs for local transaction
32+
TransactioinId DtmGlobalReserve(int nXids);
33+
34+
4935
#endif

contrib/pg_xtm/pg_dtm--1.0.sql

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use "CREATE EXTENSION pg_dtm" to load this file. \quit
33

4-
CREATE FUNCTION dtm_begin_transaction(nodes integer[], xids integer[]) RETURNS void
4+
CREATE FUNCTION dtm_begin_transaction(n_participants integer) RETURNS integer
55
AS 'MODULE_PATHNAME','dtm_begin_transaction'
66
LANGUAGE C;
77

8-
CREATE FUNCTION dtm_get_current_snapshot_xmin() RETURNS bigint
8+
CREATE FUNCTION dtm_get_snapshot(xid integer) RETURNS void
9+
AS 'MODULE_PATHNAME','dtm_get_snapshot'
10+
LANGUAGE C;
11+
12+
CREATE FUNCTION dtm_new_snapshot(xid integer) RETURNS void
13+
AS 'MODULE_PATHNAME','dtm_new_snapshot'
14+
LANGUAGE C;
15+
16+
CREATE FUNCTION dtm_get_current_snapshot_xmin() RETURNS integer
917
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xmin'
1018
LANGUAGE C;
1119

12-
CREATE FUNCTION dtm_get_current_snapshot_xmax() RETURNS bigint
20+
CREATE FUNCTION dtm_get_current_snapshot_xmax() RETURNS integer
1321
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xmax'
1422
LANGUAGE C;

contrib/pg_xtm/pg_dtm.c

Lines changed: 98 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
sisva/*
22
* pg_dtm.c
33
*
44
* Pluggable distributed transaction manager
@@ -36,7 +36,10 @@
3636

3737
typedef struct
3838
{
39-
LWLockId lock; /* protect access to hash table */
39+
LWLockId hashLock;
40+
LWLockId xidLock;
41+
TransactionId nextXid;
42+
size_t nReservedXids;
4043
} DtmState;
4144

4245

@@ -48,15 +51,15 @@ typedef struct
4851
void _PG_init(void);
4952
void _PG_fini(void);
5053

51-
static void DtmEnsureConnection(void);
5254
static Snapshot DtmGetSnapshot(void);
5355
static void DtmMergeSnapshots(Snapshot dst, Snapshot src);
5456
static Snapshot DtmCopySnapshot(Snapshot snapshot);
5557
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
5658
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
5759
static void DtmUpdateRecentXmin(void);
58-
static void DtmInitialize();
60+
static void DtmInitialize(void);
5961
static void DtmXactCallback(XactEvent event, void *arg);
62+
static void DtmGetNextXid(void);
6063

6164
static bool TransactionIdIsInDtmSnapshot(TransactionId xid);
6265
static bool TransactionIdIsInDoubt(TransactionId xid);
@@ -69,33 +72,18 @@ static DtmState* dtm;
6972
static TransactionId DtmCurrentXid = InvalidTransactionId;
7073
static Snapshot CurrentTransactionSnapshot;
7174

72-
static NodeId DtmNodeId;
73-
static DTMConn DtmConn;
75+
static TransactionId DtmNextXid;
7476
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
7577
static SnapshotData DtmLocalSnapshot = { HeapTupleSatisfiesMVCC };
76-
static bool DtmGlobalTransaction = false;
77-
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmCopySnapshot };
78-
static DTMConn DtmConn;
78+
static bool DtmIsGlobalTransaction = false;
79+
static int DtmLocalXidReserve;
80+
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmCopySnapshot, DtmGetNextXid };
81+
7982

8083
#define XTM_TRACE(fmt, ...)
8184
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
8285
//#define XTM_INFO(fmt, ...)
8386

84-
static void DtmEnsureConnection(void)
85-
{
86-
int attempt = 0;
87-
XTM_TRACE("XTM: DtmEnsureConnection\n");
88-
while (attempt < XTM_CONNECT_ATTEMPTS) {
89-
if (DtmConn) {
90-
return;
91-
}
92-
XTM_TRACE("XTM: DtmEnsureConnection, attempt #%u\n", attempt);
93-
DtmConn = DtmConnect("127.0.0.1", 5431);
94-
attempt++;
95-
}
96-
elog(ERROR, "Failed to connect to DTMD");
97-
}
98-
9987
static void DumpSnapshot(Snapshot s, char *name)
10088
{
10189
int i;
@@ -247,11 +235,37 @@ static Snapshot DtmCopySnapshot(Snapshot snapshot)
247235
return newsnap;
248236
}
249237

238+
static TransactionId DtmGetNextXid()
239+
{
240+
TransactionId xid;
241+
if (TransactionIdIsValid(DtmNextXid)) {
242+
xid = DtmNextXid;
243+
DtmNextXid = InvalidTransactionId;
244+
} else {
245+
LWLockAcquire(dtm->xidLock, LW_EXCLUSIVE);
246+
if (dtm->nReservedXids == 0) {
247+
xid = DtmGlobalReserve(DtmLocalXidReserve);
248+
dtm->nReservedXids = DtmLocalXidReserve;
249+
ShmemVariableCache->nextXid = xid;
250+
}
251+
Assert(dtm->nextXid == ShmemVariableCache->nextXid);
252+
xid = ShmemVariableCache->nextXid;
253+
dtm->nextXid += 1;
254+
dtm->nReservedXids -= 1;
255+
LWLockRelease(dtm->xidLock);
256+
}
257+
return xid;
258+
}
250259

251260
static Snapshot DtmGetSnapshot()
252261
{
253-
CurrentTransactionSnapshot = GetLocalTransactionSnapshot();
254-
return CurrentTransactionSnapshot;
262+
Snapshot snapshot = GetLocalTransactionSnapshot();
263+
if (DtmIsGlobalTransaction) {
264+
DtmMergeSnapshots(snapshot, &DtmSnapshot);
265+
DtmUpdateRecentXmin();
266+
}
267+
CurrentTransactionSnapshot = snapshot;
268+
return snapshot;
255269
}
256270

257271

@@ -274,18 +288,16 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
274288
CurrentTransactionSnapshot = NULL;
275289
if (status == TRANSACTION_STATUS_ABORTED) {
276290
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
277-
DtmEnsureConnection();
278-
DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status, false);
291+
DtmGlobalSetTransStatus(xid, status, false);
279292
XTM_INFO("Abort transaction %d\n", xid);
280293
return;
281294
} else {
282-
DtmEnsureConnection();
283295
XTM_INFO("Begin commit transaction %d\n", xid);
284296
DtmCurrentXid = xid;
285-
LWLockAcquire(dtm->lock, LW_EXCLUSIVE);
297+
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
286298
hash_search(xid_in_doubt, &DtmCurrentXid, HASH_ENTER, NULL);
287-
LWLockRelease(dtm->lock);
288-
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status, true)) {
299+
LWLockRelease(dtm->hashLock);
300+
if (!DtmGlobalSetTransStatus(xid, status, true)) {
289301
elog(ERROR, "DTMD failed to set transaction status");
290302
}
291303
XTM_INFO("Commit transaction %d\n", xid);
@@ -295,8 +307,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
295307
}
296308
} else {
297309
XidStatus gs;
298-
DtmEnsureConnection();
299-
gs = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid, false);
310+
gs = DtmGlobalGetTransStatus(xid, false);
300311
if (gs != TRANSACTION_STATUS_UNKNOWN) {
301312
status = gs;
302313
}
@@ -324,7 +335,9 @@ static void DtmInitialize()
324335
dtm = ShmemInitStruct("dtm", sizeof(DtmState), &found);
325336
if (!found)
326337
{
327-
dtm->lock = LWLockAssign();
338+
dtm->hashLock = LWLockAssign();
339+
dtm->xidLock = LWLockAssign();
340+
dtm->nReservedXids = 0;
328341
}
329342
LWLockRelease(AddinShmemInitLock);
330343

@@ -346,9 +359,9 @@ static void
346359
DtmXactCallback(XactEvent event, void *arg)
347360
{
348361
if (event == XACT_EVENT_COMMIT && DtmCurrentXid != InvalidTransactionId) {
349-
LWLockAcquire(dtm->lock, LW_EXCLUSIVE);
362+
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
350363
hash_search(xid_in_doubt, &DtmCurrentXid, HASH_REMOVE, NULL);
351-
LWLockRelease(dtm->lock);
364+
LWLockRelease(dtm->hashLock);
352365
}
353366
}
354367

@@ -377,14 +390,14 @@ _PG_init(void)
377390
* resources in imcs_shmem_startup().
378391
*/
379392
RequestAddinShmemSpace(DTM_SHMEM_SIZE);
380-
RequestAddinLWLocks(1);
393+
RequestAddinLWLocks(2);
381394

382-
DefineCustomIntVariable("dtm.node_id",
383-
"Identifier of node in distributed cluster for DTM",
395+
DefineCustomIntVariable("dtm.local_xid_reserve",
396+
"Number of XIDs reserved by node for local transactions",
384397
NULL,
385-
&DtmNodeId,
386-
0,
387-
0,
398+
&DtmLocalXidReserve,
399+
100,
400+
1,
388401
INT_MAX,
389402
PGC_BACKEND,
390403
0,
@@ -424,41 +437,59 @@ static void dtm_shmem_startup(void)
424437
PG_MODULE_MAGIC;
425438

426439
PG_FUNCTION_INFO_V1(dtm_begin_transaction);
440+
PG_FUNCTION_INFO_V1(dtm_get_snapshot);
441+
PG_FUNCTION_INFO_V1(dtm_new_snapshot);
427442
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmax);
428443
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmin);
429444

430445
Datum
431-
dtm_begin_transaction(PG_FUNCTION_ARGS)
446+
dtm_get_current_snapshot_xmin(PG_FUNCTION_ARGS)
432447
{
433-
GlobalTransactionId gtid;
434-
ArrayType* nodes = PG_GETARG_ARRAYTYPE_P(0);
435-
ArrayType* xids = PG_GETARG_ARRAYTYPE_P(1);
436-
gtid.xids = (TransactionId*)ARR_DATA_PTR(xids);
437-
gtid.nodes = (NodeId*)ARR_DATA_PTR(nodes);
438-
gtid.nNodes = ArrayGetNItems(ARR_NDIM(nodes), ARR_DIMS(nodes));
439-
DtmGlobalTransaction = true;
440-
Assert(gtid.xids[DtmNodeId] == GetCurrentTransactionId());
441-
XTM_INFO("Start transaction {%d,%d} at node %d\n", gtid.xids[0], gtid.xids[1], DtmNodeId);
442-
if (DtmNodeId == gtid.nodes[0]) {
443-
DtmEnsureConnection();
444-
DtmGlobalStartTransaction(DtmConn, &gtid);
445-
}
446-
DtmEnsureConnection();
447-
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, gtid.xids[DtmNodeId], &DtmSnapshot);
448-
Assert(CurrentTransactionSnapshot != NULL);
449-
DtmMergeSnapshots(CurrentTransactionSnapshot, &DtmSnapshot);
450-
DtmUpdateRecentXmin();
451-
PG_RETURN_VOID();
448+
PG_RETURN_INT32(CurrentTransactionSnapshot->xmin);
452449
}
453450

454451
Datum
455-
dtm_get_current_snapshot_xmin(PG_FUNCTION_ARGS)
452+
dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
456453
{
457-
PG_RETURN_INT64(CurrentTransactionSnapshot->xmin);
454+
PG_RETURN_INT32(CurrentTransactionSnapshot->xmax);
458455
}
459456

457+
460458
Datum
461-
dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
459+
dtm_begin_transaction(PG_FUNCTION_ARGS)
460+
{
461+
int nPaticipants = PG_GETARG_INT32(0);
462+
Assert(!TransactionIdIsValid(DtmNextXid));
463+
464+
DtmNextXid = DtmGlobalStartTransaction(nParticipants, &DtmSnapshot);
465+
Assert(TransactionIdIsValid(DtmNextXid));
466+
467+
DtmIsGlobalTransaction = true;
468+
469+
PG_RETURN_INT32(DtmNextXid);
470+
}
471+
472+
Datum dtm_get_snapshot(PG_FUNCTION_ARGS)
473+
{
474+
Assert(!TransactionIdIsValid(DtmNextXid));
475+
DtmNextXid = PG_GETARG_INT32(0);
476+
Assert(TransactionIdIsValid(DtmNextXid));
477+
478+
DtmNextXid = DtmGlobalGetSnapshot(DtmConn, DtmNextXid, &DtmSnapshot);
479+
480+
DtmIsGlobalTransaction = true;
481+
482+
PG_RETURN_VOID();
483+
}
484+
485+
Datum dtm_new_snapshot(PG_FUNCTION_ARGS)
462486
{
463-
PG_RETURN_INT64(CurrentTransactionSnapshot->xmax);
487+
Assert(!TransactionIdIsValid(DtmNextXid));
488+
DtmNextXid = PG_GETARG_INT32(0);
489+
Assert(TransactionIdIsValid(DtmNextXid));
490+
491+
DtmNextXid = DtmGlobalNewSnapshot(DtmConn, DtmNextXid, &DtmSnapshot);
492+
493+
PG_RETURN_VOID();
464494
}
495+

src/backend/access/transam/clog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
#include "miscadmin.h"
4545
#include "pg_trace.h"
4646

47-
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalTransactionSnapshot, CopyLocalSnapshot };
47+
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalTransactionSnapshot, CopyLocalSnapshot, GetNextTransactionId };
4848
TransactionManager* TM = &DefaultTM;
4949

5050
/*

src/backend/access/transam/varsup.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
/* pointer to "variable cache" in shared memory (set up by shmem.c) */
3434
VariableCache ShmemVariableCache = NULL;
3535

36+
TransactionId GetNextTransactionId()
37+
{
38+
return ShmemVariableCache->nextXid;
39+
}
3640

3741
/*
3842
* Allocate the next XID for a new transaction or subtransaction.
@@ -73,7 +77,7 @@ GetNewTransactionId(bool isSubXact)
7377

7478
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
7579

76-
xid = ShmemVariableCache->nextXid;
80+
xid = TM->GetNextXid();
7781

7882
/*----------
7983
* Check to see if it's safe to assign another XID. This protects against

0 commit comments

Comments
 (0)