6
6
import itertools
7
7
import logging
8
8
import random
9
- import select
9
+
10
+ # selectors in stdlib as of py3.4
11
+ try :
12
+ import selectors # pylint: disable=import-error
13
+ except ImportError :
14
+ # vendored backport module
15
+ from . import selectors34 as selectors
16
+
10
17
import socket
11
18
import time
12
19
@@ -92,6 +99,7 @@ def __init__(self, **configs):
92
99
self .cluster = ClusterMetadata (** self .config )
93
100
self ._topics = set () # empty set will fetch all topic metadata
94
101
self ._metadata_refresh_in_progress = False
102
+ self ._selector = selectors .DefaultSelector ()
95
103
self ._conns = {}
96
104
self ._connecting = set ()
97
105
self ._refresh_on_disconnects = True
@@ -101,6 +109,7 @@ def __init__(self, **configs):
101
109
self ._bootstrap (collect_hosts (self .config ['bootstrap_servers' ]))
102
110
self ._wake_r , self ._wake_w = socket .socketpair ()
103
111
self ._wake_r .setblocking (False )
112
+ self ._selector .register (self ._wake_r , selectors .EVENT_READ )
104
113
105
114
def __del__ (self ):
106
115
self ._wake_r .close ()
@@ -160,11 +169,19 @@ def _can_connect(self, node_id):
160
169
def _conn_state_change (self , node_id , conn ):
161
170
if conn .connecting ():
162
171
self ._connecting .add (node_id )
172
+ self ._selector .register (conn ._sock , selectors .EVENT_WRITE )
163
173
164
174
elif conn .connected ():
165
175
log .debug ("Node %s connected" , node_id )
166
176
if node_id in self ._connecting :
167
177
self ._connecting .remove (node_id )
178
+
179
+ try :
180
+ self ._selector .unregister (conn ._sock )
181
+ except KeyError :
182
+ pass
183
+ self ._selector .register (conn ._sock , selectors .EVENT_READ , conn )
184
+
168
185
if 'bootstrap' in self ._conns and node_id != 'bootstrap' :
169
186
bootstrap = self ._conns .pop ('bootstrap' )
170
187
# XXX: make conn.close() require error to cause refresh
@@ -176,6 +193,10 @@ def _conn_state_change(self, node_id, conn):
176
193
elif conn .state is ConnectionStates .DISCONNECTING :
177
194
if node_id in self ._connecting :
178
195
self ._connecting .remove (node_id )
196
+ try :
197
+ self ._selector .unregister (conn ._sock )
198
+ except KeyError :
199
+ pass
179
200
if self ._refresh_on_disconnects :
180
201
log .warning ("Node %s connect failed -- refreshing metadata" , node_id )
181
202
self .cluster .request_update ()
@@ -388,45 +409,25 @@ def poll(self, timeout_ms=None, future=None, sleep=True):
388
409
389
410
return responses
390
411
391
- def _poll (self , timeout , sleep = False ):
412
+ def _poll (self , timeout , sleep = True ):
392
413
# select on reads across all connected sockets, blocking up to timeout
393
- sockets = dict ([(conn ._sock , conn )
394
- for conn in six .itervalues (self ._conns )
395
- if conn .state is ConnectionStates .CONNECTED
396
- and conn .in_flight_requests ])
397
- if not sockets :
398
- # if sockets are connecting, we can wake when they are writeable
399
- if self ._connecting :
400
- sockets = [self ._conns [node ]._sock for node in self ._connecting ]
401
- select .select ([self ._wake_r ], sockets , [], timeout )
402
- elif timeout :
403
- if sleep :
404
- log .debug ('Sleeping at %s for %s' , time .time (), timeout )
405
- select .select ([self ._wake_r ], [], [], timeout )
406
- log .debug ('Woke up at %s' , time .time ())
407
- else :
408
- log .warning ('_poll called with a non-zero timeout and'
409
- ' sleep=False -- but there was nothing to do.'
410
- ' This can cause high CPU usage during idle.' )
411
- self ._clear_wake_fd ()
412
- return []
413
-
414
- # Add a private pipe fd to allow external wakeups
415
- fds = list (sockets .keys ())
416
- fds .append (self ._wake_r )
417
- ready , _ , _ = select .select (fds , [], [], timeout )
418
-
414
+ assert self .in_flight_request_count () > 0 or self ._connecting or sleep
419
415
responses = []
420
- for sock in ready :
421
- if sock == self ._wake_r :
416
+ for key , events in self ._selector .select (timeout ):
417
+ if key .fileobj is self ._wake_r :
418
+ self ._clear_wake_fd ()
419
+ continue
420
+ elif not (events & selectors .EVENT_READ ):
422
421
continue
423
- conn = sockets [ sock ]
422
+ conn = key . data
424
423
while conn .in_flight_requests :
425
424
response = conn .recv () # Note: conn.recv runs callbacks / errbacks
425
+
426
+ # Incomplete responses are buffered internally
427
+ # while conn.in_flight_requests retains the request
426
428
if not response :
427
429
break
428
430
responses .append (response )
429
- self ._clear_wake_fd ()
430
431
return responses
431
432
432
433
def in_flight_request_count (self , node_id = None ):
0 commit comments