Skip to content

Commit 07cacba

Browse files
committed
Add the notion of REPLICA IDENTITY for a table.
Pending patches for logical replication will use this to determine which columns of a tuple ought to be considered as its candidate key. Andres Freund, with minor, mostly cosmetic adjustments by me
1 parent b97ee66 commit 07cacba

File tree

23 files changed

+902
-49
lines changed

23 files changed

+902
-49
lines changed

doc/src/sgml/catalogs.sgml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,6 +1862,19 @@
18621862
relations other than some materialized views)</entry>
18631863
</row>
18641864

1865+
<row>
1866+
<entry><structfield>relreplident</structfield></entry>
1867+
<entry><type>char</type></entry>
1868+
<entry></entry>
1869+
<entry>
1870+
Columns used to form <quote>replica identity</> for rows:
1871+
<literal>d</> = default (primary key, if any),
1872+
<literal>n</> = nothing,
1873+
<literal>f</> = all columns
1874+
<literal>i</> = index with indisreplident set, or default
1875+
</entry>
1876+
</row>
1877+
18651878
<row>
18661879
<entry><structfield>relfrozenxid</structfield></entry>
18671880
<entry><type>xid</type></entry>
@@ -3657,6 +3670,17 @@
36573670
</entry>
36583671
</row>
36593672

3673+
<row>
3674+
<entry><structfield>indisreplident</structfield></entry>
3675+
<entry><type>bool</type></entry>
3676+
<entry></entry>
3677+
<entry>
3678+
If true this index has been chosen as <quote>replica identity</>
3679+
using <command>ALTER TABLE ... REPLICA IDENTITY USING INDEX
3680+
...</>
3681+
</entry>
3682+
</row>
3683+
36603684
<row>
36613685
<entry><structfield>indkey</structfield></entry>
36623686
<entry><type>int2vector</type></entry>

doc/src/sgml/ref/alter_table.sgml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="PARAMETER">name</replaceable>
6969
NOT OF
7070
OWNER TO <replaceable class="PARAMETER">new_owner</replaceable>
7171
SET TABLESPACE <replaceable class="PARAMETER">new_tablespace</replaceable>
72+
REPLICA IDENTITY {DEFAULT | USING INDEX <replaceable class="PARAMETER">index_name</replaceable> | FULL | NOTHING}
7273

7374
<phrase>and <replaceable class="PARAMETER">table_constraint_using_index</replaceable> is:</phrase>
7475

@@ -579,6 +580,24 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="PARAMETER">name</replaceable>
579580
</listitem>
580581
</varlistentry>
581582

583+
<varlistentry>
584+
<term><literal>REPLICA IDENTITY</literal></term>
585+
<listitem>
586+
<para>
587+
This form changes the information which is written to the write-ahead log
588+
to identify rows which are updated or deleted. This option has no effect
589+
except when logical replication is in use. <literal>DEFAULT</> records the
590+
old values of the columns of the primary key, if any. <literal>USING INDEX</>
591+
records the old values of the columns covered by the named index, which
592+
must be unique, not partial, not deferrable, and include only columns marked
593+
<literal>NOT NULL</>. <literal>FULL</> records the old values of all columns
594+
in the row. <literal>NOTHING</> records no information about the old row.
595+
In all cases, no old values are logged unless at least one of the columns
596+
that would be logged differs between the old and new versions of the row.
597+
</para>
598+
</listitem>
599+
</varlistentry>
600+
582601
<varlistentry>
583602
<term><literal>RENAME</literal></term>
584603
<listitem>

src/backend/catalog/heap.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -793,6 +793,7 @@ InsertPgClassTuple(Relation pg_class_desc,
793793
values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
794794
values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
795795
values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
796+
values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
796797
values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
797798
values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
798799
if (relacl != (Datum) 0)

src/backend/catalog/index.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,7 @@ UpdateIndexRelation(Oid indexoid,
614614
/* we set isvalid and isready the same way */
615615
values[Anum_pg_index_indisready - 1] = BoolGetDatum(isvalid);
616616
values[Anum_pg_index_indislive - 1] = BoolGetDatum(true);
617+
values[Anum_pg_index_indisreplident - 1] = BoolGetDatum(false);
617618
values[Anum_pg_index_indkey - 1] = PointerGetDatum(indkey);
618619
values[Anum_pg_index_indcollation - 1] = PointerGetDatum(indcollation);
619620
values[Anum_pg_index_indclass - 1] = PointerGetDatum(indclass);

src/backend/commands/tablecmds.c

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
399399
static void drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid);
400400
static void ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode);
401401
static void ATExecDropOf(Relation rel, LOCKMODE lockmode);
402+
static void ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode);
402403
static void ATExecGenericOptions(Relation rel, List *options);
403404

