@@ -103,13 +103,21 @@ typedef struct ReorderBufferTupleCidEnt
103
103
CommandId combocid ; /* just for debugging */
104
104
} ReorderBufferTupleCidEnt ;
105
105
106
+ /* Virtual file descriptor bundled with an explicitly tracked file offset; the offset is passed to FileRead() and advanced by the caller after each successful read */
107
+ typedef struct TXNEntryFile
108
+ {
109
+ File vfd ; /* -1 when the file is closed */
110
+ off_t curOffset ; /* offset for next write or read. Reset to 0
111
+ * when vfd is opened. */
112
+ } TXNEntryFile ;
113
+
106
114
/* k-way in-order change iteration support structures */
107
115
typedef struct ReorderBufferIterTXNEntry
108
116
{
109
117
XLogRecPtr lsn ;
110
118
ReorderBufferChange * change ;
111
119
ReorderBufferTXN * txn ;
112
- int fd ;
120
+ TXNEntryFile file ;
113
121
XLogSegNo segno ;
114
122
} ReorderBufferIterTXNEntry ;
115
123
@@ -178,7 +186,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
178
186
* subtransactions
179
187
* ---------------------------------------
180
188
*/
181
- static ReorderBufferIterTXNState * ReorderBufferIterTXNInit (ReorderBuffer * rb , ReorderBufferTXN * txn );
189
+ static void ReorderBufferIterTXNInit (ReorderBuffer * rb , ReorderBufferTXN * txn ,
190
+ ReorderBufferIterTXNState * volatile * iter_state );
182
191
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer * rb , ReorderBufferIterTXNState * state );
183
192
static void ReorderBufferIterTXNFinish (ReorderBuffer * rb ,
184
193
ReorderBufferIterTXNState * state );
@@ -194,7 +203,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194
203
static void ReorderBufferSerializeChange (ReorderBuffer * rb , ReorderBufferTXN * txn ,
195
204
int fd , ReorderBufferChange * change );
196
205
static Size ReorderBufferRestoreChanges (ReorderBuffer * rb , ReorderBufferTXN * txn ,
197
- int * fd , XLogSegNo * segno );
206
+ TXNEntryFile * file , XLogSegNo * segno );
198
207
static void ReorderBufferRestoreChange (ReorderBuffer * rb , ReorderBufferTXN * txn ,
199
208
char * change );
200
209
static void ReorderBufferRestoreCleanup (ReorderBuffer * rb , ReorderBufferTXN * txn );
@@ -945,15 +954,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
945
954
/*
946
955
* Allocate & initialize an iterator which iterates in lsn order over a
947
956
* transaction and all its subtransactions.
957
+ *
958
+ * Note: The iterator state is returned through the iter_state parameter rather
959
+ * than the function's return value. This is because the state gets cleaned up
960
+ * in a PG_CATCH block in the caller, so we want to make sure the caller gets
961
+ * back the state even if this function throws an exception.
948
962
*/
949
- static ReorderBufferIterTXNState *
950
- ReorderBufferIterTXNInit (ReorderBuffer * rb , ReorderBufferTXN * txn )
963
+ static void
964
+ ReorderBufferIterTXNInit (ReorderBuffer * rb , ReorderBufferTXN * txn ,
965
+ ReorderBufferIterTXNState * volatile * iter_state )
951
966
{
952
967
Size nr_txns = 0 ;
953
968
ReorderBufferIterTXNState * state ;
954
969
dlist_iter cur_txn_i ;
955
970
int32 off ;
956
971
972
+ * iter_state = NULL ;
973
+
957
974
/*
958
975
* Calculate the size of our heap: one element for every transaction that
959
976
* contains changes. (Besides the transactions already in the reorder
@@ -988,7 +1005,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
988
1005
989
1006
for (off = 0 ; off < state -> nr_txns ; off ++ )
990
1007
{
991
- state -> entries [off ].fd = -1 ;
1008
+ state -> entries [off ].file . vfd = -1 ;
992
1009
state -> entries [off ].segno = 0 ;
993
1010
}
994
1011
@@ -997,6 +1014,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
997
1014
ReorderBufferIterCompare ,
998
1015
state );
999
1016
1017
+ /* Now that the state fields are initialized, it is safe to expose the state to the caller through iter_state. */
1018
+ * iter_state = state ;
1019
+
1000
1020
/*
1001
1021
* Now insert items into the binary heap, in an unordered fashion. (We
1002
1022
* will run a heap assembly step at the end; this is more efficient.)
@@ -1013,7 +1033,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1013
1033
{
1014
1034
/* serialize remaining changes */
1015
1035
ReorderBufferSerializeTXN (rb , txn );
1016
- ReorderBufferRestoreChanges (rb , txn , & state -> entries [off ].fd ,
1036
+ ReorderBufferRestoreChanges (rb , txn , & state -> entries [off ].file ,
1017
1037
& state -> entries [off ].segno );
1018
1038
}
1019
1039
@@ -1043,7 +1063,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1043
1063
/* serialize remaining changes */
1044
1064
ReorderBufferSerializeTXN (rb , cur_txn );
1045
1065
ReorderBufferRestoreChanges (rb , cur_txn ,
1046
- & state -> entries [off ].fd ,
1066
+ & state -> entries [off ].file ,
1047
1067
& state -> entries [off ].segno );
1048
1068
}
1049
1069
cur_change = dlist_head_element (ReorderBufferChange , node ,
@@ -1059,8 +1079,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1059
1079
1060
1080
/* assemble a valid binary heap */
1061
1081
binaryheap_build (state -> heap );
1062
-
1063
- return state ;
1064
1082
}
1065
1083
1066
1084
/*
@@ -1124,7 +1142,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
1124
1142
dlist_delete (& change -> node );
1125
1143
dlist_push_tail (& state -> old_change , & change -> node );
1126
1144
1127
- if (ReorderBufferRestoreChanges (rb , entry -> txn , & entry -> fd ,
1145
+ if (ReorderBufferRestoreChanges (rb , entry -> txn , & entry -> file ,
1128
1146
& state -> entries [off ].segno ))
1129
1147
{
1130
1148
/* successfully restored changes from disk */
@@ -1163,8 +1181,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1163
1181
1164
1182
for (off = 0 ; off < state -> nr_txns ; off ++ )
1165
1183
{
1166
- if (state -> entries [off ].fd != -1 )
1167
- CloseTransientFile (state -> entries [off ].fd );
1184
+ if (state -> entries [off ].file . vfd != -1 )
1185
+ FileClose (state -> entries [off ].file . vfd );
1168
1186
}
1169
1187
1170
1188
/* free memory we might have "leaked" in the last *Next call */
@@ -1500,7 +1518,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1500
1518
1501
1519
rb -> begin (rb , txn );
1502
1520
1503
- iterstate = ReorderBufferIterTXNInit (rb , txn );
1521
+ ReorderBufferIterTXNInit (rb , txn , & iterstate );
1504
1522
while ((change = ReorderBufferIterTXNNext (rb , iterstate )) != NULL )
1505
1523
{
1506
1524
Relation relation = NULL ;
@@ -2517,11 +2535,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2517
2535
*/
2518
2536
static Size
2519
2537
ReorderBufferRestoreChanges (ReorderBuffer * rb , ReorderBufferTXN * txn ,
2520
- int * fd , XLogSegNo * segno )
2538
+ TXNEntryFile * file , XLogSegNo * segno )
2521
2539
{
2522
2540
Size restored = 0 ;
2523
2541
XLogSegNo last_segno ;
2524
2542
dlist_mutable_iter cleanup_iter ;
2543
+ File * fd = & file -> vfd ;
2525
2544
2526
2545
Assert (txn -> first_lsn != InvalidXLogRecPtr );
2527
2546
Assert (txn -> final_lsn != InvalidXLogRecPtr );
@@ -2562,7 +2581,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2562
2581
ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid ,
2563
2582
* segno );
2564
2583
2565
- * fd = OpenTransientFile (path , O_RDONLY | PG_BINARY );
2584
+ * fd = PathNameOpenFile (path , O_RDONLY | PG_BINARY );
2585
+
2586
+ /* No harm in resetting the offset even in case of failure */
2587
+ file -> curOffset = 0 ;
2588
+
2566
2589
if (* fd < 0 && errno == ENOENT )
2567
2590
{
2568
2591
* fd = -1 ;
@@ -2582,14 +2605,14 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2582
2605
* end of this file.
2583
2606
*/
2584
2607
ReorderBufferSerializeReserve (rb , sizeof (ReorderBufferDiskChange ));
2585
- pgstat_report_wait_start ( WAIT_EVENT_REORDER_BUFFER_READ );
2586
- readBytes = read ( * fd , rb -> outbuf , sizeof (ReorderBufferDiskChange ));
2587
- pgstat_report_wait_end ( );
2608
+ readBytes = FileRead ( file -> vfd , rb -> outbuf ,
2609
+ sizeof (ReorderBufferDiskChange ),
2610
+ file -> curOffset , WAIT_EVENT_REORDER_BUFFER_READ );
2588
2611
2589
2612
/* eof */
2590
2613
if (readBytes == 0 )
2591
2614
{
2592
- CloseTransientFile (* fd );
2615
+ FileClose (* fd );
2593
2616
* fd = -1 ;
2594
2617
(* segno )++ ;
2595
2618
continue ;
@@ -2605,16 +2628,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2605
2628
readBytes ,
2606
2629
(uint32 ) sizeof (ReorderBufferDiskChange ))));
2607
2630
2631
+ file -> curOffset += readBytes ;
2632
+
2608
2633
ondisk = (ReorderBufferDiskChange * ) rb -> outbuf ;
2609
2634
2610
2635
ReorderBufferSerializeReserve (rb ,
2611
2636
sizeof (ReorderBufferDiskChange ) + ondisk -> size );
2612
2637
ondisk = (ReorderBufferDiskChange * ) rb -> outbuf ;
2613
2638
2614
- pgstat_report_wait_start (WAIT_EVENT_REORDER_BUFFER_READ );
2615
- readBytes = read (* fd , rb -> outbuf + sizeof (ReorderBufferDiskChange ),
2616
- ondisk -> size - sizeof (ReorderBufferDiskChange ));
2617
- pgstat_report_wait_end ();
2639
+ readBytes = FileRead (file -> vfd ,
2640
+ rb -> outbuf + sizeof (ReorderBufferDiskChange ),
2641
+ ondisk -> size - sizeof (ReorderBufferDiskChange ),
2642
+ file -> curOffset ,
2643
+ WAIT_EVENT_REORDER_BUFFER_READ );
2618
2644
2619
2645
if (readBytes < 0 )
2620
2646
ereport (ERROR ,
@@ -2627,6 +2653,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2627
2653
readBytes ,
2628
2654
(uint32 ) (ondisk -> size - sizeof (ReorderBufferDiskChange )))));
2629
2655
2656
+ file -> curOffset += readBytes ;
2657
+
2630
2658
/*
2631
2659
* ok, read a full change from disk, now restore it into proper
2632
2660
* in-memory format
0 commit comments