Skip to content

Commit bb07ff9

Browse files
richard78917Gr1N
authored andcommitted
[fix] block in close
In case that close() is called while doing read operation, the close() call is blocked and whole application is stucked. Shut-down of socket will resolve this issue and releases thread blocked at readline(). Regression test for graceful shutdown also included.
1 parent f426d37 commit bb07ff9

File tree

4 files changed

+41
-2
lines changed

4 files changed

+41
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## 0.7.0 (2020-XX-XX)
44

5+
- Fixed issue when threads are blocked on close while reading, #11 by @richard78917
6+
57
## 0.6.0 (2020-01-08)
68

79
- Added Python 3.8.* support, #10

pynats/client.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818

1919
import pkg_resources
2020

21-
from pynats.exceptions import NATSInvalidResponse, NATSUnexpectedResponse
21+
from pynats.exceptions import (
22+
NATSInvalidResponse,
23+
NATSSocketError,
24+
NATSUnexpectedResponse,
25+
)
2226
from pynats.nuid import NUID
2327

2428
__all__ = ("NATSSubscription", "NATSMessage", "NATSClient")
@@ -154,6 +158,7 @@ def connect(self) -> None:
154158
self._recv(INFO_RE)
155159

156160
def close(self) -> None:
161+
self._socket.shutdown(socket.SHUT_RDWR)
157162
self._socket_file.close()
158163
self._socket.close()
159164

@@ -279,7 +284,11 @@ def _readline(self, *, size: int = None) -> bytes:
279284
read = io.BytesIO()
280285

281286
while True:
282-
line = cast(bytes, self._socket_file.readline())
287+
raw_bytes = self._socket_file.readline()
288+
if not raw_bytes:
289+
raise NATSSocketError(b"unable to read from socket")
290+
291+
line = cast(bytes, raw_bytes)
283292
read.write(line)
284293

285294
if size is not None:

pynats/exceptions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,9 @@ class NATSInvalidResponse(NATSError):
1515
def __init__(self, line: bytes, *args, **kwargs) -> None:
1616
self.line = line
1717
super().__init__()
18+
19+
20+
class NATSSocketError(NATSError):
21+
def __init__(self, line: bytes, *args, **kwargs) -> None:
22+
self.line = line
23+
super().__init__()

tests/test_client.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import pytest
88

99
from pynats import NATSClient
10+
from pynats.exceptions import NATSSocketError
1011

1112

1213
@pytest.fixture
@@ -179,3 +180,24 @@ def test_request_timeout(nats_url):
179180
with NATSClient(nats_url, socket_timeout=2) as client:
180181
with pytest.raises(socket.timeout):
181182
client.request("test-subject")
183+
184+
185+
def test_graceful_shutdown(nats_url):
186+
def worker(client, connected_event):
187+
client.connect()
188+
connected_event.set()
189+
try:
190+
client.wait()
191+
except NATSSocketError:
192+
assert True
193+
except Exception:
194+
raise AssertionError("unexpected Exception raised")
195+
196+
client = NATSClient(nats_url)
197+
connected_event = threading.Event()
198+
thread = threading.Thread(target=worker, args=[client, connected_event])
199+
thread.start()
200+
assert connected_event.wait(5), "unable to connect"
201+
client.close()
202+
thread.join(5)
203+
assert not thread.is_alive(), "thread did not finish"

0 commit comments

Comments
 (0)