Producing Messages with Kafka
Producers
Ryan Plant
COURSE AUTHOR
@ryan_plant blog.ryanplant.com
Demo
Development Environment Setup
- Adding Kafka Dependencies
- Browsing the API
Prerequisites
- Integrated Development Environment
- Java 8 JDK
- Maven
- Access to a test Kafka cluster
Kafka Producer Externals
[Diagram: a producer sending to a cluster of brokers]
Kafka Producer Internals
[Diagram: KafkaProducer internals. A ProducerRecord (Topic, Partition, Timestamp, Key, Value) is passed to send(), then through the Serializer<T> and the Partitioner, which consult the topic Metadata. Records are grouped into RecordBatches per topic partition, and RecordMetadata is returned. The ProducerConfig is built from Properties (bootstrap.servers, key.serializer, value.serializer, …).]
Creating a Kafka Producer
[Diagram: a KafkaProducer configured through Properties: bootstrap.servers, key.serializer, value.serializer, …]
http://kafka.apache.org/documentation.html#producerconfigs
Properties props = new Properties();
props.put("bootstrap.servers", "BROKER-1:9092, BROKER-2:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Kafka Producer: Required Properties
bootstrap.servers
- Brokers contacted first to discover cluster membership: partition leaders, etc.
key and value serializers
- Classes used to serialize message keys and values
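As a minimal sketch of the three properties this slide lists as required, the helper below builds the configuration with plain java.util.Properties and reports any required key that is missing. The class name ProducerProps and its methods are hypothetical, introduced only for illustration; they are not part of the Kafka client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class ProducerProps {
    // The properties the slide lists as required.
    static final String[] REQUIRED = {"bootstrap.servers", "key.serializer", "value.serializer"};

    /** Builds producer properties that use StringSerializer for both key and value. */
    static Properties stringProducer(String bootstrapServers) {
        String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", stringSerializer);
        props.put("value.serializer", stringSerializer);
        return props;
    }

    /** Returns the names of required properties that are not set. */
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Checking for missing keys before constructing the producer gives a clearer failure than a configuration error raised later.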
Creating a Kafka Producer
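import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaProducerApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "BROKER-1:9092, BROKER-2:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Key and value types match the configured serializers.
        KafkaProducer<String, String> myProducer = new KafkaProducer<>(props);
        myProducer.close(); // release network resources when done
    }
}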
Basic Kafka Producer
[Diagram: "myProducer" with keySerializer and valueSerializer both set to StringSerializer (a Serializer<T>), and a ProducerConfig built from "props": bootstrap.servers, key.serializer, value.serializer, …]
Kafka Producer Messages (Records)
[Diagram: "myProducer" with a StringSerializer and a ProducerConfig built from "props" (bootstrap.servers, key.serializer, value.serializer, …), next to a ProducerRecord with the fields Topic, Partition, Timestamp, Key, Value]
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
ProducerRecord<String, String> myMessage = new ProducerRecord<>("my_topic", "My Message 1");
ProducerRecord: Required Properties
topic
- Topic to which the ProducerRecord will be sent
value
- The message content (matching the serializer type for value)
Kafka Producer Shell Program
~$ bin/kafka-console-producer.sh \
> --broker-list localhost:9092,localhost:9093 \
> --topic my_topic
MyMessage 1
MyMessage 2
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// OK: the value is a String, matching the configured value serializer
ProducerRecord myMessage = new ProducerRecord("my_topic", "My Message 1");
// Fails at send(): the value is a Double, not a String
ProducerRecord myMessage2 = new ProducerRecord("my_topic", 3.14159);
// SerializationException: Can't convert value of class ...
KafkaProducer instances can only send ProducerRecords whose key and value types match the serializers the producer is configured with.
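The mismatch can be sketched without a Kafka cluster. The example below is a simplified stand-in, assuming a serializer interface shaped like a generic Serializer<T>; MiniSerializer, MiniStringSerializer and SerializerMismatchDemo are hypothetical names, not Kafka classes. Passing a Double to a String serializer through a raw type fails with a ClassCastException at run time, which is the kind of failure the producer reports as a SerializationException.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a generic serializer interface; not Kafka's own type.
interface MiniSerializer<T> {
    byte[] serialize(String topic, T data);
}

// Serializes String values only, like a String serializer would.
final class MiniStringSerializer implements MiniSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

final class SerializerMismatchDemo {
    /** Returns true if the value could be serialized, false on a type mismatch. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean canSerialize(MiniSerializer serializer, Object value) {
        try {
            // Raw call: the compiler cannot check the value type, so a mismatch
            // only surfaces when the serializer casts the value at run time.
            serializer.serialize("my_topic", value);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }
}
```

Declaring the producer and its records with type parameters, for example String for both key and value, moves this check to compile time.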
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
// Example:
new ProducerRecord<>("my_topic", 1, 124535353325L, "Course-001", "My Message 1");
// Defined in server.properties:
log.message.timestamp.type = [CreateTime, LogAppendTime]
// CreateTime: producer-set timestamp used.
// LogAppendTime: broker-set timestamp used when message is appended to commit log.
ProducerRecord: Optional Properties
partition
- specific partition within the topic to which the ProducerRecord is sent
timestamp
- the Unix timestamp (milliseconds since the epoch) applied to the record
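The effect of the broker's timestamp type on a record can be sketched as a small decision function. This is an illustration of the rule stated on the slide, not broker code; TimestampPolicy and its parameter names are hypothetical.

```java
// Hypothetical illustration of the CreateTime / LogAppendTime rule; not broker code.
final class TimestampPolicy {
    /**
     * Returns the timestamp stored with the record.
     * CreateTime keeps the producer-set timestamp; LogAppendTime uses the
     * time at which the broker appended the record to the log.
     */
    static long effectiveTimestamp(String timestampType, long producerTimestampMs, long brokerAppendTimeMs) {
        switch (timestampType) {
            case "CreateTime":
                return producerTimestampMs;
            case "LogAppendTime":
                return brokerAppendTimeMs;
            default:
                throw new IllegalArgumentException("Unknown timestamp type: " + timestampType);
        }
    }
}
```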
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
// Example:
new ProducerRecord<>("my_topic", 1, 124535353325L, "Course-001", "My Message 1");
ProducerRecord: Optional Properties
key
- a value the Kafka Producer uses to decide which partition the record is written to when no partition is specified
Remember the question?
Hint: How do Producers split messages to partitions?
Best Practice: Define a Key
Two useful purposes:
- Additional information in the message
- Can determine which partition the message will be written to
Downside:
- Additional overhead
- Depends on the serializer type used
Making Messaging Magic
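import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "BROKER-1:9092, BROKER-2:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> myProducer = new KafkaProducer<>(props);
        ProducerRecord<String, String> myRecord = new ProducerRecord<>("my_topic", "Course-001", "My Message 1");
        try {
            myProducer.send(myRecord); // asynchronous: the record is buffered, then sent
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            myProducer.close(); // sends any buffered records and releases resources
        }
    }
}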
Sending the Message, Part 1
[Diagram: "myRecord" ("my_topic", Partition, Timestamp, Key, "…") is passed to send() on "myProducer", which fetches the topic Metadata and hands the record to the StringSerializer and then the Partitioner. The ProducerConfig is built from "props": bootstrap.servers, key.serializer, value.serializer, …]
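Kafka Producer Partitioning Strategy
partitioner.class (PARTITIONER_CLASS_CONFIG)
[Flowchart: on send(), the producer picks a strategy from the record, the topic metadata and its config]
- has partition? YES, isValid? YES: direct
- has partition? YES, isValid? NO: InvalidArgumentException
- has partition? NO, has key? NO: DefaultPartitioner, round-robin
- has key? YES, custom? NO: DefaultPartitioner, key mod-hash
- has key? YES, custom? YES: custom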
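The direct, round-robin and key mod-hash strategies can be sketched in plain Java. This is a simplified model under stated assumptions, not Kafka's partitioner: PartitionChooser is a hypothetical class, Arrays.hashCode stands in for the hash function Kafka applies to the serialized key, and the custom partitioner branch is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the partitioning decision; not Kafka's partitioner.
final class PartitionChooser {
    private final AtomicInteger roundRobinCounter = new AtomicInteger(0);

    /**
     * Chooses a partition for a record.
     * partition and key may be null, mirroring the optional ProducerRecord fields.
     */
    int choose(Integer partition, String key, int numPartitions) {
        if (partition != null) {
            // Direct: the record names its partition, which must exist.
            if (partition < 0 || partition >= numPartitions) {
                throw new IllegalArgumentException("Invalid partition: " + partition);
            }
            return partition;
        }
        if (key == null) {
            // Round-robin: spread keyless records evenly across partitions.
            return Math.floorMod(roundRobinCounter.getAndIncrement(), numPartitions);
        }
        // Key mod-hash: the same key always maps to the same partition.
        // Assumption: Arrays.hashCode is a stand-in for Kafka's own hash.
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }
}
```

Because the key hash is taken modulo the partition count, records with the same key stay together only while the number of partitions is unchanged.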
Sending the Message, Part 1
[Diagram: as before, with the DefaultPartitioner selected as the Partitioner for "myRecord"]
Sending the Message, Part 2
[Diagram: after the StringSerializer and DefaultPartitioner, "myRecord" is handed to the RecordAccumulator]
Micro-batching in Apache Kafka
At scale, efficiency is everything.
Small, fast batches of messages:
- Sending (Producer)
- Writing (Broker)
- Reading (Consumer)
Modern operating system functions:
- Page cache
- Linux sendfile() system call (kernel)
Amortization of the constant cost
Sending the Message, Part 2
[Diagram: inside the RecordAccumulator, records are grouped by TopicPartition, each TopicPartition holding its own RecordBatch]
Message Buffering
[Diagram: ProducerRecords filling RecordBatches]
- record batch size == batch.size: the batch is full and is sent
- record batch size < batch.size: the batch waits up to linger.ms (for example linger.ms = 10) for more ProducerRecords, then is sent
- buffer.memory: total memory available for buffering records
- max.block.ms: how long send() blocks when the buffer is full
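The interplay of batch.size and linger.ms can be sketched as a small model. This is an illustration under simplifying assumptions, not the RecordAccumulator: MiniBatch is a hypothetical class, time is passed in explicitly, and buffer.memory and max.block.ms are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one record batch; not Kafka's RecordBatch.
final class MiniBatch {
    private final int batchSizeBytes; // plays the role of batch.size
    private final long lingerMs;      // plays the role of linger.ms
    private final List<byte[]> records = new ArrayList<>();
    private int bytesUsed = 0;
    private long createdAtMs = -1;

    MiniBatch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /** Appends the record if it fits; returns false when the batch has no room. */
    boolean tryAppend(byte[] record, long nowMs) {
        if (bytesUsed + record.length > batchSizeBytes) {
            return false;
        }
        if (records.isEmpty()) {
            createdAtMs = nowMs; // the linger clock starts with the first record
        }
        records.add(record);
        bytesUsed += record.length;
        return true;
    }

    /** A non-empty batch is ready when it is full or has lingered long enough. */
    boolean isReady(long nowMs) {
        if (records.isEmpty()) {
            return false;
        }
        boolean full = bytesUsed >= batchSizeBytes;
        boolean lingerExpired = nowMs - createdAtMs >= lingerMs;
        return full || lingerExpired;
    }
}
```

With lingerMs set to 0 a batch is ready as soon as it holds one record; a larger value trades a little latency for fuller batches.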
Sending the Message, Part 2
[Diagram: RecordBatches are sent to the partitions of each topic, and RecordMetadata is returned to "myProducer" for the sent records]
Delivery Guarantees
Broker acknowledgement (“acks”)
- 0: fire and forget
- 1: leader acknowledged
- all (or -1): replication quorum (all in-sync replicas) acknowledged
Broker responds with error
- “retries”
- “retry.backoff.ms”
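These settings are ordinary producer properties. The sketch below adds them to an existing configuration, using acks set to "all" so that the leader waits for the in-sync replicas. DeliverySettings is a hypothetical helper, and the values chosen for retries and retry.backoff.ms are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper for illustration; the values are assumptions, not recommendations.
final class DeliverySettings {
    /** Returns a copy of base with acknowledgement and retry settings added. */
    static Properties withAcknowledgedWrites(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("acks", "all");             // wait for the in-sync replicas
        props.put("retries", "3");            // resend on a retriable broker error
        props.put("retry.backoff.ms", "100"); // pause between retry attempts
        return props;
    }
}
```

Copying the base properties keeps the original configuration unchanged, so one base can feed producers with different delivery settings.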
Ordering Guarantees
Message order by partition
- No global order across partitions
Can get complicated with errors
- retries, retry.backoff.ms
- max.in.flight.requests.per.connection
Delivery semantics
- At-most-once, at-least-once, exactly-once
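Why retries complicate ordering can be seen in a toy simulation. This is a deliberately simplified model, not the producer's sender logic: InFlightOrderingSketch is a hypothetical class, batches are plain strings, and one named batch fails exactly once and is retried.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of retries with several requests in flight; not Kafka code.
final class InFlightOrderingSketch {
    /**
     * Returns the order in which batches end up written to the partition.
     * The batch named failsOnce fails on its first attempt and is retried.
     */
    static List<String> deliver(List<String> batches, String failsOnce, int maxInFlight) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be at least 1");
        }
        Deque<String> pending = new ArrayDeque<>(batches);
        List<String> written = new ArrayList<>();
        boolean failureUsed = false;
        while (!pending.isEmpty()) {
            // Send up to maxInFlight batches without waiting for acknowledgements.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.poll());
            }
            String toRetry = null;
            for (String batch : inFlight) {
                if (!failureUsed && batch.equals(failsOnce)) {
                    failureUsed = true;
                    toRetry = batch; // failed: must be sent again
                } else {
                    written.add(batch); // acknowledged and written
                }
            }
            if (toRetry != null) {
                pending.addFirst(toRetry); // the retry goes out before newer batches
            }
        }
        return written;
    }
}
```

With one request in flight the batches B1, B2, B3 are written in order even if B1 fails once; with two in flight B2 is written before the retried B1.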
Demo
Custom Producer application in Java
- Summary of what we’ve covered
- Similar to shell program
Basic Producer configuration
Cluster setup:
- Three partitions
- Three brokers
- Replication factor: 3
Look for:
- Default partitioner
- No global order (across partitions)
Advanced Topics Not Covered
Custom Serializers
Custom Partitioners
Asynchronous Send
Compression
Advanced Settings
Summary
Kafka Producer Internals
- Properties -> ProducerConfig
- Message -> ProducerRecord
- Processing Pipeline: Serializers and Partitioners
- Micro-batching -> RecordAccumulator and RecordBatch
Delivery and Ordering Guarantees
Java-based Producer