Skip to content

Commit 86374c9

Browse files
committed
Split ATExecValidateConstraint into reusable pieces
With this, we have separate functions to add validation requests to ALTER TABLE's phase 3 queue for check and foreign key constraints, which allows reusing them in future commits -- particularly this will allow us to perform validation of invalid foreign key constraints in partitioned tables. We could have let the check constraint code alone since we don't need to reuse that for anything at this point, but it seems cleaner and more consistent to do both at the same time. Author: Amul Sul <sulamul@gmail.com> Discussion: https://postgr.es/m/CAAJ_b96Bp=-ZwihPPtuaNX=SrZ0U6ZsXD3+fgARO0JuKa8v2jQ@mail.gmail.com
1 parent 80feb72 commit 86374c9

File tree

1 file changed

+165
-106
lines changed

1 file changed

+165
-106
lines changed

src/backend/commands/tablecmds.c

+165-106
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,11 @@ static void ATExecAlterChildConstr(Constraint *cmdcon, Relation conrel, Relation
403403
static ObjectAddress ATExecValidateConstraint(List **wqueue,
404404
Relation rel, char *constrName,
405405
bool recurse, bool recursing, LOCKMODE lockmode);
406+
static void QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel,
407+
HeapTuple contuple, LOCKMODE lockmode);
408+
static void QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel,
409+
char *constrName, HeapTuple contuple,
410+
bool recurse, bool recursing, LOCKMODE lockmode);
406411
static int transformColumnNameList(Oid relId, List *colList,
407412
int16 *attnums, Oid *atttypids, Oid *attcollids);
408413
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
@@ -12079,136 +12084,190 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
1207912084

1208012085
if (!con->convalidated)
1208112086
{
12082-
AlteredTableInfo *tab;
12083-
HeapTuple copyTuple;
12084-
Form_pg_constraint copy_con;
12085-
1208612087
if (con->contype == CONSTRAINT_FOREIGN)
1208712088
{
12088-
NewConstraint *newcon;
12089-
Constraint *fkconstraint;
12089+
QueueFKConstraintValidation(wqueue, conrel, rel, tuple, lockmode);
12090+
}
12091+
else if (con->contype == CONSTRAINT_CHECK)
12092+
{
12093+
QueueCheckConstraintValidation(wqueue, conrel, rel, constrName,
12094+
tuple, recurse, recursing, lockmode);
12095+
}
1209012096

12091-
/* Queue validation for phase 3 */
12092-
fkconstraint = makeNode(Constraint);
12093-
/* for now this is all we need */
12094-
fkconstraint->conname = constrName;
12097+
ObjectAddressSet(address, ConstraintRelationId, con->oid);
12098+
}
12099+
else
12100+
address = InvalidObjectAddress; /* already validated */
1209512101

12096-
newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
12097-
newcon->name = constrName;
12098-
newcon->contype = CONSTR_FOREIGN;
12099-
newcon->refrelid = con->confrelid;
12100-
newcon->refindid = con->conindid;
12101-
newcon->conid = con->oid;
12102-
newcon->qual = (Node *) fkconstraint;
12102+
systable_endscan(scan);
1210312103

12104-
/* Find or create work queue entry for this table */
12105-
tab = ATGetQueueEntry(wqueue, rel);
12106-
tab->constraints = lappend(tab->constraints, newcon);
12104+
table_close(conrel, RowExclusiveLock);
1210712105

12108-
/*
12109-
* We disallow creating invalid foreign keys to or from
12110-
* partitioned tables, so ignoring the recursion bit is okay.
12111-
*/
12112-
}
12113-
else if (con->contype == CONSTRAINT_CHECK)
12114-
{
12115-
List *children = NIL;
12116-
ListCell *child;
12117-
NewConstraint *newcon;
12118-
Datum val;
12119-
char *conbin;
12106+
return address;
12107+
}
1212012108

