Skip to content

Commit 90d2544

Browse files
committed
A new parameter for producer.send(...), -- ActionWhileQueueIsFull
1 parent b81072d commit 90d2544

File tree

2 files changed

+29
-17
lines changed

2 files changed

+29
-17
lines changed

include/kafka/KafkaProducer.h

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ class KafkaProducer: public KafkaClient
6969
*/
7070
enum class SendOption { NoCopyRecordValue, ToCopyRecordValue };
7171

72+
/**
73+
* Choose the action while the sending buffer is full.
74+
*/
75+
enum class ActionWhileQueueIsFull { Block, NoBlock };
76+
7277
/**
7378
* Asynchronously send a record to a topic.
7479
*
@@ -87,7 +92,10 @@ class KafkaProducer: public KafkaClient
8792
* Broker errors,
8893
* - [Error Codes] (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
8994
*/
90-
void send(const producer::ProducerRecord& record, const producer::Callback& deliveryCb, SendOption option = SendOption::NoCopyRecordValue);
95+
void send(const producer::ProducerRecord& record,
96+
const producer::Callback& deliveryCb,
97+
SendOption option = SendOption::NoCopyRecordValue,
98+
ActionWhileQueueIsFull action = ActionWhileQueueIsFull::Block);
9199

92100
/**
93101
* Asynchronously send a record to a topic.
@@ -107,9 +115,13 @@ class KafkaProducer: public KafkaClient
107115
* Broker errors,
108116
* - [Error Codes] (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
109117
*/
110-
void send(const producer::ProducerRecord& record, const producer::Callback& deliveryCb, Error& error, SendOption option = SendOption::NoCopyRecordValue)
118+
void send(const producer::ProducerRecord& record,
119+
const producer::Callback& deliveryCb,
120+
Error& error,
121+
SendOption option = SendOption::NoCopyRecordValue,
122+
ActionWhileQueueIsFull action = ActionWhileQueueIsFull::Block)
111123
{
112-
try { send(record, deliveryCb, option); } catch (const KafkaException& e) { error = e.error(); }
124+
try { send(record, deliveryCb, option, action); } catch (const KafkaException& e) { error = e.error(); }
113125
}
114126

115127
/**
@@ -183,8 +195,6 @@ class KafkaProducer: public KafkaClient
183195
const producer::Callback _deliveryCb;
184196
};
185197

186-
enum class ActionWhileQueueIsFull { Block, NoBlock };
187-
188198
// Validate properties (and fix it if necesary)
189199
static Properties validateAndReformProperties(const Properties& properties);
190200

@@ -320,10 +330,11 @@ KafkaProducer::deliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmsg,
320330
inline void
321331
KafkaProducer::send(const producer::ProducerRecord& record,
322332
const producer::Callback& deliveryCb,
323-
SendOption option)
333+
SendOption option,
334+
ActionWhileQueueIsFull action)
324335
{
325336
auto deliveryCbOpaque = std::make_unique<DeliveryCbOpaque>(record.id(), deliveryCb);
326-
auto queueFullAction = (isWithAutoEventsPolling() ? ActionWhileQueueIsFull::Block : ActionWhileQueueIsFull::NoBlock);
337+
auto queueFullAction = (isWithAutoEventsPolling() ? action : ActionWhileQueueIsFull::NoBlock);
327338

328339
const auto* topic = record.topic().c_str();
329340
const auto partition = record.partition();

include/kafka/addons/KafkaRecoverableProducer.h

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -205,14 +205,14 @@ class KafkaRecoverableProducer
205205
* Broker errors,
206206
* - [Error Codes] (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
207207
*/
208-
void send(const producer::ProducerRecord& record,
209-
const producer::Callback& deliveryCb,
210-
KafkaProducer::SendOption option = KafkaProducer::SendOption::NoCopyRecordValue)
208+
void send(const producer::ProducerRecord& record,
209+
const producer::Callback& deliveryCb,
210+
KafkaProducer::SendOption option = KafkaProducer::SendOption::NoCopyRecordValue,
211+
KafkaProducer::ActionWhileQueueIsFull action = KafkaProducer::ActionWhileQueueIsFull::Block)
211212
{
212213
std::lock_guard<std::mutex> lock(_producerMutex);
213214

214-
_producer->send(record, deliveryCb, option);
215-
215+
_producer->send(record, deliveryCb, option, action);
216216
}
217217

218218
/**
@@ -234,14 +234,15 @@ class KafkaRecoverableProducer
234234
* - [Error Codes] (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
235235
*/
236236

237-
void send(const producer::ProducerRecord& record,
238-
const producer::Callback& deliveryCb,
239-
Error& error,
240-
KafkaProducer::SendOption option = KafkaProducer::SendOption::NoCopyRecordValue)
237+
void send(const producer::ProducerRecord& record,
238+
const producer::Callback& deliveryCb,
239+
Error& error,
240+
KafkaProducer::SendOption option = KafkaProducer::SendOption::NoCopyRecordValue,
241+
KafkaProducer::ActionWhileQueueIsFull action = KafkaProducer::ActionWhileQueueIsFull::Block)
241242
{
242243
std::lock_guard<std::mutex> lock(_producerMutex);
243244

244-
_producer->send(record, deliveryCb, error, option);
245+
_producer->send(record, deliveryCb, error, option, action);
245246
}
246247

247248
/**

0 commit comments

Comments
 (0)