Skip to content

Commit 2cfec9d

Browse files
committed
Merge pull request dpkp#217 from locationlabs/transaction
Commit/rollback consumer offsets via context manager
2 parents a1dbb77 + 20db3b5 commit 2cfec9d

File tree

2 files changed

+287
-0
lines changed

2 files changed

+287
-0
lines changed

kafka/context.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
"""
2+
Context manager to commit/rollback consumer offsets.
3+
"""
4+
from logging import getLogger
5+
6+
from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError
7+
8+
9+
class OffsetCommitContext(object):
10+
"""
11+
Provides commit/rollback semantics around a `SimpleConsumer`.
12+
13+
Usage assumes that `auto_commit` is disabled, that messages are consumed in
14+
batches, and that the consuming process will record its own successful
15+
processing of each message. Both the commit and rollback operations respect
16+
a "high-water mark" to ensure that last unsuccessfully processed message
17+
will be retried.
18+
19+
Example:
20+
21+
consumer = SimpleConsumer(client, group, topic, auto_commit=False)
22+
consumer.provide_partition_info()
23+
consumer.fetch_last_known_offsets()
24+
25+
while some_condition:
26+
with OffsetCommitContext(consumer) as context:
27+
messages = consumer.get_messages(count, block=False)
28+
29+
for partition, message in messages:
30+
if can_process(message):
31+
context.mark(partition, message.offset)
32+
else:
33+
break
34+
35+
if not context:
36+
sleep(delay)
37+
38+
39+
These semantics allow for deferred message processing (e.g. if `can_process`
40+
compares message time to clock time) and for repeated processing of the last
41+
unsuccessful message (until some external error is resolved).
42+
"""
43+
44+
def __init__(self, consumer):
45+
"""
46+
:param consumer: an instance of `SimpleConsumer`
47+
"""
48+
self.consumer = consumer
49+
self.initial_offsets = None
50+
self.high_water_mark = None
51+
self.logger = getLogger("kafka.context")
52+
53+
def mark(self, partition, offset):
54+
"""
55+
Set the high-water mark in the current context.
56+
57+
In order to know the current partition, it is helpful to initialize
58+
the consumer to provide partition info via:
59+
60+
consumer.provide_partition_info()
61+
"""
62+
max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))
63+
64+
self.logger.debug("Setting high-water mark to: %s",
65+
{partition: max_offset})
66+
67+
self.high_water_mark[partition] = max_offset
68+
69+
def __nonzero__(self):
70+
"""
71+
Return whether any operations were marked in the context.
72+
"""
73+
return bool(self.high_water_mark)
74+
75+
def __enter__(self):
76+
"""
77+
Start a new context:
78+
79+
- Record the initial offsets for rollback
80+
- Reset the high-water mark
81+
"""
82+
self.initial_offsets = dict(self.consumer.offsets)
83+
self.high_water_mark = dict()
84+
85+
self.logger.debug("Starting context at: %s", self.initial_offsets)
86+
87+
return self
88+
89+
def __exit__(self, exc_type, exc_value, traceback):
90+
"""
91+
End a context.
92+
93+
- If there was no exception, commit up to the current high-water mark.
94+
- If there was an offset of range error, attempt to find the correct
95+
initial offset.
96+
- If there was any other error, roll back to the initial offsets.
97+
"""
98+
if exc_type is None:
99+
self.commit()
100+
elif isinstance(exc_value, OffsetOutOfRangeError):
101+
self.handle_out_of_range()
102+
return True
103+
else:
104+
self.rollback()
105+
106+
def commit(self):
107+
"""
108+
Commit this context's offsets:
109+
110+
- If the high-water mark has moved, commit up to and position the
111+
consumer at the high-water mark.
112+
- Otherwise, reset to the consumer to the initial offsets.
113+
"""
114+
if self.high_water_mark:
115+
self.logger.info("Committing offsets: %s", self.high_water_mark)
116+
self.commit_partition_offsets(self.high_water_mark)
117+
self.update_consumer_offsets(self.high_water_mark)
118+
else:
119+
self.update_consumer_offsets(self.initial_offsets)
120+
121+
def rollback(self):
122+
"""
123+
Rollback this context:
124+
125+
- Position the consumer at the initial offsets.
126+
"""
127+
self.logger.info("Rolling back context: %s", self.initial_offsets)
128+
self.update_consumer_offsets(self.initial_offsets)
129+
130+
def commit_partition_offsets(self, partition_offsets):
131+
"""
132+
Commit explicit partition/offset pairs.
133+
"""
134+
self.logger.debug("Committing partition offsets: %s", partition_offsets)
135+
136+
commit_requests = [
137+
OffsetCommitRequest(self.consumer.topic, partition, offset, None)
138+
for partition, offset in partition_offsets.items()
139+
]
140+
commit_responses = self.consumer.client.send_offset_commit_request(
141+
self.consumer.group,
142+
commit_requests,
143+
)
144+
for commit_response in commit_responses:
145+
check_error(commit_response)
146+
147+
def update_consumer_offsets(self, partition_offsets):
148+
"""
149+
Update consumer offsets to explicit positions.
150+
"""
151+
self.logger.debug("Updating consumer offsets to: %s", partition_offsets)
152+
153+
for partition, offset in partition_offsets.items():
154+
self.consumer.offsets[partition] = offset
155+
156+
# consumer keeps other offset states beyond its `offsets` dictionary,
157+
# a relative seek with zero delta forces the consumer to reset to the
158+
# current value of the `offsets` dictionary
159+
self.consumer.seek(0, 1)
160+
161+
def handle_out_of_range(self):
162+
"""
163+
Handle out of range condition by seeking to the beginning of valid
164+
ranges.
165+
166+
This assumes that an out of range doesn't happen by seeking past the end
167+
of valid ranges -- which is far less likely.
168+
"""
169+
self.logger.info("Seeking beginning of partition on out of range error")
170+
self.consumer.seek(0, 0)

