Skip to content

Commit ee1c4a4

Browse files
swenzeldpkp
authored andcommitted
Enable SCRAM-SHA-256 and SCRAM-SHA-512 for sasl (dpkp#1918)
1 parent 31f846c commit ee1c4a4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+619
-136
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ addons:
2525
cache:
2626
directories:
2727
- $HOME/.cache/pip
28-
- servers/
28+
- servers/dist
2929

3030
before_install:
3131
- source travis_java_install.sh

kafka/admin/client.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,11 @@ class KafkaAdminClient(object):
131131
metric_group_prefix (str): Prefix for metric names. Default: ''
132132
sasl_mechanism (str): Authentication mechanism when security_protocol
133133
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
134-
PLAIN, GSSAPI, OAUTHBEARER.
135-
sasl_plain_username (str): username for sasl PLAIN authentication.
136-
Required if sasl_mechanism is PLAIN.
137-
sasl_plain_password (str): password for sasl PLAIN authentication.
138-
Required if sasl_mechanism is PLAIN.
134+
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
135+
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
136+
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
137+
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
138+
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
139139
sasl_kerberos_service_name (str): Service name to include in GSSAPI
140140
sasl mechanism handshake. Default: 'kafka'
141141
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI

kafka/client_async.py

+6-9
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,11 @@ class KafkaClient(object):
144144
metric_group_prefix (str): Prefix for metric names. Default: ''
145145
sasl_mechanism (str): Authentication mechanism when security_protocol
146146
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
147-
PLAIN, GSSAPI, OAUTHBEARER.
148-
sasl_plain_username (str): username for sasl PLAIN authentication.
149-
Required if sasl_mechanism is PLAIN.
150-
sasl_plain_password (str): password for sasl PLAIN authentication.
151-
Required if sasl_mechanism is PLAIN.
147+
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
148+
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
149+
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
150+
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
151+
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
152152
sasl_kerberos_service_name (str): Service name to include in GSSAPI
153153
sasl mechanism handshake. Default: 'kafka'
154154
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -768,10 +768,7 @@ def least_loaded_node(self):
768768
inflight = curr_inflight
769769
found = node_id
770770

771-
if found is not None:
772-
return found
773-
774-
return None
771+
return found
775772

776773
def set_topics(self, topics):
777774
"""Set specific topics to track for metadata.

kafka/conn.py

+136-11
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
from __future__ import absolute_import, division
22

3-
import collections
3+
import base64
44
import copy
55
import errno
6+
import hashlib
7+
import hmac
68
import io
79
import logging
810
from random import shuffle, uniform
911

12+
from uuid import uuid4
13+
1014
# selectors in stdlib as of py3.4
1115
try:
1216
import selectors # pylint: disable=import-error
@@ -16,7 +20,6 @@
1620

1721
import socket
1822
import struct
19-
import sys
2023
import threading
2124
import time
2225

@@ -39,6 +42,12 @@
3942
TimeoutError = socket.error
4043
BlockingIOError = Exception
4144

45+
def xor_bytes(left, right):
46+
return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right))
47+
else:
48+
def xor_bytes(left, right):
49+
return bytes(lb ^ rb for lb, rb in zip(left, right))
50+
4251
log = logging.getLogger(__name__)
4352

4453
DEFAULT_KAFKA_PORT = 9092
@@ -98,6 +107,69 @@ class ConnectionStates(object):
98107
AUTHENTICATING = '<authenticating>'
99108

100109

110+
class ScramClient:
111+
MECHANISMS = {
112+
'SCRAM-SHA-256': hashlib.sha256,
113+
'SCRAM-SHA-512': hashlib.sha512
114+
}
115+
116+
def __init__(self, user, password, mechanism):
117+
self.nonce = str(uuid4()).replace('-', '')
118+
self.auth_message = ''
119+
self.salted_password = None
120+
self.user = user
121+
self.password = password.encode()
122+
self.hashfunc = self.MECHANISMS[mechanism]
123+
self.hashname = ''.join(mechanism.lower().split('-')[1:3])
124+
self.stored_key = None
125+
self.client_key = None
126+
self.client_signature = None
127+
self.client_proof = None
128+
self.server_key = None
129+
self.server_signature = None
130+
131+
def first_message(self):
132+
client_first_bare = 'n={},r={}'.format(self.user, self.nonce)
133+
self.auth_message += client_first_bare
134+
return 'n,,' + client_first_bare
135+
136+
def process_server_first_message(self, server_first_message):
137+
self.auth_message += ',' + server_first_message
138+
params = dict(pair.split('=', 1) for pair in server_first_message.split(','))
139+
server_nonce = params['r']
140+
if not server_nonce.startswith(self.nonce):
141+
raise ValueError("Server nonce, did not start with client nonce!")
142+
self.nonce = server_nonce
143+
self.auth_message += ',c=biws,r=' + self.nonce
144+
145+
salt = base64.b64decode(params['s'].encode())
146+
iterations = int(params['i'])
147+
self.create_salted_password(salt, iterations)
148+
149+
self.client_key = self.hmac(self.salted_password, b'Client Key')
150+
self.stored_key = self.hashfunc(self.client_key).digest()
151+
self.client_signature = self.hmac(self.stored_key, self.auth_message.encode())
152+
self.client_proof = xor_bytes(self.client_key, self.client_signature)
153+
self.server_key = self.hmac(self.salted_password, b'Server Key')
154+
self.server_signature = self.hmac(self.server_key, self.auth_message.encode())
155+
156+
def hmac(self, key, msg):
157+
return hmac.new(key, msg, digestmod=self.hashfunc).digest()
158+
159+
def create_salted_password(self, salt, iterations):
160+
self.salted_password = hashlib.pbkdf2_hmac(
161+
self.hashname, self.password, salt, iterations
162+
)
163+
164+
def final_message(self):
165+
client_final_no_proof = 'c=biws,r=' + self.nonce
166+
return 'c=biws,r={},p={}'.format(self.nonce, base64.b64encode(self.client_proof).decode())
167+
168+
def process_server_final_message(self, server_final_message):
169+
params = dict(pair.split('=', 1) for pair in server_final_message.split(','))
170+
if self.server_signature != base64.b64decode(params['v'].encode()):
171+
raise ValueError("Server sent wrong signature!")
172+
101173
class BrokerConnection(object):
102174
"""Initialize a Kafka broker connection
103175
@@ -178,11 +250,11 @@ class BrokerConnection(object):
178250
metric_group_prefix (str): Prefix for metric names. Default: ''
179251
sasl_mechanism (str): Authentication mechanism when security_protocol
180252
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
181-
PLAIN, GSSAPI, OAUTHBEARER.
182-
sasl_plain_username (str): username for sasl PLAIN authentication.
183-
Required if sasl_mechanism is PLAIN.
184-
sasl_plain_password (str): password for sasl PLAIN authentication.
185-
Required if sasl_mechanism is PLAIN.
253+
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
254+
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
255+
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
256+
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
257+
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
186258
sasl_kerberos_service_name (str): Service name to include in GSSAPI
187259
sasl mechanism handshake. Default: 'kafka'
188260
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -225,7 +297,7 @@ class BrokerConnection(object):
225297
'sasl_oauth_token_provider': None
226298
}
227299
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
228-
SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER')
300+
SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512")
229301

