Skip to content

Commit 8c514bc

Browse files
committed
add AppMaster fault tolerance
1 parent e31ab03 commit 8c514bc

File tree

10 files changed

+168
-26
lines changed

10 files changed

+168
-26
lines changed

core/src/main/scala/org/apache/gearpump/cluster/AppManager.scala

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster._
3030
import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ShutdownApplicationResult, SubmitApplicationResult}
3131
import org.apache.gearpump.cluster.WorkerToAppMaster._
3232
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
33+
import org.apache.gearpump.status.{ApplicationStatus, StatusManagerImpl, StatusManager}
3334
import org.apache.gearpump.transport.HostPort
3435
import org.apache.gearpump.util.ActorSystemBooter.{ActorCreated, BindLifeCycle, CreateActor, RegisterActorSystem}
3536
import org.apache.gearpump.util._
@@ -87,6 +88,8 @@ private[cluster] class AppManager() extends Actor with Stash {
8788

8889
//TODO: We can use this state for appmaster HA to recover a new App master
8990
private var state: Set[ApplicationState] = Set.empty[ApplicationState]
91+
//For now the status manager implementation is a workaround; it will be rewritten once state checkpointing is done
92+
private val statusManager : StatusManager = new StatusManagerImpl
9093

9194
val masterClusterSize = systemconfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).size()
9295

