51
51
#include "access/xloginsert.h"
52
52
#include "access/xlogutils.h"
53
53
#include "catalog/catalog.h"
54
+ #include "catalog/pg_database.h"
55
+ #include "catalog/pg_database_d.h"
54
56
#include "commands/vacuum.h"
55
57
#include "miscadmin.h"
56
58
#include "pgstat.h"
@@ -75,6 +77,12 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
75
77
Buffer newbuf , HeapTuple oldtup ,
76
78
HeapTuple newtup , HeapTuple old_key_tuple ,
77
79
bool all_visible_cleared , bool new_all_visible_cleared );
80
+ #ifdef USE_ASSERT_CHECKING
81
+ static void check_lock_if_inplace_updateable_rel (Relation relation ,
82
+ ItemPointer otid ,
83
+ HeapTuple newtup );
84
+ static void check_inplace_rel_lock (HeapTuple oldtup );
85
+ #endif
78
86
static Bitmapset * HeapDetermineColumnsInfo (Relation relation ,
79
87
Bitmapset * interesting_cols ,
80
88
Bitmapset * external_cols ,
@@ -121,6 +129,8 @@ static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool ke
121
129
* heavyweight lock mode and MultiXactStatus values to use for any particular
122
130
* tuple lock strength.
123
131
*
132
+ * These interact with InplaceUpdateTupleLock, an alias for ExclusiveLock.
133
+ *
124
134
* Don't look at lockstatus/updstatus directly! Use get_mxact_status_for_lock
125
135
* instead.
126
136
*/
@@ -3207,6 +3217,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
3207
3217
(errcode (ERRCODE_INVALID_TRANSACTION_STATE ),
3208
3218
errmsg ("cannot update tuples during a parallel operation" )));
3209
3219
3220
+ #ifdef USE_ASSERT_CHECKING
3221
+ check_lock_if_inplace_updateable_rel (relation , otid , newtup );
3222
+ #endif
3223
+
3210
3224
/*
3211
3225
* Fetch the list of attributes to be checked for various operations.
3212
3226
*
@@ -4071,6 +4085,128 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
4071
4085
return TM_Ok ;
4072
4086
}
4073
4087
4088
+ #ifdef USE_ASSERT_CHECKING
4089
+ /*
4090
+ * Confirm adequate lock held during heap_update(), per rules from
4091
+ * README.tuplock section "Locking to write inplace-updated tables".
4092
+ */
4093
+ static void
4094
+ check_lock_if_inplace_updateable_rel (Relation relation ,
4095
+ ItemPointer otid ,
4096
+ HeapTuple newtup )
4097
+ {
4098
+ /* LOCKTAG_TUPLE acceptable for any catalog */
4099
+ switch (RelationGetRelid (relation ))
4100
+ {
4101
+ case RelationRelationId :
4102
+ case DatabaseRelationId :
4103
+ {
4104
+ LOCKTAG tuptag ;
4105
+
4106
+ SET_LOCKTAG_TUPLE (tuptag ,
4107
+ relation -> rd_lockInfo .lockRelId .dbId ,
4108
+ relation -> rd_lockInfo .lockRelId .relId ,
4109
+ ItemPointerGetBlockNumber (otid ),
4110
+ ItemPointerGetOffsetNumber (otid ));
4111
+ if (LockHeldByMe (& tuptag , InplaceUpdateTupleLock , false))
4112
+ return ;
4113
+ }
4114
+ break ;
4115
+ default :
4116
+ Assert (!IsInplaceUpdateRelation (relation ));
4117
+ return ;
4118
+ }
4119
+
4120
+ switch (RelationGetRelid (relation ))
4121
+ {
4122
+ case RelationRelationId :
4123
+ {
4124
+ /* LOCKTAG_TUPLE or LOCKTAG_RELATION ok */
4125
+ Form_pg_class classForm = (Form_pg_class ) GETSTRUCT (newtup );
4126
+ Oid relid = classForm -> oid ;
4127
+ Oid dbid ;
4128
+ LOCKTAG tag ;
4129
+
4130
+ if (IsSharedRelation (relid ))
4131
+ dbid = InvalidOid ;
4132
+ else
4133
+ dbid = MyDatabaseId ;
4134
+
4135
+ if (classForm -> relkind == RELKIND_INDEX )
4136
+ {
4137
+ Relation irel = index_open (relid , AccessShareLock );
4138
+
4139
+ SET_LOCKTAG_RELATION (tag , dbid , irel -> rd_index -> indrelid );
4140
+ index_close (irel , AccessShareLock );
4141
+ }
4142
+ else
4143
+ SET_LOCKTAG_RELATION (tag , dbid , relid );
4144
+
4145
+ if (!LockHeldByMe (& tag , ShareUpdateExclusiveLock , false) &&
4146
+ !LockHeldByMe (& tag , ShareRowExclusiveLock , true))
4147
+ elog (WARNING ,
4148
+ "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)" ,
4149
+ NameStr (classForm -> relname ),
4150
+ relid ,
4151
+ classForm -> relkind ,
4152
+ ItemPointerGetBlockNumber (otid ),
4153
+ ItemPointerGetOffsetNumber (otid ));
4154
+ }
4155
+ break ;
4156
+ case DatabaseRelationId :
4157
+ {
4158
+ /* LOCKTAG_TUPLE required */
4159
+ Form_pg_database dbForm = (Form_pg_database ) GETSTRUCT (newtup );
4160
+
4161
+ elog (WARNING ,
4162
+ "missing lock on database \"%s\" (OID %u) @ TID (%u,%u)" ,
4163
+ NameStr (dbForm -> datname ),
4164
+ dbForm -> oid ,
4165
+ ItemPointerGetBlockNumber (otid ),
4166
+ ItemPointerGetOffsetNumber (otid ));
4167
+ }
4168
+ break ;
4169
+ }
4170
+ }
4171
+
4172
+ /*
4173
+ * Confirm adequate relation lock held, per rules from README.tuplock section
4174
+ * "Locking to write inplace-updated tables".
4175
+ */
4176
+ static void
4177
+ check_inplace_rel_lock (HeapTuple oldtup )
4178
+ {
4179
+ Form_pg_class classForm = (Form_pg_class ) GETSTRUCT (oldtup );
4180
+ Oid relid = classForm -> oid ;
4181
+ Oid dbid ;
4182
+ LOCKTAG tag ;
4183
+
4184
+ if (IsSharedRelation (relid ))
4185
+ dbid = InvalidOid ;
4186
+ else
4187
+ dbid = MyDatabaseId ;
4188
+
4189
+ if (classForm -> relkind == RELKIND_INDEX )
4190
+ {
4191
+ Relation irel = index_open (relid , AccessShareLock );
4192
+
4193
+ SET_LOCKTAG_RELATION (tag , dbid , irel -> rd_index -> indrelid );
4194
+ index_close (irel , AccessShareLock );
4195
+ }
4196
+ else
4197
+ SET_LOCKTAG_RELATION (tag , dbid , relid );
4198
+
4199
+ if (!LockHeldByMe (& tag , ShareUpdateExclusiveLock , true))
4200
+ elog (WARNING ,
4201
+ "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)" ,
4202
+ NameStr (classForm -> relname ),
4203
+ relid ,
4204
+ classForm -> relkind ,
4205
+ ItemPointerGetBlockNumber (& oldtup -> t_self ),
4206
+ ItemPointerGetOffsetNumber (& oldtup -> t_self ));
4207
+ }
4208
+ #endif
4209
+
4074
4210
/*
4075
4211
* Check if the specified attribute's values are the same. Subroutine for
4076
4212
* HeapDetermineColumnsInfo.
@@ -6088,15 +6224,21 @@ heap_inplace_lock(Relation relation,
6088
6224
TM_Result result ;
6089
6225
bool ret ;
6090
6226
6227
+ #ifdef USE_ASSERT_CHECKING
6228
+ if (RelationGetRelid (relation ) == RelationRelationId )
6229
+ check_inplace_rel_lock (oldtup_ptr );
6230
+ #endif
6231
+
6091
6232
Assert (BufferIsValid (buffer ));
6092
6233
6234
+ LockTuple (relation , & oldtup .t_self , InplaceUpdateTupleLock );
6093
6235
LockBuffer (buffer , BUFFER_LOCK_EXCLUSIVE );
6094
6236
6095
6237
/*----------
6096
6238
* Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
6097
6239
*
6098
6240
* - wait unconditionally
6099
- * - no tuple locks
6241
+ * - already locked tuple above, since inplace needs that unconditionally
6100
6242
* - don't recheck header after wait: simpler to defer to next iteration
6101
6243
* - don't try to continue even if the updater aborts: likewise
6102
6244
* - no crosscheck
@@ -6180,7 +6322,10 @@ heap_inplace_lock(Relation relation,
6180
6322
* don't bother optimizing that.
6181
6323
*/
6182
6324
if (!ret )
6325
+ {
6326
+ UnlockTuple (relation , & oldtup .t_self , InplaceUpdateTupleLock );
6183
6327
InvalidateCatalogSnapshot ();
6328
+ }
6184
6329
return ret ;
6185
6330
}
6186
6331
@@ -6189,6 +6334,8 @@ heap_inplace_lock(Relation relation,
6189
6334
*
6190
6335
* The tuple cannot change size, and therefore its header fields and null
6191
6336
* bitmap (if any) don't change either.
6337
+ *
6338
+ * Since we hold LOCKTAG_TUPLE, no updater has a local copy of this tuple.
6192
6339
*/
6193
6340
void
6194
6341
heap_inplace_update_and_unlock (Relation relation ,
@@ -6272,6 +6419,7 @@ heap_inplace_unlock(Relation relation,
6272
6419
HeapTuple oldtup , Buffer buffer )
6273
6420
{
6274
6421
LockBuffer (buffer , BUFFER_LOCK_UNLOCK );
6422
+ UnlockTuple (relation , & oldtup -> t_self , InplaceUpdateTupleLock );
6275
6423
}
6276
6424
6277
6425
/*
0 commit comments