Skip to content

Commit 345ca19

Browse files
committed
pathman: validate HASH constraints
1 parent 338ac21 commit 345ca19

File tree

2 files changed

+101
-20
lines changed

2 files changed

+101
-20
lines changed

contrib/pathman/init.c

Lines changed: 94 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include "catalog/pg_class.h"
66
#include "catalog/pg_constraint.h"
7+
#include "catalog/pg_operator.h"
78
#include "utils/syscache.h"
89
#include "access/htup_details.h"
910
#include "utils/builtins.h"
@@ -17,6 +18,7 @@ bool initialization_needed = true;
1718

1819

1920
static bool validate_range_constraint(Expr *, PartRelationInfo *, Datum *, Datum *);
21+
static bool validate_hash_constraint(Expr *expr, PartRelationInfo *prel, int *hash);
2022
static int cmp_range_entries(const void *p1, const void *p2);
2123

2224

@@ -124,7 +126,8 @@ load_part_relations_hashtable(bool reinitialize)
124126
free_dsm_array(&prel->children);
125127
prel->children_count = 0;
126128
}
127-
load_hash_restrictions(oid);
129+
load_check_constraints(oid);
130+
// load_hash_restrictions(oid);
128131
break;
129132
}
130133
}
@@ -281,16 +284,20 @@ load_check_constraints(Oid parent_oid)
281284
RangeEntry *ranges;
282285
Datum min;
283286
Datum max;
284-
285-
rangerel = (RangeRelation *)
286-
hash_search(range_restrictions, (void *) &parent_oid, HASH_ENTER, &found);
287-
// rangerel->nranges = 0;
287+
int hash;
288+
HashRelation *hashrel;
288289

289290
alloc_dsm_array(&prel->children, sizeof(Oid), proc);
290291
children = (Oid *) dsm_array_get_pointer(&prel->children);
291292

292-
alloc_dsm_array(&rangerel->ranges, sizeof(RangeEntry), proc);
293-
ranges = (RangeEntry *) dsm_array_get_pointer(&rangerel->ranges);
293+
if (prel->parttype == PT_RANGE)
294+
{
295+
rangerel = (RangeRelation *)
296+
hash_search(range_restrictions, (void *) &parent_oid, HASH_ENTER, &found);
297+
298+
alloc_dsm_array(&rangerel->ranges, sizeof(RangeEntry), proc);
299+
ranges = (RangeEntry *) dsm_array_get_pointer(&rangerel->ranges);
300+
}
294301

295302
for (i=0; i<proc; i++)
296303
{
@@ -315,24 +322,42 @@ load_check_constraints(Oid parent_oid)
315322
conbin = TextDatumGetCString(val);
316323
expr = (Expr *) stringToNode(conbin);
317324

318-
if (prel->parttype == PT_RANGE)
319-
validate_range_constraint(expr, prel, &min, &max);
325+
switch(prel->parttype)
326+
{
327+
case PT_RANGE:
328+
if (!validate_range_constraint(expr, prel, &min, &max))
329+
/* TODO: elog() */
330+
continue;
320331

321-
// re.child_oid = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 2, &arg1_isnull));
322-
re.child_oid = con->conrelid;
323-
re.min = min;
324-
re.max = max;
332+
re.child_oid = con->conrelid;
333+
re.min = min;
334+
re.max = max;
325335

326-
ranges[i] = re;
327-
// children[prel->children_count++] = re.child_oid;
336+
ranges[i] = re;
337+
break;
338+
339+
case PT_HASH:
340+
if (!validate_hash_constraint(expr, prel, &hash))
341+
/* TODO: elog() */
342+
continue;
343+
344+
hashrel = (HashRelation *)
345+
hash_search(hash_restrictions, (void *) &hash, HASH_ENTER, &found);
346+
hashrel->child_oid = con->conrelid;
347+
children[hash] = con->conrelid;
348+
}
328349
}
350+
prel->children_count = proc;
329351

330-
/* sort ascending */
331-
qsort(ranges, proc, sizeof(RangeEntry), cmp_range_entries);
352+
if (prel->parttype == PT_RANGE)
353+
{
354+
/* sort ascending */
355+
qsort(ranges, proc, sizeof(RangeEntry), cmp_range_entries);
332356

333-
/* copy oids to prel */
334-
for(i=0; i < proc; i++, prel->children_count++)
335-
children[i] = ranges[i].child_oid;
357+
/* copy oids to prel */
358+
for(i=0; i < proc; i++)
359+
children[i] = ranges[i].child_oid;
360+
}
336361

337362
/* TODO: check if some ranges overlap! */
338363
}
@@ -402,6 +427,55 @@ validate_range_constraint(Expr *expr, PartRelationInfo *prel, Datum *min, Datum
402427
*max = ((Const*) right)->constvalue;
403428
}
404429

430+
return true;
431+
}
432+
433+
/*
434+
* Validate hash constraint. It should look like "Var % Const = Const"
435+
*/
436+
static bool
437+
validate_hash_constraint(Expr *expr, PartRelationInfo *prel, int *hash)
438+
{
439+
OpExpr *eqexpr;
440+
OpExpr *modexpr;
441+
442+
if (!IsA(expr, OpExpr))
443+
return false;
444+
eqexpr = (OpExpr *) expr;
445+
446+
/* is this an equality operator? */
447+
if (eqexpr->opno != Int4EqualOperator)
448+
return false;
449+
450+
if (!IsA(linitial(eqexpr->args), OpExpr))
451+
return false;
452+
453+
/* is this a modulus operator? */
454+
modexpr = (OpExpr *) linitial(eqexpr->args);
455+
if (modexpr->opno != 530)
456+
return false;
457+
458+
if (list_length(modexpr->args) == 2)
459+
{
460+
Node *left = linitial(modexpr->args);
461+
Node *right = lsecond(modexpr->args);
462+
Const *mod_result;
463+
464+
if ( !IsA(left, Var) || !IsA(right, Const) )
465+
return false;
466+
if ( ((Var*) left)->varattno != prel->attnum )
467+
return false;
468+
if (DatumGetInt32(((Const*) right)->constvalue) != prel->children.length)
469+
return false;
470+
471+
if ( !IsA(lsecond(eqexpr->args), Const) )
472+
return false;
473+
474+
mod_result = lsecond(eqexpr->args);
475+
*hash = DatumGetInt32(mod_result->constvalue);
476+
return true;
477+
}
478+
405479
return false;
406480
}
407481

contrib/pathman/sql/hash.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ BEGIN
3232
, relation
3333
, partnum);
3434

35+
EXECUTE format('ALTER TABLE %s_%s ADD CHECK (%s %% %s = %s)'
36+
, relation
37+
, partnum
38+
, attribute
39+
, partitions_count
40+
, partnum);
41+
3542
-- EXECUTE format('CREATE TABLE %s_%s () INHERITS (%1$s)', relation, partnum);
3643
-- child_oid := relfilenode FROM pg_class WHERE relname = format('%s_%s', relation, partnum);
3744
INSERT INTO pg_pathman_hash_rels (parent, hash, child)

0 commit comments

Comments
 (0)