Skip to content

Commit 7dba57d

Browse files
knizhnikkelvich
authored andcommitted
Add ALTER INDEX WHERE construction
1 parent 036d06d commit 7dba57d

File tree

12 files changed

+314
-83
lines changed

12 files changed

+314
-83
lines changed

src/backend/commands/indexcmds.c

Lines changed: 53 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "commands/tablespace.h"
3535
#include "mb/pg_wchar.h"
3636
#include "miscadmin.h"
37+
#include "funcapi.h"
3738
#include "nodes/nodeFuncs.h"
3839
#include "optimizer/clauses.h"
3940
#include "optimizer/planner.h"
@@ -52,6 +53,9 @@
5253
#include "utils/snapmgr.h"
5354
#include "utils/syscache.h"
5455
#include "utils/tqual.h"
56+
#include "utils/ruleutils.h"
57+
#include "executor/executor.h"
58+
#include "executor/spi.h"
5559

5660

5761
/* non-export function prototypes */
@@ -280,33 +284,40 @@ CheckIndexCompatible(Oid oldId,
280284
return ret;
281285
}
282286

283-
#if 0
284287
void
285-
AlterIndex(Oid relationId, IndexStmt *stmt, Oid indexRelationId)
288+
AlterIndex(Oid indexRelationId, IndexStmt *stmt)
286289
{
287290
char* select;
291+
Oid heapRelationId;
288292
IndexUniqueCheck checkUnique;
289-
bool satisfiesConstraint;
290293
Datum values[INDEX_MAX_KEYS];
291294
bool isnull[INDEX_MAX_KEYS];
292295
Relation heapRelation;
293296
Relation indexRelation;
294297
SPIPlanPtr plan;
295298
Portal portal;
296299
HeapTuple tuple;
297-
TupleDesc tupdesc;
300+
HeapTuple updatedTuple;
298301
TupleTableSlot *slot;
299302
ItemPointer tupleid;
300303
IndexInfo *indexInfo;
301304
EState *estate;
305+
Oid namespaceId;
306+
Relation pg_index;
307+
List* deparseCtx;
302308

303309
Assert(stmt->whereClause);
310+
CheckPredicate((Expr *) stmt->whereClause);
311+
312+
/* Open the target index relation */
313+
indexRelation = index_open(indexRelationId, RowExclusiveLock);
314+
namespaceId = RelationGetNamespace(indexRelation);
304315

305316
/* Open and lock the parent heap relation */
306-
heapRelation = heap_openrv(stmt->relation, ShareUpdateExclusiveLock);
317+
heapRelationId = IndexGetRelation(indexRelationId, false);
318+
heapRelation = heap_open(heapRelationId, ShareLock);
307319

308-
/* And the target index relation */
309-
indexRelation = index_open(indexRelationId, RowExclusiveLock);
320+
pg_index = heap_open(IndexRelationId, RowExclusiveLock);
310321

311322
indexInfo = BuildIndexInfo(indexRelation);
312323
Assert(indexInfo->ii_Predicate);
@@ -319,58 +330,40 @@ AlterIndex(Oid relationId, IndexStmt *stmt, Oid indexRelationId)
319330

320331
checkUnique = indexRelation->rd_index->indisunique ? UNIQUE_CHECK_YES : UNIQUE_CHECK_NO;
321332

322-
SPI_connect();
323-
select = psprintf("select * from %s where %s and not (%s)",
324-
quote_qualified_identifier(get_namespace_name(RelationGetNamespace(heapRelation)),
325-
get_rel_name(relationId)),
326-
nodeToString(indexInfo->ii_Predicate),
327-
nodeToString(stmt->whereClause)
328-
);
329-
plan = SPI_parepare(select, 0, NULL);
330-
if (plan == NULL) {
331-
ereport(ERROR,
332-
(errcode(ERRCODE_INVALID_CURSOR_STATE),
333-
errmsg("Failed to preapre statement ", select)));
334-
}
335-
portal = SPI_cursor_open(NULL, plan, NULL, NULL, true);
336-
if (portal == NULL) {
337-
ereport(ERROR,
338-
(errcode(ERRCODE_INVALID_CURSOR_STATE),
339-
errmsg("Failed to open cursor for ", select)));
340-
}
341-
while (true)
342-
{
343-
SPI_cursor_fetch(portal, true, 1);
344-
if (!SPI_processed) {
345-
break;
346-
}
347-
tuple = SPI_tuptable->vals[0];
348-
tupdesc = SPI_tuptable->tupdesc;
349-
slot = TupleDescGetSlot(tupdesc);
350-
tupleid = &tuple->t_datat->t_ctid;
333+
/* Update pg_index tuple */
334+
tuple = SearchSysCacheCopy1(INDEXRELID, ObjectIdGetDatum(indexRelationId));
335+
if (!HeapTupleIsValid(tuple))
336+
elog(ERROR, "cache lookup failed for index %u", indexRelationId);
351337

352-
/* delete tuple from index */
353-
}
354-
SPI_cursor_close(portal);
338+
Assert(Natts_pg_index <= INDEX_MAX_KEYS);
339+
heap_deform_tuple(tuple, RelationGetDescr(pg_index), values, isnull);
340+
values[Anum_pg_index_indpred - 1] = CStringGetTextDatum(nodeToString(stmt->whereClause));
341+
updatedTuple = heap_form_tuple(RelationGetDescr(pg_index), values, isnull);
342+
simple_heap_update(pg_index, &tuple->t_self, updatedTuple);
343+
CatalogUpdateIndexes(pg_index, updatedTuple);
344+
heap_freetuple(updatedTuple);
345+
heap_freetuple(tuple);
355346

347+
slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRelation));
356348

