Skip to content

Commit e485a6e

Browse files
authored
Fix initialization order in KafkaClient (dpkp#2119)
Fix initialization order in KafkaClient
1 parent b32f369 commit e485a6e

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

kafka/client_async.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -201,18 +201,22 @@ def __init__(self, **configs):
201201
if key in configs:
202202
self.config[key] = configs[key]
203203

204+
# these properties need to be set on top of the initialization pipeline
205+
# because they are used when __del__ method is called
206+
self._closed = False
207+
self._wake_r, self._wake_w = socket.socketpair()
208+
self._selector = self.config['selector']()
209+
204210
self.cluster = ClusterMetadata(**self.config)
205211
self._topics = set() # empty set will fetch all topic metadata
206212
self._metadata_refresh_in_progress = False
207-
self._selector = self.config['selector']()
208213
self._conns = Dict() # object to support weakrefs
209214
self._api_versions = None
210215
self._connecting = set()
211216
self._sending = set()
212217
self._refresh_on_disconnects = True
213218
self._last_bootstrap = 0
214219
self._bootstrap_fails = 0
215-
self._wake_r, self._wake_w = socket.socketpair()
216220
self._wake_r.setblocking(False)
217221
self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
218222
self._wake_lock = threading.Lock()
@@ -226,7 +230,6 @@ def __init__(self, **configs):
226230

227231
self._selector.register(self._wake_r, selectors.EVENT_READ)
228232
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
229-
self._closed = False
230233
self._sensors = None
231234
if self.config['metrics']:
232235
self._sensors = KafkaClientMetrics(self.config['metrics'],

0 commit comments

Comments
 (0)