Skip to content

Commit dc438cb

Browse files
committed
write at start of dataflow job
1 parent 5c5eeea commit dc438cb

File tree

1 file changed

+35
-53
lines changed

1 file changed

+35
-53
lines changed

bigtable/beam/keyviz-art/src/main/java/keyviz/FSReadData2.java

+35-53
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public static void main(String[] args) {
4747

4848
Pipeline p = Pipeline.create(options);
4949
// Initiates a new pipeline every second
50-
p.apply(GenerateSequence.from(0).withRate(1, new Duration(500)))
50+
p.apply(GenerateSequence.from(0).withRate(1, new Duration(1000)))
5151
.apply(ParDo.of(new ReadFromTableFn()));
5252

5353
p.run();
@@ -63,7 +63,7 @@ public static class ReadFromTableFn extends DoFn<Long, Void> {
6363
protected Datastore datastore;
6464
private KeyFactory keyFactory;
6565
// private final String kind = "BillyDF500reversed1mswithP";
66-
private final String kind = "BillyDF500reversed1s";
66+
private final String kind = "BillyDFrw1s500rows";
6767

6868
private static final int RANDOM_ID_BOUND = 500;
6969
Random rand = new Random();
@@ -97,26 +97,38 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
9797
// Make each number the same length by padding with 0s
9898
String numberFormat = "%0" + maxLength + "d";
9999
Datastore ds = getDatastore();
100+
int c = 0;
100101

101-
// CompositeFilter evenRangeFilter = CompositeFilter
102-
// .and(
103-
// PropertyFilter.lt("__key__", keyFactory.newKey("250"))
104-
// // PropertyFilter.le("__key__", "1"),
105-
// // PropertyFilter.ge("__key__", "2"), PropertyFilter.le("__key__", "3"),
106-
// // PropertyFilter.ge("__key__", "4"), PropertyFilter.le("__key__", "5"),
107-
// // PropertyFilter.ge("__key__", "6"), PropertyFilter.le("__key__", "7"),
108-
// // PropertyFilter.ge("__key__", "8"), PropertyFilter.le("__key__", "9")
109-
// );
110-
//
111-
// CompositeFilter oddRangeFilter = CompositeFilter
112-
// .and(
113-
// PropertyFilter.ge("__key__", keyFactory.newKey("250"))
114-
// // , PropertyFilter.le("__key__", ""),
115-
// // PropertyFilter.ge("__key__", "3"), PropertyFilter.le("__key__", "4"),
116-
// // PropertyFilter.ge("__key__", "5"), PropertyFilter.le("__key__", "6"),
117-
// // PropertyFilter.ge("__key__", "7"), PropertyFilter.le("__key__", "8"),
118-
// // PropertyFilter.ge("__key__", "9")
119-
// );
102+
if (count == 1) {
103+
for (int i = 0; i < maxInput; i++) {
104+
105+
String paddedRowkey = String.format(numberFormat, i);
106+
String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
107+
int rowNumber = i / rowHeight;
108+
109+
float p = (float) rowNumber / numRows;
110+
System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
111+
112+
FullEntity<com.google.cloud.datastore.Key> fullEntity =
113+
com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(reversedRowkey))
114+
.set("description", getRandomStringValue())
115+
.build();
116+
entities.add(fullEntity);
117+
// logger.info("writing entity" + fullEntity.getKey());
118+
c++;
119+
120+
// 500 per write limit
121+
if (i % 500 == 0) {
122+
System.out.printf("500 mod 0");
123+
ds.put(entities.toArray(new FullEntity<?>[0]));
124+
entities = new ArrayList<>();
125+
}
126+
}
127+
128+
System.out.println(c + " entities written");
129+
130+
ds.put(entities.toArray(new FullEntity<?>[0]));
131+
}
120132

121133
EntityQuery.Builder query = Query.newEntityQueryBuilder()
122134
.setKind(kind);
@@ -127,7 +139,7 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
127139
// query.setFilter(evenRangeFilter);
128140
// }
129141
QueryResults<Entity> entityQueryResults = ds.run(query.build());
130-
int c = 0;
142+
c = 0;
131143
// while (entityQueryResults.hasNext()) {
132144
// Entity entity = entityQueryResults.next();
133145
// System.out.println(entity);
@@ -139,7 +151,7 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
139151
int timeOffsetIndex = Math.toIntExact(seconds / KEY_VIZ_WINDOW_SECONDS);
140152

141153
for (int i = 0; i < maxInput; i++) {
142-
if (Math.random() <= (timeOffsetIndex % 10) / 10f) {
154+
if (timeOffsetIndex % 2 == 0) {
143155
String paddedRowkey = String.format(numberFormat, i);
144156
String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
145157
datastore.get(keyFactory.newKey(reversedRowkey));
@@ -148,36 +160,6 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
148160
}
149161
System.out.println(c + " entities fetched");
150162

151-
// for (int i = 0; i < maxInput; i++) {
152-
//
153-
// String paddedRowkey = String.format(numberFormat, i);
154-
// String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
155-
// int rowNumber = i / rowHeight;
156-
//
157-
// float p = (float) rowNumber / numRows;
158-
// System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
159-
//
160-
// if (Math.random() <= p || count < 120) {
161-
// FullEntity<com.google.cloud.datastore.Key> fullEntity =
162-
// com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(reversedRowkey))
163-
// .set("description", getRandomStringValue())
164-
// .build();
165-
// entities.add(fullEntity);
166-
// // logger.info("writing entity" + fullEntity.getKey());
167-
// c++;
168-
// }
169-
// // 500 per write limit
170-
// if (i % 500 == 0) {
171-
// System.out.printf("500 mod 0");
172-
// ds.put(entities.toArray(new FullEntity<?>[0]));
173-
// entities = new ArrayList<>();
174-
// }
175-
// }
176-
//
177-
// System.out.println(c + " entities updated");
178-
//
179-
// ds.put(entities.toArray(new FullEntity<?>[0]));
180-
181163
// String kind = "Billy10ms1krs";
182164
// int maxInput = 1000;
183165
// int maxLength = ("" + maxInput).length();

0 commit comments

Comments
 (0)