Skip to content

Commit 4162989

Browse files
authored
Use socket_options configuration to setsockopts(). Default TCP_NODELAY (dpkp#783)
1 parent 64d3607 commit 4162989

File tree

4 files changed

+39
-17
lines changed

4 files changed

+39
-17
lines changed

kafka/client_async.py

+11-7
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class KafkaClient(object):
5454
'max_in_flight_requests_per_connection': 5,
5555
'receive_buffer_bytes': None,
5656
'send_buffer_bytes': None,
57+
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
5758
'retry_backoff_ms': 100,
5859
'metadata_max_age_ms': 300000,
5960
'security_protocol': 'PLAINTEXT',
@@ -93,26 +94,29 @@ def __init__(self, **configs):
9394
server-side log entries that correspond to this client. Also
9495
submitted to GroupCoordinator for logging with respect to
9596
consumer group administration. Default: 'kafka-python-{version}'
96-
request_timeout_ms (int): Client request timeout in milliseconds.
97-
Default: 40000.
9897
reconnect_backoff_ms (int): The amount of time in milliseconds to
9998
wait before attempting to reconnect to a given host.
10099
Default: 50.
100+
request_timeout_ms (int): Client request timeout in milliseconds.
101+
Default: 40000.
102+
retry_backoff_ms (int): Milliseconds to backoff when retrying on
103+
errors. Default: 100.
101104
max_in_flight_requests_per_connection (int): Requests are pipelined
102105
to kafka brokers up to this number of maximum requests per
103106
broker connection. Default: 5.
104-
send_buffer_bytes (int): The size of the TCP send buffer
105-
(SO_SNDBUF) to use when sending data. Default: None (relies on
106-
system defaults). Java client defaults to 131072.
107107
receive_buffer_bytes (int): The size of the TCP receive buffer
108108
(SO_RCVBUF) to use when reading data. Default: None (relies on
109109
system defaults). Java client defaults to 32768.
110+
send_buffer_bytes (int): The size of the TCP send buffer
111+
(SO_SNDBUF) to use when sending data. Default: None (relies on
112+
system defaults). Java client defaults to 131072.
113+
socket_options (list): List of tuple-arguments to socket.setsockopt
114+
to apply to broker connection sockets. Default:
115+
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
110116
metadata_max_age_ms (int): The period of time in milliseconds after
111117
which we force a refresh of metadata even if we haven't seen any
112118
partition leadership changes to proactively discover any new
113119
brokers or partitions. Default: 300000
114-
retry_backoff_ms (int): Milliseconds to backoff when retrying on
115-
errors. Default: 100.
116120
security_protocol (str): Protocol used to communicate with brokers.
117121
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
118122
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping

kafka/conn.py

+14-6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class BrokerConnection(object):
6060
'max_in_flight_requests_per_connection': 5,
6161
'receive_buffer_bytes': None,
6262
'send_buffer_bytes': None,
63+
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
6364
'security_protocol': 'PLAINTEXT',
6465
'ssl_context': None,
6566
'ssl_check_hostname': True,
@@ -84,6 +85,15 @@ def __init__(self, host, port, afi, **configs):
8485
if key in configs:
8586
self.config[key] = configs[key]
8687

88+
if self.config['receive_buffer_bytes'] is not None:
89+
self.config['socket_options'].append(
90+
(socket.SOL_SOCKET, socket.SO_RCVBUF,
91+
self.config['receive_buffer_bytes']))
92+
if self.config['send_buffer_bytes'] is not None:
93+
self.config['socket_options'].append(
94+
(socket.SOL_SOCKET, socket.SO_SNDBUF,
95+
self.config['send_buffer_bytes']))
96+
8797
self.state = ConnectionStates.DISCONNECTED
8898
self._sock = None
8999
self._ssl_context = None
@@ -144,12 +154,10 @@ def connect(self):
144154
self._sock = socket.socket(afi, socket.SOCK_STREAM)
145155
else:
146156
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
147-
if self.config['receive_buffer_bytes'] is not None:
148-
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
149-
self.config['receive_buffer_bytes'])
150-
if self.config['send_buffer_bytes'] is not None:
151-
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
152-
self.config['send_buffer_bytes'])
157+
158+
for option in self.config['socket_options']:
159+
self._sock.setsockopt(*option)
160+
153161
self._sock.setblocking(False)
154162
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
155163
self._wrap_ssl()

kafka/consumer/group.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import copy
44
import logging
5+
import socket
56
import time
67

78
import six
@@ -114,12 +115,15 @@ class KafkaConsumer(six.Iterator):
114115
rebalances. Default: 3000
115116
session_timeout_ms (int): The timeout used to detect failures when
116117
using Kafka's group managementment facilities. Default: 30000
117-
send_buffer_bytes (int): The size of the TCP send buffer
118-
(SO_SNDBUF) to use when sending data. Default: None (relies on
119-
system defaults). The java client defaults to 131072.
120118
receive_buffer_bytes (int): The size of the TCP receive buffer
121119
(SO_RCVBUF) to use when reading data. Default: None (relies on
122120
system defaults). The java client defaults to 32768.
121+
send_buffer_bytes (int): The size of the TCP send buffer
122+
(SO_SNDBUF) to use when sending data. Default: None (relies on
123+
system defaults). The java client defaults to 131072.
124+
socket_options (list): List of tuple-arguments to socket.setsockopt
125+
to apply to broker connection sockets. Default:
126+
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
123127
consumer_timeout_ms (int): number of milliseconds to block during
124128
message iteration before raising StopIteration (i.e., ending the
125129
iterator). Default -1 (block forever).
@@ -209,8 +213,9 @@ class KafkaConsumer(six.Iterator):
209213
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
210214
'heartbeat_interval_ms': 3000,
211215
'session_timeout_ms': 30000,
212-
'send_buffer_bytes': None,
213216
'receive_buffer_bytes': None,
217+
'send_buffer_bytes': None,
218+
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
214219
'consumer_timeout_ms': -1,
215220
'skip_double_compressed_messages': False,
216221
'security_protocol': 'PLAINTEXT',

kafka/producer/kafka.py

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import atexit
44
import copy
55
import logging
6+
import socket
67
import threading
78
import time
89
import weakref
@@ -188,6 +189,9 @@ class KafkaProducer(object):
188189
send_buffer_bytes (int): The size of the TCP send buffer
189190
(SO_SNDBUF) to use when sending data. Default: None (relies on
190191
system defaults). Java client defaults to 131072.
192+
socket_options (list): List of tuple-arguments to socket.setsockopt
193+
to apply to broker connection sockets. Default:
194+
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
191195
reconnect_backoff_ms (int): The amount of time in milliseconds to
192196
wait before attempting to reconnect to a given host.
193197
Default: 50.
@@ -256,6 +260,7 @@ class KafkaProducer(object):
256260
'request_timeout_ms': 30000,
257261
'receive_buffer_bytes': None,
258262
'send_buffer_bytes': None,
263+
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
259264
'reconnect_backoff_ms': 50,
260265
'max_in_flight_requests_per_connection': 5,
261266
'security_protocol': 'PLAINTEXT',

0 commit comments

Comments
 (0)