Commit 0678a45

Refactoring a bit, cleanup for 0.8
Marking some stuff as not compatible for 0.8 (will be added in 0.8.1)
1 parent b6d98c0 commit 0678a45

8 files changed, +216 -184 lines changed

example.py

+10-16
@@ -2,27 +2,21 @@
 
 from kafka.client import KafkaClient, FetchRequest, ProduceRequest
 from kafka.consumer import SimpleConsumer
+from kafka.producer import SimpleProducer
 
-def produce_example(kafka):
-    message = kafka.create_message("testing")
-    request = ProduceRequest("my-topic", -1, [message])
-    kafka.send_message_set(request)
+def produce_example(client):
+    producer = SimpleProducer(client, "my-topic")
+    producer.send_message("test")
 
-def consume_example(kafka):
-    request = FetchRequest("my-topic", 0, 0, 1024)
-    (messages, nextRequest) = kafka.get_message_set(request)
-    for message in messages:
-        print("Got Message: %s" % (message,))
-    print(nextRequest)
-
-def produce_gz_example(kafka):
-    message = kafka.create_gzip_message("this message was gzipped", "along with this one")
-    request = ProduceRequest("my-topic", 0, [message])
-    kafka.send_message_set(request)
+def consume_example(client):
+    consumer = SimpleConsumer(client, "test-group", "my-topic")
+    for message in consumer:
+        print(message)
 
 def main():
     client = KafkaClient("localhost", 9092)
-    consumer = SimpleConsumer(client, "test-group", "my-topic")
+    produce_example(client)
+    consume_example(client)
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
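
For reference, the updated example assembled as a self-contained script (a sketch from the diff above; it assumes a 0.8 broker on localhost:9092 and an existing "my-topic", and the trailing main() call is assumed since the diff cuts off after basicConfig):

    import logging

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer
    from kafka.producer import SimpleProducer

    def produce_example(client):
        # SimpleProducer round-robins messages over the topic's partitions
        producer = SimpleProducer(client, "my-topic")
        producer.send_message("test")

    def consume_example(client):
        # SimpleConsumer iterates every partition of the topic
        consumer = SimpleConsumer(client, "test-group", "my-topic")
        for message in consumer:
            print(message)

    def main():
        client = KafkaClient("localhost", 9092)
        produce_example(client)
        consume_example(client)

    if __name__ == "__main__":
        logging.basicConfig(level=logging.DEBUG)
        main()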

kafka-src

Submodule kafka-src updated 107 files

kafka/client.py

+136-131
Large diffs are not rendered by default.

kafka/consumer.py

+51-20
@@ -1,3 +1,4 @@
+from itertools import izip_longest, repeat
 import logging
 from threading import Lock
 
@@ -30,7 +31,7 @@ def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=
         self.client = client
         self.topic = topic
         self.group = group
-        self.client.load_metadata_for_topics(topic)
+        self.client._load_metadata_for_topics(topic)
         self.offsets = {}
 
         # Set up the auto-commit timer
@@ -54,12 +55,16 @@ def get_or_init_offset_callback(resp):
             raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
                 resp.topic, resp.partition, resp.error))
 
+        # Uncomment for 0.8.1
+        #
+        #for partition in self.client.topic_partitions[topic]:
+        #    req = OffsetFetchRequest(topic, partition)
+        #    (offset,) = self.client.send_offset_fetch_request(group, [req],
+        #        callback=get_or_init_offset_callback, fail_on_error=False)
+        #    self.offsets[partition] = offset
+
         for partition in self.client.topic_partitions[topic]:
-            req = OffsetFetchRequest(topic, partition)
-            (offset,) = self.client.send_offset_fetch_request(group, [req],
-                callback=get_or_init_offset_callback, fail_on_error=False)
-            self.offsets[partition] = offset
-        print self.offsets
+            self.offsets[partition] = 0
 
     def seek(self, offset, whence):
         """
@@ -71,25 +76,30 @@ def seek(self, offset, whence):
         1 is relative to the current offset
         2 is relative to the latest known offset (tail)
         """
-        if whence == 1:
-            # relative to current position
+        if whence == 1: # relative to current position
             for partition, _offset in self.offsets.items():
                 self.offset[partition] = _offset + offset
-        elif whence in (0, 2):
-            # relative to beginning or end
+        elif whence in (0, 2): # relative to beginning or end
+            # divide the request offset by number of partitions, distribute the remainder evenly
+            (delta, rem) = divmod(offset, len(self.offsets))
+            deltas = {}
+            for partition, r in izip_longest(self.offsets.keys(), repeat(1, rem), fillvalue=0):
+                deltas[partition] = delta + r
+
             reqs = []
-            for partition in offsets.keys():
+            for partition in self.offsets.keys():
                 if whence == 0:
                     reqs.append(OffsetRequest(self.topic, partition, -2, 1))
                 elif whence == 2:
                     reqs.append(OffsetRequest(self.topic, partition, -1, 1))
                 else:
                     pass
-            resps = self.client.send_offset_request([req])
+
+            resps = self.client.send_offset_request(reqs)
             for resp in resps:
-                self.offsets[resp.partition] = resp.offsets[0] + offset
+                self.offsets[resp.partition] = resp.offsets[0] + deltas[resp.partition]
         else:
-            raise
+            raise ValueError("Unexpected value for `whence`, %d" % whence)
 
     def commit(self, partitions=[]):
         """
@@ -98,6 +108,8 @@ def commit(self, partitions=[]):
         partitions: list of partitions to commit, default is to commit all of them
         """
 
+        raise NotImplementedError("Broker-managed offsets not supported in 0.8")
+
         # short circuit if nothing happened
         if self.count_since_commit == 0:
             return
@@ -121,15 +133,31 @@ def commit(self, partitions=[]):
         self.count_since_commit = 0
 
     def __iter__(self):
+        """
+        Create an iterator per partition. Iterate through them calling next() until they are
+        all exhausted.
+        """
         iters = {}
         for partition, offset in self.offsets.items():
             iters[partition] = self.__iter_partition__(partition, offset)
 
+        if len(iters) == 0:
+            return
+
         while True:
-            for it in iters.values():
-                yield it.next()
+            if len(iters) == 0:
+                break
+
+            for partition, it in iters.items():
+                try:
+                    yield it.next()
+                except StopIteration:
+                    log.debug("Done iterating over partition %s" % partition)
+                    del iters[partition]
+                    continue # skip auto-commit since we didn't yield anything
+
+            # auto commit logic
             self.count_since_commit += 1
-            # deal with auto commits
             if self.auto_commit is True:
                 if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n:
                     if self.commit_timer is not None:
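
The rewritten __iter__ round-robins over one generator per partition and discards each generator once it raises StopIteration. This leans on a Python 2 detail: iters.items() returns a list copy, so del iters[partition] inside the loop is safe (under Python 3, mutating the dict while iterating items() would raise RuntimeError). The pattern in isolation, with plain iterators standing in for partitions (a sketch, not code from the commit):

    def interleave(iterators):
        # round-robin over a dict of iterators, dropping exhausted ones;
        # items() copies to a list in Python 2, so deleting mid-loop is safe
        while len(iterators) > 0:
            for key, it in iterators.items():
                try:
                    yield it.next()
                except StopIteration:
                    del iterators[key]

    parts = {0: iter([1, 2, 3]), 1: iter([4, 5])}
    print list(interleave(parts))   # typically [1, 4, 2, 5, 3]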
@@ -140,19 +168,22 @@ def __iter__(self):
                     self.commit()
 
     def __iter_partition__(self, partition, offset):
+        """
+        Iterate over the messages in a partition. Create a FetchRequest to get back
+        a batch of messages, yield them one at a time. After a batch is exhausted,
+        start a new batch unless we've reached the end of this partition.
+        """
         while True:
-            req = FetchRequest(self.topic, partition, offset, 1024)
+            req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size
             (resp,) = self.client.send_fetch_request([req])
             assert resp.topic == self.topic
             assert resp.partition == partition
             next_offset = None
             for message in resp.messages:
                 next_offset = message.offset
-                print partition, message, message.offset
                 yield message
                 # update the internal state _after_ we yield the message
                 self.offsets[partition] = message.offset
-            print partition, next_offset
             if next_offset is None:
                 break
             else:

kafka/producer.py

+3-2
@@ -13,10 +13,11 @@ class SimpleProducer(object):
     def __init__(self, client, topic):
         self.client = client
         self.topic = topic
-        self.client.load_metadata_for_topics(topic)
+        self.client._load_metadata_for_topics(topic)
         self.next_partition = cycle(self.client.topic_partitions[topic])
 
     def send_message(self, msg):
         req = ProduceRequest(self.topic, self.next_partition.next(),
                              messages=[create_message(msg)])
-        resp = self.client.send_produce_request([req]).next()
+        resp = self.client.send_produce_request([req])[0]
+        assert resp.error == 0
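
The partitioning strategy here is plain round-robin: itertools.cycle() turns the partition list into an endless rotation, and each send_message() call consumes the next partition. The rotation in isolation (a sketch, assuming a three-partition topic):

    from itertools import cycle

    next_partition = cycle([0, 1, 2])
    print [next_partition.next() for _ in range(5)]   # [0, 1, 2, 0, 1]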

kafka/protocol.py

+1-1
@@ -217,7 +217,7 @@ def encode_fetch_request(cls, client_id, correlation_id, payloads=[], max_wait_t
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
-    def decode_fetch_response_iter(cls, data):
+    def decode_fetch_response(cls, data):
         """
         Decode bytes to a FetchResponse
 
kafka/queue.py

+2
@@ -8,6 +8,8 @@
 
 log = logging.getLogger("kafka")
 
+raise NotImplementedError("Still need to refactor this class")
+
 class KafkaConsumerProcess(Process):
     def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200):
         self.client = copy(client)
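
Since the raise sits at module level, merely importing kafka.queue now fails fast instead of half-working, e.g. (a sketch):

    try:
        import kafka.queue
    except NotImplementedError as e:
        print e   # Still need to refactor this class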

test/integration.py

+12-13
@@ -243,7 +243,7 @@ def test_produce_100k_gzipped(self):
     def test_consume_none(self):
         fetch = FetchRequest("test_consume_none", 0, 0, 1024)
 
-        fetch_resp = self.client.send_fetch_request([fetch]).next()
+        fetch_resp = self.client.send_fetch_request([fetch])[0]
         self.assertEquals(fetch_resp.error, 0)
         self.assertEquals(fetch_resp.topic, "test_consume_none")
         self.assertEquals(fetch_resp.partition, 0)
@@ -263,7 +263,7 @@ def test_produce_consume(self):
 
         fetch = FetchRequest("test_produce_consume", 0, 0, 1024)
 
-        fetch_resp = self.client.send_fetch_request([fetch]).next()
+        fetch_resp = self.client.send_fetch_request([fetch])[0]
         self.assertEquals(fetch_resp.error, 0)
 
         messages = list(fetch_resp.messages)
@@ -343,6 +343,7 @@ def test_produce_consume_two_partitions(self):
     # Offset Tests #
     ####################
 
+    @unittest.skip("Not supported until 0.8.1")
     def test_commit_fetch_offsets(self):
         req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
         (resp,) = self.client.send_offset_commit_request("group", [req])
@@ -428,22 +429,20 @@ def test_consumer(self):
         self.assertEquals(len(all_messages), 200)
         self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes
 
-        # Produce more messages
-        produce3 = ProduceRequest("test_consumer", 1, messages=[
-            create_message("Test message 3 %d" % i) for i in range(10)
-        ])
-
-        for resp in self.client.send_produce_request([produce3]):
-            self.assertEquals(resp.error, 0)
-            self.assertEquals(resp.offset, 100)
+        consumer.seek(-10, 2)
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
 
-        # Start a new consumer, make sure we only get the newly produced messages
-        consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+        self.assertEquals(len(all_messages), 10)
 
+        consumer.seek(-13, 2)
         all_messages = []
         for message in consumer:
             all_messages.append(message)
-        self.assertEquals(len(all_messages), 10)
+
+        self.assertEquals(len(all_messages), 13)
 
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
