|
17 | 17 | from kafka.future import Future
|
18 | 18 | from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
|
19 | 19 | from kafka.protocol.fetch import FetchRequest, FetchResponse
|
20 |
| -from kafka.protocol.list_offsets import ListOffsetsResponse |
| 20 | +from kafka.protocol.list_offsets import ListOffsetsResponse, OffsetResetStrategy |
21 | 21 | from kafka.errors import (
|
22 | 22 | StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
|
23 | 23 | UnknownTopicOrPartitionError, OffsetOutOfRangeError
|
@@ -610,3 +610,51 @@ def test_partition_records_compacted_offset(mocker):
|
610 | 610 | msgs = records.take()
|
611 | 611 | assert len(msgs) == batch_end - fetch_offset - 1
|
612 | 612 | assert msgs[0].offset == fetch_offset + 1
|
| 613 | + |
| 614 | + |
| 615 | +def test_update_fetch_positions_paused(subscription_state, client, mocker): |
| 616 | + fetcher = Fetcher(client, subscription_state) |
| 617 | + tp = TopicPartition('foo', 0) |
| 618 | + subscription_state.assign_from_user([tp]) |
| 619 | + subscription_state.pause(tp) # paused partition does not have a valid position |
| 620 | + subscription_state.need_offset_reset(tp, OffsetResetStrategy.LATEST) |
| 621 | + |
| 622 | + mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(10, 1, -1)}) |
| 623 | + fetcher.update_fetch_positions([tp]) |
| 624 | + |
| 625 | + assert not subscription_state.is_offset_reset_needed(tp) |
| 626 | + assert not subscription_state.is_fetchable(tp) # because tp is paused |
| 627 | + assert subscription_state.has_valid_position(tp) |
| 628 | + assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1) |
| 629 | + |
| 630 | + |
| 631 | +def test_update_fetch_positions_paused_without_valid(subscription_state, client, mocker): |
| 632 | + fetcher = Fetcher(client, subscription_state) |
| 633 | + tp = TopicPartition('foo', 0) |
| 634 | + subscription_state.assign_from_user([tp]) |
| 635 | + subscription_state.pause(tp) # paused partition does not have a valid position |
| 636 | + |
| 637 | + mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(0, 1, -1)}) |
| 638 | + fetcher.update_fetch_positions([tp]) |
| 639 | + |
| 640 | + assert not subscription_state.is_offset_reset_needed(tp) |
| 641 | + assert not subscription_state.is_fetchable(tp) # because tp is paused |
| 642 | + assert subscription_state.has_valid_position(tp) |
| 643 | + assert subscription_state.position(tp) == OffsetAndMetadata(0, '', -1) |
| 644 | + |
| 645 | + |
| 646 | +def test_update_fetch_positions_paused_with_valid(subscription_state, client, mocker): |
| 647 | + fetcher = Fetcher(client, subscription_state) |
| 648 | + tp = TopicPartition('foo', 0) |
| 649 | + subscription_state.assign_from_user([tp]) |
| 650 | + subscription_state.assignment[tp].committed = OffsetAndMetadata(0, '', -1) |
| 651 | + subscription_state.seek(tp, 10) |
| 652 | + subscription_state.pause(tp) # paused partition already has a valid position |
| 653 | + |
| 654 | + mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(0, 1, -1)}) |
| 655 | + fetcher.update_fetch_positions([tp]) |
| 656 | + |
| 657 | + assert not subscription_state.is_offset_reset_needed(tp) |
| 658 | + assert not subscription_state.is_fetchable(tp) # because tp is paused |
| 659 | + assert subscription_state.has_valid_position(tp) |
| 660 | + assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1) |
0 commit comments