Commit 5f01f53

fix message iterator and checkpoint bugs

1 parent: 918001b

6 files changed: +30 -48 lines

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaSpout.scala

Lines changed: 7 additions & 6 deletions

@@ -85,11 +85,12 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {
   override def onStart(taskContext: TaskContext): Unit = {
     offsetManager.register(topicAndPartitions.map(KafkaSource(_)))
     offsetManager.start()
-    offsetManager.loadStartingOffsets(taskContext.startTime).foreach{
+    offsetManager.loadStartOffsets(taskContext.startTime).foreach{
       entry =>
         val source = entry._1
         val offset = entry._2
         val topicAndPartition = TopicAndPartition(source.name, source.partition)
+        LOG.info(s"set start offsets for ${topicAndPartition}")
         consumer.setStartOffset(topicAndPartition, offset)
     }
     consumer.start()
@@ -110,6 +111,11 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {
       } else {
         LOG.debug(s"no more messages from ${topicAndPartitions(tpIndex)}")
       }
+      if (shouldCommitCheckpoint) {
+        LOG.info("committing checkpoint...")
+        offsetManager.checkpoint
+        lastCommitTime = System.currentTimeMillis()
+      }
       // poll message from each TopicAndPartition in a round-robin way
       // TODO: make it configurable
       if (tpIndex + 1 == topicAndPartitions.size) {
@@ -120,11 +126,6 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {
       }
     }
     fetchAndEmit(0, 0)
-    if (shouldCommitCheckpoint) {
-      LOG.info("committing checkpoint...")
-      offsetManager.checkpoint
-      lastCommitTime = System.currentTimeMillis()
-    }
     self ! Message("continue", Message.noTimeStamp)
   }
 
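
Moving the commit check inside the fetch loop means a checkpoint is committed as soon as the interval elapses, rather than only after a full round of fetchAndEmit. A minimal, self-contained sketch of this kind of time-based commit gate; commitIntervalMs and doCheckpoint are assumed stand-ins, not names from the patch:

// Sketch: commit when at least commitIntervalMs has passed since the
// last commit. Checked once per fetch iteration, as in the patched loop.
object CommitGateSketch {
  val commitIntervalMs = 1000L   // assumed checkpoint commit interval
  var lastCommitTime = 0L        // time of the most recent commit

  def shouldCommitCheckpoint: Boolean =
    System.currentTimeMillis() - lastCommitTime > commitIntervalMs

  def doCheckpoint(): Unit = println("committing checkpoint...")

  def onFetchIteration(): Unit = {
    if (shouldCommitCheckpoint) {
      doCheckpoint()
      lastCommitTime = System.currentTimeMillis()
    }
  }

  def main(args: Array[String]): Unit = {
    (1 to 3).foreach { _ =>
      onFetchIteration() // commits on the 1st and 3rd iterations
      Thread.sleep(600)
    }
  }
}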

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala

Lines changed: 1 addition & 2 deletions

@@ -87,8 +87,7 @@ class OffsetManager(checkpointManager: CheckpointManager[TimeStamp, Long],
     checkpointsBySource
   }
 
-  def loadStartingOffsets(timestamp: TimeStamp): Map[Source, Long] = {
-    LOG.info("loading start offsets...")
+  def loadStartOffsets(timestamp: TimeStamp): Map[Source, Long] = {
     checkpointManager.sourceAndCheckpoints(OffsetSerDe).foldLeft(Map.empty[Source, Long]) { (accum, iter) =>
       filter.filter(iter._2.records, timestamp) match {
         case Some((_, offset)) => accum + (iter._1 -> offset)
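
loadStartOffsets folds every source's checkpoint records through a filter and keeps only the sources that yield an offset. A self-contained sketch of that fold-and-filter pattern, with simplified stand-ins: String sources, (timestamp, offset) pairs for records, and a filter that takes the first record at or after the timestamp (the real OffsetFilter's policy may differ):

// Sketch: fold filtered checkpoints into a source -> start-offset map,
// silently skipping sources with no matching checkpoint.
object LoadStartOffsetsSketch {
  type TimeStamp = Long

  // Stand-in filter: first record at or after `timestamp`, if any.
  def filter(records: List[(TimeStamp, Long)], timestamp: TimeStamp): Option[(TimeStamp, Long)] =
    records.find(_._1 >= timestamp)

  def loadStartOffsets(checkpoints: Map[String, List[(TimeStamp, Long)]],
                       timestamp: TimeStamp): Map[String, Long] =
    checkpoints.foldLeft(Map.empty[String, Long]) { (accum, iter) =>
      filter(iter._2, timestamp) match {
        case Some((_, offset)) => accum + (iter._1 -> offset)
        case None => accum
      }
    }

  def main(args: Array[String]): Unit = {
    val cps = Map(
      "topic-0" -> List((100L, 5L), (200L, 9L)),
      "topic-1" -> List((50L, 2L)))
    // topic-1 has no checkpoint at or after t=150, so it is dropped
    println(loadStartOffsets(cps, 150L)) // Map(topic-0 -> 9)
  }
}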

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaCheckpointManager.scala

Lines changed: 9 additions & 20 deletions

@@ -44,7 +44,7 @@ class KafkaCheckpointManager[K, V](checkpointId: Int,
 
   private var sources: Array[Source] = null
   private var checkpointTopicAndPartitions: Array[TopicAndPartition] = null
-  private val topicExists: MutableMap[TopicAndPartition, Boolean] = MutableMap.empty[TopicAndPartition, Boolean]
+  private var existingTopics: Set[TopicAndPartition] = Set.empty[TopicAndPartition]
 
   override def start(): Unit = {
     createTopics()
@@ -75,28 +75,18 @@ class KafkaCheckpointManager[K, V](checkpointId: Int,
   }
 
   override def readCheckpoint(source: Source, checkpointSerDe: CheckpointSerDe[K, V]): Checkpoint[K, V] = {
-    val topicAndPartition = TopicAndPartition(source.name, source.partition)
+    val checkpointTopicAndPartition = getCheckpointTopicAndPartition(source)
     // no checkpoint to read for the first time
-    if (!topicExists.getOrElse(topicAndPartition, false)) {
+    if (!existingTopics(checkpointTopicAndPartition)) {
       Checkpoint.empty
     } else {
-      // get consumers only after topics having been created
-      LOG.info("creating consumer...")
-      val msgIter = consume(topicAndPartition)
-      val checkpointTopicAndPartition = getCheckpointTopicAndPartition(source)
-
+      val msgIter = getMessageIterator(checkpointTopicAndPartition)
       @annotation.tailrec
       def fetch(records: List[(K, V)]): List[(K, V)] = {
         if (msgIter.hasNext) {
-          val key = msgIter.getKey
-          if (key != null) {
-            val r: (K, V) = (checkpointSerDe.fromKeyBytes(key), checkpointSerDe.fromValueBytes(msgIter.next))
-            fetch(records :+ r)
-          } else {
-            // TODO: this should not happen; need further investigation
-            LOG.error(s"timestamp is null at offset ${msgIter.getOffset} for ${checkpointTopicAndPartition}")
-            fetch(records)
-          }
+          val (_, key, payload) = msgIter.next
+          val r: (K, V) = (checkpointSerDe.fromKeyBytes(key), checkpointSerDe.fromValueBytes(payload))
+          fetch(records :+ r)
         } else {
           msgIter.close()
           records
@@ -112,7 +102,7 @@ class KafkaCheckpointManager[K, V](checkpointId: Int,
   }
 
 
-  private def consume(topicAndPartition: TopicAndPartition): MessageIterator = {
+  private def getMessageIterator(topicAndPartition: TopicAndPartition): MessageIterator = {
     val topic = topicAndPartition.topic
     val partition = topicAndPartition.partition
     val broker = KafkaUtil.getBroker(zkClient, topic, partition)
@@ -126,11 +116,10 @@ class KafkaCheckpointManager[K, V](checkpointId: Int,
     try {
       val topic = tp.topic
      AdminUtils.createTopic(zkClient, topic, 1, checkpointReplicas)
-      topicExists.put(tp, false)
     } catch {
       case tee: TopicExistsException => {
         LOG.info(s"${tp} already exists")
-        topicExists.put(tp, true)
+        existingTopics += tp
       }
       case e: Exception => throw e
     }
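
Two things change here: topic existence is tracked with an immutable Set (existingTopics(tp) is Set.apply, i.e. a membership test), and the drain loop in readCheckpoint loses its null-key branch because MessageIterator.next now returns offset, key, and payload together. A sketch of the simplified tail-recursive drain, with a plain Scala Iterator standing in for MessageIterator and stub SerDe functions:

// Sketch: drain an iterator of raw (key, payload) pairs into a list of
// deserialized records, as readCheckpoint does after this commit.
object TailrecDrainSketch {
  def readAll[K, V](msgIter: Iterator[(Array[Byte], Array[Byte])],
                    fromKeyBytes: Array[Byte] => K,
                    fromValueBytes: Array[Byte] => V): List[(K, V)] = {
    @annotation.tailrec
    def fetch(records: List[(K, V)]): List[(K, V)] = {
      if (msgIter.hasNext) {
        val (key, payload) = msgIter.next()
        val r: (K, V) = (fromKeyBytes(key), fromValueBytes(payload))
        fetch(records :+ r)
      } else {
        records // the real code also closes the iterator here
      }
    }
    fetch(List.empty[(K, V)])
  }

  def main(args: Array[String]): Unit = {
    val raw = Iterator(("k1".getBytes, "v1".getBytes), ("k2".getBytes, "v2".getBytes))
    println(readAll(raw, (b: Array[Byte]) => new String(b), (b: Array[Byte]) => new String(b)))
    // List((k1,v1), (k2,v2))
  }
}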

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaConsumer.scala

Lines changed: 5 additions & 4 deletions

@@ -64,7 +64,7 @@ class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
       val tp = iter._1
       val broker = iter._2
       if (!accum.contains(broker)) {
-        val fetchThread = new FetchThread(broker)
+        val fetchThread = new FetchThread(broker.host, broker.port)
         fetchThread.addTopicAndPartition(tp)
         accum + (broker -> fetchThread)
       } else {
@@ -94,7 +94,7 @@ class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
     fetchThreads.foreach(_._2.join())
   }
 
-  class FetchThread(broker: Broker) extends Thread {
+  class FetchThread(host: String, port: Int) extends Thread {
     private var topicAndPartitions: List[TopicAndPartition] = List.empty[TopicAndPartition]
     private var iterators: Map[TopicAndPartition, MessageIterator] = Map.empty[TopicAndPartition, MessageIterator]
 
@@ -103,7 +103,7 @@ class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
 
     def addTopicAndPartition(topicAndPartition: TopicAndPartition) = {
       topicAndPartitions :+= topicAndPartition
-      val iter = new MessageIterator(broker.host, broker.port, topicAndPartition.topic, topicAndPartition.partition,
+      val iter = new MessageIterator(host, port, topicAndPartition.topic, topicAndPartition.partition,
        socketTimeout, socketBufferSize, fetchSize, clientId)
       iterators += topicAndPartition -> iter
     }
@@ -129,7 +129,8 @@ class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
       if (queue.size < fetchThreshold) {
         val iter = iterators(tp)
         if (iter.hasNext) {
-          val msg = KafkaMessage(tp, iter.getOffset, iter.getKey, iter.next)
+          val (offset, key, payload) = iter.next
+          val msg = KafkaMessage(tp, offset, key, payload)
           queue.put((msg, timeExtractor(msg)))
           hasNextSet += tp
         } else {
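
FetchThread now takes the broker's host and port directly instead of a Broker value, matching what MessageIterator actually needs. For context, the enclosing foldLeft builds one fetch thread per broker and funnels every TopicAndPartition led by that broker into it. A simplified, self-contained sketch of that grouping (stub Broker and FetchThread types; the real classes also carry timeouts, buffer sizes, and per-partition iterators):

// Sketch: group partitions by their leader broker, one thread per broker.
object FetchThreadGroupingSketch {
  case class Broker(host: String, port: Int)
  case class TopicAndPartition(topic: String, partition: Int)

  class FetchThread(host: String, port: Int) extends Thread {
    private var tps: List[TopicAndPartition] = List.empty
    def addTopicAndPartition(tp: TopicAndPartition): Unit = tps :+= tp
    override def run(): Unit = println(s"would fetch $tps from $host:$port")
  }

  def group(leaders: Map[TopicAndPartition, Broker]): Map[Broker, FetchThread] =
    leaders.foldLeft(Map.empty[Broker, FetchThread]) { (accum, iter) =>
      val (tp, broker) = iter
      if (!accum.contains(broker)) {
        val fetchThread = new FetchThread(broker.host, broker.port)
        fetchThread.addTopicAndPartition(tp)
        accum + (broker -> fetchThread)
      } else {
        accum(broker).addTopicAndPartition(tp)
        accum
      }
    }

  def main(args: Array[String]): Unit = {
    val broker = Broker("localhost", 9092)
    val threads = group(Map(
      TopicAndPartition("t", 0) -> broker,
      TopicAndPartition("t", 1) -> broker))
    println(threads.size) // 1: both partitions share one thread
  }
}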

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaProducer.scala

Lines changed: 1 addition & 1 deletion

@@ -28,7 +28,7 @@ class KafkaProducer[K, V](config: ProducerConfig,
   private var buffer = ArrayBuffer[KeyedMessage[K, V]]()
   private val producer = new Producer[K, V](config)
 
-  def send(topic: String, key: K, msg: V): Unit = send(topic, key, null, msg)
+  def send(topic: String, key: K, msg: V): Unit = send(topic, key, key, msg)
 
   def send(topic: String, key: K, partKey: Any, msg: V): Unit = {
     buffer += new KeyedMessage[K, V](topic, key, partKey, msg)
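
Previously the two-argument send passed null as the partition key, leaving partition assignment to the producer's null-key fallback; forwarding the message key makes routing deterministic per key, so all messages for a given key land in one partition. A toy model of key-hash routing to illustrate the difference (this is not Kafka's actual partitioner code):

// Toy model: a null partition key gives an arbitrary partition, a real
// key always hashes to the same one.
object PartitionKeySketch {
  def partitionFor(partKey: Any, numPartitions: Int): Int =
    if (partKey == null) scala.util.Random.nextInt(numPartitions)
    else (partKey.hashCode & Int.MaxValue) % numPartitions

  def main(args: Array[String]): Unit = {
    // same key, same partition, every time
    println(partitionFor("source-0", 4) == partitionFor("source-0", 4)) // true
  }
}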

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/MessageIterator.scala

Lines changed: 7 additions & 15 deletions

@@ -40,30 +40,22 @@ class MessageIterator(host: String,
     OffsetRequest.EarliestTime, -1)
   private var iter = iterator(startOffset)
   private var readMessages = 0L
-  private var offset = startOffset
-  private var key: Array[Byte] = null
-  private var nextOffset = offset
+  private var nextOffset = startOffset
 
   def setStartOffset(startOffset: Long): Unit = {
     this.startOffset = startOffset
   }
 
-  def getKey: Array[Byte] = {
-    key
-  }
-
-  def getOffset: Long = {
-    offset
-  }
-
-  def next: Array[Byte] = {
+  def next: (Long, Array[Byte], Array[Byte]) = {
     val mo = iter.next()
     val message = mo.message
+    val offset = mo.offset
+    val key = Utils.readBytes(message.key)
+    val payload = Utils.readBytes(message.payload)
+
     readMessages += 1
-    offset = mo.offset
-    key = Utils.readBytes(message.key)
     nextOffset = mo.nextOffset
-    Utils.readBytes(mo.message.payload)
+    (offset, key, payload)
   }
 
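
The old API returned the payload from next while exposing offset and key as mutable getters, so a caller could pair a payload with the offset or key of a different message (compare the KafkaConsumer fix above, where KafkaMessage was assembled from getOffset, getKey, and next). Returning all three values in one call removes that hazard. A sketch with a stub message list standing in for Kafka's fetch iterator:

// Sketch: next returns (offset, key, payload) atomically, so the three
// values always describe the same message.
object TupleNextSketch {
  final class StubMessageIterator(messages: List[(Long, Array[Byte], Array[Byte])]) {
    private var remaining = messages
    private var nextOffset = 0L // offset to resume from, as in the real iterator

    def hasNext: Boolean = remaining.nonEmpty

    def next: (Long, Array[Byte], Array[Byte]) = {
      val (offset, key, payload) = remaining.head
      remaining = remaining.tail
      nextOffset = offset + 1
      (offset, key, payload)
    }
  }

  def main(args: Array[String]): Unit = {
    val iter = new StubMessageIterator(List(
      (0L, "k0".getBytes, "v0".getBytes),
      (1L, "k1".getBytes, "v1".getBytes)))
    while (iter.hasNext) {
      val (offset, key, payload) = iter.next
      println(s"$offset: ${new String(key)} -> ${new String(payload)}")
    }
  }
}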
