Skip to content

Commit 260738c

Browse files
committed
Some corrections on optimizations
1 parent 04dc7fe commit 260738c

File tree

12 files changed

+40
-40
lines changed

12 files changed

+40
-40
lines changed

contrib/pg_exchange/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ EXTVERSION = 0.1
66
PGFILEDESC = "pg_exchange - an exchange custom node and rules for the planner"
77
MODULES = pg_exchange
88
OBJS = common.o dmq.o exchange.o expath.o hooks.o nodeDistPlanExec.o \
9-
nodeDummyscan.o partutils.o pg_exchange.o stream.o $(WIN32RES)
9+
nodeDummyscan.o partutils.o pg_exchange.o sbuf.o stream.o $(WIN32RES)
1010

1111
fdw_srcdir = $(top_srcdir)/contrib/postgres_fdw/
1212

contrib/pg_exchange/dmq.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@
4747
#include "utils/dynahash.h"
4848
#include "utils/ps_status.h"
4949

50-
#define DMQ_MQ_SIZE ((Size) 65536)
50+
//#define DMQ_MQ_SIZE ((Size) 65536)
51+
#define DMQ_MQ_SIZE ((Size) 1048576) /* 1 MB */
52+
//#define DMQ_MQ_SIZE ((Size) 8388608) /* 8 MB */
53+
5154
#define DMQ_MQ_MAGIC 0x646d71
5255

5356
// XXX: move to common
@@ -222,6 +225,7 @@ dmq_shmem_size(void)
222225
{
223226
Size size = 0;
224227

228+
// size = add_size(size, DMQ_MQ_SIZE * DMQ_MAX_DESTINATIONS * 2);
225229
size = add_size(size, sizeof(struct DmqSharedState));
226230
size = add_size(size, hash_estimate_size(DMQ_MAX_SUBS_PER_BACKEND*MaxBackends,
227231
sizeof(DmqStreamSubscription)));

contrib/pg_exchange/stream.c

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
*
44
*/
55

6+
#include "sbuf.h"
67
#include "stream.h"
78
#include "miscadmin.h"
89
#include "unistd.h"
910
#include "utils/memutils.h" /* MemoryContexts */
1011

11-
#define IsDeliveryMessage(msg) (msg->tot_len == MinSizeOfSendBuf)
12+
#define IsDeliveryMessage(msg) (msg->datalen == 0)
1213

1314
static List *istreams = NIL;
1415
static List *ostreams = NIL;
@@ -41,6 +42,8 @@ get_stream(List *streams, const char *name)
4142
for (lc = list_head(streams); lc != NULL; lc = lnext(lc))
4243
{
4344
char *streamName = (char *) lfirst(lc);
45+
46+
/* streamName is a first field of IStream and OStream structures. */
4447
if (strcmp(name, streamName) == 0)
4548
return streamName;
4649
}
@@ -153,12 +156,13 @@ RecvIfAny(void)
153156
DmqDestinationId dest_id;
154157

155158
/* If message is not delivery message, send delivery. */
156-
dbuf.tot_len = MinSizeOfSendBuf;
159+
dbuf.datalen = 0;
157160
dbuf.index = msg->index;
158161
dest_id = dmq_dest_id(sender_id);
159162
Assert(dest_id >= 0);
160163

161-
dmq_push_buffer(dest_id, istream->streamName, &dbuf, dbuf.tot_len, false);
164+
dmq_push_buffer(dest_id, istream->streamName, &dbuf,
165+
MinSizeOfSendBuf, false);
162166
}
163167
}
164168
}
@@ -193,17 +197,18 @@ checkDelivery(OStream *ostream)
193197
{
194198
RecvBuf *buf = lfirst(lc);
195199

196-
istream->msgs = list_delete_ptr(istream->msgs, buf);
200+
istream->deliveries = list_delete_ptr(istream->deliveries, buf);
197201
pfree(buf);
198202
}
203+
list_free(temp);
199204
return found;
200205
}
201206

202207
static void
203208
StreamRepeatSend(OStream *ostream)
204209
{
205210
while (!dmq_push_buffer(ostream->dest_id, ostream->streamName, ostream->buf,
206-
ostream->buf->tot_len, true))
211+
buf_len(ostream->buf), true))
207212
RecvIfAny();
208213
}
209214

