Skip to content

Commit 13fe472

Browse files
committed
add TimeExtractor and change api
1 parent 3245c7f commit 13fe472

File tree

12 files changed

+98
-51
lines changed

12 files changed

+98
-51
lines changed

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaSpout.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,11 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {
101101
@annotation.tailrec
102102
def fetchAndEmit(msgNum: Int, tpIndex: Int): Unit = {
103103
if (msgNum < emitBatchSize) {
104-
val kafkaMsg = consumer.nextMessage(topicAndPartitions(tpIndex))
105-
if (kafkaMsg != null) {
106-
val timestamp = System.currentTimeMillis()
107-
output(new Message(decoder.fromBytes(kafkaMsg.msg)))
104+
val msgWithTime = consumer.nextMessageWithTime(topicAndPartitions(tpIndex))
105+
if (msgWithTime != null) {
106+
val kafkaMsg = msgWithTime._1
107+
val timestamp = msgWithTime._2
108+
output(new Message(decoder.fromBytes(kafkaMsg.msg), timestamp))
108109
offsetManager.update(KafkaSource(kafkaMsg.topicAndPartition), timestamp, kafkaMsg.offset)
109110
} else {
110111
LOG.debug(s"no more messages from ${topicAndPartitions(tpIndex)}")

project/Build.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ object Build extends sbt.Build {
2929
val spraySwaggerVersion = "0.4.3"
3030
val swaggerUiVersion = "2.0.21"
3131

32-
3332
val commonSettings = Defaults.defaultSettings ++ packAutoSettings ++
3433
Seq(
3534
scalaVersion := scalaVersionNumber,
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.gearpump.streaming.transaction.api
20+
21+
trait Source {
22+
def name: String
23+
24+
def partition: Int
25+
}
26+
27+
object Checkpoint {
28+
def apply[K, V](records: List[(K, V)]): Checkpoint[K, V] = new Checkpoint(records)
29+
30+
def empty[K, V]: Checkpoint[K, V] = new Checkpoint(List.empty[(K, V)])
31+
}
32+
33+
class Checkpoint[K, V](val records: List[(K, V)])
34+
35+
trait CheckpointSerDe[K, V] {
36+
def toKeyBytes(key: K): Array[Byte]
37+
def fromKeyBytes(bytes: Array[Byte]): K
38+
39+
def toValueBytes(value: V): Array[Byte]
40+
def fromValueBytes(bytes: Array[Byte]): V
41+
}
42+
43+
trait CheckpointFilter[K, V] {
44+
def filter(records: List[(K, V)], predicate: K): Option[(K, V)]
45+
}

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointManager.scala

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,11 @@
1818

1919
package org.apache.gearpump.streaming.transaction.api
2020

21-
object CheckpointManager {
22-
23-
trait Source {
24-
def name: String
25-
26-
def partition: Int
27-
}
28-
29-
object Checkpoint {
30-
def apply[K, V](records: List[(K, V)]): Checkpoint[K, V] = new Checkpoint(records)
31-
32-
def empty[K, V]: Checkpoint[K, V] = new Checkpoint(List.empty[(K, V)])
33-
}
34-
35-
class Checkpoint[K, V](val records: List[(K, V)])
36-
37-
trait CheckpointSerDe[K, V] {
38-
def toKeyBytes(key: K): Array[Byte]
39-
def fromKeyBytes(bytes: Array[Byte]): K
40-
41-
def toValueBytes(value: V): Array[Byte]
42-
def fromValueBytes(bytes: Array[Byte]): V
43-
}
44-
45-
trait CheckpointFilter[K, V] {
46-
def filter(records: List[(K, V)], predicate: K): Option[(K, V)]
47-
}
48-
}
49-
50-
5121
/**
5222
* CheckpointManager checkpoints message and its timestamp to a persistent system
5323
* such that we could replay messages around or after given time
5424
*/
5525
trait CheckpointManager[K, V] {
56-
import org.apache.gearpump.streaming.transaction.api.CheckpointManager._
57-
5826
def start(): Unit
5927

6028
def register(sources: Array[Source]): Unit

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetFilter.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
package org.apache.gearpump.streaming.transaction.api
2020

2121
import org.apache.gearpump.TimeStamp
22-
import org.apache.gearpump.streaming.transaction.api.CheckpointManager._
23-
import org.apache.gearpump.util.Configs
2422

2523
class OffsetFilter extends CheckpointFilter[TimeStamp, Long] {
2624
override def filter(timeAndOffsets: List[(TimeStamp, Long)],

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.gearpump.streaming.transaction.api
2020

2121
import org.apache.gearpump.TimeStamp
22-
import org.apache.gearpump.streaming.transaction.api.CheckpointManager._
2322
import org.apache.gearpump.streaming.transaction.kafka.KafkaUtil._
2423
import org.slf4j.{LoggerFactory, Logger}
2524

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.gearpump.streaming.transaction.api
20+
21+
/**
22+
* shamelessly stolen from summingbird
23+
* https://github.com/twitter/summingbird/blob/develop/summingbird-core/src/main/scala/com/twitter/summingbird/TimeExtractor.scala
24+
*/
25+
object TimeExtractor {
26+
def apply[T](fn: T => Long): TimeExtractor[T] =
27+
new TimeExtractor[T] {
28+
override def apply(t: T) = fn(t)
29+
}
30+
}
31+
32+
trait TimeExtractor[T] extends java.io.Serializable {
33+
def apply(t: T): Long
34+
}

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaCheckpointManager.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ package org.apache.gearpump.streaming.transaction.kafka
2020

2121
import kafka.admin.AdminUtils
2222
import kafka.common.{TopicExistsException, TopicAndPartition}
23-
import org.apache.gearpump.streaming.transaction.api.CheckpointManager
24-
import org.apache.gearpump.streaming.transaction.api.CheckpointManager._
23+
import org.apache.gearpump.streaming.transaction.api.{CheckpointSerDe, Checkpoint, Source, CheckpointManager}
2524
import org.slf4j.{Logger, LoggerFactory}
2625

2726
import scala.collection.mutable.{Map => MutableMap}

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaConfig.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import kafka.serializer.Decoder
2727
import kafka.utils.ZKStringSerializer
2828
import org.I0Itec.zkclient.ZkClient
2929
import org.I0Itec.zkclient.serialize.ZkSerializer
30-
import org.apache.gearpump.streaming.transaction.api.{OffsetFilter, CheckpointManagerFactory}
30+
import org.apache.gearpump.streaming.transaction.api.{TimeExtractor, OffsetFilter, CheckpointManagerFactory}
3131
import org.slf4j.{Logger, LoggerFactory}
3232

3333
import scala.collection.JavaConversions._
@@ -88,9 +88,11 @@ object KafkaConfig {
8888
receiveBufferSize: Int = getSocketReceiveBufferSize,
8989
fetchSize: Int = getFetchMessageMaxBytes,
9090
zkClient: ZkClient = getZkClient(),
91-
queueSize: Int = getConsumerQueueSize): KafkaConsumer = {
91+
queueSize: Int = getConsumerQueueSize,
92+
timeExtractor: TimeExtractor[KafkaMessage]
93+
= TimeExtractor(_ => System.currentTimeMillis())): KafkaConsumer = {
9294
new KafkaConsumer(topicAndPartitions, clientId, socketTimeout,
93-
receiveBufferSize, fetchSize, zkClient, queueSize)
95+
receiveBufferSize, fetchSize, zkClient, queueSize, timeExtractor)
9496
}
9597

9698
def getZookeeperConnect = {

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaConsumer.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ package org.apache.gearpump.streaming.transaction.kafka
2121
import kafka.common.TopicAndPartition
2222

2323
import org.I0Itec.zkclient.ZkClient
24+
import org.apache.gearpump.TimeStamp
25+
import org.apache.gearpump.streaming.transaction.api.TimeExtractor
2426
import org.slf4j.{Logger, LoggerFactory}
2527
import java.util.concurrent.LinkedBlockingQueue
2628

@@ -46,7 +48,8 @@ object KafkaConsumer {
4648
class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
4749
clientId: String, socketTimeout: Int,
4850
receiveBufferSize: Int, fetchSize: Int,
49-
zkClient: ZkClient, queueSize: Int) {
51+
zkClient: ZkClient, queueSize: Int,
52+
timeExtractor: TimeExtractor[KafkaMessage]) {
5053
import org.apache.gearpump.streaming.transaction.kafka.KafkaConsumer._
5154

5255
private val leaders: Map[TopicAndPartition, Broker] = topicAndPartitions.map {
@@ -69,14 +72,14 @@ class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
6972

7073
private var noMessages: Set[TopicAndPartition] = Set.empty[TopicAndPartition]
7174
private val noMessageSleepMS = 100
72-
private val incomingQueue = topicAndPartitions.map(_ -> new LinkedBlockingQueue[KafkaMessage](queueSize)).toMap
75+
private val incomingQueue = topicAndPartitions.map(_ -> new LinkedBlockingQueue[(KafkaMessage, TimeStamp)](queueSize)).toMap
7376

7477
private val fetchThread = new Thread {
7578
override def run(): Unit = {
7679
while (!Thread.currentThread.isInterrupted) {
7780
val msg = fetchMessage()
7881
if (msg != null) {
79-
incomingQueue(msg.topicAndPartition).put(msg)
82+
incomingQueue(msg.topicAndPartition).put(msg, timeExtractor(msg))
8083
} else if (noMessages.size == topicAndPartitions.size) {
8184
LOG.debug(s"no messages for all TopicAndPartitions. sleeping for ${noMessageSleepMS} ms")
8285
Thread.sleep(noMessageSleepMS)
@@ -93,7 +96,7 @@ class KafkaConsumer(topicAndPartitions: Array[TopicAndPartition],
9396
iterators(topicAndPartition).setStartOffset(startOffset)
9497
}
9598

96-
def nextMessage(topicAndPartition: TopicAndPartition): KafkaMessage = {
99+
def nextMessageWithTime(topicAndPartition: TopicAndPartition): (KafkaMessage, TimeStamp) = {
97100
incomingQueue(topicAndPartition).poll()
98101
}
99102

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package org.apache.gearpump.streaming.transaction.kafka
2020

21-
import org.apache.gearpump.streaming.transaction.api.CheckpointManager.Source
2221
import kafka.common.TopicAndPartition
22+
import org.apache.gearpump.streaming.transaction.api.Source
2323

2424
object KafkaSource {
2525
def apply(name: String, partition: Int): KafkaSource = {

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/kafka/KafkaUtil.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.gearpump.streaming.transaction.kafka
2020

21-
import kafka.common.TopicAndPartition
2221
import kafka.utils.ZkUtils
2322
import org.apache.gearpump.streaming.transaction.kafka.KafkaConsumer.Broker
2423
import org.I0Itec.zkclient.ZkClient

0 commit comments

Comments
 (0)