diff --git a/dataproc/README.md b/dataproc/README.md
index 1d919e4631d..98622be7dc1 100644
--- a/dataproc/README.md
+++ b/dataproc/README.md
@@ -1,4 +1,4 @@
-# Cloud Dataproc API Example
+# Cloud Dataproc API Examples
 
 [![Open in Cloud Shell][shell_img]][shell_link]
 
@@ -7,21 +7,20 @@
 
 Sample command-line programs for interacting with the Cloud Dataproc API.
 
-Please see [the tutorial on the using the Dataproc API with the Python client
+See [the tutorial on using the Dataproc API with the Python client
 library](https://cloud.google.com/dataproc/docs/tutorials/python-library-example)
-for more information.
+for information on a walkthrough you can run to try out the Cloud Dataproc API sample code.
 
-Note that while this sample demonstrates interacting with Dataproc via the API, the functionality
-demonstrated here could also be accomplished using the Cloud Console or the gcloud CLI.
+Note that while this sample demonstrates interacting with Dataproc via the API, the functionality demonstrated here could also be accomplished using the Cloud Console or the gcloud CLI.
 
-`list_clusters.py` is a simple command-line program to demonstrate connecting to the
-Dataproc API and listing the clusters in a region
+`list_clusters.py` is a simple command-line program to demonstrate connecting to the Cloud Dataproc API and listing the clusters in a region.
 
-`create_cluster_and_submit_job.py` demonstrates how to create a cluster, submit the
+`submit_job_to_cluster.py` demonstrates how to create a cluster, submit the
 `pyspark_sort.py` job, download the output from Google Cloud Storage, and output the result.
 
-`pyspark_sort.py_gcs` is the asme as `pyspark_sort.py` but demonstrates
+`single_job_workflow.py` uses the Cloud Dataproc InstantiateInlineWorkflowTemplate API to create an ephemeral cluster, run a job, then delete the cluster with one API request.
+
+`pyspark_sort.py_gcs` is the same as `pyspark_sort.py` but demonstrates
 reading from a GCS bucket.
 
 ## Prerequisites to run locally:
 
@@ -59,32 +58,27 @@ To run list_clusters.py:
 
     python list_clusters.py $GOOGLE_CLOUD_PROJECT --region=$REGION
 
-`submit_job_to_cluster.py` can create the Dataproc cluster, or use an existing one.
-If you'd like to create a cluster ahead of time, either use the
-[Cloud Console](console.cloud.google.com) or run:
+`submit_job_to_cluster.py` can create the Dataproc cluster or use an existing cluster. To create a cluster before running the code, you can use the [Cloud Console](https://console.cloud.google.com) or run:
 
     gcloud dataproc clusters create your-cluster-name
 
-To run submit_job_to_cluster.py, first create a GCS bucket for Dataproc to stage files, from the Cloud Console or with
-gsutil:
+To run submit_job_to_cluster.py, first create a GCS bucket (used by Cloud Dataproc to stage files) from the Cloud Console or with gsutil:
 
     gsutil mb gs://
 
-Set the environment variable's name:
+Next, set the following environment variables:
 
     BUCKET=your-staging-bucket
     CLUSTER=your-cluster-name
 
-Then, if you want to rely on an existing cluster, run:
+Then, if you want to use an existing cluster, run:
 
    python submit_job_to_cluster.py --project_id=$GOOGLE_CLOUD_PROJECT --zone=us-central1-b --cluster_name=$CLUSTER --gcs_bucket=$BUCKET
 
-Otherwise, if you want the script to create a new cluster for you:
+Alternatively, to create a new cluster, which will be deleted at the end of the job, run:
 
    python submit_job_to_cluster.py --project_id=$GOOGLE_CLOUD_PROJECT --zone=us-central1-b --cluster_name=$CLUSTER --gcs_bucket=$BUCKET --create_new_cluster
 
-This will setup a cluster, upload the PySpark file, submit the job, print the result, then
-delete the cluster.
+The script will set up a cluster, upload the PySpark file, submit the job, print the result, and then, if it created the cluster, delete the cluster.
 
-You can optionally specify a `--pyspark_file` argument to change from the default
-`pyspark_sort.py` included in this script to a new script.
+Optionally, you can add the `--pyspark_file` argument to change from the default `pyspark_sort.py` included in this script to a new script.
diff --git a/dataproc/list_clusters.py b/dataproc/list_clusters.py
index 9bbaa3b09c6..1639c413468 100644
--- a/dataproc/list_clusters.py
+++ b/dataproc/list_clusters.py
@@ -10,49 +10,54 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""Sample command-line program to list Cloud Dataproc clusters in a region.
 
-""" Sample command-line program for listing Google Dataproc Clusters
-"""
+Example usage:
+python list_clusters.py --project_id=my-project-id --region=global
+"""
 
 import argparse
 
-import googleapiclient.discovery
+from google.cloud import dataproc_v1
+from google.cloud.dataproc_v1.gapic.transports import (
+    cluster_controller_grpc_transport)
 
 
 # [START dataproc_list_clusters]
 def list_clusters(dataproc, project, region):
-    result = dataproc.projects().regions().clusters().list(
-        projectId=project,
-        region=region).execute()
-    return result
+    """List the details of clusters in the region."""
+    for cluster in dataproc.list_clusters(project, region):
+        print(('{} - {}'.format(cluster.cluster_name,
+                                cluster.status.State.Name(
+                                    cluster.status.state))))
 # [END dataproc_list_clusters]
 
 
-# [START dataproc_get_client]
-def get_client():
-    """Builds a client to the dataproc API."""
-    dataproc = googleapiclient.discovery.build('dataproc', 'v1')
-    return dataproc
-# [END dataproc_get_client]
+def main(project_id, region):
+    if region == 'global':
+        # Use the default gRPC global endpoints.
+        dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
+    else:
+        # Use a regional gRPC endpoint. See:
+        # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
+        client_transport = (
+            cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
+                address='{}-dataproc.googleapis.com:443'.format(region)))
+        dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
+            client_transport)
 
-def main(project_id, region):
-    dataproc = get_client()
-    result = list_clusters(dataproc, project_id, region)
-    print(result)
+    list_clusters(dataproc_cluster_client, project_id, region)
 
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(
-        description=__doc__,
-        formatter_class=argparse.RawDescriptionHelpFormatter
-    )
+        description=__doc__, formatter_class=(
+            argparse.RawDescriptionHelpFormatter))
     parser.add_argument(
-        'project_id', help='Project ID you want to access.'),
-    # Sets the region to "global" if it's not provided
-    # Note: sub-regions (e.g.: us-central1-a/b) are currently not supported
+        '--project_id', help='Project ID to access.', required=True)
     parser.add_argument(
-        '--region', default='global', help='Region to list clusters')
+        '--region', help='Region of clusters to list.', required=True)
 
     args = parser.parse_args()
     main(args.project_id, args.region)
diff --git a/dataproc/python-api-walkthrough.md b/dataproc/python-api-walkthrough.md
index 0004e2419cd..656b54ac946 100644
--- a/dataproc/python-api-walkthrough.md
+++ b/dataproc/python-api-walkthrough.md
@@ -121,7 +121,7 @@ Job output in Cloud Shell shows cluster creation, job submission,
     ...
     Creating cluster...
     Cluster created.
-    Uploading pyspark file to GCS
+    Uploading pyspark file to Cloud Storage
     new-cluster-name - RUNNING
     Submitted job ID ...
     Waiting for job to finish...
@@ -140,12 +140,12 @@ Job output in Cloud Shell shows cluster creation, job submission,
 ### Next Steps:
 
 * **View job details from the Console.** View job details by selecting the
-  PySpark job from the Cloud Dataproc 
+  PySpark job from the Cloud Dataproc
   [Jobs page](https://console.cloud.google.com/dataproc/jobs) in the Google
   Cloud Platform Console.
 
 * **Delete resources used in the walkthrough.**
-  The `submit_job.py` job deletes the cluster that it created for this
+  The `submit_job_to_cluster.py` job deletes the cluster that it created for this
   walkthrough.
 
   If you created a bucket to use for this walkthrough,
diff --git a/dataproc/requirements.txt b/dataproc/requirements.txt
index bc5d62ef28d..509e241a417 100644
--- a/dataproc/requirements.txt
+++ b/dataproc/requirements.txt
@@ -1,5 +1,6 @@
-google-api-python-client==1.7.8
+grpcio>=1.2.0
 google-auth==1.6.2
 google-auth-httplib2==0.0.3
 google-cloud==0.34.0
 google-cloud-storage==1.13.2
+google-cloud-dataproc==0.3.1
diff --git a/dataproc/single_job_workflow.py b/dataproc/single_job_workflow.py
new file mode 100644
index 00000000000..b17ea0b9599
--- /dev/null
+++ b/dataproc/single_job_workflow.py
@@ -0,0 +1,208 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+r"""Sample Cloud Dataproc inline workflow to run a pyspark job on an ephemeral
+cluster.
+Example Usage to run the inline workflow on a managed cluster:
+python single_job_workflow.py --project_id=$PROJECT --gcs_bucket=$BUCKET \
+  --cluster_name=$CLUSTER --zone=$ZONE
+Example Usage to run the inline workflow on a global region managed cluster:
+python single_job_workflow.py --project_id=$PROJECT --gcs_bucket=$BUCKET \
+  --cluster_name=$CLUSTER --zone=$ZONE --global_region
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import argparse
+import os
+
+from google.cloud import dataproc_v1
+from google.cloud.dataproc_v1.gapic.transports import (
+    workflow_template_service_grpc_transport)
+from google.cloud import storage
+
+DEFAULT_FILENAME = "pyspark_sort.py"
+waiting_callback = False
+
+
+def get_pyspark_file(pyspark_file=None):
+    if pyspark_file:
+        f = open(pyspark_file, "rb")
+        return f, os.path.basename(pyspark_file)
+    else:
+        # Use the default PySpark file from the current directory.
+        current_dir = os.path.dirname(os.path.abspath(__file__))
+        f = open(os.path.join(current_dir, DEFAULT_FILENAME), "rb")
+        return f, DEFAULT_FILENAME
+
+
+def get_region_from_zone(zone):
+    try:
+        region_as_list = zone.split("-")[:-1]
+        return "-".join(region_as_list)
+    except (AttributeError, IndexError, ValueError):
+        raise ValueError("Invalid zone provided, please check your input.")
+
+
+def upload_pyspark_file(project, bucket_name, filename, spark_file):
+    """Uploads the PySpark file in this directory to the configured input
+    bucket."""
+    print("Uploading pyspark file to Cloud Storage.")
+    client = storage.Client(project=project)
+    bucket = client.get_bucket(bucket_name)
+    blob = bucket.blob(filename)
+    blob.upload_from_file(spark_file)
+
+
+def run_workflow(dataproc, project, region, zone, bucket_name, filename,
+                 cluster_name):
+
+    parent = "projects/{}/regions/{}".format(project, region)
+    zone_uri = ("https://www.googleapis.com/compute/v1/projects/{}/zones/{}"
+                .format(project, zone))
+
+    workflow_data = {
+        "placement": {
+            "managed_cluster": {
+                "cluster_name": cluster_name,
+                "config": {
+                    "gce_cluster_config": {"zone_uri": zone_uri},
+                    "master_config": {
+                        "num_instances": 1,
+                        "machine_type_uri": "n1-standard-1",
+                    },
+                    "worker_config": {
+                        "num_instances": 2,
+                        "machine_type_uri": "n1-standard-1",
+                    },
+                },
+            }
+        },
+        "jobs": [
+            {
+                "pyspark_job": {
+                    "main_python_file_uri": "gs://{}/{}".format(
+                        bucket_name, filename)
+                },
+                "step_id": "pyspark-job",
+            }
+        ],
+    }
+
+    workflow = dataproc.instantiate_inline_workflow_template(
+        parent, workflow_data)
+
+    workflow.add_done_callback(callback)
+    global waiting_callback
+    waiting_callback = True
+
+
+def callback(operation_future):
+    # Reset global when callback returns.
+    global waiting_callback
+    waiting_callback = False
+
+
+def wait_for_workflow_end():
+    """Wait for the workflow to complete."""
+    print("Waiting for workflow completion ...")
+    print("Workflow and job progress, and job driver output available from: "
+          "https://console.cloud.google.com/dataproc/workflows/")
+
+    while True:
+        if not waiting_callback:
+            print("Workflow completed.")
+            break
+
+
+def main(
+    project_id,
+    zone,
+    cluster_name,
+    bucket_name,
+    pyspark_file=None,
+    create_new_cluster=True,
+    global_region=True,
+):
+
+    # [START dataproc_get_workflow_template_client]
+    if global_region:
+        region = "global"
+        # Use the default gRPC global endpoints.
+        dataproc_workflow_client = dataproc_v1.WorkflowTemplateServiceClient()
+    else:
+        region = get_region_from_zone(zone)
+        # Use a regional gRPC endpoint. See:
+        # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
+        client_transport = (workflow_template_service_grpc_transport
+                            .WorkflowTemplateServiceGrpcTransport(
+                                address="{}-dataproc.googleapis.com:443"
+                                .format(region)))
+        dataproc_workflow_client = dataproc_v1.WorkflowTemplateServiceClient(
+            client_transport
+        )
+    # [END dataproc_get_workflow_template_client]
+
+    try:
+        spark_file, spark_filename = get_pyspark_file(pyspark_file)
+        upload_pyspark_file(project_id, bucket_name, spark_filename,
+                            spark_file)
+
+        run_workflow(
+            dataproc_workflow_client,
+            project_id,
+            region,
+            zone,
+            bucket_name,
+            spark_filename,
+            cluster_name
+        )
+        wait_for_workflow_end()
+
+    finally:
+        spark_file.close()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter)
+    parser.add_argument(
+        "--project_id", help="Project ID you want to access.", required=True
+    )
+    parser.add_argument(
+        "--zone", help="Zone to create clusters in/connect to", required=True
+    )
+    parser.add_argument(
+        "--cluster_name", help="Name of the cluster to create/connect to",
+        required=True
+    )
+    parser.add_argument(
+        "--gcs_bucket", help="Bucket to upload Pyspark file to", required=True
+    )
+    parser.add_argument(
+        "--pyspark_file", help="Pyspark filename. Defaults to pyspark_sort.py"
+    )
+    parser.add_argument("--global_region",
+                        action="store_true",
+                        help="If cluster is in the global region")
+
+    args = parser.parse_args()
+    main(
+        args.project_id,
+        args.zone,
+        args.cluster_name,
+        args.gcs_bucket,
+        args.pyspark_file,
+        global_region=args.global_region,
+    )
diff --git a/dataproc/submit_job_to_cluster.py b/dataproc/submit_job_to_cluster.py
index f06d5981c16..1c648abc1a7 100644
--- a/dataproc/submit_job_to_cluster.py
+++ b/dataproc/submit_job_to_cluster.py
@@ -10,28 +10,48 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+r"""Sample command-line program to run a pyspark job on a new or existing
+cluster.
 
-""" Sample command-line program for listing Google Dataproc Clusters"""
+Global region clusters are supported with the --global_region flag.
+
+Example Usage to run the pyspark job on a new cluster:
+python submit_job_to_cluster.py --project_id=$PROJECT --gcs_bucket=$BUCKET \
+  --create_new_cluster --cluster_name=$CLUSTER --zone=$ZONE
+
+Example Usage to run the pyspark job on an existing global region cluster:
+python submit_job_to_cluster.py --project_id=$PROJECT --gcs_bucket=$BUCKET \
+  --global_region --cluster_name=$CLUSTER --zone=$ZONE
+
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import argparse
 import os
 
+from google.cloud import dataproc_v1
+from google.cloud.dataproc_v1.gapic.transports import (
+    cluster_controller_grpc_transport)
+from google.cloud.dataproc_v1.gapic.transports import (
+    job_controller_grpc_transport)
 from google.cloud import storage
-import googleapiclient.discovery
 
 DEFAULT_FILENAME = 'pyspark_sort.py'
+waiting_callback = False
 
 
-def get_default_pyspark_file():
-    """Gets the PySpark file from this directory"""
-    current_dir = os.path.dirname(os.path.abspath(__file__))
-    f = open(os.path.join(current_dir, DEFAULT_FILENAME), 'rb')
-    return f, DEFAULT_FILENAME
-
-
-def get_pyspark_file(filename):
-    f = open(filename, 'rb')
-    return f, os.path.basename(filename)
+def get_pyspark_file(pyspark_file=None):
+    if pyspark_file:
+        f = open(pyspark_file, "rb")
+        return f, os.path.basename(pyspark_file)
+    else:
+        # Use the default PySpark file from the current directory.
+        current_dir = os.path.dirname(os.path.abspath(__file__))
+        f = open(os.path.join(current_dir, DEFAULT_FILENAME), "rb")
+        return f, DEFAULT_FILENAME
 
 
 def get_region_from_zone(zone):
@@ -42,222 +62,226 @@ def get_region_from_zone(zone):
         raise ValueError('Invalid zone provided, please check your input.')
 
 
-def upload_pyspark_file(project_id, bucket_name, filename, file):
-    """Uploads the PySpark file in this directory to the configured
-    input bucket."""
-    print('Uploading pyspark file to GCS')
-    client = storage.Client(project=project_id)
+def upload_pyspark_file(project, bucket_name, filename, spark_file):
+    """Uploads the PySpark file in this directory to the configured input
+    bucket."""
+    print('Uploading pyspark file to Cloud Storage.')
+    client = storage.Client(project=project)
     bucket = client.get_bucket(bucket_name)
     blob = bucket.blob(filename)
-    blob.upload_from_file(file)
+    blob.upload_from_file(spark_file)
 
 
-def download_output(project_id, cluster_id, output_bucket, job_id):
+def download_output(project, cluster_id, output_bucket, job_id):
     """Downloads the output file from Cloud Storage and returns it as a
     string."""
-    print('Downloading output file')
-    client = storage.Client(project=project_id)
+    print('Downloading output file.')
+    client = storage.Client(project=project)
     bucket = client.get_bucket(output_bucket)
     output_blob = (
-        'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
-        .format(cluster_id, job_id))
+        ('google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'.
+         format(cluster_id, job_id)))
     return bucket.blob(output_blob).download_as_string()
 
 
 # [START dataproc_create_cluster]
 def create_cluster(dataproc, project, zone, region, cluster_name):
+    """Create the cluster."""
     print('Creating cluster...')
     zone_uri = \
         'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
             project, zone)
     cluster_data = {
-        'projectId': project,
-        'clusterName': cluster_name,
+        'project_id': project,
+        'cluster_name': cluster_name,
         'config': {
-            'gceClusterConfig': {
-                'zoneUri': zone_uri
+            'gce_cluster_config': {
+                'zone_uri': zone_uri
             },
-            'masterConfig': {
-                'numInstances': 1,
-                'machineTypeUri': 'n1-standard-1'
+            'master_config': {
+                'num_instances': 1,
+                'machine_type_uri': 'n1-standard-1'
             },
-            'workerConfig': {
-                'numInstances': 2,
-                'machineTypeUri': 'n1-standard-1'
+            'worker_config': {
+                'num_instances': 2,
+                'machine_type_uri': 'n1-standard-1'
             }
         }
     }
-    result = dataproc.projects().regions().clusters().create(
-        projectId=project,
-        region=region,
-        body=cluster_data).execute()
-    return result
+
+    cluster = dataproc.create_cluster(project, region, cluster_data)
+    cluster.add_done_callback(callback)
+    global waiting_callback
+    waiting_callback = True
 # [END dataproc_create_cluster]
 
 
-def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
+def callback(operation_future):
+    # Reset global when callback returns.
+    global waiting_callback
+    waiting_callback = False
+
+
+def wait_for_cluster_creation():
+    """Wait for cluster creation."""
     print('Waiting for cluster creation...')
 
     while True:
-        result = dataproc.projects().regions().clusters().list(
-            projectId=project_id,
-            region=region).execute()
-        cluster_list = result['clusters']
-        cluster = [c
-                   for c in cluster_list
-                   if c['clusterName'] == cluster_name][0]
-        if cluster['status']['state'] == 'ERROR':
-            raise Exception(result['status']['details'])
-        if cluster['status']['state'] == 'RUNNING':
+        if not waiting_callback:
            print("Cluster created.")
            break
 
 
 # [START dataproc_list_clusters_with_detail]
 def list_clusters_with_details(dataproc, project, region):
-    result = dataproc.projects().regions().clusters().list(
-        projectId=project,
-        region=region).execute()
-    cluster_list = result['clusters']
-    for cluster in cluster_list:
-        print("{} - {}"
-              .format(cluster['clusterName'], cluster['status']['state']))
-    return result
+    """List the details of clusters in the region."""
+    for cluster in dataproc.list_clusters(project, region):
+        print(('{} - {}'.format(cluster.cluster_name,
+                                cluster.status.State.Name(
+                                    cluster.status.state))))
 # [END dataproc_list_clusters_with_detail]
 
 
-def get_cluster_id_by_name(cluster_list, cluster_name):
+def get_cluster_id_by_name(dataproc, project_id, region, cluster_name):
     """Helper function to retrieve the ID and output bucket of a cluster by
     name."""
-    cluster = [c for c in cluster_list if c['clusterName'] == cluster_name][0]
-    return cluster['clusterUuid'], cluster['config']['configBucket']
+    for cluster in dataproc.list_clusters(project_id, region):
+        if cluster.cluster_name == cluster_name:
+            return cluster.cluster_uuid, cluster.config.config_bucket
 
 
 # [START dataproc_submit_pyspark_job]
-def submit_pyspark_job(dataproc, project, region,
-                       cluster_name, bucket_name, filename):
-    """Submits the Pyspark job to the cluster, assuming `filename` has
-    already been uploaded to `bucket_name`"""
+def submit_pyspark_job(dataproc, project, region, cluster_name, bucket_name,
+                       filename):
+    """Submit the Pyspark job to the cluster (assumes `filename` was uploaded
+    to `bucket_name`)."""
     job_details = {
-        'projectId': project,
-        'job': {
-            'placement': {
-                'clusterName': cluster_name
-            },
-            'pysparkJob': {
-                'mainPythonFileUri': 'gs://{}/{}'.format(bucket_name, filename)
-            }
+        'placement': {
+            'cluster_name': cluster_name
+        },
+        'pyspark_job': {
+            'main_python_file_uri': 'gs://{}/{}'.format(bucket_name, filename)
         }
     }
-    result = dataproc.projects().regions().jobs().submit(
-        projectId=project,
-        region=region,
-        body=job_details).execute()
-    job_id = result['reference']['jobId']
-    print('Submitted job ID {}'.format(job_id))
+
+    result = dataproc.submit_job(
+        project_id=project, region=region, job=job_details)
+    job_id = result.reference.job_id
+    print('Submitted job ID {}.'.format(job_id))
     return job_id
 # [END dataproc_submit_pyspark_job]
 
 
 # [START dataproc_delete]
 def delete_cluster(dataproc, project, region, cluster):
-    print('Tearing down cluster')
-    result = dataproc.projects().regions().clusters().delete(
-        projectId=project,
-        region=region,
-        clusterName=cluster).execute()
+    """Delete the cluster."""
+    print('Tearing down cluster.')
+    result = dataproc.delete_cluster(
+        project_id=project, region=region, cluster_name=cluster)
     return result
 # [END dataproc_delete]
 
 
 # [START dataproc_wait]
 def wait_for_job(dataproc, project, region, job_id):
+    """Wait for job to complete or error out."""
     print('Waiting for job to finish...')
     while True:
-        result = dataproc.projects().regions().jobs().get(
-            projectId=project,
-            region=region,
-            jobId=job_id).execute()
+        job = dataproc.get_job(project, region, job_id)
         # Handle exceptions
-        if result['status']['state'] == 'ERROR':
-            raise Exception(result['status']['details'])
-        elif result['status']['state'] == 'DONE':
+        if job.status.State.Name(job.status.state) == 'ERROR':
+            raise Exception(job.status.details)
+        elif job.status.State.Name(job.status.state) == 'DONE':
             print('Job finished.')
-            return result
+            return job
 # [END dataproc_wait]
 
 
-# [START dataproc_get_client]
-def get_client():
-    """Builds an http client authenticated with the service account
-    credentials."""
-    dataproc = googleapiclient.discovery.build('dataproc', 'v1')
-    return dataproc
-# [END dataproc_get_client]
+def main(project_id,
+         zone,
+         cluster_name,
+         bucket_name,
+         pyspark_file=None,
+         create_new_cluster=True,
+         global_region=True):
+
+    # [START dataproc_get_client]
+    if global_region:
+        region = 'global'
+        # Use the default gRPC global endpoints.
+        dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
+        dataproc_job_client = dataproc_v1.JobControllerClient()
+    else:
+        region = get_region_from_zone(zone)
+        # Use a regional gRPC endpoint. See:
+        # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
+        client_transport = (
+            cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
+                address='{}-dataproc.googleapis.com:443'.format(region)))
+        job_transport = (
+            job_controller_grpc_transport.JobControllerGrpcTransport(
+                address='{}-dataproc.googleapis.com:443'.format(region)))
+        dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
+            client_transport)
+        dataproc_job_client = dataproc_v1.JobControllerClient(job_transport)
+    # [END dataproc_get_client]
 
-
-def main(project_id, zone, cluster_name, bucket_name,
-         pyspark_file=None, create_new_cluster=True):
-    dataproc = get_client()
-    region = get_region_from_zone(zone)
     try:
-        if pyspark_file:
-            spark_file, spark_filename = get_pyspark_file(pyspark_file)
-        else:
-            spark_file, spark_filename = get_default_pyspark_file()
-
+        spark_file, spark_filename = get_pyspark_file(pyspark_file)
         if create_new_cluster:
-            create_cluster(
-                dataproc, project_id, zone, region, cluster_name)
-            wait_for_cluster_creation(
-                dataproc, project_id, region, cluster_name)
-
-        upload_pyspark_file(
-            project_id, bucket_name, spark_filename, spark_file)
+            create_cluster(dataproc_cluster_client, project_id, zone, region,
+                           cluster_name)
+            wait_for_cluster_creation()
+        upload_pyspark_file(project_id, bucket_name, spark_filename,
+                            spark_file)
 
-        cluster_list = list_clusters_with_details(
-            dataproc, project_id, region)['clusters']
+        list_clusters_with_details(dataproc_cluster_client, project_id,
+                                   region)
 
         (cluster_id, output_bucket) = (
-            get_cluster_id_by_name(cluster_list, cluster_name))
+            get_cluster_id_by_name(dataproc_cluster_client, project_id,
+                                   region, cluster_name))
 
         # [START dataproc_call_submit_pyspark_job]
-        job_id = submit_pyspark_job(
-            dataproc, project_id, region,
-            cluster_name, bucket_name, spark_filename)
+        job_id = submit_pyspark_job(dataproc_job_client, project_id, region,
+                                    cluster_name, bucket_name, spark_filename)
         # [END dataproc_call_submit_pyspark_job]
-        wait_for_job(dataproc, project_id, region, job_id)
 
+        wait_for_job(dataproc_job_client, project_id, region, job_id)
         output = download_output(project_id, cluster_id, output_bucket, job_id)
         print('Received job output {}'.format(output))
         return output
     finally:
         if create_new_cluster:
-            delete_cluster(dataproc, project_id, region, cluster_name)
-            spark_file.close()
+            delete_cluster(dataproc_cluster_client, project_id, region,
+                           cluster_name)
+        spark_file.close()
 
 
 if __name__ == '__main__':
-    parser = argparse.ArgumentParser(
-        description=__doc__,
-        formatter_class=argparse.RawDescriptionHelpFormatter
-    )
-    parser.add_argument(
-        '--project_id', help='Project ID you want to access.', required=True),
-    parser.add_argument(
-        '--zone', help='Zone to create clusters in/connect to', required=True)
-    parser.add_argument(
-        '--cluster_name',
-        help='Name of the cluster to create/connect to', required=True)
-    parser.add_argument(
-        '--gcs_bucket', help='Bucket to upload Pyspark file to', required=True)
-    parser.add_argument(
-        '--pyspark_file', help='Pyspark filename. Defaults to pyspark_sort.py')
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter)
     parser.add_argument(
-        '--create_new_cluster',
-        action='store_true', help='States if the cluster should be created')
+        '--project_id', help='Project ID you want to access.', required=True)
+    parser.add_argument('--zone',
+                        help='Zone to create clusters in/connect to',
+                        required=True)
+    parser.add_argument('--cluster_name',
+                        help='Name of the cluster to create/connect to',
+                        required=True)
+    parser.add_argument('--gcs_bucket',
+                        help='Bucket to upload Pyspark file to',
+                        required=True)
+    parser.add_argument('--pyspark_file',
+                        help='Pyspark filename. Defaults to pyspark_sort.py')
+    parser.add_argument('--create_new_cluster',
+                        action='store_true',
+                        help='States if the cluster should be created')
+    parser.add_argument('--global_region',
+                        action='store_true',
+                        help='If cluster is in the global region')
 
     args = parser.parse_args()
-    main(
-        args.project_id, args.zone, args.cluster_name,
-        args.gcs_bucket, args.pyspark_file, args.create_new_cluster)
+    main(args.project_id, args.zone, args.cluster_name, args.gcs_bucket,
+         args.pyspark_file, args.create_new_cluster, args.global_region)
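
Note (not part of the patch): the client-construction pattern these changes standardize on can be exercised on its own. The sketch below is illustrative only; it assumes google-cloud-dataproc==0.3.1 as pinned in requirements.txt above, and the project ID and region values are placeholders to replace with your own.

    # Minimal sketch: list Cloud Dataproc clusters with the dataproc_v1 client,
    # using a regional gRPC endpoint when the region is not 'global'.
    from google.cloud import dataproc_v1
    from google.cloud.dataproc_v1.gapic.transports import (
        cluster_controller_grpc_transport)

    project_id = 'my-project-id'  # placeholder
    region = 'us-central1'        # placeholder; use 'global' for the global endpoint

    if region == 'global':
        client = dataproc_v1.ClusterControllerClient()
    else:
        transport = (
            cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
                address='{}-dataproc.googleapis.com:443'.format(region)))
        client = dataproc_v1.ClusterControllerClient(transport)

    for cluster in client.list_clusters(project_id, region):
        print('{} - {}'.format(cluster.cluster_name,
                               cluster.status.State.Name(cluster.status.state)))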