Skip to content

Commit 2f03fc1

Browse files
ueshinsrowen
authored andcommitted
[SPARK-1210] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executo...
...r. Constructor of `org.apache.spark.executor.Executor` should not set context class loader of current thread, which is backend Actor's thread. Run the following code in local-mode REPL. ``` scala> case class Foo(i: Int) scala> val ret = sc.parallelize((1 to 100).map(Foo), 10).collect ``` This causes errors as follows: ``` ERROR actor.OneForOneStrategy: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo; java.lang.ArrayStoreException: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo; at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88) at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870) at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870) at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:859) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ``` This is because the class loaders to deserialize result `Foo` instances might be different from backend Actor's, and the Actor's class loader should be the same as Driver's. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes apache#15 from ueshin/wip/wrongcontextclassloader and squashes the following commits: d79e8c0 [Takuya UESHIN] Change a parent class loader of ExecutorURLClassLoader. c6c09b6 [Takuya UESHIN] Add a test to collect objects of class defined in repl. 43e0feb [Takuya UESHIN] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executor.
1 parent 6665df6 commit 2f03fc1

File tree

2 files changed

+12
-1
lines changed

2 files changed

+12
-1
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ private[spark] class Executor(
288288
* created by the interpreter to the search path
289289
*/
290290
private def createClassLoader(): ExecutorURLClassLoader = {
291-
val loader = this.getClass.getClassLoader
291+
val loader = Thread.currentThread().getContextClassLoader
292292

293293
// For each of the jars in the jarSet, add them to the class loader.
294294
// We assume each of the files has already been fetched.

repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,4 +242,15 @@ class ReplSuite extends FunSuite {
242242
assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
243243
}
244244
}
245+
246+
test("collecting objects of class defined in repl") {
247+
val output = runInterpreter("local[2]",
248+
"""
249+
|case class Foo(i: Int)
250+
|val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
251+
""".stripMargin)
252+
assertDoesNotContain("error:", output)
253+
assertDoesNotContain("Exception", output)
254+
assertContains("ret: Array[Foo] = Array(Foo(1),", output)
255+
}
245256
}

0 commit comments

Comments
 (0)