Skip to content

Commit 51f677e

Browse files
committed
SPARK-2043: ExternalAppendOnlyMap doesn't always find matching keys
When the current implementation finishes reading the keys with the current hash code, it also reads one key with the next hash code, which may cause it to miss some matches for that next key. This can cause operations like join to give the wrong result when reduce tasks spill to disk and there are hash collisions, as values won't be matched together. This PR fixes it by not reading in that next key, using a peeking (buffered) iterator instead.

Author: Matei Zaharia <matei@databricks.com>

Closes apache#986 from mateiz/spark-2043 and squashes the following commits:

0959514 [Matei Zaharia] Added unit test for having many hash collisions
892debb [Matei Zaharia] SPARK-2043: don't read a key with the next hash code in ExternalAppendOnlyMap, instead use a buffered iterator to only read values with the current hash code.

(cherry picked from commit b45c13e)

Signed-off-by: Matei Zaharia <matei@databricks.com>
1 parent 6634a34 commit 51f677e

File tree

2 files changed

+44
-5
lines changed

2 files changed

+44
-5
lines changed

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
2020
import java.io._
2121
import java.util.Comparator
2222

23+
import scala.collection.BufferedIterator
2324
import scala.collection.mutable
2425
import scala.collection.mutable.ArrayBuffer
2526

@@ -230,7 +231,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
230231
// Input streams are derived both from the in-memory map and spilled maps on disk
231232
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
232233
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
233-
private val inputStreams = Seq(sortedMap) ++ spilledMaps
234+
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
234235

235236
inputStreams.foreach { it =>
236237
val kcPairs = getMorePairs(it)
@@ -245,13 +246,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
245246
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
246247
* Assume the given iterator is in sorted order.
247248
*/
248-
private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
249+
private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
249250
val kcPairs = new ArrayBuffer[(K, C)]
250251
if (it.hasNext) {
251252
var kc = it.next()
252253
kcPairs += kc
253254
val minHash = kc._1.hashCode()
254-
while (it.hasNext && kc._1.hashCode() == minHash) {
255+
while (it.hasNext && it.head._1.hashCode() == minHash) {
255256
kc = it.next()
256257
kcPairs += kc
257258
}
@@ -324,7 +325,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
324325
*
325326
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
326327
*/
327-
private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
328+
private class StreamBuffer(
329+
val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
328330
extends Comparable[StreamBuffer] {
329331

330332
def isEmpty = pairs.length == 0

core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
277277
("pomatoes", "eructation") // 568647356
278278
)
279279

280+
collisionPairs.foreach { case (w1, w2) =>
281+
// String.hashCode is documented to use a specific algorithm, but check just in case
282+
assert(w1.hashCode === w2.hashCode)
283+
}
284+
280285
(1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
281286
collisionPairs.foreach { case (w1, w2) =>
282287
map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
296301
assert(kv._2.equals(expectedValue))
297302
count += 1
298303
}
299-
assert(count == 100000 + collisionPairs.size * 2)
304+
assert(count === 100000 + collisionPairs.size * 2)
305+
}
306+
307+
test("spilling with many hash collisions") {
308+
val conf = new SparkConf(true)
309+
conf.set("spark.shuffle.memoryFraction", "0.0001")
310+
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
311+
312+
val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
313+
314+
// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
315+
// problems if the map fails to group together the objects with the same code (SPARK-2043).
316+
for (i <- 1 to 10) {
317+
for (j <- 1 to 10000) {
318+
map.insert(FixedHashObject(j, j % 2), 1)
319+
}
320+
}
321+
322+
val it = map.iterator
323+
var count = 0
324+
while (it.hasNext) {
325+
val kv = it.next()
326+
assert(kv._2 === 10)
327+
count += 1
328+
}
329+
assert(count === 10000)
300330
}
301331

302332
test("spilling with hash collisions using the Int.MaxValue key") {
@@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
317347
}
318348
}
319349
}
350+
351+
/**
352+
* A dummy class whose hash code is the fixed value `h` given at construction, to easily test hash collisions
353+
*/
354+
case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
355+
override def hashCode(): Int = h
356+
}

0 commit comments

Comments
 (0)