Commit c7fc65b

Some rebase fixes

1 parent 4d0c387 commit c7fc65b

4 files changed: +96 -29 lines changed

kafka/consumer/fetcher.py

Lines changed: 23 additions & 7 deletions
@@ -23,6 +23,10 @@
 log = logging.getLogger(__name__)
 
 
+# Isolation levels
+READ_UNCOMMITTED = 0
+READ_COMMITTED = 1
+
 ConsumerRecord = collections.namedtuple("ConsumerRecord",
     ["topic", "partition", "offset", "timestamp", "timestamp_type",
      "key", "value", "checksum", "serialized_key_size", "serialized_value_size"])

@@ -114,6 +118,7 @@ def __init__(self, client, subscriptions, metrics, **configs):
         self._iterator = None
         self._fetch_futures = collections.deque()
         self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
+        self._isolation_level = READ_UNCOMMITTED
 
     def send_fetches(self):
         """Send FetchRequests for all assigned partitions that do not already have

@@ -670,7 +675,9 @@ def _create_fetch_requests(self):
                 log.debug("Adding fetch request for partition %s at offset %d",
                           partition, position)
 
-        if self.config['api_version'] >= (0, 10, 1):
+        if self.config['api_version'] >= (0, 11, 0):
+            version = 4
+        elif self.config['api_version'] >= (0, 10, 1):
             version = 3
         elif self.config['api_version'] >= (0, 10):
             version = 2

@@ -696,12 +703,21 @@ def _create_fetch_requests(self):
             #       dicts retain insert order.
             partition_data = list(partition_data.items())
             random.shuffle(partition_data)
-            requests[node_id] = FetchRequest[version](
-                -1,  # replica_id
-                self.config['fetch_max_wait_ms'],
-                self.config['fetch_min_bytes'],
-                self.config['fetch_max_bytes'],
-                partition_data)
+            if version == 3:
+                requests[node_id] = FetchRequest[version](
+                    -1,  # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    self.config['fetch_max_bytes'],
+                    partition_data)
+            else:
+                requests[node_id] = FetchRequest[version](
+                    -1,  # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    self.config['fetch_max_bytes'],
+                    self._isolation_level,
+                    partition_data)
         return requests
 
     def _handle_fetch_response(self, request, send_time, response):
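
Orientation note (not part of the diff): the new constants plus the version-4 branch mean that brokers on 0.11.0+ are sent FetchRequest v4, which carries the consumer's isolation level; older brokers keep the previous request shapes. A minimal sketch of that selection logic, assuming only the config values already read above (older api_versions are deliberately not filled in):

# Sketch only, not the committed code: mirrors the api_version -> FetchRequest
# version mapping added in this commit.
READ_UNCOMMITTED = 0
READ_COMMITTED = 1

def pick_fetch_version(api_version):
    if api_version >= (0, 11, 0):
        return 4   # v4 adds the isolation_level field (transactional reads)
    elif api_version >= (0, 10, 1):
        return 3
    elif api_version >= (0, 10):
        return 2
    raise NotImplementedError("older api_versions are not shown in this hunk")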

kafka/record/default_records.py

Lines changed: 37 additions & 5 deletions
@@ -198,14 +198,14 @@ def _read_msg(
 
         key_len, pos = decode_varint(buffer, pos)
         if key_len >= 0:
-            key = buffer[pos: pos + key_len]
+            key = bytes(buffer[pos: pos + key_len])
             pos += key_len
         else:
             key = None
 
         value_len, pos = decode_varint(buffer, pos)
         if value_len >= 0:
-            value = buffer[pos: pos + value_len]
+            value = bytes(buffer[pos: pos + value_len])
             pos += value_len
         else:
             value = None

@@ -227,7 +227,7 @@ def _read_msg(
             # Value is of type NULLABLE_BYTES, so it can be None
             h_value_len, pos = decode_varint(buffer, pos)
             if h_value_len >= 0:
-                h_value = buffer[pos: pos + h_value_len]
+                h_value = bytes(buffer[pos: pos + h_value_len])
                 pos += h_value_len
             else:
                 h_value = None

@@ -445,7 +445,7 @@ def append(self, offset, timestamp, key, value, headers,
         # Check if we can write this message
         if (required_size + len_func(main_buffer) > self._batch_size and
                 not first_message):
-            return None, 0
+            return None
 
         # Those should be updated after the length check
         if self._max_timestamp < timestamp:

@@ -456,7 +456,7 @@
         encode_varint(message_len, main_buffer.append)
         main_buffer.extend(message_buffer)
 
-        return None, required_size
+        return DefaultRecordMetadata(offset, required_size, timestamp)
 
     def write_header(self, use_compression_type=True):
         batch_len = len(self._buffer)

@@ -561,3 +561,35 @@ def estimate_size_in_bytes(cls, key, value, headers):
             cls.HEADER_STRUCT.size + cls.MAX_RECORD_OVERHEAD +
             cls.size_of(key, value, headers)
         )
+
+
+class DefaultRecordMetadata(object):
+
+    __slots__ = ("_size", "_timestamp", "_offset")
+
+    def __init__(self, offset, size, timestamp):
+        self._offset = offset
+        self._size = size
+        self._timestamp = timestamp
+
+    @property
+    def offset(self):
+        return self._offset
+
+    @property
+    def crc(self):
+        return None
+
+    @property
+    def size(self):
+        return self._size
+
+    @property
+    def timestamp(self):
+        return self._timestamp
+
+    def __repr__(self):
+        return (
+            "DefaultRecordMetadata(offset={!r}, size={!r}, timestamp={!r})"
+            .format(self._offset, self._size, self._timestamp)
+        )
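
Side note (not part of the commit): with this change DefaultRecordBatchBuilder.append() returns a DefaultRecordMetadata instead of the old (crc, size) tuple, and returns None when the record does not fit in the batch. A minimal usage sketch, assuming the same constructor arguments the tests below use:

# Sketch only: how a caller might consume append()'s new return value.
from kafka.record.default_records import DefaultRecordBatchBuilder

builder = DefaultRecordBatchBuilder(
    magic=2, compression_type=0, is_transactional=0,
    producer_id=-1, producer_epoch=-1, base_sequence=-1,
    batch_size=1024)

meta = builder.append(0, timestamp=None, key=None, value=b"hello", headers=[])
if meta is None:
    pass  # record did not fit: ship the current batch and start a new one
else:
    # v2 record batches have no per-record CRC, so meta.crc is always None
    print(meta.offset, meta.size, meta.timestamp, meta.crc)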

test/record/test_default_records.py

Lines changed: 32 additions & 12 deletions
@@ -52,9 +52,11 @@ def test_written_bytes_equals_size_in_bytes_v2():
         0, timestamp=9999999, key=key, value=value, headers=headers)
 
     pos = builder.size()
-    builder.append(0, timestamp=9999999, key=key, value=value, headers=headers)
+    meta = builder.append(
+        0, timestamp=9999999, key=key, value=value, headers=headers)
 
     assert builder.size() - pos == size_in_bytes
+    assert meta.size == size_in_bytes
 
 
 def test_estimate_size_in_bytes_bigger_than_batch_v2():

@@ -118,32 +120,50 @@ def test_default_batch_builder_validates_arguments():
     assert len(builder.build()) == 104
 
 
+def test_default_correct_metadata_response():
+    builder = DefaultRecordBatchBuilder(
+        magic=2, compression_type=0, is_transactional=0,
+        producer_id=-1, producer_epoch=-1, base_sequence=-1,
+        batch_size=1024 * 1024)
+    meta = builder.append(
+        0, timestamp=9999999, key=b"test", value=b"Super", headers=[])
+
+    assert meta.offset == 0
+    assert meta.timestamp == 9999999
+    assert meta.crc is None
+    assert meta.size == 16
+    assert repr(meta) == (
+        "DefaultRecordMetadata(offset=0, size={}, timestamp={})"
+        .format(meta.size, meta.timestamp)
+    )
+
+
 def test_default_batch_size_limit():
     # First message can be added even if it's too big
     builder = DefaultRecordBatchBuilder(
         magic=2, compression_type=0, is_transactional=0,
         producer_id=-1, producer_epoch=-1, base_sequence=-1,
         batch_size=1024)
 
-    crc, size = builder.append(
+    meta = builder.append(
         0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
-    assert size > 0
-    assert crc is None
+    assert meta.size > 0
+    assert meta.crc is None
+    assert meta.offset == 0
+    assert meta.timestamp is not None
     assert len(builder.build()) > 2000
 
     builder = DefaultRecordBatchBuilder(
         magic=2, compression_type=0, is_transactional=0,
         producer_id=-1, producer_epoch=-1, base_sequence=-1,
         batch_size=1024)
-    crc, size = builder.append(
+    meta = builder.append(
         0, timestamp=None, key=None, value=b"M" * 700, headers=[])
-    assert size > 0
-    crc, size = builder.append(
+    assert meta is not None
+    meta = builder.append(
         1, timestamp=None, key=None, value=b"M" * 700, headers=[])
-    assert size == 0
-    assert crc is None
-    crc, size = builder.append(
+    assert meta is None
+    meta = builder.append(
         2, timestamp=None, key=None, value=b"M" * 700, headers=[])
-    assert size == 0
-    assert crc is None
+    assert meta is None
     assert len(builder.build()) < 1000

test/test_producer.py

Lines changed: 4 additions & 5 deletions
@@ -88,10 +88,7 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
         retries=5,
         max_block_ms=10000,
         compression_type=compression)
-    if producer.config['api_version'] >= (0, 10):
-        magic = 1
-    else:
-        magic = 0
+    magic = producer._max_usable_produce_magic()
 
     topic = random_string(5)
     future = producer.send(

@@ -109,7 +106,9 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     else:
         assert record.timestamp == -1  # NO_TIMESTAMP
 
-    if magic == 1:
+    if magic >= 2:
+        assert record.checksum is None
+    elif magic == 1:
         assert record.checksum == 1370034956
     else:
         assert record.checksum == 3296137851
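
Aside (not part of the diff): message format v2 stores a single CRC per batch rather than per record, which is why the client reports checksum as None once magic >= 2. A small sketch of the expectation this test encodes, reusing only the values asserted above:

# Sketch only: expected per-record checksum by message format ("magic") version,
# using the constants asserted in test_kafka_producer_proper_record_metadata.
def expected_checksum(magic):
    if magic >= 2:
        return None          # v2 batches carry a batch-level CRC, not per-record
    elif magic == 1:
        return 1370034956    # value the test asserts for magic v1
    return 3296137851        # value the test asserts for magic v0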
