diff --git a/changelog/68136.fixed.md b/changelog/68136.fixed.md new file mode 100644 index 000000000000..15a97f642a4a --- /dev/null +++ b/changelog/68136.fixed.md @@ -0,0 +1 @@ +Fix filedescriptor out of range problem in tcp.py by replacing select.sect() with the higher-level selectors API diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index bf6760cbbe28..fd0cbd677ebe 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -11,7 +11,7 @@ import logging import multiprocessing import queue -import select +import selectors import socket import threading import time @@ -371,10 +371,13 @@ async def recv(self, timeout=None): if timeout == 0: for msg in self.unpacker: return msg[b"body"] - try: - events, _, _ = select.select([self._stream.socket], [], [], 0) - except TimeoutError: - events = [] + + with selectors.DefaultSelector() as sel: + sel.register(self._stream.socket, selectors.EVENT_READ) + ready = sel.select(timeout=0) + events = [key.fileobj for key, _ in ready] + sel.unregister(self._stream.socket) + if events: while not self._closing: async with self._read_in_progress: diff --git a/tests/pytests/unit/transport/test_publish_client.py b/tests/pytests/unit/transport/test_publish_client.py index c212d7ef4e1a..c1e7e5685508 100644 --- a/tests/pytests/unit/transport/test_publish_client.py +++ b/tests/pytests/unit/transport/test_publish_client.py @@ -5,6 +5,7 @@ import asyncio import hashlib import logging +import selectors import socket import time @@ -295,3 +296,39 @@ async def handler(request): raise Exception(f"Unknown transport {transport}") client.close() await asyncio.sleep(0.03) + + +async def test_recv_timeout_zero(): + """ + Test recv method with timeout=0. + """ + host = "127.0.0.1" + port = 11122 + ioloop = MagicMock() + mock_stream = MagicMock() + mock_unpacker = MagicMock() + mock_unpacker.__iter__.return_value = [] + mock_socket = MagicMock() + mock_stream.socket = mock_socket + + mock_selector_instance = MagicMock() + mock_selector_instance.__enter__.return_value = mock_selector_instance + mock_selector_instance.__exit__.return_value = None + mock_selector_instance.select.return_value = [] + + with patch( + "salt.transport.tcp.selectors.DefaultSelector", + return_value=mock_selector_instance, + ), patch("salt.utils.msgpack.Unpacker", return_value=mock_unpacker): + + client = salt.transport.tcp.PublishClient({}, ioloop, host=host, port=port) + client._stream = mock_stream + result = await client.recv(timeout=0) + + assert result is None + mock_selector_instance.register.assert_called_once_with( + mock_socket, selectors.EVENT_READ + ) + mock_selector_instance.unregister.assert_called_once_with(mock_socket) + mock_selector_instance.__enter__.assert_called_once() + mock_selector_instance.__exit__.assert_called_once()