@@ -1298,6 +1298,7 @@ def test_ror_operator(self):
1298
1298
1299
1299
class WalkTests (unittest .TestCase ):
1300
1300
"""Tests for os.walk()."""
1301
+ is_fwalk = False
1301
1302
1302
1303
# Wrapper to hide minor differences between os.walk and os.fwalk
1303
1304
# to tests both functions with the same code base
@@ -1332,14 +1333,14 @@ def setUp(self):
1332
1333
self .sub11_path = join (self .sub1_path , "SUB11" )
1333
1334
sub2_path = join (self .walk_path , "SUB2" )
1334
1335
sub21_path = join (sub2_path , "SUB21" )
1335
- tmp1_path = join (self .walk_path , "tmp1" )
1336
+ self . tmp1_path = join (self .walk_path , "tmp1" )
1336
1337
tmp2_path = join (self .sub1_path , "tmp2" )
1337
1338
tmp3_path = join (sub2_path , "tmp3" )
1338
1339
tmp5_path = join (sub21_path , "tmp3" )
1339
1340
self .link_path = join (sub2_path , "link" )
1340
1341
t2_path = join (os_helper .TESTFN , "TEST2" )
1341
1342
tmp4_path = join (os_helper .TESTFN , "TEST2" , "tmp4" )
1342
- broken_link_path = join (sub2_path , "broken_link" )
1343
+ self . broken_link_path = join (sub2_path , "broken_link" )
1343
1344
broken_link2_path = join (sub2_path , "broken_link2" )
1344
1345
broken_link3_path = join (sub2_path , "broken_link3" )
1345
1346
@@ -1349,13 +1350,13 @@ def setUp(self):
1349
1350
os .makedirs (sub21_path )
1350
1351
os .makedirs (t2_path )
1351
1352
1352
- for path in tmp1_path , tmp2_path , tmp3_path , tmp4_path , tmp5_path :
1353
+ for path in self . tmp1_path , tmp2_path , tmp3_path , tmp4_path , tmp5_path :
1353
1354
with open (path , "x" , encoding = 'utf-8' ) as f :
1354
1355
f .write ("I'm " + path + " and proud of it. Blame test_os.\n " )
1355
1356
1356
1357
if os_helper .can_symlink ():
1357
1358
os .symlink (os .path .abspath (t2_path ), self .link_path )
1358
- os .symlink ('broken' , broken_link_path , True )
1359
+ os .symlink ('broken' , self . broken_link_path , True )
1359
1360
os .symlink (join ('tmp3' , 'broken' ), broken_link2_path , True )
1360
1361
os .symlink (join ('SUB21' , 'tmp5' ), broken_link3_path , True )
1361
1362
self .sub2_tree = (sub2_path , ["SUB21" , "link" ],
@@ -1451,6 +1452,11 @@ def test_walk_symlink(self):
1451
1452
else :
1452
1453
self .fail ("Didn't follow symlink with followlinks=True" )
1453
1454
1455
+ walk_it = self .walk (self .broken_link_path , follow_symlinks = True )
1456
+ if self .is_fwalk :
1457
+ self .assertRaises (FileNotFoundError , next , walk_it )
1458
+ self .assertRaises (StopIteration , next , walk_it )
1459
+
1454
1460
def test_walk_bad_dir (self ):
1455
1461
# Walk top-down.
1456
1462
errors = []
@@ -1472,6 +1478,73 @@ def test_walk_bad_dir(self):
1472
1478
finally :
1473
1479
os .rename (path1new , path1 )
1474
1480
1481
+ def test_walk_bad_dir2 (self ):
1482
+ walk_it = self .walk ('nonexisting' )
1483
+ if self .is_fwalk :
1484
+ self .assertRaises (FileNotFoundError , next , walk_it )
1485
+ self .assertRaises (StopIteration , next , walk_it )
1486
+
1487
+ walk_it = self .walk ('nonexisting' , follow_symlinks = True )
1488
+ if self .is_fwalk :
1489
+ self .assertRaises (FileNotFoundError , next , walk_it )
1490
+ self .assertRaises (StopIteration , next , walk_it )
1491
+
1492
+ walk_it = self .walk (self .tmp1_path )
1493
+ self .assertRaises (StopIteration , next , walk_it )
1494
+
1495
+ walk_it = self .walk (self .tmp1_path , follow_symlinks = True )
1496
+ if self .is_fwalk :
1497
+ self .assertRaises (NotADirectoryError , next , walk_it )
1498
+ self .assertRaises (StopIteration , next , walk_it )
1499
+
1500
+ @unittest .skipUnless (hasattr (os , "mkfifo" ), 'requires os.mkfifo()' )
1501
+ @unittest .skipIf (sys .platform == "vxworks" ,
1502
+ "fifo requires special path on VxWorks" )
1503
+ def test_walk_named_pipe (self ):
1504
+ path = os_helper .TESTFN + '-pipe'
1505
+ os .mkfifo (path )
1506
+ self .addCleanup (os .unlink , path )
1507
+
1508
+ walk_it = self .walk (path )
1509
+ self .assertRaises (StopIteration , next , walk_it )
1510
+
1511
+ walk_it = self .walk (path , follow_symlinks = True )
1512
+ if self .is_fwalk :
1513
+ self .assertRaises (NotADirectoryError , next , walk_it )
1514
+ self .assertRaises (StopIteration , next , walk_it )
1515
+
1516
+ @unittest .skipUnless (hasattr (os , "mkfifo" ), 'requires os.mkfifo()' )
1517
+ @unittest .skipIf (sys .platform == "vxworks" ,
1518
+ "fifo requires special path on VxWorks" )
1519
+ def test_walk_named_pipe2 (self ):
1520
+ path = os_helper .TESTFN + '-dir'
1521
+ os .mkdir (path )
1522
+ self .addCleanup (shutil .rmtree , path )
1523
+ os .mkfifo (os .path .join (path , 'mypipe' ))
1524
+
1525
+ errors = []
1526
+ walk_it = self .walk (path , onerror = errors .append )
1527
+ next (walk_it )
1528
+ self .assertRaises (StopIteration , next , walk_it )
1529
+ self .assertEqual (errors , [])
1530
+
1531
+ errors = []
1532
+ walk_it = self .walk (path , onerror = errors .append )
1533
+ root , dirs , files = next (walk_it )
1534
+ self .assertEqual (root , path )
1535
+ self .assertEqual (dirs , [])
1536
+ self .assertEqual (files , ['mypipe' ])
1537
+ dirs .extend (files )
1538
+ files .clear ()
1539
+ if self .is_fwalk :
1540
+ self .assertRaises (NotADirectoryError , next , walk_it )
1541
+ self .assertRaises (StopIteration , next , walk_it )
1542
+ if self .is_fwalk :
1543
+ self .assertEqual (errors , [])
1544
+ else :
1545
+ self .assertEqual (len (errors ), 1 , errors )
1546
+ self .assertIsInstance (errors [0 ], NotADirectoryError )
1547
+
1475
1548
def test_walk_many_open_files (self ):
1476
1549
depth = 30
1477
1550
base = os .path .join (os_helper .TESTFN , 'deep' )
@@ -1537,6 +1610,7 @@ def test_walk_above_recursion_limit(self):
1537
1610
@unittest .skipUnless (hasattr (os , 'fwalk' ), "Test needs os.fwalk()" )
1538
1611
class FwalkTests (WalkTests ):
1539
1612
"""Tests for os.fwalk()."""
1613
+ is_fwalk = True
1540
1614
1541
1615
def walk (self , top , ** kwargs ):
1542
1616
for root , dirs , files , root_fd in self .fwalk (top , ** kwargs ):
0 commit comments