Skip to content

Commit 588120c

Browse files
committed
Add more logging for number of records fetched by each reduce
1 parent 3d24281 commit 588120c

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

core/src/main/scala/spark/SimpleShuffleFetcher.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class SimpleShuffleFetcher extends ShuffleFetcher with Logging {
1919
}
2020
for ((serverUri, inputIds) <- Utils.randomize(splitsByUri)) {
2121
for (i <- inputIds) {
22+
var numRecords = 0
2223
try {
2324
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, reduceId)
2425
// TODO: multithreaded fetch
@@ -29,12 +30,16 @@ class SimpleShuffleFetcher extends ShuffleFetcher with Logging {
2930
while (true) {
3031
val pair = inputStream.readObject().asInstanceOf[(K, V)]
3132
func(pair._1, pair._2)
33+
numRecords += 1
3234
}
3335
} finally {
3436
inputStream.close()
3537
}
3638
} catch {
37-
case e: EOFException => {} // We currently assume EOF means we read the whole thing
39+
case e: EOFException => {
40+
// We currently assume EOF means we read the whole thing
41+
logInfo("Reduce %s got %s records from map %s".format(reduceId, numRecords, i))
42+
}
3843
case other: Exception => {
3944
logError("Fetch failed", other)
4045
throw new FetchFailedException(serverUri, shuffleId, i, reduceId, other)

0 commit comments

Comments (0)