from __future__ import absolute_import import abc import logging from kafka.sasl.abc import SaslMechanism log = logging.getLogger(__name__) class SaslMechanismOAuth(SaslMechanism): def __init__(self, **config): assert 'sasl_oauth_token_provider' in config, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' assert isinstance(config['sasl_oauth_token_provider'], AbstractTokenProvider), \ 'sasl_oauth_token_provider must implement kafka.sasl.oauth.AbstractTokenProvider' self.token_provider = config['sasl_oauth_token_provider'] self._error = None self._is_done = False self._is_authenticated = False def auth_bytes(self): if self._error: # Server should respond to this with SaslAuthenticate failure, which ends the auth process return self._error token = self.token_provider.token() extensions = self._token_extensions() return "n,,\x01auth=Bearer {}{}\x01\x01".format(token, extensions).encode('utf-8') def receive(self, auth_bytes): if auth_bytes != b'': error = auth_bytes.decode('utf-8') log.debug("Sending x01 response to server after receiving SASL OAuth error: %s", error) self._error = b'\x01' else: self._is_done = True self._is_authenticated = True def is_done(self): return self._is_done def is_authenticated(self): return self._is_authenticated def _token_extensions(self): """ Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER initial request. """ # Builds up a string separated by \x01 via a dict of key value pairs extensions = self.token_provider.extensions() msg = '\x01'.join(['{}={}'.format(k, v) for k, v in extensions.items()]) return '\x01' + msg if msg else '' def auth_details(self): if not self.is_authenticated: raise RuntimeError('Not authenticated yet!') return 'Authenticated via SASL / OAuth' # This statement is compatible with both Python 2.7 & 3+ ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) class AbstractTokenProvider(ABC): """ A Token Provider must be used for the SASL OAuthBearer protocol. The implementation should ensure token reuse so that multiple calls at connect time do not create multiple tokens. The implementation should also periodically refresh the token in order to guarantee that each call returns an unexpired token. A timeout error should be returned after a short period of inactivity so that the broker can log debugging info and retry. Token Providers MUST implement the token() method """ def __init__(self, **config): pass @abc.abstractmethod def token(self): """ Returns a (str) ID/Access Token to be sent to the Kafka client. """ pass def extensions(self): """ This is an OPTIONAL method that may be implemented. Returns a map of key-value pairs that can be sent with the SASL/OAUTHBEARER initial client request. If not implemented, the values are ignored. This feature is only available in Kafka >= 2.1.0. All returned keys and values should be type str """ return {}