9
9
10
10
from .. import errors as Errors
11
11
from ..client_async import KafkaClient
12
+ from ..metrics import MetricConfig , Metrics
12
13
from ..partitioner .default import DefaultPartitioner
13
14
from ..protocol .message import Message , MessageSet
14
15
from ..structs import TopicPartition
@@ -220,6 +221,13 @@ class KafkaProducer(object):
220
221
api_version_auto_timeout_ms (int): number of milliseconds to throw a
221
222
timeout exception from the constructor when checking the broker
222
223
api version. Only applies if api_version set to 'auto'
224
+ metric_reporters (list): A list of classes to use as metrics reporters.
225
+ Implementing the AbstractMetricsReporter interface allows plugging
226
+ in classes that will be notified of new metric creation. Default: []
227
+ metrics_num_samples (int): The number of samples maintained to compute
228
+ metrics. Default: 2
229
+ metrics_sample_window_ms (int): The maximum age in milliseconds of
230
+ samples used to compute metrics. Default: 30000
223
231
224
232
Note:
225
233
Configuration parameters are described in more detail at
@@ -255,7 +263,10 @@ class KafkaProducer(object):
255
263
'ssl_keyfile' : None ,
256
264
'ssl_crlfile' : None ,
257
265
'api_version' : None ,
258
- 'api_version_auto_timeout_ms' : 2000
266
+ 'api_version_auto_timeout_ms' : 2000 ,
267
+ 'metric_reporters' : [],
268
+ 'metrics_num_samples' : 2 ,
269
+ 'metrics_sample_window_ms' : 30000 ,
259
270
}
260
271
261
272
def __init__ (self , ** configs ):
@@ -285,6 +296,14 @@ def __init__(self, **configs):
285
296
log .warning ('use api_version=%s (%s is deprecated)' ,
286
297
str (self .config ['api_version' ]), deprecated )
287
298
299
+ # Configure metrics
300
+ metrics_tags = {'client-id' : self .config ['client_id' ]}
301
+ metric_config = MetricConfig (samples = self .config ['metrics_num_samples' ],
302
+ time_window_ms = self .config ['metrics_sample_window_ms' ],
303
+ tags = metrics_tags )
304
+ reporters = [reporter () for reporter in self .config ['metric_reporters' ]]
305
+ self ._metrics = Metrics (metric_config , reporters )
306
+
288
307
client = KafkaClient (** self .config )
289
308
290
309
# Get auto-discovered version from client if necessary
@@ -298,7 +317,8 @@ def __init__(self, **configs):
298
317
self ._accumulator = RecordAccumulator (message_version = message_version , ** self .config )
299
318
self ._metadata = client .cluster
300
319
guarantee_message_order = bool (self .config ['max_in_flight_requests_per_connection' ] == 1 )
301
- self ._sender = Sender (client , self ._metadata , self ._accumulator ,
320
+ self ._sender = Sender (client , self ._metadata ,
321
+ self ._accumulator , self ._metrics ,
302
322
guarantee_message_order = guarantee_message_order ,
303
323
** self .config )
304
324
self ._sender .daemon = True
@@ -382,6 +402,7 @@ def close(self, timeout=None):
382
402
if not invoked_from_callback :
383
403
self ._sender .join ()
384
404
405
+ self ._metrics .close ()
385
406
try :
386
407
self .config ['key_serializer' ].close ()
387
408
except AttributeError :
0 commit comments