Skip to content

Commit ea5d02c

Browse files
committed
Add error notification for some testcases
1 parent 785213e commit ea5d02c

File tree

8 files changed

+105
-28
lines changed

8 files changed

+105
-28
lines changed

include/kafka/Error.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,23 @@ class Error
7373
(_rkError ? rd_kafka_error_string(_rkError.get()) : rd_kafka_err2str(_respErr));
7474
}
7575

76+
/**
77+
* Detailed error string.
78+
*/
79+
std::string toString() const
80+
{
81+
std::ostringstream oss;
82+
83+
oss << rd_kafka_err2str(static_cast<rd_kafka_resp_err_t>(value())) << " [" << value() << "]";
84+
85+
if (auto fatal = isFatal()) oss << " | " << (*fatal ? "fatal" : "non-fatal");
86+
if (transactionRequiresAbort()) oss << " | transaction-requires-abort";
87+
if (auto retriable = isRetriable()) oss << " | " << (*retriable ? "retriable" : "non-retriable");
88+
if (_message) oss << " | " << *_message;
89+
90+
return oss.str();
91+
}
92+
7693
/**
7794
* Fatal error indicates that the client instance is no longer usable.
7895
*/
@@ -89,6 +106,18 @@ class Error
89106
return _rkError ? rd_kafka_error_is_retriable(_rkError.get()) : Optional<bool>{};
90107
}
91108

109+
/**
110+
* Show whether the error is an abortable transaction error.
111+
*
112+
* Note:
113+
* 1. Only valid for transactional API.
114+
* 2. If `true`, the producer must call `abortTransaction` and start a new transaction with `beginTransaction` to proceed with transactions.
115+
*/
116+
bool transactionRequiresAbort() const
117+
{
118+
return _rkError ? rd_kafka_error_txn_requires_abort(_rkError.get()) : false;
119+
}
120+
92121
private:
93122
rd_kafka_error_shared_ptr _rkError; // For error with rich info
94123
rd_kafka_resp_err_t _respErr{}; // For error with a simple response code

include/kafka/KafkaException.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ class KafkaException: public std::exception
3838
*/
3939
const char* what() const noexcept override
4040
{
41-
_what = Utility::getLocalTimeString(_when) + ": " + _error->message() + " [" + std::to_string(_error->value())
42-
+ "] (" + std::string(_filename) + ":" + std::to_string(_lineno) + ")";
41+
_what = Utility::getLocalTimeString(_when) + ": " + _error->toString() + " (" + std::string(_filename) + ":" + std::to_string(_lineno) + ")";
4342
return _what.c_str();
4443
}
4544

tests/integration/TestKafkaConsumer.cc

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1861,12 +1861,46 @@ TEST(KafkaAutoCommitConsumer, AutoCreateTopics)
18611861
KafkaAutoCommitConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig()
18621862
.put("allow.auto.create.topics", "true"));
18631863

1864+
// The error would be triggered while consumer tries to subscribe a non-existed topic.
1865+
consumer.setErrorCallback([](const Error& error) {
1866+
std::cout << "consumer met an error: " << error.toString() << std::endl;
1867+
EXPECT_EQ(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, error.value());
1868+
});
1869+
18641870
// Subscribe topics, but would never make it!
1865-
EXPECT_KAFKA_THROW(consumer.subscribe({topic}, Consumer::NullRebalanceCallback, std::chrono::seconds(10)), RD_KAFKA_RESP_ERR__TIMED_OUT);
1871+
EXPECT_KAFKA_THROW(consumer.subscribe({topic}, Consumer::NullRebalanceCallback, std::chrono::seconds(10)),
1872+
RD_KAFKA_RESP_ERR__TIMED_OUT);
18661873

18671874
EXPECT_TRUE(consumer.assignment().empty());
18681875
}
18691876

