Skip to content

Commit d42fd8f

Browse files
committed
dsm_array_get_pointer() can return palloc'ed copy, new InitWalkerContext macro, WalkerContext caches RangeEntry array
1 parent 9d8590c commit d42fd8f

File tree

11 files changed

+299
-180
lines changed

11 files changed

+299
-180
lines changed

src/dsm_array.c

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,14 @@ typedef struct DsmConfig
2626

2727
static DsmConfig *dsm_cfg = NULL;
2828

29-
typedef int BlockHeader;
29+
30+
/*
31+
* Block header
32+
*
33+
* Its size must be 4 bytes on 32-bit and 8 bytes on 64-bit platforms.
34+
* Otherwise it could break the alignment of the array data that follows the header (for example on SPARC V9).
35+
*/
36+
typedef uintptr_t BlockHeader;
3037
typedef BlockHeader* BlockHeaderPtr;
3138

3239
#define FREE_BIT 0x80000000
@@ -144,20 +151,22 @@ init_dsm_table(size_t block_size, size_t start, size_t end)
144151
* Allocate array inside dsm_segment
145152
*/
146153
void
147-
alloc_dsm_array(DsmArray *arr, size_t entry_size, size_t length)
154+
alloc_dsm_array(DsmArray *arr, size_t entry_size, size_t elem_count)
148155
{
149-
int i = 0;
150-
int size_requested = entry_size * length;
151-
int min_pos = 0;
152-
int max_pos = 0;
153-
bool found = false;
154-
bool collecting_blocks = false;
155-
size_t offset = -1;
156-
size_t total_length = 0;
156+
size_t i = 0;
157+
size_t size_requested = entry_size * elem_count;
158+
size_t min_pos = 0;
159+
size_t max_pos = 0;
160+
bool found = false;
161+
bool collecting_blocks = false;
162+
size_t offset = -1;
163+
size_t total_length = 0;
157164
BlockHeaderPtr header;
158-
char *ptr = dsm_segment_address(segment);
165+
char *ptr = dsm_segment_address(segment);
166+
167+
arr->entry_size = entry_size;
159168

160-
for (i = dsm_cfg->first_free; i<dsm_cfg->blocks_count; )
169+
for (i = dsm_cfg->first_free; i < dsm_cfg->blocks_count; )
161170
{
162171
header = (BlockHeaderPtr) &ptr[i * dsm_cfg->block_size];
163172
if (is_free(header))
@@ -204,7 +213,7 @@ alloc_dsm_array(DsmArray *arr, size_t entry_size, size_t length)
204213
dsm_cfg->blocks_count = new_blocks_count;
205214

206215
/* try again */
207-
return alloc_dsm_array(arr, entry_size, length);
216+
return alloc_dsm_array(arr, entry_size, elem_count);
208217
}
209218

210219
/* look for the first free block */
@@ -233,7 +242,7 @@ alloc_dsm_array(DsmArray *arr, size_t entry_size, size_t length)
233242
*header = set_length(header, max_pos - min_pos + 1);
234243

235244
arr->offset = offset;
236-
arr->length = length;
245+
arr->elem_count = elem_count;
237246
}
238247
}
239248

@@ -258,37 +267,54 @@ free_dsm_array(DsmArray *arr)
258267
dsm_cfg->first_free = start;
259268

260269
arr->offset = 0;
261-
arr->length = 0;
270+
arr->elem_count = 0;
262271
}
263272

264273
void
265-
resize_dsm_array(DsmArray *arr, size_t entry_size, size_t length)
274+
resize_dsm_array(DsmArray *arr, size_t entry_size, size_t elem_count)
266275
{
267276
void *array_data;
268277
size_t array_data_size;
269278
void *buffer;
270279

271280
/* Copy data from array to temporary buffer */
272-
array_data = dsm_array_get_pointer(arr);
273-
array_data_size = arr->length * entry_size;
281+
array_data = dsm_array_get_pointer(arr, false);
282+
array_data_size = arr->elem_count * entry_size;
274283
buffer = palloc(array_data_size);
275284
memcpy(buffer, array_data, array_data_size);
276285

277286
/* Free array */
278287
free_dsm_array(arr);
279288

280289
/* Allocate new array */
281-
alloc_dsm_array(arr, entry_size, length);
290+
alloc_dsm_array(arr, entry_size, elem_count);
282291

283292
/* Copy data to new array */
284-
array_data = dsm_array_get_pointer(arr);
293+
array_data = dsm_array_get_pointer(arr, false);
285294
memcpy(array_data, buffer, array_data_size);
286295

287296
pfree(buffer);
288297
}
289298

