Skip to content

Commit cae87f3

Browse files
committed
Better integration of DTM in FDW
1 parent c9f37d6 commit cae87f3

File tree

12 files changed

+63
-40
lines changed

12 files changed

+63
-40
lines changed

contrib/pg_xtm/dtmd/include/transaction.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ typedef struct Transaction {
1515
xid_t xid;
1616

1717
int size; // number of paritcipants
18-
int max_size; // maximal number of participants
1918

2019
// for + against ≤ size
2120
int votes_for;

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -257,18 +257,11 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
257257
);
258258

259259
CHECK(
260-
cmd->argc == 1,
260+
cmd->argc == 0,
261261
clientdata,
262262
"BEGIN: wrong number of arguments"
263263
);
264264

265-
int size = cmd->argv[0];
266-
CHECK(
267-
size <= MAX_NODES,
268-
clientdata,
269-
"BEGIN: 'size' > MAX_NODES"
270-
);
271-
272265
CHECK(
273266
CLIENT_XID(clientdata) == INVALID_XID,
274267
clientdata,
@@ -280,7 +273,6 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
280273

281274
prev_gxid = t->xid = next_gxid++;
282275
t->snapshots_count = 0;
283-
t->max_size = size;
284276
t->size = 1;
285277

286278
CLIENT_SNAPSENT(clientdata) = 0;
@@ -438,11 +430,6 @@ static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd) {
438430
CLIENT_SNAPSENT(clientdata) = 0;
439431
CLIENT_XID(clientdata) = t->xid;
440432
t->size += 1;
441-
CHECK(
442-
t->size <= t->max_size,
443-
clientdata,
444-
"SNAPSHOT: too many participants"
445-
);
446433
}
447434

448435
CHECK(

contrib/pg_xtm/dtmd/src/transaction.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ void transaction_clear(Transaction *t) {
2828

2929
t->xid = INVALID_XID;
3030
t->size = 0;
31-
t->max_size = 0;
3231
t->votes_for = 0;
3332
t->votes_against = 0;
3433
t->snapshots_count = 0;

contrib/pg_xtm/libdtm.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,15 +281,15 @@ void DtmInitSnapshot(Snapshot snapshot)
281281
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
282282
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
283283
// otherwise.
284-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin)
284+
TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin)
285285
{
286286
bool ok;
287287
xid_t xid;
288288
xid_t number;
289289
DTMConn dtm = GetConnection();
290290

291291
// query
292-
if (!dtm_query(dtm, 'b', 1, nParticipants)) goto failure;
292+
if (!dtm_query(dtm, 'b', 0)) goto failure;
293293

294294
// response
295295
if (!dtm_read_bool(dtm, &ok)) goto failure;

contrib/pg_xtm/libdtm.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010

1111
void DtmInitSnapshot(Snapshot snapshot);
1212

13-
// Starts a new global transaction of nParticipants size. Returns the
13+
// Starts a new global transaction. Returns the
1414
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
1515
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
1616
// otherwise.
17-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin);
17+
TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin);
1818

1919
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
2020
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.

contrib/pg_xtm/pg_dtm--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use "CREATE EXTENSION pg_dtm" to load this file. \quit
33

4-
CREATE FUNCTION dtm_begin_transaction(n_participants integer) RETURNS integer
4+
CREATE FUNCTION dtm_begin_transaction() RETURNS integer
55
AS 'MODULE_PATHNAME','dtm_begin_transaction'
66
LANGUAGE C;
77

contrib/pg_xtm/pg_dtm.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ static bool DtmTransactionIdIsInProgress(TransactionId xid);
7474
static TransactionId DtmGetNextXid(void);
7575
static TransactionId DtmGetNewTransactionId(bool isSubXact);
7676
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
77+
static TransactionId DtmGetGlobalTransactionId(void);
7778

7879
static bool TransactionIdIsInSnapshot(TransactionId xid, Snapshot snapshot);
7980
static bool TransactionIdIsInDoubt(TransactionId xid);
@@ -92,7 +93,7 @@ static bool DtmGlobalXidAssigned;
9293
static int DtmLocalXidReserve;
9394
static int DtmCurcid;
9495
static Snapshot DtmLastSnapshot;
95-
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin, DtmTransactionIdIsInProgress };
96+
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin, DtmTransactionIdIsInProgress, DtmGetGlobalTransactionId };
9697

