Skip to content

Commit 21f8b18

Browse files
committed
Fix handling of CREATE TABLE LIKE with inheritance.
If a CREATE TABLE command uses both LIKE and traditional inheritance, Vars in CHECK constraints and expression indexes that are absorbed from a LIKE parent table tended to get mis-numbered, resulting in wrong answers and/or bizarre error messages (though probably not any actual crashes, thanks to validation occurring in the executor). In v12 and up, the same could happen to Vars in GENERATED expressions, even in cases with no LIKE clause but multiple traditional-inheritance parents. The cause of the problem for LIKE is that parse_utilcmd.c supposed it could renumber such Vars correctly during transformCreateStmt(), which it cannot since we have not yet accounted for columns added via inheritance. Fix that by postponing processing of LIKE INCLUDING CONSTRAINTS, DEFAULTS, GENERATED, INDEXES till after we've performed DefineRelation(). The error with GENERATED and multiple inheritance is a simple oversight in MergeAttributes(); it knows it has to renumber Vars in inherited CHECK constraints, but forgot to apply the same processing to inherited GENERATED expressions (a/k/a defaults). Per bug #16272 from Tom Gottfried. The non-GENERATED variants of the issue are ancient, presumably dating right back to the addition of CREATE TABLE LIKE; hence back-patch to all supported branches. Discussion: https://postgr.es/m/16272-6e32da020e9a9381@postgresql.org
1 parent aa4da26 commit 21f8b18

File tree

5 files changed

+209
-55
lines changed

5 files changed

+209
-55
lines changed

src/backend/parser/parse_utilcmd.c

Lines changed: 120 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "access/htup_details.h"
3030
#include "access/reloptions.h"
31+
#include "access/tupconvert.h"
3132
#include "catalog/dependency.h"
3233
#include "catalog/heap.h"
3334
#include "catalog/index.h"
@@ -77,7 +78,6 @@ typedef struct
7778
List *ckconstraints; /* CHECK constraints */
7879
List *fkconstraints; /* FOREIGN KEY constraints */
7980
List *ixconstraints; /* index-creating constraints */
80-
List *inh_indexes; /* cloned indexes from INCLUDING INDEXES */
8181
List *blist; /* "before list" of things to do before
8282
* creating the table */
8383
List *alist; /* "after list" of things to do after creating
@@ -108,7 +108,7 @@ static void transformTableLikeClause(CreateStmtContext *cxt,
108108
TableLikeClause *table_like_clause);
109109
static void transformOfType(CreateStmtContext *cxt,
110110
TypeName *ofTypename);
111-
static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt,
111+
static IndexStmt *generateClonedIndexStmt(RangeVar *heapRel,
112112
Relation source_idx,
113113
const AttrNumber *attmap, int attmap_length);
114114
static List *get_collation(Oid collation, Oid actual_datatype);
@@ -132,6 +132,9 @@ static void setSchemaName(char *context_schema, char **stmt_schema_name);
132132
* Returns a List of utility commands to be done in sequence. One of these
133133
* will be the transformed CreateStmt, but there may be additional actions
134134
* to be done before and after the actual DefineRelation() call.
135+
* In addition to normal utility commands such as AlterTableStmt and
136+
* IndexStmt, the result list may contain TableLikeClause(s), representing
137+
* the need to perform additional parse analysis after DefineRelation().
135138
*
136139
* SQL allows constraints to be scattered all over, so thumb through
137140
* the columns and collect all constraints into one place.
@@ -218,7 +221,6 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
218221
cxt.ckconstraints = NIL;
219222
cxt.fkconstraints = NIL;
220223
cxt.ixconstraints = NIL;
221-
cxt.inh_indexes = NIL;
222224
cxt.blist = NIL;
223225
cxt.alist = NIL;
224226
cxt.pkey = NULL;
@@ -691,8 +693,11 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint)
691693
* transformTableLikeClause
692694
*
693695
* Change the LIKE <srctable> portion of a CREATE TABLE statement into
694-
* column definitions which recreate the user defined column portions of
695-
* <srctable>.
696+
* column definitions that recreate the user defined column portions of
697+
* <srctable>. Also, if there are any LIKE options that we can't fully
698+
* process at this point, add the TableLikeClause to cxt->alist, which
699+
* will cause utility.c to call expandTableLikeClause() after the new
700+
* table has been created.
696701
*/
697702
static void
698703
transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_clause)
@@ -701,7 +706,6 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
701706
Relation relation;
702707
TupleDesc tupleDesc;
703708
TupleConstr *constr;
704-
AttrNumber *attmap;
705709
AclResult aclresult;
706710
char *comment;
707711
ParseCallbackState pcbstate;
@@ -715,6 +719,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
715719
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
716720
errmsg("LIKE is not supported for creating foreign tables")));
717721

