Unit 2-HDFS SGS
Design of HDFS
HDFS
The Hadoop Distributed File System (HDFS) was developed using a distributed file system design. It runs on commodity hardware. Unlike other distributed systems, HDFS is highly fault tolerant and is designed to run on low-cost hardware.
HDFS holds very large amounts of data and provides easy access. To store such huge data, files are stored across multiple machines. They are stored in a redundant fashion to protect the system from possible data loss in case of failure. HDFS also makes applications available for parallel processing.
Features of HDFS
It is suitable for distributed storage and processing.
Hadoop provides a command interface to interact with HDFS.
The built-in servers of the name node and data nodes help users to easily check the status of the cluster (see the example below).
Streaming access to file system data.
HDFS provides file permissions and authentication.
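For instance, basic cluster status and the contents of the file system can be checked from the command line (the output depends on the installation; the path is illustrative):
hdfs dfsadmin -report
hadoop fs -ls /user/hadoop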
HDFS Architecture
HDFS follows a master-slave architecture and has the following elements.
Name node
The namenode is the commodity hardware that contains the GNU/Linux operating system and the namenode software. It is software that can be run on commodity hardware. The system hosting the namenode acts as the master server and performs the following tasks −
It manages the file system namespace.
It regulates clients' access to files.
It executes file system operations such as renaming, closing, and opening files and directories.
Data node
The datanode is commodity hardware running the GNU/Linux operating system and the datanode software. For every node (commodity hardware/system) in a cluster, there will be a datanode. These nodes manage the data storage of their system.
Datanodes perform read-write operations on the file systems, as per client request.
They also perform operations such as block creation, deletion, and replication according
to the instructions of the namenode.
Block
Generally the user data is stored in the files of HDFS. A file in the file system is divided into one or more segments, which are stored in individual data nodes. These file segments are called blocks. In other words, the minimum amount of data that HDFS can read or write is called a block. The default block size is 64 MB, but it can be increased as needed by changing the HDFS configuration.
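As a sketch (the property name dfs.blocksize and the paths below are assumptions for a typical Hadoop 2.x installation), the block size can be overridden per file at write time, and fsck shows how a file was split into blocks:
hadoop fs -D dfs.blocksize=134217728 -put largefile.dat /user/hadoop/largefile.dat
hdfs fsck /user/hadoop/largefile.dat -files -blocks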
Goals of HDFS
Fault detection and recovery − Since HDFS includes a large number of commodity hardware components, failure of components is frequent. Therefore HDFS should have mechanisms for quick and automatic fault detection and recovery.
Huge datasets − HDFS should scale to hundreds of nodes per cluster to manage applications having huge datasets.
Hardware at data − A requested task can be done efficiently when the computation takes place near the data. Especially where huge datasets are involved, this reduces network traffic and increases throughput.
The File System (FS) shell includes various shell-like commands that directly interact with the Hadoop Distributed File System (HDFS) as well as other file systems that Hadoop supports, such as Local FS, HFTP FS, S3 FS, and others. Below are the supported commands.
For complete documentation please refer to FileSystemShell.html.
appendToFile
cat
checksum
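Typical usages of appendToFile, cat, and checksum (the paths below are illustrative):
hadoop fs -appendToFile localfile /user/hadoop/hadoopfile
hadoop fs -cat /user/hadoop/hadoopfile
hadoop fs -checksum /user/hadoop/hadoopfile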
chgrp
Change group association of files. The user must be the owner of files, or else a super-
user. Additional information is in the Permissions Guide HdfsPermissionsGuide.html
Options
The -R option will make the change recursively through the directory structure.
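Example (the group and path are illustrative):
hadoop fs -chgrp -R hadoopgroup /user/hadoop/dir1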
chmod
Change the permissions of files. With -R, make the change recursively through the
directory structure. The user must be the owner of the file, or else a super-user.
Additional information is in the Permissions guide HdfsPermissionsGuide.html.
Options
The -R option will make the change recursively through the directory structure.
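Example (the mode and path are illustrative):
hadoop fs -chmod -R 755 /user/hadoop/dir1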
chown
Change the owner of files. The user must be a super-user. Additional information is in the
Permissions Guide HdfsPermissionsGuide.html
Options
The -R option will make the change recursively through the directory structure.
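Example (the owner, group, and path are illustrative):
hadoop fs -chown -R hadoopuser:hadoopgroup /user/hadoop/dir1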
copyFromLocal
Similar to the put command, except that the source is restricted to a local file reference. For example, it can copy all the files inside a test folder on the edge node to a test folder in HDFS.
copyToLocal
Similar to the get command, except that the destination is restricted to a local file reference. For example, it can copy all the files inside a test folder in HDFS to a test folder on the edge node.
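Examples (paths are illustrative):
hadoop fs -copyFromLocal test/* /user/hadoop/test
hadoop fs -copyToLocal /user/hadoop/test/* test/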
count
Count the number of directories, files and bytes under the paths that match the specified
file pattern. The output columns with -count are: DIR_COUNT, FILE_COUNT,
CONTENT_SIZE, PATHNAME
Example:
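The path below is illustrative:
hadoop fs -count /user/hadoop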
cp
Copy files from source to destination. This command allows multiple sources as well in
which case the destination must be a directory.
Options:
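The -f option overwrites the destination if it already exists. Example (paths are illustrative):
hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir1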
createSnapshot
HDFS Snapshots are read-only point-in-time copies of the file system. Snapshots can be
taken on a subtree of the file system or the entire file system. Some common use cases of
snapshots are data backup, protection against user errors and disaster recovery. For more
information refer the link HdfsSnapshots.html
deleteSnapshot
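A minimal sequence for creating and deleting a snapshot (the path and snapshot name are illustrative; the directory must first be made snapshottable by an administrator):
hdfs dfsadmin -allowSnapshot /user/hadoop/important
hdfs dfs -createSnapshot /user/hadoop/important snap1
hdfs dfs -deleteSnapshot /user/hadoop/important snap1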
df
Displays free space.
Options:
The -h option will format file sizes in a human-readable fashion.
du
Displays sizes of files and directories contained in the given directory, or the length of a file in case it is just a file.
Options:
The -s option will result in an aggregate summary of file lengths being displayed, rather
than the individual files.
The -h option will format file sizes in a “human-readable” fashion (e.g 64.0m instead of
67108864)
Example:
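The paths below are illustrative:
hadoop fs -du -s -h /user/hadoop
hadoop fs -df -h /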
expunge
hadoop fs -expunge
find
Finds all files that match the specified expression and applies selected actions to them. If
no path is specified then defaults to the current working directory. If no expression is
specified then defaults to -print.
hadoop fs -find / -name test -print
get
Copy files to the local file system. Files that fail the CRC check may be copied with the -
ignorecrc option. Files and CRCs may be copied using the -crc option.
Example:
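The paths below are illustrative:
hadoop fs -get /user/hadoop/file1 localfile
hadoop fs -get -crc /user/hadoop/file1 .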
getfacl
Displays the Access Control Lists (ACLs) of files and directories. If a directory has a
default ACL, then getfacl also displays the default ACL.
Options:
-R: List the ACLs of all files and directories recursively.
getfattr
Displays the extended attribute names and values (if any) for a file or directory.
Options:
-R: Recursively list the attributes for all files and directories.
-n name: Dump the named extended attribute value.
-d: Dump all extended attribute values associated with pathname.
-e encoding: Encode values after retrieving them. Valid encodings are “text”, “hex”, and
“base64”. Values encoded as text strings are enclosed in double quotes (“), and values
encoded as hexadecimal and base64 are prefixed with 0x and 0s, respectively.
path: The file or directory.
Examples:
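The paths and the attribute name user.myAttr are illustrative:
hadoop fs -getfacl -R /user/hadoop/dir1
hadoop fs -getfattr -d /user/hadoop/file1
hadoop fs -getfattr -n user.myAttr /user/hadoop/file1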
getmerge
Takes a source directory and a destination file as input and concatenates files in src into
the destination local file. Optionally -nl can be set to enable adding a newline character
(LF) at the end of each file.
Examples:
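The paths are illustrative; on recent releases the -nl flag is given before the source:
hadoop fs -getmerge -nl /user/hadoop/logs /tmp/merged-logs.txt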
hadoop fs -help
ls
Lists the contents of a directory; for a file, displays its status.
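Examples (paths are illustrative):
hadoop fs -ls /user/hadoop
hadoop fs -ls -R /user/hadoop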
lsr
Recursive version of ls. Deprecated; use hadoop fs -ls -R instead.
mkdir
Options:
The -p option behavior is much like Unix mkdir -p, creating parent directories along the
path.
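Example (the path is illustrative):
hadoop fs -mkdir -p /user/hadoop/dir1/dir2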
moveFromLocal
Similar to put command, except that the source localsrc is deleted after it’s copied.
moveToLocal
Displays a "Not implemented yet" message.
mv
Moves files from source to destination. This command allows multiple sources as well in
which case the destination needs to be a directory. Moving files across file systems is not
permitted.
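Example (paths are illustrative):
hadoop fs -mv /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir1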
put
Copy single src, or multiple srcs from local file system to the destination file system.
Also reads input from stdin and writes to destination file system.
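Examples (paths are illustrative; a dash as the source reads from stdin):
hadoop fs -put localfile1 localfile2 /user/hadoop/dir1
echo "hello" | hadoop fs -put - /user/hadoop/fromstdin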
renameSnapshot
Renames a snapshot. Usage: hdfs dfs -renameSnapshot <path> <oldName> <newName>.
rm
Options:
The -f option will not display a diagnostic message or modify the exit status to reflect an
error if the file does not exist.
The -R option deletes the directory and any content under it recursively.
The -r option is equivalent to -R.
The -skipTrash option will bypass trash, if enabled, and delete the specified file(s)
immediately. This can be useful when it is necessary to delete files from an over-quota
directory.
Example:
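The paths are illustrative:
hadoop fs -rm /user/hadoop/file1
hadoop fs -rm -r -skipTrash /user/hadoop/dir1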
rmdir
Delete a directory.
rmr
Recursive version of delete. Deprecated; use hadoop fs -rm -r instead.
setfacl
hadoop fs -setfacl [-R] [-b |-k -m |-x <acl_spec> <path>] |[--set <acl_spec> <path>]
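Examples (the user name hive and the paths are illustrative):
hadoop fs -setfacl -m user:hive:rw- /user/hadoop/file1
hadoop fs -setfacl -x user:hive /user/hadoop/file1
hadoop fs -setfacl -b /user/hadoop/file1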
setfattr
Sets an extended attribute name and value for a file or directory.
setrep
Changes the replication factor of a file. If path is a directory then the command
recursively changes the replication factor of all files under the directory tree rooted at
path.
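Example (the -w option waits for the replication to complete; the path is illustrative):
hadoop fs -setrep -w 3 /user/hadoop/dir1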
stat
tail
test
text
Takes a source file and outputs the file in text format. The allowed formats are zip and
TextRecordInputStream.
touchz
truncate
Truncate all files that match the specified file pattern to the specified length.
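Illustrative usages of tail, test, text, touchz, and truncate (paths are illustrative; truncate is available only on newer releases):
hadoop fs -tail /user/hadoop/file1
hadoop fs -test -e /user/hadoop/file1
hadoop fs -text /user/hadoop/file1.gz
hadoop fs -touchz /user/hadoop/emptyfile
hadoop fs -truncate -w 100 /user/hadoop/file1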
usage
Other file systems supported by Hadoop include:
KFS (CloudStore) − URI scheme: kfs; Java implementation: fs.kfs.KosmosFileSystem. A distributed file system written in C++, very similar to HDFS and GFS (Google File System).
S3 (block-based) − URI scheme: s3. Backed by Amazon S3; stores files in blocks (similar to HDFS) to overcome S3's 5 GB file size limit.
1. Capture Big Data: The sources can be extensive: structured, semi-structured, and unstructured data, streaming and real-time data sources, sensors, devices, machine-captured data, and many other sources. For data capturing and storage, the Hadoop ecosystem provides different data integrators, such as Flume, Sqoop, and Storm, depending on the type of data.
3. Distribute Results: The processed data can be used by the BI and analytics
system or the big data analytics system for performing analysis or visualization.
4. Feedback and Retain: The analyzed data can be fed back to Hadoop and used for further improvements.
MapReduce Data Flow
Input reader
The input reader reads the incoming data and splits it into data blocks of the appropriate size (64 MB to 128 MB). Each data block is associated with a Map function.
Once the input reader has read the data, it generates the corresponding key-value pairs.
The input files reside in HDFS.
Map function
The Map function processes the incoming key-value pairs and generates the corresponding output key-value pairs. The map input and output types may differ from each other.
Partition function
The partition function assigns the output of each Map function to the appropriate reducer. It is given the key and value (and the number of reducers) and returns the index of the reducer.
A sorting operation is performed on the input data for the Reduce function. Here, the data is compared using a comparison function and arranged in sorted form.
Reduce function
The Reduce function is invoked once for each unique key. The keys are already arranged in sorted order. The Reduce function iterates over the values associated with each key and generates the corresponding output.
Output writer
Once the data has flowed through all the above phases, the output writer executes. The role of the output writer is to write the Reduce output to stable storage.
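As a concrete illustration of this pipeline, the sketch below first runs the word-count example bundled with Hadoop, and then a Hadoop Streaming job that uses ordinary shell utilities as the Map and Reduce functions. The jar locations and HDFS paths are assumptions for a typical Hadoop 2.x installation and may differ on your cluster.
# Bundled word-count example: input is split, mapped, shuffled/sorted by key, and reduced to per-word counts.
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
  wordcount /user/hadoop/input /user/hadoop/wc-output
# Hadoop Streaming: any executable can play the role of the Map or Reduce function.
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input /user/hadoop/input -output /user/hadoop/stream-output \
  -mapper /bin/cat -reducer /usr/bin/wc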
SQOOP (SQL to Hadoop)
Sqoop parallelizes data transfer for optimal system utilization and fast
performance.
Apache Sqoop provides direct input i.e. it can map relational databases and
import directly into HBase and Hive.
Coupons.com uses the Sqoop tool for data transfer between its IBM Netezza data warehouse and the Hadoop environment.
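A minimal import sketch (the JDBC URL, credentials, table, and target directory are all illustrative):
sqoop import --connect jdbc:mysql://dbhost/sales --username etl -P \
  --table customers --target-dir /user/hadoop/customers --num-mappers 4
Sqoop runs the import as a MapReduce job, with the --num-mappers value controlling how many parallel tasks read from the database.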
Flume
Logs are often not present at the location where developers can view them easily, developers have a limited number of tools available for processing logs, and they have confined capabilities for intelligently managing the log lifecycle. Apache Flume is designed to address the difficulties of both the operations group and developers by providing them an easy-to-use tool that can push logs from a bunch of application servers to various repositories via a highly configurable agent.
Sources define where the data is coming from, for instance a message queue or a file.
Sinks define the destination of the data pipelined from various sources.
Channels are pipes which establish connections between sources and sinks.
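A minimal sketch of an agent configuration in the current (Flume NG) properties format, wiring a netcat source to a logger sink through a memory channel; the agent name a1, host, and port are illustrative:
# example.conf: one source, one channel, one sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The agent can then be started with:
flume-ng agent --conf conf --conf-file example.conf --name a1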
The master acts like a reliable configuration service which is used by nodes for retrieving their
configuration.
If the configuration for a particular node changes on the master then it will dynamically be
updated by the master.
A node is generally an event pipe in Hadoop Flume which reads from the source and writes to the sink. The characteristics and role of a Flume node are determined by the behaviour of its sources and sinks. Apache Flume is built with several source and sink options, but if none of them fits your requirements then developers can write their own. A Flume node can also be configured with the help of a sink decorator, which can interpret the event and transform it as it passes through. With all these basic primitives, developers can create different topologies to collect data on any application server and direct it to any log repository.
Mozilla uses Flume with Hadoop for the BuildBot project, along with Elasticsearch.
Capillary Technologies uses Flume for aggregating logs from 25 machines in production.
Apache Sqoop and Apache Flume work with different kinds of data sources. Flume functions well with streaming data sources that are generated continuously in a Hadoop environment, such as log files from multiple servers, whereas Apache Sqoop is designed to work well with any kind of relational database system that has JDBC connectivity. Sqoop can also import data from NoSQL databases like MongoDB or Cassandra, and it also allows direct data transfer to Hive or HDFS. When transferring data to Hive using Apache Sqoop, a table is created for the data, with the schema taken from the source database itself (see the sketch below).
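A hedged sketch of such a Hive import (connection details and table name are illustrative):
sqoop import --connect jdbc:mysql://dbhost/sales --username etl -P \
  --table customers --hive-import
With --hive-import, Sqoop generates the Hive table definition from the source table's schema and loads the imported data into it.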
In Apache Flume data loading is event driven whereas in Apache Sqoop data load is not driven
by events.
Flume is a better choice when moving bulk streaming data from sources like JMS or a spooling directory, whereas Sqoop is an ideal fit if the data sits in databases like Teradata, Oracle, MySQL, PostgreSQL, or any other JDBC-compatible database.
In Apache Flume, data flows to HDFS through multiple channels whereas in Apache Sqoop
HDFS is the destination for importing data.
Apache Flume has an agent-based architecture, i.e. the code written in Flume is known as an agent, which is responsible for fetching data, whereas the Apache Sqoop architecture is based on connectors. The connectors in Sqoop know how to connect with the various data sources and fetch data accordingly.
Lastly, Sqoop and Flume cannot be used to achieve the same tasks, as they are developed specifically to serve different purposes. Apache Flume agents are designed to fetch streaming data like tweets from Twitter or log files from a web server, whereas Sqoop connectors are designed to work only with structured data sources and fetch data from them.
Apache Sqoop is mainly used for parallel data transfers and data imports, as it copies data quickly, whereas Apache Flume is used for collecting and aggregating data because of its distributed, reliable nature and highly available backup routes.
Hadoop I/O
Hadoop comes with a set of primitives for data I/O. Some of these are techniques that are more
general than Hadoop, such as data integrity and compression, but deserve special consideration
when dealing with multi-terabyte datasets. Others are Hadoop tools or APIs that form the
building blocks for developing distributed systems, such as serialization frameworks and on-disk
data structures.
Like any I/O subsystem, Hadoop comes with a set of primitives. These primitive considerations, although generic in nature, apply to the Hadoop I/O system as well, with some special connotations. Hadoop deals with multi-terabyte datasets; a special consideration of these primitives gives an idea of how Hadoop handles data input and output. This section quickly skims over these primitives to give a perspective on the Hadoop input/output system.
Data Integrity
Data integrity means that data should remain accurate and consistent across its storage, processing, and retrieval operations. To ensure that no data is lost or corrupted during persistence and processing, Hadoop maintains stringent data integrity constraints. Every read/write operation, whether on disk or over the network, is prone to errors, and the volume of data that Hadoop handles only aggravates the situation. The usual way to detect corrupt data is through checksums. A checksum is computed when data first enters the system and is sent across the channel during the retrieval process. The retrieving end computes the checksum again and matches it with the received one. If they match exactly, the data is deemed error free; otherwise it contains an error. But what if the checksum sent is itself corrupt? This is highly unlikely because it is a small piece of data, but it is not impossible. Using the right kind of hardware, such as ECC memory, can alleviate the situation.
Hadoop takes this further and creates a distinct checksum for every 512 bytes of data (by default). Because a CRC-32 checksum is only 4 bytes, the storage overhead is not an issue. All data that enters the system is verified by the datanodes before being forwarded for storage or further processing. Data sent to the datanode pipeline is verified through checksums, and any corruption found is immediately reported to the client with a ChecksumException. A client read from a datanode also goes through the same drill. The datanodes maintain a log of checksum verification to keep track of verified blocks. The log is updated by the datanode upon receiving a block-verification success signal from the client. These statistics help in keeping bad disks at bay.
Apart from this, a periodic verification on the block store is made with
the help of DataBlockScanner running along with the datanode thread
in the background. This protects data from corruption in the physical
storage media.
For every file created in the Hadoop LocalFileSystem, a hidden file named .<filename>.crc is created in the same directory. This file maintains the checksum of each chunk of data (512 bytes) in the file. This metadata helps in detecting read errors before the LocalFileSystem throws a ChecksumException.
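The checksum machinery can be observed from the shell; the paths below are illustrative:
hadoop fs -checksum /user/hadoop/file1
hadoop fs -get -crc /user/hadoop/file1 .
The first command prints the stored checksum for the file; the second copies the file together with its CRC data, which then appears locally as the hidden file .file1.crc.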
Compression
Keeping in mind the volume of data Hadoop deals with, compression is not a luxury but a requirement. There are many obvious benefits of file compression, and Hadoop uses them: compression economizes storage requirements and speeds up data transmission over the network and to and from disk. Hadoop supports many tools, techniques, and algorithms, many of which are quite popular and have been used in file compression for ages; for example, gzip, bzip2, LZO, and zip are often used.
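Several of these codecs are transparent at the shell level. A small sketch (file names and paths are illustrative): a gzip-compressed file stored in HDFS can be read back as plain text with the text command, which detects the codec from the file extension.
gzip -c access.log > access.log.gz
hadoop fs -put access.log.gz /user/hadoop/logs/
hadoop fs -text /user/hadoop/logs/access.log.gz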
Serialization
The process that turns structured objects into a stream of bytes is called serialization. It is specifically required for data transmission over the network or for persisting raw data on disk. Deserialization is just the reverse process, where a stream of bytes is transformed back into a structured object; it is particularly required for reconstructing objects from the raw bytes. It is therefore not surprising that distributed computing uses this in a couple of distinct areas: inter-process communication and data persistence.
Hadoop has its own compact and fast serialization format, Writables, which MapReduce programs use for their key and value types.
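One place where this shows up from the shell is SequenceFiles, which store serialized Writable key-value pairs; the text command deserializes them and prints one key-value pair per line (the path is illustrative, and the output assumes keys and values with a readable text form, such as Text or IntWritable):
hadoop fs -text /user/hadoop/data.seq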
Conclusion
This is just a quick overview of the input/output system of Hadoop; many of the intricate details are left for later units. It is not very difficult to understand the Hadoop input/output system if one has a basic understanding of I/O systems in general. Hadoop simply adds some extra machinery on top to keep up with its distributed nature, which works at a massive scale of data.