@@ -43,7 +43,7 @@ public static void main(String[] args) {
43
43
44
44
Pipeline p = Pipeline .create (options );
45
45
// Initiates a new pipeline every second
46
- p .apply (GenerateSequence .from (0 ).withRate (1 , new Duration (1 )))
46
+ p .apply (GenerateSequence .from (0 ).withRate (1 , new Duration (1000 )))
47
47
.apply (ParDo .of (new ReadFromTableFn ()));
48
48
49
49
p .run ();
@@ -58,7 +58,8 @@ public static class ReadFromTableFn extends DoFn<Long, Void> {
58
58
protected Connection connection ;
59
59
protected Datastore datastore ;
60
60
private KeyFactory keyFactory ;
61
- private final String kind = "BillyDF500reversed1mswithP" ;
61
+ // private final String kind = "BillyDF500reversed1mswithP";
62
+ private final String kind = "BillyDF500reversed1s" ;
62
63
63
64
private static final int RANDOM_ID_BOUND = 500 ;
64
65
Random rand = new Random ();
@@ -93,70 +94,80 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
93
94
String numberFormat = "%0" + maxLength + "d" ;
94
95
Datastore ds = getDatastore ();
95
96
96
- CompositeFilter evenRangeFilter = CompositeFilter
97
- .and (
98
- PropertyFilter .lt ("__key__" , keyFactory .newKey ("250" ))
99
- // PropertyFilter.le("__key__", "1"),
100
- // PropertyFilter.ge("__key__", "2"), PropertyFilter.le("__key__", "3"),
101
- // PropertyFilter.ge("__key__", "4"), PropertyFilter.le("__key__", "5"),
102
- // PropertyFilter.ge("__key__", "6"), PropertyFilter.le("__key__", "7"),
103
- // PropertyFilter.ge("__key__", "8"), PropertyFilter.le("__key__", "9")
104
- );
105
-
106
- CompositeFilter oddRangeFilter = CompositeFilter
107
- .and (
108
- PropertyFilter .ge ("__key__" , keyFactory .newKey ("250" ))
109
- // , PropertyFilter.le("__key__", ""),
110
- // PropertyFilter.ge("__key__", "3"), PropertyFilter.le("__key__", "4"),
111
- // PropertyFilter.ge("__key__", "5"), PropertyFilter.le("__key__", "6"),
112
- // PropertyFilter.ge("__key__", "7"), PropertyFilter.le("__key__", "8"),
113
- // PropertyFilter.ge("__key__", "9")
114
- );
97
+ // CompositeFilter evenRangeFilter = CompositeFilter
98
+ // .and(
99
+ // PropertyFilter.lt("__key__", keyFactory.newKey("250"))
100
+ // // PropertyFilter.le("__key__", "1"),
101
+ // // PropertyFilter.ge("__key__", "2"), PropertyFilter.le("__key__", "3"),
102
+ // // PropertyFilter.ge("__key__", "4"), PropertyFilter.le("__key__", "5"),
103
+ // // PropertyFilter.ge("__key__", "6"), PropertyFilter.le("__key__", "7"),
104
+ // // PropertyFilter.ge("__key__", "8"), PropertyFilter.le("__key__", "9")
105
+ // );
106
+ //
107
+ // CompositeFilter oddRangeFilter = CompositeFilter
108
+ // .and(
109
+ // PropertyFilter.ge("__key__", keyFactory.newKey("250"))
110
+ // // , PropertyFilter.le("__key__", ""),
111
+ // // PropertyFilter.ge("__key__", "3"), PropertyFilter.le("__key__", "4"),
112
+ // // PropertyFilter.ge("__key__", "5"), PropertyFilter.le("__key__", "6"),
113
+ // // PropertyFilter.ge("__key__", "7"), PropertyFilter.le("__key__", "8"),
114
+ // // PropertyFilter.ge("__key__", "9")
115
+ // );
115
116
116
117
EntityQuery .Builder query = Query .newEntityQueryBuilder ()
117
118
.setKind (kind );
118
- if ((count / (2 * 60 * 1000 )) % 2 == 0 ) {
119
- query .setFilter (oddRangeFilter );
120
- } else {
121
- query .setFilter (evenRangeFilter );
122
- }
119
+
120
+ // if ((count / (2 * 60 * 1000)) % 2 == 0) {
121
+ // query.setFilter(oddRangeFilter);
122
+ // } else {
123
+ // query.setFilter(evenRangeFilter);
124
+ // }
123
125
QueryResults <Entity > entityQueryResults = ds .run (query .build ());
124
126
int c = 0 ;
125
- while (entityQueryResults .hasNext ()) {
126
- Entity entity = entityQueryResults .next ();
127
- System .out .println (entity );
128
- c ++;
129
- }
130
- System .out .println (c + " rows queried" );
131
-
127
+ // while (entityQueryResults.hasNext()) {
128
+ // Entity entity = entityQueryResults.next();
129
+ // System.out.println(entity);
130
+ // c++;
131
+ // }
132
132
133
133
// for (int i = 0; i < maxInput; i++) {
134
- //
135
134
// String paddedRowkey = String.format(numberFormat, i);
136
135
// String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
137
- // int rowNumber = i / rowHeight;
138
- //
139
- // float p = (float) rowNumber / numRows;
140
- // System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
141
- //
142
- // if (Math.random() <= p || count < 3000) {
143
- // FullEntity<com.google.cloud.datastore.Key> fullEntity =
144
- // com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(reversedRowkey))
145
- // .set("description", getRandomStringValue())
146
- // .build();
147
- // entities.add(fullEntity);
148
- //
149
- // // logger.info("writing entity" + fullEntity.getKey());
150
- //
151
- // }
152
- // // 500 per write limit
153
- // if (i % 500 == 0) {
154
- // System.out.printf("500 mod 0");
155
- // ds.put(entities.toArray(new FullEntity<?>[0]));
156
- // entities = new ArrayList<>();
157
- // }
136
+ // datastore.get(keyFactory.newKey(reversedRowkey));
137
+ // c++;
158
138
// }
159
- // ds.put(entities.toArray(new FullEntity<?>[0]));
139
+ // System.out.println(c + " entities fetched");
140
+
141
+
142
+ for (int i = 0 ; i < maxInput ; i ++) {
143
+
144
+ String paddedRowkey = String .format (numberFormat , i );
145
+ String reversedRowkey = new StringBuilder (paddedRowkey ).reverse ().toString ();
146
+ int rowNumber = i / rowHeight ;
147
+
148
+ float p = (float ) rowNumber / numRows ;
149
+ System .out .printf ("index: %d rowNumber %d prob %f count %d" , i , rowNumber , p , count );
150
+
151
+ if (Math .random () <= p || count < 120 ) {
152
+ FullEntity <com .google .cloud .datastore .Key > fullEntity =
153
+ com .google .cloud .datastore .Entity .newBuilder (keyFactory .newKey (reversedRowkey ))
154
+ .set ("description" , getRandomStringValue ())
155
+ .build ();
156
+ entities .add (fullEntity );
157
+ // logger.info("writing entity" + fullEntity.getKey());
158
+ c ++;
159
+ }
160
+ // 500 per write limit
161
+ if (i % 500 == 0 ) {
162
+ System .out .printf ("500 mod 0" );
163
+ ds .put (entities .toArray (new FullEntity <?>[0 ]));
164
+ entities = new ArrayList <>();
165
+ }
166
+ }
167
+
168
+ System .out .println (c + " entities updated" );
169
+
170
+ ds .put (entities .toArray (new FullEntity <?>[0 ]));
160
171
161
172
// String kind = "Billy10ms1krs";
162
173
// int maxInput = 1000;
0 commit comments