
Commit 795cb9b

KAFKA-3025: Message v1 -- add timestamp and use relative offset in compressed messagesets
1 parent 7f4a936 commit 795cb9b

File tree

7 files changed: +132 -50 lines changed


kafka/consumer/fetcher.py

Lines changed: 20 additions & 4 deletions

@@ -19,7 +19,7 @@


 ConsumerRecord = collections.namedtuple("ConsumerRecord",
-    ["topic", "partition", "offset", "key", "value"])
+    ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"])


 class NoOffsetForPartitionError(Errors.KafkaError):
@@ -351,17 +351,33 @@ def fetched_records(self):
                                   position)
         return dict(drained)

-    def _unpack_message_set(self, tp, messages):
+    def _unpack_message_set(self, tp, messages, relative_offset=0):
         try:
             for offset, size, msg in messages:
                 if self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)
                 elif msg.is_compressed():
-                    for record in self._unpack_message_set(tp, msg.decompress()):
+                    mset = msg.decompress()
+                    # new format uses relative offsets for compressed messages
+                    if msg.magic > 0:
+                        last_offset, _, _ = mset[-1]
+                        relative = offset - last_offset
+                    else:
+                        relative = 0
+                    for record in self._unpack_message_set(tp, mset, relative):
                         yield record
                 else:
+                    # Message v1 adds timestamp
+                    if msg.magic > 0:
+                        timestamp = msg.timestamp
+                        timestamp_type = msg.timestamp_type
+                    else:
+                        timestamp = timestamp_type = None
                     key, value = self._deserialize(msg)
-                    yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+                    yield ConsumerRecord(tp.topic, tp.partition,
+                                         offset + relative_offset,
+                                         timestamp, timestamp_type,
+                                         key, value)
         # If unpacking raises StopIteration, it is erroneously
         # caught by the generator. We want all exceptions to be raised
         # back to the user. See Issue 545
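For reference, a minimal standalone sketch of the relative-offset arithmetic above (plain Python, illustrative data, not kafka-python API): for message format v1 the wrapper message's offset is the absolute offset of the last inner message, and inner messages carry offsets relative to 0, so each inner message's absolute offset is its relative offset plus (wrapper offset - last inner offset).

    # Illustrative only -- no kafka-python imports needed.
    wrapper_offset = 105                       # absolute offset the broker assigned to the wrapper
    inner = [(0, b'a'), (1, b'b'), (2, b'c')]  # (relative_offset, value) inside the compressed set

    last_relative, _ = inner[-1]
    base = wrapper_offset - last_relative      # mirrors `relative = offset - last_offset`

    absolute = [(rel + base, value) for rel, value in inner]
    assert [off for off, _ in absolute] == [103, 104, 105]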

kafka/producer/buffer.py

Lines changed: 4 additions & 2 deletions

@@ -29,7 +29,7 @@ class MessageSetBuffer(object):
         'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
         'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
     }
-    def __init__(self, buf, batch_size, compression_type=None):
+    def __init__(self, buf, batch_size, compression_type=None, message_version=0):
         if compression_type is not None:
             assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
             checker, encoder, attributes = self._COMPRESSORS[compression_type]
@@ -40,6 +40,7 @@ def __init__(self, buf, batch_size, compression_type=None):
            self._compressor = None
            self._compression_attributes = None

+        self._message_version = message_version
         self._buffer = buf
         # Init MessageSetSize to 0 -- update on close
         self._buffer.seek(0)
@@ -85,7 +86,8 @@ def close(self):
            # TODO: avoid copies with bytearray / memoryview
            self._buffer.seek(4)
            msg = Message(self._compressor(self._buffer.read()),
-                         attributes=self._compression_attributes)
+                         attributes=self._compression_attributes,
+                         magic=self._message_version)
            encoded = msg.encode()
            self._buffer.seek(4)
            self._buffer.write(Int64.encode(0))  # offset 0 for wrapper msg
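Conceptually, close() now stamps the compression wrapper with the configured message version. A rough sketch of that idea (hypothetical names and a simplified payload, not the kafka-python Message/encoder):

    import zlib
    from collections import namedtuple

    # Hypothetical stand-in for the real wrapper Message
    WrapperMessage = namedtuple('WrapperMessage', ['magic', 'attributes', 'value'])

    def close_buffer(inner_encoded, message_version=0, codec_bits=0x01):
        # compress the accumulated inner messages and wrap them in a single
        # outer message carrying the codec bits and the same magic byte
        compressed = zlib.compress(b''.join(inner_encoded))
        return WrapperMessage(magic=message_version, attributes=codec_bits,
                              value=compressed)

    wrapper = close_buffer([b'msg1', b'msg2'], message_version=1)
    assert wrapper.magic == 1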

kafka/producer/future.py

Lines changed: 12 additions & 6 deletions

@@ -29,16 +29,21 @@ def await(self, timeout=None):


 class FutureRecordMetadata(Future):
-    def __init__(self, produce_future, relative_offset):
+    def __init__(self, produce_future, relative_offset, timestamp_ms):
         super(FutureRecordMetadata, self).__init__()
         self._produce_future = produce_future
         self.relative_offset = relative_offset
+        self.timestamp_ms = timestamp_ms
         produce_future.add_callback(self._produce_success)
         produce_future.add_errback(self.failure)

-    def _produce_success(self, base_offset):
+    def _produce_success(self, offset_and_timestamp):
+        base_offset, timestamp_ms = offset_and_timestamp
+        if timestamp_ms is None:
+            timestamp_ms = self.timestamp_ms
         self.success(RecordMetadata(self._produce_future.topic_partition,
-                                    base_offset, self.relative_offset))
+                                    base_offset, timestamp_ms,
+                                    self.relative_offset))

     def get(self, timeout=None):
         if not self.is_done and not self._produce_future.await(timeout):
@@ -51,12 +56,13 @@ def get(self, timeout=None):


 class RecordMetadata(collections.namedtuple(
-    'RecordMetadata', 'topic partition topic_partition offset')):
-    def __new__(cls, tp, base_offset, relative_offset=None):
+    'RecordMetadata', 'topic partition topic_partition offset timestamp')):
+    def __new__(cls, tp, base_offset, timestamp, relative_offset=None):
         offset = base_offset
         if relative_offset is not None and base_offset != -1:
             offset += relative_offset
-        return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, tp, offset)
+        return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition,
+                                                  tp, offset, timestamp)

     def __str__(self):
         return 'RecordMetadata(topic=%s, partition=%s, offset=%s)' % (
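The resolution logic can be summarized in a standalone sketch (a plain namedtuple and a free function, not the kafka-python Future machinery): the produce result is now an (offset, timestamp) pair, a None timestamp falls back to the client-side send time, and the per-record offset is still base_offset + relative_offset.

    from collections import namedtuple

    RecordMetadata = namedtuple('RecordMetadata',
                                'topic partition topic_partition offset timestamp')

    def resolve(tp, offset_and_timestamp, relative_offset, client_timestamp_ms):
        base_offset, timestamp_ms = offset_and_timestamp
        if timestamp_ms is None:              # broker did not return a timestamp
            timestamp_ms = client_timestamp_ms
        offset = base_offset
        if relative_offset is not None and base_offset != -1:
            offset += relative_offset
        return RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms)

    meta = resolve(('my-topic', 0), (100, None), 3, 1453000000000)
    assert meta.offset == 103 and meta.timestamp == 1453000000000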

kafka/producer/kafka.py

Lines changed: 11 additions & 4 deletions

@@ -347,7 +347,7 @@ def partitions_for(self, topic):
         max_wait = self.config['max_block_ms'] / 1000.0
         return self._wait_on_metadata(topic, max_wait)

-    def send(self, topic, value=None, key=None, partition=None):
+    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
         """Publish a message to a topic.

         Arguments:
@@ -368,6 +368,8 @@ def send(self, topic, value=None, key=None, partition=None):
                 partition (but if key is None, partition is chosen randomly).
                 Must be type bytes, or be serializable to bytes via configured
                 key_serializer.
+            timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
+                to use as the message timestamp. Defaults to current time.

         Returns:
             FutureRecordMetadata: resolves to RecordMetadata
@@ -396,8 +398,11 @@ def send(self, topic, value=None, key=None, partition=None):
         self._ensure_valid_record_size(message_size)

         tp = TopicPartition(topic, partition)
+        if timestamp_ms is None:
+            timestamp_ms = int(time.time() * 1000)
         log.debug("Sending (key=%s value=%s) to %s", key, value, tp)
-        result = self._accumulator.append(tp, key_bytes, value_bytes,
+        result = self._accumulator.append(tp, timestamp_ms,
+                                          key_bytes, value_bytes,
                                           self.config['max_block_ms'])
         future, batch_is_full, new_batch_created = result
         if batch_is_full or new_batch_created:
@@ -416,8 +421,10 @@ def send(self, topic, value=None, key=None, partition=None):
         except Exception as e:
             log.debug("Exception occurred during message send: %s", e)
             return FutureRecordMetadata(
-                FutureProduceResult(TopicPartition(topic, partition)),
-                -1).failure(e)
+                FutureProduceResult(
+                    TopicPartition(topic, partition)),
+                -1, None
+            ).failure(e)

     def flush(self, timeout=None):
         """

kafka/producer/record_accumulator.py

Lines changed: 19 additions & 13 deletions

@@ -36,7 +36,7 @@ def get(self):


 class RecordBatch(object):
-    def __init__(self, tp, records):
+    def __init__(self, tp, records, message_version=0):
         self.record_count = 0
         #self.max_record_size = 0  # for metrics only
         now = time.time()
@@ -46,30 +46,33 @@ def __init__(self, tp, records):
         self.last_attempt = now
         self.last_append = now
         self.records = records
+        self.message_version = message_version
         self.topic_partition = tp
         self.produce_future = FutureProduceResult(tp)
         self._retry = False

-    def try_append(self, key, value):
+    def try_append(self, timestamp_ms, key, value):
         if not self.records.has_room_for(key, value):
             return None

-        self.records.append(self.record_count, Message(value, key=key))
+        msg = Message(value, key=key, magic=self.message_version)
+        self.records.append(self.record_count, msg)
         # self.max_record_size = max(self.max_record_size, Record.record_size(key, value))  # for metrics only
         self.last_append = time.time()
-        future = FutureRecordMetadata(self.produce_future, self.record_count)
+        future = FutureRecordMetadata(self.produce_future, self.record_count,
+                                      timestamp_ms)
         self.record_count += 1
         return future

-    def done(self, base_offset=None, exception=None):
+    def done(self, base_offset=None, timestamp_ms=None, exception=None):
         log.debug("Produced messages to topic-partition %s with base offset"
                   " %s and error %s.", self.topic_partition, base_offset,
                   exception)  # trace
         if self.produce_future.is_done:
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
         elif exception is None:
-            self.produce_future.success(base_offset)
+            self.produce_future.success((base_offset, timestamp_ms))
         else:
             self.produce_future.failure(exception)

@@ -78,7 +81,7 @@ def maybe_expire(self, request_timeout_ms, linger_ms):
         if ((self.records.is_full() and request_timeout_ms < since_append_ms)
             or (request_timeout_ms < (since_append_ms + linger_ms))):
             self.records.close()
-            self.done(-1, Errors.KafkaTimeoutError(
+            self.done(-1, None, Errors.KafkaTimeoutError(
                 "Batch containing %s record(s) expired due to timeout while"
                 " requesting metadata from brokers for %s", self.record_count,
                 self.topic_partition))
@@ -137,6 +140,7 @@ class RecordAccumulator(object):
         'compression_type': None,
         'linger_ms': 0,
         'retry_backoff_ms': 100,
+        'message_version': 0,
     }

     def __init__(self, **configs):
@@ -155,7 +159,7 @@ def __init__(self, **configs):
                                           self.config['batch_size'])
         self._incomplete = IncompleteRecordBatches()

-    def append(self, tp, key, value, max_time_to_block_ms):
+    def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms):
         """Add a record to the accumulator, return the append result.

         The append result will contain the future metadata, and flag for
@@ -164,6 +168,7 @@ def append(self, tp, key, value, max_time_to_block_ms):
         Arguments:
             tp (TopicPartition): The topic/partition to which this record is
                 being sent
+            timestamp_ms (int): The timestamp of the record (epoch ms)
             key (bytes): The key for the record
             value (bytes): The value for the record
             max_time_to_block_ms (int): The maximum time in milliseconds to
@@ -188,7 +193,7 @@ def append(self, tp, key, value, max_time_to_block_ms):
            dq = self._batches[tp]
            if dq:
                last = dq[-1]
-               future = last.try_append(key, value)
+               future = last.try_append(timestamp_ms, key, value)
                if future is not None:
                    batch_is_full = len(dq) > 1 or last.records.is_full()
                    return future, batch_is_full, False
@@ -211,7 +216,7 @@ def append(self, tp, key, value, max_time_to_block_ms):

            if dq:
                last = dq[-1]
-               future = last.try_append(key, value)
+               future = last.try_append(timestamp_ms, key, value)
                if future is not None:
                    # Somebody else found us a batch, return the one we
                    # waited for! Hopefully this doesn't happen often...
@@ -220,9 +225,10 @@ def append(self, tp, key, value, max_time_to_block_ms):
                    return future, batch_is_full, False

            records = MessageSetBuffer(buf, self.config['batch_size'],
-                                      self.config['compression_type'])
-           batch = RecordBatch(tp, records)
-           future = batch.try_append(key, value)
+                                      self.config['compression_type'],
+                                      self.config['message_version'])
+           batch = RecordBatch(tp, records, self.config['message_version'])
+           future = batch.try_append(timestamp_ms, key, value)
            if not future:
                raise Exception()
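A simplified sketch of the batching flow (a hypothetical class, not the kafka-python internals): each append records the caller's timestamp alongside the relative offset, and completion resolves every record from the broker's (base_offset, timestamp_ms) pair, falling back to the client timestamp when the broker returns none.

    class MiniBatch(object):
        def __init__(self, message_version=0):
            self.message_version = message_version
            self.pending = []                 # (relative_offset, client_timestamp_ms)

        def try_append(self, timestamp_ms, key, value):
            # remember the client-side timestamp for each appended record
            self.pending.append((len(self.pending), timestamp_ms))

        def done(self, base_offset, timestamp_ms=None):
            for relative, client_ts in self.pending:
                ts = timestamp_ms if timestamp_ms is not None else client_ts
                yield (base_offset + relative, ts)

    batch = MiniBatch(message_version=1)
    batch.try_append(1453000000000, b'k1', b'v1')
    batch.try_append(1453000000001, b'k2', b'v2')
    assert list(batch.done(50)) == [(50, 1453000000000), (51, 1453000000001)]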

kafka/producer/sender.py

Lines changed: 5 additions & 4 deletions

@@ -163,7 +163,7 @@ def add_topic(self, topic):
     def _failed_produce(self, batches, node_id, error):
         log.debug("Error sending produce request to node %d: %s", node_id, error)  # trace
         for batch in batches:
-            self._complete_batch(batch, error, -1)
+            self._complete_batch(batch, error, -1, None)

     def _handle_produce_response(self, batches, response):
         """Handle a produce response."""
@@ -183,15 +183,16 @@ def _handle_produce_response(self, batches, response):
         else:
             # this is the acks = 0 case, just complete all requests
             for batch in batches:
-                self._complete_batch(batch, None, -1)
+                self._complete_batch(batch, None, -1, None)

-    def _complete_batch(self, batch, error, base_offset):
+    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
         """Complete or retry the given batch of records.

         Arguments:
             batch (RecordBatch): The record batch
             error (Exception): The error (or None if none)
             base_offset (int): The base offset assigned to the records if successful
+            timestamp_ms (int, optional): The timestamp returned by the broker for this batch
         """
         # Standardize no-error to None
         if error is Errors.NoError:
@@ -210,7 +211,7 @@ def _complete_batch(self, batch, error, base_offset):
                 error = error(batch.topic_partition.topic)

         # tell the user the result of their request
-        batch.done(base_offset, error)
+        batch.done(base_offset, timestamp_ms, error)
         self._accumulator.deallocate(batch)

         if getattr(error, 'invalid_metadata', False):
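Sketch of the completion plumbing (stub objects, not the kafka-python Sender): error paths and the acks=0 path complete batches with offset -1 and no timestamp, while a successful response can carry a broker-assigned timestamp through to batch.done().

    class StubBatch(object):
        def done(self, base_offset=None, timestamp_ms=None, exception=None):
            print('completed:', base_offset, timestamp_ms, exception)

    def complete_batch(batch, error, base_offset, timestamp_ms=None):
        # mirrors the final call Sender._complete_batch makes into the batch
        batch.done(base_offset, timestamp_ms, error)

    complete_batch(StubBatch(), None, 42, 1453000000000)    # success with broker timestamp
    complete_batch(StubBatch(), Exception('timeout'), -1)   # failure: no offset, no timestamp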
