Skip to content

Commit 5b85db5

Browse files
committed
Bugfix: huge memory allocations on performance benchmark
1 parent e8d526b commit 5b85db5

File tree

7 files changed

+66
-45
lines changed

7 files changed

+66
-45
lines changed

contrib/pg_exchange/dmq.c

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1379,7 +1379,7 @@ DmqSenderId
13791379
dmq_attach_receiver(const char *sender_name, int mask_pos)
13801380
{
13811381
int i;
1382-
int handle_id;
1382+
int handle_id = -1;
13831383

13841384
/* Search for an existing receiver. */
13851385
for (i = 0; i < dmq_local.n_inhandles; i++)
@@ -1520,19 +1520,19 @@ dmq_remote_id(const char *name)
15201520
}
15211521

15221522
/*
1523-
* Get a message from input queue. Execution blocking until message will not
1524-
* received. Returns false, if an error is occured.
1523+
* Get a message from input queue. If waitMsg = true, execution blocks until
1524+
* a message is received. Returns false if an error has occurred.
15251525
*
15261526
* sender_id - identifier of the received message sender.
15271527
* msg - pointer to local buffer that contains received message.
15281528
* len - size of received message.
15291529
*/
15301530
const char *
1531-
dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask,
1531+
dmq_pop(DmqSenderId *sender_id, const void **msg, Size *len, uint64 mask,
15321532
bool waitMsg)
15331533
{
15341534
shm_mq_result res;
1535-
const char *stream;
1535+
char *stream;
15361536

15371537
Assert(msg && len);
15381538

@@ -1559,12 +1559,14 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask,
15591559
if (res == SHM_MQ_SUCCESS)
15601560
{
15611561
/*
1562+
* Set message pointer and length.
15621563
* Stream name is first null-terminated string in
15631564
* the message buffer.
15641565
*/
1565-
stream = data;
1566-
*msg = (void *) (stream + strlen(stream) + 1);
1567-
*len -= (char *)(*msg) - stream;
1566+
stream = (char *) data;
1567+
*len -= (strlen(stream) + 1);
1568+
Assert(*len > 0);
1569+
*msg = ((char *) data + strlen(stream) + 1);
15681570
*sender_id = i;
15691571

15701572
mtm_log(DmqTraceIncoming,

contrib/pg_exchange/dmq.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ extern char *dmq_receiver_name(DmqDestinationId dest_id);
4242
extern DmqDestinationId dmq_remote_id(const char *name);
4343

4444
extern const char *
45-
dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask,
45+
dmq_pop(DmqSenderId *sender_id, const void **msg, Size *len, uint64 mask,
4646
bool waitMsg);
4747
extern bool dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
4848

contrib/pg_exchange/exchange.c

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "optimizer/cost.h"
2929
#include "optimizer/pathnode.h"
3030
#include "partitioning/partbounds.h"
31+
#include "postgres_fdw.h"
3132
#include "utils/lsyscache.h"
3233
#include "utils/rel.h"
3334
#include "utils/syscache.h"
@@ -514,9 +515,17 @@ exchange_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEnt
514515
break;
515516

516517
case T_ForeignPath:
518+
{
519+
PgFdwRelationInfo *fpinfo =
520+
(PgFdwRelationInfo *) subpath->parent->fdw_private;
521+
517522
serverid = subpath->parent->serverid;
518523
tmpPath = make_local_scan_path(tmpLocalScanPath,
519524
subpath->parent, &indexinfo);
525+
Assert(subpath->parent->fdw_private != NULL);
526+
tmpPath->rows = fpinfo->rows;
527+
tmpPath->total_cost += fpinfo->total_cost - fpinfo->startup_cost;
528+
}
520529
break;
521530

522531
default:
@@ -539,7 +548,6 @@ exchange_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEnt
539548
PATH_REQ_OUTER(tmpLocalScanPath), 0, false,
540549
((AppendPath *) path)->partitioned_rels, -1);
541550
path = (Path *) create_exchange_path(root, rel, (Path *) ap, EXCH_GATHER);
542-
543551
set_exchange_altrel(EXCH_GATHER, (ExchangePath *) path, rel, NULL, NULL,
544552
servers);
545553

@@ -559,6 +567,7 @@ exchange_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEnt
559567
path = (Path *) create_distexec_path(root, rel, path, servers);
560568

561569
distributed_pathlist = lappend(distributed_pathlist, path);
570+
bms_free(servers);
562571
}
563572
return distributed_pathlist;
564573
}
@@ -607,13 +616,13 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, ExchangePath *expath)
607616
* subtree M/N local tuples, send to network [M-M/N] tuples and same to
608617
* receive.
609618
*/
610-
path->rows /= expath->altrel.nparts;
619+
// path->rows /= expath->altrel.nparts;
611620
instances = expath->altrel.nparts;
612621
send_rows = path->rows - (path->rows/instances);
613622
received_rows = send_rows;
614623
local_rows = path->rows/instances;
615624
path->total_cost += (send_rows + local_rows) * cpu_tuple_cost;
616-
path->total_cost += (received_rows) * cpu_tuple_cost * 4.;
625+
path->total_cost += (received_rows) * cpu_tuple_cost * 10.;
617626
}
618627
break;
619628
default:
@@ -1062,11 +1071,10 @@ init_state_ifany(ExchangeState *state)
10621071
state->hasLocal = true;
10631072
state->init = true;
10641073
}
1065-
1074+
int print1 = 0;
10661075
static TupleTableSlot *
10671076
EXCHANGE_Execute(CustomScanState *node)
10681077
{
1069-
ScanState *ss = &node->ss;
10701078
ScanState *subPlanState = linitial(node->custom_ps);
10711079
ExchangeState *state = (ExchangeState *) node;
10721080
bool readRemote = false;
@@ -1080,32 +1088,34 @@ EXCHANGE_Execute(CustomScanState *node)
10801088

10811089
readRemote = !readRemote;
10821090

1083-
if ((state->activeRemotes > 0) && readRemote)
1091+
if ((state->activeRemotes > 0) /*&& readRemote */)
10841092
{
10851093
int status;
1094+
status = RecvTuple(state->stream, node->ss.ss_ScanTupleSlot);
10861095

1087-
slot = RecvTuple(ss->ss_ScanTupleSlot->tts_tupleDescriptor,
1088-
state->stream, &status);
10891096
switch (status)
10901097
{
10911098
case -1:
10921099
/* No tuples currently */
10931100
break;
10941101
case 0:
1095-
Assert(!TupIsNull(slot));
1102+
Assert(!TupIsNull(node->ss.ss_ScanTupleSlot));
10961103
state->rtuples++;
1097-
return slot;
1104+
return node->ss.ss_ScanTupleSlot;
10981105
case 1:
10991106
state->activeRemotes--;
1100-
// elog(LOG, "[%s] GOT NULL. activeRemotes: %d, lt=%d, rt=%d hasLocal=%hhu st=%d", state->stream,
1101-
// state->activeRemotes, state->ltuples, state->rtuples, state->hasLocal, state->stuples);
1107+
// elog(LOG, "[%s %d] GOT NULL. activeRemotes: %d, lt=%d, rt=%d hasLocal=%hhu st=%d",\
1108+
// state->stream, state->mode, state->activeRemotes,
1109+
// state->ltuples,
1110+
// state->rtuples, state->hasLocal, state->stuples);
11021111
break;
11031112
case 2: /* Close EXCHANGE channel */
11041113
break;
11051114
default:
11061115
/* Any system message */
11071116
break;
11081117
}
1118+
slot = NULL;
11091119
}
11101120

11111121
if ((state->hasLocal) && (!readRemote))
@@ -1170,7 +1180,6 @@ EXCHANGE_Execute(CustomScanState *node)
11701180
{
11711181
state->stuples++;
11721182
SendTuple(dest, state->stream, slot, false);
1173-
// elog(LOG, "Send tuple: %d", state->stuples);
11741183
}
11751184
}
11761185
return NULL;
@@ -1241,7 +1250,7 @@ EXCHANGE_Explain(CustomScanState *node, List *ancestors, ExplainState *es)
12411250
}
12421251

