Skip to content

Commit 89395bf

Browse files
committed
Improve gist XLOG code to follow the coding rules needed to prevent
torn-page problems. This introduces some issues of its own, mainly that there are now some critical sections of unreasonably broad scope, but it's a step forward anyway. Further cleanup will require some code refactoring that I'd prefer to get Oleg and Teodor involved in.
1 parent 4243f23 commit 89395bf

File tree

4 files changed

+229
-260
lines changed

4 files changed

+229
-260
lines changed

src/backend/access/gist/gist.c

Lines changed: 53 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* Portions Copyright (c) 1994, Regents of the University of California
99
*
1010
* IDENTIFICATION
11-
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.129 2006/03/05 15:58:20 momjian Exp $
11+
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.130 2006/03/30 23:03:09 tgl Exp $
1212
*
1313
*-------------------------------------------------------------------------
1414
*/
@@ -90,6 +90,7 @@ gistbuild(PG_FUNCTION_ARGS)
9090
double reltuples;
9191
GISTBuildState buildstate;
9292
Buffer buffer;
93+
Page page;
9394

9495
/*
9596
* We expect to be called exactly once for any index relation. If that's
@@ -104,33 +105,33 @@ gistbuild(PG_FUNCTION_ARGS)
104105

105106
/* initialize the root page */
106107
buffer = gistNewBuffer(index);
108+
Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
109+
page = BufferGetPage(buffer);
110+
111+
START_CRIT_SECTION();
112+
107113
GISTInitBuffer(buffer, F_LEAF);
108114
if (!index->rd_istemp)
109115
{
110116
XLogRecPtr recptr;
111117
XLogRecData rdata;
112-
Page page;
113118

114-
rdata.buffer = InvalidBuffer;
115119
rdata.data = (char *) &(index->rd_node);
116120
rdata.len = sizeof(RelFileNode);
121+
rdata.buffer = InvalidBuffer;
117122
rdata.next = NULL;
118123

119-
page = BufferGetPage(buffer);
120-
121-
START_CRIT_SECTION();
122-
123124
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
124125
PageSetLSN(page, recptr);
125126
PageSetTLI(page, ThisTimeLineID);
126-
127-
END_CRIT_SECTION();
128127
}
129128
else
130-
PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp);
129+
PageSetLSN(page, XLogRecPtrForTemp);
131130
LockBuffer(buffer, GIST_UNLOCK);
132131
WriteBuffer(buffer);
133132

133+
END_CRIT_SECTION();
134+
134135
/* build the index */
135136
buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
136137
buildstate.indtuples = 0;
@@ -305,13 +306,27 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
305306
bool is_splitted = false;
306307
bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
307308

309+
/*
310+
* XXX this code really ought to work by locking, but not modifying,
311+
* all the buffers it needs; then starting a critical section; then
312+
* modifying the buffers in an already-determined way and writing an
313+
* XLOG record to reflect that. Since it doesn't, we've got to put
314+
* a critical section around the entire process, which is horrible
315+
* from a robustness point of view.
316+
*/
317+
START_CRIT_SECTION();
308318

309319
if (!is_leaf)
310320

311321
/*
312322
* This node's key has been modified, either because a child split
313323
* occurred or because we needed to adjust our key for an insert in a
314324
* child node. Therefore, remove the old version of this node's key.
325+
*
326+
* Note: for WAL replay, in the non-split case we handle this by
327+
* setting up a one-element todelete array; in the split case, it's
328+
* handled implicitly because the tuple vector passed to gistSplit
329+
* won't include this tuple.
315330
*/
316331

317332
PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
@@ -336,9 +351,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
336351
XLogRecData *rdata;
337352

338353
rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
339-
&(state->key), dist);
340-
341-
START_CRIT_SECTION();
354+
is_leaf, &(state->key), dist);
342355

343356
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
344357
ptr = dist;
@@ -348,8 +361,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
348361
PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
349362
ptr = ptr->next;
350363
}
351-
352-
END_CRIT_SECTION();
353364
}
354365
else
355366
{
@@ -410,7 +421,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
410421
else
411422
ourpage = dist;
412423

413-
414424
/* now gets all needed data, and sets nsn's */
415425
page = (Page) BufferGetPage(ourpage->buffer);
416426
opaque = GistPageGetOpaque(page);
@@ -437,8 +447,11 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
437447
WriteBuffer(ptr->buffer);
438448
ptr = ptr->next;
439449
}
450+
451+
WriteNoReleaseBuffer(state->stack->buffer);
440452
}
441-
WriteNoReleaseBuffer(state->stack->buffer);
453+
454+
END_CRIT_SECTION();
442455
}
443456
else
444457
{
@@ -451,7 +464,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
451464
if (!state->r->rd_istemp)
452465
{
453466
OffsetNumber noffs = 0,
454-
offs[MAXALIGN(sizeof(OffsetNumber)) / sizeof(OffsetNumber)];
467+
offs[1];
455468
XLogRecPtr recptr;
456469
XLogRecData *rdata;
457470

@@ -462,17 +475,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
462475
noffs = 1;
463476
}
464477

465-
rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno,
466-
offs, noffs, false, state->itup, state->ituplen,
478+
rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
479+
offs, noffs, false,
480+
state->itup, state->ituplen,
467481
&(state->key));
468482

