Skip to content

Commit 79273cc

Browse files
committed
Replace not-very-bright implementation of topological sort with a better
one (use a priority heap to keep track of items ready to output, instead of searching the input array each time). This brings the runtime of pg_dump back to about what it was in 7.4.
1 parent 005a121 commit 79273cc

File tree

1 file changed

+169
-54
lines changed

1 file changed

+169
-54
lines changed

src/bin/pg_dump/pg_dump_sort.c

Lines changed: 169 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
*
1010
*
1111
* IDENTIFICATION
12-
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.1 2003/12/06 03:00:16 tgl Exp $
12+
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.2 2003/12/06 22:55:11 tgl Exp $
1313
*
1414
*-------------------------------------------------------------------------
1515
*/
@@ -52,6 +52,8 @@ static bool TopoSort(DumpableObject **objs,
5252
int numObjs,
5353
DumpableObject **ordering,
5454
int *nOrdering);
55+
static void addHeapElement(int val, int *heap, int heapLength);
56+
static int removeHeapElement(int *heap, int heapLength);
5557
static bool findLoop(DumpableObject *obj,
5658
int depth,
5759
DumpableObject **ordering,
@@ -122,14 +124,13 @@ sortDumpableObjects(DumpableObject **objs, int numObjs)
122124
* partial ordering.) Minimize rearrangement of the list not needed to
123125
* achieve the partial ordering.
124126
*
125-
* This is a lot simpler and slower than, for example, the topological sort
126-
* algorithm shown in Knuth's Volume 1. However, Knuth's method doesn't
127-
* try to minimize the damage to the existing order.
127+
* The input is the list of numObjs objects in objs[]. This list is not
128+
* modified.
128129
*
129130
* Returns TRUE if able to build an ordering that satisfies all the
130131
* constraints, FALSE if not (there are contradictory constraints).
131132
*
132-
* On success (TRUE result), ordering[] is filled with an array of
133+
* On success (TRUE result), ordering[] is filled with a sorted array of
133134
* DumpableObject pointers, of length equal to the input list length.
134135
*
135136
* On failure (FALSE result), ordering[] is filled with an array of
@@ -146,36 +147,60 @@ TopoSort(DumpableObject **objs,
146147
int *nOrdering) /* output argument */
147148
{
148149
DumpId maxDumpId = getMaxDumpId();
149-
bool result = true;
150-
DumpableObject **topoItems;
151-
DumpableObject *obj;
150+
int *pendingHeap;
152151
int *beforeConstraints;
152+
int *idMap;
153+
DumpableObject *obj;
154+
int heapLength;
153155
int i,
154156
j,
155-
k,
156-
last;
157+
k;
157158

158-
/* First, create work array with the dump items in their current order */
159-
topoItems = (DumpableObject **) malloc(numObjs * sizeof(DumpableObject *));
160-
if (topoItems == NULL)
161-
exit_horribly(NULL, modulename, "out of memory\n");
162-
memcpy(topoItems, objs, numObjs * sizeof(DumpableObject *));
159+
/*
160+
* This is basically the same algorithm shown for topological sorting in
161+
* Knuth's Volume 1. However, we would like to minimize unnecessary
162+
* rearrangement of the input ordering; that is, when we have a choice
163+
* of which item to output next, we always want to take the one highest
164+
* in the original list. Therefore, instead of maintaining an unordered
165+
* linked list of items-ready-to-output as Knuth does, we maintain a heap
166+
* of their item numbers, which we can use as a priority queue. This
167+
* turns the algorithm from O(N) to O(N log N) because each insertion or
168+
* removal of a heap item takes O(log N) time. However, that's still
169+
* plenty fast enough for this application.
170+
*/
163171

164172
*nOrdering = numObjs; /* for success return */
165173

174+
/* Eliminate the null case */
175+
if (numObjs <= 0)
176+
return true;
177+
178+
/* Create workspace for the above-described heap */
179+
pendingHeap = (int *) malloc(numObjs * sizeof(int));
180+
if (pendingHeap == NULL)
181+
exit_horribly(NULL, modulename, "out of memory\n");
182+
166183
/*
167-
* Scan the constraints, and for each item in the array, generate a
184+
* Scan the constraints, and for each item in the input, generate a
168185
* count of the number of constraints that say it must be before
169186
* something else. The count for the item with dumpId j is
170-
* stored in beforeConstraints[j].
187+
* stored in beforeConstraints[j]. We also make a map showing the
188+
* input-order index of the item with dumpId j.
171189
*/
172190
beforeConstraints = (int *) malloc((maxDumpId + 1) * sizeof(int));
173191
if (beforeConstraints == NULL)
174192
exit_horribly(NULL, modulename, "out of memory\n");
175193
memset(beforeConstraints, 0, (maxDumpId + 1) * sizeof(int));
194+
idMap = (int *) malloc((maxDumpId + 1) * sizeof(int));
195+
if (idMap == NULL)
196+
exit_horribly(NULL, modulename, "out of memory\n");
176197
for (i = 0; i < numObjs; i++)
177198
{
178-
obj = topoItems[i];
199+
obj = objs[i];
200+
j = obj->dumpId;
201+
if (j <= 0 || j > maxDumpId)
202+
exit_horribly(NULL, modulename, "invalid dumpId %d\n", j);
203+
idMap[j] = i;
179204
for (j = 0; j < obj->nDeps; j++)
180205
{
181206
k = obj->dependencies[j];
@@ -185,63 +210,153 @@ TopoSort(DumpableObject **objs,
185210
}
186211
}
187212

213+
/*
214+
* Now initialize the heap of items-ready-to-output by filling it with
215+
* the indexes of items that already have beforeConstraints[id] == 0.
216+
*
217+
* The essential property of a heap is heap[(j-1)/2] >= heap[j] for each
218+
* j in the range 1..heapLength-1 (note we are using 0-based subscripts
219+
* here, while the discussion in Knuth assumes 1-based subscripts).
220+
* So, if we simply enter the indexes into pendingHeap[] in decreasing
221+
* order, we a-fortiori have the heap invariant satisfied at completion
222+
* of this loop, and don't need to do any sift-up comparisons.
223+
*/
224+
heapLength = 0;
225+
for (i = numObjs; --i >= 0; )
226+
{
227+
if (beforeConstraints[objs[i]->dumpId] == 0)
228+
pendingHeap[heapLength++] = i;
229+
}
230+
188231
/*--------------------
189-
* Now scan the topoItems array backwards. At each step, output the
190-
* last item that has no remaining before-constraints, and decrease
191-
* the beforeConstraints count of each of the items it was constrained
192-
* against.
193-
* i = index of ordering[] entry we want to output this time
194-
* j = search index for topoItems[]
232+
* Now emit objects, working backwards in the output list. At each step,
233+
* we use the priority heap to select the last item that has no remaining
234+
* before-constraints. We remove that item from the heap, output it to
235+
* ordering[], and decrease the beforeConstraints count of each of the
236+
* items it was constrained against. Whenever an item's beforeConstraints
237+
* count is thereby decreased to zero, we insert it into the priority heap
238+
* to show that it is a candidate to output. We are done when the heap
239+
* becomes empty; if we have output every element then we succeeded,
240+
* otherwise we failed.
241+
* i = number of ordering[] entries left to output
242+
* j = objs[] index of item we are outputting
195243
* k = temp for scanning constraint list for item j
196-
* last = last non-null index in topoItems (avoid redundant searches)
197244
*--------------------
198245
*/
199-
last = numObjs - 1;
200-
for (i = numObjs; --i >= 0;)
246+
i = numObjs;
247+
while (heapLength > 0)
201248
{
202-
/* Find next candidate to output */
203-
while (topoItems[last] == NULL)
204-
last--;
205-
for (j = last; j >= 0; j--)
206-
{
207-
obj = topoItems[j];
208-
if (obj != NULL && beforeConstraints[obj->dumpId] == 0)
209-
break;
210-
}
211-
/* If no available candidate, topological sort fails */
212-
if (j < 0)
213-
{
214-
result = false;
215-
break;
216-
}
217-
/* Output candidate, and mark it done by zeroing topoItems[] entry */
218-
ordering[i] = obj = topoItems[j];
219-
topoItems[j] = NULL;
249+
/* Select object to output by removing largest heap member */
250+
j = removeHeapElement(pendingHeap, heapLength--);
251+
obj = objs[j];
252+
/* Output candidate to ordering[] */
253+
ordering[--i] = obj;
220254
/* Update beforeConstraints counts of its predecessors */
221255
for (k = 0; k < obj->nDeps; k++)
222-
beforeConstraints[obj->dependencies[k]]--;
256+
{
257+
int id = obj->dependencies[k];
258+
259+
if ((--beforeConstraints[id]) == 0)
260+
addHeapElement(idMap[id], pendingHeap, heapLength++);
261+
}
223262
}
224263

225264
/*
226-
* If we failed, report one of the circular constraint sets
265+
* If we failed, report one of the circular constraint sets. We do
266+
* this by scanning beforeConstraints[] to locate the items that have
267+
* not yet been output, and for each one, trying to trace a constraint
268+
* loop leading back to it. (There may be items that depend on items
269+
* involved in a loop, but aren't themselves part of the loop, so not
270+
* every nonzero beforeConstraints entry is necessarily a useful
271+
* starting point. We keep trying till we find a loop.)
227272
*/
228-
if (!result)
273+
if (i != 0)
229274
{
230-
for (j = last; j >= 0; j--)
275+
for (j = 1; j <= maxDumpId; j++)
231276
{
232-
ordering[0] = obj = topoItems[j];
233-
if (obj && findLoop(obj, 1, ordering, nOrdering))
234-
break;
277+
if (beforeConstraints[j] != 0)
278+
{
279+
ordering[0] = obj = objs[idMap[j]];
280+
if (findLoop(obj, 1, ordering, nOrdering))
281+
break;
282+
}
235283
}
236-
if (j < 0)
284+
if (j > maxDumpId)
237285
exit_horribly(NULL, modulename,
238286
"could not find dependency loop\n");
239287
}
240288

241289
/* Done */
242-
free(topoItems);
290+
free(pendingHeap);
243291
free(beforeConstraints);
292+
free(idMap);
293+
294+
return (i == 0);
295+
}
296+
297+
/*
298+
* Add an item to a heap (priority queue)
299+
*
300+
* heapLength is the current heap size; caller is responsible for increasing
301+
* its value after the call. There must be sufficient storage at *heap.
302+
*/
303+
static void
304+
addHeapElement(int val, int *heap, int heapLength)
305+
{
306+
int j;
244307

308+
/*
309+
* Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth
310+
* is using 1-based array indexes, not 0-based.
311+
*/
312+
j = heapLength;
313+
while (j > 0)
314+
{
315+
int i = (j - 1) >> 1;
316+
317+
if (val <= heap[i])
318+
break;
319+
heap[j] = heap[i];
320+
j = i;
321+
}
322+
heap[j] = val;
323+
}
324+
325+
/*
326+
* Remove the largest item present in a heap (priority queue)
327+
*
328+
* heapLength is the current heap size; caller is responsible for decreasing
329+
* its value after the call.
330+
*
331+
* We remove and return heap[0], which is always the largest element of
332+
* the heap, and then "sift up" to maintain the heap invariant.
333+
*/
334+
static int
335+
removeHeapElement(int *heap, int heapLength)
336+
{
337+
int result = heap[0];
338+
int val;
339+
int i;
340+
341+
if (--heapLength <= 0)
342+
return result;
343+
val = heap[heapLength]; /* value that must be reinserted */
344+
i = 0; /* i is where the "hole" is */
345+
for (;;)
346+
{
347+
int j = 2 * i + 1;
348+
349+
if (j >= heapLength)
350+
break;
351+
if (j + 1 < heapLength &&
352+
heap[j] < heap[j + 1])
353+
j++;
354+
if (val >= heap[j])
355+
break;
356+
heap[i] = heap[j];
357+
i = j;
358+
}
359+
heap[i] = val;
245360
return result;
246361
}
247362

0 commit comments

Comments
 (0)