Skip to content

Commit 1b2ca43

Browse files
committed
Replace pool lock with CAS operations.
1 parent 5e12585 commit 1b2ca43

File tree

1 file changed

+56
-58
lines changed

1 file changed

+56
-58
lines changed

src/main/java/com/github/pgasync/impl/PgConnectionPool.java

Lines changed: 56 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,35 @@
2222
import rx.observers.Subscribers;
2323

2424
import java.net.InetSocketAddress;
25-
import java.util.LinkedList;
26-
import java.util.Queue;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.concurrent.BlockingQueue;
28+
import java.util.concurrent.LinkedBlockingQueue;
2729
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicInteger;
2831

2932
/**
3033
* Pool for backend connections. Callbacks are queued and executed when pool has an available
3134
* connection.
3235
*
33-
* TODO: Locking scheme is optimized for small thread pools and doesn't scale all that well
34-
* for large ones.
35-
*
3636
* @author Antti Laisi
3737
*/
3838
public abstract class PgConnectionPool implements ConnectionPool {
3939

40-
final Queue<Subscriber<? super Connection>> waiters = new LinkedList<>();
41-
final Queue<Connection> connections = new LinkedList<>();
42-
final Object lock = new Object[0];
40+
final int poolSize;
41+
final AtomicInteger currentSize = new AtomicInteger();
42+
final BlockingQueue<Subscriber<? super Connection>> subscribers = new LinkedBlockingQueue<>();
43+
final BlockingQueue<Connection> connections = new LinkedBlockingQueue<>();
4344

4445
final InetSocketAddress address;
4546
final String username;
4647
final String password;
4748
final String database;
4849
final DataConverter dataConverter;
4950
final ConnectionValidator validator;
51+
final boolean pipeline;
5052

51-
final int poolSize;
52-
protected final boolean pipeline;
53-
54-
int currentSize;
55-
volatile boolean closed;
53+
final AtomicBoolean closed = new AtomicBoolean();
5654

5755
public PgConnectionPool(PoolProperties properties) {
5856
this.address = new InetSocketAddress(properties.getHostname(), properties.getPort());
@@ -78,76 +76,82 @@ public Observable<ResultSet> querySet(String sql, Object... params) {
7876
return getConnection()
7977
.doOnNext(this::releaseIfPipelining)
8078
.flatMap(connection -> connection.querySet(sql, params)
81-
.doOnTerminate(() -> releaseIfNotPipelining(connection)));
79+
.doOnTerminate(() -> releaseIfNotPipelining(connection)));
8280
}
8381

8482
@Override
8583
public Observable<Transaction> begin() {
8684
return getConnection()
8785
.flatMap(connection -> connection.begin()
88-
.doOnError(t -> release(connection))
89-
.map(tx -> new ReleasingTransaction(connection, tx)));
86+
.doOnError(t -> release(connection))
87+
.map(tx -> new ReleasingTransaction(connection, tx)));
9088
}
9189

9290
@Override
9391
public Observable<String> listen(String channel) {
9492
return getConnection()
9593
.lift(subscriber -> Subscribers.create(
9694
connection -> connection.listen(channel)
97-
.doOnSubscribe(() -> release(connection))
98-
.onErrorResumeNext(exception -> listen(channel))
99-
.subscribe(subscriber),
95+
.doOnSubscribe(() -> release(connection))
96+
.onErrorResumeNext(exception -> listen(channel))
97+
.subscribe(subscriber),
10098
subscriber::onError));
10199
}
102100

103101
@Override
102+
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
104103
public void close() {
105-
closed = true;
106-
synchronized (lock) {
107-
for(Connection conn = connections.poll(); conn != null; conn = connections.poll()) {
108-
conn.close();
109-
// TODO: remove conn from listeners
110-
}
111-
for(Subscriber<? super Connection> waiter = waiters.poll(); waiter != null; waiter = waiters.poll()) {
112-
waiter.onError(new SqlException("Connection pool is closed"));
113-
}
104+
closed.set(true);
105+
106+
List<Connection> closeConnections = new ArrayList<>();
107+
if(connections.drainTo(closeConnections) > 0) {
108+
closeConnections.forEach(Connection::close);
109+
}
110+
111+
List<Subscriber<? super Connection>> cancelSubscribers = new ArrayList<>();
112+
if(subscribers.drainTo(cancelSubscribers) > 0) {
113+
cancelSubscribers.forEach(subscriber -> subscriber.onError(new SqlException("Connection pool is closed")));
114114
}
115115
}
116116

117117
@Override
118118
public Observable<Connection> getConnection() {
119-
if(closed) {
119+
if(closed.get()) {
120120
return Observable.error(new SqlException("Connection pool is closed"));
121121
}
122122

123123
return Observable.create(subscriber -> {
124124

125-
Connection connection;
126-
127-
synchronized (lock) {
128-
connection = connections.poll();
129-
if (connection == null) {
130-
if (currentSize < poolSize) {
131-
currentSize++;
132-
} else {
133-
waiters.add(subscriber);
134-
return;
135-
}
136-
}
137-
}
138-
125+
Connection connection = connections.poll();
139126
if (connection != null) {
140127
subscriber.onNext(connection);
141128
subscriber.onCompleted();
142129
return;
143130
}
144131

132+
if(!tryIncreaseSize()) {
133+
subscribers.add(subscriber);
134+
return;
135+
}
136+
145137
new PgConnection(openStream(address), dataConverter)
146138
.connect(username, password, database)
147139
.subscribe(subscriber);
148140
});
149141
}
150142

143+
private boolean tryIncreaseSize() {
144+
while(true) {
145+
final int current = currentSize.get();
146+
if(current == poolSize) {
147+
return false;
148+
}
149+
if(currentSize.compareAndSet(current, current + 1)) {
150+
return true;
151+
}
152+
}
153+
}
154+
151155
private void releaseIfPipelining(Connection connection) {
152156
if (pipeline) {
153157
release(connection);
@@ -163,29 +167,23 @@ private void releaseIfNotPipelining(Connection connection) {
163167
@Override
164168
public void release(Connection connection) {
165169

166-
if(closed) {
170+
if(closed.get()) {
167171
connection.close();
168172
return;
169173
}
170174

171-
Subscriber<? super Connection> next;
172-
boolean failed;
173-
174-
synchronized (lock) {
175-
failed = !((PgConnection) connection).isConnected();
176-
next = waiters.poll();
177-
if (next == null) {
178-
if(failed) {
179-
currentSize--;
180-
} else {
181-
connections.add(connection);
182-
}
183-
}
184-
}
175+
boolean failed = !((PgConnection) connection).isConnected();
176+
Subscriber<? super Connection> next = subscribers.poll();
185177

186178
if(next == null) {
179+
if(failed) {
180+
currentSize.decrementAndGet();
181+
} else {
182+
connections.add(connection);
183+
}
187184
return;
188185
}
186+
189187
if(failed) {
190188
getConnection().subscribe(next);
191189
return;

0 commit comments

Comments
 (0)