
Commit 942c057

zsxwing authored and tdas committed
[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming
This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#10385 from zsxwing/accumulator-broadcast-example.

(cherry picked from commit 20591af)

Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
1 parent 94fb5e8 commit 942c057

File tree

5 files changed: +325 -13 lines changed


docs/programming-guide.md

Lines changed: 3 additions & 3 deletions
@@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may
 
 What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure.
 
-To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
+To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
 
 In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
 

@@ -1091,7 +1091,7 @@ for details.
 </tr>
 <tr>
 <td> <b>foreach</b>(<i>func</i>) </td>
-<td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#AccumLink">Accumulator</a> or interacting with external storage systems.
+<td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#accumulators">Accumulator</a> or interacting with external storage systems.
 <br /><b>Note</b>: modifying variables other than Accumulators outside of the <code>foreach()</code> may result in undefined behavior. See <a href="#ClosuresLink">Understanding closures </a> for more details.</td>
 </tr>
 </table>

@@ -1336,7 +1336,7 @@ run on the cluster so that `v` is not shipped to the nodes more than once. In ad
 `v` should not be modified after it is broadcast in order to ensure that all nodes get the same
 value of the broadcast variable (e.g. if the variable is shipped to a new node later).
 
-## Accumulators <a name="AccumLink"></a>
+## Accumulators
 
 Accumulators are variables that are only "added" to through an associative operation and can
 therefore be efficiently supported in parallel. They can be used to implement counters (as in
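
The renamed anchor above points at the Accumulators section that the closure discussion references. For quick context (not part of this commit), a minimal sketch of the pattern that section prescribes, assuming a Spark 1.x `SparkContext` named `sc` and purely illustrative values:

{% highlight scala %}
// Register an accumulator on the driver; tasks running on executors may only add to it.
val counter = sc.accumulator(0, "counter")

sc.parallelize(1 to 100).foreach { x =>
  // Safe under distribution: each task adds to the accumulator instead of
  // mutating a variable captured in the serialized closure.
  counter += x
}

// Only the driver can read the merged value.
println(counter.value)  // 5050
{% endhighlight %}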

docs/streaming-programming-guide.md

Lines changed: 165 additions & 0 deletions
@@ -1415,6 +1415,171 @@ Note that the connections in the pool should be lazily created on demand and tim
 
 ***
 
+## Accumulators and Broadcast Variables
+
+[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulators or Broadcast variables as well, you'll have to create lazily instantiated singleton instances for them so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
+
+  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          val wordBlacklist = Seq("a", "b", "c")
+          instance = sc.broadcast(wordBlacklist)
+        }
+      }
+    }
+    instance
+  }
+}
+
+object DroppedWordsCounter {
+
+  @volatile private var instance: Accumulator[Long] = null
+
+  def getInstance(sc: SparkContext): Accumulator[Long] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+        }
+      }
+    }
+    instance
+  }
+}
+
+wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+  // Get or register the blacklist Broadcast
+  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+  // Get or register the droppedWordsCounter Accumulator
+  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+  // Use blacklist to drop words and use droppedWordsCounter to count them
+  val counts = rdd.filter { case (word, count) =>
+    if (blacklist.value.contains(word)) {
+      droppedWordsCounter += count
+      false
+    } else {
+      true
+    }
+  }.collect()
+  val output = "Counts at time " + time + " " + counts
+})
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaWordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+class JavaDroppedWordsCounter {
+
+  private static volatile Accumulator<Integer> instance = null;
+
+  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaDroppedWordsCounter.class) {
+        if (instance == null) {
+          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
+  @Override
+  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
+    // Get or register the blacklist Broadcast
+    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+    // Get or register the droppedWordsCounter Accumulator
+    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+    // Use blacklist to drop words and use droppedWordsCounter to count them
+    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+        if (blacklist.value().contains(wordCount._1())) {
+          droppedWordsCounter.add(wordCount._2());
+          return false;
+        } else {
+          return true;
+        }
+      }
+    }).collect().toString();
+    String output = "Counts at time " + time + " " + counts;
+    return null;
+  }
+});
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+def getWordBlacklist(sparkContext):
+    if ('wordBlacklist' not in globals()):
+        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()['wordBlacklist']
+
+def getDroppedWordsCounter(sparkContext):
+    if ('droppedWordsCounter' not in globals()):
+        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+    return globals()['droppedWordsCounter']
+
+def echo(time, rdd):
+    # Get or register the blacklist Broadcast
+    blacklist = getWordBlacklist(rdd.context)
+    # Get or register the droppedWordsCounter Accumulator
+    droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+    # Use blacklist to drop words and use droppedWordsCounter to count them
+    def filterFunc(wordCount):
+        if wordCount[0] in blacklist.value:
+            droppedWordsCounter.add(wordCount[1])
+            return False
+        else:
+            return True
+
+    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
+
+wordCounts.foreachRDD(echo)
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+
+</div>
+</div>
+
+***
+
 ## DataFrame and SQL Operations
 You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
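
The lazily instantiated SQLContext singleton that the paragraph above refers to is not included in this diff. For context, a minimal sketch of that pattern, assuming Spark 1.6-era APIs where a `SQLContext` is built from a `SparkContext` (the object name is illustrative):

{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/** Lazily instantiated singleton instance of SQLContext, re-created after a driver restart. */
object SQLContextSingleton {

  @transient private var instance: SQLContext = null

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

// Inside foreachRDD, get or create the SQLContext from the RDD's SparkContext, e.g.
// val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
{% endhighlight %}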

examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java

Lines changed: 67 additions & 4 deletions
@@ -21,17 +21,22 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
+import org.apache.spark.Accumulator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;

@@ -41,7 +46,48 @@
 import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
 
 /**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Use this singleton to get or register a Broadcast variable.
+ */
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaWordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+/**
+ * Use this singleton to get or register an Accumulator.
+ */
+class JavaDroppedWordsCounter {
+
+  private static volatile Accumulator<Integer> instance = null;
+
+  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaDroppedWordsCounter.class) {
+        if (instance == null) {
+          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every second. This example also
+ * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
+ * they can be registered on driver failures.
 *
 * Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive

@@ -111,10 +157,27 @@ public Integer call(Integer i1, Integer i2) {
     wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
       @Override
       public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
-        String counts = "Counts at time " + time + " " + rdd.collect();
-        System.out.println(counts);
+        // Get or register the blacklist Broadcast
+        final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+        // Get or register the droppedWordsCounter Accumulator
+        final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+        // Use blacklist to drop words and use droppedWordsCounter to count them
+        String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
+          @Override
+          public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+            if (blacklist.value().contains(wordCount._1())) {
+              droppedWordsCounter.add(wordCount._2());
+              return false;
+            } else {
+              return true;
+            }
+          }
+        }).collect().toString();
+        String output = "Counts at time " + time + " " + counts;
+        System.out.println(output);
+        System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
         System.out.println("Appending to " + outputFile.getAbsolutePath());
-        Files.append(counts + "\n", outputFile, Charset.defaultCharset());
+        Files.append(output + "\n", outputFile, Charset.defaultCharset());
         return null;
       }
     });

examples/src/main/python/streaming/recoverable_network_wordcount.py

Lines changed: 29 additions & 1 deletion
@@ -44,6 +44,20 @@
 from pyspark.streaming import StreamingContext
 
 
+# Get or register a Broadcast variable
+def getWordBlacklist(sparkContext):
+    if ('wordBlacklist' not in globals()):
+        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()['wordBlacklist']
+
+
+# Get or register an Accumulator
+def getDroppedWordsCounter(sparkContext):
+    if ('droppedWordsCounter' not in globals()):
+        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+    return globals()['droppedWordsCounter']
+
+
 def createContext(host, port, outputPath):
     # If you do not see this printed, that means the StreamingContext has been loaded
     # from the new checkpoint

@@ -60,8 +74,22 @@ def createContext(host, port, outputPath):
     wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
 
     def echo(time, rdd):
-        counts = "Counts at time %s %s" % (time, rdd.collect())
+        # Get or register the blacklist Broadcast
+        blacklist = getWordBlacklist(rdd.context)
+        # Get or register the droppedWordsCounter Accumulator
+        droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+        # Use blacklist to drop words and use droppedWordsCounter to count them
+        def filterFunc(wordCount):
+            if wordCount[0] in blacklist.value:
+                droppedWordsCounter.add(wordCount[1])
+                return False
+            else:
+                return True
+
+        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
         print(counts)
+        print("Dropped %d word(s) totally" % droppedWordsCounter.value)
         print("Appending to " + os.path.abspath(outputPath))
         with open(outputPath, 'a') as f:
             f.write(counts + "\n")
