@@ -101,7 +101,7 @@ typedef struct ReorderBufferIterTXNEntry
101
101
XLogRecPtr lsn ;
102
102
ReorderBufferChange * change ;
103
103
ReorderBufferTXN * txn ;
104
- int fd ;
104
+ File fd ;
105
105
XLogSegNo segno ;
106
106
} ReorderBufferIterTXNEntry ;
107
107
@@ -182,7 +182,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
182
182
* subtransactions
183
183
* ---------------------------------------
184
184
*/
185
- static ReorderBufferIterTXNState * ReorderBufferIterTXNInit (ReorderBuffer * rb , ReorderBufferTXN * txn );
185
+ static void ReorderBufferIterTXNInit (ReorderBuffer * rb , ReorderBufferTXN * txn ,
186
+ ReorderBufferIterTXNState * volatile * iter_state );
186
187
static ReorderBufferChange *
187
188
ReorderBufferIterTXNNext (ReorderBuffer * rb , ReorderBufferIterTXNState * state );
188
189
static void ReorderBufferIterTXNFinish (ReorderBuffer * rb ,
@@ -199,7 +200,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
199
200
static void ReorderBufferSerializeChange (ReorderBuffer * rb , ReorderBufferTXN * txn ,
200
201
int fd , ReorderBufferChange * change );
201
202
static Size ReorderBufferRestoreChanges (ReorderBuffer * rb , ReorderBufferTXN * txn ,
202
- int * fd , XLogSegNo * segno );
203
+ File * fd , XLogSegNo * segno );
203
204
static void ReorderBufferRestoreChange (ReorderBuffer * rb , ReorderBufferTXN * txn ,
204
205
char * change );
205
206
static void ReorderBufferRestoreCleanup (ReorderBuffer * rb , ReorderBufferTXN * txn );
@@ -942,15 +943,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
942
943
/*
943
944
* Allocate & initialize an iterator which iterates in lsn order over a
944
945
* transaction and all its subtransactions.
946
+ *
947
+ * Note: The iterator state is returned through iter_state parameter rather
948
+ * than the function's return value. This is because the state gets cleaned up
949
+ * in a PG_CATCH block in the caller, so we want to make sure the caller gets
950
+ * back the state even if this function throws an exception.
945
951
*/
946
- static ReorderBufferIterTXNState *
947
- ReorderBufferIterTXNInit (ReorderBuffer * rb , ReorderBufferTXN * txn )
952
+ static void
953
+ ReorderBufferIterTXNInit (ReorderBuffer * rb , ReorderBufferTXN * txn ,
954
+ ReorderBufferIterTXNState * volatile * iter_state )
948
955
{
949
956
Size nr_txns = 0 ;
950
957
ReorderBufferIterTXNState * state ;
951
958
dlist_iter cur_txn_i ;
952
959
int32 off ;
953
960
961
+ * iter_state = NULL ;
962
+
954
963
/*
955
964
* Calculate the size of our heap: one element for every transaction that
956
965
* contains changes. (Besides the transactions already in the reorder
@@ -994,6 +1003,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
994
1003
ReorderBufferIterCompare ,
995
1004
state );
996
1005
1006
+ /* Now that the state fields are initialized, it is safe to return it. */
1007
+ * iter_state = state ;
1008
+
997
1009
/*
998
1010
* Now insert items into the binary heap, in an unordered fashion. (We
999
1011
* will run a heap assembly step at the end; this is more efficient.)
@@ -1056,8 +1068,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1056
1068
1057
1069
/* assemble a valid binary heap */
1058
1070
binaryheap_build (state -> heap );
1059
-
1060
- return state ;
1061
1071
}
1062
1072
1063
1073
/*
@@ -1161,7 +1171,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1161
1171
for (off = 0 ; off < state -> nr_txns ; off ++ )
1162
1172
{
1163
1173
if (state -> entries [off ].fd != -1 )
1164
- CloseTransientFile (state -> entries [off ].fd );
1174
+ FileClose (state -> entries [off ].fd );
1165
1175
}
1166
1176
1167
1177
/* free memory we might have "leaked" in the last *Next call */
@@ -1496,7 +1506,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1496
1506
1497
1507
rb -> begin (rb , txn );
1498
1508
1499
- iterstate = ReorderBufferIterTXNInit (rb , txn );
1509
+ ReorderBufferIterTXNInit (rb , txn , & iterstate );
1500
1510
while ((change = ReorderBufferIterTXNNext (rb , iterstate )) != NULL )
1501
1511
{
1502
1512
Relation relation = NULL ;
@@ -2338,7 +2348,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2338
2348
*/
2339
2349
static Size
2340
2350
ReorderBufferRestoreChanges (ReorderBuffer * rb , ReorderBufferTXN * txn ,
2341
- int * fd , XLogSegNo * segno )
2351
+ File * fd , XLogSegNo * segno )
2342
2352
{
2343
2353
Size restored = 0 ;
2344
2354
XLogSegNo last_segno ;
@@ -2383,7 +2393,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2383
2393
ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid ,
2384
2394
* segno );
2385
2395
2386
- * fd = OpenTransientFile (path , O_RDONLY | PG_BINARY , 0 );
2396
+ * fd = PathNameOpenFile (path , O_RDONLY | PG_BINARY , 0 );
2387
2397
if (* fd < 0 && errno == ENOENT )
2388
2398
{
2389
2399
* fd = -1 ;
@@ -2404,12 +2414,12 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2404
2414
* end of this file.
2405
2415
*/
2406
2416
ReorderBufferSerializeReserve (rb , sizeof (ReorderBufferDiskChange ));
2407
- readBytes = read (* fd , rb -> outbuf , sizeof (ReorderBufferDiskChange ));
2417
+ readBytes = FileRead (* fd , rb -> outbuf , sizeof (ReorderBufferDiskChange ));
2408
2418
2409
2419
/* eof */
2410
2420
if (readBytes == 0 )
2411
2421
{
2412
- CloseTransientFile (* fd );
2422
+ FileClose (* fd );
2413
2423
* fd = -1 ;
2414
2424
(* segno )++ ;
2415
2425
continue ;
@@ -2431,8 +2441,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2431
2441
sizeof (ReorderBufferDiskChange ) + ondisk -> size );
2432
2442
ondisk = (ReorderBufferDiskChange * ) rb -> outbuf ;
2433
2443
2434
- readBytes = read (* fd , rb -> outbuf + sizeof (ReorderBufferDiskChange ),
2435
- ondisk -> size - sizeof (ReorderBufferDiskChange ));
2444
+ readBytes = FileRead (* fd , rb -> outbuf + sizeof (ReorderBufferDiskChange ),
2445
+ ondisk -> size - sizeof (ReorderBufferDiskChange ));
2436
2446
2437
2447
if (readBytes < 0 )
2438
2448
ereport (ERROR ,
0 commit comments