Skip to content

Commit 5e12585

Browse files
committed
Re-establish LISTEN subscription on error.
1 parent e15b9bf commit 5e12585

File tree

1 file changed

+3
-6
lines changed

1 file changed

+3
-6
lines changed

src/main/java/com/github/pgasync/impl/PgConnectionPool.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323

2424
import java.net.InetSocketAddress;
2525
import java.util.LinkedList;
26-
import java.util.Map;
2726
import java.util.Queue;
28-
import java.util.concurrent.ConcurrentHashMap;
2927
import java.util.concurrent.atomic.AtomicBoolean;
3028

3129
/**
@@ -43,8 +41,6 @@ public abstract class PgConnectionPool implements ConnectionPool {
4341
final Queue<Connection> connections = new LinkedList<>();
4442
final Object lock = new Object[0];
4543

46-
final Map<String,Connection> listeners = new ConcurrentHashMap<>();
47-
4844
final InetSocketAddress address;
4945
final String username;
5046
final String password;
@@ -98,8 +94,9 @@ public Observable<String> listen(String channel) {
9894
return getConnection()
9995
.lift(subscriber -> Subscribers.create(
10096
connection -> connection.listen(channel)
101-
.doOnSubscribe(() -> release(connection))
102-
.subscribe(subscriber),
97+
.doOnSubscribe(() -> release(connection))
98+
.onErrorResumeNext(exception -> listen(channel))
99+
.subscribe(subscriber),
103100
subscriber::onError));
104101
}
105102

0 commit comments

Comments
 (0)