Unit 2


LECTURE NOTES

ON
BIG DATA
VI Semester (KCS-061)

Mr. Manish Gupta, Assistant Professor


UNIT II

HADOOP

2.1 History of Hadoop

2002
It all started in the year 2002 with the Apache Nutch project.

In 2002, Doug Cutting and Mike Cafarella were working on the Apache Nutch project, which aimed
at building a web search engine that would crawl and index websites.

After a lot of research, Mike Cafarella and Doug Cutting estimated that it would cost around
$500,000 in hardware with a monthly running cost of $30,000 for a system supporting a one-
billion-page index.

This project proved to be too expensive and thus infeasible for indexing billions of webpages,
so they started looking for a feasible solution that would reduce the cost.

2003
Meanwhile, in 2003 Google published a paper on the Google File System (GFS) that described the
architecture of GFS and provided an idea for storing large datasets in a distributed environment.
This paper solved the problem of storing the huge files generated as a part of the web crawl and
indexing process. But it was only half of a solution to their problem.

2004
In 2004, Nutch’s developers set about writing an open-source implementation, the Nutch
Distributed File System (NDFS).

In 2004, Google introduced MapReduce to the world by releasing a paper on MapReduce. This
paper provided the solution for processing those large datasets. It gave a full solution to the
Nutch developers.

Google provided the idea for distributed storage and MapReduce. Nutch developers
implemented MapReduce in the middle of 2004.

2006
The Apache community realized that the implementation of MapReduce and NDFS could be
used for other tasks as well. In February 2006, they were moved out of Nutch to form an
independent subproject of Lucene called "Hadoop" (named after Doug Cutting's son's toy yellow
elephant).
As the Nutch project was limited to clusters of 20 to 40 nodes, Doug Cutting joined Yahoo in 2006
to scale the Hadoop project to clusters of thousands of nodes.
2007
In 2007, Yahoo started using Hadoop on a 1000-node cluster.

2008
In January 2008, Hadoop confirmed its success by becoming a top-level project at Apache.

By this time, many other companies like Last.fm, Facebook, and the New York Times started
using Hadoop.

Hadoop Defeats Supercomputers

In April 2008, Hadoop defeated supercomputers and became the fastest system on the planet to
sort an entire terabyte of data.

In November 2008, Google reported that its MapReduce implementation sorted 1 terabyte in 68
seconds.

In April 2009, a team at Yahoo used Hadoop to sort 1 terabyte in 62 seconds, beating Google's
MapReduce implementation.

Various Releases of Hadoop

2011 – 2012
On 27 December 2011, Apache released Hadoop version 1.0, which includes support for security,
HBase, etc.

On 10 March 2012, release 1.0.1 was available. This is a bug fix release for version 1.0.

On 23 May 2012, the Hadoop 2.0.0-alpha version was released. This release contains YARN.

The second (alpha) version in the Hadoop-2.x series with a more stable version of YARN was
released on 9 October 2012.

2017 – now
On 13 December 2017, release 3.0.0 was made available.

On 25 March 2018, Apache released Hadoop 3.0.1, which contains 49 bug fixes in Hadoop
3.0.0.

On 6 April 2018, Hadoop 3.1.0 was released, containing 768 bug fixes, improvements, and
enhancements since 3.0.0.

Later, in May 2018, Hadoop 3.0.3 was released.

On 8 August 2018, Apache released Hadoop 3.1.1.

2.2 APACHE HADOOP

Apache Hadoop is one of the main supportive elements in Big Data technologies. It simplifies the
processing of large amounts of structured or unstructured data in a cheap manner. Hadoop is an open
source project from Apache that has been continuously improving over the years. Hadoop is basically a
set of software libraries and frameworks for managing and processing big amounts of data, scaling from
a single server to thousands of machines.

Hadoop is an open source framework from Apache and is used to store, process, and analyze
data which are very huge in volume. Hadoop is written in Java and is not used for OLAP (online
analytical processing); it is used for batch/offline processing. It is used by Facebook,
Yahoo, Google, Twitter, LinkedIn and many more. Moreover, it can be scaled up just by adding
nodes to the cluster.

Modules of Hadoop
1. HDFS: Hadoop Distributed File System. Google published its GFS paper, and HDFS was
developed on the basis of it. It states that files will be broken into blocks and stored
across nodes in the distributed architecture.
2. YARN: Yet Another Resource Negotiator, used for job scheduling and for managing the
cluster.
3. MapReduce: This is a framework which helps Java programs to do parallel computation
on data using key-value pairs. The Map task takes input data and converts it into a data
set which can be computed as key-value pairs. The output of the Map task is consumed by
the Reduce task, and the output of the reducer gives the desired result.
4. Hadoop Common: These Java libraries are used to start Hadoop and are used by the other
Hadoop modules.

Hadoop Architecture

The Hadoop architecture is a package of the file system, MapReduce engine and the HDFS
(Hadoop Distributed File System). The MapReduce engine can be MapReduce/MR1 or
YARN/MR2.

A Hadoop cluster consists of a single master and multiple slave nodes. The master node
includes the NameNode and JobTracker, whereas each slave node includes a DataNode and a
TaskTracker.

Hadoop Distributed File System

The Hadoop Distributed File System (HDFS) is a distributed file system for Hadoop. It uses
a master/slave architecture, in which a single NameNode performs the role of master and
multiple DataNodes perform the role of slaves.

Both the NameNode and the DataNode are capable of running on commodity machines. The Java
language is used to develop HDFS, so any machine that supports Java can easily run the
NameNode and DataNode software.

NameNode
o It is the single master server that exists in the HDFS cluster.
o As it is a single node, it may become a single point of failure.
o It manages the file system namespace by executing operations such as opening,
renaming and closing files.
o It simplifies the architecture of the system.

DataNode
o The HDFS cluster contains multiple DataNodes.
o Each DataNode contains multiple data blocks.
o These data blocks are used to store data.
o It is the responsibility of a DataNode to serve read and write requests from the file
system's clients.
o It performs block creation, deletion, and replication upon instruction from the
NameNode.

Job Tracker
o The role of the Job Tracker is to accept MapReduce jobs from clients and process the
data by using the NameNode.
o In response, the NameNode provides metadata to the Job Tracker.

Task Tracker
o It works as a slave node for the Job Tracker.
o It receives the task and code from the Job Tracker and applies that code to the file. This
process can also be called a Mapper.

MapReduce Layer

MapReduce comes into play when a client application submits a MapReduce job to the Job
Tracker. In response, the Job Tracker sends the request to the appropriate Task Trackers.
Sometimes a TaskTracker fails or times out; in such a case, that part of the job is rescheduled.

Advantages of Hadoop
o Fast: In HDFS the data is distributed over the cluster and mapped, which helps in
faster retrieval. Even the tools to process the data are often on the same servers, thus
reducing the processing time. Hadoop is able to process terabytes of data in minutes and
petabytes in hours.
o Scalable: A Hadoop cluster can be extended just by adding nodes to the cluster.
o Cost Effective: Hadoop is open source and uses commodity hardware to store data, so it
is really cost effective compared to a traditional relational database management system.
o Resilient to failure: HDFS replicates data over the network, so if one node goes down
or some other network failure happens, Hadoop uses another copy of the data. Normally,
data is replicated three times, but the replication factor is configurable.

1. HDFS (Hadoop Distributed File System)

HDFS is a Java-based file system that is used to store structured or unstructured data over large clusters of
distributed servers. The data stored in HDFS has no restrictions or rules applied to it; the data can be either
fully unstructured or purely structured. In HDFS, the work of making the data meaningful is done by the
developer's code only. The Hadoop Distributed File System provides a highly fault-tolerant environment with
deployment on low-cost hardware machines. HDFS is a part of the Apache Hadoop project; more information
and an installation guide can be found in the Apache HDFS documentation.

Hadoop comes with a distributed file system called HDFS. In HDFS, data is distributed over several
machines and replicated to ensure durability in the face of failures and high availability to parallel
applications.

It is cost effective as it uses commodity hardware. It involves the concepts of blocks, DataNodes and
the NameNode.

Where to use HDFS


o Very Large Files: Files should be of hundreds of megabytes, gigabytes or more.
o Streaming Data Access: The time to read the whole data set is more important than the
latency in reading the first record. HDFS is built on a write-once, read-many-times pattern.
o Commodity Hardware: It works on low-cost hardware.

Where not to use HDFS


o Low Latency Data Access: Applications that require very low latency to access the first
record should not use HDFS, as it gives importance to the whole data set rather than to
the time to fetch the first record.
o Lots of Small Files: The NameNode holds the metadata of files in memory, and if the files
are small in size the metadata takes up too much of the NameNode's memory, which is not feasible.
o Multiple Writes: HDFS should not be used when we have to write to files multiple times;
it follows a write-once model.

HDFS Concepts
1. Blocks: A block is the minimum amount of data that HDFS can read or write. HDFS blocks are 128
MB by default, and this is configurable. Files in HDFS are broken into block-sized chunks, which
are stored as independent units. Unlike in an ordinary file system, if a file in HDFS is smaller
than the block size it does not occupy the full block's size; i.e. a 5 MB file stored in HDFS
with a block size of 128 MB takes 5 MB of space only. The HDFS block size is large just to
minimize the cost of seeks.
2. Name Node: HDFS works in a master-worker pattern where the NameNode acts as the master. The
NameNode is the controller and manager of HDFS, as it knows the status and the metadata of all
the files in HDFS; the metadata information being file permissions, names and the location of each
block. The metadata is small, so it is stored in the memory of the NameNode, allowing faster
access to it. Moreover, the HDFS cluster is accessed by multiple clients concurrently, so all
this information is handled by a single machine. File system operations like opening,
closing, renaming etc. are executed by it.
3. Data Node: DataNodes store and retrieve blocks when they are told to, by a client or the
NameNode. They report back to the NameNode periodically with the list of blocks that they are
storing. The DataNode, being commodity hardware, also does the work of block creation, deletion
and replication as instructed by the NameNode.
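
As a small illustration of these concepts, the sketch below uses Hadoop's Java FileSystem API to ask the
NameNode for the metadata of a file: its block size, its replication factor, and the DataNodes holding each
block. The class name and the idea of passing the file path as an argument are illustrative choices; it
assumes a reachable HDFS cluster configured through core-site.xml on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockInfo {
    public static void main(String[] args) throws Exception {
        // Picks up fs.defaultFS from the Hadoop configuration on the classpath
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path file = new Path(args[0]);                  // e.g. /user/test/data.txt
        FileStatus status = fs.getFileStatus(file);     // metadata is served by the NameNode
        System.out.println("Block size : " + status.getBlockSize());
        System.out.println("Replication: " + status.getReplication());

        // One BlockLocation per block; the hosts are the DataNodes storing its replicas
        for (BlockLocation loc : fs.getFileBlockLocations(status, 0, status.getLen())) {
            System.out.println("Block at offset " + loc.getOffset()
                    + " on DataNodes " + String.join(", ", loc.getHosts()));
        }
        fs.close();
    }
}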

Figure: HDFS NameNode and DataNodes

Figure: HDFS read operation

Figure: HDFS write operation

Since all the metadata is stored in the NameNode, it is very important. If it fails, the file system cannot be
used, as there would be no way of knowing how to reconstruct the files from the blocks present on the
DataNodes. To overcome this, the concept of the Secondary NameNode arises.

Secondary Name Node: It is a separate physical machine which acts as a helper of the NameNode. It
performs periodic checkpoints: it communicates with the NameNode and takes snapshots of the metadata,
which helps minimize downtime and loss of data.

Starting HDFS

HDFS should be formatted initially and then started in distributed mode. The commands are given
below.

To Format $ hadoop namenode -format

To Start $ start-dfs.sh

HDFS Basic File Operations


1. Putting data to HDFS from local file system
o First create a folder in HDFS where data can be put from the local file system.

$ hadoop fs -mkdir /user/test

o Copy the file "data.txt" from a file kept in local folder /usr/home/Desktop to HDFS
folder /user/ test

$ hadoop fs -copyFromLocal /usr/home/Desktop/data.txt /user/test

o Display the contents of the HDFS folder


$ hadoop fs -ls /user/test

2. Copying data from HDFS to local file system


o $ hadoop fs -copyToLocal /user/test/data.txt /usr/bin/data_copy.txt
3. Compare the files and see that both are the same
o $ md5 /usr/bin/data_copy.txt /usr/home/Desktop/data.txt
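
The same basic file operations can also be done programmatically. The following sketch mirrors the shell
commands above using the FileSystem Java API (the paths are the same example paths used above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCopyExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // Equivalent of: hadoop fs -mkdir /user/test
        fs.mkdirs(new Path("/user/test"));

        // Equivalent of: hadoop fs -copyFromLocal /usr/home/Desktop/data.txt /user/test
        fs.copyFromLocalFile(new Path("/usr/home/Desktop/data.txt"), new Path("/user/test/data.txt"));

        // Equivalent of: hadoop fs -copyToLocal /user/test/data.txt /usr/bin/data_copy.txt
        fs.copyToLocalFile(new Path("/user/test/data.txt"), new Path("/usr/bin/data_copy.txt"));

        fs.close();
    }
}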

Recursive deleting

o hadoop fs -rmr <arg>

Example:

o hadoop fs -rmr /user/sonoo/

HDFS Other commands

The following notation is used in the commands:

"<path>" means any file or directory name.

"<path>..." means one or more file or directory names.

"<file>" means any filename.

"<src>" and "<dest>" are path names in a directed operation.

"<localSrc>" and "<localDest>" are paths as above, but on the local file system

o put <localSrc><dest>

Copies the file or directory from the local file system identified by localSrc to dest within the
DFS.

o copyFromLocal <localSrc><dest>

Identical to -put

o moveFromLocal <localSrc><dest>

Copies the file or directory from the local file system identified by localSrc to dest within
HDFS, and then deletes the local copy on success.

o get [-crc] <src><localDest>

Copies the file or directory in HDFS identified by src to the local file system path identified by
localDest.

o cat <filename>

Displays the contents of filename on stdout.

o moveToLocal <src><localDest>

Works like -get, but deletes the HDFS copy on success.

o setrep [-R] [-w] rep <path>

Sets the target replication factor for files identified by path to rep. (The actual replication factor
will move toward the target over time)

o touchz <path>

Creates a file at path containing the current time as a timestamp. Fails if a file already exists at
path, unless the file is already size 0.

o test -[ezd] <path>

Returns 1 if the path exists, has zero length, or is a directory; returns 0 otherwise.

o stat [format] <path>

Prints information about path. Format is a string which accepts file size in blocks (%b),
filename (%n), block size (%o), replication (%r), and modification date (%y, %Y).

2.3 ANALYZING THE DATA WITH HADOOP

To take advantage of the parallel processing that Hadoop provides, we need to express our query
as a MapReduce job. After some local, small-scale testing, we will be able to run it on a cluster
of machines.

MAP AND REDUCE

MapReduce works by breaking the processing into two phases: the map phase and the reduce
phase. Each phase has key-value pairs as input and output, the types of which may be chosen
by the programmer. The programmer also specifies two functions: the map function and the
reduce function.
The input to our map phase is the raw NCDC data. We choose a text input format that gives us
each line in the dataset as a text value. The key is the offset of the beginning of the line from the
beginning of the file, but as we have no need for this, we ignore it.

Our map function is simple. We pull out the year and the air temperature, since these are the
only fields we are interested in. In this case, the map function is just a data preparation
phase, setting up the data in such a way that the reducer function can do its work on it: finding
the maximum temperature for each year. The map function is also a good place to drop bad
records: here we filter out temperatures that are missing, suspect, or erroneous.

To visualize the way the map works, consider the following sample lines of input data (some
unused columns have been dropped to fit the page, indicated by ellipses):

0067011990999991950051507004...9999999N9+00001+99999999999...

0043011990999991950051512004...9999999N9+00221+99999999999...

0043011990999991950051518004...9999999N9-00111+99999999999...

0043012650999991949032412004...0500001N9+01111+99999999999...

0043012650999991949032418004...0500001N9+00781+99999999999...

These lines are presented to the map function as the key-value pairs:

(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)

(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)

(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)

(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)

(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)

The keys are the line offsets within the file, which we ignore in our map function. The map
function merely extracts the year and the air temperature and emits them as its output (the
temperature values have been interpreted as integers):

(1950, 0)

(1950, 22)

(1950, −11)

(1949, 111)

(1949, 78)

The output from the map function is processed by the MapReduce framework before being sent
to the reduce function. This processing sorts and groups the key-value pairs by key. So,
continuing the example, our reduce function sees the following input:

(1949, [111, 78])

(1950, [0, 22, −11])

Each year appears with a list of all its air temperature readings. All the reduce function has to do
now is iterate through the list and pick up the maximum reading:

(1949, 111)

(1950, 22)

This is the final output: the maximum global temperature recorded in each year.

The whole data flow is illustrated in Figure 2-1. At the bottom of the diagram is a Unix pipeline,
which mimics the whole MapReduce flow, and which we will see again later when we look at
Hadoop Streaming.

Figure 2-1. MapReduce logical data flow

JAVA MAPREDUCE
Having run through how the MapReduce program works, the next step is to express it in code.
We need three things: a map function, a reduce function, and some code to run the job. The map
function is represented by the Mapper class, which declares an abstract map() method.

The Mapper class is a generic type, with four formal type parameters that specify the input key,
input value, output key, and output value types of the map function. For the present example,
the input key is a long integer offset, the input value is a line of text, the output key is a year,
and the output value is an air temperature (an integer). Rather than use built-in Java types,
Hadoop provides its own set of basic types that are optimized for network serialization.
These are found in the org.apache.hadoop.io package. Here we use LongWritable, which
corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).

The map() method is passed a key and a value. We convert the Text value containing the line
of input into a Java String, then use its substring() method to extract the columns we are
interested in.

The map() method also provides an instance of Context to write the output to. In this case, we
write the year as a Text object (since we are just using it as a key), and the temperature is
wrapped in an IntWritable. We write an output record only if the temperature is present and
the quality code indicates the temperature reading is OK.

Again, four formal type parameters are used to specify the input and output types, this time for
the reduce function. The input types of the reduce function must match the output types of
the map function: Text and IntWritable. And in this case, the output types of the reduce
function are Text and IntWritable, for a year and its maximum temperature, which we find
by iterating through the temperatures and comparing each with a record of the highest found
so far.
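
A sketch of the Mapper and Reducer just described is given below, following the new
(org.apache.hadoop.mapreduce) API; each class would live in its own source file. The NCDC column offsets
(15-19 for the year, 87-92 for the temperature, 92-93 for the quality code) and the MISSING sentinel follow
the layout used in Hadoop: The Definitive Guide, so treat the exact positions as illustrative.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;   // sentinel for a missing reading

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {          // parseInt does not accept a leading plus sign
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // Drop missing, suspect or erroneous readings
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {     // keep a running maximum for the year
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}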

A Job object forms the specification of the job. It gives you control over how the job is run.
When we run this job on a Hadoop cluster, we will package the code into a JAR file (which
Hadoop will distribute around the cluster). Rather than explicitly specify the name of the JAR
file, we can pass a class in the Job’s setJarByClass() method, which Hadoop will use to locate
the relevant JAR file by looking for the JAR file containing this class.

Having constructed a Job object, we specify the input and output paths. An input path is
specified by calling the static addInputPath() method on FileInputFormat, and it can be a single
file, a directory (in which case, the input forms all the files in that directory), or a file pattern. As
the name suggests, addInputPath() can be called more than once to use input from multiple
paths.

The output path (of which there is only one) is specified by the static setOutputPath()
method on FileOutputFormat. It specifies a directory where the output files from the reducer
functions are written. The directory shouldn't exist before running the job, as Hadoop will
complain and not run the job. This precaution is to prevent data loss (it can be very annoying
to accidentally overwrite the output of a long job with another).

Next, we specify the map and reduce types to use via the setMapperClass() and
setReducerClass() methods.

The setOutputKeyClass() and setOutputValueClass() methods control the output types for
the map and the reduce functions, which are often the same, as they are in our case. If they are
different, then the map output types can be set using the methods setMapOutputKeyClass()
and setMapOutputValueClass().

The input types are controlled via the input format, which we have not explicitly set since we
are using the default TextInputFormat.

After setting the classes that define the map and reduce functions, we are ready to run the job.
The waitForCompletion() method on Job submits the job and waits for it to finish. The method’s
boolean argument is a verbose flag, so in this case the job writes information about its progress
to the console.

The return value of the waitForCompletion() method is a boolean indicating success (true)
or failure (false), which we translate into the program’s exit code of 0 or 1.
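
Putting the pieces described above together, a driver might look like the following sketch (the class names
match the mapper and reducer sketched earlier; treat it as illustrative rather than the book's exact listing):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class);   // Hadoop locates the JAR containing this class
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));   // output directory must not already exist

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Submit, print progress (verbose = true), and convert success/failure into an exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}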

A TEST RUN

After writing a MapReduce job, it’s normal to try it out on a small dataset to flush out any
immediate problems with the code. First install Hadoop in standalone mode— there are
instructions for how to do this in Appendix A. This is the mode in which Hadoop runs using the
local filesystem with a local job runner. Then install and compile the examples using the
instructions on the book’s website.

When the hadoop command is invoked with a classname as the first argument, it launches a
JVM to run the class. It is more convenient to use hadoop than straight java since the former
adds the Hadoop libraries (and their dependencies) to the classpath and picks up the Hadoop
configuration, too. To add the application classes to the classpath, we've defined an environment
variable called HADOOP_CLASSPATH, which the hadoop script picks up.

The last section of the output, titled “Counters,” shows the statistics that Hadoop generates
for each job it runs. These are very useful for checking whether the amount of data processed
is what you expected. For example, we can follow the number of records that went through
the system: five map inputs produced five map outputs, then five reduce inputs in two groups
produced two reduce outputs.

The output was written to the output directory, which contains one output file per reducer. The
job had a single reducer, so we find a single file, named part-r-00000:

% cat output/part-r-00000

1949 111

1950 22

This result is the same as when we went through it by hand earlier. We interpret this as saying
that the maximum temperature recorded in 1949 was 11.1°C, and in 1950 it was 2.2°C.

THE OLD AND THE NEW JAVA MAPREDUCE APIS

The Java MapReduce API used in the previous section was first released in Hadoop 0.20.0. This new
API, sometimes referred to as "Context Objects," was designed to make the API easier to evolve in
the future. It is type-incompatible with the old, however, so applications need to be rewritten to
take advantage of it.

The new API is not complete in the 1.x (formerly 0.20) release series, so the old API is
recommended for these releases, despite having been marked as deprecated in the early 0.20
releases. (Understandably, this recommendation caused a lot of confusion, so the deprecation
warning was removed from later releases in that series.)

Previous editions of this book were based on 0.20 releases and used the old API throughout
(although the new API was covered, the code invariably used the old API). In this edition the
new API is used as the primary API, except where mentioned. However, should you wish to
use the old API, you can, since the code for all the examples in this book is available for the
old API on the book's website.

There are several notable differences between the two APIs:

• The new API favors abstract classes over interfaces, since these are easier to evolve. For
example, you can add a method (with a default implementation) to an abstract class
without breaking old implementations of the class. For example, the Mapper and
Reducer interfaces in the old API are abstract classes in the new API.
• The new API is in the org.apache.hadoop.mapreduce package (and subpackages). The old
API can still be found in org.apache.hadoop.mapred.
• The new API makes extensive use of context objects that allow the user code to
communicate with the MapReduce system. The new Context, for example, essentially
unifies the role of the JobConf, the OutputCollector, and the Reporter from the old API.
• In both APIs, key-value record pairs are pushed to the mapper and reducer, but in addition,
the new API allows both mappers and reducers to control the execution flow by overriding
the run() method. For example, records can be processed in batches, or the execution can
be terminated before all the records have been processed. In the old API this is possible
for mappers by writing a MapRunnable, but no equivalent exists for reducers.
• Configuration has been unified. The old API has a special JobConf object for job
configuration, which is an extension of Hadoop's vanilla Configuration object (used for
configuring daemons). In the new API, this distinction is dropped, so job configuration is
done through a Configuration.
• Job control is performed through the Job class in the new API, rather than the old
JobClient, which no longer exists in the new API.
• Output files are named slightly differently: in the old API both map and reduce outputs
are named part-nnnnn, while in the new API map outputs are named part-m-nnnnn and
reduce outputs are named part-r-nnnnn (where nnnnn is an integer designating the part
number, starting from zero).
• User-overridable methods in the new API are declared to throw java.lang.InterruptedException.
What this means is that you can write your code to be responsive to interrupts so that the
framework can gracefully cancel long-running operations if it needs to.
• In the new API the reduce() method passes values as a java.lang.Iterable, rather than
a java.lang.Iterator (as the old API does). This change makes it easier to iterate
over the values using Java's for-each loop construct: for (VALUEIN value : values) { ... }
(see the reducer sketch below).
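
For instance, a reducer written against the new API looks roughly like the following sketch (the class is
hypothetical and simply sums counts; note the Iterable parameter and the single Context object):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {   // Iterable makes the for-each loop possible
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

The word count example that follows, by contrast, uses the old org.apache.hadoop.mapred API
(MapReduceBase, OutputCollector, Reporter and JobConf).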
Steps to execute MapReduce word count example
Create a text file on your local machine and write some text into it.

$ nano data.txt

Check the text written in the data.txt file.

$ cat data.txt

In this example, we find the frequency of each word in this text file.

Create a directory in HDFS where the text file will be kept.

$ hdfs dfs -mkdir /test

Upload the data.txt file to HDFS in that directory.

$ hdfs dfs -put /home/codegyani/data.txt /test

Write the MapReduce program using Eclipse.
File: WC_Mapper.java
package com.javatpoint;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WC_Mapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text,IntWritable> output,
                    Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}
File: WC_Reducer.java
package com.javatpoint;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WC_Reducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output,
                       Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

File: WC_Runner.java
package com.javatpoint;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WC_Runner {
    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(WC_Runner.class);
        conf.setJobName("WordCount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(WC_Mapper.class);
        conf.setCombinerClass(WC_Reducer.class);
        conf.setReducerClass(WC_Reducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
Create the JAR file of this program and name it wordcountdemo.jar.
Run the jar file
hadoop jar /home/codegyani/wordcountdemo.jar com.javatpoint.WC_Runner /test/data.txt /r_output
The output is stored in /r_output/part-00000

2.4 Unit tests with MRUnit, test data and local tests

MRUnit is a JUnit-based Java library that allows us to unit test Hadoop MapReduce
programs. This makes it easy to develop as well as to maintain Hadoop MapReduce code
bases. MRUnit supports testing Mappers and Reducers separately as well as testing
MapReduce computations as a whole. In this recipe, we'll be exploring all three testing
scenarios.
This post will take a slight detour from implementing the patterns found in Data-Intensive
Processing with MapReduce to discuss something equally important: testing. I was
inspired in part by a presentation by Tom Wheeler that I attended while at the 2012
Strata/Hadoop World conference in New York. When working with large data sets, unit
testing might not be the first thing that comes to mind. However, when you consider the
fact that no matter how large your cluster is, or how much data you have, the same code is
pushed out to all nodes for running the MapReduce job, Hadoop mappers and reducers
lend themselves very well to being unit tested. But what is not easy about unit testing
Hadoop is the framework itself. Luckily there is a library that makes testing Hadoop
mappers, reducers and some limited integration testing of the mapper – reducer interaction
along with combiners, custom counters and partitioners. We are using the latest release of
MRUnit as of this writing, 0.9.0. All of the code under test comes from the previous post
on computing averages using local aggregation.
Setup

To get started, download MRUnit from here. After you have extracted the tar file, cd into the
mrunit-0.9.0-incubating/lib directory. In there you should see the following:

mrunit-0.9.0-incubating-hadoop1.jar
mrunit-0.9.0-incubating-hadoop2.jar
As I'm sure you can guess, the mrunit-0.9.0-incubating-hadoop1.jar is for MapReduce version 1
of Hadoop and mrunit-0.9.0-incubating-hadoop2.jar is for working with the new version of
Hadoop's MapReduce. For this post, and all others going forward, we will be using the
hadoop-2.0 version from Cloudera's CDH4.1.1 release, so we will need the mrunit-0.9.0-
incubating-hadoop2.jar file. I added MRUnit, JUnit and Mockito as libraries in IntelliJ
(JUnit and Mockito are found in the same directory as the MRUnit jar files). Now that we
have set up our dependencies, let's start testing.

Testing Mappers

Setting up to test a mapper is very straightforward and is best explained by looking at some
code first. We will use the in-mapper combining example from the previous post:

@Test
public void testCombiningMapper() throws Exception {
    new MapDriver<LongWritable,Text,Text,TemperatureAveragingPair>()
        .withMapper(new AverageTemperatureCombiningMapper())
        .withInput(new LongWritable(4), new Text(temps[3]))
        .withOutput(new Text("190101"), new TemperatureAveragingPair(-61, 1))
        .runTest();
}
Notice the fluent API style, which makes creating the test easy. To write your test you
would:

Instantiate an instance of the MapDriver class parameterized exactly as the mapper under
test.
Add an instance of the Mapper you are testing in the withMapper call.
In the withInput call, pass in your key and input value, in this case a LongWritable with an
arbitrary value and a Text object that contains a line from the NCDC weather dataset,
contained in a String array called 'temps' that was set up earlier in the test (not displayed
here as it would take away from the presentation).
Specify the expected output in the withOutput call; here we are expecting a Text object with
the value of "190101" and a TemperatureAveragingPair object containing the values -61
(temperature) and 1 (count).
The last call, runTest, feeds the specified input values into the mapper and compares the actual
output against the expected output set in the 'withOutput' method.
One thing to note is the MapDriver only allows one input and output per test. You can call
withInput and withOutput multiple times if you want, but the MapDriver will overwrite
the existing values with the new ones, so you will only ever be testing with one
input/output at any time. To specify multiple inputs we would use the MapReduceDriver,
covered a couple of sections later, but next up is testing the reducer.

Testing Reducers

Testing the reducer follows the same pattern as the mapper test. Again, let’s start by looking
at a code example:

@Test
public void testReducerCold() {
    List<TemperatureAveragingPair> pairList = new ArrayList<TemperatureAveragingPair>();
    pairList.add(new TemperatureAveragingPair(-78, 1));
    pairList.add(new TemperatureAveragingPair(-84, 1));
    pairList.add(new TemperatureAveragingPair(-28, 1));
    pairList.add(new TemperatureAveragingPair(-56, 1));

    new ReduceDriver<Text,TemperatureAveragingPair,Text,IntWritable>()
        .withReducer(new AverageTemperatureReducer())
        .withInput(new Text("190101"), pairList)
        .withOutput(new Text("190101"), new IntWritable(-61))
        .runTest();
}
The test starts by creating a list of TemperatureAveragingPair objects to be used as the input
to the reducer.
A ReducerDriver is instantiated, and like the MapperDriver, is parameterized exactly as the
reducer under test.
Next we pass in an instance of the reducer we want to test in the withReducer call.
In the withInput call we pass in the key of "190101" and the pairList object created at the
start of the test.
Next we specify the output that we expect our reducer to emit: the same key of "190101" and
an IntWritable representing the average of the temperatures in the list.
Finally runTest is called, which feeds our reducer the inputs specified and compares the
output from the reducer against the expected output.
The ReducerDriver has the same limitation as the MapperDriver of not accepting more than
one input/output pair. So far we have tested the Mapper and Reducer in isolation, but we
would also like to test them together in an integration test. Integration testing can be
accomplished by using the MapReduceDriver class. The MapReduceDriver is also the
class to use for testing the use of combiners, custom counters or custom partitioners.

Integration Testing

To test your mapper and reducer working together, MRUnit provides the MapReduceDriver
class. The MapReduceDriver works as you would expect by now, with two main differences.
First, you parameterize the input and output types of the mapper and the input and output
types of the reducer. Since the mapper output types need to match the reducer input types,
you end up with 3 pairs of parameterized types. Secondly you can provide multiple inputs
and specify multiple expected outputs. Here is our sample code:
@Test
public void testMapReduce() {

    new MapReduceDriver<LongWritable,Text,
                        Text,TemperatureAveragingPair,
                        Text,IntWritable>()
        .withMapper(new AverageTemperatureMapper())
        .withInput(new LongWritable(1), new Text(temps[0]))
        .withInput(new LongWritable(2), new Text(temps[1]))
        .withInput(new LongWritable(3), new Text(temps[2]))
        .withInput(new LongWritable(4), new Text(temps[3]))
        .withInput(new LongWritable(5), new Text(temps[6]))
        .withInput(new LongWritable(6), new Text(temps[7]))
        .withInput(new LongWritable(7), new Text(temps[8]))
        .withInput(new LongWritable(8), new Text(temps[9]))
        .withCombiner(new AverageTemperatureCombiner())
        .withReducer(new AverageTemperatureReducer())
        .withOutput(new Text("190101"), new IntWritable(-22))
        .withOutput(new Text("190102"), new IntWritable(-40))
        .runTest();
}
As you can see from the example above, the setup is the same as the MapDriver and the
ReduceDriver classes. You pass in instances of the mapper, reducer and optionally a
combiner to test. The MapReduceDriver allows us to pass in multiple inputs that have
different keys. Here the ‘temps’ array is the same one referenced in the mapper sample
and contains a few lines from the NCDC weather dataset; the keys in those sample lines are
the months of January and February of the year 1901, represented as "190101" and "190102"
respectively. This test is successful, so we gain a little more confidence in the correctness
of our mapper and reducer working together.

2.5 Anatomy of a MapReduce job run, failures, job scheduling, shuffle and sort, task execution

Timeline of a MapReduce Job

This is the timeline of a MapReduce Job execution:

 Map Phase: several Map Tasks are executed


 Reduce Phase: several Reduce Tasks are executed

Notice that the Reduce Phase may start before the end of Map Phase. Hence, an interleaving
between them is possible.

Map Phase
We now focus our discussion on the Map Phase. A key decision is how many MapTasks the
Application Master needs to start for the current job.

What does the user give us?


Let's take a step back. When a client submits an application, several kinds of information are
provided to the YARN infrastructure. In particular:

 a configuration: this may be partial (some parameters are not specified by the user) and in
this case the default values are used for the job. Notice that these default values may be
the ones chosen by a Hadoop provider like Amazon.
 a JAR containing:
o a map() implementation
o a combiner implementation
o a reduce() implementation
 input and output information:
o input directory: is the input directory on HDFS? On S3? How many files?
o output directory: where will we store the output? On HDFS? On S3?

The number of files inside the input directory is used for deciding the number of Map Tasks of a
job.

How many Map Tasks?


The Application Master will launch one MapTask for each map split. Typically, there is a map
split for each input file. If the input file is too big (bigger than the HDFS block size) then we have
two or more map splits associated with the same input file. This is the pseudocode used inside the
getSplits() method of the FileInputFormat class:

num_splits = 0
for each input file f:
    remaining = f.length
    while remaining / split_size > split_slope:
        num_splits += 1
        remaining -= split_size

where:
    split_slope = 1.1
    split_size =~ dfs.blocksize

Notice that the configuration parameter mapreduce.job.maps is ignored in MRv2 (in the past it
was just a hint).

MapTask Launch
The MapReduce Application Master asks the Resource Manager for the containers needed by
the job: one MapTask container request for each MapTask (map split).

A container request for a MapTask tries to exploit data locality of the map split. The Application
Master asks for:

 a container located on the same Node Manager where the map split is stored (a map split
may be stored on multiple nodes due to the HDFS replication factor);
 otherwise, a container located on a Node Manager in the same rack where the map
split is stored;
 otherwise, a container on any other Node Manager of the cluster.

This is just a hint to the Resource Scheduler. The Resource Scheduler is free to ignore data
locality if the suggested assignment is in conflict with the Resource Scheduler's goals.

When a Container is assigned to the Application Master, the MapTask is launched.

Map Phase: example of an execution scenario

This is a possible execution scenario of the Map Phase:

 there are two Node Managers: each Node Manager has 2GB of RAM (NM capacity) and
each MapTask requires 1GB, so we can run 2 containers in parallel on each Node Manager
(this is the best scenario; the Resource Scheduler may decide differently)
 there are no other YARN applications running in the cluster
 our job has 8 map splits (e.g., there are 7 files inside the input directory, but only one of
them is bigger than the HDFS block size so we split it into 2 map splits): we need to run 8
Map Tasks.

Map Task Execution Timeline

Let's now focus on a single Map Task. This is the Map Task execution timeline:

 INIT phase: we setup the Map Task


 EXECUTION phase: for each (key, value) tuple inside the map split we run
the map() function
 SPILLING phase: the map output is stored in an in-memory buffer; when this buffer
is almost full then we start (in parallel) the spilling phase in order to remove data from it
 SHUFFLE phase: at the end of the spilling phase, we merge all the map outputs and
package them for the reduce phase

MapTask: INIT
During the INIT phase, we:

1. create a context (TaskAttemptContext.class)
2. create an instance of the user Mapper.class
3. set up the input (e.g., InputFormat.class, InputSplit.class, RecordReader.class)
4. set up the output (NewOutputCollector.class)
5. create a mapper context (MapContext.class, Mapper.Context.class)
6. initialize the input, i.e.:
   o create a SplitLineReader.class object
   o create a HdfsDataInputStream.class object

MapTask: EXECUTION

The EXECUTION phase is performed by the run method of the Mapper class. The user can
override it, but by default it starts by calling the setup method: this function by default does not
do anything useful, but can be overridden by the user in order to set up the Task (e.g., initialize class
variables). After the setup, for each <key, value> tuple contained in the map split, map() is
invoked. Therefore, map() receives a key, a value, and a mapper context. Using the context,
the map stores its output in a buffer.

Notice that the map split is fetched chunk by chunk (e.g., 64KB) and each chunk is split into
several (key, value) tuples (e.g., using SplitLineReader.class). This is done inside
the Mapper.Context.nextKeyValue method.

When the map split has been completely processed, the run function calls the cleanup method: by
default, no action is performed, but the user may decide to override it.
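
The structure of run() described above is, in simplified form, the following (a sketch based on the
Mapper base class of the new API):

// Simplified sketch of Mapper.run()
public void run(Context context) throws IOException, InterruptedException {
    setup(context);                              // user hook, empty by default
    try {
        while (context.nextKeyValue()) {         // fetches the next tuple from the map split
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);                        // user hook, empty by default
    }
}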

MapTask: SPILLING

As seen in the EXECUTION phase, the map writes (using Mapper.Context.write()) its output
into a circular in-memory buffer (MapTask.MapOutputBuffer). The size of this buffer is fixed and
determined by the configuration parameter mapreduce.task.io.sort.mb (default: 100MB).

Whenever this circular buffer is almost full (mapreduce.map.sort.spill.percent: 80% by
default), the SPILLING phase is performed (in parallel, using a separate thread). Notice that if the
spilling thread is too slow and the buffer becomes 100% full, then map() cannot be executed and
thus has to wait.

The SPILLING thread performs the following actions:

1. it creates a SpillRecord and an FSOutputStream (on the local filesystem)
2. it sorts the used chunk of the buffer in memory: the output tuples are sorted by (partitionIdx,
key) using a quicksort algorithm
3. the sorted output is split into partitions: one partition for each ReduceTask of the job (see
later)
4. the partitions are sequentially written into the local file

How Many Reduce Tasks?


The number of ReduceTasks for the job is decided by the configuration
parameter mapreduce.job.reduces.
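
These knobs can be set from the driver. The sketch below is illustrative only (the property names are the
real ones mentioned in this section; the values and the class name are arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TuningExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.task.io.sort.mb", 200);             // map-side circular buffer size, in MB
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);  // buffer fill level that triggers spilling

        Job job = Job.getInstance(conf, "tuned-job");
        job.setNumReduceTasks(4);   // sets mapreduce.job.reduces for this job
        // ... set the mapper, reducer, input and output paths as usual, then submit the job
    }
}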

What is the partitionIdx associated with an output tuple?

The partitionIdx of an output tuple is the index of a partition. It is decided inside
Mapper.Context.write():

partitionIdx = (key.hashCode() & Integer.MAX_VALUE) % numReducers

It is stored as metadata in the circular buffer alongside the output tuple. The user can customize
the partitioner by setting the configuration parameter mapreduce.job.partitioner.class.
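
A custom partitioner is a class extending org.apache.hadoop.mapreduce.Partitioner. The default
HashPartitioner essentially implements the formula shown above; a minimal sketch (the class name and
key/value types are chosen to match the temperature example) looks like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class YearPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Same formula as above: non-negative hash modulo the number of reducers
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

It is plugged in with job.setPartitionerClass(YearPartitioner.class), which is what sets
mapreduce.job.partitioner.class for the job.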

When do we apply the combiner?


If the user specifies a combiner, then the SPILLING thread, before writing the tuples to the file
(step 4 above), executes the combiner on the tuples contained in each partition. Basically, we:

1. create an instance of the user Reducer.class (the one specified for the combiner!)
2. create a Reducer.Context: the output will be stored on the local filesystem
3. execute Reduce.run(): see the Reduce Task description

The combiner typically uses the same implementation as the standard reduce() function and thus
can be seen as a local reducer.

MapTask: end of EXECUTION


At the end of the EXECUTION phase, the SPILLING thread is triggered for the last time. In more
detail, we:

1. sort and spill the remaining unspilled tuples
2. start the SHUFFLE phase

Notice that for each time the buffer was almost full, we get one spill file (SpillRecord + output
file). Each Spill file contains several partitions (segments).

2.6 MapReduce types, input formats, output formats

 Writable data types are meant for writing the data to the local disk and it is a serialization format. Just
like in Java there are data types to store variables (int, float, long, double, etc.), Hadoop has its own
equivalent data types called Writable data types. These Writable data types are passed as parameters
(input and output key-value pairs) for the mapper and reducer.

The Writable data types discussed below implement the WritableComparable interface. The Comparable
interface is used for comparing when the reducer sorts the keys, and Writable can write the result to the
local disk. Hadoop does not use Java Serializable because Java Serializable is too big or too heavy for
Hadoop; Writable can serialize Hadoop objects in a very lightweight way. WritableComparable is a
combination of the Writable and Comparable interfaces.

Below is a list of a few data types in Java along with the equivalent Hadoop variant:
1. Integer –> IntWritable: It is the Hadoop variant of Integer. It is used to pass integer numbers as
key or value.
2. Float –> FloatWritable: Hadoop variant of Float, used to pass floating point numbers as key or
value.
3. Long –> LongWritable: Hadoop variant of the Long data type to store long values.
4. Short –> ShortWritable: Hadoop variant of the Short data type to store short values.
5. Double –> DoubleWritable: Hadoop variant of Double to store double values.
6. String –> Text: Hadoop variant of String to pass string characters as key or value.
7. Byte –> ByteWritable: Hadoop variant of byte to store a sequence of bytes.
8. null –> NullWritable: Hadoop variant of null to pass null as a key or value.
Usually NullWritable is used as the data type for the output key of the reducer, when the output
key is not important in the final result.

Example of a Mapper class implementing a few of the above data types:

public static class wordMapper extends Mapper<LongWritable, Text, Text, IntWritable>

Here the first two data types are the input key and value to the map function, which will be long values
and string characters respectively. The second two data types are the intermediate output key and value
from the map function, which will be string characters and int numbers respectively.

Example of a Reducer class:

public static class wordReducer extends Reducer<Text, IntWritable, NullWritable, Text>

Here the first two data types are the input key and value to the reduce function, and must match the
intermediate key and value from the mapper. The second two data types are the output key and value from
the reduce function, which will be the final result of the MapReduce program.

Apart from this, we can also write a custom Writable by overriding the write and readFields methods.

Writable is an interface in Hadoop, and key and value types in Hadoop must implement this interface.
Hadoop provides Writable wrappers for almost all Java primitive types and some other types, but
sometimes we need to pass custom objects, and these custom objects should implement Hadoop's
Writable interface. MapReduce uses implementations of Writables for interacting with user-provided
Mappers and Reducers.

To implement the Writable interface we require two methods:

public interface Writable {
    void readFields(DataInput in) throws IOException;
    void write(DataOutput out) throws IOException;
}

We use Hadoop Writable(s) because the data needs to be transmitted between different nodes in a
distributed computing environment. This requires serialization and deserialization of data to convert
data that is in a structured format to a byte stream and vice versa. Hadoop therefore uses a simple and
efficient serialization protocol to serialize data between the map and reduce phases, and these are called
Writable(s).

Some examples of Writables, as already mentioned before, are IntWritable, LongWritable,
BooleanWritable and FloatWritable.
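
As an example of such a custom Writable, the sketch below (the class name and fields are hypothetical,
loosely inspired by the TemperatureAveragingPair used in the MRUnit examples earlier) packs a
temperature and a count into a single value type:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class TemperatureCountPair implements Writable {
    private int temperature;
    private int count;

    public TemperatureCountPair() { }                    // Hadoop needs a no-argument constructor

    public TemperatureCountPair(int temperature, int count) {
        this.temperature = temperature;
        this.count = count;
    }

    @Override
    public void write(DataOutput out) throws IOException {   // serialize the fields in a fixed order
        out.writeInt(temperature);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialize in exactly the same order
        temperature = in.readInt();
        count = in.readInt();
    }

    public int getTemperature() { return temperature; }
    public int getCount()       { return count; }
}

If such a type were to be used as a key, it would additionally need to implement WritableComparable
and provide a compareTo() method.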
