Book Recommender System Using Hadoop
Submitted in partial fulfillment for the award of the degree of Bachelor of Technology in Computer Science & Engineering Under the Guidance of Mrs Shaveta Tatwani
Department of Computer Science and Engineering Amity School of Engineering and Technology (Affiliated to G.G.S.I.P. University) New Delhi - 110061
CERTIFICATE
This is to certify that the Project Report titled "Book Recommender System using Hadoop", done by Ankit Kumar Das (04710402710), Pranay Khatri (03010402710) and Shreya Tripathi (05110402710),
is an authentic work carried out by them under my guidance at AMITY SCHOOL OF ENGINEERING AND TECHNOLOGY, Bijwasan. The matter embodied in this project report has not been submitted earlier for the award of any degree or diploma, to the best of my knowledge and belief.
ACKNOWLEDGEMENT
First and foremost, our acknowledgements are due to Prof. B. P. Singh, Senior Director, and Prof. Rekha Aggarwal, Director, for the education under their guidance, which has provided strong fundamentals, positive competition and an unmatched learning experience.
We express our gratitude to our esteemed Professor M. N. Gupta (Head, Department of CSE & IT) and to our guide, Asst. Professor Mrs. Shaveta Tatwani. Their inspiration, motivation, suggestions and invaluable guidance enabled us to develop the minor project. Their careful observations and precise ideas were an immense help in refining our content.
ABSTRACT
Recommender systems are new-generation Internet tools that help users navigate through information on the Internet and receive information related to their preferences. Although recommender systems are most often applied in online shopping and in entertainment domains such as movies and music, their applicability is being researched in other areas as well. This report presents an overview of the recommender systems currently working in the domain of online book shopping. The report also proposes a new book recommender system that combines a user's choices not only with those of similar users but with those of other users as well, in order to give diverse recommendations that change over time. The overall architecture of the proposed system is presented, and its implementation with a prototype design is described. Lastly, the report presents an empirical evaluation of the system based on a survey reflecting the impact of such diverse recommendations on user choices.
TABLE OF CONTENTS
Certificate
Acknowledgement
Abstract
Table of Contents
List of Figures
List of Tables

1. Introduction
   1.1 Recommender Systems
   1.2 Apache Hadoop
2. Recommender Systems
   2.1 Introduction
   2.2 Content based Filtering
   2.3 Collaborative Filtering
       2.3.1 User based Collaborative Filtering
       2.3.2 Item based Collaborative Filtering
   2.4 Similarity Measures
       2.4.1 Cosine based Similarity
       2.4.2 Pearson Correlation based Similarity
       2.4.3 Co-occurrence based Similarity
       2.4.4 Euclidean Distance based Similarity
       2.4.5 Tanimoto Coefficient based Similarity
       2.4.6 Log Likelihood based Similarity
   2.5 Challenges and Issues
3. Hadoop Distributed File System
   3.1 Overview
   3.2 Assumptions and Goals
   3.3 Architecture
       3.3.1 NameNode
           3.3.1.1 Checkpoint Node
           3.3.1.2 Backup Node
       3.3.2 DataNode
       3.3.3 HDFS Client
   3.4 Data Replication
       3.4.1 Replica Placement
       3.4.2 Replica Selection
       3.4.3 Safemode
   3.5 Data Organisation
       3.5.1 Data Blocks
       3.5.2 Staging
       3.5.3 Replication and Pipelining
   3.6 HDFS Features
       3.6.1 Communication Protocols
       3.6.2 Data Disk Failure, Heartbeats and Re-replication
       3.6.3 Cluster Rebalancing
       3.6.4 Data Integrity
       3.6.5 Metadata Disk Failure
   3.7 Advantages and Disadvantages
4. Hadoop MapReduce
   4.1 Introduction
   4.2 Inputs and Outputs
   4.3 User Interfaces
       4.3.1 Mapper
       4.3.2 Reducer
       4.3.3 Partitioner
       4.3.4 Reporter
       4.3.5 Output Collector
   4.4 Job Configuration
   4.5 MapReduce Features
5. Item Based Recommendation Algorithm
   5.1 Algorithm
   5.2 Logical Parts of Code
       5.2.1 Preparation of Preference Matrix
       5.2.2 Generation of Similarity Matrix
       5.2.3 Preparation for Matrix Multiplication
       5.2.4 Matrix Multiplication
6. System Preparation and Implementation
   6.1 Runtime Environment
   6.2 Software and Language Versions
   6.3 Hardware Specification of each Hadoop Node
   6.4 Hadoop Configuration
   6.5 Starting the Cluster
       6.5.1 Formatting the NameNode
       6.5.2 Starting HDFS daemons
       6.5.3 Starting mapred daemons
       6.5.4 Starting the Job
   6.6 Monitoring the Cluster
7. Results and Conclusion
   7.1 Results
       7.1.1 Run Time
       7.1.2 Recommendations
   7.2 Conclusions
8. Future Scope
9. References
LIST OF FIGURES
Figure no.  Name
2.1  User based Collaborative Filtering
2.2  Item based Collaborative Filtering
2.3  Item-Item Similarity
2.4  Tanimoto Coefficient
3.1  HDFS Architecture
3.2  Reading a File
3.3  Writing to a File
3.4  Block Replication
4.1  MapReduce Framework
4.2  Task Environment
6.1  Formatting Namenode
6.2  HDFS daemons
6.3  Mapred daemons
6.4  Starting Job
6.5  Namenode Interface
6.6  JobTracker Interface
6.7  Scheduling Interface 1
6.8  Scheduling Interface 2
7.1  Average Run Time
7.2  Similarity Co-occurrence Recommendations
7.3  Tanimoto Recommendations
7.4  Loglikelihood Recommendations
7.5  Euclidean Distance Recommendations
7.6  Pearson Coefficient Recommendations
LIST OF TABLES
Table no.  Name
2.1  User Ratings
7.1  Time Taken
CHAPTER 1 INTRODUCTION
1.1 Recommender Systems
Recommender systems are widespread tools that are employed by a wide range of organisations and companies for recommending items such as movies, books and even employees for projects. With the advent of big data, however, it has become difficult to process the large amount of data needed for recommendations. For this reason, Apache Hadoop is employed for scalability, reliability and faster processing. Recommender systems (sometimes replacing "system" with a synonym such as platform or engine) are a subclass of information filtering systems that seek to predict the 'rating' or 'preference' that a user would give to an item. Recommender systems have become extremely common in recent years and are applied in a variety of applications. The most popular ones are probably movies, music, news, books, research articles, search queries, social tags, and products in general.
1.2 Apache Hadoop

The Apache Hadoop project develops open-source software for reliable, scalable, distributed computing. The framework includes the following modules:
- Hadoop Common: the common utilities that support the other Hadoop modules.
- Hadoop Distributed File System (HDFS): a distributed file system that provides high-throughput access to application data.
- Hadoop MapReduce: a YARN-based system for parallel processing of large data sets.
In everyday life people often rely on recommendations from other people, for example reviews that a film critic has written and which appear in the newspaper they read. In seeking to mimic this behaviour, the first RSs applied algorithms to leverage recommendations produced by a community of users to deliver recommendations to an active user, i.e., a user looking for suggestions. The recommendations were for items that similar users (those with similar tastes) had liked. This approach is termed collaborative filtering, and its rationale is that if the active user agreed in the past with some users, then the other recommendations coming from these similar users should be relevant as well and of interest to the active user. As e-commerce Web sites began to develop, a pressing need emerged for providing recommendations derived from filtering the whole range of available alternatives. Users were finding it very difficult to arrive at the most appropriate choices from the immense variety of items (products and services) that these Web sites were offering. The explosive growth and variety of information available on the Web and the rapid introduction of new e-business services (buying products, product comparison, auction, etc.) frequently overwhelmed users, leading them to make poor decisions. The availability of choices, instead of producing a benefit, started to decrease users' well-being. It was understood that while choice is good, more choice is not always better. Indeed, choice, with its implications of freedom, autonomy and self-determination, can become excessive, creating a sense that freedom may come to be regarded as a kind of misery-inducing tyranny. RSs have proved in recent years to be a valuable means for coping with the information overload problem. Ultimately a RS addresses this phenomenon by pointing the user towards the items in which he/she may be interested.
Those items that are most similar to the positively rated ones will be recommended.
Recommendations can be based on demographics of the users, overall top-selling items, or the past buying habits of users as a predictor of future items. Collaborative Filtering (CF) [2][3] is the most successful recommendation technique to date. The idea of collaborative filtering is to find users in a community that share appreciations. If two users have rated the same or almost the same items in common, then they have similar tastes. Such users build a group, or a so-called neighbourhood. A user gets recommendations for those items that he/she hasn't rated before but that were already positively rated by users in his/her neighbourhood. Table 2.1 shows that all three users rated the books positively and with similar marks; that means that they have similar taste and build a neighbourhood. User A hasn't rated the book ASP.Net, which probably means that he hasn't read it yet. As the book was positively rated by the other users, he will get this item recommended. As opposed to simpler recommender systems, where recommendations are based on the most-rated and most-popular item methods, collaborative recommender systems care about the taste of the user. The taste is considered to be constant, or at least to change slowly.
Table 2.1: User Ratings

Collaborative filtering is widely used in e-commerce. Customers can rate books, songs or movies and then get recommendations regarding those items in the future. Moreover, collaborative filtering is utilized in browsing certain kinds of documents (e.g. scientific works, articles and magazines).
In the user-based approach, the users perform the main role. If a certain majority of the customers has the same taste, they are joined into one group. Recommendations are given to a user based on the evaluation of items by other users from the same group, with whom he/she shares common preferences. If an item was positively rated by the community, it will be recommended to the user. Thus, in the user-based approach, the items that were already rated by the user play an important role in finding a group that shares appreciations with him.
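A minimal, non-distributed sketch of this user-based approach is shown below. The ratings dictionary, the cosine-based similarity and the similarity threshold are illustrative assumptions for the sake of the example, not part of the system described in this report.

# Toy illustration of user-based collaborative filtering: users with similar
# ratings form a neighbourhood, and items rated positively by the
# neighbourhood are recommended to the active user.
from math import sqrt

ratings = {  # hypothetical user -> {book: rating} data
    "A": {"C++": 5, "Java": 4, "Python": 5},
    "B": {"C++": 4, "Java": 5, "Python": 4, "ASP.Net": 5},
    "C": {"C++": 5, "Java": 5, "ASP.Net": 4},
}

def cosine_similarity(u, v):
    """Cosine similarity over the books both users rated."""
    common = set(u) & set(v)
    if not common:
        return 0.0
    dot = sum(u[b] * v[b] for b in common)
    norm_u = sqrt(sum(u[b] ** 2 for b in common))
    norm_v = sqrt(sum(v[b] ** 2 for b in common))
    return dot / (norm_u * norm_v)

def recommend(active, min_sim=0.9):
    """Recommend books rated positively by the active user's neighbourhood."""
    neighbours = [u for u in ratings
                  if u != active
                  and cosine_similarity(ratings[active], ratings[u]) >= min_sim]
    seen = set(ratings[active])
    scores = {}
    for n in neighbours:
        for book, r in ratings[n].items():
            if book not in seen and r >= 4:   # positively rated by a neighbour
                scores[book] = max(scores.get(book, 0), r)
    return sorted(scores, key=scores.get, reverse=True)

print(recommend("A"))   # ['ASP.Net']

With this toy data, users B and C form A's neighbourhood, and the unread book ASP.Net, which they both rated positively, is recommended to A, mirroring the Table 2.1 example above.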
Fig. 2.3: Item-Item Similarity

There are different ways to compute the similarity between items. These are cosine-based similarity, correlation-based similarity and adjusted-cosine similarity.
The Pearson correlation of two series is the ratio of their covariance to the product of their standard deviations. Covariance is a measure of how much two series move together in absolute terms; it is big when the series move far in the same direction from their means in the same places. Dividing by the standard deviations merely normalizes for the absolute size of their changes. The Pearson correlation is a number between -1 and 1 that measures the tendency of two series of numbers, paired up one-to-one, to move together; that is to say, it measures how likely a number in one series is to be relatively large when the corresponding number in the other series is high, and vice versa. It measures the tendency of the numbers to move together proportionally, such that there is a roughly linear relationship between the values in one series and the other. When this tendency is high, the correlation is close to 1. When there appears to be little relationship at all, the value is near 0. When there appears to be an opposing relationship (one series' numbers are high exactly when the other series' numbers are low), the value is near -1. In this case, the similarity between two items i and j is measured by computing the Pearson-r correlation corr(i, j). To make the correlation computation accurate we must first isolate the co-rated cases (i.e., cases where the users rated both i and j). Let the set of users who rated both i and j be denoted by U; then the correlation similarity is given by the formula below.
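In standard notation, consistent with [5], where $R_{u,i}$ is the rating of user $u$ on item $i$ and $\bar{R}_i$ is the average rating of item $i$:

$$\mathrm{sim}(i,j) = \operatorname{corr}(i,j) = \frac{\sum_{u \in U}\left(R_{u,i}-\bar{R}_i\right)\left(R_{u,j}-\bar{R}_j\right)}{\sqrt{\sum_{u \in U}\left(R_{u,i}-\bar{R}_i\right)^{2}}\,\sqrt{\sum_{u \in U}\left(R_{u,j}-\bar{R}_j\right)^{2}}}$$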
The Euclidean distance-based similarity views users as points in a space whose coordinates are their preference values. The similarity metric computes the Euclidean distance d between two such user points. This value alone doesn't constitute a valid similarity metric, because larger values would mean more-distant, and therefore less similar, users; the value should be smaller when users are more similar. Therefore, the implementation actually returns 1 / (1 + d). This similarity metric never returns a negative value, but larger values still mean more similarity.
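A minimal sketch of this transformation, with the preference dictionaries and sample values as purely illustrative assumptions:

# Euclidean-distance similarity: the raw distance d is mapped to 1 / (1 + d)
# so that more-similar users (smaller d) get larger, never-negative values.
from math import sqrt

def euclidean_similarity(prefs_a, prefs_b):
    """prefs_* map item -> preference value (assumed in-memory format)."""
    common = set(prefs_a) & set(prefs_b)
    if not common:
        return 0.0
    d = sqrt(sum((prefs_a[i] - prefs_b[i]) ** 2 for i in common))
    return 1.0 / (1.0 + d)

print(euclidean_similarity({"b1": 5, "b2": 3}, {"b1": 4, "b2": 3}))  # 0.5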
The log-likelihood-based similarity, like the Tanimoto coefficient, is based on the number of items in common between two users, but its value is more an expression of how unlikely it is for two users to have so much overlap, given the total number of items out there and the number of items each user has a preference for. It is a probability-based measure. The distance between two clusters is related to the decrease in log-likelihood as they are combined into one cluster. In calculating log-likelihood, normal distributions for continuous variables and multinomial distributions for categorical variables are assumed; it is also assumed that the variables are independent of each other, and so are the cases.
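In item-based recommenders this measure is usually computed not from a cluster-distance formula but from a 2x2 table of co-occurrence counts. The sketch below shows that count-based log-likelihood ratio; the names k11, k12, k21 and k22 for the four cells of the contingency table are assumed notation, not taken from this report.

# Log-likelihood ratio from a 2x2 contingency table of co-occurrence counts:
#   k11 = events where both items occur, k12 = only the first item,
#   k21 = only the second item,          k22 = neither item.
from math import log

def entropy(*counts):
    """Unnormalized Shannon entropy of a list of counts."""
    total = sum(counts)
    return sum(-c * log(c / total) for c in counts if c > 0)

def log_likelihood_ratio(k11, k12, k21, k22):
    row_entropy = entropy(k11 + k12, k21 + k22)
    col_entropy = entropy(k11 + k21, k12 + k22)
    matrix_entropy = entropy(k11, k12, k21, k22)
    return 2.0 * (row_entropy + col_entropy - matrix_entropy)

# A large ratio means the overlap between two items is unlikely to be chance.
print(log_likelihood_ratio(k11=10, k12=5, k21=5, k22=1000))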
Scalability - With the growth of the numbers of users and items, the system needs more resources for processing information and forming recommendations. The majority of resources is consumed in determining users with similar tastes and goods with similar descriptions. This problem is addressed by combining various types of filters and by physical improvement of the systems; parts of the numerous computations may also be performed offline in order to accelerate the issuance of recommendations online.

Sparsity - In online shops that have a huge number of users and items, there are almost always users that have rated just a few items. Using collaborative and other approaches, recommender systems generally create neighbourhoods of users from their profiles. If a user has evaluated just a few items, it is quite difficult to determine his or her taste, and he or she could be related to the wrong neighbourhood. Sparsity is the problem of lack of information.

Privacy - Privacy has been the most important problem. In order to produce the most accurate and correct recommendations, the system must acquire as much information as possible about the user, including demographic data and data about the location of a particular user. Naturally, the question of reliability, security and confidentiality of the given information arises. Many online shops offer effective protection of the privacy of users by utilizing specialized algorithms and programs.
Applications that run on HDFS have large data sets. A typical file in HDFS is gigabytes to terabytes in size. Thus, HDFS is tuned to support large files. It should provide high aggregate data bandwidth and scale to hundreds of nodes in a single cluster. It should support tens of millions of files in a single instance.

Simple Coherency Model
HDFS applications need a write-once-read-many access model for files. A file once created, written, and closed need not be changed. This assumption simplifies data coherency issues and enables high throughput data access. A MapReduce application or a web crawler application fits perfectly with this model. There is a plan to support appending-writes to files in the future.

Moving Computation is Cheaper than Moving Data
A computation requested by an application is much more efficient if it is executed near the data it operates on. This is especially true when the size of the data set is huge. This minimizes network congestion and increases the overall throughput of the system. The assumption is that it is often better to migrate the computation closer to where the data is located rather than moving the data to where the application is running. HDFS provides interfaces for applications to move themselves closer to where the data is located.

Portability Across Heterogeneous Hardware and Software Platforms
HDFS has been designed to be easily portable from one platform to another. This facilitates widespread adoption of HDFS as a platform of choice for a large set of applications.
3.3 Architecture
HDFS is based on a master/slave architecture. An HDFS cluster consists of a single NameNode (as master) and a number of DataNodes (as slaves). The NameNode and DataNodes are pieces of software designed to run on commodity machines. These machines typically run a GNU/Linux operating system (OS). The use of the highly portable Java language means that HDFS can be deployed on a wide range of machines. A typical deployment has a dedicated machine that runs only the NameNode software; each of the other machines in the cluster runs one instance of the DataNode software. The architecture does not preclude running multiple DataNodes on the same machine, but in a real deployment one machine usually runs one DataNode.
The existence of a single NameNode in a cluster greatly simplifies the architecture of the system. The NameNode is the arbitrator and repository for all HDFS metadata. The system is designed in such a way that user data never flows through the NameNode.
3.3.1 Namenode
The NameNode manages the filesystem namespace and the metadata for all the files and directories in the tree.
Fig. 3.1: HDFS Architecture

The file is divided into large blocks (typically 128 megabytes, but user-selectable file-by-file) and each block is independently replicated at multiple DataNodes (typically three, but user-selectable file-by-file) to provide reliability. The NameNode maintains and stores the namespace tree and the mapping of file blocks to DataNodes persistently on the local disk in the form of two files: the namespace image and the edit log. The NameNode also knows the DataNodes on which all the blocks for a given file are located. However, it does not store block locations persistently, since this information is reconstructed from the DataNodes when the system starts.
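As a quick worked illustration of these defaults (the 1 GB file size below is an arbitrary assumption, not a figure from this project), the number of blocks and the raw storage consumed follow directly from the block size and the replication factor:

# Worked example: blocks and raw storage for one file stored in HDFS.
file_size_mb = 1024          # hypothetical 1 GB file
block_size_mb = 128          # typical HDFS block size
replication = 3              # typical replication factor

num_blocks = -(-file_size_mb // block_size_mb)   # ceiling division
raw_storage_mb = file_size_mb * replication      # every block is stored 3 times

print(num_blocks)        # 8 blocks
print(raw_storage_mb)    # 3072 MB of raw cluster storage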
On a NameNode failure, the filesystem becomes inaccessible because only the NameNode knows how to reconstruct the files from the blocks on the DataNodes. For this reason it is important to make the NameNode resilient to failure, and Hadoop provides two mechanisms for this: the Checkpoint node and the Backup node.

3.3.1.1 Checkpoint Node
A checkpoint is an image record written persistently to disk. The NameNode uses two types of files to persist its namespace:
- fsimage: the latest checkpoint of the namespace
- edits: a log containing changes to the namespace; these logs are also called journals.
On restart, the NameNode creates updated file system metadata by merging the two files, fsimage and edits. The NameNode then overwrites fsimage with the new HDFS state and begins a new edits journal. The Checkpoint node periodically downloads the latest fsimage and edits from the active NameNode, creates checkpoints by merging them locally, and then uploads the new checkpoints back to the active NameNode. This requires the same memory space as the NameNode, so the Checkpoint node needs to run on a separate machine. Namespace information is lost if either the checkpoint or the journal is missing, so it is highly recommended to configure HDFS to store the checkpoint and journal in multiple storage directories. The Checkpoint node uses the parameter fs.checkpoint.period to set the interval between two consecutive checkpoints; the interval is given in seconds (the default is 3600 seconds). The edit log size that triggers a checkpoint is specified by the parameter fs.checkpoint.size (default 64 MB); a checkpoint is triggered if the edit log exceeds this size. Multiple Checkpoint nodes may be specified in the cluster configuration file.

3.3.1.2 Backup Node
The Backup node has the same functionality as the Checkpoint node. In addition, it maintains an in-memory, up-to-date copy of the file system namespace that is always synchronized with the active NameNode state. Along with accepting a journal stream of filesystem edits from the NameNode and persisting this to disk, the Backup node also applies those edits to its own copy of the namespace in memory, thus creating a backup of the namespace. Unlike the Checkpoint node, the Backup node therefore holds an up-to-date state of the namespace in memory. The Backup node requires the same amount of RAM as the NameNode. The NameNode supports one Backup node at a time, and no Checkpoint nodes may be registered while a Backup node is in use. The Backup node takes care of namespace data persistence, so the NameNode does not need to have a persistent store of its own.
3.3.2 DataNodes
There are a number of DataNodes, usually one per node in the cluster, which manage the storage attached to the nodes. HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes [7]. The NameNode executes the file system namespace operations such as opening, closing, and renaming files and directories; it also determines the mapping of blocks to DataNodes. The DataNodes store and retrieve blocks when requested (by clients or the NameNode), and they report back to the NameNode periodically with lists of the blocks they are storing. The DataNodes are responsible for serving read and write requests from the file system's clients, and they also perform block creation, deletion, and replication upon instruction from the NameNode. DataNode-to-NameNode connections are established by a handshake in which the namespace ID and the software version of the DataNode are verified. The namespace ID is assigned to the file system instance when it is formatted and is stored persistently on all nodes of the cluster; a node with a different namespace ID cannot join the cluster. A new DataNode without any namespace ID can join the cluster and receive the cluster's namespace ID, after which the DataNode registers with the NameNode with a storage ID. A DataNode identifies the block replicas in its possession to the NameNode by sending a block report. A block report contains the block id, the generation stamp and the length of each block replica the server hosts. The first block report is sent immediately after the DataNode's registration. Subsequent block reports are sent every hour and provide the NameNode with an up-to-date view of where block replicas are located on the cluster.
Fig. 3.2: Reading a file

Writing to a File
For writing to a file, the HDFS client first creates an empty file without any blocks. File creation is only possible when the client has write permission and a file of that name does not already exist in the system. The NameNode records the new file creation and allocates data blocks to a list of suitable DataNodes to host replicas of the first block of the file; replication of the data organizes these DataNodes into a pipeline. When the first block is filled, new DataNodes are requested to host replicas of the next block: a new pipeline is organized, and the client sends the further bytes of the file. Each choice of DataNodes is likely to be different. If a DataNode in the pipeline fails while the data is being written, the pipeline is first closed, the partial block on the failed DataNode is deleted, and the failed DataNode is removed from the pipeline; new DataNodes are then chosen for the pipeline to write the remaining blocks of data.
Fig 3.4: Block Replication

3.4.1 Replica Placement
The placement of replicas is critical to HDFS reliability and performance. Optimizing replica placement distinguishes HDFS from most other distributed file systems; this is a feature that needs a lot of tuning and experience. The purpose of a rack-aware replica placement policy is to improve data reliability, availability, and network bandwidth utilization. The current implementation of the replica placement policy is a first effort in this direction. The short-term goals of implementing this policy are to validate it on production systems, learn more about its behaviour, and build a foundation to test and research more sophisticated policies. Large HDFS instances run on a cluster of computers that commonly spread across many racks. Communication between two nodes in different racks has to go through switches. In most cases, network bandwidth between machines in the same rack is greater than network bandwidth between machines in different racks. The NameNode determines the rack id each DataNode belongs to via the process outlined in Hadoop Rack Awareness. A simple but non-optimal policy is to place replicas on unique racks.
This prevents losing data when an entire rack fails and allows use of bandwidth from multiple racks when reading data. This policy evenly distributes replicas in the cluster, which makes it easy to balance load on component failure. However, it increases the cost of writes because a write needs to transfer blocks to multiple racks. For the common case, when the replication factor is three, HDFS's placement policy is to put one replica on one node in the local rack, another on a node in a different (remote) rack, and the last on a different node in that same remote rack. This policy cuts the inter-rack write traffic, which generally improves write performance. The chance of rack failure is far less than that of node failure, so this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data, since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not distribute evenly across the racks: one third of the replicas are on one node, two thirds of the replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.

3.4.2 Replica Selection
To minimize global bandwidth consumption and read latency, HDFS tries to satisfy a read request from a replica that is closest to the reader. If there exists a replica on the same rack as the reader node, then that replica is preferred to satisfy the read request. If an HDFS cluster spans multiple data centers, then a replica that is resident in the local data center is preferred over any remote replica.

3.4.3 Safemode
On startup, the NameNode enters a special state called Safemode. Replication of data blocks does not occur when the NameNode is in the Safemode state. The NameNode receives Heartbeat and Blockreport messages from the DataNodes. A Blockreport contains the list of data blocks that a DataNode is hosting. Each block has a specified minimum number of replicas. A block is considered safely replicated when the minimum number of replicas of that data block has checked in with the NameNode. After a configurable percentage of safely replicated data blocks checks in with the NameNode (plus an additional 30 seconds), the NameNode exits the Safemode state. It then determines the list of data blocks (if any) that still have fewer than the specified number of replicas and replicates these blocks to other DataNodes.
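As an illustration of the default three-replica placement policy described in section 3.4.1 above, the sketch below picks racks and nodes for a new block: one replica on the writer's (local) rack and two replicas on two different nodes of a single remote rack. The rack and node names are purely illustrative assumptions.

import random

def place_replicas(writer_rack, racks):
    """Simplified sketch of the default HDFS placement for replication
    factor three (one local-rack replica, two replicas on one remote rack)."""
    remote_racks = [r for r in racks if r != writer_rack]
    remote = random.choice(remote_racks)
    first = (writer_rack, random.choice(racks[writer_rack]))
    second_node, third_node = random.sample(racks[remote], 2)
    return [first, (remote, second_node), (remote, third_node)]

racks = {
    "rack1": ["node11", "node12", "node13"],
    "rack2": ["node21", "node22", "node23"],
    "rack3": ["node31", "node32"],
}
print(place_replicas("rack1", racks))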
When a client writes data to an HDFS file, it first obtains from the NameNode a list of DataNodes that will host replicas of that block. The client then flushes the data block to the first DataNode. The first DataNode starts receiving the data in small portions (4 KB), writes each portion to its local repository and transfers that portion to the second DataNode in the list. The second DataNode, in turn, starts receiving each portion of the data block, writes that portion to its repository and then flushes it to the third DataNode. Finally, the third DataNode writes the data to its local repository. Thus a DataNode can be receiving data from the previous one in the pipeline while at the same time forwarding data to the next one: the data is pipelined from one DataNode to the next.
The HDFS architecture is compatible with data rebalancing schemes, in which data would automatically be moved from one DataNode to another if the free space on a DataNode falls below a certain threshold. In the event of a sudden high demand for a particular file, a scheme might dynamically create additional replicas and rebalance other data in the cluster. These types of data rebalancing schemes are not yet implemented.
- Architectural bottlenecks - There are scheduling delays in the Hadoop architecture that result in cluster nodes waiting for new tasks. Moreover, the disk is not used in a streaming manner; the access pattern is periodic. The HDFS client serializes computation and I/O instead of decoupling and pipelining those operations.
- Portability limitations - Because HDFS is implemented in Java, it is not able to use some performance-enhancing features of the native filesystem.
- Small files - HDFS is not efficient for large numbers of small files.
- Single master node - There is a risk of data loss because of the single NameNode.
- Latency of data access - HDFS delivers a high throughput of data at the expense of latency. If an application needs low-latency access to data, HDFS is not a good choice.
The MapReduce framework consists of a single master JobTracker and one slave TaskTracker per cluster node. The master is responsible for scheduling the jobs' component tasks on the slaves, monitoring them and re-executing the failed tasks; the slaves execute the tasks as directed by the master. Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract classes. These, and other job parameters, comprise the job configuration. The Hadoop job client then submits the job (jar/executable etc.) and configuration to the JobTracker, which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks, monitoring them, and providing status and diagnostic information to the job client.
The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework. The input and output types of a MapReduce job are:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
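The recommender job in this project uses Mahout's Java mappers and reducers, but the key/value flow above is easy to see in a small Hadoop Streaming sketch, where the mapper and reducer are plain scripts reading key/value pairs from standard input and writing them to standard output. The word-count example below is generic and purely illustrative.

#!/usr/bin/env python
# Generic Hadoop Streaming word count: run with "mapper" or "reducer" as the
# first argument; the script would be submitted with the Hadoop Streaming jar
# via its -mapper and -reducer options.
import sys

def mapper():
    # (k1, v1) = (byte offset, line of text)  ->  (k2, v2) = (word, 1)
    for line in sys.stdin:
        for word in line.split():
            print("%s\t%d" % (word, 1))

def reducer():
    # Input arrives sorted by key, so equal words are adjacent.
    # (k2, list of v2)  ->  (k3, v3) = (word, total count)
    current, total = None, 0
    for line in sys.stdin:
        word, count = line.rstrip("\n").split("\t")
        if word != current:
            if current is not None:
                print("%s\t%d" % (current, total))
            current, total = word, 0
        total += int(count)
    if current is not None:
        print("%s\t%d" % (current, total))

if __name__ == "__main__":
    mapper() if sys.argv[1] == "mapper" else reducer()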
4.3.2 Reducer
Reducer reduces a set of intermediate values which share a key to a smaller set of values.
The number of reduce tasks for the job is set by the user via JobConf.setNumReduceTasks(int). Overall, Reducer implementations are passed the JobConf for the job via the JobConfigurable.configure(JobConf) method and can override it to initialize themselves. The framework then calls the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method for each <key, (list of values)> pair in the grouped inputs. Applications can then override the Closeable.close() method to perform any required cleanup. The Reducer has three primary phases: shuffle, sort and reduce.

Shuffle - The input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers via HTTP.

Sort - The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage. The shuffle and sort phases occur simultaneously; while map outputs are being fetched, they are merged.

Secondary Sort - If equivalence rules for grouping the intermediate keys are required to be different from those for grouping keys before reduction, then one may specify a Comparator via JobConf.setOutputValueGroupingComparator(Class). Since JobConf.setOutputKeyComparatorClass(Class) can be used to control how intermediate keys are grouped, these can be used in conjunction to simulate a secondary sort on values.

Reduce - In this phase the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method is called for each <key, (list of values)> pair in the grouped inputs. The output of the reduce task is typically written to
the FileSystem via OutputCollector.collect(WritableComparable, Writable). Applications can use the Reporter to report progress, set application-level status messages and update Counters, or just indicate that they are alive.
4.3.3 Partitioner
Partitioner partitions the key space. Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.
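A default hash partitioner amounts to the following simplified sketch; Hadoop's own HashPartitioner applies the same idea to the key's hashCode().

def partition(key, num_reduce_tasks):
    """Send a key to one of the m reduce tasks based on its hash,
    so that all values for the same key reach the same reducer."""
    return hash(key) % num_reduce_tasks

# Every occurrence of the same key maps to the same partition:
print(partition("book-1234", 4), partition("book-1234", 4))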
4.3.4 Reporter
Reporter is a facility for MapReduce applications to report progress, set application-level status messages and update Counters. Mapper and Reducer implementations can use the Reporter to report progress or just indicate that they are alive. In scenarios where the application takes a significant amount of time to process individual key/value pairs, this is crucial since the framework might assume that the task has timed-out and kill that task. Another way to avoid this is to set the configuration parameter mapred.task.timeout to a high-enough value (or even set it to zero for no timeouts).
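For streaming tasks written in a scripting language, the same reporting is done by writing specially formatted lines to standard error; the sketch below shows this, with the counter group and names as illustrative assumptions.

import sys

def report_status(message):
    # Hadoop Streaming picks up lines of this form on stderr and treats them
    # like Reporter.setStatus(), which helps prevent task time-outs.
    sys.stderr.write("reporter:status:%s\n" % message)

def increment_counter(group, counter, amount=1):
    # Streaming equivalent of Reporter.incrCounter().
    sys.stderr.write("reporter:counter:%s,%s,%d\n" % (group, counter, amount))

report_status("processing ratings file")
increment_counter("Recommender", "RatingsSeen", 100)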
While some job parameters are straightforward to set (e.g. setNumReduceTasks(int)), other parameters interact subtly with the rest of the framework and/or the job configuration and are more complex to set (e.g. setNumMapTasks(int)).
JobConf is typically used to specify the Mapper, combiner (if any), Partitioner, Reducer, InputFormat, OutputFormat and OutputCommitter implementations. JobConf also indicates the set of input files (setInputPaths(JobConf, Path...) / addInputPath(JobConf, Path) and setInputPaths(JobConf, String) / addInputPaths(JobConf, String)) and where the output files should be written (setOutputPath(Path)).
DistributedCache is a facility provided by the Map/Reduce framework to cache files (text, archives, jars and so on) needed by applications. Applications specify the files to be cached via URLs (hdfs://) in the JobConf. The DistributedCache assumes that the files specified via hdfs:// URLs are already present on the FileSystem. The framework copies the necessary files to the slave node before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are only copied once per job, and from the ability to cache archives which are un-archived on the slaves. DistributedCache tracks the modification timestamps of the cached files; clearly the cache files should not be modified by the application or externally while the job is executing. DistributedCache can be used to distribute simple, read-only data/text files as well as more complex types such as archives and jars. Archives (zip, tar, tgz and tar.gz files) are un-archived at the slave nodes. Files have execution permissions set.

Tool - The Tool interface supports the handling of generic Hadoop command-line options. Tool is the standard for any Map/Reduce tool or application. The application should delegate the handling of standard command-line options to GenericOptionsParser via ToolRunner.run(Tool, String[]) and only handle its custom arguments.

IsolationRunner - IsolationRunner is a utility to help debug Map/Reduce programs. To use the IsolationRunner, first set keep.failed.tasks.files to true.

Profiling - Profiling is a utility to get a representative (2 or 3) sample of built-in Java profiler output for a sample of maps and reduces. The user can specify whether the system should collect profiler information for some of the tasks in the job by setting the configuration property mapred.task.profile. The value can be set using the API JobConf.setProfileEnabled(boolean); if the value is set to true, task profiling is enabled. The profiler information is stored in the user log directory. By default, profiling is not enabled for the job.
Debugging - The Map/Reduce framework provides a facility to run user-provided scripts for debugging. When a map/reduce task fails, the user can run a script for doing post-processing on the task logs, i.e. the task's stdout, stderr, syslog and jobconf. The stdout and stderr of the user-provided debug script are printed on the diagnostics; these outputs are also displayed on the job UI on demand. To submit a debug script, it first has to be distributed; then the script has to be supplied in the Configuration.

JobControl - JobControl is a utility which encapsulates a set of Map/Reduce jobs and their dependencies.

Data Compression - Hadoop Map/Reduce provides facilities for the application writer to specify compression for both intermediate map outputs and the job outputs, i.e. the output of the reduces. It also comes bundled with CompressionCodec implementations for the zlib and lzo compression algorithms.

Intermediate Outputs - Applications can control compression of intermediate map outputs via the JobConf.setCompressMapOutput(boolean) API, and the CompressionCodec to be used via the JobConf.setMapOutputCompressorClass(Class) API.

Job Outputs - Applications can control compression of job outputs via the FileOutputFormat.setCompressOutput(JobConf, boolean) API, and the CompressionCodec to be used can be specified via the FileOutputFormat.setOutputCompressorClass(JobConf, Class) API.
3. UnsymmetrifyMapper and MergeToTopKSimilaritiesReducer read half of the item-item similarity matrix and generate an almost complete one, missing only the values along its diagonal.
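A non-distributed sketch of the co-occurrence counting that underlies this similarity-matrix phase is shown below. The in-memory dictionaries and the toy data are assumptions made for illustration; the actual job performs the equivalent computation through the mappers and reducers named above.

from collections import defaultdict
from itertools import combinations

# user -> set of items the user has expressed a preference for (toy data)
user_items = {
    "u1": {"b1", "b2", "b3"},
    "u2": {"b1", "b3"},
    "u3": {"b2", "b3"},
}

def cooccurrence_matrix(user_items):
    """Count, for every pair of items, how many users preferred both.
    This is the symmetric item-item matrix that the MapReduce phase builds."""
    counts = defaultdict(int)
    for items in user_items.values():
        for a, b in combinations(sorted(items), 2):
            counts[(a, b)] += 1
            counts[(b, a)] += 1
    return counts

similarity = cooccurrence_matrix(user_items)
print(similarity[("b1", "b3")])   # 2: users u1 and u2 both have b1 and b3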
2. UserVectorSplitterMapper and its Reducer read in user-item vectors and output them as item-user vectors, formatted as VectorOrPrefWritable, to make them compatible for multiplication with the item-item matrix.
3. A default Mapper and ToVectorAndPrefReducer combine the output of the previous two steps, thus starting the multiplication of the item-item similarity matrix with the item-user vectors.
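The effect of that multiplication can again be seen in a small non-distributed sketch: each item the user has not seen is scored by summing similarity(seen item, unseen item) multiplied by the preference for the seen item. The dictionary-based similarity layout, the toy data and the top-N cut-off below are illustrative assumptions, not the job's actual on-disk format.

def recommend(user_prefs, similarity, top_n=5):
    """Multiply the item-item matrix with a user's preference vector:
    each unseen item is scored by sum(similarity(seen, unseen) * pref(seen))."""
    scores = {}
    for seen_item, pref in user_prefs.items():
        for (a, b), sim in similarity.items():
            if a == seen_item and b not in user_prefs:
                scores[b] = scores.get(b, 0.0) + sim * pref
    ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[:top_n]

# The user's existing preferences (item -> rating) scored against a small
# pair-keyed similarity matrix like the one built in the previous sketch.
print(recommend({"b1": 5.0}, {("b1", "b2"): 1, ("b2", "b1"): 1,
                              ("b1", "b3"): 2, ("b3", "b1"): 2}))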
hadoop-env.sh
The only required environment variable that has to be configured for Hadoop is JAVA_HOME.
core-site.xml
mapred-site.xml
The following configurations are added:
<property>
  <name>mapred.job.tracker</name>
  <value>localhost:54411</value>
  <description>The host and port that the MapReduce job tracker runs at.
  If "local", then jobs are run in-process as a single map and reduce task.
  </description>
</property>
hdfs-site.xml
The following configurations are added:
<property>
  <name>dfs.replication</name>
  <value>1</value>
  <description>Default block replication. The actual number of replications
  can be specified when the file is created. The default is used if replication
  is not specified in create time.
  </description>
</property>
masters
This file contains the label of the master. In this case, "master" is added to the file.
slaves
This file contains the labels of the slaves. In this case, "slave" is added to the file.
Fig. 6.5: Namenode interface

Shown below is the interface for the JobTracker on the master machine. It displays the number of tasks that are running, along with their other details.
Table 7.1: Time Taken

Metric / Run no.                  Run 1      Run 2      Run 3      Run 4      Run 5      Average
Similarity Co-occurrence          566.740    564.924    566.108    565.098    564.982    565.570
Tanimoto Coefficient Similarity   589.650    590.472    588.847    589.241    589.594    589.548
Loglikelihood                     704.074    705.012    704.725    704.104    705.021    704.586
Euclidean Distance                588.212    589.027    588.624    587.984    588.149    588.499
Pearson Coefficient               441.286    440.956    442.012    441.459    441.427    441.428
Fig. 7.1: Average Run Time (comparison of Similarity Co-occurrence, Tanimoto Coefficient, Loglikelihood, Euclidean Distance and Pearson Coefficient)
7.1.2 Recommendations
The Book-Crossing dataset was used for the recommender job. It contains user details, book ratings and book details such as name, genre, etc. A Python script was written which takes the ratings file, the users file and the book details file as parameters and displays in the console the rated books and the recommended books for the user whose user ID is specified in the script. The rated books are displayed as the book name followed by the rating given by the user. The recommendations were produced according to the similarity measure specified to the recommender job.
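The script itself is not reproduced in this report; the sketch below illustrates that kind of script. It assumes semicolon-separated Book-Crossing files (ISBN and title in the first two columns of the books file; user ID, ISBN and rating in the ratings file) and a recommendation output of the form "userID<TAB>[isbn:score,...]". The file layout, argument order and output format are assumptions, and the sketch takes the user ID as an argument rather than hard-coding it.

import csv
import sys

def load_books(path):
    """Map ISBN -> book title (assumed ';'-separated books file,
    ISBN in the first column, title in the second)."""
    titles = {}
    with open(path, encoding="latin-1") as f:
        for row in csv.reader(f, delimiter=";"):
            if len(row) >= 2:
                titles[row[0].strip('"')] = row[1].strip('"')
    return titles

def show_user(user_id, ratings_path, recs_path, books_path):
    titles = load_books(books_path)
    print("Books rated by user %s:" % user_id)
    with open(ratings_path, encoding="latin-1") as f:
        for row in csv.reader(f, delimiter=";"):
            if row and row[0].strip('"') == user_id:
                isbn, rating = row[1].strip('"'), row[2].strip('"')
                print("  %s : %s" % (titles.get(isbn, isbn), rating))
    print("Recommended books:")
    with open(recs_path) as f:      # assumed "userID<TAB>[isbn:score,...]" lines
        for line in f:
            uid, recs = line.rstrip("\n").split("\t", 1)
            if uid == user_id:
                for pair in recs.strip("[]").split(","):
                    isbn = pair.split(":")[0]
                    print("  %s" % titles.get(isbn, isbn))

if __name__ == "__main__":
    show_user(*sys.argv[1:5])   # user_id ratings_file recommendations_file books_file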
7.2 Conclusion
The recommender job was successfully run on the Hadoop distributed environment. The following conclusions can be drawn from the results above. The time taken by Loglikelihood is the greatest. This is because it eliminates intersections that occur by chance, by counting both the number of times items occur together and the number of times they do not. This increases the relevance of the recommendations but also increases the overhead. Although the Pearson correlation takes the least time, it is used mainly in theory, as it has a few drawbacks in principle that keep its recommendations from being accurate. First, it doesn't take into account the number of items in which two users' preferences overlap, which is probably a weakness in the context of recommender engines. Two items that have been read by 100 of the same users, for instance, even if those users don't often agree on ratings, are probably more similar than two items which have only been read by two users in common. This is not considered by the metric, and the similarity of the two users' ratings outweighs the 100 common readers. Second, if two items overlap on only one user, no correlation can be computed because of how the computation is defined. This could be an issue for small or sparse data sets, in which users' item sets rarely overlap. Finally, the correlation is also undefined if either series of preference values is entirely identical, even though this does not require that both series have all the same preference values. The run times of Euclidean distance, similarity co-occurrence and the Tanimoto coefficient were nearly equal and lie between those of Loglikelihood and the Pearson coefficient.
CHAPTER 9 REFERENCES
[1] A. Felfernig, G. Friedrich and L. Schmidt-Thieme, "Recommender Systems," IEEE Intelligent Systems, pp. 18-21, 2007.
[2] P. Resnick, N. Iacovou, M. Suchak, P. Bergstrom and J. Riedl, "GroupLens: An Open Architecture for Collaborative Filtering of Netnews," in Proceedings of CSCW '94, Chapel Hill, NC, 1994.
[3] U. Shardanand and P. Maes, "Social Information Filtering: Algorithms for Automating 'Word of Mouth'," in Proceedings of CHI '95, Denver, 1995.
[4] D. Asanov, "Algorithms and Methods in Recommender Systems," Berlin Institute of Technology, Berlin, 2011.
[5] B. Sarwar, G. Karypis, J. Konstan and J. Riedl, "Item-Based Collaborative Filtering Recommendation Algorithms," WWW10 (IW3C2), Hong Kong, China, 2001. [Online]. http://wwwconference.org/www10/cdrom/papers/519/index.html
[6] F. Ricci, L. Rokach and B. Shapira, "Introduction to Recommender Systems Handbook," New York: Springer Science+Business Media, 2011, ch. 1, sec. 1.4, pp. 10-14.
[7] D. Borthakur, "HDFS Architecture Guide," Hadoop Apache Project, 2008. [Online]. http://hadoop.apache.org/common/docs/current/hdfsdesign.pdf
[8] J. Dean and S. Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters," Communications of the ACM, 51(1), 2008, pp. 107-113.
[9] G. Linden, B. Smith and J. York, "Amazon.com Recommendations: Item-to-Item Collaborative Filtering," IEEE Internet Computing, 7(1), 2003, pp. 76-80.