@@ -30,6 +30,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster._
30
30
import org .apache .gearpump .cluster .MasterToClient .{ReplayApplicationResult , ShutdownApplicationResult , SubmitApplicationResult }
31
31
import org .apache .gearpump .cluster .WorkerToAppMaster ._
32
32
import org .apache .gearpump .cluster .scheduler .{Resource , ResourceRequest }
33
+ import org .apache .gearpump .status .{ApplicationStatus , StatusManagerImpl , StatusManager }
33
34
import org .apache .gearpump .transport .HostPort
34
35
import org .apache .gearpump .util .ActorSystemBooter .{ActorCreated , BindLifeCycle , CreateActor , RegisterActorSystem }
35
36
import org .apache .gearpump .util ._
@@ -87,6 +88,8 @@ private[cluster] class AppManager() extends Actor with Stash {
87
88
88
89
// TODO: We can use this state for appmaster HA to recover a new App master
89
90
private var state : Set [ApplicationState ] = Set .empty[ApplicationState ]
91
+ // Now the status manager implementation is a work-around, it will be rewritten when the state checkpoint is done
92
+ private val statusManager : StatusManager = new StatusManagerImpl
90
93
91
94
val masterClusterSize = systemconfig.getStringList(Constants .GEARPUMP_CLUSTER_MASTERS ).size()
92
95
@@ -102,6 +105,7 @@ private[cluster] class AppManager() extends Actor with Stash {
102
105
103
106
override def postStop : Unit = {
104
107
replicator ! Unsubscribe (STATE , self)
108
+ statusManager.close()
105
109
}
106
110
107
111
LOG .info(" Recoving application state...." )
@@ -129,7 +133,7 @@ private[cluster] class AppManager() extends Actor with Stash {
129
133
stash()
130
134
}
131
135
132
- def receiveHandler = masterHAMsgHandler orElse clientMsgHandler orElse appMasterMessage orElse terminationWatch
136
+ def receiveHandler = masterHAMsgHandler orElse clientMsgHandler orElse appMasterMessage orElse terminationWatch orElse selfMsgHandler
133
137
134
138
def masterHAMsgHandler : Receive = {
135
139
case update : UpdateResponse => LOG .info(s " we get update $update" )
@@ -140,7 +144,7 @@ private[cluster] class AppManager() extends Actor with Stash {
140
144
def clientMsgHandler : Receive = {
141
145
case submitApp@ SubmitApplication (appMasterClass, config, app) =>
142
146
LOG .info(s " AppManager Submiting Application $appId... " )
143
- val appWatcher = context.actorOf(Props (classOf [AppMasterStarter ], appId, appMasterClass, config, app), appId.toString)
147
+ val appWatcher = context.actorOf(Props (classOf [AppMasterStarter ], appId, appMasterClass, config.withStartTime( 0 ) , app), appId.toString)
144
148
145
149
LOG .info(s " Persist master state writeQuorum: ${writeQuorum}, timeout: ${TIMEOUT }... " )
146
150
replicator ! Update (STATE , GSet (), WriteTo (writeQuorum), TIMEOUT )(_ + new ApplicationState (appId, 0 , null ))
@@ -154,6 +158,8 @@ private[cluster] class AppManager() extends Actor with Stash {
154
158
val worker = info.worker
155
159
LOG .info(s " Shuttdown app master at ${worker.path}, appId: $appId, executorId: $masterExecutorId" )
156
160
worker ! ShutdownExecutor (appId, masterExecutorId, s " AppMaster $appId shutdown requested by master... " )
161
+ appMasterRegistry -= appId
162
+ statusManager.removeStatus(appId.toString)
157
163
sender ! ShutdownApplicationResult (Success (appId))
158
164
case None =>
159
165
val errorMsg = s " Find to find regisration information for appId: $appId"
@@ -213,23 +219,50 @@ private[cluster] class AppManager() extends Actor with Stash {
213
219
case None =>
214
220
sender ! AppMasterDataDetail (appId = appId, appDescription = null )
215
221
}
216
-
222
+ case updateTimestampTrailingEdge : UpdateTimestampTrailingEdge =>
223
+ val appId = updateTimestampTrailingEdge.appID
224
+ val startTime = updateTimestampTrailingEdge.timeStamp
225
+ val (_, info) = appMasterRegistry.getOrElse(appId, (null , null ))
226
+ Option (info) match {
227
+ case a@ Some (data) =>
228
+ val app = a.get.app
229
+ LOG .info(s " update timestamp traingling edge for application $appId" )
230
+ statusManager.updateStatus(appId.toString, ApplicationStatus (appId, a.get.appMasterClass, app, startTime))
231
+ case None =>
232
+ LOG .error(s " no match application for app $appId when updating timestamp " )
233
+ }
217
234
}
218
235
219
236
def terminationWatch : Receive = {
220
- // TODO: fix this
221
237
case terminate : Terminated => {
222
238
terminate.getAddressTerminated()
223
- // TODO: Check whether this belongs to a app master
224
239
LOG .info(s " App Master is terminiated, network down: ${terminate.getAddressTerminated()}" )
225
-
226
- // TODO: we need to find out whether it is a normal terminaiton, or abnormal.
227
- // The appMaster HA can be implemented by restoring the state from Set[ApplicationState]
240
+ // Now we assume that the only normal way to stop the application is submitting a ShutdownApplication request
241
+ val application = appMasterRegistry.find{param =>
242
+ val (_, (actorRef, _)) = param
243
+ actorRef.compareTo(terminate.actor) == 0
244
+ }
245
+ if (application.nonEmpty){
246
+ val appId = application.get._1
247
+ val appStatus = statusManager.getStatus(appId.toString).asInstanceOf [ApplicationStatus ]
248
+ self ! RecoverApplication (appStatus)
249
+ }
228
250
}
229
251
}
252
+
253
+ def selfMsgHandler : Receive = {
254
+ case RecoverApplication (applicationStatus) =>
255
+ val terminatedAppId = applicationStatus.appId
256
+ LOG .info(s " AppManager Recovering Application $terminatedAppId... " )
257
+ val appMasterClass = applicationStatus.appMasterClass
258
+ val config = Configs .empty.withStartTime(applicationStatus.startClock)
259
+ context.actorOf(Props (classOf [AppMasterStarter ], terminatedAppId, appMasterClass, config, applicationStatus.app), terminatedAppId.toString)
260
+ }
261
+
262
+ case class RecoverApplication (applicationStatus : ApplicationStatus )
230
263
}
231
264
232
- case class AppMasterInfo (worker : ActorRef ) extends AppMasterRegisterData
265
+ case class AppMasterInfo (worker : ActorRef , app : Application , appMasterClass : Class [_ <: Actor ] ) extends AppMasterRegisterData
233
266
234
267
private [cluster] object AppManager {
235
268
private val masterExecutorId = - 1
@@ -252,7 +285,7 @@ private[cluster] object AppManager {
252
285
case ResourceAllocated (allocations) => {
253
286
LOG .info(s " Resource allocated for appMaster $app Id " )
254
287
val allocation = allocations(0 )
255
- val appMasterConfig = appConfig.withAppId(appId).withAppDescription(app).withAppMasterRegisterData(AppMasterInfo (allocation.worker)).withExecutorId(masterExecutorId).withResource(allocation.resource)
288
+ val appMasterConfig = appConfig.withAppId(appId).withAppDescription(app).withAppMasterRegisterData(AppMasterInfo (allocation.worker, app, appMasterClass )).withExecutorId(masterExecutorId).withResource(allocation.resource)
256
289
LOG .info(s " Try to launch a executor for app Master on ${allocation.worker} for app $appId" )
257
290
val name = actorNameForExecutor(appId, masterExecutorId)
258
291
val selfPath = ActorUtil .getFullPath(context)
0 commit comments