Skip to content

Commit c08bfb9

Browse files
committed
fix ClockService
1 parent 9903380 commit c08bfb9

File tree

2 files changed

+4
-4
lines changed
  • examples/src/main/scala/org/apache/gearpump/streaming/examples/wordcount
  • streaming/src/main/scala/org/apache/gearpump/streaming/task

2 files changed

+4
-4
lines changed

examples/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,6 @@ class Split(conf : Configs) extends TaskActor(conf) {
5454
output(new Message(msg, System.currentTimeMillis()))
5555
}
5656
}
57-
self ! Message("continue")
57+
self ! Message("continue", System.currentTimeMillis())
5858
}
5959
}

streaming/src/main/scala/org/apache/gearpump/streaming/task/ClockService.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ import org.apache.gearpump.streaming.task.ClockService._
4646
override def preStart : Unit = {
4747
dag.tasks.foreach { taskIdWithDescription =>
4848
val (taskGroupId, description) = taskIdWithDescription
49-
val taskClocks = new Array[TimeStamp](description.parallism)
50-
val taskgroupClock = new TaskGroupClock(taskGroupId, 0L, taskClocks)
49+
val taskClocks = new Array[TimeStamp](description.parallism).map(_ => Long.MaxValue)
50+
val taskgroupClock = new TaskGroupClock(taskGroupId, Long.MaxValue, taskClocks)
5151
taskgroupClocks.add(taskgroupClock)
5252
taskgroupLookup.put(taskGroupId, taskgroupClock)
5353
}
@@ -88,7 +88,7 @@ import org.apache.gearpump.streaming.task.ClockService._
8888

8989
object ClockService {
9090

91-
class TaskGroupClock(val taskgroup : TaskGroup, var minClock : TimeStamp = 0L, var taskClocks : Array[TimeStamp] = null) extends Comparable[TaskGroupClock] {
91+
class TaskGroupClock(val taskgroup : TaskGroup, var minClock : TimeStamp = Long.MaxValue, var taskClocks : Array[TimeStamp] = null) extends Comparable[TaskGroupClock] {
9292
override def equals(obj: Any): Boolean = {
9393
if (this.eq(obj.asInstanceOf[AnyRef])) {
9494
return true

0 commit comments

Comments
 (0)