Skip to content

Commit 4b5d155

Browse files
committed
merge
2 parents 358204a + 3d6eaab commit 4b5d155

File tree

8 files changed

+142
-107
lines changed

8 files changed

+142
-107
lines changed

contrib/pg_xtm/libdtm.h

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,42 +8,28 @@
88

99
#define INVALID_XID 0
1010

11-
typedef int NodeId;
12-
typedef unsigned long long xid_t;
13-
14-
typedef struct DTMConnData *DTMConn;
15-
16-
// Connects to the specified DTM.
17-
DTMConn DtmConnect(char *host, int port);
18-
19-
// Disconnects from the DTM. Do not use the 'dtm' pointer after this call, or
20-
// bad things will happen.
21-
void DtmDisconnect(DTMConn dtm);
22-
2311
void DtmInitSnapshot(Snapshot snapshot);
2412

25-
typedef struct {
26-
TransactionId* xids;
27-
NodeId* nodes;
28-
int nNodes;
29-
} GlobalTransactionId;
13+
// Starts new global transaction
14+
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot shaposhot);
3015

31-
// Creates an entry for a new global transaction. Returns 'true' on success, or
32-
// 'false' otherwise.
33-
bool DtmGlobalStartTransaction(DTMConn dtm, GlobalTransactionId* gtid);
16+
// Asks DTM for a fresh snapshot.
17+
void DtmGlobalNewSnapshot(TransactionId xid, Snapshot snapshot);
3418

35-
// Asks DTM for a fresh snapshot. Returns 'true' on success, or 'false'
36-
// otherwise.
37-
bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapshot snapshot);
19+
// Get existed DTM snapshot.
20+
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot);
3821

3922
// Commits transaction only once all participants have called this function,
40-
// does not change CLOG otherwise. Returns 'true' on success, 'false' if
41-
// something failed on the daemon side.
42-
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status, bool wait);
23+
// does not change CLOG otherwise.
24+
void DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait);
4325

4426
// Gets the status of the transaction identified by 'xid'. Returns the status
4527
// on success, or -1 otherwise. If 'wait' is true, then it does not return
4628
// until the transaction is finished.
4729
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, bool wait);
4830

31+
// Reserve XIDs for local transaction
32+
TransactioinId DtmGlobalReserve(int nXids);
33+
34+
4935
#endif

contrib/pg_xtm/pg_dtm--1.0.sql

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use "CREATE EXTENSION pg_dtm" to load this file. \quit
33

4-
CREATE FUNCTION dtm_begin_transaction(nodes integer[], xids integer[]) RETURNS void
4+
CREATE FUNCTION dtm_begin_transaction(n_participants integer) RETURNS integer
55
AS 'MODULE_PATHNAME','dtm_begin_transaction'
66
LANGUAGE C;
77

8-
CREATE FUNCTION dtm_get_current_snapshot_xmin() RETURNS bigint
8+
CREATE FUNCTION dtm_get_snapshot(xid integer) RETURNS void
9+
AS 'MODULE_PATHNAME','dtm_get_snapshot'
10+
LANGUAGE C;
11+
12+
CREATE FUNCTION dtm_new_snapshot(xid integer) RETURNS void
13+
AS 'MODULE_PATHNAME','dtm_new_snapshot'
14+
LANGUAGE C;
15+
16+
CREATE FUNCTION dtm_get_current_snapshot_xmin() RETURNS integer
917
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xmin'
1018
LANGUAGE C;
1119

12-
CREATE FUNCTION dtm_get_current_snapshot_xmax() RETURNS bigint
20+
CREATE FUNCTION dtm_get_current_snapshot_xmax() RETURNS integer
1321
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xmax'
1422
LANGUAGE C;

contrib/pg_xtm/pg_dtm.c

Lines changed: 109 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
sisva/*
22
* pg_dtm.c
33
*
44
* Pluggable distributed transaction manager
@@ -36,7 +36,10 @@
3636

3737
typedef struct
3838
{
39-
LWLockId lock; /* protect access to hash table */
39+
LWLockId hashLock;
40+
LWLockId xidLock;
41+
TransactionId nextXid;
42+
size_t nReservedXids;
4043
} DtmState;
4144

4245

@@ -48,15 +51,15 @@ typedef struct
4851
void _PG_init(void);
4952
void _PG_fini(void);
5053

