@@ -399,6 +399,7 @@ static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
399
399
static void drop_parent_dependency (Oid relid , Oid refclassid , Oid refobjid );
400
400
static void ATExecAddOf (Relation rel , const TypeName * ofTypename , LOCKMODE lockmode );
401
401
static void ATExecDropOf (Relation rel , LOCKMODE lockmode );
402
+ static void ATExecReplicaIdentity (Relation rel , ReplicaIdentityStmt * stmt , LOCKMODE lockmode );
402
403
static void ATExecGenericOptions (Relation rel , List * options );
403
404
404
405
static void copy_relation_data (SMgrRelation rel , SMgrRelation dst ,
@@ -2809,6 +2810,7 @@ AlterTableGetLockLevel(List *cmds)
2809
2810
case AT_DisableTrigUser :
2810
2811
case AT_AddIndex : /* from ADD CONSTRAINT */
2811
2812
case AT_AddIndexConstraint :
2813
+ case AT_ReplicaIdentity :
2812
2814
cmd_lockmode = ShareRowExclusiveLock ;
2813
2815
break ;
2814
2816
@@ -3140,6 +3142,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
3140
3142
cmd -> subtype = AT_ValidateConstraintRecurse ;
3141
3143
pass = AT_PASS_MISC ;
3142
3144
break ;
3145
+ case AT_ReplicaIdentity : /* REPLICA IDENTITY ... */
3146
+ ATSimplePermissions (rel , ATT_TABLE | ATT_MATVIEW );
3147
+ pass = AT_PASS_MISC ;
3148
+ /* This command never recurses */
3149
+ /* No command-specific prep needed */
3150
+ break ;
3143
3151
case AT_EnableTrig : /* ENABLE TRIGGER variants */
3144
3152
case AT_EnableAlwaysTrig :
3145
3153
case AT_EnableReplicaTrig :
@@ -3440,6 +3448,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
3440
3448
case AT_DropOf :
3441
3449
ATExecDropOf (rel , lockmode );
3442
3450
break ;
3451
+ case AT_ReplicaIdentity :
3452
+ ATExecReplicaIdentity (rel , (ReplicaIdentityStmt * ) cmd -> def , lockmode );
3453
+ break ;
3443
3454
case AT_GenericOptions :
3444
3455
ATExecGenericOptions (rel , (List * ) cmd -> def );
3445
3456
break ;
@@ -10009,6 +10020,217 @@ ATExecDropOf(Relation rel, LOCKMODE lockmode)
10009
10020
heap_close (relationRelation , RowExclusiveLock );
10010
10021
}
10011
10022
10023
+ /*
10024
+ * relation_mark_replica_identity: Update a table's replica identity
10025
+ *
10026
+ * Iff ri_type = REPLICA_IDENTITY_INDEX, indexOid must be the Oid of a suitable
10027
+ * index. Otherwise, it should be InvalidOid.
10028
+ */
10029
+ static void
10030
+ relation_mark_replica_identity (Relation rel , char ri_type , Oid indexOid ,
10031
+ bool is_internal )
10032
+ {
10033
+ Relation pg_index ;
10034
+ Relation pg_class ;
10035
+ HeapTuple pg_class_tuple ;
10036
+ HeapTuple pg_index_tuple ;
10037
+ Form_pg_class pg_class_form ;
10038
+ Form_pg_index pg_index_form ;
10039
+
10040
+ ListCell * index ;
10041
+
10042
+ /*
10043
+ * Check whether relreplident has changed, and update it if so.
10044
+ */
10045
+ pg_class = heap_open (RelationRelationId , RowExclusiveLock );
10046
+ pg_class_tuple = SearchSysCacheCopy1 (RELOID ,
10047
+ ObjectIdGetDatum (RelationGetRelid (rel )));
10048
+ if (!HeapTupleIsValid (pg_class_tuple ))
10049
+ elog (ERROR , "cache lookup failed for relation \"%s\"" ,
10050
+ RelationGetRelationName (rel ));
10051
+ pg_class_form = (Form_pg_class ) GETSTRUCT (pg_class_tuple );
10052
+ if (pg_class_form -> relreplident != ri_type )
10053
+ {
10054
+ pg_class_form -> relreplident = ri_type ;
10055
+ simple_heap_update (pg_class , & pg_class_tuple -> t_self , pg_class_tuple );
10056
+ CatalogUpdateIndexes (pg_class , pg_class_tuple );
10057
+ }
10058
+ heap_close (pg_class , RowExclusiveLock );
10059
+ heap_freetuple (pg_class_tuple );
10060
+
10061
+ /*
10062
+ * Check whether the correct index is marked indisreplident; if so, we're
10063
+ * done.
10064
+ */
10065
+ if (OidIsValid (indexOid ))
10066
+ {
10067
+ Assert (ri_type == REPLICA_IDENTITY_INDEX );
10068
+
10069
+ pg_index_tuple = SearchSysCache1 (INDEXRELID , ObjectIdGetDatum (indexOid ));
10070
+ if (!HeapTupleIsValid (pg_index_tuple ))
10071
+ elog (ERROR , "cache lookup failed for index %u" , indexOid );
10072
+ pg_index_form = (Form_pg_index ) GETSTRUCT (pg_index_tuple );
10073
+
10074
+ if (pg_index_form -> indisreplident )
10075
+ {
10076
+ ReleaseSysCache (pg_index_tuple );
10077
+ return ;
10078
+ }
10079
+ ReleaseSysCache (pg_index_tuple );
10080
+ }
10081
+
10082
+ /*
10083
+ * Clear the indisreplident flag from any index that had it previously, and
10084
+ * set it for any index that should have it now.
10085
+ */
10086
+ pg_index = heap_open (IndexRelationId , RowExclusiveLock );
10087
+ foreach (index , RelationGetIndexList (rel ))
10088
+ {
10089
+ Oid thisIndexOid = lfirst_oid (index );
10090
+ bool dirty = false;
10091
+
10092
+ pg_index_tuple = SearchSysCacheCopy1 (INDEXRELID ,
10093
+ ObjectIdGetDatum (thisIndexOid ));
10094
+ if (!HeapTupleIsValid (pg_index_tuple ))
10095
+ elog (ERROR , "cache lookup failed for index %u" , thisIndexOid );
10096
+ pg_index_form = (Form_pg_index ) GETSTRUCT (pg_index_tuple );
10097
+
10098
+ /*
10099
+ * Unset the bit if set. We know it's wrong because we checked this
10100
+ * earlier.
10101
+ */
10102
+ if (pg_index_form -> indisreplident )
10103
+ {
10104
+ dirty = true;
10105
+ pg_index_form -> indisreplident = false;
10106
+ }
10107
+ else if (thisIndexOid == indexOid )
10108
+ {
10109
+ dirty = true;
10110
+ pg_index_form -> indisreplident = true;
10111
+ }
10112
+
10113
+ if (dirty )
10114
+ {
10115
+ simple_heap_update (pg_index , & pg_index_tuple -> t_self , pg_index_tuple );
10116
+ CatalogUpdateIndexes (pg_index , pg_index_tuple );
10117
+ InvokeObjectPostAlterHookArg (IndexRelationId , thisIndexOid , 0 ,
10118
+ InvalidOid , is_internal );
10119
+ }
10120
+ heap_freetuple (pg_index_tuple );
10121
+ }
10122
+
10123
+ heap_close (pg_index , RowExclusiveLock );
10124
+ }
10125
+
10126
+ /*
10127
+ * ALTER TABLE <name> REPLICA IDENTITY ...
10128
+ */
10129
+ static void
10130
+ ATExecReplicaIdentity (Relation rel , ReplicaIdentityStmt * stmt , LOCKMODE lockmode )
10131
+ {
10132
+ Oid indexOid ;
10133
+ Relation indexRel ;
10134
+ int key ;
10135
+
10136
+ if (stmt -> identity_type == REPLICA_IDENTITY_DEFAULT )
10137
+ {
10138
+ relation_mark_replica_identity (rel , stmt -> identity_type , InvalidOid , true);
10139
+ return ;
10140
+ }
10141
+ else if (stmt -> identity_type == REPLICA_IDENTITY_FULL )
10142
+ {
10143
+ relation_mark_replica_identity (rel , stmt -> identity_type , InvalidOid , true);
10144
+ return ;
10145
+ }
10146
+ else if (stmt -> identity_type == REPLICA_IDENTITY_NOTHING )
10147
+ {
10148
+ relation_mark_replica_identity (rel , stmt -> identity_type , InvalidOid , true);
10149
+ return ;
10150
+ }
10151
+ else if (stmt -> identity_type == REPLICA_IDENTITY_INDEX )
10152
+ {
10153
+ /* fallthrough */ ;
10154
+ }
10155
+ else
10156
+ elog (ERROR , "unexpected identity type %u" , stmt -> identity_type );
10157
+
10158
+
10159
+ /* Check that the index exists */
10160
+ indexOid = get_relname_relid (stmt -> name , rel -> rd_rel -> relnamespace );
10161
+ if (!OidIsValid (indexOid ))
10162
+ ereport (ERROR ,
10163
+ (errcode (ERRCODE_UNDEFINED_OBJECT ),
10164
+ errmsg ("index \"%s\" for table \"%s\" does not exist" ,
10165
+ stmt -> name , RelationGetRelationName (rel ))));
10166
+
10167
+ indexRel = index_open (indexOid , ShareLock );
10168
+
10169
+ /* Check that the index is on the relation we're altering. */
10170
+ if (indexRel -> rd_index == NULL ||
10171
+ indexRel -> rd_index -> indrelid != RelationGetRelid (rel ))
10172
+ ereport (ERROR ,
10173
+ (errcode (ERRCODE_WRONG_OBJECT_TYPE ),
10174
+ errmsg ("\"%s\" is not an index for table \"%s\"" ,
10175
+ RelationGetRelationName (indexRel ),
10176
+ RelationGetRelationName (rel ))));
10177
+ /* The AM must support uniqueness, and the index must in fact be unique. */
10178
+ if (!indexRel -> rd_am -> amcanunique || !indexRel -> rd_index -> indisunique )
10179
+ ereport (ERROR ,
10180
+ (errcode (ERRCODE_WRONG_OBJECT_TYPE ),
10181
+ errmsg ("cannot use non-unique index \"%s\" as replica identity" ,
10182
+ RelationGetRelationName (indexRel ))));
10183
+ /* Deferred indexes are not guaranteed to be always unique. */
10184
+ if (!indexRel -> rd_index -> indimmediate )
10185
+ ereport (ERROR ,
10186
+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
10187
+ errmsg ("cannot use non-immediate index \"%s\" as replica identity" ,
10188
+ RelationGetRelationName (indexRel ))));
10189
+ /* Expression indexes aren't supported. */
10190
+ if (RelationGetIndexExpressions (indexRel ) != NIL )
10191
+ ereport (ERROR ,
10192
+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
10193
+ errmsg ("cannot use expression index \"%s\" as replica identity" ,
10194
+ RelationGetRelationName (indexRel ))));
10195
+ /* Predicate indexes aren't supported. */
10196
+ if (RelationGetIndexPredicate (indexRel ) != NIL )
10197
+ ereport (ERROR ,
10198
+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
10199
+ errmsg ("cannot use partial index \"%s\" as replica identity" ,
10200
+ RelationGetRelationName (indexRel ))));
10201
+ /* And neither are invalid indexes. */
10202
+ if (!IndexIsValid (indexRel -> rd_index ))
10203
+ ereport (ERROR ,
10204
+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
10205
+ errmsg ("cannot use invalid index \"%s\" as replica identity" ,
10206
+ RelationGetRelationName (indexRel ))));
10207
+
10208
+ /* Check index for nullable columns. */
10209
+ for (key = 0 ; key < indexRel -> rd_index -> indnatts ; key ++ )
10210
+ {
10211
+ int16 attno = indexRel -> rd_index -> indkey .values [key ];
10212
+ Form_pg_attribute attr ;
10213
+
10214
+ /* Of the system columns, only oid is indexable. */
10215
+ if (attno <= 0 && attno != ObjectIdAttributeNumber )
10216
+ elog (ERROR , "internal column %u in unique index \"%s\"" ,
10217
+ attno , RelationGetRelationName (indexRel ));
10218
+
10219
+ attr = rel -> rd_att -> attrs [attno - 1 ];
10220
+ if (!attr -> attnotnull )
10221
+ ereport (ERROR ,
10222
+ (errcode (ERRCODE_WRONG_OBJECT_TYPE ),
10223
+ errmsg ("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable" ,
10224
+ RelationGetRelationName (indexRel ),
10225
+ NameStr (attr -> attname ))));
10226
+ }
10227
+
10228
+ /* This index is suitable for use as a replica identity. Mark it. */
10229
+ relation_mark_replica_identity (rel , stmt -> identity_type , indexOid , true);
10230
+
10231
+ index_close (indexRel , NoLock );
10232
+ }
10233
+
10012
10234
/*
10013
10235
* ALTER FOREIGN TABLE <name> OPTIONS (...)
10014
10236
*/
0 commit comments