Skip to content

Commit 0c488f2

Browse files
committed
Merge pull request gearpump#507 from clockfly/fix_#504
fix gearpump#504, 1) AppMaster return more detailed application runtime informat...
2 parents 4e579d2 + 85ac2b3 commit 0c488f2

File tree

14 files changed

+211
-61
lines changed

14 files changed

+211
-61
lines changed

core/src/main/scala/org/apache/gearpump/cluster/Application.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,7 @@ import com.typesafe.config.Config
3535
* use ClusterConfigSource(filePath) to construct the object, while filePath points to the .conf file.
3636
*/
3737

38-
final class Application(val name : String, val appMaster : String, val userConfig: UserConfig, val clusterConfig: ClusterConfigSource = null) extends Serializable
39-
40-
object Application {
41-
def apply(name : String, appMaster : String, conf: UserConfig, appMasterClusterConfig: ClusterConfigSource = null) : Application = new Application(name, appMaster, conf, appMasterClusterConfig)
42-
}
38+
// Case class parameters are already public vals, so no explicit `val` is needed.
// `clusterConfig` may be null (its default); callers should null-check it before use.
case class Application(name : String, appMaster : String, userConfig: UserConfig, clusterConfig: ClusterConfigSource = null)
4339

4440
/**
4541
* Used for verification. All AppMaster must extend this interface
@@ -55,7 +51,7 @@ abstract class ApplicationMaster extends Actor
5551
* request more resource from Master.
5652
* @param appJar: application Jar. If the jar is already in classpath, then it can be None.
5753
* @param masterProxy: The proxy to master actor, it will bridge the messages between appmaster and master
58-
* @param registerData: The AppMaster are required to register this data back to Master by [[org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster]]
54+
* @param registerData: The AppMaster is required to register this data back to the Master via RegisterAppMaster
5955
*
6056
*/
6157
case class AppMasterContext(appId : Int, username : String,
@@ -93,7 +89,7 @@ case class ExecutorContext(executorId : Int, workerId: Int, appId : Int,
9389
* @param arguments Executor command line arguments
9490
* @param jar application jar
9591
* @param executorAkkaConfig Akka config used to initialize the actor system of this executor. It will
96-
* use [[org.apache.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE]] to pass the config to executor
92+
* use org.apache.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE to pass the config to executor
9793
* process
9894
*
9995
*/

core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.gearpump.cluster
2020

2121
import akka.actor.ActorRef
22+
import org.apache.gearpump.TimeStamp
2223
import org.apache.gearpump.cluster.master.Master.{MasterInfo, MasterDescription}
2324
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
2425
import org.apache.gearpump.cluster.worker.WorkerDescription
@@ -72,7 +73,12 @@ object AppMasterToMaster {
7273

7374
case class GetAppData(appId: Int, key: String)
7475
case class GetAppDataResult(key: String, value: Any)
75-
case class AppMasterDataDetail(appId: Int, application: Application)
76+
77+
//TODO:
78+
// clock field may not make sense for applications other than streaming
79+
case class AppMasterDataDetail(
80+
appId: Int, appName: String = null, application: Application = null,
81+
actorPath: String = null, clock: TimeStamp = 0, executors: List[String] = null)
7682

7783
case object GetAllWorkers
7884
case class GetWorkerData(workerId: Int)
@@ -95,7 +101,12 @@ object MasterToAppMaster {
95101
}
96102
case class AppMasterRegistered(appId: Int)
97103
case object ShutdownAppMaster
98-
case class AppMasterData(appId: Int, workerPath: String)
104+
105+
type AppMasterStatus = String
106+
val AppMasterActive: AppMasterStatus = "active"
107+
val AppMasterInActive: AppMasterStatus = "inactive"
108+
109+
case class AppMasterData(appId: Int, appName: String, appMasterPath: String, workerPath: String, status: AppMasterStatus)
99110
case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
100111
case class AppMastersData(appMasters: List[AppMasterData])
101112
case object AppMastersDataRequest

core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ private[cluster] class AppManager(masterHA : ActorRef, kvService: ActorRef, laun
6262
//from appid to appMaster data
6363
private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
6464

65+
// dead appmaster list
66+
private var deadAppMasters = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
67+
6568
def receive: Receive = null
6669

6770
masterHA ! GetMasterState
@@ -146,19 +149,32 @@ private[cluster] class AppManager(masterHA : ActorRef, kvService: ActorRef, laun
146149
val appMastersData = collection.mutable.ListBuffer[AppMasterData]()
147150
appMasterRegistry.foreach(pair => {
148151
val (id, (appMaster:ActorRef, info:AppMasterRuntimeInfo)) = pair
149-
appMastersData += AppMasterData(id,info.worker.path.toString)
150-
}
151-
)
152+
val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
153+
val workerPath = ActorUtil.getFullPath(context.system, info.worker.path)
154+
155+
appMastersData += AppMasterData(id, info.appName, appMasterPath, workerPath, AppMasterActive)
156+
})
157+
158+
deadAppMasters.foreach(pair => {
159+
val (id, (appMaster:ActorRef, info:AppMasterRuntimeInfo)) = pair
160+
appMastersData += AppMasterData(id, info.appName, null, null, AppMasterInActive)
161+
})
162+
152163
sender ! AppMastersData(appMastersData.toList)
153164
case appMasterDataRequest: AppMasterDataRequest =>
154165
val appId = appMasterDataRequest.appId
155166
val (appMaster, info) = appMasterRegistry.getOrElse(appId, (null, null))
156167
Option(info) match {
157-
case a@Some(data) =>
158-
val worker = a.get.worker
159-
sender ! AppMasterData(appId = appId, workerPath = worker.path.toString)
168+
case Some(info) =>
169+
val worker = info.worker
170+
171+
val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
172+
val workerPath = ActorUtil.getFullPath(context.system, worker.path)
173+
174+
sender ! AppMasterData(appId = appId, info.appName, appMasterPath, workerPath, AppMasterActive)
160175
case None =>
161-
sender ! AppMasterData(appId = appId, workerPath = null)
176+
// TODO: refactor this to make it clearer that we don't have this application
177+
sender ! AppMasterData(appId = appId, null, appMasterPath = null, workerPath = null, AppMasterInActive)
162178
}
163179
case appMasterDataDetailRequest: AppMasterDataDetailRequest =>
164180
val appId = appMasterDataDetailRequest.appId
@@ -167,7 +183,7 @@ private[cluster] class AppManager(masterHA : ActorRef, kvService: ActorRef, laun
167183
case Some(_appMaster) =>
168184
_appMaster forward appMasterDataDetailRequest
169185
case None =>
170-
sender ! AppMasterDataDetail(appId, null)
186+
sender ! AppMasterDataDetail(appId)
171187
}
172188
}
173189

@@ -248,7 +264,11 @@ private[cluster] class AppManager(masterHA : ActorRef, kvService: ActorRef, laun
248264
case class RecoverApplication(applicationStatus : ApplicationState)
249265

250266
/**
 * Drops the bookkeeping kept for an application: moves its registry entry (if any) to the
 * dead app master map, removes it from the live registry, and asks masterHA and the KV
 * service to delete the state stored for this appId.
 *
 * @param appId id of the application being cleaned up
 */
private def cleanApplicationData(appId : Int) : Unit = {
  // Remember the app master as dead before removing it from the live registry.
  // foreach (not map): this is done purely for the side effect.
  appMasterRegistry.get(appId).foreach { entry =>
    deadAppMasters += appId -> entry
  }
  appMasterRegistry -= appId

  masterHA ! DeleteMasterState(appId)
  kvService ! DeleteKVGroup(appId.toString)
}

core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
5757
private val address = ActorUtil.getFullPath(context.system, self.path)
5858
private var resource = Resource.empty
5959
private var allocatedResource = Map[ActorRef, Resource]()
60-
private var executorsInfo = Map[Int, ExecutorInfo]()
60+
private var executorsInfo = Map[ActorRef, ExecutorInfo]()
6161
private var id = -1
6262
private val createdTime = System.currentTimeMillis()
6363
private var masterInfo: MasterInfo = null
@@ -109,7 +109,7 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
109109

110110
sendMsgWithTimeOutCallBack(masterInfo.master,
111111
ResourceUpdate(self, id, resource), 30, updateResourceTimeOut())
112-
executorsInfo += launch.executorId -> ExecutorInfo(launch.appId, launch.executorId, launch.resource.slots)
112+
executorsInfo += executor -> ExecutorInfo(launch.appId, launch.executorId, launch.resource.slots)
113113
context.watch(executor)
114114
}
115115
case UpdateResourceFailed(reason, ex) =>
@@ -138,6 +138,7 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
138138
val allocated = allocatedResource.get(actor)
139139
if (allocated.isDefined) {
140140
resource = resource + allocated.get
141+
executorsInfo -= actor
141142
allocatedResource = allocatedResource - actor
142143
sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource), 30, updateResourceTimeOut())
143144
}