51-
static void DtmEnsureConnection(void);
5254
static Snapshot DtmGetSnapshot(void);
5355
static void DtmMergeSnapshots(Snapshot dst, Snapshot src);
5456
static Snapshot DtmCopySnapshot(Snapshot snapshot);
5557
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
5658
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
5759
static void DtmUpdateRecentXmin(void);
58-
static void DtmInitialize();
60+
static void DtmInitialize(void);
5961
static void DtmXactCallback(XactEvent event, void *arg);
62+
static void DtmGetNextXid(void);
6063

6164
static bool TransactionIdIsInDtmSnapshot(TransactionId xid);
6265
static bool TransactionIdIsInDoubt(TransactionId xid);
@@ -69,33 +72,18 @@ static DtmState* dtm;
6972
static TransactionId DtmCurrentXid = InvalidTransactionId;
7073
static Snapshot CurrentTransactionSnapshot;
7174

72-
static NodeId DtmNodeId;
73-
static DTMConn DtmConn;
75+
static TransactionId DtmNextXid;
7476
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
7577
static SnapshotData DtmLocalSnapshot = { HeapTupleSatisfiesMVCC };
76-
static bool DtmGlobalTransaction = false;
77-
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmCopySnapshot };
78-
static DTMConn DtmConn;
78+
static bool DtmIsGlobalTransaction = false;
79+
static int DtmLocalXidReserve;
80+
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmCopySnapshot, DtmGetNextXid };
81+
7982

8083
#define XTM_TRACE(fmt, ...)
8184
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
8285
//#define XTM_INFO(fmt, ...)
8386

84-
static void DtmEnsureConnection(void)
85-
{
86-
int attempt = 0;
87-
XTM_TRACE("XTM: DtmEnsureConnection\n");
88-
while (attempt < XTM_CONNECT_ATTEMPTS) {
89-
if (DtmConn) {
90-
return;
91-
}
92-
XTM_TRACE("XTM: DtmEnsureConnection, attempt #%u\n", attempt);
93-
DtmConn = DtmConnect("127.0.0.1", 5431);
94-
attempt++;
95-
}
96-
elog(ERROR, "Failed to connect to DTMD");
97-
}
98-
9987
static void DumpSnapshot(Snapshot s, char *name)
10088
{
10189
int i;
@@ -247,17 +235,43 @@ static Snapshot DtmCopySnapshot(Snapshot snapshot)
247235
return newsnap;
248236
}
249237

238+
static TransactionId DtmGetNextXid()
239+
{
240+
TransactionId xid;
241+
if (TransactionIdIsValid(DtmNextXid)) {
242+
xid = DtmNextXid;
243+
DtmNextXid = InvalidTransactionId;
244+
} else {
245+
LWLockAcquire(dtm->xidLock, LW_EXCLUSIVE);
246+
if (dtm->nReservedXids == 0) {
247+
xid = DtmGlobalReserve(DtmLocalXidReserve);
248+
dtm->nReservedXids = DtmLocalXidReserve;
249+
ShmemVariableCache->nextXid = xid;
250+
}
251+
Assert(dtm->nextXid == ShmemVariableCache->nextXid);
252+
xid = ShmemVariableCache->nextXid;
253+
dtm->nextXid += 1;
254+
dtm->nReservedXids -= 1;
255+
LWLockRelease(dtm->xidLock);
256+
}
257+
return xid;
258+
}
250259

