gcloud dataflow jobs run iotflow \
--gcs-location gs://dataflow-templates-us-west1/latest/PubSub_to_BigQuery \
--region us-west1 \
--worker-machine-type e2-medium \
--staging-location gs://qwiklabs-gcp-01-102801ff5577/temp \
--parameters inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec=qwiklabs-gcp-01-102801ff5577:taxirides.realtime
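Once the templated job is running, you can spot-check that rows are arriving in the output table. The following is a minimal verification sketch, not part of the lab files; it assumes the google-cloud-bigquery client library is installed and reuses the project, dataset, and table names from the command above.

# Hypothetical verification snippet (assumption: google-cloud-bigquery is
# installed and you are authenticated against the lab project).
from google.cloud import bigquery

client = bigquery.Client(project='qwiklabs-gcp-01-102801ff5577')

# Count how many taxi ride points the streaming job has written so far.
query = """
    SELECT COUNT(*) AS ride_points
    FROM `qwiklabs-gcp-01-102801ff5577.taxirides.realtime`
"""
for row in client.query(query).result():
    print(f'rows written so far: {row.ride_points}')

data_ingestion.py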
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class DataIngestion:
"""A helper class which contains the logic to translate the file into
a format BigQuery will accept."""
def parse_method(self, string_input):
"""This method translates a single line of comma separated values to a
dictionary which can be loaded into BigQuery.
Args:
string_input: A comma separated list of values in the form of
state_abbreviation,gender,year,name,count_of_babies,dataset_created_date
Example string_input: KS,F,1923,Dorothy,654,11/28/2016
Returns:
A dict mapping BigQuery column names as keys to the corresponding value
            parsed from string_input. In this example, the data is not
            transformed, and remains in the same format as the CSV.
example output:
{
'state': 'KS',
'gender': 'F',
'year': '1923',
'name': 'Dorothy',
'number': '654',
'created_date': '11/28/2016'
}
"""
# Strip out carriage return, newline and quote characters.
values = re.split(",",
re.sub('\r\n', '', re.sub('"', '', string_input)))
row = dict(
zip(('state', 'gender', 'year', 'name', 'number', 'created_date'),
values))
return row
def run(argv=None):
"""The main function which creates the pipeline and runs it."""
parser = argparse.ArgumentParser()
# This defaults to the lake dataset in your BigQuery project. You'll have
# to create the lake dataset yourself using this command:
# bq mk lake
    parser.add_argument('--output',
                        dest='output',
                        required=False,
                        help='Output BQ table to write results to.',
                        default='lake.usa_names')

    # The input file to read. This defaults to a small 10-line sample file in
    # a public Cloud Storage bucket, which is useful for development.
    parser.add_argument('--input',
                        dest='input',
                        required=False,
                        help='Input file to read. This can be a local file or '
                             'a file in a Google Cloud Storage bucket.',
                        default='gs://spls/gsp290/data_files/head_usa_names.csv')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # DataIngestion holds the logic for translating each CSV line into a
    # BigQuery row.
    data_ingestion = DataIngestion()
# Initiate the pipeline using the pipeline arguments passed in from the
# command line. This includes information such as the project ID and
# where Dataflow should store temp files.
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
(p
# Read the file. This is the source of the pipeline. All further
# processing starts with lines read from the file. We use the input
# argument from the command line. We also skip the first line which is a
# header row.
| 'Read from a File' >> beam.io.ReadFromText(known_args.input,
skip_header_lines=1)
# This stage of the pipeline translates from a CSV file single row
# input as a string, to a dictionary object consumable by BigQuery.
# It refers to a function we have written. This function will
# be run in parallel on different workers using input from the
# previous stage of the pipeline.
| 'String To BigQuery Row' >>
beam.Map(lambda s: data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
# The table name is a required argument for the BigQuery sink.
# In this case we use the value passed in from the command line.
known_args.output,
# Here we use the simplest way of defining a schema:
# fieldName:fieldType
schema='state:STRING,gender:STRING,year:STRING,name:STRING,'
'number:STRING,created_date:STRING',
# Creates the table in BigQuery if it does not yet exist.
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# Deletes all data in the BigQuery table before writing.
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
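Before submitting this pipeline to Dataflow, you can sanity-check the parsing logic on its own. The sketch below is a quick local check, not part of the lab files; it assumes the code above is saved as data_ingestion.py and simply feeds parse_method the example line from its docstring.

# Hypothetical local check (assumption: the pipeline above is saved as
# data_ingestion.py in the current directory).
from data_ingestion import DataIngestion

ingestion = DataIngestion()
row = ingestion.parse_method('KS,F,1923,Dorothy,654,11/28/2016')
assert row == {
    'state': 'KS',
    'gender': 'F',
    'year': '1923',
    'name': 'Dorothy',
    'number': '654',
    'created_date': '11/28/2016',
}
print(row)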
data_transformation.py
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""data_transformation.py is a Dataflow pipeline which reads a file and writes
its contents to a BigQuery table.

This example reads a json schema of the intended output into BigQuery,
and transforms the date data to match the format BigQuery expects.
"""
import argparse
import csv
import logging
import os

import apache_beam as beam
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
from apache_beam.options.pipeline_options import PipelineOptions
class DataTransformation:
"""A helper class which contains the logic to translate the file into a
format BigQuery will accept."""
def __init__(self):
dir_path = os.path.dirname(os.path.realpath(__file__))
self.schema_str = ''
        # Here we read the output schema from a json file. This is used to
        # specify the types of data we are writing to BigQuery.
        schema_file = os.path.join(dir_path, 'resources',
                                   'usa_names_year_as_date.json')
        with open(schema_file) as f:
            data = f.read()
            # Wrapping the schema in fields is required for the BigQuery API.
            self.schema_str = '{"fields": ' + data + '}'
    def parse_method(self, string_input):
        """This method translates a single line of comma separated values to a
        dictionary which can be loaded into BigQuery.

        Args:
string_input: A comma separated list of values in the form of
state_abbreviation,gender,year,name,count_of_babies,dataset_created_date
example string_input: KS,F,1923,Dorothy,654,11/28/2016
Returns:
            A dict mapping BigQuery column names as keys to the corresponding
            value parsed from string_input. In this example the year is
            reformatted into BigQuery's DATE format; the other values remain
            in the same format as the CSV.
example output:
{'state': 'KS',
'gender': 'F',
'year': '1923-01-01', <- This is the BigQuery date format.
'name': 'Dorothy',
'number': '654',
'created_date': '11/28/2016'
}
"""
        # Parse the output schema so we can inspect the type of each field.
        schema = parse_table_schema_from_json(self.schema_str)
        field_map = [f for f in schema.fields]

        # Use a CSV reader, which strips out return characters and quote
        # characters for us.
        reader = csv.reader(string_input.split('\n'))
        for csv_row in reader:
            # Our source data only contains the year, so default the month
            # and day to January 1st.
            month = '01'
            day = '01'
            # The year comes from our source data.
            year = csv_row[2]

            row = {}
            i = 0
            # Iterate over the values from our csv file, applying any
            # transformation logic.
            for value in csv_row:
                # If the schema indicates this field is a date format, we must
                # transform the date from the source data into a format that
                # BigQuery can understand.
                if field_map[i].type == 'DATE':
                    # Format the date to YYYY-MM-DD format which BigQuery
                    # accepts.
                    value = '-'.join((year, month, day))
                row[field_map[i].name] = value
                i += 1
            return row
def run(argv=None):
"""The main function which creates the pipeline and runs it."""
parser = argparse.ArgumentParser()
# Here we add some specific command line arguments we expect. Specifically
# we have the input file to load and the output table to write to.
parser.add_argument(
'--input', dest='input', required=False,
help='Input file to read. This can be a local file or '
'a file in a Google Storage Bucket.',
# This example file contains a total of only 10 lines.
# It is useful for developing on a small set of data
default='gs://spls/gsp290/data_files/head_usa_names.csv')
# This defaults to the temp dataset in your BigQuery project. You'll have
# to create the temp dataset yourself using bq mk temp
parser.add_argument('--output', dest='output', required=False,
help='Output BQ table to write results to.',
                        default='lake.usa_names_transformed')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # DataTransformation holds the logic for translating each CSV line into a
    # BigQuery row, including the date transformation.
    data_ingestion = DataTransformation()
# Initiate the pipeline using the pipeline arguments passed in from the
# command line. This includes information like where Dataflow should
# store temp files, and what the project id is.
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
schema = parse_table_schema_from_json(data_ingestion.schema_str)
(p
# Read the file. This is the source of the pipeline. All further
# processing starts with lines read from the file. We use the input
# argument from the command line. We also skip the first line which is a
# header row.
| 'Read From Text' >> beam.io.ReadFromText(known_args.input,
skip_header_lines=1)
# This stage of the pipeline translates from a CSV file single row
# input as a string, to a dictionary object consumable by BigQuery.
# It refers to a function we have written. This function will
# be run in parallel on different workers using input from the
# previous stage of the pipeline.
| 'String to BigQuery Row' >> beam.Map(lambda s:
data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
# The table name is a required argument for the BigQuery sink.
# In this case we use the value passed in from the command line.
known_args.output,
# Here we use the JSON schema read in from a JSON file.
            # Specifying the schema allows the API to create the table
            # correctly if it does not yet exist.
schema=schema,
# Creates the table in BigQuery if it does not yet exist.
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# Deletes all data in the BigQuery table before writing.
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
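As with the ingestion pipeline, the date transformation can be checked locally before launching a Dataflow job. The sketch below is not part of the lab files; it assumes the code above is saved as data_transformation.py with the lab's resources/usa_names_year_as_date.json next to it and apache-beam installed, and it confirms that the year column comes out in BigQuery's DATE format.

# Hypothetical local check (assumption: data_transformation.py and its
# resources/ directory are present, and apache-beam is installed).
from data_transformation import DataTransformation

transform = DataTransformation()
row = transform.parse_method('KS,F,1923,Dorothy,654,11/28/2016')

# The schema marks the year column as DATE, so it should be expanded
# to a full YYYY-MM-DD value.
assert row['year'] == '1923-01-01'
print(row)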