Skip to content

Commit 9bfd44b

Browse files
Convert pg_restore's ready_list to a priority queue.
Presently, parallel restores spend a lot of time sorting this list so that we pick the largest items first. With many tables, this sorting can become a significant bottleneck. There are a couple of reports from the field about this, and it is easy to reproduce.

This commit improves the performance of parallel pg_restore with many tables by converting its ready_list to a priority queue, i.e., a binary heap. We will first try to run the highest priority item, but if it cannot be chosen due to the lock heuristic, we'll do a sequential scan through the heap nodes until we find one that is runnable. This means that we might end up picking an item with a much lower priority. However, we expect that we will typically be able to pick one of the first few items, which should usually have a relatively high priority.

Suggested-by: Tom Lane
Tested-by: Pierre Ducroquet
Reviewed-by: Tom Lane
Discussion: https://postgr.es/m/3612876.1689443232%40sss.pgh.pa.us
1 parent 1f99886 commit 9bfd44b

File tree

1 file changed

+58
-140
lines changed

1 file changed

+58
-140
lines changed

src/bin/pg_dump/pg_backup_archiver.c

Lines changed: 58 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "compress_io.h"
3535
#include "dumputils.h"
3636
#include "fe_utils/string_utils.h"
37+
#include "lib/binaryheap.h"
3738
#include "lib/stringinfo.h"
3839
#include "libpq/libpq-fs.h"
3940
#include "parallel.h"
@@ -44,24 +45,6 @@
4445
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
4546
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
4647

47-
/*
48-
* State for tracking TocEntrys that are ready to process during a parallel
49-
* restore. (This used to be a list, and we still call it that, though now
50-
* it's really an array so that we can apply qsort to it.)
51-
*
52-
* tes[] is sized large enough that we can't overrun it.
53-
* The valid entries are indexed first_te .. last_te inclusive.
54-
* We periodically sort the array to bring larger-by-dataLength entries to
55-
* the front; "sorted" is true if the valid entries are known sorted.
56-
*/
57-
typedef struct _parallelReadyList
58-
{
59-
TocEntry **tes; /* Ready-to-dump TocEntrys */
60-
int first_te; /* index of first valid entry in tes[] */
61-
int last_te; /* index of last valid entry in tes[] */
62-
bool sorted; /* are valid entries currently sorted? */
63-
} ParallelReadyList;
64-
6548

6649
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
6750
const pg_compress_specification compression_spec,
@@ -111,16 +94,12 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH,
11194
static void pending_list_header_init(TocEntry *l);
11295
static void pending_list_append(TocEntry *l, TocEntry *te);
11396
static void pending_list_remove(TocEntry *te);
114-
static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
115-
static void ready_list_free(ParallelReadyList *ready_list);
116-
static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
117-
static void ready_list_remove(ParallelReadyList *ready_list, int i);
118-
static void ready_list_sort(ParallelReadyList *ready_list);
119-
static int TocEntrySizeCompare(const void *p1, const void *p2);
120-
static void move_to_ready_list(TocEntry *pending_list,
121-
ParallelReadyList *ready_list,
97+
static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
98+
static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
99+
static void move_to_ready_heap(TocEntry *pending_list,
100+
binaryheap *ready_heap,
122101
RestorePass pass);
123-
static TocEntry *pop_next_work_item(ParallelReadyList *ready_list,
102+
static TocEntry *pop_next_work_item(binaryheap *ready_heap,
124103
ParallelState *pstate);
125104
static void mark_dump_job_done(ArchiveHandle *AH,
126105
TocEntry *te,
@@ -135,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
135114
static void repoint_table_dependencies(ArchiveHandle *AH);
136115
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
137116
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
138-
ParallelReadyList *ready_list);
117+
binaryheap *ready_heap);
139118
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
140119
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
141120

@@ -2384,7 +2363,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
23842363
}
23852364

23862365
if (ntes > 1)
2387-
qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompare);
2366+
qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
23882367

23892368
for (int i = 0; i < ntes; i++)
23902369
DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
@@ -3984,7 +3963,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
39843963

39853964
(void) restore_toc_entry(AH, next_work_item, false);
39863965

