@@ -75,6 +75,7 @@ class AppMaster (config : Configs) extends Actor with AppDataStore{
75
75
private var startedTasks = Set .empty[TaskId ]
76
76
private var scheduler : Cancellable = null
77
77
private var needToUpdateStartClock = true
78
+ private val store : AppDataStore = this
78
79
79
80
override def receive : Receive = null
80
81
@@ -101,7 +102,7 @@ class AppMaster (config : Configs) extends Actor with AppDataStore{
101
102
context.watch(master)
102
103
103
104
LOG .info(s " try to recover start clock " )
104
- get(START_CLOCK ).map{clock =>
105
+ store. get(START_CLOCK ).map{clock =>
105
106
if (clock != null ){
106
107
startClock = clock.asInstanceOf [TimeStamp ]
107
108
LOG .info(s " recover start clock sucessfully and the start clock is ${new Date (startClock)}" )
@@ -148,8 +149,6 @@ class AppMaster (config : Configs) extends Actor with AppDataStore{
148
149
taskLocations = taskLocations.empty
149
150
startedTasks = startedTasks.empty
150
151
context.children.foreach(_ ! RestartTasks (startClock))}
151
- case AppDataReceived =>
152
- needToUpdateStartClock = true
153
152
}
154
153
155
154
def executorMsgHandler : Receive = {
@@ -300,11 +299,8 @@ class AppMaster (config : Configs) extends Actor with AppDataStore{
300
299
}
301
300
302
301
private def updateStatus : Unit = {
303
- if (needToUpdateStartClock){
304
- (clockService ? GetLatestMinClock ).asInstanceOf [Future [LatestMinClock ]].map{clock =>
305
- put(START_CLOCK , clock.clock)
306
- }
307
- needToUpdateStartClock = false
302
+ (clockService ? GetLatestMinClock ).asInstanceOf [Future [LatestMinClock ]].map { clock =>
303
+ store.put(START_CLOCK , clock.clock)
308
304
}
309
305
}
310
306
@@ -313,7 +309,10 @@ class AppMaster (config : Configs) extends Actor with AppDataStore{
313
309
}
314
310
315
311
override def put (key : String , value : Any ): Unit = {
316
- master ! SaveAppData (appId, key, value)
312
+ if (needToUpdateStartClock){
313
+ (master ? SaveAppData (appId, key, value)).map(_ => needToUpdateStartClock = true )
314
+ needToUpdateStartClock = false
315
+ }
317
316
}
318
317
319
318
override def get (key : String ): Future [Any ] = {
0 commit comments