Commit 08b5433
Unify KafkaProducers (i.e. KafkaAsyncProducer & KafkaSyncProducer)
1 parent 37db100 commit 08b5433

19 files changed: +316 additions, -504 deletions

README.md

Lines changed: 6 additions & 12 deletions

@@ -166,6 +166,12 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a header-only library t
 
 * KafkaConsumer
 
+    * Properties with random string as default
+
+        * `client.id`
+
+        * `group.id`
+
     * More properties than ***librdkafka***
 
         * `max.poll.records` (default: `500`): The maxmum number of records that a single call to `poll()` would return
@@ -178,20 +184,8 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a header-only library t
 
     * `auto.commit.interval.ms`
 
-    * Properties with random string as default
-
-        * `client.id`
-
-        * `group.id`
-
 * KafkaProducer
 
-    * Different default value from ***librdkafka***
-
-        * KafkaSyncProducer
-
-            * `linger.ms` (default: `0`)
-
     * Properties with random string as default
 
         * `client.id`

doc/GoodPracticesToUseKafkaProducer.md

Lines changed: 2 additions & 30 deletions

@@ -2,24 +2,16 @@
 
 If we want to achieve high performance/availability, here're some rules of thumb.
 
-## Use a `KafkaAsyncProducer` for better throughput
+## Avoid using `syncSend` for better throughput
 
-You should not use a `KafkaSyncProducer` if you want to get a high throughput. The `Sync` means the `send` operation is a synchronous operation, and would not go on until the `acks` are received.
+You should never call `syncSend` if you want to get a high throughput. The `syncSend` is a synchronous operation, and would not go on until the `acks` are received.
 
 ## The `message.max.bytes` must be consistent with Kafka servers' setting
 
 * Default value: 1000,000
 
 * The default setting for brokers is `message.max.bytes = 1000012`, and do MAKE SURE the client side setting no larger than it. Otherwise, it might construct a MessageSet which would be rejected (error: INVALID_MESSAGE_SIZE) by brokers.
 
-## In most cases, the default value for `linger.ms` is good enough
-
-* It means the delay in milliseconds to wait for messages in the producer queue to accumulate before constructing MessageSets to transmit to brokers.
-
-    1. For a `KafkaSyncProducer`, it sends the messages one by one (after `acks` response received), thus it could use `linger.ms=0` (as default) to eliminate the unnecessary waiting time.
-
-    2. For a `KafkaAsyncProduer`, a larger `linger.ms` could be used to accumulate messages for MessageSets to improve the performance -- it should be the result of balancing between throughput and latency.
-
 ## Calculate `batch.num.messages` with the average message size
 
 * Default value: 10,000
@@ -46,16 +38,6 @@ You should not use a `KafkaSyncProducer` if you want to get a high throughput. T
 
 * The `acks=all` setting will highly impact the throughput & latency, and it would be obvious if the traffic latency between kafka brokers is high. But it's mandatory if we want to achieve high availability.
 
-## Determine the default sending buffer (according to the latency)
-
-* Default queue.buffing setting,
-
-    * `queue.buffering.max.messages=1000000`
-
-    * `queue.buffering.max.kbytes=0x100000` (1GB)
-
-* Normally, the default settings should be good enough.
-
 ## How could a message miss after send?
 
 * The message might even not have been received by the partition leader! (with `acks=0`)
@@ -64,13 +46,3 @@ You should not use a `KafkaSyncProducer` if you want to get a high throughput. T
 
 * Once the message received by the partition leader, the leader crashed just after responding to the producer, but with no in-sync replica to synchronize for the message. (with `acks=all`, while brokers are with `min.insync.replicas=1`)
 
-## How does message disordering happen? How to avoid it?
-
-* Take an example, -- a `KafkaAsyncProducer` just sent many messages, and a few of these messages (in the middle) failed to be delivered successfully. While the producer got the sending error, it might retry sending these messages again, thus causing the disordering.
-
-* To avoid it. Two options,
-
-    * Use a `KafkaSyncProducer`, but this would severely impact the throughput.
-
-    * Embed some `sequence number` (e.g, record id, part of the `key`, etc) in the `ProducerRecord`, for de-duplication.
-

doc/HowToMakeKafkaProducerReliable.md

Lines changed: 3 additions & 39 deletions

@@ -64,7 +64,7 @@ Here we'd focus on `KafkaProducer`, together with the `idempotence` feature. Let
 
 ### How to guarantee `Exactly Once`
 
