1
1
/*
2
- Copyright 2015, Google, Inc.
3
- Licensed under the Apache License, Version 2.0 (the "License");
4
- you may not use this file except in compliance with the License.
5
- You may obtain a copy of the License at
6
-
7
- http://www.apache.org/licenses/LICENSE-2.0
8
-
9
- Unless required by applicable law or agreed to in writing, software
10
- distributed under the License is distributed on an "AS IS" BASIS,
11
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
- See the License for the specific language governing permissions and
2
+ Copyright 2015, Google, Inc.
3
+ Licensed under the Apache License, Version 2.0 (the "License");
4
+ you may not use this file except in compliance with the License.
5
+ You may obtain a copy of the License at
6
+
7
+ http://www.apache.org/licenses/LICENSE-2.0
8
+
9
+ Unless required by applicable law or agreed to in writing, software
10
+ distributed under the License is distributed on an "AS IS" BASIS,
11
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+ See the License for the specific language governing permissions and
13
13
limitations under the License.
14
14
*/
15
15
package com .google .cloud .bigquery .samples ;
31
31
32
32
33
33
/**
34
- * TODO: Insert description here. (generated by elibixby)
34
+ * Example of Bigquery Streaming.
35
35
*/
36
- public class StreamingSample extends BigqueryUtils {
36
+ public class StreamingSample {
37
+
38
+ /**
39
+ * Empty constructor since this is just a collection of static methods.
40
+ */
41
+ protected StreamingSample () {
42
+
43
+ }
44
+
37
45
38
-
39
-
46
+ /**
47
+ * Command line that demonstrates Bigquery streaming.
48
+ * @param args Command line args, should be empty
49
+ * @throws IOException IOexception
50
+ */
40
51
// [START main]
41
- public static void main (String [] args ) throws IOException {
52
+ public static void main (final String [] args ) throws IOException {
42
53
final Scanner scanner = new Scanner (System .in );
43
54
System .out .println ("Enter your project id: " );
44
55
String projectId = scanner .nextLine ();
@@ -47,54 +58,70 @@ public static void main(String[] args) throws IOException{
47
58
System .out .println ("Enter your table id: " );
48
59
String tableId = scanner .nextLine ();
49
60
scanner .close ();
50
-
61
+
51
62
System .out .println ("Enter JSON to stream to BigQuery: \n "
52
63
+ "Press End-of-stream (CTRL-D) to stop" );
53
-
64
+
54
65
JsonReader fromCLI = new JsonReader (new InputStreamReader (System .in ));
55
-
66
+
56
67
Iterator <TableDataInsertAllResponse > responses = run (projectId ,
57
68
datasetId ,
58
69
tableId ,
59
70
fromCLI );
60
-
61
- while (responses .hasNext ()){
71
+
72
+ while (responses .hasNext ()) {
62
73
System .out .println (responses .next ());
63
74
}
64
-
75
+
65
76
fromCLI .close ();
66
77
}
67
78
// [END main]
68
-
69
-
70
-
71
- // [START run]
79
+
80
+
81
+ /**
82
+ * Run the bigquery ClI.
83
+ * @param projectId Project id
84
+ * @param datasetId datasetid
85
+ * @param tableId tableid
86
+ * @param rows The source of the JSON rows we are streaming in.
87
+ * @return Returns Iterates through the stream responses
88
+ * @throws IOException Thrown if there is an error connecting to Bigquery.
89
+ * @throws InterruptedException Should never be thrown
90
+ */
91
+ // [START run]
72
92
public static Iterator <TableDataInsertAllResponse > run (final String projectId ,
73
- final String datasetId ,
93
+ final String datasetId ,
74
94
final String tableId ,
75
- final JsonReader rows ) throws IOException {
76
-
77
-
95
+ final JsonReader rows ) throws IOException {
96
+
97
+
78
98
final Bigquery bigquery = BigqueryServiceFactory .getService ();
79
99
final Gson gson = new Gson ();
80
100
rows .beginArray ();
81
-
82
- return new Iterator <TableDataInsertAllResponse >(){
83
101
102
+ return new Iterator <TableDataInsertAllResponse >() {
103
+
104
+ /**
105
+ * Get the next row in the stream
106
+ * @return True if there is another row in the stream
107
+ */
84
108
public boolean hasNext () {
85
109
try {
86
110
return rows .hasNext ();
87
111
} catch (IOException e ) {
88
- // TODO(elibixby): Auto-generated catch block
89
112
e .printStackTrace ();
90
113
}
91
114
return false ;
92
115
}
93
116
117
+ /**
118
+ *
119
+ * @return Next page of data
120
+ */
94
121
public TableDataInsertAllResponse next () {
95
122
try {
96
123
Map <String , Object > rowData = gson .<Map <String , Object >>fromJson (
97
- rows ,
124
+ rows ,
98
125
(new HashMap <String , Object >()).getClass ());
99
126
return streamRow (bigquery ,
100
127
projectId ,
@@ -112,25 +139,35 @@ public TableDataInsertAllResponse next() {
112
139
public void remove () {
113
140
this .next ();
114
141
}
115
-
142
+
116
143
};
117
-
144
+
118
145
}
119
146
// [END run]
120
-
147
+
148
+ /**
149
+ *
150
+ * @param bigquery The bigquery service
151
+ * @param projectId project id from Google Developers console
152
+ * @param datasetId id of teh dataset
153
+ * @param tableId if the table we're streaming
154
+ * @param row Id of the row we're inserting
155
+ * @return Response from the insert
156
+ * @throws IOException ioexception
157
+ */
121
158
// [START streamRow]
122
- public static TableDataInsertAllResponse streamRow (Bigquery bigquery ,
123
- String projectId ,
124
- String datasetId ,
125
- String tableId ,
126
- TableDataInsertAllRequest .Rows row ) throws IOException {
127
-
159
+ public static TableDataInsertAllResponse streamRow (final Bigquery bigquery ,
160
+ final String projectId ,
161
+ final String datasetId ,
162
+ final String tableId ,
163
+ final TableDataInsertAllRequest .Rows row ) throws IOException {
164
+
128
165
return bigquery .tabledata ().insertAll (
129
- projectId ,
130
- datasetId ,
131
- tableId ,
132
- new TableDataInsertAllRequest ().setRows (Collections . singletonList ( row ))). execute ();
133
-
166
+ projectId ,
167
+ datasetId ,
168
+ tableId ,
169
+ new TableDataInsertAllRequest ().setRows (
170
+ Collections . singletonList ( row ))). execute ();
134
171
}
135
172
// [END streamRow]
136
173
}
0 commit comments