251260
static Snapshot DtmGetSnapshot()
252261
{
253-
CurrentTransactionSnapshot = GetLocalTransactionSnapshot();
254-
if (DtmGlobalTransaction && !IsolationUsesXactSnapshot()){ /* RU & RC */
255-
DtmEnsureConnection();
256-
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
257-
DtmMergeSnapshots(CurrentTransactionSnapshot, &DtmSnapshot);
262+
Snapshot snapshot = GetLocalTransactionSnapshot();
263+
264+
if (DtmIsGlobalTransaction) {
265+
if (!IsolationUsesXactSnapshot()){ /* RU & RC */
266+
DtmEnsureConnection();
267+
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
268+
}
269+
/* Should we actually perform that for RR & S levels? */
270+
DtmMergeSnapshots(snapshot, &DtmSnapshot);
258271
DtmUpdateRecentXmin();
259272
}
260-
return CurrentTransactionSnapshot;
273+
CurrentTransactionSnapshot = snapshot;
274+
return snapshot;
261275
}
262276

263277

@@ -280,18 +294,16 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
280294
CurrentTransactionSnapshot = NULL;
281295
if (status == TRANSACTION_STATUS_ABORTED) {
282296
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
283-
DtmEnsureConnection();
284-
DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status, false);
297+
DtmGlobalSetTransStatus(xid, status, false);
285298
XTM_INFO("Abort transaction %d\n", xid);
286299
return;
287300
} else {
288-
DtmEnsureConnection();
289301
XTM_INFO("Begin commit transaction %d\n", xid);
290302
DtmCurrentXid = xid;
291-
LWLockAcquire(dtm->lock, LW_EXCLUSIVE);
303+
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
292304
hash_search(xid_in_doubt, &DtmCurrentXid, HASH_ENTER, NULL);
293-
LWLockRelease(dtm->lock);
294-
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status, true)) {
305+
LWLockRelease(dtm->hashLock);
306+
if (!DtmGlobalSetTransStatus(xid, status, true)) {
295307
elog(ERROR, "DTMD failed to set transaction status");
296308
}
297309
XTM_INFO("Commit transaction %d\n", xid);
@@ -301,8 +313,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
301313
}
302314
} else {
303315
XidStatus gs;
304-
DtmEnsureConnection();
305-
gs = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid, false);
316+
gs = DtmGlobalGetTransStatus(xid, false);
306317
if (gs != TRANSACTION_STATUS_UNKNOWN) {
307318
status = gs;
308319
}
@@ -330,7 +341,9 @@ static void DtmInitialize()
330341
dtm = ShmemInitStruct("dtm", sizeof(DtmState), &found);
331342
if (!found)
332343
{
333-
dtm->lock = LWLockAssign();
344+
dtm->hashLock = LWLockAssign();
345+
dtm->xidLock = LWLockAssign();
346+
dtm->nReservedXids = 0;
334347
}
335348
LWLockRelease(AddinShmemInitLock);
336349

@@ -352,9 +365,9 @@ static void
352365
DtmXactCallback(XactEvent event, void *arg)
353366
{
354367
if (event == XACT_EVENT_COMMIT && DtmCurrentXid != InvalidTransactionId) {
355-
LWLockAcquire(dtm->lock, LW_EXCLUSIVE);
368+
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
356369
hash_search(xid_in_doubt, &DtmCurrentXid, HASH_REMOVE, NULL);
357-
LWLockRelease(dtm->lock);
370+
LWLockRelease(dtm->hashLock);
358371
}
359372
}
360373

@@ -383,14 +396,14 @@ _PG_init(void)
383396
* resources in imcs_shmem_startup().
384397
*/
385398
RequestAddinShmemSpace(DTM_SHMEM_SIZE);
386-
RequestAddinLWLocks(1);
399+
RequestAddinLWLocks(2);
387400

388-
DefineCustomIntVariable("dtm.node_id",
389-
"Identifier of node in distributed cluster for DTM",
401+
DefineCustomIntVariable("dtm.local_xid_reserve",
402+
"Number of XIDs reserved by node for local transactions",
390403
NULL,
391-
&DtmNodeId,
392-
0,
393-
0,
404+
&DtmLocalXidReserve,
405+
100,
406+
1,
394407
INT_MAX,
395408
PGC_BACKEND,
396409
0,
@@ -430,45 +443,66 @@ static void dtm_shmem_startup(void)
430443
PG_MODULE_MAGIC;
431444

432445
PG_FUNCTION_INFO_V1(dtm_begin_transaction);
446+
PG_FUNCTION_INFO_V1(dtm_get_snapshot);
447+
PG_FUNCTION_INFO_V1(dtm_new_snapshot);
433448
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmax);
434449
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmin);
435450

