Skip to content

Commit 4988efc

Browse files
committed
Adding Dataproc InstantiateInlineWorkFlow samples
1 parent b613028 commit 4988efc

File tree

2 files changed

+182
-0
lines changed

2 files changed

+182
-0
lines changed
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// [START dataproc_instantiate_inline_workflow]
18+
import com.google.api.gax.longrunning.OperationFuture;
19+
import com.google.cloud.dataproc.v1.ClusterConfig;
20+
import com.google.cloud.dataproc.v1.GceClusterConfig;
21+
import com.google.cloud.dataproc.v1.HadoopJob;
22+
import com.google.cloud.dataproc.v1.ManagedCluster;
23+
import com.google.cloud.dataproc.v1.OrderedJob;
24+
import com.google.cloud.dataproc.v1.RegionName;
25+
import com.google.cloud.dataproc.v1.WorkflowMetadata;
26+
import com.google.cloud.dataproc.v1.WorkflowTemplate;
27+
import com.google.cloud.dataproc.v1.WorkflowTemplatePlacement;
28+
import com.google.cloud.dataproc.v1.WorkflowTemplateServiceClient;
29+
import com.google.cloud.dataproc.v1.WorkflowTemplateServiceSettings;
30+
import com.google.protobuf.Empty;
31+
import java.io.IOException;
32+
import java.util.concurrent.ExecutionException;
33+
34+
public class InstantiateInlineWorkflow {
35+
36+
public static void InstantiateInlineWorkflow() throws IOException, InterruptedException {
37+
// TODO(developer): Replace these variables before running the sample.
38+
String projectId = "your-project-id";
39+
String region = "your-project-region";
40+
instantiateInlineWorkflow(projectId, region);
41+
}
42+
43+
public static void instantiateInlineWorkflow(String projectId, String region)
44+
throws IOException, InterruptedException {
45+
String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
46+
47+
// Configure the settings for the workflow template service client.
48+
WorkflowTemplateServiceSettings workflowTemplateServiceSettings =
49+
WorkflowTemplateServiceSettings.newBuilder().setEndpoint(myEndpoint).build();
50+
51+
// Create a workflow template service client with the configured settings. The client only
52+
// needs to be created once and can be reused for multiple requests. Using a try-with-resources
53+
// closes the client, but this can also be done manually with the .close() method.
54+
try (WorkflowTemplateServiceClient workflowTemplateServiceClient =
55+
WorkflowTemplateServiceClient.create(workflowTemplateServiceSettings)) {
56+
57+
// Configure the jobs within the workflow.
58+
HadoopJob teragenHadoopJob =
59+
HadoopJob.newBuilder()
60+
.setMainJarFileUri("file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
61+
.addArgs("teragen")
62+
.addArgs("1000")
63+
.addArgs("hdfs:///gen/")
64+
.build();
65+
OrderedJob teragen =
66+
OrderedJob.newBuilder().setHadoopJob(teragenHadoopJob).setStepId("teragen").build();
67+
68+
HadoopJob terasortHadoopJob =
69+
HadoopJob.newBuilder()
70+
.setMainJarFileUri("file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
71+
.addArgs("terasort")
72+
.addArgs("hdfs:///gen/")
73+
.addArgs("hdfs:///sort/")
74+
.build();
75+
OrderedJob terasort =
76+
OrderedJob.newBuilder()
77+
.setHadoopJob(terasortHadoopJob)
78+
.addPrerequisiteStepIds("teragen")
79+
.setStepId("terasort")
80+
.build();
81+
82+
// Configure the cluster placement for the workflow.
83+
GceClusterConfig gceClusterConfig =
84+
GceClusterConfig.newBuilder().setZoneUri("us-central1-a").build();
85+
ClusterConfig clusterConfig =
86+
ClusterConfig.newBuilder().setGceClusterConfig(gceClusterConfig).build();
87+
ManagedCluster managedCluster =
88+
ManagedCluster.newBuilder()
89+
.setClusterName("my-managed-cluster")
90+
.setConfig(clusterConfig)
91+
.build();
92+
WorkflowTemplatePlacement workflowTemplatePlacement =
93+
WorkflowTemplatePlacement.newBuilder().setManagedCluster(managedCluster).build();
94+
95+
// Create the inline workflow template.
96+
WorkflowTemplate workflowTemplate =
97+
WorkflowTemplate.newBuilder()
98+
.addJobs(teragen)
99+
.addJobs(terasort)
100+
.setPlacement(workflowTemplatePlacement)
101+
.build();
102+
103+
// Submit the instantiated inline workflow template request.
104+
String parent = RegionName.format(projectId, region);
105+
OperationFuture<Empty, WorkflowMetadata> instantiateInlineWorkflowTemplateAsync =
106+
workflowTemplateServiceClient.instantiateInlineWorkflowTemplateAsync(
107+
parent, workflowTemplate);
108+
instantiateInlineWorkflowTemplateAsync.get();
109+
110+
// Print out a success message.
111+
System.out.printf("Workflow ran successfully.");
112+
113+
} catch (ExecutionException e) {
114+
System.err.println(String.format("Error running workflow: %s ", e.getMessage()));
115+
}
116+
}
117+
}
118+
// [END dataproc_instantiate_inline_workflow]
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2019 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import static junit.framework.TestCase.assertNotNull;
18+
import static org.hamcrest.MatcherAssert.assertThat;
19+
20+
import java.io.ByteArrayOutputStream;
21+
import java.io.IOException;
22+
import java.io.PrintStream;
23+
import org.hamcrest.CoreMatchers;
24+
import org.junit.Before;
25+
import org.junit.BeforeClass;
26+
import org.junit.Test;
27+
import org.junit.runner.RunWith;
28+
import org.junit.runners.JUnit4;
29+
30+
@RunWith(JUnit4.class)
31+
public class InstantiateInlineWorkflowTest {
32+
33+
34+
private static final String REGION = "us-central1";
35+
private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
36+
37+
private ByteArrayOutputStream bout;
38+
39+
private static void requireEnv(String varName) {
40+
assertNotNull(
41+
String.format("Environment variable '%s' is required to perform these tests.", varName),
42+
System.getenv(varName));
43+
}
44+
45+
@BeforeClass
46+
public static void checkRequirements() {
47+
requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
48+
requireEnv("GOOGLE_CLOUD_PROJECT");
49+
}
50+
51+
@Before
52+
public void setUp() {
53+
bout = new ByteArrayOutputStream();
54+
System.setOut(new PrintStream(bout));
55+
}
56+
57+
@Test
58+
public void InstanstiateInlineWorkflowTest() throws IOException, InterruptedException {
59+
InstantiateInlineWorkflow.instantiateInlineWorkflow(PROJECT_ID, REGION);
60+
String output = bout.toString();
61+
62+
assertThat(output, CoreMatchers.containsString("successfully"));
63+
}
64+
}

0 commit comments

Comments
 (0)