@@ -602,7 +602,8 @@ rebuild_indexes(const repack_table *table)
602
602
const char * params [1 ];
603
603
int num_indexes ;
604
604
int i ;
605
- int num_active_workers = 0 ;
605
+ int num_active_workers ;
606
+ int num_workers ;
606
607
repack_index * index_jobs ;
607
608
char buffer [12 ];
608
609
bool have_error = false;
@@ -630,8 +631,16 @@ rebuild_indexes(const repack_table *table)
630
631
" FROM pg_index WHERE indrelid = $1 AND indisvalid" , 1 , params );
631
632
632
633
num_indexes = PQntuples (res );
634
+
635
+ /* We might have more actual worker connectionss than we need,
636
+ * if the number of workers exceeds the number of indexes to be
637
+ * built. In that case, ignore the extra workers.
638
+ */
639
+ num_workers = num_indexes > workers .num_workers ? workers .num_workers : num_indexes ;
640
+ num_active_workers = num_workers ;
641
+
633
642
elog (DEBUG2 , "Have %d indexes and num_workers=%d" , num_indexes ,
634
- workers . num_workers );
643
+ num_workers );
635
644
636
645
index_jobs = pgut_malloc (sizeof (repack_index ) * num_indexes );
637
646
@@ -651,7 +660,7 @@ rebuild_indexes(const repack_table *table)
651
660
elog (DEBUG2 , "target_oid : %u" , index_jobs [i ].target_oid );
652
661
elog (DEBUG2 , "create_index : %s" , index_jobs [i ].create_index );
653
662
654
- if (workers . num_workers <= 1 ) {
663
+ if (num_workers <= 1 ) {
655
664
/* Use primary connection if we are not setting up parallel
656
665
* index building, or if we only have one worker.
657
666
*/
@@ -662,12 +671,12 @@ rebuild_indexes(const repack_table *table)
662
671
*/
663
672
index_jobs [i ].status = FINISHED ;
664
673
}
665
- else if (i < workers . num_workers ) {
674
+ else if (i < num_workers ) {
666
675
/* Assign available worker to build an index. */
667
676
index_jobs [i ].status = INPROGRESS ;
668
677
index_jobs [i ].worker_idx = i ;
669
- elog (DEBUG2 , "Worker %d building index: %s" , i ,
670
- index_jobs [i ].create_index );
678
+ elog (LOG , "Initial worker %d to build index: %s" ,
679
+ i , index_jobs [i ].create_index );
671
680
672
681
if (!(PQsendQuery (workers .conns [i ], index_jobs [i ].create_index )))
673
682
{
@@ -685,20 +694,18 @@ rebuild_indexes(const repack_table *table)
685
694
}
686
695
PQclear (res );
687
696
688
- /* How many workers we kicked off earlier. */
689
- num_active_workers = num_indexes > workers .num_workers ? workers .num_workers : num_indexes ;
690
-
691
- if (workers .num_workers > 1 )
697
+ if (num_workers > 1 )
692
698
{
693
699
int freed_worker = -1 ;
694
700
int ret ;
695
701
696
702
/* Prefer poll() over select(), following PostgreSQL custom. */
703
+ #undef HAVE_POLL
697
704
#ifdef HAVE_POLL
698
705
struct pollfd * input_fds ;
699
706
700
- input_fds = pgut_malloc (sizeof (struct pollfd ) * num_active_workers );
701
- for (i = 0 ; i < num_active_workers ; i ++ )
707
+ input_fds = pgut_malloc (sizeof (struct pollfd ) * num_workers );
708
+ for (i = 0 ; i < num_workers ; i ++ )
702
709
{
703
710
input_fds [i ].fd = PQsocket (workers .conns [i ]);
704
711
input_fds [i ].events = POLLIN | POLLERR ;
@@ -708,15 +715,7 @@ rebuild_indexes(const repack_table *table)
708
715
fd_set input_mask ;
709
716
struct timeval timeout ;
710
717
/* select() needs the highest-numbered socket descriptor */
711
- int max_fd = 0 ;
712
-
713
- FD_ZERO (& input_mask );
714
- for (i = 0 ; i < num_active_workers ; i ++ )
715
- {
716
- FD_SET (PQsocket (workers .conns [i ]), & input_mask );
717
- if (PQsocket (workers .conns [i ]) > max_fd )
718
- max_fd = PQsocket (workers .conns [i ]);
719
- }
718
+ int max_fd ;
720
719
#endif
721
720
722
721
/* Now go through our index builds, and look for any which is
@@ -728,14 +727,23 @@ rebuild_indexes(const repack_table *table)
728
727
elog (DEBUG2 , "polling %d active workers" , num_active_workers );
729
728
730
729
#ifdef HAVE_POLL
731
- ret = poll (input_fds , num_active_workers , POLL_TIMEOUT * 1000 );
730
+ ret = poll (input_fds , num_workers , POLL_TIMEOUT * 1000 );
732
731
#else
733
- /* re-initialize timeout before each invocation of select()
734
- * just in case select() modifies timeout to indicate remaining
735
- * time .
732
+ /* re-initialize timeout and input_mask before each
733
+ * invocation of select(). I think this isn't
734
+ * necessary on many Unixen, but just in case .
736
735
*/
737
736
timeout .tv_sec = POLL_TIMEOUT ;
738
737
timeout .tv_usec = 0 ;
738
+
739
+ FD_ZERO (& input_mask );
740
+ for (i = 0 , max_fd = 0 ; i < num_workers ; i ++ )
741
+ {
742
+ FD_SET (PQsocket (workers .conns [i ]), & input_mask );
743
+ if (PQsocket (workers .conns [i ]) > max_fd )
744
+ max_fd = PQsocket (workers .conns [i ]);
745
+ }
746
+
739
747
ret = select (max_fd + 1 , & input_mask , NULL , NULL , & timeout );
740
748
#endif
741
749
if (ret < 0 && errno != EINTR )
@@ -756,7 +764,7 @@ rebuild_indexes(const repack_table *table)
756
764
}
757
765
if (!PQisBusy (workers .conns [index_jobs [i ].worker_idx ]))
758
766
{
759
- elog (NOTICE , "Command finished in worker %d: %s" ,
767
+ elog (LOG , "Command finished in worker %d: %s" ,
760
768
index_jobs [i ].worker_idx ,
761
769
index_jobs [i ].create_index );
762
770
@@ -794,7 +802,7 @@ rebuild_indexes(const repack_table *table)
794
802
{
795
803
index_jobs [i ].status = INPROGRESS ;
796
804
index_jobs [i ].worker_idx = freed_worker ;
797
- elog (NOTICE , "Assigning worker %d to build index #%d: "
805
+ elog (LOG , "Assigning worker %d to build index #%d: "
798
806
"%s" , freed_worker , i ,
799
807
index_jobs [i ].create_index );
800
808
0 commit comments