Kafka Internals


Course Introduction

We kick-started our learning of Kafka in our first course, Kafka - Premiera Ola, by covering what Kafka is, its fundamental components, and its uses.

We are taking a step ahead here and will discuss the following concepts in this course:

 Steps to set up a single node - multi-broker cluster


 Kafka Internal Design
 Deep dive into Producer and Consumer
 Kafka APIs

Single Node-Multiple Broker Cluster - Setup


In 'Kafka - Premiera Ola', we described the steps to configure a single node-single broker cluster.

Following are the steps to configure a single node-multiple broker cluster.

 Start the Zookeeper server.

 Create three separate brokers and change their configuration parameters.

 Run each broker in three separate terminals.

 Create a topic Multibrokerapplication with replication factor 3.

 Create a Producer and send messages to the topic.

 Start consumer and consume messages.

Starting Zookeeper Server


Following is the command to start the Zookeeper server.

bin/zookeeper-server-start.sh config/zookeeper.properties

Brokers and Parameters Configuration


i]. Make two copies of the config/server.properties file, named config/server-one.properties and config/server-two.properties, for broker 1 and broker 2 respectively.

ii]. Edit the following parameters of config/server-one.properties.

The id of the broker must be set to a unique integer.

broker.id=1

The port on which the socket server listens.

port=9093

A comma-separated list of directories in which the log files are stored.

log.dirs=/tmp/kafka-logs-1

Repeat the same configuration for config/server-two.properties with appropriate parameter values set, for example as shown below.
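One possible content for config/server-two.properties (the port is only a sample value; any free port that does not clash with the other brokers will do):

broker.id=2
port=9094
log.dirs=/tmp/kafka-logs-2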

Running Each Broker in Three Terminals


After changing the parameters in the two copies of the server.properties file, open three separate terminals, one for each broker, and start the Kafka servers one by one.

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

Now, our machine is running three Kafka brokers.

Creating a Topic with Replication Factor


Now, let us create a topic Multibrokerapplication with replication factor 3, as we have three separate brokers.

Let us use the command below to run in the terminal.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic Multibrokerapplication

Checking Topic and Partition Summary


Using the following command, you can check the summary of partitions, the topic name, the replication factor, and the in-sync replicas.

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerapplication

The output of the above snippet is given below:

Topic:Multibrokerapplication  PartitionCount:1  ReplicationFactor:3  Configs:
  Topic: Multibrokerapplication  Partition: 0  Leader: 0  Replicas: 0,2,1  Isr: 0,2,1

From the above output, we can see that Multibrokerapplication has one partition with three replicas.

The first broker (broker.id 0) is the leader of the partition and all other
brokers replicate this partition.

Creating a Producer and Sending Messages


Create a Producer to send messages to the topic Multibrokerapplication. You will get
a prompt. Type messages to be sent to the topic.
Let's paste the command below:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication

This is single node-multi broker demo


This is the second message

Creating a Consumer and Consuming Messages


Start Consumer to consume messages.

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Multibrokerapplication --from-beginning

This is single node-multi broker demo


This is the second message

Cluster Membership
In Kafka, brokers maintain their membership in the cluster through Zookeeper. Every broker in a cluster has a unique ID specified in its configuration file.

As soon as a broker process starts, it registers itself with Zookeeper using its unique ID, creating an ephemeral node. Kafka components such as producers and consumers subscribe to the /brokers/ids path in Zookeeper and get notified each time a broker is added to or removed from the cluster.

When a broker crashes and loses connectivity to Zookeeper, its ephemeral node is removed from Zookeeper, but its ID continues to exist in other data structures. When the broker is brought back, or a new broker is added to the cluster with the same ID, it immediately joins the cluster.
Kafka Internals
Prior knowledge of Kafka's internal architecture and workings helps in tuning and troubleshooting Kafka for better performance and throughput.

Here, we will discuss three important topics of Kafka's internal design.

 Replication in Kafka
 How Kafka handles requests from Producers and Consumers
 Kafka Storage formats such as file formats and indexes

Replication in Kafka
Kafka is a distributed system and works on a cluster of computers. Replication plays a crucial role in achieving fault tolerance, stronger durability, and higher availability in a Kafka system.

Before discussing replication, it helps to first describe message partitioning.

In a Kafka system, the producer decides which topic partition a message is published to. The message is sent to the leader replica of that partition on the brokers in the cluster.
Each partition is stored on its corresponding broker in the cluster, according to the partitioning strategy.
The number of partitions for each topic is configured within the Kafka broker.

In a replicated topic:

Each message is partitioned, and each partition has n replicas.

Replicas are stored on brokers, and a broker may also hold replicas of other topics' partitions.

One replica of a topic partition acts as the leader. All producer and consumer requests go through the leader for that topic partition.

Followers are the replicas other than the leader; they replicate messages from the leader and stay up to date with the most recent messages. In the event of a leader crash, one follower is promoted to leader.
How Replication Works?
Fetch requests are constantly sent by the follower replicas to the leader to consume recent messages. Each request contains the offset of the next message the replica wants to receive. The leader responds to the request with messages starting at that offset.

Fetch requests to the leader arrive in sequential order. The offset requested by a replica tells the leader which messages the follower replica has already received.

A replica is considered out of sync when it fails to catch up with recent messages within 10 seconds; it then loses its eligibility to become leader in the event of a failure of the current leader in the cluster.

Different Forms of Replication


Kafka supports two forms of replication:

 Synchronous Replication
 Asynchronous Replication

Synchronous Replication
Synchronous replication works as follows:

1. The Producer identifies the lead replica from the Zookeeper and publishes a
message.

2. The published message is written to the log of the lead replica, and all of its followers in the ISR (In-Sync Replicas) start to pull the message. Note that every lead replica maintains an ISR in the cluster: the minimum set of replicas that ought to stay in sync with the lead replica whenever its log changes.

3. Once the message is written to their respective logs, each of the follower replicas sends an acknowledgment to the lead replica.
4. Once all expected acknowledgments are received, and the replication is
complete, the lead replica sends an acknowledgment to the Producer.

5. The consumer pulls the message from the lead replica.

Asynchronous Replication
In asynchronous replication, once the published message is written to the lead replica's log, the lead replica acknowledges the message producer without waiting for acknowledgments from the follower replicas.

In such replication, in the event of broker failure, there is no guarantee that all the
follower replicas would have committed the published message.

Handling Broker Failures


1. As soon as a follower in the ISR (in-sync replica) fails, the leader removes it from
the ISR and continues writing to other followers in the ISR.

2. Once it comes back, it will immediately truncate its log to the offset position of the
last message committed to its log.

3. Then it starts reading the messages from that point from the leader.

4. Once it is fully synced with the leader, it is added back to the current ISR by the
leader.

Handling Leader Failures


 When a leader fails while writing a message to its log or acknowledging the
producer, a new leader is elected.

 To elect the new leader, all the follower ISRs register themselves with the
Zookeeper.

 The first one to register becomes the new leader replica, and the offset of its log
end becomes the offset of the last committed message.

 All other replicas become the follower of the new leader.

Handling Leader Failures...


 Each replica registers a listener in the Zookeeper so that it gets notified of any
leader change.

 Once there is a leader change, the replica truncates to the offset of the last
committed message and starts to catch up with the new leader.

The new leader waits until all live replicas get in sync, writes the new ISR to Zookeeper, and then opens itself for reads and writes from producers and consumers.

Processing Requests
In Kafka, the client initiates a connection and sends requests. A request could be a fetch request from a consumer or a write request from a producer. The broker processes requests in the order they are received. The broker then responds to the client, preserving the order of the messages it stores.

A client sends requests to the following:

 partition leaders
 partition replicas
 controller

Kafka uses a binary protocol over TCP that specifies the request format and defines how a broker should respond:

 when the request is processed successfully.


 when the broker meets with an error while processing the request.

Request Header
All request headers contain:

Request Type - This is the API key.
Request Version - Enables brokers to handle clients of different versions and respond accordingly.
Correlation ID - A number that uniquely identifies the request. It is also present in the response and in error logs.
Client ID - Identifies the application from which the request was sent.
1. The broker runs an acceptor thread on each port it is listening to.

2. The acceptor thread creates a client connection, and hands over the connection
to the processor thread, also called network thread.

3. The processor thread takes requests from client connections and places them in
a Request Queue.

4. From the Request Queue, an I/O thread picks up the requests, processes them, and places the results in a Response Queue.

5. From the Response Queue, the processor thread picks up the responses and sends them back to the clients.

Request Types Handled by Broker


There are three types of requests managed by the broker. They are:

 Produce
 Metadata
 Fetch

Metadata Requests
Metadata requests are sent by both producers and consumers to any broker and contain a list of topics the client is interested in.

The broker responds to a metadata request with the partitions of each topic, the replicas for each partition, and the leader replica.

A metadata request can be sent to any broker, since all brokers hold a copy of the metadata cache with this information.

Clients need to refresh this metadata intermittently by sending new metadata requests, to learn of topic metadata changes; they also refresh it when they receive a "Not a Leader for Partition" error.

Produce Requests
Produce requests are sent by the producer application and contain messages that need to be written to the broker.

The producer sends these requests only to the leader replica; otherwise, it receives an error response saying "Not a Leader for Partition".

The producer finds the leader replica of a partition by first sending a metadata request to any broker.

Produce Requests
 On receiving a Produce request, the leader replica will check if the respective
user has write privilege to the topic partition.

 If the privilege is available, it will write the data and check 'ack' (acknowledgment
configuration).

o If it is 0, the producer does not wait for an acknowledgment, so the leader replica does not need to respond before the producer considers the message sent.
o If it is 1, the leader replica acknowledges the producer as soon as the message is written to the leader replica's log.
o If it is set to all, the leader replica responds to the produce request only after it receives acknowledgments from all in-sync follower replicas. Until then, the request is stored in a buffer called purgatory.

Fetch Requests
Fetch requests are sent by consumers to brokers to read messages from a given offset of a topic's leader partition. Fetch requests are also used among brokers to replicate partitions.

The client sends an upper limit (the maximum amount of data the client can accept from the broker) and a lower limit (the minimum amount of data needed for a transfer to happen).

The fetch request should have an upper limit; otherwise, the broker could send so much data that the client runs out of memory.

Likewise, setting a minimum limit ensures that the broker sends a response only when a sizable amount of data is available for the client. Otherwise, many small transfers take place and resources are used inefficiently.

Fetch Requests
 As soon as a leader replica receives a Fetch request, it validates the request by
checking if the offset specified in the request exists for that particular partition.

o If it is valid, the leader replica will read messages from that offset till the
specified upper limit and return the messages to the client.
o The broker responds with an error message if the offset is too old (already deleted) or does not yet exist.
To avoid any inconsistency, the leader replica only sends messages that have already been replicated to all of its in-sync followers.

Physical Storage
A partition is the basic storage unit in Kafka. A partition cannot be broken down and stored across multiple brokers or across multiple disks of the same broker.

During configuration, the administrator can provide a list of directories in which partitions are stored, using the log.dirs parameter.

This topic describes the following:

 How are Partitions allocated among Brokers?


 How is Retention period for messages handled?
 Indexes
 Log Compaction

When a topic is created, Kafka decides how the partition should be allocated among
brokers of the cluster.

New partition assignment is based on the following:

There should be minimum data movement.
The partitions of a topic should be evenly distributed across the brokers in the cluster.
No two replicas of the same partition should reside on the same broker.
The replicas of each partition should be assigned to different racks. For example, if brokers 0 and 1 are on one rack and brokers 2 and 3 are on another rack, the partition replicas will be assigned to brokers in the order 0, 2, 1, 3 instead of 0 to 3, so that each broker is followed by a broker from a different rack.
Partition leaders should be evenly distributed across the cluster.

Partition Distribution
Given below is a high-level algorithm to distribute the partitions evenly across the brokers, assuming the number of topic partitions, the replication factor, and the number of nodes are known.

Find out the number of partitions that each node can accept, so that the number of partitions on each node is roughly the same.

Assign each partition from the current assignment back to its current owner node, respecting the number of partitions each node can accept.

Assign all partition replicas that remain unassigned evenly to the nodes that can accept them.

Messages sent from the producer are retained in the Kafka broker for the time configured for the message's topic, referred to as the retention period.

The administrator configures retention for each topic in terms of

the amount of time a message should be kept before being deleted.
the amount of data that can be stored before older messages are purged.

Partition Segments
Each partition is split into segments of a configured size, say 1 GB, or configured to cover a certain amount of time, say one week.

Messages from the producer are written across the topic's partitions.

Each partition has numerous segments. When either of the above limits (size or retention period) is exceeded while writing data to a partition segment, that segment is closed and a new one is started.

A segment to which the broker is currently writing is called an active segment.

An active segment is never deleted until one of the segment's limits is exceeded and it is closed.

A broker keeps open file handles for every segment of all of its partitions, whether active or inactive.

Segment File
Each partition segment is stored in a single data file. The file contains messages and
their offsets.

Each message consists of its:

 key
 value
 offset
 message size
 checksum code - helps to detect message corruption.
 magic byte - indicates the version of the message format.
 compression codec - indicates message compression format (snappy, GZip or LZ4).
timestamp - denotes the time the message was sent from the producer or received by the broker.

Partition Index
The Kafka broker allows the consumer to read data from any available offset.

To help brokers locate the message at a given offset quickly, Kafka maintains an index for each partition.

An index maps offsets to segment files and to positions within those files.

 When messages are deleted, corresponding offset entries will be deleted from
the index.

 Indexes are split into segments.


When an index becomes corrupted, it is automatically regenerated by re-reading the messages and recording the offsets and corresponding locations.

Log Compaction
Kafka stores messages until the retention period is reached and purges messages that are older than the retention period.

To keep only the latest data, Kafka supports:

Retention Period of Topic: Messages older than the retention period are deleted.

Compaction: This is done only on messages with key-value data, inside the inactive partition segments. Compaction keeps only the most recent value of each key (a sketch of creating a compacted topic follows this list).
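A minimal sketch of enabling compaction when creating a topic, assuming the Java AdminClient (available since Kafka 0.11) and a hypothetical topic name latest-values; the cleanup.policy=compact setting is what switches the topic from time/size-based retention to log compaction.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CompactedTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // one partition, three replicas, compaction instead of deletion
            NewTopic topic = new NewTopic("latest-values", 1, (short) 3);
            topic.configs(Collections.singletonMap("cleanup.policy", "compact"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}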

Compaction - Splitting Partition logs


The partition log is split into two parts during compaction.

Clean: The portion of the partition log that has already been compacted. It contains at most one value for each key, which is the latest value as of the previous compaction.

 Dirty: Portion of the partition log that has messages written after the last
compaction.

Compaction Threads
When compaction is enabled, the following things happen:

The broker starts a compaction manager thread and multiple compaction threads. The compaction threads are responsible for the compaction task.

A compaction thread starts the compaction task by choosing the partition with the highest ratio of dirty messages to total partition size, and starts cleaning it.

The compaction thread reads the dirty portion of the partition and creates an in-memory map of entries. Each map entry comprises a 16-byte hash of the message key and the 8-byte offset of the message with that same key; each map entry is therefore 24 bytes in size.

 After building the offset map for the dirty segments of the partition log, the
compaction thread starts reading the clean segments, starting from the oldest.
It checks their content against the offset map.

Compaction - Key Validation

 It checks if the key of the message is present in the offset map.

o If the key is present, the message in the segment is ignored, since there
will be a newer value for the key later in the partition.

o If the key is not present, the corresponding message will be copied to a


replacement segment, since it is the newest message for that key.

Once the replacement segment is built with the latest message for each key, the original segment is replaced.

Finally, only clean segments exist, with one message per key, each key holding its latest value.

Deletion of Messages from Segments


 To delete the message of a key completely from the system, the application
needs to produce a message with that key with a NULL value.

When the compaction thread finds a key with a NULL value, it performs regular compaction, so the replacement segment keeps only the message with the NULL value for that key.
This message with a NULL value is also called a tombstone message and is retained for a configured time.

 When a consumer sees a tombstone message, it knows that the value is deleted.

 After the configured time, the compaction thread will delete the tombstone
message, and the key will be completely removed from the partition log.

Producer Record Creation


Producers are front-end applications, proxy applications, and adapters to legacy systems that produce messages and publish them to the Kafka broker. The producer uses the Producer API to publish messages on the desired topic.

Producer Components:

Producers create messages by creating a ProducerRecord that includes the topic to which the data should be sent, and the corresponding data.

A ProducerRecord can also have a key and a partition.

When the ProducerRecord is sent, the producer serializes the key and value objects to byte arrays.

The serialized data is then sent to a partitioner.

If a partition is specified in the ProducerRecord, the partitioner sends the data to that partition. Otherwise, the partitioner chooses a partition based on the key specified in the ProducerRecord.

Once a partition is chosen for the data, the producer knows which partition of the topic the data should go to.

The partitioner then adds the record to a batch of records that will be sent to a specific partition of the topic.

Writing Message and acknowledging


 Using a separate thread, the Producer will send each record from the batch of
records to the appropriate Kafka Broker.
If the record is successfully written to the broker, the broker responds to the producer with a RecordMetadata object. This object contains the topic, partition, and the offset of the record within the partition.

 If the broker fails to write to the partition log, it will send back an error.

 When the producer receives the error, it will try resending the record.

Creating Producer Using Java API


Creating a Producer using the Java API involves three steps:

Creating a Producer object with all the properties we want to pass to the producer.

Creating a ProducerRecord object that has the name of the topic and the key and value of the record we want to send to the broker.

Sending the ProducerRecord object to the Kafka broker using the Producer object's send() method.

Creating Producer Object


The following properties are mandatory for a Kafka producer.

bootstrap.servers: The list of host:port pairs of brokers used by the producer to establish an initial connection to the Kafka cluster. It is recommended to include at least two pairs, so that the producer can still connect to the cluster if one broker fails.

key.serializer: The producer interface accepts parameterized types and objects to be sent as key-value pairs; however, brokers expect the keys and values of messages as byte arrays. key.serializer is the name of the class that serializes the keys of the records sent from the producer to byte arrays. It is a class that implements the org.apache.kafka.common.serialization.Serializer interface.

value.serializer: The name of the class that serializes the values of the records sent to Kafka. It is a class that implements the org.apache.kafka.common.serialization.Serializer interface.

Creating Producer Object


The subsequent code snippet shows how to create a producer object in Java.

Properties kafkaProps = new Properties();

kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);

Creating Producer Record Object


The Kafka producer accepts a ProducerRecord object, which is created using the ProducerRecord constructor.

Here it takes three parameters: the topic name to which we want to send the record, and the key and value of the record we are sending to the Kafka broker (all of string type).

The following code snippet displays the creation of a ProducerRecord object in Java.

ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");

In the above code snippet, CustomerCountry is the topic name, Precision Products is the message key, and France is the message value.

Sending Record to Kafka Broker


The producer object's send method is used to send the ProducerRecord object, to the
Kafka broker.

The below code snippet is used to send the ProducerRecord to Kafka broker.

producer.send(record);

Synchronous send: Here, the send() method returns a Future object, and we use the get() method of that object to wait and see whether the send() was successful.
Asynchronous send: Here, the send() method is called with a callback function, which is triggered on getting a response from the Kafka broker.
Both styles are sketched below.
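A minimal sketch of both send styles, reusing the producer and record objects created above (the error handling shown is only illustrative; Callback and RecordMetadata come from org.apache.kafka.clients.producer):

// Synchronous send: block on the Future returned by send()
try {
    RecordMetadata metadata = producer.send(record).get();   // waits for the broker's response
    System.out.printf("Stored in partition %d at offset %d%n",
            metadata.partition(), metadata.offset());
} catch (Exception e) {
    e.printStackTrace();   // the send failed or the wait was interrupted
}

// Asynchronous send: supply a callback invoked when the broker responds
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();   // the write failed
        }
    }
});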

Configuring Kafka Producers


Kafka producers are configured using key-value pairs. The pairs can be supplied in a .properties file or set programmatically.

There are additional configuration parameters which we will look at.

acks: The number of acknowledgments the producer requires the leader to have received from the followers before considering a request complete.

The acks configuration controls the durability of the messages sent. It can take three values: 0, 1, and all.

Configuring ACKS
i. acks=0:

 This setting can be used for achieving high throughput and is the fastest one.
 When acks is set to 0, the Producer will not wait for an acknowledgment from any of the
brokers to assume the message is sent successfully.
 This setting provides the least durability as there could be message loss - sometimes the
message is not sent to the broker, and the Producer will not know about it.

ii. acks=1:

 With this setting, the Producer will receive a success response from the broker as soon
as the message is written to the leader replica.
 If the leader replica crashes in between and the message could not be written to the
leader, the producer will receive an error response, and it will resend the message
avoiding loss of data.
 There can still be a message loss if the leader crashes and a replica gets elected as a
new leader without this message.

Configuring ACKS...
With acks = 1, there is a tradeoff between throughput and latency, depending on whether the send() method sends data synchronously or asynchronously.
If the data is sent synchronously, waiting for a reply from the server using the Future object's get() method, latency increases.
If the data is sent asynchronously with a callback, throughput depends on how many messages the producer sends before receiving a reply from the server.

iii. acks=all:

The producer receives a success response from the broker only once all in-sync replicas have received the message.
This setting provides the safest message publish, as it ensures the data has reached more than one replica in the cluster.
It also has the highest latency, as the broker waits for the message to be written to more than one replica.

Configuring Other Parameters

buffer.memory: Sets the total amount of memory the producer can use to buffer messages waiting to be sent to the broker. If messages are produced faster than the brokers can receive them, the producer blocks send() calls for up to max.block.ms and then throws an exception.

compression.type: Sets the compression format for the messages sent from the producer. The default value is none, meaning no compression. To compress messages before sending them to brokers, the gzip, snappy, or lz4 algorithm can be specified.

retries: A message from the producer sometimes fails to be written to the broker due to transient errors (for example, no leader for a partition). The value of retries decides how many times the producer resends data to the broker after receiving a transient error; set it to a value greater than zero to enable retries.

A sketch combining these producer settings follows below.
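A minimal sketch of a producer configuration combining acks and the parameters above; the concrete values are only examples, not recommendations:

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("acks", "all");                 // wait for all in-sync replicas
kafkaProps.put("compression.type", "snappy");  // compress batches before sending
kafkaProps.put("retries", "3");                // resend on transient errors
kafkaProps.put("buffer.memory", "33554432");   // 32 MB of buffering for unsent records

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);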

Consumer Overview
Consumers are applications such as real-time analytics, data warehousing solutions, NoSQL-based applications, backend services, or other subscriber-based solutions that read data from Kafka topics, validate those messages, and write them to other applications.
To consume messages from topics in the broker,

The application creates a consumer object and subscribes to the required topics.
It consumes messages from the topics, validates them, and writes them to another application or data store.

Consumer Group:
Consider the situation when the producer is sending data to the broker at a faster rate than your application can read. Your application will fall behind, failing to keep up with the rate of messages coming from the producer.

To scale the consumption of messages from the topic, more consumers are added so that reading is balanced between the consumers.

Multiple consumers subscribed to a topic can join a group called a Consumer Group, so that every consumer in the group shares the same ID, which is the consumer group ID.

By default, a consumer will be part of a consumer group.

Consuming Messages
To consume messages from a Kafka broker, the consumer subscribes to the required topic on the broker.
While subscribing, the consumer connects to any of the live nodes in the cluster to get metadata about the leaders of the topic's partitions.
After getting the leader metadata, the consumer sends a fetch request to the broker holding the leader partition, specifying the offset of the message to read from.
Each topic is divided into multiple partitions, and each consumer in a consumer group is assigned a different set of partitions of the topic.
Thus the consumer group as a whole reads the topic.
As messages are consumed, the consumer advances its offset to the next message to be read.
A consumer can also move back to an older offset, thus re-consuming messages.
Scaling Data Consumption
If your application has only one consumer instance consuming and processing partitions, it may be unable to keep up with the rate of messages flowing into topics.

To scale message consumption, more consumers are added so that the topic partitions are split between multiple consumers. Here, any of four situations may occur:

1. When multiple consumers are subscribed to a topic and all of them belong to the same consumer group, each consumer in the group gets a different set of partitions of the topic, and the consumer group as a whole reads the topic.

2. When the topic has multiple partitions but only one consumer is subscribed to the topic (the consumer group has only one consumer), all the topic partitions are consumed by that single consumer.

Scaling Data Consumption


3. When the number of consumers in the consumer group grows to the number of topic
partitions, then each consumer in the consumer group gets exactly one partition of the
topic.

4. When the number of consumers in the consumer group grows beyond the number of topic partitions, some of the consumers in the consumer group remain idle and get no partitions.

Consumer Rebalance - Introduction


Each consumer in a consumer group reads a different set of partitions of a topic.

When a new consumer is added to the consumer group, it starts reading messages from partitions previously read by another consumer.

When a consumer in a consumer group crashes, it leaves the consumer group, and its partitions are consumed by the remaining consumers.

 Partition re-assignment also happens when a topic currently being consumed by


a consumer group is modified.
Consumer Rebalance
Transferring the partitions of one consumer to another consumer is called a rebalance.

Rebalancing makes the consumer group highly available and scalable.

During a consumer rebalance, consumers cannot consume messages or change their current state.

Consumers ensure their membership in a consumer group by sending heartbeats to the broker designated as the group coordinator for that consumer group (the coordinator may be different for each consumer group).

Heartbeats are sent while the consumer retrieves messages from its topic partitions.

Consumer Rebalance
As long as a consumer sends heartbeats at regular intervals to the broker, it is considered alive and processing messages from the partitions it is assigned.

If the consumer stops sending heartbeats for a long time, the group coordinator considers the consumer dead and triggers a consumer rebalance.

When a consumer crashes, the group coordinator waits for some time without heartbeats before deciding that it is dead and that a rebalance should be triggered.

 When the consumer is completely closed, it notifies the coordinator that it is


going to leave the group, and the group coordinator will immediately trigger a
rebalance.

Assigning Partitions to Consumers


 A consumer that needs to join a group sends JoinGroup request to the group
coordinator.
 The first consumer to join the group becomes the leader of consumers in the consumer
group. The leader gets a list of all consumers in the group from the group coordinator.
The leader consumer assigns unique partitions to each consumer in the consumer group. The leader uses a PartitionAssignor implementation for this purpose.
After deciding the partition assignment, the consumer leader sends the list of partition assignments to the group coordinator.
The group coordinator sends the assignment information to the corresponding consumers.
The consumer leader has the complete list of consumers in the group and the partitions assigned to them.

Create Consumer Object


Creation of a Kafka consumer using Java API is similar to that of creating Kafka
Producer.

The first step involves the creation of Kafka consumer object. Create a properties
object, with the properties to be provided to the consumer.

The mandatory parameters are

bootstrap.servers: A list of host:port pairs of brokers needed for the consumer to establish an initial connection to the Kafka cluster.

key.deserializer: The counterpart of the producer's key.serializer; it is the class for deserializing keys. It takes byte arrays as input and converts them to Java objects.

value.deserializer: The counterpart of the producer's value.serializer; it is the class for deserializing values. It takes byte arrays as input and converts them to Java objects.

Create Consumer Object...


The following code snippet shows the first step in creating consumer object with all
needed properties:

Properties props = new Properties();

props.put("bootstrap.servers", "broker1:9092,broker2:9092");

props.put("group.id", "CountryCounter");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

Create Consumer Subscription

The second step involves subscribing the consumer to one or more topics.
Subscription is done by calling the subscribe() method on the consumer object.
The subscribe() method takes a list of topics to subscribe to as its parameter. It can also take a regular expression, so that the expression can match multiple topic names (see the sketch after the snippet below).
When a new topic matching the expression is added, the consumer automatically subscribes to it.

The subsequent code snippet shows how to subscribe to topics using subscribe()
method.

consumer.subscribe(Collections.singletonList("customerCountries"));
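A sketch of the regular-expression form (the topic pattern is only an example; on older client versions, subscribe() with a Pattern also requires a ConsumerRebalanceListener argument):

consumer.subscribe(Pattern.compile("customer.*"));   // java.util.regex.Pattern; matches customerCountries, customerOrders, ...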

Consumer Polling
 Step 3 involves fetching (polling) the server for more data, from subscribed topics using
the poll() method.
 The poll loop handles coordination between consumers in the consumer group, partition
rebalancing, heartbeats and fetching data from servers.
 The poll() method returns data from the topic partitions assigned to it.

Poll and Fetch Data


The following is a sample code of consumer object's poll() method fetching data from
subscribed topics.
try {
  while (true) {
    // poll() returns the records fetched since the last poll (timeout in ms)
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
      log.debug(String.format("topic = %s, partition = %s, offset = %d, customer = %s, country = %s",
          record.topic(), record.partition(), record.offset(), record.key(), record.value()));
      // count how many times each country (the record value) has been seen
      int updatedCount = 1;
      if (custCountryMap.containsKey(record.value())) {
        updatedCount = custCountryMap.get(record.value()) + 1;
      }
      custCountryMap.put(record.value(), updatedCount);
      JSONObject json = new JSONObject(custCountryMap);
      System.out.println(json.toString(4));
    }
  }
} finally {
  consumer.close();
}

Configuring Kafka Consumers


This topic describes some of the important parameters that can be configured to increase the performance and availability of Kafka consumers.

fetch.min.bytes: The minimum amount of data the consumer wants to receive from the broker for a fetch request. If the amount of data available in the broker's topic partitions is less than fetch.min.bytes, the broker waits until enough data accumulates before sending the records to the consumer. This helps reduce load on both the consumer and the broker.

fetch.max.wait.ms: The maximum amount of time the consumer waits for the broker to send back records. If there is not enough data to satisfy fetch.min.bytes, the broker delays its response for at most fetch.max.wait.ms and then sends whatever data is available.

Configuring Kafka Consumers

max.partition.fetch.bytes: Controls the maximum number of bytes the server returns per partition.

session.timeout.ms: The amount of time a consumer can go without sending heartbeats to the group coordinator before it is considered dead. When session.timeout.ms passes without the consumer sending heartbeats to the group coordinator, the consumer is considered dead and the group coordinator triggers a partition rebalance.

Configuring Kafka Consumers

partition.assignment.strategy: The strategy used by the PartitionAssignor to split partitions among the consumers in a consumer group. It can take two values.
o Range: A consecutive set of partitions of each topic is assigned to each consumer in the consumer group. For example, suppose topic1 and topic2 have 3 partitions each, and c1 and c2 are two consumers in the consumer group cg1 subscribed to both topic1 and topic2. Using the Range strategy, c1 receives two partitions from each of topic1 and topic2, and c2 receives one partition from each.
o Round Robin: Assigns partitions to the consumers in the consumer group sequentially, one at a time. In the same example, partition 0 and partition 2 of topic1 are assigned to c1 and partition 1 to c2; partition 0 and partition 2 of topic2 are assigned to c2 and partition 1 of topic2 to c1.

A sketch of these consumer settings follows below.
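A minimal sketch of a consumer configuration combining the parameters above; the numeric values are example choices, not recommendations:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("fetch.min.bytes", "1024");               // wait until at least 1 KB is available...
props.put("fetch.max.wait.ms", "500");              // ...or until 500 ms have passed
props.put("max.partition.fetch.bytes", "1048576");  // at most 1 MB per partition per fetch
props.put("session.timeout.ms", "10000");           // considered dead after 10 s without heartbeats
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);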

Producer API and Consumer API


Kafka provides multiple APIs for publishing, consuming, and streaming records for different clients. Here we discuss five core APIs of Kafka for the Java client.

Producer API: The Producer API allows applications to send streams of records to desired topics already created in the brokers of a Kafka cluster. The steps to create and use the Producer API were covered earlier in this course.
Consumer API: The Consumer API allows an application to poll streams of records from subscribed topics in the brokers of a Kafka cluster. The creation and usage of the Consumer API were also discussed earlier in this course.

Streams API
The Streams API allows an application to transform streams of records from input topics to output topics.
Supports per-record stream processing with millisecond latency.
Supports stateless processing, stateful processing, and windowing operations.
Using the Streams API, we can write Java applications to process data in real time (a minimal sketch follows this list).
No separate cluster is required for real-time processing using the Streams API.
It is OS independent.
It is highly scalable, elastic, and fault-tolerant.
It can be deployed to the cloud, VMs, or containers.
It can be used for small, medium, or large use cases.
It is fully integrated with Kafka security.
It is part of the open-source Apache Kafka project.

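A minimal sketch of a Streams application that transforms records from one topic to another; the topic names and the application id are hypothetical, and the transformation (upper-casing each value) is only an example of per-record processing:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class UppercaseStreamDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");
        input.mapValues(value -> value.toUpperCase()).to("output-topic");  // per-record transformation

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();  // runs until the JVM is stopped
    }
}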
Connect API
The Connect API allows Kafka to pull records from source data systems and push data to sink data systems. The Connect API implements connectors that are responsible for moving large amounts of data (e.g., an entire database) into Kafka topics for stream processing, and for moving data out of Kafka (e.g., to external storage systems).

AdminClient API
The AdminClient API allows managing topics, partitions, offsets, brokers, and ACLs in a Kafka cluster. The AdminClient API also backs a set of tools that are implemented using Java classes and invoked through shell scripts (a minimal Java sketch follows the tool list below).

Some of the tools provided with the AdminClient API are:

kafka-topics.sh: This tool helps with topic operations such as creating, modifying, deleting, and listing information about topics. It is also used to add more partitions to a topic in order to expand it.
kafka-consumer-groups.sh: This tool is used to list and describe both old consumer groups (maintained by Zookeeper) and new consumer groups (maintained by the Kafka broker). This tool is also used to delete consumer groups and remove offset information maintained by Zookeeper.

AdminClient API
kafka-run-class.sh: This tool is used for exporting partition offsets. It produces a file containing topic partitions and their corresponding offsets in a defined format; the file can be copied, edited, and imported back to set the current offsets for a consumer group.

kafka-configs.sh: This tool is used to change configurations for individual topics and to override client configurations (for example, the rate, in bytes per second, at which a producer or consumer can send/receive data to/from topics).
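As a sketch of the programmatic side of the same API (assuming the Java AdminClient, Kafka 0.11+), the snippet below lists the topics in the cluster and describes the Multibrokerapplication topic created earlier:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

public class AdminClientDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // list all topic names in the cluster
            Set<String> names = admin.listTopics().names().get();
            System.out.println("Topics: " + names);

            // describe one topic: partitions, leaders, replicas
            TopicDescription desc = admin
                    .describeTopics(Collections.singletonList("Multibrokerapplication"))
                    .all().get()
                    .get("Multibrokerapplication");
            System.out.println("Partitions: " + desc.partitions());
        }
    }
}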
