@@ -43,7 +43,8 @@ import scala.util.{Failure, Success, Try}
43
43
44
44
case class WorkerDescription (workerId : Int , state : String , actorPath : String ,
45
45
aliveFor : Long , logFile : String ,
46
- executors : Array [ExecutorInfo ], totalSlots : Int )
46
+ executors : Array [ExecutorInfo ], totalSlots : Int , availableSlots : Int ,
47
+ homeDirectory : String )
47
48
/**
48
49
* masterProxy is used to resolve the master
49
50
*/
@@ -52,7 +53,7 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
52
53
53
54
private val systemConfig : Config = context.system.settings.config
54
55
private val configStr = systemConfig.root().render
55
- private val logFile = System .getProperty( " gearpump.log.file " )
56
+
56
57
private val address = ActorUtil .getFullPath(context.system, self.path)
57
58
private var resource = Resource .empty
58
59
private var allocatedResource = Map [ActorRef , Resource ]()
@@ -61,6 +62,8 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
61
62
private val createdTime = System .currentTimeMillis()
62
63
private var masterInfo : MasterInfo = null
63
64
65
+ private var totalSlots : Int = 0
66
+
64
67
override def receive : Receive = null
65
68
val LOG : Logger = LogUtil .getLogger(getClass, worker = id)
66
69
@@ -116,8 +119,10 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
116
119
LOG .info(s " Worker $id update resource succeed " )
117
120
case GetWorkerData (workerId) =>
118
121
val aliveFor = System .currentTimeMillis() - createdTime
122
+ val logDir = LogUtil .daemonLogDir(systemConfig).getAbsolutePath
123
+ val userDir = System .getProperty(" user.dir" );
119
124
sender ! WorkerData (Some (WorkerDescription (id, " active" , address,
120
- aliveFor, logFile , executorsInfo.values.toArray, resource.slots)))
125
+ aliveFor, logDir , executorsInfo.values.toArray, totalSlots, resource.slots, userDir )))
121
126
}
122
127
123
128
def terminationWatch (master : ActorRef ) : Receive = {
@@ -143,8 +148,8 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut
143
148
import context .dispatcher
144
149
override def preStart () : Unit = {
145
150
LOG .info(s " Worker[ $id] Sending master RegisterNewWorker " )
146
- val slots = systemConfig.getInt(Constants .GEARPUMP_WORKER_SLOTS )
147
- this .resource = Resource (slots )
151
+ totalSlots = systemConfig.getInt(Constants .GEARPUMP_WORKER_SLOTS )
152
+ this .resource = Resource (totalSlots )
148
153
masterProxy ! RegisterNewWorker
149
154
context.become(waitForMasterConfirm(repeatActionUtil(30 )(Unit )))
150
155
}
0 commit comments