16
16
import com .google .cloud .datastore .StructuredQuery .CompositeFilter ;
17
17
import com .google .cloud .datastore .StructuredQuery .PropertyFilter ;
18
18
import java .util .ArrayList ;
19
+ import java .util .Arrays ;
19
20
import java .util .Date ;
20
21
import java .util .List ;
21
22
import java .util .Random ;
@@ -47,7 +48,7 @@ public static void main(String[] args) {
47
48
48
49
Pipeline p = Pipeline .create (options );
49
50
// Initiates a new pipeline every second
50
- p .apply (GenerateSequence .from (0 ).withRate (1 , new Duration (1000 )))
51
+ p .apply (GenerateSequence .from (0 ).withRate (1 , new Duration (2000 )))
51
52
.apply (ParDo .of (new ReadFromTableFn ()));
52
53
53
54
p .run ();
@@ -63,16 +64,16 @@ public static class ReadFromTableFn extends DoFn<Long, Void> {
63
64
protected Datastore datastore ;
64
65
private KeyFactory keyFactory ;
65
66
// private final String kind = "BillyDF500reversed1mswithP";
66
- private final String kind = "BillyDFrw1s500rowsWithFetch " ;
67
+ private final String kind = "BillyDF_WRFP_2s500krows " ;
67
68
68
- private static final int RANDOM_ID_BOUND = 500 ;
69
+ private static final int RANDOM_ID_BOUND = 500000 ;
69
70
Random rand = new Random ();
70
71
71
72
public ReadFromTableFn () {
72
73
super ();
74
+ keys = new String [RANDOM_ID_BOUND ];
73
75
// downloadImageData(readDataOptions.getFilePath());
74
- // generateRowkeys(getNumRows(readDataOptions));
75
- // generateRowkeys(1000);
76
+ generateRowkeys ();
76
77
}
77
78
78
79
protected synchronized Datastore getDatastore () {
@@ -100,26 +101,27 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
100
101
int c = 0 ;
101
102
102
103
if (count == 1 ) {
103
- for (int i = 0 ; i < maxInput ; i ++) {
104
+ for (int i = 0 ; i < keys . length ; i ++) {
104
105
105
- String paddedRowkey = String .format (numberFormat , i );
106
- String reversedRowkey = new StringBuilder (paddedRowkey ).reverse ().toString ();
107
- int rowNumber = i / rowHeight ;
108
-
109
- float p = (float ) rowNumber / numRows ;
110
- System .out .printf ("index: %d rowNumber %d prob %f count %d" , i , rowNumber , p , count );
106
+ // String paddedRowkey = String.format(numberFormat, i);
107
+ // String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
108
+ // int rowNumber = i / rowHeight;
109
+ //
110
+ // float p = (float) rowNumber / numRows;
111
+ // System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
111
112
112
113
FullEntity <com .google .cloud .datastore .Key > fullEntity =
113
- com .google .cloud .datastore .Entity .newBuilder (keyFactory .newKey (reversedRowkey ))
114
+ com .google .cloud .datastore .Entity .newBuilder (keyFactory .newKey (keys [ i ] ))
114
115
.set ("description" , getRandomStringValue ())
115
116
.build ();
116
117
entities .add (fullEntity );
117
118
// logger.info("writing entity" + fullEntity.getKey());
118
119
c ++;
119
120
121
+ // ds.put(fullEntity);
122
+
120
123
// 500 per write limit
121
124
if (i % 500 == 0 ) {
122
- System .out .printf ("500 mod 0" );
123
125
ds .put (entities .toArray (new FullEntity <?>[0 ]));
124
126
entities = new ArrayList <>();
125
127
}
@@ -138,8 +140,8 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
138
140
// } else {
139
141
// query.setFilter(evenRangeFilter);
140
142
// }
141
- QueryResults <Entity > entityQueryResults = ds .run (query .build ());
142
- c = 0 ;
143
+ // QueryResults<Entity> entityQueryResults = ds.run(query.build());
144
+ // c = 0;
143
145
// while (entityQueryResults.hasNext()) {
144
146
// Entity entity = entityQueryResults.next();
145
147
// System.out.println(entity);
@@ -159,31 +161,13 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
159
161
c ++;
160
162
}
161
163
}
162
- datastore .fetch ();
163
- System .out .println (c + " entities fetched" );
164
+ List <Entity > fetchedEntities = datastore .fetch ();
165
+ for (Entity e : fetchedEntities ) {
166
+ // System.out.println(e);
167
+ c ++;
168
+ }
164
169
165
- // String kind = "Billy10ms1krs";
166
- // int maxInput = 1000;
167
- // int maxLength = ("" + maxInput).length();
168
- // // Make each number the same length by padding with 0s
169
- // String numberFormat = "%0" + maxLength + "d";
170
- //
171
- // for (int i = 0; i < maxInput; i++) {
172
- // byte[] b = new byte[(int) 1000];
173
- // new Random().nextBytes(b);
174
- //
175
- // String paddedRowkey = String.format(numberFormat, i);
176
- // String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
177
- //
178
- // Entity entity = Entity.newBuilder()
179
- // .setKey(makeKey(kind, "testing" + reversedRowkey))
180
- // .putProperties(
181
- // "long",
182
- // makeValue(b.toString()).setExcludeFromIndexes(true).build()
183
- // ).build();
184
- //
185
- // out.output(entity);
186
- // }
170
+ System .out .println (c + " entities fetched" );
187
171
}
188
172
189
173
private StringValue getRandomStringValue () {
@@ -201,6 +185,20 @@ private StringValue getRandomStringValue() {
201
185
private int randomId (Random rand ) {
202
186
return rand .nextInt (RANDOM_ID_BOUND ) + 1 ;
203
187
}
188
+
189
+ private void generateRowkeys () {
190
+ int maxLength = ("" + RANDOM_ID_BOUND ).length ();
191
+ // Make each number the same length by padding with 0s
192
+ String numberFormat = "%0" + maxLength + "d" ;
193
+
194
+ for (int i = 0 ; i < RANDOM_ID_BOUND ; i ++) {
195
+ String paddedRowkey = String .format (numberFormat , i );
196
+ String reversedRowkey = new StringBuilder (paddedRowkey ).reverse ().toString ();
197
+ keys [i ] = "" + reversedRowkey ;
198
+ }
199
+ Arrays .sort (keys );
200
+ }
201
+
204
202
}
205
203
206
204
public interface ReadDataOptions extends BigtableOptions {
0 commit comments