@@ -592,19 +592,27 @@ class SocketPairTest(unittest.TestCase, ThreadableTest):
592
592
def __init__ (self , methodName = 'runTest' ):
593
593
unittest .TestCase .__init__ (self , methodName = methodName )
594
594
ThreadableTest .__init__ (self )
595
+ self .cli = None
596
+ self .serv = None
597
+
598
+ def socketpair (self ):
599
+ # To be overridden by some child classes.
600
+ return socket .socketpair ()
595
601
596
602
def setUp (self ):
597
- self .serv , self .cli = socket .socketpair ()
603
+ self .serv , self .cli = self .socketpair ()
598
604
599
605
def tearDown (self ):
600
- self .serv .close ()
606
+ if self .serv :
607
+ self .serv .close ()
601
608
self .serv = None
602
609
603
610
def clientSetUp (self ):
604
611
pass
605
612
606
613
def clientTearDown (self ):
607
- self .cli .close ()
614
+ if self .cli :
615
+ self .cli .close ()
608
616
self .cli = None
609
617
ThreadableTest .clientTearDown (self )
610
618
@@ -4852,6 +4860,120 @@ def _testSend(self):
4852
4860
self .assertEqual (msg , MSG )
4853
4861
4854
4862
4863
+ class PurePythonSocketPairTest (SocketPairTest ):
4864
+
4865
+ # Explicitly use socketpair AF_INET or AF_INET6 to ensure that is the
4866
+ # code path we're using regardless platform is the pure python one where
4867
+ # `_socket.socketpair` does not exist. (AF_INET does not work with
4868
+ # _socket.socketpair on many platforms).
4869
+ def socketpair (self ):
4870
+ # called by super().setUp().
4871
+ try :
4872
+ return socket .socketpair (socket .AF_INET6 )
4873
+ except OSError :
4874
+ return socket .socketpair (socket .AF_INET )
4875
+
4876
+ # Local imports in this class make for easy security fix backporting.
4877
+
4878
+ def setUp (self ):
4879
+ import _socket
4880
+ self ._orig_sp = getattr (_socket , 'socketpair' , None )
4881
+ if self ._orig_sp is not None :
4882
+ # This forces the version using the non-OS provided socketpair
4883
+ # emulation via an AF_INET socket in Lib/socket.py.
4884
+ del _socket .socketpair
4885
+ import importlib
4886
+ global socket
4887
+ socket = importlib .reload (socket )
4888
+ else :
4889
+ pass # This platform already uses the non-OS provided version.
4890
+ super ().setUp ()
4891
+
4892
+ def tearDown (self ):
4893
+ super ().tearDown ()
4894
+ import _socket
4895
+ if self ._orig_sp is not None :
4896
+ # Restore the default socket.socketpair definition.
4897
+ _socket .socketpair = self ._orig_sp
4898
+ import importlib
4899
+ global socket
4900
+ socket = importlib .reload (socket )
4901
+
4902
+ def test_recv (self ):
4903
+ msg = self .serv .recv (1024 )
4904
+ self .assertEqual (msg , MSG )
4905
+
4906
+ def _test_recv (self ):
4907
+ self .cli .send (MSG )
4908
+
4909
+ def test_send (self ):
4910
+ self .serv .send (MSG )
4911
+
4912
+ def _test_send (self ):
4913
+ msg = self .cli .recv (1024 )
4914
+ self .assertEqual (msg , MSG )
4915
+
4916
+ def test_ipv4 (self ):
4917
+ cli , srv = socket .socketpair (socket .AF_INET )
4918
+ cli .close ()
4919
+ srv .close ()
4920
+
4921
+ def _test_ipv4 (self ):
4922
+ pass
4923
+
4924
+ @unittest .skipIf (not hasattr (_socket , 'IPPROTO_IPV6' ) or
4925
+ not hasattr (_socket , 'IPV6_V6ONLY' ),
4926
+ "IPV6_V6ONLY option not supported" )
4927
+ @unittest .skipUnless (socket_helper .IPV6_ENABLED , 'IPv6 required for this test' )
4928
+ def test_ipv6 (self ):
4929
+ cli , srv = socket .socketpair (socket .AF_INET6 )
4930
+ cli .close ()
4931
+ srv .close ()
4932
+
4933
+ def _test_ipv6 (self ):
4934
+ pass
4935
+
4936
+ def test_injected_authentication_failure (self ):
4937
+ orig_getsockname = socket .socket .getsockname
4938
+ inject_sock = None
4939
+
4940
+ def inject_getsocketname (self ):
4941
+ nonlocal inject_sock
4942
+ sockname = orig_getsockname (self )
4943
+ # Connect to the listening socket ahead of the
4944
+ # client socket.
4945
+ if inject_sock is None :
4946
+ inject_sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
4947
+ inject_sock .setblocking (False )
4948
+ try :
4949
+ inject_sock .connect (sockname [:2 ])
4950
+ except (BlockingIOError , InterruptedError ):
4951
+ pass
4952
+ inject_sock .setblocking (True )
4953
+ return sockname
4954
+
4955
+ sock1 = sock2 = None
4956
+ try :
4957
+ socket .socket .getsockname = inject_getsocketname
4958
+ with self .assertRaises (OSError ):
4959
+ sock1 , sock2 = socket .socketpair ()
4960
+ finally :
4961
+ socket .socket .getsockname = orig_getsockname
4962
+ if inject_sock :
4963
+ inject_sock .close ()
4964
+ if sock1 : # This cleanup isn't needed on a successful test.
4965
+ sock1 .close ()
4966
+ if sock2 :
4967
+ sock2 .close ()
4968
+
4969
+ def _test_injected_authentication_failure (self ):
4970
+ # No-op. Exists for base class threading infrastructure to call.
4971
+ # We could refactor this test into its own lesser class along with the
4972
+ # setUp and tearDown code to construct an ideal; it is simpler to keep
4973
+ # it here and live with extra overhead one this _one_ failure test.
4974
+ pass
4975
+
4976
+
4855
4977
class NonBlockingTCPTests (ThreadedTCPSocketTest ):
4856
4978
4857
4979
def __init__ (self , methodName = 'runTest' ):
0 commit comments