@@ -39,6 +39,7 @@ class Fetcher(six.Iterator):
39
39
'fetch_max_wait_ms' : 500 ,
40
40
'max_partition_fetch_bytes' : 1048576 ,
41
41
'check_crcs' : True ,
42
+ 'skip_double_compressed_messages' : False ,
42
43
'iterator_refetch_records' : 1 , # undocumented -- interface may change
43
44
'api_version' : (0 , 8 , 0 ),
44
45
}
@@ -71,6 +72,13 @@ def __init__(self, client, subscriptions, metrics, metric_group_prefix,
71
72
consumed. This ensures no on-the-wire or on-disk corruption to
72
73
the messages occurred. This check adds some overhead, so it may
73
74
be disabled in cases seeking extreme performance. Default: True
75
+ skip_double_compressed_messages (bool): A bug in KafkaProducer
76
+ caused some messages to be corrupted via double-compression.
77
+ By default, the fetcher will return the messages as a compressed
78
+ blob of bytes with a single offset, i.e. how the message was
79
+ actually published to the cluster. If you prefer to have the
80
+ fetcher automatically detect corrupt messages and skip them,
81
+ set this option to True. Default: False.
74
82
"""
75
83
self .config = copy .copy (self .DEFAULT_CONFIG )
76
84
for key in self .config :
@@ -368,6 +376,10 @@ def _unpack_message_set(self, tp, messages):
368
376
' double-compressed. This should not'
369
377
' happen -- check your producers!' ,
370
378
tp , offset )
379
+ if self .config ['skip_double_compressed_messages' ]:
380
+ log .warning ('Skipping double-compressed message at'
381
+ ' %s %d' , tp , offset )
382
+ continue
371
383
372
384
if msg .magic > 0 :
373
385
last_offset , _ , _ = inner_mset [- 1 ]
0 commit comments