diff --git a/kafka/client_async.py b/kafka/client_async.py index 984cd81fb..da2699101 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -637,7 +637,12 @@ def _poll(self, timeout): for key, events in ready: if key.fileobj.fileno() < 0: - self._selector.unregister(key.fileobj) + try: + self._selector.unregister(key.fileobj) + except KeyError: + pass + # Skip to next key after handling closed fd + continue if key.fileobj is self._wake_r: self._clear_wake_fd() diff --git a/test/test_client_async.py b/test/test_client_async.py index 84d52807e..d0e0e68f3 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -247,8 +247,51 @@ def test_poll(mocker): _poll.assert_called_with(cli.config['retry_backoff_ms'] / 1000.0) -def test__poll(): - pass +@pytest.fixture +def setup_selector_key(cli, conn): + def _setup(fileno_value, side_effect=None): + conn._sock.fileno.return_value = fileno_value + mock_key = selectors.SelectorKey( + fileobj=conn._sock, + fd=fileno_value, + events=selectors.EVENT_READ, + data=conn + ) + cli._selector.select.return_value = [(mock_key, selectors.EVENT_READ)] + cli._selector.unregister.side_effect = side_effect + + return _setup + + +def test__poll_basic_read(cli, conn, setup_selector_key): + conn.in_flight_requests = [1] + conn.recv.return_value = [('response', Future())] + setup_selector_key(fileno_value=1) + + cli._poll(0) + + conn.recv.assert_called_once() + assert len(cli._pending_completion) == 1 + + +def test__poll_unregister(cli, conn, setup_selector_key): + setup_selector_key(fileno_value=-1) + + cli._poll(0) + + cli._selector.unregister.assert_called_once_with(conn._sock) + + +def test__poll_unregister_keyerror(cli, conn, setup_selector_key): + setup_selector_key( + fileno_value=-1, + side_effect=KeyError("Already unregistered") + ) + + # Should not raise KeyError + cli._poll(0) + + cli._selector.unregister.assert_called_once_with(conn._sock) def test_in_flight_request_count():