
_Chapter06Spark—In-memoryDistributedCompu

description of distributed memory huawei

Uploaded by

Njabulo Clement

Spark — In-memory Distributed Computing Engine & Flink — Stream and Batch Processing in a Single Engine
Foreword

⚫ This chapter describes Spark and Flink. Spark is an in-memory distributed
computing engine, and Flink implements stream and batch processing in a
single engine. The first part describes the basic concepts of Spark and the
similarities and differences between the Resilient Distributed Dataset
(RDD), Dataset, and DataFrame data structures in Spark. It also covers the
features of Spark SQL, Spark Streaming, and Structured Streaming. The
second part describes the basic concepts, principles, architecture, time,
window, watermark, and fault tolerance mechanism of Flink.

2 Huawei Confidential
Objectives

⚫ Upon completion of this course, you will be able to:
  - Understand the use cases and highlights of Spark.
  - Master Spark programming and the Spark technical framework.
  - Master the core technologies and architecture of Flink.
  - Master the time and window mechanism of Flink.
  - Master the fault tolerance mechanism of Flink.

Contents

1. Spark — In-memory Distributed Computing Engine
   ◼ Spark Overview
   - Spark Data Structure
   - Spark Principles and Architecture
2. Flink — Stream and Batch Processing in a Single Engine

Spark Overview
⚫ Apache Spark started as a research project at the UC Berkeley AMPLab in 2009.
After being released, Spark grew into a broad developer community, and moved to
the Apache Software Foundation in 2013. Today, the project is developed
collaboratively by a community of hundreds of developers from hundreds of
organizations.
⚫ It is a fast, versatile, and scalable memory-based big data computing engine.
⚫ It is a one-stop solution that integrates batch processing, real-time stream processing,
interactive query, graph processing, and machine learning.

Spark Use Cases
⚫ Batch processing can be used for extracting, transforming, and loading (ETL).
⚫ Machine learning can be used to automatically determine whether the comments of
online buyers are positive or negative.
⚫ Interactive analysis can be used to query the Hive warehouse.
⚫ Stream processing can be used for real-time business analysis (such as page-click
streams), recommendation systems, and public opinion analysis.

Highlights of Spark

⚫ Lightweight: Spark has only 30,000 lines of core code.
⚫ Fast: Spark can respond to queries on small datasets in subseconds.
⚫ Flexible: Spark offers flexibility at different levels.
⚫ Smart: Spark makes use of existing big data components.

Spark vs. MapReduce

                        Hadoop         Spark          Spark
Data Size               102.5 TB       102 TB         1000 TB
Time Required (min)     72             23             234
Number of Nodes         2100           206            190
Cores                   50400          6592           6080
Rate                    1.4 TB/min     4.27 TB/min    4.27 TB/min
Rate/node               0.67 GB/min    20.7 GB/min    22.5 GB/min
Daytona GraySort        Yes            Yes            Yes

Contents

1. Spark — In-memory Distributed Computing Engine
   - Spark Overview
   ◼ Spark Data Structure
   - Spark Principles and Architecture
2. Flink — Stream and Batch Processing in a Single Engine

RDD — Core Concept of Spark
⚫ RDDs are resilient, read-only, and partitioned distributed datasets.
⚫ RDDs are designed to be kept in memory; depending on the storage level, partitions that do
not fit in memory can be written to disk.
⚫ RDD data is stored across the cluster as partitions.
⚫ RDDs have a lineage mechanism: a lost partition can be rapidly recomputed from the RDDs it
was derived from.

[Figure: the lines "Hello Spark", "Hello Hadoop", and "China Mobile" are read from HDFS into RDD1 in the Spark cluster, transformed into RDD2 ("Hello, Spark", "Hello, Hadoop", "China, Mobile"), and written to external storage.]

RDD Dependencies
[Figure: Narrow dependencies: map and filter; union; join with inputs co-partitioned. Wide dependencies: groupByKey; join with inputs not co-partitioned.]

Differences Between Wide and Narrow Dependencies — Operator
⚫ A narrow dependency means that each partition of the parent RDD is used by at most one
partition of the child RDD, for example, map, filter, and union.
⚫ A wide dependency means that each partition of the parent RDD may be used by multiple
partitions of the child RDD, for example, groupByKey, reduceByKey, and sortByKey.

Differences Between Wide and Narrow Dependencies — Fault Tolerance
⚫ If a node is faulty:
  - Narrow dependency: Only the parent RDD partition corresponding to the lost child RDD
    partition needs to be recalculated.
  - Wide dependency: In extreme cases, all parent RDD partitions need to be recalculated.
⚫ As shown in the following figure, if partition b1 is lost, a1, a2, and a3 all need to be
recalculated.
[Figure: RDD A with partitions a1, a2, and a3 and RDD B with partitions b1 and b2; each of a1, a2, and a3 contributes to b1 (wide dependency).]

Differences Between Wide and Narrow Dependencies — Data Transmission
⚫ A wide dependency usually corresponds to a shuffle operation. At run time, the data in each
partition of the parent RDD must be transferred to multiple child RDD partitions, which may
involve data transmission between multiple nodes.
⚫ With a narrow dependency, each parent RDD partition is transferred to only one child RDD
partition. Typically, the transformation can be completed on one node.

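The difference in data movement can be illustrated without Spark. In this minimal Java sketch, a dataset is modeled as a list of partitions, each a list of key-value pairs; the class and method names are hypothetical. mapValues works partition by partition, so output partition i depends only on input partition i (narrow). shuffle assigns every record to an output partition by hashing its key, so each output partition may receive records from every input partition (wide). Spark's default hash partitioner follows a similar key-hash-modulo idea, but this code is an illustration, not Spark's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model: a "dataset" is a list of partitions of (key, value) pairs.
class DependencySketch {

    // Narrow dependency: output partition i is computed only from input partition i.
    static List<List<Map.Entry<String, Integer>>> mapValues(
            List<List<Map.Entry<String, Integer>>> parent, UnaryOperator<Integer> f) {
        List<List<Map.Entry<String, Integer>>> child = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> partition : parent) {
            List<Map.Entry<String, Integer>> out = new ArrayList<>();
            for (Map.Entry<String, Integer> e : partition) {
                out.add(Map.entry(e.getKey(), f.apply(e.getValue())));
            }
            child.add(out);
        }
        return child;
    }

    // Non-negative hash of the key modulo the number of output partitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Wide dependency: every input partition may send records to every output partition.
    static List<List<Map.Entry<String, Integer>>> shuffle(
            List<List<Map.Entry<String, Integer>>> parent, int numPartitions) {
        List<List<Map.Entry<String, Integer>>> child = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            child.add(new ArrayList<>());
        }
        for (List<Map.Entry<String, Integer>> partition : parent) {
            for (Map.Entry<String, Integer> e : partition) {
                child.get(partitionFor(e.getKey(), numPartitions)).add(e);
            }
        }
        return child;
    }
}
```

After the shuffle, all records with the same key sit in one partition, which is what per-key operations such as reduceByKey need; that regrouping is exactly the cross-node traffic described above.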
Stage Division of RDD

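[Figure: stage division of a DAG of RDDs A to G. Stage 1: A → B (groupBy). Stage 2: C → D (map); D and E → F (union). Stage 3: B and F → G (join).]
⚫ Spark divides a job into stages at wide (shuffle) dependencies; transformations with narrow
dependencies inside a stage are pipelined.
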
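The stage-cutting rule can be sketched for a simple linear chain of operations, such as the WordCount job later in this chapter. This minimal Java sketch uses hypothetical names and a hard-coded, simplified list of shuffle operations; it is not Spark's scheduler, which works on general DAGs and also knows when a join is co-partitioned (and therefore narrow). It starts a new stage at every operation with a wide dependency and pipelines narrow operations into the current stage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class StageSketch {
    // Operations assumed to require a shuffle (wide dependency); a simplification.
    static final Set<String> WIDE = Set.of("groupByKey", "reduceByKey", "sortByKey", "join");

    static List<List<String>> divide(List<String> ops) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String op : ops) {
            // A wide dependency closes the current stage; the shuffled op opens a new one.
            if (WIDE.contains(op) && !current.isEmpty()) {
                stages.add(current);
                current = new ArrayList<>();
            }
            current.add(op); // narrow operations are pipelined into the current stage
        }
        if (!current.isEmpty()) {
            stages.add(current);
        }
        return stages;
    }
}
```

For the chain textFile, flatMap, map, reduceByKey, saveAsTextFile, the sketch yields two stages: [textFile, flatMap, map] and [reduceByKey, saveAsTextFile].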
RDD Operation Type
⚫ Spark operations can be classified into creation, transformation, control, and action
operations.
  - Creation: creates an RDD, either from an in-memory collection or an external storage
    system, or by a transformation operation.
  - Transformation: transforms an RDD into a new RDD through certain operations.
    Transformations are lazy: they only define a new RDD and are not executed immediately.
  - Control: persists RDDs. An RDD can be stored on disk or in memory based on different
    storage policies. For example, the cache API caches the RDD in memory by default.
  - Action: triggers the actual execution of a Spark job. There are two types of action
    operations in Spark: one returns the calculation result to the driver program, and the
    other saves the RDD to an external file system or database.

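Java's java.util.stream API makes the same split between lazy intermediate operations and eager terminal operations, so it serves as a small, Spark-free analogy. In this sketch (hypothetical class and method names, not Spark's API), map only records the function, like an RDD transformation; nothing runs until the terminal operation collect is called, like an RDD action.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazySketch {
    // Counts how many times the "transformation" function actually runs.
    static final AtomicInteger calls = new AtomicInteger();

    static Stream<Integer> defineTransformation(List<Integer> data) {
        // Like an RDD transformation: this only describes the computation.
        return data.stream().map(x -> {
            calls.incrementAndGet();
            return x * 2;
        });
    }

    static List<Integer> runAction(Stream<Integer> pipeline) {
        // Like an RDD action: the terminal operation triggers execution.
        return pipeline.collect(Collectors.toList());
    }
}
```

The analogy has limits: a Java stream can be consumed only once, whereas an RDD can be reused by several actions (and cached between them).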
Creation Operation
⚫ There are two basic ways to create an RDD:
  - Parallel collection: an existing collection in the driver program is distributed across
    the cluster so that it can be computed in parallel.
  - External storage: an RDD is created from a file in HDFS or in any other storage system
    supported by Hadoop, with one record per element (for example, one line of a text file).
⚫ Both types of RDD support the same operations. Applying transformations to them produces
child RDDs, which together form a lineage graph.

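How a parallel collection might be split into partitions can be sketched in Java. The slice method below is hypothetical: it gives partition i the element positions from i*n/k (inclusive) to (i+1)*n/k (exclusive), which spreads n elements over k partitions as evenly as possible. Spark's parallelize follows a similar even-slicing idea, but this is not its source code.

```java
import java.util.ArrayList;
import java.util.List;

class ParallelizeSketch {
    // Splits a driver-side collection into numSlices contiguous partitions.
    static <T> List<List<T>> slice(List<T> data, int numSlices) {
        if (numSlices < 1) {
            throw new IllegalArgumentException("numSlices must be positive");
        }
        List<List<T>> partitions = new ArrayList<>();
        long n = data.size();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) ((i * n) / numSlices);
            int end = (int) (((i + 1) * n) / numSlices);
            partitions.add(new ArrayList<>(data.subList(start, end)));
        }
        return partitions;
    }
}
```

Slicing the numbers 1 to 10 into 3 partitions gives [1, 2, 3], [4, 5, 6], and [7, 8, 9, 10].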
Control Operation
⚫ Spark can persist an RDD in memory or in a disk file system. Keeping an RDD in memory
significantly speeds up iterative computation and data sharing between computations. In the
static memory model used before Spark 1.6, 60% of executor memory was reserved for caching
data by default, and the rest was left for task execution; later versions use a unified
memory region shared by execution and storage. In Spark, the persist and cache operations
are used for data persistence. cache() is a special case of persist() that uses the default
storage level (MEMORY_ONLY).

Transformation Operation — Transformation Operator

Transformation                 Description
map(func)                      Returns a new RDD formed by passing each element of the source
                               RDD through func.
filter(func)                   Returns a new RDD containing the elements of the source RDD for
                               which func returns true.
reduceByKey(func, [numTasks])  Similar to groupByKey, except that the values of each key are
                               aggregated with func into a single new value.
join(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a
                               dataset of (K, (V, W)) pairs. leftOuterJoin, rightOuterJoin,
                               and fullOuterJoin are also supported.

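The join semantics in the table can be illustrated on plain Java lists. This sketch uses hypothetical names and an in-memory nested-loop join, which is not how Spark executes joins; it only shows that joining (K, V) pairs with (K, W) pairs yields one (K, (V, W)) result for every pair of records that share a key, and that keys present on only one side are dropped (as in an inner join).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class JoinSketch {
    // Inner join: emits (k, (v, w)) for every matching pair of records.
    static <K, V, W> List<Map.Entry<K, Map.Entry<V, W>>> join(
            List<Map.Entry<K, V>> left, List<Map.Entry<K, W>> right) {
        List<Map.Entry<K, Map.Entry<V, W>>> result = new ArrayList<>();
        for (Map.Entry<K, V> l : left) {
            for (Map.Entry<K, W> r : right) {
                if (l.getKey().equals(r.getKey())) {
                    result.add(Map.entry(l.getKey(), Map.entry(l.getValue(), r.getValue())));
                }
            }
        }
        return result;
    }
}
```

Joining [(1, Lucy), (2, Tony)] with [(1, 23), (3, 40)] yields only (1, (Lucy, 23)); leftOuterJoin would additionally keep key 2 with an empty right-hand value.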
Action Operation — Action Operator

Action                 Description
reduce(func)           Aggregates the elements of the dataset using func, which takes two
                       arguments and returns one.
collect()              Returns all elements of the dataset to the driver as an array; typically
                       used after a filter or another operation that returns a small enough result.
count()                Returns the number of elements in the RDD.
first()                Returns the first element of the dataset.
take(n)                Returns an array with the first n elements of the dataset.
saveAsTextFile(path)   Writes the dataset as a text file in a local file system, HDFS, or another
                       Hadoop-supported file system. Spark converts each record into a line of
                       text in the file.

DataFrame
⚫ Similar to an RDD, a DataFrame is an immutable, resilient, and distributed dataset. In
addition to the data, it records schema information (column names and types), so the data is
organized like a two-dimensional table.
⚫ DataFrame query plans can be optimized by the Spark Catalyst optimizer. As a result,
programs written with the DataFrame API are easy to write and are converted into efficient
forms for execution.

Dataset
⚫ DataFrame is a special case of Dataset (DataFrame = Dataset[Row]). Therefore, you can use
the as method to convert a DataFrame to a Dataset. Row is a generic, untyped row type that can
represent any table structure.
⚫ A Dataset is strongly typed, for example, Dataset[Car] or Dataset[Person].

Differences Between DataFrame, Dataset, and RDD
⚫ Assume that an RDD contains the following two lines of data:
1, Lucy, 23
2, Tony, 35
⚫ In a DataFrame, the same data is displayed with a schema:
ID:String   Name:String   Age:int
1           Lucy          23
2           Tony          35
⚫ In a Dataset, the data is displayed as typed objects:
Value:People[age:bigint,id:bigint,name:string]
People(id=1,name="Lucy",age=23)
People(id=2,name="Tony",age=35)

Typical Case — WordCount

[Figure: WordCount data flow. textFile reads the lines "An apple", "A pair of shoes", and "Orange apple" from HDFS into an RDD; flatMap splits them into words; map turns each word into a (word, 1) pair; reduceByKey sums the counts of each word, for example (apple, 2); saveAsTextFile writes the result to HDFS.]

WordCount
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark application name.
    val conf = new SparkConf().setAppName("WordCount")
    // Create a SparkContext object for the application named WordCount.
    val sc: SparkContext = new SparkContext(conf)
    // Load the text file from HDFS to obtain the RDD data set.
    val textFile = sc.textFile("hdfs://...")
    // Invoke the RDD transformations: split each line by space, set the count of
    // each word to 1, and combine the counts with the same key.
    // This step is distributed to each Executor for execution.
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // Invoke the action to save the result. This line triggers the actual task execution.
    counts.saveAsTextFile("hdfs://...")
    // Release the cluster resources held by this application.
    sc.stop()
  }
}

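To see what the three transformations compute without a cluster, the same pipeline can be run on a local Java collection. This is a single-machine sketch (the class name is hypothetical) that uses java.util.stream: flatMap splits lines into words, map pairs each word with 1, and a per-key sum stands in for reduceByKey. It has none of Spark's distribution; the sample lines come from the WordCount figure.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class LocalWordCount {
    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                // flatMap: split each line by space into words
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // map: pair each word with the count 1
                .map(word -> Map.entry(word, 1))
                // reduceByKey(_ + _): sum the counts of identical words
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = List.of("An apple", "A pair of shoes", "Orange apple");
        // prints {A=1, An=1, Orange=1, apple=2, of=1, pair=1, shoes=1}
        System.out.println(count(lines));
    }
}
```

The TreeMap only makes the printed order deterministic; Spark's reduceByKey makes no ordering guarantee.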
Contents

1. Spark — In-memory Distributed Computing Engine
   - Spark Overview
   - Spark Data Structure
   ◼ Spark Principles and Architecture
2. Flink — Stream and Batch Processing in a Single Engine

Spark Architecture

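[Figure: Spark stack. Top layer: Spark SQL, Structured Streaming, Spark Streaming, MLlib, GraphX, and SparkR. Middle layer: Spark Core. Bottom layer: the Standalone, YARN, and Mesos cluster managers. The legend distinguishes existing functions of Spark 1.0 from new functions of Spark 2.0.]
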
Spark SQL Overview
⚫ Spark SQL is a module used for structured data processing in Spark. In Spark applications,
you can seamlessly use SQL statements or DataFrame APIs to query structured data.

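[Figure: Spark SQL query processing. A SQL AST, DataFrame, or Dataset becomes an unresolved logical plan; Analysis (using the Catalog) produces a logical plan; Logical optimization produces an optimized logical plan; candidate physical plans are generated and a cost model picks the selected physical plan; Code generation turns it into RDDs.]
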
Spark SQL vs. Hive
⚫ Differences
  - The execution engine of Spark SQL is Spark Core, whereas the default execution engine of
    Hive is MapReduce.
  - Spark SQL can execute queries 10 to 100 times faster than Hive.
  - Spark SQL does not support buckets, but Hive does.
⚫ Relationships
  - Spark SQL can use Hive's metadata (the Hive metastore).
  - Spark SQL is compatible with most Hive syntax and functions.
  - Spark SQL can use Hive user-defined functions.

Structured Streaming Overview
⚫ Structured Streaming is a stream processing engine built on the Spark SQL engine. You can
express a streaming computation the same way you would express a batch computation on
static data. The Spark SQL engine runs it incrementally and continuously and updates the
final result as streaming data continues to arrive.

Structured Streaming Overview

[Figure: Data stream as an unbounded table. New data in the data stream = new rows appended to an unbounded table.]

Sample Programming Model of Structured Streaming

[Figure: programming model of Structured Streaming. Input lines arrive over time: "Cat dog" and "Dog dog" at time 1, "Owl cat" at time 2, and "Dog" and "owl" at time 3. Each line is appended to the unbounded input table, and the word count result is updated after each trigger: T=1: Cat 1, dog 3; T=2: Cat 2, dog 3, owl 1; T=3: Cat 2, dog 4, owl 2.]

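The programming model in the figure can be mimicked in a few lines of Java. This sketch is hypothetical and is not how Spark implements Structured Streaming: a running map plays the role of the result table, each trigger call appends new lines to the (conceptually unbounded) input, the counts are updated incrementally rather than recomputed from scratch, and the full result table is returned after every trigger, as in complete output mode. Words are lower-cased so that "Cat" and "cat" count as the same word, which matches the counts in the figure.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

class UnboundedTableWordCount {
    // Result table: word -> count over all rows received so far.
    private final Map<String, Integer> result = new TreeMap<>();

    // One trigger: new rows are "appended" to the input table and the
    // result table is updated incrementally.
    Map<String, Integer> trigger(List<String> newLines) {
        for (String line : newLines) {
            for (String word : line.toLowerCase(Locale.ROOT).split("\\s+")) {
                if (!word.isEmpty()) {
                    result.merge(word, 1, Integer::sum);
                }
            }
        }
        return new TreeMap<>(result); // snapshot of the complete result table
    }
}
```

Feeding the three batches from the figure returns {cat=1, dog=3}, then {cat=2, dog=3, owl=1}, then {cat=2, dog=4, owl=2}.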
Spark Streaming Overview
⚫ The basic principle of Spark Streaming is to split real-time input data streams into
batches by time slice (typically a few seconds) and then use the Spark engine to process the
data of each time slice in a way similar to batch processing.
[Figure: Input data stream → Spark Streaming → Batches of input data → Spark Engine → Batches of processed data]

Window Interval and Sliding Interval

[Figure: window-based operation. The original DStream has RDDs at times 1 to 5; the windowed DStream is produced by the windows at time 1, time 3, and time 5.]
⚫ A window slides over the DStream; the RDDs that fall into the window are merged and
operated on to generate the RDDs of the windowed DStream.
  - Window length: the duration of a window.
  - Sliding interval: the interval at which the window operation is performed.
Both parameters must be multiples of the batch interval of the source DStream.

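The window rule can be sketched on plain Java lists, treating each batch interval's RDD as a list of numbers indexed by batch time (1, 2, 3, ...). In this hypothetical sketch, the window computed at time t merges the batches in (t - windowLength, t], and a window is computed every slideInterval batches starting at firstTime; both durations are counted in batch intervals. With a window length of 3 and a sliding interval of 2 starting at time 1, it produces the windows at times 1, 3, and 5 shown in the figure.

```java
import java.util.ArrayList;
import java.util.List;

class WindowedDStreamSketch {
    // batches.get(i) holds the data of the batch at time i + 1.
    // Returns the sum of each window, computed at times firstTime, firstTime + slideInterval, ...
    static List<Integer> windowedSums(List<List<Integer>> batches,
                                      int windowLength, int slideInterval, int firstTime) {
        List<Integer> sums = new ArrayList<>();
        for (int t = firstTime; t <= batches.size(); t += slideInterval) {
            int sum = 0;
            // Merge all batches whose time falls in (t - windowLength, t].
            for (int time = Math.max(1, t - windowLength + 1); time <= t; time++) {
                for (int value : batches.get(time - 1)) {
                    sum += value;
                }
            }
            sums.add(sum);
        }
        return sums;
    }
}
```

With batches [1], [2], [3], [4], [5], the windowed sums are 1 (time 1), 6 (time 3), and 12 (time 5); batch 3 is counted in two windows because the windows overlap.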
Spark Streaming vs. Storm

Parameter                          Storm                                Spark Streaming
Real-time computing model          Real time: each piece of data is     Quasi real time: data within a
                                   processed as soon as it arrives.     time range is collected and
                                                                        processed as an RDD.
Real-time computing delay          Milliseconds                         Seconds
Throughput                         Low                                  High
Transaction mechanism              Supported and complete               Supported but not complete
Fault tolerance                    High (ZooKeeper and Acker)           Normal (checkpoint and WAL)
Dynamic adjustment of parallelism  Supported                            Not supported

Quiz

1. What are the features of Spark?

2. What are the advantages of Spark over MapReduce?

3. What are the differences between wide and narrow dependencies of Spark?

4. What are the use cases of Spark?

Quiz

5. RDD operators are classified into ________ and ________.

6. The ________ module is the core module of Spark.

7. RDD dependency types include ________ and ________.

Contents

1. Spark — In-memory Distributed Computing Engine
2. Flink — Stream and Batch Processing in a Single Engine
   ◼ Flink Principles and Architecture
   - Flink Time and Window
   - Flink Watermark
   - Flink Fault Tolerance Mechanism

Flink Overview
⚫ Flink started in 2010 as joint research by the Technical University of Berlin, Humboldt
University of Berlin, and the Hasso Plattner Institute in Potsdam. In April 2014, Flink was
donated to the Apache Software Foundation as an incubating project. In December 2014, Flink
graduated to become a top-level project of the Apache Software Foundation.
⚫ Apache Flink is an open source framework for distributed, high-performance stream
processing applications. Flink supports real-time computing with both high throughput and
exactly-once semantics, and it also provides batch data processing.
⚫ Unlike many other data processing engines, both Flink and Spark support stream and batch
processing. Technically, however, Spark simulates stream computing on top of batch
processing, whereas Flink does the opposite and simulates batch processing on top of
stream computing.

Key Concepts of Flink
⚫ Continuous processing of stream data
⚫ Event time
⚫ Stateful stream processing
⚫ State snapshots

Core Concepts of Flink
⚫ The biggest difference between Flink and other stream computing engines is state
management.
⚫ Flink provides built-in state management. You can store state in Flink instead of in an
external system. This:
  - Reduces the dependency of the computing engine on external systems, simplifying
    deployment and O&M.
  - Significantly improves performance, because state is accessed locally instead of
    through an external system.

Flink Runtime Architecture

Core Concept — DataStream
⚫ Flink uses the DataStream class to represent stream data in Flink programs.
⚫ You can think of a DataStream as an immutable collection of data that can contain
duplicates. The number of elements in a DataStream can be unbounded.

Core Concept — DataSet
⚫ DataSet programs in Flink are regular programs that transform data sets (for
example, filtering, mapping, joining, and grouping). The datasets are initially created
by reading files or from local collections. Results are returned via sinks, which can
write the data to (distributed) files or to standard output (for example, the
command line terminal).

Flink Program
⚫ Flink programs consist of three parts: source, transformation, and sink. The source reads
data from data sources such as HDFS, Kafka, and text files. The transformation performs the
data transformation operations. The sink writes the final data to destinations such as HDFS,
Kafka, and text files. The data that flows between these parts is called a stream.
[Figure: Data Source → Transformations → Data Sink]

Flink Data Sources
⚫ Batch processing
  - Files
    ◼ HDFS, local file systems, and MapR file system
    ◼ Text, CSV, Avro, and Hadoop input formats
  - JDBC
  - HBase
  - Collections
⚫ Stream processing
  - Files
  - Socket streams
  - Kafka
  - RabbitMQ
  - Flume
  - Collections
  - Implement your own
    ◼ SourceFunction.collect

Flink Program Running

Flink Job Process
⚫ A user submits a Flink program to JobClient. JobClient processes, parses, and optimizes the
program, and then submits it to JobManager. Then, the TaskManagers run the tasks.
[Figure: Flink job submission flow]
  1. The actor (user) submits a Flink program to JobClient.
  2. JobClient sends the program to JobManager.
  3. JobManager reports that the submission succeeded.
  4. JobManager submits the tasks to the TaskManagers.
  5. The TaskManagers periodically report the task status.
  6. JobManager returns the task result to JobClient.

Flink Job Process
⚫ JobClient is a bridge between Flink programs and JobManager. It receives programs, parses
and optimizes their execution plans, and submits them to JobManager. There are three types
of operators in Flink:
  - Source operator: data source operations, such as reading from files, sockets, and Kafka.
  - Transformation operator: data transformation operations, such as map, flatMap, and
    reduce.
  - Sink operator: data storage operations, such as writing data to HDFS, MySQL, and Kafka.

Complete Flink Program

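import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Example {

    public static void main(String[] args) throws Exception {
        // Obtain the execution environment (local or cluster, depending on where it runs).
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: a bounded stream built from three elements.
        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35), new Person("Wilma", 35), new Person("Pebbles", 2));

        // Transformation: keep only adults.
        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        // Sink: print the results to standard output.
        adults.print();

        // Nothing runs until execute() is called.
        env.execute();
    }

    // A POJO: public fields and a public no-argument constructor.
    public static class Person {
        public String name;
        public Integer age;

        public Person() {}

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return this.name + ": age " + this.age;
        }
    }
}
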
Data Processing
⚫ Apache Flink supports both batch and stream processing, which can be used to build a
variety of event-based applications.
  - Flink is first of all a pure stream computing engine whose basic data model is the data
    stream. A stream can be an unbounded, infinite stream, which is stream processing in the
    general sense, or a bounded, finite stream, which is batch processing. Flink thus uses a
    single architecture to support both stream and batch processing.
  - Flink also supports stateful computation. Processing is called stateless when the result
    of processing an event or a piece of data depends only on the content of that event. If
    the result also depends on previously processed events, the processing is called
    stateful.

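The distinction can be shown with two small Java functions that are independent of Flink's API (all names here are hypothetical). The stateless function's output depends only on the current event. The stateful one keeps a per-key running count, similar in spirit to Flink keyed state, so the same input event produces different outputs depending on what came before.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class StateSketch {
    // Stateless: the result depends only on the current event.
    static String normalize(String event) {
        return event.trim().toLowerCase(Locale.ROOT);
    }

    // Stateful: the result also depends on previously processed events.
    static class RunningCount {
        private final Map<String, Long> countPerKey = new HashMap<>();

        long process(String key) {
            return countPerKey.merge(key, 1L, Long::sum);
        }
    }
}
```

In Flink, such state would live in the framework (and be included in its snapshots) rather than in a plain HashMap, which is what lets it survive failures.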
Bounded Stream and Unbounded Stream
⚫ Unbounded stream: Only the start of the stream is defined. Data sources generate data
endlessly, so the data of an unbounded stream must be processed continuously, that is,
immediately after it is read. You cannot wait until all data arrives, because the input is
infinite and never complete. Processing unbounded stream data typically requires ingesting
events in a particular order, such as the order in which the events occurred, so that the
completeness of the results can be reasoned about.
⚫ Bounded stream: Both the start and end of the stream are defined. A bounded stream can be
processed after all of its data has been read. Because all data in a bounded stream can be
sorted, events do not need to be ingested in order. Bounded stream processing is often
referred to as batch processing.

Example Batch Processing
⚫ Batch processing is a special case of stream processing. Instead of defining a sliding or
tumbling window over the data and producing results every time the window slides, we define
a global window, with all records belonging to the same window. For example, the following
simple Flink program continuously counts website visitors every hour, grouped by region:

    val counts = visits
      .keyBy("region")
      .timeWindow(Time.hours(1))
      .sum("visits")

Example Batch Processing
⚫ If you know that the input data is bounded, you can implement batch processing using the
following code:

    val counts = visits
      .keyBy("region")
      .window(GlobalWindows.create)
      .trigger(EndOfTimeTrigger.create)
      .sum("visits")

⚫ If the input data is bounded, the following code produces the same result as the
preceding code:

    val counts = visits
      .groupBy("region")
      .sum("visits")

Flink Batch Processing Model
⚫ Flink uses one underlying engine to support both stream processing and batch processing.
[Figure: the DataStream API (checkpoints, state management, watermarks, windows, and triggers) and the DataSet API (backtracking for scheduling and recovery, special memory data structures, and query optimization) both run on the stream processing engine.]

Stream and Batch Processing Mechanisms
⚫ The two sets of Flink mechanisms correspond to their respective APIs (DataStream API and
DataSet API). When creating a Flink job, you cannot combine the two sets of mechanisms to use
all Flink functions at the same time.
⚫ Flink supports two relational APIs: the Table API and SQL. Both are unified APIs for batch
and stream processing: queries have the same semantics and produce the same results whether
they run on unbounded real-time data streams or on bounded historical data streams.
  - The Table API and SQL are becoming the main APIs for unified stream and batch processing
    in analytical use cases.
  - The DataStream API is the primary API for data-driven applications and pipelines.

Contents

1. Spark — In-memory Distributed Computing Engine
2. Flink — Stream and Batch Processing in a Single Engine
   - Flink Principles and Architecture
   ◼ Flink Time and Window
   - Flink Watermark
   - Flink Fault Tolerance Mechanism

Time Background
⚫ In stream processing, handling time correctly is critical. For example, event stream data
(such as server logs, web page clicks, and transactions) is generated continuously, and a
typical task is to group events by key and count the events for each key at a specified
interval. This is a common big data application.

Time Classification in Stream Processing
⚫ During data stream processing, the system time (that is, the processing time) is often used
as the time of an event. However, the processing time is assigned to the event by the
system, and because of factors such as network latency, it cannot reflect the actual order
of events.
⚫ In actual scenarios, the time of each event can be classified into three types:
  - Event time: the time when an event occurs.
  - Ingestion time: the time when an event arrives at the stream processing system.
  - Processing time: the time when an event is processed by the system.

Example Time Classifications
⚫ For example, suppose that a log record enters Flink at 10:00:00.123 on November 12, 2019,
reaches the window operator at 10:00:01.234 on November 12, 2019, and has the following
content:
  - 2019-11-02 18:37:15.624 INFO Fail over to rm2
    ◼ 2019-11-02 18:37:15.624 is the event time.
    ◼ 2019-11-12 10:00:00.123 is the ingestion time.
    ◼ 2019-11-12 10:00:01.234 is the processing time.

Differences Among Three Time Classifications
⚫ In practice, the order in which events are processed differs from the order in which they
occurred. These differences are caused by factors such as network latency and processing
time.
  - In the figure, the horizontal axis indicates the event time and the vertical axis
    indicates the processing time. Ideally, the points formed by the event time and the
    processing time would lie on a line at 45 degrees. However, the processing time lags
    behind the event time, so the order of events becomes inconsistent.
[Figure: processing time vs. event time, showing the Ideal 45-degree line, the Skew between them, and an approximate Watermark curve.]

Time Semantics Supported by Flink

Processing Time                              Event Time (Row Time)
Time in the real world                       Time in the data world
Local time of the data node                  Timestamp carried in a record
Simple processing                            Complex processing
Uncertain results (cannot be reproduced)     Certain results (reproducible)

Window Overview
⚫ A stream computing system is a data processing engine designed to process infinite data
sets, that is, data sets that grow continuously. A window is a method for splitting an
infinite data set into finite blocks for processing.
⚫ Windows are at the heart of processing infinite streams. Windows split the stream into
"buckets" of finite size, over which computations can be applied.

Window Types
⚫ Windows can be classified into the following types:
  - Count window: a data-driven window generated based on a specified number of data
    records, regardless of time.
  - Time window: a time-driven window generated based on time.
⚫ Apache Flink is a distributed computing framework that supports infinite stream
processing. In Flink, windows divide infinite streams into finite streams. Flink windows can
be either time windows or count windows.

Time Window Types
⚫ According to the window implementation principles, time windows can be classified into:
  - Tumbling window
  - Sliding window
  - Session window

Tumbling Window
⚫ The data is sliced according to a fixed window length.
⚫ Characteristics: The time is aligned, the window length is fixed, and the windows do not overlap.
⚫ Use case: BI statistics collection (aggregate calculation in each time period)

[Figure: consecutive, non-overlapping windows 1 to 5 of equal window size along the time axis, applied to the events of users 1, 2, and 3.]

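Assigning an event to a tumbling window only requires aligning its timestamp to the window size. This minimal sketch assumes non-negative millisecond timestamps and no window offset, and the class name is hypothetical. Flink's built-in assigners compute the window start in a similar way (timestamp minus timestamp modulo size), but the code below is an illustration, not Flink code.

```java
class TumblingWindowSketch {
    // Returns the start of the [start, start + size) window containing the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        if (timestampMs < 0 || sizeMs <= 0) {
            throw new IllegalArgumentException("expects timestamp >= 0 and size > 0");
        }
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // An event at 00:02:05 falls into the window [00:02:00, 00:03:00).
        System.out.println(windowStart(125_000L, oneMinute)); // prints 120000
    }
}
```

Because every timestamp maps to exactly one start, the windows are aligned and never overlap, which matches the characteristics listed above.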
Sliding Window
⚫ A sliding window is a more general form of a tumbling (fixed) window. It is defined by a
fixed window length and a sliding interval.
⚫ Characteristics: The time is aligned, the window length is fixed, and the windows can
overlap.
⚫ Use case: Collect statistics on the failure rate of an API in the last 5 minutes to
determine whether to report an alarm.
[Figure: overlapping windows 1 to 4 of the same window size, each starting one sliding interval after the previous one, applied to the events of users 1, 2, and 3.]

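With a sliding window, one event can belong to several windows (size / slide of them when the size is a multiple of the slide). The sketch below walks back from the latest window start that is not after the timestamp. It assumes non-negative timestamps and no offset, skips windows that would start before 0 (a simplification), and uses hypothetical names; it is illustrative rather than Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    // Returns the start times of all [start, start + size) windows that contain the timestamp.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        // Simplification: windows that would start before 0 are not generated.
        for (long start = lastStart; start > timestamp - size && start >= 0; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For a 60-second window sliding every 30 seconds, an event at second 75 belongs to the windows starting at 60 and 30, that is, [60, 120) and [30, 90).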
Session Window
⚫ A session window consists of a series of events followed by a timeout interval of a
specified duration, similar to a web application session: if no new data is received within
that interval, the current window closes, and the next event starts a new window.
⚫ Characteristics: The time is not aligned.
[Figure: for each of users 1, 2, and 3, events are grouped into windows of different lengths separated by gaps longer than the session interval; window boundaries differ between users.]

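Session windows can be computed by giving each event the window [timestamp, timestamp + gap) and merging windows that overlap, which is the idea behind merging session windows. This sketch handles a single key, assumes the timestamps are already sorted in ascending order, and uses hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class SessionWindowSketch {
    // Returns sessions as {start, end} pairs, with end exclusive.
    static List<long[]> sessions(List<Long> sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sortedTimestamps) {
            if (current != null && ts < current[1]) {
                // The event's window overlaps the open session: extend the session.
                current[1] = Math.max(current[1], ts + gap);
            } else {
                // No data within the gap: start a new session.
                current = new long[] {ts, ts + gap};
                result.add(current);
            }
        }
        return result;
    }
}
```

With a gap of 10 and events at 1, 5, 20, 28, and 50, the sketch produces three sessions: [1, 15), [20, 38), and [50, 60). Session boundaries depend on each key's own data, which is why the time is not aligned.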
Code Definition
⚫ A tumbling time window of 1 minute can be defined on a keyed stream in Flink as:
  - stream.timeWindow(Time.minutes(1))
⚫ A sliding time window of 1 minute that slides every 30 seconds can be defined as:
  - stream.timeWindow(Time.minutes(1), Time.seconds(30))

Contents

1. Spark — In-memory Distributed Computing Engine
2. Flink — Stream and Batch Processing in a Single Engine
   - Flink Principles and Architecture
   - Flink Time and Window
   ◼ Flink Watermark
   - Flink Fault Tolerance Mechanism

Out-of-Order Data Problem
⚫ An event travels from where it is generated, through the source, to the operator. In most
cases, data reaches the operator in the order in which the events were generated. However,
data may arrive out of order; for example, because of network problems, the events received
by Flink may not be sorted strictly by event time.
[Figure: ideal situation (events arrive in event-time order) vs. actual situation (events arrive out of order).]

Out-of-Order Data Example
⚫ For example, an application records all clicks of a user and sends the logs back. (When the
network is poor, the logs are saved locally and sent back later.) User A performs an
operation in the application at 11:02:00, and user B performs an operation at 11:03:00.
However, user A's network is unstable and log sending is delayed. As a result, the server
first receives the message from user B (event time 11:03:00) and then the message from
user A (event time 11:02:00), so the messages arrive out of order.

Why Are Watermarks Needed?
⚫ For an infinite data set, there is no direct way to determine whether all data has arrived.
A watermark is a concept based on event time that describes the completeness of a data
stream. If events were measured by processing time, they would always be in order, and
watermarks would not be required; it is the use of event time that introduces disorder.
Watermarks are used to solve this disorder problem. Disorder is really event delay, and it
is impossible to wait indefinitely for a delayed element. A mechanism must therefore ensure
that a window is triggered for calculation after a specific point in time. This mechanism
is the watermark, which tells operators that no more events earlier than that point are
expected.

Watermark Principles
⚫ How does Flink make sure that all data belonging to an event-time window has been processed
before the window is destroyed? This is what a watermark does. A watermark carries a
monotonically increasing timestamp t. Watermark(t) indicates that all data with timestamps less
than or equal to t has arrived, and that no more data with such timestamps will be received.
Therefore, any window that contains only timestamps less than or equal to t can be safely
triggered and destroyed.
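The triggering rule can be illustrated with a small simulation. It is a simplified model, not Flink's implementation, and the class and method names are hypothetical. Windows are tumbling windows [start, end). The model follows Flink's convention that a window's last timestamp is end - 1, so a window fires as soon as a watermark t with t >= end - 1 arrives. Elements whose window has already fired are dropped, which matches Flink's default behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of event-time tumbling windows driven by watermarks.
final class WatermarkWindowDemo {
    private final long size;                                         // window length
    private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // window start -> element count
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> fired = new ArrayList<>();                    // description of each firing

    WatermarkWindowDemo(long size) {
        this.size = size;
    }

    void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        if (start + size - 1 <= currentWatermark) {
            return; // late element: its window has already been triggered, so it is dropped
        }
        openWindows.merge(start, 1L, Long::sum);
    }

    // Watermark(t) promises that no element with timestamp <= t will arrive any more.
    void onWatermark(long t) {
        currentWatermark = Math.max(currentWatermark, t); // watermarks never move backwards
        while (!openWindows.isEmpty()) {
            long start = openWindows.firstKey();
            long maxTimestamp = start + size - 1; // last timestamp covered by the window
            if (maxTimestamp > currentWatermark) {
                break; // this window may still receive data
            }
            long count = openWindows.remove(start);
            fired.add("[" + start + "," + (start + size) + ") count=" + count);
        }
    }

    public static void main(String[] args) {
        WatermarkWindowDemo op = new WatermarkWindowDemo(10);
        op.onElement(3);
        op.onElement(7);
        op.onElement(12);
        op.onWatermark(8);  // [0,10) still covers timestamp 9 > 8: nothing fires
        op.onWatermark(9);  // every timestamp of [0,10) is <= 9: the window fires
        op.onElement(5);    // its window has already fired: dropped
        op.onWatermark(20); // [10,20) fires
        System.out.println(op.fired); // [[0,10) count=2, [10,20) count=1]
    }
}
```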

Watermark Principles
⚫ The following figure shows watermarks in an ordered stream (watermark delay set to 0).
[Figure: Ordered stream. Events with timestamps 23 21 20 19 18 17 15 14 11 10 9 9 7, interleaved with watermarks W(20) and W(11). Legend: event, watermark, event timestamp.]

⚫ The following figure shows watermarks in an out-of-order stream (watermark delay set to 2).
[Figure: Out-of-order stream. Events with timestamps 21 19 20 17 22 12 17 14 12 9 15 11 7, interleaved with watermarks W(17) and W(11). Legend: event, watermark, event timestamp.]
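One common way to produce such watermarks is to track the largest event timestamp seen so far and subtract a fixed delay, such as the delay of 2 in the out-of-order case. The sketch below is a hypothetical plain-Java illustration of that idea, and the class name `BoundedDelayWatermarks` is made up. Flink's built-in bounded-out-of-orderness generator additionally subtracts 1 ms (it emits maxTimestamp - delay - 1), so its values are one lower than the values in this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical watermark generator: watermark = (largest timestamp seen so far) - delay.
final class BoundedDelayWatermarks {
    private final long delay;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedDelayWatermarks(long delay) {
        this.delay = delay;
    }

    // Called for every event; returns the watermark that may be emitted after it.
    long onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp); // out-of-order events never lower it
        return maxTimestamp - delay;
    }

    static List<Long> watermarks(long delay, long... timestamps) {
        BoundedDelayWatermarks gen = new BoundedDelayWatermarks(delay);
        List<Long> out = new ArrayList<>();
        for (long ts : timestamps) {
            out.add(gen.onEvent(ts));
        }
        return out;
    }

    public static void main(String[] args) {
        // Ordered stream, delay 0: the watermark simply follows the latest timestamp.
        System.out.println(watermarks(0, 7, 9, 11, 14)); // [7, 9, 11, 14]
        // Out-of-order stream, delay 2: events 10 and 14 still arrive ahead of the watermark.
        System.out.println(watermarks(2, 7, 11, 10, 15, 14)); // [5, 9, 9, 13, 13]
    }
}
```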

Delayed Data
⚫ Watermarks can cope with out-of-order data, but a perfect watermark value cannot be obtained
in the real world: it is either impossible or too expensive to obtain. Therefore, there is a
small probability that data with a timestamp less than or equal to t is received after
Watermark(t). Flink calls such data late elements. You can specify the maximum allowed lateness
of a window. The default value is 0. Use the following code to set it:

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);
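How a window treats an arriving element can be summarized as a three-way decision. The sketch below is a simplified, hypothetical model of that decision, not Flink's window operator, and the names `LatenessDemo`, `Outcome`, and `classify` are made up. It compares the current watermark with two values: the window's last timestamp (end - 1), and that timestamp plus the allowed lateness.

```java
// Hypothetical classification of an element that belongs to window [start, end).
final class LatenessDemo {
    enum Outcome { ON_TIME, LATE_REFIRE, DROPPED }

    static Outcome classify(long windowEnd, long allowedLateness, long currentWatermark) {
        long maxTimestamp = windowEnd - 1;                 // last timestamp in the window
        long cleanupTime = maxTimestamp + allowedLateness; // window state is kept until here
        if (currentWatermark < maxTimestamp) {
            return Outcome.ON_TIME;     // window not triggered yet: element is buffered
        }
        if (currentWatermark < cleanupTime) {
            return Outcome.LATE_REFIRE; // window already fired but its state is kept: fires again
        }
        return Outcome.DROPPED;         // window state is gone: element is discarded
    }

    public static void main(String[] args) {
        // Window [0, 10) with an allowed lateness of 5.
        System.out.println(classify(10, 5, 8));  // ON_TIME
        System.out.println(classify(10, 5, 12)); // LATE_REFIRE
        System.out.println(classify(10, 5, 14)); // DROPPED
        // Default allowed lateness of 0: anything arriving after the window fired is dropped.
        System.out.println(classify(10, 0, 9));  // DROPPED
    }
}
```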

Delayed Data Processing Mechanism
⚫ Delayed (late) events are a special kind of out-of-order event. Unlike ordinary out-of-order
events, they are more out of order than the watermark anticipates, so the window has already
been closed when they arrive.
⚫ In this case, you can use any of the following methods to solve the problem:
 Reactivate the closed windows and recalculate them to correct the results (with the Allowed
Lateness mechanism).
 Collect the delayed events and process them separately (with the Side Output mechanism).
 Treat delayed events as erroneous messages and discard them.

⚫ By default, Flink uses the third method.

Side Output Mechanism
⚫ In the Side Output mechanism, delayed events are placed in a separate data stream. This stream
is a by-product of the window calculation, so users can retrieve the delayed events and process
them specially.
⚫ After allowedLateness is set, data that arrives within the allowed lateness still triggers
window output. Data that arrives even later would otherwise be discarded, but it can be
retrieved with Flink's Side Output mechanism as follows:

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
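The routing can be made concrete with a hypothetical plain-Java model of a single window [0, windowEnd) that has an allowed lateness. It is an illustration under simplifying assumptions, not Flink code: there is only one window, all elements belong to it, and each firing emits the element count. The class `SideOutputDemo` is made up. Elements within the allowed lateness make the window fire again. Elements beyond it are appended to a side list, which is the role that `getSideOutput(lateOutputTag)` plays in Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of ONE event-time window [0, windowEnd); every element is assumed to
// belong to it, and each firing emits the number of elements currently in the window.
final class SideOutputDemo {
    final long windowEnd;
    final long allowedLateness;
    private long watermark = Long.MIN_VALUE;
    private int elementsInWindow = 0;
    final List<Long> mainOutput = new ArrayList<>(); // result of each window firing
    final List<Long> sideOutput = new ArrayList<>(); // timestamps of elements that were too late

    SideOutputDemo(long windowEnd, long allowedLateness) {
        this.windowEnd = windowEnd;
        this.allowedLateness = allowedLateness;
    }

    private long maxTimestamp() {
        return windowEnd - 1; // last timestamp covered by the window
    }

    void onElement(long timestamp) {
        if (maxTimestamp() + allowedLateness <= watermark) {
            sideOutput.add(timestamp); // window state is gone: route to the side output
            return;
        }
        elementsInWindow++;
        if (maxTimestamp() <= watermark) {
            mainOutput.add((long) elementsInWindow); // late but allowed: the window fires again
        }
    }

    void onWatermark(long t) {
        boolean alreadyFired = maxTimestamp() <= watermark;
        watermark = Math.max(watermark, t);
        if (!alreadyFired && maxTimestamp() <= watermark) {
            mainOutput.add((long) elementsInWindow); // regular on-time firing
        }
    }

    public static void main(String[] args) {
        SideOutputDemo w = new SideOutputDemo(10, 5); // window [0, 10), allowed lateness 5
        w.onElement(3);
        w.onElement(8);
        w.onWatermark(10); // on-time firing with 2 elements
        w.onElement(6);    // late but within the allowed lateness: fires again with 3
        w.onWatermark(15); // 9 + 5 <= 15: the window's lateness period is over
        w.onElement(4);    // too late: goes to the side output
        System.out.println(w.mainOutput); // [2, 3]
        System.out.println(w.sideOutput); // [4]
    }
}
```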

Allowed Lateness Mechanism
⚫ Allowed Lateness lets users specify a maximum allowed lateness. After the watermark passes
the end of a window, Flink keeps the window's state until the allowed lateness expires. During
this period, delayed events are not discarded. By default, each one triggers a recalculation
of the window. Keeping the window state requires extra memory. If a ProcessWindowFunction is
used for the window calculation, each delayed event may trigger a full recalculation of the
window, which is expensive. Therefore, the allowed lateness should not be too long, and the
number of delayed events should be kept small.
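The cost difference can be illustrated by counting work. In this hypothetical sketch (the class `RecomputeCostDemo` is made up), a ProcessWindowFunction-style computation re-reads every buffered element on each firing. An incremental aggregation, in the style of a ReduceFunction or AggregateFunction, updates a running result once per element. The numbers are element visits in this toy model, not Flink measurements.

```java
// Hypothetical cost model: n on-time elements and one on-time firing, followed by k late
// elements, each of which triggers another firing of the same window.
final class RecomputeCostDemo {
    // Full recomputation: every firing iterates over all elements buffered so far.
    static long fullRecomputeVisits(int n, int k) {
        long visits = n;      // on-time firing reads n elements
        for (int i = 1; i <= k; i++) {
            visits += n + i;  // the i-th late firing reads n + i elements
        }
        return visits;
    }

    // Incremental aggregation: each element updates the running result exactly once.
    static long incrementalUpdates(int n, int k) {
        return (long) n + k;
    }

    public static void main(String[] args) {
        System.out.println(fullRecomputeVisits(1000, 100)); // 106050
        System.out.println(incrementalUpdates(1000, 100));  // 1100
    }
}
```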

Contents

1. Spark — In-memory Distributed Computing Engine

2. Flink — Stream and Batch Processing in a Single Engine


 Flink Principles and Architecture
 Flink Time and Window
 Flink Watermark
◼ Flink Fault Tolerance Mechanism

Checkpointing
⚫ How does Flink ensure exactly-once semantics?
 Flink uses a feature called checkpointing to reset the system to a correct state when a fault
occurs.
 Flink provides a fault tolerance mechanism for distributed data streams based on lightweight
asynchronous snapshots. It takes global snapshots of the state of tasks or operators at the
same point in time. In other words, it backs up the state of the program.

Checkpoint Mechanism
⚫ Flink provides the checkpoint fault tolerance mechanism. Its distributed snapshot function
takes global snapshots of task/operator state at the same point in time in a unified manner.
Flink periodically injects checkpoint barriers into the input data streams, and the barriers
assign the data between them to the corresponding checkpoints. When an exception occurs in an
application, Flink restores the state of all operators from the latest completed snapshot to
ensure data consistency.
 For applications with small state, these snapshots are very lightweight and can be taken
frequently without much impact on performance. During checkpointing, the state is stored in a
configurable location (such as JobManager memory or HDFS).
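The restore logic can be sketched in plain Java. This toy model is not Flink's API, and its names (`CheckpointRestoreDemo`, `Snapshot`, `run`) are hypothetical. It has a replayable source identified by an offset and an operator that sums the records. A checkpoint stores the source offset together with the operator state. After a simulated failure, both are reset to the last checkpoint and the source replays from the stored offset, so every record is reflected exactly once in the final state.

```java
import java.util.List;

// Toy model (hypothetical names, not Flink's API): a replayable source plus a summing operator.
final class CheckpointRestoreDemo {
    // What a checkpoint stores: the source position and the operator state, taken together.
    static final class Snapshot {
        final int sourceOffset;
        final long sum;

        Snapshot(int sourceOffset, long sum) {
            this.sourceOffset = sourceOffset;
            this.sum = sum;
        }
    }

    // Processes all records, checkpointing every `checkpointEvery` records and crashing once
    // when the source reaches `failAtOffset` (use -1 for a failure-free run).
    static long run(List<Integer> records, int checkpointEvery, int failAtOffset) {
        int offset = 0;                     // next record to read from the source
        long sum = 0;                       // operator state
        Snapshot last = new Snapshot(0, 0); // initial (empty) checkpoint
        boolean failed = false;
        while (offset < records.size()) {
            if (!failed && offset == failAtOffset) {
                failed = true;
                offset = last.sourceOffset; // rewind the source to the checkpointed position
                sum = last.sum;             // restore operator state from the same checkpoint
                continue;
            }
            sum += records.get(offset);
            offset++;
            if (offset % checkpointEvery == 0) {
                last = new Snapshot(offset, sum); // consistent snapshot of source + state
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(run(records, 3, -1)); // 55 (no failure)
        System.out.println(run(records, 3, 7));  // 55 (failure, restore, replay)
    }
}
```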

Checkpointing Mechanism
⚫ A core element in Flink's distributed snapshotting is stream barriers. These barriers are injected into the
data stream and flow with the records as part of the data stream. Barriers flow strictly in line. A barrier
separates the records in the data stream into the set of records that goes into the current snapshot,
and the records that go into the next snapshot. Barriers do not interrupt the flow of the stream and are
hence very lightweight. Multiple barriers from different snapshots can be in the stream at the same
time, which means that various snapshots may happen concurrently.
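[Figure: A data stream running from new records (left) to old records (right). Checkpoint barriers, such as barrier n-1, are inserted between stream records; the segments they delimit are labeled part of checkpoint n+1, part of checkpoint n, and part of checkpoint n-1.]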
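When an operator has several input channels, it must wait until the barrier for a checkpoint has arrived on every input before it snapshots its state. In exactly-once mode this waiting is known as barrier alignment. The sketch below is a hypothetical, simplified model with invented names: it is single-threaded and handles one checkpoint at a time. Records that arrive on a channel whose barrier has already been seen are held back until alignment completes. As a result, the snapshot contains exactly the records that came before the barrier.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical multi-input operator that sums records and aligns checkpoint barriers.
final class BarrierAlignmentDemo {
    private final int numInputs;
    private final Set<Integer> barrierSeen = new HashSet<>(); // inputs whose barrier has arrived
    private final List<Long> buffered = new ArrayList<>();    // records held back during alignment
    private long sum;                                         // operator state
    final List<Long> snapshots = new ArrayList<>();           // state captured at each checkpoint

    BarrierAlignmentDemo(int numInputs) {
        this.numInputs = numInputs;
    }

    void onRecord(int input, long value) {
        if (barrierSeen.contains(input)) {
            buffered.add(value); // belongs to the next checkpoint: hold it back
        } else {
            sum += value;
        }
    }

    void onBarrier(int input) {
        barrierSeen.add(input);
        if (barrierSeen.size() == numInputs) { // barrier received on every input
            snapshots.add(sum);                // snapshot state, then forward the barrier
            barrierSeen.clear();
            for (long v : buffered) {
                sum += v;                      // release the held-back records
            }
            buffered.clear();
        }
    }

    long sum() {
        return sum;
    }

    public static void main(String[] args) {
        BarrierAlignmentDemo op = new BarrierAlignmentDemo(2);
        op.onRecord(0, 1);
        op.onBarrier(0);      // input 0 has reached barrier n
        op.onRecord(0, 100);  // arrives after the barrier on input 0: buffered
        op.onRecord(1, 2);    // input 1 is still before its barrier: processed
        op.onBarrier(1);      // alignment complete: snapshot = 1 + 2
        System.out.println(op.snapshots); // [3]
        System.out.println(op.sum());     // 103
    }
}
```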

Checkpointing Configuration
⚫ By default, checkpointing is disabled. To enable checkpointing, call enableCheckpointing(n),
where n is the checkpoint interval in milliseconds.

// Enable checkpointing with an interval of 1000 ms. Set this value based on site
// requirements; if the state is large, use a larger interval.
env.enableCheckpointing(1000);

Checkpointing Configuration
⚫ Exactly-once or at-least-once
 Exactly-once ensures data consistency, with no data loss or duplicates, but its performance
is lower than that of at-least-once.
 At-least-once suits scenarios that require low latency and high throughput but have low
requirements on data consistency (duplicates may occur).
⚫ Exactly-once is used by default. You can use the setCheckpointingMode() method to set the
semantic mode.

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

Checkpointing Configuration
⚫ Checkpointing timeout
 Specifies the timeout period for checkpoint execution. If a checkpoint does not complete
within this period, Flink aborts it and regards it as timed out.
 This parameter can be set using the setCheckpointTimeout method. The default value is 10
minutes.

env.getCheckpointConfig().setCheckpointTimeout(60000); // 60000 ms = 1 minute

Checkpointing Configuration
⚫ Minimum interval between checkpoints
 When checkpoints frequently end up taking longer than the base interval because the
state grew larger than planned, the system constantly takes checkpoints. This can mean
that too many compute resources are constantly tied up in checkpointing, thereby
affecting the overall application performance. To prevent such a situation, applications
can define a minimum duration between two checkpoints:

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // at least 500 ms between the end of one checkpoint and the start of the next
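The effect of the minimum pause can be shown with a simplified scheduler. This is a hypothetical model, not Flink's checkpoint coordinator, and the names are made up. Only one checkpoint runs at a time. The next checkpoint starts at the later of two times: the previous start plus the interval, or the previous completion plus the minimum pause. All times are in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical scheduler model: one checkpoint at a time, with an interval and a minimum pause.
final class CheckpointScheduleDemo {
    // durations[i] is how long the i-th checkpoint takes; returns the start time of each one.
    static List<Long> startTimes(long interval, long minPause, long... durations) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (long d : durations) {
            starts.add(start);
            long end = start + d;
            // Not before the regular interval, and not before end + minPause.
            start = Math.max(start + interval, end + minPause);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Interval 1000 ms, but each checkpoint takes 1800 ms because the state grew.
        System.out.println(startTimes(1000, 0, 1800, 1800, 1800));   // [0, 1800, 3600]
        System.out.println(startTimes(1000, 500, 1800, 1800, 1800)); // [0, 2300, 4600]
    }
}
```

Without a pause, a new checkpoint starts the moment the previous one ends, so the job is always checkpointing. With a 500 ms pause, the application gets at least 500 ms of checkpoint-free processing between checkpoints.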

Checkpointing Configuration
⚫ Maximum number of concurrent checkpoints
 The maximum number of checkpoints that can be in progress at the same time. By default, the
system does not trigger another checkpoint while one is still in progress. If you allow
multiple concurrent checkpoints, the system can trigger a new checkpoint while others are still
in progress.

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // default: one checkpoint at a time

Checkpointing Configuration
⚫ External checkpoints
 You can configure periodic checkpoints to be persisted externally. Externalized
checkpoints write their metadata out to persistent storage and are not automatically
cleaned up when the job fails. This way, you will have a checkpoint around to resume
from if your job fails.

env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

How Do I Restore Data for a Job?
⚫ A checkpoint can be retained in external storage when a job is cancelled. Flink also has
another mechanism for restoring job data: savepoints.
⚫ Savepoints are a special implementation of checkpoints, and the underlying layer uses the
checkpointing mechanism. Savepoints are triggered manually, and the results are persisted to a
specified storage path. Savepoints let users save the system's state data before cluster
upgrades and maintenance. An application terminated for operations such as O&M shutdown or an
application upgrade can then be restored to its original computing state. In this way,
exactly-once semantics can be preserved across such operations.

Savepoint vs. Checkpoint

⚫ Trigger and management method
 Checkpoint: automatically triggered and managed by Flink.
 Savepoint: manually triggered and managed by users.
⚫ Usage
 Checkpoint: quickly recovers tasks when exceptions occur, for example, timeout exceptions
caused by network jitter.
 Savepoint: backs up data as planned, for example, before modifying code or adjusting
parallelism.
⚫ Characteristics
 Checkpoint: lightweight; restored automatically after faults; cleared by default after a job
is stopped.
 Savepoint: persistent; stored in a standard format that allows code or configuration
changes; restoration from a savepoint is triggered manually.

State Backend
⚫ Programs written in the DataStream API often hold their states in various forms.
When checkpointing is activated, such states are persisted upon checkpoints to
prevent against data loss and ensure data consistency in recovery. How a state is
represented internally, and how and where it is persisted upon checkpoints depends
on the chosen state backend.

State Storage Method — MemoryStateBackend
⚫ Construction method
 MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots)
 Storage method
◼ State: TaskManager memory
◼ Checkpoint: JobManager memory

 Capacity limit
◼ The default value of maxStateSize for a single state is 5 MB.
◼ maxStateSize ≤ akka.framesize (Default value: 10 MB)
◼ The total size cannot exceed the memory of JobManager.

 The MemoryStateBackend is recommended for local development and testing, and for jobs with little state, such as ETL jobs.

State Storage Method — FsStateBackend
⚫ Construction method
 FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots)
 Storage method
◼ State: TaskManager memory
◼ Checkpoint: external file storage system (local or HDFS)

 Capacity limit
◼ The size of states on a single TaskManager cannot exceed its memory.
◼ The total size cannot exceed the configured file system capacity.

 The FsStateBackend is recommended for jobs with regular-sized state, such as minute-level
window aggregations and joins, and for jobs requiring high-availability setups. It can be used
in production scenarios.

State Storage Method — RocksDBStateBackend
⚫ Construction method
 RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing)
 Storage method
◼ State: RocksDB, a key-value database on the TaskManager (uses memory and disk)
◼ Checkpoint: external file storage system (local or HDFS)

 Capacity limit
◼ The size of states on a single TaskManager cannot exceed the TaskManager's memory plus disk size.
◼ The maximum size of each key and of each value is 2 GB.
◼ The total size cannot exceed the configured file system capacity.

 The RocksDBStateBackend is recommended for jobs with very large state, such as day-level
window aggregations, jobs requiring high-availability setups, and jobs that do not require high
read/write performance. It can be used in production scenarios.
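When the enableIncrementalCheckpointing constructor argument is true, a checkpoint uploads only what has changed since the previous checkpoint instead of the full state. For RocksDB this works at the level of its immutable SST files. The sketch below is a hypothetical model of that idea: the file names and the class `IncrementalCheckpointDemo` are made up. A new checkpoint uploads only the files that are not already part of the previous checkpoint and references the rest.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of incremental checkpoints over immutable files (not Flink code).
final class IncrementalCheckpointDemo {
    // Returns the files that must be uploaded for the new checkpoint.
    static Set<String> filesToUpload(Set<String> previousCheckpoint, Set<String> currentFiles) {
        Set<String> upload = new LinkedHashSet<>(currentFiles);
        upload.removeAll(previousCheckpoint); // already uploaded files are only referenced
        return upload;
    }

    public static void main(String[] args) {
        Set<String> previous = new LinkedHashSet<>(List.of("sst-1", "sst-2", "sst-3"));
        // Assume compaction merged sst-1 and sst-2 into sst-4, and new writes produced sst-5.
        Set<String> current = new LinkedHashSet<>(List.of("sst-3", "sst-4", "sst-5"));
        System.out.println(filesToUpload(previous, current)); // [sst-4, sst-5]
    }
}
```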

Summary

⚫ The first part of this chapter describes the basic concepts and technical
architecture of Spark, including Spark SQL, Structured Streaming, and Spark
Streaming. The second part describes the architecture and technical
principles of Flink and how Flink programs run, focusing on the difference
between stream processing and batch processing in Flink. In the long run,
the DataStream API is expected to fully subsume the DataSet API through
bounded data streams.

Quiz

1. (Multiple-choice) Which of the following are Flink components? ( )

A. JobManager

B. TaskManager

C. ResourceManager

D. Dispatcher

2. (Single-choice) Flink stores the state data of a specific task or operation in HDFS by
default. ( )

A. True

B. False

Quiz
3. (Single-choice) The sliding window is a more generalized form of a fixed window. It consists of a fixed window length
and a sliding interval. ( )
A. True
B. False
4. (Multiple-choice) Which of the following are state storage methods provided by Flink? ( )
A. MemoryStateBackend
B. FsStateBackend
C. RocksDBStateBackend
D. HDFS
5. (Single-choice) Flink provides three layered APIs. Each API offers a different trade-off between conciseness and
expressiveness and targets different use cases. ( )
A. True
B. False



Acronyms and Abbreviations
⚫ ETL: extract, transform, and load, a process that involves extracting data from
sources, transforming the data, and loading the data to final targets
⚫ FIFO: First In, First Out
⚫ SQL: Structured Query Language
⚫ WAL: Write-Ahead Logging
⚫ BI: Business Intelligence
⚫ API: Application Programming Interface
⚫ APP: Application
Recommendations

⚫ Huawei Talent
 https://e.huawei.com/en/talent
⚫ Huawei Enterprise Product & Service Support
 https://support.huawei.com/enterprise/en/index.html
⚫ Huawei Cloud
 https://www.huaweicloud.com/intl/en-us/



Thank you.
Bring digital to every person, home, and
organization for a fully connected,
intelligent world.

Copyright© 2022 Huawei Technologies Co., Ltd. All Rights Reserved.

The information in this document may contain predictive statements including, without
limitation, statements regarding the future financial and operating results, future product
portfolio, new technology, etc. There are a number of factors that could cause actual results
and developments to differ materially from those expressed or implied in the predictive
statements. Therefore, such information is provided for reference purpose only and constitutes
neither an offer nor an acceptance. Huawei may change the information at any time without
notice.
