@@ -69,6 +69,11 @@ class KafkaProducer: public KafkaClient
69
69
*/
70
70
enum class SendOption { NoCopyRecordValue, ToCopyRecordValue };
71
71
72
+ /* *
73
+ * Choose the action while the sending buffer is full.
74
+ */
75
+ enum class ActionWhileQueueIsFull { Block, NoBlock };
76
+
72
77
/* *
73
78
* Asynchronously send a record to a topic.
74
79
*
@@ -87,7 +92,10 @@ class KafkaProducer: public KafkaClient
87
92
* Broker errors,
88
93
* - [Error Codes] (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
89
94
*/
90
- void send (const producer::ProducerRecord& record, const producer::Callback& deliveryCb, SendOption option = SendOption::NoCopyRecordValue);
95
+ void send (const producer::ProducerRecord& record,
96
+ const producer::Callback& deliveryCb,
97
+ SendOption option = SendOption::NoCopyRecordValue,
98
+ ActionWhileQueueIsFull action = ActionWhileQueueIsFull::Block);
91
99
92
100
/* *
93
101
* Asynchronously send a record to a topic.
@@ -107,9 +115,13 @@ class KafkaProducer: public KafkaClient
107
115
* Broker errors,
108
116
* - [Error Codes] (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
109
117
*/
110
- void send (const producer::ProducerRecord& record, const producer::Callback& deliveryCb, Error& error, SendOption option = SendOption::NoCopyRecordValue)
118
+ void send (const producer::ProducerRecord& record,
119
+ const producer::Callback& deliveryCb,
120
+ Error& error,
121
+ SendOption option = SendOption::NoCopyRecordValue,
122
+ ActionWhileQueueIsFull action = ActionWhileQueueIsFull::Block)
111
123
{
112
- try { send (record, deliveryCb, option); } catch (const KafkaException& e) { error = e.error (); }
124
+ try { send (record, deliveryCb, option, action ); } catch (const KafkaException& e) { error = e.error (); }
113
125
}
114
126
115
127
/* *
@@ -183,8 +195,6 @@ class KafkaProducer: public KafkaClient
183
195
const producer::Callback _deliveryCb;
184
196
};
185
197
186
- enum class ActionWhileQueueIsFull { Block, NoBlock };
187
-
188
198
// Validate properties (and fix it if necesary)
189
199
static Properties validateAndReformProperties (const Properties& properties);
190
200
@@ -320,10 +330,11 @@ KafkaProducer::deliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmsg,
320
330
inline void
321
331
KafkaProducer::send (const producer::ProducerRecord& record,
322
332
const producer::Callback& deliveryCb,
323
- SendOption option)
333
+ SendOption option,
334
+ ActionWhileQueueIsFull action)
324
335
{
325
336
auto deliveryCbOpaque = std::make_unique<DeliveryCbOpaque>(record.id (), deliveryCb);
326
- auto queueFullAction = (isWithAutoEventsPolling () ? ActionWhileQueueIsFull::Block : ActionWhileQueueIsFull::NoBlock);
337
+ auto queueFullAction = (isWithAutoEventsPolling () ? action : ActionWhileQueueIsFull::NoBlock);
327
338
328
339
const auto * topic = record.topic ().c_str ();
329
340
const auto partition = record.partition ();
0 commit comments