Skip to content

Commit 6096224

Browse files
committed
Add recovery of the proper value for prev_/next_gxid in DTMD.
1 parent a4ca34c commit 6096224

File tree

3 files changed

+62
-23
lines changed

3 files changed

+62
-23
lines changed

contrib/pg_dtm/dtmd/include/raft.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,6 @@ void raft_tick(raft_t *r, int msec);
140140
void raft_handle_message(raft_t *r, raft_msg_t *m);
141141
raft_msg_t *raft_recv_message(raft_t *r);
142142
int raft_create_udp_socket(raft_t *r);
143-
void raft_start_next_term(raft_t *r);
143+
void raft_ensure_term(raft_t *r, int term);
144144

145145
#endif

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ Transaction* transaction_hash[MAX_TRANSACTIONS];
3131
// reserve something in (next, x) range otherwise, moving 'next' after 'x'.
3232
xid_t prev_gxid, next_gxid;
3333

34-
xid_t threshold_gxid; // when to start worrying about starting a new term
35-
xid_t last_gxid; // the greatest gxid we can provide on BEGIN or RESERVE
36-
3734
xid_t global_xmin = INVALID_XID;
3835

3936
static Transaction *find_transaction(xid_t xid) {
@@ -262,14 +259,41 @@ static void onhello(client_t client, int argc, xid_t *argv) {
262259
}
263260
}
264261

262+
// the greatest gxid we can provide on BEGIN or RESERVE
263+
static xid_t last_xid_in_term() {
264+
return raft.term * XIDS_PER_TERM - 1;
265+
}
266+
267+
static xid_t first_xid_in_term() {
268+
return (raft.term - 1) * XIDS_PER_TERM;
269+
}
270+
271+
static int xid2term(xid_t xid) {
272+
int term = xid / XIDS_PER_TERM + 1;
273+
return term;
274+
}
275+
276+
// when to start worrying about starting a new term
277+
static xid_t get_threshold_xid() {
278+
return last_xid_in_term() - NEW_TERM_THRESHOLD;
279+
}
280+
281+
static bool xid_is_safe(xid_t xid) {
282+
return xid <= last_xid_in_term();
283+
}
284+
285+
static bool xid_is_disturbing(xid_t xid) {
286+
return inrange(next_gxid + 1, get_threshold_xid(), xid);
287+
}
288+
265289
static void set_next_gxid(xid_t value) {
266290
assert(next_gxid < value); // The value should only grow.
267291

268292
if (use_raft && raft.role == ROLE_LEADER) {
269-
assert(value <= last_gxid);
270-
if (inrange(next_gxid + 1, threshold_gxid, value)) {
293+
assert(xid_is_safe(value));
294+
if (xid_is_disturbing(value)) {
271295
// Time to worry has come.
272-
raft_start_next_term(&raft);
296+
raft_ensure_term(&raft, xid2term(value));
273297
} else {
274298
// It is either too early to worry,
275299
// or we have already increased the term.
@@ -293,6 +317,15 @@ static void set_next_gxid(xid_t value) {
293317
next_gxid = value;
294318
}
295319

320+
static bool use_xid(xid_t xid) {
321+
if (!xid_is_safe(xid)) {
322+
return false;
323+
}
324+
shout("setting next_gxid to %u\n", xid + 1);
325+
set_next_gxid(xid + 1);
326+
return true;
327+
}
328+
296329
static void onreserve(client_t client, int argc, xid_t *argv) {
297330
CHECK(argc == 3, client, "RESERVE: wrong number of arguments");
298331

@@ -317,11 +350,10 @@ static void onreserve(client_t client, int argc, xid_t *argv) {
317350
minxid = max_of_xids(minxid, next_gxid);
318351
maxxid = max_of_xids(maxxid, minxid + minsize - 1);
319352
CHECK(
320-
maxxid <= last_gxid,
353+
use_xid(maxxid),
321354
client,
322355
"not enough xids left in this term"
323356
);
324-
set_next_gxid(maxxid + 1);
325357
}
326358
debug(
327359
"[%d] RESERVE: allocating range %u-%u\n",
@@ -371,13 +403,13 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
371403
transaction_clear(t);
372404
l2_list_link(&active_transactions, &t->elem);
373405

406+
t->xid = next_gxid;
374407
CHECK(
375-
next_gxid <= last_gxid,
408+
use_xid(next_gxid),
376409
client,
377410
"not enought xids left in this term"
378411
);
379-
prev_gxid = t->xid = next_gxid;
380-
set_next_gxid(next_gxid + 1);
412+
prev_gxid = t->xid;
381413
t->snapshots_count = 0;
382414
t->size = 1;
383415

@@ -865,9 +897,16 @@ int main(int argc, char **argv) {
865897

866898
next_gxid = MIN_XID;
867899
clg = clog_open(datadir);
868-
set_next_gxid(clog_find_last_used(clg) + 1);
900+
901+
xid_t last_used_xid = clog_find_last_used(clg);
902+
shout("will use %u\n", last_used_xid);
903+
if (!use_xid(last_used_xid)) {
904+
shout("could not set last used xid to %u\n", last_used_xid);
905+
return EXIT_FAILURE;
906+
}
907+
raft.term = xid2term(next_gxid);
908+
869909
prev_gxid = next_gxid - 1;
870-
last_gxid = INVALID_XID;
871910
debug("initial next_gxid = %u\n", next_gxid);
872911
if (!clg) {
873912
shout("could not open clog at '%s'\n", datadir);
@@ -906,6 +945,7 @@ int main(int argc, char **argv) {
906945

907946
mstimer_t t;
908947
mstimer_reset(&t);
948+
int old_term = 0;
909949
while (true) {
910950
int ms = mstimer_reset(&t);
911951
raft_msg_t *m = NULL;
@@ -933,19 +973,16 @@ int main(int argc, char **argv) {
933973
server_set_enabled(server, raft.role == ROLE_LEADER);
934974

935975
// Update the gxid limits based on current term and leadership.
936-
xid_t recent_last_gxid = raft.term * XIDS_PER_TERM;
937-
if (last_gxid < recent_last_gxid) {
938-
shout("updating last_gxid from %u to %u\n", last_gxid, recent_last_gxid);
939-
last_gxid = recent_last_gxid;
940-
threshold_gxid = last_gxid - NEW_TERM_THRESHOLD;
976+
if (old_term < raft.term) {
941977
if (raft.role == ROLE_FOLLOWER) {
942978
// If we become a leader, we will use
943979
// the range of xids after the current
944980
// last_gxid.
945-
prev_gxid = last_gxid;
946-
next_gxid = prev_gxid + 1;
981+
prev_gxid = last_xid_in_term();
982+
set_next_gxid(prev_gxid + 1);
947983
shout("updated range to %u-%u\n", prev_gxid, next_gxid);
948984
}
985+
old_term = raft.term;
949986
}
950987
} else {
951988
server_set_enabled(server, true);

contrib/pg_dtm/dtmd/src/raft.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -565,9 +565,11 @@ static void raft_set_term(raft_t *r, int term) {
565565
r->votes = 0;
566566
}
567567

568-
void raft_start_next_term(raft_t *r) {
568+
void raft_ensure_term(raft_t *r, int term) {
569569
assert(r->role == ROLE_LEADER);
570-
r->term++;
570+
if (term > r->term) {
571+
r->term = term;
572+
}
571573
}
572574

573575
static void raft_handle_claim(raft_t *r, raft_msg_claim_t *m) {

0 commit comments

Comments
 (0)