@@ -137,20 +137,31 @@ typedef struct repack_table
137
137
const char * sql_pop ; /* SQL used in flush */
138
138
} repack_table ;
139
139
140
+
141
/*
 * Status of one index build inside rebuild_indexes(): each job moves
 * UNPROCESSED -> INPROGRESS -> FINISHED.
 */
typedef enum
{
	UNPROCESSED,	/* not yet assigned to any connection */
	INPROGRESS,		/* CREATE INDEX sent asynchronously; awaiting result */
	FINISHED		/* build complete (or run synchronously on primary conn) */
} index_status_t;
147
+
140
148
/*
 * per-index information
 */
typedef struct repack_index
{
	Oid				target_oid;		/* target: OID */
	const char	   *create_index;	/* CREATE INDEX */
	index_status_t	status;			/* Track parallel build statuses. */
	int				worker_idx;		/* which worker conn is handling
									 * this build; -1 while unassigned */
} repack_index;
148
158
149
159
static bool is_superuser (void );
150
160
static void repack_all_databases (const char * order_by );
151
161
static bool repack_one_database (const char * order_by , char * errbuf , size_t errsize );
152
162
static void repack_one_table (const repack_table * table , const char * order_by );
153
163
static void repack_cleanup (bool fatal , const repack_table * table );
164
+ static bool rebuild_indexes (const repack_table * table );
154
165
155
166
static char * getstr (PGresult * res , int row , int col );
156
167
static Oid getoid (PGresult * res , int row , int col );
@@ -172,6 +183,7 @@ static bool noorder = false;
172
183
static SimpleStringList table_list = {NULL , NULL };
173
184
static char * orderby = NULL ;
174
185
static int wait_timeout = 60 ; /* in seconds */
186
+ static int jobs = 0 ; /* number of concurrent worker conns. */
175
187
176
188
/* buffer should have at least 11 bytes */
177
189
static char *
@@ -189,6 +201,7 @@ static pgut_option options[] =
189
201
{ 's' , 'o' , "order-by" , & orderby },
190
202
{ 'i' , 'T' , "wait-timeout" , & wait_timeout },
191
203
{ 'B' , 'Z' , "no-analyze" , & analyze },
204
+ { 'i' , 'j' , "jobs" , & jobs },
192
205
{ 0 },
193
206
};
194
207
@@ -320,7 +333,7 @@ getoid(PGresult *res, int row, int col)
320
333
}
321
334
322
335
/*
323
- * Call repack_one_table for the target table or each table in a database.
336
+ * Call repack_one_table for the target tables or each table in a database.
324
337
*/
325
338
static bool
326
339
repack_one_database (const char * orderby , char * errbuf , size_t errsize )
@@ -346,6 +359,10 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
346
359
347
360
reconnect (ERROR );
348
361
362
+ /* No sense in setting up concurrent workers if --jobs=1 */
363
+ if (jobs > 1 )
364
+ setup_workers (jobs );
365
+
349
366
if (!is_superuser ()) {
350
367
if (errbuf )
351
368
snprintf (errbuf , errsize , "You must be a superuser to use %s" ,
@@ -403,6 +420,7 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
403
420
}
404
421
goto cleanup ;
405
422
}
423
+ PQclear (res );
406
424
407
425
/* Disable statement timeout. */
408
426
command ("SET statement_timeout = 0" , 0 , NULL );
@@ -558,6 +576,181 @@ apply_log(PGconn *conn, const repack_table *table, int count)
558
576
return result ;
559
577
}
560
578
579
+ /*
580
+ * Create indexes on temp table, possibly using multiple worker connections
581
+ * concurrently if the user asked for --jobs=...
582
+ */
583
+ static bool
584
+ rebuild_indexes (const repack_table * table )
585
+ {
586
+ PGresult * res ;
587
+ const char * params [1 ];
588
+ int num_indexes ;
589
+ int i ;
590
+ int num_active_workers = 0 ;
591
+ repack_index * index_jobs ;
592
+ char buffer [12 ];
593
+ bool have_error = false;
594
+
595
+ elog (DEBUG2 , "---- create indexes ----" );
596
+
597
+ params [0 ] = utoa (table -> target_oid , buffer );
598
+ res = execute ("SELECT indexrelid,"
599
+ " repack.repack_indexdef(indexrelid, indrelid), "
600
+ " pg_get_indexdef(indexrelid)"
601
+ " FROM pg_index WHERE indrelid = $1 AND indisvalid" , 1 , params );
602
+
603
+ num_indexes = PQntuples (res );
604
+ elog (DEBUG2 , "Have %d indexes and num_workers=%d" , num_indexes ,
605
+ workers .num_workers );
606
+
607
+ index_jobs = pgut_malloc (sizeof (repack_index ) * num_indexes );
608
+
609
+ for (i = 0 ; i < num_indexes ; i ++ )
610
+ {
611
+ int c = 0 ;
612
+ const char * indexdef ;
613
+
614
+ index_jobs [i ].target_oid = getoid (res , i , c ++ );
615
+ index_jobs [i ].create_index = getstr (res , i , c ++ );
616
+ index_jobs [i ].status = UNPROCESSED ;
617
+ index_jobs [i ].worker_idx = -1 ; /* Unassigned */
618
+
619
+ indexdef = getstr (res , i , c ++ );
620
+
621
+ elog (DEBUG2 , "set up index_jobs [%d]" , i );
622
+ elog (DEBUG2 , "target_oid : %u" , index_jobs [i ].target_oid );
623
+ elog (DEBUG2 , "create_index : %s" , index_jobs [i ].create_index );
624
+
625
+ if (workers .num_workers <= 1 ) {
626
+ /* Use primary connection if we are not setting up parallel
627
+ * index building, or if we only have one worker.
628
+ */
629
+ command (index_jobs [i ].create_index , 0 , NULL );
630
+
631
+ /* This bookkeeping isn't actually important in this no-workers
632
+ * case, but just for clarity.
633
+ */
634
+ index_jobs [i ].status = FINISHED ;
635
+ }
636
+ }
637
+ PQclear (res );
638
+
639
+ if (workers .num_workers > 1 )
640
+ {
641
+ /* First time through, assign every available worker to build an index.
642
+ */
643
+ for (i = 0 ; i < num_indexes && i < workers .num_workers ; i ++ )
644
+ {
645
+ index_jobs [i ].status = INPROGRESS ;
646
+ index_jobs [i ].worker_idx = i ;
647
+ elog (DEBUG2 , "Worker %d building index: %s" , i ,
648
+ index_jobs [i ].create_index );
649
+
650
+ /* Make sure each worker connection can work in non-blocking
651
+ * mode.
652
+ */
653
+ if (PQsetnonblocking (workers .conns [i ], 1 ))
654
+ {
655
+ elog (WARNING , "Unable to set worker connection %d "
656
+ "non-blocking." , i );
657
+ have_error = true;
658
+ goto cleanup ;
659
+ }
660
+
661
+ if (!(PQsendQuery (workers .conns [i ], index_jobs [i ].create_index )))
662
+ {
663
+ elog (WARNING , "Error sending async query: %s\n%s" ,
664
+ index_jobs [i ].create_index ,
665
+ PQerrorMessage (workers .conns [i ]));
666
+ have_error = true;
667
+ goto cleanup ;
668
+ }
669
+
670
+ }
671
+ num_active_workers = i ;
672
+
673
+ /* Now go through our index builds, and look for any which is
674
+ * reported complete. Reassign that worker to the next index to
675
+ * be built, if any.
676
+ */
677
+ while (num_active_workers )
678
+ {
679
+ int freed_worker = -1 ;
680
+
681
+ for (i = 0 ; i < num_indexes ; i ++ )
682
+ {
683
+ if (index_jobs [i ].status == INPROGRESS )
684
+ {
685
+ /* Must call PQconsumeInput before we can check PQisBusy */
686
+ if (PQconsumeInput (workers .conns [index_jobs [i ].worker_idx ]) != 1 )
687
+ {
688
+ elog (WARNING , "Error fetching async query status: %s" ,
689
+ PQerrorMessage (workers .conns [index_jobs [i ].worker_idx ]));
690
+ have_error = true;
691
+ goto cleanup ;
692
+ }
693
+ if (!PQisBusy (workers .conns [index_jobs [i ].worker_idx ]))
694
+ {
695
+ elog (NOTICE , "Command finished in worker %d: %s" ,
696
+ index_jobs [i ].worker_idx ,
697
+ index_jobs [i ].create_index );
698
+
699
+ while ((res = PQgetResult (workers .conns [index_jobs [i ].worker_idx ])))
700
+ {
701
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
702
+ {
703
+ elog (WARNING , "Error with create index: %s" ,
704
+ PQerrorMessage (workers .conns [index_jobs [i ].worker_idx ]));
705
+ PQclear (res );
706
+ have_error = true;
707
+ goto cleanup ;
708
+ }
709
+ PQclear (res );
710
+ }
711
+
712
+ freed_worker = index_jobs [i ].worker_idx ;
713
+ index_jobs [i ].status = FINISHED ;
714
+ num_active_workers -- ;
715
+ break ;
716
+ }
717
+ }
718
+ }
719
+ if (freed_worker > -1 )
720
+ {
721
+ for (i = 0 ; i < num_indexes ; i ++ )
722
+ {
723
+ if (index_jobs [i ].status == UNPROCESSED )
724
+ {
725
+ index_jobs [i ].status = INPROGRESS ;
726
+ index_jobs [i ].worker_idx = freed_worker ;
727
+ elog (NOTICE , "Assigning worker %d execute job %d: %s" ,
728
+ freed_worker , i , index_jobs [i ].create_index );
729
+
730
+ if (!(PQsendQuery (workers .conns [freed_worker ],
731
+ index_jobs [i ].create_index ))) {
732
+ elog (WARNING , "Error sending async query: %s\n%s" ,
733
+ index_jobs [i ].create_index ,
734
+ PQerrorMessage (workers .conns [freed_worker ]));
735
+ have_error = true;
736
+ goto cleanup ;
737
+ }
738
+ num_active_workers ++ ;
739
+ break ;
740
+ }
741
+ }
742
+ freed_worker = -1 ;
743
+ }
744
+ sleep (1 );
745
+ }
746
+
747
+ }
748
+
749
+ cleanup :
750
+ return (!have_error );
751
+ }
752
+
753
+
561
754
/*
562
755
* Re-organize one table.
563
756
*/
@@ -567,8 +760,8 @@ repack_one_table(const repack_table *table, const char *orderby)
567
760
PGresult * res ;
568
761
const char * params [2 ];
569
762
int num ;
570
- int i ;
571
763
int num_waiting = 0 ;
764
+
572
765
char * vxid = NULL ;
573
766
char buffer [12 ];
574
767
StringInfoData sql ;
@@ -665,7 +858,14 @@ repack_one_table(const repack_table *table, const char *orderby)
665
858
printfStringInfo (& sql , "LOCK TABLE %s IN ACCESS SHARE MODE" ,
666
859
table -> target_name );
667
860
elog (DEBUG2 , "LOCK TABLE %s IN ACCESS SHARE MODE" , table -> target_name );
668
- if (!(PQsendQuery (conn2 , sql .data ))) {
861
+ if (PQsetnonblocking (conn2 , 1 ))
862
+ {
863
+ elog (WARNING , "Unable to set conn2 nonblocking." );
864
+ have_error = true;
865
+ goto cleanup ;
866
+ }
867
+ if (!(PQsendQuery (conn2 , sql .data )))
868
+ {
669
869
elog (WARNING , "Error sending async query: %s\n%s" , sql .data ,
670
870
PQerrorMessage (conn2 ));
671
871
have_error = true;
@@ -710,6 +910,14 @@ repack_one_table(const repack_table *table, const char *orderby)
710
910
PQclear (res );
711
911
}
712
912
913
+ /* Turn conn2 back into blocking mode for further non-async use. */
914
+ if (PQsetnonblocking (conn2 , 0 ))
915
+ {
916
+ elog (WARNING , "Unable to set conn2 blocking." );
917
+ have_error = true;
918
+ goto cleanup ;
919
+ }
920
+
713
921
/*
714
922
* 2. Copy tuples into temp table.
715
923
*/
@@ -775,44 +983,11 @@ repack_one_table(const repack_table *table, const char *orderby)
775
983
/*
776
984
* 3. Create indexes on temp table.
777
985
*/
778
- elog (DEBUG2 , "---- create indexes ----" );
779
-
780
- params [0 ] = utoa (table -> target_oid , buffer );
781
- res = execute ("SELECT indexrelid,"
782
- " repack.repack_indexdef(indexrelid, indrelid),"
783
- " indisvalid,"
784
- " pg_get_indexdef(indexrelid)"
785
- " FROM pg_index WHERE indrelid = $1" , 1 , params );
786
-
787
- num = PQntuples (res );
788
- for (i = 0 ; i < num ; i ++ )
789
- {
790
- repack_index index ;
791
- int c = 0 ;
792
- const char * isvalid ;
793
- const char * indexdef ;
794
-
795
- index .target_oid = getoid (res , i , c ++ );
796
- index .create_index = getstr (res , i , c ++ );
797
- isvalid = getstr (res , i , c ++ );
798
- indexdef = getstr (res , i , c ++ );
799
-
800
- if (isvalid && isvalid [0 ] == 'f' ) {
801
- elog (WARNING , "skipping invalid index: %s" , indexdef );
802
- continue ;
803
- }
804
-
805
- elog (DEBUG2 , "[%d]" , i );
806
- elog (DEBUG2 , "target_oid : %u" , index .target_oid );
807
- elog (DEBUG2 , "create_index : %s" , index .create_index );
808
-
809
- /*
810
- * NOTE: If we want to create multiple indexes in parallel,
811
- * we need to call create_index in multiple connections.
812
- */
813
- command (index .create_index , 0 , NULL );
986
+ if (!rebuild_indexes (table )) {
987
+ have_error = true;
988
+ goto cleanup ;
814
989
}
815
- PQclear ( res );
990
+
816
991
817
992
/*
818
993
* 4. Apply log to temp table until no tuples are left in the log
@@ -1172,6 +1347,7 @@ pgut_help(bool details)
1172
1347
1173
1348
printf ("Options:\n" );
1174
1349
printf (" -a, --all repack all databases\n" );
1350
+ printf (" -j --jobs Use this many parallel jobs" );
1175
1351
printf (" -n, --no-order do vacuum full instead of cluster\n" );
1176
1352
printf (" -o, --order-by=COLUMNS order by columns instead of cluster keys\n" );
1177
1353
printf (" -t, --table=TABLE repack specific table only\n" );
0 commit comments