1
+ from itertools import izip_longest , repeat
1
2
import logging
2
3
from threading import Lock
3
4
@@ -30,7 +31,7 @@ def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=
30
31
self .client = client
31
32
self .topic = topic
32
33
self .group = group
33
- self .client .load_metadata_for_topics (topic )
34
+ self .client ._load_metadata_for_topics (topic )
34
35
self .offsets = {}
35
36
36
37
# Set up the auto-commit timer
@@ -54,12 +55,16 @@ def get_or_init_offset_callback(resp):
54
55
raise Exception ("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
55
56
resp .topic , resp .partition , resp .error ))
56
57
58
+ # Uncomment for 0.8.1
59
+ #
60
+ #for partition in self.client.topic_partitions[topic]:
61
+ # req = OffsetFetchRequest(topic, partition)
62
+ # (offset,) = self.client.send_offset_fetch_request(group, [req],
63
+ # callback=get_or_init_offset_callback, fail_on_error=False)
64
+ # self.offsets[partition] = offset
65
+
57
66
for partition in self .client .topic_partitions [topic ]:
58
- req = OffsetFetchRequest (topic , partition )
59
- (offset ,) = self .client .send_offset_fetch_request (group , [req ],
60
- callback = get_or_init_offset_callback , fail_on_error = False )
61
- self .offsets [partition ] = offset
62
- print self .offsets
67
+ self .offsets [partition ] = 0
63
68
64
69
def seek (self , offset , whence ):
65
70
"""
@@ -71,25 +76,30 @@ def seek(self, offset, whence):
71
76
1 is relative to the current offset
72
77
2 is relative to the latest known offset (tail)
73
78
"""
74
- if whence == 1 :
75
- # relative to current position
79
+ if whence == 1 : # relative to current position
76
80
for partition , _offset in self .offsets .items ():
77
81
self .offset [partition ] = _offset + offset
78
- elif whence in (0 , 2 ):
79
- # relative to beginning or end
82
+ elif whence in (0 , 2 ): # relative to beginning or end
83
+ # divide the request offset by number of partitions, distribute the remainder evenly
84
+ (delta , rem ) = divmod (offset , len (self .offsets ))
85
+ deltas = {}
86
+ for partition , r in izip_longest (self .offsets .keys (), repeat (1 , rem ), fillvalue = 0 ):
87
+ deltas [partition ] = delta + r
88
+
80
89
reqs = []
81
- for partition in offsets .keys ():
90
+ for partition in self . offsets .keys ():
82
91
if whence == 0 :
83
92
reqs .append (OffsetRequest (self .topic , partition , - 2 , 1 ))
84
93
elif whence == 2 :
85
94
reqs .append (OffsetRequest (self .topic , partition , - 1 , 1 ))
86
95
else :
87
96
pass
88
- resps = self .client .send_offset_request ([req ])
97
+
98
+ resps = self .client .send_offset_request (reqs )
89
99
for resp in resps :
90
- self .offsets [resp .partition ] = resp .offsets [0 ] + offset
100
+ self .offsets [resp .partition ] = resp .offsets [0 ] + deltas [ resp . partition ]
91
101
else :
92
- raise
102
+ raise ValueError ( "Unexpected value for `whence`, %d" % whence )
93
103
94
104
def commit (self , partitions = []):
95
105
"""
@@ -98,6 +108,8 @@ def commit(self, partitions=[]):
98
108
partitions: list of partitions to commit, default is to commit all of them
99
109
"""
100
110
111
+ raise NotImplementedError ("Broker-managed offsets not supported in 0.8" )
112
+
101
113
# short circuit if nothing happened
102
114
if self .count_since_commit == 0 :
103
115
return
@@ -121,15 +133,31 @@ def commit(self, partitions=[]):
121
133
self .count_since_commit = 0
122
134
123
135
def __iter__ (self ):
136
+ """
137
+ Create an iterator per partition. Iterate through them calling next() until they are
138
+ all exhausted.
139
+ """
124
140
iters = {}
125
141
for partition , offset in self .offsets .items ():
126
142
iters [partition ] = self .__iter_partition__ (partition , offset )
127
143
144
+ if len (iters ) == 0 :
145
+ return
146
+
128
147
while True :
129
- for it in iters .values ():
130
- yield it .next ()
148
+ if len (iters ) == 0 :
149
+ break
150
+
151
+ for partition , it in iters .items ():
152
+ try :
153
+ yield it .next ()
154
+ except StopIteration :
155
+ log .debug ("Done iterating over partition %s" % partition )
156
+ del iters [partition ]
157
+ continue # skip auto-commit since we didn't yield anything
158
+
159
+ # auto commit logic
131
160
self .count_since_commit += 1
132
- # deal with auto commits
133
161
if self .auto_commit is True :
134
162
if self .auto_commit_every_n is not None and self .count_since_commit > self .auto_commit_every_n :
135
163
if self .commit_timer is not None :
@@ -140,19 +168,22 @@ def __iter__(self):
140
168
self .commit ()
141
169
142
170
def __iter_partition__ (self , partition , offset ):
171
+ """
172
+ Iterate over the messages in a partition. Create a FetchRequest to get back
173
+ a batch of messages, yield them one at a time. After a batch is exhausted,
174
+ start a new batch unless we've reached the end of this partition.
175
+ """
143
176
while True :
144
- req = FetchRequest (self .topic , partition , offset , 1024 )
177
+ req = FetchRequest (self .topic , partition , offset , 1024 ) # TODO configure fetch size
145
178
(resp ,) = self .client .send_fetch_request ([req ])
146
179
assert resp .topic == self .topic
147
180
assert resp .partition == partition
148
181
next_offset = None
149
182
for message in resp .messages :
150
183
next_offset = message .offset
151
- print partition , message , message .offset
152
184
yield message
153
185
# update the internal state _after_ we yield the message
154
186
self .offsets [partition ] = message .offset
155
- print partition , next_offset
156
187
if next_offset is None :
157
188
break
158
189
else :
0 commit comments