Skip to content

Commit 8a7dfb1

Browse files
committed
Use hash table for checking in-doubt transactions
1 parent 0a03a1b commit 8a7dfb1

File tree

3 files changed

+118
-15
lines changed

3 files changed

+118
-15
lines changed

contrib/pg_xtm/libdtm.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,12 @@ DTMConn DtmConnect(char *host, int port) {
9191

9292
for (a = addrs; a != NULL; a = a->ai_next) {
9393
DTMConn dtm;
94+
int one = 1;
9495
int sock = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
9596
if (sock == -1) {
9697
perror("failed to create a socket");
9798
continue;
9899
}
99-
100-
int one = 1;
101100
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
102101

103102
if (connect(sock, a->ai_addr, a->ai_addrlen) == -1) {

contrib/pg_xtm/pg_dtm.c

Lines changed: 114 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,14 @@
3333

3434
#include "libdtm.h"
3535

36-
#define MIN_DELAY 10000
37-
#define MAX_DELAY 100000
36+
typedef struct
37+
{
38+
LWLockId lock; /* protect access to hash table */
39+
} DtmState;
40+
41+
42+
#define DTM_SHMEM_SIZE (1024*1024)
43+
#define DTM_HASH_SIZE 1003
3844

3945
void _PG_init(void);
4046
void _PG_fini(void);
@@ -45,9 +51,19 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src);
4551
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
4652
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
4753
static void DtmUpdateRecentXmin(void);
54+
static void DtmInitialize();
55+
static void DtmXactCallback(XactEvent event, void *arg);
56+
4857
static bool TransactionIdIsInDtmSnapshot(Snapshot s, TransactionId xid);
4958
static bool TransactionIdIsInDoubt(Snapshot s, TransactionId xid);
5059

60+
static void dtm_shmem_startup(void);
61+
62+
static shmem_startup_hook_type prev_shmem_startup_hook;
63+
static HTAB* xid_in_doubt;
64+
static DtmState* dtm;
65+
static TransactionId DtmCurrentXid = InvalidTransactionId;
66+
5167
static NodeId DtmNodeId;
5268
static DTMConn DtmConn;
5369
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
@@ -105,10 +121,13 @@ static bool TransactionIdIsInDtmSnapshot(Snapshot s, TransactionId xid)
105121

106122
static bool TransactionIdIsInDoubt(Snapshot s, TransactionId xid)
107123
{
124+
bool inDoubt;
125+
108126
if (!TransactionIdIsInDtmSnapshot(s, xid)) {
109-
XLogRecPtr lsn;
110-
XidStatus status = CLOGTransactionIdGetStatus(xid, &lsn);
111-
if (status != TRANSACTION_STATUS_IN_PROGRESS) {
127+
LWLockAcquire(dtm->lock, LW_SHARED);
128+
inDoubt = hash_search(xid_in_doubt, &xid, HASH_FIND, NULL) != NULL;
129+
LWLockRelease(dtm->lock);
130+
if (inDoubt) {
112131
XTM_INFO("Wait for transaction %d to complete\n", xid);
113132
XactLockTableWait(xid, NULL, NULL, XLTW_None);
114133
return true;
@@ -191,7 +210,7 @@ static void DtmUpdateRecentXmin(void)
191210
static Snapshot DtmGetSnapshot(Snapshot snapshot)
192211
{
193212
XTM_TRACE("XTM: DtmGetSnapshot \n");
194-
if (DtmGlobalTransaction && !DtmHasSnapshot) {
213+
if (DtmGlobalTransaction/* && !DtmHasSnapshot*/) {
195214
DtmHasSnapshot = true;
196215
DtmEnsureConnection();
197216
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
@@ -224,7 +243,12 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
224243
DtmGlobalTransaction = false;
225244
DtmEnsureConnection();
226245
XTM_INFO("Begin commit transaction %d\n", xid);
227-
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_COMMITTED, lsn);
246+
247+
DtmCurrentXid = xid;
248+
LWLockAcquire(dtm->lock, LW_EXCLUSIVE);
249+
hash_search(xid_in_doubt, &DtmCurrentXid, HASH_ENTER, NULL);
250+
LWLockRelease(dtm->lock);
251+
228252
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status, true) && status != TRANSACTION_STATUS_ABORTED) {
229253
elog(ERROR, "DTMD failed to set transaction status");
230254
}
@@ -243,15 +267,80 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
243267
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
244268
}
245269

270+
static uint32 dtm_xid_hash_fn(const void *key, Size keysize)
271+
{
272+
return (uint32)*(TransactionId*)key;
273+
}
274+
275+
static int dtm_xid_match_fn(const void *key1, const void *key2, Size keysize)
276+
{
277+
return *(TransactionId*)key1 - *(TransactionId*)key2;
278+
}
279+
280+
281+
static void DtmInitialize()
282+
{
283+
bool found;
284+
static HASHCTL info;
285+
286+
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
287+
dtm = ShmemInitStruct("dtm", sizeof(DtmState), &found);
288+
if (!found)
289+
{
290+
dtm->lock = LWLockAssign();
291+
}
292+
LWLockRelease(AddinShmemInitLock);
293+
294+
info.keysize = sizeof(TransactionId);
295+
info.entrysize = sizeof(TransactionId);
296+
info.hash = dtm_xid_hash_fn;
297+
info.match = dtm_xid_match_fn;
298+
xid_in_doubt = ShmemInitHash("xid_in_doubt", DTM_HASH_SIZE, DTM_HASH_SIZE,
299+
&info,
300+
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
301+
302+
RegisterXactCallback(DtmXactCallback, NULL);
303+
304+
TM = &DtmTM;
305+
}
306+
307+
static void
308+
DtmXactCallback(XactEvent event, void *arg)
309+
{
310+
if (event == XACT_EVENT_COMMIT && DtmCurrentXid != InvalidTransactionId) {
311+
LWLockAcquire(dtm->lock, LW_EXCLUSIVE);
312+
hash_search(xid_in_doubt, &DtmCurrentXid, HASH_REMOVE, NULL);
313+
LWLockRelease(dtm->lock);
314+
}
315+
}
316+
317+
246318
/*
247319
* ***************************************************************************
248320
*/
249321

250322
void
251323
_PG_init(void)
252324
{
253-
TM = &DtmTM;
254-
325+
/*
326+
* In order to create our shared memory area, we have to be loaded via
327+
* shared_preload_libraries. If not, fall out without hooking into any of
328+
* the main system. (We don't throw error here because it seems useful to
329+
* allow the cs_* functions to be created even when the
330+
* module isn't active. The functions must protect themselves against
331+
* being called then, however.)
332+
*/
333+
if (!process_shared_preload_libraries_in_progress)
334+
return;
335+
336+
/*
337+
* Request additional shared resources. (These are no-ops if we're not in
338+
* the postmaster process.) We'll allocate or attach to the shared
339+
* resources in imcs_shmem_startup().
340+
*/
341+
RequestAddinShmemSpace(DTM_SHMEM_SIZE);
342+
RequestAddinLWLocks(1);
343+
255344
DefineCustomIntVariable("dtm.node_id",
256345
"Identifier of node in distributed cluster for DTM",
257346
NULL,
@@ -264,6 +353,12 @@ _PG_init(void)
264353
NULL,
265354
NULL,
266355
NULL);
356+
357+
/*
358+
* Install hooks.
359+
*/
360+
prev_shmem_startup_hook = shmem_startup_hook;
361+
shmem_startup_hook = dtm_shmem_startup;
267362
}
268363

269364
/*
@@ -272,6 +367,16 @@ _PG_init(void)
272367
void
273368
_PG_fini(void)
274369
{
370+
shmem_startup_hook = prev_shmem_startup_hook;
371+
}
372+
373+
374+
static void dtm_shmem_startup(void)
375+
{
376+
if (prev_shmem_startup_hook) {
377+
prev_shmem_startup_hook();
378+
}
379+
DtmInitialize();
275380
}
276381

277382
/*

contrib/pg_xtm/tests/transfers.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
// "math/rand"
6+
"math/rand"
77
"github.com/jackc/pgx"
88
)
99

@@ -93,7 +93,6 @@ func max(a, b int64) int64 {
9393

9494
func transfer(id int, wg *sync.WaitGroup) {
9595
var err error
96-
// var sum1, sum2, sum int32
9796
var xids []int32 = make([]int32, 2)
9897

9998
conn1, err := pgx.Connect(cfg1)
@@ -107,8 +106,8 @@ func transfer(id int, wg *sync.WaitGroup) {
107106
for i := 0; i < N_ITERATIONS; i++ {
108107
//amount := 2*rand.Intn(2) - 1
109108
amount := 1
110-
account1 := id//rand.Intn(N_ACCOUNTS)
111-
account2 := id//rand.Intn(N_ACCOUNTS)
109+
account1 := rand.Intn(N_ACCOUNTS)
110+
account2 := rand.Intn(N_ACCOUNTS)
112111

113112
// strt transaction
114113
exec(conn1, "begin")

0 commit comments

Comments
 (0)