Skip to content

Commit 1c5a37e

Browse files
committed
#7: Initial support for validation queries
1 parent fe1472a commit 1c5a37e

File tree

4 files changed

+103
-11
lines changed

4 files changed

+103
-11
lines changed

src/main/java/com/github/pgasync/ConnectionPoolBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package com.github.pgasync;
1616

17+
import com.github.pgasync.impl.ConnectionValidator;
1718
import com.github.pgasync.impl.conversion.DataConverter;
1819
import com.github.pgasync.impl.netty.NettyPgConnectionPool;
1920

@@ -82,6 +83,11 @@ public ConnectionPoolBuilder ssl(boolean ssl) {
8283
return this;
8384
}
8485

86+
public ConnectionPoolBuilder validationQuery(String validationQuery) {
87+
properties.validationQuery = validationQuery;
88+
return this;
89+
}
90+
8591
/**
8692
* Configuration for a pool.
8793
*/
@@ -96,6 +102,7 @@ public static class PoolProperties {
96102
DataConverter dataConverter = null;
97103
List<Converter<?>> converters = new ArrayList<>();
98104
boolean useSsl;
105+
String validationQuery = "SELECT 1";
99106

100107
public String getHostname() {
101108
return hostname;
@@ -121,5 +128,8 @@ public boolean getUseSsl() {
121128
public DataConverter getDataConverter() {
122129
return dataConverter != null ? dataConverter : new DataConverter(converters);
123130
}
131+
public ConnectionValidator getValidator() {
132+
return new ConnectionValidator(validationQuery);
133+
}
124134
}
125135
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package com.github.pgasync.impl;
16+
17+
18+
import com.github.pgasync.Connection;
19+
20+
import java.util.function.Consumer;
21+
22+
/**
23+
* @author Antti Laisi
24+
*/
25+
public class ConnectionValidator {
26+
27+
final String validationQuery;
28+
29+
public ConnectionValidator(String validationQuery) {
30+
this.validationQuery = validationQuery;
31+
}
32+
33+
void validate(Connection connection, Runnable onValid, Consumer<Throwable> onError) {
34+
if(validationQuery == null) {
35+
onValid.run();
36+
return;
37+
}
38+
connection.query(validationQuery, rs -> onValid.run(), onError);
39+
}
40+
}

src/main/java/com/github/pgasync/impl/Functions.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
115
package com.github.pgasync.impl;
216

317
import java.util.function.Consumer;
@@ -20,7 +34,7 @@ enum Functions {
2034
static <A extends Function<T,A> & Supplier<R>, T, R> Supplier<R> reduce(A accumulator, Stream<T> stream) {
2135
return stream.reduce(
2236
accumulator,
23-
(acc, t) -> acc.apply(t),
37+
Function::apply,
2438
(l, r) -> { throw new UnsupportedOperationException(); });
2539
}
2640

src/main/java/com/github/pgasync/impl/PgConnectionPool.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ static class QueuedCallback {
5151
final String password;
5252
final String database;
5353
final DataConverter dataConverter;
54+
final ConnectionValidator validator;
5455

5556
final int poolSize;
5657
int currentSize;
@@ -63,6 +64,7 @@ public PgConnectionPool(PoolProperties properties) {
6364
this.database = properties.getDatabase();
6465
this.poolSize = properties.getPoolSize();
6566
this.dataConverter = properties.getDataConverter();
67+
this.validator = properties.getValidator();
6668
}
6769

6870
@Override
@@ -115,6 +117,10 @@ public void close() {
115117

116118
@Override
117119
public void getConnection(final Consumer<Connection> onConnection, final Consumer<Throwable> onError) {
120+
getConnection(onConnection, onError, 0);
121+
}
122+
123+
void getConnection(final Consumer<Connection> onConnection, final Consumer<Throwable> onError, final int attempt) {
118124
if(closed) {
119125
onError.accept(new SqlException("Connection pool is closed"));
120126
return;
@@ -133,18 +139,14 @@ public void getConnection(final Consumer<Connection> onConnection, final Consume
133139
}
134140
}
135141

136-
if (connection == null) {
137-
new PgConnection(openStream(address), dataConverter)
138-
.connect(username, password, database, onConnection, onError);
142+
if (connection != null) {
143+
validateAndApply(connection, onConnection, onError, attempt);
139144
return;
140145
}
141146

142-
try {
143-
onConnection.accept(connection);
144-
} catch (Throwable t) {
145-
release(connection);
146-
onError.accept(t);
147-
}
147+
new PgConnection(openStream(address), dataConverter)
148+
.connect(username, password, database, onConnection, onError);
149+
148150
}
149151

150152
@Override
@@ -171,7 +173,7 @@ public void release(Connection connection) {
171173
if(failed) {
172174
getConnection(next.connectionHandler, next.errorHandler);
173175
} else {
174-
next.connectionHandler.accept(connection);
176+
validateAndApply(connection, next.connectionHandler, next.errorHandler, 0);
175177
}
176178
}
177179
}
@@ -184,6 +186,32 @@ public void release(Connection connection) {
184186
*/
185187
protected abstract PgProtocolStream openStream(InetSocketAddress address);
186188

189+
void validateAndApply(Connection connection, Consumer<Connection> onConnection, Consumer<Throwable> onError, int attempt) {
190+
191+
Runnable onValid = () -> {
192+
try {
193+
onConnection.accept(connection);
194+
} catch (Throwable t) {
195+
release(connection);
196+
onError.accept(t);
197+
}
198+
};
199+
200+
Consumer<Throwable> onValidationFailed = err -> {
201+
if(attempt > poolSize) {
202+
onError.accept(err);
203+
return;
204+
}
205+
try {
206+
connection.close();
207+
} catch (Throwable t) { /* ignored */ }
208+
release(connection);
209+
getConnection(onConnection, onError, attempt + 1);
210+
};
211+
212+
validator.validate(connection, onValid, onValidationFailed);
213+
}
214+
187215
/**
188216
* Transaction that rollbacks the tx on backend error and chains releasing the connection after COMMIT/ROLLBACK.
189217
*/

0 commit comments

Comments
 (0)