-* The `enable.idempotence` configuration would resolve such a problem. And this configuration is RECOMMENDED for both `KafkaSyncProducer` and `KafkaAsyncProducer`, as long as it's possible.
+* The `enable.idempotence` configuration is RECOMMENDED.
 
 
 ## About `Ordering`
@@ -150,48 +150,12 @@ The `msgid` is used, (along with a base `msgid` value stored at the time the `PI
 
 ## Some examples
 
-### `KafkaSyncProducer` demo
+### `KafkaProducer` demo
 
 ```cpp
-    int ret = 0;
     std::atomic<bool> running = true;
 
-    KafkaSyncProducer producer(
-        Properties({
-            { ProducerConfig::BOOTSTRAP_SERVERS, "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092" },
-            { ProducerConfig::ENABLE_IDEMPOTENCE, "true" },
-            { ProducerConfig::MESSAGE_TIMEOUT_MS, "60000"}
-        })
-    );
-
-    while (running) {
-        const auto& msg = topMsgOfUpstream();
-        try {
-            auto record = ProducerRecord(topic, msg.key, msg.value);
-            producer.send(record);
-            popMsgFromUpstream();
-        } catch (const KafkaException& e) {
-            std::cerr << "Failed to send message! Reason: " << e.what() << std::endl;
-            ret = e.error();
-            break;
-        }
-    }
-
-    producer.close();
-
-    return ret;
-```
-
-* It's easy to use `KafkaSyncProducer`, since it sends messages one by one.
-
-* The throughput performance would not be that good, since there's only 1 message (embedded in 1 message batch) on the flight.
-
-### `KafkaAsyncProducer` demo
-
-```cpp
-    std::atomic<bool> running = true;
-
-    KafkaAsyncProducer producer(
+    KafkaProducer producer(
         Properties({
             { ProducerConfig::BOOTSTRAP_SERVERS, "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092" },
             { ProducerConfig::ENABLE_IDEMPOTENCE, "true" },

doc/KafkaProducerQuickStart.md

Lines changed: 17 additions & 71 deletions

@@ -4,57 +4,9 @@ Generally speaking, The `Modern C++ based Kafka API` is quite similar to the [Ka
 
 We'd recommend users to cross-reference them, --especially the examples.
 
-Unlike Java's KafkaProducer, here we introduce two derived classes, -- `KafkaSyncProducer` and `KafkaAsyncProducer` --depending on different `send` behaviors (synchronous/asynchronous).
+## KafkaProducer
 
-## KafkaSyncProducer
-
-* The "Sync" (in the name) means `send` is a blocking operation, and it will immediately get the RecordMetadata while the function returns. If anything wrong occurs, an exception would be thrown.
-
-### Example
-```cpp
-    // Create configuration object
-    kafka::Properties props({
-        {"bootstrap.servers", brokers},
-        {"enable.idempotence", "true"},
-    });
-
-    // Create a producer instance.
-    kafka::KafkaSyncProducer producer(props);
-
-    // Read messages from stdin and produce to the broker.
-    std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;
-
-    for (std::string line; std::getline(std::cin, line);) {
-        // The ProducerRecord doesn't own `line`, it is just a thin wrapper
-        auto record = kafka::ProducerRecord(topic,
-                                            kafka::NullKey,
-                                            kafka::Value(line.c_str(), line.size()));
-
-        // Send the message.
-        try {
-            kafka::Producer::RecordMetadata metadata = producer.send(record);
-            std::cout << "% Message delivered: " << metadata.toString() << std::endl;
-        } catch (const kafka::KafkaException& e) {
-            std::cerr << "% Message delivery failed: " << e.error().message() << std::endl;
-        }
-
-        if (line.empty()) break;
-    };
-
-    // producer.close(); // No explicit close is needed, RAII will take care of it
-```
-
-* `ProducerConfig::BOOTSTRAP_SERVERS` is mandatory for ProducerConfig.
-
-* `ProducerRecord` would not take any ownership for the `key` or `value`. Thus, the user must guarantee the memory block (pointed by `key` or `value`) is valid until being `send`.
-
-* Since `send` is a blocking operation, the throughput will be highly impacted, but it is easier to make sure of the message delivery and logically it is simpler.
-
-* At the end, the user could call `close` manually, or just leave it to the destructor (`close` would be called anyway).
-
-## KafkaAsyncProducer
-
-* The `Async` (in the name) means `send` is an unblocking operation, and the result (including errors) could only be got from the delivery callback.
+* The `send` is an unblocking operation, and the result (including errors) could only be got from the delivery callback.
 
 ### Example
 ```cpp
@@ -65,7 +17,7 @@ Unlike Java's KafkaProducer, here we introduce two derived classes, -- `KafkaSyn
     });
 
     // Create a producer instance.
-    kafka::KafkaAsyncProducer producer(props);
+    kafka::KafkaProducer producer(props);
 
     // Read messages from stdin and produce to the broker.
    std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;
@@ -92,27 +44,27 @@ Unlike Java's KafkaProducer, here we introduce two derived classes, -- `KafkaSyn
     }
 ```
 
-* Same with KafkaSyncProducer, the user must guarantee the memory block for `ProducerRecord`'s `key` is valid until being `send`.
+* User must guarantee the memory block for `ProducerRecord`'s `key` is valid until being `send`.
 
 * By default, the memory block for `ProducerRecord`'s `value` must be valid until the delivery callback is called; Otherwise, the `send` should be with option `KafkaProducer::SendOption::ToCopyRecordValue`.
 
 * It's guaranteed that the delivery callback would be triggered anyway after `send`, -- a producer would even be waiting for it before `close`. So, it's a good way to release these memory resources in the `Producer::Callback` function.
 
-## KafkaAsyncProducer with `KafkaClient::EventsPollingOption::Manual`
+## `KafkaProducer` with `KafkaClient::EventsPollingOption::Manual`
 
-While we construct a `KafkaAsyncProducer` with option `KafkaClient::EventsPollingOption::Auto` (default), an internal thread would be created for `MessageDelivery` callbacks handling.
+While we construct a `KafkaProducer` with option `KafkaClient::EventsPollingOption::Auto` (default), an internal thread would be created for `MessageDelivery` callbacks handling.
 
 This might not be what you want, since then you have to use 2 different threads to send the messages and handle the `MessageDelivery` responses.
 
 Here we have another choice, -- using `KafkaClient::EventsPollingOption::Manual`, thus the `MessageDelivery` callbacks would be called within member function `pollEvents()`.
 
-* Note, if you constructed the `KafkaAsyncProducer` with `EventsPollingOption::Manual`, the `send()` would be an `unblocked` operation.
+* Note, if you constructed the `KafkaProducer` with `EventsPollingOption::Manual`, the `send()` would be an `unblocked` operation.
   I.e, once the `message buffering queue` becomes full, the `send()` operation would throw an exception (or return an `error code` with the input reference parameter), -- instead of blocking there.
   This makes sense, since you might want to call `pollEvents()` later, thus delivery-callback could be called for some messages (which could then be removed from the `message buffering queue`).
 
 ### Example
 ```cpp
-    kafak::KafkaAsyncProducer producer(props, KafkaClient::EventsPollingOption::Manual);
+    kafak::KafkaProducer producer(props, KafkaClient::EventsPollingOption::Manual);
 
     // Prepare "msgsToBeSent"
     auto std::map<int, std::pair<Key, Value>> msgsToBeSent = ...;
@@ -147,7 +99,7 @@ This makes sense, since you might want to call `pollEvents()` later, thus delive
 
 ### Example
 ```cpp
-    kafak::KafkaAsyncProducer producer(props);
+    kafak::KafkaProducer producer(props);
 
     auto record = kafka::ProducerRecord(topic, partition, Key(), Value());
 
@@ -175,11 +127,11 @@ This makes sense, since you might want to call `pollEvents()` later, thus delive
 
 ## Error handling
 
-Once an error occurs during `send()`, `KafkaSyncProducer` and `KafkaAsyncProducer` behave differently.
+`Error` might occur at different places while sending a message,
 
-1. `KafkaSyncProducer` gets `std::error_code` by catching exceptions (with `error()` member function).
+1. A `KafkaException` would be triggered if `KafkaProducer` failed to trigger the send operation.
 
-2. `KafkaAsyncProducer` gets `std::error_code` with delivery-callback (with a parameter of the callback function).
+2. Delivery `Error` would be passed through the delivery-callback.
 
 There are 2 kinds of possible errors,
 
@@ -211,10 +163,6 @@ If the cluster's configuration is with `auto.create.topics.enable=true`, the pro
 
 Note, the default created topic may be not what you want (e.g, with `default.replication.factor=1` configuration as default, etc), thus causing other unexpected problems.
 
-### What will happen after `ack` timeout?
-
-If an ack failed to be received within `MESSAGE_TIMEOUT_MS`, an exception would be thrown for a KafkaSyncSend, or, an error code would be received by the delivery callback for a KafkaAsyncProducer.
-
 ### How to enhance the sending performance?
 
 Enlarging the default `BATCH_NUM_MESSAGES` and `LINGER_MS` might improve message batching, thus enhancing the throughput.
@@ -231,15 +179,15 @@ Larger `QUEUE_BUFFERING_MAX_MESSAGES`/`QUEUE_BUFFERING_MAX_KBYTES` might help to
 
 1. The Kafka cluster should be configured with `min.insync.replicas = 2` at least
 
-2. Use a `KafkaSyncProducer` (with configuration `{ProducerConfig::ACKS, "all"}`); or use a `KafkaAsyncProducer` (with configuration `{ProducerConfig::ENABLE_IDEMPOTENCE, "true"}`), together with proper error-handling within the delivery callbacks.
+2. Configure the `KafkaProducer` with property `{ProducerConfig::ENABLE_IDEMPOTENCE, "true"}`, together with proper error-handling (within the delivery callback).
 
 * Complete Answer,
 
     * [How to Make KafkaProducer Reliable](HowToMakeKafkaProducerReliable.md)
 
 ### How many threads would be created by a KafkaProducer?
 
-Excluding the user's main thread, KafkaSyncProducer would start another (N + 2) threads in the background, while `KafkaAsyncProducer` would start (N + 3) background threads. (N means the number of BOOTSTRAP_SERVERS)
+Excluding the user's main thread, `KafkaProducer` would start (N + 3) background threads. (N means the number of BOOTSTRAP_SERVERS)
 
 Most of these background threads are started internally by librdkafka.
 
@@ -249,15 +197,13 @@ Here is a brief introduction what they're used for,
 
 2. Another 2 background threads would handle internal operations and kinds of timers, etc.
 
-3. `KafkaAsyncProducer` has one more background thread to keep polling the delivery callback event.
+3. One more background thread to keep polling the delivery callback event.
 
-E.g, if a KafkaSyncProducer was created with property of `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 6 threads in total (including the main thread).
+E.g, if a `KafkaProducer` was created with property of `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 7 threads in total (including the main thread).
 
 ### Which one of these threads will handle the callbacks
 
-The `Producer::Callback` is only available for a `KafkaAsyncProducer`.
-
 It will be handled by a background thread, not by the user's thread.
 
-Note, should be careful if both the `KafkaAsyncProducer::send()` and the `Producer::Callback` might access the same container at the same time.
+Note, should be careful if both the `KafkaProducer::send()` and the `Producer::Callback` might access the same container at the same time.

examples/kafka_async_producer_copy_payload.cc

Lines changed: 1 addition & 1 deletion

@@ -23,7 +23,7 @@ int main(int argc, char **argv)
     });
 
     // Create a producer instance.
-    kafka::KafkaAsyncProducer producer(props);
+    kafka::KafkaProducer producer(props);
 
     // Read messages from stdin and produce to the broker.
     std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

examples/kafka_async_producer_not_copy_payload.cc

Lines changed: 1 addition & 1 deletion

@@ -23,7 +23,7 @@ int main(int argc, char **argv)
     });
 
     // Create a producer instance.
-    kafka::KafkaAsyncProducer producer(props);
+    kafka::KafkaProducer producer(props);
 
     // Read messages from stdin and produce to the broker.
     std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

examples/kafka_sync_producer.cc

Lines changed: 2 additions & 2 deletions

@@ -22,7 +22,7 @@ int main(int argc, char **argv)
     });
 
     // Create a producer instance.
-    kafka::KafkaSyncProducer producer(props);
+    kafka::KafkaProducer producer(props);
 
     // Read messages from stdin and produce to the broker.
     std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;
@@ -35,7 +35,7 @@ int main(int argc, char **argv)
 
     // Send the message.
     try {
-        kafka::Producer::RecordMetadata metadata = producer.send(record);
+        kafka::Producer::RecordMetadata metadata = producer.syncSend(record);
         std::cout << "% Message delivered: " << metadata.toString() << std::endl;
     } catch (const kafka::KafkaException& e) {
         std::cerr << "% Message delivery failed: " << e.error().message() << std::endl;
