Skip to content

Update test_mmap.py from Cpython v3.11.2 #4841

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 99 additions & 9 deletions Lib/test/test_mmap.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from test.support import (requires, _2G, _4G, gc_collect, cpython_only)
from test.support import (
requires, _2G, _4G, gc_collect, cpython_only, is_emscripten
)
from test.support.import_helper import import_module
from test.support.os_helper import TESTFN, unlink
import unittest
import os
import re
import itertools
import random
import socket
import string
import sys
import weakref

Expand All @@ -14,6 +18,16 @@

PAGESIZE = mmap.PAGESIZE

tagname_prefix = f'python_{os.getpid()}_test_mmap'
def random_tagname(length=10):
suffix = ''.join(random.choices(string.ascii_uppercase, k=length))
return f'{tagname_prefix}_{suffix}'

# Python's mmap module dup()s the file descriptor. Emscripten's FS layer
# does not materialize file changes through a dupped fd to a new mmap.
if is_emscripten:
raise unittest.SkipTest("incompatible with Emscripten's mmap emulation.")


class MmapTests(unittest.TestCase):

Expand Down Expand Up @@ -609,21 +623,23 @@ def test_tagname(self):
data1 = b"0123456789"
data2 = b"abcdefghij"
assert len(data1) == len(data2)
tagname1 = random_tagname()
tagname2 = random_tagname()

# Test same tag
m1 = mmap.mmap(-1, len(data1), tagname="foo")
m1 = mmap.mmap(-1, len(data1), tagname=tagname1)
m1[:] = data1
m2 = mmap.mmap(-1, len(data2), tagname="foo")
m2 = mmap.mmap(-1, len(data2), tagname=tagname1)
m2[:] = data2
self.assertEqual(m1[:], data2)
self.assertEqual(m2[:], data2)
m2.close()
m1.close()

# Test different tag
m1 = mmap.mmap(-1, len(data1), tagname="foo")
m1 = mmap.mmap(-1, len(data1), tagname=tagname1)
m1[:] = data1
m2 = mmap.mmap(-1, len(data2), tagname="boo")
m2 = mmap.mmap(-1, len(data2), tagname=tagname2)
m2[:] = data2
self.assertEqual(m1[:], data1)
self.assertEqual(m2[:], data2)
Expand All @@ -634,17 +650,18 @@ def test_tagname(self):
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_sizeof(self):
m1 = mmap.mmap(-1, 100)
tagname = "foo"
tagname = random_tagname()
m2 = mmap.mmap(-1, 100, tagname=tagname)
self.assertEqual(sys.getsizeof(m2),
sys.getsizeof(m1) + len(tagname) + 1)

@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_crasher_on_windows(self):
# Should not crash (Issue 1733986)
m = mmap.mmap(-1, 1000, tagname="foo")
tagname = random_tagname()
m = mmap.mmap(-1, 1000, tagname=tagname)
try:
mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
mmap.mmap(-1, 5000, tagname=tagname)[:] # same tagname, but larger size
except:
pass
m.close()
Expand Down Expand Up @@ -707,7 +724,6 @@ def test_write_returning_the_number_of_bytes_written(self):
self.assertEqual(mm.write(b"yz"), 2)
self.assertEqual(mm.write(b"python"), 6)

@unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows')
def test_resize_past_pos(self):
m = mmap.mmap(-1, 8192)
self.addCleanup(m.close)
Expand Down Expand Up @@ -797,6 +813,80 @@ def test_madvise(self):
self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None)
self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None)

@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_resize_up_when_mapped_to_pagefile(self):
"""If the mmap is backed by the pagefile ensure a resize up can happen
and that the original data is still in place
"""
start_size = PAGESIZE
new_size = 2 * start_size
data = bytes(random.getrandbits(8) for _ in range(start_size))

m = mmap.mmap(-1, start_size)
m[:] = data
m.resize(new_size)
self.assertEqual(len(m), new_size)
self.assertEqual(m[:start_size], data[:start_size])

@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_resize_down_when_mapped_to_pagefile(self):
"""If the mmap is backed by the pagefile ensure a resize down up can happen
and that a truncated form of the original data is still in place
"""
start_size = PAGESIZE
new_size = start_size // 2
data = bytes(random.getrandbits(8) for _ in range(start_size))

m = mmap.mmap(-1, start_size)
m[:] = data
m.resize(new_size)
self.assertEqual(len(m), new_size)
self.assertEqual(m[:new_size], data[:new_size])

@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_resize_fails_if_mapping_held_elsewhere(self):
"""If more than one mapping is held against a named file on Windows, neither
mapping can be resized
"""
start_size = 2 * PAGESIZE
reduced_size = PAGESIZE

f = open(TESTFN, 'wb+')
f.truncate(start_size)
try:
m1 = mmap.mmap(f.fileno(), start_size)
m2 = mmap.mmap(f.fileno(), start_size)
with self.assertRaises(OSError):
m1.resize(reduced_size)
with self.assertRaises(OSError):
m2.resize(reduced_size)
m2.close()
m1.resize(reduced_size)
self.assertEqual(m1.size(), reduced_size)
self.assertEqual(os.stat(f.fileno()).st_size, reduced_size)
finally:
f.close()

@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_resize_succeeds_with_error_for_second_named_mapping(self):
"""If a more than one mapping exists of the same name, none of them can
be resized: they'll raise an Exception and leave the original mapping intact
"""
start_size = 2 * PAGESIZE
reduced_size = PAGESIZE
tagname = random_tagname()
data_length = 8
data = bytes(random.getrandbits(8) for _ in range(data_length))

m1 = mmap.mmap(-1, start_size, tagname=tagname)
m2 = mmap.mmap(-1, start_size, tagname=tagname)
m1[:data_length] = data
self.assertEqual(m2[:data_length], data)
with self.assertRaises(OSError):
m1.resize(reduced_size)
self.assertEqual(m1.size(), start_size)
self.assertEqual(m1[:data_length], data)
self.assertEqual(m2[:data_length], data)

class LargeMmapTests(unittest.TestCase):

Expand Down