@@ -49,6 +49,7 @@ private OptionParser getOptionParser() {
49
49
parser .accepts ( "log_level" , "log level, one of DEBUG|INFO|WARN|ERROR" ).withRequiredArg ();
50
50
parser .accepts ( "host" , "mysql host" ).withRequiredArg ();
51
51
parser .accepts ( "user" , "mysql username" ).withRequiredArg ();
52
+ parser .accepts ( "output_file" , "output file for 'file' producer" ).withRequiredArg ();
52
53
parser .accepts ( "password" , "mysql password" ).withRequiredArg ();
53
54
parser .accepts ( "port" , "mysql port" ).withRequiredArg ();
54
55
parser .accepts ( "producer" , "producer type: stdout|file|kafka" ).withRequiredArg ();
@@ -97,6 +98,9 @@ private void parse(String [] argv) {
97
98
98
99
if ( options .has ("kafka_topic" ))
99
100
this .kafkaTopic = (String ) options .valueOf ("kafka_topic" );
101
+
102
+ if ( options .has ("output_file" ))
103
+ this .outputFile = (String ) options .valueOf ("output_file" );
100
104
}
101
105
102
106
private Properties readPropertiesFile (String filename , Boolean abortOnMissing ) {
@@ -155,6 +159,9 @@ private void setDefaults() {
155
159
} else if ( this .producerType .equals ("kafka" )
156
160
&& !this .kafkaProperties .containsKey ("bootstrap.servers" )) {
157
161
usage ("You must specify kafka.bootstrap.servers for the kafka producer!" );
162
+ } else if ( this .producerType .equals ("file" )
163
+ && this .outputFile == null ) {
164
+ usage ("please specify --output_file=FILE to use the file producer" );
158
165
}
159
166
160
167
if ( this .mysqlPort == null )
0 commit comments