Skip to content

Commit cc367a1

Browse files
committed
Merge remote branch 'origin/at_least_once' into add_test
2 parents 019152e + 174b9e7 commit cc367a1

File tree

5 files changed

+46
-26
lines changed

5 files changed

+46
-26
lines changed

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaSpout.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {
105105
output(new Message(decoder.fromBytes(kafkaMsg.msg)))
106106
offsetManager.update(KafkaSource(kafkaMsg.topicAndPartition), timestamp, kafkaMsg.offset)
107107
} else {
108-
LOG.info(s"no more messages from ${topicAndPartitions(tpIndex)}")
108+
LOG.debug(s"no more messages from ${topicAndPartitions(tpIndex)}")
109109
}
110110
// poll message from each TopicAndPartition in a round-robin way
111111
// TODO: make it configurable

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/Sum.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class Sum (conf : Configs) extends TaskActor(conf) {
4646
val word = msg.msg.asInstanceOf[String]
4747
val count = current + 1
4848
map.put(word, count)
49-
output(new Message(word -> count.toString(), System.currentTimeMillis()))
49+
output(new Message(s"${msg.timestamp}" -> s"${word}:${count}", System.currentTimeMillis()))
5050
}
5151

5252
override def onStop() : Unit = {

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaCheckpointManager.scala

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import org.apache.gearpump.util.Configs
2929
import org.slf4j.{Logger, LoggerFactory}
3030

3131
import scala.collection.mutable.{Map => MutableMap}
32+
import kafka.consumer.SimpleConsumer
33+
import kafka.utils.ZkUtils
3234

3335

3436
object KafkaCheckpointManager {
@@ -45,7 +47,6 @@ class KafkaCheckpointManager(conf: Configs) extends CheckpointManager {
4547

4648
private var checkpointTopicAndPartitions: Array[TopicAndPartition] = null
4749
private val topicExists: MutableMap[TopicAndPartition, Boolean] = MutableMap.empty[TopicAndPartition, Boolean]
48-
private var consumer: KafkaConsumer = null
4950

5051
override def start(): Unit = {
5152
createTopics()
@@ -68,41 +69,48 @@ class KafkaCheckpointManager(conf: Configs) extends CheckpointManager {
6869
}
6970

7071
override def readCheckpoint(source: Source): Checkpoint = {
71-
if (!topicExists.getOrElse(TopicAndPartition(source.name, source.partition), false)) {
72+
val topicAndPartition = TopicAndPartition(source.name, source.partition)
73+
if (!topicExists.getOrElse(topicAndPartition, false)) {
7274
Checkpoint(Map.empty[TimeStamp, Long])
7375
} else {
7476
// get consumers only after topics having been created
7577
LOG.info("creating consumer...")
76-
if (null == consumer) {
77-
consumer = config.getConsumer(topicAndPartitions = checkpointTopicAndPartitions)
78-
consumer.start()
79-
}
78+
val msgIter = consume(topicAndPartition)
8079
val checkpointTopicAndPartition = getCheckpointTopicAndPartition(source)
8180

8281
@annotation.tailrec
8382
def fetch(timeAndOffsets: Map[TimeStamp, Long]): Map[TimeStamp, Long] = {
84-
val kafkaMsg = consumer.nextMessage(checkpointTopicAndPartition)
85-
if (kafkaMsg != null) {
86-
if (kafkaMsg.key != null) {
87-
fetch(timeAndOffsets +
88-
(byteArrayToLong(kafkaMsg.key) -> byteArrayToLong(kafkaMsg.msg)))
83+
if (msgIter.hasNext) {
84+
val key = msgIter.getKey
85+
if (key != null) {
86+
val timestamp = byteArrayToLong(key)
87+
val offset = byteArrayToLong(msgIter.next)
88+
fetch(timeAndOffsets + (timestamp -> offset))
8989
} else {
90-
LOG.error(s"timestamp is null at offset ${kafkaMsg.offset} for ${checkpointTopicAndPartition}")
90+
// TODO: this should not happen; need further investigation
91+
LOG.error(s"timestamp is null at offset ${msgIter.getOffset} for ${checkpointTopicAndPartition}")
9192
fetch(timeAndOffsets)
9293
}
9394
} else {
9495
timeAndOffsets
9596
}
9697
}
98+
msgIter.close()
9799
Checkpoint(fetch(Map.empty[TimeStamp, Long]))
98100
}
99101
}
100102

101103
override def close(): Unit = {
102104
producer.close()
103-
if (consumer != null) {
104-
consumer.close()
105-
}
105+
}
106+
107+
108+
private def consume(topicAndPartition: TopicAndPartition): MessageIterator = {
109+
val topic = topicAndPartition.topic
110+
val partition = topicAndPartition.partition
111+
val broker = KafkaUtil.getBroker(config.getZkClient(), topic, partition)
112+
new MessageIterator(broker.host, broker.port, topic, partition, config.getSocketTimeoutMS,
113+
config.getSocketReceiveBufferSize, config.getFetchMessageMaxBytes, config.getClientId)
106114
}
107115

108116
private def createTopics(): Unit = {

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaConsumer.scala

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,9 @@ class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
5454
import org.apache.gearpump.streaming.transaction.kafka.KafkaConsumer._
5555

5656
private val leaders: Map[TopicAndPartition, Broker] = topicAndPartitions.map {
57-
tp =>
58-
val topic = tp.topic
59-
val partition = tp.partition
60-
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
61-
.getOrElse(throw new Exception(s"leader not available for TopicAndPartition(${tp.topic}, ${tp.partition})"))
62-
val broker = ZkUtils.getBrokerInfo(zkClient, leader)
63-
.getOrElse(throw new Exception(s"broker info not found for leader ${leader}"))
64-
tp -> Broker(broker.host, broker.port)
57+
tp => {
58+
tp -> KafkaUtil.getBroker(zkClient, tp.topic, tp.partition)
59+
}
6560
}.toMap
6661

6762
private val iterators: Map[TopicAndPartition, MessageIterator] = topicAndPartitions.map(
@@ -87,7 +82,7 @@ class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
8782
if (msg != null) {
8883
incomingQueue(msg.topicAndPartition).put(msg)
8984
} else if (noMessages.size == topicAndPartitions.size) {
90-
LOG.info(s"no messages for all TopicAndPartitions. sleeping for ${noMessageSleepMS} ms")
85+
LOG.debug(s"no messages for all TopicAndPartitions. sleeping for ${noMessageSleepMS} ms")
9186
Thread.sleep(noMessageSleepMS)
9287
}
9388
}
@@ -107,6 +102,7 @@ class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
107102
}
108103

109104
// fetch message from each TopicAndPartition in a round-robin way
105+
// TODO: make each MessageIterator run in its own thread
110106
private def fetchMessage(): KafkaMessage = {
111107
val msg = fetchMessage(topicAndPartitions(partitionIndex))
112108
partitionIndex = (partitionIndex + 1) % partitionNum
@@ -128,6 +124,8 @@ class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
128124

129125
def close(): Unit = {
130126
iterators.foreach(_._2.close())
127+
fetchThread.interrupt()
128+
fetchThread.join()
131129
}
132130
}
133131

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaUtil.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818

1919
package org.apache.gearpump.streaming.transaction.kafka
2020

21+
import kafka.common.TopicAndPartition
22+
import kafka.utils.ZkUtils
23+
import org.apache.gearpump.streaming.transaction.kafka.KafkaConsumer.Broker
24+
import org.I0Itec.zkclient.ZkClient
25+
2126
object KafkaUtil {
2227

2328
def longToByteArray(long: Long): Array[Byte] = {
@@ -27,4 +32,13 @@ object KafkaUtil {
2732
def byteArrayToLong(bytes: Array[Byte]): Long = {
2833
java.nio.ByteBuffer.wrap(bytes).getLong
2934
}
35+
36+
37+
def getBroker(zkClient: ZkClient, topic: String, partition: Int): Broker = {
38+
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
39+
.getOrElse(throw new Exception(s"leader not available for TopicAndPartition(${topic}, ${partition})"))
40+
val broker = ZkUtils.getBrokerInfo(zkClient, leader)
41+
.getOrElse(throw new Exception(s"broker info not found for leader ${leader}"))
42+
Broker(broker.host, broker.port)
43+
}
3044
}

0 commit comments

Comments
 (0)