Skip to content

Commit 267b83c

Browse files
committed
Merge changes
2 parents b395714 + bc8d4bc commit 267b83c

33 files changed

+575
-455
lines changed

contrib/pg_dtm/README

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ dtm_get_snapshot() RETURNS void
3636
libdtm api
3737
----------
3838

39+
// Sets up the host and port for DTM connection.
40+
// The defaults are "127.0.0.1" and 5431.
41+
void TuneToDtm(char *host, int port);
42+
3943
void DtmInitSnapshot(Snapshot snapshot);
4044

4145
// Starts a new global transaction of nParticipants size. Returns the

contrib/pg_dtm/libdtm.c

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ typedef struct DTMConnData
2424

2525
typedef unsigned long long xid_t;
2626

27+
int DtmdPort;
28+
char* DtmdHost;
29+
30+
2731
// Returns true if the write was successful.
2832
static bool dtm_write_char(DTMConn dtm, char c)
2933
{
@@ -231,13 +235,20 @@ static bool dtm_query(DTMConn dtm, char cmd, int argc, ...)
231235
return true;
232236
}
233237

238+
static char *dtmhost = "127.0.0.1";
239+
static int dtmport = 5431;
240+
241+
void TuneToDtm(char *host, int port) {
242+
dtmhost = host;
243+
dtmport = port;
244+
}
245+
234246
static DTMConn GetConnection()
235247
{
236248
static DTMConn dtm = NULL;
237249
if (dtm == NULL)
238250
{
239-
// FIXME: add API for setting the host and port for dtm connection
240-
dtm = DtmConnect("127.0.0.1", 5431);
251+
dtm = DtmConnect(dtmhost, dtmport);
241252
if (dtm == NULL) {
242253
elog(ERROR, "Failed to connect to DTMD");
243254
}

contrib/pg_dtm/libdtm.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
#define INVALID_XID 0
1010

11+
// Sets up the host and port for DTM connection.
12+
// The defaults are "127.0.0.1" and 5431.
13+
void TuneToDtm(char *host, int port);
14+
1115
void DtmInitSnapshot(Snapshot snapshot);
1216

1317
// Starts a new global transaction. Returns the

contrib/pg_dtm/pg_dtm--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,7 @@ LANGUAGE C;
1616
CREATE FUNCTION dtm_get_current_snapshot_xmax() RETURNS integer
1717
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xmax'
1818
LANGUAGE C;
19+
20+
CREATE FUNCTION dtm_get_current_snapshot_xcnt() RETURNS integer
21+
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xcnt'
22+
LANGUAGE C;

contrib/pg_dtm/pg_dtm.c

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ static bool DtmGlobalXidAssigned;
8787
static int DtmLocalXidReserve;
8888
static int DtmCurcid;
8989
static Snapshot DtmLastSnapshot;
90-
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin, TransactionIdIsRunning, DtmGetGlobalTransactionId };
90+
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin, PgTransactionIdIsInProgress, DtmGetGlobalTransactionId, PgXidInMVCCSnapshot };
91+
92+
static char *DtmHost;
93+
static int DtmPort;
9194

9295

9396
#define XTM_TRACE(fmt, ...)
@@ -169,7 +172,7 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
169172
* Check that global and local snapshots are consistent: transactions marked as completed in global snapohsot
170173
* should be completed locally
171174
*/
172-
dst = GetLocalSnapshotData(dst);
175+
dst = PgGetSnapshotData(dst);
173176
for (i = 0; i < dst->xcnt; i++) {
174177
if (TransactionIdIsInDoubt(dst->xip[i])) {
175178
goto GetLocalSnapshot;
@@ -213,7 +216,7 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
213216
*/
214217
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
215218
{
216-
TransactionId localXmin = GetOldestLocalXmin(rel, ignoreVacuum);
219+
TransactionId localXmin = PgGetOldestXmin(rel, ignoreVacuum);
217220
TransactionId globalXmin = dtm->minXid;
218221
XTM_INFO("XTM: DtmGetOldestXmin localXmin=%d, globalXmin=%d\n", localXmin, globalXmin);
219222

@@ -526,7 +529,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
526529
* which PRECEDS actual transaction for which Xid is received.
527530
* This transaction doesn't need to take in accountn global snapshot
528531
*/
529-
return GetLocalSnapshotData(snapshot);
532+
return PgGetSnapshotData(snapshot);
530533
}
531534
if (TransactionIdIsValid(DtmNextXid) && snapshot != &CatalogSnapshotData) {
532535
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != snapshot->curcid)) {
@@ -543,7 +546,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
543546
}
544547
} else {
545548
/* For local transactions and catalog snapshots use default GetSnapshotData implementation */
546-
snapshot = GetLocalSnapshotData(snapshot);
549+
snapshot = PgGetSnapshotData(snapshot);
547550
}
548551
DtmUpdateRecentXmin(snapshot);
549552
CurrentTransactionSnapshot = snapshot;
@@ -557,7 +560,7 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
557560
*/
558561
XidStatus status = xid >= ShmemVariableCache->nextXid
559562
? TRANSACTION_STATUS_IN_PROGRESS
560-
: CLOGTransactionIdGetStatus(xid, lsn);
563+
: PgTransactionIdGetStatus(xid, lsn);
561564
XTM_TRACE("XTM: DtmGetTransactionStatus\n");
562565
return status;
563566
}
@@ -569,7 +572,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
569572
if (!DtmGlobalXidAssigned && TransactionIdIsValid(DtmNextXid)) {
570573
CurrentTransactionSnapshot = NULL;
571574
if (status == TRANSACTION_STATUS_ABORTED) {
572-
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
575+
PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
573576
DtmGlobalSetTransStatus(xid, status, false);
574577
XTM_INFO("Abort transaction %d\n", xid);
575578
return;
@@ -592,7 +595,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
592595
status = gs;
593596
}
594597
}
595-
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
598+
PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
596599
}
597600

