Skip to content

Commit ce96752

Browse files
authored
Make BrokerConnection .host / .port / .afi immutable, use _sock_* attributes for current lookups (dpkp#1422)
1 parent 4abdb1b commit ce96752

File tree

2 files changed

+45
-24
lines changed

2 files changed

+45
-24
lines changed

kafka/conn.py

+25-15
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ class SSLWantWriteError(Exception):
7878
gssapi = None
7979
GSSError = None
8080

81+
82+
AFI_NAMES = {
83+
socket.AF_UNSPEC: "unspecified",
84+
socket.AF_INET: "IPv4",
85+
socket.AF_INET6: "IPv6",
86+
}
87+
88+
8189
class ConnectionStates(object):
8290
DISCONNECTING = '<disconnecting>'
8391
DISCONNECTED = '<disconnected>'
@@ -204,13 +212,12 @@ class BrokerConnection(object):
204212
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
205213

206214
def __init__(self, host, port, afi, **configs):
207-
self.hostname = host
208215
self.host = host
209216
self.port = port
210217
self.afi = afi
211-
self._init_host = host
212-
self._init_port = port
213-
self._init_afi = afi
218+
self._sock_ip = host
219+
self._sock_port = port
220+
self._sock_afi = afi
214221
self.in_flight_requests = collections.deque()
215222
self._api_versions = None
216223

@@ -266,10 +273,10 @@ def __init__(self, host, port, afi, **configs):
266273

267274
def _next_afi_host_port(self):
268275
if not self._gai:
269-
self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi)
276+
self._gai = dns_lookup(self.host, self.port, self.afi)
270277
if not self._gai:
271278
log.error('DNS lookup failed for %s:%i (%s)',
272-
self._init_host, self._init_port, self._init_afi)
279+
self.host, self.port, self.afi)
273280
return
274281

275282
afi, _, __, ___, sockaddr = self._gai.pop(0)
@@ -286,8 +293,8 @@ def connect(self):
286293
return
287294
else:
288295
log.debug('%s: creating new socket', self)
289-
self.afi, self.host, self.port = next_lookup
290-
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
296+
self._sock_afi, self._sock_ip, self._sock_port = next_lookup
297+
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
291298

292299
for option in self.config['socket_options']:
293300
log.debug('%s: setting socket option %s', self, option)
@@ -301,15 +308,17 @@ def connect(self):
301308
# so we need to double check that we are still connecting before
302309
if self.connecting():
303310
self.config['state_change_callback'](self)
304-
log.info('%s: connecting to %s:%d', self, self.host, self.port)
311+
log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host,
312+
self.port, self._sock_ip, self._sock_port,
313+
AFI_NAMES[self._sock_afi])
305314

306315
if self.state is ConnectionStates.CONNECTING:
307316
# in non-blocking mode, use repeated calls to socket.connect_ex
308317
# to check connection status
309318
request_timeout = self.config['request_timeout_ms'] / 1000.0
310319
ret = None
311320
try:
312-
ret = self._sock.connect_ex((self.host, self.port))
321+
ret = self._sock.connect_ex((self._sock_ip, self._sock_port))
313322
except socket.error as err:
314323
ret = err.errno
315324

@@ -400,7 +409,7 @@ def _wrap_ssl(self):
400409
try:
401410
self._sock = self._ssl_context.wrap_socket(
402411
self._sock,
403-
server_hostname=self.hostname,
412+
server_hostname=self.host,
404413
do_handshake_on_connect=False)
405414
except ssl.SSLError as e:
406415
log.exception('%s: Failed to wrap socket in SSLContext!', self)
@@ -524,7 +533,7 @@ def _try_authenticate_plain(self, future):
524533
return future.success(True)
525534

526535
def _try_authenticate_gssapi(self, future):
527-
auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
536+
auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.host
528537
gssapi_name = gssapi.Name(
529538
auth_id,
530539
name_type=gssapi.NameType.hostbased_service
@@ -962,9 +971,10 @@ def connect():
962971
self.config[key] = stashed[key]
963972
return version
964973

965-
def __repr__(self):
966-
return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % (
967-
self.node_id, self.hostname, self.host, self.port)
974+
def __str__(self):
975+
return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % (
976+
self.node_id, self.host, self.port, self.state,
977+
self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi])
968978

969979

970980
class BrokerConnectionMetrics(object):

test/test_conn.py

+20-9
Original file line numberDiff line numberDiff line change
@@ -255,35 +255,43 @@ def test_lookup_on_connect():
255255
hostname = 'example.org'
256256
port = 9092
257257
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
258-
assert conn.host == conn.hostname == hostname
258+
assert conn.host == hostname
259+
assert conn.port == port
260+
assert conn.afi == socket.AF_UNSPEC
259261
ip1 = '127.0.0.1'
262+
afi1 = socket.AF_INET
260263
mock_return1 = [
261-
(2, 2, 17, '', (ip1, 9092)),
264+
(afi1, socket.SOCK_STREAM, 6, '', (ip1, 9092)),
262265
]
263266
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
264267
conn.connect()
265268
m.assert_called_once_with(hostname, port, 0, 1)
266269
conn.close()
267-
assert conn.host == ip1
270+
assert conn._sock_ip == ip1
271+
assert conn._sock_port == 9092
272+
assert conn._sock_afi == afi1
268273

269-
ip2 = '127.0.0.2'
274+
ip2 = '::1'
275+
afi2 = socket.AF_INET6
270276
mock_return2 = [
271-
(2, 2, 17, '', (ip2, 9092)),
277+
(afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
272278
]
273279

274280
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
275281
conn.last_attempt = 0
276282
conn.connect()
277283
m.assert_called_once_with(hostname, port, 0, 1)
278284
conn.close()
279-
assert conn.host == ip2
285+
assert conn._sock_ip == ip2
286+
assert conn._sock_port == 9092
287+
assert conn._sock_afi == afi2
280288

281289

282290
def test_relookup_on_failure():
283291
hostname = 'example.org'
284292
port = 9092
285293
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
286-
assert conn.host == conn.hostname == hostname
294+
assert conn.host == hostname
287295
mock_return1 = []
288296
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
289297
last_attempt = conn.last_attempt
@@ -293,13 +301,16 @@ def test_relookup_on_failure():
293301
assert conn.last_attempt > last_attempt
294302

295303
ip2 = '127.0.0.2'
304+
afi2 = socket.AF_INET
296305
mock_return2 = [
297-
(2, 2, 17, '', (ip2, 9092)),
306+
(afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
298307
]
299308

300309
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
301310
conn.last_attempt = 0
302311
conn.connect()
303312
m.assert_called_once_with(hostname, port, 0, 1)
304313
conn.close()
305-
assert conn.host == ip2
314+
assert conn._sock_ip == ip2
315+
assert conn._sock_port == 9092
316+
assert conn._sock_afi == afi2

0 commit comments

Comments
 (0)