Skip to content

Commit 3f675f4

Browse files
committed
Merge branch '5.3.x-stable' into 5.x.x-stable
2 parents 668a3f7 + 11fdaa9 commit 3f675f4

File tree

2 files changed

+32
-4
lines changed

2 files changed

+32
-4
lines changed

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.rabbitmq.client.impl.ValueReader;
3434
import com.rabbitmq.client.impl.ValueWriter;
3535
import com.rabbitmq.utility.BlockingCell;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
3638

3739
/**
3840
* Convenience class which manages simple RPC-style communication.
@@ -41,6 +43,9 @@
4143
* and waiting for a response.
4244
*/
4345
public class RpcClient {
46+
47+
private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
48+
4449
/** Channel we are communicating on */
4550
private final Channel _channel;
4651
/** Exchange to send requests to */
@@ -192,9 +197,12 @@ public void handleDelivery(String consumerTag,
192197
String replyId = properties.getCorrelationId();
193198
BlockingCell<Object> blocker =_continuationMap.remove(replyId);
194199
if (blocker == null) {
195-
throw new IllegalStateException("No outstanding request for correlation ID " + replyId);
200+
// Entry should have been removed if request timed out,
201+
// log a warning nevertheless.
202+
LOGGER.warn("No outstanding request for correlation ID {}", replyId);
203+
} else {
204+
blocker.set(new Response(consumerTag, envelope, properties, body));
196205
}
197-
blocker.set(new Response(consumerTag, envelope, properties, body));
198206
}
199207
}
200208
};
@@ -217,15 +225,23 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
217225
throws IOException, ShutdownSignalException, TimeoutException {
218226
checkConsumer();
219227
BlockingCell<Object> k = new BlockingCell<Object>();
228+
String replyId;
220229
synchronized (_continuationMap) {
221230
_correlationId++;
222-
String replyId = "" + _correlationId;
231+
replyId = "" + _correlationId;
223232
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
224233
.correlationId(replyId).replyTo(_replyTo).build();
225234
_continuationMap.put(replyId, k);
226235
}
227236
publish(props, message);
228-
Object reply = k.uninterruptibleGet(timeout);
237+
Object reply;
238+
try {
239+
reply = k.uninterruptibleGet(timeout);
240+
} catch (TimeoutException ex) {
241+
// Avoid potential leak. This entry is no longer needed by caller.
242+
_continuationMap.remove(replyId);
243+
throw ex;
244+
}
229245
if (reply instanceof ShutdownSignalException) {
230246
ShutdownSignalException sig = (ShutdownSignalException) reply;
231247
ShutdownSignalException wrapper =

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.IOException;
2525
import java.util.HashMap;
2626
import java.util.Map;
27+
import java.util.concurrent.TimeoutException;
2728

2829
import static org.junit.Assert.assertEquals;
2930

@@ -72,6 +73,17 @@ public void rpc() throws Exception {
7273
client.close();
7374
}
7475

76+
@Test public void rpcResponseTimeout() throws Exception {
77+
RpcClient client = new RpcClient(clientChannel, "", queue);
78+
try {
79+
client.doCall(null, "hello".getBytes(), 200);
80+
} catch (TimeoutException e) {
81+
// OK
82+
}
83+
assertEquals(0, client.getContinuationMap().size());
84+
client.close();
85+
}
86+
7587
private static class TestRpcServer extends RpcServer {
7688

7789
public TestRpcServer(Channel channel, String queueName) throws IOException {

0 commit comments

Comments
 (0)