@@ -129,8 +129,14 @@ bool log_replication_commands = false;
129
129
*/
130
130
bool wake_wal_senders = false;
131
131
132
- static WALOpenSegment * sendSeg = NULL ;
133
- static WALSegmentContext * sendCxt = NULL ;
132
+ /*
133
+ * Physical walsender does not use xlogreader to read WAL, but it does use a
134
+ * fake one to keep state. Logical walsender uses a proper xlogreader. Both
135
+ * keep the 'xlogreader' pointer to the right one, for the sake of common
136
+ * routines.
137
+ */
138
+ static XLogReaderState fake_xlogreader ;
139
+ static XLogReaderState * xlogreader ;
134
140
135
141
/*
136
142
* These variables keep track of the state of the timeline we're currently
@@ -248,8 +254,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
248
254
static TimeOffset LagTrackerRead (int head , XLogRecPtr lsn , TimestampTz now );
249
255
static bool TransactionIdInRecentPast (TransactionId xid , uint32 epoch );
250
256
251
- static int WalSndSegmentOpen (XLogReaderState * state , XLogSegNo nextSegNo ,
252
- WALSegmentContext * segcxt , TimeLineID * tli_p );
257
+ static void WalSndSegmentOpen (XLogReaderState * state , XLogSegNo nextSegNo ,
258
+ TimeLineID * tli_p );
253
259
static void UpdateSpillStats (LogicalDecodingContext * ctx );
254
260
255
261
@@ -280,12 +286,19 @@ InitWalSender(void)
280
286
/* Initialize empty timestamp buffer for lag tracking. */
281
287
lag_tracker = MemoryContextAllocZero (TopMemoryContext , sizeof (LagTracker ));
282
288
283
- /* Make sure we can remember the current read position in XLOG. */
284
- sendSeg = (WALOpenSegment * )
285
- MemoryContextAlloc (TopMemoryContext , sizeof (WALOpenSegment ));
286
- sendCxt = (WALSegmentContext * )
287
- MemoryContextAlloc (TopMemoryContext , sizeof (WALSegmentContext ));
288
- WALOpenSegmentInit (sendSeg , sendCxt , wal_segment_size , NULL );
289
+ /*
290
+ * Prepare physical walsender's fake xlogreader struct. Logical walsender
291
+ * does this later.
292
+ */
293
+ if (!am_db_walsender )
294
+ {
295
+ xlogreader = & fake_xlogreader ;
296
+ xlogreader -> routine =
297
+ * XL_ROUTINE (.segment_open = WalSndSegmentOpen ,
298
+ .segment_close = wal_segment_close );
299
+ WALOpenSegmentInit (& xlogreader -> seg , & xlogreader -> segcxt ,
300
+ wal_segment_size , NULL );
301
+ }
289
302
}
290
303
291
304
/*
@@ -302,11 +315,8 @@ WalSndErrorCleanup(void)
302
315
ConditionVariableCancelSleep ();
303
316
pgstat_report_wait_end ();
304
317
305
- if (sendSeg -> ws_file >= 0 )
306
- {
307
- close (sendSeg -> ws_file );
308
- sendSeg -> ws_file = -1 ;
309
- }
318
+ if (xlogreader -> seg .ws_file >= 0 )
319
+ wal_segment_close (xlogreader );
310
320
311
321
if (MyReplicationSlot != NULL )
312
322
ReplicationSlotRelease ();
@@ -837,11 +847,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
837
847
cur_page ,
838
848
targetPagePtr ,
839
849
XLOG_BLCKSZ ,
840
- sendSeg -> ws_tli , /* Pass the current TLI because only
850
+ state -> seg . ws_tli , /* Pass the current TLI because only
841
851
* WalSndSegmentOpen controls whether new
842
852
* TLI is needed. */
843
- sendSeg ,
844
- sendCxt ,
845
853
& errinfo ))
846
854
WALReadRaiseError (& errinfo );
847
855
@@ -852,8 +860,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
852
860
* read() succeeds in that case, but the data we tried to read might
853
861
* already have been overwritten with new WAL records.
854
862
*/
855
- XLByteToSeg (targetPagePtr , segno , sendCxt -> ws_segsize );
856
- CheckXLogRemoved (segno , sendSeg -> ws_tli );
863
+ XLByteToSeg (targetPagePtr , segno , state -> segcxt . ws_segsize );
864
+ CheckXLogRemoved (segno , state -> seg . ws_tli );
857
865
858
866
return count ;
859
867
}
@@ -1176,6 +1184,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
1176
1184
.segment_close = wal_segment_close ),
1177
1185
WalSndPrepareWrite , WalSndWriteData ,
1178
1186
WalSndUpdateProgress );
1187
+ xlogreader = logical_decoding_ctx -> reader ;
1179
1188
1180
1189
WalSndSetState (WALSNDSTATE_CATCHUP );
1181
1190
@@ -2447,13 +2456,11 @@ WalSndKill(int code, Datum arg)
2447
2456
}
2448
2457
2449
2458
/* XLogReaderRoutine->segment_open callback */
2450
- static int
2451
- WalSndSegmentOpen (XLogReaderState * state ,
2452
- XLogSegNo nextSegNo , WALSegmentContext * segcxt ,
2459
+ static void
2460
+ WalSndSegmentOpen (XLogReaderState * state , XLogSegNo nextSegNo ,
2453
2461
TimeLineID * tli_p )
2454
2462
{
2455
2463
char path [MAXPGPATH ];
2456
- int fd ;
2457
2464
2458
2465
/*-------
2459
2466
* When reading from a historic timeline, and there is a timeline switch
@@ -2484,15 +2491,15 @@ WalSndSegmentOpen(XLogReaderState *state,
2484
2491
{
2485
2492
XLogSegNo endSegNo ;
2486
2493
2487
- XLByteToSeg (sendTimeLineValidUpto , endSegNo , segcxt -> ws_segsize );
2488
- if (sendSeg -> ws_segno == endSegNo )
2494
+ XLByteToSeg (sendTimeLineValidUpto , endSegNo , state -> segcxt . ws_segsize );
2495
+ if (state -> seg . ws_segno == endSegNo )
2489
2496
* tli_p = sendTimeLineNextTLI ;
2490
2497
}
2491
2498
2492
- XLogFilePath (path , * tli_p , nextSegNo , segcxt -> ws_segsize );
2493
- fd = BasicOpenFile (path , O_RDONLY | PG_BINARY );
2494
- if (fd >= 0 )
2495
- return fd ;
2499
+ XLogFilePath (path , * tli_p , nextSegNo , state -> segcxt . ws_segsize );
2500
+ state -> seg . ws_file = BasicOpenFile (path , O_RDONLY | PG_BINARY );
2501
+ if (state -> seg . ws_file >= 0 )
2502
+ return ;
2496
2503
2497
2504
/*
2498
2505
* If the file is not found, assume it's because the standby asked for a
@@ -2515,7 +2522,6 @@ WalSndSegmentOpen(XLogReaderState *state,
2515
2522
(errcode_for_file_access (),
2516
2523
errmsg ("could not open file \"%s\": %m" ,
2517
2524
path )));
2518
- return -1 ; /* keep compiler quiet */
2519
2525
}
2520
2526
2521
2527
/*
@@ -2537,12 +2543,6 @@ XLogSendPhysical(void)
2537
2543
Size nbytes ;
2538
2544
XLogSegNo segno ;
2539
2545
WALReadError errinfo ;
2540
- static XLogReaderState fake_xlogreader =
2541
- {
2542
- /* Fake xlogreader state for WALRead */
2543
- .routine .segment_open = WalSndSegmentOpen ,
2544
- .routine .segment_close = wal_segment_close
2545
- };
2546
2546
2547
2547
/* If requested switch the WAL sender to the stopping state. */
2548
2548
if (got_STOPPING )
@@ -2685,9 +2685,8 @@ XLogSendPhysical(void)
2685
2685
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr )
2686
2686
{
2687
2687
/* close the current file. */
2688
- if (sendSeg -> ws_file >= 0 )
2689
- close (sendSeg -> ws_file );
2690
- sendSeg -> ws_file = -1 ;
2688
+ if (xlogreader -> seg .ws_file >= 0 )
2689
+ wal_segment_close (xlogreader );
2691
2690
2692
2691
/* Send CopyDone */
2693
2692
pq_putmessage_noblock ('c' , NULL , 0 );
@@ -2760,21 +2759,19 @@ XLogSendPhysical(void)
2760
2759
enlargeStringInfo (& output_message , nbytes );
2761
2760
2762
2761
retry :
2763
- if (!WALRead (& fake_xlogreader ,
2762
+ if (!WALRead (xlogreader ,
2764
2763
& output_message .data [output_message .len ],
2765
2764
startptr ,
2766
2765
nbytes ,
2767
- sendSeg -> ws_tli , /* Pass the current TLI because only
2768
- * WalSndSegmentOpen controls whether new
2769
- * TLI is needed. */
2770
- sendSeg ,
2771
- sendCxt ,
2766
+ xlogreader -> seg .ws_tli , /* Pass the current TLI because
2767
+ * only WalSndSegmentOpen controls
2768
+ * whether new TLI is needed. */
2772
2769
& errinfo ))
2773
2770
WALReadRaiseError (& errinfo );
2774
2771
2775
2772
/* See logical_read_xlog_page(). */
2776
- XLByteToSeg (startptr , segno , sendCxt -> ws_segsize );
2777
- CheckXLogRemoved (segno , sendSeg -> ws_tli );
2773
+ XLByteToSeg (startptr , segno , xlogreader -> segcxt . ws_segsize );
2774
+ CheckXLogRemoved (segno , xlogreader -> seg . ws_tli );
2778
2775
2779
2776
/*
2780
2777
* During recovery, the currently-open WAL file might be replaced with the
@@ -2792,10 +2789,9 @@ XLogSendPhysical(void)
2792
2789
walsnd -> needreload = false;
2793
2790
SpinLockRelease (& walsnd -> mutex );
2794
2791
2795
- if (reload && sendSeg -> ws_file >= 0 )
2792
+ if (reload && xlogreader -> seg . ws_file >= 0 )
2796
2793
{
2797
- close (sendSeg -> ws_file );
2798
- sendSeg -> ws_file = -1 ;
2794
+ wal_segment_close (xlogreader );
2799
2795
2800
2796
goto retry ;
2801
2797
}
0 commit comments