Skip to content

Commit cda2d59

Browse files
committed
Merge pull request dpkp#640 from dpkp/selectors
Manage non-blocking I/O events with selectors module
2 parents 810f08b + 237bd73 commit cda2d59

File tree

3 files changed

+682
-33
lines changed

3 files changed

+682
-33
lines changed

kafka/client_async.py

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@
66
import itertools
77
import logging
88
import random
9-
import select
9+
10+
# selectors in stdlib as of py3.4
11+
try:
12+
import selectors # pylint: disable=import-error
13+
except ImportError:
14+
# vendored backport module
15+
from . import selectors34 as selectors
16+
1017
import socket
1118
import time
1219

@@ -92,6 +99,7 @@ def __init__(self, **configs):
9299
self.cluster = ClusterMetadata(**self.config)
93100
self._topics = set() # empty set will fetch all topic metadata
94101
self._metadata_refresh_in_progress = False
102+
self._selector = selectors.DefaultSelector()
95103
self._conns = {}
96104
self._connecting = set()
97105
self._refresh_on_disconnects = True
@@ -101,6 +109,7 @@ def __init__(self, **configs):
101109
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
102110
self._wake_r, self._wake_w = socket.socketpair()
103111
self._wake_r.setblocking(False)
112+
self._selector.register(self._wake_r, selectors.EVENT_READ)
104113

105114
def __del__(self):
106115
self._wake_r.close()
@@ -160,11 +169,19 @@ def _can_connect(self, node_id):
160169
def _conn_state_change(self, node_id, conn):
161170
if conn.connecting():
162171
self._connecting.add(node_id)
172+
self._selector.register(conn._sock, selectors.EVENT_WRITE)
163173

164174
elif conn.connected():
165175
log.debug("Node %s connected", node_id)
166176
if node_id in self._connecting:
167177
self._connecting.remove(node_id)
178+
179+
try:
180+
self._selector.unregister(conn._sock)
181+
except KeyError:
182+
pass
183+
self._selector.register(conn._sock, selectors.EVENT_READ, conn)
184+
168185
if 'bootstrap' in self._conns and node_id != 'bootstrap':
169186
bootstrap = self._conns.pop('bootstrap')
170187
# XXX: make conn.close() require error to cause refresh
@@ -176,6 +193,10 @@ def _conn_state_change(self, node_id, conn):
176193
elif conn.state is ConnectionStates.DISCONNECTING:
177194
if node_id in self._connecting:
178195
self._connecting.remove(node_id)
196+
try:
197+
self._selector.unregister(conn._sock)
198+
except KeyError:
199+
pass
179200
if self._refresh_on_disconnects:
180201
log.warning("Node %s connect failed -- refreshing metadata", node_id)
181202
self.cluster.request_update()
@@ -388,45 +409,25 @@ def poll(self, timeout_ms=None, future=None, sleep=True):
388409

389410
return responses
390411

391-
def _poll(self, timeout, sleep=False):
412+
def _poll(self, timeout, sleep=True):
392413
# select on reads across all connected sockets, blocking up to timeout
393-
sockets = dict([(conn._sock, conn)
394-
for conn in six.itervalues(self._conns)
395-
if conn.state is ConnectionStates.CONNECTED
396-
and conn.in_flight_requests])
397-
if not sockets:
398-
# if sockets are connecting, we can wake when they are writeable
399-
if self._connecting:
400-
sockets = [self._conns[node]._sock for node in self._connecting]
401-
select.select([self._wake_r], sockets, [], timeout)
402-
elif timeout:
403-
if sleep:
404-
log.debug('Sleeping at %s for %s', time.time(), timeout)
405-
select.select([self._wake_r], [], [], timeout)
406-
log.debug('Woke up at %s', time.time())
407-
else:
408-
log.warning('_poll called with a non-zero timeout and'
409-
' sleep=False -- but there was nothing to do.'
410-
' This can cause high CPU usage during idle.')
411-
self._clear_wake_fd()
412-
return []
413-
414-
# Add a private pipe fd to allow external wakeups
415-
fds = list(sockets.keys())
416-
fds.append(self._wake_r)
417-
ready, _, _ = select.select(fds, [], [], timeout)
418-
414+
assert self.in_flight_request_count() > 0 or self._connecting or sleep
419415
responses = []
420-
for sock in ready:
421-
if sock == self._wake_r:
416+
for key, events in self._selector.select(timeout):
417+
if key.fileobj is self._wake_r:
418+
self._clear_wake_fd()
419+
continue
420+
elif not (events & selectors.EVENT_READ):
422421
continue
423-
conn = sockets[sock]
422+
conn = key.data
424423
while conn.in_flight_requests:
425424
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
425+
426+
# Incomplete responses are buffered internally
427+
# while conn.in_flight_requests retains the request
426428
if not response:
427429
break
428430
responses.append(response)
429-
self._clear_wake_fd()
430431
return responses
431432

432433
def in_flight_request_count(self, node_id=None):

0 commit comments

Comments
 (0)