Kafka Internals
A prior knowledge of the internal architecture and working of Kafka helps to tune and
troubleshoot Kafka for better performance and throughput. We are taking a step ahead
here and will discuss the following concepts in this course:
Replication in Kafka
How Kafka handles requests from Producers and Consumers
Kafka storage formats such as file formats and indexes
As a quick recap, a multi-broker cluster is brought up by first starting Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Each broker in the cluster needs its own configuration file, with a unique broker ID,
port, and log directory. For example:
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1
Each broker is then started with its own configuration file:
Broker 1:
bin/kafka-server-start.sh config/server.properties
Broker 2:
bin/kafka-server-start.sh config/server-one.properties
Broker 3:
bin/kafka-server-start.sh config/server-two.properties
Describing the topic shows output such as:
Topic: Multibrokerapplication  PartitionCount: 1  ReplicationFactor: 3  Configs:
From the above output, we can understand that Multibrokerapplication has one partition
with three replicas.
The first broker (broker.id 0) is the leader of the partition and all other
brokers replicate this partition.
Cluster Membership
In Kafka, brokers maintain their membership in the cluster through Zookeeper. Every
broker in a cluster has a unique ID specified in its configuration file.
As soon as a broker process starts, it registers itself with Zookeeper under its
unique ID by creating an ephemeral node. Kafka components such as producers and
consumers subscribe to the /brokers/ids path in Zookeeper and get notified each time a
broker is added to or removed from the cluster.
When a broker crashes and loses connectivity to Zookeeper, its ephemeral node is
removed from Zookeeper, but its ID continues to exist in other data structures. When the broker
is brought back, or a new broker is added to the cluster with the same ID, it immediately
joins the cluster.
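As a minimal sketch of this mechanism (assuming a Zookeeper ensemble on localhost:2181 and the standard org.apache.zookeeper client library), the following Java snippet lists the live broker IDs and registers a watch for membership changes:

import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class BrokerWatcher {
    public static void main(String[] args) throws Exception {
        // connect to the same Zookeeper ensemble the brokers use
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> {});
        // each live broker registers an ephemeral znode under /brokers/ids;
        // passing true sets a watch, so membership changes trigger a notification
        List<String> brokerIds = zk.getChildren("/brokers/ids", true);
        System.out.println("Live broker IDs: " + brokerIds);
        zk.close();
    }
}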
Replication in Kafka
Kafka is a distributed system, and it works on a cluster of
computers. Replication plays a crucial role in achieving fault tolerance, stronger
durability, and higher availability in a Kafka system.
Replicas are stored on brokers, and each broker may store several replicas
belonging to different topics and partitions.
One replica of a topic partition acts as the leader. All producer and consumer
requests go through the leader for that topic partition.
Followers are the replicas other than the leader. They replicate messages from the
leader and stay updated with the most recent messages. In the event of a leader
crash, one follower is promoted to leader.
How Replication Works
Fetch requests are constantly sent by the follower replicas to the leader to
consume recent messages. Each request contains the offset of the
message the replica wants to receive next. The leader responds to the request with
messages starting at that offset.
Fetch requests to the leader are in sequential order. The offset requested by
a replica helps the leader to identify the last message received by
that follower replica.
Kafka supports two modes of replication:
Synchronous Replication
Asynchronous Replication
Synchronous Replication
Synchronous replication works as follows:
1. The Producer identifies the lead replica from Zookeeper and publishes a
message.
2. The published message is written to the log of the lead replica, and all of its
followers in ISR (In-sync Replica) start to pull the messages. Please note that
every lead replica maintains In-Sync Replicas (ISR) in the cluster. These are the
minimum set of replicas that ought to be in sync with the lead replica whenever
there is a change in the lead replica.
3. Once the message is written to their respective logs, each of the follower replicas
sends an acknowledgment to the lead replica.
4. Once all expected acknowledgments are received, and the replication is
complete, the lead replica sends an acknowledgment to the Producer.
Asynchronous Replication
In asynchronous replication, once the published message is written to the lead
replica's log, the lead replica acknowledges the message producer without waiting for
acknowledgments from follower replicas.
With such replication, in the event of a broker failure, there is no guarantee that all the
follower replicas have committed the published message.
When a follower replica fails and comes back later, the following happens:
1. While the follower is down, the leader removes it from its ISR and continues
serving requests with the remaining replicas.
2. Once it comes back, it will immediately truncate its log to the offset position of the
last message committed to its log.
3. Then it starts reading the messages from that point from the leader.
4. Once it is fully synced with the leader, it is added back to the current ISR by the
leader.
If the leader replica itself fails, a new leader must be elected:
To elect the new leader, all the follower ISRs register themselves with
Zookeeper.
The first one to register becomes the new leader replica, and the offset of its log
end becomes the offset of the last committed message.
Once there is a leader change, the replica truncates to the offset of the last
committed message and starts to catch up with the new leader.
The new leader waits until all live replicas get in sync, writes the new ISR to
Zookeeper, and opens itself for reads/writes from producers and consumers.
Processing Requests
In Kafka, the client initiates a connection and sends requests. A request could be
a fetch request from a consumer or a write request from a producer. The
broker processes requests in the order they are received. The broker then
responds to the client, preserving the order of the messages it stores.
Every broker handles requests related to the roles it plays:
partition leaders
partition replicas
controller
Kafka uses a binary protocol over TCP that specifies the format of requests and defines how a
broker should respond.
Request Header
All request headers include:
a request type (the API key)
a request version (so brokers can handle requests from clients of different versions)
a correlation ID (a number that uniquely identifies the request, and which also appears in the response)
a client ID (identifies the application that sent the request)
Request processing inside the broker works as follows:
1. For each port the broker listens on, the broker runs an acceptor thread.
2. The acceptor thread creates a client connection, and hands over the connection
to the processor thread, also called network thread.
3. The processor thread takes requests from client connections and places them in
a Request Queue.
4. From the Request Queue, the I/O threads pick up the requests, process them,
and place the responses in a Response Queue.
5. From the Response Queue, the processor thread picks up the responses and sends
them back to the clients.
The most common types of client requests are:
Produce
Metadata
Fetch
Metadata Requests
Metadata Requests are sent by both producers and consumers to any broker,
and contain a list of topics the client is interested in.
The broker will respond to a metadata request with the partitions of each
topic, the replicas of each partition, and the leader replica.
Metadata requests can be sent to any broker, since all brokers keep a metadata cache
with this information.
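A minimal sketch of issuing a metadata request from the client side, using the Java producer's partitionsFor() method (the broker address is a placeholder; the topic name is from the earlier example):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class MetadataDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // any broker works as a bootstrap server, since every broker caches cluster metadata
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // partitionsFor() triggers a metadata request and returns each partition
            // with its leader and replica assignment
            for (PartitionInfo p : producer.partitionsFor("Multibrokerapplication")) {
                System.out.println("partition " + p.partition()
                        + " leader=" + p.leader()
                        + " replicas=" + Arrays.toString(p.replicas()));
            }
        }
    }
}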
Produce Requests
Produce requests are sent from the producer application and contain messages
that need to be written to the Broker.
The Producer sends these requests only to the leader replica of a partition; otherwise, it will
receive an error response saying "Not a Leader for Partition".
The Producer finds the leader replica of a partition by
sending a metadata request to any broker.
Produce Requests
On receiving a Produce request, the leader replica will check if the respective
user has write privilege to the topic partition.
If the privilege is available, it will write the data and then check the 'acks' (acknowledgment)
configuration to decide when to respond.
Fetch Requests
Fetch requests are sent by the consumer to the Broker to read messages from
an offset of a topic partition's leader replica. Fetch requests are also used among Brokers to
replicate partitions.
The client sends an upper limit (the maximum amount of data the client can accept
from the broker) and a lower limit (the minimum amount of data needed for a data
transfer to happen).
The Fetch request should have an upper limit; otherwise, the broker could send so
much data that the client runs out of memory.
Likewise, setting a minimum limit ensures that the broker sends a response only
when there is a sizable amount of data available for the client. Otherwise,
many small batches of data would be processed, resulting in resources being utilized
inefficiently.
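As a sketch, both limits map to consumer configuration properties in recent Kafka versions (the values below are arbitrary examples, set on the consumer's Properties object shown later in this course):

// lower limit: the broker responds only once at least 1 KB of data is available
props.put("fetch.min.bytes", "1024");
// upper limit: cap the data returned in one fetch response at 1 MB
props.put("fetch.max.bytes", "1048576");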
Fetch Requests
As soon as a leader replica receives a Fetch request, it validates the request by
checking whether the offset specified in the request exists for that particular partition.
o If the offset is valid, the leader replica will read messages from that offset up to the
specified upper limit and return the messages to the client.
o The broker will respond with an error message if the offset is too old or does not
yet exist.
To avoid any inconsistency, the leader replica only sends messages that have been
replicated to all its in-sync followers.
Physical Storage
A Partition is the basic storage unit in Kafka. A partition cannot be broken down and
stored across multiple brokers, or even across multiple disks of the same broker.
The administrator can provide a list of directories using the log.dirs parameter, to store
partitions, during configuration.
When a topic is created, Kafka decides how its partitions should be allocated among the
brokers of the cluster.
Partition Distribution
Given below is a high-level algorithm to distribute the partitions evenly across the brokers,
assuming the values for topic partitions, replication factor, and the number of
nodes are known:
Find out the number of partitions each node can accept, so that the number
of partitions on each node is roughly the same.
Assign all partition replicas that remain unassigned evenly to the nodes that can
accept them.
Messages sent from the producer are retained in the Kafka broker for the time
configured for each topic, referred to as the retention period. Retention can be configured as either:
the amount of time messages should be kept before deleting them, or
the amount of data that can be stored before the older messages are purged.
Partition Segments
Each partition is split into segments, either of a particular size (say, 1 GB) or configured
to be retained for a set amount of time (say, one week).
Each partition has numerous segments. When any of the above limits
(size/retention period) is exceeded while writing data to a partition segment, that
segment is closed, and a new one is started.
A broker keeps open file handles for every segment of all of its partitions,
whether active or inactive.
Segment File
Each partition segment is stored in a single data file. The file contains messages and
their offsets.
Each message in the file contains:
key
value
offset
message size
checksum code - helps to detect message corruption.
magic byte - indicates the version of the message format.
compression codec - indicates the message compression format (Snappy, GZip, or LZ4).
timestamp - denotes the time the message was sent by the producer or received by the
broker.
Partition Index
The Kafka broker allows the consumer to read data from any available offset.
To help brokers locate a given offset quickly, Kafka maintains an index for
each partition.
An index is a mapping of offsets to segment files and positions within the
file.
When messages are deleted, the corresponding offset entries are deleted from
the index.
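A minimal sketch of reading from an arbitrary offset (the partition number and offset are placeholders; consumer is a KafkaConsumer<String, String> as created later in this course):

TopicPartition tp = new TopicPartition("Multibrokerapplication", 0);
consumer.assign(Collections.singletonList(tp));
// the broker uses the partition index to map offset 42 to a segment file
// and a byte position within that file
consumer.seek(tp, 42L);
ConsumerRecords<String, String> records = consumer.poll(100);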
Log Compaction
Kafka stores messages until the retention period is reached, and purges messages that are
older than the retention period.
Retention Period of Topic: Messages that are older than the retention period
will be deleted.
With compaction enabled, the partition log has two portions:
Clean: Portion of the partition log that has messages already compacted. This portion
has only one message per key, holding the latest value at the time of the previous compaction.
Dirty: Portion of the partition log that has messages written after the last
compaction.
Compaction Threads
When compaction is enabled, the following things happen:
The broker starts a compaction manager thread and multiple compaction
threads. The compaction threads are responsible for the compaction tasks.
Each compaction thread starts a compaction task by choosing the partition with
the highest ratio of dirty messages to the total size of the partition, and starts
cleaning it.
The compaction thread reads the dirty portion of the partition and creates an in-
memory map of entries. A map entry comprises a 16-byte hash of a
message key and the 8-byte offset of the previous message
that had the same key. Thus, each map entry is 24
bytes in size.
After building the offset map for the dirty segments of the partition log, the
compaction thread starts reading the clean segments, starting from the oldest.
It checks their content against the offset map.
o If the key is present, the message in the segment is ignored, since there
will be a newer value for the key later in the partition.
Once the replacement segment is built with all latest messages for their keys, the
original segment is replaced.
Finally, only clean segments exist, with one message per key – each key holding
the latest value.
When the compaction thread finds a key with a NULL value, regular
compaction is done, replacing the older messages for that key with the NULL-valued message.
A message with a key and a NULL value is also called a tombstone
message, and it is retained for a configured amount of time.
When a consumer sees a tombstone message, it knows that the value is deleted.
After the configured time, the compaction thread will delete the tombstone
message, and the key will be completely removed from the partition log.
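A minimal sketch of deleting a key this way (the topic and key are placeholders; producer is a KafkaProducer<String, String>, and the topic is assumed to have compaction enabled):

// a null value is a tombstone: consumers see the deletion, and after the
// configured time, compaction removes the key from the partition log entirely
producer.send(new ProducerRecord<String, String>("user-profiles", "user-1234", null));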
Producer Components:
After send() is called with a ProducerRecord, the producer serializes the key and the
value objects to ByteArrays.
Once a partition is chosen by the partitioner, the producer knows which partition of
the topic the record should go to.
The producer then adds the record to a batch of records that will be sent to that
specific partition of the topic.
If the broker fails to write to the partition log, it will send back an error.
When the producer receives the error, it will try resending the record.
Creating a Producer object with all the properties we want to pass to the
Producer (sketched below).
Creating a ProducerRecord object which has the name of the topic, and the key and
value of the record we want to send to the Broker.
value.serializer: This is the name of the class that serializes the values of the
records produced to Kafka. It is a class that
implements the org.apache.kafka.common.serialization.Serializer interface.
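A minimal sketch of the first step, creating the Producer object (the broker addresses are placeholders):

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);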
The ProducerRecord constructor takes the name of the topic to which we want to send the record, and
the key and value of the record we are sending to the Kafka Broker (which are of String
type).
The following code snippet displays the creation of a ProducerRecord object in Java.
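(The topic name, key, and value below are placeholder strings.)

ProducerRecord<String, String> record =
        new ProducerRecord<>("CustomerCountry", "Precision Products", "France");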
The following code snippet is used to send the ProducerRecord to the Kafka broker.
producer.send(record);
Synchronous send: Here, the send() method returns a Future object, and we use
the get() method of that object to wait and see whether the send() was successful.
Asynchronous send: Here, the send() method is called with a callback function, which
will be triggered on getting a response from the Kafka broker.
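Both styles, as a sketch (using the producer and record created above; the callback lambda is one possible form):

// Synchronous send: get() blocks until the broker responds, and throws if the send failed
producer.send(record).get();

// Asynchronous send: the callback is invoked once the broker responds
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Record stored at offset " + metadata.offset());
    }
});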
acks: This is the number of acknowledgments that the producer requires the leader to have
received from the followers, to consider a request complete.
The acks configuration ensures the durability of the messages sent. It can have three
values, viz. 0, 1, and all.
Configuring ACKS
i. acks=0:
This setting can be used for achieving high throughput and is the fastest one.
When acks is set to 0, the Producer will not wait for an acknowledgment from any of the
brokers to assume the message is sent successfully.
This setting provides the least durability as there could be message loss - sometimes the
message is not sent to the broker, and the Producer will not know about it.
ii. acks=1:
With this setting, the Producer will receive a success response from the broker as soon
as the message is written to the leader replica.
If the leader replica crashes in between, and the message could not be written to the
leader, the producer will receive an error response and will resend the message,
avoiding loss of data.
There can still be message loss if the leader crashes and a replica that does not have
this message gets elected as the new leader.
Configuring ACKS...
With acks = 1, there will be a tradeoff between throughput and latency depending on
whether the send() method sends data synchronously or asynchronously.
If the data is sent synchronously, waiting for a reply from the server using
the Future object's get() method, there will be an increase in latency.
If the data is sent asynchronously with a callback, the throughput depends on the
number of messages sent from the Producer before receiving a reply from the server.
iii. acks=all:
The Producer will receive a success response from the broker once all in-sync replicas
have received the message.
This setting gives the safest message publish, as it ensures the data has reached
more than one replica in the cluster.
But it has the highest latency, as the broker waits for the message to be written to
more than one replica.
compression.type: This is used to set the compression format for the messages
sent from the Producer. The default value is none, meaning no compression.
For compressing messages before sending them to brokers, the gzip, snappy, or lz4
algorithm can be specified.
retries: A message from the Producer sometimes cannot be written to the broker
due to transient errors (e.g., no leader for a partition). The value of retries decides the
number of times the producer resends data to the broker on receiving a transient
error, when it is set to a value greater than zero.
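As a sketch, these properties go on the same Properties object used to create the producer (the values below are example choices):

kafkaProps.put("acks", "all");                 // wait for all in-sync replicas: safest, highest latency
kafkaProps.put("compression.type", "snappy");  // compress message batches before sending
kafkaProps.put("retries", "3");                // resend up to 3 times on transient errors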
Consumer Overview
Consumers are applications doing real-time analysis, data warehousing
solutions, applications backed by NoSQL stores, backend services, or other subscriber-
based solutions that read data from Kafka topics, validate those
messages, and write them to other applications.
To consume messages from topics in the broker, the application will:
Create a consumer object and subscribe to the required topics.
Consume messages from the topics, validate them, and write them to another application
or data store.
Consumer Group:
Consider the situation when the Producer is sending data to the broker at a faster rate than
your application can read. Your application will fall behind, failing to keep up with the
rate of messages coming from the Producer.
To scale the consumption of messages from the topic, more consumers can be
added so that the reading is balanced between the consumers.
Consuming Messages
To consume messages from the Kafka broker, the consumer subscribes to the required
topic on the broker.
While subscribing, the consumer connects to any of the live nodes in the cluster to get
metadata about the leaders of the topic's partitions.
After getting metadata about a leader partition, the consumer sends a fetch request to
the broker holding the leader partition, giving the offset of the message.
Each topic is divided into multiple partitions, and each consumer in a consumer group
will be assigned a different set of partitions of the topic.
Thus the consumer group as a whole reads the topic.
As the consumer reads messages from a partition, it advances its offset to the next
message to be read.
A consumer can also rewind to an older offset, thus re-consuming messages.
Scaling Data Consumption
If your application has only one consumer instance consuming and processing
partitions, it may be unable to keep up with the rate of messages flowing into topics.
To scale message consumption, more consumers are added so that the topic
partitions are split between multiple consumers. Here any of the four situations may
occur:
1. When multiple consumers are subscribed to a topic, and all of them belong to the
same consumer group, each consumer in the consumer group will get a different set of
partitions of the topic, and the consumer group as a whole reads the topic.
2. When the topic has multiple partitions, only one consumer is subscribed to the
topic, and the consumer group has only one consumer, all the topic partitions will be
consumed by that single consumer in the consumer group.
3. When the number of consumers in the consumer group equals the number of topic
partitions, each consumer reads messages from exactly one partition.
4. When the number of consumers in the consumer group grows to a number greater
than the number of topic partitions, some of the consumers in the consumer group
remain idle and get no messages.
Heartbeats are sent when the consumer polls (i.e., retrieves messages from its topic
partitions) and when it commits offsets.
Consumer Rebalance
As long as a consumer sends heartbeats at regular intervals to the broker, it is
considered to be alive and processing messages from the partitions it is
assigned.
If the consumer stops sending heartbeats for a long time, the group coordinator
will consider the consumer dead and trigger a consumer rebalance.
When a consumer crashes, the group coordinator waits for some
time without heartbeats before deciding that it is dead and that a rebalance should be
triggered.
The first step involves the creation of a Kafka consumer object. Create a Properties
object with the properties to be provided to the consumer.
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserialize
r");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeseriali
zer");
The subsequent code snippet shows how to subscribe to topics using the subscribe()
method.
consumer.subscribe(Collections.singletonList("customerCountries"));
Consumer Polling
Step 3 involves fetching (polling) the server for more data from the subscribed topics, using
the poll() method.
The poll loop handles coordination between consumers in the consumer group, partition
rebalancing, heartbeats, and fetching data from servers.
The poll() method returns records from the topic partitions assigned to the consumer.
A complete poll loop looks like the following (assuming custCountryMap is a
HashMap<String, Integer> declared earlier, and JSONObject comes from the org.json package):

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            int updatedCount = 1;
            if (custCountryMap.containsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount);
            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4));
        }
    }
} finally {
    consumer.close();
}
fetch.max.wait.ms: This is the maximum amount of time the broker makes the consumer wait
before sending back records. If there is not enough data to satisfy the minimum fetch size, the
broker will wait up to fetch.max.wait.ms and then send back whatever records it has; after
that, the consumer will request the broker again.
Producer API: The Producer API allows applications to send streams of records
to desired topics already created in the brokers of a Kafka cluster. The steps to create
and use the Producer API were covered earlier in this course.
Consumer API: The Consumer API allows an application to poll streams of
records from subscribed topics in the brokers of a Kafka cluster. The creation and
usage of the Consumer API were also discussed in a previous topic of this course.
Streams API
The Streams API allows an application to transform streams of records from input topics
to output topics.
Supports per-record stream processing with millisecond latency.
Supports stateless processing, stateful processing, and windowing operations.
Using the Streams API, we can write Java applications to process data in real time.
No separate cluster is required for real-time processing using the Streams API.
It is OS independent.
It is highly scalable, elastic and fault-tolerant.
It can be deployed to the cloud, VMs or containers.
It can be used for small, medium or large use cases.
It is fully integrated with Kafka security.
It is part of open-source Apache Kafka project.
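A minimal Streams API sketch (the application ID, broker address, and topic names are placeholders) that reads records from one topic, upper-cases the values, and writes them to another topic:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class UppercaseStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // per-record, stateless transformation: read, map each value, write out
        KStream<String, String> source = builder.stream("input-topic");
        source.mapValues(value -> value.toUpperCase()).to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}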
Connect API
The Connect API allows Kafka to pull records from source data systems and push data to sink
data systems. The Connect API implements connectors that are responsible for moving large amounts
of data (e.g., an entire database) into Kafka topics for stream processing, and out of Kafka topics
into sink systems (e.g., external storage systems).
AdminClient API
The AdminClient API allows managing topics, partitions, offsets, brokers, and
ACLs in a Kafka cluster. The AdminClient API provides a set of tools that are implemented using Java
classes and called using a set of scripts.
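A minimal AdminClient sketch (broker address, topic name, partition count, and replication factor are placeholders) that creates a topic and then lists the topics in the cluster:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class AdminDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // create a topic with 3 partitions, each replicated to 3 brokers
            admin.createTopics(Collections.singleton(new NewTopic("demo-topic", 3, (short) 3)))
                 .all().get();
            // list all topic names in the cluster
            System.out.println(admin.listTopics().names().get());
        }
    }
}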
AdminClient API
kafka-run-class.sh: This tool can be used for exporting partition offsets. It
produces a file containing topic partitions and their corresponding offsets in a defined
format. The file can be imported, copied, and edited to set the
current offset for the consumer group.