12431252
appendStringInfo(&str, "mode: %s, stream: %s. ", mode, state->stream);
1244-
appendStringInfo(&str, "qual: %s.", nodeToString(state->partexprs));
1253+
// appendStringInfo(&str, "qual: %s.", nodeToString(state->partexprs));
12451254
ExplainPropertyText("Exchange", str.data, es);
12461255
}
12471256

contrib/pg_exchange/nodeDistPlanExec.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,7 @@ init_exchange_channel(PlanState *node, void *context)
864864
if (j >= 0)
865865
{
866866
char c;
867+
867868
while ((c = RecvByteMessage(state->stream, dmq_data->dests[j].node)) == 0);
868869
Assert(c == 'I');
869870
}

contrib/pg_exchange/stream.c

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ Stream_unsubscribe(const char *streamName)
100100
istreams = list_delete_ptr(istreams, istream);
101101
ostreams = list_delete_ptr(ostreams, ostream);
102102
dmq_stream_unsubscribe(streamName);
103+
list_free(istream->deliveries);
104+
list_free(istream->msgs);
103105
pfree(istream);
104106
pfree(ostream);
105107
return true;
@@ -112,23 +114,29 @@ static void
112114
RecvIfAny(void)
113115
{
114116
const char *streamName;
115-
SendBuf *msg;
117+
const SendBuf *msg;
116118
Size len;
117119
DmqSenderId sender_id;
118120
IStream *istream;
119121

120122
/* Try to receive a message */
121123
for (;;)
122124
{
123-
streamName = dmq_pop(&sender_id, (void **)(&msg), &len, UINT64_MAX, false);
125+
streamName = dmq_pop(&sender_id, (const void **) &msg, &len, UINT64_MAX, false);
124126
if (!streamName)
125-
/* No messages arrived */
127+
/* No messages */
126128
return;
127129

128-
/* Any message was received */
130+
/* Message has been received */
129131
Assert(len >= MinSizeOfSendBuf);
130132
istream = (IStream *) get_stream(istreams, streamName);
131-
Assert(istream != NULL);
133+
134+
if (istream == NULL)
135+
{
136+
/* We can't lose any data except resent byte messages. */
137+
Assert(msg->datalen <= 1);
138+
return;
139+
}
132140

133141
if ((msg->index > istream->indexes[sender_id]) || IsDeliveryMessage(msg))
134142
{
@@ -137,13 +145,13 @@ RecvIfAny(void)
137145
buf = palloc(MinSizeOfRecvBuf + len - MinSizeOfSendBuf);
138146
buf->index = msg->index;
139147
buf->sid = sender_id;
148+
buf->datalen = msg->datalen;
140149

141150
if (IsDeliveryMessage(msg))
142151
istream->deliveries = lappend(istream->deliveries, buf);
143152
else
144153
{
145-
buf->datalen = len - MinSizeOfSendBuf;
146-
memcpy(&buf->data, &msg->data, buf->datalen);
154+
memcpy(buf->data, msg->data, buf->datalen);
147155
istream->msgs = lappend(istream->msgs, buf);
148156
istream->indexes[sender_id] = buf->index;
149157
}
@@ -249,6 +257,8 @@ ISendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
249257

250258
if (buf->needConfirm)
251259
ostream->buf = buf;
260+
else
261+
pfree(buf);
252262

253263
return ostream;
254264
}
@@ -291,11 +301,11 @@ SendByteMessage(DmqDestinationId dest_id, char *stream, char tag)
291301

292302
ostream->buf = buf;
293303
ostream->dest_id = dest_id;
294-
304+
//elog(LOG, "-> [%s] SendByteMessage: dest_id=%d, tag=%c", stream, dest_id, tag);
295305
while (!dmq_push_buffer(dest_id, stream, buf, buf_len(buf), true))
296306
RecvIfAny();
297-
298307
wait_for_delivery(ostream);
308+
// elog(LOG, "-> SendByteMessage: CONFIRMED");
299309
pfree(ostream->buf);
300310
ostream->buf = NULL;
301311
}
@@ -352,19 +362,19 @@ SendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
352362
* Receive tuple or message from any remote instance.
353363
* Returns NULL, if end-of-transfer received from an instance.
354364
*/
355-
TupleTableSlot *
356-
RecvTuple(TupleDesc tupdesc, char *streamName, int *status)
365+
int
366+
RecvTuple(char *streamName, TupleTableSlot *slot)
357367
{
358368
IStream *istream;
359369
ListCell *lc;
360-
TupleTableSlot *slot = NULL;
361370
List *temp = NIL;
371+
int status;
362372

363373
RecvIfAny();
364374

365375
istream = (IStream *) get_stream(istreams, streamName);
366376
Assert(istream);
367-
*status = -1; /* No tuples from network */
377+
status = -1; /* No tuples from network */
368378

369379
foreach(lc, istream->msgs)
370380
{
@@ -382,25 +392,24 @@ RecvTuple(TupleDesc tupdesc, char *streamName, int *status)
382392
{
383393
case END_OF_TUPLES:
384394
/* No tuples from network */
385-
*status = 1;
395+
status = 1;
386396
break;
387397
case 'Q':
388-
*status = 2;
398+
status = 2;
389399
break;
390400
default:
391-
*status = 3;
401+
status = 3;
392402
break;
393403
}
394404

395405
break;
396406
}
397407

398408
Assert(buf->datalen > 1);
399-
*status = 0;
409+
status = 0;
400410
tup = palloc(buf->datalen);
401-
memcpy(tup, buf->data, buf->datalen);
411+
memcpy(tup, &buf->data[0], buf->datalen);
402412
tup->t_data = (HeapTupleHeader) ((char *) tup + tupsize);
403-
slot = MakeSingleTupleTableSlot(tupdesc);
404413
slot = ExecStoreTuple((HeapTuple) tup, slot, InvalidBuffer, true);
405414
break;
406415
}
@@ -415,5 +424,5 @@ RecvTuple(TupleDesc tupdesc, char *streamName, int *status)
415424
}
416425
list_free(temp);
417426

418-
return slot;
427+
return status;
419428
}

contrib/pg_exchange/stream.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,5 @@ extern void SendByteMessage(DmqDestinationId dest_id, char *stream, char tag);
6262
extern char RecvByteMessage(const char *streamName, const char *sender);
6363
extern void SendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
6464
bool needConfirm);
65-
extern TupleTableSlot *RecvTuple(TupleDesc tupdesc, char *streamName, int *status);
65+
extern int RecvTuple(char *streamName, TupleTableSlot *slot);
6666
#endif /* CONTRIB_PG_EXCHANGE_STREAM_H_ */

src/backend/nodes/outfuncs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ static void
6060
write_oid_field(StringInfo str, Oid oid)
6161
{
6262
int i;
63-
char *rulename;
63+
char *rulename = NULL;
6464
Oid ev_class = InvalidOid;
6565

6666
if (!portable_output)

0 commit comments

Comments
 (0)