diff --git a/.travis.yml b/.travis.yml index 76838857..c4cfdbfe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,8 +8,8 @@ services: jobs: include: - stage: test - name: "OracleJDK 8" - jdk: oraclejdk8 + name: "OpenJDK 8" + jdk: openjdk8 script: mvn -q clean verify -B && mvn -P testNativeTransport -q clean verify -B # - if: type != pull_request # name: "OpenJDK 11" diff --git a/pom.xml b/pom.xml index 7b8969d9..71fcddd7 100644 --- a/pom.xml +++ b/pom.xml @@ -10,12 +10,12 @@ 4.0.0 vertx-mysql-postgresql-client-parent - 3.6.3 + 3.6.4-SNAPSHOT Vert.x MySQL/PostgreSQL Client Parent - 3.6.3 + 3.6.4-SNAPSHOT true diff --git a/vertx-mysql-postgresql-client-jasync/pom.xml b/vertx-mysql-postgresql-client-jasync/pom.xml index 306bc683..9d82e88a 100644 --- a/vertx-mysql-postgresql-client-jasync/pom.xml +++ b/vertx-mysql-postgresql-client-jasync/pom.xml @@ -4,7 +4,7 @@ io.vertx vertx-mysql-postgresql-client-parent ../ - 3.6.3 + 3.6.4-SNAPSHOT 4.0.0 @@ -14,7 +14,7 @@ Vert.x MySQL/PostgreSQL Client based on jasync-sql/Kotlin - 0.8.62 + 0.9.51 false false diff --git a/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc b/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc index 130cc496..7cfa7b49 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc +++ b/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc @@ -3,7 +3,7 @@ The Async MySQL / PostgreSQL Client is responsible for providing an interface for Vert.x applications that need to interact with a MySQL or PostgreSQL database. -It uses Mauricio Linhares https://github.com/mauricio/postgresql-async[async driver] to interact with the MySQL +It uses jasync-sql https://github.com/jasync-sql/jasync-sql[async driver] to interact with the MySQL or PostgreSQL databases in a non blocking way. == Using the MySQL and PostgreSQL client @@ -16,8 +16,8 @@ application. To use this client, you need to add the following jar to your `CLASSPATH`: * ${maven.artifactId} ${maven.version} (the client) -* scala-library 2.11.4 -* the postgress-async-2.11 and mysdql-async-2.11 from https://github.com/mauricio/postgresql-async +* kotlin-library 1.3.20 +* the postgress-async and mysdql-async from https://github.com/jasync-sql/jasync-sql * joda time All these jars are downloadable from Maven Central. @@ -32,7 +32,7 @@ If you are building a _Fat-jar_ using Maven or Gradle, just add the following de ---- io.vertx - vertx-mysql-postgresql-client + vertx-mysql-postgresql-client-jasync ${maven.version} ---- @@ -41,7 +41,7 @@ If you are building a _Fat-jar_ using Maven or Gradle, just add the following de [source,groovy,subs="+attributes"] ---- -compile 'io.vertx:vertx-mysql-postgresql-client:${maven.version}' +compile 'io.vertx:vertx-mysql-postgresql-client-jasync:${maven.version}' ---- === In an application using a vert.x distributions @@ -49,7 +49,7 @@ compile 'io.vertx:vertx-mysql-postgresql-client:${maven.version}' If you are using a vert.x distribution, add the jar files listed above to the `$VERTX_HOME/lib` directory. Alternatively, you can edit the `vertx-stack.json` file located in `$VERTX_HOME`, and set `"included": true` -for the `vertx-mysql-postgresql-client` dependency. Once done, launch: `vertx resolve --dir=lib --stack= +for the `vertx-mysql-postgresql-client-jasync` dependency. Once done, launch: `vertx resolve --dir=lib --stack= ./vertx-stack.json`. It downloads the client and its dependencies. == Creating a client @@ -206,7 +206,8 @@ Both the PostgreSql and MySql clients take the same configuration: "maxConnectionRetries" : , "connectionRetryDelay" : , "sslMode" : <"disable"|"prefer"|"require"|"verify-ca"|"verify-full">, - "sslRootCert" : + "sslRootCert" : , + "maxWaitQueueSize" : } ---- @@ -237,3 +238,4 @@ Both the PostgreSql and MySql clients take the same configuration: that the server host name matches that in the certificate `sslRootCert` :: Path to SSL root certificate file. Is used if you want to verify privately issued certificate. Refer to https://github.com/mauricio/postgresql-async[postgresql-async] documentation for more details. +`maxWaitQueueSize`:: The maximum requests allowed in the wait queue for connection. Defaults to `-1` (unlimited). diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java index 72cece4d..b50e157e 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java +++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java @@ -16,11 +16,9 @@ package io.vertx.ext.asyncsql.impl; -import com.github.jasync.sql.db.Configuration; import com.github.jasync.sql.db.Connection; import com.github.jasync.sql.db.ConnectionPoolConfiguration; import com.github.jasync.sql.db.SSLConfiguration; -import com.github.jasync.sql.db.pool.PoolConfiguration; import io.netty.buffer.PooledByteBufAllocator; import io.vertx.core.AsyncResult; import io.vertx.core.Future; @@ -31,14 +29,11 @@ import io.vertx.core.logging.LoggerFactory; import io.vertx.ext.asyncsql.impl.pool.AsyncConnectionPool; import io.vertx.ext.sql.SQLConnection; +import kotlinx.coroutines.Dispatchers; import java.nio.charset.Charset; -import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; /** * Base class for the SQL client. @@ -51,7 +46,6 @@ public abstract class BaseSQLClient { protected final Vertx vertx; protected final JsonObject globalConfig; - private long testTimeout; public BaseSQLClient(Vertx vertx, JsonObject globalConfig) { this.vertx = vertx; @@ -114,6 +108,7 @@ protected ConnectionPoolConfiguration getConnectionConfiguration( long testTimeout = config.getLong("testTimeout", defaultTestTimeout); Long queryTimeout = config.getLong("queryTimeout"); Map sslConfig = buildSslConfig(config); + String applicationName = config.getString("applicationName"); log.info("Creating configuration for " + host + ":" + port); return new ConnectionPoolConfiguration( @@ -122,7 +117,7 @@ protected ConnectionPoolConfiguration getConnectionConfiguration( database, username, password, - 0 /*maxActiveConnections, unused*/, + 1 /*maxActiveConnections, unused*/, 0 /*maxIdleTime, unused*/, 0 /*maxPendingQueries, unused*/, 0 /*connectionValidationInterval*/, @@ -131,8 +126,12 @@ protected ConnectionPoolConfiguration getConnectionConfiguration( queryTimeout, vertx.nettyEventLoopGroup(), vertx.nettyEventLoopGroup(), /*executor: in non-blocking world, we should only have one event loop group*/ + Dispatchers.getDefault(), new SSLConfiguration(sslConfig), - charset); + charset, + 16777216, + PooledByteBufAllocator.DEFAULT, + applicationName); } private Map buildSslConfig(JsonObject config) { diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java index 372036ef..076e992a 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java +++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java @@ -16,8 +16,6 @@ package io.vertx.ext.asyncsql.impl; -import java.util.concurrent.ExecutorService; - import com.github.jasync.sql.db.Connection; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java index 627c3229..3e8da02b 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java +++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java @@ -22,6 +22,7 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.core.http.ConnectionPoolTooBusyException; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; @@ -44,6 +45,7 @@ public abstract class AsyncConnectionPool { public static final int DEFAULT_MAX_CONNECTION_RETRIES = 0; // No connection retries by default public static final int DEFAULT_CONNECTION_RETRY_DELAY = 5_000; // 5 seconds between retries by default public static final int DEFAULT_CONNECTION_RELEASE_DELAY = 0; // never release idle connection by default + public static final int DEFAULT_MAX_WAIT_QUEUE_SIZE = -1; // unlimited queue size private static final Logger logger = LoggerFactory.getLogger(AsyncConnectionPool.class); @@ -51,6 +53,7 @@ public abstract class AsyncConnectionPool { private final int maxConnectionRetries; private final int connectionRetryDelay; private final int connectionReleaseDelay; + private final int maxQueueSize; protected final ConnectionPoolConfiguration connectionConfig; protected final Vertx vertx; @@ -58,7 +61,7 @@ public abstract class AsyncConnectionPool { private int poolSize = 0; private final Deque availableConnections = new ArrayDeque<>(); private final Deque>> waiters = new ArrayDeque<>(); - private final Map timers = new HashMap<>(); + private final Map timers = new HashMap<>(); public AsyncConnectionPool(Vertx vertx, JsonObject globalConfig, ConnectionPoolConfiguration connectionConfig) { this.vertx = vertx; @@ -66,6 +69,7 @@ public AsyncConnectionPool(Vertx vertx, JsonObject globalConfig, ConnectionPoolC this.maxConnectionRetries = globalConfig.getInteger("maxConnectionRetries", DEFAULT_MAX_CONNECTION_RETRIES); this.connectionRetryDelay = globalConfig.getInteger("connectionRetryDelay", DEFAULT_CONNECTION_RETRY_DELAY); this.connectionReleaseDelay = globalConfig.getInteger("connectionReleaseDelay", DEFAULT_CONNECTION_RELEASE_DELAY); + this.maxQueueSize = globalConfig.getInteger("maxWaitQueueSize", DEFAULT_MAX_WAIT_QUEUE_SIZE); this.connectionConfig = connectionConfig; } @@ -75,6 +79,10 @@ public synchronized int getPoolSize() { return poolSize; } + public synchronized int getQueueSize() { + return waiters.size(); + } + private synchronized void createConnection(Handler> handler) { poolSize += 1; createAndConnect(new Handler>() { @@ -128,7 +136,16 @@ private synchronized void createAndConnect(Handler> hand } private synchronized void waitForAvailableConnection(Handler> handler) { - waiters.add(handler); + if (canAddWaiter()) { + waiters.add(handler); + return; + } + + handler.handle(Future.failedFuture(new ConnectionPoolTooBusyException("Connection pool reached max wait queue size of " + maxQueueSize))); + } + + private synchronized boolean canAddWaiter() { + return maxQueueSize < 0 || waiters.size() < maxQueueSize; } private synchronized void createOrWaitForAvailableConnection(Handler> handler) { diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/MysqlAsyncConnectionPool.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/MysqlAsyncConnectionPool.java index b47c0a15..f16d6f56 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/MysqlAsyncConnectionPool.java +++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/MysqlAsyncConnectionPool.java @@ -16,14 +16,12 @@ package io.vertx.ext.asyncsql.impl.pool; -import com.github.jasync.sql.db.Configuration; import com.github.jasync.sql.db.Connection; import com.github.jasync.sql.db.ConnectionPoolConfiguration; import com.github.jasync.sql.db.mysql.MySQLConnection; import com.github.jasync.sql.db.mysql.util.CharsetMapper; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; -import io.vertx.ext.asyncsql.impl.ConversionUtils; /** * Implementation of the {@link AsyncConnectionPool} for MySQL. @@ -40,9 +38,7 @@ public MysqlAsyncConnectionPool(Vertx vertx, JsonObject globalConfig, Connection protected Connection create() { return new MySQLConnection( connectionConfig.getConnectionConfiguration(), - CharsetMapper.Companion.getInstance(), - vertx.nettyEventLoopGroup(), - vertx.nettyEventLoopGroup() + CharsetMapper.Companion.getInstance() ); } diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java index a43712aa..807f1e2f 100644 --- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java +++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java @@ -16,7 +16,6 @@ package io.vertx.ext.asyncsql.impl.pool; -import com.github.jasync.sql.db.Configuration; import com.github.jasync.sql.db.Connection; import com.github.jasync.sql.db.ConnectionPoolConfiguration; import com.github.jasync.sql.db.postgresql.PostgreSQLConnection; @@ -24,7 +23,6 @@ import com.github.jasync.sql.db.postgresql.column.PostgreSQLColumnEncoderRegistry; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; -import io.vertx.ext.asyncsql.impl.ConversionUtils; /** * Implementation of the {@link AsyncConnectionPool} for PostGresSQL. @@ -42,10 +40,7 @@ protected Connection create() { return new PostgreSQLConnection( connectionConfig.getConnectionConfiguration(), PostgreSQLColumnEncoderRegistry.Companion.getInstance(), - PostgreSQLColumnDecoderRegistry.Companion.getInstance(), - vertx.nettyEventLoopGroup(), - vertx.nettyEventLoopGroup() - ); + PostgreSQLColumnDecoderRegistry.Companion.getInstance()); } } diff --git a/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java b/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java index 3aeeaae5..3b161834 100644 --- a/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java +++ b/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java @@ -232,6 +232,107 @@ public void testRetriesAndSuccess(TestContext context) { }); } + // Try to get 30 connections from connectionPool with 15-connection, should result in queueSize of 15 and once the connections are released/given-back to pool waiters should run and queueSize should become 0 + @Test + public void testQueueSize(TestContext context) throws InterruptedException { + final int TEST_LENGTH = 30; + final AsyncConnectionPool pool = new AsyncConnectionPoolMock( + new JsonObject() + .put("maxPoolSize", MAX_POOL_SIZE), + this::getGoodConnection); + final Queue connectionSet = new LinkedList<>(); + final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH); + + // Ask for 30 connections + for (int i = 0; i < TEST_LENGTH; i++) { + pool.take(result -> { + // We will decrease our CountDownLatch with each obtained connection + countDownLatch.countDown(); + context.assertTrue(result.succeeded()); + connectionSet.add(result.result()); + }); + } + // Wait up to 200 millisecond to obtain the 30 connections (it should not happen) + context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS)); + final int expectedWaiterQueueSize = TEST_LENGTH - MAX_POOL_SIZE; + context.assertEquals(pool.getQueueSize(), expectedWaiterQueueSize); + context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE); + + // Now releasing/giving-back connection to the pool should reduce waiter queueSize to 0 + for (int i = 0; i < MAX_POOL_SIZE; i++) { + pool.giveBack(connectionSet.poll()); + } + + // Once the connections available then the waiters should get the connection and run + context.assertTrue(countDownLatch.await(1, TimeUnit.SECONDS)); + context.assertEquals(pool.getQueueSize(), 0); + context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE); + } + + // Max queue size exceed should fail(maxWaitQueueSize=1, maxPoolSize=1) + @Test + public void testMaxQueueSizeExceeds(TestContext context) throws InterruptedException { + final int TEST_LENGTH = 2; + final int MAX_POOL_SIZE = 1; + final int MAX_WAIT_QUEUE_SIZE = 1; + final AsyncConnectionPool pool = new AsyncConnectionPoolMock( + new JsonObject() + .put("maxPoolSize", MAX_POOL_SIZE) + .put("maxWaitQueueSize", MAX_WAIT_QUEUE_SIZE), + this::getGoodConnection); + + final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH); + + // Ask for 2 connections + for (int i = 0; i < TEST_LENGTH; i++) { + pool.take(result -> { + // We will decrease our CountDownLatch with each obtained connection + countDownLatch.countDown(); + context.assertTrue(result.succeeded()); + }); + } + // Wait up to 200 millisecond to obtain the 2 connections (it should not happen) + context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS)); + context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE); + context.assertEquals(pool.getQueueSize(), MAX_WAIT_QUEUE_SIZE); + + // Now when user makes call for new connection request should fail with proper message + final Async waitForCompletion = context.async(); + final String expectedExceptionMessage = "Connection pool reached max wait queue size of " + MAX_WAIT_QUEUE_SIZE; + pool.take(result -> { + context.assertTrue(result.failed()); + context.assertEquals(result.cause().getMessage(), expectedExceptionMessage); + waitForCompletion.complete(); + }); + waitForCompletion.awaitSuccess(200); + } + + // will try to get 1000 connection, 985 should wait in the queue(maxPoolSize=15) + @Test + public void testNoMaxWaitQueueLimitByDefault(TestContext context) throws InterruptedException { + final int TEST_LENGTH = 1000; + final AsyncConnectionPool pool = new AsyncConnectionPoolMock( + new JsonObject() + .put("maxPoolSize", MAX_POOL_SIZE), + this::getGoodConnection); + final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH); + + // Ask for 1000 connections + for (int i = 0; i < TEST_LENGTH; i++) { + pool.take(result -> { + // We will decrease our CountDownLatch with each obtained connection + countDownLatch.countDown(); + context.assertTrue(result.succeeded()); + }); + } + // Wait up to 200 millisecond to obtain the 1000 connections (it should not happen) + context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS)); + + // Only 15 connection will be provided and remaining should wait in waiters queue + final int expectedWaiterQueueSize = TEST_LENGTH - MAX_POOL_SIZE; + context.assertEquals(pool.getQueueSize(), expectedWaiterQueueSize); + } + private Connection getGoodConnection() { final Connection connection = Mockito.mock(Connection.class); Mockito.when(connection.connect()).thenAnswer(new Answer>(){ diff --git a/vertx-mysql-postgresql-client-scala/pom.xml b/vertx-mysql-postgresql-client-scala/pom.xml index d507f98e..184dd49c 100644 --- a/vertx-mysql-postgresql-client-scala/pom.xml +++ b/vertx-mysql-postgresql-client-scala/pom.xml @@ -4,13 +4,13 @@ io.vertx vertx-mysql-postgresql-client-parent ../ - 3.6.3 + 3.6.4-SNAPSHOT 4.0.0 vertx-mysql-postgresql-client - 3.6.3 + 3.6.4-SNAPSHOT Vert.x MySQL/PostgreSQL Client based on Scala