Skip to content

Commit 79c82b6

Browse files
committed
Merge pull request apache#173 from squito/accum_localValue
make accumulator.localValue public, add tests
Parents: 680df96 and 4d2efe9 · Commit: 79c82b6

File tree

2 files changed: 25 additions (+), 1 deletion (−)

2 files changed

+25
-1
lines changed

core/src/main/scala/spark/Accumulators.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,16 @@ class Accumulable[T,R] (
3535
else throw new UnsupportedOperationException("Can't use read value in task")
3636
}
3737

/**
 * Access this accumulator's task-local value from inside a running task.
 *
 * Note that this is only the partial value accumulated so far on the current
 * node — NOT the merged global result. For the global value after a completed
 * operation on the dataset, read `value` on the driver instead.
 *
 * The typical use is to mutate the local value in place, e.g. adding an
 * element to a Set.
 */
def localValue = value_
3948

4049
def value_= (t: T) {
4150
if (!deserialized) value_ = t

core/src/test/scala/spark/AccumulatorSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,19 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers {
7979
}
8080
}
8181

82+
test ("localValue readable in tasks") {
  import SetAccum._
  val maxI = 1000
  for (nThreads <- List(1, 10)) { //test single & multi-threaded
    // Fresh local context per iteration so we exercise 1 and 10 worker threads.
    val sc = new SparkContext("local[" + nThreads + "]", "test")
    try {
      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
      // Overlapping 21-element ranges; their union covers 0 to maxI inclusive,
      // and the Set semantics absorb the duplicated boundary elements.
      val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
      val d = sc.parallelize(groupedInts)
      d.foreach {
        x => acc.localValue ++= x
      }
      acc.value should be ( (0 to maxI).toSet)
    } finally {
      // Stop the context so repeated iterations (and later suites) don't leak
      // the executor threads / listener resources of an unstopped SparkContext.
      sc.stop()
    }
  }
}
96+
8297
}

0 commit comments

Comments
 (0)