
Commit c34d138

Add initial producer-sender metrics
1 parent 20f4c95 commit c34d138

5 files changed: 261 additions & 18 deletions


kafka/producer/buffer.py

Lines changed: 15 additions & 5 deletions
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division

 import collections
 import io
@@ -55,13 +55,17 @@ def __init__(self, buf, batch_size, compression_type=None, message_version=0):
         self._batch_size = batch_size
         self._closed = False
         self._messages = 0
+        self._bytes_written = 4  # Int32 header is 4 bytes
+        self._final_size = None

     def append(self, offset, message):
         """Append a Message to the MessageSet.

         Arguments:
             offset (int): offset of the message
             message (Message or bytes): message struct or encoded bytes
+
+        Returns: bytes written
         """
         if isinstance(message, Message):
             encoded = message.encode()
@@ -70,6 +74,8 @@ def append(self, offset, message):
         msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
         self._buffer.write(msg)
         self._messages += 1
+        self._bytes_written += len(msg)
+        return len(msg)

     def has_room_for(self, key, value):
         if self._closed:
@@ -107,16 +113,20 @@ def close(self):
             self._buffer.write(Int32.encode(len(encoded)))
             self._buffer.write(encoded)

-        # Update the message set size, and return ready for full read()
-        size = self._buffer.tell() - 4
+        # Update the message set size (less the 4 byte header),
+        # and return with buffer ready for full read()
+        self._final_size = self._buffer.tell()
         self._buffer.seek(0)
-        self._buffer.write(Int32.encode(size))
+        self._buffer.write(Int32.encode(self._final_size - 4))

         self._buffer.seek(0)
         self._closed = True

     def size_in_bytes(self):
-        return self._buffer.tell()
+        return self._final_size or self._buffer.tell()
+
+    def compression_rate(self):
+        return self.size_in_bytes() / self._bytes_written

     def buffer(self):
         return self._buffer
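
Each appended message is framed as an Int64 offset plus an Int32 length ahead of the encoded payload, so append() accumulates 12 bytes of framing per message on top of the 4-byte Int32 MessageSet header counted at construction. The new compression_rate() divides the final (post-compression) size by those raw bytes written; the division import added above keeps this a true division under Python 2. A minimal standalone sketch of the arithmetic, using hypothetical message sizes:

    from __future__ import division

    # Byte accounting as in MessageSetBuffer, with hypothetical sizes.
    bytes_written = 4                         # Int32 MessageSet header, per __init__
    for encoded_len in (100, 250, 75):        # hypothetical encoded message lengths
        bytes_written += 8 + 4 + encoded_len  # Int64 offset + Int32 size + payload

    final_size = 4 + 180                      # hypothetical buffer size after close()
    print('compression rate: %.2f' % (final_size / bytes_written))  # < 1.0 means the codec saved space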

kafka/producer/kafka.py

Lines changed: 23 additions & 2 deletions
@@ -9,6 +9,7 @@

 from .. import errors as Errors
 from ..client_async import KafkaClient
+from ..metrics import MetricConfig, Metrics
 from ..partitioner.default import DefaultPartitioner
 from ..protocol.message import Message, MessageSet
 from ..structs import TopicPartition
@@ -220,6 +221,13 @@ class KafkaProducer(object):
         api_version_auto_timeout_ms (int): number of milliseconds to throw a
             timeout exception from the constructor when checking the broker
             api version. Only applies if api_version set to 'auto'
+        metric_reporters (list): A list of classes to use as metrics reporters.
+            Implementing the AbstractMetricsReporter interface allows plugging
+            in classes that will be notified of new metric creation. Default: []
+        metrics_num_samples (int): The number of samples maintained to compute
+            metrics. Default: 2
+        metrics_sample_window_ms (int): The maximum age in milliseconds of
+            samples used to compute metrics. Default: 30000

     Note:
         Configuration parameters are described in more detail at
@@ -255,7 +263,10 @@ class KafkaProducer(object):
         'ssl_keyfile': None,
         'ssl_crlfile': None,
         'api_version': None,
-        'api_version_auto_timeout_ms': 2000
+        'api_version_auto_timeout_ms': 2000,
+        'metric_reporters': [],
+        'metrics_num_samples': 2,
+        'metrics_sample_window_ms': 30000,
     }

     def __init__(self, **configs):
@@ -285,6 +296,14 @@ def __init__(self, **configs):
             log.warning('use api_version=%s (%s is deprecated)',
                         str(self.config['api_version']), deprecated)

+        # Configure metrics
+        metrics_tags = {'client-id': self.config['client_id']}
+        metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+                                     time_window_ms=self.config['metrics_sample_window_ms'],
+                                     tags=metrics_tags)
+        reporters = [reporter() for reporter in self.config['metric_reporters']]
+        self._metrics = Metrics(metric_config, reporters)
+
         client = KafkaClient(**self.config)

         # Get auto-discovered version from client if necessary
@@ -298,7 +317,8 @@ def __init__(self, **configs):
         self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
         self._metadata = client.cluster
         guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
-        self._sender = Sender(client, self._metadata, self._accumulator,
+        self._sender = Sender(client, self._metadata,
+                              self._accumulator, self._metrics,
                               guarantee_message_order=guarantee_message_order,
                               **self.config)
         self._sender.daemon = True
@@ -382,6 +402,7 @@ def close(self, timeout=None):
         if not invoked_from_callback:
             self._sender.join()

+        self._metrics.close()
         try:
             self.config['key_serializer'].close()
         except AttributeError:
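
Putting the new constructor options together, a minimal usage sketch (the bootstrap address is hypothetical; the option names and defaults are those documented in the diff above, and close() now also closes the Metrics instance):

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',  # hypothetical broker address
        metric_reporters=[],                 # AbstractMetricsReporter classes; instantiated by the constructor
        metrics_num_samples=2,               # samples maintained per metric (default)
        metrics_sample_window_ms=30000,      # max age of a sample in ms (default)
    )
    producer.close()                         # joins the sender thread and closes the Metrics instance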

kafka/producer/record_accumulator.py

Lines changed: 3 additions & 3 deletions
@@ -38,7 +38,7 @@ def get(self):
 class RecordBatch(object):
     def __init__(self, tp, records, message_version=0):
         self.record_count = 0
-        #self.max_record_size = 0 # for metrics only
+        self.max_record_size = 0
         now = time.time()
         self.created = now
         self.drained = None
@@ -56,8 +56,8 @@ def try_append(self, timestamp_ms, key, value):
             return None

         msg = Message(value, key=key, magic=self.message_version)
-        self.records.append(self.record_count, msg)
-        # self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only
+        record_size = self.records.append(self.record_count, msg)
+        self.max_record_size = max(self.max_record_size, record_size)
         self.last_append = time.time()
         future = FutureRecordMetadata(self.produce_future, self.record_count,
                                       timestamp_ms)
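
Since MessageSetBuffer.append() now returns the framed size in bytes, try_append() can track the largest record in a batch without re-encoding anything. A trivial standalone sketch of that bookkeeping (the sizes are hypothetical stand-ins for values records.append() would return):

    max_record_size = 0
    for record_size in (132, 4108, 519):  # hypothetical framed message sizes
        max_record_size = max(max_record_size, record_size)
    print(max_record_size)                # 4108, the largest record in the batch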
