Skip to content

Commit c4ab7da

Browse files
committed
Avoid needless large memcpys in libpq socket writing
Until now, when calling pq_putmessage to write new data to a libpq socket, all writes are copied into a buffer and that buffer gets flushed when full to avoid having to perform small writes to the socket. There are cases where we must write large amounts of data to the socket, sometimes larger than the size of the buffer. In this case, it's wasteful to memcpy this data into the buffer and flush it out, instead, we can send it directly from the memory location that the data is already stored in. Here we adjust internal_putbytes() so that after having just flushed the buffer to the socket, if the remaining bytes to send is as big or bigger than the buffer size, we just send directly rather than needlessly copying into the PqSendBuffer buffer first. Examples of operations that write large amounts of data in one message are; outputting large tuples with SELECT or COPY TO STDOUT and pg_basebackup. Author: Melih Mutlu Reviewed-by: Heikki Linnakangas Reviewed-by: Jelte Fennema-Nio Reviewed-by: David Rowley Reviewed-by: Ranier Vilela Reviewed-by: Andres Freund Discussion: https://postgr.es/m/CAGPVpCR15nosj0f6xe-c2h477zFR88q12e6WjEoEZc8ZYkTh3Q@mail.gmail.com
1 parent a97bbe1 commit c4ab7da

File tree

1 file changed

+53
-21
lines changed

1 file changed

+53
-21
lines changed

src/backend/libpq/pqcomm.c

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ static List *sock_paths = NIL;
120120

121121
static char *PqSendBuffer;
122122
static int PqSendBufferSize; /* Size send buffer */
123-
static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
124-
static int PqSendStart; /* Next index to send a byte in PqSendBuffer */
123+
static size_t PqSendPointer; /* Next index to store a byte in PqSendBuffer */
124+
static size_t PqSendStart; /* Next index to send a byte in PqSendBuffer */
125125

126126
static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
127127
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
@@ -143,8 +143,10 @@ static int socket_flush_if_writable(void);
143143
static bool socket_is_send_pending(void);
144144
static int socket_putmessage(char msgtype, const char *s, size_t len);
145145
static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
146-
static int internal_putbytes(const char *s, size_t len);
147-
static int internal_flush(void);
146+
static inline int internal_putbytes(const char *s, size_t len);
147+
static inline int internal_flush(void);
148+
static pg_noinline int internal_flush_buffer(const char *buf, size_t *start,
149+
size_t *end);
148150

149151
static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath);
150152
static int Setup_AF_UNIX(const char *sock_path);
@@ -1268,11 +1270,9 @@ pq_getmessage(StringInfo s, int maxlen)
12681270
}
12691271

12701272

1271-
static int
1273+
static inline int
12721274
internal_putbytes(const char *s, size_t len)
12731275
{
1274-
size_t amount;
1275-
12761276
while (len > 0)
12771277
{
12781278
/* If buffer is full, then flush it out */
@@ -1282,14 +1282,33 @@ internal_putbytes(const char *s, size_t len)
12821282
if (internal_flush())
12831283
return EOF;
12841284
}
1285-
amount = PqSendBufferSize - PqSendPointer;
1286-
if (amount > len)
1287-
amount = len;
1288-
memcpy(PqSendBuffer + PqSendPointer, s, amount);
1289-
PqSendPointer += amount;
1290-
s += amount;
1291-
len -= amount;
1285+
1286+
/*
1287+
* If the buffer is empty and data length is larger than the buffer
1288+
* size, send it without buffering. Otherwise, copy as much data as
1289+
* possible into the buffer.
1290+
*/
1291+
if (len >= PqSendBufferSize && PqSendStart == PqSendPointer)
1292+
{
1293+
size_t start = 0;
1294+
1295+
socket_set_nonblocking(false);
1296+
if (internal_flush_buffer(s, &start, &len))
1297+
return EOF;
1298+
}
1299+
else
1300+
{
1301+
size_t amount = PqSendBufferSize - PqSendPointer;
1302+
1303+
if (amount > len)
1304+
amount = len;
1305+
memcpy(PqSendBuffer + PqSendPointer, s, amount);
1306+
PqSendPointer += amount;
1307+
s += amount;
1308+
len -= amount;
1309+
}
12921310
}
1311+
12931312
return 0;
12941313
}
12951314

@@ -1321,19 +1340,32 @@ socket_flush(void)
13211340
* and the socket is in non-blocking mode), or EOF if trouble.
13221341
* --------------------------------
13231342
*/
1324-
static int
1343+
static inline int
13251344
internal_flush(void)
1345+
{
1346+
return internal_flush_buffer(PqSendBuffer, &PqSendStart, &PqSendPointer);
1347+
}
1348+
1349+
/* --------------------------------
1350+
* internal_flush_buffer - flush the given buffer content
1351+
*
1352+
* Returns 0 if OK (meaning everything was sent, or operation would block
1353+
* and the socket is in non-blocking mode), or EOF if trouble.
1354+
* --------------------------------
1355+
*/
1356+
static pg_noinline int
1357+
internal_flush_buffer(const char *buf, size_t *start, size_t *end)
13261358
{
13271359
static int last_reported_send_errno = 0;
13281360

1329-
char *bufptr = PqSendBuffer + PqSendStart;
1330-
char *bufend = PqSendBuffer + PqSendPointer;
1361+
const char *bufptr = buf + *start;
1362+
const char *bufend = buf + *end;
13311363

13321364
while (bufptr < bufend)
13331365
{
13341366
int r;
13351367

1336-
r = secure_write(MyProcPort, bufptr, bufend - bufptr);
1368+
r = secure_write(MyProcPort, (char *) bufptr, bufend - bufptr);
13371369

13381370
if (r <= 0)
13391371
{
@@ -1373,18 +1405,18 @@ internal_flush(void)
13731405
* flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
13741406
* the connection.
13751407
*/
1376-
PqSendStart = PqSendPointer = 0;
1408+
*start = *end = 0;
13771409
ClientConnectionLost = 1;
13781410
InterruptPending = 1;
13791411
return EOF;
13801412
}
13811413

13821414
last_reported_send_errno = 0; /* reset after any successful send */
13831415
bufptr += r;
1384-
PqSendStart += r;
1416+
*start += r;
13851417
}
13861418

1387-
PqSendStart = PqSendPointer = 0;
1419+
*start = *end = 0;
13881420
return 0;
13891421
}
13901422

0 commit comments

Comments
 (0)