3987-
/* Reduce dependencies, but don't move anything to ready_list */
3966+
/* Reduce dependencies, but don't move anything to ready_heap */
39883967
reduce_dependencies(AH, next_work_item, NULL);
39893968
}
39903969
else
@@ -4027,24 +4006,26 @@ static void
40274006
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40284007
TocEntry *pending_list)
40294008
{
4030-
ParallelReadyList ready_list;
4009+
binaryheap *ready_heap;
40314010
TocEntry *next_work_item;
40324011

40334012
pg_log_debug("entering restore_toc_entries_parallel");
40344013

4035-
/* Set up ready_list with enough room for all known TocEntrys */
4036-
ready_list_init(&ready_list, AH->tocCount);
4014+
/* Set up ready_heap with enough room for all known TocEntrys */
4015+
ready_heap = binaryheap_allocate(AH->tocCount,
4016+
TocEntrySizeCompareBinaryheap,
4017+
NULL);
40374018

40384019
/*
40394020
* The pending_list contains all items that we need to restore. Move all
4040-
* items that are available to process immediately into the ready_list.
4021+
* items that are available to process immediately into the ready_heap.
40414022
* After this setup, the pending list is everything that needs to be done
4042-
* but is blocked by one or more dependencies, while the ready list
4023+
* but is blocked by one or more dependencies, while the ready heap
40434024
* contains items that have no remaining dependencies and are OK to
40444025
* process in the current restore pass.
40454026
*/
40464027
AH->restorePass = RESTORE_PASS_MAIN;
4047-
move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4028+
move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
40484029

40494030
/*
40504031
* main parent loop
@@ -4058,7 +4039,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40584039
for (;;)
40594040
{
40604041
/* Look for an item ready to be dispatched to a worker */
4061-
next_work_item = pop_next_work_item(&ready_list, pstate);
4042+
next_work_item = pop_next_work_item(ready_heap, pstate);
40624043
if (next_work_item != NULL)
40634044
{
40644045
/* If not to be restored, don't waste time launching a worker */
@@ -4068,7 +4049,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40684049
next_work_item->dumpId,
40694050
next_work_item->desc, next_work_item->tag);
40704051
/* Update its dependencies as though we'd completed it */
4071-
reduce_dependencies(AH, next_work_item, &ready_list);
4052+
reduce_dependencies(AH, next_work_item, ready_heap);
40724053
/* Loop around to see if anything else can be dispatched */
40734054
continue;
40744055
}
@@ -4079,7 +4060,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40794060

40804061
/* Dispatch to some worker */
40814062
DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4082-
mark_restore_job_done, &ready_list);
4063+
mark_restore_job_done, ready_heap);
40834064
}
40844065
else if (IsEveryWorkerIdle(pstate))
40854066
{
@@ -4093,7 +4074,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
40934074
/* Advance to next restore pass */
40944075
AH->restorePass++;
40954076
/* That probably allows some stuff to be made ready */
4096-
move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4077+
move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
40974078
/* Loop around to see if anything's now ready */
40984079
continue;
40994080
}
@@ -4122,10 +4103,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
41224103
next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
41234104
}
41244105

4125-
/* There should now be nothing in ready_list. */
4126-
Assert(ready_list.first_te > ready_list.last_te);
4106+
/* There should now be nothing in ready_heap. */
4107+
Assert(binaryheap_empty(ready_heap));
41274108

4128-
ready_list_free(&ready_list);
4109+
binaryheap_free(ready_heap);
41294110

41304111
pg_log_info("finished main parallel loop");
41314112
}
@@ -4225,80 +4206,9 @@ pending_list_remove(TocEntry *te)
42254206
}
42264207

42274208