1877+
TEST(KafkaAutoCommitConsumer, CreateTopicAfterSubscribe)
1878+
{
1879+
const Topic topic = Utility::getRandomString();
1880+
1881+
auto createTopicAfterSeconds = [topic](int seconds) {
1882+
std::this_thread::sleep_for(std::chrono::seconds(seconds));
1883+
KafkaTestUtility::CreateKafkaTopic(topic, 1, 1);
1884+
};
1885+
1886+
KafkaAutoCommitConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig());
1887+
1888+
// The error would be triggered while consumer tries to subscribe a non-existed topic.
1889+
consumer.setErrorCallback([](const Error& error) {
1890+
KafkaTestUtility::DumpError(error);
1891+
EXPECT_EQ(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, error.value());
1892+
});
1893+
1894+
// The topic would be created after 5 seconds
1895+
KafkaTestUtility::JoiningThread consumer1Thread(createTopicAfterSeconds, 5);
1896+
1897+
std::cout << "[" << Utility::getCurrentTime() << "] Consumer will subscribe" << std::endl;
1898+
EXPECT_KAFKA_NO_THROW(consumer.subscribe({topic}));
1899+
std::cout << "[" << Utility::getCurrentTime() << "] Consumer just subscribed" << std::endl;
1900+
1901+
EXPECT_FALSE(consumer.assignment().empty());
1902+
}
1903+
18701904
TEST(KafkaAutoCommitConsumer, CooperativeRebalance)
18711905
{
18721906
constexpr int NUM_TOPICS = 3;

tests/robustness/TestKafkaConsumer.cc

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ TEST(KafkaManualCommitConsumer, AlwaysFinishClosing_ManuallyPollEvents)
4242
{
4343
// Start a consumer (which need to call `pollEvents()` to trigger the commit callback)
4444
KafkaManualCommitConsumer consumer(props, KafkaClient::EventsPollingOption::Manual);
45+
consumer.setErrorCallback(KafkaTestUtility::DumpError);
4546
std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl;
4647

4748
// Subscribe the topic
@@ -106,6 +107,7 @@ TEST(KafkaManualCommitConsumer, CommitOffsetWhileBrokersStop)
106107
{
107108
// Start a consumer
108109
KafkaManualCommitConsumer consumer(props);
110+
consumer.setErrorCallback(KafkaTestUtility::DumpError);
109111
std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl;
110112

111113
// Subscribe th topic
@@ -165,6 +167,7 @@ TEST(KafkaAutoCommitConsumer, BrokerStopBeforeConsumerStart)
165167

166168
// Start the consumer
167169
KafkaAutoCommitConsumer consumer(props);
170+
consumer.setErrorCallback(KafkaTestUtility::DumpError);
168171
std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl;
169172

170173

@@ -202,21 +205,21 @@ TEST(KafkaAutoCommitConsumer, BrokerStopBeforeConsumerStart)
202205

203206
TEST(KafkaAutoCommitConsumer, BrokerStopBeforeSubscription)
204207
{
208+
const Topic topic = Utility::getRandomString();
209+
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);
210+
205211
// Consumer properties
206212
const auto props = KafkaTestUtility::GetKafkaClientCommonConfig()
207213
.put(ConsumerConfig::SESSION_TIMEOUT_MS, "30000")
208214
.put(ConsumerConfig::ENABLE_PARTITION_EOF, "true");
209215

210216
// Start the consumer
211217
KafkaAutoCommitConsumer consumer(props);
218+
consumer.setErrorCallback(KafkaTestUtility::DumpError);
212219
std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl;
213220

214221
// Pause the brokers for a while
215222
auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile(std::chrono::seconds(5));
216-
217-
const Topic topic = Utility::getRandomString();
218-
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);
219-
220223
TopicPartitions assignment;
221224
// In some corner cases, the assigned partitions might be empty (due to "Local: Broker node update" error), and we'll retry
222225
while (assignment.empty())
@@ -261,6 +264,7 @@ TEST(KafkaAutoCommitConsumer, BrokerStopBeforeSeek)
261264

262265
// Start the consumer
263266
KafkaAutoCommitConsumer consumer(props);
267+
consumer.setErrorCallback(KafkaTestUtility::DumpError);
264268
std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl;
265269

266270
// Subscribe the topic
@@ -325,6 +329,7 @@ TEST(KafkaAutoCommitConsumer, BrokerStopDuringMsgPoll)
325329

326330
// Start the consumer
327331
KafkaAutoCommitConsumer consumer(props);
332+
consumer.setErrorCallback(KafkaTestUtility::DumpError);
328333
std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl;
329334

330335
// Subscribe the topic

tests/robustness/TestKafkaProducer.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ TEST(KafkaSyncProducer, RecordTimestamp)
3232

3333
// Prepare a producer
3434
KafkaSyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig());
35+
producer.setErrorCallback(KafkaTestUtility::DumpError);
3536

3637
constexpr int TIME_LAPSE_THRESHOLD_MS = 1000;
3738
using namespace std::chrono;
@@ -124,6 +125,7 @@ TEST(KafkaAsyncProducer, NoMissedDeliveryCallback)
124125

125126
{
126127
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig().put(ProducerConfig::MESSAGE_TIMEOUT_MS, "5000"));
128+
producer.setErrorCallback(KafkaTestUtility::DumpError);
127129

128130
// Pause the brokers for a while
129131
auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile(std::chrono::seconds(5));
@@ -157,6 +159,7 @@ TEST(KafkaAsyncProducer, MightMissDeliveryCallbackIfCloseWithLimitedTimeout)
157159
std::size_t deliveryCount = 0;
158160
{
159161
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig());
162+
producer.setErrorCallback(KafkaTestUtility::DumpError);
160163

161164
KafkaTestUtility::PauseBrokers();
162165

@@ -202,6 +205,7 @@ TEST(KafkaAsyncProducer, BrokerStopWhileSendingMessages)
202205
std::size_t deliveryCount = 0;
203206
{
204207
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig());
208+
producer.setErrorCallback(KafkaTestUtility::DumpError);
205209

206210
// Pause the brokers for a while (shorter then the default "MESSAGE_TIMEOUT_MS" for producer, which is 10 seconds)
207211
auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile(std::chrono::seconds(5));
@@ -249,6 +253,7 @@ TEST(KafkaAsyncProducer, Send_AckTimeout)
249253
{
250254
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig()
251255
.put(ProducerConfig::MESSAGE_TIMEOUT_MS, "3000")); // If with no response, the delivery would fail in a short time
256+
producer.setErrorCallback(KafkaTestUtility::DumpError);
252257

253258
// Pause the brokers for a while
254259
auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile(std::chrono::seconds(5));
@@ -287,6 +292,7 @@ TEST(KafkaAsyncProducer, ManuallyPollEvents_AckTimeout)
287292
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig()
288293
.put(ProducerConfig::MESSAGE_TIMEOUT_MS, "3000"), // If with no response, the delivery would fail in a short time
289294
KafkaClient::EventsPollingOption::Manual); // Manually call `pollEvents()`
295+
producer.setErrorCallback(KafkaTestUtility::DumpError);
290296

