23
23
24
24
class Client implements IConnection {
25
25
private static final Logger LOG = LoggerFactory .getLogger (Client .class );
26
- private final int max_retries ;
27
- private final int base_sleep_ms ;
28
- private final int max_sleep_ms ;
26
+ private final int max_retries ;
27
+ private final int base_sleep_ms ;
28
+ private final int max_sleep_ms ;
29
29
private LinkedBlockingQueue <Object > message_queue ; //entry should either be TaskMessage or ControlMessage
30
30
private AtomicReference <Channel > channelRef ;
31
31
private final ClientBootstrap bootstrap ;
32
32
private InetSocketAddress remote_addr ;
33
- private AtomicInteger retries ;
33
+ private AtomicInteger retries ;
34
34
private final Random random = new Random ();
35
35
private final ChannelFactory factory ;
36
36
private final int buffer_size ;
37
37
private final AtomicBoolean being_closed ;
38
38
39
39
@ SuppressWarnings ("rawtypes" )
40
40
Client (Map storm_conf , String host , int port ) {
41
- message_queue = new LinkedBlockingQueue <Object >();
41
+ message_queue = new LinkedBlockingQueue <Object >();
42
42
retries = new AtomicInteger (0 );
43
43
channelRef = new AtomicReference <Channel >(null );
44
44
being_closed = new AtomicBoolean (false );
45
45
46
- // Configure
46
+ // Configure
47
47
buffer_size = Utils .getInt (storm_conf .get (Config .STORM_MESSAGING_NETTY_BUFFER_SIZE ));
48
- max_retries = Utils .getInt (storm_conf .get (Config .STORM_MESSAGING_NETTY_MAX_RETRIES ));
48
+ max_retries = Math . min ( 30 , Utils .getInt (storm_conf .get (Config .STORM_MESSAGING_NETTY_MAX_RETRIES ) ));
49
49
base_sleep_ms = Utils .getInt (storm_conf .get (Config .STORM_MESSAGING_NETTY_MIN_SLEEP_MS ));
50
50
max_sleep_ms = Utils .getInt (storm_conf .get (Config .STORM_MESSAGING_NETTY_MAX_SLEEP_MS ));
51
51
int maxWorkers = Utils .getInt (storm_conf .get (Config .STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS ));
@@ -74,9 +74,9 @@ class Client implements IConnection {
74
74
void reconnect () {
75
75
try {
76
76
int tried_count = retries .incrementAndGet ();
77
- if (tried_count < max_retries ) {
77
+ if (tried_count <= max_retries ) {
78
78
Thread .sleep (getSleepTimeMs ());
79
- LOG .info ("Reconnect ... [{}]" , tried_count );
79
+ LOG .info ("Reconnect ... [{}]" , tried_count );
80
80
bootstrap .connect (remote_addr );
81
81
LOG .debug ("connection started..." );
82
82
} else {
@@ -85,27 +85,25 @@ void reconnect() {
85
85
}
86
86
} catch (InterruptedException e ) {
87
87
LOG .warn ("connection failed" , e );
88
- }
88
+ }
89
89
}
90
90
91
91
/**
92
92
* # of milliseconds to wait per exponential back-off policy
93
93
*/
94
94
private int getSleepTimeMs ()
95
95
{
96
- int backoff = 1 << Math . max ( 1 , retries .get () );
96
+ int backoff = 1 << retries .get ();
97
97
int sleepMs = base_sleep_ms * Math .max (1 , random .nextInt (backoff ));
98
98
if ( sleepMs > max_sleep_ms )
99
99
sleepMs = max_sleep_ms ;
100
- if ( sleepMs < base_sleep_ms )
101
- sleepMs = base_sleep_ms ;
102
100
return sleepMs ;
103
101
}
104
102
105
103
/**
106
- * Enqueue a task message to be sent to server
104
+ * Enqueue a task message to be sent to server
107
105
*/
108
- public void send (int task , byte [] message ) {
106
+ public void send (int task , byte [] message ) {
109
107
//throw exception if the client is being closed
110
108
if (being_closed .get ()) {
111
109
throw new RuntimeException ("Client is being closed, and does not take requests any more" );
@@ -128,43 +126,43 @@ MessageBatch takeMessages() throws InterruptedException {
128
126
MessageBatch batch = new MessageBatch (buffer_size );
129
127
Object msg = message_queue .take ();
130
128
batch .add (msg );
131
-
129
+
132
130
//we will discard any message after CLOSE
133
- if (msg ==ControlMessage .CLOSE_MESSAGE )
131
+ if (msg ==ControlMessage .CLOSE_MESSAGE )
134
132
return batch ;
135
-
133
+
136
134
while (!batch .isFull ()) {
137
135
//peek the next message
138
136
msg = message_queue .peek ();
139
137
//no more messages
140
138
if (msg == null ) break ;
141
-
139
+
142
140
//we will discard any message after CLOSE
143
141
if (msg ==ControlMessage .CLOSE_MESSAGE ) {
144
142
message_queue .take ();
145
143
batch .add (msg );
146
144
break ;
147
145
}
148
-
146
+
149
147
//try to add this msg into batch
150
148
if (!batch .tryAdd ((TaskMessage ) msg ))
151
149
break ;
152
-
150
+
153
151
//remove this message
154
152
message_queue .take ();
155
153
}
156
154
157
155
return batch ;
158
156
}
159
-
157
+
160
158
/**
161
159
* gracefully close this client.
162
- *
160
+ *
163
161
* We will send all existing requests, and then invoke close_n_release() method
164
162
*/
165
163
public synchronized void close () {
166
- if (!being_closed .get ()) {
167
- //enqueue a CLOSE message so that shutdown() will be invoked
164
+ if (!being_closed .get ()) {
165
+ //enqueue a CLOSE message so that shutdown() will be invoked
168
166
try {
169
167
message_queue .put (ControlMessage .CLOSE_MESSAGE );
170
168
being_closed .set (true );
@@ -178,10 +176,10 @@ public synchronized void close() {
178
176
* close_n_release() is invoked after all messages have been sent.
179
177
*/
180
178
void close_n_release () {
181
- if (channelRef .get () != null )
179
+ if (channelRef .get () != null )
182
180
channelRef .get ().close ().awaitUninterruptibly ();
183
181
184
- //we need to release resources
182
+ //we need to release resources
185
183
new Thread (new Runnable () {
186
184
@ Override
187
185
public void run () {
@@ -194,10 +192,10 @@ public TaskMessage recv(int flags) {
194
192
}
195
193
196
194
void setChannel (Channel channel ) {
197
- channelRef .set (channel );
198
- //reset retries
195
+ channelRef .set (channel );
196
+ //reset retries
199
197
if (channel != null )
200
- retries .set (0 );
198
+ retries .set (0 );
201
199
}
202
200
203
201
}
0 commit comments