Skip to content

Commit 14a6bce

Browse files
author
Artur Zakirov
committed
rumbuild, rumbuildempty, ruminsert work. Need to check rumInsertCleanup, vacuum
1 parent 8dc92cf commit 14a6bce

File tree

6 files changed

+66
-130
lines changed

6 files changed

+66
-130
lines changed

rum.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include "access/amapi.h"
1616
#include "access/genam.h"
17+
#include "access/generic_xlog.h"
1718
#include "access/gin.h"
1819
#include "access/itup.h"
1920
#include "fmgr.h"
@@ -506,9 +507,9 @@ extern bytea *rumoptions(Datum reloptions, bool validate);
506507
extern Datum rumhandler(PG_FUNCTION_ARGS);
507508
extern void initRumState(RumState *state, Relation index);
508509
extern Buffer RumNewBuffer(Relation index);
509-
extern void RumInitBuffer(Buffer buffer, uint32 flags);
510+
extern void RumInitBuffer(GenericXLogState *state, Buffer buffer, uint32 flags);
510511
extern void RumInitPage(Page page, uint32 f, Size pageSize);
511-
extern void RumInitMetabuffer(Relation index, Buffer metaBuffer);
512+
extern void RumInitMetabuffer(GenericXLogState *state, Buffer metaBuffer);
512513
extern int rumCompareEntries(RumState *rumstate, OffsetNumber attnum,
513514
Datum a, RumNullCategory categorya,
514515
Datum b, RumNullCategory categoryb);

rumbtree.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,6 @@ rumInsertValue(Relation index, RumBtree btree, RumBtreeStack *stack,
431431

432432
rpage = GenericXLogRegisterBuffer(state, rbuffer, 0);
433433

434-
435434
RumPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
436435
RumPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
437436
((rumxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);

rumdatapage.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,7 @@ RumDataPageAddItem(Page page, void *data, OffsetNumber offset)
592592
(maxoff - offset + 1) * RumSizeOfDataPageItem(page));
593593
}
594594
memcpy(ptr, data, RumSizeOfDataPageItem(page));
595+
((PageHeader) page)->pd_lower = ptr - page;
595596

596597
RumPageGetOpaque(page)->maxoff++;
597598
}

rumfast.c

