Skip to content

Commit 6013e88

Browse files
author
Mikko Tiihonen
committed
Fix race condition between release and getConnection: Move all pool state back inside a big lock.
Also add findbugs @GuardedBy annotations to make static checking of locking reliable
1 parent 59dfc2e commit 6013e88

File tree

2 files changed

+108
-66
lines changed

2 files changed

+108
-66
lines changed

pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,16 @@
5454
<target>1.8</target>
5555
</configuration>
5656
</plugin>
57+
<plugin>
58+
<groupId>org.codehaus.mojo</groupId>
59+
<artifactId>findbugs-maven-plugin</artifactId>
60+
<version>3.0.1</version>
61+
<configuration>
62+
<findbugsXmlOutput>true</findbugsXmlOutput>
63+
<effort>Max</effort>
64+
<threshold>Low</threshold>
65+
</configuration>
66+
</plugin>
5767
</plugins>
5868
</build>
5969

@@ -73,6 +83,13 @@
7383
<artifactId>rxjava</artifactId>
7484
<version>1.0.14</version>
7585
</dependency>
86+
<dependency>
87+
<groupId>com.google.code.findbugs</groupId>
88+
<artifactId>annotations</artifactId>
89+
<version>3.0.0</version>
90+
<scope>provided</scope>
91+
</dependency>
92+
7693
<dependency>
7794
<groupId>junit</groupId>
7895
<artifactId>junit</artifactId>

src/main/java/com/github/pgasync/impl/PgConnectionPool.java

Lines changed: 91 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@
2222
import rx.observers.Subscribers;
2323

2424
import java.net.InetSocketAddress;
25-
import java.util.concurrent.BlockingQueue;
26-
import java.util.concurrent.LinkedBlockingQueue;
25+
import java.util.LinkedList;
26+
import java.util.Queue;
2727
import java.util.concurrent.atomic.AtomicBoolean;
28-
import java.util.concurrent.atomic.AtomicInteger;
29-
28+
import java.util.concurrent.locks.Condition;
29+
import java.util.concurrent.locks.ReentrantLock;
30+
import javax.annotation.concurrent.GuardedBy;
3031
import static java.util.concurrent.TimeUnit.SECONDS;
3132

3233
/**
@@ -38,9 +39,17 @@
3839
public abstract class PgConnectionPool implements ConnectionPool {
3940

4041
final int poolSize;
41-
final AtomicInteger currentSize = new AtomicInteger();
42-
final BlockingQueue<Subscriber<? super Connection>> subscribers = new LinkedBlockingQueue<>();
43-
final BlockingQueue<Connection> connections = new LinkedBlockingQueue<>();
42+
final ReentrantLock lock = new ReentrantLock();
43+
@GuardedBy("lock")
44+
final Condition closingConnectionReleased = lock.newCondition();
45+
@GuardedBy("lock")
46+
int currentSize;
47+
@GuardedBy("lock")
48+
boolean closed;
49+
@GuardedBy("lock")
50+
final Queue<Subscriber<? super Connection>> subscribers = new LinkedList<>();
51+
@GuardedBy("lock")
52+
final Queue<Connection> connections = new LinkedList<>();
4453

4554
final InetSocketAddress address;
4655
final String username;
@@ -50,8 +59,6 @@ public abstract class PgConnectionPool implements ConnectionPool {
5059
final ConnectionValidator validator;
5160
final boolean pipeline;
5261

53-
final AtomicBoolean closed = new AtomicBoolean();
54-
5562
public PgConnectionPool(PoolProperties properties) {
5663
this.address = new InetSocketAddress(properties.getHostname(), properties.getPort());
5764
this.username = properties.getUsername();
@@ -101,63 +108,82 @@ public Observable<String> listen(String channel) {
101108

102109
@Override
103110
public void close() {
104-
closed.set(true);
105-
106-
while(!subscribers.isEmpty()) {
107-
Subscriber<? super Connection> subscriber = subscribers.poll();
108-
if(subscriber != null) {
109-
subscriber.onError(new SqlException("Connection pool is closing"));
110-
}
111-
}
112111

112+
lock.lock();
113113
try {
114-
while (currentSize.get() > 0) {
115-
Connection connection = connections.poll(10, SECONDS);
116-
if(connection == null) {
117-
break;
114+
closed = true;
115+
116+
while(!subscribers.isEmpty()) {
117+
Subscriber<? super Connection> subscriber = subscribers.poll();
118+
if(subscriber != null) {
119+
subscriber.onError(new SqlException("Connection pool is closing"));
118120
}
119-
connection.close();
120-
currentSize.decrementAndGet();
121121
}
122-
} catch (InterruptedException e) { /* ignore */ }
122+
123+
try {
124+
while (currentSize > 0) {
125+
Connection connection = connections.poll();
126+
if(connection == null) {
127+
if (closingConnectionReleased.await(10, SECONDS)) {
128+
break;
129+
}
130+
continue;
131+
}
132+
currentSize--;
133+
connection.close();
134+
}
135+
} catch (InterruptedException e) { /* ignore */ }
136+
} finally {
137+
lock.unlock();
138+
}
123139
}
124140

