Skip to content

Commit 509e568

Browse files
committed
First pass at implementing concurrent index builds using multiple connections.
Adds a new --jobs command-line argument specifying how many worker connections to use. These worker connections persist while processing the table(s) in a single database. For each table, the indexes to be built are parceled out among the worker connections, and each CREATE INDEX ... request is submitted with PQsendQuery(), i.e. in non-blocking fashion. Most of this is still rather crude — in particular the while (num_active_workers) loop in rebuild_indexes() — but it seems to be working, so I'm committing here.
1 parent b4d8a90 commit 509e568

File tree

3 files changed

+331
-40
lines changed

3 files changed

+331
-40
lines changed

bin/pg_repack.c

Lines changed: 215 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -137,20 +137,31 @@ typedef struct repack_table
137137
const char *sql_pop; /* SQL used in flush */
138138
} repack_table;
139139

140+
141+
typedef enum
142+
{
143+
UNPROCESSED,
144+
INPROGRESS,
145+
FINISHED
146+
} index_status_t;
147+
140148
/*
141149
* per-index information
142150
*/
143151
typedef struct repack_index
144152
{
145153
Oid target_oid; /* target: OID */
146154
const char *create_index; /* CREATE INDEX */
155+
index_status_t status; /* Track parallel build statuses. */
156+
int worker_idx; /* which worker conn is handling */
147157
} repack_index;
148158

149159
static bool is_superuser(void);
150160
static void repack_all_databases(const char *order_by);
151161
static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
152162
static void repack_one_table(const repack_table *table, const char *order_by);
153163
static void repack_cleanup(bool fatal, const repack_table *table);
164+
static bool rebuild_indexes(const repack_table *table);
154165

155166
static char *getstr(PGresult *res, int row, int col);
156167
static Oid getoid(PGresult *res, int row, int col);
@@ -172,6 +183,7 @@ static bool noorder = false;
172183
static SimpleStringList table_list = {NULL, NULL};
173184
static char *orderby = NULL;
174185
static int wait_timeout = 60; /* in seconds */
186+
static int jobs = 0; /* number of concurrent worker conns. */
175187

176188
/* buffer should have at least 11 bytes */
177189
static char *
@@ -189,6 +201,7 @@ static pgut_option options[] =
189201
{ 's', 'o', "order-by", &orderby },
190202
{ 'i', 'T', "wait-timeout", &wait_timeout },
191203
{ 'B', 'Z', "no-analyze", &analyze },
204+
{ 'i', 'j', "jobs", &jobs },
192205
{ 0 },
193206
};
194207

@@ -320,7 +333,7 @@ getoid(PGresult *res, int row, int col)
320333
}
321334

322335
/*
323-
* Call repack_one_table for the target table or each table in a database.
336+
* Call repack_one_table for the target tables or each table in a database.
324337
*/
325338
static bool
326339
repack_one_database(const char *orderby, char *errbuf, size_t errsize)
@@ -346,6 +359,10 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
346359

347360
reconnect(ERROR);
348361

