Skip to content

Commit f1cd506

Browse files
committed
Fix memory leaks at stream.c and dmq.c. Now pargres do huge benchmarks
1 parent 5b85db5 commit f1cd506

File tree

13 files changed

+476
-182
lines changed

13 files changed

+476
-182
lines changed

contrib/pg_exchange/common.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "common.h"
2525

2626

27+
MemoryContext memory_context = NULL;
2728
ExchangeSharedState *ExchShmem = NULL;
2829

2930
static bool

contrib/pg_exchange/common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ typedef struct
4242
HTAB *htab;
4343
} ExchangeSharedState;
4444

45+
extern MemoryContext memory_context;
4546
extern ExchangeSharedState *ExchShmem;
4647

4748
bool plan_tree_walker(Plan *plan, bool (*walker) (), void *context);

contrib/pg_exchange/dmq.c

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -794,7 +794,11 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
794794
}
795795
}
796796

797-
#define DMQ_RECV_BUFFER 8192
797+
/*
798+
* recv_buffer can be as large as possible. It is critical for message passing
799+
* effectiveness.
800+
*/
801+
#define DMQ_RECV_BUFFER (8388608) /* 8 MB */
798802
static char recv_buffer[DMQ_RECV_BUFFER];
799803
static int recv_bytes;
800804
static int read_bytes;
@@ -1244,10 +1248,37 @@ dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg)
12441248
}
12451249

12461250
static bool push_state = false;
1247-
static StringInfoData buf;
1251+
static StringInfoData buf = {NULL, 0, 0, 0};
1252+
1253+
/*
1254+
* _initStringInfo
1255+
*
1256+
* Replace call of initStringInfo() routine from stringinfo.c.
1257+
* We need larger strings and need to reduce memory allocations to optimize
1258+
* message passing.
1259+
*/
1260+
static void
1261+
_initStringInfo(StringInfo str, size_t size)
1262+
{
1263+
if (str->maxlen <= size)
1264+
{
1265+
size_t newsize = (size * 2 < 1024) ? 1024 : (size * 2);
1266+
1267+
if (str->data)
1268+
pfree(str->data);
1269+
1270+
/*
1271+
* We try to minimize str->data allocations. It can live all of the
1272+
* backend life.
1273+
*/
1274+
str->data = (char *) MemoryContextAlloc(TopMemoryContext, newsize);
1275+
str->maxlen = newsize;
1276+
}
1277+
resetStringInfo(str);
1278+
}
12481279

