diff --git a/Lib/encodings/aliases.py b/Lib/encodings/aliases.py index d85afd6d5c..6a5ca046b5 100644 --- a/Lib/encodings/aliases.py +++ b/Lib/encodings/aliases.py @@ -209,6 +209,7 @@ 'ms932' : 'cp932', 'mskanji' : 'cp932', 'ms_kanji' : 'cp932', + 'windows_31j' : 'cp932', # cp949 codec '949' : 'cp949', diff --git a/Lib/encodings/idna.py b/Lib/encodings/idna.py index 5396047a7f..0c90b4c9fe 100644 --- a/Lib/encodings/idna.py +++ b/Lib/encodings/idna.py @@ -11,7 +11,7 @@ sace_prefix = "xn--" # This assumes query strings, so AllowUnassigned is true -def nameprep(label): +def nameprep(label): # type: (str) -> str # Map newlabel = [] for c in label: @@ -25,7 +25,7 @@ def nameprep(label): label = unicodedata.normalize("NFKC", label) # Prohibit - for c in label: + for i, c in enumerate(label): if stringprep.in_table_c12(c) or \ stringprep.in_table_c22(c) or \ stringprep.in_table_c3(c) or \ @@ -35,7 +35,7 @@ def nameprep(label): stringprep.in_table_c7(c) or \ stringprep.in_table_c8(c) or \ stringprep.in_table_c9(c): - raise UnicodeError("Invalid character %r" % c) + raise UnicodeEncodeError("idna", label, i, i+1, f"Invalid character {c!r}") # Check bidi RandAL = [stringprep.in_table_d1(x) for x in label] @@ -46,29 +46,38 @@ def nameprep(label): # This is table C.8, which was already checked # 2) If a string contains any RandALCat character, the string # MUST NOT contain any LCat character. - if any(stringprep.in_table_d2(x) for x in label): - raise UnicodeError("Violation of BIDI requirement 2") + for i, x in enumerate(label): + if stringprep.in_table_d2(x): + raise UnicodeEncodeError("idna", label, i, i+1, + "Violation of BIDI requirement 2") # 3) If a string contains any RandALCat character, a # RandALCat character MUST be the first character of the # string, and a RandALCat character MUST be the last # character of the string. - if not RandAL[0] or not RandAL[-1]: - raise UnicodeError("Violation of BIDI requirement 3") + if not RandAL[0]: + raise UnicodeEncodeError("idna", label, 0, 1, + "Violation of BIDI requirement 3") + if not RandAL[-1]: + raise UnicodeEncodeError("idna", label, len(label)-1, len(label), + "Violation of BIDI requirement 3") return label -def ToASCII(label): +def ToASCII(label): # type: (str) -> bytes try: # Step 1: try ASCII - label = label.encode("ascii") - except UnicodeError: + label_ascii = label.encode("ascii") + except UnicodeEncodeError: pass else: # Skip to step 3: UseSTD3ASCIIRules is false, so # Skip to step 8. - if 0 < len(label) < 64: - return label - raise UnicodeError("label empty or too long") + if 0 < len(label_ascii) < 64: + return label_ascii + if len(label) == 0: + raise UnicodeEncodeError("idna", label, 0, 1, "label empty") + else: + raise UnicodeEncodeError("idna", label, 0, len(label), "label too long") # Step 2: nameprep label = nameprep(label) @@ -76,29 +85,34 @@ def ToASCII(label): # Step 3: UseSTD3ASCIIRules is false # Step 4: try ASCII try: - label = label.encode("ascii") - except UnicodeError: + label_ascii = label.encode("ascii") + except UnicodeEncodeError: pass else: # Skip to step 8. if 0 < len(label) < 64: - return label - raise UnicodeError("label empty or too long") + return label_ascii + if len(label) == 0: + raise UnicodeEncodeError("idna", label, 0, 1, "label empty") + else: + raise UnicodeEncodeError("idna", label, 0, len(label), "label too long") # Step 5: Check ACE prefix - if label.startswith(sace_prefix): - raise UnicodeError("Label starts with ACE prefix") + if label.lower().startswith(sace_prefix): + raise UnicodeEncodeError( + "idna", label, 0, len(sace_prefix), "Label starts with ACE prefix") # Step 6: Encode with PUNYCODE - label = label.encode("punycode") + label_ascii = label.encode("punycode") # Step 7: Prepend ACE prefix - label = ace_prefix + label + label_ascii = ace_prefix + label_ascii # Step 8: Check size - if 0 < len(label) < 64: - return label - raise UnicodeError("label empty or too long") + # do not check for empty as we prepend ace_prefix. + if len(label_ascii) < 64: + return label_ascii + raise UnicodeEncodeError("idna", label, 0, len(label), "label too long") def ToUnicode(label): if len(label) > 1024: @@ -110,7 +124,9 @@ def ToUnicode(label): # per https://www.rfc-editor.org/rfc/rfc3454#section-3.1 while still # preventing us from wasting time decoding a big thing that'll just # hit the actual <= 63 length limit in Step 6. - raise UnicodeError("label way too long") + if isinstance(label, str): + label = label.encode("utf-8", errors="backslashreplace") + raise UnicodeDecodeError("idna", label, 0, len(label), "label way too long") # Step 1: Check for ASCII if isinstance(label, bytes): pure_ascii = True @@ -118,25 +134,32 @@ def ToUnicode(label): try: label = label.encode("ascii") pure_ascii = True - except UnicodeError: + except UnicodeEncodeError: pure_ascii = False if not pure_ascii: + assert isinstance(label, str) # Step 2: Perform nameprep label = nameprep(label) # It doesn't say this, but apparently, it should be ASCII now try: label = label.encode("ascii") - except UnicodeError: - raise UnicodeError("Invalid character in IDN label") + except UnicodeEncodeError as exc: + raise UnicodeEncodeError("idna", label, exc.start, exc.end, + "Invalid character in IDN label") # Step 3: Check for ACE prefix - if not label.startswith(ace_prefix): + assert isinstance(label, bytes) + if not label.lower().startswith(ace_prefix): return str(label, "ascii") # Step 4: Remove ACE prefix label1 = label[len(ace_prefix):] # Step 5: Decode using PUNYCODE - result = label1.decode("punycode") + try: + result = label1.decode("punycode") + except UnicodeDecodeError as exc: + offset = len(ace_prefix) + raise UnicodeDecodeError("idna", label, offset+exc.start, offset+exc.end, exc.reason) # Step 6: Apply ToASCII label2 = ToASCII(result) @@ -144,7 +167,8 @@ def ToUnicode(label): # Step 7: Compare the result of step 6 with the one of step 3 # label2 will already be in lower case. if str(label, "ascii").lower() != str(label2, "ascii"): - raise UnicodeError("IDNA does not round-trip", label, label2) + raise UnicodeDecodeError("idna", label, 0, len(label), + f"IDNA does not round-trip, '{label!r}' != '{label2!r}'") # Step 8: return the result of step 5 return result @@ -156,7 +180,7 @@ def encode(self, input, errors='strict'): if errors != 'strict': # IDNA is quite clear that implementations must be strict - raise UnicodeError("unsupported error handling "+errors) + raise UnicodeError(f"Unsupported error handling: {errors}") if not input: return b'', 0 @@ -168,11 +192,16 @@ def encode(self, input, errors='strict'): else: # ASCII name: fast path labels = result.split(b'.') - for label in labels[:-1]: - if not (0 < len(label) < 64): - raise UnicodeError("label empty or too long") - if len(labels[-1]) >= 64: - raise UnicodeError("label too long") + for i, label in enumerate(labels[:-1]): + if len(label) == 0: + offset = sum(len(l) for l in labels[:i]) + i + raise UnicodeEncodeError("idna", input, offset, offset+1, + "label empty") + for i, label in enumerate(labels): + if len(label) >= 64: + offset = sum(len(l) for l in labels[:i]) + i + raise UnicodeEncodeError("idna", input, offset, offset+len(label), + "label too long") return result, len(input) result = bytearray() @@ -182,17 +211,27 @@ def encode(self, input, errors='strict'): del labels[-1] else: trailing_dot = b'' - for label in labels: + for i, label in enumerate(labels): if result: # Join with U+002E result.extend(b'.') - result.extend(ToASCII(label)) + try: + result.extend(ToASCII(label)) + except (UnicodeEncodeError, UnicodeDecodeError) as exc: + offset = sum(len(l) for l in labels[:i]) + i + raise UnicodeEncodeError( + "idna", + input, + offset + exc.start, + offset + exc.end, + exc.reason, + ) return bytes(result+trailing_dot), len(input) def decode(self, input, errors='strict'): if errors != 'strict': - raise UnicodeError("Unsupported error handling "+errors) + raise UnicodeError(f"Unsupported error handling: {errors}") if not input: return "", 0 @@ -202,7 +241,7 @@ def decode(self, input, errors='strict'): # XXX obviously wrong, see #3232 input = bytes(input) - if ace_prefix not in input: + if ace_prefix not in input.lower(): # Fast path try: return input.decode('ascii'), len(input) @@ -218,8 +257,15 @@ def decode(self, input, errors='strict'): trailing_dot = '' result = [] - for label in labels: - result.append(ToUnicode(label)) + for i, label in enumerate(labels): + try: + u_label = ToUnicode(label) + except (UnicodeEncodeError, UnicodeDecodeError) as exc: + offset = sum(len(x) for x in labels[:i]) + len(labels[:i]) + raise UnicodeDecodeError( + "idna", input, offset+exc.start, offset+exc.end, exc.reason) + else: + result.append(u_label) return ".".join(result)+trailing_dot, len(input) @@ -227,7 +273,7 @@ class IncrementalEncoder(codecs.BufferedIncrementalEncoder): def _buffer_encode(self, input, errors, final): if errors != 'strict': # IDNA is quite clear that implementations must be strict - raise UnicodeError("unsupported error handling "+errors) + raise UnicodeError(f"Unsupported error handling: {errors}") if not input: return (b'', 0) @@ -251,7 +297,16 @@ def _buffer_encode(self, input, errors, final): # Join with U+002E result.extend(b'.') size += 1 - result.extend(ToASCII(label)) + try: + result.extend(ToASCII(label)) + except (UnicodeEncodeError, UnicodeDecodeError) as exc: + raise UnicodeEncodeError( + "idna", + input, + size + exc.start, + size + exc.end, + exc.reason, + ) size += len(label) result += trailing_dot @@ -261,7 +316,7 @@ def _buffer_encode(self, input, errors, final): class IncrementalDecoder(codecs.BufferedIncrementalDecoder): def _buffer_decode(self, input, errors, final): if errors != 'strict': - raise UnicodeError("Unsupported error handling "+errors) + raise UnicodeError(f"Unsupported error handling: {errors}") if not input: return ("", 0) @@ -271,7 +326,11 @@ def _buffer_decode(self, input, errors, final): labels = dots.split(input) else: # Must be ASCII string - input = str(input, "ascii") + try: + input = str(input, "ascii") + except (UnicodeEncodeError, UnicodeDecodeError) as exc: + raise UnicodeDecodeError("idna", input, + exc.start, exc.end, exc.reason) labels = input.split(".") trailing_dot = '' @@ -288,7 +347,18 @@ def _buffer_decode(self, input, errors, final): result = [] size = 0 for label in labels: - result.append(ToUnicode(label)) + try: + u_label = ToUnicode(label) + except (UnicodeEncodeError, UnicodeDecodeError) as exc: + raise UnicodeDecodeError( + "idna", + input.encode("ascii", errors="backslashreplace"), + size + exc.start, + size + exc.end, + exc.reason, + ) + else: + result.append(u_label) if size: size += 1 size += len(label) diff --git a/Lib/encodings/palmos.py b/Lib/encodings/palmos.py index c506d65452..df164ca5b9 100644 --- a/Lib/encodings/palmos.py +++ b/Lib/encodings/palmos.py @@ -201,7 +201,7 @@ def getregentry(): '\u02dc' # 0x98 -> SMALL TILDE '\u2122' # 0x99 -> TRADE MARK SIGN '\u0161' # 0x9A -> LATIN SMALL LETTER S WITH CARON - '\x9b' # 0x9B -> + '\u203a' # 0x9B -> SINGLE RIGHT-POINTING ANGLE QUOTATION MARK '\u0153' # 0x9C -> LATIN SMALL LIGATURE OE '\x9d' # 0x9D -> '\x9e' # 0x9E -> diff --git a/Lib/encodings/punycode.py b/Lib/encodings/punycode.py index 1c57264470..4622fc8c92 100644 --- a/Lib/encodings/punycode.py +++ b/Lib/encodings/punycode.py @@ -1,4 +1,4 @@ -""" Codec for the Punicode encoding, as specified in RFC 3492 +""" Codec for the Punycode encoding, as specified in RFC 3492 Written by Martin v. Löwis. """ @@ -131,10 +131,11 @@ def decode_generalized_number(extended, extpos, bias, errors): j = 0 while 1: try: - char = ord(extended[extpos]) + char = extended[extpos] except IndexError: if errors == "strict": - raise UnicodeError("incomplete punicode string") + raise UnicodeDecodeError("punycode", extended, extpos, extpos+1, + "incomplete punycode string") return extpos + 1, None extpos += 1 if 0x41 <= char <= 0x5A: # A-Z @@ -142,8 +143,8 @@ def decode_generalized_number(extended, extpos, bias, errors): elif 0x30 <= char <= 0x39: digit = char - 22 # 0x30-26 elif errors == "strict": - raise UnicodeError("Invalid extended code point '%s'" - % extended[extpos-1]) + raise UnicodeDecodeError("punycode", extended, extpos-1, extpos, + f"Invalid extended code point '{extended[extpos-1]}'") else: return extpos, None t = T(j, bias) @@ -155,11 +156,14 @@ def decode_generalized_number(extended, extpos, bias, errors): def insertion_sort(base, extended, errors): - """3.2 Insertion unsort coding""" + """3.2 Insertion sort coding""" + # This function raises UnicodeDecodeError with position in the extended. + # Caller should add the offset. char = 0x80 pos = -1 bias = 72 extpos = 0 + while extpos < len(extended): newpos, delta = decode_generalized_number(extended, extpos, bias, errors) @@ -171,7 +175,9 @@ def insertion_sort(base, extended, errors): char += pos // (len(base) + 1) if char > 0x10FFFF: if errors == "strict": - raise UnicodeError("Invalid character U+%x" % char) + raise UnicodeDecodeError( + "punycode", extended, pos-1, pos, + f"Invalid character U+{char:x}") char = ord('?') pos = pos % (len(base) + 1) base = base[:pos] + chr(char) + base[pos:] @@ -187,11 +193,21 @@ def punycode_decode(text, errors): pos = text.rfind(b"-") if pos == -1: base = "" - extended = str(text, "ascii").upper() + extended = text.upper() else: - base = str(text[:pos], "ascii", errors) - extended = str(text[pos+1:], "ascii").upper() - return insertion_sort(base, extended, errors) + try: + base = str(text[:pos], "ascii", errors) + except UnicodeDecodeError as exc: + raise UnicodeDecodeError("ascii", text, exc.start, exc.end, + exc.reason) from None + extended = text[pos+1:].upper() + try: + return insertion_sort(base, extended, errors) + except UnicodeDecodeError as exc: + offset = pos + 1 + raise UnicodeDecodeError("punycode", text, + offset+exc.start, offset+exc.end, + exc.reason) from None ### Codec APIs @@ -203,7 +219,7 @@ def encode(self, input, errors='strict'): def decode(self, input, errors='strict'): if errors not in ('strict', 'replace', 'ignore'): - raise UnicodeError("Unsupported error handling "+errors) + raise UnicodeError(f"Unsupported error handling: {errors}") res = punycode_decode(input, errors) return res, len(input) @@ -214,7 +230,7 @@ def encode(self, input, final=False): class IncrementalDecoder(codecs.IncrementalDecoder): def decode(self, input, final=False): if self.errors not in ('strict', 'replace', 'ignore'): - raise UnicodeError("Unsupported error handling "+self.errors) + raise UnicodeError(f"Unsupported error handling: {self.errors}") return punycode_decode(input, self.errors) class StreamWriter(Codec,codecs.StreamWriter): diff --git a/Lib/encodings/undefined.py b/Lib/encodings/undefined.py index 4690288355..082771e1c8 100644 --- a/Lib/encodings/undefined.py +++ b/Lib/encodings/undefined.py @@ -1,6 +1,6 @@ """ Python 'undefined' Codec - This codec will always raise a ValueError exception when being + This codec will always raise a UnicodeError exception when being used. It is intended for use by the site.py file to switch off automatic string to Unicode coercion. diff --git a/Lib/encodings/utf_16.py b/Lib/encodings/utf_16.py index c61248242b..d3b9980026 100644 --- a/Lib/encodings/utf_16.py +++ b/Lib/encodings/utf_16.py @@ -64,7 +64,7 @@ def _buffer_decode(self, input, errors, final): elif byteorder == 1: self.decoder = codecs.utf_16_be_decode elif consumed >= 2: - raise UnicodeError("UTF-16 stream does not start with BOM") + raise UnicodeDecodeError("utf-16", input, 0, 2, "Stream does not start with BOM") return (output, consumed) return self.decoder(input, self.errors, final) @@ -138,7 +138,7 @@ def decode(self, input, errors='strict'): elif byteorder == 1: self.decode = codecs.utf_16_be_decode elif consumed>=2: - raise UnicodeError("UTF-16 stream does not start with BOM") + raise UnicodeDecodeError("utf-16", input, 0, 2, "Stream does not start with BOM") return (object, consumed) ### encodings module API diff --git a/Lib/encodings/utf_32.py b/Lib/encodings/utf_32.py index cdf84d1412..1924bedbb7 100644 --- a/Lib/encodings/utf_32.py +++ b/Lib/encodings/utf_32.py @@ -59,7 +59,7 @@ def _buffer_decode(self, input, errors, final): elif byteorder == 1: self.decoder = codecs.utf_32_be_decode elif consumed >= 4: - raise UnicodeError("UTF-32 stream does not start with BOM") + raise UnicodeDecodeError("utf-32", input, 0, 4, "Stream does not start with BOM") return (output, consumed) return self.decoder(input, self.errors, final) @@ -132,8 +132,8 @@ def decode(self, input, errors='strict'): self.decode = codecs.utf_32_le_decode elif byteorder == 1: self.decode = codecs.utf_32_be_decode - elif consumed>=4: - raise UnicodeError("UTF-32 stream does not start with BOM") + elif consumed >= 4: + raise UnicodeDecodeError("utf-32", input, 0, 4, "Stream does not start with BOM") return (object, consumed) ### encodings module API diff --git a/Lib/io.py b/Lib/io.py index c2812876d3..f0e2fa15d5 100644 --- a/Lib/io.py +++ b/Lib/io.py @@ -46,23 +46,17 @@ "BufferedReader", "BufferedWriter", "BufferedRWPair", "BufferedRandom", "TextIOBase", "TextIOWrapper", "UnsupportedOperation", "SEEK_SET", "SEEK_CUR", "SEEK_END", - "DEFAULT_BUFFER_SIZE", "text_encoding", - "IncrementalNewlineDecoder" - ] + "DEFAULT_BUFFER_SIZE", "text_encoding", "IncrementalNewlineDecoder"] import _io import abc from _io import (DEFAULT_BUFFER_SIZE, BlockingIOError, UnsupportedOperation, - open, open_code, BytesIO, StringIO, BufferedReader, + open, open_code, FileIO, BytesIO, StringIO, BufferedReader, BufferedWriter, BufferedRWPair, BufferedRandom, IncrementalNewlineDecoder, text_encoding, TextIOWrapper) -try: - from _io import FileIO -except ImportError: - pass # Pretend this exception was created here. UnsupportedOperation.__module__ = "io" @@ -87,10 +81,7 @@ class BufferedIOBase(_io._BufferedIOBase, IOBase): class TextIOBase(_io._TextIOBase, IOBase): __doc__ = _io._TextIOBase.__doc__ -try: - RawIOBase.register(FileIO) -except NameError: - pass +RawIOBase.register(FileIO) for klass in (BytesIO, BufferedReader, BufferedWriter, BufferedRandom, BufferedRWPair): diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index f986d85c6d..0d3d8c9e2d 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -762,7 +762,6 @@ def test_only_one_bom(self): f = reader(s) self.assertEqual(f.read(), "spamspam") - @unittest.expectedFailure # TODO: RUSTPYTHON;; UTF-16 stream does not start with BOM def test_badbom(self): s = io.BytesIO(b"\xff\xff") f = codecs.getreader(self.encoding)(s) @@ -1509,7 +1508,6 @@ def test_decode(self): puny = puny.decode("ascii").encode("ascii") self.assertEqual(uni, puny.decode("punycode")) - @unittest.expectedFailure # TODO: RUSTPYTHON; b'Pro\xffprostnemluvesky' != b'Pro\xffprostnemluvesky-uyb24dma41a' def test_decode_invalid(self): testcases = [ (b"xn--w&", "strict", UnicodeDecodeError("punycode", b"", 5, 6, "")), @@ -1694,7 +1692,6 @@ def test_decode_invalid(self): class NameprepTest(unittest.TestCase): - @unittest.expectedFailure # TODO: RUSTPYTHON; UnicodeError: Invalid character '\u1680' def test_nameprep(self): from encodings.idna import nameprep for pos, (orig, prepped) in enumerate(nameprep_tests): @@ -1732,7 +1729,6 @@ class IDNACodecTest(unittest.TestCase): ("あさ.\u034f", UnicodeEncodeError("idna", "あさ.\u034f", 3, 4, "")), ] - @unittest.expectedFailure # TODO: RUSTPYTHON; 'XN--pythn-mua.org.' != 'pythön.org.' def test_builtin_decode(self): self.assertEqual(str(b"python.org", "idna"), "python.org") self.assertEqual(str(b"python.org.", "idna"), "python.org.") @@ -1746,7 +1742,6 @@ def test_builtin_decode(self): self.assertEqual(str(b"bugs.XN--pythn-mua.org.", "idna"), "bugs.pyth\xf6n.org.") - @unittest.expectedFailure # TODO: RUSTPYTHON; 'ascii' != 'idna' def test_builtin_decode_invalid(self): for case, expected in self.invalid_decode_testcases: with self.subTest(case=case, expected=expected): @@ -1764,7 +1759,6 @@ def test_builtin_encode(self): self.assertEqual("pyth\xf6n.org".encode("idna"), b"xn--pythn-mua.org") self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.") - @unittest.expectedFailure # TODO: RUSTPYTHON; UnicodeError: label empty or too long def test_builtin_encode_invalid(self): for case, expected in self.invalid_encode_testcases: with self.subTest(case=case, expected=expected): @@ -1776,7 +1770,6 @@ def test_builtin_encode_invalid(self): self.assertEqual(exc.start, expected.start) self.assertEqual(exc.end, expected.end) - @unittest.expectedFailure # TODO: RUSTPYTHON; UnicodeError: label empty or too long def test_builtin_decode_length_limit(self): with self.assertRaisesRegex(UnicodeDecodeError, "way too long"): (b"xn--016c"+b"a"*1100).decode("idna") @@ -1818,7 +1811,6 @@ def test_incremental_decode(self): self.assertEqual(decoder.decode(b"rg."), "org.") self.assertEqual(decoder.decode(b"", True), "") - @unittest.expectedFailure # TODO: RUSTPYTHON; 'ascii' != 'idna' def test_incremental_decode_invalid(self): iterdecode_testcases = [ (b"\xFFpython.org", UnicodeDecodeError("idna", b"\xFF", 0, 1, "")), @@ -1880,7 +1872,6 @@ def test_incremental_encode(self): self.assertEqual(encoder.encode("ample.org."), b"xn--xample-9ta.org.") self.assertEqual(encoder.encode("", True), b"") - @unittest.expectedFailure # TODO: RUSTPYTHON; UnicodeError: label empty or too long def test_incremental_encode_invalid(self): iterencode_testcases = [ (f"foo.{'\xff'*60}", UnicodeEncodeError("idna", f"{'\xff'*60}", 0, 60, "")), diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index e91d454b72..272098782d 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -39,11 +39,9 @@ from test import support from test.support.script_helper import ( assert_python_ok, assert_python_failure, run_python_until_end) -from test.support import import_helper -from test.support import os_helper -from test.support import threading_helper -from test.support import warnings_helper -from test.support import skip_if_sanitizer +from test.support import ( + import_helper, is_apple, os_helper, threading_helper, warnings_helper, +) from test.support.os_helper import FakePath import codecs @@ -66,10 +64,6 @@ def byteslike(*pos, **kw): class EmptyStruct(ctypes.Structure): pass -# Does io.IOBase finalizer log the exception if the close() method fails? -# The exception is ignored silently by default in release build. -IOBASE_EMITS_UNRAISABLE = (support.Py_DEBUG or sys.flags.dev_mode) - def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" @@ -631,10 +625,10 @@ def test_raw_bytes_io(self): self.read_ops(f, True) def test_large_file_ops(self): - # On Windows and Mac OSX this test consumes large resources; It takes - # a long time to build the >2 GiB file and takes >2 GiB of disk space - # therefore the resource must be enabled to run this test. - if sys.platform[:3] == 'win' or sys.platform == 'darwin': + # On Windows and Apple platforms this test consumes large resources; It + # takes a long time to build the >2 GiB file and takes >2 GiB of disk + # space therefore the resource must be enabled to run this test. + if sys.platform[:3] == 'win' or is_apple: support.requires( 'largefile', 'test requires %s bytes and a long time to run' % self.LARGE) @@ -645,11 +639,9 @@ def test_large_file_ops(self): def test_with_open(self): for bufsize in (0, 100): - f = None with self.open(os_helper.TESTFN, "wb", bufsize) as f: f.write(b"xxx") self.assertEqual(f.closed, True) - f = None try: with self.open(os_helper.TESTFN, "wb", bufsize) as f: 1/0 @@ -788,8 +780,7 @@ def test_closefd_attr(self): file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False) self.assertEqual(file.buffer.raw.closefd, False) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_garbage_collection(self): # FileIO objects are collected, and collecting them flushes # all data to disk. @@ -904,7 +895,7 @@ def test_bad_opener_negative_1(self): def badopener(fname, flags): return -1 with self.assertRaises(ValueError) as cm: - open('non-existent', 'r', opener=badopener) + self.open('non-existent', 'r', opener=badopener) self.assertEqual(str(cm.exception), 'opener returned -1') def test_bad_opener_other_negative(self): @@ -912,7 +903,7 @@ def test_bad_opener_other_negative(self): def badopener(fname, flags): return -2 with self.assertRaises(ValueError) as cm: - open('non-existent', 'r', opener=badopener) + self.open('non-existent', 'r', opener=badopener) self.assertEqual(str(cm.exception), 'opener returned -2') def test_opener_invalid_fd(self): @@ -1048,11 +1039,41 @@ def flush(self): # Silence destructor error R.flush = lambda self: None + @threading_helper.requires_working_threading() + def test_write_readline_races(self): + # gh-134908: Concurrent iteration over a file caused races + thread_count = 2 + write_count = 100 + read_count = 100 + + def writer(file, barrier): + barrier.wait() + for _ in range(write_count): + file.write("x") + + def reader(file, barrier): + barrier.wait() + for _ in range(read_count): + for line in file: + self.assertEqual(line, "") + + with self.open(os_helper.TESTFN, "w+") as f: + barrier = threading.Barrier(thread_count + 1) + reader = threading.Thread(target=reader, args=(f, barrier)) + writers = [threading.Thread(target=writer, args=(f, barrier)) + for _ in range(thread_count)] + with threading_helper.catch_threading_exception() as cm: + with threading_helper.start_threads(writers + [reader]): + pass + self.assertIsNone(cm.exc_type) + + self.assertEqual(os.stat(os_helper.TESTFN).st_size, + write_count * thread_count) + class CIOTest(IOTest): - # TODO: RUSTPYTHON, cyclic gc - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; cyclic gc def test_IOBase_finalize(self): # Issue #12149: segmentation fault on _PyIOBase_finalize when both a # class which inherits IOBase and an object of this class are caught @@ -1071,10 +1092,9 @@ def close(self): support.gc_collect() self.assertIsNone(wr(), wr) - # TODO: RUSTPYTHON, AssertionError: filter ('', ResourceWarning) did not catch any warning - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: filter ('', ResourceWarning) did not catch any warning def test_destructor(self): - super().test_destructor(self) + return super().test_destructor() @support.cpython_only class TestIOCTypes(unittest.TestCase): @@ -1165,9 +1185,32 @@ def test_disallow_instantiation(self): _io = self._io support.check_disallow_instantiation(self, _io._BytesIOBuffer) + def test_stringio_setstate(self): + # gh-127182: Calling __setstate__() with invalid arguments must not crash + obj = self._io.StringIO() + with self.assertRaisesRegex( + TypeError, + 'initial_value must be str or None, not int', + ): + obj.__setstate__((1, '', 0, {})) + + obj.__setstate__((None, '', 0, {})) # should not crash + self.assertEqual(obj.getvalue(), '') + + obj.__setstate__(('', '', 0, {})) + self.assertEqual(obj.getvalue(), '') + class PyIOTest(IOTest): pass + @unittest.expectedFailure # TODO: RUSTPYTHON; OSError: Negative file descriptor + def test_bad_opener_negative_1(): + return super().test_bad_opener_negative_1() + + @unittest.expectedFailure # TODO: RUSTPYTHON; OSError: Negative file descriptor + def test_bad_opener_other_negative(): + return super().test_bad_opener_other_negative() + @support.cpython_only class APIMismatchTest(unittest.TestCase): @@ -1175,7 +1218,7 @@ class APIMismatchTest(unittest.TestCase): def test_RawIOBase_io_in_pyio_match(self): """Test that pyio RawIOBase class has all c RawIOBase methods""" mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase, - ignore=('__weakref__',)) + ignore=('__weakref__', '__static_attributes__')) self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods') def test_RawIOBase_pyio_in_io_match(self): @@ -1244,6 +1287,7 @@ def _with(): # a ValueError. self.assertRaises(ValueError, _with) + @unittest.expectedFailure # TODO: RUSTPYTHON def test_error_through_destructor(self): # Test that the exception state is not modified by a destructor, # even if close() fails. @@ -1252,10 +1296,7 @@ def test_error_through_destructor(self): with self.assertRaises(AttributeError): self.tp(rawio).xyzzy - if not IOBASE_EMITS_UNRAISABLE: - self.assertIsNone(cm.unraisable) - elif cm.unraisable is not None: - self.assertEqual(cm.unraisable.exc_type, OSError) + self.assertEqual(cm.unraisable.exc_type, OSError) def test_repr(self): raw = self.MockRawIO() @@ -1271,11 +1312,9 @@ def test_recursive_repr(self): # Issue #25455 raw = self.MockRawIO() b = self.tp(raw) - with support.swap_attr(raw, 'name', b): - try: + with support.swap_attr(raw, 'name', b), support.infinite_recursion(25): + with self.assertRaises(RuntimeError): repr(b) # Should not crash - except RuntimeError: - pass def test_flush_error_on_close(self): # Test that buffered file is closed despite failed flush @@ -1356,6 +1395,28 @@ def test_readonly_attributes(self): with self.assertRaises(AttributeError): buf.raw = x + def test_pickling_subclass(self): + global MyBufferedIO + class MyBufferedIO(self.tp): + def __init__(self, raw, tag): + super().__init__(raw) + self.tag = tag + def __getstate__(self): + return self.tag, self.raw.getvalue() + def __setstate__(slf, state): + tag, value = state + slf.__init__(self.BytesIO(value), tag) + + raw = self.BytesIO(b'data') + buf = MyBufferedIO(raw, tag='ham') + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(protocol=proto): + pickled = pickle.dumps(buf, proto) + newbuf = pickle.loads(pickled) + self.assertEqual(newbuf.raw.getvalue(), b'data') + self.assertEqual(newbuf.tag, 'ham') + del MyBufferedIO + class SizeofTest: @@ -1717,20 +1778,6 @@ def test_seek_character_device_file(self): class CBufferedReaderTest(BufferedReaderTest, SizeofTest): tp = io.BufferedReader - @unittest.skip("TODO: RUSTPYTHON, fallible allocation") - @skip_if_sanitizer(memory=True, address=True, thread=True, - reason="sanitizer defaults to crashing " - "instead of returning NULL for malloc failure.") - def test_constructor(self): - BufferedReaderTest.test_constructor(self) - # The allocation can succeed on 32-bit builds, e.g. with more - # than 2 GiB RAM and a 64-bit kernel. - if sys.maxsize > 0x7FFFFFFF: - rawio = self.MockRawIO() - bufio = self.tp(rawio) - self.assertRaises((OverflowError, MemoryError, ValueError), - bufio.__init__, rawio, sys.maxsize) - def test_initialization(self): rawio = self.MockRawIO([b"abc"]) bufio = self.tp(rawio) @@ -1748,8 +1795,7 @@ def test_misbehaved_io_read(self): # checking this is not so easy. self.assertRaises(OSError, bufio.read, 10) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_garbage_collection(self): # C BufferedReader objects are collected. # The Python version has __del__, so it ends into gc.garbage instead @@ -1766,40 +1812,44 @@ def test_garbage_collection(self): def test_args_error(self): # Issue #17275 with self.assertRaisesRegex(TypeError, "BufferedReader"): - self.tp(io.BytesIO(), 1024, 1024, 1024) + self.tp(self.BytesIO(), 1024, 1024, 1024) def test_bad_readinto_value(self): - rawio = io.BufferedReader(io.BytesIO(b"12")) + rawio = self.tp(self.BytesIO(b"12")) rawio.readinto = lambda buf: -1 bufio = self.tp(rawio) with self.assertRaises(OSError) as cm: bufio.readline() self.assertIsNone(cm.exception.__cause__) - # TODO: RUSTPYTHON, TypeError: 'bytes' object cannot be interpreted as an integer") - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; TypeError: 'bytes' object cannot be interpreted as an integer") def test_bad_readinto_type(self): - rawio = io.BufferedReader(io.BytesIO(b"12")) + rawio = self.tp(self.BytesIO(b"12")) rawio.readinto = lambda buf: b'' bufio = self.tp(rawio) with self.assertRaises(OSError) as cm: bufio.readline() self.assertIsInstance(cm.exception.__cause__, TypeError) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_flush_error_on_close(self): - super().test_flush_error_on_close() - - # TODO: RUSTPYTHON, AssertionError: UnsupportedOperation not raised by truncate - @unittest.expectedFailure - def test_truncate_on_read_only(self): # TODO: RUSTPYTHON, remove when this passes - super().test_truncate_on_read_only() # TODO: RUSTPYTHON, remove when this passes + return super().test_flush_error_on_close() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_seek_character_device_file(self): - super().test_seek_character_device_file() + return super().test_seek_character_device_file() + + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: UnsupportedOperation not raised by truncate + def test_truncate_on_read_only(self): + return super().test_truncate_on_read_only() + + @unittest.skip('TODO: RUSTPYTHON; fallible allocation') + def test_constructor(self): + return super().test_constructor() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_pickling_subclass(self): + return super().test_pickling_subclass() class PyBufferedReaderTest(BufferedReaderTest): @@ -1909,8 +1959,7 @@ def _seekrel(bufio): def test_writes_and_truncates(self): self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_write_non_blocking(self): raw = self.MockNonBlockWriterIO() bufio = self.tp(raw, 8) @@ -2107,20 +2156,6 @@ def test_slow_close_from_thread(self): class CBufferedWriterTest(BufferedWriterTest, SizeofTest): tp = io.BufferedWriter - @unittest.skip("TODO: RUSTPYTHON, fallible allocation") - @skip_if_sanitizer(memory=True, address=True, thread=True, - reason="sanitizer defaults to crashing " - "instead of returning NULL for malloc failure.") - def test_constructor(self): - BufferedWriterTest.test_constructor(self) - # The allocation can succeed on 32-bit builds, e.g. with more - # than 2 GiB RAM and a 64-bit kernel. - if sys.maxsize > 0x7FFFFFFF: - rawio = self.MockRawIO() - bufio = self.tp(rawio) - self.assertRaises((OverflowError, MemoryError, ValueError), - bufio.__init__, rawio, sys.maxsize) - def test_initialization(self): rawio = self.MockRawIO() bufio = self.tp(rawio) @@ -2131,8 +2166,7 @@ def test_initialization(self): self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) self.assertRaises(ValueError, bufio.write, b"def") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_garbage_collection(self): # C BufferedWriter objects are collected, and collecting them flushes # all data to disk. @@ -2155,11 +2189,17 @@ def test_args_error(self): with self.assertRaisesRegex(TypeError, "BufferedWriter"): self.tp(self.BytesIO(), 1024, 1024, 1024) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_flush_error_on_close(self): - super().test_flush_error_on_close() + return super().test_flush_error_on_close() + + @unittest.skip('TODO: RUSTPYTHON; fallible allocation') + def test_constructor(self): + return super().test_constructor() + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_pickling_subclass(self): + return super().test_pickling_subclass() class PyBufferedWriterTest(BufferedWriterTest): tp = pyio.BufferedWriter @@ -2637,22 +2677,7 @@ def test_interleaved_readline_write(self): class CBufferedRandomTest(BufferedRandomTest, SizeofTest): tp = io.BufferedRandom - @unittest.skip("TODO: RUSTPYTHON, fallible allocation") - @skip_if_sanitizer(memory=True, address=True, thread=True, - reason="sanitizer defaults to crashing " - "instead of returning NULL for malloc failure.") - def test_constructor(self): - BufferedRandomTest.test_constructor(self) - # The allocation can succeed on 32-bit builds, e.g. with more - # than 2 GiB RAM and a 64-bit kernel. - if sys.maxsize > 0x7FFFFFFF: - rawio = self.MockRawIO() - bufio = self.tp(rawio) - self.assertRaises((OverflowError, MemoryError, ValueError), - bufio.__init__, rawio, sys.maxsize) - - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_garbage_collection(self): CBufferedReaderTest.test_garbage_collection(self) CBufferedWriterTest.test_garbage_collection(self) @@ -2662,20 +2687,25 @@ def test_args_error(self): with self.assertRaisesRegex(TypeError, "BufferedRandom"): self.tp(self.BytesIO(), 1024, 1024, 1024) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_flush_error_on_close(self): - super().test_flush_error_on_close() + return super().test_flush_error_on_close() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_seek_character_device_file(self): - super().test_seek_character_device_file() + return super().test_seek_character_device_file() - # TODO: RUSTPYTHON; f.read1(1) returns b'a' - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; f.read1(1) returns b'a' def test_read1_after_write(self): - super().test_read1_after_write() + return super().test_read1_after_write() + + @unittest.skip('TODO: RUSTPYTHON; fallible allocation') + def test_constructor(self): + return super().test_constructor() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_pickling_subclass(self): + return super().test_pickling_subclass() class PyBufferedRandomTest(BufferedRandomTest): @@ -2935,11 +2965,16 @@ def test_recursive_repr(self): # Issue #25455 raw = self.BytesIO() t = self.TextIOWrapper(raw, encoding="utf-8") - with support.swap_attr(raw, 'name', t): - try: + with support.swap_attr(raw, 'name', t), support.infinite_recursion(25): + with self.assertRaises(RuntimeError): repr(t) # Should not crash - except RuntimeError: - pass + + def test_subclass_repr(self): + class TestSubclass(self.TextIOWrapper): + pass + + f = TestSubclass(self.StringIO()) + self.assertIn(TestSubclass.__name__, repr(f)) def test_line_buffering(self): r = self.BytesIO() @@ -3166,6 +3201,7 @@ def flush(self): support.gc_collect() self.assertEqual(record, [1, 2, 3]) + @unittest.expectedFailure # TODO: RUSTPYTHON def test_error_through_destructor(self): # Test that the exception state is not modified by a destructor, # even if close() fails. @@ -3174,10 +3210,7 @@ def test_error_through_destructor(self): with self.assertRaises(AttributeError): self.TextIOWrapper(rawio, encoding="utf-8").xyzzy - if not IOBASE_EMITS_UNRAISABLE: - self.assertIsNone(cm.unraisable) - elif cm.unraisable is not None: - self.assertEqual(cm.unraisable.exc_type, OSError) + self.assertEqual(cm.unraisable.exc_type, OSError) # Systematic tests of the text I/O API @@ -3327,8 +3360,7 @@ def test_seek_and_tell_with_data(data, min_pos=0): finally: StatefulIncrementalDecoder.codecEnabled = 0 - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_multibyte_seek_and_tell(self): f = self.open(os_helper.TESTFN, "w", encoding="euc_jp") f.write("AB\n\u3046\u3048\n") @@ -3344,8 +3376,7 @@ def test_multibyte_seek_and_tell(self): self.assertEqual(f.tell(), p1) f.close() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_seek_with_encoder_state(self): f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004") f.write("\u00e6\u0300") @@ -3359,8 +3390,7 @@ def test_seek_with_encoder_state(self): self.assertEqual(f.readline(), "\u00e6\u0300\u0300") f.close() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_encoded_writes(self): data = "1234567890" tests = ("utf-16", @@ -3499,8 +3529,7 @@ def test_issue2282(self): self.assertEqual(buffer.seekable(), txt.seekable()) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_append_bom(self): # The BOM is not written again when appending to a non-empty file filename = os_helper.TESTFN @@ -3516,8 +3545,7 @@ def test_append_bom(self): with self.open(filename, 'rb') as f: self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_seek_bom(self): # Same test, but when seeking manually filename = os_helper.TESTFN @@ -3533,8 +3561,7 @@ def test_seek_bom(self): with self.open(filename, 'rb') as f: self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_seek_append_bom(self): # Same test, but first seek to the start and then to the end filename = os_helper.TESTFN @@ -3795,17 +3822,14 @@ def _check_create_at_shutdown(self, **kwargs): codecs.lookup('utf-8') class C: - def __init__(self): - self.buf = io.BytesIO() def __del__(self): - io.TextIOWrapper(self.buf, **{kwargs}) + io.TextIOWrapper(io.BytesIO(), **{kwargs}) print("ok") c = C() """.format(iomod=iomod, kwargs=kwargs) return assert_python_ok("-c", code) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_create_at_shutdown_without_encoding(self): rc, out, err = self._check_create_at_shutdown() if err: @@ -3815,8 +3839,7 @@ def test_create_at_shutdown_without_encoding(self): else: self.assertEqual("ok", out.decode().strip()) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_create_at_shutdown_with_encoding(self): rc, out, err = self._check_create_at_shutdown(encoding='utf-8', errors='strict') @@ -4042,6 +4065,28 @@ def test_issue35928(self): f.write(res) self.assertEqual(res + f.readline(), 'foo\nbar\n') + def test_pickling_subclass(self): + global MyTextIO + class MyTextIO(self.TextIOWrapper): + def __init__(self, raw, tag): + super().__init__(raw) + self.tag = tag + def __getstate__(self): + return self.tag, self.buffer.getvalue() + def __setstate__(slf, state): + tag, value = state + slf.__init__(self.BytesIO(value), tag) + + raw = self.BytesIO(b'data') + txt = MyTextIO(raw, 'ham') + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(protocol=proto): + pickled = pickle.dumps(txt, proto) + newtxt = pickle.loads(pickled) + self.assertEqual(newtxt.buffer.getvalue(), b'data') + self.assertEqual(newtxt.tag, 'ham') + del MyTextIO + class MemviewBytesIO(io.BytesIO): '''A BytesIO object whose read method returns memoryviews @@ -4066,98 +4111,7 @@ class CTextIOWrapperTest(TextIOWrapperTest): io = io shutdown_error = "LookupError: unknown encoding: ascii" - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_constructor(self): - super().test_constructor() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_detach(self): - super().test_detach() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_reconfigure_encoding_read(self): - super().test_reconfigure_encoding_read() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_reconfigure_line_buffering(self): - super().test_reconfigure_line_buffering() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_basic_io(self): - super().test_basic_io() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_telling(self): - super().test_telling() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_uninitialized(self): - super().test_uninitialized() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_non_text_encoding_codecs_are_rejected(self): - super().test_non_text_encoding_codecs_are_rejected() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_repr(self): - super().test_repr() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_newlines(self): - super().test_newlines() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_newlines_input(self): - super().test_newlines_input() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_reconfigure_write_through(self): - super().test_reconfigure_write_through() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_reconfigure_write_fromascii(self): - super().test_reconfigure_write_fromascii() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_reconfigure_write(self): - super().test_reconfigure_write() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_reconfigure_defaults(self): - super().test_reconfigure_defaults() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_reconfigure_newline(self): - super().test_reconfigure_newline() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_reconfigure_errors(self): - super().test_reconfigure_errors() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_reconfigure_locale(self): - super().test_reconfigure_locale() - - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_initialization(self): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) @@ -4168,8 +4122,7 @@ def test_initialization(self): t = self.TextIOWrapper.__new__(self.TextIOWrapper) self.assertRaises(Exception, repr, t) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_garbage_collection(self): # C TextIOWrapper objects are collected, and collecting them flushes # all data to disk. @@ -4233,20 +4186,121 @@ def write(self, data): t.write("x"*chunk_size) self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack) + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_issue119506(self): + chunk_size = 8192 + + class MockIO(self.MockRawIO): + written = False + def write(self, data): + if not self.written: + self.written = True + t.write("middle") + return super().write(data) + + buf = MockIO() + t = self.TextIOWrapper(buf) + t.write("abc") + t.write("def") + # writing data which size >= chunk_size cause flushing buffer before write. + t.write("g" * chunk_size) + t.flush() + + self.assertEqual([b"abcdef", b"middle", b"g"*chunk_size], + buf._write_stack) + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_basic_io(self): + return super().test_basic_io() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_constructor(self): + return super().test_constructor() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_detach(self): + return super().test_detach() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_newlines(self): + return super().test_newlines() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_newlines_input(self): + return super().test_newlines_input() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_non_text_encoding_codecs_are_rejected(self): + return super().test_non_text_encoding_codecs_are_rejected() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_reconfigure_defaults(self): + return super().test_reconfigure_defaults() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_reconfigure_encoding_read(self): + return super().test_reconfigure_encoding_read() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_reconfigure_errors(self): + return super().test_reconfigure_errors() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_reconfigure_line_buffering(self): + return super().test_reconfigure_line_buffering() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_reconfigure_locale(self): + return super().test_reconfigure_locale() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_reconfigure_newline(self): + return super().test_reconfigure_newline() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_reconfigure_write(self): + return super().test_reconfigure_write() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_reconfigure_write_fromascii(self): + return super().test_reconfigure_write_fromascii() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_reconfigure_write_through(self): + return super().test_reconfigure_write_through() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_repr(self): + return super().test_repr() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_telling(self): + return super().test_telling() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_uninitialized(self): + return super().test_uninitialized() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_recursive_repr(self): + return super().test_recursive_repr() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_pickling_subclass(self): + return super().test_pickling_subclass() + class PyTextIOWrapperTest(TextIOWrapperTest): io = pyio shutdown_error = "LookupError: unknown encoding: ascii" - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: ValueError not raised def test_constructor(self): - super().test_constructor() + return super().test_constructor() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_newlines(self): - super().test_newlines() + return super().test_newlines() class IncrementalNewlineDecoderTest(unittest.TestCase): @@ -4326,8 +4380,7 @@ def _decode_bytewise(s): self.assertEqual(decoder.decode(input), "abc") self.assertEqual(decoder.newlines, None) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_newline_decoder(self): encodings = ( # None meaning the IncrementalNewlineDecoder takes unicode input @@ -4489,8 +4542,7 @@ def test_io_after_close(self): self.assertRaises(ValueError, f.writelines, []) self.assertRaises(ValueError, next, f) - # TODO: RUSTPYTHON, cyclic gc - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; cyclic gc def test_blockingioerror(self): # Various BlockingIOError issues class C(str): @@ -4538,15 +4590,14 @@ def test_abc_inheritance_official(self): self._check_abc_inheritance(io) def _check_warn_on_dealloc(self, *args, **kwargs): - f = open(*args, **kwargs) + f = self.open(*args, **kwargs) r = repr(f) with self.assertWarns(ResourceWarning) as cm: f = None support.gc_collect() self.assertIn(r, str(cm.warning.args[0])) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_warn_on_dealloc(self): self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0) self._check_warn_on_dealloc(os_helper.TESTFN, "wb") @@ -4569,10 +4620,9 @@ def cleanup_fds(): r, w = os.pipe() fds += r, w with warnings_helper.check_no_resource_warning(self): - open(r, *args, closefd=False, **kwargs) + self.open(r, *args, closefd=False, **kwargs) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") def test_warn_on_dealloc_fd(self): self._check_warn_on_dealloc_fd("rb", buffering=0) @@ -4602,16 +4652,14 @@ def test_pickling(self): with self.assertRaisesRegex(TypeError, msg): pickle.dumps(f, protocol) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @unittest.skipIf( support.is_emscripten, "fstat() of a pipe fd is not supported" ) def test_nonblock_pipe_write_bigbuf(self): self._test_nonblock_pipe_write(16*1024) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @unittest.skipIf( support.is_emscripten, "fstat() of a pipe fd is not supported" ) @@ -4733,8 +4781,7 @@ def test_check_encoding_errors(self): proc = assert_python_failure('-X', 'dev', '-c', code) self.assertEqual(proc.rc, 10, proc) - # TODO: RUSTPYTHON, AssertionError: 0 != 2 - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: 0 != 2 def test_check_encoding_warning(self): # PEP 597: Raise warning when encoding is not specified # and sys.flags.warn_default_encoding is set. @@ -4758,8 +4805,7 @@ def test_check_encoding_warning(self): self.assertTrue( warnings[1].startswith(b":8: EncodingWarning: ")) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_text_encoding(self): # PEP 597, bpo-47000. io.text_encoding() returns "locale" or "utf-8" # based on sys.flags.utf8_mode @@ -4837,10 +4883,9 @@ def test_daemon_threads_shutdown_stdout_deadlock(self): def test_daemon_threads_shutdown_stderr_deadlock(self): self.check_daemon_threads_shutdown_deadlock('stderr') - # TODO: RUSTPYTHON, AssertionError: 22 != 10 : _PythonRunResult(rc=22, out=b'', err=b'') - @unittest.expectedFailure - def test_check_encoding_errors(self): # TODO: RUSTPYTHON, remove when this passes - super().test_check_encoding_errors() # TODO: RUSTPYTHON, remove when this passes + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: 22 != 10 : _PythonRunResult(rc=22, out=b'', err=b'') + def test_check_encoding_errors(self): + return super().test_check_encoding_errors() class PyMiscIOTest(MiscIOTest): @@ -5014,16 +5059,14 @@ def alarm_handler(sig, frame): os.close(w) os.close(r) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @requires_alarm @support.requires_resource('walltime') def test_interrupted_read_retry_buffered(self): self.check_interrupted_read_retry(lambda x: x.decode('latin1'), mode="rb") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @requires_alarm @support.requires_resource('walltime') def test_interrupted_read_retry_text(self): @@ -5098,15 +5141,13 @@ def alarm2(sig, frame): if e.errno != errno.EBADF: raise - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @requires_alarm @support.requires_resource('walltime') def test_interrupted_write_retry_buffered(self): self.check_interrupted_write_retry(b"x", mode="wb") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @requires_alarm @support.requires_resource('walltime') def test_interrupted_write_retry_text(self):