Skip to content

Commit a45d618

Browse files
committed
FS keyviz art with dataflow and functions
1 parent da15d5b commit a45d618

File tree

5 files changed

+776
-0
lines changed

5 files changed

+776
-0
lines changed

bigtable/beam/fs-keyviz-fn/pom.xml

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0"
2+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>com.example.cloud.functions</groupId>
7+
<artifactId>functions-hello-world</artifactId>
8+
<version>1.0.0-SNAPSHOT</version>
9+
<properties>
10+
<maven.compiler.target>11</maven.compiler.target>
11+
<maven.compiler.source>11</maven.compiler.source>
12+
</properties>
13+
14+
<dependencies>
15+
<dependency>
16+
<groupId>com.google.cloud</groupId>
17+
<artifactId>google-cloud-datastore</artifactId>
18+
<version>1.103.0</version>
19+
</dependency>
20+
21+
<!-- Required for Function primitives -->
22+
<dependency>
23+
<groupId>com.google.cloud.functions</groupId>
24+
<artifactId>functions-framework-api</artifactId>
25+
<version>1.0.1</version>
26+
<scope>provided</scope>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.apache.commons</groupId>
30+
<artifactId>commons-lang3</artifactId>
31+
<version>3.10</version>
32+
</dependency>
33+
34+
</dependencies>
35+
36+
<build>
37+
<plugins>
38+
<plugin>
39+
<!--
40+
Google Cloud Functions Framework Maven plugin
41+
42+
This plugin allows you to run Cloud Functions Java code
43+
locally. Use the following terminal command to run a
44+
given function locally:
45+
46+
mvn function:run -Drun.functionTarget=your.package.yourFunction
47+
-->
48+
<groupId>com.google.cloud.functions</groupId>
49+
<artifactId>function-maven-plugin</artifactId>
50+
<version>0.9.4</version>
51+
<configuration>
52+
<functionTarget>functions.HelloWorld</functionTarget>
53+
</configuration>
54+
</plugin>
55+
</plugins>
56+
</build>
57+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package functions;
2+
3+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4+
import static java.util.concurrent.TimeUnit.MINUTES;
5+
import static java.util.concurrent.TimeUnit.SECONDS;
6+
7+
import com.google.cloud.datastore.Datastore;
8+
import com.google.cloud.datastore.DatastoreOptions;
9+
import com.google.cloud.datastore.Entity;
10+
import com.google.cloud.datastore.FullEntity;
11+
import com.google.cloud.datastore.IncompleteKey;
12+
import com.google.cloud.datastore.Key;
13+
import com.google.cloud.datastore.KeyFactory;
14+
import com.google.cloud.datastore.StringValue;
15+
import com.google.cloud.functions.BackgroundFunction;
16+
import com.google.cloud.functions.Context;
17+
import com.google.common.util.concurrent.Uninterruptibles;
18+
import functions.HelloWorld.PubSubMessage;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Random;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.ScheduledExecutorService;
25+
import java.util.concurrent.ScheduledFuture;
26+
import java.util.logging.Logger;
27+
import org.apache.commons.lang3.RandomStringUtils;
28+
29+
public class HelloWorld implements BackgroundFunction<PubSubMessage> {
30+
private static final Logger logger = Logger.getLogger(HelloWorld.class.getName());
31+
private static final int NANOS_PER_MICRO = 1_000;
32+
private static final int MICROS_PER_SECOND = 1_000_000;
33+
private static final int RANDOM_ID_BOUND = 10000;
34+
private static final int DURATION_MINUTES = 5;
35+
private static final int TARGET_QPS = 200;
36+
private static final int THREAD_COUNT = 25;
37+
38+
private final Datastore datastore;
39+
private final String kind;
40+
// Create a Key factory to construct keys associated with this project.
41+
private final KeyFactory keyFactory;
42+
Random rand = new Random();
43+
44+
public HelloWorld() {
45+
DatastoreOptions.Builder builder = DatastoreOptions.newBuilder();
46+
datastore = builder.build().getService();
47+
logger.info(datastore.toString());
48+
kind = "BillyFnGradient";
49+
keyFactory = datastore.newKeyFactory().setKind(kind);
50+
}
51+
52+
public void addEntity() {
53+
int numRows = 10;
54+
int rowHeight = (int) (RANDOM_ID_BOUND / numRows);
55+
int id = randomId(rand);
56+
int rowNumber = id / rowHeight;
57+
58+
float p = (float) rowNumber / numRows;
59+
if (Math.random() <= p) {
60+
61+
FullEntity<Key> fullEntity =
62+
Entity.newBuilder(keyFactory.newKey(id))
63+
.set("description", getRandomStringValue())
64+
.build();
65+
// logger.info("writing entity" + fullEntity.getKey());
66+
67+
Entity entity = datastore.put(fullEntity);
68+
}
69+
}
70+
71+
private StringValue getRandomStringValue() {
72+
String randomString =
73+
RandomStringUtils.random(
74+
1000,
75+
/* start= */ 0,
76+
/* end= */ 0,
77+
/* letters= */ true,
78+
/* numbers= */ true,
79+
/* chars= */ null);
80+
return StringValue.newBuilder(randomString).setExcludeFromIndexes(true).build();
81+
}
82+
83+
private int randomId(Random rand) {
84+
return rand.nextInt(RANDOM_ID_BOUND) + 1;
85+
}
86+
87+
@Override
88+
public void accept(PubSubMessage message, Context context) {
89+
long periodMillis = 1_000 / TARGET_QPS * THREAD_COUNT;
90+
List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
91+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(THREAD_COUNT);
92+
93+
for (int i = 0; i < THREAD_COUNT; i++) {
94+
ScheduledFuture<?> writeFuture =
95+
executor.scheduleAtFixedRate(
96+
() -> {
97+
Random random = new Random();
98+
try {
99+
addEntity();
100+
} catch (Exception e) {
101+
// catch all exceptions to avoid stopping traffic generator
102+
logger.info("Error when sending message" + e.getMessage());
103+
}
104+
},
105+
0,
106+
periodMillis,
107+
MILLISECONDS);
108+
scheduledFutures.add(writeFuture);
109+
}
110+
Uninterruptibles.sleepUninterruptibly(DURATION_MINUTES, MINUTES);
111+
scheduledFutures.forEach(future -> future.cancel(false));
112+
executor.shutdown();
113+
try {
114+
executor.awaitTermination(60, SECONDS);
115+
} catch (InterruptedException e) {
116+
throw new RuntimeException("Waiting termination when unexpectedly got interrupted", e);
117+
}
118+
}
119+
120+
/** A message that is published by publishers and consumed by subscribers. */
121+
public static class PubSubMessage {
122+
String data;
123+
Map<String, String> attributes;
124+
String messageId;
125+
String publishTime;
126+
}
127+
}

