Skip to content

Commit 6b801a8

Browse files
Lars Jørgen Solbergdpkp
Lars Jørgen Solberg
authored andcommitted
implement sasl PLAIN mechanism
1 parent c693709 commit 6b801a8

File tree

6 files changed

+176
-4
lines changed

6 files changed

+176
-4
lines changed

kafka/client_async.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ class KafkaClient(object):
7070
'selector': selectors.DefaultSelector,
7171
'metrics': None,
7272
'metric_group_prefix': '',
73+
'sasl_mechanism': None,
74+
'sasl_plain_username': None,
75+
'sasl_plain_password': None,
7376
}
7477
API_VERSIONS = [
7578
(0, 10),
@@ -150,6 +153,13 @@ def __init__(self, **configs):
150153
metrics (kafka.metrics.Metrics): Optionally provide a metrics
151154
instance for capturing network IO stats. Default: None.
152155
metric_group_prefix (str): Prefix for metric names. Default: ''
156+
sasl_mechanism (str): string picking sasl mechanism when security_protocol
157+
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
158+
Default: None
159+
sasl_plain_username (str): username for sasl PLAIN authentication.
160+
Default: None
161+
sasl_plain_password (str): passowrd for sasl PLAIN authentication.
162+
Defualt: None
153163
"""
154164
self.config = copy.copy(self.DEFAULT_CONFIG)
155165
for key in self.config:

kafka/conn.py

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import kafka.errors as Errors
1616
from kafka.future import Future
1717
from kafka.protocol.api import RequestHeader
18+
from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse
1819
from kafka.protocol.commit import GroupCoordinatorResponse
1920
from kafka.protocol.types import Int32
2021
from kafka.version import __version__
@@ -48,7 +49,7 @@ class ConnectionStates(object):
4849
CONNECTING = '<connecting>'
4950
HANDSHAKE = '<handshake>'
5051
CONNECTED = '<connected>'
51-
52+
AUTHENTICATING = '<authenticating>'
5253

5354
InFlightRequest = collections.namedtuple('InFlightRequest',
5455
['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
@@ -73,6 +74,9 @@ class BrokerConnection(object):
7374
'ssl_password': None,
7475
'api_version': (0, 8, 2), # default to most restrictive
7576
'state_change_callback': lambda conn: True,
77+
'sasl_mechanism': None,
78+
'sasl_plain_username': None,
79+
'sasl_plain_password': None
7680
}
7781

7882
def __init__(self, host, port, afi, **configs):
@@ -188,6 +192,8 @@ def connect(self):
188192
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
189193
log.debug('%s: initiating SSL handshake', str(self))
190194
self.state = ConnectionStates.HANDSHAKE
195+
elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
196+
self.state = ConnectionStates.AUTHENTICATING
191197
else:
192198
self.state = ConnectionStates.CONNECTED
193199
self.config['state_change_callback'](self)
@@ -211,6 +217,15 @@ def connect(self):
211217
if self.state is ConnectionStates.HANDSHAKE:
212218
if self._try_handshake():
213219
log.debug('%s: completed SSL handshake.', str(self))
220+
if self.config['security_protocol'] == 'SASL_SSL':
221+
self.state = ConnectionStates.AUTHENTICATING
222+
else:
223+
self.state = ConnectionStates.CONNECTED
224+
self.config['state_change_callback'](self)
225+
226+
if self.state is ConnectionStates.AUTHENTICATING:
227+
if self._try_authenticate():
228+
log.debug('%s: Authenticated as %s', str(self), self.config['sasl_plain_username'])
214229
self.state = ConnectionStates.CONNECTED
215230
self.config['state_change_callback'](self)
216231

@@ -273,6 +288,90 @@ def _try_handshake(self):
273288

274289
return False
275290

291+
def _try_authenticate(self):
292+
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
293+
294+
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
295+
log.warning('%s: Sending username and password in the clear', str(self))
296+
297+
# Build a SaslHandShakeRequest message
298+
correlation_id = self._next_correlation_id()
299+
request = SaslHandShakeRequest[0](self.config['sasl_mechanism'])
300+
header = RequestHeader(request,
301+
correlation_id=correlation_id,
302+
client_id=self.config['client_id'])
303+
304+
message = b''.join([header.encode(), request.encode()])
305+
size = Int32.encode(len(message))
306+
307+
# Attempt to send it over our socket
308+
try:
309+
self._sock.setblocking(True)
310+
self._sock.sendall(size + message)
311+
self._sock.setblocking(False)
312+
except (AssertionError, ConnectionError) as e:
313+
log.exception("Error sending %s to %s", request, self)
314+
error = Errors.ConnectionError("%s: %s" % (str(self), e))
315+
self.close(error=error)
316+
return False
317+
318+
future = Future()
319+
ifr = InFlightRequest(request=request,
320+
correlation_id=correlation_id,
321+
response_type=request.RESPONSE_TYPE,
322+
future=future,
323+
timestamp=time.time())
324+
self.in_flight_requests.append(ifr)
325+
326+
# Listen for a reply and check that the server supports the PLAIN mechanism
327+
response = None
328+
while not response:
329+
response = self.recv()
330+
331+
if not response.error_code is 0:
332+
raise Errors.for_code(response.error_code)
333+
334+
if not self.config['sasl_mechanism'] in response.enabled_mechanisms:
335+
raise Errors.AuthenticationMethodNotSupported(self.config['sasl_mechanism'] + " is not supported by broker")
336+
337+
return self._try_authenticate_plain()
338+
339+
def _try_authenticate_plain(self):
340+
data = b''
341+
try:
342+
self._sock.setblocking(True)
343+
# Send our credentials
344+
msg = bytes('\0'.join([self.config['sasl_plain_username'],
345+
self.config['sasl_plain_username'],
346+
self.config['sasl_plain_password']]).encode('utf-8'))
347+
size = Int32.encode(len(msg))
348+
self._sock.sendall(size + msg)
349+
350+
# The server will send a zero sized message (that is Int32(0)) on success.
351+
# The connection is closed on failure
352+
received_bytes = 0
353+
while received_bytes < 4:
354+
data = data + self._sock.recv(4 - received_bytes)
355+
received_bytes = received_bytes + len(data)
356+
if not data:
357+
log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
358+
self.close(error=Errors.ConnectionError('Authentication failed'))
359+
raise Errors.AuthenticationFailedError('Authentication failed for user {}'.format(self.config['sasl_plain_username']))
360+
self._sock.setblocking(False)
361+
except (AssertionError, ConnectionError) as e:
362+
log.exception("%s: Error receiving reply from server", self)
363+
error = Errors.ConnectionError("%s: %s" % (str(self), e))
364+
self.close(error=error)
365+
return False
366+
367+
with io.BytesIO() as buffer:
368+
buffer.write(data)
369+
buffer.seek(0)
370+
if not Int32.decode(buffer) == 0:
371+
raise Errors.KafkaError('Expected a zero sized reply after sending credentials')
372+
373+
return True
374+
276375
def blacked_out(self):
277376
"""
278377
Return true if we are disconnected from the given node and can't
@@ -292,7 +391,8 @@ def connecting(self):
292391
"""Returns True if still connecting (this may encompass several
293392
different states, such as SSL handshake, authorization, etc)."""
294393
return self.state in (ConnectionStates.CONNECTING,
295-
ConnectionStates.HANDSHAKE)
394+
ConnectionStates.HANDSHAKE,
395+
ConnectionStates.AUTHENTICATING)
296396

