Skip to content

Commit a6dde28

Browse files
committed
Sort xids in snapshot
1 parent 402ac5c commit a6dde28

File tree

12 files changed

+89
-45
lines changed

12 files changed

+89
-45
lines changed

contrib/pg_gtm/tests/transfers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func transfer(id int, wg *sync.WaitGroup) {
116116
snapshot = execQuery(conn1, "select dtm_extend($1)", gtid)
117117
snapshot = execQuery(conn2, "select dtm_access($1, $2)", snapshot, gtid)
118118

119-
exec(conn1, "update t set v = v + $1 where u=$2", amount, account1)
119+
exec(conn1, "update t set v = v + $1 - 1 where u=$2", amount, account1)
120120
exec(conn2, "update t set v = v - $1 where u=$2", amount, account2)
121121

122122
exec(conn1, "prepare transaction '" + gtid + "'")

contrib/pg_xtm/dtmd/src/eventwrap.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ static void on_write(uv_write_t *req, int status) {
3333
}
3434

3535
static void on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
36-
if (nread == -1) {
37-
shout("read failed\n");
36+
if (nread == UV_EOF) {
37+
ondisconnect_cb(stream->data);
3838
uv_close((uv_handle_t*)stream, NULL);
3939
return;
4040
}
4141

42-
if (nread == UV_EOF) {
43-
ondisconnect_cb(stream->data);
42+
if (nread < 0) {
43+
shout("read failed (error %d)\n", nread);
4444
uv_close((uv_handle_t*)stream, NULL);
4545
return;
4646
}

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,12 @@ char *ondata(void *client, size_t len, char *data) {
399399
parser_t parser = CLIENT_PARSER(client);
400400
char *response = NULL;
401401

402+
shout(
403+
"[%d] got some data[%lu] %s\n",
404+
CLIENT_ID(client),
405+
len, data
406+
);
407+
402408
// The idea is to feed each character through
403409
// the parser, which will return a cmd from
404410
// time to time.
@@ -411,8 +417,9 @@ char *ondata(void *client, size_t len, char *data) {
411417
cmd_t *cmd = parser_feed(parser, data[i]);
412418
if (parser_failed(parser)) {
413419
shout(
414-
"[%d] parser failed: %s\n",
420+
"[%d] parser failed on character '%c' (%d): %s\n",
415421
CLIENT_ID(client),
422+
data[i], data[i],
416423
parser_errormsg(parser)
417424
);
418425
parser_init(parser);

contrib/pg_xtm/dtmd/src/snapshot.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ static void append_hex16(char **cursorp, xid_t value) {
1515
*cursorp += written;
1616
}
1717

18+
static int compare_xid(void const* x, void const* y, size_t size)
19+
{
20+
xid_t xid1 = *(xid_t*)x;
21+
xid_t xid2 = *(xid_t*)y;
22+
return xid1 < xid2 ? -1 : xid1 == xid2 ? 0 : 1;
23+
}
24+
1825
char *snapshot_serialize(Snapshot *s) {
1926
assert(s->seqno > 0);
2027

@@ -30,6 +37,7 @@ char *snapshot_serialize(Snapshot *s) {
3037
append_hex16(&cursor, s->xmax);
3138
append_hex16(&cursor, s->nactive);
3239

40+
qsort(s->active, s->nactive, sizeof(xid_t), compare_xid);
3341
int i;
3442
for (i = 0; i < s->nactive; i++) {
3543
append_hex16(&cursor, s->active[i]);

contrib/pg_xtm/libdtm.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ typedef struct DTMConnData {
2020

2121
// Returns true if the write was successful.
2222
static bool dtm_write_char(DTMConn dtm, char c) {
23+
printf("writing %c\n", c);
2324
return write(dtm->sock, &c, 1) == 1;
2425
}
2526

@@ -55,6 +56,7 @@ static bool dtm_write_hex16(DTMConn dtm, xid_t i) {
5556
if (snprintf(buf, 17, "%016llx", i) != 16) {
5657
return false;
5758
}
59+
printf("writing %s\n", buf);
5860
return write(dtm->sock, buf, 16) == 16;
5961
}
6062

@@ -189,14 +191,20 @@ bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapsho
189191
s->xcnt = number;
190192
Assert(s->xcnt == number); // the number should definitely fit into xcnt field size
191193

192-
if (s->xip) pfree(s->xip);
193-
s->xip = palloc(s->xcnt * sizeof(TransactionId));
194+
if (s->xip) free(s->xip);
195+
s->xip = malloc(s->xcnt * sizeof(TransactionId));
194196
for (i = 0; i < s->xcnt; i++) {
195197
if (!dtm_read_hex16(dtm, &number)) return false;
196198
s->xip[i] = number;
197199
Assert(s->xip[i] == number); // the number should fit into xip[i] size
198200
}
199201

202+
fprintf(stdout, "snapshot: xmin = %#x, xmax = %#x, active =", s->xmin, s->xmax);
203+
for (i = 0; i < s->xcnt; i++) {
204+
fprintf(stdout, " %#x", s->xip[i]);
205+
}
206+
fprintf(stdout, "\n");
207+
200208
return true;
201209
}
202210

contrib/pg_xtm/pg_dtm.c

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,13 @@ static void DtmEnsureConnection(void);
3939
static Snapshot DtmGetSnapshot(Snapshot snapshot);
4040
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
4141
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
42+
static bool DtmTransactionIsRunning(TransactionId xid);
4243
static NodeId DtmNodeId;
4344

4445
static DTMConn DtmConn;
4546
static SnapshotData DtmSnapshot = {HeapTupleSatisfiesMVCC};
4647
static bool DtmHasSnapshot = false;
47-
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot };
48+
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmTransactionIsRunning };
4849
static DTMConn DtmConn;
4950

5051
static void DtmEnsureConnection(void)
@@ -57,33 +58,50 @@ static void DtmEnsureConnection(void)
5758
}
5859
}
5960

61+
extern SnapshotData CatalogSnapshotData;
62+
6063
static Snapshot DtmGetSnapshot(Snapshot snapshot)
6164
{
62-
if (DtmHasSnapshot) {
65+
if (DtmHasSnapshot/* && snapshot != &CatalogSnapshotData*/) {
6366
return &DtmSnapshot;
6467
}
6568
return GetLocalSnapshotData(snapshot);
6669
}
6770

71+
static bool DtmTransactionIsRunning(TransactionId xid)
72+
{
73+
XLogRecPtr lsn;
74+
return DtmHasSnapshot
75+
? DtmGetTransactionStatus(xid, &lsn) == TRANSACTION_STATUS_IN_PROGRESS
76+
: TransactionIdIsRunning(xid);
77+
}
78+
6879
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
6980
{
7081
XidStatus status = CLOGTransactionIdGetStatus(xid, lsn);
71-
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
82+
if (status == TRANSACTION_STATUS_IN_PROGRESS && DtmHasSnapshot) {
7283
DtmEnsureConnection();
7384
status = DtmGlobalGetTransStatus(DtmConn, DtmNodeId, xid);
74-
CLOGTransactionIdSetTreeStatus(xid, 0, NULL, status, InvalidXLogRecPtr);
85+
if (status != TRANSACTION_STATUS_IN_PROGRESS) {
86+
CLOGTransactionIdSetTreeStatus(xid, 0, NULL, status, InvalidXLogRecPtr);
87+
}
7588
}
7689
return status;
7790
}
7891

7992

8093
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
8194
{
82-
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn);
83-
DtmHasSnapshot = false;
84-
DtmEnsureConnection();
85-
if (DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status)) {
86-
elog(ERROR, "DTMD failed to set transaction status");
95+
if (DtmHasSnapshot) {
96+
/* Already should be IN_PROGRESS */
97+
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
98+
DtmHasSnapshot = false;
99+
DtmEnsureConnection();
100+
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status) && status != TRANSACTION_STATUS_ABORTED) {
101+
elog(ERROR, "DTMD failed to set transaction status");
102+
}
103+
} else {
104+
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
87105
}
88106
}
89107

@@ -97,14 +115,14 @@ _PG_init(void)
97115
{
98116
TM = &DtmTM;
99117

100-
DefineCustomIntVariable("dtm.node.id",
118+
DefineCustomIntVariable("dtm.node_id",
101119
"Identifier of node in distributed cluster for DTM",
102120
NULL,
103121
&DtmNodeId,
104122
0,
105123
0,
106124
INT_MAX,
107-
PGC_POSTMASTER,
125+
PGC_BACKEND,
108126
0,
109127
NULL,
110128
NULL,

contrib/pg_xtm/tests/transfers.go

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
"strconv"
76
"math/rand"
87
"github.com/jackc/pgx"
98
)
@@ -29,11 +28,10 @@ var cfg2 = pgx.ConnConfig{
2928
}
3029

3130
var running = false
31+
var nodes []int32 = []int32{0,1}
3232

3333
func prepare_db() {
34-
var xids [2]int
35-
var csn int64a
36-
nodes := []int{0,1}
34+
var xids []int32 = make([]int32, 2)
3735

3836
conn1, err := pgx.Connect(cfg1)
3937
checkErr(err)
@@ -62,7 +60,7 @@ func prepare_db() {
6260
xids[1] = execQuery(conn2, "select txid_current()")
6361

6462
// register global transaction in DTMD
65-
exec(conn1, "select dtm_global_transaction($1)", nodes, xids)
63+
exec(conn1, "select dtm_global_transaction($1, $2)", nodes, xids)
6664

6765
// first global statement
6866
exec(conn1, "select dtm_get_snapshot()")
@@ -77,9 +75,6 @@ func prepare_db() {
7775
exec(conn1, "select dtm_get_snapshot()")
7876
exec(conn2, "select dtm_get_snapshot()")
7977

80-
sum1 = execQuery(conn1, "select sum(v) from t")
81-
sum2 = execQuery(conn2, "select sum(v) from t")
82-
8378
// commit work
8479
exec(conn1, "commit")
8580
exec(conn2, "commit")
@@ -95,7 +90,7 @@ func max(a, b int64) int64 {
9590

9691
func transfer(id int, wg *sync.WaitGroup) {
9792
var err error
98-
var xids [2]int
93+
var xids []int32 = make([]int32, 2)
9994

10095
conn1, err := pgx.Connect(cfg1)
10196
checkErr(err)
@@ -120,7 +115,7 @@ func transfer(id int, wg *sync.WaitGroup) {
120115
xids[1] = execQuery(conn2, "select txid_current()")
121116

122117
// register global transaction in DTMD
123-
exec(conn1, "select dtm_global_transaction($1)", xids)
118+
exec(conn1, "select dtm_global_transaction($1, $2)", nodes, xids)
124119

125120
// first global statement
126121
exec(conn1, "select dtm_get_snapshot()")
@@ -133,9 +128,6 @@ func transfer(id int, wg *sync.WaitGroup) {
133128
exec(conn1, "select dtm_get_snapshot()")
134129
exec(conn2, "select dtm_get_snapshot()")
135130

136-
sum1 = execQuery(conn1, "select sum(v) from t")
137-
sum2 = execQuery(conn2, "select sum(v) from t")
138-
139131
// commit work
140132
exec(conn1, "commit")
141133
exec(conn2, "commit")
@@ -146,11 +138,11 @@ func transfer(id int, wg *sync.WaitGroup) {
146138
wg.Done()
147139
}
148140

149-
func total() int64 {
141+
func total() int32 {
150142
var err error
151-
var sum1 int64
152-
var sum2 int64
153-
var xids [2]int
143+
var sum1 int32
144+
var sum2 int32
145+
var xids []int32 = make([]int32, 2)
154146

155147
conn1, err := pgx.Connect(cfg1)
156148
checkErr(err)
@@ -169,7 +161,7 @@ func total() int64 {
169161
xids[1] = execQuery(conn2, "select txid_current()")
170162

171163
// register global transaction in DTMD
172-
exec(conn1, "select dtm_global_transaction($1)", xids)
164+
exec(conn1, "select dtm_global_transaction($1, $2)", nodes, xids)
173165

174166
sum1 = execQuery(conn1, "select sum(v) from t")
175167
sum2 = execQuery(conn2, "select sum(v) from t")
@@ -182,7 +174,7 @@ func total() int64 {
182174
}
183175

184176
func totalrep(wg *sync.WaitGroup) {
185-
var prevSum int64 = 0
177+
var prevSum int32 = 0
186178
for running {
187179
sum := total()
188180
if (sum != prevSum) {
@@ -218,12 +210,12 @@ func exec(conn *pgx.Conn, stmt string, arguments ...interface{}) {
218210
checkErr(err)
219211
}
220212

221-
func execQuery(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
213+
func execQuery(conn *pgx.Conn, stmt string, arguments ...interface{}) int32 {
222214
var err error
223215
var result int64
224216
err = conn.QueryRow(stmt, arguments...).Scan(&result)
225217
checkErr(err)
226-
return result
218+
return int32(result)
227219
}
228220

229221
func checkErr(err error) {

src/backend/access/transam/clog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
#include "miscadmin.h"
4444
#include "pg_trace.h"
4545

46-
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalSnapshotData };
46+
TransactionManager DefaultTM = { CLOGTransactionIdGetStatus, CLOGTransactionIdSetTreeStatus, GetLocalSnapshotData, TransactionIdIsRunning };
4747
TransactionManager* TM = &DefaultTM;
4848

4949
/*

src/backend/storage/ipc/procarray.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -946,6 +946,12 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
946946
LWLockRelease(ProcArrayLock);
947947
}
948948

949+
bool
950+
TransactionIdIsInProgress(TransactionId xid)
951+
{
952+
return TM->IsInProgress(xid);
953+
}
954+
949955
/*
950956
* TransactionIdIsInProgress -- is given transaction running in some backend
951957
*
@@ -973,7 +979,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
973979
* PGXACT again anyway; see GetNewTransactionId).
974980
*/
975981
bool
976-
TransactionIdIsInProgress(TransactionId xid)
982+
TransactionIdIsRunning(TransactionId xid)
977983
{
978984
static TransactionId *xids = NULL;
979985
int nxids = 0;

src/backend/utils/time/tqual.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
974974
{
975975
if (TransactionIdDidCommit(xvac))
976976
{
977+
elog(WARNING, "Mark tuple %d as invalid 1", HeapTupleHeaderGetRawXmin(tuple));
977978
SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
978979
InvalidTransactionId);
979980
return false;
@@ -996,6 +997,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
996997
InvalidTransactionId);
997998
else
998999
{
1000+
elog(WARNING, "Mark tuple %d as invalid 2", HeapTupleHeaderGetRawXmin(tuple));
9991001
SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
10001002
InvalidTransactionId);
10011003
return false;
@@ -1052,6 +1054,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
10521054
else
10531055
{
10541056
/* it must have aborted or crashed */
1057+
elog(WARNING, "Mark tuple %d as invalid 3", HeapTupleHeaderGetRawXmin(tuple));
10551058
SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
10561059
InvalidTransactionId);
10571060
return false;

src/include/access/xtm.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616

1717
typedef struct
1818
{
19-
XidStatus (*GetTransactionStatus)(TransactionId xid, XLogRecPtr *lsn);
20-
void (*SetTransactionStatus)(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
21-
Snapshot (*GetSnapshot)(Snapshot snapshot);
19+
XidStatus (*GetTransactionStatus)(TransactionId xid, XLogRecPtr *lsn);
20+
void (*SetTransactionStatus)(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
21+
Snapshot (*GetSnapshot)(Snapshot snapshot);
22+
bool (*IsInProgress)(TransactionId xid);
2223
} TransactionManager;
2324

2425
extern TransactionManager* TM;

src/include/storage/procarray.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
5353
extern RunningTransactions GetRunningTransactionData(void);
5454

5555
extern bool TransactionIdIsInProgress(TransactionId xid);
56+
extern bool TransactionIdIsRunning(TransactionId xid);
5657
extern bool TransactionIdIsActive(TransactionId xid);
5758
extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
5859
extern TransactionId GetOldestActiveTransactionId(void);

0 commit comments

Comments
 (0)