@@ -288,8 +288,9 @@ static void AlterSeqNamespaces(Relation classRel, Relation rel,
288
288
LOCKMODE lockmode );
289
289
static ObjectAddress ATExecAlterConstraint (Relation rel , AlterTableCmd * cmd ,
290
290
bool recurse , bool recursing , LOCKMODE lockmode );
291
- static ObjectAddress ATExecValidateConstraint (Relation rel , char * constrName ,
292
- bool recurse , bool recursing , LOCKMODE lockmode );
291
+ static ObjectAddress ATExecValidateConstraint (List * * wqueue , Relation rel ,
292
+ char * constrName , bool recurse , bool recursing ,
293
+ LOCKMODE lockmode );
293
294
static int transformColumnNameList (Oid relId , List * colList ,
294
295
int16 * attnums , Oid * atttypids );
295
296
static int transformFkeyGetPrimaryKey (Relation pkrel , Oid * indexOid ,
@@ -302,7 +303,6 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
302
303
static void checkFkeyPermissions (Relation rel , int16 * attnums , int natts );
303
304
static CoercionPathType findFkeyCast (Oid targetTypeId , Oid sourceTypeId ,
304
305
Oid * funcid );
305
- static void validateCheckConstraint (Relation rel , HeapTuple constrtup );
306
306
static void validateForeignKeyConstraint (char * conname ,
307
307
Relation rel , Relation pkrel ,
308
308
Oid pkindOid , Oid constraintOid );
@@ -3582,13 +3582,13 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
3582
3582
address = ATExecAlterConstraint (rel , cmd , false, false, lockmode );
3583
3583
break ;
3584
3584
case AT_ValidateConstraint : /* VALIDATE CONSTRAINT */
3585
- address = ATExecValidateConstraint (rel , cmd -> name , false , false,
3586
- lockmode );
3585
+ address = ATExecValidateConstraint (wqueue , rel , cmd -> name , false,
3586
+ false, lockmode );
3587
3587
break ;
3588
3588
case AT_ValidateConstraintRecurse : /* VALIDATE CONSTRAINT with
3589
3589
* recursion */
3590
- address = ATExecValidateConstraint (rel , cmd -> name , true, false ,
3591
- lockmode );
3590
+ address = ATExecValidateConstraint (wqueue , rel , cmd -> name , true,
3591
+ false, lockmode );
3592
3592
break ;
3593
3593
case AT_DropConstraint : /* DROP CONSTRAINT */
3594
3594
ATExecDropConstraint (rel , cmd -> name , cmd -> behavior ,
@@ -6851,8 +6851,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
6851
6851
* was already validated, InvalidObjectAddress is returned.
6852
6852
*/
6853
6853
static ObjectAddress
6854
- ATExecValidateConstraint (Relation rel , char * constrName , bool recurse ,
6855
- bool recursing , LOCKMODE lockmode )
6854
+ ATExecValidateConstraint (List * * wqueue , Relation rel , char * constrName ,
6855
+ bool recurse , bool recursing , LOCKMODE lockmode )
6856
6856
{
6857
6857
Relation conrel ;
6858
6858
SysScanDesc scan ;
@@ -6899,27 +6899,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
6899
6899
6900
6900
if (!con -> convalidated )
6901
6901
{
6902
+ AlteredTableInfo * tab ;
6902
6903
HeapTuple copyTuple ;
6903
6904
Form_pg_constraint copy_con ;
6904
6905
6905
6906
if (con -> contype == CONSTRAINT_FOREIGN )
6906
6907
{
6907
- Relation refrel ;
6908
+ NewConstraint * newcon ;
6909
+ Constraint * fkconstraint ;
6908
6910
6909
- /*
6910
- * Triggers are already in place on both tables, so a concurrent
6911
- * write that alters the result here is not possible. Normally we
6912
- * can run a query here to do the validation, which would only
6913
- * require AccessShareLock. In some cases, it is possible that we
6914
- * might need to fire triggers to perform the check, so we take a
6915
- * lock at RowShareLock level just in case.
6916
- */
6917
- refrel = heap_open (con -> confrelid , RowShareLock );
6911
+ /* Queue validation for phase 3 */
6912
+ fkconstraint = makeNode (Constraint );
6913
+ /* for now this is all we need */
6914
+ fkconstraint -> conname = constrName ;
6918
6915
6919
- validateForeignKeyConstraint (constrName , rel , refrel ,
6920
- con -> conindid ,
6921
- HeapTupleGetOid (tuple ));
6922
- heap_close (refrel , NoLock );
6916
+ newcon = (NewConstraint * ) palloc0 (sizeof (NewConstraint ));
6917
+ newcon -> name = constrName ;
6918
+ newcon -> contype = CONSTR_FOREIGN ;
6919
+ newcon -> refrelid = con -> confrelid ;
6920
+ newcon -> refindid = con -> conindid ;
6921
+ newcon -> conid = HeapTupleGetOid (tuple );
6922
+ newcon -> qual = (Node * ) fkconstraint ;
6923
+
6924
+ /* Find or create work queue entry for this table */
6925
+ tab = ATGetQueueEntry (wqueue , rel );
6926
+ tab -> constraints = lappend (tab -> constraints , newcon );
6923
6927
6924
6928
/*
6925
6929
* Foreign keys do not inherit, so we purposely ignore the
@@ -6930,6 +6934,10 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
6930
6934
{
6931
6935
List * children = NIL ;
6932
6936
ListCell * child ;
6937
+ NewConstraint * newcon ;
6938
+ bool isnull ;
6939
+ Datum val ;
6940
+ char * conbin ;
6933
6941
6934
6942
/*
6935
6943
* If we're recursing, the parent has already done this, so skip
@@ -6968,12 +6976,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
6968
6976
/* find_all_inheritors already got lock */
6969
6977
childrel = heap_open (childoid , NoLock );
6970
6978
6971
- ATExecValidateConstraint (childrel , constrName , false,
6979
+ ATExecValidateConstraint (wqueue , childrel , constrName , false,
6972
6980
true, lockmode );
6973
6981
heap_close (childrel , NoLock );
6974
6982
}
6975
6983
6976
- validateCheckConstraint (rel , tuple );
6984
+ /* Queue validation for phase 3 */
6985
+ newcon = (NewConstraint * ) palloc0 (sizeof (NewConstraint ));
6986
+ newcon -> name = constrName ;
6987
+ newcon -> contype = CONSTR_CHECK ;
6988
+ newcon -> refrelid = InvalidOid ;
6989
+ newcon -> refindid = InvalidOid ;
6990
+ newcon -> conid = HeapTupleGetOid (tuple );
6991
+
6992
+ val = SysCacheGetAttr (CONSTROID , tuple ,
6993
+ Anum_pg_constraint_conbin , & isnull );
6994
+ if (isnull )
6995
+ elog (ERROR , "null conbin for constraint %u" ,
6996
+ HeapTupleGetOid (tuple ));
6997
+
6998
+ conbin = TextDatumGetCString (val );
6999
+ newcon -> qual = (Node * ) make_ands_implicit ((Expr * ) stringToNode (conbin ));
7000
+
7001
+ /* Find or create work queue entry for this table */
7002
+ tab = ATGetQueueEntry (wqueue , rel );
7003
+ tab -> constraints = lappend (tab -> constraints , newcon );
6977
7004
6978
7005
/*
6979
7006
* Invalidate relcache so that others see the new validated
@@ -7345,88 +7372,6 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
7345
7372
}
7346
7373
}
7347
7374
7348
- /*
7349
- * Scan the existing rows in a table to verify they meet a proposed
7350
- * CHECK constraint.
7351
- *
7352
- * The caller must have opened and locked the relation appropriately.
7353
- */
7354
- static void
7355
- validateCheckConstraint (Relation rel , HeapTuple constrtup )
7356
- {
7357
- EState * estate ;
7358
- Datum val ;
7359
- char * conbin ;
7360
- Expr * origexpr ;
7361
- List * exprstate ;
7362
- TupleDesc tupdesc ;
7363
- HeapScanDesc scan ;
7364
- HeapTuple tuple ;
7365
- ExprContext * econtext ;
7366
- MemoryContext oldcxt ;
7367
- TupleTableSlot * slot ;
7368
- Form_pg_constraint constrForm ;
7369
- bool isnull ;
7370
- Snapshot snapshot ;
7371
-
7372
- /* VALIDATE CONSTRAINT is a no-op for foreign tables */
7373
- if (rel -> rd_rel -> relkind == RELKIND_FOREIGN_TABLE )
7374
- return ;
7375
-
7376
- constrForm = (Form_pg_constraint ) GETSTRUCT (constrtup );
7377
-
7378
- estate = CreateExecutorState ();
7379
-
7380
- /*
7381
- * XXX this tuple doesn't really come from a syscache, but this doesn't
7382
- * matter to SysCacheGetAttr, because it only wants to be able to fetch
7383
- * the tupdesc
7384
- */
7385
- val = SysCacheGetAttr (CONSTROID , constrtup , Anum_pg_constraint_conbin ,
7386
- & isnull );
7387
- if (isnull )
7388
- elog (ERROR , "null conbin for constraint %u" ,
7389
- HeapTupleGetOid (constrtup ));
7390
- conbin = TextDatumGetCString (val );
7391
- origexpr = (Expr * ) stringToNode (conbin );
7392
- exprstate = (List * )
7393
- ExecPrepareExpr ((Expr * ) make_ands_implicit (origexpr ), estate );
7394
-
7395
- econtext = GetPerTupleExprContext (estate );
7396
- tupdesc = RelationGetDescr (rel );
7397
- slot = MakeSingleTupleTableSlot (tupdesc );
7398
- econtext -> ecxt_scantuple = slot ;
7399
-
7400
- snapshot = RegisterSnapshot (GetLatestSnapshot ());
7401
- scan = heap_beginscan (rel , snapshot , 0 , NULL );
7402
-
7403
- /*
7404
- * Switch to per-tuple memory context and reset it for each tuple
7405
- * produced, so we don't leak memory.
7406
- */
7407
- oldcxt = MemoryContextSwitchTo (GetPerTupleMemoryContext (estate ));
7408
-
7409
- while ((tuple = heap_getnext (scan , ForwardScanDirection )) != NULL )
7410
- {
7411
- ExecStoreTuple (tuple , slot , InvalidBuffer , false);
7412
-
7413
- if (!ExecQual (exprstate , econtext , true))
7414
- ereport (ERROR ,
7415
- (errcode (ERRCODE_CHECK_VIOLATION ),
7416
- errmsg ("check constraint \"%s\" is violated by some row" ,
7417
- NameStr (constrForm -> conname )),
7418
- errtableconstraint (rel , NameStr (constrForm -> conname ))));
7419
-
7420
- ResetExprContext (econtext );
7421
- }
7422
-
7423
- MemoryContextSwitchTo (oldcxt );
7424
- heap_endscan (scan );
7425
- UnregisterSnapshot (snapshot );
7426
- ExecDropSingleTupleTableSlot (slot );
7427
- FreeExecutorState (estate );
7428
- }
7429
-
7430
7375
/*
7431
7376
* Scan the existing rows in a table to verify they meet a proposed FK
7432
7377
* constraint.
0 commit comments