@@ -215,36 +220,31 @@ ISendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
215220
int tupsize;
216221
SendBuf *buf;
217222
OStream *ostream;
223+
int tot_len;
218224

219225
RecvIfAny();
220226

221227
ostream = (OStream *) get_stream(ostreams, stream);
222228
Assert(ostream && !ostream->buf);
223229

224-
if (!TupIsNull(slot))
225-
{
226-
int tot_len;
230+
Assert(!TupIsNull(slot));
227231

228-
if (slot->tts_tuple == NULL)
229-
ExecMaterializeSlot(slot);
232+
if (slot->tts_tuple == NULL)
233+
ExecMaterializeSlot(slot);
230234

231-
tuple = slot->tts_tuple;
232-
tupsize = offsetof(HeapTupleData, t_data);
233-
234-
tot_len = MinSizeOfSendBuf + tupsize + tuple->t_len;
235-
buf = palloc(tot_len);
236-
buf->tot_len = tot_len;
237-
memcpy(buf->data, tuple, tupsize);
238-
memcpy(buf->data + tupsize, tuple->t_data, tuple->t_len);
239-
}
240-
else
241-
Assert(0);
235+
tuple = slot->tts_tuple;
236+
tupsize = offsetof(HeapTupleData, t_data);
237+
tot_len = MinSizeOfSendBuf + tupsize + tuple->t_len;
238+
buf = palloc(tot_len);
239+
buf->datalen = tot_len - MinSizeOfSendBuf;
240+
memcpy(buf->data, tuple, tupsize);
241+
memcpy(buf->data + tupsize, tuple->t_data, tuple->t_len);
242242

243243
buf->index = ++(ostream->index);
244244
buf->needConfirm = needConfirm;
245245
ostream->dest_id = dest_id;
246246

247-
while (!dmq_push_buffer(dest_id, stream, buf, buf->tot_len, true))
247+
while (!dmq_push_buffer(dest_id, stream, buf, buf_len(buf), true))
248248
RecvIfAny();
249249

250250
if (buf->needConfirm)
@@ -284,15 +284,15 @@ SendByteMessage(DmqDestinationId dest_id, char *stream, char tag)
284284
Assert(ostream && !ostream->buf);
285285

286286
buf = palloc(MinSizeOfSendBuf + 1);
287-
buf->tot_len = MinSizeOfSendBuf + 1;
287+
buf->datalen = 1;
288288
buf->data[0] = tag;
289289
buf->index = ++(ostream->index);
290290
buf->needConfirm = true;
291291

292292
ostream->buf = buf;
293293
ostream->dest_id = dest_id;
294294

295-
while (!dmq_push_buffer(dest_id, stream, buf, buf->tot_len, true))
295+
while (!dmq_push_buffer(dest_id, stream, buf, buf_len(buf), true))
296296
RecvIfAny();
297297

298298
wait_for_delivery(ostream);
@@ -335,7 +335,7 @@ RecvByteMessage(const char *streamName, const char *sender)
335335
*/
336336
void
337337
SendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
338-
bool needConfirm)
338+
bool needConfirm)
339339
{
340340
OStream *ostream;
341341

contrib/pg_exchange/stream.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
typedef struct SendBuf
2222
{
2323
uint32 index;
24-
uint32 tot_len;
24+
uint32 datalen;
2525
bool needConfirm;
2626
char data[FLEXIBLE_ARRAY_MEMBER];
2727
} SendBuf;
@@ -35,12 +35,13 @@ typedef struct RecvBuf
3535
} RecvBuf;
3636

3737
#define MinSizeOfSendBuf offsetof(SendBuf, data)
38+
#define buf_len(buf) (MinSizeOfSendBuf + buf->datalen)
3839
#define MinSizeOfRecvBuf offsetof(RecvBuf, data)
3940

4041
typedef struct
4142
{
4243
char streamName[STREAM_NAME_MAX_LEN];
43-
uint64 index;
44+
uint32 index;
4445
SendBuf *buf;
4546
DmqDestinationId dest_id;
4647
} OStream;

