@@ -201,18 +201,22 @@ def __init__(self, **configs):
201
201
if key in configs :
202
202
self .config [key ] = configs [key ]
203
203
204
+ # these properties need to be set on top of the initialization pipeline
205
+ # because they are used when __del__ method is called
206
+ self ._closed = False
207
+ self ._wake_r , self ._wake_w = socket .socketpair ()
208
+ self ._selector = self .config ['selector' ]()
209
+
204
210
self .cluster = ClusterMetadata (** self .config )
205
211
self ._topics = set () # empty set will fetch all topic metadata
206
212
self ._metadata_refresh_in_progress = False
207
- self ._selector = self .config ['selector' ]()
208
213
self ._conns = Dict () # object to support weakrefs
209
214
self ._api_versions = None
210
215
self ._connecting = set ()
211
216
self ._sending = set ()
212
217
self ._refresh_on_disconnects = True
213
218
self ._last_bootstrap = 0
214
219
self ._bootstrap_fails = 0
215
- self ._wake_r , self ._wake_w = socket .socketpair ()
216
220
self ._wake_r .setblocking (False )
217
221
self ._wake_w .settimeout (self .config ['wakeup_timeout_ms' ] / 1000.0 )
218
222
self ._wake_lock = threading .Lock ()
@@ -226,7 +230,6 @@ def __init__(self, **configs):
226
230
227
231
self ._selector .register (self ._wake_r , selectors .EVENT_READ )
228
232
self ._idle_expiry_manager = IdleConnectionManager (self .config ['connections_max_idle_ms' ])
229
- self ._closed = False
230
233
self ._sensors = None
231
234
if self .config ['metrics' ]:
232
235
self ._sensors = KafkaClientMetrics (self .config ['metrics' ],
0 commit comments