core/src/main/scala/org/apache/gearpump/util/Graph.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import org.jgrapht.traverse.TopologicalOrderIterator
2626
import scala.collection.JavaConversions._
2727
import scala.language.implicitConversions
2828

29-
3029
/**
3130
* Application DAG
3231
*/
@@ -78,6 +77,16 @@ class Graph[N, E](private[Graph] val graph : DefaultDirectedGraph[N, Edge[E]]) e
7877
new Graph(newGraph)
7978
}
8079

80+
/**
 * Whether the graph is empty, i.e. it has no vertices and no edges.
 *
 * @return true when the vertex count and the edge count are both zero
 */
def isEmpty: Boolean = vertices.size + edges.length == 0
89+
8190
/**
8291
* Return an iterator of vertex in topological order
8392
*/

core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
2222
import akka.actor.Props
2323
import akka.testkit.TestProbe
2424
import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication
25-
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge}
25+
import org.apache.gearpump.cluster.MasterToAppMaster._
2626
import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ShutdownApplicationResult}
2727
import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
2828
import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
@@ -97,7 +97,7 @@ class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with Maste
9797
Array("-master", s"$getHost:$getPort"))
9898

9999
masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
100-
masterReceiver.reply(AppMastersData(List(AppMasterData(0, null))))
100+
masterReceiver.reply(AppMastersData(List(AppMasterData(0, "appName", null, null, AppMasterActive))))
101101