@@ -102,6 +105,7 @@ private[cluster] class AppManager() extends Actor with Stash {
102105

103106
/**
 * Lifecycle hook run when this AppManager stops: unsubscribes from updates of the
 * replicated STATE key and then closes the status manager.
 */
override def postStop: Unit = {
  val unsubscribe = Unsubscribe(STATE, self)
  replicator.tell(unsubscribe, self)
  statusManager.close()
}
106110

107111
LOG.info("Recoving application state....")
@@ -129,7 +133,7 @@ private[cluster] class AppManager() extends Actor with Stash {
129133
stash()
130134
}
131135

132-
def receiveHandler = masterHAMsgHandler orElse clientMsgHandler orElse appMasterMessage orElse terminationWatch
136+
def receiveHandler = masterHAMsgHandler orElse clientMsgHandler orElse appMasterMessage orElse terminationWatch orElse selfMsgHandler
133137

134138
def masterHAMsgHandler: Receive = {
135139
case update: UpdateResponse => LOG.info(s"we get update $update")
@@ -140,7 +144,7 @@ private[cluster] class AppManager() extends Actor with Stash {
140144
def clientMsgHandler: Receive = {
141145
case submitApp@SubmitApplication(appMasterClass, config, app) =>
142146
LOG.info(s"AppManager Submiting Application $appId...")
143-
val appWatcher = context.actorOf(Props(classOf[AppMasterStarter], appId, appMasterClass, config, app), appId.toString)
147+
val appWatcher = context.actorOf(Props(classOf[AppMasterStarter], appId, appMasterClass, config.withStartTime(0), app), appId.toString)
144148

145149
LOG.info(s"Persist master state writeQuorum: ${writeQuorum}, timeout: ${TIMEOUT}...")
146150
replicator ! Update(STATE, GSet(), WriteTo(writeQuorum), TIMEOUT)(_ + new ApplicationState(appId, 0, null))
@@ -154,6 +158,8 @@ private[cluster] class AppManager() extends Actor with Stash {
154158
val worker = info.worker
155159
LOG.info(s"Shuttdown app master at ${worker.path}, appId: $appId, executorId: $masterExecutorId")
156160
worker ! ShutdownExecutor(appId, masterExecutorId, s"AppMaster $appId shutdown requested by master...")
161+
appMasterRegistry -= appId
162+
statusManager.removeStatus(appId.toString)
157163
sender ! ShutdownApplicationResult(Success(appId))
158164
case None =>
159165
val errorMsg = s"Find to find regisration information for appId: $appId"
@@ -213,23 +219,50 @@ private[cluster] class AppManager() extends Actor with Stash {
213219
case None =>
214220
sender ! AppMasterDataDetail(appId = appId, appDescription = null)
215221
}
216-
222+
case updateTimestampTrailingEdge : UpdateTimestampTrailingEdge =>
223+
val appId = updateTimestampTrailingEdge.appID
224+
val startTime = updateTimestampTrailingEdge.timeStamp
225+
val (_, info) = appMasterRegistry.getOrElse(appId, (null, null))
226+
Option(info) match {
227+
case a@Some(data) =>
228+
val app = a.get.app
229+
LOG.info(s"update timestamp traingling edge for application $appId")
230+
statusManager.updateStatus(appId.toString, ApplicationStatus(appId, a.get.appMasterClass, app, startTime))
231+
case None =>
232+
LOG.error(s"no match application for app$appId when updating timestamp")
233+
}
217234
}
218235

219236
/**
 * Handles `Terminated` notifications.
 *
 * The only normal way to stop an application is assumed to be a ShutdownApplication
 * request, so a terminated actor that is still present in `appMasterRegistry` is treated
 * as a failed AppMaster: its last recorded [[ApplicationStatus]] is looked up and a
 * `RecoverApplication` message is sent to `self`.
 *
 * If no status has been recorded for the application, recovery is skipped and an error
 * is logged instead of sending a null status to `selfMsgHandler`.
 */
def terminationWatch: Receive = {
  case terminate: Terminated =>
    LOG.info(s"App Master is terminiated, network down: ${terminate.getAddressTerminated()}")
    // Find the application whose registered AppMaster actor is the one that terminated.
    val terminatedAppId = appMasterRegistry.collectFirst {
      case (appId, (actorRef, _)) if actorRef.compareTo(terminate.actor) == 0 => appId
    }
    terminatedAppId.foreach { appId =>
      statusManager.getStatus(appId.toString) match {
        case appStatus: ApplicationStatus =>
          self ! RecoverApplication(appStatus)
        case _ =>
          // A type pattern never matches null, so a missing status lands here.
          LOG.error(s"no status recorded for app$appId, can not recover the application")
      }
    }
}
252+
253+
/**
 * Handles messages this AppManager sends to itself.
 *
 * `RecoverApplication` creates a new AppMasterStarter child, named after the application
 * id, using the recorded application description and AppMaster class. The new config
 * starts from `Configs.empty` with its start time set to the recorded `startClock`.
 *
 * A `RecoverApplication` carrying a null status is logged and ignored rather than
 * dereferenced.
 */
def selfMsgHandler: Receive = {
  case RecoverApplication(null) =>
    LOG.error("AppManager can not recover an application without a recorded ApplicationStatus")
  case RecoverApplication(applicationStatus) =>
    val terminatedAppId = applicationStatus.appId
    LOG.info(s"AppManager Recovering Application $terminatedAppId...")
    val recoveryConfig = Configs.empty.withStartTime(applicationStatus.startClock)
    val starterProps = Props(classOf[AppMasterStarter], terminatedAppId,
      applicationStatus.appMasterClass, recoveryConfig, applicationStatus.app)
    context.actorOf(starterProps, terminatedAppId.toString)
}
261+
262+
case class RecoverApplication(applicationStatus : ApplicationStatus)
230263
}
231264

232-
case class AppMasterInfo(worker : ActorRef) extends AppMasterRegisterData
265+
case class AppMasterInfo(worker : ActorRef, app : Application, appMasterClass : Class[_ <: Actor]) extends AppMasterRegisterData
233266

234267
private[cluster] object AppManager {
235268
private val masterExecutorId = -1
@@ -252,7 +285,7 @@ private[cluster] object AppManager {
252285
case ResourceAllocated(allocations) => {
253286
LOG.info(s"Resource allocated for appMaster $app Id")
254287
val allocation = allocations(0)
255-
val appMasterConfig = appConfig.withAppId(appId).withAppDescription(app).withAppMasterRegisterData(AppMasterInfo(allocation.worker)).withExecutorId(masterExecutorId).withResource(allocation.resource)
288+
val appMasterConfig = appConfig.withAppId(appId).withAppDescription(app).withAppMasterRegisterData(AppMasterInfo(allocation.worker, app, appMasterClass)).withExecutorId(masterExecutorId).withResource(allocation.resource)
256289
LOG.info(s"Try to launch a executor for app Master on ${allocation.worker} for app $appId")
257290
val name = actorNameForExecutor(appId, masterExecutorId)
258291
val selfPath = ActorUtil.getFullPath(context)

core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.gearpump.cluster
2020

2121
import akka.actor.{Actor, ActorRef}
22+
import org.apache.gearpump.TimeStamp
2223
import org.apache.gearpump.cluster.scheduler.{ResourceRequest, ResourceAllocation, Resource}
2324
import org.apache.gearpump.util.Configs
2425

@@ -58,6 +59,7 @@ trait AppMasterRegisterData
5859
object AppMasterToMaster {
5960
case class RegisterAppMaster(appMaster: ActorRef, appId: Int, executorId: Int, resource: Resource, registerData : AppMasterRegisterData)
6061
case class RequestResource(appId: Int, request: ResourceRequest)
62+
case class UpdateTimestampTrailingEdge(appID : Int, timeStamp : TimeStamp)
6163
}
6264

6365
object MasterToAppMaster {

core/src/main/scala/org/apache/gearpump/cluster/Master.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ private[cluster] class Master extends Actor with Stash {
8787
case appMasterDataDetailRequest: AppMasterDataDetailRequest =>
8888
LOG.info("Master received AppMasterDataDetailRequest")
8989
appManager forward appMasterDataDetailRequest
90+
case update : UpdateTimestampTrailingEdge =>
91+
appManager forward update
9092
}
9193

9294
def clientMsgHandler : Receive = {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.gearpump.status
19+
20+
import akka.actor.Actor
21+
import org.apache.gearpump.TimeStamp
22+
import org.apache.gearpump.cluster.Application
23+
24+
trait Status extends Serializable
25+
26+
case class ApplicationStatus(appId : Int, appMasterClass : Class[_ <: Actor], app : Application, startClock : TimeStamp) extends Status
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.gearpump.status
19+
20+
/**
 * Keyed store of [[Status]] values.
 *
 * AppManager uses it with the application id (as a string) as the key and closes it
 * from its `postStop`.
 */
trait StatusManager {
21+
/** Stores `status` under `key`. */
def updateStatus(key : String, status : Status) : Unit
22+
23+
/**
 * Returns the status stored under `key`.
 * StatusManagerImpl returns null when nothing is stored for the key.
 */
def getStatus(key : String) : Status
24+
25+
/** Removes any status stored under `key`. */
def removeStatus(key : String) : Unit
26+
27+
/** Closes the manager; StatusManagerImpl implements this as a no-op. */
def close() : Unit
28+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.gearpump.status
19+
20+
/**
 * [[StatusManager]] that keeps every status in an immutable in-memory map.
 */
class StatusManagerImpl extends StatusManager {
  private var keyToStatus = Map.empty[String, Status]

  /** Stores `status` under `key`, replacing any previous entry. */
  override def updateStatus(key: String, status: Status): Unit =
    keyToStatus = keyToStatus.updated(key, status)

  /** Returns the status stored under `key`, or null when there is none. */
  override def getStatus(key: String): Status =
    keyToStatus.get(key).orNull

  /** Drops the entry for `key`, if any. */
  override def removeStatus(key: String): Unit =
    keyToStatus = keyToStatus - key

  /** Nothing to release for the in-memory map. */
  override def close(): Unit = ()
}

core/src/main/scala/org/apache/gearpump/util/Configs.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.gearpump.util
2020

2121
import akka.actor.ActorRef
2222
import com.typesafe.config.ConfigFactory
23+
import org.apache.gearpump._
2324
import org.apache.gearpump.cluster.scheduler.Resource
2425
import org.apache.gearpump.cluster.{AppMasterRegisterData, Application}
2526
import org.apache.gearpump.util.Constants._
@@ -74,6 +75,9 @@ class Configs(val config: Map[String, _]) extends Serializable{
7475

7576
def withWorkerId(id : Int) = withValue(WORKER_ID, id)
7677
def workerId : Int = getInt(WORKER_ID)
78+
79+
def withStartTime(time : TimeStamp) = withValue(START_TIME, time)
80+
def startTime = getLong(START_TIME)
7781
}
7882

7983
object Configs {

rest/src/main/scala/org/apache/gearpump/services/Json4sSupport.scala

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,15 @@
1818

1919
package org.apache.gearpump.services
2020

21-
import akka.actor.{Actor, ActorSystem, ActorRef, ActorContext}
21+
import java.util.UUID
22+
23+
import akka.actor.Actor
2224
import org.apache.gearpump.cluster.AppMasterInfo
2325
import org.apache.gearpump.partitioner.Partitioner
24-
import org.apache.gearpump.streaming.task.TaskActor
25-
import org.apache.gearpump.streaming.{TaskDescription, AppDescription}
26+
import org.apache.gearpump.streaming.{AppDescription, TaskDescription}
2627
import org.apache.gearpump.util.{Configs, Graph}
27-
import org.slf4j.{LoggerFactory, Logger}
28-
import spray.httpx.Json4sJacksonSupport
2928
import org.json4s._
30-
import java.util.UUID
31-
32-
import scala.collection.parallel.mutable
33-
import scala.concurrent.ExecutionContext
29+
import spray.httpx.Json4sJacksonSupport
3430

3531
object Json4sSupport extends Json4sJacksonSupport {
3632
implicit def json4sJacksonFormats: Formats = jackson.Serialization.formats(NoTypeHints) + new UUIDFormat + new AppMasterInfoSerializer + new AppDescriptionSerializer
@@ -56,7 +52,7 @@ object Json4sSupport extends Json4sJacksonSupport {
5652
{ //from json
5753
case JObject(JField("worker", JString(s)) :: Nil ) =>
5854
//only need to serialize to json
59-
AppMasterInfo(null)
55+
AppMasterInfo(null, null, null)
6056
},
6157
{ //to json
6258
case x:AppMasterInfo =>

streaming/src/main/scala/org/apache/gearpump/streaming/AppMaster.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.gearpump.cluster.AppMasterToWorker._
2929
import org.apache.gearpump.cluster.MasterToAppMaster._
3030
import org.apache.gearpump.cluster.WorkerToAppMaster._
3131
import org.apache.gearpump.cluster._
32-
import org.apache.gearpump.cluster.scheduler.{ResourceRequest, Resource}
32+
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
3333
import org.apache.gearpump.streaming.AppMasterToExecutor.{LaunchTask, RecoverTasks, RestartTasks}
3434
import org.apache.gearpump.streaming.ConfigsHelper._
3535
import org.apache.gearpump.streaming.ExecutorToAppMaster._
@@ -64,17 +64,21 @@ class AppMaster (config : Configs) extends Actor {
6464
private val taskSet = new TaskSet(config, DAG(appDescription.dag))
6565

6666
private var clockService : ActorRef = null
67-
private var startClock : TimeStamp = 0L
67+
private var startClock : TimeStamp = config.startTime
6868

6969
private var taskLocations = Map.empty[HostPort, Set[TaskId]]
7070
private var executorIdToTasks = Map.empty[Int, Set[TaskId]]
7171

7272
private var startedTasks = Set.empty[TaskId]
73+
private var scheduler : Cancellable = null
7374

7475
override def receive : Receive = null
7576

7677
override def preStart: Unit = {
7778
LOG.info(s"AppMaster[$appId] is launched $appDescription")
79+
scheduler = context.system.scheduler.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
80+
new FiniteDuration(5, TimeUnit.SECONDS))(updateStatus)
81+
7882
val dag = DAG(appDescription.dag)
7983

8084
LOG.info("AppMaster is launched xxxxxxxxxxxxxxxxx")
@@ -284,6 +288,16 @@ class AppMaster (config : Configs) extends Actor {
284288
}
285289
}
286290
}
291+
292+
/**
 * Asks the clock service for the latest minimum clock and reports it to the master as
 * an `UpdateTimestampTrailingEdge` for this application. `preStart` schedules this
 * callback.
 *
 * `clockService` is initialised to null, so the report is skipped while it is still
 * null instead of throwing a NullPointerException from the scheduled callback.
 */
private def updateStatus : Unit = {
  Option(clockService).foreach { service =>
    // mapTo fails the future on an unexpected reply type instead of relying on an erased cast.
    (service ? GetLatestMinClock).mapTo[LatestMinClock].foreach { latest =>
      master ! UpdateTimestampTrailingEdge(appId, latest.clock)
    }
  }
}
297+
298+
/** Lifecycle hook run when this AppMaster stops: cancels the scheduled status update task. */
override def postStop : Unit =
  scheduler.cancel()
287301
}
288302

289303
object AppMaster {

streaming/src/main/scala/org/apache/gearpump/streaming/ConfigHelper.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import java.util
44

55
import akka.actor.Actor
66
import com.typesafe.config.Config
7-
import org.apache.gearpump._
87
import org.apache.gearpump.streaming.task.TaskId
98
import org.apache.gearpump.util.Constants._
109
import org.apache.gearpump.util.{Configs, Constants}
@@ -15,9 +14,6 @@ class ConfigsHelper(config : Configs) {
1514

1615
def withDag(taskDag : DAG) = config.withValue(TASK_DAG, taskDag)
1716
def dag : DAG = config.getAnyRef(TASK_DAG).asInstanceOf[DAG]
18-
19-
def withStartTime(time : TimeStamp) = config.withValue(START_TIME, time)
20-
def startTime = config.getLong(START_TIME)
2117
}
2218

2319
object ConfigsHelper {

0 commit comments

Comments
 (0)