Skip to content

Commit b0124ad

Browse files
committed
Improved receive messages logic
1 parent a55f686 commit b0124ad

File tree

3 files changed

+25
-11
lines changed

3 files changed

+25
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
## 0.3.0 (2018-XX-XX)
44

5-
- Improved receive messages logic: `.strip()` replaced with `.replace()`
5+
- Improved receive messages logic
66
- Fixed regex deprecation warnings
77

88
## 0.2.0 (2018-09-01)

pynats/client.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -265,26 +265,35 @@ def _recv(self, *commands: Pattern[bytes]) -> Tuple[Pattern[bytes], Match[bytes]
265265

266266
return command, result
267267

268-
def _readline(self) -> bytes:
269-
lines = []
268+
def _readline(self, *, size: int = None) -> bytes:
269+
read = b""
270270

271271
while True:
272272
line = cast(bytes, self._socket_file.readline())
273-
lines.append(line)
273+
read += line
274274

275-
if line.endswith(_CRLF_):
275+
if size is not None:
276+
if len(self._strip(read)) == size:
277+
break
278+
elif line.endswith(_CRLF_):
276279
break
277280

278-
return b"".join(lines)
281+
return read
282+
283+
def _strip(self, line: bytes) -> bytes:
284+
return line[: -len(_CRLF_)]
279285

280286
def _get_command(self, line: bytes) -> Optional[Pattern[bytes]]:
281-
values = line.replace(_CRLF_, b"").split(b" ", 1)
287+
values = self._strip(line).split(b" ", 1)
282288

283289
return COMMANDS.get(values[0])
284290

285291
def _handle_message(self, result: Match[bytes]) -> None:
286292
message_data = result.groupdict()
287-
message_payload = self._readline().replace(_CRLF_, b"")
293+
294+
message_payload_size = int(message_data["size"])
295+
message_payload = self._readline(size=message_payload_size)
296+
message_payload = self._strip(message_payload)
288297

289298
message = NATSMessage(
290299
sid=int(message_data["sid"].decode()),

tests/test_client.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,12 @@ def worker():
136136
with NATSClient(socket_timeout=2) as client:
137137

138138
def callback(message):
139-
client.publish(message.reply, payload=msgpack.packb(32))
139+
client.publish(
140+
message.reply,
141+
payload=msgpack.packb(
142+
{b"v": 3338} if message.payload else {b"v": 32}
143+
),
144+
)
140145

141146
client.subscribe(
142147
"test-subject", callback=callback, queue="test-queue", max_messages=2
@@ -153,13 +158,13 @@ def callback(message):
153158
resp = client.request("test-subject")
154159
assert resp.subject.startswith("_INBOX.")
155160
assert resp.reply == ""
156-
assert msgpack.unpackb(resp.payload) == 32
161+
assert msgpack.unpackb(resp.payload) == {b"v": 32}
157162

158163
# request with payload
159164
resp = client.request("test-subject", payload=msgpack.packb("test-payload"))
160165
assert resp.subject.startswith("_INBOX.")
161166
assert resp.reply == ""
162-
assert msgpack.unpackb(resp.payload) == 32
167+
assert msgpack.unpackb(resp.payload) == {b"v": 3338}
163168

164169
t.join()
165170

0 commit comments

Comments
 (0)