Skip to content

Commit f49e275

Browse files
committed
fix ClockService bug
1 parent 9335b1c commit f49e275

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

core/src/main/scala/org/apache/gearpump/cluster/AppManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ private[cluster] class AppManager() extends Actor with Stash {
114114
def waitForMasterState: Receive = {
115115
case GetSuccess(_, replicatedState: GSet, _) =>
116116
state = replicatedState.getValue().asScala.foldLeft(state) { (set, appState) =>
117-
set + appState.asInstanceOf[ApplicationState]
117+
set + appState.asInstanceOf[ApplicationState]
118118
}
119119
appId = state.map(_.appId).size
120120
LOG.info(s"Successfully recoeved application states for ${state.map(_.appId)}, nextAppId: ${appId}....")
@@ -158,7 +158,7 @@ private[cluster] class AppManager() extends Actor with Stash {
158158
case Some(info) =>
159159
val worker = info.worker
160160
LOG.info(s"Shuttdown app master at ${worker.path}, appId: $appId, executorId: $masterExecutorId")
161-
//cleanApplicationData(appId)
161+
cleanApplicationData(appId)
162162
worker ! ShutdownExecutor(appId, masterExecutorId, s"AppMaster $appId shutdown requested by master...")
163163
sender ! ShutdownApplicationResult(Success(appId))
164164
case None =>

streaming/src/main/scala/org/apache/gearpump/streaming/task/ClockService.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ import org.apache.gearpump.streaming.task.ClockService._
4747
dag.tasks.foreach { taskIdWithDescription =>
4848
val (taskGroupId, description) = taskIdWithDescription
4949
val taskClocks = new Array[TimeStamp](description.parallism)
50-
val taskgroupClock = new TaskGroupClock(taskGroupId, 0, taskClocks)
50+
val taskgroupClock = new TaskGroupClock(taskGroupId, Long.MaxValue, taskClocks)
5151
taskgroupClocks.add(taskgroupClock)
5252
taskgroupLookup.put(taskGroupId, taskgroupClock)
5353
}
@@ -88,7 +88,7 @@ import org.apache.gearpump.streaming.task.ClockService._
8888

8989
object ClockService {
9090

91-
class TaskGroupClock(val taskgroup : TaskGroup, var minClock : TimeStamp = 0L, var taskClocks : Array[TimeStamp] = null) extends Comparable[TaskGroupClock] {
91+
class TaskGroupClock(val taskgroup : TaskGroup, var minClock : TimeStamp = Long.MaxValue, var taskClocks : Array[TimeStamp] = null) extends Comparable[TaskGroupClock] {
9292
override def equals(obj: Any): Boolean = {
9393
if (this.eq(obj.asInstanceOf[AnyRef])) {
9494
return true

0 commit comments

Comments
 (0)