436451
Datum
437-
dtm_begin_transaction(PG_FUNCTION_ARGS)
452+
dtm_get_current_snapshot_xmin(PG_FUNCTION_ARGS)
438453
{
439-
GlobalTransactionId gtid;
440-
ArrayType* nodes = PG_GETARG_ARRAYTYPE_P(0);
441-
ArrayType* xids = PG_GETARG_ARRAYTYPE_P(1);
442-
gtid.xids = (TransactionId*)ARR_DATA_PTR(xids);
443-
gtid.nodes = (NodeId*)ARR_DATA_PTR(nodes);
444-
gtid.nNodes = ArrayGetNItems(ARR_NDIM(nodes), ARR_DIMS(nodes));
445-
DtmGlobalTransaction = true;
446-
Assert(gtid.xids[DtmNodeId] == GetCurrentTransactionId());
447-
XTM_INFO("Start transaction {%d,%d} at node %d, iso=%d\n", gtid.xids[0], gtid.xids[1], DtmNodeId, XactIsoLevel);
448-
if (DtmNodeId == gtid.nodes[0]) {
449-
DtmEnsureConnection();
450-
DtmGlobalStartTransaction(DtmConn, &gtid);
451-
}
452-
453-
if (IsolationUsesXactSnapshot()){ /* RR & S */
454-
DtmEnsureConnection();
455-
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, gtid.xids[DtmNodeId], &DtmSnapshot);
456-
Assert(CurrentTransactionSnapshot != NULL);
457-
DtmMergeSnapshots(CurrentTransactionSnapshot, &DtmSnapshot);
458-
DtmUpdateRecentXmin();
459-
}
460-
461-
PG_RETURN_VOID();
454+
// if (IsolationUsesXactSnapshot()){ /* RR & S */
455+
// DtmEnsureConnection();
456+
// DtmGlobalGetSnapshot(DtmConn, DtmNodeId, gtid.xids[DtmNodeId], &DtmSnapshot);
457+
// Assert(CurrentTransactionSnapshot != NULL);
458+
// DtmMergeSnapshots(CurrentTransactionSnapshot, &DtmSnapshot);
459+
// DtmUpdateRecentXmin();
460+
// }
461+
PG_RETURN_INT32(CurrentTransactionSnapshot->xmin);
462462
}
463463

464464
Datum
465-
dtm_get_current_snapshot_xmin(PG_FUNCTION_ARGS)
465+
dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
466466
{
467-
PG_RETURN_INT64(CurrentTransactionSnapshot->xmin);
467+
PG_RETURN_INT32(CurrentTransactionSnapshot->xmax);
468468
}
469469

470+
470471
Datum
471-
dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
472+
dtm_begin_transaction(PG_FUNCTION_ARGS)
473+
{
474+
int nPaticipants = PG_GETARG_INT32(0);
475+
Assert(!TransactionIdIsValid(DtmNextXid));
476+
477+
DtmNextXid = DtmGlobalStartTransaction(nParticipants, &DtmSnapshot);
478+
Assert(TransactionIdIsValid(DtmNextXid));
479+
480+
DtmIsGlobalTransaction = true;
481+
482+
PG_RETURN_INT32(DtmNextXid);
483+
}
484+
485+
Datum dtm_get_snapshot(PG_FUNCTION_ARGS)
472486
{
473-
PG_RETURN_INT64(CurrentTransactionSnapshot->xmax);
487+
Assert(!TransactionIdIsValid(DtmNextXid));
488+
DtmNextXid = PG_GETARG_INT32(0);
489+
Assert(TransactionIdIsValid(DtmNextXid));
490+
491+
DtmNextXid = DtmGlobalGetSnapshot(DtmConn, DtmNextXid, &DtmSnapshot);
492+
493+
DtmIsGlobalTransaction = true;
494+
495+
PG_RETURN_VOID();
474496
}
497+
498+
Datum dtm_new_snapshot(PG_FUNCTION_ARGS)
499+
{
500+
Assert(!TransactionIdIsValid(DtmNextXid));
501+
DtmNextXid = PG_GETARG_INT32(0);
502+
Assert(TransactionIdIsValid(DtmNextXid));
503+
504+
DtmNextXid = DtmGlobalNewSnapshot(DtmConn, DtmNextXid, &DtmSnapshot);
505+
506+
PG_RETURN_VOID();
507+
}
508+

src/backend/access/transam/clog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
#include "miscadmin.h"
4545
#include "pg_trace.h"
4646

47-
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalTransactionSnapshot, CopyLocalSnapshot };
47+
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalTransactionSnapshot, CopyLocalSnapshot, GetNextTransactionId };
4848
TransactionManager* TM = &DefaultTM;
4949

5050
/*

0 commit comments

Comments
 (0)