469-
START_CRIT_SECTION();
470-
471-
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
483+
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
472484
PageSetLSN(state->stack->page, recptr);
473485
PageSetTLI(state->stack->page, ThisTimeLineID);
474-
475-
END_CRIT_SECTION();
476486
}
477487
else
478488
PageSetLSN(state->stack->page, XLogRecPtrForTemp);
@@ -481,6 +491,8 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
481491
state->needInsertComplete = false;
482492
WriteNoReleaseBuffer(state->stack->buffer);
483493

494+
END_CRIT_SECTION();
495+
484496
if (!is_leaf) /* small optimization: inform scan ablout
485497
* deleting... */
486498
gistadjscans(state->r, GISTOP_DEL, state->stack->blkno,
@@ -636,30 +648,14 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
636648
}
637649

638650
/*
639-
* Should have the same interface as XLogReadBuffer
640-
*/
641-
static Buffer
642-
gistReadAndLockBuffer(Relation r, BlockNumber blkno)
643-
{
644-
Buffer buffer = ReadBuffer(r, blkno);
645-
646-
LockBuffer(buffer, GIST_SHARE);
647-
return buffer;
648-
}
649-
650-
/*
651-
* Traverse the tree to find path from root page.
651+
* Traverse the tree to find path from root page to specified "child" block.
652652
*
653653
* returns from the begining of closest parent;
654654
*
655-
* Function is used in both regular and recovery mode, so must work with
656-
* different read functions (gistReadAndLockBuffer and XLogReadBuffer)
657-
*
658655
* To prevent deadlocks, this should lock only one page simultaneously.
659656
*/
660657
GISTInsertStack *
661-
gistFindPath(Relation r, BlockNumber child,
662-
Buffer (*myReadBuffer) (Relation, BlockNumber))
658+
gistFindPath(Relation r, BlockNumber child)
663659
{
664660
Page page;
665661
Buffer buffer;
@@ -677,7 +673,8 @@ gistFindPath(Relation r, BlockNumber child,
677673

678674
while (top && top->blkno != child)
679675
{
680-
buffer = myReadBuffer(r, top->blkno); /* locks buffer */
676+
buffer = ReadBuffer(r, top->blkno);
677+
LockBuffer(buffer, GIST_SHARE);
681678
gistcheckpage(r, buffer);
682679
page = (Page) BufferGetPage(buffer);
683680

@@ -833,7 +830,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child)
833830
}
834831

835832
/* ok, find new path */
836-
ptr = parent = gistFindPath(r, child->blkno, gistReadAndLockBuffer);
833+
ptr = parent = gistFindPath(r, child->blkno);
837834
Assert(ptr != NULL);
838835

839836
/* read all buffers as expected by caller */
@@ -1192,27 +1189,31 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
11921189

11931190
Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
11941191
page = BufferGetPage(buffer);
1195-
GISTInitBuffer(buffer, 0);
11961192

1193+
START_CRIT_SECTION();
1194+
1195+
GISTInitBuffer(buffer, 0); /* XXX not F_LEAF? */
11971196
gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
1197+
11981198
if (!r->rd_istemp)
11991199
{
12001200
XLogRecPtr recptr;
12011201
XLogRecData *rdata;
12021202

1203-
rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO,
1204-
NULL, 0, false, itup, len, key);
1205-
1206-
START_CRIT_SECTION();
1203+
rdata = formUpdateRdata(r->rd_node, buffer,
1204+
NULL, 0, false,
1205+
itup, len, key);
12071206

12081207
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
12091208
PageSetLSN(page, recptr);
12101209
PageSetTLI(page, ThisTimeLineID);
1211-
1212-
END_CRIT_SECTION();
12131210
}
12141211
else
12151212
PageSetLSN(page, XLogRecPtrForTemp);
1213+
1214+
WriteNoReleaseBuffer(buffer);
1215+
1216+
END_CRIT_SECTION();
12161217
}
12171218