bigtable/beam/keyviz-art/pom.xml

+46
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,68 @@
4141
</properties>
4242

4343
<dependencies>
44+
45+
4446
<dependency>
4547
<groupId>org.apache.beam</groupId>
4648
<artifactId>beam-runners-direct-java</artifactId>
4749
<version>${apache_beam.version}</version>
4850
</dependency>
51+
4952
<dependency>
5053
<groupId>org.apache.beam</groupId>
5154
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
5255
<version>${apache_beam.version}</version>
5356
</dependency>
57+
<!-- https://mvnrepository.com/artifact/com.google.cloud/google-cloud-datastore -->
58+
5459

5560
<dependency>
5661
<groupId>com.google.cloud.bigtable</groupId>
5762
<artifactId>bigtable-hbase-beam</artifactId>
5863
<version>1.15.0</version>
64+
<exclusions>
65+
<exclusion>
66+
<groupId>com.google.guava</groupId>
67+
<artifactId>guava</artifactId>
68+
</exclusion>
69+
</exclusions>
5970
</dependency>
71+
<dependency>
72+
<groupId>com.google.cloud</groupId>
73+
<artifactId>google-cloud-datastore</artifactId>
74+
<version>1.103.0</version>
75+
<exclusions>
76+
<exclusion>
77+
<groupId>com.google.guava</groupId>
78+
<artifactId>guava</artifactId>
79+
</exclusion>
80+
</exclusions>
81+
82+
</dependency>
83+
84+
<!-- <dependency>-->
85+
<!-- <groupId>com.google.api.grpc</groupId>-->
86+
<!-- <artifactId>proto-google-cloud-datastore-v1</artifactId>-->
87+
<!-- <version>0.87.0</version>-->
88+
<!-- <exclusions>-->
89+
<!-- <exclusion>-->
90+
<!-- <groupId>com.google.guava</groupId>-->
91+
<!-- <artifactId>guava</artifactId>-->
92+
<!-- </exclusion>-->
93+
<!-- </exclusions>-->
94+
<!-- </dependency>-->
95+
<!-- <dependency>-->
96+
<!-- <groupId>com.google.cloud.datastore</groupId>-->
97+
<!-- <artifactId>datastore-v1-proto-client</artifactId>-->
98+
<!-- <version>1.6.3</version>-->
99+
<!-- <exclusions>-->
100+
<!-- <exclusion>-->
101+
<!-- <groupId>com.google.guava</groupId>-->
102+
<!-- <artifactId>guava</artifactId>-->
103+
<!-- </exclusion>-->
104+
<!-- </exclusions>-->
105+
<!-- </dependency>-->
60106

61107
<dependency>
62108
<groupId>junit</groupId>

0 commit comments

Comments
 (0)