297397
def disconnected(self):
298398
"""Return True iff socket is closed"""
@@ -385,7 +485,7 @@ def recv(self):
385485
Return response if available
386486
"""
387487
assert not self._processing, 'Recursion not supported'
388-
if not self.connected():
488+
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
389489
log.warning('%s cannot recv: socket not connected', self)
390490
# If requests are pending, we should close the socket and
391491
# fail all the pending request futures

kafka/consumer/group.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,13 @@ class KafkaConsumer(six.Iterator):
186186
(such as offsets) should be exposed to the consumer. If set to True
187187
the only way to receive records from an internal topic is
188188
subscribing to it. Requires 0.10+ Default: True
189+
sasl_mechanism (str): string picking sasl mechanism when security_protocol
190+
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
191+
Default: None
192+
sasl_plain_username (str): username for sasl PLAIN authentication.
193+
Default: None
194+
sasl_plain_password (str): passowrd for sasl PLAIN authentication.
195+
Defualt: None
189196
190197
Note:
191198
Configuration parameters are described in more detail at
@@ -234,6 +241,9 @@ class KafkaConsumer(six.Iterator):
234241
'metrics_sample_window_ms': 30000,
235242
'selector': selectors.DefaultSelector,
236243
'exclude_internal_topics': True,
244+
'sasl_mechanism': None,
245+
'sasl_plain_username': None,
246+
'sasl_plain_password': None,
237247
}
238248

239249
def __init__(self, *topics, **configs):

kafka/errors.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ class CommitFailedError(KafkaError):
5858
pass
5959

6060

61+
class AuthenticationMethodNotSupported(KafkaError):
62+
pass
63+
64+
65+
class AuthenticationFailedError(KafkaError):
66+
retriable = False
67+
68+
6169
class BrokerResponseError(KafkaError):
6270
errno = None
6371
message = None
@@ -328,6 +336,18 @@ class InvalidTimestampError(BrokerResponseError):
328336
description = ('The timestamp of the message is out of acceptable range.')
329337

330338

339+
class UnsupportedSaslMechanismError(BrokerResponseError):
340+
errno = 33
341+
message = 'UNSUPPORTED_SASL_MECHANISM'
342+
description = ('The broker does not support the requested SASL mechanism.')
343+
344+
345+
class IllegalSaslStateError(BrokerResponseError):
346+
errno = 34
347+
message = 'ILLEGAL_SASL_STATE'
348+
description = ('Request is not valid given the current SASL state.')
349+
350+
331351
class KafkaUnavailableError(KafkaError):
332352
pass
333353

kafka/producer/kafka.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ class KafkaProducer(object):
199199
to kafka brokers up to this number of maximum requests per
200200
broker connection. Default: 5.
201201
security_protocol (str): Protocol used to communicate with brokers.
202-
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
202+
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
203+
Default: PLAINTEXT.
203204
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
204205
socket connections. If provided, all other ssl_* configurations
205206
will be ignored. Default: None.
@@ -235,6 +236,13 @@ class KafkaProducer(object):
235236
selector (selectors.BaseSelector): Provide a specific selector
236237
implementation to use for I/O multiplexing.
237238
Default: selectors.DefaultSelector
239+
sasl_mechanism (str): string picking sasl mechanism when security_protocol
240+
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
241+
Default: None
242+
sasl_plain_username (str): username for sasl PLAIN authentication.
243+
Default: None
244+
sasl_plain_password (str): passowrd for sasl PLAIN authentication.
245+
Defualt: None
238246
239247
Note:
240248
Configuration parameters are described in more detail at
@@ -276,6 +284,9 @@ class KafkaProducer(object):
276284
'metrics_num_samples': 2,
277285
'metrics_sample_window_ms': 30000,
278286
'selector': selectors.DefaultSelector,
287+
'sasl_mechanism': None,
288+
'sasl_plain_username': None,
289+
'sasl_plain_password': None,
279290
}
280291

281292
def __init__(self, **configs):

kafka/protocol/admin.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,24 @@ class DescribeGroupsRequest_v0(Struct):
7878

7979
DescribeGroupsRequest = [DescribeGroupsRequest_v0]
8080
DescribeGroupsResponse = [DescribeGroupsResponse_v0]
81+
82+
83+
class SaslHandShakeResponse_v0(Struct):
84+
API_KEY = 17
85+
API_VERSION = 0
86+
SCHEMA = Schema(
87+
('error_code', Int16),
88+
('enabled_mechanisms', Array(String('utf-8')))
89+
)
90+
91+
92+
class SaslHandShakeRequest_v0(Struct):
93+
API_KEY = 17
94+
API_VERSION = 0
95+
RESPONSE_TYPE = SaslHandShakeResponse_v0
96+
SCHEMA = Schema(
97+
('mechanism', String('utf-8'))
98+
)
99+
100+
SaslHandShakeRequest = [SaslHandShakeRequest_v0]
101+
SaslHandShakeResponse = [SaslHandShakeResponse_v0]

0 commit comments

Comments
 (0)