|
1 | 1 | from __future__ import absolute_import, division
|
2 | 2 |
|
3 |
| -import collections |
| 3 | +import base64 |
4 | 4 | import copy
|
5 | 5 | import errno
|
| 6 | +import hashlib |
| 7 | +import hmac |
6 | 8 | import io
|
7 | 9 | import logging
|
8 | 10 | from random import shuffle, uniform
|
9 | 11 |
|
| 12 | +from uuid import uuid4 |
| 13 | + |
10 | 14 | # selectors in stdlib as of py3.4
|
11 | 15 | try:
|
12 | 16 | import selectors # pylint: disable=import-error
|
|
16 | 20 |
|
17 | 21 | import socket
|
18 | 22 | import struct
|
19 |
| -import sys |
20 | 23 | import threading
|
21 | 24 | import time
|
22 | 25 |
|
|
39 | 42 | TimeoutError = socket.error
|
40 | 43 | BlockingIOError = Exception
|
41 | 44 |
|
| 45 | + def xor_bytes(left, right): |
| 46 | + return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right)) |
| 47 | +else: |
| 48 | + def xor_bytes(left, right): |
| 49 | + return bytes(lb ^ rb for lb, rb in zip(left, right)) |
| 50 | + |
42 | 51 | log = logging.getLogger(__name__)
|
43 | 52 |
|
44 | 53 | DEFAULT_KAFKA_PORT = 9092
|
@@ -98,6 +107,69 @@ class ConnectionStates(object):
|
98 | 107 | AUTHENTICATING = '<authenticating>'
|
99 | 108 |
|
100 | 109 |
|
| 110 | +class ScramClient: |
| 111 | + MECHANISMS = { |
| 112 | + 'SCRAM-SHA-256': hashlib.sha256, |
| 113 | + 'SCRAM-SHA-512': hashlib.sha512 |
| 114 | + } |
| 115 | + |
| 116 | + def __init__(self, user, password, mechanism): |
| 117 | + self.nonce = str(uuid4()).replace('-', '') |
| 118 | + self.auth_message = '' |
| 119 | + self.salted_password = None |
| 120 | + self.user = user |
| 121 | + self.password = password.encode() |
| 122 | + self.hashfunc = self.MECHANISMS[mechanism] |
| 123 | + self.hashname = ''.join(mechanism.lower().split('-')[1:3]) |
| 124 | + self.stored_key = None |
| 125 | + self.client_key = None |
| 126 | + self.client_signature = None |
| 127 | + self.client_proof = None |
| 128 | + self.server_key = None |
| 129 | + self.server_signature = None |
| 130 | + |
| 131 | + def first_message(self): |
| 132 | + client_first_bare = 'n={},r={}'.format(self.user, self.nonce) |
| 133 | + self.auth_message += client_first_bare |
| 134 | + return 'n,,' + client_first_bare |
| 135 | + |
| 136 | + def process_server_first_message(self, server_first_message): |
| 137 | + self.auth_message += ',' + server_first_message |
| 138 | + params = dict(pair.split('=', 1) for pair in server_first_message.split(',')) |
| 139 | + server_nonce = params['r'] |
| 140 | + if not server_nonce.startswith(self.nonce): |
| 141 | + raise ValueError("Server nonce, did not start with client nonce!") |
| 142 | + self.nonce = server_nonce |
| 143 | + self.auth_message += ',c=biws,r=' + self.nonce |
| 144 | + |
| 145 | + salt = base64.b64decode(params['s'].encode()) |
| 146 | + iterations = int(params['i']) |
| 147 | + self.create_salted_password(salt, iterations) |
| 148 | + |
| 149 | + self.client_key = self.hmac(self.salted_password, b'Client Key') |
| 150 | + self.stored_key = self.hashfunc(self.client_key).digest() |
| 151 | + self.client_signature = self.hmac(self.stored_key, self.auth_message.encode()) |
| 152 | + self.client_proof = xor_bytes(self.client_key, self.client_signature) |
| 153 | + self.server_key = self.hmac(self.salted_password, b'Server Key') |
| 154 | + self.server_signature = self.hmac(self.server_key, self.auth_message.encode()) |
| 155 | + |
| 156 | + def hmac(self, key, msg): |
| 157 | + return hmac.new(key, msg, digestmod=self.hashfunc).digest() |
| 158 | + |
| 159 | + def create_salted_password(self, salt, iterations): |
| 160 | + self.salted_password = hashlib.pbkdf2_hmac( |
| 161 | + self.hashname, self.password, salt, iterations |
| 162 | + ) |
| 163 | + |
| 164 | + def final_message(self): |
| 165 | + client_final_no_proof = 'c=biws,r=' + self.nonce |
| 166 | + return 'c=biws,r={},p={}'.format(self.nonce, base64.b64encode(self.client_proof).decode()) |
| 167 | + |
| 168 | + def process_server_final_message(self, server_final_message): |
| 169 | + params = dict(pair.split('=', 1) for pair in server_final_message.split(',')) |
| 170 | + if self.server_signature != base64.b64decode(params['v'].encode()): |
| 171 | + raise ValueError("Server sent wrong signature!") |
| 172 | + |
101 | 173 | class BrokerConnection(object):
|
102 | 174 | """Initialize a Kafka broker connection
|
103 | 175 |
|
@@ -178,11 +250,11 @@ class BrokerConnection(object):
|
178 | 250 | metric_group_prefix (str): Prefix for metric names. Default: ''
|
179 | 251 | sasl_mechanism (str): Authentication mechanism when security_protocol
|
180 | 252 | is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
|
181 |
| - PLAIN, GSSAPI, OAUTHBEARER. |
182 |
| - sasl_plain_username (str): username for sasl PLAIN authentication. |
183 |
| - Required if sasl_mechanism is PLAIN. |
184 |
| - sasl_plain_password (str): password for sasl PLAIN authentication. |
185 |
| - Required if sasl_mechanism is PLAIN. |
| 253 | + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. |
| 254 | + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. |
| 255 | + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. |
| 256 | + sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. |
| 257 | + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. |
186 | 258 | sasl_kerberos_service_name (str): Service name to include in GSSAPI
|
187 | 259 | sasl mechanism handshake. Default: 'kafka'
|
188 | 260 | sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
|
@@ -225,7 +297,7 @@ class BrokerConnection(object):
|
225 | 297 | 'sasl_oauth_token_provider': None
|
226 | 298 | }
|
227 | 299 | SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
|
228 |
| - SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER') |
| 300 | + SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") |
229 | 301 |
|
230 | 302 | def __init__(self, host, port, afi, **configs):
|
231 | 303 | self.host = host
|
@@ -260,9 +332,13 @@ def __init__(self, host, port, afi, **configs):
|
260 | 332 | if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
|
261 | 333 | assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
|
262 | 334 | 'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS))
|
263 |
| - if self.config['sasl_mechanism'] == 'PLAIN': |
264 |
| - assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl' |
265 |
| - assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' |
| 335 | + if self.config['sasl_mechanism'] in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): |
| 336 | + assert self.config['sasl_plain_username'] is not None, ( |
| 337 | + 'sasl_plain_username required for PLAIN or SCRAM sasl' |
| 338 | + ) |
| 339 | + assert self.config['sasl_plain_password'] is not None, ( |
| 340 | + 'sasl_plain_password required for PLAIN or SCRAM sasl' |
| 341 | + ) |
266 | 342 | if self.config['sasl_mechanism'] == 'GSSAPI':
|
267 | 343 | assert gssapi is not None, 'GSSAPI lib not available'
|
268 | 344 | assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
|
@@ -553,6 +629,8 @@ def _handle_sasl_handshake_response(self, future, response):
|
553 | 629 | return self._try_authenticate_gssapi(future)
|
554 | 630 | elif self.config['sasl_mechanism'] == 'OAUTHBEARER':
|
555 | 631 | return self._try_authenticate_oauth(future)
|
| 632 | + elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"): |
| 633 | + return self._try_authenticate_scram(future) |
556 | 634 | else:
|
557 | 635 | return future.failure(
|
558 | 636 | Errors.UnsupportedSaslMechanismError(
|
@@ -653,6 +731,53 @@ def _try_authenticate_plain(self, future):
|
653 | 731 | log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
|
654 | 732 | return future.success(True)
|
655 | 733 |
|
| 734 | + def _try_authenticate_scram(self, future): |
| 735 | + if self.config['security_protocol'] == 'SASL_PLAINTEXT': |
| 736 | + log.warning('%s: Exchanging credentials in the clear', self) |
| 737 | + |
| 738 | + scram_client = ScramClient( |
| 739 | + self.config['sasl_plain_username'], self.config['sasl_plain_password'], self.config['sasl_mechanism'] |
| 740 | + ) |
| 741 | + |
| 742 | + err = None |
| 743 | + close = False |
| 744 | + with self._lock: |
| 745 | + if not self._can_send_recv(): |
| 746 | + err = Errors.NodeNotReadyError(str(self)) |
| 747 | + close = False |
| 748 | + else: |
| 749 | + try: |
| 750 | + client_first = scram_client.first_message().encode() |
| 751 | + size = Int32.encode(len(client_first)) |
| 752 | + self._send_bytes_blocking(size + client_first) |
| 753 | + |
| 754 | + (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4)) |
| 755 | + server_first = self._recv_bytes_blocking(data_len).decode() |
| 756 | + scram_client.process_server_first_message(server_first) |
| 757 | + |
| 758 | + client_final = scram_client.final_message().encode() |
| 759 | + size = Int32.encode(len(client_final)) |
| 760 | + self._send_bytes_blocking(size + client_final) |
| 761 | + |
| 762 | + (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4)) |
| 763 | + server_final = self._recv_bytes_blocking(data_len).decode() |
| 764 | + scram_client.process_server_final_message(server_final) |
| 765 | + |
| 766 | + except (ConnectionError, TimeoutError) as e: |
| 767 | + log.exception("%s: Error receiving reply from server", self) |
| 768 | + err = Errors.KafkaConnectionError("%s: %s" % (self, e)) |
| 769 | + close = True |
| 770 | + |
| 771 | + if err is not None: |
| 772 | + if close: |
| 773 | + self.close(error=err) |
| 774 | + return future.failure(err) |
| 775 | + |
| 776 | + log.info( |
| 777 | + '%s: Authenticated as %s via %s', self, self.config['sasl_plain_username'], self.config['sasl_mechanism'] |
| 778 | + ) |
| 779 | + return future.success(True) |
| 780 | + |
656 | 781 | def _try_authenticate_gssapi(self, future):
|
657 | 782 | kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host
|
658 | 783 | auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name
|
|
0 commit comments