Skip to content

Commit 3060cc4

Browse files
committed
Send disconnect message to the server
1 parent 6046342 commit 3060cc4

File tree

4 files changed

+32
-3
lines changed

4 files changed

+32
-3
lines changed

contrib/pg_dtm/dtmd/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC=gcc
2-
CFLAGS=-g -Wall -Iinclude -D_LARGEFILE64_SOURCE # -DDEBUG
2+
CFLAGS=-g -O2 -Wall -Iinclude -D_LARGEFILE64_SOURCE # -DDEBUG
33
LIBUV_PREFIX=$(HOME)/libuv-build
44
LIBUV_CFLAGS=-I"$(LIBUV_PREFIX)/include" -L"$(LIBUV_PREFIX)/lib"
55
LIBUV_LDFLAGS=-luv -pthread -lrt

contrib/pg_dtm/sockhub/sockhub.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <unistd.h>
1515
#include <string.h>
1616
#include <errno.h>
17+
#include <assert.h>
1718

1819
#include "sockhub.h"
1920

@@ -151,6 +152,17 @@ static void reconnect(Shub* shub)
151152
}
152153
}
153154

155+
static void notify_disconnect(Shub* shub, int chan)
156+
{
157+
ShubMessageHdr* hdr;
158+
assert(shub->in_buffer_used + sizeof(ShubMessageHdr) < shub->params->buffer_size);
159+
hdr = (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
160+
hdr->size = 0;
161+
hdr->chan = chan;
162+
hdr->code = MSG_DISCONNECT;
163+
shub->in_buffer_used += sizeof(ShubMessageHdr);
164+
}
165+
154166
static void recovery(Shub* shub)
155167
{
156168
int i, max_fd;
@@ -162,6 +174,9 @@ static void recovery(Shub* shub)
162174
FD_ZERO(&tryset);
163175
FD_SET(i, &tryset);
164176
if (select(i+1, &tryset, NULL, NULL, &tm) < 0) {
177+
if (i != shub->input && i != shub->output) {
178+
notify_disconnect(shub, i);
179+
}
165180
close_socket(shub, i);
166181
}
167182
}
@@ -259,6 +274,7 @@ void ShubLoop(Shub* shub)
259274
if (!write_socket(chan, (char*)hdr, n)) {
260275
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
261276
close_socket(shub, chan);
277+
notify_disconnect(shub, chan);
262278
chan = -1;
263279
}
264280
if (n != hdr->size + sizeof(ShubMessageHdr)) {
@@ -274,6 +290,7 @@ void ShubLoop(Shub* shub)
274290
if (chan >= 0 && !write_socket(chan, shub->out_buffer, n)) {
275291
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
276292
close_socket(shub, chan);
293+
notify_disconnect(shub, chan);
277294
chan = -1;
278295
}
279296
tail -= n;
@@ -295,6 +312,7 @@ void ShubLoop(Shub* shub)
295312
if (available < sizeof(ShubMessageHdr)) {
296313
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
297314
close_socket(shub, i);
315+
notify_disconnect(shub, i);
298316
} else {
299317
int pos = 0;
300318
/* loop through all fetched messages */
@@ -333,6 +351,7 @@ void ShubLoop(Shub* shub)
333351
if (hdr != NULL) { /* if message header is not yet sent to the server... */
334352
/* ... then skip this message */
335353
shub->in_buffer_used = (char*)hdr - shub->in_buffer;
354+
notify_disconnect(shub, chan);
336355
break;
337356
} else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
338357
chan = -1; /* do not try to read rest of body of this message */
@@ -351,6 +370,10 @@ void ShubLoop(Shub* shub)
351370
shub->in_buffer_used = 0;
352371
}
353372
} while (size != 0); /* repeat until all message body is received */
373+
374+
if (chan < 0) {
375+
notify_disconnect(shub, i);
376+
}
354377

355378
pos = available;
356379
break;

contrib/pg_dtm/sockhub/sockhub.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ typedef struct {
99
unsigned int chan; /* local socket: set by SockHUB */
1010
} ShubMessageHdr;
1111

12+
enum ShubMessageCodes
13+
{
14+
MSG_DISCONNECT,
15+
MSG_FIRST_USER_CODE /* all codes >= 1 are user defined */
16+
};
17+
1218
typedef enum
1319
{
1420
SHUB_FATAL_ERROR,

contrib/pg_dtm/tests/pg_shard_transfers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111
)
1212

1313
const (
14-
TRANSFER_CONNECTIONS = 50
14+
TRANSFER_CONNECTIONS = 10
1515
INIT_AMOUNT = 10000
16-
N_ITERATIONS = 2000
16+
N_ITERATIONS = 10000
1717
N_ACCOUNTS = 2*100000
1818
)
1919

0 commit comments

Comments
 (0)