@@ -20,37 +20,34 @@ package org.apache.gearpump.streaming.transaction.kafka
20
20
21
21
import kafka .admin .AdminUtils
22
22
import kafka .common .{TopicExistsException , TopicAndPartition }
23
- import org .apache .gearpump .TimeStamp
24
- import org .apache .gearpump .streaming .transaction .api .{Checkpoint , CheckpointManager }
25
- import org .apache .gearpump .streaming .transaction .api .Source
26
- import org .apache .gearpump .streaming .transaction .kafka .KafkaConfig ._
27
- import org .apache .gearpump .streaming .transaction .kafka .KafkaUtil ._
28
- import org .apache .gearpump .util .Configs
23
+ import org .apache .gearpump .streaming .transaction .api .CheckpointManager
24
+ import org .apache .gearpump .streaming .transaction .api .CheckpointManager ._
29
25
import org .slf4j .{Logger , LoggerFactory }
30
26
31
27
import scala .collection .mutable .{Map => MutableMap }
32
- import kafka .consumer .SimpleConsumer
33
- import kafka .utils .ZkUtils
28
+ import org .I0Itec .zkclient .ZkClient
34
29
35
30
36
31
/** Companion object; holds the logger used by [[KafkaCheckpointManager]] instances. */
object KafkaCheckpointManager {
  // Logger is keyed on the class so every instance logs under the same name.
  private val LOG: Logger =
    LoggerFactory.getLogger(classOf[KafkaCheckpointManager])
}
39
34
40
- class KafkaCheckpointManager (conf : Configs ) extends CheckpointManager {
35
+ class KafkaCheckpointManager (checkpointId : Int ,
36
+ checkpointReplicas : Int ,
37
+ producer : KafkaProducer [Array [Byte ], Array [Byte ]],
38
+ clientId : String ,
39
+ socketTimeout : Int ,
40
+ receiveBufferSize : Int ,
41
+ fetchSize : Int ,
42
+ zkClient : ZkClient
43
+ ) extends CheckpointManager {
41
44
import org .apache .gearpump .streaming .transaction .kafka .KafkaCheckpointManager ._
42
45
43
- private val config = conf.config
44
- private val producer = config.getProducer[Array [Byte ], Array [Byte ]](
45
- producerConfig = config.getProducerConfig(serializerClass = " kafka.serializer.DefaultEncoder" )
46
- )
47
-
48
46
private var checkpointTopicAndPartitions : Array [TopicAndPartition ] = null
49
47
private val topicExists : MutableMap [TopicAndPartition , Boolean ] = MutableMap .empty[TopicAndPartition , Boolean ]
50
48
51
49
/** Starts the manager by creating the checkpoint topics (see `createTopics`). */
override def start(): Unit =
  createTopics()
55
52
56
53
override def register (topicAndPartitions : Array [Source ]): Unit = {
@@ -61,74 +58,70 @@ class KafkaCheckpointManager(conf: Configs) extends CheckpointManager {
61
58
/**
 * Sends every record of `checkpoint` to the checkpoint topic derived from `source`.
 *
 * @param source     the source whose checkpoint topic receives the records
 * @param checkpoint the records to send; each one is sent as a (key, message) pair
 */
override def writeCheckpoint(source: Source,
                             checkpoint: Checkpoint): Unit = {
  val targetTopic = getCheckpointTopicAndPartition(source).topic
  // All records go to partition 0 of the checkpoint topic.
  checkpoint.records.foreach { case (key, payload) =>
    producer.send(targetTopic, key, 0, payload)
  }
}
70
65
71
66
/**
 * Reads back all checkpoint records for `source`.
 *
 * Returns `Checkpoint.empty` when `topicExists` holds no `true` entry for the
 * source's topic and partition. Otherwise it drains a [[MessageIterator]] and
 * returns the (key, message) records in the order they were consumed.
 *
 * @param source the source whose checkpoint is requested
 * @return the recovered checkpoint, possibly empty
 */
override def readCheckpoint(source: Source): Checkpoint = {
  val topicAndPartition = TopicAndPartition(source.name, source.partition)
  // no checkpoint to read for the first time
  // NOTE(review): createTopics keys `topicExists` by the entries of
  // `checkpointTopicAndPartitions`, while this lookup (and `consume` below) uses the
  // source's own topic/partition - confirm against `register` that both agree.
  if (!topicExists.getOrElse(topicAndPartition, false)) {
    Checkpoint.empty
  } else {
    // get consumers only after topics having been created
    LOG.info("creating consumer...")
    val msgIter = consume(topicAndPartition)
    val checkpointTopicAndPartition = getCheckpointTopicAndPartition(source)

    // Accumulates records newest-first (constant-time prepend); the caller reverses
    // the result once so records come back in consumption order.
    @annotation.tailrec
    def fetch(reversed: List[Record]): List[Record] = {
      if (msgIter.hasNext) {
        val key = msgIter.getKey
        if (key != null) {
          val r: Record = (key, msgIter.next)
          fetch(r :: reversed)
        } else {
          // TODO: this should not happen; need further investigation
          // NOTE(review): this branch never calls msgIter.next - confirm that
          // hasNext/getKey advance the iterator, otherwise this would not terminate.
          LOG.error(s"timestamp is null at offset ${msgIter.getOffset} for ${checkpointTopicAndPartition}")
          fetch(reversed)
        }
      } else {
        reversed
      }
    }

    // Close the consumer even if iteration throws part-way through.
    try {
      Checkpoint(fetch(List.empty[Record]).reverse)
    } finally {
      msgIter.close()
    }
  }
}
102
97
103
98
/**
 * Releases the resources held by this manager: the Kafka producer first, then
 * the ZooKeeper client.
 */
override def close(): Unit = {
  // try/finally so the ZooKeeper client is still closed if producer.close() throws.
  try {
    producer.close()
  } finally {
    zkClient.close()
  }
}
106
102
107
103
108
104
/**
 * Builds a [[MessageIterator]] over the given topic and partition.
 *
 * The broker is looked up through `KafkaUtil.getBroker` with this manager's
 * ZooKeeper client; connection settings come from the constructor arguments.
 *
 * @param topicAndPartition the topic and partition to iterate over
 * @return a new iterator connected to the broker returned by the lookup
 */
private def consume(topicAndPartition: TopicAndPartition): MessageIterator = {
  import topicAndPartition.{partition, topic}
  val leader = KafkaUtil.getBroker(zkClient, topic, partition)
  new MessageIterator(
    leader.host, leader.port, topic, partition,
    socketTimeout, receiveBufferSize, fetchSize, clientId)
}
115
111
116
112
private def createTopics (): Unit = {
117
113
checkpointTopicAndPartitions.foreach {
118
114
tp => {
119
- val zkClient = config.getZkClient()
120
115
try {
121
116
val topic = tp.topic
122
- AdminUtils .createTopic(zkClient, topic, 1 , config.getCheckpointReplicas )
117
+ AdminUtils .createTopic(zkClient, topic, 1 , checkpointReplicas )
123
118
topicExists.put(tp, false )
124
119
} catch {
125
120
case tee : TopicExistsException => {
126
121
LOG .info(s " ${tp} already exists " )
127
122
topicExists.put(tp, true )
128
123
}
129
124
case e : Exception => throw e
130
- } finally {
131
- zkClient.close()
132
125
}
133
126
}
134
127
}
@@ -139,7 +132,7 @@ class KafkaCheckpointManager(conf: Configs) extends CheckpointManager {
139
132
}
140
133
141
134
/**
 * Maps a source to the topic and partition that stores its checkpoints.
 *
 * The topic name comes from `getCheckpointTopic(checkpointId, source.name, source.partition)`;
 * the partition is always 0.
 */
private def getCheckpointTopicAndPartition(source: Source): TopicAndPartition = {
  val checkpointTopic = getCheckpointTopic(checkpointId, source.name, source.partition)
  TopicAndPartition(checkpointTopic, 0)
}
144
137
145
138
}
0 commit comments