Skip to content

Commit d1587ff

Browse files
committed
fix check constraints validation mechanism
1 parent 4f04cff commit d1587ff

File tree

4 files changed

+179
-89
lines changed

4 files changed

+179
-89
lines changed

src/init.c

Lines changed: 174 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ bool initialization_needed = true;
3131
static FmgrInfo *qsort_type_cmp_func;
3232
static bool globalByVal;
3333

34+
static bool validate_partition_constraints(Oid *children_oids,
35+
uint children_count,
36+
Snapshot snapshot,
37+
PartRelationInfo *prel,
38+
RangeRelation *rangerel);
3439
static bool validate_range_constraint(Expr *, PartRelationInfo *, Datum *, Datum *);
3540
static bool validate_hash_constraint(Expr *expr, PartRelationInfo *prel, int *hash);
3641
static bool read_opexpr_const(OpExpr *opexpr, int varattno, Datum *val);
@@ -120,7 +125,7 @@ load_config(void)
120125

121126
/* Load cache */
122127
LWLockAcquire(pmstate->load_config_lock, LW_EXCLUSIVE);
123-
load_relations_hashtable(new_segment_created);
128+
load_relations(new_segment_created);
124129
LWLockRelease(pmstate->load_config_lock);
125130
LWLockRelease(pmstate->dsm_init_lock);
126131
}
@@ -155,7 +160,7 @@ get_extension_schema()
155160
* Loads partitioned tables structure to hashtable
156161
*/
157162
void
158-
load_relations_hashtable(bool reinitialize)
163+
load_relations(bool reinitialize)
159164
{
160165
int ret,
161166
i,
@@ -233,15 +238,15 @@ load_relations_hashtable(bool reinitialize)
233238
free_dsm_array(&rangerel->ranges);
234239
prel->children_count = 0;
235240
}
236-
load_check_constraints(oid, GetCatalogSnapshot(oid));
241+
load_partitions(oid, GetCatalogSnapshot(oid));
237242
break;
238243
case PT_HASH:
239244
if (reinitialize && prel->children.length > 0)
240245
{
241246
free_dsm_array(&prel->children);
242247
prel->children_count = 0;
243248
}
244-
load_check_constraints(oid, GetCatalogSnapshot(oid));
249+
load_partitions(oid, GetCatalogSnapshot(oid));
245250
break;
246251
}
247252
}
@@ -268,18 +273,19 @@ create_relations_hashtable()
268273
* Load and validate CHECK constraints
269274
*/
270275
void
271-
load_check_constraints(Oid parent_oid, Snapshot snapshot)
276+
load_partitions(Oid parent_oid, Snapshot snapshot)
272277
{
273278
PartRelationInfo *prel = NULL;
274279
RangeRelation *rangerel = NULL;
275280
SPIPlanPtr plan;
276281
bool found;
277282
int ret,
278283
i,
279-
proc;
284+
children_count = 0;
280285
Datum vals[1];
281-
Oid oids[1] = {INT4OID};
286+
Oid types[1] = {INT4OID};
282287
bool nulls[1] = {false};
288+
Oid *children_oids;
283289

284290
vals[0] = Int32GetDatum(parent_oid);
285291
prel = get_pathman_relation_info(parent_oid, NULL);
@@ -288,77 +294,144 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
288294
if (prel->children.length > 0)
289295
return;
290296

291-
plan = SPI_prepare("select pg_constraint.* "
292-
"from pg_constraint "
293-
"join pg_inherits on inhrelid = conrelid "
294-
"where inhparent = $1 and contype='c';",
295-
1, oids);
296-
ret = SPI_execute_snapshot(plan, vals, nulls,
297-
snapshot, InvalidSnapshot, true, false, 0);
298-
299-
proc = SPI_processed;
300-
297+
/* Load children oids */
298+
plan = SPI_prepare("select inhrelid from pg_inherits where inhparent = $1;", 1, types);
299+
ret = SPI_execute_snapshot(plan, vals, nulls, snapshot, InvalidSnapshot, true, false, 0);
300+
children_count = SPI_processed;
301301
if (ret > 0 && SPI_tuptable != NULL)
302302
{
303-
SPITupleTable *tuptable = SPI_tuptable;
304-
Oid *children;
305-
RangeEntry *ranges = NULL;
306-
Datum min;
307-
Datum max;
308-
int hash;
303+
children_oids = palloc(sizeof(Oid) * children_count);
304+
for(i=0; i<children_count; i++)
305+
{
306+
TupleDesc tupdesc = SPI_tuptable->tupdesc;
307+
HeapTuple tuple = SPI_tuptable->vals[i];
308+
bool isnull;
309309

310-
alloc_dsm_array(&prel->children, sizeof(Oid), proc);
311-
children = (Oid *) dsm_array_get_pointer(&prel->children);
310+
children_oids[i] = \
311+
DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 1, &isnull));
312+
}
313+
}
312314

