Skip to content

Adding maxWaitQueueSize for connection waiters, To avoid unlimited gr… #143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ Both the PostgreSql and MySql clients take the same configuration:
"maxConnectionRetries" : <maximum-number-of-connection-retries>,
"connectionRetryDelay" : <delay-in-milliseconds>,
"sslMode" : <"disable"|"prefer"|"require"|"verify-ca"|"verify-full">,
"sslRootCert" : <path to file with certificate>
"sslRootCert" : <path to file with certificate>,
"maxWaitQueueSize" : <maximum-number-of-waiters-in-queue-for-connection>
}
----

Expand Down Expand Up @@ -237,3 +238,4 @@ Both the PostgreSql and MySql clients take the same configuration:
that the server host name matches that in the certificate
`sslRootCert` :: Path to SSL root certificate file. Is used if you want to verify privately issued certificate.
Refer to https://github.com/mauricio/postgresql-async[postgresql-async] documentation for more details.
`maxWaitQueueSize`:: The maximum requests allowed in the wait queue for connection. Defaults to `-1` (unlimited).
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@

package io.vertx.ext.asyncsql.impl;

import com.github.jasync.sql.db.Configuration;
import com.github.jasync.sql.db.Connection;
import com.github.jasync.sql.db.ConnectionPoolConfiguration;
import com.github.jasync.sql.db.SSLConfiguration;
import com.github.jasync.sql.db.pool.PoolConfiguration;
import io.netty.buffer.PooledByteBufAllocator;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand All @@ -33,12 +30,8 @@
import io.vertx.ext.sql.SQLConnection;