src/backend/optimizer/path/joinpath.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ static void consider_parallel_mergejoin(PlannerInfo *root,
6868
JoinType jointype,
6969
JoinPathExtraData *extra,
7070
Path *inner_cheapest_total);
71+
static void hash_inner_and_outer(PlannerInfo *root, RelOptInfo *joinrel,
72+
RelOptInfo *outerrel, RelOptInfo *innerrel,
73+
JoinType jointype, JoinPathExtraData *extra);
7174
static List *select_mergejoin_clauses(PlannerInfo *root,
7275
RelOptInfo *joinrel,
7376
RelOptInfo *outerrel,
@@ -1670,7 +1673,7 @@ consider_parallel_nestloop(PlannerInfo *root,
16701673
* 'jointype' is the type of join to do
16711674
* 'extra' contains additional input values
16721675
*/
1673-
void
1676+
static void
16741677
hash_inner_and_outer(PlannerInfo *root,
16751678
RelOptInfo *joinrel,
16761679
RelOptInfo *outerrel,

src/backend/optimizer/plan/planner.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1182,7 +1182,7 @@ inheritance_planner(PlannerInfo *root)
11821182
PlannerInfo **parent_roots = NULL;
11831183

11841184
Assert(parse->commandType != CMD_INSERT);
1185-
elog(LOG, "inheritance_plnner()");
1185+
11861186
/*
11871187
* We generate a modified instance of the original Query for each target
11881188
* relation, plan that, and put all the plans into a list that will be

src/backend/storage/ipc/dsm_impl.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ dsm_impl_posix(dsm_op op, dsm_handle handle, Size request_size,
341341
*/
342342
if (errno == EINTR && elevel >= ERROR)
343343
CHECK_FOR_INTERRUPTS();
344-
344+
Assert(0);
345345
ereport(elevel,
346346
(errcode_for_dynamic_shared_memory(),
347347
errmsg("could not resize shared memory segment \"%s\" to %zu bytes: %m",

src/backend/storage/ipc/latch.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
#include <poll.h>
4444
#endif
4545

46-
//#include "common/pg_socket.h"
4746
#include "miscadmin.h"
4847
#include "pgstat.h"
4948
#include "port/atomics.h"

src/backend/tcop/postgres.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2738,7 +2738,6 @@ StatementCancelHandler(SIGNAL_ARGS)
27382738
void
27392739
FloatExceptionHandler(SIGNAL_ARGS)
27402740
{
2741-
Assert(0);
27422741
/* We're not returning, so no need to save errno */
27432742
ereport(ERROR,
27442743
(errcode(ERRCODE_FLOATING_POINT_EXCEPTION),
@@ -3190,7 +3189,6 @@ check_stack_depth(void)
31903189
{
31913190
if (stack_is_too_deep())
31923191
{
3193-
Assert(0);
31943192
ereport(ERROR,
31953193
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
31963194
errmsg("stack depth limit exceeded"),

src/backend/utils/cache/typcache.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1621,7 +1621,6 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError)
16211621
ereport(ERROR,
16221622
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
16231623
errmsg("record type has not been registered")));
1624-
16251624
return NULL;
16261625
}
16271626
}

src/include/executor/nodeSeqscan.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
#include "access/parallel.h"
1818
#include "nodes/execnodes.h"
19-
#include "nodes/extensible.h"
2019

2120
extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags);
2221
extern void ExecEndSeqScan(SeqScanState *node);

src/include/optimizer/paths.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,6 @@ extern void add_paths_to_joinrel(PlannerInfo *root, RelOptInfo *joinrel,
102102
RelOptInfo *outerrel, RelOptInfo *innerrel,
103103
JoinType jointype, SpecialJoinInfo *sjinfo,
104104
List *restrictlist);
105-
extern void hash_inner_and_outer(PlannerInfo *root, RelOptInfo *joinrel,
106-
RelOptInfo *outerrel, RelOptInfo *innerrel,
107-
JoinType jointype, JoinPathExtraData *extra);
108105

109106
/*
110107
* joinrels.c

0 commit comments

Comments
 (0)