test/test_context.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
"""
2+
OffsetCommitContext tests.
3+
"""
4+
from unittest2 import TestCase
5+
6+
from mock import MagicMock, patch
7+
8+
from kafka.common import OffsetOutOfRangeError
9+
from kafka.context import OffsetCommitContext
10+
11+
12+
class TestOffsetCommitContext(TestCase):
13+
"""
14+
OffsetCommitContext tests.
15+
"""
16+
17+
def setUp(self):
18+
self.client = MagicMock()
19+
self.consumer = MagicMock()
20+
self.topic = "topic"
21+
self.group = "group"
22+
self.partition = 0
23+
self.consumer.topic = self.topic
24+
self.consumer.group = self.group
25+
self.consumer.client = self.client
26+
self.consumer.offsets = {self.partition: 0}
27+
self.context = OffsetCommitContext(self.consumer)
28+
29+
def test_noop(self):
30+
"""
31+
Should revert consumer after context exit with no mark() call.
32+
"""
33+
with self.context:
34+
# advance offset
35+
self.consumer.offsets = {self.partition: 1}
36+
37+
# offset restored
38+
self.assertEqual(self.consumer.offsets, {self.partition: 0})
39+
# and seek called with relative zero delta
40+
self.assertEqual(self.consumer.seek.call_count, 1)
41+
self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
42+
43+
def test_mark(self):
44+
"""
45+
Should remain at marked location ater context exit.
46+
"""
47+
with self.context as context:
48+
context.mark(self.partition, 0)
49+
# advance offset
50+
self.consumer.offsets = {self.partition: 1}
51+
52+
# offset sent to client
53+
self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
54+
55+
# offset remains advanced
56+
self.assertEqual(self.consumer.offsets, {self.partition: 1})
57+
58+
# and seek called with relative zero delta
59+
self.assertEqual(self.consumer.seek.call_count, 1)
60+
self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
61+
62+
def test_mark_multiple(self):
63+
"""
64+
Should remain at highest marked location after context exit.
65+
"""
66+
with self.context as context:
67+
context.mark(self.partition, 0)
68+
context.mark(self.partition, 1)
69+
context.mark(self.partition, 2)
70+
# advance offset
71+
self.consumer.offsets = {self.partition: 3}
72+
73+
# offset sent to client
74+
self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
75+
76+
# offset remains advanced
77+
self.assertEqual(self.consumer.offsets, {self.partition: 3})
78+
79+
# and seek called with relative zero delta
80+
self.assertEqual(self.consumer.seek.call_count, 1)
81+
self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
82+
83+
def test_rollback(self):
84+
"""
85+
Should rollback to initial offsets on context exit with exception.
86+
"""
87+
with self.assertRaises(Exception):
88+
with self.context as context:
89+
context.mark(self.partition, 0)
90+
# advance offset
91+
self.consumer.offsets = {self.partition: 1}
92+
93+
raise Exception("Intentional failure")
94+
95+
# offset rolled back (ignoring mark)
96+
self.assertEqual(self.consumer.offsets, {self.partition: 0})
97+
98+
# and seek called with relative zero delta
99+
self.assertEqual(self.consumer.seek.call_count, 1)
100+
self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
101+
102+
def test_out_of_range(self):
103+
"""
104+
Should reset to beginning of valid offsets on `OffsetOutOfRangeError`
105+
"""
106+
def _seek(offset, whence):
107+
# seek must be called with 0, 0 to find the beginning of the range
108+
self.assertEqual(offset, 0)
109+
self.assertEqual(whence, 0)
110+
# set offsets to something different
111+
self.consumer.offsets = {self.partition: 100}
112+
113+
with patch.object(self.consumer, "seek", _seek):
114+
with self.context:
115+
raise OffsetOutOfRangeError()
116+
117+
self.assertEqual(self.consumer.offsets, {self.partition: 100})

0 commit comments

Comments
 (0)