
Commit 823878c

Author: Imran Rashid

add accumulators for mutable collections, with correct typing!

1 parent: 680df96

File tree: 3 files changed (+46 -5 lines)

core/src/main/scala/spark/Accumulators.scala (14 additions, 1 deletion)

@@ -3,6 +3,7 @@ package spark
 import java.io._
 
 import scala.collection.mutable.Map
+import collection.generic.Growable
 
 class Accumulable[T,R] (
   @transient initialValue: T,
@@ -99,6 +100,18 @@ trait AccumulableParam[R,T] extends Serializable {
   def zero(initialValue: R): R
 }
 
+class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T] extends AccumulableParam[R,T] {
+  def addAccumulator(growable: R, elem: T) : R = {
+    growable += elem
+    growable
+  }
+  def addInPlace(t1: R, t2: R) : R = {
+    t1 ++= t2
+    t1
+  }
+  def zero(initialValue: R) = initialValue
+}
+
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
 private object Accumulators {
@@ -143,4 +156,4 @@
       }
    }
  }
-}
+}
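The new param works for any mutable collection because of the view bound: R <% Growable[T] with TraversableOnce[T] guarantees that R supports += (append a single element) and ++= (append every element of another collection), which is exactly what addAccumulator and addInPlace need. A minimal standalone sketch of the same pattern, using only the Scala standard library (GrowableSketch, addOne, and merge are illustrative names, not part of this commit):

import scala.collection.mutable
import scala.collection.generic.Growable

object GrowableSketch {
  // Mirrors addAccumulator: the Growable view supplies +=.
  def addOne[R <% Growable[T], T](acc: R, elem: T): R = {
    acc += elem
    acc
  }

  // Mirrors addInPlace: Growable's ++= accepts any TraversableOnce,
  // and the view bound makes R itself usable as the argument.
  def merge[R <% Growable[T] with TraversableOnce[T], T](a: R, b: R): R = {
    a ++= b
    a
  }

  def main(args: Array[String]) {
    val buf = merge(addOne(mutable.ArrayBuffer(1, 2), 3), mutable.ArrayBuffer(4, 5))
    println(buf)  // ArrayBuffer(1, 2, 3, 4, 5)
  }
}

Because T appears only in the bound, the compiler infers it from the collection's element type, so callers never have to spell it out.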

core/src/main/scala/spark/SparkContext.scala (5 additions, 1 deletion)

@@ -4,7 +4,6 @@ import java.io._
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.actors.remote.RemoteActor
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
@@ -31,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
 import org.apache.mesos.MesosNativeLibrary
 
 import spark.broadcast._
+import collection.generic.Growable
 
 class SparkContext(
   master: String,
@@ -253,6 +253,10 @@
   def accumulable[T,R](initialValue: T)(implicit param: AccumulableParam[T,R]) =
     new Accumulable(initialValue, param)
 
+  def accumlableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = {
+    val param = new GrowableAccumulableParam[R,T]
+    new Accumulable(initialValue, param)
+  }
 
   // Keep around a weak hash map of values to Cached versions?
   def broadcast[T](value: T) = Broadcast.getBroadcastFactory.newBroadcast[T] (value, isLocal)
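A hypothetical driver-side sketch of the new method (not code from this commit; it assumes a local master and mirrors the test added below). Note the method name is spelled accumlableCollection in the committed source:

import scala.collection.mutable

val sc = new SparkContext("local[2]", "example")
// The value is statically a mutable.ArrayBuffer[String] -- no casts needed.
val words = sc.accumlableCollection(mutable.ArrayBuffer[String]())
sc.parallelize(Seq("a", "b", "c")).foreach { w => words += w }
println(words.value.sorted)  // accumulation order across partitions is nondeterministic
sc.stop()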

core/src/test/scala/spark/AccumulatorSuite.scala (27 additions, 3 deletions)

@@ -3,9 +3,6 @@ package spark
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 import collection.mutable
-import java.util.Random
-import scala.math.exp
-import scala.math.signum
 import spark.SparkContext._
 
 class AccumulatorSuite extends FunSuite with ShouldMatchers {
@@ -79,4 +76,31 @@
     }
   }
 
+  test ("collection accumulators") {
+    val maxI = 1000
+    for (nThreads <- List(1, 10)) {
+      //test single & multi-threaded
+      val sc = new SparkContext("local[" + nThreads + "]", "test")
+      val setAcc = sc.accumlableCollection(mutable.HashSet[Int]())
+      val bufferAcc = sc.accumlableCollection(mutable.ArrayBuffer[Int]())
+      val mapAcc = sc.accumlableCollection(mutable.HashMap[Int,String]())
+      val d = sc.parallelize( (1 to maxI) ++ (1 to maxI))
+      d.foreach {
+        x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)}
+      }
+
+      //NOTE that this is typed correctly -- no casts necessary
+      setAcc.value.size should be (maxI)
+      bufferAcc.value.size should be (2 * maxI)
+      mapAcc.value.size should be (maxI)
+      for (i <- 1 to maxI) {
+        setAcc.value should contain(i)
+        bufferAcc.value should contain(i)
+        mapAcc.value should contain (i -> i.toString)
+      }
+      sc.stop()
+    }
+
+  }
+
 }
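For contrast with the "typed correctly" note above: without GrowableAccumulableParam, each collection type needs its own hand-written param object, roughly like this hypothetical sketch (SetParam is an illustrative name, not code from the repository):

import scala.collection.mutable

// One param object per concrete collection type -- the boilerplate that
// GrowableAccumulableParam and accumlableCollection generalize away.
object SetParam extends AccumulableParam[mutable.HashSet[Int], Int] {
  def addAccumulator(set: mutable.HashSet[Int], elem: Int) = { set += elem; set }
  def addInPlace(s1: mutable.HashSet[Int], s2: mutable.HashSet[Int]) = { s1 ++= s2; s1 }
  def zero(initial: mutable.HashSet[Int]) = initial
}

// Given a SparkContext sc, usage would be:
// val setAcc = sc.accumulable(mutable.HashSet[Int]())(SetParam)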
