Skip to content

gh-124008: Fix calculation of the number of written bytes for the Windows console #124059

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Lib/test/test_winconsoleio.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,29 @@ def test_write_empty_data(self):
with ConIO('CONOUT$', 'w') as f:
self.assertEqual(f.write(b''), 0)

@requires_resource('console')
def test_write(self):
    # Prefixes written in front of every multibyte character under test:
    # empty, plain ASCII, a run of valid multibyte characters, and a run
    # of bytes that are never valid in UTF-8.
    prefixes = (
        b'',
        b'abc',
        b'\xc2\xa7\xe2\x98\x83\xf0\x9f\x90\x8d',
        b'\xff'*10,
    )
    # One UTF-8 encoded character each of 2, 3 and 4 bytes.
    multibyte_chars = (b'\xc2\xa7', b'\xe2\x98\x83', b'\xf0\x9f\x90\x8d')

    # Inputs that are expected to be consumed in full; they are written
    # only after all the truncated inputs have been exercised.
    complete_cases = []
    with ConIO('CONOUT$', 'w') as console:
        for prefix in prefixes:
            for char in multibyte_chars:
                complete_cases.append(prefix + char)
                for cut in range(1, len(char)):
                    truncated = prefix + char[:cut]
                    complete_cases.append(truncated + b'z')
                    complete_cases.append(truncated + b'\xff')
                    # The data ends with an incomplete multibyte
                    # sequence: only the prefix is reported as written.
                    with self.subTest(data=truncated):
                        self.assertEqual(console.write(truncated),
                                         len(prefix))
        for payload in complete_cases:
            with self.subTest(data=payload):
                self.assertEqual(console.write(payload), len(payload))

def assertStdinRoundTrip(self, text):
stdin = open('CONIN$', 'r')
old_stdin = sys.stdin
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a possible crash (in debug builds), incorrect output, or an incorrect
return value from raw binary ``write()`` when writing to the console on Windows.
118 changes: 90 additions & 28 deletions Modules/_io/winconsoleio.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,19 +135,67 @@ char _PyIO_get_console_type(PyObject *path_or_fd) {
}

