Skip to content

Commit 962fdff

Browse files
committed
Fix up buggy initialization code for poll() and select().
Also some logging and variable name cleanup.
1 parent b9c7189 commit 962fdff

File tree

2 files changed

+36
-28
lines changed

2 files changed

+36
-28
lines changed

bin/pg_repack.c

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,8 @@ rebuild_indexes(const repack_table *table)
602602
const char *params[1];
603603
int num_indexes;
604604
int i;
605-
int num_active_workers = 0;
605+
int num_active_workers;
606+
int num_workers;
606607
repack_index *index_jobs;
607608
char buffer[12];
608609
bool have_error = false;
@@ -630,8 +631,16 @@ rebuild_indexes(const repack_table *table)
630631
" FROM pg_index WHERE indrelid = $1 AND indisvalid", 1, params);
631632

632633
num_indexes = PQntuples(res);
634+
635+
/* We might have more actual worker connectionss than we need,
636+
* if the number of workers exceeds the number of indexes to be
637+
* built. In that case, ignore the extra workers.
638+
*/
639+
num_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
640+
num_active_workers = num_workers;
641+
633642
elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes,
634-
workers.num_workers);
643+
num_workers);
635644

636645
index_jobs = pgut_malloc(sizeof(repack_index) * num_indexes);
637646

@@ -651,7 +660,7 @@ rebuild_indexes(const repack_table *table)
651660
elog(DEBUG2, "target_oid : %u", index_jobs[i].target_oid);
652661
elog(DEBUG2, "create_index : %s", index_jobs[i].create_index);
653662

654-
if (workers.num_workers <= 1) {
663+
if (num_workers <= 1) {
655664
/* Use primary connection if we are not setting up parallel
656665
* index building, or if we only have one worker.
657666
*/
@@ -662,12 +671,12 @@ rebuild_indexes(const repack_table *table)
662671
*/
663672
index_jobs[i].status = FINISHED;
664673
}
665-
else if (i < workers.num_workers) {
674+
else if (i < num_workers) {
666675
/* Assign available worker to build an index. */
667676
index_jobs[i].status = INPROGRESS;
668677
index_jobs[i].worker_idx = i;
669-
elog(DEBUG2, "Worker %d building index: %s", i,
670-
index_jobs[i].create_index);
678+
elog(LOG, "Initial worker %d to build index: %s",
679+
i, index_jobs[i].create_index);
671680

672681
if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
673682
{
@@ -685,20 +694,18 @@ rebuild_indexes(const repack_table *table)
685694
}
686695
PQclear(res);
687696

688-
/* How many workers we kicked off earlier. */
689-
num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
690-
691-
if (workers.num_workers > 1)
697+
if (num_workers > 1)
692698
{
693699
int freed_worker = -1;
694700
int ret;
695701

696702
/* Prefer poll() over select(), following PostgreSQL custom. */
703+
#undef HAVE_POLL
697704
#ifdef HAVE_POLL
698705
struct pollfd *input_fds;
699706

700-
input_fds = pgut_malloc(sizeof(struct pollfd) * num_active_workers);
701-
for (i = 0; i < num_active_workers; i++)
707+
input_fds = pgut_malloc(sizeof(struct pollfd) * num_workers);
708+
for (i = 0; i < num_workers; i++)
702709
{
703710
input_fds[i].fd = PQsocket(workers.conns[i]);
704711
input_fds[i].events = POLLIN | POLLERR;
@@ -708,15 +715,7 @@ rebuild_indexes(const repack_table *table)
708715
fd_set input_mask;
709716
struct timeval timeout;
710717
/* select() needs the highest-numbered socket descriptor */
711-
int max_fd = 0;
712-
713-
FD_ZERO(&input_mask);
714-
for (i = 0; i < num_active_workers; i++)
715-
{
716-
FD_SET(PQsocket(workers.conns[i]), &input_mask);
717-
if (PQsocket(workers.conns[i]) > max_fd)
718-
max_fd = PQsocket(workers.conns[i]);
719-
}
718+
int max_fd;
720719
#endif
721720

722721
/* Now go through our index builds, and look for any which is
@@ -728,14 +727,23 @@ rebuild_indexes(const repack_table *table)
728727
elog(DEBUG2, "polling %d active workers", num_active_workers);
729728

730729
#ifdef HAVE_POLL
731-
ret = poll(input_fds, num_active_workers, POLL_TIMEOUT * 1000);
730+
ret = poll(input_fds, num_workers, POLL_TIMEOUT * 1000);
732731
#else
733-
/* re-initialize timeout before each invocation of select()
734-
* just in case select() modifies timeout to indicate remaining
735-
* time.
732+
/* re-initialize timeout and input_mask before each
733+
* invocation of select(). I think this isn't
734+
* necessary on many Unixen, but just in case.
736735
*/
737736
timeout.tv_sec = POLL_TIMEOUT;
738737
timeout.tv_usec = 0;
738+
739+
FD_ZERO(&input_mask);
740+
for (i = 0, max_fd = 0; i < num_workers; i++)
741+
{
742+
FD_SET(PQsocket(workers.conns[i]), &input_mask);
743+
if (PQsocket(workers.conns[i]) > max_fd)
744+
max_fd = PQsocket(workers.conns[i]);
745+
}
746+
739747
ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout);
740748
#endif
741749
if (ret < 0 && errno != EINTR)
@@ -756,7 +764,7 @@ rebuild_indexes(const repack_table *table)
756764
}
757765
if (!PQisBusy(workers.conns[index_jobs[i].worker_idx]))
758766
{
759-
elog(NOTICE, "Command finished in worker %d: %s",
767+
elog(LOG, "Command finished in worker %d: %s",
760768
index_jobs[i].worker_idx,
761769
index_jobs[i].create_index);
762770

@@ -794,7 +802,7 @@ rebuild_indexes(const repack_table *table)
794802
{
795803
index_jobs[i].status = INPROGRESS;
796804
index_jobs[i].worker_idx = freed_worker;
797-
elog(NOTICE, "Assigning worker %d to build index #%d: "
805+
elog(LOG, "Assigning worker %d to build index #%d: "
798806
"%s", freed_worker, i,
799807
index_jobs[i].create_index);
800808

doc/pg_repack.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ The following options can be specified in ``OPTIONS``.
117117

118118
Options:
119119
-a, --all repack all databases
120-
-j, --jobs Use this many parallel jobs for each table
120+
-j, --jobs Use this many parallel jobs for each table
121121
-n, --no-order do vacuum full instead of cluster
122122
-o, --order-by=COLUMNS order by columns instead of cluster keys
123123
-t, --table=TABLE repack specific table only

0 commit comments

Comments
 (0)