102102
info.destroy()
103103
}

core/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with
9999
assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
100100

101101
mockClient.send(appManager, AppMasterDataRequest(1))
102-
mockClient.expectMsg(AppMasterData(1, null))
102+
mockClient.expectMsg(AppMasterData(1, appName = null, null, null, AppMasterInActive))
103103
}
104104

105105
"AppManager" should "reject the application submission if the app name already existed" in {

core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,9 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
9292
}
9393
}
9494

95+
property("Check empty graph") {
96+
val graph = Graph.empty[String, String]
97+
assert(graph.isEmpty)
98+
}
99+
95100
}

services/dashboard/partials/apps.html

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@
66
<tr>
77
<td>ID</td>
88
<td>Name</td>
9-
<td>Worker Path</td>
9+
<td>Status</td>
10+
<td>AppMaster Path</td>
1011
</tr>
1112
</thead>
1213
<tbody>
1314
<tr ng-repeat="app in apps track by $index">
1415
<td ng-click="loadDag(app.id)">{{app.id}}</td>
1516
<td ng-click="loadDag(app.id)">{{app.name}}</td>
16-
<td ng-click="loadDag(app.id)">{{app.workerPath}}</td>
17+
<td ng-click="loadDag(app.id)">{{app.status}}</td>
18+
<td ng-click="loadDag(app.id)">{{app.appMasterPath}}</td>
1719
</tr>
1820
</tbody>
1921
</table>

services/src/main/scala/org/apache/gearpump/services/AppMasterService.scala

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,21 @@ package org.apache.gearpump.services
2020

2121
import akka.actor.{ActorSystem, ActorRef}
2222
import akka.pattern.ask
23-
import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterDataDetail
23+
import org.apache.gearpump._
24+
import org.apache.gearpump.cluster.AppMasterToMaster.{AppMasterDataDetail}
2425
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
25-
import org.apache.gearpump.cluster.UserConfig
26+
import org.apache.gearpump.cluster.{Application, UserConfig}
2627
import org.apache.gearpump.partitioner.Partitioner
28+
import org.apache.gearpump.streaming.appmaster.AppMaster
2729
import org.apache.gearpump.streaming.{AppDescription, TaskDescription, DAG}
2830
import org.apache.gearpump.util.{Graph, Constants}
2931
import spray.http.StatusCodes
3032
import spray.routing.HttpService
33+
import upickle.{Js, Writer, Reader}
3134

3235
import scala.concurrent.{ExecutionContext, Future}
3336
import scala.util.{Failure, Success, Try}
37+
import AppMasterService._
3438

