2 files changed: +25 −1

@@ -35,7 +35,16 @@ class Accumulable[T,R] (
     else throw new UnsupportedOperationException("Can't use read value in task")
   }
 
-  private[spark] def localValue = value_
+  /**
+   * Get the current value of this accumulator from within a task.
+   *
+   * This is NOT the global value of the accumulator. To get the global value after a
+   * completed operation on the dataset, call `value`.
+   *
+   * The typical use of this method is to directly mutate the local value, e.g., to add
+   * an element to a Set.
+   */
+  def localValue = value_
 
   def value_= (t: T) {
     if (!deserialized) value_ = t
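
The doc comment above describes the intended pattern: a task mutates its task-local value through `localValue`, Spark merges the per-task values, and the driver reads the merged result through `value`. A minimal sketch of that usage follows; it assumes a running SparkContext named `sc` and an implicit AccumulableParam for mutable sets in scope (the test below obtains one via `import SetAccum._`), so it is illustrative rather than part of this change.

// Sketch only: `sc` and an implicit AccumulableParam[mutable.Set[Any], Any]
// (e.g. the test suite's SetAccum) are assumed to be in scope.
import scala.collection.mutable

val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
sc.parallelize(1 to 100).foreach { i =>
  acc.localValue += i   // task-local mutation; this is NOT the global value
}
// Back on the driver, after the action completes, read the merged global value:
println(acc.value)
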
@@ -79,4 +79,19 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers {
     }
   }
 
+  test("localValue readable in tasks") {
+    import SetAccum._
+    val maxI = 1000
+    for (nThreads <- List(1, 10)) { // test single & multi-threaded
+      val sc = new SparkContext("local[" + nThreads + "]", "test")
+      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+      val groupedInts = (1 to (maxI / 20)).map { x => (20 * (x - 1) to 20 * x).toSet }
+      val d = sc.parallelize(groupedInts)
+      d.foreach {
+        x => acc.localValue ++= x
+      }
+      acc.value should be ((0 to maxI).toSet)
+    }
+  }
+
 }