Skip to content

Commit 86c45f4

Browse files
author
Ben Osheroff
committed
Merge pull request zendesk#73 from zendesk/file_producer_fixes
fixes for file producer
2 parents 89ef98c + 4d20670 commit 86c45f4

File tree

3 files changed

+8
-4
lines changed

3 files changed

+8
-4
lines changed

src/main/java/com/zendesk/maxwell/MaxwellConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ private OptionParser getOptionParser() {
4949
parser.accepts( "log_level", "log level, one of DEBUG|INFO|WARN|ERROR" ).withRequiredArg();
5050
parser.accepts( "host", "mysql host" ).withRequiredArg();
5151
parser.accepts( "user", "mysql username" ).withRequiredArg();
52+
parser.accepts( "output_file", "output file for 'file' producer" ).withRequiredArg();
5253
parser.accepts( "password", "mysql password" ).withRequiredArg();
5354
parser.accepts( "port", "mysql port" ).withRequiredArg();
5455
parser.accepts( "producer", "producer type: stdout|file|kafka" ).withRequiredArg();
@@ -97,6 +98,9 @@ private void parse(String [] argv) {
9798

9899
if ( options.has("kafka_topic"))
99100
this.kafkaTopic = (String) options.valueOf("kafka_topic");
101+
102+
if ( options.has("output_file"))
103+
this.outputFile = (String) options.valueOf("output_file");
100104
}
101105

102106
private Properties readPropertiesFile(String filename, Boolean abortOnMissing) {
@@ -155,6 +159,9 @@ private void setDefaults() {
155159
} else if ( this.producerType.equals("kafka")
156160
&& !this.kafkaProperties.containsKey("bootstrap.servers")) {
157161
usage("You must specify kafka.bootstrap.servers for the kafka producer!");
162+
} else if ( this.producerType.equals("file")
163+
&& this.outputFile == null) {
164+
usage("please specify --output_file=FILE to use the file producer");
158165
}
159166

160167
if ( this.mysqlPort == null )

src/main/java/com/zendesk/maxwell/producer/AbstractProducer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,5 @@ public AbstractProducer(MaxwellContext context) {
1111
}
1212
abstract public void push(MaxwellAbstractRowsEvent e) throws Exception;
1313

14-
public void onComplete(MaxwellAbstractRowsEvent e) {
15-
System.out.println("processed " + e.getBinlogFilename() + ":" + e.getHeader().getPosition());
16-
}
14+
public void onComplete(MaxwellAbstractRowsEvent e) { }
1715
}

src/main/java/com/zendesk/maxwell/producer/FileProducer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ public void push(MaxwellAbstractRowsEvent e) throws Exception {
2424
this.fileWriter.write('\n');
2525
this.fileWriter.flush();
2626
}
27-
this.onComplete(e);
2827
context.setInitialPosition(e.getNextBinlogPosition());
2928
}
3029
}

0 commit comments

Comments
 (0)