diff --git a/benchmark-framework/pom.xml b/benchmark-framework/pom.xml
index 256222f9d..0e324141f 100644
--- a/benchmark-framework/pom.xml
+++ b/benchmark-framework/pom.xml
@@ -39,6 +39,12 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>driver-artemis</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
@@ -65,11 +71,6 @@
       <artifactId>driver-nats</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>driver-nats-streaming</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>driver-nsq</artifactId>
@@ -139,6 +140,12 @@
       <artifactId>HdrHistogram</artifactId>
       <version>2.1.12</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>driver-nats-streaming</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java
index a1b61afa3..c1c253f16 100644
--- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java
+++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java
@@ -17,6 +17,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.HdrHistogram.Histogram;
@@ -30,16 +31,40 @@ public HistogramSerializer() {
super(Histogram.class);
}
+ static byte[] toByteArray(ByteBuffer buffer) {
+ byte[] encodedBuffer = new byte[buffer.remaining()];
+ buffer.get(encodedBuffer);
+ return encodedBuffer;
+ }
+
+ static ByteBuffer serializeHistogram(Histogram histo, ByteBuffer buffer) {
+ buffer.clear();
+ while (true) {
+ final int outBytes = histo.encodeIntoCompressedByteBuffer(buffer);
+ Preconditions.checkState(outBytes == buffer.position());
+ final int capacity = buffer.capacity();
+ if (outBytes < capacity) {
+        // encoding successful
+ break;
+ }
+ // We filled the entire buffer, an indication that the buffer was not
+ // large enough, so we double the buffer and try again.
+ // See: https://github.com/HdrHistogram/HdrHistogram/issues/201
+ buffer = ByteBuffer.allocate(capacity * 2);
+ }
+ buffer.flip();
+ return buffer;
+ }
+
@Override
public void serialize(
Histogram histogram, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
throws IOException {
ByteBuffer buffer = threadBuffer.get();
- buffer.clear();
- histogram.encodeIntoCompressedByteBuffer(buffer);
- byte[] arr = new byte[buffer.position()];
- buffer.flip();
- buffer.get(arr);
- jsonGenerator.writeBinary(arr);
+ ByteBuffer newBuffer = serializeHistogram(histogram, buffer);
+ if (newBuffer != buffer) {
+ threadBuffer.set(newBuffer);
+ }
+ jsonGenerator.writeBinary(toByteArray(newBuffer));
}
}
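
For reference, a minimal round-trip sketch of the helpers added above (assuming HdrHistogram 2.1.x and the package-private visibility used in this patch): if the encoded histogram fills the buffer exactly, `serializeHistogram` doubles it and retries, so the caller always gets a fully encoded, flipped buffer.

```
ByteBuffer buffer = ByteBuffer.allocate(1024); // small on purpose; the worker starts much larger
Histogram in = new Histogram(5);
in.recordValue(42);
byte[] bytes =
    HistogramSerializer.toByteArray(HistogramSerializer.serializeHistogram(in, buffer));
Histogram out =
    Histogram.decodeFromCompressedByteBuffer(ByteBuffer.wrap(bytes), Long.MIN_VALUE);
assert in.equals(out); // the test below exercises this same round trip across the growth boundary
```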
diff --git a/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java b/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java
index b06ef7566..9769eac6f 100644
--- a/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java
+++ b/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java
@@ -18,6 +18,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
import org.HdrHistogram.Histogram;
import org.junit.jupiter.api.Test;
@@ -41,4 +43,64 @@ void deserialize() throws IOException {
assertThat(deserialized).isEqualTo(value);
}
+
+ /**
+   * Create a histogram and record the given number of random samples.
+ *
+ * @param samples the number of samples to record into the histogram
+ * @return a Histogram with the given number of samples
+ */
+ private Histogram randomHisto(int samples) {
+ Random r = new Random(0xBADBEEF);
+    Histogram h = new Histogram(5);
+ for (int i = 0; i < samples; i++) {
+ h.recordValue(r.nextInt(10000000));
+ }
+
+ return h;
+ }
+
+ byte[] serializeRandomHisto(int samples, int initialBufferSize) throws Exception {
+ ByteBuffer inbuffer = ByteBuffer.allocate(initialBufferSize);
+ Histogram inHisto = randomHisto(samples);
+ byte[] serialBytes =
+ HistogramSerializer.toByteArray(HistogramSerializer.serializeHistogram(inHisto, inbuffer));
+
+ // check roundtrip
+ Histogram outHisto =
+ Histogram.decodeFromCompressedByteBuffer(ByteBuffer.wrap(serialBytes), Long.MIN_VALUE);
+ assertThat(inHisto).isEqualTo(outHisto);
+
+ return serialBytes;
+ }
+
+ @Test
+ public void testHistogram() throws Exception {
+
+ // in the worker it's 8 MB but it takes a while to make a histogram that big
+ final int bufSize = 1002;
+
+ int samples = 300;
+
+    // we do an exponential search to find the crossover point where we need to grow the buffer
+ while (true) {
+ byte[] serialBytes = serializeRandomHisto(samples, bufSize);
+ // System.out.println("Samples: " + samples + ", histogram size: " + serialBytes.length);
+ if (serialBytes.length >= bufSize) {
+ break;
+ }
+ samples *= 1.05;
+ }
+
+    // then walk backward across the crossover point in steps of one to check the boundary
+    // carefully
+ while (true) {
+ samples--;
+ byte[] serialBytes = serializeRandomHisto(samples, bufSize);
+ // System.out.println("Samples: " + samples + ", histogram size: " + serialBytes.length);
+ if (serialBytes.length < bufSize - 10) {
+ break;
+ }
+ }
+ }
}
diff --git a/bin/benchmark b/bin/benchmark
index 9954f632d..555893496 100755
--- a/bin/benchmark
+++ b/bin/benchmark
@@ -27,4 +27,4 @@ fi
JVM_MEM="${HEAP_OPTS} -XX:+UseG1GC"
JVM_GC_LOG=" -XX:+PrintGCDetails -XX:+PrintGCApplicationStoppedTime -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=64m -Xloggc:/dev/shm/benchmark-client-gc_%p.log"
-java -server -cp $CLASSPATH $JVM_MEM io.openmessaging.benchmark.Benchmark $*
+java -server -cp $CLASSPATH $JVM_MEM io.openmessaging.benchmark.Benchmark "$@"
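
The `$*` → `"$@"` change matters as soon as any benchmark argument contains whitespace; a quick illustration of the difference, using hypothetical arguments:

```
set -- driver.yaml "my workload.yaml"
printf '%s\n' "$@"   # 2 arguments: driver.yaml / my workload.yaml
printf '%s\n' $*     # 3 arguments: driver.yaml / my / workload.yaml
```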
diff --git a/bin/benchmark-worker b/bin/benchmark-worker
index ce81e0c8a..164f45b80 100755
--- a/bin/benchmark-worker
+++ b/bin/benchmark-worker
@@ -26,5 +26,12 @@ fi
JVM_MEM="${HEAP_OPTS} -XX:+UseG1GC"
JVM_GC_LOG=" -XX:+PrintGCDetails -XX:+PrintGCApplicationStoppedTime -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=64m -Xloggc:/dev/shm/benchmark-client-gc_%p.log"
-
-exec java -server -cp $CLASSPATH $JVM_MEM io.openmessaging.benchmark.worker.BenchmarkWorker $*
+# Required by Netty for optimized direct byte buffer access
+JVM_OPTS="$JVM_OPTS -Dio.netty.tryReflectionSetAccessible=true"
+JVM_OPTS="$JVM_OPTS --add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.misc=ALL-UNNAMED"
+# Set io.netty.tryReflectionSetAccessible in Pulsar's shaded client
+JVM_OPTS="$JVM_OPTS -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true"
+# Required by the Pulsar client's optimized checksum calculation on platforms other than Linux x86_64:
+# reflective access to java.util.zip.CRC32C
+JVM_OPTS="$JVM_OPTS --add-opens java.base/java.util.zip=ALL-UNNAMED"
+exec java -server -cp $CLASSPATH $JVM_MEM $JVM_OPTS io.openmessaging.benchmark.worker.BenchmarkWorker "$@"
\ No newline at end of file
diff --git a/driver-kafka/deploy/hdd-deployment/alicloud/deploy.yaml b/driver-kafka/deploy/hdd-deployment/alicloud/deploy.yaml
index 24575d447..24816a5db 100644
--- a/driver-kafka/deploy/hdd-deployment/alicloud/deploy.yaml
+++ b/driver-kafka/deploy/hdd-deployment/alicloud/deploy.yaml
@@ -54,7 +54,7 @@
- set_fact:
zookeeperServers: "{{ groups['zookeeper'] | map('extract', hostvars, ['ansible_default_ipv4', 'address']) | map('regex_replace', '^(.*)$', '\\1:2181') | join(',') }}"
boostrapServers: "{{ groups['kafka'] | map('extract', hostvars, ['private_ip']) | map('regex_replace', '^(.*)$', '\\1:9092') | join(',') }}"
- kafkaVersion: "2.0.0"
+ kafkaVersion: "3.6.1"
- debug:
msg: "zookeeper servers: {{ zookeeperServers }}\nboostrap servers: {{ boostrapServers }}"
- name: Download Kafka package
diff --git a/driver-kafka/deploy/hdd-deployment/deploy.yaml b/driver-kafka/deploy/hdd-deployment/deploy.yaml
index a8d1de38d..b9f632b77 100644
--- a/driver-kafka/deploy/hdd-deployment/deploy.yaml
+++ b/driver-kafka/deploy/hdd-deployment/deploy.yaml
@@ -80,7 +80,7 @@
- set_fact:
zookeeperServers: "{{ groups['zookeeper'] | map('extract', hostvars, ['ansible_default_ipv4', 'address']) | map('regex_replace', '^(.*)$', '\\1:2181') | join(',') }}"
boostrapServers: "{{ groups['kafka'] | map('extract', hostvars, ['private_ip']) | map('regex_replace', '^(.*)$', '\\1:9092') | join(',') }}"
- kafkaVersion: "2.8.1"
+ kafkaVersion: "3.6.1"
- debug:
msg: "zookeeper servers: {{ zookeeperServers }}\nboostrap servers: {{ boostrapServers }}"
- name: Download Kafka package
diff --git a/driver-kafka/deploy/hdd-deployment/templates/kafka.service b/driver-kafka/deploy/hdd-deployment/templates/kafka.service
index c5afac56a..d35ac4b8b 100644
--- a/driver-kafka/deploy/hdd-deployment/templates/kafka.service
+++ b/driver-kafka/deploy/hdd-deployment/templates/kafka.service
@@ -5,7 +5,7 @@ After=network.target
[Service]
ExecStart=/opt/kafka/bin/kafka-server-start.sh config/server.properties
Environment='KAFKA_HEAP_OPTS=-Xms6g -Xmx6g -XX:MetaspaceSize=96m'
-Environment='KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -Djava.awt.headless=true"
+Environment='KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -Djava.awt.headless=true'
WorkingDirectory=/opt/kafka
RestartSec=1s
Restart=on-failure
diff --git a/driver-kafka/deploy/ssd-deployment/alicloud/deploy.yaml b/driver-kafka/deploy/ssd-deployment/alicloud/deploy.yaml
index 24575d447..24816a5db 100644
--- a/driver-kafka/deploy/ssd-deployment/alicloud/deploy.yaml
+++ b/driver-kafka/deploy/ssd-deployment/alicloud/deploy.yaml
@@ -54,7 +54,7 @@
- set_fact:
zookeeperServers: "{{ groups['zookeeper'] | map('extract', hostvars, ['ansible_default_ipv4', 'address']) | map('regex_replace', '^(.*)$', '\\1:2181') | join(',') }}"
boostrapServers: "{{ groups['kafka'] | map('extract', hostvars, ['private_ip']) | map('regex_replace', '^(.*)$', '\\1:9092') | join(',') }}"
- kafkaVersion: "2.0.0"
+ kafkaVersion: "3.6.1"
- debug:
msg: "zookeeper servers: {{ zookeeperServers }}\nboostrap servers: {{ boostrapServers }}"
- name: Download Kafka package
diff --git a/driver-kafka/deploy/ssd-deployment/deploy.yaml b/driver-kafka/deploy/ssd-deployment/deploy.yaml
index 6a1e502bb..d02ce5dda 100644
--- a/driver-kafka/deploy/ssd-deployment/deploy.yaml
+++ b/driver-kafka/deploy/ssd-deployment/deploy.yaml
@@ -103,7 +103,7 @@
- set_fact:
zookeeperServers: "{{ groups['zookeeper'] | map('extract', hostvars, ['ansible_default_ipv4', 'address']) | map('regex_replace', '^(.*)$', '\\1:2181') | join(',') }}"
bootstrapServers: "{{ groups['kafka'] | map('extract', hostvars, ['private_ip']) | map('regex_replace', '^(.*)$', '\\1:9092') | join(',') }}"
- kafkaVersion: "3.2.1"
+ kafkaVersion: "3.6.1"
tags: [client-code]
- debug:
msg: "zookeeper servers: {{ zookeeperServers }}\nbootstrap servers: {{ bootstrapServers }}"
diff --git a/driver-kafka/deploy/ssd-deployment/templates/kafka.service b/driver-kafka/deploy/ssd-deployment/templates/kafka.service
index 39c04c1a8..9d4a975ef 100644
--- a/driver-kafka/deploy/ssd-deployment/templates/kafka.service
+++ b/driver-kafka/deploy/ssd-deployment/templates/kafka.service
@@ -5,7 +5,7 @@ After=network.target
[Service]
ExecStart=/opt/kafka/bin/kafka-server-start.sh config/server.properties
Environment='KAFKA_HEAP_OPTS=-Xms16g -Xmx16g -XX:MetaspaceSize=96m'
-Environment='KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UnlockExperimentalVMOptions -XX:+UseZGC -XX:+ParallelRefProcEnabled -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=12 -XX:ConcGCThreads=12 -XX:+DisableExplicitGC -XX:-ResizePLAB -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -Djava.awt.headless=true"
+Environment='KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UnlockExperimentalVMOptions -XX:+UseZGC -XX:+ParallelRefProcEnabled -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=12 -XX:ConcGCThreads=12 -XX:+DisableExplicitGC -XX:-ResizePLAB -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -Djava.awt.headless=true'
WorkingDirectory=/opt/kafka
RestartSec=1s
Restart=on-failure
diff --git a/driver-kafka/pom.xml b/driver-kafka/pom.xml
index 176bdfff4..83324ab5c 100644
--- a/driver-kafka/pom.xml
+++ b/driver-kafka/pom.xml
@@ -33,7 +33,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
-      <version>3.2.1</version>
+      <version>3.6.1</version>
     </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java
index a2789ac2a..d506b5b37 100644
--- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java
+++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java
@@ -45,6 +45,9 @@
public class KafkaBenchmarkDriver implements BenchmarkDriver {
+ private static final String ZONE_ID_CONFIG = "zone.id";
+ private static final String ZONE_ID_TEMPLATE = "{zone.id}";
+ private static final String KAFKA_CLIENT_ID = "client.id";
private Config config;
  private List<BenchmarkProducer> producers = Collections.synchronizedList(new ArrayList<>());
@@ -63,6 +66,13 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
Properties commonProperties = new Properties();
commonProperties.load(new StringReader(config.commonConfig));
+ if (commonProperties.containsKey(KAFKA_CLIENT_ID)) {
+ commonProperties.put(
+ KAFKA_CLIENT_ID,
+ applyZoneId(
+ commonProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG)));
+ }
+
producerProperties = new Properties();
commonProperties.forEach((key, value) -> producerProperties.put(key, value));
producerProperties.load(new StringReader(config.producerConfig));
@@ -151,6 +161,10 @@ public void close() throws Exception {
admin.close();
}
+  private static String applyZoneId(String clientId, String zoneId) {
+    // tolerate a missing -Dzone.id; String.replace would otherwise throw an NPE
+    return zoneId == null ? clientId : clientId.replace(ZONE_ID_TEMPLATE, zoneId);
+  }
+
private static final ObjectMapper mapper =
new ObjectMapper(new YAMLFactory())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
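
A worked example of the substitution this adds (values illustrative): starting the worker with `-Dzone.id=us-east-1a` while the common config contains `client.id=worker-{zone.id}` yields an effective Kafka client id of `worker-us-east-1a`, since `applyZoneId` is a plain template replacement.

```
// equivalent to applyZoneId("worker-{zone.id}", "us-east-1a")
String clientId = "worker-{zone.id}".replace("{zone.id}", "us-east-1a"); // -> "worker-us-east-1a"
```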
diff --git a/driver-kop/pom.xml b/driver-kop/pom.xml
index 74843ac77..def80e60b 100644
--- a/driver-kop/pom.xml
+++ b/driver-kop/pom.xml
@@ -50,6 +50,11 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
diff --git a/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsBenchmarkDriver.java b/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsBenchmarkDriver.java
index c5b677f30..8071f36df 100644
--- a/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsBenchmarkDriver.java
+++ b/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsBenchmarkDriver.java
@@ -28,7 +28,6 @@
import io.nats.client.Options;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
-import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.support.JsonUtils;
@@ -94,7 +93,8 @@ public CompletableFuture<Void> createTopic(String topic, int partitions) {
StreamConfiguration.builder()
.name(topic)
.subjects(topic)
- .storageType(StorageType.File)
+ .storageType(config.storageType)
+ .maxBytes(config.maxBytes)
.replicas(config.replicationFactor)
.build());
log.info("Created stream {} -- {}", topic, JsonUtils.getFormatted(streamInfo));
diff --git a/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsConfig.java b/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsConfig.java
index f6770c982..d76a900b3 100644
--- a/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsConfig.java
+++ b/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsConfig.java
@@ -13,8 +13,16 @@
*/
package io.openmessaging.benchmark.driver.nats;
+
+import io.nats.client.api.StorageType;
+
public class NatsConfig {
public String natsHostUrl;
public int replicationFactor;
+
+ public StorageType storageType = StorageType.File;
+
+ // -1 is unlimited
+ public int maxBytes = -1;
}
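
A hypothetical driver-config snippet exercising the new fields (only `storageType` and `maxBytes` are new; the other keys already exist in `NatsConfig`):

```
natsHostUrl: nats://127.0.0.1:4222
replicationFactor: 3
storageType: Memory    # File (the default) or Memory, per io.nats.client.api.StorageType
maxBytes: 1073741824   # per-stream size cap in bytes; -1 (the default) means unlimited
```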
diff --git a/driver-pravega/README.md b/driver-pravega/README.md
index e67d9ba21..5b45bb6f9 100644
--- a/driver-pravega/README.md
+++ b/driver-pravega/README.md
@@ -30,13 +30,15 @@ check [how to build Pravega](doc/build_pravega.md).
# DEPLOY A PRAVEGA CLUSTER ON AMAZON WEB SERVICES
-You can deploy a Pravega cluster on AWS (for benchmarking purposes) using [Terraform 0.12.20](https://www.terraform.io/) and [Ansible 2.8.5](https://docs.ansible.com/ansible/latest/installation_guide/intro_installation.html).
-You’ll need to have both of those tools installed as well as the `terraform-inventory` [plugin](https://github.com/adammck/terraform-inventory) for Terraform.
+You can deploy a Pravega cluster on AWS (for benchmarking purposes) using [Terraform 0.12.20](https://www.terraform.io/)
+and [Ansible 2.8.5](https://docs.ansible.com/ansible/latest/installation_guide/intro_installation.html) (with a
+Jinja2 version `<=3.0.3`).
+You’ll need to have both tools installed, as well as the `terraform-inventory` [plugin](https://github.com/adammck/terraform-inventory) for Terraform.
-You also need to install an Ansible modules to support metrics.
+You also need to install an Ansible module to support metrics:
```
-ansible-galaxy install cloudalchemy.node-exporter
+ansible-galaxy install cloudalchemy.node_exporter
```
In addition, you’ll need to:
@@ -74,7 +76,7 @@ This will install the following [EC2](https://aws.amazon.com/ec2) instances (plu
| Resource | Description | Count |
|----------------------|-------------------------------------------------------------|-------|
| Controller instances | The VMs on which a Pravega controller will run | 1 |
-| Bookkeeper instances | The VMs on which a Bookkeeper and Segmentstore will run | 3 |
+| Bookkeeper instances | The VMs on which a Bookkeeper and Segment Store will run | 3 |
| ZooKeeper instances | The VMs on which a ZooKeeper node will run | 3 |
| Client instance | The VM from which the benchmarking suite itself will be run | 2 |
@@ -84,12 +86,12 @@ When you run `terraform apply`, you will be prompted to type `yes`. Type `yes` t
There’s a handful of configurable parameters related to the Terraform deployment that you can alter by modifying the defaults in the `terraform.tfvars` file.
-| Variable | Description | Default |
-|-------------------|--------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------|
-| `region` | The AWS region in which the Pravega cluster will be deployed | `us-west-2` |
-| `public_key_path` | The path to the SSH public key that you’ve generated | `~/.ssh/pravega_aws.pub` |
-| `ami` | The [Amazon Machine Image (AWI)](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/AMIs.html) to be used by the cluster’s machines | `ami-9fa343e7` |
-| `instance_types` | The EC2 instance types used by the various components | `i3.4xlarge` (BookKeeper bookies), `m5.large`(Controller), `t3.small` (ZooKeeper), `c5.4xlarge` (benchmarking client) |
+| Variable | Description | Default |
+|-------------------|--------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `region` | The AWS region in which the Pravega cluster will be deployed | `us-east-2` |
+| `public_key_path` | The path to the SSH public key that you’ve generated | `~/.ssh/pravega_aws.pub` |
+| `ami`             | The [Amazon Machine Image (AMI)](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/AMIs.html) to be used by the cluster’s machines   | `ami-0bb2449c2217cb9b0` |
+| `instance_types`  | The EC2 instance types used by the various components | `i3en.2xlarge` (Segment Store + Bookkeeper), `m5n.xlarge` (Controller), `t2.small` (ZooKeeper), `m5n.xlarge` (benchmarking client), `t2.large` (metrics) |
If you modify the `public_key_path`, make sure that you point to the appropriate SSH key path when running the [Ansible playbook](#_RUNNING_THE_ANSIBLE_PLAYBOOK).
diff --git a/driver-pravega/deploy/deploy.yaml b/driver-pravega/deploy/deploy.yaml
index 3846fa997..5aebfe169 100644
--- a/driver-pravega/deploy/deploy.yaml
+++ b/driver-pravega/deploy/deploy.yaml
@@ -98,20 +98,6 @@
echo 'LANG=en_US.utf-8
LC_ALL=en_US.utf-8' > /etc/environment
-#- name: Install nmon
-# hosts: ["!tier2"]
-# tags: ["nmon"]
-# connection: ssh
-# become: true
-# tasks:
-# - name: Download nmon
-# unarchive:
-# src: "http://sourceforge.net/projects/nmon/files/nmon16j.tar.gz"
-# remote_src: yes
-# dest: /tmp
-# - command: cp /tmp/nmon_AMD64_ubuntu1804 /usr/local/bin/nmon
-# - command: chmod a+x /usr/local/bin/nmon
-
- name: Metrics installation
hosts: ["metrics"]
tags: ["metrics"]
@@ -120,13 +106,38 @@
tasks:
- name: Add Extras Repo
shell: yum-config-manager --enable rhui-REGION-rhel-server-extras
- - name: Install RPM packages
- yum: pkg={{ item }} state=latest
- with_items:
- - docker
- - systemd:
+ when:
+ - ansible_facts['distribution'] == 'RedHat'
+ - ansible_facts['distribution_major_version'] | int <= 7
+ - name: Docker repo
+ yum_repository:
+ name: docker
+ description: repo for docker
+ baseurl: "https://download.docker.com/linux/centos/{{ ansible_facts['distribution_major_version'] }}/x86_64/stable/"
+ gpgcheck: no
+ when: ansible_facts['distribution'] == 'RedHat'
+ - name: Create Docker repo dir
+ file:
+ path: "/etc/yum.repos.d/"
+ state: directory
+ - name: Add Docker Centos extras
+ copy:
+ dest: /etc/yum.repos.d/docker-ce.repo
+ content: |
+ [centos-extras]
+ name=Centos extras - $basearch
+ baseurl=http://mirror.centos.org/centos/7/extras/x86_64
+ enabled=1
+ gpgcheck=1
+ gpgkey=http://centos.org/keys/RPM-GPG-KEY-CentOS-7
+ - name: Installing docker
+ yum:
+ state: latest
+ pkg: ['slirp4netns', 'fuse-overlayfs', 'container-selinux', 'docker-ce']
+ - name: Start docker
+ service:
+ name: docker
state: started
- name: "docker"
enabled: yes
- name: Prometheus installation
@@ -192,7 +203,7 @@
hosts: ["!tier2"]
tags: ["node-exporter"]
roles:
- - cloudalchemy.node-exporter
+ - cloudalchemy.node_exporter
- name: ZooKeeper setup
hosts: zookeeper
@@ -304,7 +315,7 @@
path: "{{ item.path }}"
state: unmounted
with_items:
- - { path: "/mnt/journal", src: "/dev/nvme0n1" }
+ - { path: "/mnt/journal", src: "/dev/nvme2n1" }
- { path: "/mnt/storage", src: "/dev/nvme1n1" }
- name: Format disks
filesystem:
@@ -312,7 +323,7 @@
dev: '{{ item }}'
force: yes
with_items:
- - '/dev/nvme0n1'
+ - '/dev/nvme2n1'
- '/dev/nvme1n1'
- name: Mount disks
mount:
@@ -322,7 +333,7 @@
opts: defaults,noatime,nodiscard
state: mounted
with_items:
- - { path: "/mnt/journal", src: "/dev/nvme0n1" }
+ - { path: "/mnt/journal", src: "/dev/nvme2n1" }
- { path: "/mnt/storage", src: "/dev/nvme1n1" }
- name: BookKeeper setup
@@ -350,7 +361,7 @@
dest: "/opt/bookkeeper/bin/common.sh"
- name: Format BookKeeper metadata in Zookeeper
command: >
- bin/bookkeeper shell metaformat -nonInteractive --force
+ bin/bookkeeper shell initnewcluster
args:
chdir: /opt/bookkeeper
when: groups['bookkeeper'][0] == inventory_hostname
diff --git a/driver-pravega/deploy/provision-pravega-aws.tf b/driver-pravega/deploy/provision-pravega-aws.tf
index 49f959a70..f3c4b6236 100644
--- a/driver-pravega/deploy/provision-pravega-aws.tf
+++ b/driver-pravega/deploy/provision-pravega-aws.tf
@@ -190,7 +190,7 @@ resource "aws_instance" "metrics" {
# Change the EFS provisioned TP here
resource "aws_efs_file_system" "tier2" {
throughput_mode = "provisioned"
- provisioned_throughput_in_mibps = 1000
+ provisioned_throughput_in_mibps = 100
tags = {
Name = "pravega-tier2"
}
diff --git a/driver-pravega/deploy/terraform.tfvars b/driver-pravega/deploy/terraform.tfvars
index 52c9e863e..ecac89770 100644
--- a/driver-pravega/deploy/terraform.tfvars
+++ b/driver-pravega/deploy/terraform.tfvars
@@ -1,12 +1,12 @@
public_key_path = "~/.ssh/pravega_aws.pub"
-region = "us-west-2"
-ami = "ami-9fa343e7" // RHEL-7.4 us-west-2
+region = "us-east-2"
+ami = "ami-0bb2449c2217cb9b0" // RHEL-7.9 us-east-2
instance_types = {
"controller" = "m5.large"
- "bookkeeper" = "i3en.6xlarge"
+ "bookkeeper" = "i3en.2xlarge"
"zookeeper" = "t2.small"
- "client" = "m5n.8xlarge"
+ "client" = "m5n.xlarge"
"metrics" = "t2.large"
}
@@ -14,6 +14,6 @@ num_instances = {
"controller" = 1
"bookkeeper" = 3
"zookeeper" = 3
- "client" = 2
+ "client" = 1
"metrics" = 1
}
diff --git a/driver-pravega/deploy/vars.yaml b/driver-pravega/deploy/vars.yaml
index 730eb107d..e6e9543aa 100644
--- a/driver-pravega/deploy/vars.yaml
+++ b/driver-pravega/deploy/vars.yaml
@@ -13,7 +13,7 @@
#
---
-pravegaVersion: "0.10.1"
+pravegaVersion: "0.12.0"
zookeeperVersion: "3.5.5"
bookkeeperVersion: "4.14.2"
prometheusVersion: "2.2.1"
diff --git a/driver-pravega/pom.xml b/driver-pravega/pom.xml
index 5cce48a41..d1d184f9e 100644
--- a/driver-pravega/pom.xml
+++ b/driver-pravega/pom.xml
@@ -27,7 +27,7 @@
-    <pravegaVersion>0.10.2</pravegaVersion>
+    <pravegaVersion>0.12.0</pravegaVersion>
@@ -64,7 +64,7 @@
     <dependency>
       <groupId>io.pravega</groupId>
       <artifactId>pravega-keycloak-client</artifactId>
-      <version>0.11.0</version>
+      <version>${pravegaVersion}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
diff --git a/driver-rabbitmq/pom.xml b/driver-rabbitmq/pom.xml
index d92646d1d..6d1001bdf 100644
--- a/driver-rabbitmq/pom.xml
+++ b/driver-rabbitmq/pom.xml
@@ -33,7 +33,7 @@
     <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
-      <version>4.8.0</version>
+      <version>5.18.0</version>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
diff --git a/driver-redis/pom.xml b/driver-redis/pom.xml
index 38c9a2855..32f877b1b 100644
--- a/driver-redis/pom.xml
+++ b/driver-redis/pom.xml
@@ -38,7 +38,7 @@
     <dependency>
       <groupId>redis.clients</groupId>
       <artifactId>jedis</artifactId>
-      <version>3.7.0</version>
+      <version>5.0.0</version>
     </dependency>
diff --git a/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkConsumer.java b/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkConsumer.java
index 877d12080..4e8bb88e8 100644
--- a/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkConsumer.java
+++ b/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkConsumer.java
@@ -27,9 +27,9 @@
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadGroupParams;
+import redis.clients.jedis.resps.StreamEntry;
public class RedisBenchmarkConsumer implements BenchmarkConsumer {
private final JedisPool pool;
diff --git a/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkDriver.java b/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkDriver.java
index 2579ccb30..d1ef95c3d 100644
--- a/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkDriver.java
+++ b/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkDriver.java
@@ -28,11 +28,11 @@
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisPoolConfig;
public class RedisBenchmarkDriver implements BenchmarkDriver {
JedisPool jedisPool;
@@ -80,7 +80,7 @@ public CompletableFuture<BenchmarkConsumer> createConsumer(
}
private void setupJedisConn() {
- JedisPoolConfig poolConfig = new JedisPoolConfig();
+    GenericObjectPoolConfig<Jedis> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(this.clientConfig.jedisPoolMaxTotal);
poolConfig.setMaxIdle(this.clientConfig.jedisPoolMaxIdle);
if (this.clientConfig.redisPass != null) {
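
In Jedis 5 the pool is tuned through Commons Pool's `GenericObjectPoolConfig<Jedis>` (which `JedisPoolConfig` merely extends); a minimal sketch of the pattern the driver now uses, with illustrative connection values:

```
GenericObjectPoolConfig<Jedis> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(8);
poolConfig.setMaxIdle(8);
try (JedisPool pool = new JedisPool(poolConfig, "localhost", 6379);
    Jedis jedis = pool.getResource()) {
  jedis.ping(); // returns "PONG" when the connection is healthy
}
```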
diff --git a/driver-rocketmq/pom.xml b/driver-rocketmq/pom.xml
index 9e60ff3cc..3cd0a9139 100644
--- a/driver-rocketmq/pom.xml
+++ b/driver-rocketmq/pom.xml
@@ -26,7 +26,7 @@
  <artifactId>driver-rocketmq</artifactId>
-    <rocketmq.version>4.9.3</rocketmq.version>
+    <rocketmq.version>5.1.4</rocketmq.version>
diff --git a/driver-rocketmq/rocketmq.yaml b/driver-rocketmq/rocketmq.yaml
index b5aec62ee..b9b608088 100644
--- a/driver-rocketmq/rocketmq.yaml
+++ b/driver-rocketmq/rocketmq.yaml
@@ -19,5 +19,15 @@ driverClass: io.openmessaging.benchmark.driver.rocketmq.RocketMQBenchmarkDriver
clusterName: DefaultCluster
namesrvAddr: 127.0.0.1:9876
vipChannelEnabled: false
+
+batchCQ: true
+autoBatch: true
+# batchMaxBytes: 32768
+# batchMaxDelayMs: 10
+# totalBatchMaxBytes: 33554432
+
+enableBackpressure: true
+backpressureConcurrency: 1024
+
accessKey:
secretKey:
\ No newline at end of file
diff --git a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java
index d7d547098..ccad035fd 100644
--- a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java
+++ b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java
@@ -26,9 +26,11 @@
import io.openmessaging.benchmark.driver.rocketmq.client.RocketMQClientConfig;
import java.io.File;
import java.io.IOException;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
@@ -37,12 +39,18 @@
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +88,30 @@ public String getTopicNamePrefix() {
return "RocketMQ-Benchmark";
}
+  Map<String, Set<String>> cachedBrokerAddr = new ConcurrentHashMap<>();
+
+ int fetchCnt = 0;
+
+  private synchronized Set<String> fetchMasterAndSlaveAddrByClusterName(
+      final MQAdminExt adminExt, final String clusterName)
+      throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+          MQBrokerException, InterruptedException {
+    Set<String> brokerList = cachedBrokerAddr.get(clusterName);
+    if (brokerList == null) {
+      brokerList = CommandUtil.fetchMasterAndSlaveAddrByClusterName(adminExt, clusterName);
+      if (brokerList.isEmpty()) {
+        // throw before caching so a transient empty result is not cached forever
+        throw new RuntimeException("fetched an empty broker address list, clusterName: " + clusterName);
+      }
+      cachedBrokerAddr.put(clusterName, brokerList);
+    }
+    if (fetchCnt++ % 100 == 0) {
+      log.info("fetch brokerAddr count: {}", fetchCnt);
+    }
+    return brokerList;
+  }
+
@Override
  public CompletableFuture<Void> createTopic(final String topic, final int partitions) {
return CompletableFuture.runAsync(
@@ -90,10 +122,15 @@ public CompletableFuture<Void> createTopic(final String topic, final int partiti
topicConfig.setReadQueueNums(partitions);
topicConfig.setWriteQueueNums(partitions);
topicConfig.setTopicName(topic);
+ if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) {
+ topicConfig
+ .getAttributes()
+ .put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ");
+ }
try {
            Set<String> brokerList =
- CommandUtil.fetchMasterAddrByClusterName(
+ fetchMasterAndSlaveAddrByClusterName(
this.rmqAdmin, this.rmqClientConfig.clusterName);
topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));
@@ -130,6 +167,26 @@ public CompletableFuture createProducer(final String topic) {
if (null != this.rmqClientConfig.compressMsgBodyOverHowmuch) {
rmqProducer.setCompressMsgBodyOverHowmuch(this.rmqClientConfig.compressMsgBodyOverHowmuch);
}
+
+ if (null != this.rmqClientConfig.autoBatch) {
+ rmqProducer.setAutoBatch(this.rmqClientConfig.autoBatch);
+ }
+ if (null != this.rmqClientConfig.batchMaxBytes) {
+ rmqProducer.batchMaxBytes(this.rmqClientConfig.batchMaxBytes);
+ }
+ if (null != this.rmqClientConfig.batchMaxDelayMs) {
+ rmqProducer.batchMaxDelayMs(this.rmqClientConfig.batchMaxDelayMs);
+ }
+ if (null != this.rmqClientConfig.totalBatchMaxBytes) {
+ rmqProducer.totalBatchMaxBytes(this.rmqClientConfig.totalBatchMaxBytes);
+ }
+ if (null != this.rmqClientConfig.enableBackpressure) {
+ rmqProducer.setEnableBackpressureForAsyncMode(this.rmqClientConfig.enableBackpressure);
+ }
+ if (null != this.rmqClientConfig.backpressureConcurrency) {
+ rmqProducer.setBackPressureForAsyncSendNum(this.rmqClientConfig.backpressureConcurrency);
+ }
+
try {
rmqProducer.start();
} catch (MQClientException e) {
diff --git a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java
index 840090452..26cb109bb 100644
--- a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java
+++ b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java
@@ -19,6 +19,14 @@ public class RocketMQClientConfig {
public Boolean vipChannelEnabled;
public Integer maxMessageSize;
public Integer compressMsgBodyOverHowmuch;
+ public Boolean batchCQ;
+ public Boolean autoBatch;
+ public Integer batchMaxBytes;
+ public Integer batchMaxDelayMs;
+ public Integer totalBatchMaxBytes;
+
+ public Boolean enableBackpressure;
+ public Integer backpressureConcurrency;
public String accessKey;
public String secretKey;
}
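
For orientation, a sketch of how these new knobs flow into `DefaultMQProducer` (setter names as used in the driver change above; values mirror the sample `rocketmq.yaml`):

```
DefaultMQProducer producer = new DefaultMQProducer("benchmark-producer-group");
producer.setAutoBatch(true);                       // autoBatch
producer.batchMaxBytes(32 * 1024);                 // batchMaxBytes
producer.batchMaxDelayMs(10);                      // batchMaxDelayMs
producer.setEnableBackpressureForAsyncMode(true);  // enableBackpressure
producer.setBackPressureForAsyncSendNum(1024);     // backpressureConcurrency
```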
diff --git a/pom.xml b/pom.xml
index 5542f673b..f1876c32f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@
4.14.4
10.3.3
3.12.0
-    <log4j.version>2.17.1</log4j.version>
+    <log4j.version>2.20.0</log4j.version>
1.18.24
2.13.2
1.48
@@ -352,7 +352,13 @@
         <version>${maven.surefire.plugin.version}</version>
         <configuration>
-          <argLine>--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED</argLine>
+          <argLine>
+            --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED
+            -Dio.netty.tryReflectionSetAccessible=true
+            --add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.misc=ALL-UNNAMED
+            -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true
+            --add-opens java.base/java.util.zip=ALL-UNNAMED
+          </argLine>