Skip to content

Commit 873f567

Browse files
committed
use job name and sync client
1 parent 2cb07bb commit 873f567

File tree

1 file changed

+54
-53
lines changed

1 file changed

+54
-53
lines changed

bigtable/beam/keyviz-art/src/main/java/keyviz/FSReadData2.java

+54-53
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,13 @@ public class FSReadData2 {
3939

4040
static final long START_TIME = new Date().getTime();
4141
static final long KEY_VIZ_WINDOW_SECONDS = 10;
42+
private static final String kind = "BillyDF-W-sync-2s500krows";
43+
private static final int RANDOM_ID_BOUND = 500_000;
4244

4345
public static void main(String[] args) {
4446
DataflowPipelineOptions options =
4547
PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
48+
options.setJobName(kind);
4649
System.out.println("options");
4750
System.out.println(options);
4851

@@ -63,10 +66,7 @@ public static class ReadFromTableFn extends DoFn<Long, Void> {
6366
protected Connection connection;
6467
protected Datastore datastore;
6568
private KeyFactory keyFactory;
66-
// private final String kind = "BillyDF500reversed1mswithP";
67-
private final String kind = "BillyDF_WRFP_2s500krows";
6869

69-
private static final int RANDOM_ID_BOUND = 500000;
7070
Random rand = new Random();
7171

7272
public ReadFromTableFn() {
@@ -76,8 +76,8 @@ public ReadFromTableFn() {
7676
generateRowkeys();
7777
}
7878

79-
protected synchronized Datastore getDatastore() {
80-
if (this.connection == null) {
79+
protected Datastore getDatastore() {
80+
if (this.datastore == null) {
8181
DatastoreOptions.Builder builder = DatastoreOptions.newBuilder();
8282
datastore = builder.setProjectId("datastore-mode-kv-prod").build().getService();
8383
keyFactory = datastore.newKeyFactory().setKind(kind);
@@ -100,38 +100,38 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
100100
Datastore ds = getDatastore();
101101
int c = 0;
102102

103-
if (count == 1) {
104-
for (int i = 0; i < keys.length; i++) {
105-
106-
// String paddedRowkey = String.format(numberFormat, i);
107-
// String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
108-
// int rowNumber = i / rowHeight;
109-
//
110-
// float p = (float) rowNumber / numRows;
111-
// System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
112-
113-
FullEntity<com.google.cloud.datastore.Key> fullEntity =
114-
com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(keys[i]))
115-
.set("description", getRandomStringValue())
116-
.build();
117-
entities.add(fullEntity);
118-
// logger.info("writing entity" + fullEntity.getKey());
119-
c++;
120-
121-
// ds.put(fullEntity);
122-
123-
// 500 per write limit
124-
if (i % 500 == 0) {
125-
ds.put(entities.toArray(new FullEntity<?>[0]));
126-
entities = new ArrayList<>();
127-
}
128-
}
103+
// if (count == 1) {
104+
for (int i = 0; i < keys.length; i++) {
105+
106+
// String paddedRowkey = String.format(numberFormat, i);
107+
// String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
108+
// int rowNumber = i / rowHeight;
109+
//
110+
// float p = (float) rowNumber / numRows;
111+
// System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
112+
113+
FullEntity<com.google.cloud.datastore.Key> fullEntity =
114+
com.google.cloud.datastore.Entity.newBuilder(keyFactory.newKey(keys[i]))
115+
.set("description", getRandomStringValue())
116+
.build();
117+
entities.add(fullEntity);
118+
// logger.info("writing entity" + fullEntity.getKey());
119+
c++;
129120

130-
System.out.println(c + " entities written");
121+
// ds.put(fullEntity);
131122

132-
ds.put(entities.toArray(new FullEntity<?>[0]));
123+
// 500 per write limit
124+
if (i % 500 == 0) {
125+
ds.put(entities.toArray(new FullEntity<?>[0]));
126+
entities = new ArrayList<>();
127+
}
133128
}
134129

130+
System.out.println(c + " entities written");
131+
132+
ds.put(entities.toArray(new FullEntity<?>[0]));
133+
// }
134+
135135
EntityQuery.Builder query = Query.newEntityQueryBuilder()
136136
.setKind(kind);
137137

@@ -148,26 +148,27 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
148148
// c++;
149149
// }
150150

151-
long timestampDiff = System.currentTimeMillis() - START_TIME;
152-
long seconds = timestampDiff / 1000;
153-
int timeOffsetIndex = Math.toIntExact(seconds / KEY_VIZ_WINDOW_SECONDS);
154-
155-
List<com.google.cloud.datastore.Key> keysToFetch = new ArrayList<>();
156-
for (int i = 0; i < maxInput; i++) {
157-
if (timeOffsetIndex % 2 == 0) {
158-
String paddedRowkey = String.format(numberFormat, i);
159-
String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
160-
keysToFetch.add(keyFactory.newKey(reversedRowkey));
161-
c++;
162-
}
163-
}
164-
List<Entity> fetchedEntities = datastore.fetch();
165-
for (Entity e : fetchedEntities) {
166-
// System.out.println(e);
167-
c++;
168-
}
169-
170-
System.out.println(c + " entities fetched");
151+
// Fetch all rows
152+
// long timestampDiff = System.currentTimeMillis() - START_TIME;
153+
// long seconds = timestampDiff / 1000;
154+
// int timeOffsetIndex = Math.toIntExact(seconds / KEY_VIZ_WINDOW_SECONDS);
155+
//
156+
// List<com.google.cloud.datastore.Key> keysToFetch = new ArrayList<>();
157+
// for (int i = 0; i < maxInput; i++) {
158+
// if (timeOffsetIndex % 2 == 0) {
159+
// String paddedRowkey = String.format(numberFormat, i);
160+
// String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
161+
// keysToFetch.add(keyFactory.newKey(reversedRowkey));
162+
// c++;
163+
// }
164+
// }
165+
// List<Entity> fetchedEntities = datastore.fetch();
166+
// for (Entity e : fetchedEntities) {
167+
// // System.out.println(e);
168+
// c++;
169+
// }
170+
//
171+
// System.out.println(c + " entities fetched");
171172
}
172173

173174
private StringValue getRandomStringValue() {

0 commit comments

Comments
 (0)