35
35
36
36
#include "access/xlog.h"
37
37
#include "access/xlog_internal.h"
38
+ #include "lib/binaryheap.h"
38
39
#include "libpq/pqsignal.h"
39
40
#include "miscadmin.h"
40
41
#include "pgstat.h"
47
48
#include "storage/proc.h"
48
49
#include "storage/procsignal.h"
49
50
#include "storage/shmem.h"
51
+ #include "storage/spin.h"
50
52
#include "utils/guc.h"
51
53
#include "utils/ps_status.h"
52
54
72
74
*/
73
75
#define NUM_ORPHAN_CLEANUP_RETRIES 3
74
76
77
/*
 * Upper bound on how many .ready files a single scan of the archive status
 * directory will collect.
 */
#define NUM_FILES_PER_DIRECTORY_SCAN 64
81
+
75
82
/* Shared memory area for archiver process */
76
83
typedef struct PgArchData
77
84
{
78
85
int pgprocno ; /* pgprocno of archiver process */
86
+
87
+ /*
88
+ * Forces a directory scan in pgarch_readyXlog(). Protected by
89
+ * arch_lck.
90
+ */
91
+ bool force_dir_scan ;
92
+
93
+ slock_t arch_lck ;
79
94
} PgArchData ;
80
95
81
96
@@ -86,6 +101,22 @@ typedef struct PgArchData
86
101
static time_t last_sigterm_time = 0 ;
87
102
static PgArchData * PgArch = NULL ;
88
103
104
+ /*
105
+ * Stuff for tracking multiple files to archive from each scan of
106
+ * archive_status. Minimizing the number of directory scans when there are
107
+ * many files to archive can significantly improve archival rate.
108
+ *
109
+ * arch_heap is a max-heap that is used during the directory scan to track
110
+ * the highest-priority files to archive. After the directory scan
111
+ * completes, the file names are stored in ascending order of priority in
112
+ * arch_files. pgarch_readyXlog() returns files from arch_files until it
113
+ * is empty, at which point another directory scan must be performed.
114
+ */
115
+ static binaryheap * arch_heap = NULL ;
116
+ static char arch_filenames [NUM_FILES_PER_DIRECTORY_SCAN ][MAX_XFN_CHARS ];
117
+ static char * arch_files [NUM_FILES_PER_DIRECTORY_SCAN ];
118
+ static int arch_files_size = 0 ;
119
+
89
120
/*
90
121
* Flags set by interrupt handlers for later service in the main loop.
91
122
*/
@@ -103,6 +134,7 @@ static bool pgarch_readyXlog(char *xlog);
103
134
static void pgarch_archiveDone (char * xlog );
104
135
static void pgarch_die (int code , Datum arg );
105
136
static void HandlePgArchInterrupts (void );
137
+ static int ready_file_comparator (Datum a , Datum b , void * arg );
106
138
107
139
/* Report shared memory space needed by PgArchShmemInit */
108
140
Size
@@ -129,6 +161,7 @@ PgArchShmemInit(void)
129
161
/* First time through, so initialize */
130
162
MemSet (PgArch , 0 , PgArchShmemSize ());
131
163
PgArch -> pgprocno = INVALID_PGPROCNO ;
164
+ SpinLockInit (& PgArch -> arch_lck );
132
165
}
133
166
}
134
167
@@ -198,6 +231,10 @@ PgArchiverMain(void)
198
231
*/
199
232
PgArch -> pgprocno = MyProc -> pgprocno ;
200
233
234
+ /* Initialize our max-heap for prioritizing files to archive. */
235
+ arch_heap = binaryheap_allocate (NUM_FILES_PER_DIRECTORY_SCAN ,
236
+ ready_file_comparator , NULL );
237
+
201
238
pgarch_MainLoop ();
202
239
203
240
proc_exit (0 );
@@ -325,6 +362,9 @@ pgarch_ArchiverCopyLoop(void)
325
362
{
326
363
char xlog [MAX_XFN_CHARS + 1 ];
327
364
365
+ /* force directory scan in the first call to pgarch_readyXlog() */
366
+ arch_files_size = 0 ;
367
+
328
368
/*
329
369
* loop through all xlogs with archive_status of .ready and archive
330
370
* them...mostly we expect this to be a single file, though it is possible
@@ -600,26 +640,62 @@ pgarch_archiveXlog(char *xlog)
600
640
static bool
601
641
pgarch_readyXlog (char * xlog )
602
642
{
603
- /*
604
- * open xlog status directory and read through list of xlogs that have the
605
- * .ready suffix, looking for earliest file. It is possible to optimise
606
- * this code, though only a single file is expected on the vast majority
607
- * of calls, so....
608
- */
609
643
char XLogArchiveStatusDir [MAXPGPATH ];
610
644
DIR * rldir ;
611
645
struct dirent * rlde ;
612
- bool found = false;
613
- bool historyFound = false;
646
+ bool force_dir_scan ;
614
647
648
+ /*
649
+ * If a directory scan was requested, clear the stored file names and
650
+ * proceed.
651
+ */
652
+ SpinLockAcquire (& PgArch -> arch_lck );
653
+ force_dir_scan = PgArch -> force_dir_scan ;
654
+ PgArch -> force_dir_scan = false;
655
+ SpinLockRelease (& PgArch -> arch_lck );
656
+
657
+ if (force_dir_scan )
658
+ arch_files_size = 0 ;
659
+
660
+ /*
661
+ * If we still have stored file names from the previous directory scan,
662
+ * try to return one of those. We check to make sure the status file
663
+ * is still present, as the archive_command for a previous file may
664
+ * have already marked it done.
665
+ */
666
+ while (arch_files_size > 0 )
667
+ {
668
+ struct stat st ;
669
+ char status_file [MAXPGPATH ];
670
+ char * arch_file ;
671
+
672
+ arch_files_size -- ;
673
+ arch_file = arch_files [arch_files_size ];
674
+ StatusFilePath (status_file , arch_file , ".ready" );
675
+
676
+ if (stat (status_file , & st ) == 0 )
677
+ {
678
+ strcpy (xlog , arch_file );
679
+ return true;
680
+ }
681
+ else if (errno != ENOENT )
682
+ ereport (ERROR ,
683
+ (errcode_for_file_access (),
684
+ errmsg ("could not stat file \"%s\": %m" , status_file )));
685
+ }
686
+
687
+ /*
688
+ * Open the archive status directory and read through the list of files
689
+ * with the .ready suffix, looking for the earliest files.
690
+ */
615
691
snprintf (XLogArchiveStatusDir , MAXPGPATH , XLOGDIR "/archive_status" );
616
692
rldir = AllocateDir (XLogArchiveStatusDir );
617
693
618
694
while ((rlde = ReadDir (rldir , XLogArchiveStatusDir )) != NULL )
619
695
{
620
696
int basenamelen = (int ) strlen (rlde -> d_name ) - 6 ;
621
697
char basename [MAX_XFN_CHARS + 1 ];
622
- bool ishistory ;
698
+ char * arch_file ;
623
699
624
700
/* Ignore entries with unexpected number of characters */
625
701
if (basenamelen < MIN_XFN_CHARS ||
@@ -638,32 +714,97 @@ pgarch_readyXlog(char *xlog)
638
714
memcpy (basename , rlde -> d_name , basenamelen );
639
715
basename [basenamelen ] = '\0' ;
640
716
641
- /* Is this a history file? */
642
- ishistory = IsTLHistoryFileName (basename );
643
-
644
717
/*
645
- * Consume the file to archive. History files have the highest
646
- * priority. If this is the first file or the first history file
647
- * ever, copy it. In the presence of a history file already chosen as
648
- * target, ignore all other files except history files which have been
649
- * generated for an older timeline than what is already chosen as
650
- * target to archive.
718
+ * Store the file in our max-heap if it has a high enough priority.
651
719
*/
652
- if (! found || ( ishistory && ! historyFound ) )
720
+ if (arch_heap -> bh_size < NUM_FILES_PER_DIRECTORY_SCAN )
653
721
{
654
- strcpy (xlog , basename );
655
- found = true;
656
- historyFound = ishistory ;
722
+ /* If the heap isn't full yet, quickly add it. */
723
+ arch_file = arch_filenames [arch_heap -> bh_size ];
724
+ strcpy (arch_file , basename );
725
+ binaryheap_add_unordered (arch_heap , CStringGetDatum (arch_file ));
726
+
727
+ /* If we just filled the heap, make it a valid one. */
728
+ if (arch_heap -> bh_size == NUM_FILES_PER_DIRECTORY_SCAN )
729
+ binaryheap_build (arch_heap );
657
730
}
658
- else if (ishistory || !historyFound )
731
+ else if (ready_file_comparator (binaryheap_first (arch_heap ),
732
+ CStringGetDatum (basename ), NULL ) > 0 )
659
733
{
660
- if (strcmp (basename , xlog ) < 0 )
661
- strcpy (xlog , basename );
734
+ /*
735
+ * Remove the lowest priority file and add the current one to
736
+ * the heap.
737
+ */
738
+ arch_file = DatumGetCString (binaryheap_remove_first (arch_heap ));
739
+ strcpy (arch_file , basename );
740
+ binaryheap_add (arch_heap , CStringGetDatum (arch_file ));
662
741
}
663
742
}
664
743
FreeDir (rldir );
665
744
666
- return found ;
745
+ /* If no files were found, simply return. */
746
+ if (arch_heap -> bh_size == 0 )
747
+ return false;
748
+
749
+ /*
750
+ * If we didn't fill the heap, we didn't make it a valid one. Do that
751
+ * now.
752
+ */
753
+ if (arch_heap -> bh_size < NUM_FILES_PER_DIRECTORY_SCAN )
754
+ binaryheap_build (arch_heap );
755
+
756
+ /*
757
+ * Fill arch_files array with the files to archive in ascending order
758
+ * of priority.
759
+ */
760
+ arch_files_size = arch_heap -> bh_size ;
761
+ for (int i = 0 ; i < arch_files_size ; i ++ )
762
+ arch_files [i ] = DatumGetCString (binaryheap_remove_first (arch_heap ));
763
+
764
+ /* Return the highest priority file. */
765
+ arch_files_size -- ;
766
+ strcpy (xlog , arch_files [arch_files_size ]);
767
+
768
+ return true;
769
+ }
770
+
771
+ /*
772
+ * ready_file_comparator
773
+ *
774
+ * Compares the archival priority of the given files to archive. If "a"
775
+ * has a higher priority than "b", a negative value will be returned. If
776
+ * "b" has a higher priority than "a", a positive value will be returned.
777
+ * If "a" and "b" have equivalent values, 0 will be returned.
778
+ */
779
+ static int
780
+ ready_file_comparator (Datum a , Datum b , void * arg )
781
+ {
782
+ char * a_str = DatumGetCString (a );
783
+ char * b_str = DatumGetCString (b );
784
+ bool a_history = IsTLHistoryFileName (a_str );
785
+ bool b_history = IsTLHistoryFileName (b_str );
786
+
787
+ /* Timeline history files always have the highest priority. */
788
+ if (a_history != b_history )
789
+ return a_history ? -1 : 1 ;
790
+
791
+ /* Priority is given to older files. */
792
+ return strcmp (a_str , b_str );
793
+ }
794
+
795
+ /*
796
+ * PgArchForceDirScan
797
+ *
798
+ * When called, the next call to pgarch_readyXlog() will perform a
799
+ * directory scan. This is useful for ensuring that important files such
800
+ * as timeline history files are archived as quickly as possible.
801
+ */
802
+ void
803
+ PgArchForceDirScan (void )
804
+ {
805
+ SpinLockAcquire (& PgArch -> arch_lck );
806
+ PgArch -> force_dir_scan = true;
807
+ SpinLockRelease (& PgArch -> arch_lck );
667
808
}
668
809
669
810
/*
0 commit comments