@@ -55,7 +55,6 @@ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
55
55
self .host = host
56
56
self .port = port
57
57
self .timeout = timeout
58
- self ._dirty = None
59
58
self ._sock = None
60
59
61
60
self .reinit ()
@@ -68,7 +67,11 @@ def __repr__(self):
68
67
###################
69
68
70
69
def _raise_connection_error (self ):
71
- self ._dirty = True
70
+ # Cleanup socket if we have one
71
+ if self ._sock :
72
+ self .close ()
73
+
74
+ # And then raise
72
75
raise ConnectionError ("Kafka @ {0}:{1} went away" .format (self .host , self .port ))
73
76
74
77
def _read_bytes (self , num_bytes ):
@@ -78,7 +81,7 @@ def _read_bytes(self, num_bytes):
78
81
log .debug ("About to read %d bytes from Kafka" , num_bytes )
79
82
80
83
# Make sure we have a connection
81
- if self . _dirty or not self ._sock :
84
+ if not self ._sock :
82
85
self .reinit ()
83
86
84
87
while bytes_left :
@@ -110,7 +113,7 @@ def send(self, request_id, payload):
110
113
log .debug ("About to send %d bytes to Kafka, request %d" % (len (payload ), request_id ))
111
114
112
115
# Make sure we have a connection
113
- if self . _dirty or not self ._sock :
116
+ if not self ._sock :
114
117
self .reinit ()
115
118
116
119
try :
@@ -158,10 +161,12 @@ def reinit(self):
158
161
Re-initialize the socket connection
159
162
"""
160
163
log .debug ("Reinitializing socket connection for %s:%d" % (self .host , self .port ))
161
- self .close ()
164
+
165
+ if self ._sock :
166
+ self .close ()
167
+
162
168
try :
163
169
self ._sock = socket .create_connection ((self .host , self .port ), self .timeout )
164
- self ._dirty = False
165
170
except socket .error :
166
171
log .exception ('Unable to connect to kafka broker at %s:%d' % (self .host , self .port ))
167
172
self ._raise_connection_error ()
0 commit comments