Skip to content

Commit a69320b

Browse files
authored
Read all available socket bytes (dpkp#1332)
* Recv all available network bytes before parsing * Add experimental support for configuring socket chunking parameters
1 parent 0a74924 commit a69320b

File tree

4 files changed

+28
-22
lines changed

4 files changed

+28
-22
lines changed

kafka/client_async.py

+2
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ class KafkaClient(object):
154154
'receive_buffer_bytes': None,
155155
'send_buffer_bytes': None,
156156
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
157+
'sock_chunk_bytes': 4096, # undocumented experimental option
158+
'sock_chunk_buffer_count': 1000, # undocumented experimental option
157159
'retry_backoff_ms': 100,
158160
'metadata_max_age_ms': 300000,
159161
'security_protocol': 'PLAINTEXT',

kafka/conn.py

+22-22
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ class BrokerConnection(object):
180180
'receive_buffer_bytes': None,
181181
'send_buffer_bytes': None,
182182
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
183+
'sock_chunk_bytes': 4096, # undocumented experimental option
184+
'sock_chunk_buffer_count': 1000, # undocumented experimental option
183185
'security_protocol': 'PLAINTEXT',
184186
'ssl_context': None,
185187
'ssl_check_hostname': True,
@@ -748,19 +750,21 @@ def recv(self):
748750
return responses
749751

750752
def _recv(self):
751-
responses = []
752-
SOCK_CHUNK_BYTES = 4096
753-
while True:
753+
"""Take all available bytes from socket, return list of any responses from parser"""
754+
recvd = []
755+
while len(recvd) < self.config['sock_chunk_buffer_count']:
754756
try:
755-
data = self._sock.recv(SOCK_CHUNK_BYTES)
756-
# We expect socket.recv to raise an exception if there is not
757-
# enough data to read the full bytes_to_read
757+
data = self._sock.recv(self.config['sock_chunk_bytes'])
758+
# We expect socket.recv to raise an exception if there are no
759+
# bytes available to read from the socket in non-blocking mode.
758760
# but if the socket is disconnected, we will get empty data
759761
# without an exception raised
760762
if not data:
761763
log.error('%s: socket disconnected', self)
762764
self.close(error=Errors.ConnectionError('socket disconnected'))
763-
break
765+
return []
766+
else:
767+
recvd.append(data)
764768

765769
except SSLWantReadError:
766770
break
@@ -770,27 +774,23 @@ def _recv(self):
770774
log.exception('%s: Error receiving network data'
771775
' closing socket', self)
772776
self.close(error=Errors.ConnectionError(e))
773-
break
777+
return []
774778
except BlockingIOError:
775779
if six.PY3:
776780
break
777781
raise
778782

779-
if self._sensors:
780-
self._sensors.bytes_received.record(len(data))
781-
782-
try:
783-
more_responses = self._protocol.receive_bytes(data)
784-
except Errors.KafkaProtocolError as e:
785-
self.close(e)
786-
break
787-
else:
788-
responses.extend([resp for (_, resp) in more_responses])
789-
790-
if len(data) < SOCK_CHUNK_BYTES:
791-
break
783+
recvd_data = b''.join(recvd)
784+
if self._sensors:
785+
self._sensors.bytes_received.record(len(recvd_data))
792786

793-
return responses
787+
try:
788+
responses = self._protocol.receive_bytes(recvd_data)
789+
except Errors.KafkaProtocolError as e:
790+
self.close(e)
791+
return []
792+
else:
793+
return [resp for (_, resp) in responses] # drop correlation id
794794

795795
def requests_timed_out(self):
796796
if self.in_flight_requests:

kafka/consumer/group.py

+2
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ class KafkaConsumer(six.Iterator):
270270
'receive_buffer_bytes': None,
271271
'send_buffer_bytes': None,
272272
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
273+
'sock_chunk_bytes': 4096, # undocumented experimental option
274+
'sock_chunk_buffer_count': 1000, # undocumented experimental option
273275
'consumer_timeout_ms': float('inf'),
274276
'skip_double_compressed_messages': False,
275277
'security_protocol': 'PLAINTEXT',

kafka/producer/kafka.py

+2
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,8 @@ class KafkaProducer(object):
292292
'receive_buffer_bytes': None,
293293
'send_buffer_bytes': None,
294294
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
295+
'sock_chunk_bytes': 4096, # undocumented experimental option
296+
'sock_chunk_buffer_count': 1000, # undocumented experimental option
295297
'reconnect_backoff_ms': 50,
296298
'reconnect_backoff_max': 1000,
297299
'max_in_flight_requests_per_connection': 5,

0 commit comments

Comments
 (0)