34
34
#include "compress_io.h"
35
35
#include "dumputils.h"
36
36
#include "fe_utils/string_utils.h"
37
+ #include "lib/binaryheap.h"
37
38
#include "lib/stringinfo.h"
38
39
#include "libpq/libpq-fs.h"
39
40
#include "parallel.h"
44
45
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
45
46
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
46
47
47
- /*
48
- * State for tracking TocEntrys that are ready to process during a parallel
49
- * restore. (This used to be a list, and we still call it that, though now
50
- * it's really an array so that we can apply qsort to it.)
51
- *
52
- * tes[] is sized large enough that we can't overrun it.
53
- * The valid entries are indexed first_te .. last_te inclusive.
54
- * We periodically sort the array to bring larger-by-dataLength entries to
55
- * the front; "sorted" is true if the valid entries are known sorted.
56
- */
57
- typedef struct _parallelReadyList
58
- {
59
- TocEntry * * tes ; /* Ready-to-dump TocEntrys */
60
- int first_te ; /* index of first valid entry in tes[] */
61
- int last_te ; /* index of last valid entry in tes[] */
62
- bool sorted ; /* are valid entries currently sorted? */
63
- } ParallelReadyList ;
64
-
65
48
66
49
static ArchiveHandle * _allocAH (const char * FileSpec , const ArchiveFormat fmt ,
67
50
const pg_compress_specification compression_spec ,
@@ -111,16 +94,12 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH,
111
94
static void pending_list_header_init (TocEntry * l );
112
95
static void pending_list_append (TocEntry * l , TocEntry * te );
113
96
static void pending_list_remove (TocEntry * te );
114
- static void ready_list_init (ParallelReadyList * ready_list , int tocCount );
115
- static void ready_list_free (ParallelReadyList * ready_list );
116
- static void ready_list_insert (ParallelReadyList * ready_list , TocEntry * te );
117
- static void ready_list_remove (ParallelReadyList * ready_list , int i );
118
- static void ready_list_sort (ParallelReadyList * ready_list );
119
- static int TocEntrySizeCompare (const void * p1 , const void * p2 );
120
- static void move_to_ready_list (TocEntry * pending_list ,
121
- ParallelReadyList * ready_list ,
97
+ static int TocEntrySizeCompareQsort (const void * p1 , const void * p2 );
98
+ static int TocEntrySizeCompareBinaryheap (void * p1 , void * p2 , void * arg );
99
+ static void move_to_ready_heap (TocEntry * pending_list ,
100
+ binaryheap * ready_heap ,
122
101
RestorePass pass );
123
- static TocEntry * pop_next_work_item (ParallelReadyList * ready_list ,
102
+ static TocEntry * pop_next_work_item (binaryheap * ready_heap ,
124
103
ParallelState * pstate );
125
104
static void mark_dump_job_done (ArchiveHandle * AH ,
126
105
TocEntry * te ,
@@ -135,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
135
114
static void repoint_table_dependencies (ArchiveHandle * AH );
136
115
static void identify_locking_dependencies (ArchiveHandle * AH , TocEntry * te );
137
116
static void reduce_dependencies (ArchiveHandle * AH , TocEntry * te ,
138
- ParallelReadyList * ready_list );
117
+ binaryheap * ready_heap );
139
118
static void mark_create_done (ArchiveHandle * AH , TocEntry * te );
140
119
static void inhibit_data_for_failed_table (ArchiveHandle * AH , TocEntry * te );
141
120
@@ -2384,7 +2363,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2384
2363
}
2385
2364
2386
2365
if (ntes > 1 )
2387
- qsort (tes , ntes , sizeof (TocEntry * ), TocEntrySizeCompare );
2366
+ qsort (tes , ntes , sizeof (TocEntry * ), TocEntrySizeCompareQsort );
2388
2367
2389
2368
for (int i = 0 ; i < ntes ; i ++ )
2390
2369
DispatchJobForTocEntry (AH , pstate , tes [i ], ACT_DUMP ,
@@ -3984,7 +3963,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3984
3963
3985
3964
(void ) restore_toc_entry (AH , next_work_item , false);
3986
3965
3987
- /* Reduce dependencies, but don't move anything to ready_list */
3966
+ /* Reduce dependencies, but don't move anything to ready_heap */
3988
3967
reduce_dependencies (AH , next_work_item , NULL );
3989
3968
}
3990
3969
else
@@ -4027,24 +4006,26 @@ static void
4027
4006
restore_toc_entries_parallel (ArchiveHandle * AH , ParallelState * pstate ,
4028
4007
TocEntry * pending_list )
4029
4008
{
4030
- ParallelReadyList ready_list ;
4009
+ binaryheap * ready_heap ;
4031
4010
TocEntry * next_work_item ;
4032
4011
4033
4012
pg_log_debug ("entering restore_toc_entries_parallel" );
4034
4013
4035
- /* Set up ready_list with enough room for all known TocEntrys */
4036
- ready_list_init (& ready_list , AH -> tocCount );
4014
+ /* Set up ready_heap with enough room for all known TocEntrys */
4015
+ ready_heap = binaryheap_allocate (AH -> tocCount ,
4016
+ TocEntrySizeCompareBinaryheap ,
4017
+ NULL );
4037
4018
4038
4019
/*
4039
4020
* The pending_list contains all items that we need to restore. Move all
4040
- * items that are available to process immediately into the ready_list .
4021
+ * items that are available to process immediately into the ready_heap .
4041
4022
* After this setup, the pending list is everything that needs to be done
4042
- * but is blocked by one or more dependencies, while the ready list
4023
+ * but is blocked by one or more dependencies, while the ready heap
4043
4024
* contains items that have no remaining dependencies and are OK to
4044
4025
* process in the current restore pass.
4045
4026
*/
4046
4027
AH -> restorePass = RESTORE_PASS_MAIN ;
4047
- move_to_ready_list (pending_list , & ready_list , AH -> restorePass );
4028
+ move_to_ready_heap (pending_list , ready_heap , AH -> restorePass );
4048
4029
4049
4030
/*
4050
4031
* main parent loop
@@ -4058,7 +4039,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4058
4039
for (;;)
4059
4040
{
4060
4041
/* Look for an item ready to be dispatched to a worker */
4061
- next_work_item = pop_next_work_item (& ready_list , pstate );
4042
+ next_work_item = pop_next_work_item (ready_heap , pstate );
4062
4043
if (next_work_item != NULL )
4063
4044
{
4064
4045
/* If not to be restored, don't waste time launching a worker */
@@ -4068,7 +4049,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4068
4049
next_work_item -> dumpId ,
4069
4050
next_work_item -> desc , next_work_item -> tag );
4070
4051
/* Update its dependencies as though we'd completed it */
4071
- reduce_dependencies (AH , next_work_item , & ready_list );
4052
+ reduce_dependencies (AH , next_work_item , ready_heap );
4072
4053
/* Loop around to see if anything else can be dispatched */
4073
4054
continue ;
4074
4055
}
@@ -4079,7 +4060,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4079
4060
4080
4061
/* Dispatch to some worker */
4081
4062
DispatchJobForTocEntry (AH , pstate , next_work_item , ACT_RESTORE ,
4082
- mark_restore_job_done , & ready_list );
4063
+ mark_restore_job_done , ready_heap );
4083
4064
}
4084
4065
else if (IsEveryWorkerIdle (pstate ))
4085
4066
{
@@ -4093,7 +4074,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4093
4074
/* Advance to next restore pass */
4094
4075
AH -> restorePass ++ ;
4095
4076
/* That probably allows some stuff to be made ready */
4096
- move_to_ready_list (pending_list , & ready_list , AH -> restorePass );
4077
+ move_to_ready_heap (pending_list , ready_heap , AH -> restorePass );
4097
4078
/* Loop around to see if anything's now ready */
4098
4079
continue ;
4099
4080
}
@@ -4122,10 +4103,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4122
4103
next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS );
4123
4104
}
4124
4105
4125
- /* There should now be nothing in ready_list . */
4126
- Assert (ready_list . first_te > ready_list . last_te );
4106
+ /* There should now be nothing in ready_heap . */
4107
+ Assert (binaryheap_empty ( ready_heap ) );
4127
4108
4128
- ready_list_free ( & ready_list );
4109
+ binaryheap_free ( ready_heap );
4129
4110
4130
4111
pg_log_info ("finished main parallel loop" );
4131
4112
}
@@ -4225,80 +4206,9 @@ pending_list_remove(TocEntry *te)
4225
4206
}
4226
4207
4227
4208
4228
- /*
4229
- * Initialize the ready_list with enough room for up to tocCount entries.
4230
- */
4231
- static void
4232
- ready_list_init (ParallelReadyList * ready_list , int tocCount )
4233
- {
4234
- ready_list -> tes = (TocEntry * * )
4235
- pg_malloc (tocCount * sizeof (TocEntry * ));
4236
- ready_list -> first_te = 0 ;
4237
- ready_list -> last_te = -1 ;
4238
- ready_list -> sorted = false;
4239
- }
4240
-
4241
- /*
4242
- * Free storage for a ready_list.
4243
- */
4244
- static void
4245
- ready_list_free (ParallelReadyList * ready_list )
4246
- {
4247
- pg_free (ready_list -> tes );
4248
- }
4249
-
4250
- /* Add te to the ready_list */
4251
- static void
4252
- ready_list_insert (ParallelReadyList * ready_list , TocEntry * te )
4253
- {
4254
- ready_list -> tes [++ ready_list -> last_te ] = te ;
4255
- /* List is (probably) not sorted anymore. */
4256
- ready_list -> sorted = false;
4257
- }
4258
-
4259
- /* Remove the i'th entry in the ready_list */
4260
- static void
4261
- ready_list_remove (ParallelReadyList * ready_list , int i )
4262
- {
4263
- int f = ready_list -> first_te ;
4264
-
4265
- Assert (i >= f && i <= ready_list -> last_te );
4266
-
4267
- /*
4268
- * In the typical case where the item to be removed is the first ready
4269
- * entry, we need only increment first_te to remove it. Otherwise, move
4270
- * the entries before it to compact the list. (This preserves sortedness,
4271
- * if any.) We could alternatively move the entries after i, but there
4272
- * are typically many more of those.
4273
- */
4274
- if (i > f )
4275
- {
4276
- TocEntry * * first_te_ptr = & ready_list -> tes [f ];
4277
-
4278
- memmove (first_te_ptr + 1 , first_te_ptr , (i - f ) * sizeof (TocEntry * ));
4279
- }
4280
- ready_list -> first_te ++ ;
4281
- }
4282
-
4283
- /* Sort the ready_list into the desired order */
4284
- static void
4285
- ready_list_sort (ParallelReadyList * ready_list )
4286
- {
4287
- if (!ready_list -> sorted )
4288
- {
4289
- int n = ready_list -> last_te - ready_list -> first_te + 1 ;
4290
-
4291
- if (n > 1 )
4292
- qsort (ready_list -> tes + ready_list -> first_te , n ,
4293
- sizeof (TocEntry * ),
4294
- TocEntrySizeCompare );
4295
- ready_list -> sorted = true;
4296
- }
4297
- }
4298
-
4299
4209
/* qsort comparator for sorting TocEntries by dataLength */
4300
4210
static int
4301
- TocEntrySizeCompare (const void * p1 , const void * p2 )
4211
+ TocEntrySizeCompareQsort (const void * p1 , const void * p2 )
4302
4212
{
4303
4213
const TocEntry * te1 = * (const TocEntry * const * ) p1 ;
4304
4214
const TocEntry * te2 = * (const TocEntry * const * ) p2 ;
@@ -4318,17 +4228,25 @@ TocEntrySizeCompare(const void *p1, const void *p2)
4318
4228
return 0 ;
4319
4229
}
4320
4230
4231
+ /* binaryheap comparator for sorting TocEntries by dataLength */
4232
+ static int
4233
+ TocEntrySizeCompareBinaryheap (void * p1 , void * p2 , void * arg )
4234
+ {
4235
+ /* return opposite of qsort comparator for max-heap */
4236
+ return - TocEntrySizeCompareQsort (& p1 , & p2 );
4237
+ }
4238
+
4321
4239
4322
4240
/*
4323
- * Move all immediately-ready items from pending_list to ready_list .
4241
+ * Move all immediately-ready items from pending_list to ready_heap .
4324
4242
*
4325
4243
* Items are considered ready if they have no remaining dependencies and
4326
4244
* they belong in the current restore pass. (See also reduce_dependencies,
4327
4245
* which applies the same logic one-at-a-time.)
4328
4246
*/
4329
4247
static void
4330
- move_to_ready_list (TocEntry * pending_list ,
4331
- ParallelReadyList * ready_list ,
4248
+ move_to_ready_heap (TocEntry * pending_list ,
4249
+ binaryheap * ready_heap ,
4332
4250
RestorePass pass )
4333
4251
{
4334
4252
TocEntry * te ;
@@ -4344,38 +4262,38 @@ move_to_ready_list(TocEntry *pending_list,
4344
4262
{
4345
4263
/* Remove it from pending_list ... */
4346
4264
pending_list_remove (te );
4347
- /* ... and add to ready_list */
4348
- ready_list_insert ( ready_list , te );
4265
+ /* ... and add to ready_heap */
4266
+ binaryheap_add ( ready_heap , te );
4349
4267
}
4350
4268
}
4351
4269
}
4352
4270
4353
4271
/*
4354
4272
* Find the next work item (if any) that is capable of being run now,
4355
- * and remove it from the ready_list .
4273
+ * and remove it from the ready_heap .
4356
4274
*
4357
4275
* Returns the item, or NULL if nothing is runnable.
4358
4276
*
4359
4277
* To qualify, the item must have no remaining dependencies
4360
4278
* and no requirements for locks that are incompatible with
4361
- * items currently running. Items in the ready_list are known to have
4279
+ * items currently running. Items in the ready_heap are known to have
4362
4280
* no remaining dependencies, but we have to check for lock conflicts.
4363
4281
*/
4364
4282
static TocEntry *
4365
- pop_next_work_item (ParallelReadyList * ready_list ,
4283
+ pop_next_work_item (binaryheap * ready_heap ,
4366
4284
ParallelState * pstate )
4367
4285
{
4368
4286
/*
4369
- * Sort the ready_list so that we'll tackle larger jobs first.
4370
- */
4371
- ready_list_sort ( ready_list );
4372
-
4373
- /*
4374
- * Search the ready_list until we find a suitable item .
4287
+ * Search the ready_heap until we find a suitable item. Note that we do a
4288
+ * sequential scan through the heap nodes, so even though we will first
4289
+ * try to choose the highest-priority item, we might end up picking
4290
+ * something with a much lower priority. However, we expect that we will
4291
+ * typically be able to pick one of the first few items, which should
4292
+ * usually have a relatively high priority .
4375
4293
*/
4376
- for (int i = ready_list -> first_te ; i <= ready_list -> last_te ; i ++ )
4294
+ for (int i = 0 ; i < binaryheap_size ( ready_heap ) ; i ++ )
4377
4295
{
4378
- TocEntry * te = ready_list -> tes [ i ] ;
4296
+ TocEntry * te = ( TocEntry * ) binaryheap_get_node ( ready_heap , i ) ;
4379
4297
bool conflicts = false;
4380
4298
4381
4299
/*
@@ -4401,7 +4319,7 @@ pop_next_work_item(ParallelReadyList *ready_list,
4401
4319
continue ;
4402
4320
4403
4321
/* passed all tests, so this item can run */
4404
- ready_list_remove ( ready_list , i );
4322
+ binaryheap_remove_node ( ready_heap , i );
4405
4323
return te ;
4406
4324
}
4407
4325
@@ -4447,7 +4365,7 @@ mark_restore_job_done(ArchiveHandle *AH,
4447
4365
int status ,
4448
4366
void * callback_data )
4449
4367
{
4450
- ParallelReadyList * ready_list = (ParallelReadyList * ) callback_data ;
4368
+ binaryheap * ready_heap = (binaryheap * ) callback_data ;
4451
4369
4452
4370
pg_log_info ("finished item %d %s %s" ,
4453
4371
te -> dumpId , te -> desc , te -> tag );
@@ -4465,7 +4383,7 @@ mark_restore_job_done(ArchiveHandle *AH,
4465
4383
pg_fatal ("worker process failed: exit code %d" ,
4466
4384
status );
4467
4385
4468
- reduce_dependencies (AH , te , ready_list );
4386
+ reduce_dependencies (AH , te , ready_heap );
4469
4387
}
4470
4388
4471
4389
@@ -4708,11 +4626,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4708
4626
/*
4709
4627
* Remove the specified TOC entry from the depCounts of items that depend on
4710
4628
* it, thereby possibly making them ready-to-run. Any pending item that
4711
- * becomes ready should be moved to the ready_list , if that's provided.
4629
+ * becomes ready should be moved to the ready_heap , if that's provided.
4712
4630
*/
4713
4631
static void
4714
4632
reduce_dependencies (ArchiveHandle * AH , TocEntry * te ,
4715
- ParallelReadyList * ready_list )
4633
+ binaryheap * ready_heap )
4716
4634
{
4717
4635
int i ;
4718
4636
@@ -4730,18 +4648,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4730
4648
* the current restore pass, and it is currently a member of the
4731
4649
* pending list (that check is needed to prevent double restore in
4732
4650
* some cases where a list-file forces out-of-order restoring).
4733
- * However, if ready_list == NULL then caller doesn't want any list
4651
+ * However, if ready_heap == NULL then caller doesn't want any list
4734
4652
* memberships changed.
4735
4653
*/
4736
4654
if (otherte -> depCount == 0 &&
4737
4655
_tocEntryRestorePass (otherte ) == AH -> restorePass &&
4738
4656
otherte -> pending_prev != NULL &&
4739
- ready_list != NULL )
4657
+ ready_heap != NULL )
4740
4658
{
4741
4659
/* Remove it from pending list ... */
4742
4660
pending_list_remove (otherte );
4743
- /* ... and add to ready_list */
4744
- ready_list_insert ( ready_list , otherte );
4661
+ /* ... and add to ready_heap */
4662
+ binaryheap_add ( ready_heap , otherte );
4745
4663
}
4746
4664
}
4747
4665
}
0 commit comments