722+
/* Open the relation referenced by the LIKE clause */
718723
relation = relation_openrv(table_like_clause->relation, AccessShareLock);
719724

720725
if (relation->rd_rel->relkind != RELKIND_RELATION &&
@@ -752,15 +757,10 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
752757
tupleDesc = RelationGetDescr(relation);
753758
constr = tupleDesc->constr;
754759

755-
/*
756-
* Initialize column number map for map_variable_attnos(). We need this
757-
* since dropped columns in the source table aren't copied, so the new
758-
* table can have different column numbers.
759-
*/
760-
attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * tupleDesc->natts);
761-
762760
/*
763761
* Insert the copied attributes into the cxt for the new table definition.
762+
* We must do this now so that they appear in the table in the relative
763+
* position where the LIKE clause is, as required by SQL99.
764764
*/
765765
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
766766
parent_attno++)
@@ -770,7 +770,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
770770
ColumnDef *def;
771771

772772
/*
773-
* Ignore dropped columns in the parent. attmap entry is left zero.
773+
* Ignore dropped columns in the parent.
774774
*/
775775
if (attribute->attisdropped)
776776
continue;
@@ -802,8 +802,6 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
802802
*/
803803
cxt->columns = lappend(cxt->columns, def);
804804

805-
attmap[parent_attno - 1] = list_length(cxt->columns);
806-
807805
/*
808806
* Copy default, if present and the default has been requested
809807
*/
@@ -860,22 +858,88 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
860858
}
861859
}
862860

