|
| 1 | +from __future__ import absolute_import |
| 2 | + |
1 | 3 | import copy
|
2 | 4 | import collections
|
3 | 5 | import logging
|
|
6 | 8 | import six
|
7 | 9 |
|
8 | 10 | from .base import BaseCoordinator
|
9 |
| -import kafka.common as Errors |
10 |
| -from kafka.common import OffsetAndMetadata, TopicPartition |
11 |
| -from kafka.future import Future |
12 |
| -from kafka.protocol.commit import ( |
| 11 | +from .protocol import ( |
| 12 | + ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, |
| 13 | + ConsumerProtocol) |
| 14 | +from ..common import OffsetAndMetadata, TopicPartition |
| 15 | +from ..future import Future |
| 16 | +from ..protocol.commit import ( |
13 | 17 | OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
|
14 | 18 | OffsetFetchRequest_v0, OffsetFetchRequest_v1)
|
15 |
| -from kafka.protocol.struct import Struct |
16 |
| -from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String |
17 |
| - |
18 |
| -log = logging.getLogger(__name__) |
19 |
| - |
20 |
| - |
21 |
| -class ConsumerProtocolMemberMetadata(Struct): |
22 |
| - SCHEMA = Schema( |
23 |
| - ('version', Int16), |
24 |
| - ('subscription', Array(String('utf-8'))), |
25 |
| - ('user_data', Bytes)) |
26 |
| - |
27 |
| - |
28 |
| -class ConsumerProtocolMemberAssignment(Struct): |
29 |
| - SCHEMA = Schema( |
30 |
| - ('version', Int16), |
31 |
| - ('assignment', Array( |
32 |
| - ('topic', String('utf-8')), |
33 |
| - ('partitions', Array(Int32)))), |
34 |
| - ('user_data', Bytes)) |
35 |
| - |
36 |
| - def partitions(self): |
37 |
| - return [TopicPartition(topic, partition) |
38 |
| - for topic, partitions in self.assignment # pylint: disable-msg=no-member |
39 |
| - for partition in partitions] |
40 | 19 |
|
| 20 | +import kafka.common as Errors |
41 | 21 |
|
42 |
| -class ConsumerProtocol(object): |
43 |
| - PROTOCOL_TYPE = 'consumer' |
44 |
| - ASSIGNMENT_STRATEGIES = ('roundrobin',) |
45 |
| - METADATA = ConsumerProtocolMemberMetadata |
46 |
| - ASSIGNMENT = ConsumerProtocolMemberAssignment |
| 22 | +log = logging.getLogger(__name__) |
47 | 23 |
|
48 | 24 |
|
49 | 25 | class ConsumerCoordinator(BaseCoordinator):
|
|
0 commit comments