@@ -46,39 +46,27 @@ typedef struct KeyArray
46
46
* Returns amount of free space left on the page.
47
47
*/
48
48
static int32
49
- writeListPage (Relation index , Buffer buffer , GenericXLogState * state ,
49
+ writeListPage (Relation index , Buffer buffer ,
50
50
IndexTuple * tuples , int32 ntuples , BlockNumber rightlink )
51
51
{
52
52
Page page ;
53
53
int32 i ,
54
- freesize ,
55
- size = 0 ;
56
- OffsetNumber l ,
57
- off ;
58
- char * workspace ;
59
- char * ptr ;
60
- GenericXLogState * state2 ;
54
+ freesize ;
55
+ OffsetNumber l ,
56
+ off ;
57
+ GenericXLogState * state ;
61
58
62
- state2 = GenericXLogStart (index );
63
- /* workspace could be a local array; we use palloc for alignment */
64
- workspace = palloc (BLCKSZ );
59
+ state = GenericXLogStart (index );
65
60
66
- START_CRIT_SECTION ();
67
-
68
- RumInitBuffer (buffer , RUM_LIST );
69
- page = GenericXLogRegisterBuffer (state2 , buffer , 0 );
61
+ page = GenericXLogRegisterBuffer (state , buffer , 0 );
62
+ RumInitPage (page , RUM_LIST , BufferGetPageSize (buffer ));
70
63
71
64
off = FirstOffsetNumber ;
72
- ptr = workspace ;
73
65
74
66
for (i = 0 ; i < ntuples ; i ++ )
75
67
{
76
68
int this_size = IndexTupleSize (tuples [i ]);
77
69
78
- memcpy (ptr , tuples [i ], this_size );
79
- ptr += this_size ;
80
- size += this_size ;
81
-
82
70
l = PageAddItem (page , (Item ) tuples [i ], this_size , off , false, false);
83
71
84
72
if (l == InvalidOffsetNumber )
@@ -88,8 +76,6 @@ writeListPage(Relation index, Buffer buffer, GenericXLogState *state,
88
76
off ++ ;
89
77
}
90
78
91
- Assert (size <= BLCKSZ ); /* else we overran workspace */
92
-
93
79
RumPageGetOpaque (page )-> rightlink = rightlink ;
94
80
95
81
/*
@@ -107,24 +93,17 @@ writeListPage(Relation index, Buffer buffer, GenericXLogState *state,
107
93
RumPageGetOpaque (page )-> maxoff = 0 ;
108
94
}
109
95
110
- // MarkBufferDirty(buffer);
111
- GenericXLogFinish (state2 );
112
-
113
96
/* get free space before releasing buffer */
114
97
freesize = PageGetExactFreeSpace (page );
115
-
98
+ GenericXLogFinish ( state );
116
99
UnlockReleaseBuffer (buffer );
117
100
118
- END_CRIT_SECTION ();
119
-
120
- pfree (workspace );
121
-
122
101
return freesize ;
123
102
}
124
103
125
104
static void
126
- makeSublist (Relation index , GenericXLogState * state ,
127
- IndexTuple * tuples , int32 ntuples , RumMetaPageData * res )
105
+ makeSublist (Relation index , IndexTuple * tuples , int32 ntuples ,
106
+ RumMetaPageData * res )
128
107
{
129
108
Buffer curBuffer = InvalidBuffer ;
130
109
Buffer prevBuffer = InvalidBuffer ;
@@ -147,7 +126,7 @@ makeSublist(Relation index, GenericXLogState *state,
147
126
if (prevBuffer != InvalidBuffer )
148
127
{
149
128
res -> nPendingPages ++ ;
150
- writeListPage (index , prevBuffer , state ,
129
+ writeListPage (index , prevBuffer ,
151
130
tuples + startTuple ,
152
131
i - startTuple ,
153
132
BufferGetBlockNumber (curBuffer ));
@@ -180,7 +159,7 @@ makeSublist(Relation index, GenericXLogState *state,
180
159
* Write last page
181
160
*/
182
161
res -> tail = BufferGetBlockNumber (curBuffer );
183
- res -> tailFreeSize = writeListPage (index , curBuffer , state ,
162
+ res -> tailFreeSize = writeListPage (index , curBuffer ,
184
163
tuples + startTuple ,
185
164
ntuples - startTuple ,
186
165
InvalidBlockNumber );
@@ -203,29 +182,17 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
203
182
Buffer metabuffer ;
204
183
Page metapage ;
205
184
RumMetaPageData * metadata = NULL ;
206
- // RumXLogRecData rdata[2];
207
185
Buffer buffer = InvalidBuffer ;
208
186
Page page = NULL ;
209
- rumxlogUpdateMeta data ;
210
187
bool separateList = false;
211
188
bool needCleanup = false;
212
189
GenericXLogState * state ;
213
190
214
191
if (collector -> ntuples == 0 )
215
192
return ;
216
193
217
- data .node = index -> rd_node ;
218
- data .ntuples = 0 ;
219
- data .newRightlink = data .prevTail = InvalidBlockNumber ;
220
-
221
- // rdata[0].buffer = InvalidBuffer;
222
- // rdata[0].data = (char *) &data;
223
- // rdata[0].len = sizeof(rumxlogUpdateMeta);
224
- // rdata[0].next = NULL;
225
-
226
194
state = GenericXLogStart (rumstate -> index );
227
195
metabuffer = ReadBuffer (index , RUM_METAPAGE_BLKNO );
228
- metapage = GenericXLogRegisterBuffer (state , metabuffer , 0 );
229
196
230
197
if (collector -> sumsize + collector -> ntuples * sizeof (ItemIdData ) > RumListPageSize )
231
198
{
@@ -237,7 +204,8 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
237
204
else
238
205
{
239
206
LockBuffer (metabuffer , RUM_EXCLUSIVE );
240
- metadata = RumPageGetMeta (metapage );
207
+ metadata = RumPageGetMeta (BufferGetPage (metabuffer , NULL , NULL ,
208
+ BGP_NO_SNAPSHOT_TEST ));
241
209
242
210
if (metadata -> head == InvalidBlockNumber ||
243
211
collector -> sumsize + collector -> ntuples * sizeof (ItemIdData ) > metadata -> tailFreeSize )
@@ -261,22 +229,20 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
261
229
RumMetaPageData sublist ;
262
230
263
231
memset (& sublist , 0 , sizeof (RumMetaPageData ));
264
- makeSublist (index , state ,
265
- collector -> tuples , collector -> ntuples , & sublist );
232
+ makeSublist (index , collector -> tuples , collector -> ntuples , & sublist );
266
233
267
234
/*
268
235
* metapage was unlocked, see above
269
236
*/
270
237
LockBuffer (metabuffer , RUM_EXCLUSIVE );
238
+ metapage = GenericXLogRegisterBuffer (state , metabuffer , 0 );
271
239
metadata = RumPageGetMeta (metapage );
272
240
273
241
if (metadata -> head == InvalidBlockNumber )
274
242
{
275
243
/*
276
244
* Main list is empty, so just insert sublist as main list
277
245
*/
278
- START_CRIT_SECTION ();
279
-
280
246
metadata -> head = sublist .head ;
281
247
metadata -> tail = sublist .tail ;
282
248
metadata -> tailFreeSize = sublist .tailFreeSize ;
@@ -289,29 +255,15 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
289
255
/*
290
256
* Merge lists
291
257
*/
292
- data .prevTail = metadata -> tail ;
293
- data .newRightlink = sublist .head ;
294
258
295
259
buffer = ReadBuffer (index , metadata -> tail );
296
260
LockBuffer (buffer , RUM_EXCLUSIVE );
297
261
page = GenericXLogRegisterBuffer (state , buffer , 0 );
298
262
299
- // rdata[0].next = rdata + 1;
300
- //
301
- // rdata[1].buffer = buffer;
302
- // rdata[1].buffer_std = true;
303
- // rdata[1].data = NULL;
304
- // rdata[1].len = 0;
305
- // rdata[1].next = NULL;
306
-
307
263
Assert (RumPageGetOpaque (page )-> rightlink == InvalidBlockNumber );
308
264
309
- START_CRIT_SECTION ();
310
-
311
265
RumPageGetOpaque (page )-> rightlink = sublist .head ;
312
266
313
- // MarkBufferDirty(buffer);
314
-
315
267
metadata -> tail = sublist .tail ;
316
268
metadata -> tailFreeSize = sublist .tailFreeSize ;
317
269
@@ -328,7 +280,9 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
328
280
off ;
329
281
int i ,
330
282
tupsize ;
331
- char * ptr ;
283
+
284
+ metapage = GenericXLogRegisterBuffer (state , metabuffer , 0 );
285
+ metadata = RumPageGetMeta (metapage );
332
286
333
287
buffer = ReadBuffer (index , metadata -> tail );
334
288
LockBuffer (buffer , RUM_EXCLUSIVE );
@@ -337,18 +291,6 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
337
291
off = (PageIsEmpty (page )) ? FirstOffsetNumber :
338
292
OffsetNumberNext (PageGetMaxOffsetNumber (page ));
339
293
340
- // rdata[0].next = rdata + 1;
341
- //
342
- // rdata[1].buffer = buffer;
343
- // rdata[1].buffer_std = true;
344
- ptr /*= rdata[1].data*/ = (char * ) palloc (collector -> sumsize );
345
- // rdata[1].len = collector->sumsize;
346
- // rdata[1].next = NULL;
347
-
348
- data .ntuples = collector -> ntuples ;
349
-
350
- START_CRIT_SECTION ();
351
-
352
294
/*
353
295
* Increase counter of heap tuples
354
296
*/
@@ -368,29 +310,12 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
368
310
RelationGetRelationName (index ));
369
311
}
370
312
371
- memcpy (ptr , collector -> tuples [i ], tupsize );
372
- ptr += tupsize ;
373
-
374
313
off ++ ;
375
314
}
376
315
377
- // Assert((ptr - rdata[1].data) <= collector->sumsize);
378
-
379
316
metadata -> tailFreeSize = PageGetExactFreeSpace (page );
380
-
381
- // MarkBufferDirty(buffer);
382
317
}
383
318
384
- /*
385
- * Write metabuffer, make xlog entry
386
- */
387
- // MarkBufferDirty(metabuffer);
388
-
389
- GenericXLogFinish (state );
390
-
391
- if (buffer != InvalidBuffer )
392
- UnlockReleaseBuffer (buffer );
393
-
394
319
/*
395
320
* Force pending list cleanup when it becomes too long. And,
396
321
* rumInsertCleanup could take significant amount of time, so we prefer to
@@ -403,9 +328,12 @@ rumHeapTupleFastInsert(RumState *rumstate, RumTupleCollector *collector)
403
328
if (metadata -> nPendingPages * RUM_PAGE_FREESIZE > work_mem * 1024L )
404
329
needCleanup = true;
405
330
406
- UnlockReleaseBuffer ( metabuffer );
331
+ GenericXLogFinish ( state );
407
332
408
- END_CRIT_SECTION ();
333
+ if (buffer != InvalidBuffer )
334
+ UnlockReleaseBuffer (buffer );
335
+
336
+ UnlockReleaseBuffer (metabuffer );
409
337
410
338
if (needCleanup )
411
339
rumInsertCleanup (rumstate , false, NULL );
0 commit comments