Skip to content

Commit d19e818

Browse files
author
Nathan Marz
committed
Merge remote-tracking branch 'martin/exec-not-fork'
2 parents 360f11f + 0dcfade commit d19e818

File tree

1 file changed

+53
-16
lines changed

1 file changed

+53
-16
lines changed

bin/storm

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import sys
55
import random
66
import subprocess as sub
77
import getopt
8+
import re
89

910
def identity(x):
1011
return x
@@ -61,6 +62,7 @@ def confvalue(name, extrapaths):
6162
tokens = line.split(" ")
6263
if tokens[0] == "VALUE:":
6364
return " ".join(tokens[1:])
65+
return ""
6466

6567
def print_localconfvalue(name):
6668
"""Syntax: [storm localconfvalue conf-name]
@@ -82,12 +84,37 @@ def print_remoteconfvalue(name):
8284
"""
8385
print name + ": " + confvalue(name, [STORM_DIR + "/conf"])
8486

85-
def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[]):
86-
nativepath = confvalue("java.library.path", extrajars)
87-
args_str = " ".join(map(lambda s: "\"" + s + "\"", args))
88-
command = "java " + jvmtype + " -Dstorm.home=" + STORM_DIR + " " + get_config_opts() + " -Djava.library.path=" + nativepath + " " + childopts + " -cp " + get_classpath(extrajars) + " " + klass + " " + args_str
89-
print "Running: " + command
90-
os.system(command)
87+
def parse_args(string):
88+
r"""Takes a string of whitespace-separated tokens and parses it into a list.
89+
Whitespace inside tokens may be quoted with single quotes, double quotes or
90+
backslash (similar to command-line arguments in bash).
91+
92+
>>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 'j''j' k" "k l' l' mm n\\n''')
93+
['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l l', 'mm', r'n\n']
94+
"""
95+
re_split = re.compile(r'''((?:
96+
[^\s"'\\] |
97+
"(?: [^"\\] | \\.)*" |
98+
'(?: [^'\\] | \\.)*' |
99+
\\.
100+
)+)''', re.VERBOSE)
101+
args = re_split.split(string)[1::2]
102+
args = [re.compile(r'"((?:[^"\\]|\\.)*)"').sub('\\1', x) for x in args]
103+
args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args]
104+
return [re.compile(r'\\(.)').sub('\\1', x) for x in args]
105+
106+
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
107+
all_args = [
108+
"java", jvmtype, get_config_opts(),
109+
"-Dstorm.home=" + STORM_DIR,
110+
"-Djava.library.path=" + confvalue("java.library.path", extrajars),
111+
"-cp", get_classpath(extrajars),
112+
] + jvmopts + [klass] + list(args)
113+
print "Running: " + " ".join(all_args)
114+
if fork:
115+
os.spawnvp(os.P_WAIT, "java", all_args)
116+
else:
117+
os.execvp("java", all_args) # replaces the current process and never returns
91118

92119
def jar(jarfile, klass, *args):
93120
"""Syntax: [storm jar topology-jar-path class ...]
@@ -103,7 +130,7 @@ def jar(jarfile, klass, *args):
103130
jvmtype="-client",
104131
extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
105132
args=args,
106-
childopts="-Dstorm.jar=" + jarfile)
133+
jvmopts=["-Dstorm.jar=" + jarfile])
107134

108135
def kill(*args):
109136
"""Syntax: [storm kill topology-name [-w wait-time-secs]]
@@ -190,7 +217,8 @@ def shell(resourcesdir, command, *args):
190217
"backtype.storm.command.shell_submission",
191218
args=runnerargs,
192219
jvmtype="-client",
193-
extrajars=[CONF_DIR])
220+
extrajars=[CONF_DIR],
221+
fork=True)
194222
os.system("rm " + tmpjarpath)
195223

196224
def repl():
@@ -212,12 +240,15 @@ def nimbus(klass="backtype.storm.daemon.nimbus"):
212240
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
213241
"""
214242
cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
215-
childopts = confvalue("nimbus.childopts", cppaths) + " -Dlogfile.name=nimbus.log -Dlog4j.configuration=storm.log.properties"
243+
jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
244+
"-Dlogfile.name=nimbus.log",
245+
"-Dlog4j.configuration=storm.log.properties",
246+
]
216247
exec_storm_class(
217248
klass,
218249
jvmtype="-server",
219250
extrajars=cppaths,
220-
childopts=childopts)
251+
jvmopts=jvmopts)
221252

222253
def supervisor(klass="backtype.storm.daemon.supervisor"):
223254
"""Syntax: [storm supervisor]
@@ -229,12 +260,15 @@ def supervisor(klass="backtype.storm.daemon.supervisor"):
229260
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
230261
"""
231262
cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
232-
childopts = confvalue("supervisor.childopts", cppaths) + " -Dlogfile.name=supervisor.log -Dlog4j.configuration=storm.log.properties"
263+
jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [
264+
"-Dlogfile.name=supervisor.log",
265+
"-Dlog4j.configuration=storm.log.properties",
266+
]
233267
exec_storm_class(
234268
klass,
235269
jvmtype="-server",
236270
extrajars=cppaths,
237-
childopts=childopts)
271+
jvmopts=jvmopts)
238272

239273
def ui():
240274
"""Syntax: [storm ui]
@@ -247,11 +281,14 @@ def ui():
247281
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
248282
"""
249283
cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
250-
childopts = confvalue("ui.childopts", cppaths) + " -Dlogfile.name=ui.log -Dlog4j.configuration=storm.log.properties"
284+
jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [
285+
"-Dlogfile.name=ui.log",
286+
"-Dlog4j.configuration=storm.log.properties",
287+
]
251288
exec_storm_class(
252289
"backtype.storm.ui.core",
253290
jvmtype="-server",
254-
childopts=childopts,
291+
jvmopts=jvmopts,
255292
extrajars=[STORM_DIR + "/log4j", STORM_DIR, STORM_DIR + "/conf"])
256293

257294
def drpc():
@@ -263,11 +300,11 @@ def drpc():
263300
See Distributed RPC for more information.
264301
(https://github.com/nathanmarz/storm/wiki/Distributed-RPC)
265302
"""
266-
childopts = "-Xmx768m -Dlogfile.name=drpc.log -Dlog4j.configuration=storm.log.properties"
303+
jvmopts = ["-Xmx768m", "-Dlogfile.name=drpc.log", "-Dlog4j.configuration=storm.log.properties"]
267304
exec_storm_class(
268305
"backtype.storm.daemon.drpc",
269306
jvmtype="-server",
270-
childopts=childopts,
307+
jvmopts=jvmopts,
271308
extrajars=[STORM_DIR + "/log4j", STORM_DIR + "/conf"])
272309

273310
def dev_zookeeper():

0 commit comments

Comments
 (0)