[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming
This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes apache#10385 from zsxwing/accumulator-broadcast-example.
(cherry picked from commit 20591af)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
docs/programming-guide.md (3 additions, 3 deletions)
@@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may

 What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure.

-To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
+To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.

 In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
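(For context on the hunk above: a minimal Scala sketch of the behavior the paragraph describes — a closure-captured counter versus an accumulator — is shown here for illustration only. It is not part of the patch, and `sc` is assumed to be an existing SparkContext.)

{% highlight scala %}
// Broken in cluster mode: `counter` is captured by the closure, so each executor
// increments its own serialized copy; the driver-side variable is never updated.
var counter = 0
val rdd = sc.parallelize(1 to 100)
rdd.foreach(x => counter += x)
println("Driver-side counter: " + counter)  // typically still 0 on a cluster

// Well-defined: an Accumulator is updated on executors and read on the driver.
val sumAccum = sc.accumulator(0, "sum")     // Spark 1.x accumulator API
rdd.foreach(x => sumAccum += x)
println("Accumulator value: " + sumAccum.value)
{% endhighlight %}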
@@ -1091,7 +1091,7 @@ for details.
 </tr>
 <tr>
   <td> <b>foreach</b>(<i>func</i>) </td>
-  <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#AccumLink">Accumulator</a> or interacting with external storage systems.
+  <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#accumulators">Accumulator</a> or interacting with external storage systems.
   <br /><b>Note</b>: modifying variables other than Accumulators outside of the <code>foreach()</code> may result in undefined behavior. See <a href="#ClosuresLink">Understanding closures</a> for more details.</td>
 </tr>
 </table>
@@ -1336,7 +1336,7 @@ run on the cluster so that `v` is not shipped to the nodes more than once. In ad
 `v` should not be modified after it is broadcast in order to ensure that all nodes get the same
 value of the broadcast variable (e.g. if the variable is shipped to a new node later).

-## Accumulators <a name="AccumLink"></a>
+## Accumulators

 Accumulators are variables that are only "added" to through an associative operation and can
 therefore be efficiently supported in parallel. They can be used to implement counters (as in
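(A quick illustration of the broadcast behavior this hunk touches — a sketch only, again assuming an existing SparkContext `sc`. The variable is created once on the driver and only read via `.value` on executors, never modified after broadcasting.)

{% highlight scala %}
// Ship a small lookup table to each node at most once and read it on executors.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
val resolved = sc.parallelize(Seq("a", "b", "c"))
  .map(word => lookup.value.getOrElse(word, 0))
  .collect()
{% endhighlight %}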
docs/streaming-programming-guide.md (165 additions, 0 deletions)
@@ -1415,6 +1415,171 @@ Note that the connections in the pool should be lazily created on demand and tim

 ***

+## Accumulators and Broadcast Variables
+
+[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use [Accumulators](programming-guide.html#accumulators) or [Broadcast variables](programming-guide.html#broadcast-variables) as well, you'll have to create lazily instantiated singleton instances for [Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
...
+    val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+    // Get or register the droppedWordsCounter Accumulator
+    val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+    // Use blacklist to drop words and use droppedWordsCounter to count them
+    val counts = rdd.filter { case (word, count) =>
+      if (blacklist.value.contains(word)) {
+        droppedWordsCounter += count
+        false
+      } else {
+        true
+      }
+    }.collect()
+    val output = "Counts at time " + time + " " + counts
+})
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
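(The collapsed portion of the Scala diff, between the `instance` field and the `foreachRDD` body, holds the lazily instantiated singleton logic the surrounding text refers to. A rough sketch of such a `getInstance` pattern follows — an illustration with placeholder values, not the patch's exact code.)

{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Hypothetical sketch: a lazily instantiated singleton holding a Broadcast variable,
// so it can be re-created after the driver restarts from a checkpoint.
object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          // Placeholder blacklist; the real example defines its own word list.
          instance = sc.broadcast(Seq("a", "b", "c"))
        }
      }
    }
    instance
  }
}
// The accumulator singleton (DroppedWordsCounter.getInstance) presumably follows the
// same double-checked-locking shape, returning an accumulator created on first use.
{% endhighlight %}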
...
+      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+        if (blacklist.value().contains(wordCount._1())) {
+          droppedWordsCounter.add(wordCount._2());
+          return false;
+        } else {
+          return true;
+        }
+      }
+    }).collect().toString();
+    String output = "Counts at time " + time + " " + counts;
+  }
+}
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
...
+    # Use blacklist to drop words and use droppedWordsCounter to count them
+    def filterFunc(wordCount):
+        if wordCount[0] in blacklist.value:
+            droppedWordsCounter.add(wordCount[1])
+            return False
+        else:
+            return True
+
+    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
+
+wordCounts.foreachRDD(echo)
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+
+</div>
+</div>
+
+***

 ## DataFrame and SQL Operations

 You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
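(A rough Scala sketch of the singleton-SQLContext pattern described in that paragraph, using Spark 1.x APIs. The `words` DStream and the column name are assumptions; the guide's own example is linked from that section of the doc.)

{% highlight scala %}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Assumes `words` is an existing DStream[String] (e.g. from the quick example).
words.foreachRDD { (rdd: RDD[String]) =>
  // Get or create the singleton SQLContext from this RDD's SparkContext, so it is
  // re-created if the driver restarts from a checkpoint.
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Convert the RDD to a DataFrame, register it as a temporary table, and query it.
  val wordsDataFrame = rdd.toDF("word")
  wordsDataFrame.registerTempTable("words")
  sqlContext.sql("select word, count(*) as total from words group by word").show()
}
{% endhighlight %}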