8
8
*
9
9
*
10
10
* IDENTIFICATION
11
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.178 2010/03/28 09:27:01 sriggs Exp $
11
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.179 2010/08/29 19:33:14 tgl Exp $
12
12
*
13
13
*-------------------------------------------------------------------------
14
14
*/
@@ -74,9 +74,8 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
74
74
static void _bt_checksplitloc (FindSplitData * state ,
75
75
OffsetNumber firstoldonright , bool newitemonleft ,
76
76
int dataitemstoleft , Size firstoldonrightsz );
77
- static void _bt_pgaddtup (Relation rel , Page page ,
78
- Size itemsize , IndexTuple itup ,
79
- OffsetNumber itup_off , const char * where );
77
+ static bool _bt_pgaddtup (Page page , Size itemsize , IndexTuple itup ,
78
+ OffsetNumber itup_off );
80
79
static bool _bt_isequal (TupleDesc itupdesc , Page page , OffsetNumber offnum ,
81
80
int keysz , ScanKey scankey );
82
81
static void _bt_vacuum_one_page (Relation rel , Buffer buffer , Relation heapRel );
@@ -753,7 +752,9 @@ _bt_insertonpg(Relation rel,
753
752
/* Do the update. No ereport(ERROR) until changes are logged */
754
753
START_CRIT_SECTION ();
755
754
756
- _bt_pgaddtup (rel , page , itemsz , itup , newitemoff , "page" );
755
+ if (!_bt_pgaddtup (page , itemsz , itup , newitemoff ))
756
+ elog (PANIC , "failed to add new item to block %u in index \"%s\"" ,
757
+ itup_blkno , RelationGetRelationName (rel ));
757
758
758
759
MarkBufferDirty (buf );
759
760
@@ -879,6 +880,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
879
880
Page origpage ;
880
881
Page leftpage ,
881
882
rightpage ;
883
+ BlockNumber origpagenumber ,
884
+ rightpagenumber ;
882
885
BTPageOpaque ropaque ,
883
886
lopaque ,
884
887
oopaque ;
@@ -894,11 +897,27 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
894
897
OffsetNumber i ;
895
898
bool isroot ;
896
899
900
+ /* Acquire a new page to split into */
897
901
rbuf = _bt_getbuf (rel , P_NEW , BT_WRITE );
902
+
903
+ /*
904
+ * origpage is the original page to be split. leftpage is a temporary
905
+ * buffer that receives the left-sibling data, which will be copied back
906
+ * into origpage on success. rightpage is the new page that receives
907
+ * the right-sibling data. If we fail before reaching the critical
908
+ * section, origpage hasn't been modified and leftpage is only workspace.
909
+ * In principle we shouldn't need to worry about rightpage either,
910
+ * because it hasn't been linked into the btree page structure; but to
911
+ * avoid leaving possibly-confusing junk behind, we are careful to rewrite
912
+ * rightpage as zeroes before throwing any error.
913
+ */
898
914
origpage = BufferGetPage (buf );
899
915
leftpage = PageGetTempPage (origpage );
900
916
rightpage = BufferGetPage (rbuf );
901
917
918
+ origpagenumber = BufferGetBlockNumber (buf );
919
+ rightpagenumber = BufferGetBlockNumber (rbuf );
920
+
902
921
_bt_pageinit (leftpage , BufferGetPageSize (buf ));
903
922
/* rightpage was already initialized by _bt_getbuf */
904
923
@@ -923,8 +942,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
923
942
lopaque -> btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE );
924
943
ropaque -> btpo_flags = lopaque -> btpo_flags ;
925
944
lopaque -> btpo_prev = oopaque -> btpo_prev ;
926
- lopaque -> btpo_next = BufferGetBlockNumber ( rbuf ) ;
927
- ropaque -> btpo_prev = BufferGetBlockNumber ( buf ) ;
945
+ lopaque -> btpo_next = rightpagenumber ;
946
+ ropaque -> btpo_prev = origpagenumber ;
928
947
ropaque -> btpo_next = oopaque -> btpo_next ;
929
948
lopaque -> btpo .level = ropaque -> btpo .level = oopaque -> btpo .level ;
930
949
/* Since we already have write-lock on both pages, ok to read cycleid */
@@ -947,9 +966,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
947
966
item = (IndexTuple ) PageGetItem (origpage , itemid );
948
967
if (PageAddItem (rightpage , (Item ) item , itemsz , rightoff ,
949
968
false, false) == InvalidOffsetNumber )
950
- elog (PANIC , "failed to add hikey to the right sibling"
969
+ {
970
+ memset (rightpage , 0 , BufferGetPageSize (rbuf ));
971
+ elog (ERROR , "failed to add hikey to the right sibling"
951
972
" while splitting block %u of index \"%s\"" ,
952
- BufferGetBlockNumber (buf ), RelationGetRelationName (rel ));
973
+ origpagenumber , RelationGetRelationName (rel ));
974
+ }
953
975
rightoff = OffsetNumberNext (rightoff );
954
976
}
955
977
@@ -974,9 +996,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
974
996
}
975
997
if (PageAddItem (leftpage , (Item ) item , itemsz , leftoff ,
976
998
false, false) == InvalidOffsetNumber )
977
- elog (PANIC , "failed to add hikey to the left sibling"
999
+ {
1000
+ memset (rightpage , 0 , BufferGetPageSize (rbuf ));
1001
+ elog (ERROR , "failed to add hikey to the left sibling"
978
1002
" while splitting block %u of index \"%s\"" ,
979
- BufferGetBlockNumber (buf ), RelationGetRelationName (rel ));
1003
+ origpagenumber , RelationGetRelationName (rel ));
1004
+ }
980
1005
leftoff = OffsetNumberNext (leftoff );
981
1006
982
1007
/*
@@ -998,29 +1023,49 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
998
1023
{
999
1024
if (newitemonleft )
1000
1025
{
1001
- _bt_pgaddtup (rel , leftpage , newitemsz , newitem , leftoff ,
1002
- "left sibling" );
1026
+ if (!_bt_pgaddtup (leftpage , newitemsz , newitem , leftoff ))
1027
+ {
1028
+ memset (rightpage , 0 , BufferGetPageSize (rbuf ));
1029
+ elog (ERROR , "failed to add new item to the left sibling"
1030
+ " while splitting block %u of index \"%s\"" ,
1031
+ origpagenumber , RelationGetRelationName (rel ));
1032
+ }
1003
1033
leftoff = OffsetNumberNext (leftoff );
1004
1034
}
1005
1035
else
1006
1036
{
1007
- _bt_pgaddtup (rel , rightpage , newitemsz , newitem , rightoff ,
1008
- "right sibling" );
1037
+ if (!_bt_pgaddtup (rightpage , newitemsz , newitem , rightoff ))
1038
+ {
1039
+ memset (rightpage , 0 , BufferGetPageSize (rbuf ));
1040
+ elog (ERROR , "failed to add new item to the right sibling"
1041
+ " while splitting block %u of index \"%s\"" ,
1042
+ origpagenumber , RelationGetRelationName (rel ));
1043
+ }
1009
1044
rightoff = OffsetNumberNext (rightoff );
1010
1045
}
1011
1046
}
1012
1047
1013
1048
/* decide which page to put it on */
1014
1049
if (i < firstright )
1015
1050
{
1016
- _bt_pgaddtup (rel , leftpage , itemsz , item , leftoff ,
1017
- "left sibling" );
1051
+ if (!_bt_pgaddtup (leftpage , itemsz , item , leftoff ))
1052
+ {
1053
+ memset (rightpage , 0 , BufferGetPageSize (rbuf ));
1054
+ elog (ERROR , "failed to add old item to the left sibling"
1055
+ " while splitting block %u of index \"%s\"" ,
1056
+ origpagenumber , RelationGetRelationName (rel ));
1057
+ }
1018
1058
leftoff = OffsetNumberNext (leftoff );
1019
1059
}
1020
1060
else
1021
1061
{
1022
- _bt_pgaddtup (rel , rightpage , itemsz , item , rightoff ,
1023
- "right sibling" );
1062
+ if (!_bt_pgaddtup (rightpage , itemsz , item , rightoff ))
1063
+ {
1064
+ memset (rightpage , 0 , BufferGetPageSize (rbuf ));
1065
+ elog (ERROR , "failed to add old item to the right sibling"
1066
+ " while splitting block %u of index \"%s\"" ,
1067
+ origpagenumber , RelationGetRelationName (rel ));
1068
+ }
1024
1069
rightoff = OffsetNumberNext (rightoff );
1025
1070
}
1026
1071
}
@@ -1034,8 +1079,13 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
1034
1079
* not be splitting the page).
1035
1080
*/
1036
1081
Assert (!newitemonleft );
1037
- _bt_pgaddtup (rel , rightpage , newitemsz , newitem , rightoff ,
1038
- "right sibling" );
1082
+ if (!_bt_pgaddtup (rightpage , newitemsz , newitem , rightoff ))
1083
+ {
1084
+ memset (rightpage , 0 , BufferGetPageSize (rbuf ));
1085
+ elog (ERROR , "failed to add new item to the right sibling"
1086
+ " while splitting block %u of index \"%s\"" ,
1087
+ origpagenumber , RelationGetRelationName (rel ));
1088
+ }
1039
1089
rightoff = OffsetNumberNext (rightoff );
1040
1090
}
1041
1091
@@ -1047,16 +1097,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
1047
1097
* neighbors.
1048
1098
*/
1049
1099
1050
- if (!P_RIGHTMOST (ropaque ))
1100
+ if (!P_RIGHTMOST (oopaque ))
1051
1101
{
1052
- sbuf = _bt_getbuf (rel , ropaque -> btpo_next , BT_WRITE );
1102
+ sbuf = _bt_getbuf (rel , oopaque -> btpo_next , BT_WRITE );
1053
1103
spage = BufferGetPage (sbuf );
1054
1104
sopaque = (BTPageOpaque ) PageGetSpecialPointer (spage );
1055
- if (sopaque -> btpo_prev != ropaque -> btpo_prev )
1056
- elog (PANIC , "right sibling's left-link doesn't match: "
1057
- "block %u links to %u instead of expected %u in index \"%s\"" ,
1058
- ropaque -> btpo_next , sopaque -> btpo_prev , ropaque -> btpo_prev ,
1105
+ if (sopaque -> btpo_prev != origpagenumber )
1106
+ {
1107
+ memset (rightpage , 0 , BufferGetPageSize (rbuf ));
1108
+ elog (ERROR , "right sibling's left-link doesn't match: "
1109
+ "block %u links to %u instead of expected %u in index \"%s\"" ,
1110
+ oopaque -> btpo_next , sopaque -> btpo_prev , origpagenumber ,
1059
1111
RelationGetRelationName (rel ));
1112
+ }
1060
1113
1061
1114
/*
1062
1115
* Check to see if we can set the SPLIT_END flag in the right-hand
@@ -1081,8 +1134,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
1081
1134
*
1082
1135
* NO EREPORT(ERROR) till right sibling is updated. We can get away with
1083
1136
* not starting the critical section till here because we haven't been
1084
- * scribbling on the original page yet, and we don't care about the new
1085
- * sibling until it's linked into the btree.
1137
+ * scribbling on the original page yet; see comments above.
1086
1138
*/
1087
1139
START_CRIT_SECTION ();
1088
1140
@@ -1094,19 +1146,21 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
1094
1146
* (in the page management code) that the center of a page always be
1095
1147
* clean, and the most efficient way to guarantee this is just to compact
1096
1148
* the data by reinserting it into a new left page. (XXX the latter
1097
- * comment is probably obsolete.)
1149
+ * comment is probably obsolete; but in any case it's good not to scribble
1150
+ * on the original page until we enter the critical section.)
1098
1151
*
1099
1152
* We need to do this before writing the WAL record, so that XLogInsert
1100
1153
* can WAL log an image of the page if necessary.
1101
1154
*/
1102
1155
PageRestoreTempPage (leftpage , origpage );
1156
+ /* leftpage, lopaque must not be used below here */
1103
1157
1104
1158
MarkBufferDirty (buf );
1105
1159
MarkBufferDirty (rbuf );
1106
1160
1107
1161
if (!P_RIGHTMOST (ropaque ))
1108
1162
{
1109
- sopaque -> btpo_prev = BufferGetBlockNumber ( rbuf ) ;
1163
+ sopaque -> btpo_prev = rightpagenumber ;
1110
1164
MarkBufferDirty (sbuf );
1111
1165
}
1112
1166
@@ -1120,8 +1174,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
1120
1174
XLogRecData * lastrdata ;
1121
1175
1122
1176
xlrec .node = rel -> rd_node ;
1123
- xlrec .leftsib = BufferGetBlockNumber ( buf ) ;
1124
- xlrec .rightsib = BufferGetBlockNumber ( rbuf ) ;
1177
+ xlrec .leftsib = origpagenumber ;
1178
+ xlrec .rightsib = rightpagenumber ;
1125
1179
xlrec .rnext = ropaque -> btpo_next ;
1126
1180
xlrec .level = ropaque -> btpo .level ;
1127
1181
xlrec .firstright = firstright ;
@@ -1920,13 +1974,11 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1920
1974
* we insert the tuples in order, so that the given itup_off does
1921
1975
* represent the final position of the tuple!
1922
1976
*/
1923
- static void
1924
- _bt_pgaddtup (Relation rel ,
1925
- Page page ,
1977
+ static bool
1978
+ _bt_pgaddtup (Page page ,
1926
1979
Size itemsize ,
1927
1980
IndexTuple itup ,
1928
- OffsetNumber itup_off ,
1929
- const char * where )
1981
+ OffsetNumber itup_off )
1930
1982
{
1931
1983
BTPageOpaque opaque = (BTPageOpaque ) PageGetSpecialPointer (page );
1932
1984
IndexTupleData trunctuple ;
@@ -1941,8 +1993,9 @@ _bt_pgaddtup(Relation rel,
1941
1993
1942
1994
if (PageAddItem (page , (Item ) itup , itemsize , itup_off ,
1943
1995
false, false) == InvalidOffsetNumber )
1944
- elog (PANIC , "failed to add item to the %s in index \"%s\"" ,
1945
- where , RelationGetRelationName (rel ));
1996
+ return false;
1997
+
1998
+ return true;
1946
1999
}
1947
2000
1948
2001
/*
0 commit comments