File tree 1 file changed +14
-6
lines changed
1 file changed +14
-6
lines changed Original file line number Diff line number Diff line change @@ -229,21 +229,29 @@ def lz4_encode_old_kafka(payload):
229
229
assert xxhash is not None
230
230
data = lz4_encode (payload )
231
231
header_size = 7
232
- if isinstance ( data [4 ], int ):
233
- flg = data [ 4 ]
234
- else :
235
- flg = ord ( data [ 4 ])
232
+ flg = data [4 ]
233
+ if not isinstance ( flg , int ):
234
+ flg = ord ( flg )
235
+
236
236
content_size_bit = ((flg >> 3 ) & 1 )
237
237
if content_size_bit :
238
- header_size += 8
238
+ # Old kafka does not accept the content-size field
239
+ # so we need to discard it and reset the header flag
240
+ flg -= 8
241
+ data = bytearray (data )
242
+ data [4 ] = flg
243
+ data = bytes (data )
244
+ payload = data [header_size + 8 :]
245
+ else :
246
+ payload = data [header_size :]
239
247
240
248
# This is the incorrect hc
241
249
hc = xxhash .xxh32 (data [0 :header_size - 1 ]).digest ()[- 2 :- 1 ] # pylint: disable-msg=no-member
242
250
243
251
return b'' .join ([
244
252
data [0 :header_size - 1 ],
245
253
hc ,
246
- data [ header_size :]
254
+ payload
247
255
])
248
256
249
257
You can’t perform that action at this time.
0 commit comments