Skip to content

Commit 877f5cb

Browse files
committed
Add dtm_recovery
1 parent c7ed87e commit 877f5cb

File tree

4 files changed

+171
-6
lines changed

4 files changed

+171
-6
lines changed

dtm_recovery/dtm_recovery.cpp

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#include <iostream>
2+
#include <string>
3+
#include <vector>
4+
#include <set>
5+
6+
#include <pqxx/connection>
7+
#include <pqxx/transaction>
8+
#include <pqxx/nontransaction>
9+
10+
using namespace std;
11+
using namespace pqxx;
12+
13+
int main (int argc, char* argv[])
14+
{
15+
if (argc == 1){
16+
printf("Use -h to show usage options\n");
17+
return 1;
18+
}
19+
vector<string> connections;
20+
set<string> prepared_xacts;
21+
set<string> committed_xacts;
22+
bool verbose = false;
23+
for (int i = 1; i < argc; i++) {
24+
if (argv[i][0] == '-') {
25+
switch (argv[i][1]) {
26+
case 'C':
27+
case 'c':
28+
connections.push_back(string(argv[++i]));
29+
continue;
30+
case 'v':
31+
verbose = true;
32+
continue;
33+
}
34+
}
35+
printf("Perform recovery of pg_tsdtm cluster.\n"
36+
"Usage: dtm_recovery {options}\n"
37+
"Options:\n"
38+
"\t-c STR\tdatabase connection string\n"
39+
"\t-v\tverbose mode: print extra information while processing\n");
40+
return 1;
41+
}
42+
if (verbose) {
43+
cout << "Collecting information about prepared transactions...\n";
44+
}
45+
for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
46+
{
47+
if (verbose) {
48+
cout << "Connecting to " << *ic << "...\n";
49+
}
50+
connection con(*ic);
51+
work txn(con);
52+
result r = txn.exec("select gid from pg_prepared_xacts");
53+
for (result::const_iterator it = r.begin(); it != r.end(); ++it)
54+
{
55+
string gid = it.at("gid").as(string());
56+
prepared_xacts.insert(gid);
57+
}
58+
txn.commit();
59+
}
60+
if (verbose) {
61+
cout << "Prepared transactions: ";
62+
for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
63+
{
64+
cout << *it << ", ";
65+
}
66+
cout << "\nChecking which of them are committed...\n";
67+
}
68+
for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
69+
{
70+
if (verbose) {
71+
cout << "Connecting to " << *ic << "...\n";
72+
}
73+
connection con(*ic);
74+
work txn(con);
75+
con.prepare("commit-check", "select * from pg_committed_xacts where gid=$1");
76+
for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
77+
{
78+
string gid = *it;
79+
result r = txn.prepared("commit-check")(gid).exec();
80+
if (!r.empty()) {
81+
committed_xacts.insert(gid);
82+
}
83+
}
84+
txn.commit();
85+
}
86+
if (verbose) {
87+
cout << "Committed transactions: ";
88+
for (set<string>::iterator it = committed_xacts.begin(); it != committed_xacts.end(); ++it)
89+
{
90+
cout << *it << ", ";
91+
}
92+
cout << "\nCommitting them at all nodes...\n";
93+
}
94+
for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
95+
{
96+
if (verbose) {
97+
cout << "Connecting to " << *ic << "...\n";
98+
}
99+
connection con(*ic);
100+
work txn(con);
101+
con.prepare("commit-check", "select * from pg_committed_xacts where gid=$1");
102+
con.prepare("commit-prepared", "commit prepared $1");
103+
con.prepare("rollback-prepared", "rollback prepared $1");
104+
result r = txn.exec("select gid from pg_prepared_xacts");
105+
for (result::const_iterator it = r.begin(); it != r.end(); ++it)
106+
{
107+
string gid = it.at("gid").as(string());
108+
result rc = txn.prepared("commit-check")(gid).exec();
109+
if (rc.empty()) {
110+
if (committed_xacts.find(gid) != committed_xacts.end()) {
111+
if (verbose) {
112+
cout << "Commit transaction " << gid << "\n";
113+
}
114+
txn.prepared("commit-prepared")(gid);
115+
} else {
116+
if (verbose) {
117+
cout << "Rollback transaction " << gid << "\n";
118+
}
119+
txn.prepared("rollback-prepared")(gid);
120+
}
121+
}
122+
}
123+
txn.commit();
124+
}
125+
if (verbose) {
126+
cout << "Recovery completed\n";
127+
}
128+
return 0;
129+
}

