@@ -20,9 +20,10 @@ class KafkaClient(object):
20
20
CLIENT_ID = "kafka-python"
21
21
ID_GEN = count ()
22
22
23
- def __init__ (self , host , port , bufsize = 4096 ):
23
+ def __init__ (self , host , port , bufsize = 4096 , client_id = CLIENT_ID ):
24
24
# We need one connection to bootstrap
25
- self .bufsize = bufsize
25
+ self .bufsize = bufsize
26
+ self .client_id = client_id
26
27
self .conns = { # (host, port) -> KafkaConnection
27
28
(host , port ): KafkaConnection (host , port , bufsize )
28
29
}
@@ -59,7 +60,7 @@ def _load_metadata_for_topics(self, *topics):
59
60
recurse in the event of a retry.
60
61
"""
61
62
requestId = self ._next_id ()
62
- request = KafkaProtocol .encode_metadata_request (KafkaClient . CLIENT_ID ,
63
+ request = KafkaProtocol .encode_metadata_request (self . client_id ,
63
64
requestId , topics )
64
65
65
66
response = self ._send_broker_unaware_request (requestId , request )
@@ -156,7 +157,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
156
157
for broker , payloads in payloads_by_broker .items ():
157
158
conn = self ._get_conn_for_broker (broker )
158
159
requestId = self ._next_id ()
159
- request = encoder_fn (client_id = KafkaClient . CLIENT_ID ,
160
+ request = encoder_fn (client_id = self . client_id ,
160
161
correlation_id = requestId , payloads = payloads )
161
162
162
163
# Send the request, recv the response
0 commit comments