@@ -411,51 +411,250 @@ PageRestoreTempPage(Page tempPage, Page oldPage)
 }
 
 /*
- * sorting support for PageRepairFragmentation and PageIndexMultiDelete
+ * Tuple defrag support for PageRepairFragmentation and PageIndexMultiDelete
  */
-typedef struct itemIdSortData
+typedef struct itemIdCompactData
 {
     uint16      offsetindex;    /* linp array index */
     int16       itemoff;        /* page offset of item data */
     uint16      alignedlen;     /* MAXALIGN(item data len) */
-} itemIdSortData;
-typedef itemIdSortData *itemIdSort;
-
-static int
-itemoffcompare(const void *itemidp1, const void *itemidp2)
-{
-    /* Sort in decreasing itemoff order */
-    return ((itemIdSort) itemidp2)->itemoff -
-        ((itemIdSort) itemidp1)->itemoff;
-}
+} itemIdCompactData;
+typedef itemIdCompactData *itemIdCompact;
 
 /*
  * After removing or marking some line pointers unused, move the tuples to
- * remove the gaps caused by the removed items.
+ * remove the gaps caused by the removed items and reorder them back into
+ * reverse line pointer order in the page.
+ *
+ * This function can often be fairly hot, so it pays to take some measures to
+ * make it as optimal as possible.
+ *
+ * Callers may pass 'presorted' as true if the 'itemidbase' array is sorted in
+ * descending order of itemoff.  When this is true we can just memmove()
+ * tuples towards the end of the page.  This is quite a common case as it's
+ * the order that tuples are initially inserted into pages.  When we call this
+ * function to defragment the tuples in the page then any new line pointers
+ * added to the page will keep that presorted order, so hitting this case is
+ * still very common for tables that are commonly updated.
+ *
+ * When the 'itemidbase' array is not presorted then we're unable to just
+ * memmove() tuples around freely.  Doing so could cause us to overwrite the
+ * memory belonging to a tuple we've not moved yet.  In this case, we copy all
+ * the tuples that need to be moved into a temporary buffer.  We can then
+ * simply memcpy() out of that temp buffer back into the page at the correct
+ * location.  Tuples are copied back into the page in the same order as the
+ * 'itemidbase' array, so we end up reordering the tuples back into reverse
+ * line pointer order.  This will increase the chances of hitting the
+ * presorted case the next time around.
+ *
+ * Callers must ensure that nitems is > 0
  */
 static void
-compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
+compactify_tuples(itemIdCompact itemidbase, int nitems, Page page, bool presorted)
 {
     PageHeader  phdr = (PageHeader) page;
     Offset      upper;
+    Offset      copy_tail;
+    Offset      copy_head;
+    itemIdCompact itemidptr;
     int         i;
 
-    /* sort itemIdSortData array into decreasing itemoff order */
-    qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
-          itemoffcompare);
+    /* Code within will not work correctly if nitems == 0 */
+    Assert(nitems > 0);
 
-    upper = phdr->pd_special;
-    for (i = 0; i < nitems; i++)
+    if (presorted)
     {
-        itemIdSort  itemidptr = &itemidbase[i];
-        ItemId      lp;
 
-        lp = PageGetItemId(page, itemidptr->offsetindex + 1);
-        upper -= itemidptr->alignedlen;
+#ifdef USE_ASSERT_CHECKING
+        {
+            /*
+             * Verify we've not gotten any new callers that are incorrectly
+             * passing a true presorted value.
+             */
+            Offset      lastoff = phdr->pd_special;
+
+            for (i = 0; i < nitems; i++)
+            {
+                itemidptr = &itemidbase[i];
+
+                Assert(lastoff > itemidptr->itemoff);
+
+                lastoff = itemidptr->itemoff;
+            }
+        }
+#endif                          /* USE_ASSERT_CHECKING */
+
+        /*
+         * 'itemidbase' is already in the optimal order, i.e, lower item
+         * pointers have a higher offset.  This allows us to memmove() the
+         * tuples up to the end of the page without having to worry about
+         * overwriting other tuples that have not been moved yet.
+         *
+         * There's a good chance that there are tuples already right at the
+         * end of the page that we can simply skip over because they're
+         * already in the correct location within the page.  We'll do that
+         * first...
+         */
+        upper = phdr->pd_special;
+        i = 0;
+        do
+        {
+            itemidptr = &itemidbase[i];
+            if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+                break;
+            upper -= itemidptr->alignedlen;
+
+            i++;
+        } while (i < nitems);
+
+        /*
+         * Now that we've found the first tuple that needs to be moved, we can
+         * do the tuple compactification.  We try and make the least number of
+         * memmove() calls and only call memmove() when there's a gap.  When
+         * we see a gap we just move all tuples after the gap up until the
+         * point of the last move operation.
+         */
+        copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+        for (; i < nitems; i++)
+        {
+            ItemId      lp;
+
+            itemidptr = &itemidbase[i];
+            lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+            if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+            {
+                memmove((char *) page + upper,
+                        page + copy_head,
+                        copy_tail - copy_head);
+
+                /*
+                 * We've now moved all tuples already seen, but not the
+                 * current tuple, so we set the copy_tail to the end of this
+                 * tuple so it can be moved in another iteration of the loop.
+                 */
+                copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+            }
+            /* shift the target offset down by the length of this tuple */
+            upper -= itemidptr->alignedlen;
+            /* point the copy_head to the start of this tuple */
+            copy_head = itemidptr->itemoff;
+
+            /* update the line pointer to reference the new offset */
+            lp->lp_off = upper;
+
+        }
+
+        /* move the remaining tuples. */
         memmove((char *) page + upper,
-                (char *) page + itemidptr->itemoff,
-                itemidptr->alignedlen);
-        lp->lp_off = upper;
+                page + copy_head,
+                copy_tail - copy_head);
+    }
+    else
+    {
+        PGAlignedBlock scratch;
+        char       *scratchptr = scratch.data;
+
+        /*
+         * Non-presorted case:  The tuples in the itemidbase array may be in
+         * any order.  So, in order to move these to the end of the page we
+         * must make a temp copy of each tuple that needs to be moved before
+         * we copy them back into the page at the new offset.
+         *
+         * If a large percentage of tuples have been pruned (>75%) then we'll
+         * copy these into the temp buffer tuple-by-tuple, otherwise, we'll
+         * just do a single memcpy() for all tuples that need to be moved.
+         * When so many tuples have been removed there's likely to be a lot of
+         * gaps and it's unlikely that many non-movable tuples remain at the
+         * end of the page.
+         */
+        if (nitems < PageGetMaxOffsetNumber(page) / 4)
+        {
+            i = 0;
+            do
+            {
+                itemidptr = &itemidbase[i];
+                memcpy(scratchptr + itemidptr->itemoff, page + itemidptr->itemoff,
+                       itemidptr->alignedlen);
+                i++;
+            } while (i < nitems);
+
+            /* Set things up for the compactification code below */
+            i = 0;
+            itemidptr = &itemidbase[0];
+            upper = phdr->pd_special;
+        }
+        else
+        {
+            upper = phdr->pd_special;
+
+            /*
+             * Many tuples are likely to already be in the correct location.
+             * There's no need to copy these into the temp buffer.  Instead
+             * we'll just skip forward in the itemidbase array to the position
+             * that we do need to move tuples from so that the code below just
+             * leaves these ones alone.
+             */
+            i = 0;
+            do
+            {
+                itemidptr = &itemidbase[i];
+                if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+                    break;
+                upper -= itemidptr->alignedlen;
+
+                i++;
+            } while (i < nitems);
+
+            /* Copy all tuples that need to be moved into the temp buffer */
+            memcpy(scratchptr + phdr->pd_upper,
+                   page + phdr->pd_upper,
+                   upper - phdr->pd_upper);
+        }
+
+        /*
+         * Do the tuple compactification.  itemidptr is already pointing to
+         * the first tuple that we're going to move.  Here we collapse the
+         * memcpy calls for adjacent tuples into a single call.  This is done
+         * by delaying the memcpy call until we find a gap that needs to be
+         * closed.
+         */
+        copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+        for (; i < nitems; i++)
+        {
+            ItemId      lp;
+
+            itemidptr = &itemidbase[i];
+            lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+            /* copy pending tuples when we detect a gap */
+            if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+            {
+                memcpy((char *) page + upper,
+                       scratchptr + copy_head,
+                       copy_tail - copy_head);
+
+                /*
+                 * We've now copied all tuples already seen, but not the
+                 * current tuple, so we set the copy_tail to the end of this
+                 * tuple.
+                 */
+                copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+            }
+            /* shift the target offset down by the length of this tuple */
+            upper -= itemidptr->alignedlen;
+            /* point the copy_head to the start of this tuple */
+            copy_head = itemidptr->itemoff;
+
+            /* update the line pointer to reference the new offset */
+            lp->lp_off = upper;
+
+        }
+
+        /* Copy the remaining chunk */
+        memcpy((char *) page + upper,
+               scratchptr + copy_head,
+               copy_tail - copy_head);
     }
 
     phdr->pd_upper = upper;
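
[Editor's note] The core trick in both branches of the new compactify_tuples() is the same: delay the copy until a gap in the source region is found, so a run of adjacent tuples is moved with one memmove()/memcpy() instead of one call per tuple. Below is a minimal standalone sketch of that idea for the presorted path only, using hypothetical names (ToyItem, compact_presorted, PAGE_SIZE) and a plain 8192-byte buffer; it is an illustration of the technique, not the PostgreSQL code itself.

#include <assert.h>
#include <stdio.h>
#include <string.h>

#define PAGE_SIZE 8192

typedef struct ToyItem
{
    int         itemoff;        /* current offset of the item's data */
    int         alignedlen;     /* length of the item's data */
} ToyItem;

/*
 * Slide items toward the end of 'page', assuming 'items' is sorted by
 * descending itemoff.  Adjacent items are moved with a single memmove();
 * a new memmove() starts only when a gap is found.
 */
static int
compact_presorted(char *page, ToyItem *items, int nitems, int special)
{
    int         upper = special;
    int         copy_head,
                copy_tail;
    int         i = 0;

    assert(nitems > 0);

    /* skip items that already touch the end of the used area */
    while (i < nitems && upper == items[i].itemoff + items[i].alignedlen)
        upper -= items[i++].alignedlen;

    if (i >= nitems)
        return upper;           /* nothing needed moving */

    copy_tail = copy_head = items[i].itemoff + items[i].alignedlen;
    for (; i < nitems; i++)
    {
        if (copy_head != items[i].itemoff + items[i].alignedlen)
        {
            /* gap found: flush everything accumulated so far */
            memmove(page + upper, page + copy_head, copy_tail - copy_head);
            copy_tail = items[i].itemoff + items[i].alignedlen;
        }
        upper -= items[i].alignedlen;
        copy_head = items[i].itemoff;
        items[i].itemoff = upper;   /* analogous to lp->lp_off = upper */
    }
    /* flush the final run */
    memmove(page + upper, page + copy_head, copy_tail - copy_head);
    return upper;
}

int
main(void)
{
    static char page[PAGE_SIZE];
    /* three items in descending-offset order, with a gap before the third */
    ToyItem     items[] = {
        {PAGE_SIZE - 100, 100},
        {PAGE_SIZE - 200, 100},
        {PAGE_SIZE - 400, 100},
    };

    memset(page + PAGE_SIZE - 400, 'c', 100);
    memset(page + PAGE_SIZE - 200, 'b', 100);
    memset(page + PAGE_SIZE - 100, 'a', 100);

    int         upper = compact_presorted(page, items, 3, PAGE_SIZE);

    printf("new upper = %d (expected %d)\n", upper, PAGE_SIZE - 300);
    printf("item 3 now at %d, first byte '%c'\n", items[2].itemoff,
           page[items[2].itemoff]);
    return 0;
}

Running this sketch, the first two items are skipped because they already sit flush against the end of the buffer, and the third is closed up with a single memmove(), which is exactly the behaviour the presorted branch aims for.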
@@ -477,14 +676,16 @@ PageRepairFragmentation(Page page)
     Offset      pd_lower = ((PageHeader) page)->pd_lower;
     Offset      pd_upper = ((PageHeader) page)->pd_upper;
     Offset      pd_special = ((PageHeader) page)->pd_special;
-    itemIdSortData itemidbase[MaxHeapTuplesPerPage];
-    itemIdSort  itemidptr;
+    Offset      last_offset;
+    itemIdCompactData itemidbase[MaxHeapTuplesPerPage];
+    itemIdCompact itemidptr;
     ItemId      lp;
     int         nline,
                 nstorage,
                 nunused;
     int         i;
     Size        totallen;
+    bool        presorted = true;   /* For now */
 
     /*
      * It's worth the trouble to be more paranoid here than in most places,
@@ -509,6 +710,7 @@ PageRepairFragmentation(Page page)
     nline = PageGetMaxOffsetNumber(page);
     itemidptr = itemidbase;
     nunused = totallen = 0;
+    last_offset = pd_special;
     for (i = FirstOffsetNumber; i <= nline; i++)
     {
         lp = PageGetItemId(page, i);
@@ -518,6 +720,12 @@ PageRepairFragmentation(Page page)
         {
             itemidptr->offsetindex = i - 1;
             itemidptr->itemoff = ItemIdGetOffset(lp);
+
+            if (last_offset > itemidptr->itemoff)
+                last_offset = itemidptr->itemoff;
+            else
+                presorted = false;
+
             if (unlikely(itemidptr->itemoff < (int) pd_upper ||
                          itemidptr->itemoff >= (int) pd_special))
                 ereport(ERROR,
@@ -552,7 +760,7 @@ PageRepairFragmentation(Page page)
                      errmsg("corrupted item lengths: total %u, available space %u",
                             (unsigned int) totallen, pd_special - pd_lower)));
 
-        compactify_tuples(itemidbase, nstorage, page);
+        compactify_tuples(itemidbase, nstorage, page, presorted);
     }
 
     /* Set hint bit for PageAddItem */
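
[Editor's note] The PageRepairFragmentation hunks above piggyback the presortedness check on the scan that already walks the line pointer array: last_offset starts at pd_special, each stored item must sit strictly below the previous one, and the first violation flips presorted to false. The following compact standalone illustration of that check uses hypothetical names (offsets_descending) and assumes offsets are visited in line-pointer order; it is a sketch of the bookkeeping, not the caller's actual code.

#include <stdbool.h>
#include <stdio.h>

/*
 * Return true if every offset is strictly smaller than the one before it,
 * starting from 'special' (the end of the usable area).  This mirrors the
 * last_offset/presorted bookkeeping added to PageRepairFragmentation.
 */
static bool
offsets_descending(const int *itemoff, int nitems, int special)
{
    int         last_offset = special;

    for (int i = 0; i < nitems; i++)
    {
        if (last_offset > itemoff[i])
            last_offset = itemoff[i];
        else
            return false;       /* out of order: caller must use the slow path */
    }
    return true;
}

int
main(void)
{
    int         sorted[] = {8092, 7992, 7792};
    int         unsorted[] = {7792, 8092, 7992};

    printf("%d %d\n", offsets_descending(sorted, 3, 8192),
           offsets_descending(unsorted, 3, 8192));  /* prints "1 0" */
    return 0;
}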
@@ -831,9 +1039,10 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
     Offset      pd_lower = phdr->pd_lower;
     Offset      pd_upper = phdr->pd_upper;
     Offset      pd_special = phdr->pd_special;
-    itemIdSortData itemidbase[MaxIndexTuplesPerPage];
+    Offset      last_offset;
+    itemIdCompactData itemidbase[MaxIndexTuplesPerPage];
     ItemIdData  newitemids[MaxIndexTuplesPerPage];
-    itemIdSort  itemidptr;
+    itemIdCompact itemidptr;
     ItemId      lp;
     int         nline,
                 nused;
@@ -842,6 +1051,7 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
     unsigned    offset;
     int         nextitm;
     OffsetNumber offnum;
+    bool        presorted = true;   /* For now */
 
     Assert(nitems <= MaxIndexTuplesPerPage);
 
@@ -883,6 +1093,7 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
     totallen = 0;
     nused = 0;
     nextitm = 0;
+    last_offset = pd_special;
     for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
     {
         lp = PageGetItemId(page, offnum);
@@ -906,6 +1117,12 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
         {
            itemidptr->offsetindex = nused; /* where it will go */
            itemidptr->itemoff = offset;
+
+           if (last_offset > itemidptr->itemoff)
+               last_offset = itemidptr->itemoff;
+           else
+               presorted = false;
+
            itemidptr->alignedlen = MAXALIGN(size);
            totallen += itemidptr->alignedlen;
            newitemids[nused] = *lp;
@@ -932,7 +1149,10 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
     phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
 
     /* and compactify the tuple data */
-    compactify_tuples(itemidbase, nused, page);
+    if (nused > 0)
+        compactify_tuples(itemidbase, nused, page, presorted);
+    else
+        phdr->pd_upper = pd_special;
 }
 
 
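[Editor's note] Because compactify_tuples() now asserts nitems > 0, PageIndexMultiDelete handles the empty case itself: with no surviving tuples the whole area up to pd_special is free, so pd_upper is simply reset to pd_special. A toy calculation of where pd_upper ends up, under an assumed page with pd_special at 8192 and two surviving tuples of hypothetical MAXALIGN'ed sizes 32 and 48:

#include <stdio.h>

int
main(void)
{
    int         pd_special = 8192;          /* assumed end of usable area */
    int         alignedlen[] = {32, 48};    /* surviving tuples, hypothetical sizes */
    int         nused = 2;
    int         upper = pd_special;

    /* each surviving tuple pushes the upper boundary down by its length */
    for (int i = 0; i < nused; i++)
        upper -= alignedlen[i];

    printf("pd_upper with 2 tuples: %d\n", upper);          /* 8112 */

    /* with nused == 0 the loop does nothing: pd_upper returns to pd_special */
    printf("pd_upper with 0 tuples: %d\n", pd_special);     /* 8192 */
    return 0;
}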