16
16
import com .google .cloud .datastore .StructuredQuery .CompositeFilter ;
17
17
import com .google .cloud .datastore .StructuredQuery .PropertyFilter ;
18
18
import java .util .ArrayList ;
19
+ import java .util .Date ;
19
20
import java .util .List ;
20
21
import java .util .Random ;
21
22
35
36
36
37
public class FSReadData2 {
37
38
39
+ static final long START_TIME = new Date ().getTime ();
40
+ static final long KEY_VIZ_WINDOW_SECONDS = 10 ;
41
+
38
42
public static void main (String [] args ) {
39
43
DataflowPipelineOptions options =
40
44
PipelineOptionsFactory .fromArgs (args ).withValidation ().as (DataflowPipelineOptions .class );
@@ -43,7 +47,7 @@ public static void main(String[] args) {
43
47
44
48
Pipeline p = Pipeline .create (options );
45
49
// Initiates a new pipeline every second
46
- p .apply (GenerateSequence .from (0 ).withRate (1 , new Duration (1000 )))
50
+ p .apply (GenerateSequence .from (0 ).withRate (1 , new Duration (500 )))
47
51
.apply (ParDo .of (new ReadFromTableFn ()));
48
52
49
53
p .run ();
@@ -130,44 +134,49 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
130
134
// c++;
131
135
// }
132
136
133
- // for (int i = 0; i < maxInput; i++) {
134
- // String paddedRowkey = String.format(numberFormat, i);
135
- // String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
136
- // datastore.get(keyFactory.newKey(reversedRowkey));
137
- // c++;
138
- // }
139
- // System.out.println(c + " entities fetched");
140
-
137
+ long timestampDiff = System .currentTimeMillis () - START_TIME ;
138
+ long seconds = timestampDiff / 1000 ;
139
+ int timeOffsetIndex = Math .toIntExact (seconds / KEY_VIZ_WINDOW_SECONDS );
141
140
142
141
for (int i = 0 ; i < maxInput ; i ++) {
143
-
144
- String paddedRowkey = String .format (numberFormat , i );
145
- String reversedRowkey = new StringBuilder (paddedRowkey ).reverse ().toString ();
146
- int rowNumber = i / rowHeight ;
147
-
148
- float p = (float ) rowNumber / numRows ;
149
- System .out .printf ("index: %d rowNumber %d prob %f count %d" , i , rowNumber , p , count );
150
-
151
- if (Math .random () <= p || count < 120 ) {
152
- FullEntity <com .google .cloud .datastore .Key > fullEntity =
153
- com .google .cloud .datastore .Entity .newBuilder (keyFactory .newKey (reversedRowkey ))
154
- .set ("description" , getRandomStringValue ())
155
- .build ();
156
- entities .add (fullEntity );
157
- // logger.info("writing entity" + fullEntity.getKey());
142
+ if (Math .random () <= (timeOffsetIndex % 10 ) / 10f ) {
143
+ String paddedRowkey = String .format (numberFormat , i );
144
+ String reversedRowkey = new StringBuilder (paddedRowkey ).reverse ().toString ();
145
+ datastore .get (keyFactory .newKey (reversedRowkey ));
158
146
c ++;
159
147
}
160
- // 500 per write limit
161
- if (i % 500 == 0 ) {
162
- System .out .printf ("500 mod 0" );
163
- ds .put (entities .toArray (new FullEntity <?>[0 ]));
164
- entities = new ArrayList <>();
165
- }
166
148
}
149
+ System .out .println (c + " entities fetched" );
167
150
168
- System .out .println (c + " entities updated" );
169
-
170
- ds .put (entities .toArray (new FullEntity <?>[0 ]));
151
+ // for (int i = 0; i < maxInput; i++) {
152
+ //
153
+ // String paddedRowkey = String.format(numberFormat, i);
154
+ // String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
155
+ // int rowNumber = i / rowHeight;
156
+ //
157
+ // float p = (float) rowNumber / numRows;
158
+ // System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
159
+ //
160
+ // if (Math.random() <= p || count < 120) {
161
+ // FullEntity<com.google.cloud.datastore.Key> fullEntity =
162
+ // com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(reversedRowkey))
163
+ // .set("description", getRandomStringValue())
164
+ // .build();
165
+ // entities.add(fullEntity);
166
+ // // logger.info("writing entity" + fullEntity.getKey());
167
+ // c++;
168
+ // }
169
+ // // 500 per write limit
170
+ // if (i % 500 == 0) {
171
+ // System.out.printf("500 mod 0");
172
+ // ds.put(entities.toArray(new FullEntity<?>[0]));
173
+ // entities = new ArrayList<>();
174
+ // }
175
+ // }
176
+ //
177
+ // System.out.println(c + " entities updated");
178
+ //
179
+ // ds.put(entities.toArray(new FullEntity<?>[0]));
171
180
172
181
// String kind = "Billy10ms1krs";
173
182
// int maxInput = 1000;
0 commit comments