12121-
/*
12122-
* If we're recursing, the parent has already done this, so skip
12123-
* it. Also, if the constraint is a NO INHERIT constraint, we
12124-
* shouldn't try to look for it in the children.
12125-
*/
12126-
if (!recursing && !con->connoinherit)
12127-
children = find_all_inheritors(RelationGetRelid(rel),
12128-
lockmode, NULL);
12109+
/*
12110+
* QueueFKConstraintValidation
12111+
*
12112+
* Add an entry to the wqueue to validate the given foreign key constraint in
12113+
* Phase 3 and update the convalidated field in the pg_constraint catalog
12114+
* for the specified relation.
12115+
*/
12116+
static void
12117+
QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel,
12118+
HeapTuple contuple, LOCKMODE lockmode)
12119+
{
12120+
Form_pg_constraint con;
12121+
AlteredTableInfo *tab;
12122+
HeapTuple copyTuple;
12123+
Form_pg_constraint copy_con;
1212912124

12130-
/*
12131-
* For CHECK constraints, we must ensure that we only mark the
12132-
* constraint as validated on the parent if it's already validated
12133-
* on the children.
12134-
*
12135-
* We recurse before validating on the parent, to reduce risk of
12136-
* deadlocks.
12137-
*/
12138-
foreach(child, children)
12139-
{
12140-
Oid childoid = lfirst_oid(child);
12141-
Relation childrel;
12125+
con = (Form_pg_constraint) GETSTRUCT(contuple);
12126+
Assert(con->contype == CONSTRAINT_FOREIGN);
1214212127

12143-
if (childoid == RelationGetRelid(rel))
12144-
continue;
12128+
if (rel->rd_rel->relkind == RELKIND_RELATION)
12129+
{
12130+
NewConstraint *newcon;
12131+
Constraint *fkconstraint;
1214512132

12146-
/*
12147-
* If we are told not to recurse, there had better not be any
12148-
* child tables, because we can't mark the constraint on the
12149-
* parent valid unless it is valid for all child tables.
12150-
*/
12151-
if (!recurse)
12152-
ereport(ERROR,
12153-
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
12154-
errmsg("constraint must be validated on child tables too")));
12133+
/* Queue validation for phase 3 */
12134+
fkconstraint = makeNode(Constraint);
12135+
/* for now this is all we need */
12136+
fkconstraint->conname = pstrdup(NameStr(con->conname));
1215512137

12156-
/* find_all_inheritors already got lock */
12157-
childrel = table_open(childoid, NoLock);
12138+
newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
12139+
newcon->name = fkconstraint->conname;
12140+
newcon->contype = CONSTR_FOREIGN;
12141+
newcon->refrelid = con->confrelid;
12142+
newcon->refindid = con->conindid;
12143+
newcon->conid = con->oid;
12144+
newcon->qual = (Node *) fkconstraint;
1215812145

12159-
ATExecValidateConstraint(wqueue, childrel, constrName, false,
12160-
true, lockmode);
12161-
table_close(childrel, NoLock);
12162-
}
12146+
/* Find or create work queue entry for this table */
12147+
tab = ATGetQueueEntry(wqueue, rel);
12148+
tab->constraints = lappend(tab->constraints, newcon);
12149+
}
1216312150

12164-
/* Queue validation for phase 3 */
12165-
newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
12166-
newcon->name = constrName;
12167-
newcon->contype = CONSTR_CHECK;
12168-
newcon->refrelid = InvalidOid;
12169-
newcon->refindid = InvalidOid;
12170-
newcon->conid = con->oid;
12171-
12172-
val = SysCacheGetAttrNotNull(CONSTROID, tuple,
12173-
Anum_pg_constraint_conbin);
12174-
conbin = TextDatumGetCString(val);
12175-
newcon->qual = (Node *) stringToNode(conbin);
12176-
12177-
/* Find or create work queue entry for this table */
12178-
tab = ATGetQueueEntry(wqueue, rel);
12179-
tab->constraints = lappend(tab->constraints, newcon);
12151+
/*
12152+
* We disallow creating invalid foreign keys to or from partitioned
12153+
* tables, so ignoring the recursion bit is okay.
12154+
*/
1218012155

