
gcloud dataflow jobs run iotflow \
    --gcs-location gs://dataflow-templates-us-west1/latest/PubSub_to_BigQuery \
    --region us-west1 \
    --worker-machine-type e2-medium \
    --staging-location gs://qwiklabs-gcp-01-102801ff5577/temp \
    --parameters inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec=qwiklabs-gcp-01-102801ff5577:taxirides.realtime
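
To confirm the templated job actually started and is landing rows, a quick check along these lines can help (the commands are standard gcloud and bq tooling; the job name, region, and table simply mirror the values used above):

# Confirm the iotflow job is running.
gcloud dataflow jobs list --region us-west1 --status active

# Spot-check that streaming rows are arriving in the destination table.
bq query --use_legacy_sql=false \
  'SELECT COUNT(*) AS rides FROM `qwiklabs-gcp-01-102801ff5577.taxirides.realtime`'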

data_ingestion.py

# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""`data_ingestion.py` is a Dataflow pipeline which reads a file and writes its
contents to a BigQuery table.
This example does not do any transformation on the data.
"""

import argparse
import logging
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class DataIngestion:
    """A helper class which contains the logic to translate the file into
    a format BigQuery will accept."""

    def parse_method(self, string_input):
        """This method translates a single line of comma separated values to a
        dictionary which can be loaded into BigQuery.

        Args:
            string_input: A comma separated list of values in the form of
                state_abbreviation,gender,year,name,count_of_babies,dataset_created_date
                Example string_input: KS,F,1923,Dorothy,654,11/28/2016

        Returns:
            A dict mapping BigQuery column names as keys to the corresponding
            value parsed from string_input. In this example, the data is not
            transformed, and remains in the same format as the CSV.
            Example output:
            {
                'state': 'KS',
                'gender': 'F',
                'year': '1923',
                'name': 'Dorothy',
                'number': '654',
                'created_date': '11/28/2016'
            }
        """
        # Strip out carriage return, newline and quote characters.
        values = re.split(",",
                          re.sub('\r\n', '', re.sub('"', '', string_input)))
        row = dict(
            zip(('state', 'gender', 'year', 'name', 'number', 'created_date'),
                values))
        return row

def run(argv=None):
    """The main function which creates the pipeline and runs it."""

    parser = argparse.ArgumentParser()

    # Here we add some specific command line arguments we expect.
    # Specifically we have the input file to read and the output table to
    # write. This is the final stage of the pipeline, where we define the
    # destination of the data. In this case we are writing to BigQuery.
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        # This example file contains a total of only 10 lines.
        # Useful for developing on a small set of data.
        default='gs://spls/gsp290/data_files/head_usa_names.csv')

    # This defaults to the lake dataset in your BigQuery project. You'll have
    # to create the lake dataset yourself using this command:
    # bq mk lake
    parser.add_argument('--output',
                        dest='output',
                        required=False,
                        help='Output BQ table to write results to.',
                        default='lake.usa_names')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # DataIngestion is a class we built in this script to hold the logic for
    # transforming the file into a BigQuery table.
    data_ingestion = DataIngestion()

    # Initiate the pipeline using the pipeline arguments passed in from the
    # command line. This includes information such as the project ID and
    # where Dataflow should store temp files.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p
     # Read the file. This is the source of the pipeline. All further
     # processing starts with lines read from the file. We use the input
     # argument from the command line. We also skip the first line, which is
     # a header row.
     | 'Read from a File' >> beam.io.ReadFromText(known_args.input,
                                                  skip_header_lines=1)
     # This stage of the pipeline translates a single CSV row, read as a
     # string, into a dictionary object consumable by BigQuery.
     # It refers to a function we have written. This function will
     # be run in parallel on different workers using input from the
     # previous stage of the pipeline.
     | 'String To BigQuery Row' >>
     beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             # The table name is a required argument for the BigQuery sink.
             # In this case we use the value passed in from the command line.
             known_args.output,
             # Here we use the simplest way of defining a schema:
             # fieldName:fieldType
             schema='state:STRING,gender:STRING,year:STRING,name:STRING,'
             'number:STRING,created_date:STRING',
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
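
One way to launch this pipeline on Dataflow is sketched below. The project ID and bucket are placeholders for your own environment, and aside from --input (defined above) the flags are standard Apache Beam/Dataflow pipeline options, not anything specific to this script. The lake dataset used by the default --output must already exist (bq mk lake).

python data_ingestion.py \
  --project=<YOUR_PROJECT_ID> \
  --region=us-west1 \
  --runner=DataflowRunner \
  --staging_location=gs://<YOUR_BUCKET>/test \
  --temp_location=gs://<YOUR_BUCKET>/test \
  --input=gs://spls/gsp290/data_files/head_usa_names.csv \
  --save_main_session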

data_transformation.py
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" data_transformation.py is a Dataflow pipeline which reads a file and writes


its contents to a BigQuery table.

This example reads a json schema of the intended output into BigQuery,
and transforms the date data to match the format BigQuery expects.
"""

import argparse
import csv
import logging
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

class DataTransformation:
    """A helper class which contains the logic to translate the file into a
    format BigQuery will accept."""

    def __init__(self):
        dir_path = os.path.dirname(os.path.realpath(__file__))
        self.schema_str = ''
        # Here we read the output schema from a json file. This is used to
        # specify the types of data we are writing to BigQuery.
        schema_file = os.path.join(dir_path, 'resources',
                                   'usa_names_year_as_date.json')
        with open(schema_file) as f:
            data = f.read()
            # Wrapping the schema in fields is required for the BigQuery API.
            self.schema_str = '{"fields": ' + data + '}'
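    # For reference only, an assumed (not verified against the lab files)
    # illustration of what resources/usa_names_year_as_date.json might hold
    # before it is wrapped in {"fields": ...} above -- a JSON list of column
    # definitions, with 'year' declared as a DATE so parse_method below knows
    # to reformat it:
    #   [{"name": "state", "type": "STRING"},
    #    {"name": "gender", "type": "STRING"},
    #    {"name": "year", "type": "DATE"},
    #    {"name": "name", "type": "STRING"},
    #    {"name": "number", "type": "STRING"},
    #    {"name": "created_date", "type": "STRING"}]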

    def parse_method(self, string_input):
        """This method translates a single line of comma separated values to a
        dictionary which can be loaded into BigQuery.

        Args:
            string_input: A comma separated list of values in the form of
                state_abbreviation,gender,year,name,count_of_babies,dataset_created_date
                Example string_input: KS,F,1923,Dorothy,654,11/28/2016

        Returns:
            A dict mapping BigQuery column names as keys to the corresponding
            value parsed from string_input. In this example, the year value is
            transformed into the date format BigQuery expects; the other
            values remain in the same format as the CSV.
            Example output:
            {'state': 'KS',
             'gender': 'F',
             'year': '1923-01-01',  <- This is the BigQuery date format.
             'name': 'Dorothy',
             'number': '654',
             'created_date': '11/28/2016'
            }
        """
        # Parse the JSON schema so we can look up each column's declared type.
        schema = parse_table_schema_from_json(self.schema_str)
        field_map = [f for f in schema.fields]

        # Use a CSV Reader which can handle quoted strings etc.
        reader = csv.reader(string_input.split('\n'))
        for csv_row in reader:
            # Our source data only contains year, so default January 1st as
            # the month and day.
            month = '01'
            day = '01'
            # The year comes from our source data.
            year = csv_row[2]

            row = {}
            i = 0
            # Iterate over the values from our csv file, applying any
            # transformation logic.
            for value in csv_row:
                # If the schema indicates this field is a date format, we must
                # transform the date from the source data into a format that
                # BigQuery can understand.
                if field_map[i].type == 'DATE':
                    # Format the date to YYYY-MM-DD format which BigQuery
                    # accepts.
                    value = '-'.join((year, month, day))

                row[field_map[i].name] = value
                i += 1

        return row

def run(argv=None):
    """The main function which creates the pipeline and runs it."""
    parser = argparse.ArgumentParser()
    # Here we add some specific command line arguments we expect. Specifically
    # we have the input file to load and the output table to write to.
    parser.add_argument(
        '--input', dest='input', required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        # This example file contains a total of only 10 lines.
        # It is useful for developing on a small set of data.
        default='gs://spls/gsp290/data_files/head_usa_names.csv')
    # This defaults to the lake dataset in your BigQuery project. You'll have
    # to create the lake dataset yourself using: bq mk lake
    parser.add_argument('--output', dest='output', required=False,
                        help='Output BQ table to write results to.',
                        default='lake.usa_names_transformed')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # DataTransformation is a class we built in this script to hold the logic
    # for transforming the file into a BigQuery table.
    data_ingestion = DataTransformation()

    # Initiate the pipeline using the pipeline arguments passed in from the
    # command line. This includes information like where Dataflow should
    # store temp files, and what the project id is.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    schema = parse_table_schema_from_json(data_ingestion.schema_str)

    (p
     # Read the file. This is the source of the pipeline. All further
     # processing starts with lines read from the file. We use the input
     # argument from the command line. We also skip the first line, which is
     # a header row.
     | 'Read From Text' >> beam.io.ReadFromText(known_args.input,
                                                skip_header_lines=1)
     # This stage of the pipeline translates a single CSV row, read as a
     # string, into a dictionary object consumable by BigQuery.
     # It refers to a function we have written. This function will
     # be run in parallel on different workers using input from the
     # previous stage of the pipeline.
     | 'String to BigQuery Row' >>
     beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             # The table name is a required argument for the BigQuery sink.
             # In this case we use the value passed in from the command line.
             known_args.output,
             # Here we use the JSON schema read in from a JSON file.
             # Specifying the schema allows the API to create the table
             # correctly if it does not yet exist.
             schema=schema,
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
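
data_transformation.py is launched the same way as data_ingestion.py above, with just the script name swapped. As a rough check once the job finishes, a standard bq query against the script's default output table should show year values in BigQuery's DATE format (e.g. 1923-01-01):

bq query --use_legacy_sql=false \
  'SELECT name, year, number FROM lake.usa_names_transformed LIMIT 5'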
