@@ -5,6 +5,7 @@ import sys
5
5
import random
6
6
import subprocess as sub
7
7
import getopt
8
+ import re
8
9
9
10
def identity (x ):
10
11
return x
@@ -61,6 +62,7 @@ def confvalue(name, extrapaths):
61
62
tokens = line .split (" " )
62
63
if tokens [0 ] == "VALUE:" :
63
64
return " " .join (tokens [1 :])
65
+ return ""
64
66
65
67
def print_localconfvalue (name ):
66
68
"""Syntax: [storm localconfvalue conf-name]
@@ -82,12 +84,37 @@ def print_remoteconfvalue(name):
82
84
"""
83
85
print name + ": " + confvalue (name , [STORM_DIR + "/conf" ])
84
86
85
- def exec_storm_class (klass , jvmtype = "-server" , childopts = "" , extrajars = [], args = []):
86
- nativepath = confvalue ("java.library.path" , extrajars )
87
- args_str = " " .join (map (lambda s : "\" " + s + "\" " , args ))
88
- command = "java " + jvmtype + " -Dstorm.home=" + STORM_DIR + " " + get_config_opts () + " -Djava.library.path=" + nativepath + " " + childopts + " -cp " + get_classpath (extrajars ) + " " + klass + " " + args_str
89
- print "Running: " + command
90
- os .system (command )
87
+ def parse_args (string ):
88
+ r"""Takes a string of whitespace-separated tokens and parses it into a list.
89
+ Whitespace inside tokens may be quoted with single quotes, double quotes or
90
+ backslash (similar to command-line arguments in bash).
91
+
92
+ >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 'j''j' k" "k l' l' mm n\\n''')
93
+ ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l l', 'mm', r'n\n']
94
+ """
95
+ re_split = re .compile (r'''((?:
96
+ [^\s"'\\] |
97
+ "(?: [^"\\] | \\.)*" |
98
+ '(?: [^'\\] | \\.)*' |
99
+ \\.
100
+ )+)''' , re .VERBOSE )
101
+ args = re_split .split (string )[1 ::2 ]
102
+ args = [re .compile (r'"((?:[^"\\]|\\.)*)"' ).sub ('\\ 1' , x ) for x in args ]
103
+ args = [re .compile (r"'((?:[^'\\]|\\.)*)'" ).sub ('\\ 1' , x ) for x in args ]
104
+ return [re .compile (r'\\(.)' ).sub ('\\ 1' , x ) for x in args ]
105
+
106
+ def exec_storm_class (klass , jvmtype = "-server" , jvmopts = [], extrajars = [], args = [], fork = False ):
107
+ all_args = [
108
+ "java" , jvmtype , get_config_opts (),
109
+ "-Dstorm.home=" + STORM_DIR ,
110
+ "-Djava.library.path=" + confvalue ("java.library.path" , extrajars ),
111
+ "-cp" , get_classpath (extrajars ),
112
+ ] + jvmopts + [klass ] + list (args )
113
+ print "Running: " + " " .join (all_args )
114
+ if fork :
115
+ os .spawnvp (os .P_WAIT , "java" , all_args )
116
+ else :
117
+ os .execvp ("java" , all_args ) # replaces the current process and never returns
91
118
92
119
def jar (jarfile , klass , * args ):
93
120
"""Syntax: [storm jar topology-jar-path class ...]
@@ -103,7 +130,7 @@ def jar(jarfile, klass, *args):
103
130
jvmtype = "-client" ,
104
131
extrajars = [jarfile , CONF_DIR , STORM_DIR + "/bin" ],
105
132
args = args ,
106
- childopts = "-Dstorm.jar=" + jarfile )
133
+ jvmopts = [ "-Dstorm.jar=" + jarfile ] )
107
134
108
135
def kill (* args ):
109
136
"""Syntax: [storm kill topology-name [-w wait-time-secs]]
@@ -190,7 +217,8 @@ def shell(resourcesdir, command, *args):
190
217
"backtype.storm.command.shell_submission" ,
191
218
args = runnerargs ,
192
219
jvmtype = "-client" ,
193
- extrajars = [CONF_DIR ])
220
+ extrajars = [CONF_DIR ],
221
+ fork = True )
194
222
os .system ("rm " + tmpjarpath )
195
223
196
224
def repl ():
@@ -212,12 +240,15 @@ def nimbus(klass="backtype.storm.daemon.nimbus"):
212
240
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
213
241
"""
214
242
cppaths = [STORM_DIR + "/log4j" , STORM_DIR + "/conf" ]
215
- childopts = confvalue ("nimbus.childopts" , cppaths ) + " -Dlogfile.name=nimbus.log -Dlog4j.configuration=storm.log.properties"
243
+ jvmopts = parse_args (confvalue ("nimbus.childopts" , cppaths )) + [
244
+ "-Dlogfile.name=nimbus.log" ,
245
+ "-Dlog4j.configuration=storm.log.properties" ,
246
+ ]
216
247
exec_storm_class (
217
248
klass ,
218
249
jvmtype = "-server" ,
219
250
extrajars = cppaths ,
220
- childopts = childopts )
251
+ jvmopts = jvmopts )
221
252
222
253
def supervisor (klass = "backtype.storm.daemon.supervisor" ):
223
254
"""Syntax: [storm supervisor]
@@ -229,12 +260,15 @@ def supervisor(klass="backtype.storm.daemon.supervisor"):
229
260
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
230
261
"""
231
262
cppaths = [STORM_DIR + "/log4j" , STORM_DIR + "/conf" ]
232
- childopts = confvalue ("supervisor.childopts" , cppaths ) + " -Dlogfile.name=supervisor.log -Dlog4j.configuration=storm.log.properties"
263
+ jvmopts = parse_args (confvalue ("supervisor.childopts" , cppaths )) + [
264
+ "-Dlogfile.name=supervisor.log" ,
265
+ "-Dlog4j.configuration=storm.log.properties" ,
266
+ ]
233
267
exec_storm_class (
234
268
klass ,
235
269
jvmtype = "-server" ,
236
270
extrajars = cppaths ,
237
- childopts = childopts )
271
+ jvmopts = jvmopts )
238
272
239
273
def ui ():
240
274
"""Syntax: [storm ui]
@@ -247,11 +281,14 @@ def ui():
247
281
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
248
282
"""
249
283
cppaths = [STORM_DIR + "/log4j" , STORM_DIR + "/conf" ]
250
- childopts = confvalue ("ui.childopts" , cppaths ) + " -Dlogfile.name=ui.log -Dlog4j.configuration=storm.log.properties"
284
+ jvmopts = parse_args (confvalue ("ui.childopts" , cppaths )) + [
285
+ "-Dlogfile.name=ui.log" ,
286
+ "-Dlog4j.configuration=storm.log.properties" ,
287
+ ]
251
288
exec_storm_class (
252
289
"backtype.storm.ui.core" ,
253
290
jvmtype = "-server" ,
254
- childopts = childopts ,
291
+ jvmopts = jvmopts ,
255
292
extrajars = [STORM_DIR + "/log4j" , STORM_DIR , STORM_DIR + "/conf" ])
256
293
257
294
def drpc ():
@@ -263,11 +300,11 @@ def drpc():
263
300
See Distributed RPC for more information.
264
301
(https://github.com/nathanmarz/storm/wiki/Distributed-RPC)
265
302
"""
266
- childopts = "-Xmx768m -Dlogfile.name=drpc.log -Dlog4j.configuration=storm.log.properties"
303
+ jvmopts = [ "-Xmx768m" , " -Dlogfile.name=drpc.log" , " -Dlog4j.configuration=storm.log.properties"]
267
304
exec_storm_class (
268
305
"backtype.storm.daemon.drpc" ,
269
306
jvmtype = "-server" ,
270
- childopts = childopts ,
307
+ jvmopts = jvmopts ,
271
308
extrajars = [STORM_DIR + "/log4j" , STORM_DIR + "/conf" ])
272
309
273
310
def dev_zookeeper ():
0 commit comments