12181-
/*
12182-
* Invalidate relcache so that others see the new validated
12183-
* constraint.
12184-
*/
12185-
CacheInvalidateRelcache(rel);
12186-
}
12156+
/*
12157+
* Now update the catalog, while we have the door open.
12158+
*/
12159+
copyTuple = heap_copytuple(contuple);
12160+
copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
12161+
copy_con->convalidated = true;
12162+
CatalogTupleUpdate(conrel, &copyTuple->t_self, copyTuple);
12163+
12164+
InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0);
12165+
12166+
heap_freetuple(copyTuple);
12167+
}
12168+
12169+
/*
12170+
* QueueCheckConstraintValidation
12171+
*
12172+
* Add an entry to the wqueue to validate the given check constraint in Phase 3
12173+
* and update the convalidated field in the pg_constraint catalog for the
12174+
* specified relation and all its inheriting children.
12175+
*/
12176+
static void
12177+
QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel,
12178+
char *constrName, HeapTuple contuple,
12179+
bool recurse, bool recursing, LOCKMODE lockmode)
12180+
{
12181+
Form_pg_constraint con;
12182+
AlteredTableInfo *tab;
12183+
HeapTuple copyTuple;
12184+
Form_pg_constraint copy_con;
12185+
12186+
List *children = NIL;
12187+
ListCell *child;
12188+
NewConstraint *newcon;
12189+
Datum val;
12190+
char *conbin;
12191+
12192+
con = (Form_pg_constraint) GETSTRUCT(contuple);
12193+
Assert(con->contype == CONSTRAINT_CHECK);
12194+
12195+
/*
12196+
* If we're recursing, the parent has already done this, so skip it. Also,
12197+
* if the constraint is a NO INHERIT constraint, we shouldn't try to look
12198+
* for it in the children.
12199+
*/
12200+
if (!recursing && !con->connoinherit)
12201+
children = find_all_inheritors(RelationGetRelid(rel),
12202+
lockmode, NULL);
12203+
12204+
/*
12205+
* For CHECK constraints, we must ensure that we only mark the constraint
12206+
* as validated on the parent if it's already validated on the children.
12207+
*
12208+
* We recurse before validating on the parent, to reduce risk of
12209+
* deadlocks.
12210+
*/
12211+
foreach(child, children)
12212+
{
12213+
Oid childoid = lfirst_oid(child);
12214+
Relation childrel;
12215+
12216+
if (childoid == RelationGetRelid(rel))
12217+
continue;
1218712218

1218812219
/*
12189-
* Now update the catalog, while we have the door open.
12220+
* If we are told not to recurse, there had better not be any child
12221+
* tables, because we can't mark the constraint on the parent valid
12222+
* unless it is valid for all child tables.
1219012223
*/
12191-
copyTuple = heap_copytuple(tuple);
12192-
copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
12193-
copy_con->convalidated = true;
12194-
CatalogTupleUpdate(conrel, &copyTuple->t_self, copyTuple);
12195-
12196-
InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0);
12224+
if (!recurse)
12225+
ereport(ERROR,
12226+
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
12227+
errmsg("constraint must be validated on child tables too")));
1219712228

12198-
heap_freetuple(copyTuple);
12229+
/* find_all_inheritors already got lock */
12230+
childrel = table_open(childoid, NoLock);
1219912231

12200-
ObjectAddressSet(address, ConstraintRelationId, con->oid);
12232+
ATExecValidateConstraint(wqueue, childrel, constrName, false,
12233+
true, lockmode);
12234+
table_close(childrel, NoLock);
1220112235
}
12202-
else
12203-
address = InvalidObjectAddress; /* already validated */
1220412236

12205-
systable_endscan(scan);
12237+
/* Queue validation for phase 3 */
12238+
newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
12239+
newcon->name = constrName;
12240+
newcon->contype = CONSTR_CHECK;
12241+
newcon->refrelid = InvalidOid;
12242+
newcon->refindid = InvalidOid;
12243+
newcon->conid = con->oid;
1220612244

12207-
table_close(conrel, RowExclusiveLock);
12245+
val = SysCacheGetAttrNotNull(CONSTROID, contuple,
12246+
Anum_pg_constraint_conbin);
12247+
conbin = TextDatumGetCString(val);
12248+
newcon->qual = (Node *) stringToNode(conbin);
1220812249

12209-
return address;
12210-
}
12250+
/* Find or create work queue entry for this table */
12251+
tab = ATGetQueueEntry(wqueue, rel);
12252+
tab->constraints = lappend(tab->constraints, newcon);
1221112253

12254+
/*
12255+
* Invalidate relcache so that others see the new validated constraint.
12256+
*/
12257+
CacheInvalidateRelcache(rel);
12258+
12259+
/*
12260+
* Now update the catalog, while we have the door open.
12261+
*/
12262+
copyTuple = heap_copytuple(contuple);
12263+
copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
12264+
copy_con->convalidated = true;
12265+
CatalogTupleUpdate(conrel, &copyTuple->t_self, copyTuple);
12266+
12267+
InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0);
12268+
12269+
heap_freetuple(copyTuple);
12270+
}
1221212271

1221312272
/*
1221412273
* transformColumnNameList - transform list of column names

0 commit comments

Comments
 (0)