@@ -78,6 +78,14 @@ class SSLWantWriteError(Exception):
78
78
gssapi = None
79
79
GSSError = None
80
80
81
+
82
+ AFI_NAMES = {
83
+ socket .AF_UNSPEC : "unspecified" ,
84
+ socket .AF_INET : "IPv4" ,
85
+ socket .AF_INET6 : "IPv6" ,
86
+ }
87
+
88
+
81
89
class ConnectionStates (object ):
82
90
DISCONNECTING = '<disconnecting>'
83
91
DISCONNECTED = '<disconnected>'
@@ -204,13 +212,12 @@ class BrokerConnection(object):
204
212
SASL_MECHANISMS = ('PLAIN' , 'GSSAPI' )
205
213
206
214
def __init__ (self , host , port , afi , ** configs ):
207
- self .hostname = host
208
215
self .host = host
209
216
self .port = port
210
217
self .afi = afi
211
- self ._init_host = host
212
- self ._init_port = port
213
- self ._init_afi = afi
218
+ self ._sock_ip = host
219
+ self ._sock_port = port
220
+ self ._sock_afi = afi
214
221
self .in_flight_requests = collections .deque ()
215
222
self ._api_versions = None
216
223
@@ -266,10 +273,10 @@ def __init__(self, host, port, afi, **configs):
266
273
267
274
def _next_afi_host_port (self ):
268
275
if not self ._gai :
269
- self ._gai = dns_lookup (self ._init_host , self ._init_port , self ._init_afi )
276
+ self ._gai = dns_lookup (self .host , self .port , self .afi )
270
277
if not self ._gai :
271
278
log .error ('DNS lookup failed for %s:%i (%s)' ,
272
- self ._init_host , self ._init_port , self ._init_afi )
279
+ self .host , self .port , self .afi )
273
280
return
274
281
275
282
afi , _ , __ , ___ , sockaddr = self ._gai .pop (0 )
@@ -286,8 +293,8 @@ def connect(self):
286
293
return
287
294
else :
288
295
log .debug ('%s: creating new socket' , self )
289
- self .afi , self .host , self .port = next_lookup
290
- self ._sock = socket .socket (self .afi , socket .SOCK_STREAM )
296
+ self ._sock_afi , self ._sock_ip , self ._sock_port = next_lookup
297
+ self ._sock = socket .socket (self ._sock_afi , socket .SOCK_STREAM )
291
298
292
299
for option in self .config ['socket_options' ]:
293
300
log .debug ('%s: setting socket option %s' , self , option )
@@ -301,15 +308,17 @@ def connect(self):
301
308
# so we need to double check that we are still connecting before
302
309
if self .connecting ():
303
310
self .config ['state_change_callback' ](self )
304
- log .info ('%s: connecting to %s:%d' , self , self .host , self .port )
311
+ log .info ('%s: connecting to %s:%d [%s:%d %s]' , self , self .host ,
312
+ self .port , self ._sock_ip , self ._sock_port ,
313
+ AFI_NAMES [self ._sock_afi ])
305
314
306
315
if self .state is ConnectionStates .CONNECTING :
307
316
# in non-blocking mode, use repeated calls to socket.connect_ex
308
317
# to check connection status
309
318
request_timeout = self .config ['request_timeout_ms' ] / 1000.0
310
319
ret = None
311
320
try :
312
- ret = self ._sock .connect_ex ((self .host , self .port ))
321
+ ret = self ._sock .connect_ex ((self ._sock_ip , self ._sock_port ))
313
322
except socket .error as err :
314
323
ret = err .errno
315
324
@@ -400,7 +409,7 @@ def _wrap_ssl(self):
400
409
try :
401
410
self ._sock = self ._ssl_context .wrap_socket (
402
411
self ._sock ,
403
- server_hostname = self .hostname ,
412
+ server_hostname = self .host ,
404
413
do_handshake_on_connect = False )
405
414
except ssl .SSLError as e :
406
415
log .exception ('%s: Failed to wrap socket in SSLContext!' , self )
@@ -524,7 +533,7 @@ def _try_authenticate_plain(self, future):
524
533
return future .success (True )
525
534
526
535
def _try_authenticate_gssapi (self , future ):
527
- auth_id = self .config ['sasl_kerberos_service_name' ] + '@' + self .hostname
536
+ auth_id = self .config ['sasl_kerberos_service_name' ] + '@' + self .host
528
537
gssapi_name = gssapi .Name (
529
538
auth_id ,
530
539
name_type = gssapi .NameType .hostbased_service
@@ -962,9 +971,10 @@ def connect():
962
971
self .config [key ] = stashed [key ]
963
972
return version
964
973
965
- def __repr__ (self ):
966
- return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % (
967
- self .node_id , self .hostname , self .host , self .port )
974
+ def __str__ (self ):
975
+ return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % (
976
+ self .node_id , self .host , self .port , self .state ,
977
+ self ._sock_ip , self ._sock_port , AFI_NAMES [self ._sock_afi ])
968
978
969
979
970
980
class BrokerConnectionMetrics (object ):
0 commit comments