3539
trait AppMasterService extends HttpService {
3640
import upickle._
@@ -47,15 +51,16 @@ trait AppMasterService extends HttpService {
4751
case true =>
4852
onComplete((master ? AppMasterDataDetailRequest(appId)).asInstanceOf[Future[AppMasterDataDetail]]) {
4953
case Success(value: AppMasterDataDetail) =>
50-
val appDescription: AppDescription = value.application
51-
Option(appDescription) match {
52-
case Some(app) =>
53-
complete(write(app))
54-
case None =>
55-
complete(StatusCodes.InternalServerError, "UserConfig is null")
54+
if (isStreamingApplication(value)) {
55+
val streamApp: StreamingAppMasterDataDetail = value
56+
complete(write(streamApp))
57+
} else {
58+
val app: GeneralAppMasterDataDetail = value
59+
complete(write(app))
5660
}
57-
case Failure(ex) => complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")
58-
}
61+
case Failure(ex) =>
62+
complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")
63+
}
5964
case false =>
6065
onComplete((master ? AppMasterDataRequest(appId)).asInstanceOf[Future[AppMasterData]]) {
6166
case Success(value: AppMasterData) => complete(write(value))
@@ -67,3 +72,76 @@ trait AppMasterService extends HttpService {
6772
}
6873
}
6974
}
75+
76+
object AppMasterService {
77+
case class StreamingAppMasterDataDetail(appId: Int, appName: String = null, dag: Graph[TaskDescription, Partitioner] = null,
78+
actorPath: String = null, clock: TimeStamp = 0, executors: List[String] = null)
79+
80+
case class GeneralAppMasterDataDetail(
81+
appId: Int, appName: String = null, actorPath: String = null, executors: List[String] = null)
82+
83+
// JSON writer for StreamingAppMasterDataDetail. Emits appId, appName, actorPath, clock,
// executors and dag; the dag is written as the task class of every vertex plus one
// [source task class, edge class name, target task class] triple per edge.
// Only a Writer is provided. We may also need a Reader once we allow DAG mods.
implicit val writer: Writer[StreamingAppMasterDataDetail] = upickle.Writer[StreamingAppMasterDataDetail] {

  case app =>

    val appId = Js.Num(app.appId)
    val appName = Js.Str(app.appName)
    val actorPath = Js.Str(app.actorPath)
    val clock = Js.Num(app.clock)

    // Option(...) instead of Some(...): executors and dag both default to null, and
    // Some(null) would pass the null on to the mapping function, which then throws
    // instead of falling through to Js.Null.
    val executors = Option(app.executors)
      .map(names => Js.Arr(names.map(Js.Str(_)): _*))
      .getOrElse(Js.Null)

    val dag = Option(app.dag).map { graph =>
      val vertices = graph.vertices.map(task => Js.Str(task.taskClass)).toSeq
      val edges = graph.edges.map { case (source, edge, target) =>
        Js.Arr(Js.Str(source.taskClass), Js.Str(edge.getClass.getName), Js.Str(target.taskClass))
      }.toSeq

      Js.Obj(
        ("vertices", Js.Arr(vertices: _*)),
        ("edges", Js.Arr(edges: _*)))
    }.getOrElse(Js.Null)

    Js.Obj(
      ("appId", appId),
      ("appName", appName),
      ("actorPath", actorPath),
      ("clock", clock),
      ("executors", executors),
      ("dag", dag)
    )
}
120+
/**
 * Converts the detail returned by the AppMaster into the streaming view, which carries the
 * application DAG instead of the raw Application.
 *
 * @param app    detail as returned for an AppMasterDataDetailRequest
 * @param system actor system, available implicitly to the Application => AppDescription conversion
 * @return the streaming-specific detail, with the DAG taken from the application's AppDescription
 */
implicit def appMasterDetailToStreaming(app: AppMasterDataDetail)(implicit system: ActorSystem): StreamingAppMasterDataDetail = {
  import AppDescription._
  // The type ascription is what triggers the implicit conversion to AppDescription
  // (presumably provided by the AppDescription._ import above).
  val description: AppDescription = app.application
  StreamingAppMasterDataDetail(
    app.appId, app.appName, description.dag, app.actorPath, app.clock, app.executors)
}
127+
/**
 * Converts the detail returned by the AppMaster into the general (non-streaming) view, which
 * carries only the id, name, actor path and executors.
 *
 * Deliberately does not touch `app.application`: it may be null (its declared default) and
 * none of the fields of the general view are derived from it.
 *
 * @param app    detail as returned for an AppMasterDataDetailRequest
 * @param system unused here; kept so the signature stays unchanged for existing callers
 * @return the general detail copied field by field from `app`
 */
implicit def appMasterDetailToGeneralApp(app: AppMasterDataDetail)(implicit system: ActorSystem): GeneralAppMasterDataDetail =
  GeneralAppMasterDataDetail(app.appId, app.appName, app.actorPath, app.executors)
134+
/**
 * Tells whether the detail describes a streaming application.
 *
 * @param appDetail detail as returned for an AppMasterDataDetailRequest
 * @return true only when an application is present and its appMaster class name equals the
 *         class name of the streaming AppMaster; false when the application is null
 */
def isStreamingApplication(appDetail: AppMasterDataDetail): Boolean =
  Option(appDetail.application).exists(_.appMaster == classOf[AppMaster].getName)
147+
}

0 commit comments

Comments
 (0)