@@ -21,22 +21,24 @@ package org.apache.gearpump.cluster
21
21
import java .util .concurrent .TimeUnit
22
22
23
23
import akka .actor ._
24
+ import akka .cluster .Cluster
24
25
import akka .contrib .datareplication .Replicator ._
25
26
import akka .contrib .datareplication .{DataReplication , GSet }
27
+ import akka .pattern .ask
26
28
import org .apache .gearpump .cluster .AppMasterToMaster ._
27
29
import org .apache .gearpump .cluster .AppMasterToWorker ._
28
30
import org .apache .gearpump .cluster .ClientToMaster ._
29
31
import org .apache .gearpump .cluster .MasterToAppMaster ._
30
32
import org .apache .gearpump .cluster .MasterToClient .{ReplayApplicationResult , ShutdownApplicationResult , SubmitApplicationResult }
31
33
import org .apache .gearpump .cluster .WorkerToAppMaster ._
32
34
import org .apache .gearpump .cluster .scheduler .{Resource , ResourceRequest }
33
- import org .apache .gearpump .status .{ApplicationStatus , StatusManagerImpl , StatusManager }
34
35
import org .apache .gearpump .transport .HostPort
35
36
import org .apache .gearpump .util .ActorSystemBooter .{ActorCreated , BindLifeCycle , CreateActor , RegisterActorSystem }
36
37
import org .apache .gearpump .util ._
37
38
import org .slf4j .{Logger , LoggerFactory }
38
39
39
40
import scala .collection .JavaConverters ._
41
+ import scala .concurrent .Future
40
42
import scala .concurrent .duration .Duration
41
43
import scala .util .{Failure , Success }
42
44
/**
@@ -46,12 +48,12 @@ import scala.util.{Failure, Success}
46
48
/**
47
49
* This state will be persisted across the masters.
48
50
*/
49
- class ApplicationState (val appId : Int , val attemptId : Int , val state : Any ) extends Serializable {
51
+ class ApplicationState (val appId : Int , val attemptId : Int , val appMasterClass : Class [_ <: Actor ], val app : Application , val state : Any ) extends Serializable {
50
52
51
53
override def equals (other : Any ): Boolean = {
52
54
if (other.isInstanceOf [ApplicationState ]) {
53
55
val that = other.asInstanceOf [ApplicationState ]
54
- if (appId == that.appId && attemptId == that.attemptId) {
56
+ if (appId == that.appId && attemptId == that.attemptId && appMasterClass.equals(that.appMasterClass) && app.equals(that.app) ) {
55
57
true
56
58
} else {
57
59
false
@@ -85,11 +87,10 @@ private[cluster] class AppManager() extends Actor with Stash {
85
87
private val STATE = " masterstate"
86
88
private val TIMEOUT = Duration (5 , TimeUnit .SECONDS )
87
89
private val replicator = DataReplication (context.system).replicator
90
+ implicit val cluster = Cluster (context.system)
88
91
89
92
// TODO: We can use this state for appmaster HA to recover a new App master
90
93
private var state : Set [ApplicationState ] = Set .empty[ApplicationState ]
91
- // Now the status manager implementation is a work-around, it will be rewritten when the state checkpoint is done
92
- private val statusManager : StatusManager = new StatusManagerImpl
93
94
94
95
val masterClusterSize = systemconfig.getStringList(Constants .GEARPUMP_CLUSTER_MASTERS ).size()
95
96
@@ -105,18 +106,19 @@ private[cluster] class AppManager() extends Actor with Stash {
105
106
106
107
// Overrides the actor's postStop hook: sends Unsubscribe(STATE, self) to the replicator so
// this actor is no longer subscribed for the STATE key once it stops.
// NOTE(review): the line marked `-` below is shown as removed in this diff view (the
// statusManager field is likewise removed earlier in the same change).
override def postStop : Unit = {
107
108
replicator ! Unsubscribe (STATE , self)
108
- statusManager.close()
109
109
}
110
110
111
111
LOG .info(" Recoving application state...." )
112
112
context.become(waitForMasterState)
113
113
114
114
def waitForMasterState : Receive = {
115
115
case GetSuccess (_, replicatedState : GSet , _) =>
116
- state = replicatedState.getValue().asScala.foldLeft(state) { (set, appState) =>
117
- set + appState.asInstanceOf [ApplicationState ]
116
+ replicatedState.getValue().asScala.foreach{item =>
117
+ val appState = item.asInstanceOf [ApplicationState ]
118
+ if (appState.appId > appId){
119
+ appId = appState.appId
120
+ }
118
121
}
119
- appId = state.map(_.appId).size
120
122
LOG .info(s " Successfully recoeved application states for ${state.map(_.appId)}, nextAppId: ${appId}.... " )
121
123
context.become(receiveHandler)
122
124
unstashAll()
@@ -144,10 +146,11 @@ private[cluster] class AppManager() extends Actor with Stash {
144
146
def clientMsgHandler : Receive = {
145
147
case submitApp@ SubmitApplication (appMasterClass, config, app) =>
146
148
LOG .info(s " AppManager Submiting Application $appId... " )
147
- val appWatcher = context.actorOf(Props (classOf [AppMasterStarter ], appId, appMasterClass, config.withStartTime( 0 ) , app), appId.toString)
149
+ val appWatcher = context.actorOf(Props (classOf [AppMasterStarter ], appId, appMasterClass, config, app), appId.toString)
148
150
149
151
LOG .info(s " Persist master state writeQuorum: ${writeQuorum}, timeout: ${TIMEOUT }... " )
150
- replicator ! Update (STATE , GSet (), WriteTo (writeQuorum), TIMEOUT )(_ + new ApplicationState (appId, 0 , null ))
152
+ val appState = new ApplicationState (appId, 0 , appMasterClass, app, null )
153
+ replicator ! Update (STATE , GSet (), WriteTo (writeQuorum), TIMEOUT )(_ + appState)
151
154
sender.tell(SubmitApplicationResult (Success (appId)), context.parent)
152
155
appId += 1
153
156
case ShutdownApplication (appId) =>
@@ -158,8 +161,7 @@ private[cluster] class AppManager() extends Actor with Stash {
158
161
val worker = info.worker
159
162
LOG .info(s " Shuttdown app master at ${worker.path}, appId: $appId, executorId: $masterExecutorId" )
160
163
worker ! ShutdownExecutor (appId, masterExecutorId, s " AppMaster $appId shutdown requested by master... " )
161
- appMasterRegistry -= appId
162
- statusManager.removeStatus(appId.toString)
164
+ // appMasterRegistry -= appId
163
165
sender ! ShutdownApplicationResult (Success (appId))
164
166
case None =>
165
167
val errorMsg = s " Find to find regisration information for appId: $appId"
@@ -219,20 +221,20 @@ private[cluster] class AppManager() extends Actor with Stash {
219
221
case None =>
220
222
sender ! AppMasterDataDetail (appId = appId, appDescription = null )
221
223
}
222
- case updateTimestampTrailingEdge : UpdateTimestampTrailingEdge =>
223
- val appId = updateTimestampTrailingEdge.appID
224
- val startTime = updateTimestampTrailingEdge.timeStamp
224
+ case postAppData : PostAppData =>
225
+ val appId = postAppData.appId
225
226
val (_, info) = appMasterRegistry.getOrElse(appId, (null , null ))
226
227
Option (info) match {
227
228
case a@ Some (data) =>
228
- val app = a.get.app
229
229
LOG .info(s " update timestamp traingling edge for application $appId" )
230
- statusManager.updateStatus(appId.toString, ApplicationStatus (appId, a.get.appMasterClass, app, startTime))
231
230
case None =>
232
231
LOG .error(s " no match application for app $appId when updating timestamp " )
233
232
}
234
233
}
235
234
235
+ implicit val timeout = akka.util.Timeout (3 , TimeUnit .SECONDS )
236
+ import context .dispatcher
237
+
236
238
def terminationWatch : Receive = {
237
239
case terminate : Terminated => {
238
240
terminate.getAddressTerminated()
@@ -244,8 +246,15 @@ private[cluster] class AppManager() extends Actor with Stash {
244
246
}
245
247
if (application.nonEmpty){
246
248
val appId = application.get._1
247
- val appStatus = statusManager.getStatus(appId.toString).asInstanceOf [ApplicationStatus ]
248
- self ! RecoverApplication (appStatus)
249
+ (replicator ? new Get (STATE , ReadFrom (readQuorum), TIMEOUT , None )).asInstanceOf [Future [ReplicatorMessage ]].map{
250
+ case GetSuccess (_, replicatedState : GSet , _) =>
251
+ val appState = replicatedState.getValue().asScala.find(_.asInstanceOf [ApplicationState ].appId == appId)
252
+ if (appState.nonEmpty){
253
+ self ! RecoverApplication (appState.get.asInstanceOf [ApplicationState ])
254
+ }
255
+ case _ =>
256
+ LOG .error(s " failed to recover application $appId, can not find application state " )
257
+ }
249
258
}
250
259
}
251
260
}
@@ -255,14 +264,13 @@ private[cluster] class AppManager() extends Actor with Stash {
255
264
val terminatedAppId = applicationStatus.appId
256
265
LOG .info(s " AppManager Recovering Application $terminatedAppId... " )
257
266
val appMasterClass = applicationStatus.appMasterClass
258
- val config = Configs .empty.withStartTime(applicationStatus.startClock)
259
- context.actorOf(Props (classOf [AppMasterStarter ], terminatedAppId, appMasterClass, config, applicationStatus.app), terminatedAppId.toString)
267
+ context.actorOf(Props (classOf [AppMasterStarter ], terminatedAppId, appMasterClass, Configs .empty, applicationStatus.app), terminatedAppId.toString)
260
268
}
261
269
262
- case class RecoverApplication (applicationStatus : ApplicationStatus )
270
// Message wrapping the ApplicationState of an application to be recovered. terminationWatch
// sends it to self after finding the matching appId in the replicated GSet; the handler then
// starts a new AppMasterStarter from the state's appId, appMasterClass and app.
+ case class RecoverApplication (applicationStatus : ApplicationState )
263
271
}
264
272
265
- case class AppMasterInfo (worker : ActorRef , app : Application , appMasterClass : Class [_ <: Actor ] ) extends AppMasterRegisterData
273
// Register data passed to the AppMaster config via withAppMasterRegisterData; it carries only
// the worker ActorRef taken from the resource allocation.
// NOTE(review): the previous version also held app and appMasterClass; in this change those
// values are carried by ApplicationState instead — confirm no remaining reader expects them here.
+ case class AppMasterInfo (worker : ActorRef ) extends AppMasterRegisterData
266
274
267
275
private [cluster] object AppManager {
268
276
private val masterExecutorId = - 1
@@ -285,7 +293,7 @@ private[cluster] object AppManager {
285
293
case ResourceAllocated (allocations) => {
286
294
LOG .info(s " Resource allocated for appMaster $app Id " )
287
295
val allocation = allocations(0 )
288
- val appMasterConfig = appConfig.withAppId(appId).withAppDescription(app).withAppMasterRegisterData(AppMasterInfo (allocation.worker, app, appMasterClass )).withExecutorId(masterExecutorId).withResource(allocation.resource)
296
+ val appMasterConfig = appConfig.withAppId(appId).withAppDescription(app).withAppMasterRegisterData(AppMasterInfo (allocation.worker)).withExecutorId(masterExecutorId).withResource(allocation.resource)
289
297
LOG .info(s " Try to launch a executor for app Master on ${allocation.worker} for app $appId" )
290
298
val name = actorNameForExecutor(appId, masterExecutorId)
291
299
val selfPath = ActorUtil .getFullPath(context)
0 commit comments