@@ -28,6 +28,7 @@ import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.slf4j.Logger
 
+import scala.collection.mutable
 import scala.util.{Try, Failure, Success}
 
 object KafkaStorage {
@@ -87,23 +88,19 @@ private[kafka] class KafkaStorage(topic: String,
   }
 
   private[kafka] def load(consumer: KafkaConsumer): List[(TimeStamp, Array[Byte])] = {
-    @annotation.tailrec
-    def fetch(offsets: List[(TimeStamp, Array[Byte])]): List[(TimeStamp, Array[Byte])] = {
-      if (consumer.hasNext) {
-        val kafkaMsg = consumer.next
-        val offset = kafkaMsg.key.map { k =>
-          Injection.invert[TimeStamp, Array[Byte]](k) match {
-            case Success(time) => (time, kafkaMsg.msg)
-            case Failure(e) => throw e
-          }
-        } orElse (throw new RuntimeException("offset key should not be null"))
-        fetch(offsets :+ offset.get)
-      } else {
-        consumer.close()
-        offsets
-      }
+    var messagesBuilder = new mutable.ArrayBuilder.ofRef[(TimeStamp, Array[Byte])]
+    while (consumer.hasNext) {
+      val kafkaMsg = consumer.next
+      kafkaMsg.key.map { k =>
+        Injection.invert[TimeStamp, Array[Byte]](k) match {
+          case Success(time) =>
+            messagesBuilder += (time -> kafkaMsg.msg)
+          case Failure(e) => throw e
+        }
+      } orElse (throw new RuntimeException("offset key should not be null"))
     }
-    fetch(List.empty[(TimeStamp, Array[Byte])])
+    consumer.close()
+    messagesBuilder.result().toList
   }
 
 }
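
Note on the change: the old recursive fetch appended with offsets :+ offset.get, which copies the entire immutable List on every message, giving O(n^2) work overall; mutable.ArrayBuilder appends in amortized O(1). Below is a minimal, self-contained sketch of that builder pattern, not the project's code: the ArrayBuilderSketch object and its TimeStamp alias are stand-ins for illustration.

import scala.collection.mutable

object ArrayBuilderSketch {
  type TimeStamp = Long // stand-in for the project's TimeStamp alias

  def main(args: Array[String]): Unit = {
    // ofRef builds an array-backed buffer of AnyRef elements; += is amortized
    // O(1), whereas list :+ elem copies the whole list on every append.
    val builder = new mutable.ArrayBuilder.ofRef[(TimeStamp, Array[Byte])]
    (1L to 3L).foreach { t =>
      builder += (t -> Array[Byte](t.toByte))
    }
    // result() materializes the array once, at the end, as load() does.
    val messages: List[(TimeStamp, Array[Byte])] = builder.result().toList
    messages.foreach { case (t, bytes) => println(s"$t -> ${bytes.length} byte(s)") }
  }
}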