@@ -709,6 +709,18 @@ typedef struct XLogCtlData
709
709
XLogRecPtr lastFpwDisableRecPtr ;
710
710
711
711
slock_t info_lck ; /* locks shared variables shown above */
712
+
713
+ /*
714
+ * Variables used to track segment-boundary-crossing WAL records. See
715
+ * RegisterSegmentBoundary. Protected by segtrack_lck.
716
+ */
717
+ XLogSegNo lastNotifiedSeg ;
718
+ XLogSegNo earliestSegBoundary ;
719
+ XLogRecPtr earliestSegBoundaryEndPtr ;
720
+ XLogSegNo latestSegBoundary ;
721
+ XLogRecPtr latestSegBoundaryEndPtr ;
722
+
723
+ slock_t segtrack_lck ; /* locks shared variables shown above */
712
724
} XLogCtlData ;
713
725
714
726
static XLogCtlData * XLogCtl = NULL ;
@@ -899,6 +911,7 @@ static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecP
899
911
static void RemoveXlogFile (const char * segname , XLogRecPtr lastredoptr , XLogRecPtr endptr );
900
912
static void UpdateLastRemovedPtr (char * filename );
901
913
static void ValidateXLOGDirectoryStructure (void );
914
+ static void RegisterSegmentBoundary (XLogSegNo seg , XLogRecPtr pos );
902
915
static void CleanupBackupHistory (void );
903
916
static void UpdateMinRecoveryPoint (XLogRecPtr lsn , bool force );
904
917
static XLogRecord * ReadRecord (XLogReaderState * xlogreader , XLogRecPtr RecPtr ,
@@ -1129,23 +1142,56 @@ XLogInsertRecord(XLogRecData *rdata,
1129
1142
END_CRIT_SECTION ();
1130
1143
1131
1144
/*
1132
- * Update shared LogwrtRqst.Write, if we crossed page boundary.
1145
+ * If we crossed page boundary, update LogwrtRqst.Write; if we crossed
1146
+ * segment boundary, register that and wake up walwriter.
1133
1147
*/
1134
1148
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ )
1135
1149
{
1150
+ XLogSegNo StartSeg ;
1151
+ XLogSegNo EndSeg ;
1152
+
1153
+ XLByteToSeg (StartPos , StartSeg , wal_segment_size );
1154
+ XLByteToSeg (EndPos , EndSeg , wal_segment_size );
1155
+
1156
+ /*
1157
+ * Register our crossing the segment boundary if that occurred.
1158
+ *
1159
+ * Note that we did not use XLByteToPrevSeg() for determining the
1160
+ * ending segment. This is so that a record that fits perfectly into
1161
+ * the end of the segment causes the latter to get marked ready for
1162
+ * archival immediately.
1163
+ */
1164
+ if (StartSeg != EndSeg && XLogArchivingActive ())
1165
+ RegisterSegmentBoundary (EndSeg , EndPos );
1166
+
1167
+ /*
1168
+ * Advance LogwrtRqst.Write so that it includes new block(s).
1169
+ *
1170
+ * We do this after registering the segment boundary so that the
1171
+ * comparison with the flushed pointer below can use the latest value
1172
+ * known globally.
1173
+ */
1136
1174
SpinLockAcquire (& XLogCtl -> info_lck );
1137
- /* advance global request to include new block(s) */
1138
1175
if (XLogCtl -> LogwrtRqst .Write < EndPos )
1139
1176
XLogCtl -> LogwrtRqst .Write = EndPos ;
1140
1177
/* update local result copy while I have the chance */
1141
1178
LogwrtResult = XLogCtl -> LogwrtResult ;
1142
1179
SpinLockRelease (& XLogCtl -> info_lck );
1180
+
1181
+ /*
1182
+ * There's a chance that the record was already flushed to disk and we
1183
+ * missed marking segments as ready for archive. If this happens, we
1184
+ * nudge the WALWriter, which will take care of notifying segments as
1185
+ * needed.
1186
+ */
1187
+ if (StartSeg != EndSeg && XLogArchivingActive () &&
1188
+ LogwrtResult .Flush >= EndPos && ProcGlobal -> walwriterLatch )
1189
+ SetLatch (ProcGlobal -> walwriterLatch );
1143
1190
}
1144
1191
1145
1192
/*
1146
1193
* If this was an XLOG_SWITCH record, flush the record and the empty
1147
- * padding space that fills the rest of the segment, and perform
1148
- * end-of-segment actions (eg, notifying archiver).
1194
+ * padding space that fills the rest of the segment.
1149
1195
*/
1150
1196
if (isLogSwitch )
1151
1197
{
@@ -2388,6 +2434,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
2388
2434
2389
2435
/* We should always be inside a critical section here */
2390
2436
Assert (CritSectionCount > 0 );
2437
+ Assert (LWLockHeldByMe (WALWriteLock ));
2391
2438
2392
2439
/*
2393
2440
* Update local LogwrtResult (caller probably did this already, but...)
@@ -2524,11 +2571,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
2524
2571
* later. Doing it here ensures that one and only one backend will
2525
2572
* perform this fsync.
2526
2573
*
2527
- * This is also the right place to notify the Archiver that the
2528
- * segment is ready to copy to archival storage, and to update the
2529
- * timer for archive_timeout, and to signal for a checkpoint if
2530
- * too many logfile segments have been used since the last
2531
- * checkpoint.
2574
+ * If WAL archiving is active, we attempt to notify the archiver
2575
+ * of any segments that are now ready for archival.
2576
+ *
2577
+ * This is also the right place to update the timer for
2578
+ * archive_timeout and to signal for a checkpoint if too many
2579
+ * logfile segments have been used since the last checkpoint.
2532
2580
*/
2533
2581
if (finishing_seg )
2534
2582
{
@@ -2540,7 +2588,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
2540
2588
LogwrtResult .Flush = LogwrtResult .Write ; /* end of page */
2541
2589
2542
2590
if (XLogArchivingActive ())
2543
- XLogArchiveNotifySeg ( openLogSegNo );
2591
+ NotifySegmentsReadyForArchive ( LogwrtResult . Flush );
2544
2592
2545
2593
XLogCtl -> lastSegSwitchTime = (pg_time_t ) time (NULL );
2546
2594
XLogCtl -> lastSegSwitchLSN = LogwrtResult .Flush ;
@@ -2627,6 +2675,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
2627
2675
XLogCtl -> LogwrtRqst .Flush = LogwrtResult .Flush ;
2628
2676
SpinLockRelease (& XLogCtl -> info_lck );
2629
2677
}
2678
+
2679
+ if (XLogArchivingActive ())
2680
+ NotifySegmentsReadyForArchive (LogwrtResult .Flush );
2630
2681
}
2631
2682
2632
2683
/*
@@ -4227,6 +4278,129 @@ ValidateXLOGDirectoryStructure(void)
4227
4278
}
4228
4279
}
4229
4280
4281
+ /*
4282
+ * RegisterSegmentBoundary
4283
+ *
4284
+ * WAL records that are split across a segment boundary require special
4285
+ * treatment for archiving: the initial segment must not be archived until
4286
+ * the end segment has been flushed, in case we crash before we have
4287
+ * the chance to flush the end segment (because after recovery we would
4288
+ * overwrite that WAL record with a different one, and so the file we
4289
+ * archived no longer represents truth.) This also applies to streaming
4290
+ * physical replication.
4291
+ *
4292
+ * To handle this, we keep track of the LSN of WAL records that cross
4293
+ * segment boundaries. Two such are sufficient: the ones with the
4294
+ * earliest and the latest end pointers we know about, since the flush
4295
+ * position advances monotonically. WAL record writers register
4296
+ * boundary-crossing records here, which is used by .ready file creation
4297
+ * to delay until the end segment is known flushed.
4298
+ */
4299
+ static void
4300
+ RegisterSegmentBoundary (XLogSegNo seg , XLogRecPtr endpos )
4301
+ {
4302
+ XLogSegNo segno PG_USED_FOR_ASSERTS_ONLY ;
4303
+
4304
+ /* verify caller computed segment number correctly */
4305
+ AssertArg ((XLByteToSeg (endpos , segno , wal_segment_size ), segno == seg ));
4306
+
4307
+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
4308
+
4309
+ /*
4310
+ * If no segment boundaries are registered, store the new segment boundary
4311
+ * in earliestSegBoundary. Otherwise, store the greater segment
4312
+ * boundaries in latestSegBoundary.
4313
+ */
4314
+ if (XLogCtl -> earliestSegBoundary == MaxXLogSegNo )
4315
+ {
4316
+ XLogCtl -> earliestSegBoundary = seg ;
4317
+ XLogCtl -> earliestSegBoundaryEndPtr = endpos ;
4318
+ }
4319
+ else if (seg > XLogCtl -> earliestSegBoundary &&
4320
+ (XLogCtl -> latestSegBoundary == MaxXLogSegNo ||
4321
+ seg > XLogCtl -> latestSegBoundary ))
4322
+ {
4323
+ XLogCtl -> latestSegBoundary = seg ;
4324
+ XLogCtl -> latestSegBoundaryEndPtr = endpos ;
4325
+ }
4326
+
4327
+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4328
+ }
4329
+
4330
+ /*
4331
+ * NotifySegmentsReadyForArchive
4332
+ *
4333
+ * Mark segments as ready for archival, given that it is safe to do so.
4334
+ * This function is idempotent.
4335
+ */
4336
+ void
4337
+ NotifySegmentsReadyForArchive (XLogRecPtr flushRecPtr )
4338
+ {
4339
+ XLogSegNo latest_boundary_seg ;
4340
+ XLogSegNo last_notified ;
4341
+ XLogSegNo flushed_seg ;
4342
+ XLogSegNo seg ;
4343
+ bool keep_latest ;
4344
+
4345
+ XLByteToSeg (flushRecPtr , flushed_seg , wal_segment_size );
4346
+
4347
+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
4348
+
4349
+ if (XLogCtl -> latestSegBoundary <= flushed_seg &&
4350
+ XLogCtl -> latestSegBoundaryEndPtr <= flushRecPtr )
4351
+ {
4352
+ latest_boundary_seg = XLogCtl -> latestSegBoundary ;
4353
+ keep_latest = false;
4354
+ }
4355
+ else if (XLogCtl -> earliestSegBoundary <= flushed_seg &&
4356
+ XLogCtl -> earliestSegBoundaryEndPtr <= flushRecPtr )
4357
+ {
4358
+ latest_boundary_seg = XLogCtl -> earliestSegBoundary ;
4359
+ keep_latest = true;
4360
+ }
4361
+ else
4362
+ {
4363
+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4364
+ return ;
4365
+ }
4366
+
4367
+ last_notified = XLogCtl -> lastNotifiedSeg ;
4368
+
4369
+ /*
4370
+ * Update shared memory and discard segment boundaries that are no longer
4371
+ * needed.
4372
+ *
4373
+ * It is safe to update shared memory before we attempt to create the
4374
+ * .ready files. If our calls to XLogArchiveNotifySeg() fail,
4375
+ * RemoveOldXlogFiles() will retry it as needed.
4376
+ */
4377
+ if (last_notified < latest_boundary_seg - 1 )
4378
+ XLogCtl -> lastNotifiedSeg = latest_boundary_seg - 1 ;
4379
+
4380
+ if (keep_latest )
4381
+ {
4382
+ XLogCtl -> earliestSegBoundary = XLogCtl -> latestSegBoundary ;
4383
+ XLogCtl -> earliestSegBoundaryEndPtr = XLogCtl -> latestSegBoundaryEndPtr ;
4384
+ }
4385
+ else
4386
+ {
4387
+ XLogCtl -> earliestSegBoundary = MaxXLogSegNo ;
4388
+ XLogCtl -> earliestSegBoundaryEndPtr = InvalidXLogRecPtr ;
4389
+ }
4390
+
4391
+ XLogCtl -> latestSegBoundary = MaxXLogSegNo ;
4392
+ XLogCtl -> latestSegBoundaryEndPtr = InvalidXLogRecPtr ;
4393
+
4394
+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4395
+
4396
+ /*
4397
+ * Notify archiver about segments that are ready for archival (by creating
4398
+ * the corresponding .ready files).
4399
+ */
4400
+ for (seg = last_notified + 1 ; seg < latest_boundary_seg ; seg ++ )
4401
+ XLogArchiveNotifySeg (seg );
4402
+ }
4403
+
4230
4404
/*
4231
4405
* Remove previous backup history files. This also retries creation of
4232
4406
* .ready files for any backup history files for which XLogArchiveNotify
@@ -5112,8 +5286,16 @@ XLOGShmemInit(void)
5112
5286
5113
5287
SpinLockInit (& XLogCtl -> Insert .insertpos_lck );
5114
5288
SpinLockInit (& XLogCtl -> info_lck );
5289
+ SpinLockInit (& XLogCtl -> segtrack_lck );
5115
5290
SpinLockInit (& XLogCtl -> ulsn_lck );
5116
5291
InitSharedLatch (& XLogCtl -> recoveryWakeupLatch );
5292
+
5293
+ /* Initialize stuff for marking segments as ready for archival. */
5294
+ XLogCtl -> lastNotifiedSeg = MaxXLogSegNo ;
5295
+ XLogCtl -> earliestSegBoundary = MaxXLogSegNo ;
5296
+ XLogCtl -> earliestSegBoundaryEndPtr = InvalidXLogRecPtr ;
5297
+ XLogCtl -> latestSegBoundary = MaxXLogSegNo ;
5298
+ XLogCtl -> latestSegBoundaryEndPtr = InvalidXLogRecPtr ;
5117
5299
}
5118
5300
5119
5301
/*
@@ -7605,6 +7787,20 @@ StartupXLOG(void)
7605
7787
XLogCtl -> LogwrtRqst .Write = EndOfLog ;
7606
7788
XLogCtl -> LogwrtRqst .Flush = EndOfLog ;
7607
7789
7790
+ /*
7791
+ * Initialize XLogCtl->lastNotifiedSeg to the previous WAL file.
7792
+ */
7793
+ if (XLogArchivingActive ())
7794
+ {
7795
+ XLogSegNo EndOfLogSeg ;
7796
+
7797
+ XLByteToSeg (EndOfLog , EndOfLogSeg , wal_segment_size );
7798
+
7799
+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
7800
+ XLogCtl -> lastNotifiedSeg = EndOfLogSeg - 1 ;
7801
+ SpinLockRelease (& XLogCtl -> segtrack_lck );
7802
+ }
7803
+
7608
7804
/*
7609
7805
* Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
7610
7806
* record before resource manager writes cleanup WAL records or checkpoint
0 commit comments