BDA Unit 5
Performance Tuning:
1. Streaming Context
2. DStream
3. Caching
4. Accumulators, Broadcast Variables and Checkpoints
Streaming Context
StreamingContext is the main entry point for all Spark Streaming functionality. It consumes a stream of data in Spark by registering an Input DStream, which produces a Receiver object. Spark provides a number of default implementations of sources such as Twitter, Akka Actor and ZeroMQ that are accessible from the context.
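As a minimal sketch (the local master, 10-second batch interval and socket source on port 9999 are illustrative choices), a StreamingContext can be created and an input DStream registered like this:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingContextExample")
val ssc = new StreamingContext(conf, Seconds(10)) // batch interval of 10 seconds

// Register an input DStream; the socket source creates a Receiver internally
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()

ssc.start()            // start receiving and processing data
ssc.awaitTermination() // wait for the computation to terminate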
Figure: The Receiver sends data onto the Input DStream, where each batch contains RDDs.
Having covered the core concepts and basic APIs, this chapter dives into event-time and stateful processing. Event-time processing is a hot topic because we analyze information with respect to the time it was created, not the time it was processed. The key idea behind this style of processing is that, over the lifetime of the job, Spark will maintain relevant state that it can update over the course of the job before outputting it to the sink.
Let's cover these concepts in greater detail before we begin working with code to show how they work.
Event Time
Event time is the time that is embedded in the data itself. It is most often, though not required to be, the time at which the event actually occurred.
Processing Time
Processing time is the time at which the stream-processing system actually receives and processes the data.
Apache Spark is a general processing engine built on top of the Hadoop ecosystem. Spark has a complete setup and a unified framework to process any kind of data. Spark can do batch processing as well as stream processing. Spark has a powerful SQL engine to run SQL queries on the data; it also has an integrated machine learning library called MLlib and a graph processing library called GraphX. Because it integrates so many components, we describe Spark as a unified framework rather than just a processing engine.
Now, coming to the real-time stream processing engine of Spark: Spark does not process data in true real time; it performs near-real-time processing. That means it processes the data in micro batches, each completed within milliseconds to a few seconds.
But what if you need to accumulate the results from the start of the streaming job? That means you need to check the previous state of the RDD in order to update the new state of the RDD. This is what is known as stateful streaming in Spark.
Spark provides two APIs to perform stateful streaming: updateStateByKey and mapWithState.
Now we will see how to perform a stateful streaming word count using updateStateByKey. updateStateByKey is a function on DStreams in Spark which accepts an update function as its parameter. That update function receives, for each key, the new values of the key in the current batch as a Seq and the previous state of the key as an Option.
Let's take a word count program. Say for the first 10 seconds we have given this data: hello every one from acadgild. The word count result will be
(one,1)
(hello,1)
(from,1)
(acadgild,1)
(every,1)
Now, without writing the updateStateByKey function, if you give some other data in the next 10 seconds, let's assume the same line hello every one from acadgild, we will get the same result in the next 10 seconds also, i.e.,
(one,1)
(hello,1)
(from,1)
(acadgild,1)
(every,1)
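To carry counts forward across batches, the following is a minimal stateful word count sketch using updateStateByKey, assuming a socket source on localhost:9999 and a hypothetical checkpoint directory:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("/tmp/stateful-wordcount") // hypothetical checkpoint directory; required by updateStateByKey

// Update function: new values of a key in the current batch (Seq) plus its previous state (Option)
val updateFunc = (newValues: Seq[Int], previousState: Option[Int]) =>
  Some(newValues.sum + previousState.getOrElse(0))

val counts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .updateStateByKey[Int](updateFunc)

counts.print()
ssc.start()
ssc.awaitTermination()
With this in place, sending the same line hello every one from acadgild in the next batch would produce (hello,2), (every,2), (one,2), (from,2), (acadgild,2), because the previous counts are added to the new ones.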
Tumbling Windows
In a tumbling window, tuples are grouped in a single window based on time or
count. A tuple belongs to only one window.
An example would be to compute the average price of a stock over the last five
minutes, computed every five minutes.
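A minimal Structured Streaming sketch of such a tumbling window is shown below. It uses the built-in rate source, whose value column stands in for a stock price purely for illustration:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[2]").appName("TumblingWindowExample").getOrCreate()
import spark.implicits._

// The built-in "rate" source emits rows with "timestamp" and "value" columns;
// here "value" is cast to a double and treated as a price for illustration only
val quotes = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
  .withColumn("price", $"value".cast("double"))

// Tumbling window: the window duration equals the slide duration (5 minutes),
// so every row belongs to exactly one window
val avgPrice = quotes
  .groupBy(window($"timestamp", "5 minutes"))
  .agg(avg($"price").as("avg_price"))

avgPrice.writeStream.outputMode("complete").format("console").start().awaitTermination()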
Removing Duplicate Rows from a DataFrame
Consider the following sample DataFrame of employees:
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
| James| Sales| 3000|
| Michael| Sales| 4600|
| Robert| Sales| 4100|
| Maria| Finance| 3000|
| James| Sales| 3000|
| Scott| Finance| 3300|
| Jen| Finance| 3900|
| Jeff| Marketing| 3000|
| Kumar| Marketing| 2000|
| Saif| Sales| 4100|
+-------------+----------+------+
In the above table, notice that 2 rows have duplicate values in all columns, and 4 rows have duplicate values in the "department" and "salary" columns.
The dataset has a total of 10 rows, and one row has all values duplicated, so performing distinct() on this DataFrame should return 9 rows.
Distinct count: 9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James |Sales |3000 |
|Michael |Sales |4600 |
|Maria |Finance |3000 |
|Robert |Sales |4100 |
|Saif |Sales |4100 |
|Scott |Finance |3300 |
|Jeff |Marketing |3000 |
|Jen |Finance |3900 |
|Kumar |Marketing |2000 |
+-------------+----------+------+
Alternatively, you can also run the dropDuplicates() function, which returns a new DataFrame with duplicate rows removed, optionally considering only a subset of columns.
package com.sparkbyexamples.spark.dataframe.functions.aggregate
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Create the SparkSession first (the master URL and app name below are illustrative)
val spark = SparkSession.builder().master("local[1]").appName("DistinctExample").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
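Continuing the snippet, the following sketch builds the DataFrame shown above and removes the duplicates; the variable names are illustrative:
// Sample data matching the table above
val simpleData = Seq(("James", "Sales", 3000), ("Michael", "Sales", 4600),
  ("Robert", "Sales", 4100), ("Maria", "Finance", 3000), ("James", "Sales", 3000),
  ("Scott", "Finance", 3300), ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000),
  ("Kumar", "Marketing", 2000), ("Saif", "Sales", 4100))
val df = simpleData.toDF("employee_name", "department", "salary")

// distinct() removes the rows duplicated across all columns
val distinctDF = df.distinct()
println("Distinct count: " + distinctDF.count())
distinctDF.show(false)

// dropDuplicates() with no arguments behaves like distinct();
// with a column list it drops rows duplicated on just those columns
val dropDisDF = df.dropDuplicates("department", "salary")
println("Distinct count of department & salary: " + dropDisDF.count())
dropDisDF.show(false)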
Spark Core
Contains the Spark core execution engine and a set of low-level functional APIs used to distribute computations across a cluster of computing resources.
Spark SQL
It is a module for working with structured data. It implements the higher-level Dataset and DataFrame APIs of Spark and adds SQL support on top of them.
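As a minimal sketch of this module in action (the sample data, view name and query are illustrative):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("SparkSQLExample").getOrCreate()
import spark.implicits._

// Build a small DataFrame and query it with SQL
val people = Seq(("Alice", 29), ("Bob", 35)).toDF("name", "age")
people.createOrReplaceTempView("people") // expose the DataFrame to the SQL engine
spark.sql("SELECT name FROM people WHERE age > 30").show()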
Structured Streaming
This feature was first introduced in Spark 2.0 in July 2016. It is built on top of the Spark SQL abstraction. Structured Streaming enriches the Dataset and DataFrame APIs with streaming capabilities. It requires the specification of a schema for the data in the stream.
Consider that all arriving data is treated as an unbounded input table. Every data item is like a new row being appended to the input table.
A query on the input will generate the "Result Table". Each time a trigger fires, Spark checks for new data (new rows in the input table) and incrementally updates the result.
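A minimal sketch of this model, assuming a socket source on localhost:9999: every incoming line becomes a new row of the unbounded input table, and the running word count is the incrementally updated result table.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("StructuredWordCount").getOrCreate()
import spark.implicits._

// Each new line from the socket is a new row appended to the unbounded input table
val lines = spark.readStream.format("socket")
  .option("host", "localhost").option("port", 9999).load()

// The query over the input table; its output is the result table
val wordCounts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

// On every trigger, Spark checks for new rows and incrementally updates the result
wordCounts.writeStream.outputMode("complete").format("console").start().awaitTermination()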
Transformations on Streams:
Apache Spark Streaming Transformation Operations
1. Objective
This Apache Spark transformation operations tutorial covers the various Spark Streaming transformation operations, with examples, that Spark professionals use when working with Spark Streaming. You will learn streaming operations such as map, flatMap, filter, count, reduceByKey, countByValue and updateStateByKey, with examples that will help you in your Spark jobs.
a. map()
The map function in Spark passes each element of the source DStream through a function and returns a new DStream.
Spark map() example
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("MapOpTest")
val ssc = new StreamingContext(conf, Seconds(1))
val words = ssc.socketTextStream("localhost", 9999)
val ans = words.map { word => ("hello", word) } // pair "hello" with each line
ans.print()
ssc.start()            // Start the computation
ssc.awaitTermination() // Wait for termination
b. flatMap()
flatMap is similar to map, but each input item of the source DStream can be mapped to zero or more output items.
c. filter()
The filter function in Apache Spark selects only those records of the source DStream on which func returns true and returns a new DStream of those records.
Spark flatMap() and filter() example
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")) // split each line into words
val output = words.filter { word => word.startsWith("s") } // keep only the words that start with the letter "s"
output.print()
HadoopInputFormat
In Spark, we can use Hadoop's InputFormat to process data, similar to Hadoop. For this, Spark provides Hadoop APIs in Java, Scala and Python.
Old APIs (which use the mapred libraries of Hadoop)
• hadoopRDD
• hadoopFile
New APIs (which support the MapReduce (mapreduce) libraries of Hadoop)
• newAPIHadoopRDD
• newAPIHadoopFile
We can implement the APIs using Spark context as shown below.
Old APIs:
SparkContext.hadoopRDD(conf, inputFormatClass, keyClass, valueClass,
minPartitions)
Here’s the explanation of the above line:
To use or implement any Java class in a Scala project, we need to use the
syntax classOf[class_name].
Here, KeyValueTextInputFormat and Text are Hadoop I/O classes written in Java, passed as the input format class and the key/value classes. In their place, we can also use custom input format classes, which will be discussed later.
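For instance, here is a sketch of reading a key/value text file with the old API's hadoopRDD, assuming an existing SparkContext named sc and a hypothetical HDFS path:
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, KeyValueTextInputFormat}

val jobConf = new JobConf()
FileInputFormat.setInputPaths(jobConf, "hdfs:///user/spark/input.txt") // hypothetical input path

val kvRDD = sc.hadoopRDD(
  jobConf,
  classOf[KeyValueTextInputFormat], // Hadoop InputFormat (a Java class, hence classOf)
  classOf[Text],                    // key class
  classOf[Text],                    // value class
  2)                                // minPartitions

kvRDD.map { case (k, v) => (k.toString, v.toString) }.take(5).foreach(println)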
The old API save method on pair RDDs takes a similar set of parameters:
RDD.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf, codec)
• Path – Here, we need to give the path of the output file, i.e. where the output needs to be saved.
• Keyclass – Here you need to give the output key class.
• Valueclass – Here you need to give the output value class.
• outputFormatClass – Here you need to give the OutputFormat class.
• conf – Here you need to give the Hadoop configuration, i.e. org.apache.hadoop.mapred.JobConf.
• codec – Here you need to give the compression format.
• conf and codec are optional parameters.
RDD.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
The explanation of the above line is as follows:
• Path – Here, we need to give the path of the output file, i.e. where the output needs to be saved.
• Keyclass – Here you need to give the output key class.
• Valueclass – Here you need to give the output value class.
The output above is saved using the old API of Hadoop, i.e. the mapred libraries. We can save the same output using the new APIs of Hadoop, i.e. the mapreduce libraries; a sketch of both is shown below.
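A minimal sketch of saving a pair RDD with both Hadoop APIs, again assuming an existing SparkContext named sc; the paths and sample data are illustrative:
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{TextOutputFormat => OldTextOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

val pairs = sc.parallelize(Seq(("spark", "fast"), ("hadoop", "reliable")))
  .map { case (k, v) => (new Text(k), new Text(v)) }

// Old API (mapred libraries)
pairs.saveAsHadoopFile("hdfs:///user/spark/out_old",
  classOf[Text], classOf[Text], classOf[OldTextOutputFormat[Text, Text]])

// New API (mapreduce libraries)
pairs.saveAsNewAPIHadoopFile("hdfs:///user/spark/out_new",
  classOf[Text], classOf[Text], classOf[NewTextOutputFormat[Text, Text]])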