diff --git a/.travis.yml b/.travis.yml
index 76838857..c4cfdbfe 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,8 +8,8 @@ services:
jobs:
include:
- stage: test
- name: "OracleJDK 8"
- jdk: oraclejdk8
+ name: "OpenJDK 8"
+ jdk: openjdk8
script: mvn -q clean verify -B && mvn -P testNativeTransport -q clean verify -B
# - if: type != pull_request
# name: "OpenJDK 11"
diff --git a/pom.xml b/pom.xml
index 7b8969d9..71fcddd7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,12 +10,12 @@
4.0.0
vertx-mysql-postgresql-client-parent
- 3.6.3
+ 3.6.4-SNAPSHOT
Vert.x MySQL/PostgreSQL Client Parent
- 3.6.3
+ 3.6.4-SNAPSHOT
true
diff --git a/vertx-mysql-postgresql-client-jasync/pom.xml b/vertx-mysql-postgresql-client-jasync/pom.xml
index 306bc683..9d82e88a 100644
--- a/vertx-mysql-postgresql-client-jasync/pom.xml
+++ b/vertx-mysql-postgresql-client-jasync/pom.xml
@@ -4,7 +4,7 @@
io.vertx
vertx-mysql-postgresql-client-parent
../
- 3.6.3
+ 3.6.4-SNAPSHOT
4.0.0
@@ -14,7 +14,7 @@
Vert.x MySQL/PostgreSQL Client based on jasync-sql/Kotlin
- 0.8.62
+ 0.9.51
false
false
diff --git a/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc b/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc
index 130cc496..7cfa7b49 100644
--- a/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc
+++ b/vertx-mysql-postgresql-client-jasync/src/main/asciidoc/index.adoc
@@ -3,7 +3,7 @@
The Async MySQL / PostgreSQL Client is responsible for providing an
interface for Vert.x applications that need to interact with a MySQL or PostgreSQL database.
-It uses Mauricio Linhares https://github.com/mauricio/postgresql-async[async driver] to interact with the MySQL
+It uses jasync-sql https://github.com/jasync-sql/jasync-sql[async driver] to interact with the MySQL
or PostgreSQL databases in a non blocking way.
== Using the MySQL and PostgreSQL client
@@ -16,8 +16,8 @@ application.
To use this client, you need to add the following jar to your `CLASSPATH`:
* ${maven.artifactId} ${maven.version} (the client)
-* scala-library 2.11.4
-* the postgress-async-2.11 and mysdql-async-2.11 from https://github.com/mauricio/postgresql-async
+* kotlin-library 1.3.20
+* the postgress-async and mysdql-async from https://github.com/jasync-sql/jasync-sql
* joda time
All these jars are downloadable from Maven Central.
@@ -32,7 +32,7 @@ If you are building a _Fat-jar_ using Maven or Gradle, just add the following de
----
io.vertx
- vertx-mysql-postgresql-client
+ vertx-mysql-postgresql-client-jasync
${maven.version}
----
@@ -41,7 +41,7 @@ If you are building a _Fat-jar_ using Maven or Gradle, just add the following de
[source,groovy,subs="+attributes"]
----
-compile 'io.vertx:vertx-mysql-postgresql-client:${maven.version}'
+compile 'io.vertx:vertx-mysql-postgresql-client-jasync:${maven.version}'
----
=== In an application using a vert.x distributions
@@ -49,7 +49,7 @@ compile 'io.vertx:vertx-mysql-postgresql-client:${maven.version}'
If you are using a vert.x distribution, add the jar files listed above to the `$VERTX_HOME/lib` directory.
Alternatively, you can edit the `vertx-stack.json` file located in `$VERTX_HOME`, and set `"included": true`
-for the `vertx-mysql-postgresql-client` dependency. Once done, launch: `vertx resolve --dir=lib --stack=
+for the `vertx-mysql-postgresql-client-jasync` dependency. Once done, launch: `vertx resolve --dir=lib --stack=
./vertx-stack.json`. It downloads the client and its dependencies.
== Creating a client
@@ -206,7 +206,8 @@ Both the PostgreSql and MySql clients take the same configuration:
"maxConnectionRetries" : ,
"connectionRetryDelay" : ,
"sslMode" : <"disable"|"prefer"|"require"|"verify-ca"|"verify-full">,
- "sslRootCert" :
+ "sslRootCert" : ,
+ "maxWaitQueueSize" :
}
----
@@ -237,3 +238,4 @@ Both the PostgreSql and MySql clients take the same configuration:
that the server host name matches that in the certificate
`sslRootCert` :: Path to SSL root certificate file. Is used if you want to verify privately issued certificate.
Refer to https://github.com/mauricio/postgresql-async[postgresql-async] documentation for more details.
+`maxWaitQueueSize`:: The maximum requests allowed in the wait queue for connection. Defaults to `-1` (unlimited).
diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java
index 72cece4d..b50e157e 100644
--- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java
+++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/BaseSQLClient.java
@@ -16,11 +16,9 @@
package io.vertx.ext.asyncsql.impl;
-import com.github.jasync.sql.db.Configuration;
import com.github.jasync.sql.db.Connection;
import com.github.jasync.sql.db.ConnectionPoolConfiguration;
import com.github.jasync.sql.db.SSLConfiguration;
-import com.github.jasync.sql.db.pool.PoolConfiguration;
import io.netty.buffer.PooledByteBufAllocator;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
@@ -31,14 +29,11 @@
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.asyncsql.impl.pool.AsyncConnectionPool;
import io.vertx.ext.sql.SQLConnection;
+import kotlinx.coroutines.Dispatchers;
import java.nio.charset.Charset;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
/**
* Base class for the SQL client.
@@ -51,7 +46,6 @@ public abstract class BaseSQLClient {
protected final Vertx vertx;
protected final JsonObject globalConfig;
- private long testTimeout;
public BaseSQLClient(Vertx vertx, JsonObject globalConfig) {
this.vertx = vertx;
@@ -114,6 +108,7 @@ protected ConnectionPoolConfiguration getConnectionConfiguration(
long testTimeout = config.getLong("testTimeout", defaultTestTimeout);
Long queryTimeout = config.getLong("queryTimeout");
Map sslConfig = buildSslConfig(config);
+ String applicationName = config.getString("applicationName");
log.info("Creating configuration for " + host + ":" + port);
return new ConnectionPoolConfiguration(
@@ -122,7 +117,7 @@ protected ConnectionPoolConfiguration getConnectionConfiguration(
database,
username,
password,
- 0 /*maxActiveConnections, unused*/,
+ 1 /*maxActiveConnections, unused*/,
0 /*maxIdleTime, unused*/,
0 /*maxPendingQueries, unused*/,
0 /*connectionValidationInterval*/,
@@ -131,8 +126,12 @@ protected ConnectionPoolConfiguration getConnectionConfiguration(
queryTimeout,
vertx.nettyEventLoopGroup(),
vertx.nettyEventLoopGroup(), /*executor: in non-blocking world, we should only have one event loop group*/
+ Dispatchers.getDefault(),
new SSLConfiguration(sslConfig),
- charset);
+ charset,
+ 16777216,
+ PooledByteBufAllocator.DEFAULT,
+ applicationName);
}
private Map buildSslConfig(JsonObject config) {
diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java
index 372036ef..076e992a 100644
--- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java
+++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/PostgreSQLClientImpl.java
@@ -16,8 +16,6 @@
package io.vertx.ext.asyncsql.impl;
-import java.util.concurrent.ExecutorService;
-
import com.github.jasync.sql.db.Connection;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java
index 627c3229..3e8da02b 100644
--- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java
+++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/AsyncConnectionPool.java
@@ -22,6 +22,7 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
+import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
@@ -44,6 +45,7 @@ public abstract class AsyncConnectionPool {
public static final int DEFAULT_MAX_CONNECTION_RETRIES = 0; // No connection retries by default
public static final int DEFAULT_CONNECTION_RETRY_DELAY = 5_000; // 5 seconds between retries by default
public static final int DEFAULT_CONNECTION_RELEASE_DELAY = 0; // never release idle connection by default
+ public static final int DEFAULT_MAX_WAIT_QUEUE_SIZE = -1; // unlimited queue size
private static final Logger logger = LoggerFactory.getLogger(AsyncConnectionPool.class);
@@ -51,6 +53,7 @@ public abstract class AsyncConnectionPool {
private final int maxConnectionRetries;
private final int connectionRetryDelay;
private final int connectionReleaseDelay;
+ private final int maxQueueSize;
protected final ConnectionPoolConfiguration connectionConfig;
protected final Vertx vertx;
@@ -58,7 +61,7 @@ public abstract class AsyncConnectionPool {
private int poolSize = 0;
private final Deque availableConnections = new ArrayDeque<>();
private final Deque>> waiters = new ArrayDeque<>();
- private final Map timers = new HashMap<>();
+ private final Map timers = new HashMap<>();
public AsyncConnectionPool(Vertx vertx, JsonObject globalConfig, ConnectionPoolConfiguration connectionConfig) {
this.vertx = vertx;
@@ -66,6 +69,7 @@ public AsyncConnectionPool(Vertx vertx, JsonObject globalConfig, ConnectionPoolC
this.maxConnectionRetries = globalConfig.getInteger("maxConnectionRetries", DEFAULT_MAX_CONNECTION_RETRIES);
this.connectionRetryDelay = globalConfig.getInteger("connectionRetryDelay", DEFAULT_CONNECTION_RETRY_DELAY);
this.connectionReleaseDelay = globalConfig.getInteger("connectionReleaseDelay", DEFAULT_CONNECTION_RELEASE_DELAY);
+ this.maxQueueSize = globalConfig.getInteger("maxWaitQueueSize", DEFAULT_MAX_WAIT_QUEUE_SIZE);
this.connectionConfig = connectionConfig;
}
@@ -75,6 +79,10 @@ public synchronized int getPoolSize() {
return poolSize;
}
+ public synchronized int getQueueSize() {
+ return waiters.size();
+ }
+
private synchronized void createConnection(Handler> handler) {
poolSize += 1;
createAndConnect(new Handler>() {
@@ -128,7 +136,16 @@ private synchronized void createAndConnect(Handler> hand
}
private synchronized void waitForAvailableConnection(Handler> handler) {
- waiters.add(handler);
+ if (canAddWaiter()) {
+ waiters.add(handler);
+ return;
+ }
+
+ handler.handle(Future.failedFuture(new ConnectionPoolTooBusyException("Connection pool reached max wait queue size of " + maxQueueSize)));
+ }
+
+ private synchronized boolean canAddWaiter() {
+ return maxQueueSize < 0 || waiters.size() < maxQueueSize;
}
private synchronized void createOrWaitForAvailableConnection(Handler> handler) {
diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/MysqlAsyncConnectionPool.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/MysqlAsyncConnectionPool.java
index b47c0a15..f16d6f56 100644
--- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/MysqlAsyncConnectionPool.java
+++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/MysqlAsyncConnectionPool.java
@@ -16,14 +16,12 @@
package io.vertx.ext.asyncsql.impl.pool;
-import com.github.jasync.sql.db.Configuration;
import com.github.jasync.sql.db.Connection;
import com.github.jasync.sql.db.ConnectionPoolConfiguration;
import com.github.jasync.sql.db.mysql.MySQLConnection;
import com.github.jasync.sql.db.mysql.util.CharsetMapper;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
-import io.vertx.ext.asyncsql.impl.ConversionUtils;
/**
* Implementation of the {@link AsyncConnectionPool} for MySQL.
@@ -40,9 +38,7 @@ public MysqlAsyncConnectionPool(Vertx vertx, JsonObject globalConfig, Connection
protected Connection create() {
return new MySQLConnection(
connectionConfig.getConnectionConfiguration(),
- CharsetMapper.Companion.getInstance(),
- vertx.nettyEventLoopGroup(),
- vertx.nettyEventLoopGroup()
+ CharsetMapper.Companion.getInstance()
);
}
diff --git a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java
index a43712aa..807f1e2f 100644
--- a/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java
+++ b/vertx-mysql-postgresql-client-jasync/src/main/java/io/vertx/ext/asyncsql/impl/pool/PostgresqlAsyncConnectionPool.java
@@ -16,7 +16,6 @@
package io.vertx.ext.asyncsql.impl.pool;
-import com.github.jasync.sql.db.Configuration;
import com.github.jasync.sql.db.Connection;
import com.github.jasync.sql.db.ConnectionPoolConfiguration;
import com.github.jasync.sql.db.postgresql.PostgreSQLConnection;
@@ -24,7 +23,6 @@
import com.github.jasync.sql.db.postgresql.column.PostgreSQLColumnEncoderRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
-import io.vertx.ext.asyncsql.impl.ConversionUtils;
/**
* Implementation of the {@link AsyncConnectionPool} for PostGresSQL.
@@ -42,10 +40,7 @@ protected Connection create() {
return new PostgreSQLConnection(
connectionConfig.getConnectionConfiguration(),
PostgreSQLColumnEncoderRegistry.Companion.getInstance(),
- PostgreSQLColumnDecoderRegistry.Companion.getInstance(),
- vertx.nettyEventLoopGroup(),
- vertx.nettyEventLoopGroup()
- );
+ PostgreSQLColumnDecoderRegistry.Companion.getInstance());
}
}
diff --git a/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java b/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java
index 3aeeaae5..3b161834 100644
--- a/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java
+++ b/vertx-mysql-postgresql-client-jasync/src/test/java/io/vertx/ext/asyncsql/impl/tool/AsyncConnectionPoolTest.java
@@ -232,6 +232,107 @@ public void testRetriesAndSuccess(TestContext context) {
});
}
+ // Try to get 30 connections from connectionPool with 15-connection, should result in queueSize of 15 and once the connections are released/given-back to pool waiters should run and queueSize should become 0
+ @Test
+ public void testQueueSize(TestContext context) throws InterruptedException {
+ final int TEST_LENGTH = 30;
+ final AsyncConnectionPool pool = new AsyncConnectionPoolMock(
+ new JsonObject()
+ .put("maxPoolSize", MAX_POOL_SIZE),
+ this::getGoodConnection);
+ final Queue connectionSet = new LinkedList<>();
+ final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH);
+
+ // Ask for 30 connections
+ for (int i = 0; i < TEST_LENGTH; i++) {
+ pool.take(result -> {
+ // We will decrease our CountDownLatch with each obtained connection
+ countDownLatch.countDown();
+ context.assertTrue(result.succeeded());
+ connectionSet.add(result.result());
+ });
+ }
+ // Wait up to 200 millisecond to obtain the 30 connections (it should not happen)
+ context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS));
+ final int expectedWaiterQueueSize = TEST_LENGTH - MAX_POOL_SIZE;
+ context.assertEquals(pool.getQueueSize(), expectedWaiterQueueSize);
+ context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE);
+
+ // Now releasing/giving-back connection to the pool should reduce waiter queueSize to 0
+ for (int i = 0; i < MAX_POOL_SIZE; i++) {
+ pool.giveBack(connectionSet.poll());
+ }
+
+ // Once the connections available then the waiters should get the connection and run
+ context.assertTrue(countDownLatch.await(1, TimeUnit.SECONDS));
+ context.assertEquals(pool.getQueueSize(), 0);
+ context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE);
+ }
+
+ // Max queue size exceed should fail(maxWaitQueueSize=1, maxPoolSize=1)
+ @Test
+ public void testMaxQueueSizeExceeds(TestContext context) throws InterruptedException {
+ final int TEST_LENGTH = 2;
+ final int MAX_POOL_SIZE = 1;
+ final int MAX_WAIT_QUEUE_SIZE = 1;
+ final AsyncConnectionPool pool = new AsyncConnectionPoolMock(
+ new JsonObject()
+ .put("maxPoolSize", MAX_POOL_SIZE)
+ .put("maxWaitQueueSize", MAX_WAIT_QUEUE_SIZE),
+ this::getGoodConnection);
+
+ final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH);
+
+ // Ask for 2 connections
+ for (int i = 0; i < TEST_LENGTH; i++) {
+ pool.take(result -> {
+ // We will decrease our CountDownLatch with each obtained connection
+ countDownLatch.countDown();
+ context.assertTrue(result.succeeded());
+ });
+ }
+ // Wait up to 200 millisecond to obtain the 2 connections (it should not happen)
+ context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS));
+ context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE);
+ context.assertEquals(pool.getQueueSize(), MAX_WAIT_QUEUE_SIZE);
+
+ // Now when user makes call for new connection request should fail with proper message
+ final Async waitForCompletion = context.async();
+ final String expectedExceptionMessage = "Connection pool reached max wait queue size of " + MAX_WAIT_QUEUE_SIZE;
+ pool.take(result -> {
+ context.assertTrue(result.failed());
+ context.assertEquals(result.cause().getMessage(), expectedExceptionMessage);
+ waitForCompletion.complete();
+ });
+ waitForCompletion.awaitSuccess(200);
+ }
+
+ // will try to get 1000 connection, 985 should wait in the queue(maxPoolSize=15)
+ @Test
+ public void testNoMaxWaitQueueLimitByDefault(TestContext context) throws InterruptedException {
+ final int TEST_LENGTH = 1000;
+ final AsyncConnectionPool pool = new AsyncConnectionPoolMock(
+ new JsonObject()
+ .put("maxPoolSize", MAX_POOL_SIZE),
+ this::getGoodConnection);
+ final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH);
+
+ // Ask for 1000 connections
+ for (int i = 0; i < TEST_LENGTH; i++) {
+ pool.take(result -> {
+ // We will decrease our CountDownLatch with each obtained connection
+ countDownLatch.countDown();
+ context.assertTrue(result.succeeded());
+ });
+ }
+ // Wait up to 200 millisecond to obtain the 1000 connections (it should not happen)
+ context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS));
+
+ // Only 15 connection will be provided and remaining should wait in waiters queue
+ final int expectedWaiterQueueSize = TEST_LENGTH - MAX_POOL_SIZE;
+ context.assertEquals(pool.getQueueSize(), expectedWaiterQueueSize);
+ }
+
private Connection getGoodConnection() {
final Connection connection = Mockito.mock(Connection.class);
Mockito.when(connection.connect()).thenAnswer(new Answer>(){
diff --git a/vertx-mysql-postgresql-client-scala/pom.xml b/vertx-mysql-postgresql-client-scala/pom.xml
index d507f98e..184dd49c 100644
--- a/vertx-mysql-postgresql-client-scala/pom.xml
+++ b/vertx-mysql-postgresql-client-scala/pom.xml
@@ -4,13 +4,13 @@
io.vertx
vertx-mysql-postgresql-client-parent
../
- 3.6.3
+ 3.6.4-SNAPSHOT
4.0.0
vertx-mysql-postgresql-client
- 3.6.3
+ 3.6.4-SNAPSHOT
Vert.x MySQL/PostgreSQL Client based on Scala