Skip to content

Commit 2498f95

Browse files
committed
Fixed GrowableAccumulableParam.zero() to return a copy of initialValue that is
guaranteed to be empty, instead of returning the old initialValue itself
1 parent 66e5362 commit 2498f95

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

core/src/main/scala/spark/Accumulators.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package spark
33
import java.io._
44

55
import scala.collection.mutable.Map
6-
import collection.generic.Growable
6+
import scala.collection.generic.Growable
77

88
class Accumulable[T,R] (
99
@transient initialValue: T,
@@ -109,16 +109,26 @@ trait AccumulableParam[R,T] extends Serializable {
109109
def zero(initialValue: R): R
110110
}
111111

112-
class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T] extends AccumulableParam[R,T] {
112+
class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
113+
extends AccumulableParam[R,T] {
113114
def addAccumulator(growable: R, elem: T) : R = {
114115
growable += elem
115116
growable
116117
}
118+
117119
def addInPlace(t1: R, t2: R) : R = {
118120
t1 ++= t2
119121
t1
120122
}
121-
def zero(initialValue: R) = initialValue
123+
124+
def zero(initialValue: R): R = {
125+
// We need to clone initialValue, but it's hard to specify that R should also be Cloneable.
126+
// Instead we'll serialize it to a buffer and load it back.
127+
val ser = (new spark.JavaSerializer).newInstance
128+
val copy = ser.deserialize[R](ser.serialize(initialValue))
129+
copy.clear() // In case it contained stuff
130+
copy
131+
}
122132
}
123133

124134
// TODO: The multi-thread support in accumulators is kind of lame; check

0 commit comments

Comments
 (0)