9798

9899
#define XTM_TRACE(fmt, ...)
@@ -323,6 +324,12 @@ static TransactionId DtmGetNextXid()
323324
return xid;
324325
}
325326

327+
TransactionId
328+
DtmGetGlobalTransactionId()
329+
{
330+
return DtmNextXid;
331+
}
332+
326333
TransactionId
327334
DtmGetNewTransactionId(bool isSubXact)
328335
{
@@ -667,8 +674,8 @@ static void DtmInitialize()
667674
static void
668675
DtmXactCallback(XactEvent event, void *arg)
669676
{
677+
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n", getpid(), event, DtmGlobalXidAssigned, DtmNextXid);
670678
if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_ABORT) {
671-
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n", getpid(), event, DtmGlobalXidAssigned, DtmNextXid);
672679
if (DtmGlobalXidAssigned) {
673680
DtmGlobalXidAssigned = false;
674681
} else if (TransactionIdIsValid(DtmNextXid)) {
@@ -780,10 +787,9 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
780787
Datum
781788
dtm_begin_transaction(PG_FUNCTION_ARGS)
782789
{
783-
int nParticipants = PG_GETARG_INT32(0);
784790
Assert(!TransactionIdIsValid(DtmNextXid));
785791

786-
DtmNextXid = DtmGlobalStartTransaction(nParticipants, &DtmSnapshot, &dtm->minXid);
792+
DtmNextXid = DtmGlobalStartTransaction(&DtmSnapshot, &dtm->minXid);
787793
Assert(TransactionIdIsValid(DtmNextXid));
788794
XTM_INFO("%d: Start global transaction %d, dtm->minXid=%d\n", getpid(), DtmNextXid, dtm->minXid);
789795

contrib/pg_xtm/tests/transfers-fdw.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
"strconv"
76
"math/rand"
87
"time"
98
"github.com/jackc/pgx"
@@ -91,7 +90,6 @@ func progress(total int, cCommits chan int, cAborts chan int) {
9190

9291
func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
9392
var err error
94-
var xid int32
9593
var nAborts = 0
9694
var nCommits = 0
9795
var myCommits = 0
@@ -106,10 +104,7 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
106104
account1 := rand.Intn(N_ACCOUNTS)
107105
account2 := ^rand.Intn(N_ACCOUNTS)
108106

109-
exec(conn, "begin")
110-
xid = execQuery(conn, "select dtm_begin_transaction(2)")
111-
exec(conn, "select postgres_fdw_exec('t_fdw'::regclass::oid, 'select public.dtm_join_transaction(" + strconv.Itoa(int(xid)) + ")')")
112-
exec(conn, "commit")
107+
exec(conn, "select dtm_begin_transaction()")
113108

114109
exec(conn, "begin transaction isolation level " + ISOLATION_LEVEL)
115110

@@ -141,25 +136,21 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
141136
func inspect(wg *sync.WaitGroup) {
142137
var sum int64
143138
var prevSum int64 = 0
144-
var xid int32
145139

146140
{
147141
conn, err := pgx.Connect(cfg1)
148142
checkErr(err)
149143

150144
for running {
151145

152-
exec(conn, "begin")
153-
xid = execQuery(conn, "select dtm_begin_transaction(2)")
154-
exec(conn, "select postgres_fdw_exec('t_fdw'::regclass::oid, 'select public.dtm_join_transaction(" + strconv.Itoa(int(xid)) + ")')")
155-
exec(conn, "commit")
146+
exec(conn, "select dtm_begin_transaction()")
156147

157148
exec(conn, "begin transaction isolation level " + ISOLATION_LEVEL)
158149

159150
sum = execQuery64(conn, "select sum(v) from t")
160151

161152
if (sum != prevSum) {
162-
fmt.Printf("Total=%d xid=%d\n", sum, xid)
153+
fmt.Printf("Total=%d\n", sum)
163154
prevSum = sum
164155
}
165156

contrib/pg_xtm/tests/transfers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
130130
src := conn[0]
131131
dst := conn[1]
132132

133-
xid = execQuery(src, "select dtm_begin_transaction(2)")
133+
xid = execQuery(src, "select dtm_begin_transaction()")
134134
exec(dst, "select dtm_join_transaction($1)", xid)
135135

136136
// start transaction
@@ -176,7 +176,7 @@ func inspect(wg *sync.WaitGroup) {
176176
checkErr(err)
177177

178178
for running {
179-
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
179+
xid = execQuery(conn1, "select dtm_begin_transaction()")
180180
exec(conn2, "select dtm_join_transaction($1)", xid)
181181

182182
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)

contrib/postgres_fdw/connection.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include "postgres_fdw.h"
1616

1717
#include "access/xact.h"
18+
#include "access/xtm.h"
19+
#include "access/transam.h"
1820
#include "mb/pg_wchar.h"
1921
#include "miscadmin.h"
2022
#include "utils/hsearch.h"
@@ -401,11 +403,21 @@ begin_remote_xact(ConnCacheEntry *entry)
401403
/* Start main transaction if we haven't yet */
402404
if (entry->xact_depth <= 0)
403405
{
406+
TransactionId gxid = GetTransactionManager()->GetGlobalTransactionId();
404407
const char *sql;
405408

406409
elog(DEBUG3, "starting remote transaction on connection %p",
407410
entry->conn);
408411

412+
if (TransactionIdIsValid(gxid)) {
413+
char stmt[64];
414+
PGresult *res;
415+
416+
snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%d)", gxid);
417+
res = PQexec(entry->conn, stmt);
418+
PQclear(res);
419+
}
420+
409421
if (IsolationIsSerializable())
410422
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
411423
else

src/backend/access/transam/clog.c

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,21 @@
4444
#include "miscadmin.h"
4545
#include "pg_trace.h"
4646

47-
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalSnapshotData, GetNewLocalTransactionId, GetOldestLocalXmin, TransactionIdIsRunning };
47+
static TransactionId GetGlobalTransactionId(void);
48+
49+
TransactionId GetGlobalTransactionId(void)
50+
{
51+
return InvalidTransactionId;
52+
}
53+
54+
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalSnapshotData, GetNewLocalTransactionId, GetOldestLocalXmin, TransactionIdIsRunning, GetGlobalTransactionId };
4855
TransactionManager* TM = &DefaultTM;
4956

57+
TransactionManager* GetTransactionManager(void)
58+
{
59+
return TM;
60+
}
61+
5062
/*
5163
* Defines for CLOG page sizes. A page is the same BLCKSZ as is used
5264
* everywhere else in Postgres.

src/include/access/xtm.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,31 @@
1717

1818
typedef struct
1919
{
20+
/* Get current transaction status (encapsulation of TransactionIdGetStatus in clog.c) */
2021
XidStatus (*GetTransactionStatus)(TransactionId xid, XLogRecPtr *lsn);
22+
23+
/* Set current transaction status (encapsulation of TransactionIdGetStatus in clog.c) */
2124
void (*SetTransactionStatus)(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
25+
26+
/* Get current transaction snaphot (encapsulation of GetSnapshotData in procarray.c) */
2227
Snapshot (*GetSnapshot)(Snapshot snapshot);
28+
29+
/* Assign new Xid to transaction (encapsulation of GetNewTransactionId in varsup.c) */
2330
TransactionId (*GetNewTransactionId)(bool isSubXact);
31+
32+
/* Get oldest transaction Xid that was running when any current transaction was started (encapsulation of GetOldestXmin in procarray.c) */
2433
TransactionId (*GetOldestXmin)(Relation rel, bool ignoreVacuum);
34+
35+
/* Check if current transaction is not yet completed (encapsulation of TransactionIdIsInProgress in procarray.c) */
2536
bool (*IsInProgress)(TransactionId xid);
37+
38+
/* Get global transaction XID: returns XID of current transaction if it is global, InvalidTransactionId otherwise */
39+
TransactionId (*GetGlobalTransactionId)(void);
2640
} TransactionManager;
2741

42+
43+
TransactionManager* GetTransactionManager(void);
44+
2845
extern TransactionManager* TM;
2946
extern TransactionManager DefaultTM;
3047

0 commit comments

Comments
 (0)