Skip to content

Commit b8c01e5

Browse files
committed
refacor interface
1 parent 5f4f293 commit b8c01e5

File tree

1 file changed

+8
-9
lines changed

1 file changed

+8
-9
lines changed

streaming/src/main/scala/org/apache/gearpump/streaming/AppMaster.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class AppMaster (config : Configs) extends Actor with AppDataStore{
7575
private var startedTasks = Set.empty[TaskId]
7676
private var scheduler : Cancellable = null
7777
private var needToUpdateStartClock = true
78+
private val store : AppDataStore = this
7879

7980
override def receive : Receive = null
8081

@@ -101,7 +102,7 @@ class AppMaster (config : Configs) extends Actor with AppDataStore{
101102
context.watch(master)
102103

103104
LOG.info(s"try to recover start clock")
104-
get(START_CLOCK).map{clock =>
105+
store.get(START_CLOCK).map{clock =>
105106
if(clock != null){
106107
startClock = clock.asInstanceOf[TimeStamp]
107108
LOG.info(s"recover start clock sucessfully and the start clock is ${new Date(startClock)}")
@@ -148,8 +149,6 @@ class AppMaster (config : Configs) extends Actor with AppDataStore{
148149
taskLocations = taskLocations.empty
149150
startedTasks = startedTasks.empty
150151
context.children.foreach(_ ! RestartTasks(startClock))}
151-
case AppDataReceived =>
152-
needToUpdateStartClock = true
153152
}
154153

155154
def executorMsgHandler: Receive = {
@@ -300,11 +299,8 @@ class AppMaster (config : Configs) extends Actor with AppDataStore{
300299
}
301300

302301
private def updateStatus : Unit = {
303-
if(needToUpdateStartClock){
304-
(clockService ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map{clock =>
305-
put(START_CLOCK, clock.clock)
306-
}
307-
needToUpdateStartClock = false
302+
(clockService ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map { clock =>
303+
store.put(START_CLOCK, clock.clock)
308304
}
309305
}
310306

@@ -313,7 +309,10 @@ class AppMaster (config : Configs) extends Actor with AppDataStore{
313309
}
314310

315311
override def put(key: String, value: Any): Unit = {
316-
master ! SaveAppData(appId, key, value)
312+
if(needToUpdateStartClock){
313+
(master ? SaveAppData(appId, key, value)).map(_ => needToUpdateStartClock = true)
314+
needToUpdateStartClock = false
315+
}
317316
}
318317

319318
override def get(key: String): Future[Any] = {

0 commit comments

Comments
 (0)