Lines changed: 25 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -46,39 +46,27 @@ typedef struct KeyArray
4646
* Returns amount of free space left on the page.
4747
*/
4848
static int32
49-
writeListPage(Relation index, Buffer buffer, GenericXLogState *state,
49+
writeListPage(Relation index, Buffer buffer,
5050
IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
5151
{
5252
Page page;
5353
int32 i,
54-
freesize,
55-
size = 0;
56-
OffsetNumber l,
57-
off;
58-
char *workspace;
59-
char *ptr;
60-
GenericXLogState *state2;
54+
freesize;
55+
OffsetNumber l,
56+
off;
57+
GenericXLogState *state;
6158

62-
state2 = GenericXLogStart(index);
63-
/* workspace could be a local array; we use palloc for alignment */
64-
workspace = palloc(BLCKSZ);
59+
state = GenericXLogStart(index);
6560

66-
START_CRIT_SECTION();
67-
68-
RumInitBuffer(buffer, RUM_LIST);
69-
page = GenericXLogRegisterBuffer(state2, buffer, 0);
61+
page = GenericXLogRegisterBuffer(state, buffer, 0);
62+
RumInitPage(page, RUM_LIST, BufferGetPageSize(buffer));
7063

7164
off = FirstOffsetNumber;
72-
ptr = workspace;
7365

7466
for (i = 0; i < ntuples; i++)
7567
{
7668
int this_size = IndexTupleSize(tuples[i]);
7769

78-
memcpy(ptr, tuples[i], this_size);
79-
ptr += this_size;
80-
size += this_size;
81-
8270
l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
8371

8472
if (l == InvalidOffsetNumber)
@@ -88,8 +76,6 @@ writeListPage(Relation index, Buffer buffer, GenericXLogState *state,
8876
off++;
8977
}
9078

91-
Assert(size <= BLCKSZ); /* else we overran workspace */
92-
9379
RumPageGetOpaque(page)->rightlink = rightlink;
9480

9581
/*
@@ -107,24 +93,17 @@ writeListPage(Relation index, Buffer buffer, GenericXLogState *state,
10793
RumPageGetOpaque(page)->maxoff = 0;
10894
}
10995

110-
// MarkBufferDirty(buffer);
111-
GenericXLogFinish(state2);
112-
11396
/* get free space before releasing buffer */
11497
freesize = PageGetExactFreeSpace(page);
115-
98+
GenericXLogFinish(state);
11699
UnlockReleaseBuffer(buffer);
117100

118-
END_CRIT_SECTION();
119-
120-
pfree(workspace);
121-
122101
return freesize;
123102
}
124103

125104
static void
126-
makeSublist(Relation index, GenericXLogState *state,
127-
IndexTuple *tuples, int32 ntuples, RumMetaPageData *res)
105+
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
106+
RumMetaPageData *res)
128107
{
129108
Buffer curBuffer = InvalidBuffer;
130109
Buffer prevBuffer = InvalidBuffer;
@@ -147,7 +126,7 @@ makeSublist(Relation index, GenericXLogState *state,
147126
if (prevBuffer != InvalidBuffer)
148127
{
149128
res->nPendingPages++;
150-
writeListPage(index, prevBuffer, state,
129+
writeListPage(index, prevBuffer,
151130
tuples + startTuple,
152131
i - startTuple,
153132
BufferGetBlockNumber(curBuffer));
@@ -180,7 +159,7 @@ makeSublist(Relation index, GenericXLogState *state,
180159
* Write last page
181160
*/
182161
res->tail = BufferGetBlockNumber(curBuffer);
183-
res->tailFreeSize = writeListPage(index, curBuffer, state,
162+
res->tailFreeSize = writeListPage(index, curBuffer,
184163
tuples + startTuple,
185164
ntuples - startTuple,
186165
InvalidBlockNumber);
@@ -203,29 +182,17 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
203182
Buffer metabuffer;
204183
Page metapage;
205184
RumMetaPageData *metadata = NULL;
206-
// RumXLogRecData rdata[2];
207185
Buffer buffer = InvalidBuffer;
208186
Page page = NULL;
209-
rumxlogUpdateMeta data;
210187
bool separateList = false;
211188
bool needCleanup = false;
212189
GenericXLogState *state;
213190

214191
if (collector->ntuples == 0)
215192
return;
216193

217-
data.node = index->rd_node;
218-
data.ntuples = 0;
219-
data.newRightlink = data.prevTail = InvalidBlockNumber;
220-
221-
// rdata[0].buffer = InvalidBuffer;
222-
// rdata[0].data = (char *) &data;
223-
// rdata[0].len = sizeof(rumxlogUpdateMeta);
224-
// rdata[0].next = NULL;
225-
226194
state = GenericXLogStart(rumstate->index);
227195
metabuffer = ReadBuffer(index, RUM_METAPAGE_BLKNO);
228-
metapage = GenericXLogRegisterBuffer(state, metabuffer, 0);
229196

230197
if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > RumListPageSize)
231198
{
@@ -237,7 +204,8 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
237204
else
238205
{
239206
LockBuffer(metabuffer, RUM_EXCLUSIVE);
240-
metadata = RumPageGetMeta(metapage);
207+
metadata = RumPageGetMeta(BufferGetPage(metabuffer, NULL, NULL,
208+
BGP_NO_SNAPSHOT_TEST));
241209

242210
if (metadata->head == InvalidBlockNumber ||
243211
collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
@@ -261,22 +229,20 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
261229
RumMetaPageData sublist;
262230

263231
memset(&sublist, 0, sizeof(RumMetaPageData));
264-
makeSublist(index, state,
265-
collector->tuples, collector->ntuples, &sublist);
232+
makeSublist(index, collector->tuples, collector->ntuples, &sublist);
266233

267234
/*
268235
* metapage was unlocked, see above
269236
*/
270237
LockBuffer(metabuffer, RUM_EXCLUSIVE);
238+
metapage = GenericXLogRegisterBuffer(state, metabuffer, 0);
271239
metadata = RumPageGetMeta(metapage);
272240

273241
if (metadata->head == InvalidBlockNumber)
274242
{
275243
/*
276244
* Main list is empty, so just insert sublist as main list
277245
*/
278-
START_CRIT_SECTION();
279-
280246
metadata->head = sublist.head;
281247
metadata->tail = sublist.tail;
282248
metadata->tailFreeSize = sublist.tailFreeSize;
@@ -289,29 +255,15 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
289255
/*
290256
* Merge lists
291257
*/
292-
data.prevTail = metadata->tail;
293-
data.newRightlink = sublist.head;
294258

295259
buffer = ReadBuffer(index, metadata->tail);
296260
LockBuffer(buffer, RUM_EXCLUSIVE);
297261
page = GenericXLogRegisterBuffer(state, buffer, 0);
298262

299-
// rdata[0].next = rdata + 1;
300-
//
301-
// rdata[1].buffer = buffer;
302-
// rdata[1].buffer_std = true;
303-
// rdata[1].data = NULL;
304-
// rdata[1].len = 0;
305-
// rdata[1].next = NULL;
306-
307263
Assert(RumPageGetOpaque(page)->rightlink == InvalidBlockNumber);
308264

309-
START_CRIT_SECTION();
310-
311265
RumPageGetOpaque(page)->rightlink = sublist.head;
312266

313-
// MarkBufferDirty(buffer);
314-
315267
metadata->tail = sublist.tail;
316268
metadata->tailFreeSize = sublist.tailFreeSize;
317269

@@ -328,7 +280,9 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
328280
off;
329281
int i,
330282
tupsize;
331-
char *ptr;
283+
284+
metapage = GenericXLogRegisterBuffer(state, metabuffer, 0);
285+
metadata = RumPageGetMeta(metapage);
332286

333287
buffer = ReadBuffer(index, metadata->tail);
334288
LockBuffer(buffer, RUM_EXCLUSIVE);
@@ -337,18 +291,6 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
337291
off = (PageIsEmpty(page)) ? FirstOffsetNumber :
338292
OffsetNumberNext(PageGetMaxOffsetNumber(page));
339293

340-
// rdata[0].next = rdata + 1;
341-
//
342-
// rdata[1].buffer = buffer;
343-
// rdata[1].buffer_std = true;
344-
ptr /*= rdata[1].data*/ = (char *) palloc(collector->sumsize);
345-
// rdata[1].len = collector->sumsize;
346-
// rdata[1].next = NULL;
347-
348-
data.ntuples = collector->ntuples;
349-
350-
START_CRIT_SECTION();
351-
352294
/*
353295
* Increase counter of heap tuples
354296
*/
@@ -368,29 +310,12 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
368310
RelationGetRelationName(index));
369311
}
370312

371-
memcpy(ptr, collector->tuples[i], tupsize);
372-
ptr += tupsize;
373-
374313
off++;
375314
}
376315

377-
// Assert((ptr - rdata[1].data) <= collector->sumsize);
378-
379316
metadata->tailFreeSize = PageGetExactFreeSpace(page);
380-
381-
// MarkBufferDirty(buffer);
382317
}
383318

384-
/*
385-
* Write metabuffer, make xlog entry
386-
*/
387-
// MarkBufferDirty(metabuffer);
388-
389-
GenericXLogFinish(state);
390-
391-
if (buffer != InvalidBuffer)
392-
UnlockReleaseBuffer(buffer);
393-
394319
/*
395320
* Force pending list cleanup when it becomes too long. And,
396321
* rumInsertCleanup could take significant amount of time, so we prefer to
@@ -403,9 +328,12 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
403328
if (metadata->nPendingPages * RUM_PAGE_FREESIZE > work_mem * 1024L)
404329
needCleanup = true;
405330

406-
UnlockReleaseBuffer(metabuffer);
331+
GenericXLogFinish(state);
407332

408-
END_CRIT_SECTION();
333+
if (buffer != InvalidBuffer)
334+
UnlockReleaseBuffer(buffer);
335+
336+
UnlockReleaseBuffer(metabuffer);
409337

410338
if (needCleanup)
411339
rumInsertCleanup(rumstate, false, NULL);

0 commit comments

Comments
 (0)