861+
/*
862+
* We cannot yet deal with CHECK constraints or indexes, since we don't
863+
* yet know what column numbers the copied columns will have in the
864+
* finished table. If any of those options are specified, add the LIKE
865+
* clause to cxt->alist so that expandTableLikeClause will be called after
866+
* we do know that.
867+
*/
868+
if (table_like_clause->options &
869+
(CREATE_TABLE_LIKE_CONSTRAINTS |
870+
CREATE_TABLE_LIKE_INDEXES))
871+
cxt->alist = lappend(cxt->alist, table_like_clause);
872+
873+
/*
874+
* Close the parent rel, but keep our AccessShareLock on it until xact
875+
* commit. That will prevent someone else from deleting or ALTERing the
876+
* parent before we can run expandTableLikeClause.
877+
*/
878+
heap_close(relation, NoLock);
879+
}
880+
881+
/*
882+
* expandTableLikeClause
883+
*
884+
* Process LIKE options that require knowing the final column numbers
885+
* assigned to the new table's columns. This executes after we have
886+
* run DefineRelation for the new table. It returns a list of utility
887+
* commands that should be run to generate indexes etc.
888+
*/
889+
List *
890+
expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause)
891+
{
892+
List *result = NIL;
893+
List *atsubcmds = NIL;
894+
Relation relation;
895+
Relation childrel;
896+
TupleDesc tupleDesc;
897+
TupleConstr *constr;
898+
AttrNumber *attmap;
899+
char *comment;
900+
901+
/*
902+
* Open the relation referenced by the LIKE clause. We should still have
903+
* the table lock obtained by transformTableLikeClause (and this'll throw
904+
* an assertion failure if not). Hence, no need to recheck privileges
905+
* etc.
906+
*/
907+
relation = relation_openrv(table_like_clause->relation, NoLock);
908+
909+
tupleDesc = RelationGetDescr(relation);
910+
constr = tupleDesc->constr;
911+
912+
/*
913+
* Open the newly-created child relation; we have lock on that too.
914+
*/
915+
childrel = relation_openrv(heapRel, NoLock);
916+
917+
/*
918+
* Construct a map from the LIKE relation's attnos to the child rel's.
919+
* This re-checks type match etc, although it shouldn't be possible to
920+
* have a failure since both tables are locked.
921+
*/
922+
attmap = convert_tuples_by_name_map(RelationGetDescr(childrel),
923+
tupleDesc,
924+
gettext_noop("could not convert row type"));
925+
863926
/*
864927
* Copy CHECK constraints if requested, being careful to adjust attribute
865928
* numbers so they match the child.
866929
*/
867930
if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) &&
868-
tupleDesc->constr)
931+
constr != NULL)
869932
{
870933
int ccnum;
871934

872-
for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++)
935+
for (ccnum = 0; ccnum < constr->num_check; ccnum++)
873936
{
874-
char *ccname = tupleDesc->constr->check[ccnum].ccname;
875-
char *ccbin = tupleDesc->constr->check[ccnum].ccbin;
876-
Constraint *n = makeNode(Constraint);
937+
char *ccname = constr->check[ccnum].ccname;
938+
char *ccbin = constr->check[ccnum].ccbin;
877939
Node *ccbin_node;
878940
bool found_whole_row;
941+
Constraint *n;
942+
AlterTableCmd *atsubcmd;
879943

880944
ccbin_node = map_variable_attnos(stringToNode(ccbin),
881945
1, 0,
@@ -896,12 +960,17 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
896960
ccname,
897961
RelationGetRelationName(relation))));
898962

963+
n = makeNode(Constraint);
899964
n->contype = CONSTR_CHECK;
900965
n->location = -1;
901966
n->conname = pstrdup(ccname);
902967
n->raw_expr = NULL;
903968
n->cooked_expr = nodeToString(ccbin_node);
904-
cxt->ckconstraints = lappend(cxt->ckconstraints, n);
969+
970+
atsubcmd = makeNode(AlterTableCmd);
971+
atsubcmd->subtype = AT_AddConstraint;
972+
atsubcmd->def = (Node *) n;
973+
atsubcmds = lappend(atsubcmds, atsubcmd);
905974

906975
/* Copy comment on constraint */
907976
if ((table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) &&
@@ -913,19 +982,35 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
913982
CommentStmt *stmt = makeNode(CommentStmt);
914983

915984
stmt->objtype = OBJECT_TABCONSTRAINT;
916-
stmt->objname = list_make3(makeString(cxt->relation->schemaname),
917-
makeString(cxt->relation->relname),
985+
stmt->objname = list_make3(makeString(heapRel->schemaname),
986+
makeString(heapRel->relname),
918987
makeString(n->conname));
919988
stmt->objargs = NIL;
920989
stmt->comment = comment;
921990

922-
cxt->alist = lappend(cxt->alist, stmt);
991+
result = lappend(result, stmt);
923992
}
924993
}
925994
}
926995

927996
/*
928-
* Likewise, copy indexes if requested
997+
* If we generated any ALTER TABLE actions above, wrap them into a single
998+
* ALTER TABLE command. Stick it at the front of the result, so it runs
999+
* before any CommentStmts we made above.
1000+
*/
1001+
if (atsubcmds)
1002+
{
1003+
AlterTableStmt *atcmd = makeNode(AlterTableStmt);
1004+
1005+
atcmd->relation = copyObject(heapRel);
1006+
atcmd->cmds = atsubcmds;
1007+
atcmd->relkind = OBJECT_TABLE;
1008+
atcmd->missing_ok = false;
1009+
result = lcons(atcmd, result);
1010+
}
1011+
1012+
/*
1013+
* Process indexes if required.
9291014
*/
9301015
if ((table_like_clause->options & CREATE_TABLE_LIKE_INDEXES) &&
9311016
relation->rd_rel->relhasindex)
@@ -944,7 +1029,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
9441029
parent_index = index_open(parent_index_oid, AccessShareLock);
9451030

