@@ -1290,6 +1290,7 @@ def test_ror_operator(self):
1290
1290
1291
1291
class WalkTests (unittest .TestCase ):
1292
1292
"""Tests for os.walk()."""
1293
+ is_fwalk = False
1293
1294
1294
1295
# Wrapper to hide minor differences between os.walk and os.fwalk
1295
1296
# to tests both functions with the same code base
@@ -1324,14 +1325,14 @@ def setUp(self):
1324
1325
self .sub11_path = join (self .sub1_path , "SUB11" )
1325
1326
sub2_path = join (self .walk_path , "SUB2" )
1326
1327
sub21_path = join (sub2_path , "SUB21" )
1327
- tmp1_path = join (self .walk_path , "tmp1" )
1328
+ self . tmp1_path = join (self .walk_path , "tmp1" )
1328
1329
tmp2_path = join (self .sub1_path , "tmp2" )
1329
1330
tmp3_path = join (sub2_path , "tmp3" )
1330
1331
tmp5_path = join (sub21_path , "tmp3" )
1331
1332
self .link_path = join (sub2_path , "link" )
1332
1333
t2_path = join (os_helper .TESTFN , "TEST2" )
1333
1334
tmp4_path = join (os_helper .TESTFN , "TEST2" , "tmp4" )
1334
- broken_link_path = join (sub2_path , "broken_link" )
1335
+ self . broken_link_path = join (sub2_path , "broken_link" )
1335
1336
broken_link2_path = join (sub2_path , "broken_link2" )
1336
1337
broken_link3_path = join (sub2_path , "broken_link3" )
1337
1338
@@ -1341,13 +1342,13 @@ def setUp(self):
1341
1342
os .makedirs (sub21_path )
1342
1343
os .makedirs (t2_path )
1343
1344
1344
- for path in tmp1_path , tmp2_path , tmp3_path , tmp4_path , tmp5_path :
1345
+ for path in self . tmp1_path , tmp2_path , tmp3_path , tmp4_path , tmp5_path :
1345
1346
with open (path , "x" , encoding = 'utf-8' ) as f :
1346
1347
f .write ("I'm " + path + " and proud of it. Blame test_os.\n " )
1347
1348
1348
1349
if os_helper .can_symlink ():
1349
1350
os .symlink (os .path .abspath (t2_path ), self .link_path )
1350
- os .symlink ('broken' , broken_link_path , True )
1351
+ os .symlink ('broken' , self . broken_link_path , True )
1351
1352
os .symlink (join ('tmp3' , 'broken' ), broken_link2_path , True )
1352
1353
os .symlink (join ('SUB21' , 'tmp5' ), broken_link3_path , True )
1353
1354
self .sub2_tree = (sub2_path , ["SUB21" , "link" ],
@@ -1443,6 +1444,11 @@ def test_walk_symlink(self):
1443
1444
else :
1444
1445
self .fail ("Didn't follow symlink with followlinks=True" )
1445
1446
1447
+ walk_it = self .walk (self .broken_link_path , follow_symlinks = True )
1448
+ if self .is_fwalk :
1449
+ self .assertRaises (FileNotFoundError , next , walk_it )
1450
+ self .assertRaises (StopIteration , next , walk_it )
1451
+
1446
1452
def test_walk_bad_dir (self ):
1447
1453
# Walk top-down.
1448
1454
errors = []
@@ -1464,6 +1470,73 @@ def test_walk_bad_dir(self):
1464
1470
finally :
1465
1471
os .rename (path1new , path1 )
1466
1472
1473
+ def test_walk_bad_dir2 (self ):
1474
+ walk_it = self .walk ('nonexisting' )
1475
+ if self .is_fwalk :
1476
+ self .assertRaises (FileNotFoundError , next , walk_it )
1477
+ self .assertRaises (StopIteration , next , walk_it )
1478
+
1479
+ walk_it = self .walk ('nonexisting' , follow_symlinks = True )
1480
+ if self .is_fwalk :
1481
+ self .assertRaises (FileNotFoundError , next , walk_it )
1482
+ self .assertRaises (StopIteration , next , walk_it )
1483
+
1484
+ walk_it = self .walk (self .tmp1_path )
1485
+ self .assertRaises (StopIteration , next , walk_it )
1486
+
1487
+ walk_it = self .walk (self .tmp1_path , follow_symlinks = True )
1488
+ if self .is_fwalk :
1489
+ self .assertRaises (NotADirectoryError , next , walk_it )
1490
+ self .assertRaises (StopIteration , next , walk_it )
1491
+
1492
+ @unittest .skipUnless (hasattr (os , "mkfifo" ), 'requires os.mkfifo()' )
1493
+ @unittest .skipIf (sys .platform == "vxworks" ,
1494
+ "fifo requires special path on VxWorks" )
1495
+ def test_walk_named_pipe (self ):
1496
+ path = os_helper .TESTFN + '-pipe'
1497
+ os .mkfifo (path )
1498
+ self .addCleanup (os .unlink , path )
1499
+
1500
+ walk_it = self .walk (path )
1501
+ self .assertRaises (StopIteration , next , walk_it )
1502
+
1503
+ walk_it = self .walk (path , follow_symlinks = True )
1504
+ if self .is_fwalk :
1505
+ self .assertRaises (NotADirectoryError , next , walk_it )
1506
+ self .assertRaises (StopIteration , next , walk_it )
1507
+
1508
+ @unittest .skipUnless (hasattr (os , "mkfifo" ), 'requires os.mkfifo()' )
1509
+ @unittest .skipIf (sys .platform == "vxworks" ,
1510
+ "fifo requires special path on VxWorks" )
1511
+ def test_walk_named_pipe2 (self ):
1512
+ path = os_helper .TESTFN + '-dir'
1513
+ os .mkdir (path )
1514
+ self .addCleanup (shutil .rmtree , path )
1515
+ os .mkfifo (os .path .join (path , 'mypipe' ))
1516
+
1517
+ errors = []
1518
+ walk_it = self .walk (path , onerror = errors .append )
1519
+ next (walk_it )
1520
+ self .assertRaises (StopIteration , next , walk_it )
1521
+ self .assertEqual (errors , [])
1522
+
1523
+ errors = []
1524
+ walk_it = self .walk (path , onerror = errors .append )
1525
+ root , dirs , files = next (walk_it )
1526
+ self .assertEqual (root , path )
1527
+ self .assertEqual (dirs , [])
1528
+ self .assertEqual (files , ['mypipe' ])
1529
+ dirs .extend (files )
1530
+ files .clear ()
1531
+ if self .is_fwalk :
1532
+ self .assertRaises (NotADirectoryError , next , walk_it )
1533
+ self .assertRaises (StopIteration , next , walk_it )
1534
+ if self .is_fwalk :
1535
+ self .assertEqual (errors , [])
1536
+ else :
1537
+ self .assertEqual (len (errors ), 1 , errors )
1538
+ self .assertIsInstance (errors [0 ], NotADirectoryError )
1539
+
1467
1540
def test_walk_many_open_files (self ):
1468
1541
depth = 30
1469
1542
base = os .path .join (os_helper .TESTFN , 'deep' )
@@ -1529,6 +1602,7 @@ def test_walk_above_recursion_limit(self):
1529
1602
@unittest .skipUnless (hasattr (os , 'fwalk' ), "Test needs os.fwalk()" )
1530
1603
class FwalkTests (WalkTests ):
1531
1604
"""Tests for os.fwalk()."""
1605
+ is_fwalk = True
1532
1606
1533
1607
def walk (self , top , ** kwargs ):
1534
1608
for root , dirs , files , root_fd in self .fwalk (top , ** kwargs ):
0 commit comments