Skip to content

Commit 5c5eeea

Browse files
committed
trying new fs keyviz stuff
1 parent 96a17ca commit 5c5eeea

File tree

1 file changed

+42
-33
lines changed

1 file changed

+42
-33
lines changed

bigtable/beam/keyviz-art/src/main/java/keyviz/FSReadData2.java

+42-33
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.google.cloud.datastore.StructuredQuery.CompositeFilter;
1717
import com.google.cloud.datastore.StructuredQuery.PropertyFilter;
1818
import java.util.ArrayList;
19+
import java.util.Date;
1920
import java.util.List;
2021
import java.util.Random;
2122

@@ -35,6 +36,9 @@
3536

3637
public class FSReadData2 {
3738

39+
static final long START_TIME = new Date().getTime();
40+
static final long KEY_VIZ_WINDOW_SECONDS = 10;
41+
3842
public static void main(String[] args) {
3943
DataflowPipelineOptions options =
4044
PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
@@ -43,7 +47,7 @@ public static void main(String[] args) {
4347

4448
Pipeline p = Pipeline.create(options);
4549
// Initiates a new pipeline every second
46-
p.apply(GenerateSequence.from(0).withRate(1, new Duration(1000)))
50+
p.apply(GenerateSequence.from(0).withRate(1, new Duration(500)))
4751
.apply(ParDo.of(new ReadFromTableFn()));
4852

4953
p.run();
@@ -130,44 +134,49 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
130134
// c++;
131135
// }
132136

133-
// for (int i = 0; i < maxInput; i++) {
134-
// String paddedRowkey = String.format(numberFormat, i);
135-
// String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
136-
// datastore.get(keyFactory.newKey(reversedRowkey));
137-
// c++;
138-
// }
139-
// System.out.println(c + " entities fetched");
140-
137+
long timestampDiff = System.currentTimeMillis() - START_TIME;
138+
long seconds = timestampDiff / 1000;
139+
int timeOffsetIndex = Math.toIntExact(seconds / KEY_VIZ_WINDOW_SECONDS);
141140

142141
for (int i = 0; i < maxInput; i++) {
143-
144-
String paddedRowkey = String.format(numberFormat, i);
145-
String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
146-
int rowNumber = i / rowHeight;
147-
148-
float p = (float) rowNumber / numRows;
149-
System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
150-
151-
if (Math.random() <= p || count < 120) {
152-
FullEntity<com.google.cloud.datastore.Key> fullEntity =
153-
com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(reversedRowkey))
154-
.set("description", getRandomStringValue())
155-
.build();
156-
entities.add(fullEntity);
157-
// logger.info("writing entity" + fullEntity.getKey());
142+
if (Math.random() <= (timeOffsetIndex % 10) / 10f) {
143+
String paddedRowkey = String.format(numberFormat, i);
144+
String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
145+
datastore.get(keyFactory.newKey(reversedRowkey));
158146
c++;
159147
}
160-
// 500 per write limit
161-
if (i % 500 == 0) {
162-
System.out.printf("500 mod 0");
163-
ds.put(entities.toArray(new FullEntity<?>[0]));
164-
entities = new ArrayList<>();
165-
}
166148
}
149+
System.out.println(c + " entities fetched");
167150

168-
System.out.println(c + " entities updated");
169-
170-
ds.put(entities.toArray(new FullEntity<?>[0]));
151+
// for (int i = 0; i < maxInput; i++) {
152+
//
153+
// String paddedRowkey = String.format(numberFormat, i);
154+
// String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
155+
// int rowNumber = i / rowHeight;
156+
//
157+
// float p = (float) rowNumber / numRows;
158+
// System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
159+
//
160+
// if (Math.random() <= p || count < 120) {
161+
// FullEntity<com.google.cloud.datastore.Key> fullEntity =
162+
// com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(reversedRowkey))
163+
// .set("description", getRandomStringValue())
164+
// .build();
165+
// entities.add(fullEntity);
166+
// // logger.info("writing entity" + fullEntity.getKey());
167+
// c++;
168+
// }
169+
// // 500 per write limit
170+
// if (i % 500 == 0) {
171+
// System.out.printf("500 mod 0");
172+
// ds.put(entities.toArray(new FullEntity<?>[0]));
173+
// entities = new ArrayList<>();
174+
// }
175+
// }
176+
//
177+
// System.out.println(c + " entities updated");
178+
//
179+
// ds.put(entities.toArray(new FullEntity<?>[0]));
171180

172181
// String kind = "Billy10ms1krs";
173182
// int maxInput = 1000;

0 commit comments

Comments
 (0)