290299
void *
291-
dsm_array_get_pointer(const DsmArray *arr)
300+
dsm_array_get_pointer(const DsmArray *arr, bool copy)
292301
{
293-
return (char *) dsm_segment_address(segment) + arr->offset + sizeof(BlockHeader);
302+
uint8 *segment_address,
303+
*dsm_array,
304+
*result;
305+
size_t size;
306+
307+
segment_address = (uint8 *) dsm_segment_address(segment);
308+
dsm_array = segment_address + arr->offset + sizeof(BlockHeader);
309+
310+
if (copy)
311+
{
312+
size = arr->elem_count * arr->entry_size;
313+
result = palloc(size);
314+
memcpy((void *) result, (void *) dsm_array, size);
315+
}
316+
else
317+
result = dsm_array;
318+
319+
return result;
294320
}

src/hooks.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -218,14 +218,11 @@ pathman_rel_pathlist_hook(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTb
218218
}
219219

220220
rte->inh = true;
221-
dsm_arr = (Oid *) dsm_array_get_pointer(&prel->children);
221+
dsm_arr = (Oid *) dsm_array_get_pointer(&prel->children, true);
222222
ranges = list_make1_irange(make_irange(0, prel->children_count - 1, false));
223223

224224
/* Make wrappers over restrictions and collect final rangeset */
225-
context.prel = prel;
226-
context.econtext = NULL;
227-
context.hasLeast = false;
228-
context.hasGreatest = false;
225+
InitWalkerContext(&context, prel, NULL);
229226
wrappers = NIL;
230227
foreach(lc, rel->baserestrictinfo)
231228
{

src/init.c

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,19 @@ load_config(void)
9898
* oid into it. This array contains databases oids
9999
* that have already been cached (to prevent repeat caching)
100100
*/
101-
if (&pmstate->databases.length > 0)
101+
if (&pmstate->databases.elem_count > 0)
102102
free_dsm_array(&pmstate->databases);
103103
alloc_dsm_array(&pmstate->databases, sizeof(Oid), 1);
104-
databases = (Oid *) dsm_array_get_pointer(&pmstate->databases);
104+
databases = (Oid *) dsm_array_get_pointer(&pmstate->databases, false);
105105
databases[0] = MyDatabaseId;
106106
}
107107
else
108108
{
109-
int databases_count = pmstate->databases.length;
109+
int databases_count = pmstate->databases.elem_count;
110110
int i;
111111

112112
/* Check if we already cached config for current database */
113-
databases = (Oid *) dsm_array_get_pointer(&pmstate->databases);
113+
databases = (Oid *) dsm_array_get_pointer(&pmstate->databases, false);
114114
for(i = 0; i < databases_count; i++)
115115
if (databases[i] == MyDatabaseId)
116116
{
@@ -120,7 +120,7 @@ load_config(void)
120120

121121
/* Put current database oid to databases list */
122122
resize_dsm_array(&pmstate->databases, sizeof(Oid), databases_count + 1);
123-
databases = (Oid *) dsm_array_get_pointer(&pmstate->databases);
123+
databases = (Oid *) dsm_array_get_pointer(&pmstate->databases, false);
124124
databases[databases_count] = MyDatabaseId;
125125
}
126126

@@ -227,7 +227,7 @@ load_relations_hashtable(bool reinitialize)
227227
switch(prel->parttype)
228228
{
229229
case PT_RANGE:
230-
if (reinitialize && prel->children.length > 0)
230+
if (reinitialize && prel->children.elem_count > 0)
231231
{
232232
RangeRelation *rangerel = get_pathman_range_relation(oid, NULL);
233233
free_dsm_array(&prel->children);
@@ -237,7 +237,7 @@ load_relations_hashtable(bool reinitialize)
237237
load_check_constraints(oid, GetCatalogSnapshot(oid));
238238
break;
239239
case PT_HASH:
240-
if (reinitialize && prel->children.length > 0)
240+
if (reinitialize && prel->children.elem_count > 0)
241241
{
242242
free_dsm_array(&prel->children);
243243
prel->children_count = 0;
@@ -286,7 +286,7 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
286286
prel = get_pathman_relation_info(parent_oid, NULL);
287287

288288
/* Skip if already loaded */
289-
if (prel->children.length > 0)
289+
if (prel->children.elem_count > 0)
290290
return;
291291

292292
plan = SPI_prepare("select pg_constraint.* "
@@ -309,7 +309,7 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
309309
int hash;
310310

311311
alloc_dsm_array(&prel->children, sizeof(Oid), proc);
312-
children = (Oid *) dsm_array_get_pointer(&prel->children);
312+
children = (Oid *) dsm_array_get_pointer(&prel->children, false);
313313

314314
if (prel->parttype == PT_RANGE)
315315
{
@@ -322,7 +322,7 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
322322
hash_search(range_restrictions, (void *) &key, HASH_ENTER, &found);
323323

324324
alloc_dsm_array(&rangerel->ranges, sizeof(RangeEntry), proc);
325-
ranges = (RangeEntry *) dsm_array_get_pointer(&rangerel->ranges);
325+
ranges = (RangeEntry *) dsm_array_get_pointer(&rangerel->ranges, false);
326326

327327
tce = lookup_type_cache(prel->atttype, 0);
328328
rangerel->by_val = tce->typbyval;
@@ -535,7 +535,7 @@ validate_hash_constraint(Expr *expr, PartRelationInfo *prel, int *hash)
535535
return false;
536536
if ( ((Var*) left)->varattno != prel->attnum )
537537
return false;
538-
if (DatumGetInt32(((Const*) right)->constvalue) != prel->children.length)
538+
if (DatumGetInt32(((Const*) right)->constvalue) != prel->children.elem_count)
539539
return false;
540540

541541
if ( !IsA(lsecond(eqexpr->args), Const) )

src/nodes_common.c

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ get_partition_oids(List *ranges, int *n, PartRelationInfo *prel)
211211
int allocated = INITIAL_ALLOC_NUM;
212212
int used = 0;
213213
Oid *result = (Oid *) palloc(allocated * sizeof(Oid));
214-
Oid *children = dsm_array_get_pointer(&prel->children);
214+
Oid *children = dsm_array_get_pointer(&prel->children, true);
215215

216216
foreach (range_cell, ranges)
217217
{
@@ -495,20 +495,30 @@ rescan_append_common(CustomScanState *node)
495495
ListCell *lc;
496496
Oid *parts;
497497
int nparts;
498-
WalkerContext wcxt;
499498

500499
ranges = list_make1_irange(make_irange(0, prel->children_count - 1, false));
501500

502-
wcxt.prel = prel;
503-
wcxt.econtext = econtext;
504-
wcxt.hasLeast = false;
505-
wcxt.hasGreatest = false;
501+
/*
502+
* We'd like to persist the RangeEntry array
503+
* in case of range partitioning, so 'wcxt'
504+
* is stored inside of RuntimeAppendState
505+
*/
506+
if (!scan_state->wcxt_cached)
507+
{
508+
scan_state->wcxt.prel = prel;
509+
scan_state->wcxt.econtext = econtext;
510+
scan_state->wcxt.ranges = NULL;
511+
512+
scan_state->wcxt_cached = true;
513+
}
514+
scan_state->wcxt.hasLeast = false; /* refresh runtime values */
515+
scan_state->wcxt.hasGreatest = false;
506516

507517
foreach (lc, scan_state->custom_exprs)
508518
{
509519
WrapperNode *wn;
510520

511-
wn = walk_expr_tree((Expr *) lfirst(lc), &wcxt);
521+
wn = walk_expr_tree((Expr *) lfirst(lc), &scan_state->wcxt);
512522

513523
ranges = irange_list_intersect(ranges, wn->rangeset);
514524
}

src/partition_filter.c

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ partition_filter_exec(CustomScanState *node)
126126

127127
PartitionFilterState *state = (PartitionFilterState *) node;
128128

129+
ExprContext *econtext = node->ss.ps.ps_ExprContext;
129130
EState *estate = node->ss.ps.state;
130131
PlanState *child_ps = (PlanState *) linitial(node->custom_ps);
131132
TupleTableSlot *slot;
@@ -138,7 +139,6 @@ partition_filter_exec(CustomScanState *node)
138139

139140
if (!TupIsNull(slot))
140141
{
141-
WalkerContext wcxt;
142142
List *ranges;
143143
int nparts;
144144
Oid *parts;
@@ -159,20 +159,34 @@ partition_filter_exec(CustomScanState *node)
159159
CopyToTempConst(constlen, attlen);
160160
CopyToTempConst(constbyval, attbyval);
161161

162-
wcxt.prel = state->prel;
163-
wcxt.econtext = NULL;
164-
wcxt.hasLeast = false;
165-
wcxt.hasGreatest = false;
162+
/*
163+
* We'd like to persist the RangeEntry array
164+
* in case of range partitioning, so 'wcxt'
165+
* is stored inside of PartitionFilterState
166+
*/
167+
if (!state->wcxt_cached)
168+
{
169+
state->wcxt.prel = state->prel;
170+
state->wcxt.econtext = econtext;
171+
state->wcxt.ranges = NULL;
166172

167-
ranges = walk_expr_tree((Expr *) &state->temp_const, &wcxt)->rangeset;
173+
state->wcxt_cached = true;
174+
}
175+
state->wcxt.hasLeast = false; /* refresh runtime values */
176+
state->wcxt.hasGreatest = false;
177+
178+
ranges = walk_expr_tree((Expr *) &state->temp_const, &state->wcxt)->rangeset;
168179
parts = get_partition_oids(ranges, &nparts, state->prel);
169180

170181
if (nparts > 1)
171182
elog(ERROR, "PartitionFilter selected more than one partition");
172183
else if (nparts == 0)
184+
{
173185
selected_partid = add_missing_partition(state->partitioned_table,
174186
&state->temp_const);
175187

188+
refresh_walker_context_ranges(&state->wcxt);
189+
}
176190
else
177191
selected_partid = parts[0];
178192

src/partition_filter.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ typedef struct
2727

2828
HTAB *result_rels_table;
2929
HASHCTL result_rels_table_config;
30+
31+
WalkerContext wcxt;
32+
bool wcxt_cached;
3033
} PartitionFilterState;
3134

3235

0 commit comments

Comments
 (0)