File tree Expand file tree Collapse file tree 2 files changed +24
-1
lines changed Expand file tree Collapse file tree 2 files changed +24
-1
lines changed Original file line number Diff line number Diff line change @@ -35,7 +35,16 @@ class Accumulable[T,R] (
35
35
else throw new UnsupportedOperationException (" Can't use read value in task" )
36
36
}
37
37
38
- private [spark] def localValue = value_
38
+ /**
39
+ * get the current value of this accumulator from within a task.
40
+ *
41
+ * This is NOT the global value of the accumulator. To get the global value after a
42
+ * completed operation on the dataset, call `value`.
43
+ *
44
+ * The typical use of this method is to directly mutate the local value, eg., to add
45
+ * an element to a Set.
46
+ */
47
+ def localValue = value_
39
48
40
49
def value_= (t : T ) {
41
50
if (! deserialized) value_ = t
Original file line number Diff line number Diff line change @@ -79,4 +79,18 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers {
79
79
}
80
80
}
81
81
82
+ test (" localValue readable in tasks" ) {
83
+ import SetAccum ._
84
+ val maxI = 1000
85
+ for (nThreads <- List (1 , 10 )) { // test single & multi-threaded
86
+ val sc = new SparkContext (" local[" + nThreads + " ]" , " test" )
87
+ val acc : Accumulable [mutable.Set [Any ], Any ] = sc.accumulable(new mutable.HashSet [Any ]())
88
+ val d = sc.parallelize(1 to maxI)
89
+ d.foreach {
90
+ x => acc.localValue += x
91
+ }
92
+ acc.value should be ( (1 to maxI).toSet)
93
+ }
94
+ }
95
+
82
96
}
You can’t perform that action at this time.
0 commit comments