125141
@Override
126142
public Observable<Connection> getConnection() {
127-
if(closed.get()) {
128-
return Observable.error(new SqlException("Connection pool is closed"));
129-
}
130-
131143
return Observable.create(subscriber -> {
144+
boolean locked = true;
145+
lock.lock();
146+
try {
147+
if (closed) {
148+
lock.unlock();
149+
locked = false;
150+
subscriber.onError(new SqlException("Connection pool is closed"));
151+
return;
152+
}
132153

133-
Connection connection = connections.poll();
134-
if (connection != null) {
135-
subscriber.onNext(connection);
136-
subscriber.onCompleted();
137-
return;
138-
}
154+
Connection connection = connections.poll();
155+
if (connection != null) {
156+
lock.unlock();
157+
locked = false;
158+
subscriber.onNext(connection);
159+
subscriber.onCompleted();
160+
return;
161+
}
139162

140-
if(!tryIncreaseSize()) {
141-
subscribers.add(subscriber);
142-
return;
163+
if(!tryIncreaseSize()) {
164+
subscribers.add(subscriber);
165+
return;
166+
}
167+
lock.unlock();
168+
locked = false;
169+
170+
new PgConnection(openStream(address), dataConverter)
171+
.connect(username, password, database)
172+
.subscribe(subscriber);
173+
} finally {
174+
if (locked) {
175+
lock.unlock();
176+
}
143177
}
144-
145-
new PgConnection(openStream(address), dataConverter)
146-
.connect(username, password, database)
147-
.subscribe(subscriber);
148178
});
149179
}
150180

151181
private boolean tryIncreaseSize() {
152-
while(true) {
153-
final int current = currentSize.get();
154-
if(current == poolSize) {
155-
return false;
156-
}
157-
if(currentSize.compareAndSet(current, current + 1)) {
158-
return true;
159-
}
182+
if(currentSize >= poolSize) {
183+
return false;
160184
}
185+
currentSize++;
186+
return true;
161187
}
162188

163189
private void releaseIfPipelining(Connection connection) {
@@ -174,27 +200,26 @@ private void releaseIfNotPipelining(Connection connection) {
174200

175201
@Override
176202
public void release(Connection connection) {
177-
178-
if(closed.get()) {
179-
connection.close();
180-
return;
181-
}
182-
183203
boolean failed = !((PgConnection) connection).isConnected();
184-
Subscriber<? super Connection> next = subscribers.poll();
185204

186-
if(next == null) {
187-
if(failed) {
188-
currentSize.decrementAndGet();
189-
} else {
190-
connections.add(connection);
205+
Subscriber<? super Connection> next;
206+
lock.lock();
207+
try {
208+
if(subscribers.isEmpty()) {
209+
if(failed) {
210+
currentSize--;
211+
} else {
212+
connections.add(connection);
213+
}
214+
if (closed) {
215+
this.closingConnectionReleased.signalAll();
216+
}
217+
return;
191218
}
192-
return;
193-
}
194219

195-
if(failed) {
196-
getConnection().subscribe(next);
197-
return;
220+
next = subscribers.poll();
221+
} finally {
222+
lock.unlock();
198223
}
199224

200225
next.onNext(connection);

0 commit comments

Comments
 (0)