|
1 |
| -import unittest |
| 1 | +import codecs |
| 2 | +import contextlib |
| 3 | +import io |
2 | 4 | import sys
|
| 5 | +import unittest |
| 6 | +import unittest.mock as mock |
| 7 | +import _testcapi |
3 | 8 | from test.support import import_helper
|
4 | 9 |
|
5 | 10 | _testlimitedcapi = import_helper.import_module('_testlimitedcapi')
|
6 | 11 |
|
7 | 12 | NULL = None
|
8 | 13 |
|
9 | 14 |
|
10 |
| -class CAPITest(unittest.TestCase): |
| 15 | +class CAPIUnicodeTest(unittest.TestCase): |
11 | 16 | # TODO: Test the following functions:
|
12 | 17 | #
|
13 | 18 | # PyUnicode_BuildEncodingMap
|
@@ -516,5 +521,228 @@ def test_asrawunicodeescapestring(self):
|
516 | 521 | # CRASHES asrawunicodeescapestring(NULL)
|
517 | 522 |
|
518 | 523 |
|
class CAPICodecRegistration(unittest.TestCase):
    """Tests for the codec registry functions exposed by ``_testcapi``.

    Every test works with a throw-away codec that reverses its input.
    ``setUp()`` builds the codec and its search function but does not
    register them; each test registers (and unregisters) what it needs.
    """

    def setUp(self):
        self.enterContext(import_helper.isolated_modules())
        self.enterContext(import_helper.CleanImport('codecs'))
        self.codecs = import_helper.import_module('codecs')
        # Encoding names are normalized internally: they are converted to
        # lowercase and their hyphens are replaced by underscores.
        self.encoding_name = f'codec_reversed_{id(self)}'
        # make sure that our custom codec is not already registered
        self.assertRaises(LookupError, self.codecs.lookup, self.encoding_name)
        # create the search function without registering yet
        self._create_custom_codec()

    def _create_custom_codec(self):
        """Build the reversing codec and its search function.

        Stores the resulting ``CodecInfo`` in ``self.codec_info`` and the
        search function in ``self.search_function``.  Nothing is registered
        by this method.
        """
        def codec_encoder(m, errors='strict'):
            # Stateless codec API: return (output, length consumed).
            return (type(m)().join(reversed(m)), len(m))

        def codec_decoder(c, errors='strict'):
            return (type(c)().join(reversed(c)), len(c))

        class IncrementalEncoder(codecs.IncrementalEncoder):
            def encode(self, input, final=False):
                # Incremental codecs return only the output object, not
                # the (output, length) pair of the stateless functions.
                return codec_encoder(input)[0]

        class IncrementalDecoder(codecs.IncrementalDecoder):
            def decode(self, input, final=False):
                return codec_decoder(input)[0]

        class StreamReader(codecs.StreamReader):
            def encode(self, input, errors='strict'):
                return codec_encoder(input, errors=errors)

            def decode(self, input, errors='strict'):
                return codec_decoder(input, errors=errors)

        class StreamWriter(codecs.StreamWriter):
            def encode(self, input, errors='strict'):
                return codec_encoder(input, errors=errors)

            def decode(self, input, errors='strict'):
                return codec_decoder(input, errors=errors)

        info = codecs.CodecInfo(
            encode=codec_encoder,
            decode=codec_decoder,
            streamreader=StreamReader,
            streamwriter=StreamWriter,
            incrementalencoder=IncrementalEncoder,
            incrementaldecoder=IncrementalDecoder,
            name=self.encoding_name,
        )

        def search_function(encoding):
            # Only answer for our own codec; returning None lets the
            # lookup continue with the other search functions.
            if encoding == self.encoding_name:
                return info
            return None

        self.codec_info = info
        self.search_function = search_function

    @contextlib.contextmanager
    def use_custom_encoder(self):
        """Register the custom search function for the ``with`` block."""
        self.assertRaises(LookupError, self.codecs.lookup, self.encoding_name)
        self.codecs.register(self.search_function)
        try:
            yield
        finally:
            # Unregister even when the body of the ``with`` block fails,
            # so that a failing test does not leave the codec registered.
            self.codecs.unregister(self.search_function)
        self.assertRaises(LookupError, self.codecs.lookup, self.encoding_name)

    def test_codec_register(self):
        search_function, encoding = self.search_function, self.encoding_name
        self.assertIsNone(_testcapi.codec_register(search_function))
        # The search function was registered through the C API; remove it
        # once the test is over, whether it passed or not.
        self.addCleanup(self.codecs.unregister, search_function)
        self.assertIs(self.codecs.lookup(encoding), search_function(encoding))
        self.assertEqual(self.codecs.encode('123', encoding=encoding), '321')

    def test_codec_unregister(self):
        search_function, encoding = self.search_function, self.encoding_name
        self.assertRaises(LookupError, self.codecs.lookup, encoding)
        self.codecs.register(search_function)
        # Safety net in case the C API call below fails to unregister.
        self.addCleanup(self.codecs.unregister, search_function)
        self.assertIsNone(_testcapi.codec_unregister(search_function))
        self.assertRaises(LookupError, self.codecs.lookup, encoding)

    def test_codec_known_encoding(self):
        self.assertRaises(LookupError, self.codecs.lookup, 'unknown-codec')
        self.assertFalse(_testcapi.codec_known_encoding('unknown-codec'))
        self.assertFalse(_testcapi.codec_known_encoding('unknown_codec'))
        self.assertFalse(_testcapi.codec_known_encoding('UNKNOWN-codec'))

        encoding_name = self.encoding_name
        self.assertRaises(LookupError, self.codecs.lookup, encoding_name)
        self.codecs.register(self.search_function)
        self.addCleanup(self.codecs.unregister, self.search_function)

        # The same codec must be found under every spelling that
        # normalizes to the registered name.
        for name in [
            encoding_name,
            encoding_name.upper(),
            encoding_name.replace('_', '-'),
        ]:
            with self.subTest(name):
                self.assertTrue(_testcapi.codec_known_encoding(name))

    def test_codec_encode(self):
        encode = _testcapi.codec_encode
        self.assertEqual(encode('a', 'utf-8', NULL), b'a')
        self.assertEqual(encode('a', 'utf-8', 'strict'), b'a')
        self.assertEqual(encode('é', 'ascii', 'ignore'), b'')
        # todo: add more cases
        self.assertRaises(TypeError, encode, NULL, 'ascii', 'strict')
        # CRASHES encode('a', NULL, 'strict')

    def test_codec_decode(self):
        decode = _testcapi.codec_decode

        b = b'a\xc2\xa1\xe4\xbd\xa0\xf0\x9f\x98\x80'
        s = 'a\xa1\u4f60\U0001f600'

        self.assertEqual(decode(b, 'utf-8', 'strict'), s)
        self.assertEqual(decode(b, 'utf-8', NULL), s)
        self.assertEqual(decode(b, 'latin1', 'strict'), b.decode('latin1'))
        self.assertRaises(UnicodeDecodeError, decode, b, 'ascii', 'strict')
        self.assertRaises(UnicodeDecodeError, decode, b, 'ascii', NULL)
        self.assertEqual(decode(b, 'ascii', 'replace'), 'a' + '\ufffd'*9)
        # todo: add more cases

        # _codecs.decode only reports unknown errors policy when they are
        # used (it has a fast path for empty bytes); this is different from
        # PyUnicode_Decode which checks that both the encoding and the errors
        # policy are recognized.
        self.assertEqual(decode(b'', 'utf-8', 'unknown-errors-policy'), '')

        self.assertRaises(TypeError, decode, NULL, 'ascii', 'strict')
        # CRASHES decode(b, NULL, 'strict')

    def test_codec_encoder(self):
        with self.use_custom_encoder():
            encoder = _testcapi.codec_encoder(self.encoding_name)
            self.assertIs(encoder, self.codec_info.encode)

    def test_codec_decoder(self):
        with self.use_custom_encoder():
            decoder = _testcapi.codec_decoder(self.encoding_name)
            self.assertIs(decoder, self.codec_info.decode)

    def test_codec_incremental_encoder(self):
        with self.use_custom_encoder():
            encoder = _testcapi.codec_incremental_encoder(self.encoding_name, 'strict')
            self.assertIsInstance(encoder, self.codec_info.incrementalencoder)

    def test_codec_incremental_decoder(self):
        with self.use_custom_encoder():
            decoder = _testcapi.codec_incremental_decoder(self.encoding_name, 'strict')
            self.assertIsInstance(decoder, self.codec_info.incrementaldecoder)

    def test_codec_stream_reader(self):
        with self.use_custom_encoder():
            encoding, stream = self.encoding_name, io.StringIO()
            reader = _testcapi.codec_stream_reader(encoding, stream, 'strict')
            self.assertIsInstance(reader, self.codec_info.streamreader)

    def test_codec_stream_writer(self):
        with self.use_custom_encoder():
            encoding, stream = self.encoding_name, io.StringIO()
            writer = _testcapi.codec_stream_writer(encoding, stream, 'strict')
            self.assertIsInstance(writer, self.codec_info.streamwriter)
