33
33
import com .rabbitmq .client .impl .ValueReader ;
34
34
import com .rabbitmq .client .impl .ValueWriter ;
35
35
import com .rabbitmq .utility .BlockingCell ;
36
+ import org .slf4j .Logger ;
37
+ import org .slf4j .LoggerFactory ;
36
38
37
39
/**
38
40
* Convenience class which manages simple RPC-style communication.
41
43
* and waiting for a response.
42
44
*/
43
45
public class RpcClient {
46
+
47
+ private static final Logger LOGGER = LoggerFactory .getLogger (RpcClient .class );
48
+
44
49
/** Channel we are communicating on */
45
50
private final Channel _channel ;
46
51
/** Exchange to send requests to */
@@ -192,9 +197,12 @@ public void handleDelivery(String consumerTag,
192
197
String replyId = properties .getCorrelationId ();
193
198
BlockingCell <Object > blocker =_continuationMap .remove (replyId );
194
199
if (blocker == null ) {
195
- throw new IllegalStateException ("No outstanding request for correlation ID " + replyId );
200
+ // Entry should have been removed if request timed out,
201
+ // log a warning nevertheless.
202
+ LOGGER .warn ("No outstanding request for correlation ID {}" , replyId );
203
+ } else {
204
+ blocker .set (new Response (consumerTag , envelope , properties , body ));
196
205
}
197
- blocker .set (new Response (consumerTag , envelope , properties , body ));
198
206
}
199
207
}
200
208
};
@@ -217,15 +225,23 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
217
225
throws IOException , ShutdownSignalException , TimeoutException {
218
226
checkConsumer ();
219
227
BlockingCell <Object > k = new BlockingCell <Object >();
228
+ String replyId ;
220
229
synchronized (_continuationMap ) {
221
230
_correlationId ++;
222
- String replyId = "" + _correlationId ;
231
+ replyId = "" + _correlationId ;
223
232
props = ((props ==null ) ? new AMQP .BasicProperties .Builder () : props .builder ())
224
233
.correlationId (replyId ).replyTo (_replyTo ).build ();
225
234
_continuationMap .put (replyId , k );
226
235
}
227
236
publish (props , message );
228
- Object reply = k .uninterruptibleGet (timeout );
237
+ Object reply ;
238
+ try {
239
+ reply = k .uninterruptibleGet (timeout );
240
+ } catch (TimeoutException ex ) {
241
+ // Avoid potential leak. This entry is no longer needed by caller.
242
+ _continuationMap .remove (replyId );
243
+ throw ex ;
244
+ }
229
245
if (reply instanceof ShutdownSignalException ) {
230
246
ShutdownSignalException sig = (ShutdownSignalException ) reply ;
231
247
ShutdownSignalException wrapper =
0 commit comments