291297
// Pause the brokers for a while
292298
auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile(std::chrono::seconds(5));
@@ -333,6 +339,7 @@ TEST(KafkaAsyncProducer, ManuallyPollEvents_AlwaysFinishClosing)
333339
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig()
334340
.put(ProducerConfig::MESSAGE_TIMEOUT_MS, "3000"), // If with no response, the delivery would fail in a short time
335341
KafkaClient::EventsPollingOption::Manual); // Manually call `pollEvents()`
342+
producer.setErrorCallback(KafkaTestUtility::DumpError);
336343

337344
// Pause the brokers for a while
338345
auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile(std::chrono::seconds(5));
@@ -362,6 +369,7 @@ TEST(KafkaSyncProducer, Send_AckTimeout)
362369
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);
363370

364371
KafkaSyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig().put(ProducerConfig::MESSAGE_TIMEOUT_MS, "3000"));
372+
producer.setErrorCallback(KafkaTestUtility::DumpError);
365373

366374
// Pause the brokers for a while
367375
auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile(std::chrono::seconds(5));

tests/robustness/TestTransaction.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ TEST(Transaction, DeliveryFailure)
2626
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig()
2727
.put(ProducerConfig::MESSAGE_TIMEOUT_MS, "3000") // The delivery would fail in a short timeout
2828
.put(ProducerConfig::TRANSACTIONAL_ID, transactionId));
29+
producer.setErrorCallback(KafkaTestUtility::DumpError);
2930

