diff --git a/appengine/bigquery-datastore-import/README.md b/appengine/bigquery-datastore-import/README.md
new file mode 100644
index 00000000000..8ce01522581
--- /dev/null
+++ b/appengine/bigquery-datastore-import/README.md
@@ -0,0 +1,4 @@
+bigquery-appengine-datastore-import-sample
+==========================================
+
+Demonstrates how to extract and transform App Engine Datastore data into a format suitable for ingestion by Google BigQuery, using the App Engine MapReduce library.
\ No newline at end of file
diff --git a/appengine/bigquery-datastore-import/app.yaml b/appengine/bigquery-datastore-import/app.yaml
new file mode 100644
index 00000000000..9f7c4b53c05
--- /dev/null
+++ b/appengine/bigquery-datastore-import/app.yaml
@@ -0,0 +1,12 @@
+application: your_app_id
+version: codelab
+runtime: python27
+api_version: 1
+threadsafe: false
+
+handlers:
+- url: /mapreduce(/.*)?
+  script: mapreduce/main.py
+
+- url: /.*
+  script: main.py
diff --git a/appengine/bigquery-datastore-import/main.py b/appengine/bigquery-datastore-import/main.py
new file mode 100644
index 00000000000..64344d9aa4d
--- /dev/null
+++ b/appengine/bigquery-datastore-import/main.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Sample app that extracts App Engine Datastore data into Google BigQuery.
+
+Uses the App Engine MapReduce mapper pipeline to read entities
+out of the App Engine Datastore and write them to Cloud Storage
+in CSV format, then starts a second pipeline that creates a
+BigQuery ingestion job.
+Uses code from the log2bq project:
+http://code.google.com/p/log2bq/
+"""
+
+
+__author__ = 'manoochehri@google.com (Michael Manoochehri)'
+
+
+import calendar
+import datetime
+
+import httplib2
+
+from google.appengine.ext import db
+from google.appengine.ext import webapp
+from google.appengine.ext.webapp.util import run_wsgi_app
+
+from mapreduce import base_handler
+from mapreduce import mapreduce_pipeline
+
+from apiclient.discovery import build
+from oauth2client.appengine import AppAssertionCredentials
+
+
+SCOPE = 'https://www.googleapis.com/auth/bigquery'
+PROJECT_ID = 'XXXXXXXXXXXX'  # Your Project ID here
+BQ_DATASET_ID = 'datastore_data'
+GS_BUCKET = 'datastore_csvoutput'
+ENTITY_KIND = 'main.ProductSalesData'
+
+
+class ProductSalesData(db.Model):
+    product_id = db.IntegerProperty(required=True)
+    date = db.DateTimeProperty(verbose_name=None,
+                               auto_now=True,
+                               auto_now_add=True)
+    store = db.StringProperty(required=True)
+
+
+# First pipeline stage: run a mapper over every entity of the given kind
+# and write the results to Cloud Storage as CSV, then chain into the
+# BigQuery load stage.
+class DatastoreMapperPipeline(base_handler.PipelineBase):
+    def run(self, entity_type):
+        output = yield mapreduce_pipeline.MapperPipeline(
+            "Datastore Mapper %s" % entity_type,
+            "main.datastore_map",
+            "mapreduce.input_readers.DatastoreInputReader",
+            output_writer_spec="mapreduce.output_writers.FileOutputWriter",
+            params={
+                "input_reader": {
+                    "entity_kind": entity_type,
+                },
+                "output_writer": {
+                    "filesystem": "gs",
+                    "gs_bucket_name": GS_BUCKET,
+                    "output_sharding": "none",
+                },
+            },
+            shards=12)
+        yield CloudStorageToBigQuery(output)
+
+
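+# Second pipeline stage: submit a BigQuery load job that ingests the CSV
+# files the mapper wrote to Cloud Storage, authenticating as the app's
+# service account.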
+class CloudStorageToBigQuery(base_handler.PipelineBase):
+    def run(self, csv_output):
+        credentials = AppAssertionCredentials(scope=SCOPE)
+        http = credentials.authorize(httplib2.Http())
+        bigquery_service = build("bigquery", "v2", http=http)
+
+        jobs = bigquery_service.jobs()
+        table_name = 'datastore_data_%s' % datetime.datetime.utcnow().strftime(
+            '%m%d%Y_%H%M%S')
+        # Rewrite the output writer's /gs/... file paths as the gs:// URIs
+        # that BigQuery expects in sourceUris.
+        gs_paths = [str(f.replace('/gs/', 'gs://')) for f in csv_output]
+        result = jobs.insert(projectId=PROJECT_ID,
+                             body=build_job_data(table_name, gs_paths))
+        result.execute()
+
+
+def build_job_data(table_name, files):
+    # The schema fields must match the column order that datastore_map
+    # emits: product_id, date (POSIX seconds), store.
+    return {
+        "projectId": PROJECT_ID,
+        "configuration": {
+            "load": {
+                "sourceUris": files,
+                "schema": {
+                    "fields": [
+                        {"name": "product_id", "type": "INTEGER"},
+                        {"name": "date", "type": "INTEGER"},
+                        {"name": "store", "type": "STRING"},
+                    ]
+                },
+                "destinationTable": {
+                    "projectId": PROJECT_ID,
+                    "datasetId": BQ_DATASET_ID,
+                    "tableId": table_name,
+                },
+                "maxBadRecords": 0,
+            }
+        }
+    }
+
+
+def datastore_map(entity_type):
+    # Called once per entity; emits one quoted CSV row.
+    data = db.to_dict(entity_type)
+    resultlist = [data.get('product_id'),
+                  timestamp_to_posix(data.get('date')),
+                  data.get('store')]
+    result = ','.join(['"%s"' % field for field in resultlist])
+    yield "%s\n" % result
+
+
+def timestamp_to_posix(timestamp):
+    # DateTimeProperty values are stored in UTC, so convert with
+    # calendar.timegm rather than the timezone-dependent time.mktime.
+    return calendar.timegm(timestamp.timetuple())
+
+
+class DatastoretoBigQueryStart(webapp.RequestHandler):
+    def get(self):
+        pipeline = DatastoreMapperPipeline(ENTITY_KIND)
+        pipeline.start()
+        path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
+        self.redirect(path)
+
+
+class AddDataHandler(webapp.RequestHandler):
+    def get(self):
+        for i in range(0, 9):
+            data = ProductSalesData(product_id=i,
+                                    store='Store %s' % str(i))
+            data.put()
+            self.response.out.write('Added sample Datastore entity #%s<br>'
+                                    % str(i))
+        self.response.out.write('<a href="/start">Click here</a> to start '
+                                'the Datastore to BigQuery pipeline.')
+
+
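+# /add_data seeds sample ProductSalesData entities; /start kicks off the
+# Datastore-to-BigQuery pipeline and redirects to its status page.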
+application = webapp.WSGIApplication(
+    [('/start', DatastoretoBigQueryStart),
+     ('/add_data', AddDataHandler)],
+    debug=True)
+
+
+def main():
+    run_wsgi_app(application)
+
+
+if __name__ == "__main__":
+    main()