@@ -39,10 +39,13 @@ public class FSReadData2 {
39
39
40
40
static final long START_TIME = new Date ().getTime ();
41
41
static final long KEY_VIZ_WINDOW_SECONDS = 10 ;
42
+ private static final String kind = "BillyDF-W-sync-2s500krows" ;
43
+ private static final int RANDOM_ID_BOUND = 500_000 ;
42
44
43
45
public static void main (String [] args ) {
44
46
DataflowPipelineOptions options =
45
47
PipelineOptionsFactory .fromArgs (args ).withValidation ().as (DataflowPipelineOptions .class );
48
+ options .setJobName (kind );
46
49
System .out .println ("options" );
47
50
System .out .println (options );
48
51
@@ -63,10 +66,7 @@ public static class ReadFromTableFn extends DoFn<Long, Void> {
63
66
protected Connection connection ;
64
67
protected Datastore datastore ;
65
68
private KeyFactory keyFactory ;
66
- // private final String kind = "BillyDF500reversed1mswithP";
67
- private final String kind = "BillyDF_WRFP_2s500krows" ;
68
69
69
- private static final int RANDOM_ID_BOUND = 500000 ;
70
70
Random rand = new Random ();
71
71
72
72
public ReadFromTableFn () {
@@ -76,8 +76,8 @@ public ReadFromTableFn() {
76
76
generateRowkeys ();
77
77
}
78
78
79
- protected synchronized Datastore getDatastore () {
80
- if (this .connection == null ) {
79
+ protected Datastore getDatastore () {
80
+ if (this .datastore == null ) {
81
81
DatastoreOptions .Builder builder = DatastoreOptions .newBuilder ();
82
82
datastore = builder .setProjectId ("datastore-mode-kv-prod" ).build ().getService ();
83
83
keyFactory = datastore .newKeyFactory ().setKind (kind );
@@ -100,38 +100,38 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
100
100
Datastore ds = getDatastore ();
101
101
int c = 0 ;
102
102
103
- if (count == 1 ) {
104
- for (int i = 0 ; i < keys .length ; i ++) {
105
-
106
- // String paddedRowkey = String.format(numberFormat, i);
107
- // String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
108
- // int rowNumber = i / rowHeight;
109
- //
110
- // float p = (float) rowNumber / numRows;
111
- // System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
112
-
113
- FullEntity <com .google .cloud .datastore .Key > fullEntity =
114
- com .google .cloud .datastore .Entity .newBuilder (keyFactory .newKey (keys [i ]))
115
- .set ("description" , getRandomStringValue ())
116
- .build ();
117
- entities .add (fullEntity );
118
- // logger.info("writing entity" + fullEntity.getKey());
119
- c ++;
120
-
121
- // ds.put(fullEntity);
122
-
123
- // 500 per write limit
124
- if (i % 500 == 0 ) {
125
- ds .put (entities .toArray (new FullEntity <?>[0 ]));
126
- entities = new ArrayList <>();
127
- }
128
- }
103
+ // if (count == 1) {
104
+ for (int i = 0 ; i < keys .length ; i ++) {
105
+
106
+ // String paddedRowkey = String.format(numberFormat, i);
107
+ // String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
108
+ // int rowNumber = i / rowHeight;
109
+ //
110
+ // float p = (float) rowNumber / numRows;
111
+ // System.out.printf("index: %d rowNumber %d prob %f count %d", i, rowNumber, p, count);
112
+
113
+ FullEntity <com .google .cloud .datastore .Key > fullEntity =
114
+ com .google .cloud .datastore .Entity .newBuilder (keyFactory .newKey (keys [i ]))
115
+ .set ("description" , getRandomStringValue ())
116
+ .build ();
117
+ entities .add (fullEntity );
118
+ // logger.info("writing entity" + fullEntity.getKey());
119
+ c ++;
129
120
130
- System . out . println ( c + " entities written" );
121
+ // ds.put(fullEntity );
131
122
132
- ds .put (entities .toArray (new FullEntity <?>[0 ]));
123
+ // 500 per write limit
124
+ if (i % 500 == 0 ) {
125
+ ds .put (entities .toArray (new FullEntity <?>[0 ]));
126
+ entities = new ArrayList <>();
127
+ }
133
128
}
134
129
130
+ System .out .println (c + " entities written" );
131
+
132
+ ds .put (entities .toArray (new FullEntity <?>[0 ]));
133
+ // }
134
+
135
135
EntityQuery .Builder query = Query .newEntityQueryBuilder ()
136
136
.setKind (kind );
137
137
@@ -148,26 +148,27 @@ public void processElement(@Element Long count, OutputReceiver<Void> out) {
148
148
// c++;
149
149
// }
150
150
151
- long timestampDiff = System .currentTimeMillis () - START_TIME ;
152
- long seconds = timestampDiff / 1000 ;
153
- int timeOffsetIndex = Math .toIntExact (seconds / KEY_VIZ_WINDOW_SECONDS );
154
-
155
- List <com .google .cloud .datastore .Key > keysToFetch = new ArrayList <>();
156
- for (int i = 0 ; i < maxInput ; i ++) {
157
- if (timeOffsetIndex % 2 == 0 ) {
158
- String paddedRowkey = String .format (numberFormat , i );
159
- String reversedRowkey = new StringBuilder (paddedRowkey ).reverse ().toString ();
160
- keysToFetch .add (keyFactory .newKey (reversedRowkey ));
161
- c ++;
162
- }
163
- }
164
- List <Entity > fetchedEntities = datastore .fetch ();
165
- for (Entity e : fetchedEntities ) {
166
- // System.out.println(e);
167
- c ++;
168
- }
169
-
170
- System .out .println (c + " entities fetched" );
151
+ // Fetch all rows
152
+ // long timestampDiff = System.currentTimeMillis() - START_TIME;
153
+ // long seconds = timestampDiff / 1000;
154
+ // int timeOffsetIndex = Math.toIntExact(seconds / KEY_VIZ_WINDOW_SECONDS);
155
+ //
156
+ // List<com.google.cloud.datastore.Key> keysToFetch = new ArrayList<>();
157
+ // for (int i = 0; i < maxInput; i++) {
158
+ // if (timeOffsetIndex % 2 == 0) {
159
+ // String paddedRowkey = String.format(numberFormat, i);
160
+ // String reversedRowkey = new StringBuilder(paddedRowkey).reverse().toString();
161
+ // keysToFetch.add(keyFactory.newKey(reversedRowkey));
162
+ // c++;
163
+ // }
164
+ // }
165
+ // List<Entity> fetchedEntities = datastore.fetch();
166
+ // for (Entity e : fetchedEntities) {
167
+ // // System.out.println(e);
168
+ // c++;
169
+ // }
170
+ //
171
+ // System.out.println(c + " entities fetched");
171
172
}
172
173
173
174
private StringValue getRandomStringValue () {
0 commit comments