@@ -180,6 +180,8 @@ class BrokerConnection(object):
180
180
'receive_buffer_bytes' : None ,
181
181
'send_buffer_bytes' : None ,
182
182
'socket_options' : [(socket .IPPROTO_TCP , socket .TCP_NODELAY , 1 )],
183
+ 'sock_chunk_bytes' : 4096 , # undocumented experimental option
184
+ 'sock_chunk_buffer_count' : 1000 , # undocumented experimental option
183
185
'security_protocol' : 'PLAINTEXT' ,
184
186
'ssl_context' : None ,
185
187
'ssl_check_hostname' : True ,
@@ -748,19 +750,21 @@ def recv(self):
748
750
return responses
749
751
750
752
def _recv (self ):
751
- responses = []
752
- SOCK_CHUNK_BYTES = 4096
753
- while True :
753
+ """Take all available bytes from socket, return list of any responses from parser"""
754
+ recvd = []
755
+ while len ( recvd ) < self . config [ 'sock_chunk_buffer_count' ] :
754
756
try :
755
- data = self ._sock .recv (SOCK_CHUNK_BYTES )
756
- # We expect socket.recv to raise an exception if there is not
757
- # enough data to read the full bytes_to_read
757
+ data = self ._sock .recv (self . config [ 'sock_chunk_bytes' ] )
758
+ # We expect socket.recv to raise an exception if there are no
759
+ # bytes available to read from the socket in non-blocking mode.
758
760
# but if the socket is disconnected, we will get empty data
759
761
# without an exception raised
760
762
if not data :
761
763
log .error ('%s: socket disconnected' , self )
762
764
self .close (error = Errors .ConnectionError ('socket disconnected' ))
763
- break
765
+ return []
766
+ else :
767
+ recvd .append (data )
764
768
765
769
except SSLWantReadError :
766
770
break
@@ -770,27 +774,23 @@ def _recv(self):
770
774
log .exception ('%s: Error receiving network data'
771
775
' closing socket' , self )
772
776
self .close (error = Errors .ConnectionError (e ))
773
- break
777
+ return []
774
778
except BlockingIOError :
775
779
if six .PY3 :
776
780
break
777
781
raise
778
782
779
- if self ._sensors :
780
- self ._sensors .bytes_received .record (len (data ))
781
-
782
- try :
783
- more_responses = self ._protocol .receive_bytes (data )
784
- except Errors .KafkaProtocolError as e :
785
- self .close (e )
786
- break
787
- else :
788
- responses .extend ([resp for (_ , resp ) in more_responses ])
789
-
790
- if len (data ) < SOCK_CHUNK_BYTES :
791
- break
783
+ recvd_data = b'' .join (recvd )
784
+ if self ._sensors :
785
+ self ._sensors .bytes_received .record (len (recvd_data ))
792
786
793
- return responses
787
+ try :
788
+ responses = self ._protocol .receive_bytes (recvd_data )
789
+ except Errors .KafkaProtocolError as e :
790
+ self .close (e )
791
+ return []
792
+ else :
793
+ return [resp for (_ , resp ) in responses ] # drop correlation id
794
794
795
795
def requests_timed_out (self ):
796
796
if self .in_flight_requests :
0 commit comments