diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 2f7afa1a6..6d67c6e7a 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -460,6 +460,13 @@ def _unpack_message_set(self, tp, records): except AttributeError: pass + # Control messages are used to enable transactions in Kafka and are generated by the + # broker. Clients should not return control batches (ie. those with this bit set) to + # applications. (since 0.11.0.0) + if getattr(batch, "is_control_batch", False): + batch = records.next_batch() + continue + for record in batch: key_size = len(record.key) if record.key is not None else -1 value_size = len(record.value) if record.value is not None else -1