404405
static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
@@ -2809,6 +2810,7 @@ AlterTableGetLockLevel(List *cmds)
28092810
case AT_DisableTrigUser:
28102811
case AT_AddIndex: /* from ADD CONSTRAINT */
28112812
case AT_AddIndexConstraint:
2813+
case AT_ReplicaIdentity:
28122814
cmd_lockmode = ShareRowExclusiveLock;
28132815
break;
28142816

@@ -3140,6 +3142,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
31403142
cmd->subtype = AT_ValidateConstraintRecurse;
31413143
pass = AT_PASS_MISC;
31423144
break;
3145+
case AT_ReplicaIdentity: /* REPLICA IDENTITY ... */
3146+
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW);
3147+
pass = AT_PASS_MISC;
3148+
/* This command never recurses */
3149+
/* No command-specific prep needed */
3150+
break;
31433151
case AT_EnableTrig: /* ENABLE TRIGGER variants */
31443152
case AT_EnableAlwaysTrig:
31453153
case AT_EnableReplicaTrig:
@@ -3440,6 +3448,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
34403448
case AT_DropOf:
34413449
ATExecDropOf(rel, lockmode);
34423450
break;
3451+
case AT_ReplicaIdentity:
3452+
ATExecReplicaIdentity(rel, (ReplicaIdentityStmt *) cmd->def, lockmode);
3453+
break;
34433454
case AT_GenericOptions:
34443455
ATExecGenericOptions(rel, (List *) cmd->def);
34453456
break;
@@ -10009,6 +10020,217 @@ ATExecDropOf(Relation rel, LOCKMODE lockmode)
1000910020
heap_close(relationRelation, RowExclusiveLock);
1001010021
}
1001110022