dtm_recovery/makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CXX=g++
2+
CXXFLAGS=-g -Wall -O0 -pthread
3+
4+
all: dtm_recovery
5+
6+
dtm_recovery: dtm_recovery.cpp
7+
$(CXX) $(CXXFLAGS) -o dtm_recovery dtm_recovery.cpp -lpqxx
8+
9+
clean:
10+
rm -f dtm_recovery

pg_dtm.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "access/xlog.h"
2626
#include "access/clog.h"
2727
#include "access/twophase.h"
28+
#include "executor/spi.h"
2829
#include "utils/hsearch.h"
2930
#include "utils/tqual.h"
3031
#include <utils/guc.h>
@@ -81,6 +82,7 @@ static DtmNodeState* local;
8182
static DtmCurrentTrans dtm_tx;
8283
static uint64 totalSleepInterrupts;
8384
static int DtmVacuumDelay;
85+
static bool DtmRecordCommits;
8486

8587
static Snapshot DtmGetSnapshot(Snapshot snapshot);
8688
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
@@ -211,6 +213,19 @@ _PG_init(void)
211213
NULL
212214
);
213215

216+
DefineCustomBoolVariable(
217+
"dtm.record_commits",
218+
"Store information about committed global transactions in pg_committed_xacts table",
219+
NULL,
220+
&DtmRecordCommit,
221+
false,
222+
PGC_BACKEND,
223+
0,
224+
NULL,
225+
NULL,
226+
NULL
227+
);
228+
214229

215230
/*
216231
* Install hooks.
@@ -684,6 +699,17 @@ void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
684699
DTM_TRACE((stderr, "Prepare transaction %u(%s) with CSN %lu\n", id->xid, gtid, cid));
685700
}
686701
SpinLockRelease(&local->lock);
702+
if (DtmRecordCommits) {
703+
char stmt[MAX_GTID_SIZE + 64];
704+
int rc;
705+
sprintf(stmt, "insert into pg_committed_xacts values ('%s')", gtid);
706+
SPI_connect();
707+
rc = SPI_execute(stmt, true, 0);
708+
SPI_finish();
709+
if (rc != SPI_OK_INSERT) {
710+
elog(ERROR, "Failed to insert GTID %s in table pg_committed_xacts", gtid);
711+
}
712+
}
687713
}
688714

689715
void DtmLocalCommitPrepared(DtmCurrentTrans* x, GlobalTransactionId gtid)

tests/dtmbench.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@ using namespace std;
1717
using namespace pqxx;
1818

1919
template<class T>
20-
class unique_ptr
20+
class my_unique_ptr
2121
{
2222
T* ptr;
2323

2424
public:
25-
unique_ptr(T* p = NULL) : ptr(p) {}
26-
~unique_ptr() { delete ptr; }
25+
my_unique_ptr(T* p = NULL) : ptr(p) {}
26+
~my_unique_ptr() { delete ptr; }
2727
T& operator*() { return *ptr; }
2828
T* operator->() { return ptr; }
2929
void operator=(T* p) { ptr = p; }
30-
void operator=(unique_ptr& other) {
30+
void operator=(my_unique_ptr& other) {
3131
ptr = other.ptr;
3232
other.ptr = NULL;
3333
}
@@ -122,15 +122,15 @@ int64_t execQuery( transaction_base& txn, char const* sql, ...)
122122
void* reader(void* arg)
123123
{
124124
thread& t = *(thread*)arg;
125-
vector< unique_ptr<connection> > conns(cfg.connections.size());
125+
vector< my_unique_ptr<connection> > conns(cfg.connections.size());
126126
for (size_t i = 0; i < conns.size(); i++) {
127127
conns[i] = new connection(cfg.connections[i]);
128128
}
129129
int64_t prevSum = 0;
130130

131131
while (running) {
132132
csn_t snapshot = 0;
133-
vector< unique_ptr<work> > txns(conns.size());
133+
vector< my_unique_ptr<work> > txns(conns.size());
134134
time_t start = getCurrentTime();
135135
for (size_t i = 0; i < conns.size(); i++) {
136136
txns[i] = new work(*conns[i]);

0 commit comments

Comments
 (0)