From ffea61b54035f85e3c047c0c703e084c7ad19cc4 Mon Sep 17 00:00:00 2001 From: Andrey Maltsev Date: Tue, 4 Apr 2023 10:06:41 +0000 Subject: [PATCH] Update test_mmap.py from Cpython v3.11.2 --- Lib/test/test_mmap.py | 108 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 99 insertions(+), 9 deletions(-) diff --git a/Lib/test/test_mmap.py b/Lib/test/test_mmap.py index fa371a291d..ed96be53cc 100644 --- a/Lib/test/test_mmap.py +++ b/Lib/test/test_mmap.py @@ -1,11 +1,15 @@ -from test.support import (requires, _2G, _4G, gc_collect, cpython_only) +from test.support import ( + requires, _2G, _4G, gc_collect, cpython_only, is_emscripten +) from test.support.import_helper import import_module from test.support.os_helper import TESTFN, unlink import unittest import os import re import itertools +import random import socket +import string import sys import weakref @@ -14,6 +18,16 @@ PAGESIZE = mmap.PAGESIZE +tagname_prefix = f'python_{os.getpid()}_test_mmap' +def random_tagname(length=10): + suffix = ''.join(random.choices(string.ascii_uppercase, k=length)) + return f'{tagname_prefix}_{suffix}' + +# Python's mmap module dup()s the file descriptor. Emscripten's FS layer +# does not materialize file changes through a dupped fd to a new mmap. +if is_emscripten: + raise unittest.SkipTest("incompatible with Emscripten's mmap emulation.") + class MmapTests(unittest.TestCase): @@ -609,11 +623,13 @@ def test_tagname(self): data1 = b"0123456789" data2 = b"abcdefghij" assert len(data1) == len(data2) + tagname1 = random_tagname() + tagname2 = random_tagname() # Test same tag - m1 = mmap.mmap(-1, len(data1), tagname="foo") + m1 = mmap.mmap(-1, len(data1), tagname=tagname1) m1[:] = data1 - m2 = mmap.mmap(-1, len(data2), tagname="foo") + m2 = mmap.mmap(-1, len(data2), tagname=tagname1) m2[:] = data2 self.assertEqual(m1[:], data2) self.assertEqual(m2[:], data2) @@ -621,9 +637,9 @@ def test_tagname(self): m1.close() # Test different tag - m1 = mmap.mmap(-1, len(data1), tagname="foo") + m1 = mmap.mmap(-1, len(data1), tagname=tagname1) m1[:] = data1 - m2 = mmap.mmap(-1, len(data2), tagname="boo") + m2 = mmap.mmap(-1, len(data2), tagname=tagname2) m2[:] = data2 self.assertEqual(m1[:], data1) self.assertEqual(m2[:], data2) @@ -634,7 +650,7 @@ def test_tagname(self): @unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_sizeof(self): m1 = mmap.mmap(-1, 100) - tagname = "foo" + tagname = random_tagname() m2 = mmap.mmap(-1, 100, tagname=tagname) self.assertEqual(sys.getsizeof(m2), sys.getsizeof(m1) + len(tagname) + 1) @@ -642,9 +658,10 @@ def test_sizeof(self): @unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_crasher_on_windows(self): # Should not crash (Issue 1733986) - m = mmap.mmap(-1, 1000, tagname="foo") + tagname = random_tagname() + m = mmap.mmap(-1, 1000, tagname=tagname) try: - mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size + mmap.mmap(-1, 5000, tagname=tagname)[:] # same tagname, but larger size except: pass m.close() @@ -707,7 +724,6 @@ def test_write_returning_the_number_of_bytes_written(self): self.assertEqual(mm.write(b"yz"), 2) self.assertEqual(mm.write(b"python"), 6) - @unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows') def test_resize_past_pos(self): m = mmap.mmap(-1, 8192) self.addCleanup(m.close) @@ -797,6 +813,80 @@ def test_madvise(self): self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None) self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None) + @unittest.skipUnless(os.name == 'nt', 'requires Windows') + def test_resize_up_when_mapped_to_pagefile(self): + """If the mmap is backed by the pagefile ensure a resize up can happen + and that the original data is still in place + """ + start_size = PAGESIZE + new_size = 2 * start_size + data = bytes(random.getrandbits(8) for _ in range(start_size)) + + m = mmap.mmap(-1, start_size) + m[:] = data + m.resize(new_size) + self.assertEqual(len(m), new_size) + self.assertEqual(m[:start_size], data[:start_size]) + + @unittest.skipUnless(os.name == 'nt', 'requires Windows') + def test_resize_down_when_mapped_to_pagefile(self): + """If the mmap is backed by the pagefile ensure a resize down up can happen + and that a truncated form of the original data is still in place + """ + start_size = PAGESIZE + new_size = start_size // 2 + data = bytes(random.getrandbits(8) for _ in range(start_size)) + + m = mmap.mmap(-1, start_size) + m[:] = data + m.resize(new_size) + self.assertEqual(len(m), new_size) + self.assertEqual(m[:new_size], data[:new_size]) + + @unittest.skipUnless(os.name == 'nt', 'requires Windows') + def test_resize_fails_if_mapping_held_elsewhere(self): + """If more than one mapping is held against a named file on Windows, neither + mapping can be resized + """ + start_size = 2 * PAGESIZE + reduced_size = PAGESIZE + + f = open(TESTFN, 'wb+') + f.truncate(start_size) + try: + m1 = mmap.mmap(f.fileno(), start_size) + m2 = mmap.mmap(f.fileno(), start_size) + with self.assertRaises(OSError): + m1.resize(reduced_size) + with self.assertRaises(OSError): + m2.resize(reduced_size) + m2.close() + m1.resize(reduced_size) + self.assertEqual(m1.size(), reduced_size) + self.assertEqual(os.stat(f.fileno()).st_size, reduced_size) + finally: + f.close() + + @unittest.skipUnless(os.name == 'nt', 'requires Windows') + def test_resize_succeeds_with_error_for_second_named_mapping(self): + """If a more than one mapping exists of the same name, none of them can + be resized: they'll raise an Exception and leave the original mapping intact + """ + start_size = 2 * PAGESIZE + reduced_size = PAGESIZE + tagname = random_tagname() + data_length = 8 + data = bytes(random.getrandbits(8) for _ in range(data_length)) + + m1 = mmap.mmap(-1, start_size, tagname=tagname) + m2 = mmap.mmap(-1, start_size, tagname=tagname) + m1[:data_length] = data + self.assertEqual(m2[:data_length], data) + with self.assertRaises(OSError): + m1.resize(reduced_size) + self.assertEqual(m1.size(), start_size) + self.assertEqual(m1[:data_length], data) + self.assertEqual(m2[:data_length], data) class LargeMmapTests(unittest.TestCase):