230302
def __init__(self, host, port, afi, **configs):
231303
self.host = host
@@ -260,9 +332,13 @@ def __init__(self, host, port, afi, **configs):
260332
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
261333
assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
262334
'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS))
263-
if self.config['sasl_mechanism'] == 'PLAIN':
264-
assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
265-
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
335+
if self.config['sasl_mechanism'] in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'):
336+
assert self.config['sasl_plain_username'] is not None, (
337+
'sasl_plain_username required for PLAIN or SCRAM sasl'
338+
)
339+
assert self.config['sasl_plain_password'] is not None, (
340+
'sasl_plain_password required for PLAIN or SCRAM sasl'
341+
)
266342
if self.config['sasl_mechanism'] == 'GSSAPI':
267343
assert gssapi is not None, 'GSSAPI lib not available'
268344
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
@@ -553,6 +629,8 @@ def _handle_sasl_handshake_response(self, future, response):
553629
return self._try_authenticate_gssapi(future)
554630
elif self.config['sasl_mechanism'] == 'OAUTHBEARER':
555631
return self._try_authenticate_oauth(future)
632+
elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"):
633+
return self._try_authenticate_scram(future)
556634
else:
557635
return future.failure(
558636
Errors.UnsupportedSaslMechanismError(
@@ -653,6 +731,53 @@ def _try_authenticate_plain(self, future):
653731
log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
654732
return future.success(True)
655733

734+
def _try_authenticate_scram(self, future):
735+
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
736+
log.warning('%s: Exchanging credentials in the clear', self)
737+
738+
scram_client = ScramClient(
739+
self.config['sasl_plain_username'], self.config['sasl_plain_password'], self.config['sasl_mechanism']
740+
)
741+
742+
err = None
743+
close = False
744+
with self._lock:
745+
if not self._can_send_recv():
746+
err = Errors.NodeNotReadyError(str(self))
747+
close = False
748+
else:
749+
try:
750+
client_first = scram_client.first_message().encode()
751+
size = Int32.encode(len(client_first))
752+
self._send_bytes_blocking(size + client_first)
753+
754+
(data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
755+
server_first = self._recv_bytes_blocking(data_len).decode()
756+
scram_client.process_server_first_message(server_first)
757+
758+
client_final = scram_client.final_message().encode()
759+
size = Int32.encode(len(client_final))
760+
self._send_bytes_blocking(size + client_final)
761+
762+
(data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
763+
server_final = self._recv_bytes_blocking(data_len).decode()
764+
scram_client.process_server_final_message(server_final)
765+
766+
except (ConnectionError, TimeoutError) as e:
767+
log.exception("%s: Error receiving reply from server", self)
768+
err = Errors.KafkaConnectionError("%s: %s" % (self, e))
769+
close = True
770+
771+
if err is not None:
772+
if close:
773+
self.close(error=err)
774+
return future.failure(err)
775+
776+
log.info(
777+
'%s: Authenticated as %s via %s', self, self.config['sasl_plain_username'], self.config['sasl_mechanism']
778+
)
779+
return future.success(True)
780+
656781
def _try_authenticate_gssapi(self, future):
657782
kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host
658783
auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name

kafka/consumer/group.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,11 @@ class KafkaConsumer(six.Iterator):
232232
subscribing to it. Requires 0.10+ Default: True
233233
sasl_mechanism (str): Authentication mechanism when security_protocol
234234
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
235-
PLAIN, GSSAPI, OAUTHBEARER.
236-
sasl_plain_username (str): Username for sasl PLAIN authentication.
237-
Required if sasl_mechanism is PLAIN.
238-
sasl_plain_password (str): Password for sasl PLAIN authentication.
239-
Required if sasl_mechanism is PLAIN.
235+
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
236+
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
237+
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
238+
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
239+
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
240240
sasl_kerberos_service_name (str): Service name to include in GSSAPI
241241
sasl mechanism handshake. Default: 'kafka'
242242
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI

kafka/producer/kafka.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -269,11 +269,11 @@ class KafkaProducer(object):
269269
Default: selectors.DefaultSelector
270270
sasl_mechanism (str): Authentication mechanism when security_protocol
271271
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
272-
PLAIN, GSSAPI, OAUTHBEARER.
273-
sasl_plain_username (str): username for sasl PLAIN authentication.
274-
Required if sasl_mechanism is PLAIN.
275-
sasl_plain_password (str): password for sasl PLAIN authentication.
276-
Required if sasl_mechanism is PLAIN.
272+
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
273+
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
274+
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
275+
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
276+
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
277277
sasl_kerberos_service_name (str): Service name to include in GSSAPI
278278
sasl mechanism handshake. Default: 'kafka'
279279
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI

requirements-dev.txt

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ lz4==2.1.2
88
xxhash==1.3.0
99
python-snappy==0.5.3
1010
tox==3.5.3
11+
mock==3.0.5
1112
pylint==1.9.3
1213
pytest-pylint==0.12.3
1314
pytest-mock==1.10.0

servers/0.10.0.0/resources/kafka.properties

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ broker.id={broker_id}
2424
listeners={transport}://{host}:{port}
2525
security.inter.broker.protocol={transport}
2626

27+
{sasl_config}
28+
2729
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
2830
ssl.keystore.password=foobar
2931
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
121123
# tune down offset topics to reduce setup time in tests
122124
offsets.commit.timeout.ms=500
123125
offsets.topic.num.partitions=2
124-
offsets.topic.replication.factor=2
126+
offsets.topic.replication.factor=1
125127

126128
# Allow shorter session timeouts for tests
127129
group.min.session.timeout.ms=1000
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
KafkaServer {{
2+
{jaas_config}
3+
}};
4+
Client {{}};

servers/0.10.0.1/resources/kafka.properties

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ broker.id={broker_id}
2424
listeners={transport}://{host}:{port}
2525
security.inter.broker.protocol={transport}
2626

27+
{sasl_config}
28+
2729
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
2830
ssl.keystore.password=foobar
2931
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
121123
# tune down offset topics to reduce setup time in tests
122124
offsets.commit.timeout.ms=500
123125
offsets.topic.num.partitions=2
124-
offsets.topic.replication.factor=2
126+
offsets.topic.replication.factor=1
125127

126128
# Allow shorter session timeouts for tests
127129
group.min.session.timeout.ms=1000
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
KafkaServer {{
2+
{jaas_config}
3+
}};
4+
Client {{}};

servers/0.10.1.1/resources/kafka.properties

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ broker.id={broker_id}
2424
listeners={transport}://{host}:{port}
2525
security.inter.broker.protocol={transport}
2626

27+
{sasl_config}
28+
2729
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
2830
ssl.keystore.password=foobar
2931
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
121123
# tune down offset topics to reduce setup time in tests
122124
offsets.commit.timeout.ms=500
123125
offsets.topic.num.partitions=2
124-
offsets.topic.replication.factor=2
126+
offsets.topic.replication.factor=1
125127

126128
# Allow shorter session timeouts for tests
127129
group.min.session.timeout.ms=1000
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
KafkaServer {{
2+
{jaas_config}
3+
}};
4+
Client {{}};

0 commit comments

Comments
 (0)