@@ -24,17 +24,32 @@ const char *PROGRAM_VERSION = "unknown";
24
24
25
25
#include "pgut/pgut-fe.h"
26
26
27
+ #include <errno.h>
27
28
#include <string.h>
28
29
#include <stdlib.h>
29
30
#include <unistd.h>
30
31
#include <time.h>
31
32
33
+
34
+ #ifdef HAVE_POLL_H
35
+ #include <poll.h>
36
+ #endif
37
+ #ifdef HAVE_SYS_POLL_H
38
+ #include <sys/poll.h>
39
+ #endif
40
+ #ifdef HAVE_SYS_SELECT_H
41
+ #include <sys/select.h>
42
+ #endif
43
+
44
+
32
45
/*
33
46
* APPLY_COUNT: Number of applied logs per transaction. Larger values
34
47
* could be faster, but will result in long transactions in the REDO phase.
35
48
*/
36
49
#define APPLY_COUNT 1000
37
50
51
+ /* poll() or select() timeout, in seconds */
52
+ #define POLL_TIMEOUT 3
38
53
39
54
/* Compile an array of existing transactions which are active during
40
55
* pg_repack's setup. Some transactions we can safely ignore:
@@ -633,7 +648,7 @@ rebuild_indexes(const repack_table *table)
633
648
*/
634
649
index_jobs [i ].status = FINISHED ;
635
650
}
636
- else if (i <= workers .num_workers ) {
651
+ else if (i < workers .num_workers ) {
637
652
/* Assign available worker to build an index. */
638
653
index_jobs [i ].status = INPROGRESS ;
639
654
index_jobs [i ].worker_idx = i ;
@@ -656,23 +671,67 @@ rebuild_indexes(const repack_table *table)
656
671
}
657
672
PQclear (res );
658
673
674
+ /* How many workers we kicked off earlier. */
675
+ num_active_workers = num_indexes > workers .num_workers ? workers .num_workers : num_indexes ;
676
+
659
677
if (workers .num_workers > 1 )
660
678
{
661
- /* How many workers we kicked off earlier. */
662
- num_active_workers = num_indexes > workers .num_workers ? workers .num_workers : num_indexes ;
679
+ int freed_worker = -1 ;
680
+ int ret ;
681
+
682
+ /* Prefer poll() over select(), following PostgreSQL custom. */
683
+ #ifdef HAVE_POLL
684
+ struct pollfd * input_fds ;
685
+
686
+ input_fds = pgut_malloc (sizeof (struct pollfd ) * num_active_workers );
687
+ for (i = 0 ; i < num_active_workers ; i ++ )
688
+ {
689
+ input_fds [i ].fd = PQsocket (workers .conns [i ]);
690
+ input_fds [i ].events = POLLIN | POLLERR ;
691
+ input_fds [i ].revents = 0 ;
692
+ }
693
+ #else
694
+ fd_set input_mask ;
695
+ struct timeval timeout ;
696
+ /* select() needs the highest-numbered socket descriptor */
697
+ int max_fd = 0 ;
698
+
699
+ FD_ZERO (& input_mask );
700
+ for (i = 0 ; i < num_active_workers ; i ++ )
701
+ {
702
+ FD_SET (PQsocket (workers .conns [i ]), & input_mask );
703
+ if (PQsocket (workers .conns [i ]) > max_fd )
704
+ max_fd = PQsocket (workers .conns [i ]);
705
+ }
706
+ #endif
663
707
664
708
/* Now go through our index builds, and look for any which is
665
709
* reported complete. Reassign that worker to the next index to
666
710
* be built, if any.
667
711
*/
668
- while (num_active_workers )
712
+ while (num_active_workers > 0 )
669
713
{
670
- int freed_worker = -1 ;
714
+ elog (DEBUG2 , "polling %d active workers" , num_active_workers );
715
+
716
+ #ifdef HAVE_POLL
717
+ ret = poll (input_fds , num_active_workers , POLL_TIMEOUT * 1000 );
718
+ #else
719
+ /* re-initialize timeout before each invocation of select()
720
+ * just in case select() modifies timeout to indicate remaining
721
+ * time.
722
+ */
723
+ timeout .tv_sec = POLL_TIMEOUT ;
724
+ timeout .tv_usec = 0 ;
725
+ ret = select (max_fd + 1 , & input_mask , NULL , NULL , & timeout );
726
+ #endif
727
+ if (ret < 0 && errno != EINTR )
728
+ elog (ERROR , "poll() failed: %d, %d" , ret , errno );
671
729
672
730
for (i = 0 ; i < num_indexes ; i ++ )
673
731
{
674
732
if (index_jobs [i ].status == INPROGRESS )
675
733
{
734
+ Assert (index_jobs [i ].worker_idx >= 0 );
676
735
/* Must call PQconsumeInput before we can check PQisBusy */
677
736
if (PQconsumeInput (workers .conns [index_jobs [i ].worker_idx ]) != 1 )
678
737
{
@@ -699,7 +758,13 @@ rebuild_indexes(const repack_table *table)
699
758
}
700
759
PQclear (res );
701
760
}
702
-
761
+
762
+ /* We are only going to re-queue one worker, even
763
+ * though more than one index build might be finished.
764
+ * Any other jobs which may be finished will
765
+ * just have to wait for the next pass through the
766
+ * poll()/select() loop.
767
+ */
703
768
freed_worker = index_jobs [i ].worker_idx ;
704
769
index_jobs [i ].status = FINISHED ;
705
770
num_active_workers -- ;
@@ -733,7 +798,6 @@ rebuild_indexes(const repack_table *table)
733
798
}
734
799
freed_worker = -1 ;
735
800
}
736
- sleep (1 );
737
801
}
738
802
739
803
}
@@ -994,7 +1058,6 @@ repack_one_table(const repack_table *table, const char *orderby)
994
1058
goto cleanup ;
995
1059
}
996
1060
997
-
998
1061
/*
999
1062
* 4. Apply log to temp table until no tuples are left in the log
1000
1063
* and all of the old transactions are finished.
0 commit comments