349+
SPI_connect();
350+
deparseCtx = deparse_context_for(RelationGetRelationName(heapRelation), heapRelationId);
357351
select = psprintf("select * from %s where %s and not (%s)",
358-
quote_qualified_identifier(get_namespace_name(RelationGetNamespace(heapRelation)),
359-
get_rel_name(relationId)),
360-
nodeToString(stmt->whereClause),
361-
nodeToString(indexInfo->ii_Predicate)
362-
);
363-
plan = SPI_parepare(select, 0, NULL);
352+
quote_qualified_identifier(get_namespace_name(namespaceId),
353+
get_rel_name(heapRelationId)),
354+
deparse_expression(stmt->whereClause, deparseCtx, false, false),
355+
deparse_expression((Node*)make_ands_explicit(indexInfo->ii_Predicate), deparseCtx, false, false));
356+
plan = SPI_prepare(select, 0, NULL);
364357
if (plan == NULL) {
365358
ereport(ERROR,
366359
(errcode(ERRCODE_INVALID_CURSOR_STATE),
367-
errmsg("Failed to preapre statement ", select)));
360+
errmsg("Failed to preapre statement %s", select)));
368361
}
369362
portal = SPI_cursor_open(NULL, plan, NULL, NULL, true);
370363
if (portal == NULL) {
371364
ereport(ERROR,
372365
(errcode(ERRCODE_INVALID_CURSOR_STATE),
373-
errmsg("Failed to open cursor for ", select)));
366+
errmsg("Failed to open cursor for %s", select)));
374367
}
375368
while (true)
376369
{
@@ -379,40 +372,34 @@ AlterIndex(Oid relationId, IndexStmt *stmt, Oid indexRelationId)
379372
break;
380373
}
381374
tuple = SPI_tuptable->vals[0];
382-
tupdesc = SPI_tuptable->tupdesc;
383-
slot = TupleDescGetSlot(tupdesc);
384-
tupleid = &tuple->t_datat->t_ctid;
375+
tupleid = &tuple->t_data->t_ctid;
376+
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
385377

386378
FormIndexDatum(indexInfo,
387379
slot,
388380
estate,
389381
values,
390382
isnull);
391-
satisfiesConstraint =
392-
index_insert(indexRelation, /* index relation */
393-
values, /* array of index Datums */
394-
isnull, /* null flags */
395-
tupleid, /* tid of heap tuple */
396-
heapRelation, /* heap relation */
397-
checkUnique); /* type of uniqueness check to do */
398-
399-
if (!satisfiesConstraint)
400-
{
401-
ereport(ERROR,
402-
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
403-
errmsg("Index constraint violation")));
404-
}
383+
index_insert(indexRelation, /* index relation */
384+
values, /* array of index Datums */
385+
isnull, /* null flags */
386+
tupleid, /* tid of heap tuple */
387+
heapRelation, /* heap relation */
388+
checkUnique); /* type of uniqueness check to do */
389+
405390
SPI_freetuple(tuple);
406391
SPI_freetuptable(SPI_tuptable);
407392
}
408393
SPI_cursor_close(portal);
409394
SPI_finish();
410395

411-
/* Close both the relations, but keep the locks */
396+
ExecDropSingleTupleTableSlot(slot);
397+
FreeExecutorState(estate);
398+
399+
heap_close(pg_index, NoLock);
412400
heap_close(heapRelation, NoLock);
413401
index_close(indexRelation, NoLock);
414402
}
415-
#endif
416403

