Skip to content

Commit d20abc8

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents b6b8308 + c13755f commit d20abc8

18 files changed

+406
-291
lines changed

conf/kafka.conf

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ kafka {
44
topics = ["topic1", "topic2", "topic3"]
55
client.id = "gearpump-app"
66
socket.timeout.ms = 30000
7-
socket.receive.buffer.size = 65536
7+
socket.receive.buffer.bytes = 65536
88
fetch.message.max.bytes = 1048576
99
emit.batch.size = 100
10-
queue.size = 100000000
10+
fetch.threshold = 5000
1111
}
1212

1313
producer {
@@ -24,7 +24,6 @@ kafka {
2424
manager.factory.class = "org.apache.gearpump.streaming.transaction.kafka.KafkaCheckpointManagerFactory"
2525
replicas = 1
2626
commit.interval.ms = 1000 # 1s
27-
filter.class = "org.apache.gearpump.streaming.transaction.kafka.RelaxedTimeFilter"
2827
message.delay.ms = 10
2928
id = 0
3029
}

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaBolt.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class KafkaBolt(conf: Configs) extends TaskActor(conf) {
6969

7070
private def reportThroughput : Unit = {
7171
val current = System.currentTimeMillis()
72-
LOG.info(s"Task $taskId Throughput: ${((count - lastCount), ((current - lastTime) / 1000))} (messages, second)")
72+
LOG.info(s"Task $taskId; Throughput: ${((count - lastCount), ((current - lastTime) / 1000))} (messages, second)")
7373
lastCount = count
7474
lastTime = current
7575
}

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaSpout.scala

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import kafka.serializer.StringDecoder
2727
import kafka.utils.{Utils, ZkUtils}
2828

2929
import org.apache.gearpump.streaming.ConfigsHelper._
30-
import org.apache.gearpump.streaming.transaction.api.{Checkpoint, OffsetManager}
30+
import org.apache.gearpump.streaming.transaction.api.{RelaxedTimeFilter, OffsetManager}
3131
import org.apache.gearpump.streaming.transaction.kafka.KafkaConfig._
3232
import org.apache.gearpump.streaming.transaction.kafka.KafkaUtil._
3333
import org.apache.gearpump.streaming.transaction.kafka.{KafkaMessage, KafkaSource, KafkaUtil}
@@ -76,18 +76,21 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {
7676

7777
private val consumer = config.getConsumer(topicAndPartitions = topicAndPartitions)
7878
private val decoder = new StringDecoder()
79-
private val offsetManager = new OffsetManager(conf)
79+
private val offsetManager = new OffsetManager(
80+
config.getCheckpointManagerFactory.getCheckpointManager[TimeStamp, Long](conf),
81+
new RelaxedTimeFilter(config.getCheckpointMessageDelayMS))
8082
private val commitIntervalMS = config.getCheckpointCommitIntervalMS
8183
private var lastCommitTime = System.currentTimeMillis()
8284

8385
override def onStart(taskContext: TaskContext): Unit = {
8486
offsetManager.register(topicAndPartitions.map(KafkaSource(_)))
8587
offsetManager.start()
86-
offsetManager.loadStartingOffsets(taskContext.startTime).foreach{
88+
offsetManager.loadStartOffsets(taskContext.startTime).foreach{
8789
entry =>
8890
val source = entry._1
8991
val offset = entry._2
9092
val topicAndPartition = TopicAndPartition(source.name, source.partition)
93+
LOG.info(s"set start offsets for ${topicAndPartition}")
9194
consumer.setStartOffset(topicAndPartition, offset)
9295
}
9396
consumer.start()
@@ -99,13 +102,19 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {
99102
@annotation.tailrec
100103
def fetchAndEmit(msgNum: Int, tpIndex: Int): Unit = {
101104
if (msgNum < emitBatchSize) {
102-
val kafkaMsg = consumer.nextMessage(topicAndPartitions(tpIndex))
103-
if (kafkaMsg != null) {
104-
val timestamp = System.currentTimeMillis()
105-
output(new Message(decoder.fromBytes(kafkaMsg.msg)))
105+
val msgWithTime = consumer.nextMessageWithTime(topicAndPartitions(tpIndex))
106+
if (msgWithTime != null) {
107+
val kafkaMsg = msgWithTime._1
108+
val timestamp = msgWithTime._2
109+
output(new Message(decoder.fromBytes(kafkaMsg.msg), timestamp))
106110
offsetManager.update(KafkaSource(kafkaMsg.topicAndPartition), timestamp, kafkaMsg.offset)
107111
} else {
108-
LOG.info(s"no more messages from ${topicAndPartitions(tpIndex)}")
112+
LOG.debug(s"no more messages from ${topicAndPartitions(tpIndex)}")
113+
}
114+
if (shouldCommitCheckpoint) {
115+
LOG.info("committing checkpoint...")
116+
offsetManager.checkpoint
117+
lastCommitTime = System.currentTimeMillis()
109118
}
110119
// poll messages from each TopicAndPartition in a round-robin way
111120
// TODO: make it configurable
@@ -117,11 +126,6 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {
117126
}
118127
}
119128
fetchAndEmit(0, 0)
120-
if (shouldCommitCheckpoint) {
121-
LOG.info("committing checkpoint...")
122-
offsetManager.checkpoint
123-
lastCommitTime = System.currentTimeMillis()
124-
}
125129
self ! Message("continue", Message.noTimeStamp)
126130
}
127131

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/Sum.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class Sum (conf : Configs) extends TaskActor(conf) {
4646
val word = msg.msg.asInstanceOf[String]
4747
val count = current + 1
4848
map.put(word, count)
49-
output(new Message(word -> count.toString(), System.currentTimeMillis()))
49+
output(new Message(s"${msg.timestamp}" -> s"${word}:${count}", System.currentTimeMillis()))
5050
}
5151

5252
override def onStop() : Unit = {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.gearpump.streaming.transaction.api
20+
21+
trait Source {
22+
def name: String
23+
24+
def partition: Int
25+
}
26+
27+
object Checkpoint {
28+
def apply[K, V](records: List[(K, V)]): Checkpoint[K, V] = new Checkpoint(records)
29+
30+
def empty[K, V]: Checkpoint[K, V] = new Checkpoint(List.empty[(K, V)])
31+
}
32+
33+
class Checkpoint[K, V](val records: List[(K, V)])
34+
35+
trait CheckpointSerDe[K, V] {
36+
def toKeyBytes(key: K): Array[Byte]
37+
def fromKeyBytes(bytes: Array[Byte]): K
38+
39+
def toValueBytes(value: V): Array[Byte]
40+
def fromValueBytes(bytes: Array[Byte]): V
41+
}
42+
43+
trait CheckpointFilter[K, V] {
44+
def filter(records: List[(K, V)], predicate: K): Option[(K, V)]
45+
}

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointManager.scala

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,22 @@
1818

1919
package org.apache.gearpump.streaming.transaction.api
2020

21-
import org.apache.gearpump.TimeStamp
22-
23-
/**
24-
* a Source consists of its name and partition
25-
*/
26-
trait Source {
27-
def name: String
28-
def partition: Int
29-
}
30-
31-
/**
32-
* a Checkpoint is a map from message timestamps to
33-
* message offsets of the input stream
34-
*/
35-
object Checkpoint {
36-
def apply(timestamp: TimeStamp, offset: Long): Checkpoint =
37-
Checkpoint(Map(timestamp -> offset))
38-
39-
def empty: Checkpoint =
40-
Checkpoint(Map.empty[TimeStamp, Long])
41-
}
42-
case class Checkpoint(timeAndOffsets: Map[TimeStamp, Long])
43-
44-
4521
/**
4622
* CheckpointManager checkpoints messages and their timestamps to a persistent system
4723
* so that we can replay messages around or after a given time
4824
*/
49-
trait CheckpointManager {
50-
25+
trait CheckpointManager[K, V] {
5126
def start(): Unit
5227

5328
def register(sources: Array[Source]): Unit
5429

55-
def writeCheckpoint(source: Source, checkpoint: Checkpoint): Unit
30+
def writeCheckpoint(source: Source, checkpoint: Checkpoint[K, V],
31+
checkpointSerDe: CheckpointSerDe[K, V]): Unit
32+
33+
def readCheckpoint(source: Source,
34+
checkpointSerDe: CheckpointSerDe[K, V]): Checkpoint[K, V]
5635

57-
def readCheckpoint(source: Source): Checkpoint
36+
def sourceAndCheckpoints(checkpointSerDe: CheckpointSerDe[K, V]): Map[Source, Checkpoint[K, V]]
5837

5938
def close(): Unit
6039
}

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointManagerFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,5 @@ import org.apache.gearpump.util.Configs
2222

2323

2424
trait CheckpointManagerFactory {
25-
def getCheckpointManager(conf: Configs): CheckpointManager
25+
def getCheckpointManager[K, V](conf: Configs): CheckpointManager[K, V]
2626
}
Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.gearpump.streaming.transaction.kafka
19+
package org.apache.gearpump.streaming.transaction.api
2020

2121
import org.apache.gearpump.TimeStamp
22-
import org.apache.gearpump.streaming.transaction.api.{Checkpoint, CheckpointFilter}
23-
import org.apache.gearpump.streaming.transaction.kafka.KafkaConfig._
24-
import org.apache.gearpump.util.Configs
2522

26-
class RelaxedTimeFilter extends CheckpointFilter {
27-
override def filter(checkpoint: Checkpoint,
28-
timestamp: TimeStamp, conf: Configs): Option[Long] = {
29-
val delta = conf.config.getCheckpointMessageDelayMS
30-
checkpoint.timeAndOffsets.toList.sortBy(_._1).find(_._1 > (timestamp - delta)).map(_._2)
23+
class OffsetFilter extends CheckpointFilter[TimeStamp, Long] {
24+
override def filter(timeAndOffsets: List[(TimeStamp, Long)],
25+
predicate: TimeStamp): Option[(TimeStamp, Long)] = {
26+
timeAndOffsets.sortBy(_._1).find(_._1 >= predicate)
3127
}
3228
}
29+
30+
class RelaxedTimeFilter(delta: Long) extends OffsetFilter {
31+
override def filter(timeAndOffsets: List[(TimeStamp, Long)],
32+
timestamp: TimeStamp): Option[(TimeStamp, Long)] = {
33+
super.filter(timeAndOffsets, timestamp - delta)
34+
}
35+
}
36+

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,60 +19,78 @@
1919
package org.apache.gearpump.streaming.transaction.api
2020

2121
import org.apache.gearpump.TimeStamp
22-
import org.apache.gearpump.streaming.transaction.kafka.KafkaConfig._
23-
import org.apache.gearpump.util.Configs
22+
import org.apache.gearpump.streaming.transaction.kafka.KafkaUtil._
2423
import org.slf4j.{LoggerFactory, Logger}
2524

2625
object OffsetManager {
26+
object OffsetSerDe extends CheckpointSerDe[TimeStamp, Long] {
27+
override def toKeyBytes(timestamp: TimeStamp): Array[Byte] = longToByteArray(timestamp)
28+
29+
override def toValueBytes(offset: Long): Array[Byte] = longToByteArray(offset)
30+
31+
override def fromKeyBytes(bytes: Array[Byte]): TimeStamp = byteArrayToLong(bytes)
32+
33+
override def fromValueBytes(bytes: Array[Byte]): Long = byteArrayToLong(bytes)
34+
}
35+
2736
private val LOG: Logger = LoggerFactory.getLogger(classOf[OffsetManager])
2837
}
29-
class OffsetManager(conf: Configs) {
38+
39+
class OffsetManager(checkpointManager: CheckpointManager[TimeStamp, Long],
40+
filter: OffsetFilter) {
3041
import org.apache.gearpump.streaming.transaction.api.OffsetManager._
3142

32-
private val config = conf.config
33-
private val filter = config.getCheckpointFilter
34-
private val checkpointManager =
35-
config.getCheckpointManagerFactory.getCheckpointManager(conf)
36-
private var sources: Array[Source] = null
3743
private var offsetsByTimeAndSource = Map.empty[(Source, TimeStamp), Long]
3844

3945
def start(): Unit = {
4046
LOG.info("starting offsetManager...")
4147
checkpointManager.start()
4248
}
4349

44-
def register(sources: Array[Source]) = {
45-
this.sources = sources
46-
checkpointManager.register(this.sources)
50+
def register(sources: Array[Source]): Unit = {
51+
checkpointManager.register(sources)
4752
}
4853

49-
def update(source: Source, timestamp: TimeStamp, offset: Long) = {
50-
if (!offsetsByTimeAndSource.contains((source, timestamp))) {
54+
/**
55+
* only the smallest offset seen at a given timestamp is recorded for each source
56+
* @return true if the given offset was recorded, false if the existing offset was kept
57+
*/
58+
def update(source: Source, timestamp: TimeStamp, offset: Long): Boolean = {
59+
if (!offsetsByTimeAndSource.contains((source, timestamp)) ||
60+
offsetsByTimeAndSource.get((source, timestamp)).get > offset) {
5161
offsetsByTimeAndSource += (source, timestamp) -> offset
62+
true
63+
} else {
64+
false
5265
}
5366
}
5467

55-
def checkpoint: Map[Source, Checkpoint] = {
68+
def checkpoint: Map[Source, Checkpoint[TimeStamp, Long]] = {
5669
val checkpointsBySource = offsetsByTimeAndSource
5770
.groupBy(_._1._1)
58-
.mapValues[Checkpoint](values => {
59-
Checkpoint(values.map(entry => (entry._1._2, entry._2)))
60-
})
71+
.map { grouped => {
72+
val source = grouped._1
73+
// TODO: this is not efficient; all recorded offsets are regrouped and re-sorted on every checkpoint
74+
val records = grouped._2
75+
.map(entry => entry._1._2 -> entry._2)
76+
.toList.sortBy(_._1)
77+
source -> Checkpoint[TimeStamp, Long](records)
78+
}
79+
}
6180
checkpointsBySource.foreach {
62-
sourceAndCheckpoint =>
63-
checkpointManager.writeCheckpoint(sourceAndCheckpoint._1,
64-
sourceAndCheckpoint._2)
81+
sourceAndCheckpoint =>
82+
checkpointManager.writeCheckpoint(sourceAndCheckpoint._1,
83+
sourceAndCheckpoint._2, OffsetSerDe)
6584
}
6685

6786
offsetsByTimeAndSource = Map.empty[(Source, TimeStamp), Long]
6887
checkpointsBySource
6988
}
7089

71-
def loadStartingOffsets(timestamp: TimeStamp): Map[Source, Long] = {
72-
LOG.info("loading start offsets...")
73-
sources.foldLeft(Map.empty[Source, Long]) { (accum, source) =>
74-
filter.filter(checkpointManager.readCheckpoint(source), timestamp, conf) match {
75-
case Some(offset) => accum + (source -> offset)
90+
def loadStartOffsets(timestamp: TimeStamp): Map[Source, Long] = {
91+
checkpointManager.sourceAndCheckpoints(OffsetSerDe).foldLeft(Map.empty[Source, Long]) { (accum, iter) =>
92+
filter.filter(iter._2.records, timestamp) match {
93+
case Some((_, offset)) => accum + (iter._1 -> offset)
7694
case None => accum
7795
}
7896
}
@@ -82,4 +100,6 @@ class OffsetManager(conf: Configs) {
82100
checkpointManager.close()
83101
}
84102

103+
104+
85105
}

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointFilter.scala renamed to streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeExtractor.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@
1818

1919
package org.apache.gearpump.streaming.transaction.api
2020

21-
import org.apache.gearpump.TimeStamp
22-
import org.apache.gearpump.util.Configs
23-
24-
trait CheckpointFilter {
25-
def filter(checkpoint: Checkpoint, timestamp: TimeStamp,
26-
conf: Configs): Option[Long]
21+
/**
22+
* shamelessly stolen from summingbird
23+
* https://github.com/twitter/summingbird/blob/develop/summingbird-core/src/main/scala/com/twitter/summingbird/TimeExtractor.scala
24+
*/
25+
object TimeExtractor {
26+
def apply[T](fn: T => Long): TimeExtractor[T] =
27+
new TimeExtractor[T] {
28+
override def apply(t: T) = fn(t)
29+
}
2730
}
2831

32+
trait TimeExtractor[T] extends java.io.Serializable {
33+
def apply(t: T): Long
34+
}

0 commit comments

Comments
 (0)