315+
if (children_count > 0)
316+
{
317+
alloc_dsm_array(&prel->children, sizeof(Oid), children_count);
318+
319+
/* allocate ranges array is dsm */
313320
if (prel->parttype == PT_RANGE)
314321
{
315-
TypeCacheEntry *tce;
316-
RelationKey key;
322+
TypeCacheEntry *tce = lookup_type_cache(prel->atttype, 0);
323+
RelationKey key;
324+
317325
key.dbid = MyDatabaseId;
318326
key.relid = parent_oid;
319-
320327
rangerel = (RangeRelation *)
321328
hash_search(range_restrictions, (void *) &key, HASH_ENTER, &found);
329+
rangerel->by_val = tce->typbyval;
330+
alloc_dsm_array(&rangerel->ranges, sizeof(RangeEntry), children_count);
331+
}
322332

323-
alloc_dsm_array(&rangerel->ranges, sizeof(RangeEntry), proc);
324-
ranges = (RangeEntry *) dsm_array_get_pointer(&rangerel->ranges);
333+
/* Validate partitions constraints */
334+
if (!validate_partition_constraints(children_oids,
335+
children_count,
336+
snapshot,
337+
prel,
338+
rangerel))
339+
{
340+
RelationKey key;
325341

326-
tce = lookup_type_cache(prel->atttype, 0);
327-
rangerel->by_val = tce->typbyval;
342+
/*
343+
* If validation failed then pg_pathman cannot handle this relation.
344+
* Remove it from the cache
345+
*/
346+
key.dbid = MyDatabaseId;
347+
key.relid = parent_oid;
348+
349+
free_dsm_array(&prel->children);
350+
free_dsm_array(&rangerel->ranges);
351+
hash_search(relations, (const void *) &key, HASH_REMOVE, &found);
352+
if (prel->parttype == PT_RANGE)
353+
hash_search(range_restrictions, (const void *) &key, HASH_REMOVE, &found);
354+
355+
elog(WARNING, "Validation failed for relation '%s'. "
356+
"It will not be handled by pg_pathman",
357+
get_rel_name(parent_oid));
328358
}
359+
else
360+
prel->children_count = children_count;
329361

330-
for (i=0; i<proc; i++)
362+
pfree(children_oids);
363+
}
364+
}
365+
366+
static bool
367+
validate_partition_constraints(Oid *children_oids,
368+
uint children_count,
369+
Snapshot snapshot,
370+
PartRelationInfo *prel,
371+
RangeRelation *rangerel)
372+
{
373+
RangeEntry re;
374+
bool isnull;
375+
char *conbin;
376+
Expr *expr;
377+
Datum oids[1];
378+
bool nulls[1] = {false};
379+
Oid types[1] = {INT4OID};
380+
Datum min,
381+
max;
382+
Datum conbin_datum;
383+
Form_pg_constraint con;
384+
RangeEntry *ranges = NULL;
385+
Oid *children;
386+
int hash;
387+
int i, j, idx;
388+
int ret;
389+
int proc;
390+
SPIPlanPtr plan;
391+
392+
/* Iterate through children */
393+
for (idx=0; idx<children_count; idx++)
394+
{
395+
Oid child_oid = children_oids[idx];
396+
397+
oids[0] = Int32GetDatum(child_oid);
398+
399+
/* Load constraints */
400+
plan = SPI_prepare("select * from pg_constraint where conrelid = $1 and contype = 'c';", 1, types);
401+
ret = SPI_execute_snapshot(plan, oids, nulls, snapshot, InvalidSnapshot, true, false, 0);
402+
proc = SPI_processed;
403+
404+
if (ret <= 0 || SPI_tuptable == NULL)
331405
{
332-
RangeEntry re;
333-
HeapTuple tuple = tuptable->vals[i];
334-
bool isnull;
335-
Datum val;
336-
char *conbin;
337-
Expr *expr;
406+
elog(WARNING, "No constraints found for relation '%s'.",
407+
get_rel_name(child_oid));
408+
return false;
409+
}
338410

339-
Form_pg_constraint con;
411+
children = dsm_array_get_pointer(&prel->children);
412+
if (prel->parttype == PT_RANGE)
413+
ranges = (RangeEntry *) dsm_array_get_pointer(&rangerel->ranges);
414+
415+
/* Iterate through check constraints and try to validate them */
416+
for (j=0; j<proc; j++)
417+
{
418+
HeapTuple tuple = SPI_tuptable->vals[j];
340419

341420
con = (Form_pg_constraint) GETSTRUCT(tuple);
421+
conbin_datum = SysCacheGetAttr(CONSTROID, tuple, Anum_pg_constraint_conbin, &isnull);
342422

343-
val = SysCacheGetAttr(CONSTROID, tuple, Anum_pg_constraint_conbin,
344-
&isnull);
423+
/* handle unexpected null value */
345424
if (isnull)
346-
elog(ERROR, "null conbin for constraint %u",
347-
HeapTupleGetOid(tuple));
348-
conbin = TextDatumGetCString(val);
425+
elog(ERROR, "null conbin for constraint %u", HeapTupleGetOid(tuple));
426+
427+
conbin = TextDatumGetCString(conbin_datum);
349428
expr = (Expr *) stringToNode(conbin);
350429

351430
switch(prel->parttype)
352431
{
353432
case PT_RANGE:
354433
if (!validate_range_constraint(expr, prel, &min, &max))
355-
{
356-
elog(WARNING, "Wrong CHECK constraint for relation '%s'. "
357-
"It MUST have exact format: "
358-
"VARIABLE >= CONST AND VARIABLE < CONST. Skipping...",
359-
get_rel_name(con->conrelid));
360434
continue;
361-
}
362435

363436
/* If datum is referenced by val then just assign */
364437
if (rangerel->by_val)
@@ -373,57 +446,74 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
373446
memcpy(&re.max, DatumGetPointer(max), sizeof(re.max));
374447
}
375448
re.child_oid = con->conrelid;
376-
ranges[i] = re;
449+
ranges[idx] = re;
377450
break;
378451

379452
case PT_HASH:
380453
if (!validate_hash_constraint(expr, prel, &hash))
381-
{
382-
elog(WARNING, "Wrong CHECK constraint format for relation '%s'. "
383-
"Skipping...",
384-
get_rel_name(con->conrelid));
385454
continue;
386-
}
387455
children[hash] = con->conrelid;
456+
break;
388457
}
458+
459+
/* Constraint validated successfully. Move on to the next child */
460+
goto validate_next_child;
389461
}
390-
prel->children_count = proc;
391462

392-
if (prel->parttype == PT_RANGE)
463+
/* No constraint matches pattern */
464+
switch(prel->parttype)
393465
{
394-
TypeCacheEntry *tce;
395-
bool byVal = rangerel->by_val;
466+
case PT_RANGE:
467+
elog(WARNING, "Wrong CHECK constraint for relation '%s'. "
468+
"It MUST have exact format: "
469+
"VARIABLE >= CONST AND VARIABLE < CONST.",
470+
get_rel_name(con->conrelid));
471+
break;
472+
case PT_HASH:
473+
elog(WARNING, "Wrong CHECK constraint format for relation '%s'.",
474+
get_rel_name(con->conrelid));
475+
break;
476+
}
477+
return false;
396478

397-
/* Sort ascending */
398-
tce = lookup_type_cache(prel->atttype, TYPECACHE_CMP_PROC | TYPECACHE_CMP_PROC_FINFO);
399-
qsort_type_cmp_func = &tce->cmp_proc_finfo;
400-
globalByVal = byVal;
401-
qsort(ranges, proc, sizeof(RangeEntry), cmp_range_entries);
479+
validate_next_child:
480+
continue;
481+
}
402482

