Skip to content

Commit 1a7e7bf

Browse files
committed
fix gearpump#502, fix KafkaStorage loading data
1 parent 0ae4ad0 commit 1a7e7bf

File tree

1 file changed

+13
-16
lines changed

1 file changed

+13
-16
lines changed

external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorage.scala

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
2828
import org.apache.kafka.common.serialization.ByteArraySerializer
2929
import org.slf4j.Logger
3030

31+
import scala.collection.mutable
3132
import scala.util.{Try, Failure, Success}
3233

3334
object KafkaStorage {
@@ -87,23 +88,19 @@ private[kafka] class KafkaStorage(topic: String,
8788
}
8889

8990
private[kafka] def load(consumer: KafkaConsumer): List[(TimeStamp, Array[Byte])] = {
90-
@annotation.tailrec
91-
def fetch(offsets: List[(TimeStamp, Array[Byte])]): List[(TimeStamp, Array[Byte])] = {
92-
if (consumer.hasNext) {
93-
val kafkaMsg = consumer.next
94-
val offset = kafkaMsg.key.map { k =>
95-
Injection.invert[TimeStamp, Array[Byte]](k) match {
96-
case Success(time) => (time, kafkaMsg.msg)
97-
case Failure(e) => throw e
98-
}
99-
} orElse (throw new RuntimeException("offset key should not be null"))
100-
fetch(offsets :+ offset.get)
101-
} else {
102-
consumer.close()
103-
offsets
104-
}
91+
var messagesBuilder = new mutable.ArrayBuilder.ofRef[(TimeStamp, Array[Byte])]
92+
while (consumer.hasNext) {
93+
val kafkaMsg = consumer.next
94+
kafkaMsg.key.map { k =>
95+
Injection.invert[TimeStamp, Array[Byte]](k) match {
96+
case Success(time) =>
97+
messagesBuilder += (time -> kafkaMsg.msg)
98+
case Failure(e) => throw e
99+
}
100+
} orElse (throw new RuntimeException("offset key should not be null"))
105101
}
106-
fetch(List.empty[(TimeStamp, Array[Byte])])
102+
consumer.close()
103+
messagesBuilder.result().toList
107104
}
108105

109106
}

0 commit comments

Comments
 (0)