Skip to content

Commit c638db0

Browse files
committed
Ensure we don't overflow the backoff value.
The first attempt to fix this (213102b) did not correctly address the issue. The backoff value, a 32-bit signed integer computed by a left shift on the retry count, frequently overflows, resulting in an invalid bound being passed to Random.nextInt(). The default for storm.messaging.netty.max_retries is now 30 (instead of 100), and max_retries is capped at an upper limit of 30. This commit also includes a whitespace cleanup.
1 parent 4e19589 commit c638db0

File tree

2 files changed

+29
-31
lines changed

2 files changed

+29
-31
lines changed

conf/defaults.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ zmq.hwm: 0
8686
storm.messaging.netty.server_worker_threads: 1
8787
storm.messaging.netty.client_worker_threads: 1
8888
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
89-
storm.messaging.netty.max_retries: 100
89+
storm.messaging.netty.max_retries: 30
9090
storm.messaging.netty.max_wait_ms: 1000
9191
storm.messaging.netty.min_wait_ms: 100
9292

storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java

+28-30
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,29 @@
2323

2424
class Client implements IConnection {
2525
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
26-
private final int max_retries;
27-
private final int base_sleep_ms;
28-
private final int max_sleep_ms;
26+
private final int max_retries;
27+
private final int base_sleep_ms;
28+
private final int max_sleep_ms;
2929
private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
3030
private AtomicReference<Channel> channelRef;
3131
private final ClientBootstrap bootstrap;
3232
private InetSocketAddress remote_addr;
33-
private AtomicInteger retries;
33+
private AtomicInteger retries;
3434
private final Random random = new Random();
3535
private final ChannelFactory factory;
3636
private final int buffer_size;
3737
private final AtomicBoolean being_closed;
3838

3939
@SuppressWarnings("rawtypes")
4040
Client(Map storm_conf, String host, int port) {
41-
message_queue = new LinkedBlockingQueue<Object>();
41+
message_queue = new LinkedBlockingQueue<Object>();
4242
retries = new AtomicInteger(0);
4343
channelRef = new AtomicReference<Channel>(null);
4444
being_closed = new AtomicBoolean(false);
4545

46-
// Configure
46+
// Configure
4747
buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
48-
max_retries = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
48+
max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
4949
base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
5050
max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
5151
int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -74,9 +74,9 @@ class Client implements IConnection {
7474
void reconnect() {
7575
try {
7676
int tried_count = retries.incrementAndGet();
77-
if (tried_count < max_retries) {
77+
if (tried_count <= max_retries) {
7878
Thread.sleep(getSleepTimeMs());
79-
LOG.info("Reconnect ... [{}]", tried_count);
79+
LOG.info("Reconnect ... [{}]", tried_count);
8080
bootstrap.connect(remote_addr);
8181
LOG.debug("connection started...");
8282
} else {
@@ -85,27 +85,25 @@ void reconnect() {
8585
}
8686
} catch (InterruptedException e) {
8787
LOG.warn("connection failed", e);
88-
}
88+
}
8989
}
9090

9191
/**
9292
* # of milliseconds to wait per exponential back-off policy
9393
*/
9494
private int getSleepTimeMs()
9595
{
96-
int backoff = 1 << Math.max(1, retries.get());
96+
int backoff = 1 << retries.get();
9797
int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
9898
if ( sleepMs > max_sleep_ms )
9999
sleepMs = max_sleep_ms;
100-
if ( sleepMs < base_sleep_ms )
101-
sleepMs = base_sleep_ms;
102100
return sleepMs;
103101
}
104102

105103
/**
106-
* Enqueue a task message to be sent to server
104+
* Enqueue a task message to be sent to server
107105
*/
108-
public void send(int task, byte[] message) {
106+
public void send(int task, byte[] message) {
109107
//throw exception if the client is being closed
110108
if (being_closed.get()) {
111109
throw new RuntimeException("Client is being closed, and does not take requests any more");
@@ -128,43 +126,43 @@ MessageBatch takeMessages() throws InterruptedException {
128126
MessageBatch batch = new MessageBatch(buffer_size);
129127
Object msg = message_queue.take();
130128
batch.add(msg);
131-
129+
132130
//we will discard any message after CLOSE
133-
if (msg==ControlMessage.CLOSE_MESSAGE)
131+
if (msg==ControlMessage.CLOSE_MESSAGE)
134132
return batch;
135-
133+
136134
while (!batch.isFull()) {
137135
//peek the next message
138136
msg = message_queue.peek();
139137
//no more messages
140138
if (msg == null) break;
141-
139+
142140
//we will discard any message after CLOSE
143141
if (msg==ControlMessage.CLOSE_MESSAGE) {
144142
message_queue.take();
145143
batch.add(msg);
146144
break;
147145
}
148-
146+
149147
//try to add this msg into batch
150148
if (!batch.tryAdd((TaskMessage) msg))
151149
break;
152-
150+
153151
//remove this message
154152
message_queue.take();
155153
}
156154

157155
return batch;
158156
}
159-
157+
160158
/**
161159
* gracefully close this client.
162-
*
160+
*
163161
* We will send all existing requests, and then invoke close_n_release() method
164162
*/
165163
public synchronized void close() {
166-
if (!being_closed.get()) {
167-
//enqueue a CLOSE message so that shutdown() will be invoked
164+
if (!being_closed.get()) {
165+
//enqueue a CLOSE message so that shutdown() will be invoked
168166
try {
169167
message_queue.put(ControlMessage.CLOSE_MESSAGE);
170168
being_closed.set(true);
@@ -178,10 +176,10 @@ public synchronized void close() {
178176
* close_n_release() is invoked after all messages have been sent.
179177
*/
180178
void close_n_release() {
181-
if (channelRef.get() != null)
179+
if (channelRef.get() != null)
182180
channelRef.get().close().awaitUninterruptibly();
183181

184-
//we need to release resources
182+
//we need to release resources
185183
new Thread(new Runnable() {
186184
@Override
187185
public void run() {
@@ -194,10 +192,10 @@ public TaskMessage recv(int flags) {
194192
}
195193

196194
void setChannel(Channel channel) {
197-
channelRef.set(channel);
198-
//reset retries
195+
channelRef.set(channel);
196+
//reset retries
199197
if (channel != null)
200-
retries.set(0);
198+
retries.set(0);
201199
}
202200

203201
}

0 commit comments

Comments
 (0)