403-
/* Copy oids to prel */
404-
for(i=0; i < proc; i++)
405-
children[i] = ranges[i].child_oid;
483+
/*
484+
* Sort range partitions and check for overlaps
485+
*/
486+
if (prel->parttype == PT_RANGE)
487+
{
488+
TypeCacheEntry *tce;
489+
bool byVal = rangerel->by_val;
406490

407-
/* Check if some ranges overlap */
408-
for(i=0; i < proc-1; i++)
409-
{
410-
Datum cur_upper = PATHMAN_GET_DATUM(ranges[i].max, byVal);
411-
Datum next_lower = PATHMAN_GET_DATUM(ranges[i+1].min, byVal);
412-
bool overlap = DatumGetInt32(FunctionCall2(qsort_type_cmp_func, next_lower, cur_upper)) < 0;
491+
/* Sort ascending */
492+
tce = lookup_type_cache(prel->atttype, TYPECACHE_CMP_PROC | TYPECACHE_CMP_PROC_FINFO);
493+
qsort_type_cmp_func = &tce->cmp_proc_finfo;
494+
globalByVal = byVal;
495+
qsort(ranges, children_count, sizeof(RangeEntry), cmp_range_entries);
413496

414-
if (overlap)
415-
{
416-
RelationKey key;
417-
key.dbid = MyDatabaseId;
418-
key.relid = parent_oid;
497+
/* Copy oids to prel */
498+
for(i=0; i < children_count; i++)
499+
children[i] = ranges[i].child_oid;
419500

420-
elog(WARNING, "Partitions %u and %u overlap. Disabling pathman for relation %u...",
421-
ranges[i].child_oid, ranges[i+1].child_oid, parent_oid);
422-
hash_search(relations, (const void *) &key, HASH_REMOVE, &found);
423-
}
501+
/* Check if some ranges overlap */
502+
for(i=0; i < children_count-1; i++)
503+
{
504+
Datum cur_upper = PATHMAN_GET_DATUM(ranges[i].max, byVal);
505+
Datum next_lower = PATHMAN_GET_DATUM(ranges[i+1].min, byVal);
506+
bool overlap = DatumGetInt32(FunctionCall2(qsort_type_cmp_func, next_lower, cur_upper)) < 0;
507+
508+
if (overlap)
509+
{
510+
elog(WARNING, "Partitions %u and %u overlap.",
511+
ranges[i].child_oid, ranges[i+1].child_oid);
512+
return false;
424513
}
425514
}
426515
}
516+
return true;
427517
}
428518

