
ECS640U/ECS765P Big Data Processing

The MapReduce Programming Model

Lecturer: Ahmed M. A. Sayed
School of Electronic Engineering and Computer Science

Credit: Joseph Doyle, Jesus Carrion, Felix Cuadrado, …


Contents

● Introduction to MapReduce
● MapReduce Implementation
● MapReduce Programming patterns
● Aggregate computations
Our first parallel program (Reminder from last week)
● Task: count the number of occurrences of each word in one document
● Input: text document
● Output: sequence of: word, count
The 56
School 23
Queen 10

● Collection Stage: Not applicable in this case


● Ingestion Stage: Move the file to the data lake with an applicable protocol, e.g. HTTP/FTP
● Preparation Stage: Remove characters that might confuse the algorithm, e.g. quotation marks
How to solve the problem on a single processor?
# input: text string with the complete text
words = text.split()
count = dict()
for word in words:
    if word in count:
        count[word] = count[word] + 1
    else:
        count[word] = 1

For example:

Text → "The good, the bad and the ugly"

words → ['the', 'good', 'the', 'bad', 'and', 'the', 'ugly'] (after the preparation stage has lower-cased the text and stripped punctuation)

count → {'the': 3, 'good': 1, 'bad': 1, 'and': 1, 'ugly': 1}
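For reference, the same computation is a one-liner with Python's built-in collections.Counter; a minimal sketch, assuming the text has already been lower-cased and de-punctuated by the preparation stage:

from collections import Counter

text = "the good the bad and the ugly"   # output of the preparation stage (assumed)
count = Counter(text.split())
# Counter({'the': 3, 'good': 1, 'bad': 1, 'and': 1, 'ugly': 1})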
Parallelising the solution
Our solution for one processor defines two basic tasks:
● Split input text into list of words
● Read each word and increase the corresponding counter

If we have several nodes, a simple way to parallelise the solution would be to:
● Assign a partial fragment of the input text to each node
● Each node solves the Word-Count problem for its fragment of text
● The results from each processor are merged together

Sounds easy, but how can we instruct a cluster of nodes to execute this simple job?
MapReduce
“A simple and powerful interface that enables automatic parallelization and distribution of large-scale
computations, combined with an implementation of this interface that achieves high performance on
large clusters of commodity PCs.”
(Dean and Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters", Google Inc., 2004.)

More simply, MapReduce is:

● A parallel programming model (map/reduce functions) together with an associated implementation (e.g. Hadoop).
MapReduce: Programming model
A MapReduce job consists of two functions
● The map function is called on an input item and emits intermediate key/value pairs
● The reduce function is called on groups of pairs with the same key, and emits results for this key
Given a job, if we can express it using a map function and a reduce function, we are using the MapReduce
programming model.

[Figure: an input item goes into map, which emits a list of key/value pairs; a group of key/value pairs with the same key goes into reduce, which emits the output for that key.]
MapReduce parallelisation
MapReduce creates two opportunities for parallelisation:
● Input data can be partitioned into chunks, each of which can be assigned to a different mapper.
● The map stage produces collections of key/value pairs that are grouped by key. Each distinct key group can be sent to a different reducer.
Map jobs are completed first, then a synchronisation step occurs, and reduce jobs start.

Example wordcount (pythonish pseudocode)

def mapper(_, text):
    words = text.split()
    for word in words:
        emit(word, 1)

def reducer(key, values):
    emit(key, sum(values))
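To make the data flow concrete, here is a minimal runnable sketch of a single-process "runtime" that wires this mapper and reducer together; emit is replaced by returned lists, and the dictionary grouping stands in for shuffle and sort (run_job and its names are illustrative, not part of any framework API):

from collections import defaultdict

def mapper(_, text):
    return [(word, 1) for word in text.split()]   # emit one (word, 1) per word

def reducer(key, values):
    return (key, sum(values))

def run_job(inputs):
    # map stage: apply the mapper to every input item
    pairs = [p for item in inputs for p in mapper(None, item)]
    # shuffle/sort stand-in: group all values by key
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    # reduce stage: one reducer call per distinct key
    return [reducer(key, values) for key, values in sorted(groups.items())]

print(run_job(["the good the bad", "and the ugly"]))
# [('and', 1), ('bad', 1), ('good', 1), ('the', 3), ('ugly', 1)]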
Example wordcount (javaish pseudocode)
public void Map(String filename, String text) {
    List<String> words = text.split();
    for (String word : words) {
        emit(word, 1);
    }
}

public void Reduce(String key, List<Integer> values) {
    int sum = 0;
    for (Integer count : values) {
        sum += count;
    }
    emit(key, sum);
}
How MapReduce parallelises
Input data is partitioned into processable chunks/partitions
One Map job is executed per chunk of data
● All mappers can be parallelised (depending on the number of nodes)
One Reduce Job is executed for each distinct key emitted by the Mappers
● All reducers can be parallelised (partitioned ‘almost evenly’ among nodes)
Note: computing nodes first work on the Map jobs. After all Map jobs have completed, a
synchronisation step occurs, and then the computing nodes start running Reduce jobs.
Word Count Example
MapReduce Benefits

● High level parallel programming abstraction


● Framework implementations provide good performance results
● Greatly reduces parallel programming complexity
● However, it is not suitable for every parallel programming algorithm!
Synchronization and message passing
[Figure: input key/value pairs from each data store feed the map tasks, which emit (key, values...) pairs; a barrier aggregates intermediate values by output key; each reduce task receives the intermediate values for one key and produces the final values for that key.]
Synchronization Step

● Every key-value item generated by the mappers is collected


● Items are transferred over the network (might be temporarily stored in memory/network storage)
● Same key items are grouped into a list of values
● Data is partitioned among the number of Reducers (nearly even partitions by # of keys)
● Data is copied over the network to each Reducer (if needed)
● The data provided to each Reducer is sorted according to the keys
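A sketch of how the partitioning decision might look; Hadoop's default partitioner hashes the key modulo the number of reducers, and the toy function below mimics that idea (illustrative only, not the framework's actual code):

def partition(key, num_reducers):
    # the same key always maps to the same reducer within one run
    # (note: Python salts str hashes per process, but the mapping is stable inside a run)
    return hash(key) % num_reducers

for key in ["the", "good", "bad"]:
    print(key, "-> reducer", partition(key, 2))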
MapReduce Runtime System
The runtime is responsible for the following:
● Partitions input data
● Schedules execution across a set of machines
● Handles load balancing
● Shuffles, partitions and sorts data between Map and Reduce steps
● Handles machine failure transparently
● Manages inter-process communication
Contents

● Introduction to MapReduce
● MapReduce Implementation
● MapReduce Programming patterns
● Aggregate computations
MapReduce: Implementation
The MapReduce programming model defines two main functions, namely Map and Reduce. Both
functions encapsulate the logic of the solution and can be parallelised by allocating them to different
nodes.

However, in order for this strategy to work, in addition to expressing the logic of the solution as map and
reduce functions, we need to partition the input data, and partition and move key/value pairs from
mappers to reducers.

MapReduce specifies how this is implemented. When creating a MapReduce job, we do not need to
worry about implementing this: the MapReduce framework will do it for us.
Word Count Example
[Figure: the framework partitions the input data and partitions/moves key-value pairs between stages; you define only the map and reduce functions.]


Shuffle and sort

At the end of the map stage:


● All emitted key/value pairs are collected
● Partitioned evenly (partition)
● Each partition is sent to one reducer

Reducers then:
● Obtain their partitions from each mapper (shuffling)
● Group partitions from different mappers by key (sort). A group of key/value pairs with the same
key is expressed as a single key/value pair, where the new value is the list of all the input values.
● Call the reduce function on each such key/value pair
Shuffle and Sort
Shuffle and Sort – on each Mapper

All emitted Key-value pairs are collected


● In-memory buffer (100MB default size) then spills to Hard Disk (HD)
Key/Value Pairs are partitioned depending on target reducer
● Partitioning aims at evenly splitting the keys
(Optionally) Combiner runs on each partition → will be covered in detail later
Output is available to the Reducers through HTTP server threads
Shuffle and Sort – on each Reducer

The reducer downloads output from mappers


● Potentially all Mappers are contacted (communication bottleneck)
Partial values from each Mapper are merged (e.g., same key on different mappers)
Keys are sorted and fed as input for the Reducer
● List of <key, list<value> >, sorted by key
The cost of communications

Parallelising Map and Reduce jobs allows algorithms to scale sub-linearly


One potential bottleneck for MapReduce programs is the cost of Shuffle and Sort operations
● Data has to be copied over network (i.e., expensive communication cost)
All the keys emitted by the mappers are transferred
● Sorting operation of large amounts of elements can also be costly

The Combiner is an optional additional step that can be executed before the reducer to reduce the
communication volume.
The Combiner

● The combiner acts as a preliminary reducer


● It is executed at each mapper node just before sending all the key/value pairs for shuffling
● Reduces the number of emitted items and improves efficiency
● It is not mandatory (the algorithm must work correctly if the Combiner is not invoked)
● Nearly the same as the Reducer function (but not always the case)

def combiner(key, values):
    total = sum(values)
    emit(key, total)

def reducer(key, values):
    emit(key, sum(values))
The Combiner
Word count combiner
def mapper(_, text):
    words = text.split()
    for word in words:
        emit(word, 1)

def reducer(key, values):
    emit(key, sum(values))

def combiner(key, values):
    emit(key, sum(values))
Combiner Rules
The combiner has the same structure as the reducer (same input parameters) but must comply with these
rules
● Idempotent - The number of times the combiner is applied can't change the output
● Commutative - The order of the inputs can't change the output
● Side-effect free - Combiners can't have side effects (or they won't be idempotent).
● Preserves the sort order - They can't change the keys to disrupt the sort order
● Preserves the partitioning - They can't change the keys to change the partitioning to the Reducers
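These rules can be checked empirically. The toy check below applies the word-count combiner zero, one, and two times to a mapper's local output and asserts that the final totals are unchanged (a sketch, not a framework feature):

from collections import defaultdict

def combine(pairs):
    # word-count combiner: pre-sum values per key on the mapper node
    sums = defaultdict(int)
    for key, value in pairs:
        sums[key] += value
    return list(sums.items())

def totals(pairs):
    # what the reducers would ultimately compute
    return dict(combine(pairs))

local_output = [("the", 1), ("good", 1), ("the", 1)]
# applying the combiner 0, 1 or 2 times must not change the result
assert totals(local_output) == totals(combine(local_output)) == totals(combine(combine(local_output)))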
Word Count Example with Combiner
Contents

● Introduction to MapReduce
● MapReduce Implementation
● MapReduce Programming patterns
● Aggregate computations

Quiz and Break Time


MapReduce Programming Model/Patterns
MapReduce provides a high-level abstraction:
● The primary task is to express the logic of our solution as a map function followed by a reduce
function. This reduces parallel programming complexity.
● The low-level implementation is common to all MapReduce solutions. This gives an opportunity to
optimise the low-level implementation and improve performance results.

MapReduce is not a tool but a framework: any solution has to be expressed using map and reduce
functions. It turns out that some problems are amenable to the MapReduce model, and others are not.

A family of problems whose solutions follow the same strategy when implemented in the MapReduce
framework is called a MapReduce programming pattern.
Pattern 1: Inverted Index
Goal: Generate index from a dataset to allow faster searches for specific features
Examples:
● Building an index from a textbook
● Finding all websites that match a search term
Inverted Index Structure
Inverted Index Pattern

Note that a combiner cannot be used in this pattern: every docId must reach the reducer, so there is nothing to pre-aggregate.


Inverted Index Pseudocode

def mapper(docId, text):
    features = find_features(text)
    for feature in features:
        emit(feature, docId)

def reducer(feature, docIds):
    emit(feature, formatNicely(docIds))
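A runnable toy version of the pattern, simulating the grouping a reducer would see on two in-memory "documents" (the document ids and the use of plain word-splitting as find_features are made up for the example):

from collections import defaultdict

docs = {"doc1": "big data processing", "doc2": "data engineering"}

index = defaultdict(list)            # feature -> list of docIds
for doc_id, text in docs.items():
    for feature in text.split():     # stand-in for find_features(text)
        index[feature].append(doc_id)

print(dict(index))
# {'big': ['doc1'], 'data': ['doc1', 'doc2'], 'processing': ['doc1'], 'engineering': ['doc2']}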
Pattern 2: Filtering
Goal: Filter out records/fields that are not of interest for further computation.
This speeds up the actual computation thanks to the reduced size of the dataset.

Examples:
● Distributed grep (text pattern match)
● Tracking a thread of events (logs from the same user)
● Data cleansing

Filtering is a mapper-only job: it doesn’t produce an aggregation.


Filtering structure
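A minimal sketch of a distributed-grep style filter in the pythonish pseudocode used above; the "ERROR" pattern is just an example, and since filtering is mapper-only there is no reducer at all:

def mapper(_, line):
    # emit the record unchanged if it matches; drop it otherwise
    if "ERROR" in line:
        emit(None, line)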
Pattern 3: Top-K
Goal: Retrieve a small number of records, relative to a ranking.

Examples:
● Build top sellers view
● Find outliers in data

In this pattern only ONE reducer is used. Hence, there is no need to perform partitioning and
shuffling; sorting is done as part of the reduce function.
Top-Ten structure
Top-Ten Pattern

Only one reducer is in use.


Example Top Ten MapReduce
def mapper(_, row):
    studentId = parseId(row)
    grade = parseGrade(row)
    emit(None, (studentId, grade))

def reducer(_, pairs):
    # sort by grade, highest first, then keep the top ten
    top10 = sorted(pairs, key=lambda pair: pair[1], reverse=True)[:10]
    rank = 1
    for student in top10:
        emit(rank, student[0])
        rank += 1

Minimum requirement: the ranking data for the whole input must fit into the memory of the single
Reducer
Performance Considerations
● How many Mappers and Reducers instances?
● Performance issues for using too many?
● What happens if we don’t use Combiners?

Performance depends greatly on the number of elements and, to a lesser extent, on the size of the data.
For instance, data could have been filtered during the ingestion stage.
Contents

● Introduction to MapReduce
● MapReduce Implementation
● MapReduce Programming patterns
● Aggregate computations
Numerical Summarisation
Goal: Calculate aggregate statistical values over a dataset
Extract features from the dataset elements and compute the same statistical function for each feature
Examples:
● Count occurrences
● Maximum / minimum values
● Average / median / standard deviation
Sample numerical summarisation questions
● Compute the maximum PM2.5 value registered for each location in the dataset
● Return the average Air Quality Index (AQI) registered each week
● Compute for each day of the week the number of locations where the PM2.5 index exceeded 150
Numerical Summarisation Structure
Numerical Summarisation Map and Reduce functions
Numerical Summarisation Combiner?

A combiner can be used only when the operation is associative: it is applicable to SUM, MUL or COUNT (associative) but not to SUB or AVERAGE (not associative)
https://en.wikipedia.org/wiki/Associative_property
Computing Averages
Computing Averages
Combining Average
Average is NOT an associative operation
● Cannot be executed partially with the Combiners
Solution: change the Mapper results to be associative (e.g., sum and count) by emitting aggregated quantities together with the number of elements:
● Mapper: for mark values (100, 100, 20), emits (100,1), (100,1), (20,1)
● Combiner: adds the aggregates and the numbers of elements, emits (220,3)
● Reducer: adds the aggregates (sum, count) from the different mappers and computes the average
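Putting this recipe into the pythonish pseudocode used earlier (parse_mark is an assumed helper; the "marks" key simply sends every pair to the same group):

def mapper(_, row):
    mark = parse_mark(row)          # assumed parsing helper
    emit("marks", (mark, 1))        # (partial sum, count of elements)

def combiner(key, pairs):
    total = sum(s for s, c in pairs)
    count = sum(c for s, c in pairs)
    emit(key, (total, count))       # sums and counts are associative, so this is safe

def reducer(key, pairs):
    total = sum(s for s, c in pairs)
    count = sum(c for s, c in pairs)
    emit(key, total / count)        # the average is computed only once, at the end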
Contents

● Introduction to MapReduce
● MapReduce Implementation
● MapReduce Programming patterns
● Aggregate computations

Quiz and End
