File tree 2 files changed +34
-1
lines changed
2 files changed +34
-1
lines changed Original file line number Diff line number Diff line change @@ -561,6 +561,10 @@ def _recv(self):
561
561
def recv (self ):
562
562
return self ._recoverable (self ._recv )
563
563
564
+ def close (self ):
565
+ self ._finalize (None )
566
+ super (ResumableBidiRpc , self ).close ()
567
+
564
568
@property
565
569
def is_active (self ):
566
570
"""bool: True if this stream is currently open and active."""
@@ -698,7 +702,11 @@ def stop(self):
698
702
if self ._thread is not None :
699
703
# Resume the thread to wake it up in case it is sleeping.
700
704
self .resume ()
701
- self ._thread .join ()
705
+ # The daemonized thread may itself block, so don't wait
706
+ # for it longer than a second.
707
+ self ._thread .join (1.0 )
708
+ if self ._thread .is_alive (): # pragma: NO COVER
709
+ _LOGGER .warning ("Background thread did not exit." )
702
710
703
711
self ._thread = None
704
712
Original file line number Diff line number Diff line change @@ -597,6 +597,31 @@ def test_recv_failure(self):
597
597
assert bidi_rpc .is_active is False
598
598
assert call .cancelled is True
599
599
600
+ def test_close (self ):
601
+ call = mock .create_autospec (_CallAndFuture , instance = True )
602
+
603
+ def cancel_side_effect ():
604
+ call .is_active .return_value = False
605
+
606
+ call .cancel .side_effect = cancel_side_effect
607
+ start_rpc = mock .create_autospec (
608
+ grpc .StreamStreamMultiCallable , instance = True , return_value = call
609
+ )
610
+ should_recover = mock .Mock (spec = ["__call__" ], return_value = False )
611
+ bidi_rpc = bidi .ResumableBidiRpc (start_rpc , should_recover )
612
+ bidi_rpc .open ()
613
+
614
+ bidi_rpc .close ()
615
+
616
+ should_recover .assert_not_called ()
617
+ call .cancel .assert_called_once ()
618
+ assert bidi_rpc .call == call
619
+ assert bidi_rpc .is_active is False
620
+ # ensure the request queue was signaled to stop.
621
+ assert bidi_rpc .pending_requests == 1
622
+ assert bidi_rpc ._request_queue .get () is None
623
+ assert bidi_rpc ._finalized
624
+
600
625
def test_reopen_failure_on_rpc_restart (self ):
601
626
error1 = ValueError ("1" )
602
627
error2 = ValueError ("2" )
You can’t perform that action at this time.
0 commit comments