BDA Unit 5


UNIT V

Spark-Performance Tuning, Stream Processing Fundamentals, Event-Time and
Stateful Processing - Event Time, Stateful Processing, Windows on Event
Time - Tumbling Windows, Handling Late Data with Watermarks, Dropping
Duplicates in a Stream, Structured Streaming Basics - Core Concepts,
Structured Streaming in Action, Transformations on Streams, Input and Output.

Performance Tuning:

• Caching Data In Memory


• Other Configuration Options
• Broadcast Hint for SQL Queries
For some workloads, it is possible to improve performance by either caching
data in memory, or by turning on some experimental options.

Caching Data In Memory


Spark SQL can cache tables using an in-memory columnar format by
calling spark.catalog.cacheTable("tableName") or dataFrame.cache(). Then
Spark SQL will scan only required columns and will automatically tune
compression to minimize memory usage and GC pressure. You can
call spark.catalog.uncacheTable("tableName") to remove the table from
memory.
Configuration of in-memory caching can be done using the setConf method
on SparkSession or by running SET key=value commands using SQL.
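As a quick illustration (a minimal sketch, assuming a SparkSession named spark and an already registered table or view called "tableName"), caching and tuning can be done like this:

// Cache the table using the in-memory columnar format
spark.catalog.cacheTable("tableName")
// Tune in-memory caching through a SQL SET command ...
spark.sql("SET spark.sql.inMemoryColumnarStorage.batchSize=20000")
// ... or through the runtime configuration of the SparkSession
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
// Remove the table from memory when it is no longer needed
spark.catalog.uncacheTable("tableName")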

The available properties are:

• spark.sql.inMemoryColumnarStorage.compressed (default: true): When set to true, Spark SQL will automatically select a compression codec for each column based on statistics of the data.

• spark.sql.inMemoryColumnarStorage.batchSize (default: 10000): Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.



Other Configuration Options
The following options can also be used to tune the performance of query
execution. It is possible that these options will be deprecated in a future release as
more optimizations are performed automatically.

• spark.sql.files.maxPartitionBytes (default: 134217728, i.e. 128 MB): The maximum number of bytes to pack into a single partition when reading files.

• spark.sql.files.openCostInBytes (default: 4194304, i.e. 4 MB): The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over-estimate; then the partitions with small files will be faster than partitions with bigger files (which are scheduled first).

• spark.sql.broadcastTimeout (default: 300): Timeout in seconds for the broadcast wait time in broadcast joins.

• spark.sql.autoBroadcastJoinThreshold (default: 10485760, i.e. 10 MB): Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting. Note that currently statistics are only supported for Hive Metastore tables on which the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.

• spark.sql.shuffle.partitions (default: 200): Configures the number of partitions to use when shuffling data for joins or aggregations.
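For example (a small sketch, assuming a SparkSession named spark), these options can be changed at runtime before running a query:

// Use fewer shuffle partitions for a small dataset
spark.conf.set("spark.sql.shuffle.partitions", "50")
// Disable automatic broadcast joins by setting the threshold to -1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")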



Broadcast Hint for SQL Queries
The BROADCAST hint guides Spark to broadcast each specified table when
joining it with another table or view. When Spark decides on the join method,
the broadcast hash join (i.e., BHJ) is preferred, even if the statistics are above the
configuration spark.sql.autoBroadcastJoinThreshold. When both sides of a join
are specified, Spark broadcasts the one with the lower statistics. Note that Spark
does not guarantee BHJ is always chosen, since not all cases (e.g. full outer
join) support BHJ. When the broadcast nested loop join is selected, we still
respect the hint.
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
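The same hint can be expressed through the DataFrame API with the broadcast function from org.apache.spark.sql.functions (a minimal sketch, assuming DataFrames named records and src corresponding to the tables in the query above):

import org.apache.spark.sql.functions.broadcast
// Hint that the "records" DataFrame should be broadcast for the join
val joined = src.join(broadcast(records), "key")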

Stream Processing Fundamentals:


Spark Streaming Fundamentals

1. Streaming Context
2. DStream
3. Caching
4. Accumulators, Broadcast Variables and Checkpoints

Streaming Context
Streaming Context consumes a stream of data in Spark. It registers an Input
DStream to produce a Receiver object. It is the main entry point for Spark
Streaming functionality. Spark provides a number of default implementations of
sources like Twitter, Akka Actor and ZeroMQ that are accessible from the context.

A StreamingContext object can be created from a SparkContext object. A


SparkContext represents the connection to a Spark cluster and can be used to
create RDDs, accumulators and broadcast variables on that cluster.
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
DStream
Discretized Stream (DStream) is the basic abstraction provided by Spark
Streaming. It is a continuous stream of data, either received from a data source
or generated by transforming another input stream.

Figure: Extracting words from an Input DStream

Internally, a DStream is represented by a continuous series of RDDs and each


RDD contains data from a certain interval.
Input DStreams: Input DStreams are DStreams representing the stream of
input data received from streaming sources.

Figure: The Receiver sends data onto the Input DStream where each Batch
contains RDDs



Every input DStream is associated with a Receiver object which receives the
data from a source and stores it in Spark’s memory for processing.
Transformations on DStreams:
Any operation applied on a DStream translates to operations on the underlying
RDDs. Transformations allow the data from the input DStream to be modified
similar to RDDs. DStreams support many of the transformations available on
normal Spark RDDs.

Figure: DStream Transformations


The following are some of the popular transformations on DStreams:

• map(func): Returns a new DStream by passing each element of the source DStream through a function func.

• flatMap(func): Similar to map(func), but each input item can be mapped to 0 or more output items; returns a new DStream by passing each source element through a function func.

• filter(func): Returns a new DStream by selecting only the records of the source DStream on which func returns true.

• reduce(func): Returns a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func.

• groupBy(func): Returns a new grouped stream, where each group is made up of a key and the corresponding list of items of that group.



Output DStreams:
Output operations allow DStream’s data to be pushed out to external systems
like databases or file systems. Output operations trigger the actual execution of
all the DStream transformations.

Figure: Output Operations on DStreams
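As a small illustration (a sketch, assuming a DStream named wordCounts holding (word, count) pairs), common output operations look like this:

// Print the first ten elements of every batch on the driver
wordCounts.print()
// Save every batch as text files whose names start with the prefix "counts"
wordCounts.saveAsTextFiles("counts")
// Push each batch to an external system with arbitrary code
wordCounts.foreachRDD { rdd =>
  rdd.foreach(record => println(record))  // replace with writes to a database or message queue
}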


Caching
DStreams allow developers to cache/ persist the stream’s data in memory. This
is useful if the data in the DStream will be computed multiple times. This can be
done using the persist() method on a DStream.

Figure: Caching into 2 Nodes


For input streams that receive data over the network (such as Kafka, Flume,
Sockets, etc.), the default persistence level is set to replicate the data to two
nodes for fault-tolerance.
Accumulators, Broadcast Variables and Checkpoints
Accumulators: Accumulators are variables that are only added through an
associative and commutative operation. They are used to implement counters or
sums. Tracking accumulators in the UI can be useful for understanding the
progress of running stages. Spark natively supports numeric accumulators. We
can create named or unnamed accumulators.
Broadcast Variables: Broadcast variables allow the programmer to keep a
read-only variable cached on each machine rather than shipping a copy of it
with tasks. They can be used to give every node a copy of a large input dataset
in an efficient manner. Spark also attempts to distribute broadcast variables
using efficient broadcast algorithms to reduce communication cost.
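A minimal sketch (assuming a SparkContext named sc) of creating a named accumulator and a broadcast variable:

// Named long accumulator, visible in the Spark UI
val errorCount = sc.longAccumulator("Error Count")
errorCount.add(1)
// Read-only broadcast variable cached on each executor
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
println(lookup.value("a"))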
Checkpoints: Checkpoints are similar to checkpoints in gaming. They make the
application run 24/7 and make it resilient to failures unrelated to the application logic.

Figure: Features of Checkpoints
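A typical way to enable checkpointing (a hedged sketch; the checkpoint directory and the createContext function are hypothetical) is to store checkpoint data in a fault-tolerant directory and recreate the context from it on restart:

// Build a fresh StreamingContext when no checkpoint exists
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(10))
  ssc.checkpoint("hdfs:///user/spark/checkpoints")  // hypothetical fault-tolerant directory
  // ... define the DStreams and transformations here ...
  ssc
}
// Recover from the checkpoint if present, otherwise create a new context
val ssc = StreamingContext.getOrCreate("hdfs:///user/spark/checkpoints", () => createContext())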

Event-Time and Stateful Processing

The previous chapters covered the core concepts and basic APIs; this chapter dives
into event-time and stateful processing. Event-time processing is a hot topic because
we analyze information with respect to the time that it was created, not processed.
The key idea behind this style of processing is that over the lifetime of the job, Spark
will maintain relevant state that it can update over the course of the job before
outputting it to the sink.

Let's cover these concepts in greater detail before we begin working with code
to show how they work.

Event Time

Event time is an important topic to cover discretely because Spark’s DStream


API does not support processing information with respect to event-time. At a
higher level, in stream-processing systems there are effectively two relevant
times for each event: the time at which it actually occurred (event time), and the
time that it was processed or reached the stream-processing system (processing
time).

Event time

Event time is the time that is embedded in the data itself. It is most often,
though not required to be, the time that an event actually occurs. This is
important to use because it provides a more robust way of comparing
events against one another. The challenge here is that event data can be
late or out of order. This means that the stream processing system must be
able to handle out-of-order or late data.

Processing time

Processing time is the time at which the stream-processing system actually receives the data.

Stateful Streaming in Apache Spark

Apache Spark is a general processing engine built on top of the Hadoop eco-
system. Spark has a complete setup and a unified framework to process any
kind of data. Spark can do batch processing as well as stream processing. Spark
has a powerful SQL engine to run SQL queries on the data; it also has an
integrated machine learning library called MLlib and a graph processing
library called GraphX. Because it integrates all of these, we describe Spark
as a unified framework rather than just a processing engine.

Now, coming to the real-time stream processing engine of Spark: Spark doesn't
process the data in real time; it does near-real-time processing. That means it
processes the data in micro batches, in just a few milliseconds.



Here we have a program where Spark's streaming context will process the
data in micro batches, but generally this processing is stateless. Say we have
defined the streaming context to run every 10 seconds; it will process the data
that arrived within those 10 seconds. To process the previous data we have the
windows concept, but windows cannot give the accumulated results from the
starting timestamp.

But what if you need to accumulate the results from the start of the
streaming job? That means you need to check the previous state of the RDD in
order to update the new state of the RDD. This is what is known
as stateful streaming in Spark.
Spark provides two APIs to perform stateful streaming:
updateStateByKey and mapWithState.
Now we will see how to perform stateful streaming of word count
using updateStateByKey. updateStateByKey is a function on DStreams in
Spark which accepts an update function as its parameter. In that update function,
you are given the new values for a key (a Seq of values) and the previous state
of that key (as an Option).
Let's take a word count program. Say that in the first 10 seconds we send the
data hello every one from acadgild. The word count result will be
(one,1)
(hello,1)
(from,1)
(acadgild,1)
(every,1)
Now, without the updateStateByKey function, if you give some other data in
the next 10 seconds, say the same line hello every one from acadgild again,
you will get the same result in the next 10 seconds as well, i.e.,
(one,1)
(hello,1)
(from,1)
(acadgild,1)
(every,1)
With updateStateByKey, however, the counts from the first batch are carried
forward, so the second batch would instead show each word with a count of 2,
accumulated across batches.



Windows on Event Time- Tumbling Windows:

Tumbling Windows
In a tumbling window, tuples are grouped in a single window based on time or
count. A tuple belongs to only one window.

For example, consider a time-based tumbling window with a length of five


seconds. The first window (w1) contains events that arrived between the zeroth
and fifth seconds. The second window (w2) contains events that arrived
between the fifth and tenth seconds, and the third window (w3) contains events
that arrived between the tenth and fifteenth seconds. The tumbling window is
evaluated every five seconds, and none of the windows overlap; each window
represents a distinct time segment.

An example would be to compute the average price of a stock over the last five
minutes, computed every five minutes.
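In Structured Streaming, a tumbling window is expressed with the window function on the event-time column, using a window length equal to the slide. A sketch (the streaming DataFrame stockPrices and its timestamp and price columns are assumptions for illustration):

import org.apache.spark.sql.functions.{avg, col, window}
// Average price per five-minute tumbling window on event time
val avgPrices = stockPrices
  .groupBy(window(col("timestamp"), "5 minutes"))
  .agg(avg(col("price")).as("avg_price"))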

Handling Late Data with Watermarks:
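In Structured Streaming, late-arriving data is handled by declaring a watermark on the event-time column with withWatermark; the watermark tells Spark how late data is allowed to be, so that state for sufficiently old windows can be dropped. A minimal sketch (the streaming DataFrame events and its eventTime column are hypothetical names):

import org.apache.spark.sql.functions.{col, window}
// Accept events that arrive up to 10 minutes late, then count them per 5-minute window
val lateTolerantCounts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"))
  .count()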

Dropping Duplicates in a Stream:
Duplicate rows can be removed or dropped from a Spark DataFrame
using the distinct() and dropDuplicates() functions. distinct() can be used
to remove rows that have the same values in all columns, whereas
dropDuplicates() can be used to remove rows that have the same
values in selected columns.



Before we start, first let’s create a DataFrame with some duplicate
rows and duplicate values on a few columns.

import spark.implicits._

val simpleData = Seq(("James", "Sales", 3000),


("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)
)
val df = simpleData.toDF("employee_name", "department", "salary")
df.show()

This yields the output below.

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
| James| Sales| 3000|
| Michael| Sales| 4600|
| Robert| Sales| 4100|
| Maria| Finance| 3000|
| James| Sales| 3000|
| Scott| Finance| 3300|
| Jen| Finance| 3900|
| Jeff| Marketing| 3000|
| Kumar| Marketing| 2000|
| Saif| Sales| 4100|
+-------------+----------+------+
In the table above, we have 2 rows with duplicate values in all
columns and 4 rows with duplicate values in the "department" and
"salary" columns.



Use distinct() to remove duplicate rows on DataFrame

The above dataset has a total of 10 rows, one of which duplicates all
values of another row, so performing distinct() on this DataFrame
should give us 9 rows.

//Distinct all columns


val distinctDF = df.distinct()
println("Distinct count: "+distinctDF.count())
distinctDF.show(false)
The distinct() function on DataFrame returns a new DataFrame after
removing the duplicate records. This example yields the output below.

Distinct count: 9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James |Sales |3000 |
|Michael |Sales |4600 |
|Maria |Finance |3000 |
|Robert |Sales |4100 |
|Saif |Sales |4100 |
|Scott |Finance |3300 |
|Jeff |Marketing |3000 |
|Jen |Finance |3900 |
|Kumar |Marketing |2000 |
+-------------+----------+------+
Alternatively, you can also run the dropDuplicates() function, which returns
a new DataFrame with duplicate rows removed.

val df2 = df.dropDuplicates()


println("Distinct count: "+df2.count())
df2.show(false)



Use dropDuplicates() to remove duplicate rows on DataFrame
Spark doesn't have a distinct() method that takes the columns on which
to run distinct; however, Spark provides another signature
of dropDuplicates() which takes multiple columns to
eliminate duplicates.
Note that calling dropDuplicates() on a DataFrame returns a new
DataFrame with duplicate rows removed.

//Distinct using dropDuplicates


val dropDisDF = df.dropDuplicates("department","salary")
println("Distinct count of department & salary : " +dropDisDF.count())
dropDisDF.show(false)
This yields the output below. Notice that it dropped 2 records that
were duplicates.

Distinct count of department & salary : 8


+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Jen |Finance |3900 |
|Maria |Finance |3000 |
|Scott |Finance |3300 |
|Michael |Sales |4600 |
|Kumar |Marketing |2000 |
|Robert |Sales |4100 |
|James |Sales |3000 |
|Jeff |Marketing |3000 |
+-------------+----------+------+
Source code – Remove Duplicate Rows

package com.sparkbyexamples.spark.dataframe.functions.aggregate

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SQLDistinct extends App {



val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

val simpleData = Seq(("James", "Sales", 3000),


("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)
)
val df = simpleData.toDF("employee_name", "department", "salary")
df.show()

//Distinct all columns


val distinctDF = df.distinct()
println("Distinct count: "+distinctDF.count())
distinctDF.show(false)

val df2 = df.dropDuplicates()


println("Distinct count: "+df2.count())
df2.show(false)

//Distinct using dropDuplicates


  val dropDisDF = df.dropDuplicates("department","salary")
  println("Distinct count of department & salary : "+dropDisDF.count())
  dropDisDF.show(false)
}
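In a streaming query, the same dropDuplicates() call can be used to drop duplicate events as they arrive; combined with a watermark, Spark can also bound how long the deduplication state is kept. A hedged sketch (the streaming DataFrame streamDf and its guid and eventTime columns are hypothetical):

// Deduplicate on a unique identifier column, keeping state indefinitely
val dedupedDf = streamDf.dropDuplicates("guid")
// Deduplicate on the identifier and the event time, bounding state with a watermark
val dedupedWithWatermark = streamDf
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("guid", "eventTime")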



Structured Streaming Basics:
STRUCTURED STREAMING IN SPARK
Stream processing is a set of techniques used to extract information from
unbounded data (a type of dataset that is theoretically infinite in size).

Some examples of streaming use cases are device monitoring, fault detection,
billing modernization, fleet management, media recommendations, faster loans,
and so on. In all these cases, data should be consumed as soon as it is produced.

INTRODUCING APACHE SPARK


Apache Spark is a fast, reliable and fault-tolerant distributed computing
framework for large-scale data processing. Spark's memory model uses RAM to
cache data as it is being processed, so it can be up to 100 times faster than Hadoop
MapReduce.

Its core abstraction is the Resilient Distributed Dataset (RDD). RDDs have a rich
functional programming model that abstracts away the complexities of distributed
computing on a cluster and offers a better programming model than MapReduce.
We have two types of operations in Spark: transformations and actions.
Transformations (for example map, flatMap, join, filter) are lazily evaluated,
which means that execution does not start immediately, whereas actions (for
example count, collect, reduce) are eager operations computed on the distributed
system to produce the result.



SPARK COMPONENTS

Figure: Spark components (source: Learning Spark, O'Reilly, 2015)

Spark Core
Contains the Spark core execution engine and a set of low-level functional API
used to distribute computations to a cluster of computing resources.

Spark SQL
It is a module for working with structured data. It implements the higher-level
Dataset and DataFrame APIs of Spark and adds SQL support on top of them.

The libraries built on top of these are: MLlib for machine
learning, GraphFrames for graph analysis, and two APIs for stream
processing: Spark Streaming and Structured Streaming. In this article, we
will focus on Structured Streaming.



STRUCTURED STREAMING PROCESSING

This feature was first introduced in Spark 2.0 in July 2016. It is built on top of
Spark SQL abstraction. Structured Streaming enriches Dataset and DataFrame
APIs with streaming capabilities. It requires the specification of a schema for
the data in the stream.

BASIC CONCEPT OF STREAMING PROCESSING

Consider that all arriving data is treated as an unbounded input table. Every
data item is like a new row being appended to the input table.

Figure: A data stream treated as an unbounded input table (source: Learning Spark, O'Reilly, 2015)

A query on the input will generate the "Result Table". Each time a trigger fires,
Spark checks for new data (new rows in the input table), and incrementally
updates the result.



The last part of the model is output modes. Each time the result table is
updated, we want to write the changes to an external system, such as S3, HDFS
or a database. We usually want to write output incrementally. For this purpose,
Structured Streaming provides three output modes:
• Append: Only the new rows appended to the result table since the last
trigger will be written to the external storage. This is applicable only on queries
where existing rows in the result table cannot change (e.g. a map on an input
stream).
• Complete: The entire updated result table will be written to external
storage.
• Update: Only the rows that were updated in the result table since the last
trigger will be changed in the external storage. This mode works for output
sinks that can be updated in place, such as a MySQL table.
Let's see an example of how we can run a mobile monitoring application in this
model. The query is designed to compute a count of actions grouped by (action,
hour). When this query is started, Spark will continuously check for new data. If
there is new data, Spark will run an "incremental" query that combines the
previous running counts with the new data to compute updated counts, as shown
below.
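A hedged sketch of such a query is shown below (the JSON source, its schema eventSchema, the column names action and time, and the paths are assumptions for illustration):

import org.apache.spark.sql.functions.{col, window}
// Read a stream of events that carry an "action" and an event-time column "time"
val events = spark.readStream
  .format("json")
  .schema(eventSchema)      // file sources require a schema; assumed to be defined elsewhere
  .load("/data/events")     // hypothetical input path
// Count actions per (action, 1 hour) window
val counts = events
  .groupBy(col("action"), window(col("time"), "1 hour"))
  .count()
// Continuously write the updated counts to the console in complete mode
val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/action-counts")  // hypothetical
  .start()
query.awaitTermination()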

Transformations on Streams:
Apache Spark Streaming Transformation Operations
1. Objective

Through this Apache Spark transformation operations tutorial, you will learn
about the various Apache Spark Streaming transformation operations, with
examples, that are commonly used when working with Spark Streaming.
You will learn about streaming operations such as the Spark map operation,
flatMap operation, filter operation, count operation, reduceByKey operation,
countByValue operation and updateStateByKey operation, with examples that
will help you in your Spark jobs.


2. Introduction to Apache Spark Streaming Transformation Operations

Before we start learning the various streaming operations in Spark, let us
revise the Spark Streaming concepts.
Below are the most common streaming transformation functions used in Spark:
a. map()

The map function in Spark passes each element of the source DStream through a
function and returns a new DStream.
Spark map() example
val conf = new SparkConf().setMaster("local[2]").setAppName("MapOpTest")
val ssc = new StreamingContext(conf, Seconds(1))
val words = ssc.socketTextStream("localhost", 9999)
val ans = words.map { word => ("hello", word) }  // pair "hello" with each line
ans.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for termination
b. flatMap()

The flatMap function in Spark is similar to the map function, but in flatMap each
input item can be mapped to 0 or more output items. This is the difference
between the map and flatMap operations in Spark.
Spark FlatMap Example
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))  // split each line into words
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
c. filter()

The filter function in Apache Spark selects only those records of the source
DStream on which func returns true and returns a new DStream of those
records.
Spark Filter function example
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val output = words.filter { word => word.startsWith("s") }  // keep only words that start with "s"
output.print()



d. reduceByKey(func, [numTasks])

When called on a DStream of (K, V) pairs, the reduceByKey function in Spark
returns a new DStream of (K, V) pairs where the values for each key are
aggregated using the given reduce function.
Spark reduceByKey example
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
e. countByValue()

The countByValue function in Spark is called on a DStream of elements of type K
and returns a new DStream of (K, Long) pairs, where the value of each key is
its frequency in each RDD of the source DStream.
Spark CountByValue function example
val line = ssc.socketTextStream("localhost", 9999)
val words = line.flatMap(_.split(" "))
words.countByValue().print()
f. UpdateStateByKey()

The updateStateByKey operation allows you to maintain arbitrary state while


continuously updating it with new information. To use this, you will have to do
two steps.
Define the state – The state can be an arbitrary data type.
Define the state update function – Specify with a function how to update the
state using the previous state and the new values from an input stream.
In every batch, Spark will apply the state update function for all existing keys,
regardless of whether they have new data in a batch or not. If the update
function returns None then the key-value pair will be eliminated.
Spark updateStateByKey example
def updateFunc(values: Seq[Int], state: Option[Int]): Option[Int] = {
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
val ssc = new StreamingContext(conf, Seconds(10))
val line = ssc.socketTextStream("localhost", 9999)
ssc.checkpoint("/home/asus/checkpoints/")  // directory where all checkpoints are stored
val words = line.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val globalCountStream = pairs.updateStateByKey(updateFunc)
globalCountStream.print()
ssc.start()             // Start the computation
ssc.awaitTermination()

Input and Output:


In order to understand the concepts explained here, it is best to have some basic
knowledge of Apache Spark. We recommend going through an introduction to
Spark and to Spark RDDs in Scala before reading this section.
Now, let's discuss the input formats of Hadoop. An input split is nothing
but a chunk of the data that is present in HDFS, and each mapper works on one
input split. Before the map method is called, a RecordReader works on
the input split and arranges the records in key-value format.
The InputFormat describes the input-specification for a Map-Reduce job.
The Map-Reduce framework relies on the InputFormat of the job to do the
following:

• Validate the input-specification of the job.


• Split-up the input file(s) into logical InputSplits, each of which is then
assigned to an individual Mapper.
• Provide the RecordReader implementation to be used to glean input
records from the logical InputSplit for processing by the Mapper.

The default behavior of file-based InputFormats, typically sub-classes
of FileInputFormat, is to split the input into logical InputSplits based on the total
size, in bytes, of the input files. However, the FileSystem block size of the input
files is treated as an upper bound for input splits. A lower bound on the split size
can be set via mapreduce.input.fileinputformat.split.minsize.
By default, Hadoop uses TextInputFormat. When the key and value in each record
are separated by a tab character, KeyValueTextInputFormat can be used instead.
The keys and values used in Hadoop are serialized.

HadoopInputFormat
In Spark, we can implement the InputFormat of Hadoop to process the data,
similar to Hadoop. For this, Spark provides Hadoop-compatible APIs in Java,
Scala and Python.



Now, let's look at a demo using Hadoop input formats in Spark.
Spark supports both the old and new APIs of Hadoop. They are as follows:
Old APIs (which support the mapred libraries of Hadoop)

• hadoopRDD
• hadoopFile
New APIs (which support the mapreduce libraries of Hadoop)

• newAPIHadoopRDD
• newAPIHadoopFile
We can implement the APIs using Spark context as shown below.
Old APIs:
SparkContext.hadoopRDD(conf, inputFormatClass, keyClass, valueClass,
minPartitions)
Here’s the explanation of the above line:

• conf – Here the conf to be passed is org.apache.hadoop.mapred.JobConf.


In this specific format, we need to pass the input file from the
configuration.
• InputFormatClass – Here you need to pass the Input format class of
Hadoop.
• KeyClass – Here you need to pass the input Key class.
• ValueClass – Here you need to pass the input Value class.
SparkContext.hadoopFile(path, inputFormatClass, keyClass, valueClass,
minPartitions)
The above line can be explained like so:

• Path - Here Input file is passed as the arguments itself in path.


• InputFormatClass – Here you need to pass the Input format class of
Hadoop.
• KeyClass – Here you need to pass the input Key class.
• ValueClass – Here you need to pass the input Value class.
• minPartitions- Here you need to Specify the minimum number of
partitions.
New APIs:
SparkContext.newAPIHadoopRDD(conf, fClass, kClass, vClass)



Here, the conf to be passed is org.apache.hadoop.conf.Configuration.

• fClass is the Input format class


• kClass is the input key class
• vClass is the input value class
SparkContext.newAPIHadoopFile(conf, fClass, kClass, vClass)
Here, the conf to be passed is org.apache.hadoop.conf.Configuration

• fClass is the Input format class


• kClass is the input key class
• vClass is the input value class
Now, let's look at a demo using one input file. The input data we are using here
is tab-separated (name, then salary):
• Manjunath 50,000
• Kiran 40,0000
• Onkar 45,0000
• Prateek 45,0000
Now, we will implement KeyValueTextInputFormat on this data.
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat

val input = sc.newAPIHadoopFile("/home/kiran/Hadoop_input", classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text])

To pass a Java class object in a Scala project, we use the
syntax classOf[class_name].
Here KeyValueTextInputFormat and Text are Hadoop's IO classes written in
Java. In their place, we can also use our own custom input format classes.



HadoopOutputFormat
Let's look at HadoopOutputFormat and how to implement it in Spark. By
default, Hadoop uses TextOutputFormat, where the key and value of the output
are saved in the part file, separated by a tab character.
The same can be implemented in Spark. Spark provides APIs for both the old
(mapred) and new (mapreduce) output formats of Hadoop.
The mapred API is saveAsHadoopFile, called on a pair RDD:
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf, codec)
The explanation of the above line is as follows:

• Path - Here in path, we need to give the path of the output file where it
need to be saved.
• Keyclass - Here you need to give the output key class.
• Valueclass - Here you need to give the output value class.
• outputFormatClass – Here you need to give the outputFormat class.
• conf - Here you need to give the Hadoop configuration –
org.apache.hadoop.mapred.JobConf.
• codec – Here you need to give the compression format.
• conf and codec are optional parameters.
The mapreduce API is saveAsNewAPIHadoopFile, also called on a pair RDD:
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
The explanation of the above line is as follows:

• Path - Here in path, we need to give the path of the output file where it
need to be saved.
• Keyclass - Here you need to give the output key class.
• Valueclass - Here you need to give the output value class.



• outputFormatClass - Here you need to give the outputFormat class.
• conf – Here you need to give the Hadoop configuration –
org.apache.hadoop.conf.Configuration.

Here, in place of the key class, value class and outputFormat class, we can
define and give our own custom OutputFormat classes as well.
Now, let's save the output of the above HadoopInputFormat using
HadoopOutputFormat, first using the old API of Hadoop (the mapred libraries)
and then using the new API (the mapreduce libraries), as sketched below.
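A hedged sketch of the two calls, assuming the pair RDD input from the KeyValueTextInputFormat example above and hypothetical output paths:

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat                                         // old (mapred) API
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}  // new (mapreduce) API

// Old API: save the (Text, Text) pairs with the mapred TextOutputFormat
input.saveAsHadoopFile("/home/kiran/Hadoop_output_old",
  classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])
// New API: save the same pairs with the mapreduce TextOutputFormat
input.saveAsNewAPIHadoopFile("/home/kiran/Hadoop_output_new",
  classOf[Text], classOf[Text], classOf[NewTextOutputFormat[Text, Text]])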

This is how the output is saved in Spark, using the HadoopOutputFormat.


This covers how to work with the Hadoop InputFormat and OutputFormat in Spark.