12491280
bool
1250-
dmq_push_buffer(DmqDestinationId dest_id, char *stream_name,
1281+
dmq_push_buffer(DmqDestinationId dest_id, const char *stream_name,
12511282
const void *payload, size_t len, bool nowait)
12521283
{
12531284
shm_mq_result res;
@@ -1256,7 +1287,7 @@ dmq_push_buffer(DmqDestinationId dest_id, char *stream_name,
12561287
{
12571288
ensure_outq_handle();
12581289

1259-
initStringInfo(&buf);
1290+
_initStringInfo(&buf, len);
12601291
pq_sendbyte(&buf, dest_id);
12611292
pq_send_ascii_string(&buf, stream_name);
12621293
pq_sendbytes(&buf, payload, len);

contrib/pg_exchange/dmq.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ dmq_pop(DmqSenderId *sender_id, const void **msg, Size *len, uint64 mask,
4747
extern bool dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
4848

4949
extern void dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg);
50-
extern bool dmq_push_buffer(DmqDestinationId dest_id, char *stream_name,
50+
extern bool dmq_push_buffer(DmqDestinationId dest_id, const char *stream_name,
5151
const void *buffer, size_t len, bool nowait);
5252

5353
typedef void (*dmq_receiver_hook_type) (const char *);

contrib/pg_exchange/exchange.c

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,7 +1071,8 @@ init_state_ifany(ExchangeState *state)
10711071
state->hasLocal = true;
10721072
state->init = true;
10731073
}
1074-
int print1 = 0;
1074+
1075+
#include "postmaster/postmaster.h"
10751076
static TupleTableSlot *
10761077
EXCHANGE_Execute(CustomScanState *node)
10771078
{
@@ -1104,13 +1105,11 @@ EXCHANGE_Execute(CustomScanState *node)
11041105
return node->ss.ss_ScanTupleSlot;
11051106
case 1:
11061107
state->activeRemotes--;
1107-
// elog(LOG, "[%s %d] GOT NULL. activeRemotes: %d, lt=%d, rt=%d hasLocal=%hhu st=%d",\
1108+
// elog(LOG, "[%s %d] GOT NULL. activeRemotes: %d, lt=%d, rt=%d hasLocal=%hhu st=%d",
11081109
// state->stream, state->mode, state->activeRemotes,
11091110
// state->ltuples,
11101111
// state->rtuples, state->hasLocal, state->stuples);
11111112
break;
1112-
case 2: /* Close EXCHANGE channel */
1113-
break;
11141113
default:
11151114
/* Any system message */
11161115
break;
@@ -1124,15 +1123,17 @@ EXCHANGE_Execute(CustomScanState *node)
11241123

11251124
if (TupIsNull(slot))
11261125
{
1127-
int i;
1128-
// elog(LOG, "[%s] FINISH Local store: l=%d, r=%d s=%d",
1126+
// elog(LOG, "[%s] FINISH Local store: l=%d, r=%d s=%d, activeRemotes=%d",
11291127
// state->stream, state->ltuples,
1130-
// state->rtuples, state->stuples);
1128+
// state->rtuples, state->stuples, state->activeRemotes);
11311129
if (state->mode != EXCH_STEALTH)
1130+
{
1131+
int i;
1132+
11321133
for (i = 0; i < state->dests->nservers; i++)
11331134
SendByteMessage(state->dests->dests[i].dest_id,
1134-
state->stream, END_OF_TUPLES);
1135-
1135+
state->stream, END_OF_TUPLES, false);
1136+
}
11361137
state->hasLocal = false;
11371138
continue;
11381139
}
@@ -1166,9 +1167,11 @@ EXCHANGE_Execute(CustomScanState *node)
11661167
{
11671168
int i;
11681169
state->stuples++;
1170+
11691171
/* Send tuple to each server that involved. Himself too. */
11701172
for (i = 0; i < state->dests->nservers; i++)
1171-
SendTuple(state->dests->dests[i].dest_id, state->stream, slot, false);
1173+
SendTuple(state->dests->dests[i].dest_id, state->stream, slot);
1174+
11721175
return slot;
11731176
}
11741177
else
@@ -1179,7 +1182,7 @@ EXCHANGE_Execute(CustomScanState *node)
11791182
else
11801183
{
11811184
state->stuples++;
1182-
SendTuple(dest, state->stream, slot, false);
1185+
SendTuple(dest, state->stream, slot);
11831186
}
11841187
}
11851188
return NULL;

contrib/pg_exchange/nodeDistPlanExec.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -854,7 +854,7 @@ init_exchange_channel(PlanState *node, void *context)
854854
else
855855
state->indexes[i] = j;
856856

857-
SendByteMessage(dmq_data->dests[j].dest_id, state->stream, ib);
857+
SendByteMessage(dmq_data->dests[j].dest_id, state->stream, ib, true);
858858
}
859859

860860
for (i = 0; i < state->nnodes; i++)

contrib/pg_exchange/pg_exchange.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ _PG_init(void)
101101

102102
old_dmq_receiver_stop_hook = dmq_receiver_stop_hook;
103103
dmq_receiver_stop_hook = OnNodeDisconnect;
104+
105+
memory_context = AllocSetContextCreate(TopMemoryContext, "PG_EXCHANGE_MEMCONTEXT", ALLOCSET_DEFAULT_SIZES*8);
104106
}
105107

106108
Datum

contrib/pg_exchange/sbuf.c

Lines changed: 179 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,187 @@
33
*
44
*/
55

6+
#include "postgres.h"
7+
8+
#include "access/htup_details.h"
9+
#include "nodes/pg_list.h"
10+
#include "utils/memutils.h" /* MemoryContexts */
11+
12+
#include "common.h"
613
#include "sbuf.h"
714

15+
typedef struct
16+
{
17+
StreamDataPackage header;
18+
int ntuples;
19+
char *curptr;
20+
char data[FLEXIBLE_ARRAY_MEMBER];
21+
} TupleBuffer;
22+
23+
#define TupleBufferMinSize offsetof(TupleBuffer, data)
24+
#define SDP_FREE_SPACE(buf) (StorageSize(buf) - (buf->curptr - &buf->data[0]))
25+
26+
static List *freebufs = NIL;
27+
28+
static int
29+
StorageSize(const TupleBuffer *buf)
30+
{
31+
return (SDP_Size(buf) - TupleBufferMinSize);
32+
}
33+
34+
bool
35+
SDP_IsEmpty(const StreamDataPackage *buffer)
36+
{
37+
TupleBuffer *buf = (TupleBuffer *) buffer;
38+
39+
Assert(buf->ntuples >= 0);
40+
Assert(SDP_FREE_SPACE(buf) >= 0);
41+
return buf->ntuples == 0;
42+
}
43+
44+
int
45+
SDP_Actual_size(const StreamDataPackage *buffer)
46+
{
47+
TupleBuffer *buf;
48+
49+
if (buffer->datalen <= 1)
50+
return SDP_Size(buffer);
51+
Assert(IsSDPBuf(buffer));
52+
53+
buf = (TupleBuffer *) buffer;
54+
return SDP_Size(buf) - SDP_FREE_SPACE(buf);
55+
}
56+
57+
/*
58+
* Check correctness of Stream Data Package
59+
*/
60+
bool
61+
IsSDPBuf(const StreamDataPackage *buffer)
62+
{
63+
TupleBuffer *buf;
64+
65+
if (buffer == NULL || buffer->datalen <= 1)
66+
return false;
67+
68+
buf = (TupleBuffer *) buffer;
69+
70+
if (buf->curptr == NULL || buf->ntuples < 0)
71+
return false;
72+
73+
return true;
74+
}
75+
76+
StreamDataPackage *
77+
SDP_Alloc(int size)
78+
{
79+
ListCell *lc;
80+
TupleBuffer *buf = NULL;
81+
MemoryContext OldMemoryContext;
82+
83+
OldMemoryContext = MemoryContextSwitchTo(memory_context);
84+
85+
/* To avoid palloc/free overheads we can store buffers */
86+
for (lc = list_head(freebufs); lc != NULL; lc = lnext(lc))
87+
{
88+
TupleBuffer *freebuf = (TupleBuffer *) lfirst(lc);
89+
90+
if (freebuf->header.datalen < size)
91+
continue;
92+
93+
buf = freebuf;
94+
freebufs = list_delete_ptr(freebufs, freebuf);
95+
break;
96+
}
97+
98+
if (buf == NULL)
99+
{
100+
size = Max((TupleBufferMinSize + size), DEFAULT_PACKAGE_SIZE);
101+
102+
/* No one buffer can be found */
103+
buf = palloc0(size + SDPHeaderSize);
104+
buf->header.datalen = size;
105+
}
106+
107+
buf->header.index = -1;
108+
buf->curptr = &buf->data[0];
109+
buf->ntuples = 0;
110+
MemoryContextSwitchTo(OldMemoryContext);
111+
return (StreamDataPackage *) buf;
112+
}
113+
114+
/*
115+
* Return buffer to the free buffers list
116+
*/
8117
void
9-
initTupleBuffer(TupleBuffer *tbuf, size_t mem_size)
118+
SDP_Free(StreamDataPackage *buffer)
119+
{
120+
TupleBuffer *buf = (TupleBuffer *) buffer;
121+
MemoryContext OldMemoryContext;
122+
123+
OldMemoryContext = MemoryContextSwitchTo(memory_context);
124+
125+
Assert(IsSDPBuf(buffer));
126+
buf->curptr = NULL;
127+
buf->ntuples = -1;
128+
freebufs = lappend(freebufs, buf);
129+
130+
MemoryContextSwitchTo(OldMemoryContext);
131+
}
132+
133+
bool
134+
SDP_Store(StreamDataPackage *buffer, HeapTuple tuple)
10135
{
11-
tbuf->curptr = &tbuf->data;
12-
/* Will corrected before send to DMQ for 'trim tails' purpose. */
13-
tbuf->size = mem_size;
136+
TupleBuffer *buf = (TupleBuffer *) buffer;
137+
HeapTuple dest;
138+
139+
Assert(buf != NULL && tuple != NULL);
140+
/* Check that user not pass pointer to non-allocated buffer */
141+
Assert(buf->curptr != NULL && buf->ntuples >= 0);
142+
143+
if (SDP_FREE_SPACE(buf) < HEAPTUPLESIZE + tuple->t_len)
144+
return true;
145+
146+
dest = (HeapTuple) buf->curptr;
147+
dest->t_len = tuple->t_len;
148+
dest->t_self = tuple->t_self;
149+
dest->t_tableOid = tuple->t_tableOid;
150+
dest->t_data = (HeapTupleHeader) ((char *) dest + HEAPTUPLESIZE);
151+
buf->curptr += HEAPTUPLESIZE;
152+
memcpy((char *) dest->t_data, (char *) tuple->t_data, tuple->t_len);
153+
154+
buf->curptr += tuple->t_len;
155+
buf->ntuples++;
156+
Assert(SDP_FREE_SPACE(buf) >= 0);
157+
158+
return false;
159+
}
160+
161+
void
162+
SDP_PrepareToRead(StreamDataPackage *buffer)
163+
{
164+
TupleBuffer *buf = (TupleBuffer *) buffer;
165+
166+
Assert(IsSDPBuf(buffer));
167+
buf->curptr = buf->data;
168+
}
169+
170+
HeapTuple
171+
SDP_Get_tuple(StreamDataPackage *buffer)
172+
{
173+
TupleBuffer *buf;
174+
HeapTuple tuple;
175+
176+
Assert(IsSDPBuf(buffer));
177+
178+
if (SDP_IsEmpty(buffer))
179+
return NULL;
180+
181+
buf = (TupleBuffer *) buffer;
182+
tuple = (HeapTuple) buf->curptr;
183+
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
184+
185+
buf->curptr += TupSize(tuple);
186+
buf->ntuples--;
187+
188+
return tuple;
14189
}

0 commit comments

Comments
 (0)