Skip to content

Commit 58e3b4a

Browse files
committed
2 parents b9db6df + cff6628 commit 58e3b4a

File tree

9 files changed

+201
-110
lines changed

9 files changed

+201
-110
lines changed

contrib/pg_xtm/README

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapsho
6161
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status);
6262

6363
// Gets the status of the transaction identified by 'xid'. Returns the status
64-
// on success, or -1 otherwise.
65-
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid);
64+
// on success, or -1 otherwise. If 'wait' is true, then it does not return
65+
// until the transaction is finished.
66+
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, bool wait);
6667

6768
--------------------
6869
Backend-DTM Protocol

contrib/pg_xtm/dtmd/include/eventwrap.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
int eventwrap(
1111
const char *host,
1212
int port,
13-
char *(*ondata)(void *client, size_t len, char *data),
14-
void (*onconnect)(void **client),
15-
void (*ondisconnect)(void *client)
13+
char *(*ondata)(void *stream, void *clientdata, size_t len, char *data),
14+
void (*onconnect)(void *stream, void **clientdata),
15+
void (*ondisconnect)(void *stream, void *clientdata)
1616
);
1717

18+
void write_to_stream(void *stream, char *data);
19+
1820
#endif

contrib/pg_xtm/dtmd/include/transaction.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@ typedef struct Transaction {
2323

2424
typedef struct GlobalTransaction {
2525
Transaction participants[MAX_NODES];
26+
void *listener;
2627
} GlobalTransaction;
2728

2829
int global_transaction_status(GlobalTransaction *gt);
2930
bool global_transaction_mark(clog_t clg, GlobalTransaction *gt, int status);
31+
void global_transaction_clear(GlobalTransaction *gt);
32+
void global_transaction_push_listener(GlobalTransaction *gt, void *listener);
33+
void *global_transaction_pop_listener(GlobalTransaction *gt);
3034

3135
#endif

contrib/pg_xtm/dtmd/src/eventwrap.c

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515

1616
uv_loop_t *loop;
1717

18-
char *(*ondata_cb)(void *client, size_t len, char *data);
19-
void (*onconnect_cb)(void **client);
20-
void (*ondisconnect_cb)(void *client);
18+
char *(*ondata_cb)(void *stream, void *clientdata, size_t len, char *data);
19+
void (*onconnect_cb)(void *stream, void **clientdata);
20+
void (*ondisconnect_cb)(void *stream, void *clientdata);
2121

2222
static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
2323
buf->len = suggested_size;
@@ -34,18 +34,19 @@ static void on_write(uv_write_t *req, int status) {
3434

3535
static void on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
3636
if (nread == UV_EOF) {
37-
ondisconnect_cb(stream->data);
38-
uv_close((uv_handle_t*)stream, NULL);
37+
ondisconnect_cb(stream, stream->data);
38+
uv_close((uv_handle_t*)stream, NULL);
3939
return;
4040
}
4141

4242
if (nread < 0) {
43-
shout("read failed (error %d)\n", nread);
44-
uv_close((uv_handle_t*)stream, NULL);
43+
shout("read failed (error %zd)\n", nread);
44+
ondisconnect_cb(stream, stream->data);
45+
uv_close((uv_handle_t*)stream, NULL);
4546
return;
4647
}
4748

48-
char *response = ondata_cb(stream->data, nread, buf->base);
49+
char *response = ondata_cb(stream, stream->data, nread, buf->base);
4950
free(buf->base);
5051

5152
if (response) {
@@ -71,16 +72,16 @@ static void on_connect(uv_stream_t *server, int status) {
7172
return;
7273
}
7374
uv_tcp_nodelay(client, 1);
74-
onconnect_cb(&client->data);
75+
onconnect_cb(client, &client->data);
7576
uv_read_start((uv_stream_t*)client, on_alloc, on_read);
7677
}
7778

7879
int eventwrap(
7980
const char *host,
8081
int port,
81-
char *(*ondata)(void *client, size_t len, char *data),
82-
void (*onconnect)(void **client),
83-
void (*ondisconnect)(void *client)
82+
char *(*ondata)(void *stream, void *clientdata, size_t len, char *data),
83+
void (*onconnect)(void *stream, void **clientdata),
84+
void (*ondisconnect)(void *stream, void *clientdata)
8485
) {
8586
ondata_cb = ondata;
8687
onconnect_cb = onconnect;
@@ -109,3 +110,10 @@ int eventwrap(
109110

110111
return uv_run(loop, UV_RUN_DEFAULT);
111112
}
113+
114+
void write_to_stream(void *stream, char *data) {
115+
uv_write_t *wreq = malloc(sizeof(uv_write_t));
116+
uv_buf_t wbuf = uv_buf_init(data, strlen(data));
117+
uv_write(wreq, (uv_stream_t*)stream, &wbuf, 1, on_write);
118+
free(data);
119+
}

0 commit comments

Comments
 (0)