Skip to content

Commit 931670f

Browse files
committed
Merge pull request dpkp#232 from dpkp/directory_layout
Separate Consumer/Producer/Partitioner modules
2 parents 04dbd0e + 27e812e commit 931670f

17 files changed

+950
-873
lines changed

kafka/consumer.py

Lines changed: 0 additions & 698 deletions
This file was deleted.

kafka/consumer/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from .simple import SimpleConsumer
from .multiprocess import MultiProcessConsumer

# Public API of the kafka.consumer package.
__all__ = [
    'SimpleConsumer', 'MultiProcessConsumer'
]

kafka/consumer/base.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
from __future__ import absolute_import
2+
3+
import logging
4+
import numbers
5+
from threading import Lock
6+
7+
import kafka.common
8+
from kafka.common import (
9+
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
10+
UnknownTopicOrPartitionError
11+
)
12+
13+
from kafka.util import ReentrantTimer
14+
15+
log = logging.getLogger("kafka")

# Auto-commit thresholds: commit after this many consumed messages, or after
# this many milliseconds, whichever is reached first.
AUTO_COMMIT_MSG_COUNT = 100
AUTO_COMMIT_INTERVAL = 5000

# Default fetch tuning values shared by the concrete consumer classes.
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
FETCH_BUFFER_SIZE_BYTES = 4096
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8

ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1


class Consumer(object):
    """
    Base class to be used by other consumers. Not to be used directly.

    This base class provides logic for

    * initialization and fetching metadata of partitions
    * auto-commit logic
    * APIs for fetching the pending message count
    """
    def __init__(self, client, group, topic, partitions=None, auto_commit=True,
                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):
        """
        client: object used for metadata/offset requests; must provide
            load_metadata_for_topics, get_partition_ids_for_topic and the
            send_offset_* request methods used below
        group: consumer group name used for offset commit/fetch requests
        topic: topic to consume
        partitions: optional list of integer partition ids; defaults to all
            partitions of the topic as reported by the client
        auto_commit: enable automatic offset commits
        auto_commit_every_n: commit after this many messages (None disables
            the count-based trigger)
        auto_commit_every_t: commit every t ms via a background timer
            (None disables the timer)
        """
        self.client = client
        self.topic = topic
        self.group = group
        self.client.load_metadata_for_topics(topic)
        self.offsets = {}

        if not partitions:
            partitions = self.client.get_partition_ids_for_topic(topic)
        else:
            # NOTE: assert is stripped under -O; kept for interface
            # compatibility (callers may rely on AssertionError here).
            assert all(isinstance(x, numbers.Integral) for x in partitions)

        # Variables for handling offset commits
        self.commit_lock = Lock()
        self.commit_timer = None
        self.count_since_commit = 0
        self.auto_commit = auto_commit
        self.auto_commit_every_n = auto_commit_every_n
        self.auto_commit_every_t = auto_commit_every_t

        # Set up the auto-commit timer
        if auto_commit is True and auto_commit_every_t is not None:
            self.commit_timer = ReentrantTimer(auto_commit_every_t,
                                               self.commit)
            self.commit_timer.start()

        if auto_commit:
            self.fetch_last_known_offsets(partitions)
        else:
            for partition in partitions:
                self.offsets[partition] = 0
            # BUGFIX: fetch_offsets was previously only set inside
            # fetch_last_known_offsets(), so with auto_commit=False the
            # attribute never existed and later reads raised AttributeError.
            # Initialize it here as well so the object is always consistent.
            self.fetch_offsets = self.offsets.copy()

    def fetch_last_known_offsets(self, partitions=None):
        """
        Populate self.offsets (and self.fetch_offsets) with the last
        committed offset for each partition, defaulting to 0 for partitions
        with no committed offset.

        partitions: list of partition ids; defaults to all partitions of
            the topic
        """
        if not partitions:
            partitions = self.client.get_partition_ids_for_topic(self.topic)

        def get_or_init_offset(resp):
            try:
                kafka.common.check_error(resp)
                return resp.offset
            except UnknownTopicOrPartitionError:
                # No offset committed yet for this partition: start at 0.
                return 0

        for partition in partitions:
            req = OffsetFetchRequest(self.topic, partition)
            (resp,) = self.client.send_offset_fetch_request(self.group, [req],
                                                            fail_on_error=False)
            self.offsets[partition] = get_or_init_offset(resp)
        self.fetch_offsets = self.offsets.copy()

    def commit(self, partitions=None):
        """
        Commit offsets for this consumer.

        partitions: list of partitions to commit, default is to commit
            all of them

        Raises whatever kafka.common.check_error raises for a failed
        commit response.
        """
        # short circuit if nothing happened. This check is kept outside
        # to prevent un-necessarily acquiring a lock for checking the state
        if self.count_since_commit == 0:
            return

        with self.commit_lock:
            # Do this check again, just in case the state has changed
            # during the lock acquiring timeout
            if self.count_since_commit == 0:
                return

            reqs = []
            if not partitions:  # commit all partitions
                partitions = self.offsets.keys()

            for partition in partitions:
                offset = self.offsets[partition]
                # Lazy %-args: the string is only formatted if debug
                # logging is actually enabled.
                log.debug("Commit offset %d in SimpleConsumer: "
                          "group=%s, topic=%s, partition=%s",
                          offset, self.group, self.topic, partition)

                reqs.append(OffsetCommitRequest(self.topic, partition,
                                                offset, None))

            resps = self.client.send_offset_commit_request(self.group, reqs)
            for resp in resps:
                kafka.common.check_error(resp)

            self.count_since_commit = 0

    def _auto_commit(self):
        """
        Check if we have to commit based on number of messages and commit.
        """
        # Check if we are supposed to do an auto-commit
        if not self.auto_commit or self.auto_commit_every_n is None:
            return

        if self.count_since_commit >= self.auto_commit_every_n:
            self.commit()

    def stop(self):
        """
        Stop the auto-commit timer and flush any uncommitted offsets.

        NOTE(review): the final commit only happens when a commit timer
        exists (auto_commit enabled with a non-None interval) — preserved
        from the original behavior.
        """
        if self.commit_timer is not None:
            self.commit_timer.stop()
            self.commit()

    def pending(self, partitions=None):
        """
        Gets the pending message count.

        partitions: list of partitions to check for, default is to check all
        """
        if not partitions:
            partitions = self.offsets.keys()

        total = 0
        reqs = []

        for partition in partitions:
            # -1/1 asks for the single latest (log-end) offset.
            reqs.append(OffsetRequest(self.topic, partition, -1, 1))

        resps = self.client.send_offset_request(reqs)
        for resp in resps:
            partition = resp.partition
            pending = resp.offsets[0]
            offset = self.offsets[partition]
            # NOTE(review): the extra -1 when offset > 0 appears to adjust
            # for how committed offsets are stored — confirm against the
            # concrete consumers before relying on exact counts.
            total += pending - offset - (1 if offset > 0 else 0)

        return total

0 commit comments

Comments
 (0)