static DWORD
_find_last_utf8_boundary(const char *buf, DWORD len)
_find_last_utf8_boundary(const unsigned char *buf, DWORD len)
{
/* This function never returns 0, returns the original len instead */
DWORD count = 1;
if (len == 0 || (buf[len - 1] & 0x80) == 0) {
return len;
}
for (;; count++) {
if (count > 3 || count >= len) {
for (DWORD count = 1; count < 4 && count <= len; count++) {
unsigned char c = buf[len - count];
if (c < 0x80) {
/* No starting byte found. */
return len;
}
if ((buf[len - count] & 0xc0) != 0x80) {
return len - count;
if (c >= 0xc0) {
if (c < 0xe0 /* 2-bytes sequence */ ? count < 2 :
c < 0xf0 /* 3-bytes sequence */ ? count < 3 :
c < 0xf8 /* 4-bytes sequence */)
{
/* Incomplete multibyte sequence. */
return len - count;
}
/* Either complete or invalid sequence. */
return len;
}
}
/* Either complete 4-bytes sequence or invalid sequence. */
return len;
}

/* Find the number of UTF-8 bytes that corresponds to the specified number of
 * wchars.
 * I.e. find x <= len so that MultiByteToWideChar(CP_UTF8, 0, s, x, NULL, 0) == n.
 *
 *   s   - the UTF-8 (possibly invalid) bytes that were converted to wchars
 *   len - the number of bytes in s
 *   n   - the number of wchars to account for
 *
 * Returns the number of leading bytes of s (0 <= result <= len) that
 * produce n wchars.  If s converts to n wchars or fewer, len is returned.
 *
 * WideCharToMultiByte() cannot be used for this, because the UTF-8 -> wchar
 * conversion is not reversible (invalid UTF-8 byte produces \ufffd which
 * will be converted back to 3-bytes UTF-8 sequence \xef\xbf\xbd).
 * So we need to use binary search.
 */
static DWORD
_wchar_to_utf8_count(const unsigned char *s, DWORD len, DWORD n)
{
    /* Fast path: the whole buffer fits in n wchars, so all of it is
     * accounted for.  This also keeps the result exact for a complete
     * write regardless of how the search below would split invalid
     * sequences. */
    DWORD total = MultiByteToWideChar(CP_UTF8, 0, s, len, NULL, 0);
    if (total <= n) {
        return len;
    }

    /* Number of bytes already known to be covered by the wchars. */
    DWORD start = 0;
    while (1) {
        if (n == 0) {
            /* Every wchar is accounted for by the first `start` bytes.
             * Without this check the search would go on to include one
             * more character than was requested, and could loop forever
             * on a single remaining byte that cannot be split. */
            return start;
        }

        /* Pick a split point near the middle that does not cut a
         * multibyte sequence. */
        DWORD mid = 0;
        for (DWORD i = len / 2; i <= len; i++) {
            mid = _find_last_utf8_boundary(s, i);
            if (mid != 0) {
                break;
            }
            /* The middle could split the first multibytes sequence. */
        }
        if (mid == len) {
            /* The remaining range cannot be divided any further. */
            return start + len;
        }
        if (mid == 0) {
            /* No boundary was found: split off the last byte (or take the
             * only byte) so that the search still makes progress. */
            mid = len > 1 ? len - 1 : 1;
        }
        DWORD wlen = MultiByteToWideChar(CP_UTF8, 0, s, mid, NULL, 0);
        if (wlen <= n) {
            /* The left part is fully covered; continue in the right part
             * with the wchars that are left. */
            s += mid;
            start += mid;
            len -= mid;
            n -= wlen;
        }
        else {
            /* The left part alone already exceeds n; narrow to it. */
            len = mid;
        }
    }
}
Expand Down Expand Up @@ -563,8 +611,10 @@ read_console_w(HANDLE handle, DWORD maxlen, DWORD *readlen) {
int err = 0, sig = 0;

wchar_t *buf = (wchar_t*)PyMem_Malloc(maxlen * sizeof(wchar_t));
if (!buf)
if (!buf) {
PyErr_NoMemory();
goto error;
}

*readlen = 0;

Expand Down Expand Up @@ -622,6 +672,7 @@ read_console_w(HANDLE handle, DWORD maxlen, DWORD *readlen) {
Py_UNBLOCK_THREADS
if (!newbuf) {
sig = -1;
PyErr_NoMemory();
break;
}
buf = newbuf;
Expand All @@ -645,8 +696,10 @@ read_console_w(HANDLE handle, DWORD maxlen, DWORD *readlen) {
if (*readlen > 0 && buf[0] == L'\x1a') {
PyMem_Free(buf);
buf = (wchar_t *)PyMem_Malloc(sizeof(wchar_t));
if (!buf)
if (!buf) {
PyErr_NoMemory();
goto error;
}
buf[0] = L'\0';
*readlen = 0;
}
Expand Down Expand Up @@ -824,8 +877,10 @@ _io__WindowsConsoleIO_readall_impl(winconsoleio *self)
bufsize = BUFSIZ;

buf = (wchar_t*)PyMem_Malloc((bufsize + 1) * sizeof(wchar_t));
if (buf == NULL)
if (buf == NULL) {
PyErr_NoMemory();
return NULL;
}

while (1) {
wchar_t *subbuf;
Expand All @@ -847,6 +902,7 @@ _io__WindowsConsoleIO_readall_impl(winconsoleio *self)
(bufsize + 1) * sizeof(wchar_t));
if (tmp == NULL) {
PyMem_Free(buf);
PyErr_NoMemory();
return NULL;
}
buf = tmp;
Expand Down Expand Up @@ -1022,43 +1078,49 @@ _io__WindowsConsoleIO_write_impl(winconsoleio *self, PyTypeObject *cls,
len = (DWORD)b->len;

Py_BEGIN_ALLOW_THREADS
wlen = MultiByteToWideChar(CP_UTF8, 0, b->buf, len, NULL, 0);

/* issue11395 there is an unspecified upper bound on how many bytes
can be written at once. We cap at 32k - the caller will have to
handle partial writes.
Since we don't know how many input bytes are being ignored, we
have to reduce and recalculate. */
while (wlen > 32766 / sizeof(wchar_t)) {
len /= 2;
const DWORD max_wlen = 32766 / sizeof(wchar_t);
/* UTF-8 to wchar ratio is at most 3:1. */
len = Py_MIN(len, max_wlen * 3);
while (1) {
/* Fix for github issues gh-110913 and gh-82052. */
len = _find_last_utf8_boundary(b->buf, len);
wlen = MultiByteToWideChar(CP_UTF8, 0, b->buf, len, NULL, 0);
if (wlen <= max_wlen) {
break;
}
len /= 2;
}
Py_END_ALLOW_THREADS

if (!wlen)
return PyErr_SetFromWindowsErr(0);
if (!wlen) {
return PyLong_FromLong(0);
}

wbuf = (wchar_t*)PyMem_Malloc(wlen * sizeof(wchar_t));
if (!wbuf) {
PyErr_NoMemory();
return NULL;
}

Py_BEGIN_ALLOW_THREADS
wlen = MultiByteToWideChar(CP_UTF8, 0, b->buf, len, wbuf, wlen);
if (wlen) {
res = WriteConsoleW(handle, wbuf, wlen, &n, NULL);
#ifdef Py_DEBUG
if (res) {
#else
if (res && n < wlen) {
#endif
/* Wrote fewer characters than expected, which means our
* len value may be wrong. So recalculate it from the
* characters that were written. As this could potentially
* result in a different value, we also validate that value.
* characters that were written.
*/
len = WideCharToMultiByte(CP_UTF8, 0, wbuf, n,
NULL, 0, NULL, NULL);
if (len) {
wlen = MultiByteToWideChar(CP_UTF8, 0, b->buf, len,
NULL, 0);
assert(wlen == len);
}
len = _wchar_to_utf8_count(b->buf, len, n);
}
} else
res = 0;
Expand Down
Loading