10023+
/*
10024+
* relation_mark_replica_identity: Update a table's replica identity
10025+
*
10026+
* Iff ri_type = REPLICA_IDENTITY_INDEX, indexOid must be the Oid of a suitable
10027+
* index. Otherwise, it should be InvalidOid.
10028+
*/
10029+
static void
10030+
relation_mark_replica_identity(Relation rel, char ri_type, Oid indexOid,
10031+
bool is_internal)
10032+
{
10033+
Relation pg_index;
10034+
Relation pg_class;
10035+
HeapTuple pg_class_tuple;
10036+
HeapTuple pg_index_tuple;
10037+
Form_pg_class pg_class_form;
10038+
Form_pg_index pg_index_form;
10039+
10040+
ListCell *index;
10041+
10042+
/*
10043+
* Check whether relreplident has changed, and update it if so.
10044+
*/
10045+
pg_class = heap_open(RelationRelationId, RowExclusiveLock);
10046+
pg_class_tuple = SearchSysCacheCopy1(RELOID,
10047+
ObjectIdGetDatum(RelationGetRelid(rel)));
10048+
if (!HeapTupleIsValid(pg_class_tuple))
10049+
elog(ERROR, "cache lookup failed for relation \"%s\"",
10050+
RelationGetRelationName(rel));
10051+
pg_class_form = (Form_pg_class) GETSTRUCT(pg_class_tuple);
10052+
if (pg_class_form->relreplident != ri_type)
10053+
{
10054+
pg_class_form->relreplident = ri_type;
10055+
simple_heap_update(pg_class, &pg_class_tuple->t_self, pg_class_tuple);
10056+
CatalogUpdateIndexes(pg_class, pg_class_tuple);
10057+
}
10058+
heap_close(pg_class, RowExclusiveLock);
10059+
heap_freetuple(pg_class_tuple);
10060+
10061+
/*
10062+
* Check whether the correct index is marked indisreplident; if so, we're
10063+
* done.
10064+
*/
10065+
if (OidIsValid(indexOid))
10066+
{
10067+
Assert(ri_type == REPLICA_IDENTITY_INDEX);
10068+
10069+
pg_index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexOid));
10070+
if (!HeapTupleIsValid(pg_index_tuple))
10071+
elog(ERROR, "cache lookup failed for index %u", indexOid);
10072+
pg_index_form = (Form_pg_index) GETSTRUCT(pg_index_tuple);
10073+
10074+
if (pg_index_form->indisreplident)
10075+
{
10076+
ReleaseSysCache(pg_index_tuple);
10077+
return;
10078+
}
10079+
ReleaseSysCache(pg_index_tuple);
10080+
}
10081+
10082+
/*
10083+
* Clear the indisreplident flag from any index that had it previously, and
10084+
* set it for any index that should have it now.
10085+
*/
10086+
pg_index = heap_open(IndexRelationId, RowExclusiveLock);
10087+
foreach(index, RelationGetIndexList(rel))
10088+
{
10089+
Oid thisIndexOid = lfirst_oid(index);
10090+
bool dirty = false;
10091+
10092+
pg_index_tuple = SearchSysCacheCopy1(INDEXRELID,
10093+
ObjectIdGetDatum(thisIndexOid));
10094+
if (!HeapTupleIsValid(pg_index_tuple))
10095+
elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
10096+
pg_index_form = (Form_pg_index) GETSTRUCT(pg_index_tuple);
10097+
10098+
/*
10099+
* Unset the bit if set. We know it's wrong because we checked this
10100+
* earlier.
10101+
*/
10102+
if (pg_index_form->indisreplident)
10103+
{
10104+
dirty = true;
10105+
pg_index_form->indisreplident = false;
10106+
}
10107+
else if (thisIndexOid == indexOid)
10108+
{
10109+
dirty = true;
10110+
pg_index_form->indisreplident = true;
10111+
}
10112+
10113+
if (dirty)
10114+
{
10115+
simple_heap_update(pg_index, &pg_index_tuple->t_self, pg_index_tuple);
10116+
CatalogUpdateIndexes(pg_index, pg_index_tuple);
10117+
InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
10118+
InvalidOid, is_internal);
10119+
}
10120+
heap_freetuple(pg_index_tuple);
10121+
}
10122+
10123+
heap_close(pg_index, RowExclusiveLock);
10124+
}
10125+
10126+
/*
10127+
* ALTER TABLE <name> REPLICA IDENTITY ...
10128+
*/
10129+
static void
10130+
ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode)
10131+
{
10132+
Oid indexOid;
10133+
Relation indexRel;
10134+
int key;
10135+
10136+
if (stmt->identity_type == REPLICA_IDENTITY_DEFAULT)
10137+
{
10138+
relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
10139+
return;
10140+
}
10141+
else if (stmt->identity_type == REPLICA_IDENTITY_FULL)
10142+
{
10143+
relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
10144+
return;
10145+
}
10146+
else if (stmt->identity_type == REPLICA_IDENTITY_NOTHING)
10147+
{
10148+
relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
10149+
return;
10150+
}
10151+
else if (stmt->identity_type == REPLICA_IDENTITY_INDEX)
10152+
{
10153+
/* fallthrough */;
10154+
}
10155+
else
10156+
elog(ERROR, "unexpected identity type %u", stmt->identity_type);
10157+
10158+
10159+
/* Check that the index exists */
10160+
indexOid = get_relname_relid(stmt->name, rel->rd_rel->relnamespace);
10161+
if (!OidIsValid(indexOid))
10162+
ereport(ERROR,
10163+
(errcode(ERRCODE_UNDEFINED_OBJECT),
10164+
errmsg("index \"%s\" for table \"%s\" does not exist",
10165+
stmt->name, RelationGetRelationName(rel))));
10166+
10167+
indexRel = index_open(indexOid, ShareLock);
10168+
10169+
/* Check that the index is on the relation we're altering. */
10170+
if (indexRel->rd_index == NULL ||
10171+
indexRel->rd_index->indrelid != RelationGetRelid(rel))
10172+
ereport(ERROR,
10173+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
10174+
errmsg("\"%s\" is not an index for table \"%s\"",
10175+
RelationGetRelationName(indexRel),
10176+
RelationGetRelationName(rel))));
10177+
/* The AM must support uniqueness, and the index must in fact be unique. */
10178+
if (!indexRel->rd_am->amcanunique || !indexRel->rd_index->indisunique)
10179+
ereport(ERROR,
10180+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
10181+
errmsg("cannot use non-unique index \"%s\" as replica identity",
10182+
RelationGetRelationName(indexRel))));
10183+
/* Deferred indexes are not guaranteed to be always unique. */
10184+
if (!indexRel->rd_index->indimmediate)
10185+
ereport(ERROR,
10186+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
10187+
errmsg("cannot use non-immediate index \"%s\" as replica identity",
10188+
RelationGetRelationName(indexRel))));
10189+
/* Expression indexes aren't supported. */
10190+
if (RelationGetIndexExpressions(indexRel) != NIL)
10191+
ereport(ERROR,
10192+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
10193+
errmsg("cannot use expression index \"%s\" as replica identity",
10194+
RelationGetRelationName(indexRel))));
10195+
/* Predicate indexes aren't supported. */
10196+
if (RelationGetIndexPredicate(indexRel) != NIL)
10197+
ereport(ERROR,
10198+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
10199+
errmsg("cannot use partial index \"%s\" as replica identity",
10200+
RelationGetRelationName(indexRel))));
10201+
/* And neither are invalid indexes. */
10202+
if (!IndexIsValid(indexRel->rd_index))
10203+
ereport(ERROR,
10204+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
10205+
errmsg("cannot use invalid index \"%s\" as replica identity",
10206+
RelationGetRelationName(indexRel))));
10207+
10208+
/* Check index for nullable columns. */
10209+
for (key = 0; key < indexRel->rd_index->indnatts; key++)
10210+
{
10211+
int16 attno = indexRel->rd_index->indkey.values[key];
10212+
Form_pg_attribute attr;
10213+
10214+
/* Of the system columns, only oid is indexable. */
10215+
if (attno <= 0 && attno != ObjectIdAttributeNumber)
10216+
elog(ERROR, "internal column %u in unique index \"%s\"",
10217+
attno, RelationGetRelationName(indexRel));
10218+
10219+
attr = rel->rd_att->attrs[attno - 1];
10220+
if (!attr->attnotnull)
10221+
ereport(ERROR,
10222+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
10223+
errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable",
10224+
RelationGetRelationName(indexRel),
10225+
NameStr(attr->attname))));
10226+
}
10227+
10228+
/* This index is suitable for use as a replica identity. Mark it. */
10229+
relation_mark_replica_identity(rel, stmt->identity_type, indexOid, true);
10230+
10231+
index_close(indexRel, NoLock);
10232+
}
10233+
1001210234
/*
1001310235
* ALTER FOREIGN TABLE <name> OPTIONS (...)
1001410236
*/

