Skip to content

Commit 96a17ca

Browse files
committed
trying new fs keyviz stuff
1 parent 8fb36e7 commit 96a17ca

File tree

1 file changed

+67
-56
lines changed

1 file changed

+67
-56
lines changed

bigtable/beam/keyviz-art/src/main/java/keyviz/FSReadData2.java

+67-56
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static void main(String[] args) {
4343

4444
Pipeline p = Pipeline.create(options);
4545
// Initiates a new pipeline every second
46-
p.apply(GenerateSequence.from(0).withRate(1, new Duration(1)))
46+
p.apply(GenerateSequence.from(0).withRate(1, new Duration(1000)))
4747
.apply(ParDo.of(new ReadFromTableFn()));
4848

4949
p.run();
@@ -58,7 +58,8 @@ public static class ReadFromTableFn extends DoFn<Long, Void> {
5858
protected Connection connection;
5959
protected Datastore datastore;
6060
private KeyFactory keyFactory;
61-
private final String kind = "BillyDF500reversed1mswithP";
61+
// private final String kind = "BillyDF500reversed1mswithP";
62+
private final String kind = "BillyDF500reversed1s";
6263

6364
private static final int RANDOM_ID_BOUND = 500;
6465
Random rand = new Random();
@@ -93,70 +94,80 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
9394
String numberFormat = "%0" + maxLength + "d";
9495
Datastore ds = getDatastore();
9596

96-
CompositeFilter evenRangeFilter = CompositeFilter
97-
.and(
98-
PropertyFilter.lt("__key__", keyFactory.newKey("250"))
99-
// PropertyFilter.le("__key__", "1"),
100-
// PropertyFilter.ge("__key__", "2"), PropertyFilter.le("__key__", "3"),
101-
// PropertyFilter.ge("__key__", "4"), PropertyFilter.le("__key__", "5"),
102-
// PropertyFilter.ge("__key__", "6"), PropertyFilter.le("__key__", "7"),
103-
// PropertyFilter.ge("__key__", "8"), PropertyFilter.le("__key__", "9")
104-
);
105-
106-
CompositeFilter oddRangeFilter = CompositeFilter
107-
.and(
108-
PropertyFilter.ge("__key__", keyFactory.newKey("250"))
109-
// , PropertyFilter.le("__key__", ""),
110-
// PropertyFilter.ge("__key__", "3"), PropertyFilter.le("__key__", "4"),
111-
// PropertyFilter.ge("__key__", "5"), PropertyFilter.le("__key__", "6"),
112-
// PropertyFilter.ge("__key__", "7"), PropertyFilter.le("__key__", "8"),
113-
// PropertyFilter.ge("__key__", "9")
114-
);
97+
// CompositeFilter evenRangeFilter = CompositeFilter
98+
// .and(
99+
// PropertyFilter.lt("__key__", keyFactory.newKey("250"))
100+
// // PropertyFilter.le("__key__", "1"),
101+
// // PropertyFilter.ge("__key__", "2"), PropertyFilter.le("__key__", "3"),
102+
// // PropertyFilter.ge("__key__", "4"), PropertyFilter.le("__key__", "5"),
103+
// // PropertyFilter.ge("__key__", "6"), PropertyFilter.le("__key__", "7"),
104+
// // PropertyFilter.ge("__key__", "8"), PropertyFilter.le("__key__", "9")
105+
// );
106+
//
107+
// CompositeFilter oddRangeFilter = CompositeFilter
108+
// .and(
109+
// PropertyFilter.ge("__key__", keyFactory.newKey("250"))
110+
// // , PropertyFilter.le("__key__", ""),
111+
// // PropertyFilter.ge("__key__", "3"), PropertyFilter.le("__key__", "4"),
112+
// // PropertyFilter.ge("__key__", "5"), PropertyFilter.le("__key__", "6"),
113+
// // PropertyFilter.ge("__key__", "7"), PropertyFilter.le("__key__", "8"),
114+
// // PropertyFilter.ge("__key__", "9")
115+
// );
115116

116117
EntityQuery.Builder query = Query.newEntityQueryBuilder()
117118
.setKind(kind);
118-
if ((count / (2 * 60 * 1000)) % 2 == 0) {
119-
query.setFilter(oddRangeFilter);
120-
} else {
121-
query.setFilter(evenRangeFilter);
122-
}
119+
120+
// if ((count / (2 * 60 * 1000)) % 2 == 0) {
121+
// query.setFilter(oddRangeFilter);
122+
// } else {
123+
// query.setFilter(evenRangeFilter);
124+
// }
123125
QueryResults<Entity> entityQueryResults = ds.run(query.build());
124126
int c = 0;
125-
while (entityQueryResults.hasNext()) {
126-
Entity entity = entityQueryResults.next();
127-
System.out.println(entity);
128-
c++;
129-
}
130-
System.out.println(c + " rows queried");
131-
127+
// while (entityQueryResults.hasNext()) {
128+
// Entity entity = entityQueryResults.next();
129+
// System.out.println(entity);
130+
// c++;
131+
// }
132132

133133
// for (int i = 0; i < maxInput; i++) {
134-
//
135134
// String paddedRowkey = String.format(numberFormat, i);
136135
// String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
137-
// int rowNumber = i / rowHeight;
138-
//
139-
// float p = (float) rowNumber / numRows;
140-
// System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
141-
//
142-
// if (Math.random() <= p || count < 3000) {
143-
// FullEntity<com.google.cloud.datastore.Key> fullEntity =
144-
// com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(reversedRowkey))
145-
// .set("description", getRandomStringValue())
146-
// .build();
147-
// entities.add(fullEntity);
148-
//
149-
// // logger.info("writing entity" + fullEntity.getKey());
150-
//
151-
// }
152-
// // 500 per write limit
153-
// if (i % 500 == 0) {
154-
// System.out.printf("500 mod 0");
155-
// ds.put(entities.toArray(new FullEntity<?>[0]));
156-
// entities = new ArrayList<>();
157-
// }
136+
// datastore.get(keyFactory.newKey(reversedRowkey));
137+
// c++;
158138
// }
159-
// ds.put(entities.toArray(new FullEntity<?>[0]));
139+
// System.out.println(c + " entities fetched");
140+
141+
142+
for (int i = 0; i < maxInput; i++) {
143+
144+
String paddedRowkey = String.format(numberFormat, i);
145+
String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
146+
int rowNumber = i / rowHeight;
147+
148+
float p = (float) rowNumber / numRows;
149+
System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
150+
151+
if (Math.random() <= p || count < 120) {
152+
FullEntity<com.google.cloud.datastore.Key> fullEntity =
153+
com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(reversedRowkey))
154+
.set("description", getRandomStringValue())
155+
.build();
156+
entities.add(fullEntity);
157+
// logger.info("writing entity" + fullEntity.getKey());
158+
c++;
159+
}
160+
// 500 per write limit
161+
if (i % 500 == 0) {
162+
System.out.printf("500 mod 0");
163+
ds.put(entities.toArray(new FullEntity<?>[0]));
164+
entities = new ArrayList<>();
165+
}
166+
}
167+
168+
System.out.println(c + " entities updated");
169+
170+
ds.put(entities.toArray(new FullEntity<?>[0]));
160171

161172
// String kind = "Billy10ms1krs";
162173
// int maxInput = 1000;

0 commit comments

Comments
 (0)