Skip to content

Commit e15b9bf

Browse files
committed
Release connections to pool on transaction COMMIT/ROLLBACK.
1 parent 92978e8 commit e15b9bf

10 files changed

+169
-154
lines changed

src/main/java/com/github/pgasync/QueryExecutor.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,6 @@ public interface QueryExecutor {
2323
*/
2424
Observable<Row> queryRows(String sql, Object... params);
2525

26-
/**
27-
* Executes a simple query.
28-
*
29-
* @param sql SQL to execute.
30-
* @return Cold observable that emits 0-n rows.
31-
*/
32-
default Observable<Row> queryRows(String sql) {
33-
return queryRows(sql, (Object[]) null);
34-
}
35-
3626
/**
3727
* Executes an anonymous prepared statement. Uses native PostgreSQL syntax with $arg instead of ?
3828
* to mark parameters. Supported parameter types are String, Character, Number, Time, Date, Timestamp
@@ -44,16 +34,6 @@ default Observable<Row> queryRows(String sql) {
4434
*/
4535
Observable<ResultSet> querySet(String sql, Object... params);
4636

47-
/**
48-
* Executes a simple query.
49-
*
50-
* @param sql SQL to execute.
51-
* @return Cold observable that emits a single result set.
52-
*/
53-
default Observable<ResultSet> querySet(String sql) {
54-
return querySet(sql, (Object[]) null);
55-
}
56-
5737
/**
5838
* Executes a simple query.
5939
*

src/main/java/com/github/pgasync/TransactionExecutor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import rx.Observable;
44

5+
import java.util.function.Consumer;
6+
57
/**
68
* TransactionExecutor begins backend transactions.
79
*
@@ -14,4 +16,13 @@ public interface TransactionExecutor {
1416
*/
1517
Observable<Transaction> begin();
1618

19+
/**
20+
* Begins a transaction.
21+
*
22+
* @param onTransaction Called when transaction is successfully started.
23+
* @param onError Called on exception thrown
24+
*/
25+
default void begin(Consumer<Transaction> onTransaction, Consumer<Throwable> onError) {
26+
begin().subscribe(onTransaction::accept, onError::accept);
27+
}
1728
}

src/main/java/com/github/pgasync/impl/PgConnection.java

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,11 @@
2222
import com.github.pgasync.impl.message.*;
2323
import rx.Observable;
2424
import rx.Subscriber;
25-
import rx.Subscription;
26-
import rx.subscriptions.Subscriptions;
25+
import rx.observers.Subscribers;
2726

28-
import java.util.ArrayList;
29-
import java.util.HashMap;
30-
import java.util.List;
31-
import java.util.Map;
32-
import java.util.concurrent.atomic.AtomicBoolean;
33-
import java.util.concurrent.atomic.AtomicReference;
27+
import java.util.*;
28+
import java.util.function.Consumer;
29+
import java.util.stream.Collectors;
3430

3531
import static com.github.pgasync.impl.message.RowDescription.ColumnDescription;
3632

