22
22
import rx .observers .Subscribers ;
23
23
24
24
import java .net .InetSocketAddress ;
25
- import java .util .LinkedList ;
26
- import java .util .Queue ;
25
+ import java .util .ArrayList ;
26
+ import java .util .List ;
27
+ import java .util .concurrent .BlockingQueue ;
28
+ import java .util .concurrent .LinkedBlockingQueue ;
27
29
import java .util .concurrent .atomic .AtomicBoolean ;
30
+ import java .util .concurrent .atomic .AtomicInteger ;
28
31
29
32
/**
30
33
* Pool for backend connections. Callbacks are queued and executed when pool has an available
31
34
* connection.
32
35
*
33
- * TODO: Locking scheme is optimized for small thread pools and doesn't scale all that well
34
- * for large ones.
35
- *
36
36
* @author Antti Laisi
37
37
*/
38
38
public abstract class PgConnectionPool implements ConnectionPool {
39
39
40
- final Queue <Subscriber <? super Connection >> waiters = new LinkedList <>();
41
- final Queue <Connection > connections = new LinkedList <>();
42
- final Object lock = new Object [0 ];
40
+ final int poolSize ;
41
+ final AtomicInteger currentSize = new AtomicInteger ();
42
+ final BlockingQueue <Subscriber <? super Connection >> subscribers = new LinkedBlockingQueue <>();
43
+ final BlockingQueue <Connection > connections = new LinkedBlockingQueue <>();
43
44
44
45
final InetSocketAddress address ;
45
46
final String username ;
46
47
final String password ;
47
48
final String database ;
48
49
final DataConverter dataConverter ;
49
50
final ConnectionValidator validator ;
51
+ final boolean pipeline ;
50
52
51
- final int poolSize ;
52
- protected final boolean pipeline ;
53
-
54
- int currentSize ;
55
- volatile boolean closed ;
53
+ final AtomicBoolean closed = new AtomicBoolean ();
56
54
57
55
public PgConnectionPool (PoolProperties properties ) {
58
56
this .address = new InetSocketAddress (properties .getHostname (), properties .getPort ());
@@ -78,76 +76,82 @@ public Observable<ResultSet> querySet(String sql, Object... params) {
78
76
return getConnection ()
79
77
.doOnNext (this ::releaseIfPipelining )
80
78
.flatMap (connection -> connection .querySet (sql , params )
81
- .doOnTerminate (() -> releaseIfNotPipelining (connection )));
79
+ .doOnTerminate (() -> releaseIfNotPipelining (connection )));
82
80
}
83
81
84
82
@ Override
85
83
public Observable <Transaction > begin () {
86
84
return getConnection ()
87
85
.flatMap (connection -> connection .begin ()
88
- .doOnError (t -> release (connection ))
89
- .map (tx -> new ReleasingTransaction (connection , tx )));
86
+ .doOnError (t -> release (connection ))
87
+ .map (tx -> new ReleasingTransaction (connection , tx )));
90
88
}
91
89
92
90
@ Override
93
91
public Observable <String > listen (String channel ) {
94
92
return getConnection ()
95
93
.lift (subscriber -> Subscribers .create (
96
94
connection -> connection .listen (channel )
97
- .doOnSubscribe (() -> release (connection ))
98
- .onErrorResumeNext (exception -> listen (channel ))
99
- .subscribe (subscriber ),
95
+ .doOnSubscribe (() -> release (connection ))
96
+ .onErrorResumeNext (exception -> listen (channel ))
97
+ .subscribe (subscriber ),
100
98
subscriber ::onError ));
101
99
}
102
100
103
101
@ Override
102
+ @ SuppressWarnings ("MismatchedQueryAndUpdateOfCollection" )
104
103
public void close () {
105
- closed = true ;
106
- synchronized (lock ) {
107
- for (Connection conn = connections .poll (); conn != null ; conn = connections .poll ()) {
108
- conn .close ();
109
- // TODO: remove conn from listeners
110
- }
111
- for (Subscriber <? super Connection > waiter = waiters .poll (); waiter != null ; waiter = waiters .poll ()) {
112
- waiter .onError (new SqlException ("Connection pool is closed" ));
113
- }
104
+ closed .set (true );
105
+
106
+ List <Connection > closeConnections = new ArrayList <>();
107
+ if (connections .drainTo (closeConnections ) > 0 ) {
108
+ closeConnections .forEach (Connection ::close );
109
+ }
110
+
111
+ List <Subscriber <? super Connection >> cancelSubscribers = new ArrayList <>();
112
+ if (subscribers .drainTo (cancelSubscribers ) > 0 ) {
113
+ cancelSubscribers .forEach (subscriber -> subscriber .onError (new SqlException ("Connection pool is closed" )));
114
114
}
115
115
}
116
116
117
117
@ Override
118
118
public Observable <Connection > getConnection () {
119
- if (closed ) {
119
+ if (closed . get () ) {
120
120
return Observable .error (new SqlException ("Connection pool is closed" ));
121
121
}
122
122
123
123
return Observable .create (subscriber -> {
124
124
125
- Connection connection ;
126
-
127
- synchronized (lock ) {
128
- connection = connections .poll ();
129
- if (connection == null ) {
130
- if (currentSize < poolSize ) {
131
- currentSize ++;
132
- } else {
133
- waiters .add (subscriber );
134
- return ;
135
- }
136
- }
137
- }
138
-
125
+ Connection connection = connections .poll ();
139
126
if (connection != null ) {
140
127
subscriber .onNext (connection );
141
128
subscriber .onCompleted ();
142
129
return ;
143
130
}
144
131
132
+ if (!tryIncreaseSize ()) {
133
+ subscribers .add (subscriber );
134
+ return ;
135
+ }
136
+
145
137
new PgConnection (openStream (address ), dataConverter )
146
138
.connect (username , password , database )
147
139
.subscribe (subscriber );
148
140
});
149
141
}
150
142
143
+ private boolean tryIncreaseSize () {
144
+ while (true ) {
145
+ final int current = currentSize .get ();
146
+ if (current == poolSize ) {
147
+ return false ;
148
+ }
149
+ if (currentSize .compareAndSet (current , current + 1 )) {
150
+ return true ;
151
+ }
152
+ }
153
+ }
154
+
151
155
private void releaseIfPipelining (Connection connection ) {
152
156
if (pipeline ) {
153
157
release (connection );
@@ -163,29 +167,23 @@ private void releaseIfNotPipelining(Connection connection) {
163
167
@ Override
164
168
public void release (Connection connection ) {
165
169
166
- if (closed ) {
170
+ if (closed . get () ) {
167
171
connection .close ();
168
172
return ;
169
173
}
170
174
171
- Subscriber <? super Connection > next ;
172
- boolean failed ;
173
-
174
- synchronized (lock ) {
175
- failed = !((PgConnection ) connection ).isConnected ();
176
- next = waiters .poll ();
177
- if (next == null ) {
178
- if (failed ) {
179
- currentSize --;
180
- } else {
181
- connections .add (connection );
182
- }
183
- }
184
- }
175
+ boolean failed = !((PgConnection ) connection ).isConnected ();
176
+ Subscriber <? super Connection > next = subscribers .poll ();
185
177
186
178
if (next == null ) {
179
+ if (failed ) {
180
+ currentSize .decrementAndGet ();
181
+ } else {
182
+ connections .add (connection );
183
+ }
187
184
return ;
188
185
}
186
+
189
187
if (failed ) {
190
188
getConnection ().subscribe (next );
191
189
return ;
0 commit comments