Skip to content

Commit 0dc4435

Browse files
committed
Make arbiter rememeber the next_gxid on restart.
1 parent 32ddae5 commit 0dc4435

File tree

5 files changed

+68
-24
lines changed

5 files changed

+68
-24
lines changed

contrib/pg_dtm/dtmd/include/clog.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,7 @@ bool clog_forget(clog_t clog, xid_t until);
3939
// 'true' on success, 'false' otherwise.
4040
bool clog_close(clog_t clog);
4141

42+
// Returns the last used xid.
43+
xid_t clog_find_last_used(clog_t clog);
44+
4245
#endif

contrib/pg_dtm/dtmd/src/clog.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,3 +195,17 @@ bool clog_close(clog_t clog) {
195195
free(clog);
196196
return true;
197197
}
198+
199+
// Returns the last used xid.
200+
xid_t clog_find_last_used(clog_t clog) {
201+
xid_t last_used = INVALID_XID;
202+
clogfile_chain_t *chain = clog->lastfile;
203+
xid_t xid;
204+
for (xid = chain->file.min; xid <= chain->file.max; xid++) {
205+
int status = clogfile_get_status(&chain->file, xid);
206+
if ((last_used == INVALID_XID) || (status != BLANK)) {
207+
last_used = xid;
208+
}
209+
}
210+
return last_used;
211+
}

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -135,21 +135,6 @@ static void notify_listeners(Transaction *t, int status) {
135135
}
136136
}
137137

138-
static void set_next_gxid(xid_t value) {
139-
assert(next_gxid < value);
140-
if (use_raft && raft.role == ROLE_LEADER) {
141-
assert(value <= last_gxid);
142-
if (inrange(next_gxid + 1, threshold_gxid, value)) {
143-
// Time to worry has come.
144-
raft_start_next_term(&raft);
145-
} else {
146-
// It is either too early to worry,
147-
// or we have already increased the term.
148-
}
149-
}
150-
next_gxid = value;
151-
}
152-
153138
static void apply_clog_update(int action, int argument) {
154139
int status = action;
155140
xid_t xid = argument;
@@ -277,6 +262,37 @@ static void onhello(client_t client, int argc, xid_t *argv) {
277262
}
278263
}
279264

265+
static void set_next_gxid(xid_t value) {
266+
assert(next_gxid < value); // The value should only grow.
267+
268+
if (use_raft && raft.role == ROLE_LEADER) {
269+
assert(value <= last_gxid);
270+
if (inrange(next_gxid + 1, threshold_gxid, value)) {
271+
// Time to worry has come.
272+
raft_start_next_term(&raft);
273+
} else {
274+
// It is either too early to worry,
275+
// or we have already increased the term.
276+
}
277+
}
278+
279+
// Check that old position is 'dirty'. It is used when dtmd restarts,
280+
// to find out a correct value for 'next_gxid'. If we do not remember
281+
// 'next_gxid' it will lead to reuse of xids, which is bad.
282+
assert((next_gxid == MIN_XID) || (clog_read(clg, next_gxid) == NEGATIVE));
283+
assert(clog_read(clg, value) == BLANK); // New position should be clean.
284+
if (!clog_write(clg, value, NEGATIVE)) { // Marked the new position as dirty.
285+
shout("could not mark xid = %u dirty\n", value);
286+
assert(false); // should not happen
287+
}
288+
if (!clog_write(clg, next_gxid, BLANK)) { // Cleaned the old position.
289+
shout("could not clean clean xid = %u from dirty state\n", next_gxid);
290+
assert(false); // should not happen
291+
}
292+
293+
next_gxid = value;
294+
}
295+
280296
static void onreserve(client_t client, int argc, xid_t *argv) {
281297
CHECK(argc == 3, client, "RESERVE: wrong number of arguments");
282298

@@ -360,8 +376,8 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
360376
client,
361377
"not enought xids left in this term"
362378
);
363-
set_next_gxid(next_gxid + 1);
364379
prev_gxid = t->xid = next_gxid;
380+
set_next_gxid(next_gxid + 1);
365381
t->snapshots_count = 0;
366382
t->size = 1;
367383

@@ -847,7 +863,12 @@ int main(int argc, char **argv) {
847863

848864
if (!redirect_output()) return EXIT_FAILURE;
849865

866+
next_gxid = MIN_XID;
850867
clg = clog_open(datadir);
868+
set_next_gxid(clog_find_last_used(clg) + 1);
869+
prev_gxid = next_gxid - 1;
870+
last_gxid = INVALID_XID;
871+
debug("initial next_gxid = %u\n", next_gxid);
851872
if (!clg) {
852873
shout("could not open clog at '%s'\n", datadir);
853874
return EXIT_FAILURE;
@@ -866,9 +887,6 @@ int main(int argc, char **argv) {
866887

867888
write_pid(pidpath, getpid());
868889

869-
prev_gxid = MIN_XID;
870-
next_gxid = MIN_XID;
871-
last_gxid = INVALID_XID;
872890

873891
int raftsock = raft_create_udp_socket(&raft);
874892
if (raftsock == -1) {

contrib/pg_dtm/tests/daemons.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"bufio"
88
"sync"
9+
"flag"
910
"os"
1011
"strconv"
1112
"strings"
@@ -153,6 +154,12 @@ func get_prefix(srcroot string) string {
153154
return "."
154155
}
155156

157+
var doInitDb bool = false
158+
func init() {
159+
flag.BoolVar(&doInitDb, "i", false, "perform initdb")
160+
flag.Parse()
161+
}
162+
156163
func main() {
157164
srcroot := "../../.."
158165
prefix := get_prefix(srcroot)
@@ -168,8 +175,10 @@ func main() {
168175

169176
check_bin(&bin);
170177

171-
for _, datadir := range datadirs {
172-
initdb(bin["initdb"], datadir)
178+
if doInitDb {
179+
for _, datadir := range datadirs {
180+
initdb(bin["initdb"], datadir)
181+
}
173182
}
174183

175184
var wg sync.WaitGroup

contrib/pg_dtm/tests/run.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
./dtmbench \
2-
-c "dbname=postgres host=localhost user=knizhnik port=5432 sslmode=disable" \
3-
-c "dbname=postgres host=localhost user=knizhnik port=5433 sslmode=disable" \
4-
-c "dbname=postgres host=localhost user=knizhnik port=5434 sslmode=disable" \
2+
-c "dbname=postgres host=localhost port=5432 sslmode=disable" \
3+
-c "dbname=postgres host=localhost port=5433 sslmode=disable" \
4+
-c "dbname=postgres host=localhost port=5434 sslmode=disable" \
55
-n 1000 -a 1000 -w 10 -r 1 $*

0 commit comments

Comments
 (0)