Skip to content

Commit cc66f46

Browse files
committed
Reimplement transaction list
1 parent f7cf59c commit cc66f46

File tree

3 files changed

+56
-37
lines changed

3 files changed

+56
-37
lines changed

contrib/pg_dtm/dtmd/include/limits.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#ifndef LIMITS_H
22
#define LIMITS_H
33

4-
#define MAX_TRANSACTIONS_PER_CLIENT 1024
54
#define MAX_TRANSACTIONS 1024
65

76
#define BUFFER_SIZE (64 * 1024)

contrib/pg_dtm/dtmd/include/transaction.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,14 @@
1111

1212
#define CHAR_TO_INDEX(C) ((C) - 'a')
1313

14+
typedef struct L2List
15+
{
16+
struct L2List* next;
17+
struct L2List* prev;
18+
} L2List;
19+
1420
typedef struct Transaction {
21+
L2List elem;
1522
xid_t xid;
1623

1724
int size; // number of paritcipants
@@ -26,6 +33,22 @@ typedef struct Transaction {
2633
void *listeners[CHAR_TO_INDEX('z')]; // we are going to use 'a' to 'z' for indexing
2734
} Transaction;
2835

36+
static inline void l2_list_link(L2List* after, L2List* elem)
37+
{
38+
elem->next = after->next;
39+
elem->prev = after;
40+
after->next->prev = elem;
41+
after->next = elem;
42+
}
43+
44+
static inline void l2_list_unlink(L2List* elem)
45+
{
46+
elem->next->prev = elem->prev;
47+
elem->prev->next = elem->next;
48+
}
49+
50+
51+
2952
Snapshot *transaction_latest_snapshot(Transaction *t);
3053
Snapshot *transaction_snapshot(Transaction *t, int snapno);
3154
Snapshot *transaction_next_snapshot(Transaction *t);

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
#define DEFAULT_LISTENHOST "0.0.0.0"
1919
#define DEFAULT_LISTENPORT 5431
2020

21-
Transaction transactions[MAX_TRANSACTIONS];
22-
int transactions_count;
21+
L2List active_transactions = {&active_transactions, &active_transactions};
22+
L2List* free_transactions;
2323

2424
// We reserve the local xids if they fit between (prev, next) range, and
2525
// reserve something in (next, x) range otherwise, moving 'next' after 'x'.
@@ -50,6 +50,14 @@ static void free_client_userdata(client_userdata_t *cd) {
5050
free(cd);
5151
}
5252

53+
inline static void free_transaction(Transaction* t)
54+
{
55+
l2_list_unlink(&t->elem);
56+
t->elem.next = free_transactions;
57+
free_transactions = &t->elem;
58+
}
59+
60+
5361
static int next_client_id = 0;
5462
static void onconnect(client_t client) {
5563
client_userdata_t *cd = create_client_userdata(next_client_id++);
@@ -103,18 +111,15 @@ static void ondisconnect(client_t client) {
103111
debug("[%d] disconnected\n", CLIENT_ID(client));
104112

105113
if (CLIENT_XID(client) != INVALID_XID) {
106-
int i;
114+
Transaction* t;
107115

108116
// need to abort the transaction this client is participating in
109-
for (i = transactions_count - 1; i >= 0; i--) {
110-
Transaction *t = transactions + i;
111-
117+
for (t = (Transaction*)active_transactions.next; t != (Transaction*)&active_transactions; t = (Transaction*)t->elem.next)
118+
{
112119
if (t->xid == CLIENT_XID(client)) {
113120
if (clog_write(clg, t->xid, NEGATIVE)) {
114121
notify_listeners(t, NEGATIVE);
115-
116-
*t = transactions[transactions_count - 1];
117-
transactions_count--;
122+
free_transaction(t);
118123
} else {
119124
shout(
120125
"[%d] DISCONNECT: transaction %u"
@@ -126,7 +131,7 @@ static void ondisconnect(client_t client) {
126131
}
127132
}
128133

129-
if (i < 0) {
134+
if (t == (Transaction*)&active_transactions) {
130135
shout(
131136
"[%d] DISCONNECT: transaction %u not found O_o\n",
132137
CLIENT_ID(client), CLIENT_XID(client)
@@ -176,13 +181,13 @@ static xid_t max(xid_t a, xid_t b) {
176181
}
177182

178183
static void gen_snapshot(Snapshot *s) {
184+
Transaction* t;
179185
s->times_sent = 0;
180186
s->nactive = 0;
181187
s->xmin = MAX_XID;
182188
s->xmax = MIN_XID;
183-
int i;
184-
for (i = 0; i < transactions_count; i++) {
185-
Transaction *t = transactions + i;
189+
for (t = (Transaction*)active_transactions.next; t != (Transaction*)&active_transactions; t = (Transaction*)t->elem.next)
190+
{
186191
if (t->xid < s->xmin) {
187192
s->xmin = t->xid;
188193
}
@@ -241,11 +246,10 @@ static void onreserve(client_t client, int argc, xid_t *argv) {
241246
}
242247

243248
static xid_t get_global_xmin() {
244-
int i, j;
249+
int j;
245250
xid_t xmin = next_gxid;
246251
Transaction *t;
247-
for (i = 0; i < transactions_count; i++) {
248-
t = transactions + i;
252+
for (t = (Transaction*)active_transactions.next; t != (Transaction*)&active_transactions; t = (Transaction*)t->elem.next) {
249253
j = t->snapshots_count > MAX_SNAPSHOTS_PER_TRANS ? MAX_SNAPSHOTS_PER_TRANS : t->snapshots_count;
250254
while (--j >= 0) {
251255
Snapshot* s = transaction_snapshot(t, j);
@@ -259,12 +263,7 @@ static xid_t get_global_xmin() {
259263
}
260264

261265
static void onbegin(client_t client, int argc, xid_t *argv) {
262-
CHECK(
263-
transactions_count < MAX_TRANSACTIONS,
264-
client,
265-
"BEGIN: transaction limit hit"
266-
);
267-
266+
Transaction *t;
268267
CHECK(
269268
argc == 1,
270269
client,
@@ -277,8 +276,14 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
277276
"BEGIN: already participating in another transaction"
278277
);
279278

280-
Transaction *t = transactions + transactions_count;
281-
transaction_clear(t);
279+
t = (Transaction*)free_transactions;
280+
if (t == NULL) {
281+
t = (Transaction*)malloc(sizeof(Transaction));
282+
} else {
283+
free_transactions = t->elem.next;
284+
}
285+
286+
transaction_clear(t);
282287

283288
prev_gxid = t->xid = next_gxid++;
284289
t->snapshots_count = 0;
@@ -299,8 +304,6 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
299304

300305
xid_t gxmin = get_global_xmin();
301306

302-
transactions_count++;
303-
304307
gen_snapshot(transaction_next_snapshot(t));
305308
// will wrap around if exceeded max snapshots
306309
Snapshot *snap = transaction_latest_snapshot(t);
@@ -318,10 +321,9 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
318321
}
319322

320323
static Transaction *find_transaction(xid_t xid) {
321-
int i;
322324
Transaction *t;
323-
for (i = 0; i < transactions_count; i++) {
324-
t = transactions + i;
325+
326+
for (t = (Transaction*)active_transactions.next; t != (Transaction*)&active_transactions; t = (Transaction*)t->elem.next) {
325327
if (t->xid == xid) {
326328
return t;
327329
}
@@ -393,9 +395,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
393395
);
394396

395397
notify_listeners(t, NEGATIVE);
396-
397-
*t = transactions[transactions_count - 1];
398-
transactions_count--;
398+
free_transaction(t);
399399
client_message_shortcut(client, RES_TRANSACTION_ABORTED);
400400
return;
401401
case DOUBT:
@@ -418,9 +418,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
418418
);
419419

420420
notify_listeners(t, POSITIVE);
421-
422-
*t = transactions[transactions_count - 1];
423-
transactions_count--;
421+
free_transaction(t);
424422
client_message_shortcut(client, RES_TRANSACTION_COMMITTED);
425423
return;
426424
}
@@ -734,7 +732,6 @@ int main(int argc, char **argv) {
734732

735733
prev_gxid = MIN_XID;
736734
next_gxid = MIN_XID;
737-
transactions_count = 0;
738735

739736
server_t server = server_init(
740737
listenhost, listenport,

0 commit comments

Comments
 (0)