forked from dpkp/kafka-python
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy path__init__.py
59 lines (46 loc) · 1.78 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import logging
from kafka.sasl import gssapi, oauthbearer, plain, scram
MECHANISMS = {
'GSSAPI': gssapi,
'OAUTHBEARER': oauthbearer,
'PLAIN': plain,
'SCRAM-SHA-256': scram,
'SCRAM-SHA-512': scram,
}
try:
from kafka.sasl import msk
MECHANISMS['AWS_MSK_IAM'] = msk
except ImportError:
pass
log = logging.getLogger(__name__)
def register_mechanism(key, module):
"""
Registers a custom SASL mechanism that can be used via sasl_mechanism={key}.
Example:
import kakfa.sasl
from kafka import KafkaProducer
from mymodule import custom_sasl
kafka.sasl.register_mechanism('CUSTOM_SASL', custom_sasl)
producer = KafkaProducer(sasl_mechanism='CUSTOM_SASL')
Arguments:
key (str): The name of the mechanism returned by the broker and used
in the sasl_mechanism config value.
module (module): A module that implements the following methods...
def validate_config(conn: BrokerConnection): -> None:
# Raises an AssertionError for missing or invalid conifg values.
def try_authenticate(conn: BrokerConncetion, future: -> Future):
# Executes authentication routine and returns a resolved Future.
Raises:
AssertionError: The registered module does not define a required method.
"""
assert callable(getattr(module, 'validate_config', None)), (
'Custom SASL mechanism {} must implement method #validate_config()'
.format(key)
)
assert callable(getattr(module, 'try_authenticate', None)), (
'Custom SASL mechanism {} must implement method #try_authenticate()'
.format(key)
)
if key in MECHANISMS:
log.warning(f'Overriding existing SASL mechanism {key}')
MECHANISMS[key] = module