diff --git a/CHANGELOG.md b/CHANGELOG.md index 36408ed81..63ac7ab9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ [1]: https://pypi.org/project/google-cloud-pubsub/#history +## [2.13.6](https://github.com/googleapis/python-pubsub/compare/v2.13.5...v2.13.6) (2022-08-11) + + +### Bug Fixes + +* **deps:** allow protobuf < 5.0.0 ([#762](https://github.com/googleapis/python-pubsub/issues/762)) ([260bd18](https://github.com/googleapis/python-pubsub/commit/260bd183ffe19992be9a1c1d298438c1f44d3fa9)) +* **deps:** require proto-plus >= 1.22.0 ([260bd18](https://github.com/googleapis/python-pubsub/commit/260bd183ffe19992be9a1c1d298438c1f44d3fa9)) +* set stream_ack_deadline to max_duration_per_lease_extension or 60 s, set ack_deadline to min_duration_per_lease_extension or 10 s ([#760](https://github.com/googleapis/python-pubsub/issues/760)) ([4444129](https://github.com/googleapis/python-pubsub/commit/4444129b28a19296752e865b73827b78e99adea5)) +* Update stream_ack_deadline with ack_deadline ([#763](https://github.com/googleapis/python-pubsub/issues/763)) ([e600ad8](https://github.com/googleapis/python-pubsub/commit/e600ad8228930445765ffa0c45500a7779e25817)) + ## [2.13.5](https://github.com/googleapis/python-pubsub/compare/v2.13.4...v2.13.5) (2022-08-10) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index f909c8eec..932699261 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -20,7 +20,7 @@ import logging import threading import typing -from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple import uuid import grpc # type: ignore @@ -74,6 +74,15 @@ a subscription. We do this to reduce premature ack expiration. """ +_DEFAULT_STREAM_ACK_DEADLINE: float = 60 +"""The default stream ack deadline in seconds.""" + +_MAX_STREAM_ACK_DEADLINE: float = 600 +"""The maximum stream ack deadline in seconds.""" + +_MIN_STREAM_ACK_DEADLINE: float = 10 +"""The minimum stream ack deadline in seconds.""" + _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = { code_pb2.DEADLINE_EXCEEDED, code_pb2.RESOURCE_EXHAUSTED, @@ -270,7 +279,36 @@ def __init__( self._await_callbacks_on_shutdown = await_callbacks_on_shutdown self._ack_histogram = histogram.Histogram() self._last_histogram_size = 0 - self._ack_deadline: Union[int, float] = histogram.MIN_ACK_DEADLINE + + # If max_duration_per_lease_extension is the default + # we set the stream_ack_deadline to the default of 60 + if self._flow_control.max_duration_per_lease_extension == 0: + self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE + # We will not be able to extend more than the default minimum + elif ( + self._flow_control.max_duration_per_lease_extension + < _MIN_STREAM_ACK_DEADLINE + ): + self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE + # Will not be able to extend past the max + elif ( + self._flow_control.max_duration_per_lease_extension + > _MAX_STREAM_ACK_DEADLINE + ): + self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE + else: + self._stream_ack_deadline = ( + self._flow_control.max_duration_per_lease_extension + ) + + self._ack_deadline = max( + min( + self._flow_control.min_duration_per_lease_extension, + histogram.MAX_ACK_DEADLINE, + ), + histogram.MIN_ACK_DEADLINE, + ) + self._rpc: Optional[bidi.ResumableBidiRpc] = None self._callback: Optional[functools.partial] = None self._closing = threading.Lock() @@ -413,7 +451,10 @@ def _obtain_ack_deadline(self, maybe_update: bool) -> float: self._ack_deadline = max( self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED ) - + # If we have updated the ack_deadline and it is longer than the stream_ack_deadline + # set the stream_ack_deadline to the new ack_deadline. + if self._ack_deadline > self._stream_ack_deadline: + self._stream_ack_deadline = self._ack_deadline return self._ack_deadline @property @@ -741,10 +782,10 @@ def heartbeat(self) -> bool: if send_new_ack_deadline: request = gapic_types.StreamingPullRequest( - stream_ack_deadline_seconds=self.ack_deadline + stream_ack_deadline_seconds=self._stream_ack_deadline ) _LOGGER.debug( - "Sending new ack_deadline of %d seconds.", self.ack_deadline + "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline ) else: request = gapic_types.StreamingPullRequest() @@ -780,7 +821,7 @@ def open( ) # Create the RPC - stream_ack_deadline_seconds = self.ack_deadline + stream_ack_deadline_seconds = self._stream_ack_deadline get_initial_request = functools.partial( self._get_initial_request, stream_ack_deadline_seconds @@ -796,7 +837,7 @@ def open( _LOGGER.debug( "Creating a stream, default ACK deadline set to {} seconds.".format( - stream_ack_deadline_seconds + self._stream_ack_deadline ) ) @@ -928,6 +969,8 @@ def _get_initial_request( suitable for any other purpose). """ # Put the request together. + # We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt + # anyway. Set to some big-ish value in case we modack late. request = gapic_types.StreamingPullRequest( stream_ack_deadline_seconds=stream_ack_deadline_seconds, modify_deadline_ack_ids=[], diff --git a/setup.py b/setup.py index 12533438e..4eb55ed31 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ name = "google-cloud-pubsub" description = "Google Cloud Pub/Sub API client library" -version = "2.13.5" +version = "2.13.6" # Should be one of: # 'Development Status :: 3 - Alpha' # 'Development Status :: 4 - Beta' @@ -31,8 +31,8 @@ dependencies = [ "grpcio >= 1.38.1, < 2.0dev", # https://github.com/googleapis/python-pubsub/issues/414 "google-api-core[grpc] >= 1.32.0, <3.0.0dev,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*", - "proto-plus >= 1.15.0, <2.0.0dev", - "protobuf >= 3.19.0, <4.0.0dev", + "proto-plus >= 1.22.0, <2.0.0dev", + "protobuf >= 3.19.0, <5.0.0dev", "grpc-google-iam-v1 >=0.12.4, <1.0.0dev", "grpcio-status >= 1.16.0", ] diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 38e0f719d..07498af4b 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -7,6 +7,6 @@ grpcio==1.38.1 google-api-core==1.32.0 libcst==0.3.10 -proto-plus==1.15.0 +proto-plus==1.22.0 grpc-google-iam-v1==0.12.4 protobuf==3.19.0 diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index a8cbfbcda..deb476eb1 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -107,16 +107,61 @@ def test_constructor_and_default_state(): assert manager._client_id is not None -def test_constructor_with_options(): +def test_constructor_with_default_options(): + flow_control_ = types.FlowControl() manager = streaming_pull_manager.StreamingPullManager( mock.sentinel.client, mock.sentinel.subscription, - flow_control=mock.sentinel.flow_control, + flow_control=flow_control_, scheduler=mock.sentinel.scheduler, ) - assert manager.flow_control == mock.sentinel.flow_control + assert manager.flow_control == flow_control_ assert manager._scheduler == mock.sentinel.scheduler + assert manager._ack_deadline == 10 + assert manager._stream_ack_deadline == 60 + + +def test_constructor_with_min_and_max_duration_per_lease_extension_(): + flow_control_ = types.FlowControl( + min_duration_per_lease_extension=15, max_duration_per_lease_extension=20 + ) + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription, + flow_control=flow_control_, + scheduler=mock.sentinel.scheduler, + ) + assert manager._ack_deadline == 15 + assert manager._stream_ack_deadline == 20 + + +def test_constructor_with_min_duration_per_lease_extension_too_low(): + flow_control_ = types.FlowControl( + min_duration_per_lease_extension=9, max_duration_per_lease_extension=9 + ) + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription, + flow_control=flow_control_, + scheduler=mock.sentinel.scheduler, + ) + assert manager._ack_deadline == 10 + assert manager._stream_ack_deadline == 10 + + +def test_constructor_with_max_duration_per_lease_extension_too_high(): + flow_control_ = types.FlowControl( + max_duration_per_lease_extension=601, min_duration_per_lease_extension=601 + ) + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription, + flow_control=flow_control_, + scheduler=mock.sentinel.scheduler, + ) + assert manager._ack_deadline == 600 + assert manager._stream_ack_deadline == 600 def make_manager(**kwargs): @@ -164,9 +209,13 @@ def test__obtain_ack_deadline_no_custom_flow_control_setting(): manager._flow_control = types.FlowControl( min_duration_per_lease_extension=0, max_duration_per_lease_extension=0 ) + assert manager._stream_ack_deadline == 60 + assert manager._ack_deadline == 10 + assert manager._obtain_ack_deadline(maybe_update=False) == 10 deadline = manager._obtain_ack_deadline(maybe_update=True) assert deadline == histogram.MIN_ACK_DEADLINE + assert manager._stream_ack_deadline == 60 # When we get some historical data, the deadline is adjusted. manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 2) @@ -186,11 +235,14 @@ def test__obtain_ack_deadline_with_max_duration_per_lease_extension(): manager._flow_control = types.FlowControl( max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE + 1 ) + assert manager._ack_deadline == 10 + manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large # The deadline configured in flow control should prevail. deadline = manager._obtain_ack_deadline(maybe_update=True) assert deadline == histogram.MIN_ACK_DEADLINE + 1 + assert manager._stream_ack_deadline == 60 def test__obtain_ack_deadline_with_min_duration_per_lease_extension(): @@ -205,6 +257,7 @@ def test__obtain_ack_deadline_with_min_duration_per_lease_extension(): # The deadline configured in flow control should prevail. deadline = manager._obtain_ack_deadline(maybe_update=True) assert deadline == histogram.MAX_ACK_DEADLINE + assert manager._stream_ack_deadline == histogram.MAX_ACK_DEADLINE def test__obtain_ack_deadline_with_max_duration_per_lease_extension_too_low(): @@ -231,6 +284,7 @@ def test__obtain_ack_deadline_with_min_duration_per_lease_extension_too_high(): # The deadline configured in flow control should be adjusted to the maximum allowed. deadline = manager._obtain_ack_deadline(maybe_update=True) assert deadline == histogram.MAX_ACK_DEADLINE + assert manager._stream_ack_deadline == histogram.MAX_ACK_DEADLINE def test__obtain_ack_deadline_with_exactly_once_enabled(): @@ -247,6 +301,7 @@ def test__obtain_ack_deadline_with_exactly_once_enabled(): # Since the 60-second min ack_deadline value for exactly_once subscriptions # seconds is higher than the histogram value, the deadline should be 60 sec. assert deadline == 60 + assert manager._stream_ack_deadline == 60 def test__obtain_ack_deadline_with_min_duration_per_lease_extension_with_exactly_once_enabled(): @@ -264,6 +319,7 @@ def test__obtain_ack_deadline_with_min_duration_per_lease_extension_with_exactly # User-defined custom min ack_deadline value takes precedence over # exactly_once default of 60 seconds. assert deadline == histogram.MAX_ACK_DEADLINE + assert manager._stream_ack_deadline == histogram.MAX_ACK_DEADLINE def test__obtain_ack_deadline_no_value_update(): @@ -292,12 +348,12 @@ def test__obtain_ack_deadline_no_value_update(): def test_client_id(): manager1 = make_manager() - request1 = manager1._get_initial_request(stream_ack_deadline_seconds=10) + request1 = manager1._get_initial_request(stream_ack_deadline_seconds=60) client_id_1 = request1.client_id assert client_id_1 manager2 = make_manager() - request2 = manager2._get_initial_request(stream_ack_deadline_seconds=10) + request2 = manager2._get_initial_request(stream_ack_deadline_seconds=60) client_id_2 = request2.client_id assert client_id_2 @@ -308,7 +364,7 @@ def test_streaming_flow_control(): manager = make_manager( flow_control=types.FlowControl(max_messages=10, max_bytes=1000) ) - request = manager._get_initial_request(stream_ack_deadline_seconds=10) + request = manager._get_initial_request(stream_ack_deadline_seconds=60) assert request.max_outstanding_messages == 10 assert request.max_outstanding_bytes == 1000 @@ -318,7 +374,7 @@ def test_streaming_flow_control_use_legacy_flow_control(): flow_control=types.FlowControl(max_messages=10, max_bytes=1000), use_legacy_flow_control=True, ) - request = manager._get_initial_request(stream_ack_deadline_seconds=10) + request = manager._get_initial_request(stream_ack_deadline_seconds=60) assert request.max_outstanding_messages == 0 assert request.max_outstanding_bytes == 0 @@ -1046,12 +1102,12 @@ def test_heartbeat_stream_ack_deadline_seconds(caplog): result = manager.heartbeat() manager._rpc.send.assert_called_once_with( - gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=10) + gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=60) ) assert result # Set to false after a send is initiated. assert not manager._send_new_ack_deadline - assert "Sending new ack_deadline of 10 seconds." in caplog.text + assert "Sending new ack_deadline of 60 seconds." in caplog.text @mock.patch("google.api_core.bidi.ResumableBidiRpc", autospec=True) @@ -1096,7 +1152,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi ) initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"] assert initial_request_arg.func == manager._get_initial_request - assert initial_request_arg.args[0] == 18 + assert initial_request_arg.args[0] == 60 assert not manager._client.get_subscription.called resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with( @@ -1781,6 +1837,7 @@ def test__on_response_disable_exactly_once(): # exactly_once minimum since exactly_once has been disabled. deadline = manager._obtain_ack_deadline(maybe_update=True) assert deadline == histogram.MIN_ACK_DEADLINE + assert manager._stream_ack_deadline == 60 def test__on_response_exactly_once_immediate_modacks_fail():