362+
/* No sense in setting up concurrent workers if --jobs=1 */
363+
if (jobs > 1)
364+
setup_workers(jobs);
365+
349366
if (!is_superuser()) {
350367
if (errbuf)
351368
snprintf(errbuf, errsize, "You must be a superuser to use %s",
@@ -559,6 +576,181 @@ apply_log(PGconn *conn, const repack_table *table, int count)
559576
return result;
560577
}
561578

579+
/*
580+
* Create indexes on temp table, possibly using multiple worker connections
581+
* concurrently if the user asked for --jobs=...
582+
*/
583+
static bool
584+
rebuild_indexes(const repack_table *table)
585+
{
586+
PGresult *res;
587+
const char *params[1];
588+
int num_indexes;
589+
int i;
590+
int num_active_workers = 0;
591+
repack_index *index_jobs;
592+
char buffer[12];
593+
bool have_error = false;
594+
595+
elog(DEBUG2, "---- create indexes ----");
596+
597+
params[0] = utoa(table->target_oid, buffer);
598+
res = execute("SELECT indexrelid,"
599+
" repack.repack_indexdef(indexrelid, indrelid), "
600+
" pg_get_indexdef(indexrelid)"
601+
" FROM pg_index WHERE indrelid = $1 AND indisvalid", 1, params);
602+
603+
num_indexes = PQntuples(res);
604+
elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes,
605+
workers.num_workers);
606+
607+
index_jobs = pgut_malloc(sizeof(repack_index) * num_indexes);
608+
609+
for (i = 0; i < num_indexes; i++)
610+
{
611+
int c = 0;
612+
const char *indexdef;
613+
614+
index_jobs[i].target_oid = getoid(res, i, c++);
615+
index_jobs[i].create_index = getstr(res, i, c++);
616+
index_jobs[i].status = UNPROCESSED;
617+
index_jobs[i].worker_idx = -1; /* Unassigned */
618+
619+
indexdef = getstr(res, i, c++);
620+
621+
elog(DEBUG2, "set up index_jobs [%d]", i);
622+
elog(DEBUG2, "target_oid : %u", index_jobs[i].target_oid);
623+
elog(DEBUG2, "create_index : %s", index_jobs[i].create_index);
624+
625+
if (workers.num_workers <= 1) {
626+
/* Use primary connection if we are not setting up parallel
627+
* index building, or if we only have one worker.
628+
*/
629+
command(index_jobs[i].create_index, 0, NULL);
630+
631+
/* This bookkeeping isn't actually important in this no-workers
632+
* case, but just for clarity.
633+
*/
634+
index_jobs[i].status = FINISHED;
635+
}
636+
}
637+
PQclear(res);
638+
639+
if (workers.num_workers > 1)
640+
{
641+
/* First time through, assign every available worker to build an index.
642+
*/
643+
for (i = 0; i < num_indexes && i < workers.num_workers; i++)
644+
{
645+
index_jobs[i].status = INPROGRESS;
646+
index_jobs[i].worker_idx = i;
647+
elog(DEBUG2, "Worker %d building index: %s", i,
648+
index_jobs[i].create_index);
649+
650+
/* Make sure each worker connection can work in non-blocking
651+
* mode.
652+
*/
653+
if (PQsetnonblocking(workers.conns[i], 1))
654+
{
655+
elog(WARNING, "Unable to set worker connection %d "
656+
"non-blocking.", i);
657+
have_error = true;
658+
goto cleanup;
659+
}
660+
661+
if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
662+
{
663+
elog(WARNING, "Error sending async query: %s\n%s",
664+
index_jobs[i].create_index,
665+
PQerrorMessage(workers.conns[i]));
666+
have_error = true;
667+
goto cleanup;
668+
}
669+
670+
}
671+
num_active_workers = i;
672+
673+
/* Now go through our index builds, and look for any which is
674+
* reported complete. Reassign that worker to the next index to
675+
* be built, if any.
676+
*/
677+
while (num_active_workers)
678+
{
679+
int freed_worker = -1;
680+
681+
for (i = 0; i < num_indexes; i++)
682+
{
683+
if (index_jobs[i].status == INPROGRESS)
684+
{
685+
/* Must call PQconsumeInput before we can check PQisBusy */
686+
if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1)
687+
{
688+
elog(WARNING, "Error fetching async query status: %s",
689+
PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
690+
have_error = true;
691+
goto cleanup;
692+
}
693+
if (!PQisBusy(workers.conns[index_jobs[i].worker_idx]))
694+
{
695+
elog(NOTICE, "Command finished in worker %d: %s",
696+
index_jobs[i].worker_idx,
697+
index_jobs[i].create_index);
698+
699+
while ((res = PQgetResult(workers.conns[index_jobs[i].worker_idx])))
700+
{
701+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
702+
{
703+
elog(WARNING, "Error with create index: %s",
704+
PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
705+
PQclear(res);
706+
have_error = true;
707+
goto cleanup;
708+
}
709+
PQclear(res);
710+
}
711+
712+
freed_worker = index_jobs[i].worker_idx;
713+
index_jobs[i].status = FINISHED;
714+
num_active_workers--;
715+
break;
716+
}
717+
}
718+
}
719+
if (freed_worker > -1)
720+
{
721+
for (i = 0; i < num_indexes; i++)
722+
{
723+
if (index_jobs[i].status == UNPROCESSED)
724+
{
725+
index_jobs[i].status = INPROGRESS;
726+
index_jobs[i].worker_idx = freed_worker;
727+
elog(NOTICE, "Assigning worker %d execute job %d: %s",
728+
freed_worker, i, index_jobs[i].create_index);
729+
730+
if (!(PQsendQuery(workers.conns[freed_worker],
731+
index_jobs[i].create_index))) {
732+
elog(WARNING, "Error sending async query: %s\n%s",
733+
index_jobs[i].create_index,
734+
PQerrorMessage(workers.conns[freed_worker]));
735+
have_error = true;
736+
goto cleanup;
737+
}
738+
num_active_workers++;
739+
break;
740+
}
741+
}
742+
freed_worker = -1;
743+
}
744+
sleep(1);
745+
}
746+
747+
}
748+
749+
cleanup:
750+
return (!have_error);
751+
}
752+
753+
562754
/*
563755
* Re-organize one table.
564756
*/
@@ -568,8 +760,8 @@ repack_one_table(const repack_table *table, const char *orderby)
568760
PGresult *res;
569761
const char *params[2];
570762
int num;
571-
int i;
572763
int num_waiting = 0;
764+
573765
char *vxid = NULL;
574766
char buffer[12];
575767
StringInfoData sql;
@@ -674,7 +866,14 @@ repack_one_table(const repack_table *table, const char *orderby)
674866
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
675867
table->target_name);
676868
elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
677-
if (!(PQsendQuery(conn2, sql.data))) {
869+
if (PQsetnonblocking(conn2, 1))
870+
{
871+
elog(WARNING, "Unable to set conn2 nonblocking.");
872+
have_error = true;
873+
goto cleanup;
874+
}
875+
if (!(PQsendQuery(conn2, sql.data)))
876+
{
678877
elog(WARNING, "Error sending async query: %s\n%s", sql.data,
679878
PQerrorMessage(conn2));
680879
have_error = true;
@@ -725,6 +924,14 @@ repack_one_table(const repack_table *table, const char *orderby)
725924
PQclear(res);
726925
}
727926

927+
/* Turn conn2 back into blocking mode for further non-async use. */
928+
if (PQsetnonblocking(conn2, 0))
929+
{
930+
elog(WARNING, "Unable to set conn2 blocking.");
931+
have_error = true;
932+
goto cleanup;
933+
}
934+
728935
/*
729936
* 2. Copy tuples into temp table.
730937
*/
@@ -790,44 +997,11 @@ repack_one_table(const repack_table *table, const char *orderby)
790997
/*
791998
* 3. Create indexes on temp table.
792999
*/
793-
elog(DEBUG2, "---- create indexes ----");
794-
795-
params[0] = utoa(table->target_oid, buffer);
796-
res = execute("SELECT indexrelid,"
797-
" repack.repack_indexdef(indexrelid, indrelid),"
798-
" indisvalid,"
799-
" pg_get_indexdef(indexrelid)"
800-
" FROM pg_index WHERE indrelid = $1", 1, params);
801-
802-
num = PQntuples(res);
803-
for (i = 0; i < num; i++)
804-
{
805-
repack_index index;
806-
int c = 0;
807-
const char *isvalid;
808-
const char *indexdef;
809-
810-
index.target_oid = getoid(res, i, c++);
811-
index.create_index = getstr(res, i, c++);
812-
isvalid = getstr(res, i, c++);
813-
indexdef = getstr(res, i, c++);
814-
815-
if (isvalid && isvalid[0] == 'f') {
816-
elog(WARNING, "skipping invalid index: %s", indexdef);
817-
continue;
818-
}
819-
820-
elog(DEBUG2, "[%d]", i);
821-
elog(DEBUG2, "target_oid : %u", index.target_oid);
822-
elog(DEBUG2, "create_index : %s", index.create_index);
823-
824-
/*
825-
* NOTE: If we want to create multiple indexes in parallel,
826-
* we need to call create_index in multiple connections.
827-
*/
828-
command(index.create_index, 0, NULL);
1000+
if (!rebuild_indexes(table)) {
1001+
have_error = true;
1002+
goto cleanup;
8291003
}
830-
PQclear(res);
1004+
8311005

8321006
/*
8331007
* 4. Apply log to temp table until no tuples are left in the log
@@ -1187,6 +1361,7 @@ pgut_help(bool details)
11871361

11881362
printf("Options:\n");
11891363
printf(" -a, --all repack all databases\n");
1364+
printf(" -j --jobs Use this many parallel jobs");
11901365
printf(" -n, --no-order do vacuum full instead of cluster\n");
11911366
printf(" -o, --order-by=COLUMNS order by columns instead of cluster keys\n");
11921367
printf(" -t, --table=TABLE repack specific table only\n");

0 commit comments

Comments
 (0)