@@ -47,7 +47,7 @@ public static void main(String[] args) {
47
47
48
48
Pipeline p = Pipeline .create (options );
49
49
// Initiates a new pipeline every second
50
- p .apply (GenerateSequence .from (0 ).withRate (1 , new Duration (500 )))
50
+ p .apply (GenerateSequence .from (0 ).withRate (1 , new Duration (1000 )))
51
51
.apply (ParDo .of (new ReadFromTableFn ()));
52
52
53
53
p .run ();
@@ -63,7 +63,7 @@ public static class ReadFromTableFn extends DoFn<Long, Void> {
63
63
protected Datastore datastore ;
64
64
private KeyFactory keyFactory ;
65
65
// private final String kind = "BillyDF500reversed1mswithP";
66
- private final String kind = "BillyDF500reversed1s " ;
66
+ private final String kind = "BillyDFrw1s500rows " ;
67
67
68
68
private static final int RANDOM_ID_BOUND = 500 ;
69
69
Random rand = new Random ();
@@ -97,26 +97,38 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
97
97
// Make each number the same length by padding with 0s
98
98
String numberFormat = "%0" + maxLength + "d" ;
99
99
Datastore ds = getDatastore ();
100
+ int c = 0 ;
100
101
101
- // CompositeFilter evenRangeFilter = CompositeFilter
102
- // .and(
103
- // PropertyFilter.lt("__key__", keyFactory.newKey("250"))
104
- // // PropertyFilter.le("__key__", "1"),
105
- // // PropertyFilter.ge("__key__", "2"), PropertyFilter.le("__key__", "3"),
106
- // // PropertyFilter.ge("__key__", "4"), PropertyFilter.le("__key__", "5"),
107
- // // PropertyFilter.ge("__key__", "6"), PropertyFilter.le("__key__", "7"),
108
- // // PropertyFilter.ge("__key__", "8"), PropertyFilter.le("__key__", "9")
109
- // );
110
- //
111
- // CompositeFilter oddRangeFilter = CompositeFilter
112
- // .and(
113
- // PropertyFilter.ge("__key__", keyFactory.newKey("250"))
114
- // // , PropertyFilter.le("__key__", ""),
115
- // // PropertyFilter.ge("__key__", "3"), PropertyFilter.le("__key__", "4"),
116
- // // PropertyFilter.ge("__key__", "5"), PropertyFilter.le("__key__", "6"),
117
- // // PropertyFilter.ge("__key__", "7"), PropertyFilter.le("__key__", "8"),
118
- // // PropertyFilter.ge("__key__", "9")
119
- // );
102
+ if (count == 1 ) {
103
+ for (int i = 0 ; i < maxInput ; i ++) {
104
+
105
+ String paddedRowkey = String .format (numberFormat , i );
106
+ String reversedRowkey = new StringBuilder (paddedRowkey ).reverse ().toString ();
107
+ int rowNumber = i / rowHeight ;
108
+
109
+ float p = (float ) rowNumber / numRows ;
110
+ System .out .printf ("index: %d rowNumber %d prob %f count %d" , i , rowNumber , p , count );
111
+
112
+ FullEntity <com .google .cloud .datastore .Key > fullEntity =
113
+ com .google .cloud .datastore .Entity .newBuilder (keyFactory .newKey (reversedRowkey ))
114
+ .set ("description" , getRandomStringValue ())
115
+ .build ();
116
+ entities .add (fullEntity );
117
+ // logger.info("writing entity" + fullEntity.getKey());
118
+ c ++;
119
+
120
+ // 500 per write limit
121
+ if (i % 500 == 0 ) {
122
+ System .out .printf ("500 mod 0" );
123
+ ds .put (entities .toArray (new FullEntity <?>[0 ]));
124
+ entities = new ArrayList <>();
125
+ }
126
+ }
127
+
128
+ System .out .println (c + " entities written" );
129
+
130
+ ds .put (entities .toArray (new FullEntity <?>[0 ]));
131
+ }
120
132
121
133
EntityQuery .Builder query = Query .newEntityQueryBuilder ()
122
134
.setKind (kind );
@@ -127,7 +139,7 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
127
139
// query.setFilter(evenRangeFilter);
128
140
// }
129
141
QueryResults <Entity > entityQueryResults = ds .run (query .build ());
130
- int c = 0 ;
142
+ c = 0 ;
131
143
// while (entityQueryResults.hasNext()) {
132
144
// Entity entity = entityQueryResults.next();
133
145
// System.out.println(entity);
@@ -139,7 +151,7 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
139
151
int timeOffsetIndex = Math .toIntExact (seconds / KEY_VIZ_WINDOW_SECONDS );
140
152
141
153
for (int i = 0 ; i < maxInput ; i ++) {
142
- if (Math . random () <= ( timeOffsetIndex % 10 ) / 10f ) {
154
+ if (timeOffsetIndex % 2 == 0 ) {
143
155
String paddedRowkey = String .format (numberFormat , i );
144
156
String reversedRowkey = new StringBuilder (paddedRowkey ).reverse ().toString ();
145
157
datastore .get (keyFactory .newKey (reversedRowkey ));
@@ -148,36 +160,6 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
148
160
}
149
161
System .out .println (c + " entities fetched" );
150
162
151
- // for (int i = 0; i < maxInput; i++) {
152
- //
153
- // String paddedRowkey = String.format(numberFormat, i);
154
- // String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
155
- // int rowNumber = i / rowHeight;
156
- //
157
- // float p = (float) rowNumber / numRows;
158
- // System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
159
- //
160
- // if (Math.random() <= p || count < 120) {
161
- // FullEntity<com.google.cloud.datastore.Key> fullEntity =
162
- // com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(reversedRowkey))
163
- // .set("description", getRandomStringValue())
164
- // .build();
165
- // entities.add(fullEntity);
166
- // // logger.info("writing entity" + fullEntity.getKey());
167
- // c++;
168
- // }
169
- // // 500 per write limit
170
- // if (i % 500 == 0) {
171
- // System.out.printf("500 mod 0");
172
- // ds.put(entities.toArray(new FullEntity<?>[0]));
173
- // entities = new ArrayList<>();
174
- // }
175
- // }
176
- //
177
- // System.out.println(c + " entities updated");
178
- //
179
- // ds.put(entities.toArray(new FullEntity<?>[0]));
180
-
181
163
// String kind = "Billy10ms1krs";
182
164
// int maxInput = 1000;
183
165
// int maxLength = ("" + maxInput).length();
0 commit comments