# doc/GoodPracticesToUseKafkaProducer.md

If we want to achieve high performance/availability, here are some rules of thumb.
## Avoid using `syncSend` for better throughput

You should never call `syncSend` if you want to get a high throughput. `syncSend` is a synchronous operation; it would not return until the `acks` are received.
## The `message.max.bytes` must be consistent with the Kafka servers' setting

* Default value: 1,000,000

* The default setting for brokers is `message.max.bytes = 1000012`, so do MAKE SURE the client-side setting is no larger than that. Otherwise, the client might construct a MessageSet which would be rejected by the brokers (error: `INVALID_MESSAGE_SIZE`).
## Calculate `batch.num.messages` with the average message size

* Default value: 10,000
* The `acks=all` setting will highly impact the throughput and latency, and the impact would be obvious if the network latency between Kafka brokers is high. But it's mandatory if we want to achieve high availability.
## How could a message miss after send?

* The message might not even have been received by the partition leader! (with `acks=0`)
* Once the message is received by the partition leader, the leader might crash just after responding to the producer, with no in-sync replica having synchronized the message. (with `acks=all`, while brokers are configured with `min.insync.replicas=1`)
# doc/HowToMakeKafkaProducerReliable.md

Here we'd focus on `KafkaProducer`, together with the `idempotence` feature.

### How to guarantee `Exactly Once`
* The `enable.idempotence` configuration is RECOMMENDED.
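For instance, enabling it is a single property (a fragment mirroring the QuickStart example; `brokers` is a placeholder for your bootstrap-server list):

```cpp
kafka::Properties props({
    {"bootstrap.servers",  brokers},
    {"enable.idempotence", "true"},   // RECOMMENDED: de-duplicates retried messages
});
```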
## About `Ordering`
# doc/KafkaProducerQuickStart.md

Generally speaking, the `Modern C++ based Kafka API` is quite similar to the Kafka Java client's API.

We'd recommend users to cross-reference them, especially the examples.

## KafkaProducer
* The `send` is a non-blocking operation, and its result (including errors) can only be got from the delivery callback.
### Example
```cpp
// Create configuration object
kafka::Properties props({
    {"bootstrap.servers",  brokers},
    {"enable.idempotence", "true"},
});

// Create a producer instance.
kafka::KafkaProducer producer(props);

// Read messages from stdin and produce to the broker.
std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

for (std::string line; std::getline(std::cin, line);) {
    // ... (wrap `line` in a ProducerRecord and `send` it with a delivery callback)
}
```
* The user must guarantee the memory block for `ProducerRecord`'s `key` is valid until the message is sent.
* By default, the memory block for `ProducerRecord`'s `value` must be valid until the delivery callback is called; otherwise, the `send` should be called with option `KafkaProducer::SendOption::ToCopyRecordValue`.
* It's guaranteed that the delivery callback would be triggered anyway after `send`, -- a producer would even wait for it before `close`. So, it's a good way to release these memory resources in the `Producer::Callback` function.
## `KafkaProducer` with `KafkaClient::EventsPollingOption::Manual`
While we construct a `KafkaProducer` with option `KafkaClient::EventsPollingOption::Auto` (the default), an internal thread would be created to handle the `MessageDelivery` callbacks.
This might not be what you want, since then you have to use 2 different threads to send the messages and handle the `MessageDelivery` responses.
Here we have another choice, -- using `KafkaClient::EventsPollingOption::Manual`, thus the `MessageDelivery` callbacks would be called within the member function `pollEvents()`.
* Note, if you constructed the `KafkaAsyncProducer` with `EventsPollingOption::Manual`, the `send()` would be an `unblocked` operation.
61
+
* Note, if you constructed the `KafkaProducer` with `EventsPollingOption::Manual`, the `send()` would be an `unblocked` operation.
110
62
I.e, once the `message buffering queue` becomes full, the `send()` operation would throw an exception (or return an `error code` with the input reference parameter), -- instead of blocking there.
111
63
This makes sense, since you might want to call `pollEvents()` later, thus the delivery-callback could be called for some messages (which could then be removed from the `message buffering queue`).
### Example
```cpp
kafka::KafkaProducer producer(props);

auto record = kafka::ProducerRecord(topic, partition, Key(), Value());

// ...
```
## Error handling

An `Error` might occur at different places while sending a message,

1. A `KafkaException` would be thrown if the `KafkaProducer` fails to trigger the send operation.

2. A delivery `Error` would be passed through the delivery-callback.
There are 2 kinds of possible errors,
If the cluster's configuration is with `auto.create.topics.enable=true`, the producer could trigger automatic creation of a topic which doesn't exist yet.

Note, the default created topic may not be what you want (e.g, with `default.replication.factor=1` configuration as default, etc), thus causing other unexpected problems.
### How to enhance the sending performance?

Enlarging the default `BATCH_NUM_MESSAGES` and `LINGER_MS` might improve message batching, thus enhancing the throughput.
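For instance (a fragment with hypothetical values; `batch.num.messages` and `linger.ms` are the underlying property names for `BATCH_NUM_MESSAGES`/`LINGER_MS`, and `brokers` is a placeholder):

```cpp
kafka::Properties props({
    {"bootstrap.servers",  brokers},
    {"batch.num.messages", "50000"},  // hypothetical value: allow larger batches
    {"linger.ms",          "20"},     // hypothetical value: wait longer to fill batches
});
```

Note that a larger `linger.ms` trades latency for throughput, so the right value depends on your workload.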
Larger `QUEUE_BUFFERING_MAX_MESSAGES`/`QUEUE_BUFFERING_MAX_KBYTES` might help as well.

1. The Kafka cluster should be configured with `min.insync.replicas = 2` at least
2. Configure the `KafkaProducer` with property `{ProducerConfig::ENABLE_IDEMPOTENCE, "true"}`, together with proper error-handling (within the delivery callback).
* Complete Answer,

  * [How to Make KafkaProducer Reliable](HowToMakeKafkaProducerReliable.md)
### How many threads would be created by a KafkaProducer?

Excluding the user's main thread, a `KafkaProducer` would start (N + 3) background threads. (N means the number of `BOOTSTRAP_SERVERS`)
Most of these background threads are started internally by librdkafka.

Here is a brief introduction to what they're used for,

1. N background threads would handle the network communications with the N brokers.

2. Another 2 background threads would handle internal operations and kinds of timers, etc.
3. One more background thread would keep polling the delivery callback events.

E.g, if a `KafkaProducer` was created with property `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 7 threads in total (including the main thread).
### Which one of these threads will handle the callbacks?

It will be handled by a background thread, not by the user's thread.
Note, be careful if both the `KafkaProducer::send()` and the `Producer::Callback` might access the same container at the same time.