src/backend/nodes/copyfuncs.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3270,6 +3270,17 @@ _copyRefreshMatViewStmt(const RefreshMatViewStmt *from)
32703270
return newnode;
32713271
}
32723272

3273+
static ReplicaIdentityStmt *
3274+
_copyReplicaIdentityStmt(const ReplicaIdentityStmt *from)
3275+
{
3276+
ReplicaIdentityStmt *newnode = makeNode(ReplicaIdentityStmt);
3277+
3278+
COPY_SCALAR_FIELD(identity_type);
3279+
COPY_STRING_FIELD(name);
3280+
3281+
return newnode;
3282+
}
3283+
32733284
static CreateSeqStmt *
32743285
_copyCreateSeqStmt(const CreateSeqStmt *from)
32753286
{
@@ -4343,6 +4354,9 @@ copyObject(const void *from)
43434354
case T_RefreshMatViewStmt:
43444355
retval = _copyRefreshMatViewStmt(from);
43454356
break;
4357+
case T_ReplicaIdentityStmt:
4358+
retval = _copyReplicaIdentityStmt(from);
4359+
break;
43464360
case T_CreateSeqStmt:
43474361
retval = _copyCreateSeqStmt(from);
43484362
break;

src/backend/nodes/equalfuncs.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1537,6 +1537,15 @@ _equalRefreshMatViewStmt(const RefreshMatViewStmt *a, const RefreshMatViewStmt *
15371537
return true;
15381538
}
15391539

1540+
static bool
1541+
_equalReplicaIdentityStmt(const ReplicaIdentityStmt *a, const ReplicaIdentityStmt *b)
1542+
{
1543+
COMPARE_SCALAR_FIELD(identity_type);
1544+
COMPARE_STRING_FIELD(name);
1545+
1546+
return true;
1547+
}
1548+
15401549
static bool
15411550
_equalCreateSeqStmt(const CreateSeqStmt *a, const CreateSeqStmt *b)
15421551
{
@@ -2813,6 +2822,9 @@ equal(const void *a, const void *b)
28132822
case T_RefreshMatViewStmt:
28142823
retval = _equalRefreshMatViewStmt(a, b);
28152824
break;
2825+
case T_ReplicaIdentityStmt:
2826+
retval = _equalReplicaIdentityStmt(a, b);
2827+
break;
28162828
case T_CreateSeqStmt:
28172829
retval = _equalCreateSeqStmt(a, b);
28182830
break;

0 commit comments

Comments
 (0)