@@ -3,7 +3,7 @@ package spark
3
3
import java .io ._
4
4
5
5
import scala .collection .mutable .Map
6
- import collection .generic .Growable
6
+ import scala . collection .generic .Growable
7
7
8
8
class Accumulable [T ,R ] (
9
9
@ transient initialValue : T ,
@@ -109,16 +109,26 @@ trait AccumulableParam[R,T] extends Serializable {
109
109
def zero (initialValue : R ): R
110
110
}
111
111
112
/**
 * An [[AccumulableParam]] for accumulating elements of type `T` into a growable collection `R`.
 *
 * `R` only needs to be viewable as a `Growable[T] with TraversableOnce[T] with Serializable`;
 * every mutation below goes through that implicit view.
 *
 * @tparam R the collection type that results are accumulated into
 * @tparam T the type of the individual elements being added
 */
class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
  extends AccumulableParam[R, T] {

  /**
   * Appends a single element to the collection, mutating it in place.
   *
   * @param growable the collection to append to
   * @param elem the element to append
   * @return the same `growable` instance that was passed in
   */
  def addAccumulator(growable: R, elem: T): R = {
    growable += elem
    growable
  }

  /**
   * Appends every element of `t2` to `t1`, mutating `t1` in place.
   *
   * @param t1 the collection that receives the elements
   * @param t2 the collection whose elements are appended
   * @return the same `t1` instance that was passed in
   */
  def addInPlace(t1: R, t2: R): R = {
    t1 ++= t2
    t1
  }

  /**
   * Builds an empty collection from `initialValue` without touching `initialValue` itself.
   *
   * @param initialValue the collection to copy
   * @return a cleared copy of `initialValue` obtained through a serialization round trip
   */
  def zero(initialValue: R): R = {
    // A clone of initialValue is required, but constraining R to also be Cloneable is awkward,
    // so the value is written to a buffer by the Java serializer and read back instead.
    val serializer = (new spark.JavaSerializer).newInstance
    val buffer = serializer.serialize(initialValue)
    val emptied = serializer.deserialize[R](buffer)
    emptied.clear() // The copy may carry over elements from initialValue; drop them.
    emptied
  }
}
123
133
124
134
// TODO: The multi-thread support in accumulators is kind of lame; check
0 commit comments