Skip to content

Commit bc7a35e

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents 89f3fdc + d6559e2 commit bc7a35e

File tree

4 files changed

+117
-71
lines changed

4 files changed

+117
-71
lines changed

contrib/pg_dtm/sockhub/sockhub.c

Lines changed: 114 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,24 @@ static void close_socket(Shub* shub, int fd)
6969
FD_CLR(fd, &shub->inset);
7070
}
7171

72-
static int read_socket(int sd, char* buf, int size)
72+
static int read_socket_ex(int sd, char* buf, int min_size, int max_size)
7373
{
74-
while (size != 0) {
75-
int n = recv(sd, buf, size, 0);
74+
int received = 0;
75+
while (received < min_size) {
76+
int n = recv(sd, buf + received, max_size - received, 0);
7677
if (n <= 0) {
77-
return 0;
78+
break;
7879
}
79-
size -= n;
80-
buf += n;
80+
received += n;
8181
}
82-
return 1;
82+
return received;
8383
}
8484

85+
static int read_socket(int sd, char* buf, int size)
86+
{
87+
return read_socket_ex(sd, buf, size, size) == size;
88+
}
89+
8590
static int write_socket(int sd, char const* buf, int size)
8691
{
8792
while (size != 0) {
@@ -235,9 +240,9 @@ void ShubLoop(Shub* shub)
235240
}
236241
} else if (i == shub->output) { /* receive response from server */
237242
/* try to read as much as possible */
238-
int available = recv(shub->output, shub->out_buffer + shub->out_buffer_used, buffer_size - shub->out_buffer_used, 0);
243+
int available = read_socket_ex(shub->output, shub->out_buffer + shub->out_buffer_used, sizeof(ShubMessageHdr), buffer_size - shub->out_buffer_used);
239244
int pos = 0;
240-
if (available <= 0) {
245+
if (available < sizeof(ShubMessageHdr)) {
241246
shub->params->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
242247
reconnect(shub);
243248
continue;
@@ -279,74 +284,116 @@ void ShubLoop(Shub* shub)
279284
}
280285
pos += n;
281286
}
282-
/* Move partly fetched message header (if any) to the beginning of byffer */
287+
/* Move partly fetched message header (if any) to the beginning of buffer */
283288
memcpy(shub->out_buffer, shub->out_buffer + pos, shub->out_buffer_used - pos);
284289
shub->out_buffer_used -= pos;
285290
} else { /* receive request from client */
286-
ShubMessageHdr* hdr = (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
287291
int chan = i;
288-
if (!read_socket(chan, (char*)hdr, sizeof(ShubMessageHdr))) { /* fetch message header */
289-
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
290-
close_socket(shub, i);
291-
} else {
292-
unsigned int size = hdr->size;
293-
hdr->chan = chan; /* remember socket descriptor from which this message was read */
294-
if (size + shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
295-
/* message doesn't completely fit in buffer */
296-
if (shub->in_buffer_used != 0) { /* if buffer is not empty...*/
297-
/* ... then send it */
298-
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
299-
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
300-
reconnect(shub);
301-
}
302-
/* move received message header to the beginning of the buffer */
303-
memcpy(shub->in_buffer, shub->in_buffer + shub->in_buffer_used, sizeof(ShubMessageHdr));
304-
shub->in_buffer_used = 0;
305-
}
306-
}
307-
shub->in_buffer_used += sizeof(ShubMessageHdr);
308-
309-
do {
310-
unsigned int n = size + shub->in_buffer_used > buffer_size ? buffer_size - shub->in_buffer_used : size;
311-
/* fetch message body */
312-
if (chan >= 0 && !read_socket(chan, shub->in_buffer + shub->in_buffer_used, n)) {
313-
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
314-
close_socket(shub, chan);
315-
if (hdr != NULL) { /* if message header is not yet sent to the server... */
316-
/* ... then skip this message */
317-
shub->in_buffer_used = (char*)hdr - shub->in_buffer;
318-
break;
319-
} else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
320-
chan = -1; /* do not try to read rest of body of this message */
292+
int available = 0;
293+
while (1) {
294+
available += read_socket_ex(chan, &shub->in_buffer[shub->in_buffer_used + available], sizeof(ShubMessageHdr) - available, buffer_size - shub->in_buffer_used - available);
295+
if (available < sizeof(ShubMessageHdr)) {
296+
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
297+
close_socket(shub, i);
298+
} else {
299+
int pos = 0;
300+
/* loop through all fetched messages */
301+
while (pos + sizeof(ShubMessageHdr) <= available) {
302+
ShubMessageHdr* hdr = (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
303+
unsigned int size = hdr->size;
304+
pos += sizeof(ShubMessageHdr) + size;
305+
hdr->chan = chan; /* remember socket descriptor from which this message was read */
306+
if (pos <= available) {
307+
shub->in_buffer_used += sizeof(ShubMessageHdr) + size;
308+
continue;
321309
}
322-
}
323-
shub->in_buffer_used += n;
324-
size -= n;
325-
/* if there is no more free space in the buffer to receive new message header... */
326-
if (shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
327310

328-
/* ... then send buffer to the server */
329-
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
330-
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
331-
reconnect(shub);
332-
}
333-
hdr = NULL; /* message is partly sent to the server: can not skip it any more */
334-
shub->in_buffer_used = 0;
311+
if (shub->in_buffer_used + sizeof(ShubMessageHdr) + size > buffer_size) {
312+
/* message doesn't completely fit in buffer */
313+
if (shub->in_buffer_used != 0) { /* if buffer is not empty...*/
314+
/* ... then send it */
315+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
316+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
317+
reconnect(shub);
318+
}
319+
/* move received message header to the beginning of the buffer */
320+
memcpy(shub->in_buffer, shub->in_buffer + shub->in_buffer_used, buffer_size - shub->in_buffer_used);
321+
shub->in_buffer_used = 0;
322+
}
323+
}
324+
shub->in_buffer_used += sizeof(ShubMessageHdr) + size - (pos - available);
325+
size = pos - available;
326+
327+
do {
328+
unsigned int n = size + shub->in_buffer_used > buffer_size ? buffer_size - shub->in_buffer_used : size;
329+
/* fetch rest of message body */
330+
if (chan >= 0 && !read_socket(chan, shub->in_buffer + shub->in_buffer_used, n)) {
331+
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
332+
close_socket(shub, chan);
333+
if (hdr != NULL) { /* if message header is not yet sent to the server... */
334+
/* ... then skip this message */
335+
shub->in_buffer_used = (char*)hdr - shub->in_buffer;
336+
break;
337+
} else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
338+
chan = -1; /* do not try to read rest of body of this message */
339+
}
340+
}
341+
shub->in_buffer_used += n;
342+
size -= n;
343+
/* if there is no more free space in the buffer to receive new message header... */
344+
if (shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
345+
/* ... then send buffer to the server */
346+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
347+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
348+
reconnect(shub);
349+
}
350+
hdr = NULL; /* message is partly sent to the server: can not skip it any more */
351+
shub->in_buffer_used = 0;
352+
}
353+
} while (size != 0); /* repeat until all message body is received */
354+
355+
pos = available;
356+
break;
335357
}
336-
} while (size != 0); /* repeat until all message body is received */
337-
}
358+
if (chan >= 0 && pos != available) { /* partly fetched message header */
359+
if (shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
360+
/* message doesn't completely fit in buffer */
361+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
362+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
363+
reconnect(shub);
364+
}
365+
/* move received message header to the beginning of the buffer */
366+
memcpy(shub->in_buffer, shub->in_buffer + shub->in_buffer_used, available - pos);
367+
shub->in_buffer_used = 0;
368+
}
369+
available -= pos;
370+
continue;
371+
}
372+
}
373+
break;
374+
}
338375
}
339376
}
340377
}
341-
} else { /* timeout expired */
342-
if (shub->in_buffer_used != 0) { /* if buffer is not empty... */
343-
/* ...then send it */
344-
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
345-
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
346-
reconnect(shub);
347-
}
348-
shub->in_buffer_used = 0;
349-
}
378+
if (shub->params->delay != 0) {
379+
continue;
380+
}
381+
}
382+
if (shub->in_buffer_used != 0) { /* if buffer is not empty... */
383+
/* ...then send it */
384+
#if SHOW_SENT_STATISTIC
385+
static size_t total_sent;
386+
static size_t total_count;
387+
total_sent += shub->in_buffer_used;
388+
if (++total_count % 1024 == 0) {
389+
printf("Average sent buffer size: %ld\n", total_sent/total_count);
390+
}
391+
#endif
392+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
393+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
394+
reconnect(shub);
395+
}
396+
shub->in_buffer_used = 0;
350397
}
351398
}
352399
}

contrib/pg_dtm/sockhub/start-clients.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
n_clients=10
1+
n_clients=100
22
n_iters=100000
33
./sockhub -h $1 -p 5001 -f /tmp/p5002 &
44
for ((i=0;i<n_clients;i++))

contrib/pg_dtm/tests/pg_shard_transfers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111
)
1212

1313
const (
14-
TRANSFER_CONNECTIONS = 8
14+
TRANSFER_CONNECTIONS = 50
1515
INIT_AMOUNT = 10000
16-
N_ITERATIONS = 10000
16+
N_ITERATIONS = 2000
1717
N_ACCOUNTS = 2*100000
1818
)
1919

contrib/pg_dtm/tests/transfers.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ func prepare_one(connstr string, wg *sync.WaitGroup) {
208208
exec(conn, "drop table if exists t")
209209
exec(conn, "create table t(u int primary key, v int)")
210210

211-
exec(conn, "begin transaction isolation level " + cfg.Isolation)
212211
exec(conn, "begin transaction isolation level " + cfg.Isolation)
213212

214213
start := time.Now()

0 commit comments

Comments
 (0)