| 687 | + |
class CAPICodecErrors(unittest.TestCase):
    """Tests for the codec error-handler functions exposed by ``_testcapi``."""

    def setUp(self):
        self.enterContext(import_helper.isolated_modules())
        self.enterContext(import_helper.CleanImport('codecs'))
        self.codecs = import_helper.import_module('codecs')

    def test_codec_register_error(self):
        import _codecs

        self.assertRaises(LookupError, _testcapi.codec_lookup_error, 'custom')

        def error_handler(exc):
            raise exc

        # Wrap the handler so that the calls made by the codecs machinery
        # can be counted.
        error_handler = mock.Mock(wraps=error_handler)
        _testcapi.codec_register_error('custom', error_handler)
        # The handler would otherwise outlive this test and break the
        # "not registered yet" check above on a repeated run.  Remove it
        # when the interpreter provides a helper for that; older
        # versions have none, in which case nothing can be cleaned up.
        unregister_error = getattr(_codecs, '_unregister_error', None)
        if unregister_error is not None:
            self.addCleanup(unregister_error, 'custom')

        self.assertRaises(UnicodeEncodeError, self.codecs.encode,
                          '\xff', 'ascii', errors='custom')
        error_handler.assert_called_once()
        error_handler.reset_mock()

        self.assertRaises(UnicodeDecodeError, self.codecs.decode,
                          b'\xff', 'ascii', errors='custom')
        error_handler.assert_called_once()

    def test_codec_lookup_error(self):
        codec_lookup_error = _testcapi.codec_lookup_error
        self.assertIs(codec_lookup_error(NULL), self.codecs.strict_errors)
        self.assertIs(codec_lookup_error('strict'), self.codecs.strict_errors)
        self.assertIs(codec_lookup_error('ignore'), self.codecs.ignore_errors)
        self.assertIs(codec_lookup_error('replace'), self.codecs.replace_errors)
        self.assertIs(codec_lookup_error('xmlcharrefreplace'), self.codecs.xmlcharrefreplace_errors)
        self.assertIs(codec_lookup_error('namereplace'), self.codecs.namereplace_errors)
        # Use a name that no test in this class ever registers, so that
        # the outcome does not depend on the order the tests run in.
        self.assertRaises(LookupError, codec_lookup_error, 'never-registered')

    def test_codec_error_handlers(self):
        exceptions = [
            UnicodeEncodeError('bad', '', 0, 1, 'reason'),
            UnicodeEncodeError('bad', 'x', 0, 1, 'reason'),
            UnicodeEncodeError('bad', 'xyz123', 0, 1, 'reason'),
            UnicodeEncodeError('bad', 'xyz123', 1, 4, 'reason'),
        ]

        # The strict handler re-raises the exception it is given.
        strict_handler = _testcapi.codec_strict_errors
        for exc in exceptions:
            with self.subTest(handler=strict_handler, exc=exc):
                self.assertRaises(UnicodeEncodeError, strict_handler, exc)

        # The other handlers are only checked for not raising.
        for handler in [
            _testcapi.codec_ignore_errors,
            _testcapi.codec_replace_errors,
            _testcapi.codec_xmlcharrefreplace_errors,
            _testcapi.codec_namereplace_errors,
        ]:
            for exc in exceptions:
                with self.subTest(handler=handler, exc=exc):
                    handler(exc)
| 745 | + |
| 746 | + |
# Run the tests of this file when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
|
0 commit comments