22
22
import rx .observers .Subscribers ;
23
23
24
24
import java .net .InetSocketAddress ;
25
- import java .util .concurrent . BlockingQueue ;
26
- import java .util .concurrent . LinkedBlockingQueue ;
25
+ import java .util .LinkedList ;
26
+ import java .util .Queue ;
27
27
import java .util .concurrent .atomic .AtomicBoolean ;
28
- import java .util .concurrent .atomic .AtomicInteger ;
29
-
28
+ import java .util .concurrent .locks .Condition ;
29
+ import java .util .concurrent .locks .ReentrantLock ;
30
+ import javax .annotation .concurrent .GuardedBy ;
30
31
import static java .util .concurrent .TimeUnit .SECONDS ;
31
32
32
33
/**
38
39
public abstract class PgConnectionPool implements ConnectionPool {
39
40
40
41
final int poolSize ;
41
- final AtomicInteger currentSize = new AtomicInteger ();
42
- final BlockingQueue <Subscriber <? super Connection >> subscribers = new LinkedBlockingQueue <>();
43
- final BlockingQueue <Connection > connections = new LinkedBlockingQueue <>();
42
+ final ReentrantLock lock = new ReentrantLock ();
43
+ @ GuardedBy ("lock" )
44
+ final Condition closingConnectionReleased = lock .newCondition ();
45
+ @ GuardedBy ("lock" )
46
+ int currentSize ;
47
+ @ GuardedBy ("lock" )
48
+ boolean closed ;
49
+ @ GuardedBy ("lock" )
50
+ final Queue <Subscriber <? super Connection >> subscribers = new LinkedList <>();
51
+ @ GuardedBy ("lock" )
52
+ final Queue <Connection > connections = new LinkedList <>();
44
53
45
54
final InetSocketAddress address ;
46
55
final String username ;
@@ -50,8 +59,6 @@ public abstract class PgConnectionPool implements ConnectionPool {
50
59
final ConnectionValidator validator ;
51
60
final boolean pipeline ;
52
61
53
- final AtomicBoolean closed = new AtomicBoolean ();
54
-
55
62
public PgConnectionPool (PoolProperties properties ) {
56
63
this .address = new InetSocketAddress (properties .getHostname (), properties .getPort ());
57
64
this .username = properties .getUsername ();
@@ -101,63 +108,82 @@ public Observable<String> listen(String channel) {
101
108
102
109
@ Override
103
110
public void close () {
104
- closed .set (true );
105
-
106
- while (!subscribers .isEmpty ()) {
107
- Subscriber <? super Connection > subscriber = subscribers .poll ();
108
- if (subscriber != null ) {
109
- subscriber .onError (new SqlException ("Connection pool is closing" ));
110
- }
111
- }
112
111
112
+ lock .lock ();
113
113
try {
114
- while (currentSize .get () > 0 ) {
115
- Connection connection = connections .poll (10 , SECONDS );
116
- if (connection == null ) {
117
- break ;
114
+ closed = true ;
115
+
116
+ while (!subscribers .isEmpty ()) {
117
+ Subscriber <? super Connection > subscriber = subscribers .poll ();
118
+ if (subscriber != null ) {
119
+ subscriber .onError (new SqlException ("Connection pool is closing" ));
118
120
}
119
- connection .close ();
120
- currentSize .decrementAndGet ();
121
121
}
122
- } catch (InterruptedException e ) { /* ignore */ }
122
+
123
+ try {
124
+ while (currentSize > 0 ) {
125
+ Connection connection = connections .poll ();
126
+ if (connection == null ) {
127
+ if (closingConnectionReleased .await (10 , SECONDS )) {
128
+ break ;
129
+ }
130
+ continue ;
131
+ }
132
+ currentSize --;
133
+ connection .close ();
134
+ }
135
+ } catch (InterruptedException e ) { /* ignore */ }
136
+ } finally {
137
+ lock .unlock ();
138
+ }
123
139
}
124
140
125
141
@ Override
126
142
public Observable <Connection > getConnection () {
127
- if (closed .get ()) {
128
- return Observable .error (new SqlException ("Connection pool is closed" ));
129
- }
130
-
131
143
return Observable .create (subscriber -> {
144
+ boolean locked = true ;
145
+ lock .lock ();
146
+ try {
147
+ if (closed ) {
148
+ lock .unlock ();
149
+ locked = false ;
150
+ subscriber .onError (new SqlException ("Connection pool is closed" ));
151
+ return ;
152
+ }
132
153
133
- Connection connection = connections .poll ();
134
- if (connection != null ) {
135
- subscriber .onNext (connection );
136
- subscriber .onCompleted ();
137
- return ;
138
- }
154
+ Connection connection = connections .poll ();
155
+ if (connection != null ) {
156
+ lock .unlock ();
157
+ locked = false ;
158
+ subscriber .onNext (connection );
159
+ subscriber .onCompleted ();
160
+ return ;
161
+ }
139
162
140
- if (!tryIncreaseSize ()) {
141
- subscribers .add (subscriber );
142
- return ;
163
+ if (!tryIncreaseSize ()) {
164
+ subscribers .add (subscriber );
165
+ return ;
166
+ }
167
+ lock .unlock ();
168
+ locked = false ;
169
+
170
+ new PgConnection (openStream (address ), dataConverter )
171
+ .connect (username , password , database )
172
+ .subscribe (subscriber );
173
+ } finally {
174
+ if (locked ) {
175
+ lock .unlock ();
176
+ }
143
177
}
144
-
145
- new PgConnection (openStream (address ), dataConverter )
146
- .connect (username , password , database )
147
- .subscribe (subscriber );
148
178
});
149
179
}
150
180
151
181
private boolean tryIncreaseSize () {
152
- while (true ) {
153
- final int current = currentSize .get ();
154
- if (current == poolSize ) {
155
- return false ;
156
- }
157
- if (currentSize .compareAndSet (current , current + 1 )) {
158
- return true ;
159
- }
182
+ if (currentSize >= poolSize ) {
183
+ return false ;
160
184
}
185
+ currentSize ++;
186
+ return true ;
161
187
}
162
188
163
189
private void releaseIfPipelining (Connection connection ) {
@@ -174,27 +200,26 @@ private void releaseIfNotPipelining(Connection connection) {
174
200
175
201
@ Override
176
202
public void release (Connection connection ) {
177
-
178
- if (closed .get ()) {
179
- connection .close ();
180
- return ;
181
- }
182
-
183
203
boolean failed = !((PgConnection ) connection ).isConnected ();
184
- Subscriber <? super Connection > next = subscribers .poll ();
185
204
186
- if (next == null ) {
187
- if (failed ) {
188
- currentSize .decrementAndGet ();
189
- } else {
190
- connections .add (connection );
205
+ Subscriber <? super Connection > next ;
206
+ lock .lock ();
207
+ try {
208
+ if (subscribers .isEmpty ()) {
209
+ if (failed ) {
210
+ currentSize --;
211
+ } else {
212
+ connections .add (connection );
213
+ }
214
+ if (closed ) {
215
+ this .closingConnectionReleased .signalAll ();
216
+ }
217
+ return ;
191
218
}
192
- return ;
193
- }
194
219
195
- if ( failed ) {
196
- getConnection (). subscribe ( next );
197
- return ;
220
+ next = subscribers . poll ();
221
+ } finally {
222
+ lock . unlock () ;
198
223
}
199
224
200
225
next .onNext (connection );
0 commit comments