import java.nio.charset.Charset;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Base class for the SQL client.
Expand All @@ -51,7 +44,6 @@ public abstract class BaseSQLClient {
protected final Vertx vertx;

protected final JsonObject globalConfig;
private long testTimeout;

public BaseSQLClient(Vertx vertx, JsonObject globalConfig) {
this.vertx = vertx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package io.vertx.ext.asyncsql.impl;

import java.util.concurrent.ExecutorService;

import com.github.jasync.sql.db.Connection;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
Expand All @@ -44,28 +45,31 @@ public abstract class AsyncConnectionPool {
public static final int DEFAULT_MAX_CONNECTION_RETRIES = 0; // No connection retries by default
public static final int DEFAULT_CONNECTION_RETRY_DELAY = 5_000; // 5 seconds between retries by default
public static final int DEFAULT_CONNECTION_RELEASE_DELAY = 0; // never release idle connection by default
public static final int DEFAULT_MAX_WAIT_QUEUE_SIZE = -1; // unlimited queue size

private static final Logger logger = LoggerFactory.getLogger(AsyncConnectionPool.class);

private final int maxPoolSize;
private final int maxConnectionRetries;
private final int connectionRetryDelay;
private final int connectionReleaseDelay;
private final int maxQueueSize;

protected final ConnectionPoolConfiguration connectionConfig;
protected final Vertx vertx;

private int poolSize = 0;
private final Deque<Connection> availableConnections = new ArrayDeque<>();
private final Deque<Handler<AsyncResult<Connection>>> waiters = new ArrayDeque<>();
private final Map<Connection,Long> timers = new HashMap<>();
private final Map<Connection, Long> timers = new HashMap<>();

public AsyncConnectionPool(Vertx vertx, JsonObject globalConfig, ConnectionPoolConfiguration connectionConfig) {
this.vertx = vertx;
this.maxPoolSize = globalConfig.getInteger("maxPoolSize", DEFAULT_MAX_POOL_SIZE);
this.maxConnectionRetries = globalConfig.getInteger("maxConnectionRetries", DEFAULT_MAX_CONNECTION_RETRIES);
this.connectionRetryDelay = globalConfig.getInteger("connectionRetryDelay", DEFAULT_CONNECTION_RETRY_DELAY);
this.connectionReleaseDelay = globalConfig.getInteger("connectionReleaseDelay", DEFAULT_CONNECTION_RELEASE_DELAY);
this.maxQueueSize = globalConfig.getInteger("maxWaitQueueSize", DEFAULT_MAX_WAIT_QUEUE_SIZE);
this.connectionConfig = connectionConfig;
}

Expand All @@ -75,6 +79,10 @@ public synchronized int getPoolSize() {
return poolSize;
}

public synchronized int getQueueSize() {
return waiters.size();
}

private synchronized void createConnection(Handler<AsyncResult<Connection>> handler) {
poolSize += 1;
createAndConnect(new Handler<AsyncResult<Connection>>() {
Expand Down Expand Up @@ -128,7 +136,16 @@ private synchronized void createAndConnect(Handler<AsyncResult<Connection>> hand
}

private synchronized void waitForAvailableConnection(Handler<AsyncResult<Connection>> handler) {
waiters.add(handler);
if (canAddWaiter()) {
waiters.add(handler);
return;
}

handler.handle(Future.failedFuture(new ConnectionPoolTooBusyException("Connection pool reached max wait queue size of " + maxQueueSize)));
}

private synchronized boolean canAddWaiter() {
return maxQueueSize < 0 || waiters.size() < maxQueueSize;
}

private synchronized void createOrWaitForAvailableConnection(Handler<AsyncResult<Connection>> handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@

package io.vertx.ext.asyncsql.impl.pool;

import com.github.jasync.sql.db.Configuration;
import com.github.jasync.sql.db.Connection;
import com.github.jasync.sql.db.ConnectionPoolConfiguration;
import com.github.jasync.sql.db.postgresql.PostgreSQLConnection;
import com.github.jasync.sql.db.postgresql.column.PostgreSQLColumnDecoderRegistry;
import com.github.jasync.sql.db.postgresql.column.PostgreSQLColumnEncoderRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.asyncsql.impl.ConversionUtils;

/**
* Implementation of the {@link AsyncConnectionPool} for PostGresSQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,107 @@ public void testRetriesAndSuccess(TestContext context) {
});
}

// Try to get 30 connections from connectionPool with 15-connection, should result in queueSize of 15 and once the connections are released/given-back to pool waiters should run and queueSize should become 0
@Test
public void testQueueSize(TestContext context) throws InterruptedException {
final int TEST_LENGTH = 30;
final AsyncConnectionPool pool = new AsyncConnectionPoolMock(
new JsonObject()
.put("maxPoolSize", MAX_POOL_SIZE),
this::getGoodConnection);
final Queue<Connection> connectionSet = new LinkedList<>();
final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH);

// Ask for 30 connections
for (int i = 0; i < TEST_LENGTH; i++) {
pool.take(result -> {
// We will decrease our CountDownLatch with each obtained connection
countDownLatch.countDown();
context.assertTrue(result.succeeded());
connectionSet.add(result.result());
});
}
// Wait up to 200 millisecond to obtain the 30 connections (it should not happen)
context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS));
final int expectedWaiterQueueSize = TEST_LENGTH - MAX_POOL_SIZE;
context.assertEquals(pool.getQueueSize(), expectedWaiterQueueSize);
context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE);

// Now releasing/giving-back connection to the pool should reduce waiter queueSize to 0
for (int i = 0; i < MAX_POOL_SIZE; i++) {
pool.giveBack(connectionSet.poll());
}

// Once the connections available then the waiters should get the connection and run
context.assertTrue(countDownLatch.await(1, TimeUnit.SECONDS));
context.assertEquals(pool.getQueueSize(), 0);
context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE);
}

// Max queue size exceed should fail(maxWaitQueueSize=1, maxPoolSize=1)
@Test
public void testMaxQueueSizeExceeds(TestContext context) throws InterruptedException {
final int TEST_LENGTH = 2;
final int MAX_POOL_SIZE = 1;
final int MAX_WAIT_QUEUE_SIZE = 1;
final AsyncConnectionPool pool = new AsyncConnectionPoolMock(
new JsonObject()
.put("maxPoolSize", MAX_POOL_SIZE)
.put("maxWaitQueueSize", MAX_WAIT_QUEUE_SIZE),
this::getGoodConnection);

final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH);

// Ask for 2 connections
for (int i = 0; i < TEST_LENGTH; i++) {
pool.take(result -> {
// We will decrease our CountDownLatch with each obtained connection
countDownLatch.countDown();
context.assertTrue(result.succeeded());
});
}
// Wait up to 200 millisecond to obtain the 2 connections (it should not happen)
context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS));
context.assertEquals(pool.getPoolSize(), MAX_POOL_SIZE);
context.assertEquals(pool.getQueueSize(), MAX_WAIT_QUEUE_SIZE);

// Now when user makes call for new connection request should fail with proper message
final Async waitForCompletion = context.async();
final String expectedExceptionMessage = "Connection pool reached max wait queue size of " + MAX_WAIT_QUEUE_SIZE;
pool.take(result -> {
context.assertTrue(result.failed());
context.assertEquals(result.cause().getMessage(), expectedExceptionMessage);
waitForCompletion.complete();
});
waitForCompletion.awaitSuccess(200);
}

// will try to get 1000 connection, 985 should wait in the queue(maxPoolSize=15)
@Test
public void testNoMaxWaitQueueLimitByDefault(TestContext context) throws InterruptedException {
final int TEST_LENGTH = 1000;
final AsyncConnectionPool pool = new AsyncConnectionPoolMock(
new JsonObject()
.put("maxPoolSize", MAX_POOL_SIZE),
this::getGoodConnection);
final CountDownLatch countDownLatch = new CountDownLatch(TEST_LENGTH);

// Ask for 1000 connections
for (int i = 0; i < TEST_LENGTH; i++) {
pool.take(result -> {
// We will decrease our CountDownLatch with each obtained connection
countDownLatch.countDown();
context.assertTrue(result.succeeded());
});
}
// Wait up to 200 millisecond to obtain the 1000 connections (it should not happen)
context.assertFalse(countDownLatch.await(200, TimeUnit.MILLISECONDS));

// Only 15 connection will be provided and remaining should wait in waiters queue
final int expectedWaiterQueueSize = TEST_LENGTH - MAX_POOL_SIZE;
context.assertEquals(pool.getQueueSize(), expectedWaiterQueueSize);
}

private Connection getGoodConnection() {
final Connection connection = Mockito.mock(Connection.class);
Mockito.when(connection.connect()).thenAnswer(new Answer<CompletableFuture<? extends Connection>>(){
Expand Down