598601
static uint32 dtm_xid_hash_fn(const void *key, Size keysize)
@@ -713,6 +716,36 @@ _PG_init(void)
713716
NULL
714717
);
715718

719+
DefineCustomStringVariable(
720+
"dtm.host",
721+
"The host where DTM daemon resides",
722+
NULL,
723+
&DtmHost,
724+
"127.0.0.1",
725+
PGC_BACKEND, // context
726+
0, // flags,
727+
NULL, // GucStringCheckHook check_hook,
728+
NULL, // GucStringAssignHook assign_hook,
729+
NULL // GucShowHook show_hook
730+
);
731+
732+
DefineCustomIntVariable(
733+
"dtm.port",
734+
"The port DTM daemon is listening",
735+
NULL,
736+
&DtmPort,
737+
5431,
738+
1,
739+
INT_MAX,
740+
PGC_BACKEND,
741+
0,
742+
NULL,
743+
NULL,
744+
NULL
745+
);
746+
747+
TuneToDtm(DtmHost, DtmPort);
748+
716749
/*
717750
* Install hooks.
718751
*/
@@ -748,6 +781,7 @@ PG_FUNCTION_INFO_V1(dtm_begin_transaction);
748781
PG_FUNCTION_INFO_V1(dtm_join_transaction);
749782
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmax);
750783
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmin);
784+
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xcnt);
751785

752786
Datum
753787
dtm_get_current_snapshot_xmin(PG_FUNCTION_ARGS)
@@ -761,11 +795,19 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
761795
PG_RETURN_INT32(CurrentTransactionSnapshot->xmax);
762796
}
763797

798+
Datum
799+
dtm_get_current_snapshot_xcnt(PG_FUNCTION_ARGS)
800+
{
801+
PG_RETURN_INT32(CurrentTransactionSnapshot->xcnt);
802+
}
803+
764804
Datum
765805
dtm_begin_transaction(PG_FUNCTION_ARGS)
766806
{
767807
Assert(!TransactionIdIsValid(DtmNextXid));
768-
808+
if (dtm == NULL) {
809+
elog(ERROR, "DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf");
810+
}
769811
DtmNextXid = DtmGlobalStartTransaction(&DtmSnapshot, &dtm->minXid);
770812
Assert(TransactionIdIsValid(DtmNextXid));
771813
XTM_INFO("%d: Start global transaction %d, dtm->minXid=%d\n", getpid(), DtmNextXid, dtm->minXid);

contrib/pg_dtm/tests/daemons.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99
"os"
1010
"strconv"
11+
"strings"
1112
)
1213

1314
func read_to_channel(r io.Reader, c chan string, wg *sync.WaitGroup) {
@@ -91,9 +92,12 @@ func postgres(bin string, datadir string, port int, nodeid int, wg *sync.WaitGro
9192
"-D", datadir,
9293
"-p", strconv.Itoa(port),
9394
"-c", "dtm.node_id=" + strconv.Itoa(nodeid),
95+
"-c", "dtm.host=127.0.0.2",
96+
"-c", "dtm.port=" + strconv.Itoa(5431),
9497
"-c", "autovacuum=off",
9598
"-c", "fsync=off",
9699
"-c", "synchronous_commit=off",
100+
"-c", "shared_preload_libraries=pg_dtm",
97101
}
98102
name := "postgres " + datadir
99103
c := make(chan string)
@@ -118,14 +122,33 @@ func check_bin(bin *map[string]string) {
118122
}
119123
}
120124

125+
func get_prefix(srcroot string) string {
126+
makefile, err := os.Open(srcroot + "/src/Makefile.global")
127+
if err != nil {
128+
return "."
129+
}
130+
131+
scanner := bufio.NewScanner(makefile)
132+
for scanner.Scan() {
133+
s := scanner.Text()
134+
if strings.HasPrefix(s, "prefix := ") {
135+
return strings.TrimPrefix(s, "prefix := ")
136+
}
137+
}
138+
return "."
139+
}
140+
121141
func main() {
142+
srcroot := "../../.."
143+
prefix := get_prefix(srcroot)
144+
122145
bin := map[string]string{
123-
"dtmd": "/home/kvap/postgrespro/contrib/pg_xtm/dtmd/bin/dtmd",
124-
"initdb": "/home/kvap/postgrespro-build/bin/initdb",
125-
"postgres": "/home/kvap/postgrespro-build/bin/postgres",
146+
"dtmd": srcroot + "/contrib/pg_dtm/dtmd/bin/dtmd",
147+
"initdb": prefix + "/bin/initdb",
148+
"postgres": prefix + "/bin/postgres",
126149
}
127150

128-
datadirs := []string{"/tmp/data1", "/tmp/data2"}
151+
datadirs := []string{"/tmp/data1", "/tmp/data2", "/tmp/data3"}
129152

130153
check_bin(&bin);
131154

0 commit comments

Comments
 (0)