3031
std::cout << "[" << Utility::getCurrentTime() << "] Producer created." << std::endl;
3132

tests/unit/TestKafkaException.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ TEST(KafkaException, Basic)
4848
}
4949
catch (const Kafka::KafkaException& e)
5050
{
51-
std::regex reMatch(R"(.*something wrong here \[7\] \(some_filename:100\))");
51+
std::regex reMatch(R"(.*Broker: Request timed out \[7\] \| something wrong here \(some_filename:100\))");
5252
EXPECT_TRUE(std::regex_match(e.what(), reMatch));
5353
}
5454
}

tests/utils/TestUtility.h

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <boost/algorithm/string.hpp>
1010

1111
#include <cstdlib>
12+
#include <functional>
1213
#include <list>
1314
#include <regex>
1415
#include <signal.h>
@@ -19,31 +20,21 @@
1920
do { \
2021
try { \
2122
expr; \
22-
} catch (const KafkaException& e) { \
23-
const auto& error = e.error(); \
24-
std::cout << "Exception caught: " << e.what() \
25-
<< (error.isFatal() ? (*error.isFatal() ? ", fatal" : ", non-fatal") : "") \
26-
<< (error.isRetriable() ? (*error.isRetriable() ? ", retriable" : ", non-retriable") : "") \
27-
<< std::endl; \
28-
EXPECT_EQ(err, e.error().value()); \
29-
break; \
23+
} catch (const KafkaException& e) { \
24+
std::cout << "Exception caught: " << e.what() << std::endl; \
25+
EXPECT_EQ(err, e.error().value()); \
26+
break; \
3027
} catch (...){ \
3128
} \
3229
EXPECT_FALSE(true); \
3330
} while(false)
3431

35-
#define EXPECT_KAFKA_NO_THROW(expr) \
36-
try { \
37-
expr; \
38-
} catch (...){ \
39-
EXPECT_FALSE(true); \
40-
}
41-
42-
#define EXPECT_KAFKA_NO_THROW(expr) \
43-
try { \
44-
expr; \
45-
} catch (...){ \
46-
EXPECT_FALSE(true); \
32+
#define EXPECT_KAFKA_NO_THROW(expr) \
33+
try { \
34+
expr; \
35+
} catch (const KafkaException& e) { \
36+
std::cerr << "Exception caught: " << e.what() << std::endl; \
37+
EXPECT_FALSE(true); \
4738
}
4839

4940
#define RETRY_FOR_ERROR(expr, errToRetry, maxRetries) \
@@ -69,9 +60,19 @@
6960
} \
7061
}
7162

63+
7264
namespace Kafka = KAFKA_API;
7365

7466
namespace KafkaTestUtility {
67+
68+
inline void
69+
DumpError(const Kafka::Error& error)
70+
{
71+
// https://en.wikipedia.org/wiki/ANSI_escape_code
72+
std::cerr << "\033[1;31m" << "[" << Kafka::Utility::getCurrentTime() << "] ==> Met Error: " << "\033[0m";
73+
std::cerr << "\033[4;35m" << error.toString() << "\033[0m" << std::endl;
74+
};
75+
7576
inline void
7677
PrintDividingLine(const std::string& description = "")
7778
{

0 commit comments

Comments
 (0)