Skip to content

Commit 9c3a4b7

Browse files
author
Nathan Marz
committed
make zookeeper cluster used for transactional state customizable
1 parent 6bcd3bb commit 9c3a4b7

File tree

4 files changed

+31
-9
lines changed

4 files changed

+31
-9
lines changed

conf/defaults.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ drpc.port: 3772
3232
drpc.invocations.port: 3773
3333

3434
transactional.zookeeper.root: "/transactional"
35+
transactional.zookeeper.servers: null
36+
transactional.zookeeper.port: null
3537

3638
### supervisor.* configs are for node supervisors
3739
# Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication

src/jvm/backtype/storm/Config.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,18 @@ public class Config extends HashMap<String, Object> {
371371
*/
372372
public static String TRANSACTIONAL_ZOOKEEPER_ROOT="transactional.zookeeper.root";
373373

374+
/**
375+
* The list of zookeeper servers in which to keep the transactional state. If null (which is default),
376+
* will use storm.zookeeper.servers
377+
*/
378+
public static String TRANSACTIONAL_ZOOKEEPER_SERVERS="transactional.zookeeper.servers";
379+
380+
/**
381+
* The port to use to connect to the transactional zookeeper servers. If null (which is default),
382+
* will use storm.zookeeper.port
383+
*/
384+
public static String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
385+
374386
/**
375387
* The number of threads that should be used by the zeromq context in each worker process.
376388
*/

src/jvm/backtype/storm/transactional/state/TransactionalState.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ protected TransactionalState(Map conf, String id, Map componentConf, String subr
3535
.get(Config.TOPOLOGY_KRYO_REGISTER));
3636
}
3737
String rootDir = conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT) + "/" + id + "/" + subroot;
38-
CuratorFramework initter = Utils.newCurator(conf);
38+
List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
39+
Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
40+
Object sessionTimeout = conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT);
41+
CuratorFramework initter = Utils.newCurator(servers, port, sessionTimeout);
3942
try {
4043
initter.create().creatingParentsIfNeeded().forPath(rootDir);
4144
} catch(KeeperException.NodeExistsException e) {
@@ -44,7 +47,7 @@ protected TransactionalState(Map conf, String id, Map componentConf, String subr
4447

4548
initter.close();
4649

47-
_curator = Utils.newCurator(conf, rootDir);
50+
_curator = Utils.newCurator(servers, port, sessionTimeout, rootDir);
4851
_ser = new KryoValuesSerializer(conf);
4952
_des = new KryoValuesDeserializer(conf);
5053
} catch (Exception e) {
@@ -111,4 +114,10 @@ public Object getData(String path) {
111114
public void close() {
112115
_curator.close();
113116
}
117+
118+
private Object getWithBackup(Map amap, Object primary, Object backup) {
119+
Object ret = amap.get(primary);
120+
if(ret==null) return amap.get(backup);
121+
return ret;
122+
}
114123
}

src/jvm/backtype/storm/utils/Utils.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package backtype.storm.utils;
22

3-
import backtype.storm.Config;
43
import backtype.storm.generated.ComponentCommon;
54
import backtype.storm.generated.ComponentObject;
65
import backtype.storm.generated.StormTopology;
@@ -206,15 +205,15 @@ public static long randomLong() {
206205
return UUID.randomUUID().getLeastSignificantBits();
207206
}
208207

209-
public static CuratorFramework newCurator(Map conf, String root) {
208+
public static CuratorFramework newCurator(List<String> servers, Object port, Object sessionTimeout, String root) {
210209
List<String> serverPorts = new ArrayList<String>();
211-
for(String zkServer: (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS)) {
212-
serverPorts.add(zkServer + ":" + Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_PORT)));
210+
for(String zkServer: (List<String>) servers) {
211+
serverPorts.add(zkServer + ":" + Utils.getInt(port));
213212
}
214213
String zkStr = StringUtils.join(serverPorts, ",") + root;
215214
try {
216215
CuratorFramework ret = CuratorFrameworkFactory.newClient(zkStr,
217-
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
216+
Utils.getInt(sessionTimeout),
218217
15000, new RetryNTimes(5, 1000));
219218
ret.start();
220219
return ret;
@@ -223,8 +222,8 @@ public static CuratorFramework newCurator(Map conf, String root) {
223222
}
224223
}
225224

226-
public static CuratorFramework newCurator(Map conf) {
227-
return newCurator(conf, "");
225+
public static CuratorFramework newCurator(List<String> servers, Object port, Object sessionTimeout) {
226+
return newCurator(servers, port, sessionTimeout, "");
228227
}
229228

230229
/**

0 commit comments

Comments
 (0)