Skip to content

Commit 8ead24b

Browse files
committed
Fix a dangling pointer bug in DTMD transaction listener queue.
1 parent 3a67512 commit 8ead24b

File tree

10 files changed

+132
-85
lines changed

10 files changed

+132
-85
lines changed

contrib/pg_dtm/dtmd/include/dtmdlimits.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
#ifndef DTMD_LIMITS_H
22
#define DTMD_LIMITS_H
33

4+
// how many xids are reserved per raft term
5+
#define XIDS_PER_TERM 1000000
6+
7+
// start a new term when this number of xids is left
8+
#define NEW_TERM_THRESHOLD 100000
9+
410
#define MAX_TRANSACTIONS 4096
511

612
#define BUFFER_SIZE (64 * 1024)

contrib/pg_dtm/dtmd/include/server.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,5 @@ bool client_message_shortcut(client_t client, xid_t arg);
127127
bool client_redirect(client_t client, unsigned addr, int port);
128128

129129
unsigned client_get_ip_addr(client_t client);
130-
int client_ref(client_t client);
131-
int client_deref(client_t client);
132130

133131
#endif

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 88 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ Transaction* transaction_hash[MAX_TRANSACTIONS];
3030
// We reserve the local xids if they fit between (prev, next) range, and
3131
// reserve something in (next, x) range otherwise, moving 'next' after 'x'.
3232
xid_t prev_gxid, next_gxid;
33+
34+
xid_t threshold_gxid; // when to start worrying about starting a new term
35+
xid_t last_gxid; // the greatest gxid we can provide on BEGIN or RESERVE
36+
3337
xid_t global_xmin = INVALID_XID;
3438

