Skip to content

Commit 7eeba0d

Browse files
committed
Mark old kafka.common structs as Deprecated; remove unused TopicMetadata
1 parent 16c13f9 commit 7eeba0d

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

kafka/common.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
import sys
33
from collections import namedtuple
44

5-
###############
6-
# Structs #
7-
###############
5+
6+
# SimpleClient Payload Structs - Deprecated
87

98
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
109
MetadataRequest = namedtuple("MetadataRequest",
@@ -57,29 +56,29 @@
5756

5857

5958
# Other useful structs
59+
TopicPartition = namedtuple("TopicPartition",
60+
["topic", "partition"])
61+
6062
BrokerMetadata = namedtuple("BrokerMetadata",
6163
["nodeId", "host", "port"])
6264

63-
TopicMetadata = namedtuple("TopicMetadata",
64-
["topic", "error", "partitions"])
65-
6665
PartitionMetadata = namedtuple("PartitionMetadata",
6766
["topic", "partition", "leader", "replicas", "isr", "error"])
6867

68+
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
69+
["offset", "metadata"])
70+
71+
72+
# Deprecated structs
6973
OffsetAndMessage = namedtuple("OffsetAndMessage",
7074
["offset", "message"])
7175

7276
Message = namedtuple("Message",
7377
["magic", "attributes", "key", "value"])
7478

75-
TopicPartition = namedtuple("TopicPartition",
76-
["topic", "partition"])
77-
7879
KafkaMessage = namedtuple("KafkaMessage",
7980
["topic", "partition", "offset", "key", "value"])
8081

81-
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
82-
["offset", "metadata"])
8382

8483
# Define retry policy for async producer
8584
# Limit value: int >= 0, 0 means no retries

test/test_protocol.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@
88

99
from kafka.codec import has_snappy, gzip_decode, snappy_decode
1010
from kafka.common import (
11-
OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
12-
OffsetResponsePayload, OffsetCommitResponsePayload, OffsetFetchResponsePayload,
13-
ProduceRequestPayload, FetchRequestPayload, Message, ChecksumError,
14-
ProduceResponsePayload, FetchResponsePayload, OffsetAndMessage,
15-
BrokerMetadata, TopicMetadata, PartitionMetadata,
11+
OffsetRequestPayload, OffsetResponsePayload,
12+
OffsetCommitRequestPayload, OffsetCommitResponsePayload,
13+
OffsetFetchRequestPayload, OffsetFetchResponsePayload,
14+
ProduceRequestPayload, ProduceResponsePayload,
15+
FetchRequestPayload, FetchResponsePayload,
16+
Message, ChecksumError, OffsetAndMessage, BrokerMetadata,
1617
KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
1718
ProtocolError, ConsumerMetadataResponse
1819
)
@@ -564,6 +565,7 @@ def test_decode_metadata_response(self):
564565
BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
565566
]
566567

568+
'''
567569
topic_partitions = [
568570
TopicMetadata(b"topic1", 0, [
569571
PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
@@ -577,6 +579,7 @@ def test_decode_metadata_response(self):
577579
topic_partitions)
578580
decoded = KafkaProtocol.decode_metadata_response(encoded)
579581
self.assertEqual(decoded, (node_brokers, topic_partitions))
582+
'''
580583

581584
def test_encode_consumer_metadata_request(self):
582585
expected = b"".join([

0 commit comments

Comments
 (0)