@@ -438,34 +438,40 @@ private void onExceptionImpl(final String brokerName, //
438
438
) {
439
439
int tmp = curTimes .incrementAndGet ();
440
440
if (needRetry && tmp <= timesTotal ) {
441
- MessageQueue tmpmq = producer .selectOneMessageQueue (topicPublishInfo , brokerName );
442
- String addr = instance .findBrokerAddressInPublish (tmpmq .getBrokerName ());
441
+ String retryBrokerName = brokerName ;//by default, it will send to the same broker
442
+ if (topicPublishInfo != null ) { //select one message queue accordingly, in order to determine which broker to send
443
+ MessageQueue mqChosen = producer .selectOneMessageQueue (topicPublishInfo , brokerName );
444
+ retryBrokerName = mqChosen .getBrokerName ();
445
+ }
446
+ String addr = instance .findBrokerAddressInPublish (retryBrokerName );
443
447
log .info ("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}" , tmp , msg .getTopic (), addr ,
444
- tmpmq . getBrokerName () );
448
+ retryBrokerName );
445
449
try {
446
450
request .setOpaque (RemotingCommand .createNewRequestId ());
447
- sendMessageAsync (addr , tmpmq . getBrokerName () , msg , timeoutMillis , request , sendCallback , topicPublishInfo , instance ,
451
+ sendMessageAsync (addr , retryBrokerName , msg , timeoutMillis , request , sendCallback , topicPublishInfo , instance ,
448
452
timesTotal , curTimes , context , producer );
449
453
} catch (InterruptedException e1 ) {
450
- onExceptionImpl (tmpmq . getBrokerName () , msg , timeoutMillis , request , sendCallback , topicPublishInfo , instance , timesTotal , curTimes , e1 ,
454
+ onExceptionImpl (retryBrokerName , msg , timeoutMillis , request , sendCallback , topicPublishInfo , instance , timesTotal , curTimes , e1 ,
451
455
context , false , producer );
452
456
} catch (RemotingConnectException e1 ) {
453
457
producer .updateFaultItem (brokerName , 3000 , true );
454
- onExceptionImpl (tmpmq . getBrokerName () , msg , timeoutMillis , request , sendCallback , topicPublishInfo , instance , timesTotal , curTimes , e1 ,
458
+ onExceptionImpl (retryBrokerName , msg , timeoutMillis , request , sendCallback , topicPublishInfo , instance , timesTotal , curTimes , e1 ,
455
459
context , true , producer );
456
460
} catch (RemotingTooMuchRequestException e1 ) {
457
- onExceptionImpl (tmpmq . getBrokerName () , msg , timeoutMillis , request , sendCallback , topicPublishInfo , instance , timesTotal , curTimes , e1 ,
461
+ onExceptionImpl (retryBrokerName , msg , timeoutMillis , request , sendCallback , topicPublishInfo , instance , timesTotal , curTimes , e1 ,
458
462
context , false , producer );
459
463
} catch (RemotingException e1 ) {
460
464
producer .updateFaultItem (brokerName , 3000 , true );
461
- onExceptionImpl (tmpmq . getBrokerName () , msg , timeoutMillis , request , sendCallback , topicPublishInfo , instance , timesTotal , curTimes , e1 ,
465
+ onExceptionImpl (retryBrokerName , msg , timeoutMillis , request , sendCallback , topicPublishInfo , instance , timesTotal , curTimes , e1 ,
462
466
context , true , producer );
463
467
}
464
468
} else {
469
+
465
470
if (context != null ) {
466
471
context .setException (e );
467
472
context .getProducer ().executeSendMessageHookAfter (context );
468
473
}
474
+
469
475
try {
470
476
sendCallback .onException (e );
471
477
} catch (Exception ignored ) {
0 commit comments