
Commit f8d1423

Merge branch 'add_test' of https://github.com/manuzhang/gearpump into add_test
2 parents b4d297e + 634c665

23 files changed: +366, -243 lines

conf/kafka.conf

Lines changed: 2 additions & 0 deletions
@@ -7,6 +7,7 @@ kafka {
     socket.receive.buffer.size = 65536
     fetch.message.max.bytes = 1048576
     emit.batch.size = 100
+    queue.size = 100000000
   }

   producer {
@@ -25,5 +26,6 @@ kafka {
     commit.interval.ms = 1000 # 1s
     filter.class = "org.apache.gearpump.streaming.transaction.kafka.RelaxedTimeFilter"
     message.delay.ms = 10
+    id = 0
   }
 }
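
A minimal sketch (not part of this commit) of reading the new keys with Typesafe Config; the block names used for nesting ("consumer", "checkpoint") are assumptions, since the enclosing blocks are not visible in these hunks.

// Hypothetical sketch: the actual kafka.conf layout may group these keys differently.
import com.typesafe.config.ConfigFactory

object KafkaConfSketch extends App {
  val conf = ConfigFactory.parseString(
    """kafka {
      |  consumer {
      |    emit.batch.size = 100
      |    queue.size = 100000000
      |  }
      |  checkpoint {
      |    commit.interval.ms = 1000
      |    id = 0
      |  }
      |}""".stripMargin)

  val queueSize = conf.getInt("kafka.consumer.queue.size")   // 100000000 still fits in an Int
  val checkpointId = conf.getInt("kafka.checkpoint.id")
  println(s"queue.size=$queueSize, id=$checkpointId")
}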

core/src/main/scala/org/apache/gearpump/cluster/AppManager.scala

Lines changed: 3 additions & 3 deletions
@@ -252,10 +252,10 @@ private[cluster] object AppManager {
   }

   def waitForActorSystemToStart(worker : ActorRef, masterConfig : Configs) : Receive = {
-    case ExecutorLaunchRejected(reason, ex) =>
+    case ExecutorLaunchRejected(reason, resource, ex) =>
       LOG.error(s"Executor Launch failed reason:$reason", ex)
-      //TODO: ask master to allocate new resources and start appmaster on new node.
-      context.stop(self)
+      LOG.info(s"reallocate resource $resource to start appmaster")
+      master ! RequestResource(appId, ResourceRequest(resource))
     case RegisterActorSystem(systemPath) =>
       LOG.info(s"Received RegisterActorSystem $systemPath for app master")
       //bind lifecycle with worker

core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala

Lines changed: 1 addition & 1 deletion
@@ -77,6 +77,6 @@ object AppMasterToWorker {
 }

 object WorkerToAppMaster {
-  case class ExecutorLaunchRejected(reason: String = null, ex: Throwable = null)
+  case class ExecutorLaunchRejected(reason: String = null, resource : Resource, ex: Throwable = null)
 }

core/src/main/scala/org/apache/gearpump/cluster/Worker.scala

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor{
     case launch : LaunchExecutor =>
       LOG.info(s"Worker[$id] LaunchExecutor ....$launch")
       if (resource.lessThan(launch.resource)) {
-        sender ! ExecutorLaunchRejected("There is no free resource on this machine")
+        sender ! ExecutorLaunchRejected("There is no free resource on this machine", launch.resource)
       } else {
         val actorName = actorNameForExecutor(launch.appId, launch.executorId)
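
Together with the AppManager and ClusterMessage changes above, the rejection path now carries the requested Resource back, and the app master re-requests it from the master instead of stopping itself. A simplified, self-contained Akka sketch of that loop, with reduced message and actor shapes rather than the real gearpump classes:

import akka.actor.{Actor, ActorRef}

// Reduced message shapes, for illustration only.
case class Resource(slots: Int)
case class ResourceRequest(resource: Resource)
case class LaunchExecutor(appId: Int, executorId: Int, resource: Resource)
case class RequestResource(appId: Int, request: ResourceRequest)
case class ExecutorLaunchRejected(reason: String = null, resource: Resource, ex: Throwable = null)

class WorkerSketch(var free: Resource) extends Actor {
  def receive = {
    case launch: LaunchExecutor if free.slots < launch.resource.slots =>
      // Not enough slots: reject and echo back the resource that was asked for.
      sender() ! ExecutorLaunchRejected("There is no free resource on this machine", launch.resource)
  }
}

class AppMasterSketch(master: ActorRef, appId: Int) extends Actor {
  def receive = {
    case ExecutorLaunchRejected(reason, resource, ex) =>
      // The old behaviour stopped the actor; now the same resource is requested again.
      master ! RequestResource(appId, ResourceRequest(resource))
  }
}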

core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala

Lines changed: 4 additions & 4 deletions
@@ -56,7 +56,7 @@ class PriorityScheduler extends Scheduler{
       val PendingRequest(appMaster, request, timeStamp) = resourceRequests.dequeue()
       request.relaxation match {
         case ANY =>
-          val newAllocated = allocateFairly(resourcesSnapShot, PendingRequest(appMaster, request, timeStamp), allocated)
+          val newAllocated = allocateFairly(resourcesSnapShot, PendingRequest(appMaster, request, timeStamp))
           allocated = allocated.add(newAllocated)
         case ONEWORKER =>
           val availableResource = resourcesSnapShot.find{params =>
@@ -95,7 +95,7 @@ class PriorityScheduler extends Scheduler{
     allocateResource()
   }

-  private def allocateFairly(resources : mutable.HashMap[Int, (ActorRef, Resource)], pendindRequest : PendingRequest, allocated : Resource): Resource ={
+  private def allocateFairly(resources : mutable.HashMap[Int, (ActorRef, Resource)], pendindRequest : PendingRequest): Resource ={
     val length = resources.size
     val flattenResource = resources.toArray.zipWithIndex.flatMap((workerWithIndex) => {
       val ((workerId, (worker, resource)), index) = workerWithIndex
@@ -104,8 +104,8 @@ class PriorityScheduler extends Scheduler{
     val PendingRequest(appMaster, request, timeStamp) = pendindRequest
     val total = Resource(flattenResource.size)

-    val newAllocated = Resource.min(total.subtract(allocated), request.resource)
-    val singleAllocation = flattenResource.slice(allocated.slots, allocated.add(newAllocated).slots)
+    val newAllocated = Resource.min(total, request.resource)
+    val singleAllocation = flattenResource.take(newAllocated.slots)
       .groupBy((actor) => actor).mapValues(_.length).toArray.map((params) => {
         val ((workerId, worker), slots) = params
         resources.update(workerId, (worker, resources.get(workerId).get._2.subtract(Resource(slots))))
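
allocateFairly no longer threads an `allocated` accumulator through each call: it takes min(total, requested) slots from the front of the flattened per-slot list and groups them back per worker, relying on the in-place `resources.update` to remove granted slots from later passes. A toy, standalone illustration of that take-and-group step (not gearpump code):

object FairTakeSketch extends App {
  // One entry per free slot; the real scheduler builds this from its worker map.
  val freeSlots = Map("worker1" -> 3, "worker2" -> 2)
  val flattened: Array[String] =
    freeSlots.toArray.flatMap { case (worker, slots) => Array.fill(slots)(worker) }

  // Take min(total, requested) slots from the front, then count slots per worker.
  val requested = 4
  val granted = math.min(flattened.length, requested)
  val allocation: Map[String, Int] =
    flattened.take(granted).groupBy(identity).map { case (w, picks) => w -> picks.length }

  println(allocation)   // e.g. Map(worker1 -> 3, worker2 -> 1)
}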

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaBolt.scala

Lines changed: 1 addition & 1 deletion
@@ -69,7 +69,7 @@ class KafkaBolt(conf: Configs) extends TaskActor(conf) {

   private def reportThroughput : Unit = {
     val current = System.currentTimeMillis()
-    LOG.info(s"Task $taskId Throughput: ${((count - lastCount), ((current - lastTime) / 1000))} (messages, second)")
+    LOG.info(s"Task $taskId; Actor ${self}; Throughput: ${((count - lastCount), ((current - lastTime) / 1000))} (messages, second)")
     lastCount = count
     lastTime = current
   }

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaDefaultGrouper.scala

Lines changed: 3 additions & 3 deletions
@@ -31,10 +31,10 @@ import kafka.common.TopicAndPartition
  * spout1 gets (topicA, partition2), (topicB, partition2)
  */
 class KafkaDefaultGrouper {
-  def group(topicAndPartitions: List[TopicAndPartition],
-            taskNum: Int, taskId: TaskId): List[TopicAndPartition] = {
+  def group(topicAndPartitions: Array[TopicAndPartition],
+            taskNum: Int, taskId: TaskId): Array[TopicAndPartition] = {
     val taskToTopicAndPartitions = topicAndPartitions.groupBy(tp => tp.partition % taskNum).map(params =>
-      (TaskId(taskId.groupId, params._1), params._2.toList)
+      (TaskId(taskId.groupId, params._1), params._2)
     )
     taskToTopicAndPartitions(taskId)
   }
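
The grouping rule itself is unchanged: a TopicAndPartition is assigned to task (partition % taskNum). A tiny standalone sketch of that modulo grouping, using a local stand-in for kafka's TopicAndPartition:

object GrouperSketch extends App {
  case class TopicAndPartition(topic: String, partition: Int)   // local stand-in for kafka's class

  val tps = Array(
    TopicAndPartition("topicA", 0), TopicAndPartition("topicA", 1),
    TopicAndPartition("topicB", 0), TopicAndPartition("topicB", 1))

  val taskNum = 2
  // partition % taskNum decides which task consumes which partitions
  val byTask: Map[Int, Array[TopicAndPartition]] = tps.groupBy(_.partition % taskNum)

  byTask.toSeq.sortBy(_._1).foreach { case (task, parts) =>
    println(s"task $task -> ${parts.mkString(", ")}")
  }
}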

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaSpout.scala

Lines changed: 18 additions & 7 deletions
@@ -64,9 +64,9 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {
   private val grouper = new KafkaDefaultGrouper
   private val emitBatchSize = config.getConsumerEmitBatchSize

-  private val topicAndPartitions: List[TopicAndPartition] = {
+  private val topicAndPartitions: Array[TopicAndPartition] = {
     val original = ZkUtils.getPartitionsForTopics(config.getZkClient(), config.getConsumerTopics)
-      .flatMap(tps => { tps._2.map(TopicAndPartition(tps._1, _)) }).toList
+      .flatMap(tps => { tps._2.map(TopicAndPartition(tps._1, _)) }).toArray
     val grouped = grouper.group(original,
       conf.dag.tasks(taskId.groupId).parallism, taskId)
     grouped.foreach(tp =>
@@ -76,7 +76,8 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {

   private val consumer = config.getConsumer(topicAndPartitions = topicAndPartitions)
   private val decoder = new StringDecoder()
-  private val offsetManager = new OffsetManager(conf)
+  private val offsetManager = new OffsetManager(
+    config.getCheckpointManagerFactory.getCheckpointManager(conf), config.getCheckpointFilter)
   private val commitIntervalMS = config.getCheckpointCommitIntervalMS
   private var lastCommitTime = System.currentTimeMillis()

@@ -90,23 +91,33 @@ class KafkaSpout(conf: Configs) extends TaskActor(conf) {
       val topicAndPartition = TopicAndPartition(source.name, source.partition)
       consumer.setStartOffset(topicAndPartition, offset)
     }
+    consumer.start()
     self ! Message("start", System.currentTimeMillis())
   }

   override def onNext(msg: Message): Unit = {
+
     @annotation.tailrec
-    def emit(msgNum: Int): Unit = {
+    def fetchAndEmit(msgNum: Int, tpIndex: Int): Unit = {
       if (msgNum < emitBatchSize) {
-        val kafkaMsg = consumer.nextMessage()
+        val kafkaMsg = consumer.nextMessage(topicAndPartitions(tpIndex))
         if (kafkaMsg != null) {
           val timestamp = System.currentTimeMillis()
           output(new Message(decoder.fromBytes(kafkaMsg.msg)))
           offsetManager.update(KafkaSource(kafkaMsg.topicAndPartition), timestamp, kafkaMsg.offset)
+        } else {
+          LOG.debug(s"no more messages from ${topicAndPartitions(tpIndex)}")
+        }
+        // poll message from each TopicAndPartition in a round-robin way
+        // TODO: make it configurable
+        if (tpIndex + 1 == topicAndPartitions.size) {
+          fetchAndEmit(msgNum + 1, 0)
+        } else {
+          fetchAndEmit(msgNum + 1, tpIndex + 1)
         }
-        emit(msgNum + 1)
       }
     }
-    emit(0)
+    fetchAndEmit(0, 0)
     if (shouldCommitCheckpoint) {
       LOG.info("committing checkpoint...")
       offsetManager.checkpoint
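
The new fetchAndEmit walks topicAndPartitions one fetch at a time and wraps the index back to 0, so a batch is spread across all assigned partitions instead of draining a single one. A minimal sketch of just the round-robin index logic, with no Kafka consumer involved:

object RoundRobinSketch extends App {
  val topicAndPartitions = Array("topicA-0", "topicA-1", "topicB-0")
  val emitBatchSize = 7

  @annotation.tailrec
  def fetch(msgNum: Int, tpIndex: Int): Unit = {
    if (msgNum < emitBatchSize) {
      println(s"fetch #$msgNum from ${topicAndPartitions(tpIndex)}")
      // wrap around once the last TopicAndPartition has been polled
      val next = if (tpIndex + 1 == topicAndPartitions.length) 0 else tpIndex + 1
      fetch(msgNum + 1, next)
    }
  }

  fetch(0, 0)
}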

examples/src/main/scala/org/apache/gearpump/streaming/examples/kafka/Sum.scala

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ class Sum (conf : Configs) extends TaskActor(conf) {
     val word = msg.msg.asInstanceOf[String]
     val count = current + 1
     map.put(word, count)
-    output(new Message(word -> count.toString(), System.currentTimeMillis()))
+    output(new Message(s"${msg.timestamp}" -> s"${word}:${count}", System.currentTimeMillis()))
   }

   override def onStop() : Unit = {

project/Build.scala

Lines changed: 3 additions & 1 deletion
@@ -29,6 +29,7 @@ object Build extends sbt.Build {
   val spraySwaggerVersion = "0.4.3"
   val swaggerUiVersion = "2.0.21"
   val specs2Version = "1.13"
+  val mockitoVersion = "1.9.5"

   val commonSettings = Defaults.defaultSettings ++ packAutoSettings ++
     Seq(
@@ -97,7 +98,8 @@ object Build extends sbt.Build {
      Seq(
        libraryDependencies ++= Seq(
          "org.apache.kafka" %% "kafka" % kafkaVersion,
-         "org.specs2" %% "specs2" % specs2Version % "test"
+         "org.specs2" %% "specs2" % specs2Version % "test",
+         "org.mockito" % "mockito-core" % mockitoVersion % "test"
        )
      )
  ) dependsOn(core)
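
mockito-core joins specs2 in the test scope. A hypothetical sketch of the kind of test this enables (mocking a collaborator behind a trait); it is illustrative only, not a test from this commit:

import org.mockito.Mockito._
import org.specs2.mutable.Specification

// Hypothetical trait standing in for a collaborator such as a consumer.
trait MessageSource { def nextMessage(tp: String): String }

class ConsumerMockSpec extends Specification {
  "a mocked MessageSource" should {
    "return the stubbed message" in {
      val source = mock(classOf[MessageSource])
      when(source.nextMessage("topicA-0")).thenReturn("hello")
      source.nextMessage("topicA-0") must_== "hello"
    }
  }
}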

streaming/src/main/scala/org/apache/gearpump/streaming/AppMaster.scala

Lines changed: 4 additions & 2 deletions
@@ -29,7 +29,7 @@ import org.apache.gearpump.cluster.AppMasterToWorker._
 import org.apache.gearpump.cluster.MasterToAppMaster._
 import org.apache.gearpump.cluster.WorkerToAppMaster._
 import org.apache.gearpump.cluster._
-import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.scheduler.{ResourceRequest, Resource}
 import org.apache.gearpump.streaming.AppMasterToExecutor.{LaunchTask, RecoverTasks, RestartTasks}
 import org.apache.gearpump.streaming.ConfigsHelper._
 import org.apache.gearpump.streaming.ExecutorToAppMaster._
@@ -212,8 +212,10 @@ class AppMaster (config : Configs) extends Actor {
       }
       launchTask(resource)
     }
-    case ExecutorLaunchRejected(reason, ex) => {
+    case ExecutorLaunchRejected(reason, resource, ex) => {
       LOG.error(s"Executor Launch failed reason:$reason", ex)
+      LOG.info(s"reallocate resource $resource to start appmaster")
+      master ! RequestResource(appId, ResourceRequest(resource))
     }
   }

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointFilter.scala

Lines changed: 5 additions & 3 deletions
@@ -19,10 +19,12 @@
 package org.apache.gearpump.streaming.transaction.api

 import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.transaction.api.CheckpointManager._
 import org.apache.gearpump.util.Configs

-trait CheckpointFilter {
-  def filter(checkpoint: Checkpoint, timestamp: TimeStamp,
-             conf: Configs): Option[Long]
+class CheckpointFilter(conf: Configs) {
+  def filter(records: List[Record], timestamp: TimeStamp): Option[Record] = {
+    timeAndOffsets.sortBy(_._1).find(_._1 > timestamp)
+  }
 }
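
A standalone sketch of record-based filtering along these lines, assuming each Record is a (timestampBytes, offsetBytes) pair as written by the OffsetManager change later in this commit; the byte helpers are inlined here instead of using gearpump's KafkaUtil:

import java.nio.ByteBuffer

object FilterSketch extends App {
  type Record = (Array[Byte], Array[Byte])

  def longToBytes(l: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(l).array()
  def bytesToLong(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong

  // keep the first record whose timestamp is later than the requested time
  def filter(records: List[Record], timestamp: Long): Option[Record] =
    records.sortBy(r => bytesToLong(r._1)).find(r => bytesToLong(r._1) > timestamp)

  val records = List(100L -> 0L, 200L -> 5L, 300L -> 9L).map {
    case (ts, offset) => (longToBytes(ts), longToBytes(offset))
  }

  println(filter(records, 150L).map(r => bytesToLong(r._2)))   // Some(5)
}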

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointManager.scala

Lines changed: 20 additions & 17 deletions
@@ -18,36 +18,39 @@

 package org.apache.gearpump.streaming.transaction.api

-import org.apache.gearpump.TimeStamp
+object CheckpointManager {

-/**
- * a Source consists of its name and partition
- */
-trait Source {
-  def name: String
-  def partition: Int
-}
+  trait Source {
+    def name: String

-/**
- * a Checkpoint is a map from message timestamps to
- * message offsets of the input stream
- */
-object Checkpoint {
-  def apply(timestamp: TimeStamp, offset: Long): Checkpoint =
-    Checkpoint(Map(timestamp -> offset))
+    def partition: Int
+  }
+
+  type Record = (Array[Byte], Array[Byte])
+
+  object Checkpoint {
+    def apply(key: Array[Byte], payload: Array[Byte]): Checkpoint =
+      Checkpoint(List((key, payload)))
+
+    def empty: Checkpoint =
+      Checkpoint(List.empty[Record])
+
+  }
+
+  case class Checkpoint(records: List[Record])
 }
-case class Checkpoint(timeAndOffsets: Map[TimeStamp, Long])


 /**
  * CheckpointManager checkpoints message and its timestamp to a persistent system
  * such that we could replay messages around or after given time
  */
 trait CheckpointManager {
+  import org.apache.gearpump.streaming.transaction.api.CheckpointManager._

   def start(): Unit

-  def register(sources: List[Source]): Unit
+  def register(sources: Array[Source]): Unit

   def writeCheckpoint(source: Source, checkpoint: Checkpoint): Unit
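
A short usage sketch of the reshaped Checkpoint, which now holds a list of raw (key, payload) byte-array records rather than a TimeStamp -> offset map; the types are re-declared locally so the snippet stands alone:

object CheckpointSketch extends App {
  type Record = (Array[Byte], Array[Byte])

  case class Checkpoint(records: List[Record])
  object Checkpoint {
    def apply(key: Array[Byte], payload: Array[Byte]): Checkpoint = Checkpoint(List((key, payload)))
    def empty: Checkpoint = Checkpoint(List.empty[Record])
  }

  val one = Checkpoint("ts-bytes".getBytes, "offset-bytes".getBytes)   // single-record checkpoint
  val none = Checkpoint.empty                                          // no records yet
  println(s"one has ${one.records.size} record(s), empty has ${none.records.size}")
}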

streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala

Lines changed: 22 additions & 17 deletions
@@ -19,30 +19,27 @@
 package org.apache.gearpump.streaming.transaction.api

 import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.streaming.transaction.api.Source
-import org.apache.gearpump.streaming.transaction.kafka.KafkaConfig._
-import org.apache.gearpump.util.Configs
+import org.apache.gearpump.streaming.transaction.api.CheckpointManager._
+import org.apache.gearpump.streaming.transaction.kafka.KafkaUtil._
 import org.slf4j.{LoggerFactory, Logger}

 object OffsetManager {
   private val LOG: Logger = LoggerFactory.getLogger(classOf[OffsetManager])
 }
-class OffsetManager(conf: Configs) {
+
+class OffsetManager(checkpointManager: CheckpointManager,
+                    filter: CheckpointFilter) {
   import org.apache.gearpump.streaming.transaction.api.OffsetManager._

-  private val config = conf.config
-  private val filter = config.getCheckpointFilter
-  private val checkpointManager =
-    config.getCheckpointManagerFactory.getCheckpointManager(conf)
-  private var sources: List[Source] = null
+  private var sources: Array[Source] = null
   private var offsetsByTimeAndSource = Map.empty[(Source, TimeStamp), Long]

   def start(): Unit = {
     LOG.info("starting offsetManager...")
     checkpointManager.start()
   }

-  def register(sources: List[Source]) = {
+  def register(sources: Array[Source]) = {
     this.sources = sources
     checkpointManager.register(this.sources)
   }
@@ -56,12 +53,20 @@ class OffsetManager(conf: Configs) {
   def checkpoint: Map[Source, Checkpoint] = {
     val checkpointsBySource = offsetsByTimeAndSource
       .groupBy(_._1._1)
-      .mapValues[Checkpoint](values => {
-        Checkpoint(values.map(entry => (entry._1._2, entry._2)))
-      })
+      .map { grouped => {
+        val source = grouped._1
+        val records = grouped._2.map {
+          entry =>
+            val timestamp = entry._1._2
+            val offset = entry._2
+            (longToByteArray(timestamp), longToByteArray(offset))
+        }.toList
+        source -> Checkpoint(records)
+      }
+    }
     checkpointsBySource.foreach {
-      sourceAndCheckpoint =>
-        checkpointManager.writeCheckpoint(sourceAndCheckpoint._1,
+      sourceAndCheckpoint =>
+        checkpointManager.writeCheckpoint(sourceAndCheckpoint._1,
          sourceAndCheckpoint._2)
     }

@@ -72,8 +77,8 @@ class OffsetManager(conf: Configs) {
   def loadStartingOffsets(timestamp: TimeStamp): Map[Source, Long] = {
     LOG.info("loading start offsets...")
     sources.foldLeft(Map.empty[Source, Long]) { (accum, source) =>
-      filter.filter(checkpointManager.readCheckpoint(source), timestamp, conf) match {
-        case Some(offset) => accum + (source -> offset)
+      filter.filter(checkpointManager.readCheckpoint(source).timeAndOffsets.toList, timestamp) match {
+        case Some((_, offset)) => accum + (source -> offset)
        case None => accum
      }
    }
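
OffsetManager now receives its CheckpointManager and CheckpointFilter from the caller (as in the KafkaSpout change above) instead of building them from Configs, which also makes it easy to drive with an in-memory stub. A simplified wiring sketch with reduced local types, not the real gearpump interfaces:

object OffsetManagerWiringSketch extends App {
  // Reduced stand-ins for the real api types.
  trait Source { def name: String; def partition: Int }
  type Record = (Array[Byte], Array[Byte])
  case class Checkpoint(records: List[Record])

  trait CheckpointManager {
    def start(): Unit
    def register(sources: Array[Source]): Unit
    def writeCheckpoint(source: Source, checkpoint: Checkpoint): Unit
    def readCheckpoint(source: Source): Checkpoint
  }

  class CheckpointFilter {
    def filter(records: List[Record], timestamp: Long): Option[Record] = records.headOption
  }

  // Collaborators are constructor arguments, so any implementation can be injected.
  class OffsetManager(checkpointManager: CheckpointManager, filter: CheckpointFilter) {
    def start(): Unit = checkpointManager.start()
  }

  val inMemory = new CheckpointManager {
    private var store = Map.empty[String, Checkpoint]
    def start(): Unit = println("in-memory checkpoint manager started")
    def register(sources: Array[Source]): Unit = ()
    def writeCheckpoint(source: Source, checkpoint: Checkpoint): Unit =
      store += (source.name -> checkpoint)
    def readCheckpoint(source: Source): Checkpoint = store.getOrElse(source.name, Checkpoint(Nil))
  }

  new OffsetManager(inMemory, new CheckpointFilter).start()
}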
