diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 7eb55bd63ddb73..f404273c3ae5c1 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -460,6 +460,8 @@ def _pipe_closed(self, fut): class _ProactorDatagramTransport(_ProactorBasePipeTransport, transports.DatagramTransport): max_size = 256 * 1024 + _header_size = 8 + def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): self._address = address @@ -499,7 +501,7 @@ def sendto(self, data, addr=None): # Ensure that what we buffer is immutable. self._buffer.append((bytes(data), addr)) - self._buffer_size += len(data) + 8 # include header bytes + self._buffer_size += len(data) + self._header_size if self._write_fut is None: # No current write operations are active, kick one off @@ -526,7 +528,7 @@ def _loop_writing(self, fut=None): return data, addr = self._buffer.popleft() - self._buffer_size -= len(data) + self._buffer_size -= len(data) + self._header_size if self._address is not None: self._write_fut = self._loop._proactor.send(self._sock, data) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 6ad84044adf146..3505d4bb6bd1f7 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1212,6 +1212,7 @@ def close(self): class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport): _buffer_factory = collections.deque + _header_size = 8 def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): @@ -1285,13 +1286,13 @@ def sendto(self, data, addr=None): # Ensure that what we buffer is immutable. self._buffer.append((bytes(data), addr)) - self._buffer_size += len(data) + 8 # include header bytes + self._buffer_size += len(data) + self._header_size self._maybe_pause_protocol() def _sendto_ready(self): while self._buffer: data, addr = self._buffer.popleft() - self._buffer_size -= len(data) + self._buffer_size -= len(data) + self._header_size try: if self._extra['peername']: self._sock.send(data) @@ -1299,7 +1300,7 @@ def _sendto_ready(self): self._sock.sendto(data, addr) except (BlockingIOError, InterruptedError): self._buffer.appendleft((data, addr)) # Try again later. - self._buffer_size += len(data) + self._buffer_size += len(data) + self._header_size break except OSError as exc: self._protocol.error_received(exc) diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 24c4e8546b17aa..0dba1eb0d41038 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -566,6 +566,8 @@ def test_sendto(self): self.assertTrue(self.proactor.sendto.called) self.proactor.sendto.assert_called_with( self.sock, data, addr=('0.0.0.0', 1234)) + self.assertFalse(transport._buffer) + self.assertEqual(0, transport._buffer_size) def test_sendto_bytearray(self): data = bytearray(b'data') diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index aab6a779170eb9..a7f4c0104ecf58 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1497,6 +1497,47 @@ def test_sendto_closing(self): transport.sendto(b'data', (1,)) self.assertEqual(transport._conn_lost, 2) + def test_sendto_sendto_ready(self): + data = b'data' + + # First queue up a buffer by having the socket block + self.sock.sendto.side_effect = BlockingIOError + transport = self.datagram_transport() + transport.sendto(data, ('0.0.0.0', 12345)) + self.loop.assert_writer(7, transport._sendto_ready) + self.assertEqual(1, len(transport._buffer)) + self.assertEqual(transport._buffer_size, len(data) + transport._header_size) + + # Now let the socket send the buffer + self.sock.sendto.side_effect = None + transport._sendto_ready() + self.assertTrue(self.sock.sendto.called) + self.assertEqual( + self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345))) + self.assertFalse(self.loop.writers) + self.assertFalse(transport._buffer) + self.assertEqual(transport._buffer_size, 0) + + def test_sendto_sendto_ready_blocked(self): + data = b'data' + + # First queue up a buffer by having the socket block + self.sock.sendto.side_effect = BlockingIOError + transport = self.datagram_transport() + transport.sendto(data, ('0.0.0.0', 12345)) + self.loop.assert_writer(7, transport._sendto_ready) + self.assertEqual(1, len(transport._buffer)) + self.assertEqual(transport._buffer_size, len(data) + transport._header_size) + + # Now try to send the buffer and let it get requeued + transport._sendto_ready() + self.assertTrue(self.sock.sendto.called) + self.assertEqual( + self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345))) + self.assertTrue(self.loop.writers) + self.assertEqual(1, len(transport._buffer)) + self.assertEqual(transport._buffer_size, len(data) + transport._header_size) + def test_sendto_ready(self): data = b'data' self.sock.sendto.return_value = len(data) diff --git a/Misc/NEWS.d/next/Library/2025-06-16-12-37-02.gh-issue-135444.An2eeA.rst b/Misc/NEWS.d/next/Library/2025-06-16-12-37-02.gh-issue-135444.An2eeA.rst new file mode 100644 index 00000000000000..5e01cd43808b55 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-06-16-12-37-02.gh-issue-135444.An2eeA.rst @@ -0,0 +1,2 @@ +Fix asyncio DatagramTransport flow control accounting when a packet cannot +be immediately sent.