From 00367713ca745ed43c54c08376f3f07c4a8b2ef6 Mon Sep 17 00:00:00 2001 From: Daniel van der Ende Date: Tue, 23 May 2017 11:55:09 +0200 Subject: [PATCH] Add first parts of gssapi sasl --- kafka/conn.py | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 12bd08df4..edb9bd2a7 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -6,6 +6,7 @@ import logging import io from random import shuffle +import sasl import socket import time import traceback @@ -160,7 +161,7 @@ class BrokerConnection(object): 'sasl_plain_username': None, 'sasl_plain_password': None } - SASL_MECHANISMS = ('PLAIN',) + SASL_MECHANISMS = ('PLAIN', 'GSSAPI') def __init__(self, host, port, afi, **configs): self.hostname = host @@ -425,12 +426,49 @@ def _handle_sasl_handshake_response(self, future, response): if self.config['sasl_mechanism'] == 'PLAIN': return self._try_authenticate_plain(future) + elif self.config['sasl_mechanism'] == 'GSSAPI': + return self._try_authenticate_gssapi(future) else: return future.failure( Errors.UnsupportedSaslMechanismError( 'kafka-python does not support SASL mechanism %s' % self.config['sasl_mechanism'])) + def _try_authenticate_gssapi(self, future): + service = self.config['kafka_principal'] + + ret, chosen_mech, initial_response = self.sasl.start(s_mechs) + log.debug("Chosen mech: %s" % chosen_mech) + + initiate = RpcSaslProto() + initiate.state = 2 + initiate.token = initial_response + + for auth in res.auths: + if auth.mechanism == chosen_mech: + auth_method = initiate.auths.add() + auth_method.mechanism = chosen_mech + auth_method.method = auth.method + auth_method.protocol = auth.protocol + auth_method.serverId = self._trans.host + + self._send_sasl_message(initiate) + continue + + if res.state == 3: + res_token = self._evaluate_token(res) + response = RpcSaslProto() + response.token = res_token + response.state = 4 + self._send_sasl_message(response) + continue + + if res.state == 0: + return True + #negotiate = RpcSaslProto() + #negotiate.state = 1 + #self._send_sasl_message(negotiate) + def _try_authenticate_plain(self, future): if self.config['security_protocol'] == 'SASL_PLAINTEXT': log.warning('%s: Sending username and password in the clear', self)