Skip to content

Commit fea10d9

Browse files
committed
LZ4 support in kafka 0.8/0.9 does not accept a ContentSize header
1 parent fb023fe commit fea10d9

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

kafka/codec.py

+14-6
Original file line numberDiff line numberDiff line change
@@ -229,21 +229,29 @@ def lz4_encode_old_kafka(payload):
229229
assert xxhash is not None
230230
data = lz4_encode(payload)
231231
header_size = 7
232-
if isinstance(data[4], int):
233-
flg = data[4]
234-
else:
235-
flg = ord(data[4])
232+
flg = data[4]
233+
if not isinstance(flg, int):
234+
flg = ord(flg)
235+
236236
content_size_bit = ((flg >> 3) & 1)
237237
if content_size_bit:
238-
header_size += 8
238+
# Old kafka does not accept the content-size field
239+
# so we need to discard it and reset the header flag
240+
flg -= 8
241+
data = bytearray(data)
242+
data[4] = flg
243+
data = bytes(data)
244+
payload = data[header_size+8:]
245+
else:
246+
payload = data[header_size:]
239247

240248
# This is the incorrect hc
241249
hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member
242250

243251
return b''.join([
244252
data[0:header_size-1],
245253
hc,
246-
data[header_size:]
254+
payload
247255
])
248256

249257

0 commit comments

Comments
 (0)