diff --git a/benchmark-framework/pom.xml b/benchmark-framework/pom.xml index 256222f9d..0e324141f 100644 --- a/benchmark-framework/pom.xml +++ b/benchmark-framework/pom.xml @@ -39,6 +39,12 @@ ${project.groupId} driver-artemis ${project.version} + + + io.netty + * + + ${project.groupId} @@ -65,11 +71,6 @@ driver-nats ${project.version} - - ${project.groupId} - driver-nats-streaming - ${project.version} - ${project.groupId} driver-nsq @@ -139,6 +140,12 @@ HdrHistogram 2.1.12 + + ${project.groupId} + driver-nats-streaming + ${project.version} + provided + org.projectlombok lombok diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java index a1b61afa3..c1c253f16 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import com.google.common.base.Preconditions; import java.io.IOException; import java.nio.ByteBuffer; import org.HdrHistogram.Histogram; @@ -30,16 +31,40 @@ public HistogramSerializer() { super(Histogram.class); } + static byte[] toByteArray(ByteBuffer buffer) { + byte[] encodedBuffer = new byte[buffer.remaining()]; + buffer.get(encodedBuffer); + return encodedBuffer; + } + + static ByteBuffer serializeHistogram(Histogram histo, ByteBuffer buffer) { + buffer.clear(); + while (true) { + final int outBytes = histo.encodeIntoCompressedByteBuffer(buffer); + Preconditions.checkState(outBytes == buffer.position()); + final int capacity = buffer.capacity(); + if (outBytes < capacity) { + // encoding succesful + break; + } + // We filled the entire buffer, an indication that the buffer was not + // large enough, so we double the buffer and try again. + // See: https://github.com/HdrHistogram/HdrHistogram/issues/201 + buffer = ByteBuffer.allocate(capacity * 2); + } + buffer.flip(); + return buffer; + } + @Override public void serialize( Histogram histogram, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { ByteBuffer buffer = threadBuffer.get(); - buffer.clear(); - histogram.encodeIntoCompressedByteBuffer(buffer); - byte[] arr = new byte[buffer.position()]; - buffer.flip(); - buffer.get(arr); - jsonGenerator.writeBinary(arr); + ByteBuffer newBuffer = serializeHistogram(histogram, buffer); + if (newBuffer != buffer) { + threadBuffer.set(newBuffer); + } + jsonGenerator.writeBinary(toByteArray(newBuffer)); } } diff --git a/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java b/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java index b06ef7566..9769eac6f 100644 --- a/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java +++ b/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java @@ -18,6 +18,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; import org.HdrHistogram.Histogram; import org.junit.jupiter.api.Test; @@ -41,4 +43,64 @@ void deserialize() throws IOException { assertThat(deserialized).isEqualTo(value); } + + /** + * Create a random histogram and insert the given number of samples. + * + * @param samples the number of samples to record into the histogram + * @return a Histogram with the given number of samples + */ + private Histogram randomHisto(int samples) { + Random r = new Random(0xBADBEEF); + Histogram h = new org.HdrHistogram.Histogram(5); + for (int i = 0; i < samples; i++) { + h.recordValue(r.nextInt(10000000)); + } + + return h; + } + + byte[] serializeRandomHisto(int samples, int initialBufferSize) throws Exception { + ByteBuffer inbuffer = ByteBuffer.allocate(initialBufferSize); + Histogram inHisto = randomHisto(samples); + byte[] serialBytes = + HistogramSerializer.toByteArray(HistogramSerializer.serializeHistogram(inHisto, inbuffer)); + + // check roundtrip + Histogram outHisto = + Histogram.decodeFromCompressedByteBuffer(ByteBuffer.wrap(serialBytes), Long.MIN_VALUE); + assertThat(inHisto).isEqualTo(outHisto); + + return serialBytes; + } + + @Test + public void testHistogram() throws Exception { + + // in the worker it's 8 MB but it takes a while to make a histogram that big + final int bufSize = 1002; + + int samples = 300; + + // we do an exponential search to fit the crossover point where we need to grow the buffer + while (true) { + byte[] serialBytes = serializeRandomHisto(samples, bufSize); + // System.out.println("Samples: " + samples + ", histogram size: " + serialBytes.length); + if (serialBytes.length >= bufSize) { + break; + } + samples *= 1.05; + } + + // then walk backwards across the point linearly with increment of 1 to check the boundary + // carefully + while (true) { + samples--; + byte[] serialBytes = serializeRandomHisto(samples, bufSize); + // System.out.println("Samples: " + samples + ", histogram size: " + serialBytes.length); + if (serialBytes.length < bufSize - 10) { + break; + } + } + } } diff --git a/bin/benchmark b/bin/benchmark index 9954f632d..555893496 100755 --- a/bin/benchmark +++ b/bin/benchmark @@ -27,4 +27,4 @@ fi JVM_MEM="${HEAP_OPTS} -XX:+UseG1GC" JVM_GC_LOG=" -XX:+PrintGCDetails -XX:+PrintGCApplicationStoppedTime -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=64m -Xloggc:/dev/shm/benchmark-client-gc_%p.log" -java -server -cp $CLASSPATH $JVM_MEM io.openmessaging.benchmark.Benchmark $* +java -server -cp $CLASSPATH $JVM_MEM io.openmessaging.benchmark.Benchmark "$@" diff --git a/bin/benchmark-worker b/bin/benchmark-worker index ce81e0c8a..164f45b80 100755 --- a/bin/benchmark-worker +++ b/bin/benchmark-worker @@ -26,5 +26,12 @@ fi JVM_MEM="${HEAP_OPTS} -XX:+UseG1GC" JVM_GC_LOG=" -XX:+PrintGCDetails -XX:+PrintGCApplicationStoppedTime -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=64m -Xloggc:/dev/shm/benchmark-client-gc_%p.log" - -exec java -server -cp $CLASSPATH $JVM_MEM io.openmessaging.benchmark.worker.BenchmarkWorker $* +# Required by Netty for optimized direct byte buffer access +JVM_OPTS="$JVM_OPTS -Dio.netty.tryReflectionSetAccessible=true" +JVM_OPTS="$JVM_OPTS --add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.misc=ALL-UNNAMED" +# Set io.netty.tryReflectionSetAccessible in Pulsar's shaded client +JVM_OPTS="$JVM_OPTS -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true" +# Required by Pulsar client optimized checksum calculation on other than Linux x86_64 platforms +# reflection access to java.util.zip.CRC32C +JVM_OPTS="$JVM_OPTS --add-opens java.base/java.util.zip=ALL-UNNAMED" +exec java -server -cp $CLASSPATH $JVM_MEM $JVM_OPTS io.openmessaging.benchmark.worker.BenchmarkWorker "$@" \ No newline at end of file diff --git a/driver-kafka/deploy/hdd-deployment/alicloud/deploy.yaml b/driver-kafka/deploy/hdd-deployment/alicloud/deploy.yaml index 24575d447..24816a5db 100644 --- a/driver-kafka/deploy/hdd-deployment/alicloud/deploy.yaml +++ b/driver-kafka/deploy/hdd-deployment/alicloud/deploy.yaml @@ -54,7 +54,7 @@ - set_fact: zookeeperServers: "{{ groups['zookeeper'] | map('extract', hostvars, ['ansible_default_ipv4', 'address']) | map('regex_replace', '^(.*)$', '\\1:2181') | join(',') }}" boostrapServers: "{{ groups['kafka'] | map('extract', hostvars, ['private_ip']) | map('regex_replace', '^(.*)$', '\\1:9092') | join(',') }}" - kafkaVersion: "2.0.0" + kafkaVersion: "3.6.1" - debug: msg: "zookeeper servers: {{ zookeeperServers }}\nboostrap servers: {{ boostrapServers }}" - name: Download Kafka package diff --git a/driver-kafka/deploy/hdd-deployment/deploy.yaml b/driver-kafka/deploy/hdd-deployment/deploy.yaml index a8d1de38d..b9f632b77 100644 --- a/driver-kafka/deploy/hdd-deployment/deploy.yaml +++ b/driver-kafka/deploy/hdd-deployment/deploy.yaml @@ -80,7 +80,7 @@ - set_fact: zookeeperServers: "{{ groups['zookeeper'] | map('extract', hostvars, ['ansible_default_ipv4', 'address']) | map('regex_replace', '^(.*)$', '\\1:2181') | join(',') }}" boostrapServers: "{{ groups['kafka'] | map('extract', hostvars, ['private_ip']) | map('regex_replace', '^(.*)$', '\\1:9092') | join(',') }}" - kafkaVersion: "2.8.1" + kafkaVersion: "3.6.1" - debug: msg: "zookeeper servers: {{ zookeeperServers }}\nboostrap servers: {{ boostrapServers }}" - name: Download Kafka package diff --git a/driver-kafka/deploy/hdd-deployment/templates/kafka.service b/driver-kafka/deploy/hdd-deployment/templates/kafka.service index c5afac56a..d35ac4b8b 100644 --- a/driver-kafka/deploy/hdd-deployment/templates/kafka.service +++ b/driver-kafka/deploy/hdd-deployment/templates/kafka.service @@ -5,7 +5,7 @@ After=network.target [Service] ExecStart=/opt/kafka/bin/kafka-server-start.sh config/server.properties Environment='KAFKA_HEAP_OPTS=-Xms6g -Xmx6g -XX:MetaspaceSize=96m' -Environment='KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -Djava.awt.headless=true" +Environment='KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -Djava.awt.headless=true' WorkingDirectory=/opt/kafka RestartSec=1s Restart=on-failure diff --git a/driver-kafka/deploy/ssd-deployment/alicloud/deploy.yaml b/driver-kafka/deploy/ssd-deployment/alicloud/deploy.yaml index 24575d447..24816a5db 100644 --- a/driver-kafka/deploy/ssd-deployment/alicloud/deploy.yaml +++ b/driver-kafka/deploy/ssd-deployment/alicloud/deploy.yaml @@ -54,7 +54,7 @@ - set_fact: zookeeperServers: "{{ groups['zookeeper'] | map('extract', hostvars, ['ansible_default_ipv4', 'address']) | map('regex_replace', '^(.*)$', '\\1:2181') | join(',') }}" boostrapServers: "{{ groups['kafka'] | map('extract', hostvars, ['private_ip']) | map('regex_replace', '^(.*)$', '\\1:9092') | join(',') }}" - kafkaVersion: "2.0.0" + kafkaVersion: "3.6.1" - debug: msg: "zookeeper servers: {{ zookeeperServers }}\nboostrap servers: {{ boostrapServers }}" - name: Download Kafka package diff --git a/driver-kafka/deploy/ssd-deployment/deploy.yaml b/driver-kafka/deploy/ssd-deployment/deploy.yaml index 6a1e502bb..d02ce5dda 100644 --- a/driver-kafka/deploy/ssd-deployment/deploy.yaml +++ b/driver-kafka/deploy/ssd-deployment/deploy.yaml @@ -103,7 +103,7 @@ - set_fact: zookeeperServers: "{{ groups['zookeeper'] | map('extract', hostvars, ['ansible_default_ipv4', 'address']) | map('regex_replace', '^(.*)$', '\\1:2181') | join(',') }}" bootstrapServers: "{{ groups['kafka'] | map('extract', hostvars, ['private_ip']) | map('regex_replace', '^(.*)$', '\\1:9092') | join(',') }}" - kafkaVersion: "3.2.1" + kafkaVersion: "3.6.1" tags: [client-code] - debug: msg: "zookeeper servers: {{ zookeeperServers }}\nbootstrap servers: {{ bootstrapServers }}" diff --git a/driver-kafka/deploy/ssd-deployment/templates/kafka.service b/driver-kafka/deploy/ssd-deployment/templates/kafka.service index 39c04c1a8..9d4a975ef 100644 --- a/driver-kafka/deploy/ssd-deployment/templates/kafka.service +++ b/driver-kafka/deploy/ssd-deployment/templates/kafka.service @@ -5,7 +5,7 @@ After=network.target [Service] ExecStart=/opt/kafka/bin/kafka-server-start.sh config/server.properties Environment='KAFKA_HEAP_OPTS=-Xms16g -Xmx16g -XX:MetaspaceSize=96m' -Environment='KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UnlockExperimentalVMOptions -XX:+UseZGC -XX:+ParallelRefProcEnabled -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=12 -XX:ConcGCThreads=12 -XX:+DisableExplicitGC -XX:-ResizePLAB -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -Djava.awt.headless=true" +Environment='KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UnlockExperimentalVMOptions -XX:+UseZGC -XX:+ParallelRefProcEnabled -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=12 -XX:ConcGCThreads=12 -XX:+DisableExplicitGC -XX:-ResizePLAB -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -Djava.awt.headless=true' WorkingDirectory=/opt/kafka RestartSec=1s Restart=on-failure diff --git a/driver-kafka/pom.xml b/driver-kafka/pom.xml index 176bdfff4..83324ab5c 100644 --- a/driver-kafka/pom.xml +++ b/driver-kafka/pom.xml @@ -33,7 +33,7 @@ org.apache.kafka kafka-clients - 3.2.1 + 3.6.1 org.projectlombok diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java index a2789ac2a..d506b5b37 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java @@ -45,6 +45,9 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver { + private static final String ZONE_ID_CONFIG = "zone.id"; + private static final String ZONE_ID_TEMPLATE = "{zone.id}"; + private static final String KAFKA_CLIENT_ID = "client.id"; private Config config; private List producers = Collections.synchronizedList(new ArrayList<>()); @@ -63,6 +66,13 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I Properties commonProperties = new Properties(); commonProperties.load(new StringReader(config.commonConfig)); + if (commonProperties.containsKey(KAFKA_CLIENT_ID)) { + commonProperties.put( + KAFKA_CLIENT_ID, + applyZoneId( + commonProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); + } + producerProperties = new Properties(); commonProperties.forEach((key, value) -> producerProperties.put(key, value)); producerProperties.load(new StringReader(config.producerConfig)); @@ -151,6 +161,10 @@ public void close() throws Exception { admin.close(); } + private static String applyZoneId(String clientId, String zoneId) { + return clientId.replace(ZONE_ID_TEMPLATE, zoneId); + } + private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()) .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); diff --git a/driver-kop/pom.xml b/driver-kop/pom.xml index 74843ac77..def80e60b 100644 --- a/driver-kop/pom.xml +++ b/driver-kop/pom.xml @@ -50,6 +50,11 @@ org.apache.commons commons-lang3 + + org.apache.logging.log4j + log4j-slf4j-impl + test + org.testng testng diff --git a/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsBenchmarkDriver.java b/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsBenchmarkDriver.java index c5b677f30..8071f36df 100644 --- a/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsBenchmarkDriver.java +++ b/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsBenchmarkDriver.java @@ -28,7 +28,6 @@ import io.nats.client.Options; import io.nats.client.PushSubscribeOptions; import io.nats.client.api.ConsumerConfiguration; -import io.nats.client.api.StorageType; import io.nats.client.api.StreamConfiguration; import io.nats.client.api.StreamInfo; import io.nats.client.support.JsonUtils; @@ -94,7 +93,8 @@ public CompletableFuture createTopic(String topic, int partitions) { StreamConfiguration.builder() .name(topic) .subjects(topic) - .storageType(StorageType.File) + .storageType(config.storageType) + .maxBytes(config.maxBytes) .replicas(config.replicationFactor) .build()); log.info("Created stream {} -- {}", topic, JsonUtils.getFormatted(streamInfo)); diff --git a/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsConfig.java b/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsConfig.java index f6770c982..d76a900b3 100644 --- a/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsConfig.java +++ b/driver-nats/src/main/java/io/openmessaging/benchmark/driver/nats/NatsConfig.java @@ -13,8 +13,16 @@ */ package io.openmessaging.benchmark.driver.nats; + +import io.nats.client.api.StorageType; + public class NatsConfig { public String natsHostUrl; public int replicationFactor; + + public StorageType storageType = StorageType.File; + + // -1 is unlimited + public int maxBytes = -1; } diff --git a/driver-pravega/README.md b/driver-pravega/README.md index e67d9ba21..5b45bb6f9 100644 --- a/driver-pravega/README.md +++ b/driver-pravega/README.md @@ -30,13 +30,15 @@ check [how to build Pravega](doc/build_pravega.md). # DEPLOY A PRAVEGA CLUSTER ON AMAZON WEB SERVICES -You can deploy a Pravega cluster on AWS (for benchmarking purposes) using [Terraform 0.12.20](https://www.terraform.io/) and [Ansible 2.8.5](https://docs.ansible.com/ansible/latest/installation_guide/intro_installation.html). -You’ll need to have both of those tools installed as well as the `terraform-inventory` [plugin](https://github.com/adammck/terraform-inventory) for Terraform. +You can deploy a Pravega cluster on AWS (for benchmarking purposes) using [Terraform 0.12.20](https://www.terraform.io/) +and [Ansible 2.8.5](https://docs.ansible.com/ansible/latest/installation_guide/intro_installation.html) (with a +a version of `Jinja2=<3.0.3`). +You’ll need to have both tools installed, as well as the `terraform-inventory` [plugin](https://github.com/adammck/terraform-inventory) for Terraform. -You also need to install an Ansible modules to support metrics. +You also need to install an Ansible modules to support metrics: ``` -ansible-galaxy install cloudalchemy.node-exporter +ansible-galaxy install cloudalchemy.node_exporter ``` In addition, you’ll need to: @@ -74,7 +76,7 @@ This will install the following [EC2](https://aws.amazon.com/ec2) instances (plu | Resource | Description | Count | |----------------------|-------------------------------------------------------------|-------| | Controller instances | The VMs on which a Pravega controller will run | 1 | -| Bookkeeper instances | The VMs on which a Bookkeeper and Segmentstore will run | 3 | +| Bookkeeper instances | The VMs on which a Bookkeeper and Segment Store will run | 3 | | ZooKeeper instances | The VMs on which a ZooKeeper node will run | 3 | | Client instance | The VM from which the benchmarking suite itself will be run | 2 | @@ -84,12 +86,12 @@ When you run `terraform apply`, you will be prompted to type `yes`. Type `yes` t There’s a handful of configurable parameters related to the Terraform deployment that you can alter by modifying the defaults in the `terraform.tfvars` file. -| Variable | Description | Default | -|-------------------|--------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------| -| `region` | The AWS region in which the Pravega cluster will be deployed | `us-west-2` | -| `public_key_path` | The path to the SSH public key that you’ve generated | `~/.ssh/pravega_aws.pub` | -| `ami` | The [Amazon Machine Image (AWI)](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/AMIs.html) to be used by the cluster’s machines | `ami-9fa343e7` | -| `instance_types` | The EC2 instance types used by the various components | `i3.4xlarge` (BookKeeper bookies), `m5.large`(Controller), `t3.small` (ZooKeeper), `c5.4xlarge` (benchmarking client) | +| Variable | Description | Default | +|-------------------|--------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------| +| `region` | The AWS region in which the Pravega cluster will be deployed | `us-east-2` | +| `public_key_path` | The path to the SSH public key that you’ve generated | `~/.ssh/pravega_aws.pub` | +| `ami` | The [Amazon Machine Image (AWI)](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/AMIs.html) to be used by the cluster’s machines | `ami-0bb2449c2217cb9b0` | +| `instance_types` | The EC2 instance types used by the various components | `i3en.2xlarge` (Segment Store + Bookkeeper), `m5n.xlarge`(Controller), `t2.small` (ZooKeeper), `m5n.xlarge` (benchmarking client), `t2.large` (metrics) | If you modify the `public_key_path`, make sure that you point to the appropriate SSH key path when running the [Ansible playbook](#_RUNNING_THE_ANSIBLE_PLAYBOOK). diff --git a/driver-pravega/deploy/deploy.yaml b/driver-pravega/deploy/deploy.yaml index 3846fa997..5aebfe169 100644 --- a/driver-pravega/deploy/deploy.yaml +++ b/driver-pravega/deploy/deploy.yaml @@ -98,20 +98,6 @@ echo 'LANG=en_US.utf-8 LC_ALL=en_US.utf-8' > /etc/environment -#- name: Install nmon -# hosts: ["!tier2"] -# tags: ["nmon"] -# connection: ssh -# become: true -# tasks: -# - name: Download nmon -# unarchive: -# src: "http://sourceforge.net/projects/nmon/files/nmon16j.tar.gz" -# remote_src: yes -# dest: /tmp -# - command: cp /tmp/nmon_AMD64_ubuntu1804 /usr/local/bin/nmon -# - command: chmod a+x /usr/local/bin/nmon - - name: Metrics installation hosts: ["metrics"] tags: ["metrics"] @@ -120,13 +106,38 @@ tasks: - name: Add Extras Repo shell: yum-config-manager --enable rhui-REGION-rhel-server-extras - - name: Install RPM packages - yum: pkg={{ item }} state=latest - with_items: - - docker - - systemd: + when: + - ansible_facts['distribution'] == 'RedHat' + - ansible_facts['distribution_major_version'] | int <= 7 + - name: Docker repo + yum_repository: + name: docker + description: repo for docker + baseurl: "https://download.docker.com/linux/centos/{{ ansible_facts['distribution_major_version'] }}/x86_64/stable/" + gpgcheck: no + when: ansible_facts['distribution'] == 'RedHat' + - name: Create Docker repo dir + file: + path: "/etc/yum.repos.d/" + state: directory + - name: Add Docker Centos extras + copy: + dest: /etc/yum.repos.d/docker-ce.repo + content: | + [centos-extras] + name=Centos extras - $basearch + baseurl=http://mirror.centos.org/centos/7/extras/x86_64 + enabled=1 + gpgcheck=1 + gpgkey=http://centos.org/keys/RPM-GPG-KEY-CentOS-7 + - name: Installing docker + yum: + state: latest + pkg: ['slirp4netns', 'fuse-overlayfs', 'container-selinux', 'docker-ce'] + - name: Start docker + service: + name: docker state: started - name: "docker" enabled: yes - name: Prometheus installation @@ -192,7 +203,7 @@ hosts: ["!tier2"] tags: ["node-exporter"] roles: - - cloudalchemy.node-exporter + - cloudalchemy.node_exporter - name: ZooKeeper setup hosts: zookeeper @@ -304,7 +315,7 @@ path: "{{ item.path }}" state: unmounted with_items: - - { path: "/mnt/journal", src: "/dev/nvme0n1" } + - { path: "/mnt/journal", src: "/dev/nvme2n1" } - { path: "/mnt/storage", src: "/dev/nvme1n1" } - name: Format disks filesystem: @@ -312,7 +323,7 @@ dev: '{{ item }}' force: yes with_items: - - '/dev/nvme0n1' + - '/dev/nvme2n1' - '/dev/nvme1n1' - name: Mount disks mount: @@ -322,7 +333,7 @@ opts: defaults,noatime,nodiscard state: mounted with_items: - - { path: "/mnt/journal", src: "/dev/nvme0n1" } + - { path: "/mnt/journal", src: "/dev/nvme2n1" } - { path: "/mnt/storage", src: "/dev/nvme1n1" } - name: BookKeeper setup @@ -350,7 +361,7 @@ dest: "/opt/bookkeeper/bin/common.sh" - name: Format BookKeeper metadata in Zookeeper command: > - bin/bookkeeper shell metaformat -nonInteractive --force + bin/bookkeeper shell initnewcluster args: chdir: /opt/bookkeeper when: groups['bookkeeper'][0] == inventory_hostname diff --git a/driver-pravega/deploy/provision-pravega-aws.tf b/driver-pravega/deploy/provision-pravega-aws.tf index 49f959a70..f3c4b6236 100644 --- a/driver-pravega/deploy/provision-pravega-aws.tf +++ b/driver-pravega/deploy/provision-pravega-aws.tf @@ -190,7 +190,7 @@ resource "aws_instance" "metrics" { # Change the EFS provisioned TP here resource "aws_efs_file_system" "tier2" { throughput_mode = "provisioned" - provisioned_throughput_in_mibps = 1000 + provisioned_throughput_in_mibps = 100 tags = { Name = "pravega-tier2" } diff --git a/driver-pravega/deploy/terraform.tfvars b/driver-pravega/deploy/terraform.tfvars index 52c9e863e..ecac89770 100644 --- a/driver-pravega/deploy/terraform.tfvars +++ b/driver-pravega/deploy/terraform.tfvars @@ -1,12 +1,12 @@ public_key_path = "~/.ssh/pravega_aws.pub" -region = "us-west-2" -ami = "ami-9fa343e7" // RHEL-7.4 us-west-2 +region = "us-east-2" +ami = "ami-0bb2449c2217cb9b0" // RHEL-7.9 us-east-2 instance_types = { "controller" = "m5.large" - "bookkeeper" = "i3en.6xlarge" + "bookkeeper" = "i3en.2xlarge" "zookeeper" = "t2.small" - "client" = "m5n.8xlarge" + "client" = "m5n.xlarge" "metrics" = "t2.large" } @@ -14,6 +14,6 @@ num_instances = { "controller" = 1 "bookkeeper" = 3 "zookeeper" = 3 - "client" = 2 + "client" = 1 "metrics" = 1 } diff --git a/driver-pravega/deploy/vars.yaml b/driver-pravega/deploy/vars.yaml index 730eb107d..e6e9543aa 100644 --- a/driver-pravega/deploy/vars.yaml +++ b/driver-pravega/deploy/vars.yaml @@ -13,7 +13,7 @@ # --- -pravegaVersion: "0.10.1" +pravegaVersion: "0.12.0" zookeeperVersion: "3.5.5" bookkeeperVersion: "4.14.2" prometheusVersion: "2.2.1" diff --git a/driver-pravega/pom.xml b/driver-pravega/pom.xml index 5cce48a41..d1d184f9e 100644 --- a/driver-pravega/pom.xml +++ b/driver-pravega/pom.xml @@ -27,7 +27,7 @@ - 0.10.2 + 0.12.0 @@ -64,7 +64,7 @@ io.pravega pravega-keycloak-client - 0.11.0 + ${pravegaVersion} org.apache.commons diff --git a/driver-rabbitmq/pom.xml b/driver-rabbitmq/pom.xml index d92646d1d..6d1001bdf 100644 --- a/driver-rabbitmq/pom.xml +++ b/driver-rabbitmq/pom.xml @@ -33,7 +33,7 @@ com.rabbitmq amqp-client - 4.8.0 + 5.18.0 io.netty diff --git a/driver-redis/pom.xml b/driver-redis/pom.xml index 38c9a2855..32f877b1b 100644 --- a/driver-redis/pom.xml +++ b/driver-redis/pom.xml @@ -38,7 +38,7 @@ redis.clients jedis - 3.7.0 + 5.0.0 diff --git a/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkConsumer.java b/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkConsumer.java index 877d12080..4e8bb88e8 100644 --- a/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkConsumer.java +++ b/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkConsumer.java @@ -27,9 +27,9 @@ import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; -import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.params.XReadGroupParams; +import redis.clients.jedis.resps.StreamEntry; public class RedisBenchmarkConsumer implements BenchmarkConsumer { private final JedisPool pool; diff --git a/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkDriver.java b/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkDriver.java index 2579ccb30..d1ef95c3d 100644 --- a/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkDriver.java +++ b/driver-redis/src/main/java/io/openmessaging/benchmark/driver/redis/RedisBenchmarkDriver.java @@ -28,11 +28,11 @@ import java.util.Random; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; public class RedisBenchmarkDriver implements BenchmarkDriver { JedisPool jedisPool; @@ -80,7 +80,7 @@ public CompletableFuture createConsumer( } private void setupJedisConn() { - JedisPoolConfig poolConfig = new JedisPoolConfig(); + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); poolConfig.setMaxTotal(this.clientConfig.jedisPoolMaxTotal); poolConfig.setMaxIdle(this.clientConfig.jedisPoolMaxIdle); if (this.clientConfig.redisPass != null) { diff --git a/driver-rocketmq/pom.xml b/driver-rocketmq/pom.xml index 9e60ff3cc..3cd0a9139 100644 --- a/driver-rocketmq/pom.xml +++ b/driver-rocketmq/pom.xml @@ -26,7 +26,7 @@ driver-rocketmq - 4.9.3 + 5.1.4 diff --git a/driver-rocketmq/rocketmq.yaml b/driver-rocketmq/rocketmq.yaml index b5aec62ee..b9b608088 100644 --- a/driver-rocketmq/rocketmq.yaml +++ b/driver-rocketmq/rocketmq.yaml @@ -19,5 +19,15 @@ driverClass: io.openmessaging.benchmark.driver.rocketmq.RocketMQBenchmarkDriver clusterName: DefaultCluster namesrvAddr: 127.0.0.1:9876 vipChannelEnabled: false + +batchCQ: true +autoBatch: true +# batchMaxBytes: 32768 +# batchMaxDelayMs: 10 +# totalBatchMaxBytes: 33554432 + +enableBackpressure: true +backpressureConcurrency: 1024 + accessKey: secretKey: \ No newline at end of file diff --git a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java index d7d547098..ccad035fd 100644 --- a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java +++ b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java @@ -26,9 +26,11 @@ import io.openmessaging.benchmark.driver.rocketmq.client.RocketMQClientConfig; import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; @@ -37,12 +39,18 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.TopicAttributes; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.MQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +88,30 @@ public String getTopicNamePrefix() { return "RocketMQ-Benchmark"; } + Map> cachedBrokerAddr = new ConcurrentHashMap<>(); + + int fetchCnt = 0; + + private synchronized Set fetchMasterAndSlaveAddrByClusterName( + final MQAdminExt adminExt, final String clusterName) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + MQBrokerException, InterruptedException { + Set brokerList = cachedBrokerAddr.get(clusterName); + if (brokerList == null) { + brokerList = + CommandUtil.fetchMasterAndSlaveAddrByClusterName( + adminExt, this.rmqClientConfig.clusterName); + cachedBrokerAddr.put(clusterName, brokerList); + if (brokerList.isEmpty()) { + throw new RuntimeException("get brokerAddr return null, clusterName: " + clusterName); + } + } + if (fetchCnt++ % 100 == 0) { + log.info("fetch brokerAddr count: " + fetchCnt); + } + return brokerList; + } + @Override public CompletableFuture createTopic(final String topic, final int partitions) { return CompletableFuture.runAsync( @@ -90,10 +122,15 @@ public CompletableFuture createTopic(final String topic, final int partiti topicConfig.setReadQueueNums(partitions); topicConfig.setWriteQueueNums(partitions); topicConfig.setTopicName(topic); + if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) { + topicConfig + .getAttributes() + .put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); + } try { Set brokerList = - CommandUtil.fetchMasterAddrByClusterName( + fetchMasterAndSlaveAddrByClusterName( this.rmqAdmin, this.rmqClientConfig.clusterName); topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size())); topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size())); @@ -130,6 +167,26 @@ public CompletableFuture createProducer(final String topic) { if (null != this.rmqClientConfig.compressMsgBodyOverHowmuch) { rmqProducer.setCompressMsgBodyOverHowmuch(this.rmqClientConfig.compressMsgBodyOverHowmuch); } + + if (null != this.rmqClientConfig.autoBatch) { + rmqProducer.setAutoBatch(this.rmqClientConfig.autoBatch); + } + if (null != this.rmqClientConfig.batchMaxBytes) { + rmqProducer.batchMaxBytes(this.rmqClientConfig.batchMaxBytes); + } + if (null != this.rmqClientConfig.batchMaxDelayMs) { + rmqProducer.batchMaxDelayMs(this.rmqClientConfig.batchMaxDelayMs); + } + if (null != this.rmqClientConfig.totalBatchMaxBytes) { + rmqProducer.totalBatchMaxBytes(this.rmqClientConfig.totalBatchMaxBytes); + } + if (null != this.rmqClientConfig.enableBackpressure) { + rmqProducer.setEnableBackpressureForAsyncMode(this.rmqClientConfig.enableBackpressure); + } + if (null != this.rmqClientConfig.backpressureConcurrency) { + rmqProducer.setBackPressureForAsyncSendNum(this.rmqClientConfig.backpressureConcurrency); + } + try { rmqProducer.start(); } catch (MQClientException e) { diff --git a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java index 840090452..26cb109bb 100644 --- a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java +++ b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/client/RocketMQClientConfig.java @@ -19,6 +19,14 @@ public class RocketMQClientConfig { public Boolean vipChannelEnabled; public Integer maxMessageSize; public Integer compressMsgBodyOverHowmuch; + public Boolean batchCQ; + public Boolean autoBatch; + public Integer batchMaxBytes; + public Integer batchMaxDelayMs; + public Integer totalBatchMaxBytes; + + public Boolean enableBackpressure; + public Integer backpressureConcurrency; public String accessKey; public String secretKey; } diff --git a/pom.xml b/pom.xml index 5542f673b..f1876c32f 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 4.14.4 10.3.3 3.12.0 - 2.17.1 + 2.20.0 1.18.24 2.13.2 1.48 @@ -352,7 +352,13 @@ ${maven.surefire.plugin.version} - --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED + + + --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED + -Dio.netty.tryReflectionSetAccessible=true + --add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.misc=ALL-UNNAMED + -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true + --add-opens java.base/java.util.zip=ALL-UNNAMED