Skip to content

Commit 6d4702f

Browse files
committed
revert status manager
1 parent 8c514bc commit 6d4702f

File tree

10 files changed

+47
-132
lines changed

10 files changed

+47
-132
lines changed

core/src/main/scala/org/apache/gearpump/cluster/AppManager.scala

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,24 @@ package org.apache.gearpump.cluster
2121
import java.util.concurrent.TimeUnit
2222

2323
import akka.actor._
24+
import akka.cluster.Cluster
2425
import akka.contrib.datareplication.Replicator._
2526
import akka.contrib.datareplication.{DataReplication, GSet}
27+
import akka.pattern.ask
2628
import org.apache.gearpump.cluster.AppMasterToMaster._
2729
import org.apache.gearpump.cluster.AppMasterToWorker._
2830
import org.apache.gearpump.cluster.ClientToMaster._
2931
import org.apache.gearpump.cluster.MasterToAppMaster._
3032
import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ShutdownApplicationResult, SubmitApplicationResult}
3133
import org.apache.gearpump.cluster.WorkerToAppMaster._
3234
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
33-
import org.apache.gearpump.status.{ApplicationStatus, StatusManagerImpl, StatusManager}
3435
import org.apache.gearpump.transport.HostPort
3536
import org.apache.gearpump.util.ActorSystemBooter.{ActorCreated, BindLifeCycle, CreateActor, RegisterActorSystem}
3637
import org.apache.gearpump.util._
3738
import org.slf4j.{Logger, LoggerFactory}
3839

