@@ -109,75 +109,17 @@ xlogVacuumPage(Relation index, Buffer buffer)
109
109
PageSetLSN (page , recptr );
110
110
}
111
111
112
- static bool
113
- ginVacuumPostingTreeLeaves (GinVacuumState * gvs , BlockNumber blkno , bool isRoot , Buffer * rootBuffer )
114
- {
115
- Buffer buffer ;
116
- Page page ;
117
- bool hasVoidPage = FALSE;
118
- MemoryContext oldCxt ;
119
-
120
- buffer = ReadBufferExtended (gvs -> index , MAIN_FORKNUM , blkno ,
121
- RBM_NORMAL , gvs -> strategy );
122
- page = BufferGetPage (buffer );
123
-
124
- /*
125
- * We should be sure that we don't concurrent with inserts, insert process
126
- * never release root page until end (but it can unlock it and lock
127
- * again). New scan can't start but previously started ones work
128
- * concurrently.
129
- */
130
- if (isRoot )
131
- LockBufferForCleanup (buffer );
132
- else
133
- LockBuffer (buffer , GIN_EXCLUSIVE );
134
-
135
- Assert (GinPageIsData (page ));
136
112
137
- if (GinPageIsLeaf (page ))
138
- {
139
- oldCxt = MemoryContextSwitchTo (gvs -> tmpCxt );
140
- ginVacuumPostingTreeLeaf (gvs -> index , buffer , gvs );
141
- MemoryContextSwitchTo (oldCxt );
142
- MemoryContextReset (gvs -> tmpCxt );
143
-
144
- /* if root is a leaf page, we don't desire further processing */
145
- if (!isRoot && !hasVoidPage && GinDataLeafPageIsEmpty (page ))
146
- hasVoidPage = TRUE;
147
- }
148
- else
149
- {
150
- OffsetNumber i ;
151
- bool isChildHasVoid = FALSE;
152
-
153
- for (i = FirstOffsetNumber ; i <= GinPageGetOpaque (page )-> maxoff ; i ++ )
154
- {
155
- PostingItem * pitem = GinDataPageGetPostingItem (page , i );
156
-
157
- if (ginVacuumPostingTreeLeaves (gvs , PostingItemGetBlockNumber (pitem ), FALSE, NULL ))
158
- isChildHasVoid = TRUE;
159
- }
160
-
161
- if (isChildHasVoid )
162
- hasVoidPage = TRUE;
163
- }
113
+ typedef struct DataPageDeleteStack
114
+ {
115
+ struct DataPageDeleteStack * child ;
116
+ struct DataPageDeleteStack * parent ;
164
117
165
- /*
166
- * if we have root and there are empty pages in tree, then we don't
167
- * release lock to go further processing and guarantee that tree is unused
168
- */
169
- if (!(isRoot && hasVoidPage ))
170
- {
171
- UnlockReleaseBuffer (buffer );
172
- }
173
- else
174
- {
175
- Assert (rootBuffer );
176
- * rootBuffer = buffer ;
177
- }
118
+ BlockNumber blkno ; /* current block number */
119
+ BlockNumber leftBlkno ; /* rightest non-deleted page on left */
120
+ bool isRoot ;
121
+ } DataPageDeleteStack ;
178
122
179
- return hasVoidPage ;
180
- }
181
123
182
124
/*
183
125
* Delete a posting tree page.
@@ -194,8 +136,13 @@ ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkn
194
136
BlockNumber rightlink ;
195
137
196
138
/*
197
- * Lock the pages in the same order as an insertion would, to avoid
198
- * deadlocks: left, then right, then parent.
139
+ * This function MUST be called only if someone of parent pages hold
140
+ * exclusive cleanup lock. This guarantees that no insertions currently
141
+ * happen in this subtree. Caller also acquire Exclusive lock on deletable
142
+ * page and is acquiring and releasing exclusive lock on left page before.
143
+ * Left page was locked and released. Then parent and this page are locked.
144
+ * We acquire left page lock here only to mark page dirty after changing
145
+ * right pointer.
199
146
*/
200
147
lBuffer = ReadBufferExtended (gvs -> index , MAIN_FORKNUM , leftBlkno ,
201
148
RBM_NORMAL , gvs -> strategy );
@@ -205,10 +152,6 @@ ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkn
205
152
RBM_NORMAL , gvs -> strategy );
206
153
207
154
LockBuffer (lBuffer , GIN_EXCLUSIVE );
208
- LockBuffer (dBuffer , GIN_EXCLUSIVE );
209
- if (!isParentRoot ) /* parent is already locked by
210
- * LockBufferForCleanup() */
211
- LockBuffer (pBuffer , GIN_EXCLUSIVE );
212
155
213
156
START_CRIT_SECTION ();
214
157
@@ -272,26 +215,15 @@ ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkn
272
215
PageSetLSN (BufferGetPage (lBuffer ), recptr );
273
216
}
274
217
275
- if (!isParentRoot )
276
- LockBuffer (pBuffer , GIN_UNLOCK );
277
218
ReleaseBuffer (pBuffer );
278
219
UnlockReleaseBuffer (lBuffer );
279
- UnlockReleaseBuffer (dBuffer );
220
+ ReleaseBuffer (dBuffer );
280
221
281
222
END_CRIT_SECTION ();
282
223
283
224
gvs -> result -> pages_deleted ++ ;
284
225
}
285
226
286
- typedef struct DataPageDeleteStack
287
- {
288
- struct DataPageDeleteStack * child ;
289
- struct DataPageDeleteStack * parent ;
290
-
291
- BlockNumber blkno ; /* current block number */
292
- BlockNumber leftBlkno ; /* rightest non-deleted page on left */
293
- bool isRoot ;
294
- } DataPageDeleteStack ;
295
227
296
228
/*
297
229
* scans posting tree and deletes empty pages
@@ -325,6 +257,10 @@ ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot,
325
257
326
258
buffer = ReadBufferExtended (gvs -> index , MAIN_FORKNUM , blkno ,
327
259
RBM_NORMAL , gvs -> strategy );
260
+
261
+ if (!isRoot )
262
+ LockBuffer (buffer , GIN_EXCLUSIVE );
263
+
328
264
page = BufferGetPage (buffer );
329
265
330
266
Assert (GinPageIsData (page ));
@@ -359,6 +295,9 @@ ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot,
359
295
}
360
296
}
361
297
298
+ if (!isRoot )
299
+ LockBuffer (buffer , GIN_UNLOCK );
300
+
362
301
ReleaseBuffer (buffer );
363
302
364
303
if (!meDelete )
@@ -367,37 +306,124 @@ ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot,
367
306
return meDelete ;
368
307
}
369
308
370
- static void
371
- ginVacuumPostingTree (GinVacuumState * gvs , BlockNumber rootBlkno )
309
+
310
+ /*
311
+ * Scan through posting tree, delete empty tuples from leaf pages.
312
+ * Also, this function collects empty subtrees (with all empty leafs).
313
+ * For parents of these subtrees CleanUp lock is taken, then we call
314
+ * ScanToDelete. This is done for every inner page, which points to
315
+ * empty subtree.
316
+ */
317
+ static bool
318
+ ginVacuumPostingTreeLeaves (GinVacuumState * gvs , BlockNumber blkno , bool isRoot )
372
319
{
373
- Buffer rootBuffer = InvalidBuffer ;
374
- DataPageDeleteStack root ,
375
- * ptr ,
376
- * tmp ;
320
+ Buffer buffer ;
321
+ Page page ;
322
+ bool hasVoidPage = FALSE;
323
+ MemoryContext oldCxt ;
324
+
325
+ buffer = ReadBufferExtended (gvs -> index , MAIN_FORKNUM , blkno ,
326
+ RBM_NORMAL , gvs -> strategy );
327
+ page = BufferGetPage (buffer );
328
+
329
+ ginTraverseLock (buffer ,false);
377
330
378
- if (ginVacuumPostingTreeLeaves (gvs , rootBlkno , TRUE, & rootBuffer ) == FALSE)
331
+ Assert (GinPageIsData (page ));
332
+
333
+ if (GinPageIsLeaf (page ))
379
334
{
380
- Assert (rootBuffer == InvalidBuffer );
381
- return ;
335
+ oldCxt = MemoryContextSwitchTo (gvs -> tmpCxt );
336
+ ginVacuumPostingTreeLeaf (gvs -> index , buffer , gvs );
337
+ MemoryContextSwitchTo (oldCxt );
338
+ MemoryContextReset (gvs -> tmpCxt );
339
+
340
+ /* if root is a leaf page, we don't desire further processing */
341
+ if (GinDataLeafPageIsEmpty (page ))
342
+ hasVoidPage = TRUE;
343
+
344
+ UnlockReleaseBuffer (buffer );
345
+
346
+ return hasVoidPage ;
382
347
}
348
+ else
349
+ {
350
+ OffsetNumber i ;
351
+ bool hasEmptyChild = FALSE;
352
+ bool hasNonEmptyChild = FALSE;
353
+ OffsetNumber maxoff = GinPageGetOpaque (page )-> maxoff ;
354
+ BlockNumber * children = palloc (sizeof (BlockNumber ) * (maxoff + 1 ));
355
+
356
+ /*
357
+ * Read all children BlockNumbers.
358
+ * Not sure it is safe if there are many concurrent vacuums.
359
+ */
383
360
384
- memset ( & root , 0 , sizeof ( DataPageDeleteStack ));
385
- root . leftBlkno = InvalidBlockNumber ;
386
- root . isRoot = TRUE ;
361
+ for ( i = FirstOffsetNumber ; i <= maxoff ; i ++ )
362
+ {
363
+ PostingItem * pitem = GinDataPageGetPostingItem ( page , i ) ;
387
364
388
- vacuum_delay_point ();
365
+ children [i ] = PostingItemGetBlockNumber (pitem );
366
+ }
389
367
390
- ginScanToDelete ( gvs , rootBlkno , TRUE, & root , InvalidOffsetNumber );
368
+ UnlockReleaseBuffer ( buffer );
391
369
392
- ptr = root .child ;
393
- while (ptr )
394
- {
395
- tmp = ptr -> child ;
396
- pfree (ptr );
397
- ptr = tmp ;
370
+ for (i = FirstOffsetNumber ; i <= maxoff ; i ++ )
371
+ {
372
+ if (ginVacuumPostingTreeLeaves (gvs , children [i ], FALSE))
373
+ hasEmptyChild = TRUE;
374
+ else
375
+ hasNonEmptyChild = TRUE;
376
+ }
377
+
378
+ pfree (children );
379
+
380
+ vacuum_delay_point ();
381
+
382
+ /*
383
+ * All subtree is empty - just return TRUE to indicate that parent must
384
+ * do a cleanup. Unless we are ROOT an there is way to go upper.
385
+ */
386
+
387
+ if (hasEmptyChild && !hasNonEmptyChild && !isRoot )
388
+ return TRUE;
389
+
390
+ if (hasEmptyChild )
391
+ {
392
+ DataPageDeleteStack root ,
393
+ * ptr ,
394
+ * tmp ;
395
+
396
+ buffer = ReadBufferExtended (gvs -> index , MAIN_FORKNUM , blkno ,
397
+ RBM_NORMAL , gvs -> strategy );
398
+ LockBufferForCleanup (buffer );
399
+
400
+ memset (& root , 0 , sizeof (DataPageDeleteStack ));
401
+ root .leftBlkno = InvalidBlockNumber ;
402
+ root .isRoot = TRUE;
403
+
404
+ ginScanToDelete (gvs , blkno , TRUE, & root , InvalidOffsetNumber );
405
+
406
+ ptr = root .child ;
407
+
408
+ while (ptr )
409
+ {
410
+ tmp = ptr -> child ;
411
+ pfree (ptr );
412
+ ptr = tmp ;
413
+ }
414
+
415
+ UnlockReleaseBuffer (buffer );
416
+ }
417
+
418
+ /* Here we have deleted all empty subtrees */
419
+ return FALSE;
398
420
}
421
+ }
399
422
400
- UnlockReleaseBuffer (rootBuffer );
423
+ static void
424
+ ginVacuumPostingTree (GinVacuumState * gvs , BlockNumber rootBlkno )
425
+ {
426
+ ginVacuumPostingTreeLeaves (gvs , rootBlkno , TRUE);
401
427
}
402
428
403
429
/*
0 commit comments