Skip to content

Commit ac8716d

Browse files
authored
fix(api_core): finalize during close of 'ResumableBidiRpc' (#9337)
Avoid blocking for ill-behaved daemon threads during BiDi shutdown. Closes #8616, #9008.
1 parent 8c74e9c commit ac8716d

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

google/api_core/bidi.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,10 @@ def _recv(self):
561561
def recv(self):
562562
return self._recoverable(self._recv)
563563

564+
def close(self):
565+
self._finalize(None)
566+
super(ResumableBidiRpc, self).close()
567+
564568
@property
565569
def is_active(self):
566570
"""bool: True if this stream is currently open and active."""
@@ -698,7 +702,11 @@ def stop(self):
698702
if self._thread is not None:
699703
# Resume the thread to wake it up in case it is sleeping.
700704
self.resume()
701-
self._thread.join()
705+
# The daemonized thread may itself block, so don't wait
706+
# for it longer than a second.
707+
self._thread.join(1.0)
708+
if self._thread.is_alive(): # pragma: NO COVER
709+
_LOGGER.warning("Background thread did not exit.")
702710

703711
self._thread = None
704712

tests/unit/test_bidi.py

+25
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,31 @@ def test_recv_failure(self):
597597
assert bidi_rpc.is_active is False
598598
assert call.cancelled is True
599599

600+
def test_close(self):
601+
call = mock.create_autospec(_CallAndFuture, instance=True)
602+
603+
def cancel_side_effect():
604+
call.is_active.return_value = False
605+
606+
call.cancel.side_effect = cancel_side_effect
607+
start_rpc = mock.create_autospec(
608+
grpc.StreamStreamMultiCallable, instance=True, return_value=call
609+
)
610+
should_recover = mock.Mock(spec=["__call__"], return_value=False)
611+
bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
612+
bidi_rpc.open()
613+
614+
bidi_rpc.close()
615+
616+
should_recover.assert_not_called()
617+
call.cancel.assert_called_once()
618+
assert bidi_rpc.call == call
619+
assert bidi_rpc.is_active is False
620+
# ensure the request queue was signaled to stop.
621+
assert bidi_rpc.pending_requests == 1
622+
assert bidi_rpc._request_queue.get() is None
623+
assert bidi_rpc._finalized
624+
600625
def test_reopen_failure_on_rpc_restart(self):
601626
error1 = ValueError("1")
602627
error2 = ValueError("2")

0 commit comments

Comments
 (0)