Skip to content

Commit 0ae4ad0

Browse files
committed
Merge pull request gearpump#500 from clockfly/fix_#476
fix gearpump#476, fix worker and master log path in rest
2 parents a4d4b76 + 0f648a7 commit 0ae4ad0

File tree

3 files changed

+28
-12
lines changed

3 files changed

+28
-12
lines changed

core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,9 @@ private[cluster] class Master extends Actor with Stash {
126126

127127
case GetMasterData =>
128128
val aliveFor = System.currentTimeMillis() - birth
129-
val logFilePath = System.getProperty("gearpump.log.file")
130-
val masterDescription = MasterDescription(hostPort.toTuple, getMasterClusterList.map(_.toTuple), aliveFor, logFilePath, jarStoreRootPath, MasterStatus.Synced)
129+
val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
130+
val userDir = System.getProperty("user.dir");
131+
val masterDescription = MasterDescription(hostPort.toTuple, getMasterClusterList.map(_.toTuple), aliveFor, logFileDir, jarStoreRootPath, MasterStatus.Synced, userDir)
131132
sender ! MasterData(masterDescription)
132133

133134
case invalidAppMaster: InvalidAppMaster =>
@@ -217,7 +218,8 @@ object Master {
217218

218219
case class MasterDescription(leader: (String, Int), cluster: List[(String, Int)], aliveFor: Long,
219220
logFile: String, jarStore: String,
220-
masterStatus: MasterStatus.Type)
221+
masterStatus: MasterStatus.Type,
222+
homeDirectory: String)
221223

222224
case class SlotStatus(totalSlots: Int, availableSlots: Int)
223225
}

core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ import scala.util.{Failure, Success, Try}
4343

4444
case class WorkerDescription(workerId: Int, state: String, actorPath: String,
4545
aliveFor: Long, logFile: String,
46-
executors: Array[ExecutorInfo], totalSlots: Int)
46+
executors: Array[ExecutorInfo], totalSlots: Int, availableSlots: Int,
47+
homeDirectory: String)
4748
/**
4849
* masterProxy is used to resolve the master
4950
*/
@@ -52,7 +53,7 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
5253

5354
private val systemConfig : Config = context.system.settings.config
5455
private val configStr = systemConfig.root().render
55-
private val logFile = System.getProperty("gearpump.log.file")
56+
5657
private val address = ActorUtil.getFullPath(context.system, self.path)
5758
private var resource = Resource.empty
5859
private var allocatedResource = Map[ActorRef, Resource]()
@@ -61,6 +62,8 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
6162
private val createdTime = System.currentTimeMillis()
6263
private var masterInfo: MasterInfo = null
6364

65+
private var totalSlots: Int = 0
66+
6467
override def receive : Receive = null
6568
val LOG : Logger = LogUtil.getLogger(getClass, worker = id)
6669

@@ -116,8 +119,10 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
116119
LOG.info(s"Worker $id update resource succeed")
117120
case GetWorkerData(workerId) =>
118121
val aliveFor = System.currentTimeMillis() - createdTime
122+
val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
123+
val userDir = System.getProperty("user.dir");
119124
sender ! WorkerData(Some(WorkerDescription(id, "active", address,
120-
aliveFor, logFile, executorsInfo.values.toArray, resource.slots)))
125+
aliveFor, logDir, executorsInfo.values.toArray, totalSlots, resource.slots, userDir)))
121126
}
122127

123128
def terminationWatch(master : ActorRef) : Receive = {
@@ -143,8 +148,8 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
143148
import context.dispatcher
144149
override def preStart() : Unit = {
145150
LOG.info(s"Worker[$id] Sending master RegisterNewWorker")
146-
val slots = systemConfig.getInt(Constants.GEARPUMP_WORKER_SLOTS)
147-
this.resource = Resource(slots)
151+
totalSlots = systemConfig.getInt(Constants.GEARPUMP_WORKER_SLOTS)
152+
this.resource = Resource(totalSlots)
148153
masterProxy ! RegisterNewWorker
149154
context.become(waitForMasterConfirm(repeatActionUtil(30)(Unit)))
150155
}

core/src/main/scala/org/apache/gearpump/util/LogUtil.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.apache.gearpump.util
22

3+
import java.io.File
34
import java.net.InetAddress
45
import java.util.Properties
56

@@ -63,17 +64,25 @@ object LogUtil {
6364
processType match {
6465
case ProcessType.APPLICATION =>
6566
props.setProperty("log4j.rootLogger", "${gearpump.application.logger}")
66-
val appLogDir = config.getString(Constants.GEARPUMP_LOG_APPLICATION_DIR)
67-
props.setProperty("gearpump.application.log.rootdir", appLogDir)
67+
props.setProperty("gearpump.application.log.rootdir", applicationLogDir(config).getAbsolutePath)
6868
case _ =>
6969
props.setProperty("log4j.rootLogger", "${gearpump.root.logger}")
70-
val daemonLogDir = config.getString(Constants.GEARPUMP_LOG_DAEMON_DIR)
71-
props.setProperty("gearpump.log.dir", daemonLogDir)
70+
props.setProperty("gearpump.log.dir", daemonLogDir(config).getAbsolutePath)
7271
}
7372

7473
PropertyConfigurator.configure(props)
7574
}
7675

76+
def daemonLogDir(config: Config): File = {
77+
val dir = config.getString(Constants.GEARPUMP_LOG_DAEMON_DIR)
78+
new File(dir)
79+
}
80+
81+
def applicationLogDir(config: Config): File = {
82+
val appLogDir = config.getString(Constants.GEARPUMP_LOG_APPLICATION_DIR)
83+
new File(appLogDir)
84+
}
85+
7786
private def setHostnameSystemProperty : Unit = {
7887
val hostname = Try(InetAddress.getLocalHost.getHostName).getOrElse("local")
7988
//as log4j missing the HOSTNAME system property, add it to system property, just like logback does

0 commit comments

Comments
 (0)