Skip to content

Commit ad13500

Browse files
committed
Add skip_double_compressed_messages option to KafkaConsumer
1 parent ed6098c commit ad13500

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

kafka/consumer/fetcher.py

+12
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class Fetcher(six.Iterator):
3939
'fetch_max_wait_ms': 500,
4040
'max_partition_fetch_bytes': 1048576,
4141
'check_crcs': True,
42+
'skip_double_compressed_messages': False,
4243
'iterator_refetch_records': 1, # undocumented -- interface may change
4344
'api_version': (0, 8, 0),
4445
}
@@ -71,6 +72,13 @@ def __init__(self, client, subscriptions, metrics, metric_group_prefix,
7172
consumed. This ensures no on-the-wire or on-disk corruption to
7273
the messages occurred. This check adds some overhead, so it may
7374
be disabled in cases seeking extreme performance. Default: True
75+
skip_double_compressed_messages (bool): A bug in KafkaProducer
76+
caused some messages to be corrupted via double-compression.
77+
By default, the fetcher will return the messages as a compressed
78+
blob of bytes with a single offset, i.e. how the message was
79+
actually published to the cluster. If you prefer to have the
80+
fetcher automatically detect corrupt messages and skip them,
81+
set this option to True. Default: False.
7482
"""
7583
self.config = copy.copy(self.DEFAULT_CONFIG)
7684
for key in self.config:
@@ -368,6 +376,10 @@ def _unpack_message_set(self, tp, messages):
368376
' double-compressed. This should not'
369377
' happen -- check your producers!',
370378
tp, offset)
379+
if self.config['skip_double_compressed_messages']:
380+
log.warning('Skipping double-compressed message at'
381+
' %s %d', tp, offset)
382+
continue
371383

372384
if msg.magic > 0:
373385
last_offset, _, _ = inner_mset[-1]

kafka/consumer/group.py

+8
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ class KafkaConsumer(six.Iterator):
123123
consumer_timeout_ms (int): number of milliseconds to block during
124124
message iteration before raising StopIteration (i.e., ending the
125125
iterator). Default -1 (block forever).
126+
skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
127+
caused some messages to be corrupted via double-compression.
128+
By default, the fetcher will return these messages as a compressed
129+
blob of bytes with a single offset, i.e. how the message was
130+
actually published to the cluster. If you prefer to have the
131+
fetcher automatically detect corrupt messages and skip them,
132+
set this option to True. Default: False.
126133
security_protocol (str): Protocol used to communicate with brokers.
127134
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
128135
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
@@ -189,6 +196,7 @@ class KafkaConsumer(six.Iterator):
189196
'send_buffer_bytes': None,
190197
'receive_buffer_bytes': None,
191198
'consumer_timeout_ms': -1,
199+
'skip_double_compressed_messages': False,
192200
'security_protocol': 'PLAINTEXT',
193201
'ssl_context': None,
194202
'ssl_check_hostname': True,

0 commit comments

Comments (0)