Skip to content

Commit 01a0eb0

Browse files
Jaskeylizhanhui
authored andcommitted
Fix possible NullPointerException when retry in send Async way
1 parent 15af63e commit 01a0eb0

File tree

1 file changed

+14
-8
lines changed

1 file changed

+14
-8
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -438,34 +438,40 @@ private void onExceptionImpl(final String brokerName, //
438438
) {
439439
int tmp = curTimes.incrementAndGet();
440440
if (needRetry && tmp <= timesTotal) {
441-
MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
442-
String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
441+
String retryBrokerName = brokerName;//by default, it will send to the same broker
442+
if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
443+
MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
444+
retryBrokerName = mqChosen.getBrokerName();
445+
}
446+
String addr = instance.findBrokerAddressInPublish(retryBrokerName);
443447
log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
444-
tmpmq.getBrokerName());
448+
retryBrokerName);
445449
try {
446450
request.setOpaque(RemotingCommand.createNewRequestId());
447-
sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
451+
sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
448452
timesTotal, curTimes, context, producer);
449453
} catch (InterruptedException e1) {
450-
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
454+
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
451455
context, false, producer);
452456
} catch (RemotingConnectException e1) {
453457
producer.updateFaultItem(brokerName, 3000, true);
454-
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
458+
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
455459
context, true, producer);
456460
} catch (RemotingTooMuchRequestException e1) {
457-
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
461+
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
458462
context, false, producer);
459463
} catch (RemotingException e1) {
460464
producer.updateFaultItem(brokerName, 3000, true);
461-
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
465+
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
462466
context, true, producer);
463467
}
464468
} else {
469+
465470
if (context != null) {
466471
context.setException(e);
467472
context.getProducer().executeSendMessageHookAfter(context);
468473
}
474+
469475
try {
470476
sendCallback.onException(e);
471477
} catch (Exception ignored) {

0 commit comments

Comments
 (0)