@@ -137,20 +137,31 @@ typedef struct repack_table
137
137
const char * sql_pop ; /* SQL used in flush */
138
138
} repack_table ;
139
139
140
/*
 * Life-cycle of one index build when rebuilding indexes (possibly in
 * parallel across several worker connections):
 * UNPROCESSED -> INPROGRESS -> FINISHED.
 */
typedef enum
{
	UNPROCESSED,
	INPROGRESS,
	FINISHED
} index_status_t;
140
148
/*
141
149
* per-index information
142
150
*/
143
151
typedef struct repack_index
144
152
{
145
153
Oid target_oid ; /* target: OID */
146
154
const char * create_index ; /* CREATE INDEX */
155
+ index_status_t status ; /* Track parallel build statuses. */
156
+ int worker_idx ; /* which worker conn is handling */
147
157
} repack_index ;
148
158
149
159
static bool is_superuser (void );
150
160
static void repack_all_databases (const char * order_by );
151
161
static bool repack_one_database (const char * order_by , char * errbuf , size_t errsize );
152
162
static void repack_one_table (const repack_table * table , const char * order_by );
153
163
static void repack_cleanup (bool fatal , const repack_table * table );
164
+ static bool rebuild_indexes (const repack_table * table );
154
165
155
166
static char * getstr (PGresult * res , int row , int col );
156
167
static Oid getoid (PGresult * res , int row , int col );
@@ -172,6 +183,7 @@ static bool noorder = false;
172
183
static SimpleStringList table_list = {NULL , NULL };
173
184
static char * orderby = NULL ;
174
185
static int wait_timeout = 60 ; /* in seconds */
186
+ static int jobs = 0 ; /* number of concurrent worker conns. */
175
187
176
188
/* buffer should have at least 11 bytes */
177
189
static char *
@@ -189,6 +201,7 @@ static pgut_option options[] =
189
201
{ 's' , 'o' , "order-by" , & orderby },
190
202
{ 'i' , 'T' , "wait-timeout" , & wait_timeout },
191
203
{ 'B' , 'Z' , "no-analyze" , & analyze },
204
+ { 'i' , 'j' , "jobs" , & jobs },
192
205
{ 0 },
193
206
};
194
207
@@ -320,7 +333,7 @@ getoid(PGresult *res, int row, int col)
320
333
}
321
334
322
335
/*
323
- * Call repack_one_table for the target table or each table in a database.
336
+ * Call repack_one_table for the target tables or each table in a database.
324
337
*/
325
338
static bool
326
339
repack_one_database (const char * orderby , char * errbuf , size_t errsize )
@@ -346,6 +359,10 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
346
359
347
360
reconnect (ERROR );
348
361
362
+ /* No sense in setting up concurrent workers if --jobs=1 */
363
+ if (jobs > 1 )
364
+ setup_workers (jobs );
365
+
349
366
if (!is_superuser ()) {
350
367
if (errbuf )
351
368
snprintf (errbuf , errsize , "You must be a superuser to use %s" ,
@@ -559,6 +576,181 @@ apply_log(PGconn *conn, const repack_table *table, int count)
559
576
return result ;
560
577
}
561
578
579
+ /*
580
+ * Create indexes on temp table, possibly using multiple worker connections
581
+ * concurrently if the user asked for --jobs=...
582
+ */
583
+ static bool
584
+ rebuild_indexes (const repack_table * table )
585
+ {
586
+ PGresult * res ;
587
+ const char * params [1 ];
588
+ int num_indexes ;
589
+ int i ;
590
+ int num_active_workers = 0 ;
591
+ repack_index * index_jobs ;
592
+ char buffer [12 ];
593
+ bool have_error = false;
594
+
595
+ elog (DEBUG2 , "---- create indexes ----" );
596
+
597
+ params [0 ] = utoa (table -> target_oid , buffer );
598
+ res = execute ("SELECT indexrelid,"
599
+ " repack.repack_indexdef(indexrelid, indrelid), "
600
+ " pg_get_indexdef(indexrelid)"
601
+ " FROM pg_index WHERE indrelid = $1 AND indisvalid" , 1 , params );
602
+
603
+ num_indexes = PQntuples (res );
604
+ elog (DEBUG2 , "Have %d indexes and num_workers=%d" , num_indexes ,
605
+ workers .num_workers );
606
+
607
+ index_jobs = pgut_malloc (sizeof (repack_index ) * num_indexes );
608
+
609
+ for (i = 0 ; i < num_indexes ; i ++ )
610
+ {
611
+ int c = 0 ;
612
+ const char * indexdef ;
613
+
614
+ index_jobs [i ].target_oid = getoid (res , i , c ++ );
615
+ index_jobs [i ].create_index = getstr (res , i , c ++ );
616
+ index_jobs [i ].status = UNPROCESSED ;
617
+ index_jobs [i ].worker_idx = -1 ; /* Unassigned */
618
+
619
+ indexdef = getstr (res , i , c ++ );
620
+
621
+ elog (DEBUG2 , "set up index_jobs [%d]" , i );
622
+ elog (DEBUG2 , "target_oid : %u" , index_jobs [i ].target_oid );
623
+ elog (DEBUG2 , "create_index : %s" , index_jobs [i ].create_index );
624
+
625
+ if (workers .num_workers <= 1 ) {
626
+ /* Use primary connection if we are not setting up parallel
627
+ * index building, or if we only have one worker.
628
+ */
629
+ command (index_jobs [i ].create_index , 0 , NULL );
630
+
631
+ /* This bookkeeping isn't actually important in this no-workers
632
+ * case, but just for clarity.
633
+ */
634
+ index_jobs [i ].status = FINISHED ;
635
+ }
636
+ }
637
+ PQclear (res );
638
+
639
+ if (workers .num_workers > 1 )
640
+ {
641
+ /* First time through, assign every available worker to build an index.
642
+ */
643
+ for (i = 0 ; i < num_indexes && i < workers .num_workers ; i ++ )
644
+ {
645
+ index_jobs [i ].status = INPROGRESS ;
646
+ index_jobs [i ].worker_idx = i ;
647
+ elog (DEBUG2 , "Worker %d building index: %s" , i ,
648
+ index_jobs [i ].create_index );
649
+
650
+ /* Make sure each worker connection can work in non-blocking
651
+ * mode.
652
+ */
653
+ if (PQsetnonblocking (workers .conns [i ], 1 ))
654
+ {
655
+ elog (WARNING , "Unable to set worker connection %d "
656
+ "non-blocking." , i );
657
+ have_error = true;
658
+ goto cleanup ;
659
+ }
660
+
661
+ if (!(PQsendQuery (workers .conns [i ], index_jobs [i ].create_index )))
662
+ {
663
+ elog (WARNING , "Error sending async query: %s\n%s" ,
664
+ index_jobs [i ].create_index ,
665
+ PQerrorMessage (workers .conns [i ]));
666
+ have_error = true;
667
+ goto cleanup ;
668
+ }
669
+
670
+ }
671
+ num_active_workers = i ;
672
+
673
+ /* Now go through our index builds, and look for any which is
674
+ * reported complete. Reassign that worker to the next index to
675
+ * be built, if any.
676
+ */
677
+ while (num_active_workers )
678
+ {
679
+ int freed_worker = -1 ;
680
+
681
+ for (i = 0 ; i < num_indexes ; i ++ )
682
+ {
683
+ if (index_jobs [i ].status == INPROGRESS )
684
+ {
685
+ /* Must call PQconsumeInput before we can check PQisBusy */
686
+ if (PQconsumeInput (workers .conns [index_jobs [i ].worker_idx ]) != 1 )
687
+ {
688
+ elog (WARNING , "Error fetching async query status: %s" ,
689
+ PQerrorMessage (workers .conns [index_jobs [i ].worker_idx ]));
690
+ have_error = true;
691
+ goto cleanup ;
692
+ }
693
+ if (!PQisBusy (workers .conns [index_jobs [i ].worker_idx ]))
694
+ {
695
+ elog (NOTICE , "Command finished in worker %d: %s" ,
696
+ index_jobs [i ].worker_idx ,
697
+ index_jobs [i ].create_index );
698
+
699
+ while ((res = PQgetResult (workers .conns [index_jobs [i ].worker_idx ])))
700
+ {
701
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
702
+ {
703
+ elog (WARNING , "Error with create index: %s" ,
704
+ PQerrorMessage (workers .conns [index_jobs [i ].worker_idx ]));
705
+ PQclear (res );
706
+ have_error = true;
707
+ goto cleanup ;
708
+ }
709
+ PQclear (res );
710
+ }
711
+
712
+ freed_worker = index_jobs [i ].worker_idx ;
713
+ index_jobs [i ].status = FINISHED ;
714
+ num_active_workers -- ;
715
+ break ;
716
+ }
717
+ }
718
+ }
719
+ if (freed_worker > -1 )
720
+ {
721
+ for (i = 0 ; i < num_indexes ; i ++ )
722
+ {
723
+ if (index_jobs [i ].status == UNPROCESSED )
724
+ {
725
+ index_jobs [i ].status = INPROGRESS ;
726
+ index_jobs [i ].worker_idx = freed_worker ;
727
+ elog (NOTICE , "Assigning worker %d execute job %d: %s" ,
728
+ freed_worker , i , index_jobs [i ].create_index );
729
+
730
+ if (!(PQsendQuery (workers .conns [freed_worker ],
731
+ index_jobs [i ].create_index ))) {
732
+ elog (WARNING , "Error sending async query: %s\n%s" ,
733
+ index_jobs [i ].create_index ,
734
+ PQerrorMessage (workers .conns [freed_worker ]));
735
+ have_error = true;
736
+ goto cleanup ;
737
+ }
738
+ num_active_workers ++ ;
739
+ break ;
740
+ }
741
+ }
742
+ freed_worker = -1 ;
743
+ }
744
+ sleep (1 );
745
+ }
746
+
747
+ }
748
+
749
+ cleanup :
750
+ return (!have_error );
751
+ }
752
+
753
+
562
754
/*
563
755
* Re-organize one table.
564
756
*/
@@ -568,8 +760,8 @@ repack_one_table(const repack_table *table, const char *orderby)
568
760
PGresult * res ;
569
761
const char * params [2 ];
570
762
int num ;
571
- int i ;
572
763
int num_waiting = 0 ;
764
+
573
765
char * vxid = NULL ;
574
766
char buffer [12 ];
575
767
StringInfoData sql ;
@@ -674,7 +866,14 @@ repack_one_table(const repack_table *table, const char *orderby)
674
866
printfStringInfo (& sql , "LOCK TABLE %s IN ACCESS SHARE MODE" ,
675
867
table -> target_name );
676
868
elog (DEBUG2 , "LOCK TABLE %s IN ACCESS SHARE MODE" , table -> target_name );
677
- if (!(PQsendQuery (conn2 , sql .data ))) {
869
+ if (PQsetnonblocking (conn2 , 1 ))
870
+ {
871
+ elog (WARNING , "Unable to set conn2 nonblocking." );
872
+ have_error = true;
873
+ goto cleanup ;
874
+ }
875
+ if (!(PQsendQuery (conn2 , sql .data )))
876
+ {
678
877
elog (WARNING , "Error sending async query: %s\n%s" , sql .data ,
679
878
PQerrorMessage (conn2 ));
680
879
have_error = true;
@@ -725,6 +924,14 @@ repack_one_table(const repack_table *table, const char *orderby)
725
924
PQclear (res );
726
925
}
727
926
927
+ /* Turn conn2 back into blocking mode for further non-async use. */
928
+ if (PQsetnonblocking (conn2 , 0 ))
929
+ {
930
+ elog (WARNING , "Unable to set conn2 blocking." );
931
+ have_error = true;
932
+ goto cleanup ;
933
+ }
934
+
728
935
/*
729
936
* 2. Copy tuples into temp table.
730
937
*/
@@ -790,44 +997,11 @@ repack_one_table(const repack_table *table, const char *orderby)
790
997
/*
791
998
* 3. Create indexes on temp table.
792
999
*/
793
- elog (DEBUG2 , "---- create indexes ----" );
794
-
795
- params [0 ] = utoa (table -> target_oid , buffer );
796
- res = execute ("SELECT indexrelid,"
797
- " repack.repack_indexdef(indexrelid, indrelid),"
798
- " indisvalid,"
799
- " pg_get_indexdef(indexrelid)"
800
- " FROM pg_index WHERE indrelid = $1" , 1 , params );
801
-
802
- num = PQntuples (res );
803
- for (i = 0 ; i < num ; i ++ )
804
- {
805
- repack_index index ;
806
- int c = 0 ;
807
- const char * isvalid ;
808
- const char * indexdef ;
809
-
810
- index .target_oid = getoid (res , i , c ++ );
811
- index .create_index = getstr (res , i , c ++ );
812
- isvalid = getstr (res , i , c ++ );
813
- indexdef = getstr (res , i , c ++ );
814
-
815
- if (isvalid && isvalid [0 ] == 'f' ) {
816
- elog (WARNING , "skipping invalid index: %s" , indexdef );
817
- continue ;
818
- }
819
-
820
- elog (DEBUG2 , "[%d]" , i );
821
- elog (DEBUG2 , "target_oid : %u" , index .target_oid );
822
- elog (DEBUG2 , "create_index : %s" , index .create_index );
823
-
824
- /*
825
- * NOTE: If we want to create multiple indexes in parallel,
826
- * we need to call create_index in multiple connections.
827
- */
828
- command (index .create_index , 0 , NULL );
1000
+ if (!rebuild_indexes (table )) {
1001
+ have_error = true;
1002
+ goto cleanup ;
829
1003
}
830
- PQclear ( res );
1004
+
831
1005
832
1006
/*
833
1007
* 4. Apply log to temp table until no tuples are left in the log
@@ -1187,6 +1361,7 @@ pgut_help(bool details)
1187
1361
1188
1362
printf ("Options:\n" );
1189
1363
printf (" -a, --all repack all databases\n" );
1364
+ printf (" -j --jobs Use this many parallel jobs" );
1190
1365
printf (" -n, --no-order do vacuum full instead of cluster\n" );
1191
1366
printf (" -o, --order-by=COLUMNS order by columns instead of cluster keys\n" );
1192
1367
printf (" -t, --table=TABLE repack specific table only\n" );
0 commit comments