9461031
/* Build CREATE INDEX statement to recreate the parent_index */
947-
index_stmt = generateClonedIndexStmt(cxt, parent_index,
1032+
index_stmt = generateClonedIndexStmt(heapRel, parent_index,
9481033
attmap, tupleDesc->natts);
9491034

9501035
/* Copy comment on index, if requested */
@@ -959,19 +1044,23 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
9591044
index_stmt->idxcomment = comment;
9601045
}
9611046

962-
/* Save it in the inh_indexes list for the time being */
963-
cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt);
1047+
result = lappend(result, index_stmt);
9641048

9651049
index_close(parent_index, AccessShareLock);
9661050
}
9671051
}
9681052

1053+
/* Done with child rel */
1054+
heap_close(childrel, NoLock);
1055+
9691056
/*
9701057
* Close the parent rel, but keep our AccessShareLock on it until xact
9711058
* commit. That will prevent someone else from deleting or ALTERing the
9721059
* parent before the child is committed.
9731060
*/
9741061
heap_close(relation, NoLock);
1062+
1063+
return result;
9751064
}
9761065

9771066
static void
@@ -1024,7 +1113,7 @@ transformOfType(CreateStmtContext *cxt, TypeName *ofTypename)
10241113
* "source_idx". Attribute numbers should be adjusted according to attmap.
10251114
*/
10261115
static IndexStmt *
1027-
generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
1116+
generateClonedIndexStmt(RangeVar *heapRel, Relation source_idx,
10281117
const AttrNumber *attmap, int attmap_length)
10291118
{
10301119
Oid source_relid = RelationGetRelid(source_idx);
@@ -1076,7 +1165,7 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
10761165

10771166
/* Begin building the IndexStmt */
10781167
index = makeNode(IndexStmt);
1079-
index->relation = cxt->relation;
1168+
index->relation = heapRel;
10801169
index->accessMethod = pstrdup(NameStr(amrec->amname));
10811170
if (OidIsValid(idxrelrec->reltablespace))
10821171
index->tableSpace = get_tablespace_name(idxrelrec->reltablespace);
@@ -1425,24 +1514,6 @@ transformIndexConstraints(CreateStmtContext *cxt)
14251514
indexlist = lappend(indexlist, index);
14261515
}
14271516

1428-
/* Add in any indexes defined by LIKE ... INCLUDING INDEXES */
1429-
foreach(lc, cxt->inh_indexes)
1430-
{
1431-
index = (IndexStmt *) lfirst(lc);
1432-
1433-
if (index->primary)
1434-
{
1435-
if (cxt->pkey != NULL)
1436-
ereport(ERROR,
1437-
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
1438-
errmsg("multiple primary keys for table \"%s\" are not allowed",
1439-
cxt->relation->relname)));
1440-
cxt->pkey = index;
1441-
}
1442-
1443-
indexlist = lappend(indexlist, index);
1444-
}
1445-
14461517
/*
14471518
* Scan the index list and remove any redundant index specifications. This
14481519
* can happen if, for instance, the user writes UNIQUE PRIMARY KEY. A
@@ -2429,7 +2500,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
24292500
cxt.ckconstraints = NIL;
24302501
cxt.fkconstraints = NIL;
24312502
cxt.ixconstraints = NIL;
2432-
cxt.inh_indexes = NIL;
24332503
cxt.blist = NIL;
24342504
cxt.alist = NIL;
24352505
cxt.pkey = NULL;

0 commit comments

Comments
 (0)