/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package keyviz;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import java.util.Random;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * A Beam job that loads random data into Cloud Bigtable.
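 *
 * <p>A sketch of one way to run the job (this assumes the Maven exec plugin and the Dataflow
 * runner; the placeholder values are illustrative, while the flag names follow from the
 * pipeline options declared below):
 *
 * <pre>
 * mvn compile exec:java -Dexec.mainClass=keyviz.LoadData \
 *     "-Dexec.args=--runner=DataflowRunner --project=YOUR_PROJECT --region=YOUR_REGION \
 *     --bigtableProjectId=YOUR_PROJECT --bigtableInstanceId=YOUR_INSTANCE \
 *     --bigtableTableId=YOUR_TABLE --gigabytesWritten=40 --megabytesPerRow=5"
 * </pre>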
 */
public class LoadData {

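  // Sizes use decimal (SI) units: 1 MB = 1,000,000 bytes and 1 GB = 1,000 MB.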
  static final long ONE_MB = 1000 * 1000;
  static final long ONE_GB = 1000 * ONE_MB;
  static final String COLUMN_FAMILY = "cf";

  public static void main(String[] args) {
    WriteDataOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteDataOptions.class);
    Pipeline p = Pipeline.create(options);
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

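    // The number of rows to write is the requested total size divided by the row size.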
    long rowSize = options.getMegabytesPerRow() * ONE_MB;
    final long max = Math.round(options.getGigabytesWritten() * ONE_GB) / rowSize;
    // Make each number the same length by padding with 0s.
    int maxLength = ("" + max).length();
    String numberFormat = "%0" + maxLength + "d";

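    // The pipeline: generate the sequence [0, max), turn each number into a row-sized
    // Put mutation, and write the mutations to the configured Bigtable table.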
    p.apply(GenerateSequence.from(0).to(max))
        .apply(
            ParDo.of(
                new DoFn<Long, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element Long rowkey, OutputReceiver<Mutation> out) {
                    String paddedRowkey = String.format(numberFormat, rowkey);

                    // Reverse the rowkey so the sequential input keys don't all land on
                    // the same tablet: reversed keys spread writes across the keyspace.
                    String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
                    Put row = new Put(Bytes.toBytes(reversedRowkey));

                    // Generate random bytes
                    byte[] b = new byte[(int) rowSize];
                    new Random().nextBytes(b);

                    long timestamp = System.currentTimeMillis();
                    row.addColumn(Bytes.toBytes(COLUMN_FAMILY), Bytes.toBytes("C"), timestamp, b);
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface WriteDataOptions extends BigtableOptions {

    @Description("The number of gigabytes to write")
    @Default.Double(40)
    double getGigabytesWritten();

    void setGigabytesWritten(double gigabytesWritten);

    @Description("The number of megabytes per row to write")
    @Default.Long(5)
    long getMegabytesPerRow();

    void setMegabytesPerRow(long megabytesPerRow);
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

+ @ Description ("The Bigtable project ID, this can be different than your Dataflow project" )
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}