Skip to content

Commit d77e73c

Browse files
committed
Merge pull request apache#609 from pwendell/SPARK-738-0.7
SPARK-738 (branch 0.7 backport) Fixing nonserializable exception bug
2 parents f3891f1 + 9b77a6f commit d77e73c

File tree

5 files changed

+42
-10
lines changed

5 files changed

+42
-10
lines changed

core/src/main/scala/spark/TaskEndReason.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,17 @@ private[spark] case object Success extends TaskEndReason
1414
private[spark]
1515
case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
1616

17-
private[spark]
18-
case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
17+
private[spark] case class FetchFailed(
18+
bmAddress: BlockManagerId,
19+
shuffleId: Int,
20+
mapId: Int,
21+
reduceId: Int)
22+
extends TaskEndReason
1923

20-
private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
24+
private[spark] case class ExceptionFailure(
25+
className: String,
26+
description: String,
27+
stackTrace: Array[StackTraceElement])
28+
extends TaskEndReason
2129

2230
private[spark] case class OtherFailure(message: String) extends TaskEndReason

core/src/main/scala/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
117117
}
118118

119119
case t: Throwable => {
120-
val reason = ExceptionFailure(t)
120+
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
121121
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
122122

123123
// TODO: Should we exit the whole executor here? On the one hand, the failed task may

core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
298298
return
299299

300300
case ef: ExceptionFailure =>
301-
val key = ef.exception.toString
301+
val key = ef.description
302302
val now = System.currentTimeMillis
303303
val (printFull, dupCount) = {
304304
if (recentExceptions.contains(key)) {
@@ -316,10 +316,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
316316
}
317317
}
318318
if (printFull) {
319-
val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString))
320-
logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n")))
319+
val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
320+
logInfo("Loss was due to %s\n%s\n%s".format(
321+
ef.className, ef.description, locs.mkString("\n")))
321322
} else {
322-
logInfo("Loss was due to %s [duplicate %d]".format(ef.exception.toString, dupCount))
323+
logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
323324
}
324325

325326
case _ => {}

core/src/main/scala/spark/scheduler/local/LocalScheduler.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
101101
submitTask(task, idInJob)
102102
} else {
103103
// TODO: Do something nicer here to return all the way to the user
104-
if (!Thread.currentThread().isInterrupted)
105-
listener.taskEnded(task, new ExceptionFailure(t), null, null, info, null)
104+
if (!Thread.currentThread().isInterrupted) {
105+
val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
106+
listener.taskEnded(task, failure, null, null, info, null)
107+
}
106108
}
107109
}
108110
}

core/src/test/scala/spark/DistributedSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import scala.collection.mutable.ArrayBuffer
1616
import SparkContext._
1717
import storage.{GetBlock, BlockManagerWorker, StorageLevel}
1818

19+
class NotSerializableClass
20+
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
21+
1922
class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
2023

2124
val clusterUrl = "local-cluster[2,1,512]"
@@ -25,6 +28,24 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
2528
System.clearProperty("spark.storage.memoryFraction")
2629
}
2730

31+
test("task throws not serializable exception") {
32+
// Ensures that executors do not crash when an exn is not serializable. If executors crash,
33+
// this test will hang. Correct behavior is that executors don't crash but fail tasks
34+
// and the scheduler throws a SparkException.
35+
36+
// numSlaves must be less than numPartitions
37+
val numSlaves = 3
38+
val numPartitions = 10
39+
40+
sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
41+
val data = sc.parallelize(1 to 100, numPartitions).
42+
map(x => throw new NotSerializableExn(new NotSerializableClass))
43+
intercept[SparkException] {
44+
data.count()
45+
}
46+
resetSparkContext()
47+
}
48+
2849
test("local-cluster format") {
2950
sc = new SparkContext("local-cluster[2,1,512]", "test")
3051
assert(sc.parallelize(1 to 2, 2).count() == 2)

0 commit comments

Comments (0)