WORKSHOP
Data Manipulation with Hive
and Pig
Contents
Acknowledgements
Introduction to Hive and Pig
Setup
Exercise 1 Load Avro data into HDFS
Exercise 2 Define an external Hive table and review the results
Exercise 3 Extract facts using Hive
Optional: Extract sessions using Pig
Solutions
Exercise 1 Load Avro data into HDFS
Exercise 2 Define an external Hive table and review the results
Exercise 3 Extract facts using Hive
Acknowledgements
Information about the movies used in this demonstration (including titles, descriptions, actors, and genres) is
courtesy of Wikipedia. Movie posters are courtesy of TMDb. All other data used in this demonstration,
including customers and their viewing behavior, is completely fictitious.
Introduction to Hive and Pig
Hadoop is built around two core components:
HDFS: a distributed file system that provides fault-tolerant storage while scaling horizontally
MapReduce: a computational framework that moves the compute to the data, enabling incredible
parallelism
The Hadoop Distributed File System (HDFS) plays a central role in storing and efficiently accessing massive
amounts of data. HDFS is:
High-throughput: streaming data access and large block sizes optimize operation on massive datasets
Designed for data locality: enabling mapped computation is central to the design
Operating in parallel over data stored in HDFS requires a computing framework. MapReduce is the parallel data
processing framework in Hadoop. Processing jobs are broken into tasks, distributed across the cluster, and run
locally with the correct data. MapReduce jobs consist of two types of tasks:
Map: filter and transform the input data, producing intermediate results
Reduce: summarize and aggregate the sorted map results, producing the final output
Because Hadoop is largely written in Java, Java is the language of MapReduce. However, the Hadoop
community understands that Java is not always the quickest or most natural way to describe data processing. As
such, the ecosystem has evolved to support a wide variety of APIs and secondary languages. From scripting
languages to SQL, the Hadoop ecosystem allows developers to express their data processing jobs in the language
they deem most suitable. Hive and Pig are two of these secondary languages for interacting with data stored in
HDFS. Hive is a data warehousing system which exposes an SQL-like language called HiveQL. Pig is an
analysis platform which provides a dataflow language called Pig Latin. In this workshop, we will cover the
basics of each language.
Setup
1. Make sure the hands-on lab is initialized by running the following script:
cd /home/oracle/movie/moviework/reset
./reset_mapreduce.sh
2. Review the commands available for the Hadoop Distributed File System:
cd /home/oracle/movie/moviework/mapreduce
hadoop fs
3. Create a subdirectory called my_stuff in the /user/oracle folder and then ensure the directory has been
created:
hadoop fs -mkdir /user/oracle/my_stuff
hadoop fs -ls /user/oracle
4. Remove the directory my_stuff and then ensure it has been removed:
hadoop fs -rm -r my_stuff
hadoop fs -ls
Exercise 1 Load Avro data into HDFS
Next, load a file into HDFS from the local file system. Specifically, you will load an Avro log file that tracked
activity in an online movie application. The Avro data represents individual clicks from an online movie rental
site. You will use the basic put command for moving data into the Hadoop Distributed File System.
5. Review the commands available for the Hadoop Distributed File System:
hadoop fs
6. Copy the Avro file into HDFS:
hadoop fs -put movieapp_3months.avro /user/oracle/moviework/applog_avro
7. The remaining exercises operate over the data in this Avro file. Make a note of its location in HDFS and
the fields in each tuple.
Exercise 2 Define an external Hive table and review the results
In this exercise you will:
Review the Avro schema for the data file that contains the movie activity
Create an external table that parses the Avro fields and maps them to the columns in the table
Select the min and max time periods contained in the table using HiveQL
1. Enter the Hive command line by typing hive at the Linux prompt:
hive
2. Create a new Hive database called moviework. Ensure that the database has been successfully created:
hive> create database moviework;
hive> show databases;
3. To create a table in a database, you can either fully qualify the table name (i.e. prepend the database to
the name of the table) or you can designate that you want all DDL and DML operations to apply to a
specific database. For simplicity, you will apply subsequent operations to the moviework database:
hive> use moviework;
4. Review the schema for the Avro file. This schema definition has already been saved in HDFS in the
/user/oracle/moviework/schemas/ directory. Open a new Terminal window and type the following
command at the Linux prompt to review the schema definition:
hadoop fs -cat moviework/schemas/activity.avsc
5. Create a Hive external table using that schema definition. Notice that you do not need to specify the
column names or data types when defining the table. The Avro serializer-deserializer (or SERDE) will
parse the schema definition to determine these values. After creating the table, review the results by
selecting the first 20 rows. Go back into your Hive terminal window and run the following commands in
the moviework database:
hive> CREATE EXTERNAL TABLE movieapp_log_avro
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES
('avro.schema.url'='hdfs://bigdatalite.localdomain/user/oracle/moviework/schemas/activity.avsc')
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/oracle/moviework/applog_avro';
hive> SELECT * FROM movieapp_log_avro LIMIT 20;
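To confirm the column names and types that the SerDe derived from the Avro schema, you can also describe
the table (a quick sanity check, not part of the original lab steps):
hive> DESCRIBE movieapp_log_avro;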
6. HiveQL supports many standard SQL operations. Find the min and max time periods that are available
in the log file:
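The query itself is not reproduced in this copy of the lab; a minimal sketch that returns the range, assuming
the time field is the sortable timestamp string seen in the sample rows:
hive> SELECT MIN(time) min_time, MAX(time) max_time
FROM movieapp_log_avro;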
The activity column in the log uses the following numeric codes (for example, activity 1 identifies a movie
rating event):
1 RATE_MOVIE
2 COMPLETED_MOVIE
3 PAUSE_MOVIE
4 START_MOVIE
5 BROWSE_MOVIE
6 LIST_MOVIE
7 SEARCH_MOVIE
8 LOGIN
9 LOGOUT
10 INCOMPLETE_MOVIE
11 PURCHASE_MOVIE
Exercise 3 Extract facts using Hive
Hive maps queries into MapReduce jobs, simplifying the process of querying large datasets in HDFS. HiveQL
statements can be mapped to phases of the MapReduce framework: selection and transformation operations
occur in map tasks, while aggregation is handled by reducers. Join operations are flexible: they can be
performed in the reducer or in the mappers, depending on the size of the leftmost table.
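To see this mapping for yourself, you can prefix any statement with EXPLAIN and Hive will print the query
plan, including its map and reduce stages (the exact output varies by Hive version):
hive> EXPLAIN SELECT activity, COUNT(*)
FROM movieapp_log_avro
GROUP BY activity;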
1. Write a query to select only those clicks which correspond to starting, browsing, completing, or
purchasing movies. Use a CASE statement to transform the RECOMMENDED column into integers
where Y is 1 and N is 0. Also, ensure GENREID is not null. Only include the first 25 rows:
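The query is missing from this copy of the lab; a sketch consistent with the staging-table INSERT in step 4,
where codes 2, 4, 5 and 11 correspond to COMPLETED_MOVIE, START_MOVIE, BROWSE_MOVIE and
PURCHASE_MOVIE:
hive> SELECT custid,
movieid,
CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
time,
CASE recommended WHEN 'Y' THEN 1 ELSE 0 END recommended,
activity
FROM movieapp_log_avro
WHERE activity IN (2,4,5,11)
LIMIT 25;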
Next, select the movie ratings made by a user. Consider the following: what if a user rates the same movie
multiple times? In this scenario, you should only load the user's most recent movie rating.
In Oracle Database 12c, you can use a windowing function. However, HiveQL does not provide sophisticated
analytic functions. Instead, you must use an inner join to compute the result.
Note: Joins occur before WHERE clauses. To restrict the output of a join, a requirement should be in the WHERE
clause, otherwise it should be in the JOIN clause.
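As a hypothetical illustration (not part of the lab): to keep only rating events (activity = 1) on both sides of a
self-join, the activity conditions belong in the ON clause, because they restrict the join itself rather than filter
its output:
hive> SELECT m1.custid, m1.movieid
FROM movieapp_log_avro m1
JOIN movieapp_log_avro m2
ON (m1.custid = m2.custid
AND m1.movieid = m2.movieid
AND m1.activity = 1
AND m2.activity = 1);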
2. Write a query to select the customer ID, movie ID, recommended state and most recent rating for each
movie.
hive> SELECT
m1.custid,
m1.movieid,
CASE WHEN m1.genreid > 0 THEN m1.genreid ELSE -1 END genreid,
m1.time,
CASE m1.recommended WHEN 'Y' THEN 1 ELSE 0 END
recommended,
m1.activity,
m1.rating
FROM movieapp_log_avro m1
JOIN
(SELECT
custid,
movieid,
CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
MAX(time) max_time,
activity
FROM movieapp_log_avro
GROUP BY custid,
movieid,
genreid,
activity
) m2
ON (
m1.custid = m2.custid
AND m1.movieid = m2.movieid
AND m1.genreid = m2.genreid
AND m1.time = m2.max_time
AND m1.activity = 1
AND m2.activity = 1
) LIMIT 25;
3. Load the results of the previous two queries into a staging table. First, create the staging table:
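The CREATE statement is missing from this copy of the lab; a sketch whose column list and types are
inferred from the INSERT statement in step 4 and from the ROW FORMAT fragment preserved in the
Solutions section:
hive> CREATE TABLE movieapp_log_stage (
custid INT,
movieid INT,
genreid INT,
time STRING,
recommended INT,
activity INT,
rating INT,
price FLOAT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';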
4. Next, load the results of the queries into the staging table:
INSERT OVERWRITE TABLE movieapp_log_stage
SELECT * FROM (
SELECT custid,
movieid,
CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
time,
CAST((CASE recommended WHEN 'Y' THEN 1 ELSE 0 END) AS INT)
recommended,
activity,
CAST(NULL AS INT) rating,
price
FROM movieapp_log_avro
WHERE activity IN (2,4,5,11)
UNION ALL
SELECT
m1.custid,
m1.movieid,
CASE WHEN m1.genreid > 0 THEN m1.genreid ELSE -1 END genreid,
m1.time,
CAST((CASE m1.recommended WHEN 'Y' THEN 1 ELSE 0 END) AS
INT) recommended,
m1.activity,
m1.rating,
CAST(NULL AS FLOAT) price
FROM movieapp_log_avro m1
JOIN
(SELECT
custid,
movieid,
CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
MAX(time) max_time,
activity
FROM movieapp_log_avro
GROUP BY custid,
movieid,
genreid,
activity
) m2
ON (
m1.custid = m2.custid
AND m1.movieid = m2.movieid
AND m1.genreid = m2.genreid
AND m1.time = m2.max_time
AND m1.activity = 1
AND m2.activity = 1
)
) union_result;
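As a quick sanity check (not part of the original steps), you can sample the staging table once the INSERT
completes:
hive> SELECT * FROM movieapp_log_stage LIMIT 10;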
Optional: Extract sessions using Pig
In this exercise you will learn basic Pig Latin semantics and the two fundamental Pig Latin data types: data
bags and tuples.
1. Start the Grunt shell and execute the following statements to set up a dataflow with the clickstream data. Note:
Pig Latin statements are assembled into MapReduce jobs which are launched at execution of a DUMP or STORE
statement.
pig
REGISTER /usr/lib/pig/piggybank.jar
REGISTER /usr/lib/pig/lib/avro-1.7.4.jar
REGISTER /usr/lib/pig/lib/json-simple-1.1.jar
REGISTER /usr/lib/pig/lib/snappy-java-1.0.4.1.jar
REGISTER /usr/lib/pig/lib/jackson-core-asl-1.8.8.jar
REGISTER /usr/lib/pig/lib/jackson-mapper-asl-1.8.8.jar
applogs = LOAD '/user/oracle/moviework/applog_avro'
USING org.apache.pig.piggybank.storage.avro.AvroStorage(
'no_schema_check',
'schema_file',
'hdfs://bigdatalite.localdomain/user/oracle/moviework/schemas/activity.avsc');
DESCRIBE applogs;
log_sample = SAMPLE applogs 0.001;
DESCRIBE log_sample;
DUMP log_sample;
log_sample is a bag of tuples, two of the basic Pig Latin data types. Put concisely:
1227714 is a field
(1227714,2012-09-30:22:56:03,6,,39451,6,) is a tuple, an ordered collection of fields
{(1227714,2012-09-30:22:56:03,6,,39451,6,), (1070227,2012-09-30:19:09:32,8,,,,)} is a bag, a collection
of tuples
Solutions
Exercise 1 Load Avro data into HDFS
1. Review the commands available for the Hadoop Distributed File System:
hadoop fs
2. Copy the Avro file into HDFS:
hadoop fs -put movieapp_3months.avro /user/oracle/moviework/applog_avro
Exercise 2 Define an external Hive table and review the results
1. Enter the Hive command line by typing hive at the Linux prompt:
hive
2. Create a new Hive database called moviework. Ensure that the database has been successfully created:
hive> create database moviework;
hive> show databases;
3. Review the schema definition for the Avro file and then define a table using that schema file:
hadoop fs -cat moviework/schemas/activity.avsc
4. To create a table in a database, you can either fully qualify the table name (i.e. prepend the database to
the name of the table) or you can designate that you want all DDL and DML operations to apply to a
specific database. For simplicity, you will apply subsequent operations to the moviework database:
hive> use moviework;
Now create the external table movieapp_log_avro that uses the activity Avro schema. Select the first 20 rows:
hive> CREATE EXTERNAL TABLE movieapp_log_avro
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES
('avro.schema.url'='hdfs://bigdatalite.localdomain/user/oracle/moviework/schemas/activity.avsc')
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/oracle/moviework/applog_avro';
hive> SELECT * FROM movieapp_log_avro LIMIT 20;
5. Write a query in the Hive command line that returns the first 5 rows from the table. After reviewing the
results, drop the table:
hive> SELECT * FROM movieapp_log_avro LIMIT 5;
hive> drop table movieapp_log_avro;
6. HiveQL supports many standard SQL operations. Find the min and max time periods that are available
in the log file:
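The solution screenshot is not reproduced in this copy; the same minimal sketch given in the exercise section
returns the range:
hive> SELECT MIN(time) min_time, MAX(time) max_time
FROM movieapp_log_avro;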
Exercise 3 Extract facts using Hive
1. Write a query to select only those clicks which correspond to starting, browsing, completing, or
purchasing movies. Use a CASE statement to transform the RECOMMENDED column into integers
where Y is 1 and N is 0. Also, ensure GENREID is not null. Only include the first 25 rows:
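As in the exercise section, the query is missing from this copy; a sketch consistent with the staging INSERT in
step 4:
hive> SELECT custid,
movieid,
CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
time,
CASE recommended WHEN 'Y' THEN 1 ELSE 0 END recommended,
activity
FROM movieapp_log_avro
WHERE activity IN (2,4,5,11)
LIMIT 25;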
2. Write a query to select the customer ID, movie ID, recommended state and most recent rating for each
movie.
hive> SELECT
m1.custid,
m1.movieid,
CASE WHEN m1.genreid > 0 THEN m1.genreid ELSE -1 END genreid,
m1.time,
CASE m1.recommended WHEN 'Y' THEN 1 ELSE 0 END
recommended,
m1.activity,
m1.rating
FROM movieapp_log_avro m1
JOIN
(SELECT
custid,
movieid,
CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
MAX(time) max_time,
activity
FROM movieapp_log_avro
GROUP BY custid,
movieid,
genreid,
activity
) m2
ON (
m1.custid = m2.custid
AND m1.movieid = m2.movieid
AND m1.genreid = m2.genreid
AND m1.time = m2.max_time
AND m1.activity = 1
AND m2.activity = 1
) LIMIT 25;
3. Load the results of the previous two queries into a staging table. First, create the staging table. The
CREATE statement is truncated in this copy of the lab; the column list below is inferred from the INSERT
statement in step 4, while the ROW FORMAT clause is preserved from the original:
hive> CREATE TABLE movieapp_log_stage (
-- column list inferred from the INSERT statement below
custid INT,
movieid INT,
genreid INT,
time STRING,
recommended INT,
activity INT,
rating INT,
price FLOAT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
4. Next, load the results of the queries into the staging table:
INSERT OVERWRITE TABLE movieapp_log_stage
SELECT * FROM (
SELECT custid,
movieid,
CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
time,
CAST((CASE recommended WHEN 'Y' THEN 1 ELSE 0 END) AS INT)
recommended,
activity,
CAST(NULL AS INT) rating,
price
FROM movieapp_log_avro
WHERE activity IN (2,4,5,11)
UNION ALL
SELECT
m1.custid,
m1.movieid,
CASE WHEN m1.genreid > 0 THEN m1.genreid ELSE -1 END genreid,
m1.time,
CAST((CASE m1.recommended WHEN 'Y' THEN 1 ELSE 0 END) AS
INT) recommended,
m1.activity,
m1.rating,
CAST(NULL AS FLOAT) price
FROM movieapp_log_avro m1
JOIN
(SELECT
custid,
movieid,
CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
MAX(time) max_time,
activity
FROM movieapp_log_avro
GROUP BY custid,
movieid,
genreid,
activity
) m2
ON (
m1.custid = m2.custid
AND m1.movieid = m2.movieid
AND m1.genreid = m2.genreid
AND m1.time = m2.max_time
AND m1.activity = 1
AND m2.activity = 1
)
) union_result;