4228-
/*
4229-
* Initialize the ready_list with enough room for up to tocCount entries.
4230-
*/
4231-
static void
4232-
ready_list_init(ParallelReadyList *ready_list, int tocCount)
4233-
{
4234-
ready_list->tes = (TocEntry **)
4235-
pg_malloc(tocCount * sizeof(TocEntry *));
4236-
ready_list->first_te = 0;
4237-
ready_list->last_te = -1;
4238-
ready_list->sorted = false;
4239-
}
4240-
4241-
/*
4242-
* Free storage for a ready_list.
4243-
*/
4244-
static void
4245-
ready_list_free(ParallelReadyList *ready_list)
4246-
{
4247-
pg_free(ready_list->tes);
4248-
}
4249-
4250-
/* Add te to the ready_list */
4251-
static void
4252-
ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
4253-
{
4254-
ready_list->tes[++ready_list->last_te] = te;
4255-
/* List is (probably) not sorted anymore. */
4256-
ready_list->sorted = false;
4257-
}
4258-
4259-
/* Remove the i'th entry in the ready_list */
4260-
static void
4261-
ready_list_remove(ParallelReadyList *ready_list, int i)
4262-
{
4263-
int f = ready_list->first_te;
4264-
4265-
Assert(i >= f && i <= ready_list->last_te);
4266-
4267-
/*
4268-
* In the typical case where the item to be removed is the first ready
4269-
* entry, we need only increment first_te to remove it. Otherwise, move
4270-
* the entries before it to compact the list. (This preserves sortedness,
4271-
* if any.) We could alternatively move the entries after i, but there
4272-
* are typically many more of those.
4273-
*/
4274-
if (i > f)
4275-
{
4276-
TocEntry **first_te_ptr = &ready_list->tes[f];
4277-
4278-
memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
4279-
}
4280-
ready_list->first_te++;
4281-
}
4282-
4283-
/* Sort the ready_list into the desired order */
4284-
static void
4285-
ready_list_sort(ParallelReadyList *ready_list)
4286-
{
4287-
if (!ready_list->sorted)
4288-
{
4289-
int n = ready_list->last_te - ready_list->first_te + 1;
4290-
4291-
if (n > 1)
4292-
qsort(ready_list->tes + ready_list->first_te, n,
4293-
sizeof(TocEntry *),
4294-
TocEntrySizeCompare);
4295-
ready_list->sorted = true;
4296-
}
4297-
}
4298-
42994209
/* qsort comparator for sorting TocEntries by dataLength */
43004210
static int
4301-
TocEntrySizeCompare(const void *p1, const void *p2)
4211+
TocEntrySizeCompareQsort(const void *p1, const void *p2)
43024212
{
43034213
const TocEntry *te1 = *(const TocEntry *const *) p1;
43044214
const TocEntry *te2 = *(const TocEntry *const *) p2;
@@ -4318,17 +4228,25 @@ TocEntrySizeCompare(const void *p1, const void *p2)
43184228
return 0;
43194229
}
43204230

4231+
/*
 * binaryheap comparator for sorting TocEntries by dataLength
 *
 * This is a thin adapter around TocEntrySizeCompareQsort.  That comparator
 * dereferences its arguments as pointers to TocEntry pointers, so the
 * addresses of p1 and p2 are passed rather than p1 and p2 themselves.
 * "arg" is not used.
 */
4232+
static int
4233+
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
4234+
{
4235+
/*
 * Return the opposite of the qsort comparator's result for the max-heap,
 * i.e. the sign of the comparison is flipped.
 */
4236+
return -TocEntrySizeCompareQsort(&p1, &p2);
4237+
}
4238+
43214239

