Skip to content

Commit cec9a05

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents 9b75502 + d4b08b1 commit cec9a05

File tree

5 files changed

+55
-27
lines changed

5 files changed

+55
-27
lines changed

contrib/pg_xtm/dtmd/include/util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ char *join_path(const char *dir, const char *file);
1616
bool inrange(xid_t min, xid_t x, xid_t max);
1717
int falloc(int fd, off64_t size);
1818

19-
#if 0
19+
#if 1
2020
#define shout(...)
2121
#else
2222
#define shout(...) \

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ static void free_client_data(client_data_t *cd) {
4545
int next_client_id = 0;
4646
static void onconnect(void **client) {
4747
*client = create_client_data(next_client_id++);
48-
//shout("[%d] connected\n", CLIENT_ID(*client));
48+
shout("[%d] connected\n", CLIENT_ID(*client));
4949
}
5050

5151
static void ondisconnect(void *client) {
52-
//shout("[%d] disconnected\n", CLIENT_ID(client));
52+
shout("[%d] disconnected\n", CLIENT_ID(client));
5353
free_client_data(client);
5454
}
5555

@@ -194,10 +194,10 @@ static char *onvote(void *client, cmd_t *cmd, int vote) {
194194
switch (global_transaction_status(transactions + i)) {
195195
case NEGATIVE:
196196
if (global_transaction_mark(clg, transactions + i, NEGATIVE)) {
197-
//shout(
198-
// "[%d] VOTE: global transaction aborted\n",
199-
// CLIENT_ID(client)
200-
//);
197+
shout(
198+
"[%d] VOTE: global transaction aborted\n",
199+
CLIENT_ID(client)
200+
);
201201
transactions[i] = transactions[transactions_count - 1];
202202
transactions_count--;
203203
return strdup("+");
@@ -210,14 +210,14 @@ static char *onvote(void *client, cmd_t *cmd, int vote) {
210210
return strdup("-");
211211
}
212212
case NEUTRAL:
213-
//shout("[%d] VOTE: vote counted\n", CLIENT_ID(client));
213+
shout("[%d] VOTE: vote counted\n", CLIENT_ID(client));
214214
return strdup("+");
215215
case POSITIVE:
216216
if (global_transaction_mark(clg, transactions + i, POSITIVE)) {
217-
//shout(
218-
// "[%d] VOTE: global transaction committed\n",
219-
// CLIENT_ID(client)
220-
//);
217+
shout(
218+
"[%d] VOTE: global transaction committed\n",
219+
CLIENT_ID(client)
220+
);
221221
transactions[i] = transactions[transactions_count - 1];
222222
transactions_count--;
223223
return strdup("+");
@@ -365,9 +365,8 @@ static char *onnoise(void *client, cmd_t *cmd) {
365365
// }
366366

367367
static char *oncmd(void *client, cmd_t *cmd) {
368-
//shout_cmd(client, cmd);
368+
shout_cmd(client, cmd);
369369

370-
// float started = now_s();
371370
char *result = NULL;
372371
switch (cmd->cmd) {
373372
case CMD_BEGIN:
@@ -388,8 +387,6 @@ static char *oncmd(void *client, cmd_t *cmd) {
388387
default:
389388
return onnoise(client, cmd);
390389
}
391-
// float elapsed = now_s() - started;
392-
// shout("cmd '%c' processed in %0.4f sec\n", cmd->cmd, elapsed);
393390
return result;
394391
}
395392

@@ -420,11 +417,11 @@ char *ondata(void *client, size_t len, char *data) {
420417
parser_t parser = CLIENT_PARSER(client);
421418
char *response = NULL;
422419

423-
//shout(
424-
// "[%d] got some data[%lu] %s\n",
425-
// CLIENT_ID(client),
426-
// len, data
427-
//);
420+
shout(
421+
"[%d] got some data[%lu] %s\n",
422+
CLIENT_ID(client),
423+
len, data
424+
);
428425

429426
// The idea is to feed each character through
430427
// the parser, which will return a cmd from

contrib/pg_xtm/pg_dtm.c

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,30 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
4343
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
4444
static XidStatus DtmGetGloabalTransStatus(TransactionId xid);
4545
static void DtmUpdateRecentXmin(void);
46-
static bool IsInDtmSnapshot(TransactionId xid);
46+
// static bool IsInDtmSnapshot(TransactionId xid);
4747
static bool DtmTransactionIsInProgress(TransactionId xid);
4848

4949
static NodeId DtmNodeId;
5050
static DTMConn DtmConn;
51-
static SnapshotData DtmSnapshot = {HeapTupleSatisfiesMVCC};
51+
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
5252
static bool DtmHasSnapshot = false;
5353
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmTransactionIsInProgress };
5454
static DTMConn DtmConn;
5555

56+
#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
57+
#define XTM_CONNECT_ATTEMPTS 10
58+
5659
static void DtmEnsureConnection(void)
5760
{
58-
while (true) {
61+
int attempt = 0;
62+
XTM_TRACE("XTM: DtmEnsureConnection\n");
63+
while (attempt < XTM_CONNECT_ATTEMPTS) {
5964
if (DtmConn) {
6065
break;
61-
}
66+
}
67+
XTM_TRACE("XTM: DtmEnsureConnection, attempt #%u\n", attempt);
6268
DtmConn = DtmConnect("127.0.0.1", 5431);
69+
attempt++;
6370
}
6471
}
6572

@@ -68,6 +75,9 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
6875
int i, j, n;
6976
static TransactionId* buf;
7077
TransactionId prev = InvalidTransactionId;
78+
79+
XTM_TRACE("XTM: DtmCopySnapshot \n");
80+
7181
if (buf == NULL) {
7282
buf = (TransactionId *)malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId) * 2);
7383
}
@@ -95,6 +105,9 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src)
95105
static void DtmUpdateRecentXmin(void)
96106
{
97107
TransactionId xmin = DtmSnapshot.xmin;
108+
109+
XTM_TRACE("XTM: DtmUpdateRecentXmin \n");
110+
98111
if (xmin != InvalidTransactionId) {
99112
xmin -= vacuum_defer_cleanup_age;
100113
if (!TransactionIdIsNormal(xmin)) {
@@ -112,6 +125,8 @@ static void DtmUpdateRecentXmin(void)
112125

113126
static Snapshot DtmGetSnapshot(Snapshot snapshot)
114127
{
128+
XTM_TRACE("XTM: DtmGetSnapshot \n");
129+
115130
if (DtmHasSnapshot) {
116131
DtmCopySnapshot(snapshot, &DtmSnapshot);
117132
DtmUpdateRecentXmin();
@@ -132,12 +147,16 @@ static bool IsInDtmSnapshot(TransactionId xid)
132147

133148
static bool DtmTransactionIsInProgress(TransactionId xid)
134149
{
150+
XTM_TRACE("XTM: DtmTransactionIsInProgress \n");
135151
return /*IsInDtmSnapshot(xid) || */ TransactionIdIsRunning(xid);
136152
}
137153

138154
static XidStatus DtmGetGloabalTransStatus(TransactionId xid)
139155
{
140156
unsigned delay = 1000;
157+
158+
XTM_TRACE("XTM: DtmGetGloabalTransStatus \n");
159+
141160
while (true) {
142161
XidStatus status;
143162
DtmEnsureConnection();
@@ -155,6 +174,7 @@ static XidStatus DtmGetGloabalTransStatus(TransactionId xid)
155174

156175
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
157176
{
177+
XTM_TRACE("XTM: DtmGetTransactionStatus \n");
158178
XidStatus status = CLOGTransactionIdGetStatus(xid, lsn);
159179
#if 0
160180
if (status == TRANSACTION_STATUS_IN_PROGRESS) {
@@ -170,6 +190,7 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
170190

171191
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
172192
{
193+
XTM_TRACE("XTM: DtmSetTransactionStatus %u = %u \n", xid, status);
173194
if (!RecoveryInProgress()) {
174195
if (DtmHasSnapshot) {
175196
/* Already should be IN_PROGRESS */
@@ -179,6 +200,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
179200
DtmEnsureConnection();
180201
if (!DtmGlobalSetTransStatus(DtmConn, DtmNodeId, xid, status) && status != TRANSACTION_STATUS_ABORTED) {
181202
elog(ERROR, "DTMD failed to set transaction status");
203+
// elog(WARNING, "DTMD failed to set transaction status");
182204
}
183205
status = DtmGetGloabalTransStatus(xid);
184206
Assert(status == TRANSACTION_STATUS_ABORTED || status == TRANSACTION_STATUS_COMMITTED);
@@ -244,6 +266,9 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
244266
gtid.xids = (TransactionId*)ARR_DATA_PTR(xids);
245267
gtid.nodes = (NodeId*)ARR_DATA_PTR(nodes);
246268
gtid.nNodes = ArrayGetNItems(ARR_NDIM(nodes), ARR_DIMS(nodes));
269+
270+
XTM_TRACE("XTM: dtm_begin_transaction \n");
271+
247272
DtmEnsureConnection();
248273
DtmGlobalStartTransaction(DtmConn, &gtid);
249274
PG_RETURN_VOID();
@@ -255,6 +280,8 @@ dtm_get_snapshot(PG_FUNCTION_ARGS)
255280
DtmEnsureConnection();
256281
DtmGlobalGetSnapshot(DtmConn, DtmNodeId, GetCurrentTransactionId(), &DtmSnapshot);
257282

283+
XTM_TRACE("XTM: dtm_get_snapshot \n");
284+
258285
/* Move it to DtmGlobalGetSnapshot? */
259286
DtmHasSnapshot = true;
260287
PG_RETURN_VOID();

contrib/pg_xtm/tests/transfers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ func main() {
207207

208208
func exec(conn *pgx.Conn, stmt string, arguments ...interface{}) {
209209
var err error
210+
// fmt.Println(stmt)
210211
_, _ = conn.Exec(stmt, arguments... )
211212
checkErr(err)
212213
}

install.sh

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ rm -rf install
77

88
make install
99

10-
cd contrib/pg_gtm/
10+
cd contrib/pg_xtm/
1111

1212
make clean
1313
make
@@ -23,12 +23,15 @@ sed -i '' 's/#port =.*/port = 5433/' ./install/data2/postgresql.conf
2323
sed -i '' 's/#fsync =.*/fsync = off/' ./install/data1/postgresql.conf
2424
sed -i '' 's/#fsync =.*/fsync = off/' ./install/data2/postgresql.conf
2525

26+
echo 'dtm.node_id = 0' >> ./install/data1/postgresql.conf
27+
echo 'dtm.node_id = 1' >> ./install/data2/postgresql.conf
2628

2729
./install/bin/pg_ctl -D ./install/data1 -l ./install/data1/log start
2830
./install/bin/pg_ctl -D ./install/data2 -l ./install/data2/log start
2931

3032

31-
cd contrib/pg_gtm/dtmd
33+
cd contrib/pg_xtm/dtmd
3234
make clean
3335
make
36+
rm -rf /tmp/clog/*
3437
./bin/dtmd

0 commit comments

Comments
 (0)