Skip to content

Commit 8ceb5af

Browse files
committed
fix gearpump#417 add a experiment module to distribute a service
1 parent 0ae4ad0 commit 8ceb5af

File tree

7 files changed

+273
-2
lines changed

7 files changed

+273
-2
lines changed

core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
118
package org.apache.gearpump.cluster.main
219

320
import java.net.{URL, URLClassLoader}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
This directory contains the example that distribute a zip file, install and start a service. This README explain how to quick-start this example.
2+
3+
In order to run the example:
4+
5+
1. Start a gearpump cluster, including Master and Workers.
6+
7+
2. Start the AppMaster:<br>
8+
```bash
9+
target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar org.apache.gearpump.distributeservice.DistributeService -master 127.0.0.1:3000
10+
```
11+
3. Distribute the file:<br>
12+
```bash
13+
target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar org.apache.gearpump.distributeservice.DistributeServiceClient -master 127.0.0.1:3000 -appid $APPID -file ${File_Path}
14+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.gearpump.experiments.distributeservice
19+
20+
import java.io.File
21+
22+
import akka.actor.{Deploy, Props}
23+
import akka.remote.RemoteScope
24+
import com.typesafe.config.{ConfigFactory, Config}
25+
import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication
26+
import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, StartExecutorSystemTimeout, ExecutorSystemStarted}
27+
import org.apache.gearpump.cluster.{ExecutorContext, ApplicationMaster, Application, AppMasterContext}
28+
import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{DistributeFile, FileContainer, GetFileContainer}
29+
import org.apache.gearpump.util._
30+
import org.slf4j.Logger
31+
32+
import akka.pattern.{ask, pipe}
33+
import scala.concurrent.Future
34+
35+
class DistServiceAppMaster(appContext : AppMasterContext, app : Application) extends ApplicationMaster {
36+
import appContext._
37+
import context.dispatcher
38+
implicit val timeout = Constants.FUTURE_TIMEOUT
39+
private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
40+
private var currentExecutorId = 0
41+
private var fileServerPort = -1
42+
43+
val rootDirectory = new File("/tmp")
44+
val host = context.system.settings.config.getString(Constants.NETTY_TCP_HOSTNAME)
45+
val server = context.actorOf(Props(classOf[FileServer], rootDirectory, host , 0))
46+
47+
override def preStart(): Unit = {
48+
LOG.info(s"Distribute Service AppMaster started")
49+
ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, self)
50+
}
51+
52+
(server ? FileServer.GetPort).asInstanceOf[Future[FileServer.Port]] pipeTo self
53+
54+
override def receive: Receive = {
55+
case ExecutorSystemStarted(executorSystem) =>
56+
import executorSystem.{address, worker, resource => executorResource}
57+
val executorContext = ExecutorContext(currentExecutorId, worker.workerId, appId, self, executorResource)
58+
//start executor
59+
val executor = context.actorOf(Props(classOf[DistServiceExecutor], executorContext, app.userConfig)
60+
.withDeploy(Deploy(scope = RemoteScope(address))), currentExecutorId.toString)
61+
executorSystem.bindLifeCycleWith(executor)
62+
currentExecutorId += 1
63+
case StartExecutorSystemTimeout =>
64+
LOG.error(s"Failed to allocate resource in time")
65+
masterProxy ! ShutdownApplication(appId)
66+
context.stop(self)
67+
case FileServer.Port(port) =>
68+
this.fileServerPort = port
69+
case GetFileContainer =>
70+
val name = Math.abs(new java.util.Random().nextLong()).toString
71+
sender ! new FileContainer(s"http://$host:$fileServerPort/$name")
72+
case DistributeFile(url, fileName) =>
73+
context.children.foreach(_ ! DistributeFile(url, fileName))
74+
}
75+
76+
private def getExecutorJvmConfig: ExecutorSystemJvmConfig = {
77+
val config: Config = Option(app.clusterConfig).map(_.getConfig).getOrElse(ConfigFactory.empty())
78+
val jvmSetting = Util.resolveJvmSetting(config.withFallback(context.system.settings.config)).executor
79+
ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs,
80+
appJar, username, config)
81+
}
82+
}
83+
84+
object DistServiceAppMaster {
85+
case object GetFileContainer
86+
87+
case class FileContainer(url: String)
88+
89+
case class DistributeFile(url: String, fileName: String)
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.gearpump.experiments.distributeservice
19+
20+
import java.io.File
21+
22+
import akka.actor.Actor
23+
import org.apache.commons.io.FileUtils
24+
import org.apache.gearpump.cluster.{UserConfig, ExecutorContext}
25+
import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.DistributeFile
26+
import org.apache.gearpump.util.{FileServer, LogUtil}
27+
import org.slf4j.Logger
28+
29+
class DistServiceExecutor(executorContext: ExecutorContext, userConf : UserConfig) extends Actor {
30+
import executorContext._
31+
private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)
32+
33+
LOG.info(s"ShellExecutor started!")
34+
override def receive: Receive = {
35+
case DistributeFile(url, fileName) =>
36+
val tempFile = File.createTempFile(s"executor$executorId", fileName)
37+
val client = FileServer.newClient
38+
val bytes = client.get(url).get
39+
FileUtils.writeByteArrayToFile(tempFile, bytes)
40+
LOG.info(s"executor $executorId retrieve file $fileName to local ${tempFile.getPath}")
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.gearpump.experiments.distributeservice
19+
20+
import org.apache.gearpump.cluster.client.ClientContext
21+
import org.apache.gearpump.cluster.{AppJar, UserConfig, Application}
22+
import org.apache.gearpump.cluster.main.{ParseResult, CLIOption, ArgumentsParser}
23+
import org.apache.gearpump.util.LogUtil
24+
import org.slf4j.Logger
25+
26+
object DistributeService extends App with ArgumentsParser {
27+
private val LOG: Logger = LogUtil.getLogger(getClass)
28+
29+
override val options: Array[(String, CLIOption[Any])] = Array(
30+
"master" -> CLIOption[String]("<host1:port1,host2:port2,host3:port3>", required = true)
31+
)
32+
33+
def application(config: ParseResult) : Application = {
34+
Application("DistributedShell", classOf[DistServiceAppMaster].getName, UserConfig.empty)
35+
}
36+
37+
LOG.info(s"Distribute Service submitting application...")
38+
val config = parse(args)
39+
val context = ClientContext(config.getString("master"))
40+
implicit val system = context.system
41+
val appId = context.submit(application(config))
42+
context.close()
43+
LOG.info(s"Distribute Service Application started with appId $appId !")
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.gearpump.experiments.distributeservice
19+
20+
import java.io.File
21+
import org.apache.commons.io.FileUtils
22+
import org.apache.gearpump.cluster.client.ClientContext
23+
import org.apache.gearpump.cluster.main.{CLIOption, ArgumentsParser}
24+
import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{DistributeFile, FileContainer, GetFileContainer}
25+
import org.apache.gearpump.util.{FileServer, Constants}
26+
import org.slf4j.{LoggerFactory, Logger}
27+
28+
import akka.pattern.ask
29+
import scala.concurrent.Future
30+
import scala.util.{Failure, Success}
31+
32+
object DistributeServiceClient extends App with ArgumentsParser{
33+
implicit val timeout = Constants.FUTURE_TIMEOUT
34+
import scala.concurrent.ExecutionContext.Implicits.global
35+
private val LOG: Logger = LoggerFactory.getLogger(getClass)
36+
37+
override val options: Array[(String, CLIOption[Any])] = Array(
38+
"master" -> CLIOption[String]("<host1:port1,host2:port2,host3:port3>", required = true),
39+
"appid" -> CLIOption[Int]("<the distributed shell appid>", required = true),
40+
"file" -> CLIOption[String]("<file path>", required = true)
41+
)
42+
43+
val config = parse(args)
44+
val context = ClientContext(config.getString("master"))
45+
val appid = config.getInt("appid")
46+
val file = new File(config.getString("file"))
47+
val appMaster = context.resolveAppID(appid)
48+
(appMaster ? GetFileContainer).asInstanceOf[Future[FileContainer]].map { container =>
49+
val bytes = FileUtils.readFileToByteArray(file)
50+
val result = FileServer.newClient.save(container.url, bytes)
51+
result match {
52+
case Success(_) =>
53+
appMaster ! DistributeFile(container.url, file.getName)
54+
context.close()
55+
case Failure(ex) => throw ex
56+
}
57+
}
58+
}

project/Build.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ object Build extends sbt.Build {
191191
packExtraClasspath := new DefaultValueMap(Seq("${PROG_HOME}/conf", "${PROG_HOME}/dashboard"))
192192
)
193193
).dependsOn(core, streaming, services, external_kafka)
194-
.aggregate(core, streaming, fsio, examples_kafka, sol, wordcount, complexdag, services, external_kafka, examples, distributedshell)
194+
.aggregate(core, streaming, fsio, examples_kafka, sol, wordcount, complexdag, services, external_kafka, examples, distributedshell, distributeservice)
195195

196196
lazy val core = Project(
197197
id = "gearpump-core",
@@ -335,5 +335,11 @@ object Build extends sbt.Build {
335335
base = file("examples/distributedshell"),
336336
settings = commonSettings
337337
) dependsOn(core % "test->test", core % "provided")
338-
338+
339+
lazy val distributeservice = Project(
340+
id = "gearpump-experiments-distributeservice",
341+
base = file("experiments/distributeservice"),
342+
settings = commonSettings
343+
) dependsOn(core % "test->test;compile->compile")
344+
339345
}

0 commit comments

Comments
 (0)