Commit 634c665

transaction interface refactor

1 parent c5fcb84

10 files changed: +196, -155 lines

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaSpout.scala

Lines changed: 2 additions & 1 deletion

@@ -76,7 +76,8 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {

   private val consumer = config.getConsumer(topicAndPartitions = topicAndPartitions)
   private val decoder = new StringDecoder()
-  private val offsetManager = new OffsetManager(conf)
+  private val offsetManager = new OffsetManager(
+    config.getCheckpointManagerFactory.getCheckpointManager(conf), config.getCheckpointFilter)
   private val commitIntervalMS = config.getCheckpointCommitIntervalMS
   private var lastCommitTime = System.currentTimeMillis()
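
The point of this hunk is that OffsetManager no longer digs its collaborators out of Configs; the caller wires them in. A minimal sketch of why that matters, using hypothetical stand-in types (InMemoryCheckpointManager is not part of this commit): a test can hand OffsetManager a fake store without building a Configs at all.

// hypothetical stand-ins mirroring the shapes in this commit
trait CheckpointManager {
  def start(): Unit
  def close(): Unit
}

// a fake that a unit test could substitute for the Kafka-backed manager
class InMemoryCheckpointManager extends CheckpointManager {
  override def start(): Unit = ()
  override def close(): Unit = ()
}

class CheckpointFilter
class OffsetManager(checkpointManager: CheckpointManager, filter: CheckpointFilter)

object WiringSketch extends App {
  // production wiring goes through config, as in KafkaSpout above;
  // a test can inject the in-memory fake directly
  val offsetManager = new OffsetManager(new InMemoryCheckpointManager, new CheckpointFilter)
  println(offsetManager)
}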

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointFilter.scala

Lines changed: 5 additions & 3 deletions

@@ -19,10 +19,12 @@
 package org.apache.gearpump.streaming.transaction.api

 import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.transaction.api.CheckpointManager._
+import org.apache.gearpump.streaming.transaction.kafka.KafkaUtil._
 import org.apache.gearpump.util.Configs

-trait CheckpointFilter {
-  def filter(checkpoint: Checkpoint, timestamp: TimeStamp,
-    conf: Configs): Option[Long]
+class CheckpointFilter(conf: Configs) {
+  def filter(records: List[Record], timestamp: TimeStamp): Option[Record] = {
+    records.sortBy(r => byteArrayToLong(r._1)).find(r => byteArrayToLong(r._1) > timestamp)
+  }
 }
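
With checkpoint keys now opaque byte arrays, the filter must decode each key back to a timestamp before it can sort and compare. A self-contained sketch of that contract, assuming big-endian long encoding; the ByteBuffer helpers here are stand-ins for KafkaUtil's longToByteArray/byteArrayToLong:

import java.nio.ByteBuffer

object CheckpointFilterSketch extends App {
  type Record = (Array[Byte], Array[Byte])

  // stand-ins for KafkaUtil's long <-> byte-array codecs (big-endian)
  def longToByteArray(l: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(l).array()
  def byteArrayToLong(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong

  // the filter's contract: the earliest record whose timestamp is past `timestamp`
  def filter(records: List[Record], timestamp: Long): Option[Record] =
    records.sortBy(r => byteArrayToLong(r._1)).find(r => byteArrayToLong(r._1) > timestamp)

  val records = List(100L -> 0L, 200L -> 5L, 300L -> 9L).map {
    case (ts, offset) => (longToByteArray(ts), longToByteArray(offset))
  }
  // replaying from timestamp 150 should resume at the offset checkpointed at t=200
  println(filter(records, 150L).map(r => byteArrayToLong(r._2))) // Some(5)
}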

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointManager.scala

Lines changed: 18 additions & 18 deletions

@@ -18,35 +18,35 @@

 package org.apache.gearpump.streaming.transaction.api

-import org.apache.gearpump.TimeStamp
+object CheckpointManager {

-/**
- * a Source consists of its name and partition
- */
-trait Source {
-  def name: String
-  def partition: Int
-}
+  trait Source {
+    def name: String

-/**
- * a Checkpoint is a map from message timestamps to
- * message offsets of the input stream
- */
-object Checkpoint {
-  def apply(timestamp: TimeStamp, offset: Long): Checkpoint =
-    Checkpoint(Map(timestamp -> offset))
+    def partition: Int
+  }
+
+  type Record = (Array[Byte], Array[Byte])
+
+  object Checkpoint {
+    def apply(key: Array[Byte], payload: Array[Byte]): Checkpoint =
+      Checkpoint(List((key, payload)))
+
+    def empty: Checkpoint =
+      Checkpoint(List.empty[Record])
+
+  }

-  def empty: Checkpoint =
-    Checkpoint(Map.empty[TimeStamp, Long])
+  case class Checkpoint(records: List[Record])
 }
-case class Checkpoint(timeAndOffsets: Map[TimeStamp, Long])


 /**
  * CheckpointManager checkpoints message and its timestamp to a persistent system
  * such that we could replay messages around or after given time
  */
 trait CheckpointManager {
+  import org.apache.gearpump.streaming.transaction.api.CheckpointManager._

   def start(): Unit
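
The visible shift here is from Checkpoint as a Map[TimeStamp, Long] to an ordered list of raw key/value records, so the api package no longer dictates what a checkpoint stores. A minimal round-trip sketch with local re-declarations of the new types (the codec helpers are assumptions mirroring KafkaUtil):

import java.nio.ByteBuffer

object CheckpointSketch extends App {
  // local copies of the new API shapes
  type Record = (Array[Byte], Array[Byte])
  case class Checkpoint(records: List[Record])

  def longToByteArray(l: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(l).array()
  def byteArrayToLong(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong

  // one (timestamp -> offset) pair, serialized the way the Kafka-backed store expects
  val cp = Checkpoint(List((longToByteArray(42L), longToByteArray(1024L))))
  cp.records.foreach { case (k, v) =>
    println(s"timestamp=${byteArrayToLong(k)} offset=${byteArrayToLong(v)}")
  }
}

The trade-off is that timestamp typing moves out of the API and into whoever encodes and decodes the records, which in this commit is OffsetManager and CheckpointFilter.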

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala

Lines changed: 20 additions & 14 deletions

@@ -19,20 +19,18 @@
 package org.apache.gearpump.streaming.transaction.api

 import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.streaming.transaction.kafka.KafkaConfig._
-import org.apache.gearpump.util.Configs
+import org.apache.gearpump.streaming.transaction.api.CheckpointManager._
+import org.apache.gearpump.streaming.transaction.kafka.KafkaUtil._
 import org.slf4j.{LoggerFactory, Logger}

 object OffsetManager {
   private val LOG: Logger = LoggerFactory.getLogger(classOf[OffsetManager])
 }
-class OffsetManager(conf: Configs) {
+
+class OffsetManager(checkpointManager: CheckpointManager,
+                    filter: CheckpointFilter) {
   import org.apache.gearpump.streaming.transaction.api.OffsetManager._

-  private val config = conf.config
-  private val filter = config.getCheckpointFilter
-  private val checkpointManager =
-    config.getCheckpointManagerFactory.getCheckpointManager(conf)
   private var sources: Array[Source] = null
   private var offsetsByTimeAndSource = Map.empty[(Source, TimeStamp), Long]

@@ -55,12 +53,20 @@ class OffsetManager(conf: Configs) {
   def checkpoint: Map[Source, Checkpoint] = {
     val checkpointsBySource = offsetsByTimeAndSource
       .groupBy(_._1._1)
-      .mapValues[Checkpoint](values => {
-        Checkpoint(values.map(entry => (entry._1._2, entry._2)))
-      })
+      .map { grouped => {
+        val source = grouped._1
+        val records = grouped._2.map {
+          entry =>
+            val timestamp = entry._1._2
+            val offset = entry._2
+            (longToByteArray(timestamp), longToByteArray(offset))
+        }.toList
+        source -> Checkpoint(records)
+      }
+      }
     checkpointsBySource.foreach {
-      sourceAndCheckpoint =>
-        checkpointManager.writeCheckpoint(sourceAndCheckpoint._1,
+      sourceAndCheckpoint =>
+        checkpointManager.writeCheckpoint(sourceAndCheckpoint._1,
          sourceAndCheckpoint._2)
     }

@@ -71,8 +77,8 @@
   def loadStartingOffsets(timestamp: TimeStamp): Map[Source, Long] = {
     LOG.info("loading start offsets...")
     sources.foldLeft(Map.empty[Source, Long]) { (accum, source) =>
-      filter.filter(checkpointManager.readCheckpoint(source), timestamp, conf) match {
-        case Some(offset) => accum + (source -> offset)
+      filter.filter(checkpointManager.readCheckpoint(source).records, timestamp) match {
+        case Some((_, offset)) => accum + (source -> byteArrayToLong(offset))
        case None => accum
       }
     }
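
The rewritten checkpoint method groups buffered offsets by source and serializes each (timestamp, offset) pair into a Record. A self-contained sketch of just that grouping, with local stand-ins for Source, Record, and Checkpoint:

import java.nio.ByteBuffer

object CheckpointGrouping extends App {
  case class Source(name: String, partition: Int)
  type Record = (Array[Byte], Array[Byte])
  case class Checkpoint(records: List[Record])

  def longToByteArray(l: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(l).array()

  // offsets keyed by (source, timestamp), as in OffsetManager
  val offsetsByTimeAndSource: Map[(Source, Long), Long] = Map(
    (Source("topic-a", 0), 100L) -> 7L,
    (Source("topic-a", 0), 200L) -> 11L,
    (Source("topic-b", 1), 100L) -> 3L)

  // group per source, then serialize each (timestamp, offset) pair into a Record
  val checkpointsBySource: Map[Source, Checkpoint] =
    offsetsByTimeAndSource.groupBy(_._1._1).map { case (source, entries) =>
      val records = entries.map { case ((_, timestamp), offset) =>
        (longToByteArray(timestamp), longToByteArray(offset))
      }.toList
      source -> Checkpoint(records)
    }

  checkpointsBySource.foreach { case (s, cp) => println(s"$s -> ${cp.records.size} records") }
}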

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaCheckpointManager.scala

Lines changed: 29 additions & 36 deletions

@@ -20,37 +20,34 @@ package org.apache.gearpump.streaming.transaction.kafka

 import kafka.admin.AdminUtils
 import kafka.common.{TopicExistsException, TopicAndPartition}
-import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.streaming.transaction.api.{Checkpoint, CheckpointManager}
-import org.apache.gearpump.streaming.transaction.api.Source
-import org.apache.gearpump.streaming.transaction.kafka.KafkaConfig._
-import org.apache.gearpump.streaming.transaction.kafka.KafkaUtil._
-import org.apache.gearpump.util.Configs
+import org.apache.gearpump.streaming.transaction.api.CheckpointManager
+import org.apache.gearpump.streaming.transaction.api.CheckpointManager._
 import org.slf4j.{Logger, LoggerFactory}

 import scala.collection.mutable.{Map => MutableMap}
-import kafka.consumer.SimpleConsumer
-import kafka.utils.ZkUtils
+import org.I0Itec.zkclient.ZkClient


 object KafkaCheckpointManager {
   private val LOG: Logger = LoggerFactory.getLogger(classOf[KafkaCheckpointManager])
 }

-class KafkaCheckpointManager(conf: Configs) extends CheckpointManager {
+class KafkaCheckpointManager(checkpointId: Int,
+                             checkpointReplicas: Int,
+                             producer: KafkaProducer[Array[Byte], Array[Byte]],
+                             clientId: String,
+                             socketTimeout: Int,
+                             receiveBufferSize: Int,
+                             fetchSize: Int,
+                             zkClient: ZkClient
+                            ) extends CheckpointManager {
   import org.apache.gearpump.streaming.transaction.kafka.KafkaCheckpointManager._

-  private val config = conf.config
-  private val producer = config.getProducer[Array[Byte], Array[Byte]](
-    producerConfig = config.getProducerConfig(serializerClass = "kafka.serializer.DefaultEncoder")
-  )
-
   private var checkpointTopicAndPartitions: Array[TopicAndPartition] = null
   private val topicExists: MutableMap[TopicAndPartition, Boolean] = MutableMap.empty[TopicAndPartition, Boolean]

   override def start(): Unit = {
     createTopics()
-
   }

   override def register(topicAndPartitions: Array[Source]): Unit = {

@@ -61,74 +58,70 @@ class KafkaCheckpointManager(conf: Configs) extends CheckpointManager {
   override def writeCheckpoint(source: Source,
                                checkpoint: Checkpoint): Unit = {
     val checkpointTopicAndPartition = getCheckpointTopicAndPartition(source)
-    checkpoint.timeAndOffsets.foreach(timeAndOffset => {
-      producer.send(checkpointTopicAndPartition.topic, longToByteArray(timeAndOffset._1),
-        0, longToByteArray(timeAndOffset._2))
+    checkpoint.records.foreach(record => {
+      producer.send(checkpointTopicAndPartition.topic, record._1, 0, record._2)
     })
-
   }

   override def readCheckpoint(source: Source): Checkpoint = {
     val topicAndPartition = TopicAndPartition(source.name, source.partition)
+    // no checkpoint to read for the first time
     if (!topicExists.getOrElse(topicAndPartition, false)) {
-      Checkpoint(Map.empty[TimeStamp, Long])
+      Checkpoint.empty
     } else {
       // get consumers only after topics having been created
       LOG.info("creating consumer...")
       val msgIter = consume(topicAndPartition)
       val checkpointTopicAndPartition = getCheckpointTopicAndPartition(source)

       @annotation.tailrec
-      def fetch(timeAndOffsets: Map[TimeStamp, Long]): Map[TimeStamp, Long] = {
+      def fetch(records: List[Record]): List[Record] = {
         if (msgIter.hasNext) {
           val key = msgIter.getKey
           if (key != null) {
-            val timestamp = byteArrayToLong(key)
-            val offset = byteArrayToLong(msgIter.next)
-            fetch(timeAndOffsets + (timestamp -> offset))
+            val r: Record = (key, msgIter.next)
+            fetch(records :+ r)
           } else {
             // TODO: this should not happen; need further investigation
             LOG.error(s"timestamp is null at offset ${msgIter.getOffset} for ${checkpointTopicAndPartition}")
-            fetch(timeAndOffsets)
+            fetch(records)
           }
         } else {
-          timeAndOffsets
+          msgIter.close()
+          records
         }
       }
-      msgIter.close()
-      Checkpoint(fetch(Map.empty[TimeStamp, Long]))
+      Checkpoint(fetch(List.empty[Record]))
     }
   }

   override def close(): Unit = {
     producer.close()
+    zkClient.close()
   }


   private def consume(topicAndPartition: TopicAndPartition): MessageIterator = {
     val topic = topicAndPartition.topic
     val partition = topicAndPartition.partition
-    val broker = KafkaUtil.getBroker(config.getZkClient(), topic, partition)
-    new MessageIterator(broker.host, broker.port, topic, partition, config.getSocketTimeoutMS,
-      config.getSocketReceiveBufferSize, config.getFetchMessageMaxBytes, config.getClientId)
+    val broker = KafkaUtil.getBroker(zkClient, topic, partition)
+    new MessageIterator(broker.host, broker.port, topic, partition, socketTimeout,
+      receiveBufferSize, fetchSize, clientId)
   }

   private def createTopics(): Unit = {
     checkpointTopicAndPartitions.foreach {
       tp => {
-        val zkClient = config.getZkClient()
         try {
           val topic = tp.topic
-          AdminUtils.createTopic(zkClient, topic, 1, config.getCheckpointReplicas)
+          AdminUtils.createTopic(zkClient, topic, 1, checkpointReplicas)
           topicExists.put(tp, false)
         } catch {
           case tee: TopicExistsException => {
             LOG.info(s"${tp} already exists")
             topicExists.put(tp, true)
           }
           case e: Exception => throw e
-        } finally {
-          zkClient.close()
         }
       }
     }

@@ -139,7 +132,7 @@ class KafkaCheckpointManager(conf: Configs) extends CheckpointManager {
   }

   private def getCheckpointTopicAndPartition(source: Source): TopicAndPartition = {
-    TopicAndPartition(getCheckpointTopic(config.getCheckpointId, source.name, source.partition), 0)
+    TopicAndPartition(getCheckpointTopic(checkpointId, source.name, source.partition), 0)
   }

 }
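
Beyond swapping Map[TimeStamp, Long] for List[Record], readCheckpoint moves msgIter.close() into the terminal branch of the tail-recursive fetch, so the iterator is closed exactly once, when the drain finishes. A runnable sketch of that loop against a hypothetical in-memory MessageIterator (the real one reads from a Kafka broker):

object FetchSketch extends App {
  type Record = (Array[Byte], Array[Byte])

  // hypothetical in-memory stand-in exposing only what fetch needs
  class MessageIterator(private var rest: List[Record]) {
    def hasNext: Boolean = rest.nonEmpty
    def getKey: Array[Byte] = rest.head._1
    def next: Array[Byte] = { val v = rest.head._2; rest = rest.tail; v }
    def close(): Unit = println("iterator closed")
  }

  // mirrors the refactored loop: accumulate records, skip null keys,
  // and close the iterator on the terminal branch rather than after the call
  @annotation.tailrec
  def fetch(iter: MessageIterator, records: List[Record]): List[Record] = {
    if (iter.hasNext) {
      val key = iter.getKey
      if (key != null) {
        val r: Record = (key, iter.next)
        fetch(iter, records :+ r)
      } else {
        iter.next // the stand-in must consume the payload to advance past the bad record
        fetch(iter, records)
      }
    } else {
      iter.close()
      records
    }
  }

  val iter = new MessageIterator(List(
    (Array[Byte](1), Array[Byte](2)),
    (null, Array[Byte](3))))
  println(fetch(iter, Nil).size) // prints "iterator closed" then 1
}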

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaCheckpointManagerFactory.scala

Lines changed: 16 additions & 1 deletion

@@ -19,10 +19,25 @@
 package org.apache.gearpump.streaming.transaction.kafka

 import org.apache.gearpump.streaming.transaction.api.{CheckpointManager, CheckpointManagerFactory}
+import org.apache.gearpump.streaming.transaction.kafka.KafkaConfig._
 import org.apache.gearpump.util.Configs

 class KafkaCheckpointManagerFactory extends CheckpointManagerFactory {
   override def getCheckpointManager(conf: Configs): CheckpointManager = {
-    new KafkaCheckpointManager(conf)
+    val config = conf.config
+    val checkpointId = config.getCheckpointId
+    val checkpointReplicas = config.getCheckpointReplicas
+    val producer = config.getProducer[Array[Byte], Array[Byte]](
+      producerConfig = config.getProducerConfig(
+        serializerClass = "kafka.serializer.DefaultEncoder")
+    )
+    val clientId = config.getClientId
+    val socketTimeout = config.getSocketTimeoutMS
+    val receiveBufferSize = config.getSocketReceiveBufferSize
+    val fetchSize = config.getFetchMessageMaxBytes
+    val zkClient = config.getZkClient()
+    new KafkaCheckpointManager(
+      checkpointId, checkpointReplicas, producer, clientId,
+      socketTimeout, receiveBufferSize, fetchSize, zkClient)
   }
 }
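
After this change the factory is the only code that touches Configs: every setting is resolved once at the boundary and the manager is built from plain values. A small sketch of the same pattern under hypothetical names (KafkaSettings and CheckpointStore are illustrations, not commit code):

// all environment lookup happens here, once, at the edge
final case class KafkaSettings(
  checkpointId: Int,
  checkpointReplicas: Int,
  clientId: String,
  socketTimeoutMs: Int,
  receiveBufferSize: Int,
  fetchSize: Int)

// the constructed object holds plain values and no config reference
class CheckpointStore(val id: Int, val replicas: Int, val clientId: String,
                      val timeoutMs: Int, val bufferSize: Int, val fetchSize: Int)

object FactorySketch extends App {
  def build(s: KafkaSettings): CheckpointStore =
    new CheckpointStore(s.checkpointId, s.checkpointReplicas, s.clientId,
      s.socketTimeoutMs, s.receiveBufferSize, s.fetchSize)

  val store = build(KafkaSettings(0, 1, "client-0", 30000, 65536, 1048576))
  println(s"store for client ${store.clientId}, replicas=${store.replicas}")
}

This keeps KafkaCheckpointManager constructible in tests with arbitrary values, while Configs parsing is exercised only through the factory.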
