Skip to content

Commit 1157be8

Browse files
committed
Add client -> stream pointer update while moving streams in memory in arbiter code.
1 parent b34d59e commit 1157be8

File tree

5 files changed

+48
-7
lines changed

5 files changed

+48
-7
lines changed

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ static void debug_cmd(client_t client, int argc, xid_t *argv) {
165165
#define CHECK(COND, CLIENT, MSG) \
166166
do { \
167167
if (!(COND)) { \
168-
shout("[%d] %s, returning '-'\n", CLIENT_ID(CLIENT), MSG); \
168+
shout("[%d] %s, returning RES_FAILED\n", CLIENT_ID(CLIENT), MSG); \
169169
client_message_shortcut(CLIENT, RES_FAILED); \
170170
return; \
171171
} \

contrib/pg_dtm/dtmd/src/server.c

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,16 @@ static void server_stream_destroy(server_t server, stream_t stream) {
214214
free(stream->output.data);
215215
}
216216

217+
static void stream_move(stream_t dst, stream_t src) {
218+
int i;
219+
*dst = *src;
220+
for (i = 0; i < MAX_TRANSACTIONS; i++) {
221+
if (dst->clients[i].stream) {
222+
dst->clients[i].stream = dst;
223+
}
224+
}
225+
}
226+
217227
static void server_close_bad_streams(server_t server) {
218228
int i;
219229
for (i = server->streamsnum - 1; i >= 0; i--) {
@@ -223,6 +233,7 @@ static void server_close_bad_streams(server_t server) {
223233
if (i != server->streamsnum - 1) {
224234
// move the last one here
225235
*stream = server->streams[server->streamsnum - 1];
236+
stream_move(stream, server->streams + server->streamsnum - 1);
226237
}
227238
server->streamsnum--;
228239
}
@@ -372,6 +383,15 @@ static client_t stream_get_client(stream_t stream, unsigned int chan, bool *isne
372383
return client;
373384
}
374385

386+
static void hexdump(int len, char *data) {
387+
fprintf(stderr, "hex:");
388+
int i;
389+
for (i = 0; i < len; i++) {
390+
fprintf(stderr, " %02x", data[i]);
391+
}
392+
fprintf(stderr, "\n");
393+
}
394+
375395
static bool server_stream_handle(server_t server, stream_t stream) {
376396
debug("a stream ready to recv\n");
377397

@@ -466,8 +486,8 @@ void server_loop(server_t server) {
466486
}
467487
}
468488

469-
server_flush(server);
470489
server_close_bad_streams(server);
490+
server_flush(server);
471491
}
472492
}
473493

contrib/pg_dtm/libdtm.c

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results) {
131131
elog(ERROR, "Failed to recv results header from arbiter");
132132
return 0;
133133
}
134+
if (newbytes == 0) {
135+
elog(ERROR, "Arbiter closed connection during recv");
136+
return 0;
137+
}
134138
recved += newbytes;
135139
}
136140

@@ -147,6 +151,10 @@ static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results) {
147151
elog(ERROR, "Failed to recv results body from arbiter");
148152
return 0;
149153
}
154+
if (newbytes == 0) {
155+
elog(ERROR, "Arbiter closed connection during recv");
156+
return 0;
157+
}
150158
recved += newbytes;
151159
}
152160
return needed / sizeof(xid_t);
@@ -156,6 +164,7 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
156164
{
157165
va_list argv;
158166
int i;
167+
int sent;
159168
char buf[COMMAND_BUFFER_SIZE];
160169
int datasize;
161170
char *cursor = buf;
@@ -181,7 +190,16 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
181190
assert(msg->size + sizeof(ShubMessageHdr) == datasize);
182191
assert(datasize <= COMMAND_BUFFER_SIZE);
183192

184-
return write(dtm->sock, buf, datasize) == datasize;
193+
sent = 0;
194+
while (sent < datasize) {
195+
int newbytes = write(dtm->sock, buf + sent, datasize - sent);
196+
if (newbytes == -1) {
197+
elog(ERROR, "Failed to send a command to arbiter");
198+
return false;
199+
}
200+
sent += newbytes;
201+
}
202+
return true;
185203
}
186204

187205
void DtmGlobalConfig(char *host, int port, char* sock_dir) {
@@ -395,7 +413,7 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait)
395413
case RES_TRANSACTION_INPROGRESS:
396414
return TRANSACTION_STATUS_IN_PROGRESS;
397415
case RES_TRANSACTION_UNKNOWN:
398-
return TRANSACTION_STATUS_IN_PROGRESS;
416+
return TRANSACTION_STATUS_UNKNOWN;
399417
default:
400418
goto failure;
401419
}

contrib/pg_dtm/tests/transfers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func prepare_one(connstr string, wg *sync.WaitGroup) {
207207
}
208208
exec(conn, "drop table if exists t")
209209
exec(conn, "create table t(u int primary key, v int)")
210-
exec(conn, "insert into t (select generate_series(0,1000000), $1)", cfg.Accounts.Balance)
210+
exec(conn, "insert into t (select generate_series(0,$1-1), $2)", cfg.Accounts.Num, cfg.Accounts.Balance)
211211
/*
212212
exec(conn, "begin transaction isolation level " + cfg.Isolation)
213213

contrib/pg_dtm/tests/transfers.sh

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
#!/bin/sh
2+
23
go run transfers.go \
34
-d 'dbname=postgres port=5432' \
45
-d 'dbname=postgres port=5433' \
5-
-m \
6-
-w 128 \
6+
-v \
7+
-m \
8+
-u 1000 \
9+
-w 10 \
710
-g

0 commit comments

Comments
 (0)