@@ -733,6 +733,18 @@ typedef struct XLogCtlData
733
733
XLogRecPtr lastFpwDisableRecPtr ;
734
734
735
735
slock_t info_lck ; /* locks shared variables shown above */
736
+
737
+ /*
738
+ * Variables used to track segment-boundary-crossing WAL records. See
739
+ * RegisterSegmentBoundary. Protected by segtrack_lck.
740
+ */
741
+ XLogSegNo lastNotifiedSeg ;
742
+ XLogSegNo earliestSegBoundary ;
743
+ XLogRecPtr earliestSegBoundaryEndPtr ;
744
+ XLogSegNo latestSegBoundary ;
745
+ XLogRecPtr latestSegBoundaryEndPtr ;
746
+
747
+ slock_t segtrack_lck ; /* locks shared variables shown above */
736
748
} XLogCtlData ;
737
749
738
750
static XLogCtlData * XLogCtl = NULL ;
@@ -931,6 +943,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
931
943
XLogSegNo * endlogSegNo );
932
944
static void UpdateLastRemovedPtr (char * filename );
933
945
static void ValidateXLOGDirectoryStructure (void );
946
+ static void RegisterSegmentBoundary (XLogSegNo seg , XLogRecPtr pos );
934
947
static void CleanupBackupHistory (void );
935
948
static void UpdateMinRecoveryPoint (XLogRecPtr lsn , bool force );
936
949
static XLogRecord * ReadRecord (XLogReaderState * xlogreader ,
@@ -1165,23 +1178,56 @@ XLogInsertRecord(XLogRecData *rdata,
1165
1178
END_CRIT_SECTION ();
1166
1179
1167
1180
/*
1168
- * Update shared LogwrtRqst.Write, if we crossed page boundary.
1181
+ * If we crossed page boundary, update LogwrtRqst.Write; if we crossed
1182
+ * segment boundary, register that and wake up walwriter.
1169
1183
*/
1170
1184
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ )
1171
1185
{
1186
+ XLogSegNo StartSeg ;
1187
+ XLogSegNo EndSeg ;
1188
+
1189
+ XLByteToSeg (StartPos , StartSeg , wal_segment_size );
1190
+ XLByteToSeg (EndPos , EndSeg , wal_segment_size );
1191
+
1192
+ /*
1193
+ * Register our crossing the segment boundary if that occurred.
1194
+ *
1195
+ * Note that we did not use XLByteToPrevSeg() for determining the
1196
+ * ending segment. This is so that a record that fits perfectly into
1197
+ * the end of the segment causes the latter to get marked ready for
1198
+ * archival immediately.
1199
+ */
1200
+ if (StartSeg != EndSeg && XLogArchivingActive ())
1201
+ RegisterSegmentBoundary (EndSeg , EndPos );
1202
+
1203
+ /*
1204
+ * Advance LogwrtRqst.Write so that it includes new block(s).
1205
+ *
1206
+ * We do this after registering the segment boundary so that the
1207
+ * comparison with the flushed pointer below can use the latest value
1208
+ * known globally.
1209
+ */
1172
1210
SpinLockAcquire (& XLogCtl -> info_lck );
1173
- /* advance global request to include new block(s) */
1174
1211
if (XLogCtl -> LogwrtRqst .Write < EndPos )
1175
1212
XLogCtl -> LogwrtRqst .Write = EndPos ;
1176
1213
/* update local result copy while I have the chance */
1177
1214
LogwrtResult = XLogCtl -> LogwrtResult ;
1178
1215
SpinLockRelease (& XLogCtl -> info_lck );
1216
+
1217
+ /*
1218
+ * There's a chance that the record was already flushed to disk and we
1219
+ * missed marking segments as ready for archive. If this happens, we
1220
+ * nudge the WALWriter, which will take care of notifying segments as
1221
+ * needed.
1222
+ */
1223
+ if (StartSeg != EndSeg && XLogArchivingActive () &&
1224
+ LogwrtResult .Flush >= EndPos && ProcGlobal -> walwriterLatch )
1225
+ SetLatch (ProcGlobal -> walwriterLatch );
1179
1226
}
1180
1227
1181
1228
/*
1182
1229
* If this was an XLOG_SWITCH record, flush the record and the empty
1183
- * padding space that fills the rest of the segment, and perform
1184
- * end-of-segment actions (eg, notifying archiver).
1230
+ * padding space that fills the rest of the segment.
1185
1231
*/
1186
1232
if (isLogSwitch )
1187
1233
{
@@ -2433,6 +2479,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
2433
2479
2434
2480
/* We should always be inside a critical section here */
2435
2481
Assert (CritSectionCount > 0 );
2482
+ Assert (LWLockHeldByMe (WALWriteLock ));
2436
2483
2437
2484
/*
2438
2485
* Update local LogwrtResult (caller probably did this already, but...)
@@ -2599,11 +2646,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
2599
2646
* later. Doing it here ensures that one and only one backend will
2600
2647
* perform this fsync.
2601
2648
*
2602
- * This is also the right place to notify the Archiver that the
2603
- * segment is ready to copy to archival storage, and to update the
2604
- * timer for archive_timeout, and to signal for a checkpoint if
2605
- * too many logfile segments have been used since the last
2606
- * checkpoint.
2649
+ * If WAL archiving is active, we attempt to notify the archiver
2650
+ * of any segments that are now ready for archival.
2651
+ *
2652
+ * This is also the right place to update the timer for
2653
+ * archive_timeout and to signal for a checkpoint if too many
2654
+ * logfile segments have been used since the last checkpoint.
2607
2655
*/
2608
2656
if (finishing_seg )
2609
2657
{
@@ -2615,7 +2663,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
2615
2663
LogwrtResult .Flush = LogwrtResult .Write ; /* end of page */
2616
2664
2617
2665
if (XLogArchivingActive ())
2618
- XLogArchiveNotifySeg ( openLogSegNo );
2666
+ NotifySegmentsReadyForArchive ( LogwrtResult . Flush );
2619
2667
2620
2668
XLogCtl -> lastSegSwitchTime = (pg_time_t ) time (NULL );
2621
2669
XLogCtl -> lastSegSwitchLSN = LogwrtResult .Flush ;
@@ -2703,6 +2751,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
2703
2751
XLogCtl -> LogwrtRqst .Flush = LogwrtResult .Flush ;
2704
2752
SpinLockRelease (& XLogCtl -> info_lck );
2705
2753
}
2754
+
2755
+ if (XLogArchivingActive ())
2756
+ NotifySegmentsReadyForArchive (LogwrtResult .Flush );
2706
2757
}
2707
2758
2708
2759
/*
@@ -4324,6 +4375,129 @@ ValidateXLOGDirectoryStructure(void)
4324
4375
}
4325
4376
}
4326
4377
4378
+ /*
4379
+ * RegisterSegmentBoundary
4380
+ *
4381
+ * WAL records that are split across a segment boundary require special
4382
+ * treatment for archiving: the initial segment must not be archived until
4383
+ * the end segment has been flushed, in case we crash before we have
4384
+ * the chance to flush the end segment (because after recovery we would
4385
+ * overwrite that WAL record with a different one, and so the file we
4386
+ * archived no longer represents truth.) This also applies to streaming
4387
+ * physical replication.
4388
+ *
4389
+ * To handle this, we keep track of the LSN of WAL records that cross
4390
+ * segment boundaries. Two such are sufficient: the ones with the
4391
+ * earliest and the latest end pointers we know about, since the flush
4392
+ * position advances monotonically. WAL record writers register
4393
+ * boundary-crossing records here, which is used by .ready file creation
4394
+ * to delay until the end segment is known flushed.
4395
+ *
+ * Parameters:
+ *   seg    - segment number of the boundary being registered; it must
+ *            equal the segment number XLByteToSeg() computes for endpos
+ *            (checked by the assertion below).
+ *   endpos - end pointer of the boundary-crossing record.
+ *
+ * The shared tracking fields are read and written while holding
+ * segtrack_lck. A slot holding MaxXLogSegNo is treated as unset. A
+ * boundary that is not greater than the ones already stored is not
+ * recorded, so calling this may leave shared state unchanged.
+ */
4396
+ static void
4397
+ RegisterSegmentBoundary (XLogSegNo seg , XLogRecPtr endpos )
4398
+ {
4399
+ XLogSegNo segno PG_USED_FOR_ASSERTS_ONLY ;
4400
+
4401
+ /*
+ * verify caller computed segment number correctly; segno is filled in
+ * by XLByteToSeg() inside the assertion expression and is not used
+ * anywhere else in this function
+ */
4402
+ AssertArg ((XLByteToSeg (endpos , segno , wal_segment_size ), segno == seg ));
4403
+
4404
+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
4405
+
4406
+ /*
4407
+ * If no segment boundaries are registered, store the new segment boundary
4408
+ * in earliestSegBoundary. Otherwise, store the greater segment
4409
+ * boundaries in latestSegBoundary.
4410
+ *
+ * "No boundary registered" is detected by earliestSegBoundary still
+ * holding MaxXLogSegNo. In the second case the new boundary replaces
+ * latestSegBoundary only when it is greater than earliestSegBoundary and
+ * latestSegBoundary is either unset or smaller; otherwise it is dropped.
+ */
4411
+ if (XLogCtl -> earliestSegBoundary == MaxXLogSegNo )
4412
+ {
4413
+ XLogCtl -> earliestSegBoundary = seg ;
4414
+ XLogCtl -> earliestSegBoundaryEndPtr = endpos ;
4415
+ }
4416
+ else if (seg > XLogCtl -> earliestSegBoundary &&
4417
+ (XLogCtl -> latestSegBoundary == MaxXLogSegNo ||
4418
+ seg > XLogCtl -> latestSegBoundary ))
4419
+ {
4420
+ XLogCtl -> latestSegBoundary = seg ;
4421
+ XLogCtl -> latestSegBoundaryEndPtr = endpos ;
4422
+ }
4423
+
4424
+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4425
+ }
4426
+
4427
+ /*
4428
+ * NotifySegmentsReadyForArchive
4429
+ *
4430
+ * Mark segments as ready for archival, given that it is safe to do so.
4431
+ * This function is idempotent.
4432
+ *
+ * flushRecPtr is the flush position to compare the registered segment
+ * boundaries against; it is converted to a segment number with
+ * XLByteToSeg() before the comparison.
+ *
+ * A registered boundary is usable only when both its segment number and
+ * its record end pointer are at or before the flush position. The latest
+ * boundary is tried first, then the earliest. If neither qualifies, the
+ * function releases segtrack_lck and returns without changing anything.
+ * Otherwise every segment after lastNotifiedSeg and before the chosen
+ * boundary's segment is passed to XLogArchiveNotifySeg(), after
+ * segtrack_lck has been released.
+ */
4433
+ void
4434
+ NotifySegmentsReadyForArchive (XLogRecPtr flushRecPtr )
4435
+ {
4436
+ XLogSegNo latest_boundary_seg ;
4437
+ XLogSegNo last_notified ;
4438
+ XLogSegNo flushed_seg ;
4439
+ XLogSegNo seg ;
4440
+ bool keep_latest ;
4441
+
4442
+ XLByteToSeg (flushRecPtr , flushed_seg , wal_segment_size );
4443
+
4444
+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
4445
+
4446
+ if (XLogCtl -> latestSegBoundary <= flushed_seg &&
4447
+ XLogCtl -> latestSegBoundaryEndPtr <= flushRecPtr )
4448
+ {
4449
+ latest_boundary_seg = XLogCtl -> latestSegBoundary ;
4450
+ keep_latest = false;
4451
+ }
4452
+ else if (XLogCtl -> earliestSegBoundary <= flushed_seg &&
4453
+ XLogCtl -> earliestSegBoundaryEndPtr <= flushRecPtr )
4454
+ {
4455
+ latest_boundary_seg = XLogCtl -> earliestSegBoundary ;
4456
+ keep_latest = true;
4457
+ }
4458
+ else
4459
+ {
4460
+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4461
+ return ;
4462
+ }
4463
+
4464
+ last_notified = XLogCtl -> lastNotifiedSeg ;
4465
+
4466
+ /*
4467
+ * Update shared memory and discard segment boundaries that are no longer
4468
+ * needed.
4469
+ *
4470
+ * It is safe to update shared memory before we attempt to create the
4471
+ * .ready files. If our calls to XLogArchiveNotifySeg() fail,
4472
+ * RemoveOldXlogFiles() will retry it as needed.
4473
+ *
+ * lastNotifiedSeg is only ever moved forward here, to the segment just
+ * before the chosen boundary. When only the earliest boundary was
+ * consumed (keep_latest), the latest boundary, which may itself be
+ * unset, is moved into the earliest slot; otherwise the earliest slot
+ * is reset. The latest slot is cleared in every case.
+ */
4474
+ if (last_notified < latest_boundary_seg - 1 )
4475
+ XLogCtl -> lastNotifiedSeg = latest_boundary_seg - 1 ;
4476
+
4477
+ if (keep_latest )
4478
+ {
4479
+ XLogCtl -> earliestSegBoundary = XLogCtl -> latestSegBoundary ;
4480
+ XLogCtl -> earliestSegBoundaryEndPtr = XLogCtl -> latestSegBoundaryEndPtr ;
4481
+ }
4482
+ else
4483
+ {
4484
+ XLogCtl -> earliestSegBoundary = MaxXLogSegNo ;
4485
+ XLogCtl -> earliestSegBoundaryEndPtr = InvalidXLogRecPtr ;
4486
+ }
4487
+
4488
+ XLogCtl -> latestSegBoundary = MaxXLogSegNo ;
4489
+ XLogCtl -> latestSegBoundaryEndPtr = InvalidXLogRecPtr ;
4490
+
4491
+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4492
+
4493
+ /*
4494
+ * Notify archiver about segments that are ready for archival (by creating
4495
+ * the corresponding .ready files).
4496
+ *
+ * The spinlock is no longer held at this point; the loop works from the
+ * local copies last_notified and latest_boundary_seg taken above, and
+ * stops before the boundary segment itself.
+ */
4497
+ for (seg = last_notified + 1 ; seg < latest_boundary_seg ; seg ++ )
4498
+ XLogArchiveNotifySeg (seg );
4499
+ }
4500
+
4327
4501
/*
4328
4502
* Remove previous backup history files. This also retries creation of
4329
4503
* .ready files for any backup history files for which XLogArchiveNotify
@@ -5225,9 +5399,17 @@ XLOGShmemInit(void)
5225
5399
5226
5400
SpinLockInit (& XLogCtl -> Insert .insertpos_lck );
5227
5401
SpinLockInit (& XLogCtl -> info_lck );
5402
+ SpinLockInit (& XLogCtl -> segtrack_lck );
5228
5403
SpinLockInit (& XLogCtl -> ulsn_lck );
5229
5404
InitSharedLatch (& XLogCtl -> recoveryWakeupLatch );
5230
5405
ConditionVariableInit (& XLogCtl -> recoveryNotPausedCV );
5406
+
5407
+ /* Initialize stuff for marking segments as ready for archival. */
5408
+ XLogCtl -> lastNotifiedSeg = MaxXLogSegNo ;
5409
+ XLogCtl -> earliestSegBoundary = MaxXLogSegNo ;
5410
+ XLogCtl -> earliestSegBoundaryEndPtr = InvalidXLogRecPtr ;
5411
+ XLogCtl -> latestSegBoundary = MaxXLogSegNo ;
5412
+ XLogCtl -> latestSegBoundaryEndPtr = InvalidXLogRecPtr ;
5231
5413
}
5232
5414
5233
5415
/*
@@ -7858,6 +8040,20 @@ StartupXLOG(void)
7858
8040
XLogCtl -> LogwrtRqst .Write = EndOfLog ;
7859
8041
XLogCtl -> LogwrtRqst .Flush = EndOfLog ;
7860
8042
8043
+ /*
8044
+ * Initialize XLogCtl->lastNotifiedSeg to the previous WAL file.
8045
+ */
8046
+ if (XLogArchivingActive ())
8047
+ {
8048
+ XLogSegNo EndOfLogSeg ;
8049
+
8050
+ XLByteToSeg (EndOfLog , EndOfLogSeg , wal_segment_size );
8051
+
8052
+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
8053
+ XLogCtl -> lastNotifiedSeg = EndOfLogSeg - 1 ;
8054
+ SpinLockRelease (& XLogCtl -> segtrack_lck );
8055
+ }
8056
+
7861
8057
/*
7862
8058
* Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
7863
8059
* record before resource manager writes cleanup WAL records or checkpoint
0 commit comments