43224240
/*
4323-
* Move all immediately-ready items from pending_list to ready_list.
4241+
* Move all immediately-ready items from pending_list to ready_heap.
43244242
*
43254243
* Items are considered ready if they have no remaining dependencies and
43264244
* they belong in the current restore pass. (See also reduce_dependencies,
43274245
* which applies the same logic one-at-a-time.)
43284246
*/
43294247
static void
4330-
move_to_ready_list(TocEntry *pending_list,
4331-
ParallelReadyList *ready_list,
4248+
move_to_ready_heap(TocEntry *pending_list,
4249+
binaryheap *ready_heap,
43324250
RestorePass pass)
43334251
{
43344252
TocEntry *te;
@@ -4344,38 +4262,38 @@ move_to_ready_list(TocEntry *pending_list,
43444262
{
43454263
/* Remove it from pending_list ... */
43464264
pending_list_remove(te);
4347-
/* ... and add to ready_list */
4348-
ready_list_insert(ready_list, te);
4265+
/* ... and add to ready_heap */
4266+
binaryheap_add(ready_heap, te);
43494267
}
43504268
}
43514269
}
43524270

43534271
/*
43544272
* Find the next work item (if any) that is capable of being run now,
4355-
* and remove it from the ready_list.
4273+
* and remove it from the ready_heap.
43564274
*
43574275
* Returns the item, or NULL if nothing is runnable.
43584276
*
43594277
* To qualify, the item must have no remaining dependencies
43604278
* and no requirements for locks that are incompatible with
4361-
* items currently running. Items in the ready_list are known to have
4279+
* items currently running. Items in the ready_heap are known to have
43624280
* no remaining dependencies, but we have to check for lock conflicts.
43634281
*/
43644282
static TocEntry *
4365-
pop_next_work_item(ParallelReadyList *ready_list,
4283+
pop_next_work_item(binaryheap *ready_heap,
43664284
ParallelState *pstate)
43674285
{
43684286
/*
4369-
* Sort the ready_list so that we'll tackle larger jobs first.
4370-
*/
4371-
ready_list_sort(ready_list);
4372-
4373-
/*
4374-
* Search the ready_list until we find a suitable item.
4287+
* Search the ready_heap until we find a suitable item. Note that we do a
4288+
* sequential scan through the heap nodes, so even though we will first
4289+
* try to choose the highest-priority item, we might end up picking
4290+
* something with a much lower priority. However, we expect that we will
4291+
* typically be able to pick one of the first few items, which should
4292+
* usually have a relatively high priority.
43754293
*/
4376-
for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
4294+
for (int i = 0; i < binaryheap_size(ready_heap); i++)
43774295
{
4378-
TocEntry *te = ready_list->tes[i];
4296+
TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
43794297
bool conflicts = false;
43804298

43814299
/*
@@ -4401,7 +4319,7 @@ pop_next_work_item(ParallelReadyList *ready_list,
44014319
continue;
44024320

44034321
/* passed all tests, so this item can run */
4404-
ready_list_remove(ready_list, i);
4322+
binaryheap_remove_node(ready_heap, i);
44054323
return te;
44064324
}
44074325

@@ -4447,7 +4365,7 @@ mark_restore_job_done(ArchiveHandle *AH,
44474365
int status,
44484366
void *callback_data)
44494367
{
4450-
ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
4368+
binaryheap *ready_heap = (binaryheap *) callback_data;
44514369

44524370
pg_log_info("finished item %d %s %s",
44534371
te->dumpId, te->desc, te->tag);
@@ -4465,7 +4383,7 @@ mark_restore_job_done(ArchiveHandle *AH,
44654383
pg_fatal("worker process failed: exit code %d",
44664384
status);
44674385

4468-
reduce_dependencies(AH, te, ready_list);
4386+
reduce_dependencies(AH, te, ready_heap);
44694387
}
44704388

44714389

@@ -4708,11 +4626,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
47084626
/*
47094627
* Remove the specified TOC entry from the depCounts of items that depend on
47104628
* it, thereby possibly making them ready-to-run. Any pending item that
4711-
* becomes ready should be moved to the ready_list, if that's provided.
4629+
* becomes ready should be moved to the ready_heap, if that's provided.
47124630
*/
47134631
static void
47144632
reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4715-
ParallelReadyList *ready_list)
4633+
binaryheap *ready_heap)
47164634
{
47174635
int i;
47184636

@@ -4730,18 +4648,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
47304648
* the current restore pass, and it is currently a member of the
47314649
* pending list (that check is needed to prevent double restore in
47324650
* some cases where a list-file forces out-of-order restoring).
4733-
* However, if ready_list == NULL then caller doesn't want any list
4651+
* However, if ready_heap == NULL then caller doesn't want any list
47344652
* memberships changed.
47354653
*/
47364654
if (otherte->depCount == 0 &&
47374655
_tocEntryRestorePass(otherte) == AH->restorePass &&
47384656
otherte->pending_prev != NULL &&
4739-
ready_list != NULL)
4657+
ready_heap != NULL)
47404658
{
47414659
/* Remove it from pending list ... */
47424660
pending_list_remove(otherte);
4743-
/* ... and add to ready_list */
4744-
ready_list_insert(ready_list, otherte);
4661+
/* ... and add to ready_heap */
4662+
binaryheap_add(ready_heap, otherte);
47454663
}
47464664
}
47474665
}

0 commit comments

Comments
 (0)