417404
/*
418405
* DefineIndex

src/backend/nodes/copyfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3127,6 +3127,7 @@ _copyIndexStmt(const IndexStmt *from)
31273127
COPY_SCALAR_FIELD(transformed);
31283128
COPY_SCALAR_FIELD(concurrent);
31293129
COPY_SCALAR_FIELD(if_not_exists);
3130+
COPY_SCALAR_FIELD(is_alter);
31303131

31313132
return newnode;
31323133
}

src/backend/nodes/equalfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1243,6 +1243,7 @@ _equalIndexStmt(const IndexStmt *a, const IndexStmt *b)
12431243
COMPARE_SCALAR_FIELD(transformed);
12441244
COMPARE_SCALAR_FIELD(concurrent);
12451245
COMPARE_SCALAR_FIELD(if_not_exists);
1246+
COMPARE_SCALAR_FIELD(is_alter);
12461247

12471248
return true;
12481249
}

src/backend/nodes/outfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2189,6 +2189,7 @@ _outIndexStmt(StringInfo str, const IndexStmt *node)
21892189
WRITE_BOOL_FIELD(transformed);
21902190
WRITE_BOOL_FIELD(concurrent);
21912191
WRITE_BOOL_FIELD(if_not_exists);
2192+
WRITE_BOOL_FIELD(is_alter);
21922193
}
21932194

21942195
static void

src/backend/parser/gram.y

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1801,6 +1801,15 @@ AlterTableStmt:
18011801
n->nowait = $13;
18021802
$$ = (Node *)n;
18031803
}
1804+
| ALTER INDEX qualified_name WHERE a_expr
1805+
{
1806+
IndexStmt* n = makeNode(IndexStmt);
1807+
n->relation = $3;
1808+
n->whereClause = $5;
1809+
n->is_alter = true;
1810+
$$ = (Node *)n;
1811+
}
1812+
18041813
| ALTER INDEX qualified_name alter_table_cmds
18051814
{
18061815
AlterTableStmt *n = makeNode(AlterTableStmt);

src/backend/parser/parse_utilcmd.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2065,6 +2065,10 @@ transformIndexStmt(Oid relid, IndexStmt *stmt, const char *queryString)
20652065
* to its fields without qualification. Caller is responsible for locking
20662066
* relation, but we still need to open it.
20672067
*/
2068+
if (stmt->is_alter)
2069+
{
2070+
relid = IndexGetRelation(relid, false);
2071+
}
20682072
rel = relation_open(relid, NoLock);
20692073
rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, true);
20702074

src/backend/tcop/utility.c

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1257,22 +1257,29 @@ ProcessUtilitySlow(Node *parsetree,
12571257

12581258
/* ... and do it */
12591259
EventTriggerAlterTableStart(parsetree);
1260-
address =
1261-
DefineIndex(relid, /* OID of heap relation */
1262-
stmt,
1263-
InvalidOid, /* no predefined OID */
1264-
false, /* is_alter_table */
1265-
true, /* check_rights */
1266-
false, /* skip_build */
1267-
false); /* quiet */
1268-
1269-
/*
1270-
* Add the CREATE INDEX node itself to stash right away;
1271-
* if there were any commands stashed in the ALTER TABLE
1272-
* code, we need them to appear after this one.
1273-
*/
1274-
EventTriggerCollectSimpleCommand(address, secondaryObject,
1275-
parsetree);
1260+
if (stmt->is_alter)
1261+
{
1262+
AlterIndex(relid, stmt);
1263+
}
1264+
else
1265+
{
1266+
address =
1267+
DefineIndex(relid, /* OID of heap relation */
1268+
stmt,
1269+
InvalidOid, /* no predefined OID */
1270+
false, /* is_alter_table */
1271+
true, /* check_rights */
1272+
false, /* skip_build */
1273+
false); /* quiet */
1274+
1275+
/*
1276+
* Add the CREATE INDEX node itself to stash right away;
1277+
* if there were any commands stashed in the ALTER TABLE
1278+
* code, we need them to appear after this one.
1279+
*/
1280+
EventTriggerCollectSimpleCommand(address, secondaryObject,
1281+
parsetree);
1282+
}
12761283
commandCollected = true;
12771284
EventTriggerAlterTableEnd();
12781285
}

src/bin/insbench/core

9.32 MB
Binary file not shown.

0 commit comments

Comments
 (0)