Skip to content

Commit 4235735

Browse files
committed
Several fixes for concurrent index builds:
* Use poll() if it is available, or select() otherwise, to efficiently wait on index builds in worker queries to finish. * fix off-by-one error when initially assigning workers * move PQsetnonblocking() calls to setup_workers()
1 parent 8c2dd16 commit 4235735

File tree

2 files changed

+85
-9
lines changed

2 files changed

+85
-9
lines changed

bin/pg_repack.c

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,32 @@ const char *PROGRAM_VERSION = "unknown";
2424

2525
#include "pgut/pgut-fe.h"
2626

27+
#include <errno.h>
2728
#include <string.h>
2829
#include <stdlib.h>
2930
#include <unistd.h>
3031
#include <time.h>
3132

33+
34+
#ifdef HAVE_POLL_H
35+
#include <poll.h>
36+
#endif
37+
#ifdef HAVE_SYS_POLL_H
38+
#include <sys/poll.h>
39+
#endif
40+
#ifdef HAVE_SYS_SELECT_H
41+
#include <sys/select.h>
42+
#endif
43+
44+
3245
/*
3346
* APPLY_COUNT: Number of applied logs per transaction. Larger values
3447
* could be faster, but will be long transactions in the REDO phase.
3548
*/
3649
#define APPLY_COUNT 1000
3750

51+
/* poll() or select() timeout, in seconds */
52+
#define POLL_TIMEOUT 3
3853

3954
/* Compile an array of existing transactions which are active during
4055
* pg_repack's setup. Some transactions we can safely ignore:
@@ -633,7 +648,7 @@ rebuild_indexes(const repack_table *table)
633648
*/
634649
index_jobs[i].status = FINISHED;
635650
}
636-
else if (i <= workers.num_workers) {
651+
else if (i < workers.num_workers) {
637652
/* Assign available worker to build an index. */
638653
index_jobs[i].status = INPROGRESS;
639654
index_jobs[i].worker_idx = i;
@@ -656,23 +671,67 @@ rebuild_indexes(const repack_table *table)
656671
}
657672
PQclear(res);
658673

674+
/* How many workers we kicked off earlier. */
675+
num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
676+
659677
if (workers.num_workers > 1)
660678
{
661-
/* How many workers we kicked off earlier. */
662-
num_active_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
679+
int freed_worker = -1;
680+
int ret;
681+
682+
/* Prefer poll() over select(), following PostgreSQL custom. */
683+
#ifdef HAVE_POLL
684+
struct pollfd *input_fds;
685+
686+
input_fds = pgut_malloc(sizeof(struct pollfd) * num_active_workers);
687+
for (i = 0; i < num_active_workers; i++)
688+
{
689+
input_fds[i].fd = PQsocket(workers.conns[i]);
690+
input_fds[i].events = POLLIN | POLLERR;
691+
input_fds[i].revents = 0;
692+
}
693+
#else
694+
fd_set input_mask;
695+
struct timeval timeout;
696+
/* select() needs the highest-numbered socket descriptor */
697+
int max_fd = 0;
698+
699+
FD_ZERO(&input_mask);
700+
for (i = 0; i < num_active_workers; i++)
701+
{
702+
FD_SET(PQsocket(workers.conns[i]), &input_mask);
703+
if (PQsocket(workers.conns[i]) > max_fd)
704+
max_fd = PQsocket(workers.conns[i]);
705+
}
706+
#endif
663707

664708
/* Now go through our index builds, and look for any which is
665709
* reported complete. Reassign that worker to the next index to
666710
* be built, if any.
667711
*/
668-
while (num_active_workers)
712+
while (num_active_workers > 0)
669713
{
670-
int freed_worker = -1;
714+
elog(DEBUG2, "polling %d active workers", num_active_workers);
715+
716+
#ifdef HAVE_POLL
717+
ret = poll(input_fds, num_active_workers, POLL_TIMEOUT * 1000);
718+
#else
719+
/* re-initialize timeout before each invocation of select()
720+
* just in case select() modifies timeout to indicate remaining
721+
* time.
722+
*/
723+
timeout.tv_sec = POLL_TIMEOUT;
724+
timeout.tv_usec = 0;
725+
ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout);
726+
#endif
727+
if (ret < 0 && errno != EINTR)
728+
elog(ERROR, "poll() failed: %d, %d", ret, errno);
671729

672730
for (i = 0; i < num_indexes; i++)
673731
{
674732
if (index_jobs[i].status == INPROGRESS)
675733
{
734+
Assert(index_jobs[i].worker_idx >= 0);
676735
/* Must call PQconsumeInput before we can check PQisBusy */
677736
if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1)
678737
{
@@ -699,7 +758,13 @@ rebuild_indexes(const repack_table *table)
699758
}
700759
PQclear(res);
701760
}
702-
761+
762+
/* We are only going to re-queue one worker, even
763+
* though more than one index build might be finished.
764+
* Any other jobs which may be finished will
765+
* just have to wait for the next pass through the
766+
* poll()/select() loop.
767+
*/
703768
freed_worker = index_jobs[i].worker_idx;
704769
index_jobs[i].status = FINISHED;
705770
num_active_workers--;
@@ -733,7 +798,6 @@ rebuild_indexes(const repack_table *table)
733798
}
734799
freed_worker = -1;
735800
}
736-
sleep(1);
737801
}
738802

739803
}
@@ -980,7 +1044,6 @@ repack_one_table(const repack_table *table, const char *orderby)
9801044
goto cleanup;
9811045
}
9821046

983-
9841047
/*
9851048
* 4. Apply log to temp table until no tuples are left in the log
9861049
* and all of the old transactions are finished.

bin/pgut/pgut-fe.c

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@ setup_workers(int num_workers)
8282
*/
8383
elog(DEBUG2, "Setting up worker conn %d", i);
8484

85-
/* Don't confuse pgut_connections by using pgut_connect() */
85+
/* Don't confuse pgut_connections by using pgut_connect()
86+
*
87+
* XXX: could use PQconnectStart() and PQconnectPoll() to
88+
* open these connections in non-blocking manner.
89+
*/
8690
conn = PQconnectdb(buf.data);
8791
if (PQstatus(conn) == CONNECTION_OK)
8892
{
@@ -94,6 +98,15 @@ setup_workers(int num_workers)
9498
PQerrorMessage(conn));
9599
break;
96100
}
101+
102+
/* Make sure each worker connection can work in non-blocking
103+
* mode.
104+
*/
105+
if (PQsetnonblocking(workers.conns[i], 1))
106+
{
107+
elog(ERROR, "Unable to set worker connection %d "
108+
"non-blocking.", i);
109+
}
97110
}
98111
/* In case we bailed out of setting up all workers, record
99112
* how many successful worker conns we actually have.

0 commit comments

Comments
 (0)