@@ -40,7 +36,7 @@
4036
*
4137
* @author Antti Laisi
4238
*/
43-
public class PgConnection implements Connection, Transaction {
39+
public class PgConnection implements Connection {
4440

4541
final PgProtocolStream stream;
4642
final DataConverter dataConverter;
@@ -53,7 +49,7 @@ public PgConnection(PgProtocolStream stream, DataConverter dataConverter) {
5349
Observable<Connection> connect(String username, String password, String database) {
5450
return stream.connect(new StartupMessage(username, database))
5551
.flatMap(message -> authenticate(username, password, message))
56-
.filter(ReadyForQuery.class::isInstance)
52+
.single(message -> message == ReadyForQuery.INSTANCE)
5753
.map(ready -> this);
5854
}
5955

@@ -81,30 +77,17 @@ public Observable<Row> queryRows(String sql, Object... params) {
8177

8278
@Override
8379
public Observable<Transaction> begin() {
84-
return queryRows("BEGIN").map(row -> this);
85-
}
86-
87-
@Override
88-
public Observable<Void> commit() {
89-
return queryRows("COMMIT").map(row -> null);
90-
}
91-
92-
@Override
93-
public Observable<Void> rollback() {
94-
return queryRows("ROLLBACK").map(row -> null);
80+
return querySet("BEGIN").map(rs -> new PgConnectionTransaction());
9581
}
9682

9783
@Override
9884
public Observable<String> listen(String channel) {
99-
AtomicReference<String> token = new AtomicReference<>();
100-
return Observable.<String>create(subscriber ->
101-
102-
querySet("LISTEN " + channel)
103-
.subscribe( rs -> token.set(stream.registerNotificationHandler(channel, subscriber::onNext)),
104-
subscriber::onError)
105-
106-
).doOnUnsubscribe(() -> querySet("UNLISTEN " + channel)
107-
.subscribe(rs -> stream.unRegisterNotificationHandler(channel, token.get())));
85+
// TODO: wait for commit before sending unlisten as otherwise it can be rolled back
86+
return querySet("LISTEN " + channel)
87+
.<String>lift(subscriber -> Subscribers.create( rs -> stream.listen(channel)
88+
.subscribe(subscriber),
89+
subscriber::onError))
90+
.doOnUnsubscribe(() -> querySet("UNLISTEN " + channel).subscribe(rs -> { }));
10891
}
10992

11093
@Override
@@ -187,4 +170,35 @@ static Map<String,PgColumn> getColumns(ColumnDescription[] descriptions) {
187170
return columns;
188171
}
189172

173+
/**
174+
* Transaction that rollbacks the tx on backend error and closes the connection on COMMIT/ROLLBACK failure.
175+
*/
176+
class PgConnectionTransaction implements Transaction {
177+
@Override
178+
public Observable<Void> commit() {
179+
return PgConnection.this.querySet("COMMIT")
180+
.map(rs -> (Void) null)
181+
.doOnError(exception -> close());
182+
}
183+
@Override
184+
public Observable<Void> rollback() {
185+
return PgConnection.this.querySet("ROLLBACK")
186+
.map(rs -> (Void) null)
187+
.doOnError(exception -> close());
188+
}
189+
@Override
190+
public Observable<Row> queryRows(String sql, Object... params) {
191+
return PgConnection.this.queryRows(sql, params)
192+
.onErrorResumeNext(this::doRollback);
193+
}
194+
@Override
195+
public Observable<ResultSet> querySet(String sql, Object... params) {
196+
return PgConnection.this.querySet(sql, params)
197+
.onErrorResumeNext(this::doRollback);
198+
}
199+
<T> Observable<T> doRollback(Throwable t) {
200+
return rollback().flatMap(__ -> Observable.error(t));
201+
}
202+
}
203+
190204
}

src/main/java/com/github/pgasync/impl/PgConnectionPool.java

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
import com.github.pgasync.impl.conversion.DataConverter;
2020
import rx.Observable;
2121
import rx.Subscriber;
22+
import rx.observers.Subscribers;
2223

2324
import java.net.InetSocketAddress;
2425
import java.util.LinkedList;
2526
import java.util.Map;
2627
import java.util.Queue;
2728
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2830

2931
/**
3032
* Pool for backend connections. Callbacks are queued and executed when pool has an available
@@ -72,17 +74,15 @@ public Observable<Row> queryRows(String sql, Object... params) {
7274
return getConnection()
7375
.doOnNext(this::releaseIfPipelining)
7476
.flatMap(connection -> connection.queryRows(sql, params)
75-
.doOnError(t -> releaseIfNotPipelining(connection))
76-
.doOnCompleted(() -> releaseIfNotPipelining(connection)));
77+
.doOnTerminate(() -> releaseIfNotPipelining(connection)));
7778
}
7879

7980
@Override
8081
public Observable<ResultSet> querySet(String sql, Object... params) {
8182
return getConnection()
8283
.doOnNext(this::releaseIfPipelining)
8384
.flatMap(connection -> connection.querySet(sql, params)
84-
.doOnError(t -> releaseIfNotPipelining(connection))
85-
.doOnCompleted(() -> releaseIfNotPipelining(connection)));
85+
.doOnTerminate(() -> releaseIfNotPipelining(connection)));
8686
}
8787

8888
@Override
@@ -96,8 +96,11 @@ public Observable<Transaction> begin() {
9696
@Override
9797
public Observable<String> listen(String channel) {
9898
return getConnection()
99-
.flatMap(connection -> connection.listen(channel)
100-
.doOnSubscribe(() -> release(connection)));
99+
.lift(subscriber -> Subscribers.create(
100+
connection -> connection.listen(channel)
101+
.doOnSubscribe(() -> release(connection))
102+
.subscribe(subscriber),
103+
subscriber::onError));
101104
}
102105

103106
@Override
@@ -232,10 +235,12 @@ void validateAndApply(Connection connection, Consumer<Connection> onConnection,
232235
*/
233236

234237
/**
235-
* Transaction that rollbacks the tx on backend error and chains releasing the connection after COMMIT/ROLLBACK.
238+
* Transaction that chains releasing the connection after COMMIT/ROLLBACK.
236239
*/
237240
class ReleasingTransaction implements Transaction {
238-
Connection txconn;
241+
242+
final AtomicBoolean released = new AtomicBoolean();
243+
final Connection txconn;
239244
final Transaction transaction;
240245

241246
ReleasingTransaction(Connection txconn, Transaction transaction) {
@@ -247,53 +252,41 @@ class ReleasingTransaction implements Transaction {
247252
public Observable<Void> rollback() {
248253
return transaction.rollback()
249254
.doOnCompleted(this::releaseConnection)
250-
.doOnError(exception -> closeAndRelease());
255+
.doOnError(exception -> closeAndReleaseConnection());
251256
}
252257

253258
@Override
254259
public Observable<Void> commit() {
255260
return transaction.commit()
256261
.doOnCompleted(this::releaseConnection)
257-
.onErrorResumeNext(this::doRollback);
262+
.doOnError(exception -> closeAndReleaseConnection());
258263
}
259264

260265
@Override
261266
public Observable<Row> queryRows(String sql, Object... params) {
262-
if (txconn == null) {
267+
if (released.get()) {
263268
return Observable.error(new SqlException("Transaction is already completed"));
264269
}
265-
return txconn.queryRows(sql)
266-
.onErrorResumeNext(this::doRollback);
270+
return transaction.queryRows(sql)
271+
.doOnError(exception -> releaseConnection());
267272
}
268273

269274
@Override
270275
public Observable<ResultSet> querySet(String sql, Object... params) {
271-
if (txconn == null) {
276+
if (released.get()) {
272277
return Observable.error(new SqlException("Transaction is already completed"));
273278
}
274-
return txconn.querySet(sql, params)
275-
.onErrorResumeNext(this::doRollback);
279+
return transaction.querySet(sql, params)
280+
.doOnError(exception -> releaseConnection());
276281
}
277282

278283
void releaseConnection() {
279284
release(txconn);
280-
txconn = null;
285+
released.set(true);
281286
}
282-
void closeAndRelease() {
287+
void closeAndReleaseConnection() {
283288
txconn.close();
284289
releaseConnection();
285290
}
286-
287-
<T> Observable<T> doRollback(Throwable exception) {
288-
if (!((PgConnection) txconn).isConnected()) {
289-
release(txconn);
290-
txconn = null;
291-
return Observable.error(exception);
292-
}
293-
294-
return transaction.rollback()
295-
.doOnError(rollbackException -> closeAndRelease())
296-
.flatMap(__ -> Observable.error(exception));
297-
}
298291
}
299292
}

src/main/java/com/github/pgasync/impl/PgProtocolStream.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import com.github.pgasync.impl.message.StartupMessage;
2020
import rx.Observable;
2121

22-
import java.util.function.Consumer;
23-
2422
/**
2523
* Stream of messages from/to backend server.
2624
*
@@ -34,12 +32,10 @@ public interface PgProtocolStream {
3432

3533
Observable<Message> send(Message... messages);
3634

35+
Observable<String> listen(String channel);
36+
3737
boolean isConnected();
3838

3939
void close();
4040

41-
String registerNotificationHandler(String channel, Consumer<String> onNotification);
42-
43-
void unRegisterNotificationHandler(String channel, String unlistenToken);
44-
4541
}

src/main/java/com/github/pgasync/impl/netty/NettyPgConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public NettyPgConnectionPool(PoolProperties properties) {
4040

4141
@Override
4242
protected PgProtocolStream openStream(InetSocketAddress address) {
43-
return new NettyPgProtocolStream(group, address, useSsl, pipeline);
43+
return new NettyPgProtocolStream(group, address, useSsl);
4444
}
4545

4646
@Override

0 commit comments

Comments
 (0)