429519

src/pathman.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ void load_config(void);
196196
void create_relations_hashtable(void);
197197
void create_hash_restrictions_hashtable(void);
198198
void create_range_restrictions_hashtable(void);
199-
void load_relations_hashtable(bool reinitialize);
200-
void load_check_constraints(Oid parent_oid, Snapshot snapshot);
199+
void load_relations(bool reinitialize);
200+
void load_partitions(Oid parent_oid, Snapshot snapshot);
201201
void remove_relation_info(Oid relid);
202202

203203
/* utility functions */

src/pl_funcs.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ on_partitions_created(PG_FUNCTION_ARGS)
4444

4545
/* Reload config */
4646
/* TODO: reload just the specified relation */
47-
load_relations_hashtable(false);
47+
load_relations(false);
4848

4949
LWLockRelease(pmstate->load_config_lock);
5050

@@ -64,7 +64,7 @@ on_partitions_updated(PG_FUNCTION_ARGS)
6464
{
6565
LWLockAcquire(pmstate->load_config_lock, LW_EXCLUSIVE);
6666
remove_relation_info(relid);
67-
load_relations_hashtable(false);
67+
load_relations(false);
6868
LWLockRelease(pmstate->load_config_lock);
6969
}
7070

0 commit comments

Comments
 (0)