Skip to content

Commit b710495

Browse files
author
Dana Powers
committed
Move ConsumerProtocol definition to kafka.coordinator.protocol
1 parent cc22d1b commit b710495

File tree

3 files changed

+44
-35
lines changed

3 files changed

+44
-35
lines changed

kafka/coordinator/assignors/roundrobin.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from .abstract import AbstractPartitionAssignor
88
from ...common import TopicPartition
9-
from ..consumer import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
9+
from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
1010

1111
log = logging.getLogger(__name__)
1212

kafka/coordinator/consumer.py

+10-34
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import absolute_import
2+
13
import copy
24
import collections
35
import logging
@@ -6,44 +8,18 @@
68
import six
79

810
from .base import BaseCoordinator
9-
import kafka.common as Errors
10-
from kafka.common import OffsetAndMetadata, TopicPartition
11-
from kafka.future import Future
12-
from kafka.protocol.commit import (
11+
from .protocol import (
12+
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
13+
ConsumerProtocol)
14+
from ..common import OffsetAndMetadata, TopicPartition
15+
from ..future import Future
16+
from ..protocol.commit import (
1317
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
1418
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
15-
from kafka.protocol.struct import Struct
16-
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
17-
18-
log = logging.getLogger(__name__)
19-
20-
21-
class ConsumerProtocolMemberMetadata(Struct):
22-
SCHEMA = Schema(
23-
('version', Int16),
24-
('subscription', Array(String('utf-8'))),
25-
('user_data', Bytes))
26-
27-
28-
class ConsumerProtocolMemberAssignment(Struct):
29-
SCHEMA = Schema(
30-
('version', Int16),
31-
('assignment', Array(
32-
('topic', String('utf-8')),
33-
('partitions', Array(Int32)))),
34-
('user_data', Bytes))
35-
36-
def partitions(self):
37-
return [TopicPartition(topic, partition)
38-
for topic, partitions in self.assignment # pylint: disable-msg=no-member
39-
for partition in partitions]
4019

20+
import kafka.common as Errors
4121

42-
class ConsumerProtocol(object):
43-
PROTOCOL_TYPE = 'consumer'
44-
ASSIGNMENT_STRATEGIES = ('roundrobin',)
45-
METADATA = ConsumerProtocolMemberMetadata
46-
ASSIGNMENT = ConsumerProtocolMemberAssignment
22+
log = logging.getLogger(__name__)
4723

4824

4925
class ConsumerCoordinator(BaseCoordinator):

kafka/coordinator/protocol.py

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.common import TopicPartition
4+
from kafka.protocol.struct import Struct
5+
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
6+
7+
8+
class ConsumerProtocolMemberMetadata(Struct):
9+
SCHEMA = Schema(
10+
('version', Int16),
11+
('subscription', Array(String('utf-8'))),
12+
('user_data', Bytes))
13+
14+
15+
class ConsumerProtocolMemberAssignment(Struct):
16+
SCHEMA = Schema(
17+
('version', Int16),
18+
('assignment', Array(
19+
('topic', String('utf-8')),
20+
('partitions', Array(Int32)))),
21+
('user_data', Bytes))
22+
23+
def partitions(self):
24+
return [TopicPartition(topic, partition)
25+
for topic, partitions in self.assignment # pylint: disable-msg=no-member
26+
for partition in partitions]
27+
28+
29+
class ConsumerProtocol(object):
30+
PROTOCOL_TYPE = 'consumer'
31+
ASSIGNMENT_STRATEGIES = ('roundrobin',)
32+
METADATA = ConsumerProtocolMemberMetadata
33+
ASSIGNMENT = ConsumerProtocolMemberAssignment

0 commit comments

Comments
 (0)