Skip to content

Commit 2892dd8

Browse files
author
Nathan Marz
committed
disable timeouts by default to prevent it from affecting tests
1 parent 348d7f3 commit 2892dd8

File tree

7 files changed

+48
-30
lines changed

7 files changed

+48
-30
lines changed

conf/defaults.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ zmq.threads: 1
7070
zmq.linger.millis: 5000
7171

7272
### topology.* configs are for specific executing storms
73+
topology.enable.message.timeouts: true
7374
topology.debug: false
7475
topology.optimize: true
7576
topology.workers: 1

src/clj/backtype/storm/daemon/executor.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@
203203
receive-queue (:receive-queue executor-data)
204204
context (:worker-context executor-data)]
205205
(when tick-time-secs
206-
(if-not (pos? tick-time-secs)
206+
(if-not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS)
207207
(log-message "Timeouts disabled for executor " (:executor-id executor-data))
208208
(schedule-recurring
209209
(:timer worker)

src/clj/backtype/storm/testing.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@
101101
daemon-conf (merge (read-storm-config)
102102
{TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
103103
ZMQ-LINGER-MILLIS 0
104-
TOPOLOGY-MESSAGE-TIMEOUT-SECS -1
104+
TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
105105
}
106106
daemon-conf
107107
{STORM-CLUSTER-MODE "local"

src/jvm/backtype/storm/Config.java

+7
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,13 @@ public class Config extends HashMap<String, Object> {
282282
public static String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs";
283283

284284

285+
286+
/**
287+
* True if Storm should timeout messages or not. Defaults to true. This is meant to be used
288+
* in unit tests to prevent tuples from being accidentally timed out during the test.
289+
*/
290+
public static String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts";
291+
285292
/**
286293
* When set to true, Storm will log every message that's emitted.
287294
*/

src/jvm/backtype/storm/coordination/CoordinatedBolt.java

+1
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ public void execute(Tuple tuple) {
311311

312312
public void cleanup() {
313313
_delegate.cleanup();
314+
_tracked.cleanup();
314315
}
315316

316317
public void declareOutputFields(OutputFieldsDeclarer declarer) {

src/jvm/backtype/storm/utils/TimeCacheMap.java

+36-27
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ public static interface ExpiredCallback<K, V> {
2626

2727
private LinkedList<HashMap<K, V>> _buckets;
2828

29+
private final Object _lock = new Object();
2930
private Thread _cleaner;
3031
private ExpiredCallback _callback;
31-
private final Object _lock = new Object();
3232

3333
public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
3434
if(numBuckets<2) {
@@ -39,6 +39,7 @@ public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> ca
3939
_buckets.add(new HashMap<K, V>());
4040
}
4141

42+
4243
_callback = callback;
4344
final long expirationMillis = expirationSecs * 1000L;
4445
final long sleepTime = expirationMillis / (numBuckets-1);
@@ -81,53 +82,61 @@ public TimeCacheMap(int expirationSecs, int numBuckets) {
8182

8283

8384
public boolean containsKey(K key) {
84-
for(HashMap<K, V> bucket: _buckets) {
85-
if(bucket.containsKey(key)) {
86-
return true;
85+
synchronized(_lock) {
86+
for(HashMap<K, V> bucket: _buckets) {
87+
if(bucket.containsKey(key)) {
88+
return true;
89+
}
8790
}
91+
return false;
8892
}
89-
return false;
9093
}
9194

9295
public V get(K key) {
93-
for(HashMap<K, V> bucket: _buckets) {
94-
if(bucket.containsKey(key)) {
95-
return bucket.get(key);
96+
synchronized(_lock) {
97+
for(HashMap<K, V> bucket: _buckets) {
98+
if(bucket.containsKey(key)) {
99+
return bucket.get(key);
100+
}
96101
}
102+
return null;
97103
}
98-
return null;
99104
}
100105

101106
public void put(K key, V value) {
102-
Iterator<HashMap<K, V>> it = _buckets.iterator();
103-
HashMap<K, V> bucket = it.next();
104-
bucket.put(key, value);
105-
while(it.hasNext()) {
106-
bucket = it.next();
107-
bucket.remove(key);
107+
synchronized(_lock) {
108+
Iterator<HashMap<K, V>> it = _buckets.iterator();
109+
HashMap<K, V> bucket = it.next();
110+
bucket.put(key, value);
111+
while(it.hasNext()) {
112+
bucket = it.next();
113+
bucket.remove(key);
114+
}
108115
}
109116
}
110117

111-
112118
public Object remove(K key) {
113-
for(HashMap<K, V> bucket: _buckets) {
114-
if(bucket.containsKey(key)) {
115-
return bucket.remove(key);
119+
synchronized(_lock) {
120+
for(HashMap<K, V> bucket: _buckets) {
121+
if(bucket.containsKey(key)) {
122+
return bucket.remove(key);
123+
}
116124
}
125+
return null;
117126
}
118-
return null;
119127
}
120128

121129
public int size() {
122-
int size = 0;
123-
for(HashMap<K, V> bucket: _buckets) {
124-
size+=bucket.size();
130+
synchronized(_lock) {
131+
int size = 0;
132+
for(HashMap<K, V> bucket: _buckets) {
133+
size+=bucket.size();
134+
}
135+
return size;
125136
}
126-
return size;
127137
}
128-
138+
129139
public void cleanup() {
130140
_cleaner.interrupt();
131-
}
132-
141+
}
133142
}

test/clj/backtype/storm/integration_test.clj

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
(assert-loop #(.isFailed tracker %) ids))
5656

5757
(deftest test-timeout
58-
(with-simulated-time-local-cluster [cluster]
58+
(with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
5959
(let [feeder (feeder-spout ["field1"])
6060
tracker (AckFailMapTracker.)
6161
_ (.setAckFailDelegate feeder tracker)

0 commit comments

Comments
 (0)