Skip to content

Commit 60dda54

Browse files
committed
pathman: reinitialize dsm segment after its destruction
1 parent 43a78cd commit 60dda54

File tree

4 files changed

+173
-54
lines changed

4 files changed

+173
-54
lines changed

contrib/pathman/dsm_array.c

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,32 +11,44 @@ static dsm_segment *segment = NULL;
1111
void
1212
alloc_dsm_table()
1313
{
14-
bool foundPtr;
15-
table = (Table *) ShmemInitStruct("dsm table", sizeof(Table), &foundPtr);
16-
table->segment = 0;
14+
bool found;
15+
table = (Table *) ShmemInitStruct("dsm table", sizeof(Table), &found);
16+
if (!found)
17+
table->segment_handle = 0;
1718
}
1819

1920

20-
void create_dsm_segment(size_t block_size)
21+
/*
22+
* Initialize dsm segment. Returns true if new segment was created and
23+
* false if attached to existing segment
24+
*/
25+
bool
26+
init_dsm_segment(size_t block_size)
2127
{
22-
// bool foundPtr;
28+
bool ret;
2329
dsm_handle handle;
2430

2531
/* lock here */
2632
LWLockAcquire(dsm_init_lock, LW_EXCLUSIVE);
2733

28-
/* if segment hasn't been created yet then create it */
29-
if (table->segment == 0)
34+
/* if there is already an existing segment then attach to it */
35+
if (table->segment_handle != 0)
36+
{
37+
ret = false;
38+
segment = dsm_attach(table->segment_handle);
39+
}
40+
41+
/*
42+
* If segment hasn't been created yet or has already been destroyed
43+
* (it happens when last session detaches segment) then create new one
44+
*/
45+
if (table->segment_handle == 0 || segment == NULL)
3046
{
3147
/* create segment */
3248
segment = dsm_create(block_size * BLOCKS_COUNT, 0);
3349
handle = dsm_segment_handle(segment);
3450
init_dsm_table(table, handle, block_size);
35-
}
36-
/* else attach to an existing segment */
37-
else
38-
{
39-
segment = dsm_attach(table->segment);
51+
ret = true;
4052
}
4153

4254
/*
@@ -47,17 +59,18 @@ void create_dsm_segment(size_t block_size)
4759

4860
/* unlock here */
4961
LWLockRelease(dsm_init_lock);
62+
63+
return ret;
5064
}
5165

5266
void
5367
init_dsm_table(Table *tbl, dsm_handle h, size_t block_size)
5468
{
5569
int i;
5670
Block *block;
57-
// Segment *segment = dsm_attach(h);
5871

59-
// seg = dsm_create(block_size * BLOCKS_COUNT, DSM_CREATE_NULL_IF_MAXSEGMENTS);
60-
table->segment = h;
72+
memset(table, 0, sizeof(Table));
73+
table->segment_handle = h;
6174
table->block_size = block_size;
6275
table->first_free = 0;
6376

@@ -83,8 +96,8 @@ alloc_dsm_array(DsmArray *arr, size_t entry_size, size_t length)
8396
Block *block = NULL;
8497
int free_count = 0;
8598
int size_requested = entry_size * length;
86-
int min_pos;
87-
int max_pos;
99+
int min_pos = 0;
100+
int max_pos = 0;
88101

89102
for (i = table->first_free; i<BLOCKS_COUNT; i++)
90103
{

contrib/pathman/init.c

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ static int cmp_range_entries(const void *p1, const void *p2);
2626
void
2727
init(void)
2828
{
29+
bool new_segment_created;
30+
2931
initialization_needed = false;
30-
create_dsm_segment(32);
32+
new_segment_created = init_dsm_segment(32);
3133

3234
LWLockAcquire(load_config_lock, LW_EXCLUSIVE);
33-
load_part_relations_hashtable();
35+
load_part_relations_hashtable(new_segment_created);
3436
LWLockRelease(load_config_lock);
3537
}
3638

@@ -42,9 +44,9 @@ check_extension()
4244
}
4345

4446
void
45-
load_part_relations_hashtable()
47+
load_part_relations_hashtable(bool reinitialize)
4648
{
47-
PartRelationInfo *prinfo;
49+
PartRelationInfo *prel;
4850
int ret;
4951
int i;
5052
int proc;
@@ -78,12 +80,12 @@ load_part_relations_hashtable()
7880
HeapTuple tuple = tuptable->vals[i];
7981
int oid = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 1, &isnull));
8082

