Skip to content

Commit 3206c01

Browse files
abbrowne126gcf-owl-bot[bot]parthea
authored
fix: Allow BackgroundConsumer To Inform Caller of Fatal Exceptions with Optional Callback (#821)
* fix: add stop() call to BackgroundConsumer failures due to exceptions * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * move stop call to within the exception clauses * Update test_bidi.py * fix: allow exceptions to be surfaced to the caller of BackgroundConsumer * Update bidi.py * fix: allow BackgroundConsumer caller to pass `on_fatal_exception` to be informed of fatal processing errors * Update tests/unit/test_bidi.py Co-authored-by: Anthonios Partheniou <partheniou@google.com> * Update bidi.py --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: Anthonios Partheniou <partheniou@google.com>
1 parent 225bf75 commit 3206c01

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

google/api_core/bidi.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -624,12 +624,15 @@ def on_response(response):
624624
``open()``ed yet.
625625
on_response (Callable[[protobuf.Message], None]): The callback to
626626
be called for every response on the stream.
627+
on_fatal_exception (Callable[[Exception], None]): The callback to
628+
be called on fatal errors during consumption. Default None.
627629
"""
628630

629-
def __init__(self, bidi_rpc, on_response):
631+
def __init__(self, bidi_rpc, on_response, on_fatal_exception=None):
630632
self._bidi_rpc = bidi_rpc
631633
self._on_response = on_response
632634
self._paused = False
635+
self._on_fatal_exception = on_fatal_exception
633636
self._wake = threading.Condition()
634637
self._thread = None
635638
self._operational_lock = threading.Lock()
@@ -676,13 +679,17 @@ def _thread_main(self, ready):
676679
exc,
677680
exc_info=True,
678681
)
682+
if self._on_fatal_exception is not None:
683+
self._on_fatal_exception(exc)
679684

680685
except Exception as exc:
681686
_LOGGER.exception(
682687
"%s caught unexpected exception %s and will exit.",
683688
_BIDIRECTIONAL_CONSUMER_NAME,
684689
exc,
685690
)
691+
if self._on_fatal_exception is not None:
692+
self._on_fatal_exception(exc)
686693

687694
_LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME)
688695

@@ -694,8 +701,8 @@ def start(self):
694701
name=_BIDIRECTIONAL_CONSUMER_NAME,
695702
target=self._thread_main,
696703
args=(ready,),
704+
daemon=True,
697705
)
698-
thread.daemon = True
699706
thread.start()
700707
# Other parts of the code rely on `thread.is_alive` which
701708
# isn't sufficient to know if a thread is active, just that it may
@@ -706,7 +713,11 @@ def start(self):
706713
_LOGGER.debug("Started helper thread %s", thread.name)
707714

708715
def stop(self):
709-
"""Stop consuming the stream and shutdown the background thread."""
716+
"""Stop consuming the stream and shutdown the background thread.
717+
718+
NOTE: Cannot be called within `_thread_main`, since it is not
719+
possible to join a thread to itself.
720+
"""
710721
with self._operational_lock:
711722
self._bidi_rpc.close()
712723

@@ -721,6 +732,7 @@ def stop(self):
721732

722733
self._thread = None
723734
self._on_response = None
735+
self._on_fatal_exception = None
724736

725737
@property
726738
def is_active(self):

tests/unit/test_bidi.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -918,3 +918,33 @@ def test_stop_error_logs(self, caplog):
918918
error_logs = [r.message for r in caplog.records if r.levelname == "ERROR"]
919919
assert not error_logs, f"Found unexpected ERROR logs: {error_logs}"
920920
bidi_rpc.is_active = False
921+
922+
def test_fatal_exceptions_can_inform_consumer(self, caplog):
923+
"""
924+
https://github.com/googleapis/python-api-core/issues/820
925+
Exceptions thrown in the BackgroundConsumer not caught by `should_recover` / `should_terminate`
926+
on the RPC should be bubbled back to the caller through `on_fatal_exception`, if passed.
927+
"""
928+
caplog.set_level(logging.DEBUG)
929+
930+
for fatal_exception in (
931+
ValueError("some non-api error"),
932+
exceptions.PermissionDenied("some api error"),
933+
):
934+
bidi_rpc = mock.create_autospec(bidi.ResumableBidiRpc, instance=True)
935+
bidi_rpc.is_active = True
936+
on_response = mock.Mock(spec=["__call__"])
937+
938+
on_fatal_exception = mock.Mock(spec=["__call__"])
939+
940+
bidi_rpc.open.side_effect = fatal_exception
941+
942+
consumer = bidi.BackgroundConsumer(
943+
bidi_rpc, on_response, on_fatal_exception
944+
)
945+
946+
consumer.start()
947+
# let the background thread run for a while before exiting
948+
time.sleep(0.1)
949+
950+
on_fatal_exception.assert_called_once_with(fatal_exception)

0 commit comments

Comments
 (0)