@@ -31,6 +31,11 @@ bool initialization_needed = true;
31
31
static FmgrInfo * qsort_type_cmp_func ;
32
32
static bool globalByVal ;
33
33
34
+ static bool validate_partition_constraints (Oid * children_oids ,
35
+ uint children_count ,
36
+ Snapshot snapshot ,
37
+ PartRelationInfo * prel ,
38
+ RangeRelation * rangerel );
34
39
static bool validate_range_constraint (Expr * , PartRelationInfo * , Datum * , Datum * );
35
40
static bool validate_hash_constraint (Expr * expr , PartRelationInfo * prel , int * hash );
36
41
static bool read_opexpr_const (OpExpr * opexpr , int varattno , Datum * val );
@@ -120,7 +125,7 @@ load_config(void)
120
125
121
126
/* Load cache */
122
127
LWLockAcquire (pmstate -> load_config_lock , LW_EXCLUSIVE );
123
- load_relations_hashtable (new_segment_created );
128
+ load_relations (new_segment_created );
124
129
LWLockRelease (pmstate -> load_config_lock );
125
130
LWLockRelease (pmstate -> dsm_init_lock );
126
131
}
@@ -155,7 +160,7 @@ get_extension_schema()
155
160
* Loads partitioned tables structure to hashtable
156
161
*/
157
162
void
158
- load_relations_hashtable (bool reinitialize )
163
+ load_relations (bool reinitialize )
159
164
{
160
165
int ret ,
161
166
i ,
@@ -233,15 +238,15 @@ load_relations_hashtable(bool reinitialize)
233
238
free_dsm_array (& rangerel -> ranges );
234
239
prel -> children_count = 0 ;
235
240
}
236
- load_check_constraints (oid , GetCatalogSnapshot (oid ));
241
+ load_partitions (oid , GetCatalogSnapshot (oid ));
237
242
break ;
238
243
case PT_HASH :
239
244
if (reinitialize && prel -> children .length > 0 )
240
245
{
241
246
free_dsm_array (& prel -> children );
242
247
prel -> children_count = 0 ;
243
248
}
244
- load_check_constraints (oid , GetCatalogSnapshot (oid ));
249
+ load_partitions (oid , GetCatalogSnapshot (oid ));
245
250
break ;
246
251
}
247
252
}
@@ -268,18 +273,19 @@ create_relations_hashtable()
268
273
* Load and validate CHECK constraints
269
274
*/
270
275
void
271
- load_check_constraints (Oid parent_oid , Snapshot snapshot )
276
+ load_partitions (Oid parent_oid , Snapshot snapshot )
272
277
{
273
278
PartRelationInfo * prel = NULL ;
274
279
RangeRelation * rangerel = NULL ;
275
280
SPIPlanPtr plan ;
276
281
bool found ;
277
282
int ret ,
278
283
i ,
279
- proc ;
284
+ children_count = 0 ;
280
285
Datum vals [1 ];
281
- Oid oids [1 ] = {INT4OID };
286
+ Oid types [1 ] = {INT4OID };
282
287
bool nulls [1 ] = {false};
288
+ Oid * children_oids ;
283
289
284
290
vals [0 ] = Int32GetDatum (parent_oid );
285
291
prel = get_pathman_relation_info (parent_oid , NULL );
@@ -288,77 +294,144 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
288
294
if (prel -> children .length > 0 )
289
295
return ;
290
296
291
- plan = SPI_prepare ("select pg_constraint.* "
292
- "from pg_constraint "
293
- "join pg_inherits on inhrelid = conrelid "
294
- "where inhparent = $1 and contype='c';" ,
295
- 1 , oids );
296
- ret = SPI_execute_snapshot (plan , vals , nulls ,
297
- snapshot , InvalidSnapshot , true, false, 0 );
298
-
299
- proc = SPI_processed ;
300
-
297
+ /* Load children oids */
298
+ plan = SPI_prepare ("select inhrelid from pg_inherits where inhparent = $1;" , 1 , types );
299
+ ret = SPI_execute_snapshot (plan , vals , nulls , snapshot , InvalidSnapshot , true, false, 0 );
300
+ children_count = SPI_processed ;
301
301
if (ret > 0 && SPI_tuptable != NULL )
302
302
{
303
- SPITupleTable * tuptable = SPI_tuptable ;
304
- Oid * children ;
305
- RangeEntry * ranges = NULL ;
306
- Datum min ;
307
- Datum max ;
308
- int hash ;
303
+ children_oids = palloc ( sizeof ( Oid ) * children_count ) ;
304
+ for ( i = 0 ; i < children_count ; i ++ )
305
+ {
306
+ TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
307
+ HeapTuple tuple = SPI_tuptable -> vals [ i ] ;
308
+ bool isnull ;
309
309
310
- alloc_dsm_array (& prel -> children , sizeof (Oid ), proc );
311
- children = (Oid * ) dsm_array_get_pointer (& prel -> children );
310
+ children_oids [i ] = \
311
+ DatumGetObjectId (SPI_getbinval (tuple , tupdesc , 1 , & isnull ));
312
+ }
313
+ }
312
314
315
+ if (children_count > 0 )
316
+ {
317
+ alloc_dsm_array (& prel -> children , sizeof (Oid ), children_count );
318
+
319
+ /* allocate ranges array is dsm */
313
320
if (prel -> parttype == PT_RANGE )
314
321
{
315
- TypeCacheEntry * tce ;
316
- RelationKey key ;
322
+ TypeCacheEntry * tce = lookup_type_cache (prel -> atttype , 0 );
323
+ RelationKey key ;
324
+
317
325
key .dbid = MyDatabaseId ;
318
326
key .relid = parent_oid ;
319
-
320
327
rangerel = (RangeRelation * )
321
328
hash_search (range_restrictions , (void * ) & key , HASH_ENTER , & found );
329
+ rangerel -> by_val = tce -> typbyval ;
330
+ alloc_dsm_array (& rangerel -> ranges , sizeof (RangeEntry ), children_count );
331
+ }
322
332
323
- alloc_dsm_array (& rangerel -> ranges , sizeof (RangeEntry ), proc );
324
- ranges = (RangeEntry * ) dsm_array_get_pointer (& rangerel -> ranges );
333
+ /* Validate partitions constraints */
334
+ if (!validate_partition_constraints (children_oids ,
335
+ children_count ,
336
+ snapshot ,
337
+ prel ,
338
+ rangerel ))
339
+ {
340
+ RelationKey key ;
325
341
326
- tce = lookup_type_cache (prel -> atttype , 0 );
327
- rangerel -> by_val = tce -> typbyval ;
342
+ /*
343
+ * If validation failed then pg_pathman cannot handle this relation.
344
+ * Remove it from the cache
345
+ */
346
+ key .dbid = MyDatabaseId ;
347
+ key .relid = parent_oid ;
348
+
349
+ free_dsm_array (& prel -> children );
350
+ free_dsm_array (& rangerel -> ranges );
351
+ hash_search (relations , (const void * ) & key , HASH_REMOVE , & found );
352
+ if (prel -> parttype == PT_RANGE )
353
+ hash_search (range_restrictions , (const void * ) & key , HASH_REMOVE , & found );
354
+
355
+ elog (WARNING , "Validation failed for relation '%s'. "
356
+ "It will not be handled by pg_pathman" ,
357
+ get_rel_name (parent_oid ));
328
358
}
359
+ else
360
+ prel -> children_count = children_count ;
329
361
330
- for (i = 0 ; i < proc ; i ++ )
362
+ pfree (children_oids );
363
+ }
364
+ }
365
+
366
+ static bool
367
+ validate_partition_constraints (Oid * children_oids ,
368
+ uint children_count ,
369
+ Snapshot snapshot ,
370
+ PartRelationInfo * prel ,
371
+ RangeRelation * rangerel )
372
+ {
373
+ RangeEntry re ;
374
+ bool isnull ;
375
+ char * conbin ;
376
+ Expr * expr ;
377
+ Datum oids [1 ];
378
+ bool nulls [1 ] = {false};
379
+ Oid types [1 ] = {INT4OID };
380
+ Datum min ,
381
+ max ;
382
+ Datum conbin_datum ;
383
+ Form_pg_constraint con ;
384
+ RangeEntry * ranges = NULL ;
385
+ Oid * children ;
386
+ int hash ;
387
+ int i , j , idx ;
388
+ int ret ;
389
+ int proc ;
390
+ SPIPlanPtr plan ;
391
+
392
+ /* Iterate through children */
393
+ for (idx = 0 ; idx < children_count ; idx ++ )
394
+ {
395
+ Oid child_oid = children_oids [idx ];
396
+
397
+ oids [0 ] = Int32GetDatum (child_oid );
398
+
399
+ /* Load constraints */
400
+ plan = SPI_prepare ("select * from pg_constraint where conrelid = $1 and contype = 'c';" , 1 , types );
401
+ ret = SPI_execute_snapshot (plan , oids , nulls , snapshot , InvalidSnapshot , true, false, 0 );
402
+ proc = SPI_processed ;
403
+
404
+ if (ret <= 0 || SPI_tuptable == NULL )
331
405
{
332
- RangeEntry re ;
333
- HeapTuple tuple = tuptable -> vals [i ];
334
- bool isnull ;
335
- Datum val ;
336
- char * conbin ;
337
- Expr * expr ;
406
+ elog (WARNING , "No constraints found for relation '%s'." ,
407
+ get_rel_name (child_oid ));
408
+ return false;
409
+ }
338
410
339
- Form_pg_constraint con ;
411
+ children = dsm_array_get_pointer (& prel -> children );
412
+ if (prel -> parttype == PT_RANGE )
413
+ ranges = (RangeEntry * ) dsm_array_get_pointer (& rangerel -> ranges );
414
+
415
+ /* Iterate through check constraints and try to validate them */
416
+ for (j = 0 ; j < proc ; j ++ )
417
+ {
418
+ HeapTuple tuple = SPI_tuptable -> vals [j ];
340
419
341
420
con = (Form_pg_constraint ) GETSTRUCT (tuple );
421
+ conbin_datum = SysCacheGetAttr (CONSTROID , tuple , Anum_pg_constraint_conbin , & isnull );
342
422
343
- val = SysCacheGetAttr (CONSTROID , tuple , Anum_pg_constraint_conbin ,
344
- & isnull );
423
+ /* handle unexpected null value */
345
424
if (isnull )
346
- elog (ERROR , "null conbin for constraint %u" ,
347
- HeapTupleGetOid ( tuple ));
348
- conbin = TextDatumGetCString (val );
425
+ elog (ERROR , "null conbin for constraint %u" , HeapTupleGetOid ( tuple ));
426
+
427
+ conbin = TextDatumGetCString (conbin_datum );
349
428
expr = (Expr * ) stringToNode (conbin );
350
429
351
430
switch (prel -> parttype )
352
431
{
353
432
case PT_RANGE :
354
433
if (!validate_range_constraint (expr , prel , & min , & max ))
355
- {
356
- elog (WARNING , "Wrong CHECK constraint for relation '%s'. "
357
- "It MUST have exact format: "
358
- "VARIABLE >= CONST AND VARIABLE < CONST. Skipping..." ,
359
- get_rel_name (con -> conrelid ));
360
434
continue ;
361
- }
362
435
363
436
/* If datum is referenced by val then just assign */
364
437
if (rangerel -> by_val )
@@ -373,57 +446,74 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
373
446
memcpy (& re .max , DatumGetPointer (max ), sizeof (re .max ));
374
447
}
375
448
re .child_oid = con -> conrelid ;
376
- ranges [i ] = re ;
449
+ ranges [idx ] = re ;
377
450
break ;
378
451
379
452
case PT_HASH :
380
453
if (!validate_hash_constraint (expr , prel , & hash ))
381
- {
382
- elog (WARNING , "Wrong CHECK constraint format for relation '%s'. "
383
- "Skipping..." ,
384
- get_rel_name (con -> conrelid ));
385
454
continue ;
386
- }
387
455
children [hash ] = con -> conrelid ;
456
+ break ;
388
457
}
458
+
459
+ /* Constraint validated successfully. Move on to the next child */
460
+ goto validate_next_child ;
389
461
}
390
- prel -> children_count = proc ;
391
462
392
- if (prel -> parttype == PT_RANGE )
463
+ /* No constraint matches pattern */
464
+ switch (prel -> parttype )
393
465
{
394
- TypeCacheEntry * tce ;
395
- bool byVal = rangerel -> by_val ;
466
+ case PT_RANGE :
467
+ elog (WARNING , "Wrong CHECK constraint for relation '%s'. "
468
+ "It MUST have exact format: "
469
+ "VARIABLE >= CONST AND VARIABLE < CONST." ,
470
+ get_rel_name (con -> conrelid ));
471
+ break ;
472
+ case PT_HASH :
473
+ elog (WARNING , "Wrong CHECK constraint format for relation '%s'." ,
474
+ get_rel_name (con -> conrelid ));
475
+ break ;
476
+ }
477
+ return false;
396
478
397
- /* Sort ascending */
398
- tce = lookup_type_cache (prel -> atttype , TYPECACHE_CMP_PROC | TYPECACHE_CMP_PROC_FINFO );
399
- qsort_type_cmp_func = & tce -> cmp_proc_finfo ;
400
- globalByVal = byVal ;
401
- qsort (ranges , proc , sizeof (RangeEntry ), cmp_range_entries );
479
+ validate_next_child :
480
+ continue ;
481
+ }
402
482
403
- /* Copy oids to prel */
404
- for (i = 0 ; i < proc ; i ++ )
405
- children [i ] = ranges [i ].child_oid ;
483
+ /*
484
+ * Sort range partitions and check for overlaps
485
+ */
486
+ if (prel -> parttype == PT_RANGE )
487
+ {
488
+ TypeCacheEntry * tce ;
489
+ bool byVal = rangerel -> by_val ;
406
490
407
- /* Check if some ranges overlap */
408
- for (i = 0 ; i < proc - 1 ; i ++ )
409
- {
410
- Datum cur_upper = PATHMAN_GET_DATUM (ranges [i ].max , byVal );
411
- Datum next_lower = PATHMAN_GET_DATUM (ranges [i + 1 ].min , byVal );
412
- bool overlap = DatumGetInt32 (FunctionCall2 (qsort_type_cmp_func , next_lower , cur_upper )) < 0 ;
491
+ /* Sort ascending */
492
+ tce = lookup_type_cache (prel -> atttype , TYPECACHE_CMP_PROC | TYPECACHE_CMP_PROC_FINFO );
493
+ qsort_type_cmp_func = & tce -> cmp_proc_finfo ;
494
+ globalByVal = byVal ;
495
+ qsort (ranges , children_count , sizeof (RangeEntry ), cmp_range_entries );
413
496
414
- if (overlap )
415
- {
416
- RelationKey key ;
417
- key .dbid = MyDatabaseId ;
418
- key .relid = parent_oid ;
497
+ /* Copy oids to prel */
498
+ for (i = 0 ; i < children_count ; i ++ )
499
+ children [i ] = ranges [i ].child_oid ;
419
500
420
- elog (WARNING , "Partitions %u and %u overlap. Disabling pathman for relation %u..." ,
421
- ranges [i ].child_oid , ranges [i + 1 ].child_oid , parent_oid );
422
- hash_search (relations , (const void * ) & key , HASH_REMOVE , & found );
423
- }
501
+ /* Check if some ranges overlap */
502
+ for (i = 0 ; i < children_count - 1 ; i ++ )
503
+ {
504
+ Datum cur_upper = PATHMAN_GET_DATUM (ranges [i ].max , byVal );
505
+ Datum next_lower = PATHMAN_GET_DATUM (ranges [i + 1 ].min , byVal );
506
+ bool overlap = DatumGetInt32 (FunctionCall2 (qsort_type_cmp_func , next_lower , cur_upper )) < 0 ;
507
+
508
+ if (overlap )
509
+ {
510
+ elog (WARNING , "Partitions %u and %u overlap." ,
511
+ ranges [i ].child_oid , ranges [i + 1 ].child_oid );
512
+ return false;
424
513
}
425
514
}
426
515
}
516
+ return true;
427
517
}
428
518
429
519
0 commit comments