81-
prinfo = (PartRelationInfo*)
83+
prel = (PartRelationInfo*)
8284
hash_search(relations, (const void *)&oid, HASH_ENTER, NULL);
83-
prinfo->oid = oid;
84-
prinfo->attnum = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 2, &isnull));
85-
prinfo->parttype = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 3, &isnull));
86-
prinfo->atttype = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 4, &isnull));
85+
prel->oid = oid;
86+
prel->attnum = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 2, &isnull));
87+
prel->parttype = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 3, &isnull));
88+
prel->atttype = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 4, &isnull));
8789

8890
part_oids = lappend_int(part_oids, oid);
8991

@@ -97,18 +99,31 @@ load_part_relations_hashtable()
9799
{
98100
Oid oid = (int) lfirst_int(lc);
99101

100-
prinfo = (PartRelationInfo*)
102+
prel = (PartRelationInfo*)
101103
hash_search(relations, (const void *)&oid, HASH_FIND, NULL);
102104

103105
// load_check_constraints(oid);
104106

105-
switch(prinfo->parttype)
107+
switch(prel->parttype)
106108
{
107109
case PT_RANGE:
108110
// load_range_restrictions(oid);
111+
if (reinitialize && prel->children.length > 0)
112+
{
113+
RangeRelation *rangerel = (RangeRelation *)
114+
hash_search(range_restrictions, (void *) &oid, HASH_FIND, NULL);
115+
free_dsm_array(&prel->children);
116+
free_dsm_array(&rangerel->ranges);
117+
prel->children_count = 0;
118+
}
109119
load_check_constraints(oid);
110120
break;
111121
case PT_HASH:
122+
if (reinitialize && prel->children.length > 0)
123+
{
124+
free_dsm_array(&prel->children);
125+
prel->children_count = 0;
126+
}
112127
load_hash_restrictions(oid);
113128
break;
114129
}
@@ -242,6 +257,10 @@ load_check_constraints(Oid parent_oid)
242257
prel = (PartRelationInfo*)
243258
hash_search(relations, (const void *) &parent_oid, HASH_FIND, &found);
244259

260+
/* skip if already loaded */
261+
if (prel->children.length > 0)
262+
return;
263+
245264
// /* if already loaded then quit */
246265
// if (prel->children_count > 0)
247266
// return;
@@ -256,7 +275,7 @@ load_check_constraints(Oid parent_oid)
256275

257276
if (ret > 0 && SPI_tuptable != NULL)
258277
{
259-
TupleDesc tupdesc = SPI_tuptable->tupdesc;
278+
// TupleDesc tupdesc = SPI_tuptable->tupdesc;
260279
SPITupleTable *tuptable = SPI_tuptable;
261280
Oid *children;
262281
RangeEntry *ranges;
@@ -265,7 +284,7 @@ load_check_constraints(Oid parent_oid)
265284

266285
rangerel = (RangeRelation *)
267286
hash_search(range_restrictions, (void *) &parent_oid, HASH_ENTER, &found);
268-
rangerel->nranges = 0;
287+
// rangerel->nranges = 0;
269288

270289
alloc_dsm_array(&prel->children, sizeof(Oid), proc);
271290
children = (Oid *) dsm_array_get_pointer(&prel->children);
@@ -304,15 +323,15 @@ load_check_constraints(Oid parent_oid)
304323
re.min = min;
305324
re.max = max;
306325

307-
ranges[rangerel->nranges++] = re;
326+
ranges[i] = re;
308327
// children[prel->children_count++] = re.child_oid;
309328
}
310329

311330
/* sort ascending */
312-
qsort(ranges, rangerel->nranges, sizeof(RangeEntry), cmp_range_entries);
331+
qsort(ranges, proc, sizeof(RangeEntry), cmp_range_entries);
313332

314333
/* copy oids to prel */
315-
for(i=0; i < rangerel->nranges; i++, prel->children_count++)
334+
for(i=0; i < proc; i++, prel->children_count++)
316335
children[i] = ranges[i].child_oid;
317336

318337
/* TODO: check if some ranges overlap! */
@@ -324,8 +343,8 @@ load_check_constraints(Oid parent_oid)
324343
static int
325344
cmp_range_entries(const void *p1, const void *p2)
326345
{
327-
RangeEntry *v1 = (const RangeEntry *) p1;
328-
RangeEntry *v2 = (const RangeEntry *) p2;
346+
const RangeEntry *v1 = (const RangeEntry *) p1;
347+
const RangeEntry *v2 = (const RangeEntry *) p2;
329348

330349
if (v1->min < v2->min)
331350
return -1;
@@ -382,8 +401,8 @@ validate_range_constraint(Expr *expr, PartRelationInfo *prel, Datum *min, Datum
382401
return false;
383402
*max = ((Const*) right)->constvalue;
384403
}
385-
else
386-
return false;
404+
405+
return false;
387406
}
388407

389408
/*

0 commit comments

Comments
 (0)