3940
import scala.collection.JavaConverters._
41+
import scala.concurrent.Future
4042
import scala.concurrent.duration.Duration
4143
import scala.util.{Failure, Success}
4244
/**
@@ -46,12 +48,12 @@ import scala.util.{Failure, Success}
4648
/**
4749
* This state will be persisted across the masters.
4850
*/
49-
class ApplicationState(val appId : Int, val attemptId : Int, val state : Any) extends Serializable {
51+
class ApplicationState(val appId : Int, val attemptId : Int, val appMasterClass : Class[_ <: Actor], val app : Application, val state : Any) extends Serializable {
5052

5153
override def equals(other: Any): Boolean = {
5254
if (other.isInstanceOf[ApplicationState]) {
5355
val that = other.asInstanceOf[ApplicationState]
54-
if (appId == that.appId && attemptId == that.attemptId) {
56+
if (appId == that.appId && attemptId == that.attemptId && appMasterClass.equals(that.appMasterClass) && app.equals(that.app)) {
5557
true
5658
} else {
5759
false
@@ -85,11 +87,10 @@ private[cluster] class AppManager() extends Actor with Stash {
8587
private val STATE = "masterstate"
8688
private val TIMEOUT = Duration(5, TimeUnit.SECONDS)
8789
private val replicator = DataReplication(context.system).replicator
90+
implicit val cluster = Cluster(context.system)
8891

8992
//TODO: We can use this state for appmaster HA to recover a new App master
9093
private var state: Set[ApplicationState] = Set.empty[ApplicationState]
91-
//Now the status manager implementation is a work-around, it will be rewritten when the state checkpoint is done
92-
private val statusManager : StatusManager = new StatusManagerImpl
9394

9495
val masterClusterSize = systemconfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).size()
9596

@@ -105,18 +106,19 @@ private[cluster] class AppManager() extends Actor with Stash {
105106

106107
override def postStop: Unit = {
107108
replicator ! Unsubscribe(STATE, self)
108-
statusManager.close()
109109
}
110110

111111
LOG.info("Recoving application state....")
112112
context.become(waitForMasterState)
113113

114114
def waitForMasterState: Receive = {
115115
case GetSuccess(_, replicatedState: GSet, _) =>
116-
state = replicatedState.getValue().asScala.foldLeft(state) { (set, appState) =>
117-
set + appState.asInstanceOf[ApplicationState]
116+
replicatedState.getValue().asScala.foreach{item =>
117+
val appState = item.asInstanceOf[ApplicationState]
118+
if(appState.appId > appId){
119+
appId = appState.appId
120+
}
118121
}
119-
appId = state.map(_.appId).size
120122
LOG.info(s"Successfully recoeved application states for ${state.map(_.appId)}, nextAppId: ${appId}....")
121123
context.become(receiveHandler)
122124
unstashAll()
@@ -144,10 +146,11 @@ private[cluster] class AppManager() extends Actor with Stash {
144146
def clientMsgHandler: Receive = {
145147
case submitApp@SubmitApplication(appMasterClass, config, app) =>
146148
LOG.info(s"AppManager Submiting Application $appId...")
147-
val appWatcher = context.actorOf(Props(classOf[AppMasterStarter], appId, appMasterClass, config.withStartTime(0), app), appId.toString)
149+
val appWatcher = context.actorOf(Props(classOf[AppMasterStarter], appId, appMasterClass, config, app), appId.toString)
148150

149151
LOG.info(s"Persist master state writeQuorum: ${writeQuorum}, timeout: ${TIMEOUT}...")
150-
replicator ! Update(STATE, GSet(), WriteTo(writeQuorum), TIMEOUT)(_ + new ApplicationState(appId, 0, null))
152+
val appState = new ApplicationState(appId, 0, appMasterClass, app, null)
153+
replicator ! Update(STATE, GSet(), WriteTo(writeQuorum), TIMEOUT)(_ + appState)
151154
sender.tell(SubmitApplicationResult(Success(appId)), context.parent)
152155
appId += 1
153156
case ShutdownApplication(appId) =>
@@ -158,8 +161,7 @@ private[cluster] class AppManager() extends Actor with Stash {
158161
val worker = info.worker
159162
LOG.info(s"Shuttdown app master at ${worker.path}, appId: $appId, executorId: $masterExecutorId")
160163
worker ! ShutdownExecutor(appId, masterExecutorId, s"AppMaster $appId shutdown requested by master...")
161-
appMasterRegistry -= appId
162-
statusManager.removeStatus(appId.toString)
164+
//appMasterRegistry -= appId
163165
sender ! ShutdownApplicationResult(Success(appId))
164166
case None =>
165167
val errorMsg = s"Find to find regisration information for appId: $appId"
@@ -219,20 +221,20 @@ private[cluster] class AppManager() extends Actor with Stash {
219221
case None =>
220222
sender ! AppMasterDataDetail(appId = appId, appDescription = null)
221223
}
222-
case updateTimestampTrailingEdge : UpdateTimestampTrailingEdge =>
223-
val appId = updateTimestampTrailingEdge.appID
224-
val startTime = updateTimestampTrailingEdge.timeStamp
224+
case postAppData : PostAppData =>
225+
val appId = postAppData.appId
225226
val (_, info) = appMasterRegistry.getOrElse(appId, (null, null))
226227
Option(info) match {
227228
case a@Some(data) =>
228-
val app = a.get.app
229229
LOG.info(s"update timestamp traingling edge for application $appId")
230-
statusManager.updateStatus(appId.toString, ApplicationStatus(appId, a.get.appMasterClass, app, startTime))
231230
case None =>
232231
LOG.error(s"no match application for app$appId when updating timestamp")
233232
}
234233
}
235234

235+
implicit val timeout = akka.util.Timeout(3, TimeUnit.SECONDS)
236+
import context.dispatcher
237+
236238
def terminationWatch: Receive = {
237239
case terminate: Terminated => {
238240
terminate.getAddressTerminated()
@@ -244,8 +246,15 @@ private[cluster] class AppManager() extends Actor with Stash {
244246
}
245247
if(application.nonEmpty){
246248
val appId = application.get._1
247-
val appStatus = statusManager.getStatus(appId.toString).asInstanceOf[ApplicationStatus]
248-
self ! RecoverApplication(appStatus)
249+
(replicator ? new Get(STATE, ReadFrom(readQuorum), TIMEOUT, None)).asInstanceOf[Future[ReplicatorMessage]].map{
250+
case GetSuccess(_, replicatedState: GSet, _) =>
251+
val appState = replicatedState.getValue().asScala.find(_.asInstanceOf[ApplicationState].appId == appId)
252+
if(appState.nonEmpty){
253+
self ! RecoverApplication(appState.get.asInstanceOf[ApplicationState])
254+
}
255+
case _ =>
256+
LOG.error(s"failed to recover application $appId, can not find application state")
257+
}
249258
}
250259
}
251260
}
@@ -255,14 +264,13 @@ private[cluster] class AppManager() extends Actor with Stash {
255264
val terminatedAppId = applicationStatus.appId
256265
LOG.info(s"AppManager Recovering Application $terminatedAppId...")
257266
val appMasterClass = applicationStatus.appMasterClass
258-
val config = Configs.empty.withStartTime(applicationStatus.startClock)
259-
context.actorOf(Props(classOf[AppMasterStarter], terminatedAppId, appMasterClass, config, applicationStatus.app), terminatedAppId.toString)
267+
context.actorOf(Props(classOf[AppMasterStarter], terminatedAppId, appMasterClass, Configs.empty, applicationStatus.app), terminatedAppId.toString)
260268
}
261269

262-
case class RecoverApplication(applicationStatus : ApplicationStatus)
270+
case class RecoverApplication(applicationStatus : ApplicationState)
263271
}
264272

265-
case class AppMasterInfo(worker : ActorRef, app : Application, appMasterClass : Class[_ <: Actor]) extends AppMasterRegisterData
273+
case class AppMasterInfo(worker : ActorRef) extends AppMasterRegisterData
266274

267275
private[cluster] object AppManager {
268276
private val masterExecutorId = -1
@@ -285,7 +293,7 @@ private[cluster] object AppManager {
285293
case ResourceAllocated(allocations) => {
286294
LOG.info(s"Resource allocated for appMaster $app Id")
287295
val allocation = allocations(0)
288-
val appMasterConfig = appConfig.withAppId(appId).withAppDescription(app).withAppMasterRegisterData(AppMasterInfo(allocation.worker, app, appMasterClass)).withExecutorId(masterExecutorId).withResource(allocation.resource)
296+
val appMasterConfig = appConfig.withAppId(appId).withAppDescription(app).withAppMasterRegisterData(AppMasterInfo(allocation.worker)).withExecutorId(masterExecutorId).withResource(allocation.resource)
289297
LOG.info(s"Try to launch a executor for app Master on ${allocation.worker} for app $appId")
290298
val name = actorNameForExecutor(appId, masterExecutorId)
291299
val selfPath = ActorUtil.getFullPath(context)

core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
package org.apache.gearpump.cluster
2020

2121
import akka.actor.{Actor, ActorRef}
22-
import org.apache.gearpump.TimeStamp
23-
import org.apache.gearpump.cluster.scheduler.{ResourceRequest, ResourceAllocation, Resource}
22+
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
2423
import org.apache.gearpump.util.Configs
2524

2625
import scala.util.Try
@@ -59,7 +58,8 @@ trait AppMasterRegisterData
5958
object AppMasterToMaster {
6059
case class RegisterAppMaster(appMaster: ActorRef, appId: Int, executorId: Int, resource: Resource, registerData : AppMasterRegisterData)
6160
case class RequestResource(appId: Int, request: ResourceRequest)
62-
case class UpdateTimestampTrailingEdge(appID : Int, timeStamp : TimeStamp)
61+
case class PostAppData(appId: Int, key: String, value: Any)
62+
case class GetAppData(appId: Int, key: String)
6363
}
6464

6565
object MasterToAppMaster {
@@ -73,6 +73,7 @@ object MasterToAppMaster {
7373
case class AppMasterDataDetailRequest(appId: Int)
7474
case class AppMasterDataDetail(val appId: Int, val appDescription: Application)
7575
case class ReplayFromTimestampWindowTrailingEdge(appId: Int)
76+
case class GetAppDataResult(key: String, value: Any)
7677
}
7778

7879
object AppMasterToWorker {

core/src/main/scala/org/apache/gearpump/cluster/Master.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ private[cluster] class Master extends Actor with Stash {
8787
case appMasterDataDetailRequest: AppMasterDataDetailRequest =>
8888
LOG.info("Master received AppMasterDataDetailRequest")
8989
appManager forward appMasterDataDetailRequest
90-
case update : UpdateTimestampTrailingEdge =>
91-
appManager forward update
90+
case post : PostAppData =>
91+
appManager forward post
9292
}
9393

9494
def clientMsgHandler : Receive = {

core/src/main/scala/org/apache/gearpump/status/Status.scala

Lines changed: 0 additions & 26 deletions
This file was deleted.

core/src/main/scala/org/apache/gearpump/status/StatusManager.scala

Lines changed: 0 additions & 28 deletions
This file was deleted.

core/src/main/scala/org/apache/gearpump/status/StatusManagerImpl.scala

Lines changed: 0 additions & 41 deletions
This file was deleted.

core/src/main/scala/org/apache/gearpump/util/Configs.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.gearpump.util
2020

2121
import akka.actor.ActorRef
2222
import com.typesafe.config.ConfigFactory
23-
import org.apache.gearpump._
2423
import org.apache.gearpump.cluster.scheduler.Resource
2524
import org.apache.gearpump.cluster.{AppMasterRegisterData, Application}
2625
import org.apache.gearpump.util.Constants._
@@ -75,9 +74,6 @@ class Configs(val config: Map[String, _]) extends Serializable{
7574

7675
def withWorkerId(id : Int) = withValue(WORKER_ID, id)
7776
def workerId : Int = getInt(WORKER_ID)
78-
79-
def withStartTime(time : TimeStamp) = withValue(START_TIME, time)
80-
def startTime = getLong(START_TIME)
8177
}
8278

8379
object Configs {

rest/src/main/scala/org/apache/gearpump/services/Json4sSupport.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ object Json4sSupport extends Json4sJacksonSupport {
5252
{ //from json
5353
case JObject(JField("worker", JString(s)) :: Nil ) =>
5454
//only need to serialize to json
55-
AppMasterInfo(null, null, null)
55+
AppMasterInfo(null)
5656
},
5757
{ //to json
5858
case x:AppMasterInfo =>

streaming/src/main/scala/org/apache/gearpump/streaming/AppMaster.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ class AppMaster (config : Configs) extends Actor {
6464
private val taskSet = new TaskSet(config, DAG(appDescription.dag))
6565

6666
private var clockService : ActorRef = null
67-
private var startClock : TimeStamp = config.startTime
67+
private val START_CLOCK = "startClock"
68+
private var startClock : TimeStamp = 0L
6869

6970
private var taskLocations = Map.empty[HostPort, Set[TaskId]]
7071
private var executorIdToTasks = Map.empty[Int, Set[TaskId]]
@@ -291,7 +292,7 @@ class AppMaster (config : Configs) extends Actor {
291292

292293
private def updateStatus : Unit = {
293294
(clockService ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map{clock =>
294-
master ! UpdateTimestampTrailingEdge(appId, clock.clock)
295+
master ! PostAppData(appId, START_CLOCK, clock.clock)
295296
}
296297
}
297298

streaming/src/main/scala/org/apache/gearpump/streaming/ConfigHelper.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import java.util
44

55
import akka.actor.Actor
66
import com.typesafe.config.Config
7+
import org.apache.gearpump._
78
import org.apache.gearpump.streaming.task.TaskId
89
import org.apache.gearpump.util.Constants._
910
import org.apache.gearpump.util.{Configs, Constants}
@@ -14,6 +15,9 @@ class ConfigsHelper(config : Configs) {
1415

1516
def withDag(taskDag : DAG) = config.withValue(TASK_DAG, taskDag)
1617
def dag : DAG = config.getAnyRef(TASK_DAG).asInstanceOf[DAG]
18+
19+
def withStartTime(time : TimeStamp) = config.withValue(START_TIME, time)
20+
def startTime = config.getLong(START_TIME)
1721
}
1822

1923
object ConfigsHelper {

0 commit comments

Comments
 (0)