From 059789a79f1fedaa5ed746c5e54f6a3236396450 Mon Sep 17 00:00:00 2001 From: iranna patil Date: Tue, 12 Feb 2019 14:44:30 +0100 Subject: [PATCH] Adding maxWaitQueueSize for connection waiters, To avoid unlimited growth of waiters queue, which was causing un-expected response times and memory issues as queue grows. --- .../src/main/asciidoc/index.adoc | 4 +- .../ext/asyncsql/impl/BaseSQLClient.java | 8 -- .../asyncsql/impl/PostgreSQLClientImpl.java | 2 - .../impl/pool/AsyncConnectionPool.java | 21 +++- .../pool/PostgresqlAsyncConnectionPool.java | 2 - .../impl/tool/AsyncConnectionPoolTest.java | 101 ++++++++++++++++++ 6 files changed, 123 insertions(+), 15 deletions(-) diff --git a/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc b/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc index a92d3318..7cfa7b49 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc +++ b/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc @@ -206,7 +206,8 @@ Both the PostgreSql and MySql clients take the same configuration: "maxConnectionRetries" : , "connectionRetryDelay" : , "sslMode" : <"disable"|"prefer"|"require"|"verify-ca"|"verify-full">, - "sslRootCert" : + "sslRootCert" : , + "maxWaitQueueSize" : } ---- @@ -237,3 +238,4 @@ Both the PostgreSql and MySql clients take the same configuration: that the server host name matches that in the certificate `sslRootCert` :: Path to SSL root certificate file. Is used if you want to verify privately issued certificate. Refer to https://github.com/mauricio/postgresql-async[postgresql-async] documentation for more details. +`maxWaitQueueSize`:: The maximum requests allowed in the wait queue for connection. Defaults to `-1` (unlimited). diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java index 72cece4d..29d5279c 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java +++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java @@ -16,12 +16,9 @@ package io.vertx.ext.asyncsql.impl; -import com.github.jasync.sql.db.Configuration; import com.github.jasync.sql.db.Connection; import com.github.jasync.sql.db.ConnectionPoolConfiguration; import com.github.jasync.sql.db.SSLConfiguration; -import com.github.jasync.sql.db.pool.PoolConfiguration; -import io.netty.buffer.PooledByteBufAllocator; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; @@ -33,12 +30,8 @@ import io.vertx.ext.sql.SQLConnection; import java.nio.charset.Charset; -import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; /** * Base class for the SQL client. @@ -51,7 +44,6 @@ public abstract class BaseSQLClient { protected final Vertx vertx; protected final JsonObject globalConfig; - private long testTimeout; public BaseSQLClient(Vertx vertx, JsonObject globalConfig) { this.vertx = vertx; diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java index 372036ef..076e992a 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java +++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java @@ -16,8 +16,6 @@ package io.vertx.ext.asyncsql.impl; -import java.util.concurrent.ExecutorService; - import com.github.jasync.sql.db.Connection; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java index 627c3229..3e8da02b 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java +++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java @@ -22,6 +22,7 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.core.http.ConnectionPoolTooBusyException; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; @@ -44,6 +45,7 @@ public abstract class AsyncConnectionPool { public static final int DEFAULT_MAX_CONNECTION_RETRIES = 0; // No connection retries by default public static final int DEFAULT_CONNECTION_RETRY_DELAY = 5_000; // 5 seconds between retries by default public static final int DEFAULT_CONNECTION_RELEASE_DELAY = 0; // never release idle connection by default + public static final int DEFAULT_MAX_WAIT_QUEUE_SIZE = -1; // unlimited queue size private static final Logger logger = LoggerFactory.getLogger(AsyncConnectionPool.class); @@ -51,6 +53,7 @@ public abstract class AsyncConnectionPool { private final int maxConnectionRetries; private final int connectionRetryDelay; private final int connectionReleaseDelay; + private final int maxQueueSize; protected final ConnectionPoolConfiguration connectionConfig; protected final Vertx vertx; @@ -58,7 +61,7 @@ public abstract class AsyncConnectionPool { private int poolSize = 0; private final Deque availableConnections = new ArrayDeque<>(); private final Deque>> waiters = new ArrayDeque<>(); - private final Map timers = new HashMap<>(); + private final Map timers = new HashMap<>(); public AsyncConnectionPool(Vertx vertx, JsonObject globalConfig, ConnectionPoolConfiguration connectionConfig) { this.vertx = vertx; @@ -66,6 +69,7 @@ public AsyncConnectionPool(Vertx vertx, JsonObject globalConfig, ConnectionPoolC this.maxConnectionRetries = globalConfig.getInteger("maxConnectionRetries", DEFAULT_MAX_CONNECTION_RETRIES); this.connectionRetryDelay = globalConfig.getInteger("connectionRetryDelay", DEFAULT_CONNECTION_RETRY_DELAY); this.connectionReleaseDelay = globalConfig.getInteger("connectionReleaseDelay", DEFAULT_CONNECTION_RELEASE_DELAY); + this.maxQueueSize = globalConfig.getInteger("maxWaitQueueSize", DEFAULT_MAX_WAIT_QUEUE_SIZE); this.connectionConfig = connectionConfig; } @@ -75,6 +79,10 @@ public synchronized int getPoolSize() { return poolSize; } + public synchronized int getQueueSize() { + return waiters.size(); + } + private synchronized void createConnection(Handler> handler) { poolSize += 1; createAndConnect(new Handler>() { @@ -128,7 +136,16 @@ private synchronized void createAndConnect(Handler> hand } private synchronized void waitForAvailableConnection(Handler> handler) { - waiters.add(handler); + if (canAddWaiter()) { + waiters.add(handler); + return; + } + + handler.handle(Future.failedFuture(new ConnectionPoolTooBusyException("Connection pool reached max wait queue size of " + maxQueueSize))); + } + + private synchronized boolean canAddWaiter() { + return maxQueueSize < 0 || waiters.size() < maxQueueSize; } private synchronized void createOrWaitForAvailableConnection(Handler> handler) { diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java index a43712aa..572b4490 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java +++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java @@ -16,7 +16,6 @@ package io.vertx.ext.asyncsql.impl.pool; -import com.github.jasync.sql.db.Configuration; import com.github.jasync.sql.db.Connection; import com.github.jasync.sql.db.ConnectionPoolConfiguration; import com.github.jasync.sql.db.postgresql.PostgreSQLConnection; @@ -24,7 +23,6 @@ import com.github.jasync.sql.db.postgresql.column.PostgreSQLColumnEncoderRegistry; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; -import io.vertx.ext.asyncsql.impl.ConversionUtils; /** * Implementation of the {@link AsyncConnectionPool} for PostGresSQL. diff --git a/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java b/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java index 3aeeaae5..3b161834 100644 --- a/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java +++ b/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java @@ -232,6 +232,107 @@ public void testRetriesAndSuccess(TestContext context) { }); } + // Try to get 30 connections from connectionPool with 15-connection, should result in queueSize of 15 and once the connections are released/given-back to pool waiters should run and queueSize should become 0 + @Test + public void testQueueSize(TestContext context) throws InterruptedException { + final int TEST_LENGTH = 30; + final AsyncConnectionPool pool = new AsyncConnectionPoolMock( + new JsonObject() + .put("maxPoolSize", MAX_POOL_SIZE), + this::getGoodConnection); + final Queue connectionSet = new LinkedList<>(); + final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH); + + // Ask for 30 connections + for (int i = 0; i < TEST_LENGTH; i++) { + pool.take(result -> { + // We will decrease our CountDownLatch with each obtained connection + countDownLatch.countDown(); + context.assertTrue(result.succeeded()); + connectionSet.add(result.result()); + }); + } + // Wait up to 200 millisecond to obtain the 30 connections (it should not happen) + context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS)); + final int expectedWaiterQueueSize = TEST_LENGTH - MAX_POOL_SIZE; + context.assertEquals(pool.getQueueSize(), expectedWaiterQueueSize); + context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE); + + // Now releasing/giving-back connection to the pool should reduce waiter queueSize to 0 + for (int i = 0; i < MAX_POOL_SIZE; i++) { + pool.giveBack(connectionSet.poll()); + } + + // Once the connections available then the waiters should get the connection and run + context.assertTrue(countDownLatch.await(1, TimeUnit.SECONDS)); + context.assertEquals(pool.getQueueSize(), 0); + context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE); + } + + // Max queue size exceed should fail(maxWaitQueueSize=1, maxPoolSize=1) + @Test + public void testMaxQueueSizeExceeds(TestContext context) throws InterruptedException { + final int TEST_LENGTH = 2; + final int MAX_POOL_SIZE = 1; + final int MAX_WAIT_QUEUE_SIZE = 1; + final AsyncConnectionPool pool = new AsyncConnectionPoolMock( + new JsonObject() + .put("maxPoolSize", MAX_POOL_SIZE) + .put("maxWaitQueueSize", MAX_WAIT_QUEUE_SIZE), + this::getGoodConnection); + + final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH); + + // Ask for 2 connections + for (int i = 0; i < TEST_LENGTH; i++) { + pool.take(result -> { + // We will decrease our CountDownLatch with each obtained connection + countDownLatch.countDown(); + context.assertTrue(result.succeeded()); + }); + } + // Wait up to 200 millisecond to obtain the 2 connections (it should not happen) + context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS)); + context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE); + context.assertEquals(pool.getQueueSize(), MAX_WAIT_QUEUE_SIZE); + + // Now when user makes call for new connection request should fail with proper message + final Async waitForCompletion = context.async(); + final String expectedExceptionMessage = "Connection pool reached max wait queue size of " + MAX_WAIT_QUEUE_SIZE; + pool.take(result -> { + context.assertTrue(result.failed()); + context.assertEquals(result.cause().getMessage(), expectedExceptionMessage); + waitForCompletion.complete(); + }); + waitForCompletion.awaitSuccess(200); + } + + // will try to get 1000 connection, 985 should wait in the queue(maxPoolSize=15) + @Test + public void testNoMaxWaitQueueLimitByDefault(TestContext context) throws InterruptedException { + final int TEST_LENGTH = 1000; + final AsyncConnectionPool pool = new AsyncConnectionPoolMock( + new JsonObject() + .put("maxPoolSize", MAX_POOL_SIZE), + this::getGoodConnection); + final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH); + + // Ask for 1000 connections + for (int i = 0; i < TEST_LENGTH; i++) { + pool.take(result -> { + // We will decrease our CountDownLatch with each obtained connection + countDownLatch.countDown(); + context.assertTrue(result.succeeded()); + }); + } + // Wait up to 200 millisecond to obtain the 1000 connections (it should not happen) + context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS)); + + // Only 15 connection will be provided and remaining should wait in waiters queue + final int expectedWaiterQueueSize = TEST_LENGTH - MAX_POOL_SIZE; + context.assertEquals(pool.getQueueSize(), expectedWaiterQueueSize); + } + private Connection getGoodConnection() { final Connection connection = Mockito.mock(Connection.class); Mockito.when(connection.connect()).thenAnswer(new Answer>(){