3539
static Transaction *find_transaction(xid_t xid) {
@@ -41,7 +45,11 @@ static Transaction *find_transaction(xid_t xid) {
4145
typedef struct client_userdata_t {
4246
int id;
4347
int snapshots_sent;
44-
xid_t xid;
48+
49+
// FIXME: use some meaningful words for these. E.g. "expectee" instead
50+
// of "xwait".
51+
Transaction *xpart; // the transaction this client is participating in
52+
Transaction *xwait; // the transaction this client is waiting for
4553
} client_userdata_t;
4654

4755
clog_t clg;
@@ -51,13 +59,15 @@ bool use_raft;
5159
#define CLIENT_USERDATA(CLIENT) ((client_userdata_t*)client_get_userdata(CLIENT))
5260
#define CLIENT_ID(CLIENT) (CLIENT_USERDATA(CLIENT)->id)
5361
#define CLIENT_SNAPSENT(CLIENT) (CLIENT_USERDATA(CLIENT)->snapshots_sent)
54-
#define CLIENT_XID(CLIENT) (CLIENT_USERDATA(CLIENT)->xid)
62+
#define CLIENT_XPART(CLIENT) (CLIENT_USERDATA(CLIENT)->xpart)
63+
#define CLIENT_XWAIT(CLIENT) (CLIENT_USERDATA(CLIENT)->xwait)
5564

5665
static client_userdata_t *create_client_userdata(int id) {
5766
client_userdata_t *cd = malloc(sizeof(client_userdata_t));
5867
cd->id = id;
5968
cd->snapshots_sent = 0;
60-
cd->xid = INVALID_XID;
69+
cd->xpart = NULL;
70+
cd->xwait = NULL;
6171
return cd;
6272
}
6373

@@ -66,9 +76,10 @@ static void free_client_userdata(client_userdata_t *cd) {
6676
}
6777

6878
inline static void free_transaction(Transaction* t) {
69-
Transaction** tpp;
70-
for (tpp = &transaction_hash[t->xid % MAX_TRANSACTIONS]; *tpp != t; tpp = &(*tpp)->collision);
71-
*tpp = t->collision;
79+
assert(transaction_pop_listener(t, 's') == NULL);
80+
Transaction** tpp;
81+
for (tpp = &transaction_hash[t->xid % MAX_TRANSACTIONS]; *tpp != t; tpp = &(*tpp)->collision);
82+
*tpp = t->collision;
7283
l2_list_unlink(&t->elem);
7384
t->elem.next = free_transactions;
7485
free_transactions = &t->elem;
@@ -84,7 +95,7 @@ static void notify_listeners(Transaction *t, int status) {
8495
case BLANK:
8596
while ((listener = transaction_pop_listener(t, 's'))) {
8697
debug("[%d] notifying the client about xid=%u (unknown)\n", CLIENT_ID(listener), t->xid);
87-
shout("%p DEREF(notify): %d\n", listener, client_deref(listener));
98+
CLIENT_XWAIT(listener) = NULL;
8899
client_message_shortcut(
89100
(client_t)listener,
90101
RES_TRANSACTION_UNKNOWN
@@ -94,7 +105,7 @@ static void notify_listeners(Transaction *t, int status) {
94105
case NEGATIVE:
95106
while ((listener = transaction_pop_listener(t, 's'))) {
96107
debug("[%d] notifying the client about xid=%u (aborted)\n", CLIENT_ID(listener), t->xid);
97-
shout("%p DEREF(notify): %d\n", listener, client_deref(listener));
108+
CLIENT_XWAIT(listener) = NULL;
98109
client_message_shortcut(
99110
(client_t)listener,
100111
RES_TRANSACTION_ABORTED
@@ -104,7 +115,7 @@ static void notify_listeners(Transaction *t, int status) {
104115
case POSITIVE:
105116
while ((listener = transaction_pop_listener(t, 's'))) {
106117
debug("[%d] notifying the client about xid=%u (committed)\n", CLIENT_ID(listener), t->xid);
107-
shout("%p DEREF(notify): %d\n", listener, client_deref(listener));
118+
CLIENT_XWAIT(listener) = NULL;
108119
client_message_shortcut(
109120
(client_t)listener,
110121
RES_TRANSACTION_COMMITTED
@@ -114,7 +125,7 @@ static void notify_listeners(Transaction *t, int status) {
114125
case DOUBT:
115126
while ((listener = transaction_pop_listener(t, 's'))) {
116127
debug("[%d] notifying the client about xid=%u (inprogress)\n", CLIENT_ID(listener), t->xid);
117-
shout("%p DEREF(notify): %d\n", listener, client_deref(listener));
128+
CLIENT_XWAIT(listener) = NULL;
118129
client_message_shortcut(
119130
(client_t)listener,
120131
RES_TRANSACTION_INPROGRESS
@@ -124,6 +135,21 @@ static void notify_listeners(Transaction *t, int status) {
124135
}
125136
}
126137

138+
static void set_next_gxid(xid_t value) {
139+
assert(next_gxid < value);
140+
if (use_raft && raft.role == ROLE_LEADER) {
141+
assert(value <= last_gxid);
142+
if (inrange(next_gxid + 1, threshold_gxid, value)) {
143+
// Time to worry has come.
144+
raft.term++;
145+
} else {
146+
// It is either too early to worry,
147+
// or we have already increased the term.
148+
}
149+
}
150+
next_gxid = value;
151+
}
152+
127153
static void apply_clog_update(int action, int argument) {
128154
int status = action;
129155
xid_t xid = argument;
@@ -154,28 +180,20 @@ static void onconnect(client_t client) {
154180
}
155181

156182
static void ondisconnect(client_t client) {
157-
debug("[%d] disconnected\n", CLIENT_ID(client));
158-
159-
if (CLIENT_XID(client) != INVALID_XID) {
160-
Transaction* t = find_transaction(CLIENT_XID(client));
161-
if (t != NULL) {
162-
if (transaction_remove_listener(t, 's', client)) {
163-
shout("%p DEREF(disconn): %d\n", client, client_deref(client));
164-
} else {
165-
shout("%p DEREF(disconn): not found\n", client);
166-
}
167-
168-
if (use_raft && (raft.role == ROLE_LEADER)) {
169-
raft_emit(&raft, NEGATIVE, t->xid);
170-
}
171-
} else {
172-
shout(
173-
"[%d] DISCONNECT: transaction xid=%u not found O_o\n",
174-
CLIENT_ID(client), CLIENT_XID(client)
175-
);
183+
Transaction *t;
184+
debug("[%d, %p] disconnected\n", CLIENT_ID(client), client);
185+
186+
if ((t = CLIENT_XPART(client))) {
187+
transaction_remove_listener(t, 's', client);
188+
if (use_raft && (raft.role == ROLE_LEADER)) {
189+
raft_emit(&raft, NEGATIVE, t->xid);
176190
}
177191
}
178192

193+
if ((t = CLIENT_XWAIT(client))) {
194+
transaction_remove_listener(t, 's', client);
195+
}
196+
179197
free_client_userdata(CLIENT_USERDATA(client));
180198
client_set_userdata(client, NULL);
181199
}
@@ -274,15 +292,20 @@ static void onreserve(client_t client, int argc, xid_t *argv) {
274292

275293
if ((prev_gxid >= minxid) || (maxxid >= next_gxid)) {
276294
debug(
277-
"[%d] RESERVE: local range %u-%u is not between global range %u-%u\n",
295+
"[%d] RESERVE: local range %u-%u is not inside global range %u-%u\n",
278296
CLIENT_ID(client),
279297
minxid, maxxid,
280298
prev_gxid, next_gxid
281299
);
282300

283301
minxid = max_of_xids(minxid, next_gxid);
284302
maxxid = max_of_xids(maxxid, minxid + minsize - 1);
285-
next_gxid = maxxid + 1;
303+
CHECK(
304+
maxxid <= last_gxid,
305+
client,
306+
"not enough xids left in this term"
307+
);
308+
set_next_gxid(maxxid + 1);
286309
}
287310
debug(
288311
"[%d] RESERVE: allocating range %u-%u\n",
@@ -318,7 +341,7 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
318341
);
319342

320343
CHECK(
321-
CLIENT_XID(client) == INVALID_XID,
344+
CLIENT_XPART(client) == NULL,
322345
client,
323346
"BEGIN: already participating in another transaction"
324347
);
@@ -332,15 +355,21 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
332355
transaction_clear(t);
333356
l2_list_link(&active_transactions, &t->elem);
334357

335-
prev_gxid = t->xid = next_gxid++;
358+
CHECK(
359+
next_gxid <= last_gxid,
360+
client,
361+
"not enough xids left in this term"
362+
);
363+
set_next_gxid(next_gxid + 1);
364+
prev_gxid = t->xid = next_gxid;
336365
t->snapshots_count = 0;
337366
t->size = 1;
338367

339-
t->collision = transaction_hash[t->xid % MAX_TRANSACTIONS];
340-
transaction_hash[t->xid % MAX_TRANSACTIONS] = t;
368+
t->collision = transaction_hash[t->xid % MAX_TRANSACTIONS];
369+
transaction_hash[t->xid % MAX_TRANSACTIONS] = t;
341370

342371
CLIENT_SNAPSENT(client) = 0;
343-
CLIENT_XID(client) = t->xid;
372+
CLIENT_XPART(client) = t;
344373

345374
if (!clog_write(clg, t->xid, DOUBT)) {
346375
shout(
@@ -390,8 +419,8 @@ static bool queue_for_transaction_finish(client_t client, xid_t xid, char cmd) {
390419
// CLIENT_XPART(client) and 'xid', i.e. we are able to tell which
391420
// transaction waits for which transaction.
392421

422+
CLIENT_XWAIT(client) = t;
393423
transaction_push_listener(t, cmd, client);
394-
shout("%p REF: %d\n", client, client_ref(client));
395424
return true;
396425
}
397426

@@ -403,7 +432,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
403432
bool wait = argv[2];
404433

405434
CHECK(
406-
CLIENT_XID(client) == xid,
435+
CLIENT_XPART(client) && (CLIENT_XPART(client)->xid == xid),
407436
client,
408437
"VOTE: voting for a transaction not participated in"
409438
);
@@ -427,7 +456,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
427456
}
428457
assert(t->votes_for + t->votes_against <= t->size);
429458

430-
CLIENT_XID(client) = INVALID_XID; // not participating any more
459+
CLIENT_XPART(client) = NULL; // not participating any more
431460

432461
int s = transaction_status(t);
433462
switch (s) {
@@ -485,14 +514,14 @@ static void onsnapshot(client_t client, int argc, xid_t *argv) {
485514
return;
486515
}
487516

488-
if (CLIENT_XID(client) == INVALID_XID) {
517+
if (CLIENT_XPART(client) == NULL) {
489518
CLIENT_SNAPSENT(client) = 0;
490-
CLIENT_XID(client) = t->xid;
519+
CLIENT_XPART(client) = t;
491520
t->size += 1;
492521
}
493522

494523
CHECK(
495-
CLIENT_XID(client) == t->xid,
524+
CLIENT_XPART(client) && (CLIENT_XPART(client)->xid == xid),
496525
client,
497526
"SNAPSHOT: getting snapshot for a transaction not participated in"
498527
);
@@ -839,6 +868,7 @@ int main(int argc, char **argv) {
839868

840869
prev_gxid = MIN_XID;
841870
next_gxid = MIN_XID;
871+
last_gxid = INVALID_XID;
842872

843873
int raftsock = raft_create_udp_socket(&raft);
844874
if (raftsock == -1) {
@@ -856,8 +886,6 @@ int main(int argc, char **argv) {
856886
return EXIT_FAILURE;
857887
}
858888

859-
srand(getpid());
860-
861889
mstimer_t t;
862890
mstimer_reset(&t);
863891
while (true) {
@@ -874,11 +902,6 @@ int main(int argc, char **argv) {
874902
assert(m); // m should not be NULL, because the message should be ready to recv
875903
}
876904

877-
if (rand() % 10000 == 0) {
878-
shout("sleeping to test raft features\n");
879-
sleep(1);
880-
}
881-
882905
if (use_raft) {
883906
int applied = raft_apply(&raft, apply_clog_update);
884907
if (applied) {
@@ -890,6 +913,22 @@ int main(int argc, char **argv) {
890913
}
891914

892915
server_set_enabled(server, raft.role == ROLE_LEADER);
916+
917+
// Update the gxid limits based on current term and leadership.
918+
xid_t recent_last_gxid = raft.term * XIDS_PER_TERM;
919+
if (last_gxid < recent_last_gxid) {
920+
shout("updating last_gxid from %u to %u\n", last_gxid, recent_last_gxid);
921+
last_gxid = recent_last_gxid;
922+
threshold_gxid = last_gxid - NEW_TERM_THRESHOLD;
923+
if (raft.role == ROLE_FOLLOWER) {
924+
// If we become a leader, we will use
925+
// the range of xids after the current
926+
// last_gxid.
927+
prev_gxid = last_gxid;
928+
next_gxid = prev_gxid + 1;
929+
shout("updated range to %u-%u\n", prev_gxid, next_gxid);
930+
}
931+
}
893932
} else {
894933
server_set_enabled(server, true);
895934
}

0 commit comments

Comments
 (0)