12181219
void

src/backend/access/gist/gistvacuum.c

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* Portions Copyright (c) 1994, Regents of the University of California
99
*
1010
* IDENTIFICATION
11-
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.16 2006/03/05 15:58:20 momjian Exp $
11+
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.17 2006/03/30 23:03:10 tgl Exp $
1212
*
1313
*-------------------------------------------------------------------------
1414
*/
@@ -80,6 +80,12 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
8080
page = (Page) BufferGetPage(buffer);
8181
maxoff = PageGetMaxOffsetNumber(page);
8282

83+
/*
84+
* XXX need to reduce scope of changes to page so we can make this
85+
* critical section less extensive
86+
*/
87+
START_CRIT_SECTION();
88+
8389
if (GistPageIsLeaf(page))
8490
{
8591
if (GistTuplesDeleted(page))
@@ -188,11 +194,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
188194
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
189195

190196
rdata = formSplitRdata(gv->index->rd_node, blkno,
191-
&key, dist);
197+
false, &key, dist);
192198
xlinfo = rdata->data;
193199

194-
START_CRIT_SECTION();
195-
196200
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
197201
ptr = dist;
198202
while (ptr)
@@ -202,7 +206,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
202206
ptr = ptr->next;
203207
}
204208

205-
END_CRIT_SECTION();
206209
pfree(xlinfo);
207210
pfree(rdata);
208211
}
@@ -235,8 +238,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
235238
oldCtx = MemoryContextSwitchTo(gv->opCtx);
236239
gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
237240
MemoryContextSwitchTo(oldCtx);
238-
239-
WriteNoReleaseBuffer(buffer);
240241
}
241242

242243
needwrite = false;
@@ -302,15 +303,14 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
302303
XLogRecPtr recptr;
303304
char *xlinfo;
304305

305-
rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete,
306-
res.emptypage, addon, curlenaddon, NULL);
306+
rdata = formUpdateRdata(gv->index->rd_node, buffer,
307+
todelete, ntodelete, res.emptypage,
308+
addon, curlenaddon, NULL);
307309
xlinfo = rdata->data;
308310

309-
START_CRIT_SECTION();
310-
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
311+
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
311312
PageSetLSN(page, recptr);
312313
PageSetTLI(page, ThisTimeLineID);
313-
END_CRIT_SECTION();
314314

315315
pfree(xlinfo);
316316
pfree(rdata);
@@ -322,6 +322,8 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
322322
else
323323
ReleaseBuffer(buffer);
324324

325+
END_CRIT_SECTION();
326+
325327
if (ncompleted && !gv->index->rd_istemp)
326328
gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
327329

@@ -579,6 +581,17 @@ gistbulkdelete(PG_FUNCTION_ARGS)
579581
*/
580582
pushStackIfSplited(page, stack);
581583

584+
/*
585+
* Remove deletable tuples from page
586+
*
587+
* XXX try to make this critical section shorter. Could do it
588+
* by separating the callback loop from the actual tuple deletion,
589+
* but that would affect the definition of the todelete[] array
590+
* passed into the WAL record (because the indexes would all be
591+
* pre-deletion).
592+
*/
593+
START_CRIT_SECTION();
594+
582595
maxoff = PageGetMaxOffsetNumber(page);
583596

584597
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
@@ -608,17 +621,17 @@ gistbulkdelete(PG_FUNCTION_ARGS)
608621
{
609622
XLogRecData *rdata;
610623
XLogRecPtr recptr;
611-
gistxlogEntryUpdate *xlinfo;
624+
gistxlogPageUpdate *xlinfo;
612625

613-
rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete,
614-
false, NULL, 0, NULL);
615-
xlinfo = (gistxlogEntryUpdate *) rdata->data;
626+
rdata = formUpdateRdata(rel->rd_node, buffer,
627+
todelete, ntodelete, false,
628+
NULL, 0,
629+
NULL);
630+
xlinfo = (gistxlogPageUpdate *) rdata->data;
616631

617-
START_CRIT_SECTION();
618-
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
632+
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
619633
PageSetLSN(page, recptr);
620634
PageSetTLI(page, ThisTimeLineID);
621-
END_CRIT_SECTION();
622635

623636
pfree(xlinfo);
624637
pfree(rdata);
@@ -627,6 +640,8 @@ gistbulkdelete(PG_FUNCTION_ARGS)
627640
PageSetLSN(page, XLogRecPtrForTemp);
628641
WriteNoReleaseBuffer(buffer);
629642
}
643+
644+
END_CRIT_SECTION();
630645
}
631646
else
632647
{

0 commit comments

Comments
 (0)