-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
Copy pathtest_tmpdir.py
621 lines (508 loc) · 19.1 KB
/
test_tmpdir.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
# mypy: allow-untyped-defs
from __future__ import annotations
from collections.abc import Callable
import dataclasses
import os
from pathlib import Path
import stat
import sys
from typing import cast
import warnings
from _pytest import pathlib
from _pytest.config import Config
from _pytest.monkeypatch import MonkeyPatch
from _pytest.pathlib import cleanup_numbered_dir
from _pytest.pathlib import create_cleanup_lock
from _pytest.pathlib import make_numbered_dir
from _pytest.pathlib import maybe_delete_a_numbered_dir
from _pytest.pathlib import on_rm_rf_error
from _pytest.pathlib import register_cleanup_lock_removal
from _pytest.pathlib import rm_rf
from _pytest.pytester import Pytester
from _pytest.tmpdir import get_user
from _pytest.tmpdir import TempPathFactory
import pytest
def test_tmp_path_fixture(pytester: Pytester) -> None:
p = pytester.copy_example("tmpdir/tmp_path_fixture.py")
results = pytester.runpytest(p)
results.stdout.fnmatch_lines(["*1 passed*"])
@dataclasses.dataclass
class FakeConfig:
basetemp: str | Path
@property
def trace(self):
return self
def get(self, key):
return lambda *k: None
def getini(self, name):
if name == "tmp_path_retention_count":
return 3
elif name == "tmp_path_retention_policy":
return "all"
else:
assert False
@property
def option(self):
return self
class TestTmpPathHandler:
def test_mktemp(self, tmp_path: Path) -> None:
config = cast(Config, FakeConfig(tmp_path))
t = TempPathFactory.from_config(config, _ispytest=True)
tmp = t.mktemp("world")
assert str(tmp.relative_to(t.getbasetemp())) == "world0"
tmp = t.mktemp("this")
assert str(tmp.relative_to(t.getbasetemp())).startswith("this")
tmp2 = t.mktemp("this")
assert str(tmp2.relative_to(t.getbasetemp())).startswith("this")
assert tmp2 != tmp
def test_tmppath_relative_basetemp_absolute(
self, tmp_path: Path, monkeypatch: MonkeyPatch
) -> None:
"""#4425"""
monkeypatch.chdir(tmp_path)
config = cast(Config, FakeConfig("hello"))
t = TempPathFactory.from_config(config, _ispytest=True)
assert t.getbasetemp().resolve() == (tmp_path / "hello").resolve()
class TestConfigTmpPath:
def test_getbasetemp_custom_removes_old(self, pytester: Pytester) -> None:
mytemp = pytester.path.joinpath("xyz")
p = pytester.makepyfile(
"""
def test_1(tmp_path):
pass
"""
)
pytester.runpytest(p, f"--basetemp={mytemp}")
assert mytemp.exists()
mytemp.joinpath("hello").touch()
pytester.runpytest(p, f"--basetemp={mytemp}")
assert mytemp.exists()
assert not mytemp.joinpath("hello").exists()
def test_policy_failed_removes_only_passed_dir(self, pytester: Pytester) -> None:
p = pytester.makepyfile(
"""
def test_1(tmp_path):
assert 0 == 0
def test_2(tmp_path):
assert 0 == 1
"""
)
pytester.makepyprojecttoml(
"""
[tool.pytest.ini_options]
tmp_path_retention_policy = "failed"
"""
)
pytester.inline_run(p)
root = pytester._test_tmproot
for child in root.iterdir():
base_dir = list(
filter(lambda x: x.is_dir() and not x.is_symlink(), child.iterdir())
)
assert len(base_dir) == 1
test_dir = list(
filter(
lambda x: x.is_dir() and not x.is_symlink(), base_dir[0].iterdir()
)
)
# Check only the failed one remains
assert len(test_dir) == 1
assert test_dir[0].name == "test_20"
def test_policy_failed_removes_basedir_when_all_passed(
self, pytester: Pytester
) -> None:
p = pytester.makepyfile(
"""
def test_1(tmp_path):
assert 0 == 0
"""
)
pytester.makepyprojecttoml(
"""
[tool.pytest.ini_options]
tmp_path_retention_policy = "failed"
"""
)
pytester.inline_run(p)
root = pytester._test_tmproot
for child in root.iterdir():
# This symlink will be deleted by cleanup_numbered_dir **after**
# the test finishes because it's triggered by atexit.
# So it has to be ignored here.
base_dir = filter(lambda x: not x.is_symlink(), child.iterdir())
# Check the base dir itself is gone
assert len(list(base_dir)) == 0
# issue #10502
def test_policy_failed_removes_dir_when_skipped_from_fixture(
self, pytester: Pytester
) -> None:
p = pytester.makepyfile(
"""
import pytest
@pytest.fixture
def fixt(tmp_path):
pytest.skip()
def test_fixt(fixt):
pass
"""
)
pytester.makepyprojecttoml(
"""
[tool.pytest.ini_options]
tmp_path_retention_policy = "failed"
"""
)
pytester.inline_run(p)
# Check if the whole directory is removed
root = pytester._test_tmproot
for child in root.iterdir():
base_dir = list(
filter(lambda x: x.is_dir() and not x.is_symlink(), child.iterdir())
)
assert len(base_dir) == 0
# issue #10502
def test_policy_all_keeps_dir_when_skipped_from_fixture(
self, pytester: Pytester
) -> None:
p = pytester.makepyfile(
"""
import pytest
@pytest.fixture
def fixt(tmp_path):
pytest.skip()
def test_fixt(fixt):
pass
"""
)
pytester.makepyprojecttoml(
"""
[tool.pytest.ini_options]
tmp_path_retention_policy = "all"
"""
)
pytester.inline_run(p)
# Check if the whole directory is kept
root = pytester._test_tmproot
for child in root.iterdir():
base_dir = list(
filter(lambda x: x.is_dir() and not x.is_symlink(), child.iterdir())
)
assert len(base_dir) == 1
test_dir = list(
filter(
lambda x: x.is_dir() and not x.is_symlink(), base_dir[0].iterdir()
)
)
assert len(test_dir) == 1
testdata = [
("mypath", True),
("/mypath1", False),
("./mypath1", True),
("../mypath3", False),
("../../mypath4", False),
("mypath5/..", False),
("mypath6/../mypath6", True),
("mypath7/../mypath7/..", False),
]
@pytest.mark.parametrize("basename, is_ok", testdata)
def test_mktemp(pytester: Pytester, basename: str, is_ok: bool) -> None:
mytemp = pytester.mkdir("mytemp")
p = pytester.makepyfile(
f"""
def test_abs_path(tmp_path_factory):
tmp_path_factory.mktemp('{basename}', numbered=False)
"""
)
result = pytester.runpytest(p, f"--basetemp={mytemp}")
if is_ok:
assert result.ret == 0
assert mytemp.joinpath(basename).exists()
else:
assert result.ret == 1
result.stdout.fnmatch_lines("*ValueError*")
def test_tmp_path_always_is_realpath(pytester: Pytester, monkeypatch) -> None:
# the reason why tmp_path should be a realpath is that
# when you cd to it and do "os.getcwd()" you will anyway
# get the realpath. Using the symlinked path can thus
# easily result in path-inequality
# XXX if that proves to be a problem, consider using
# os.environ["PWD"]
realtemp = pytester.mkdir("myrealtemp")
linktemp = pytester.path.joinpath("symlinktemp")
attempt_symlink_to(linktemp, str(realtemp))
monkeypatch.setenv("PYTEST_DEBUG_TEMPROOT", str(linktemp))
pytester.makepyfile(
"""
def test_1(tmp_path):
assert tmp_path.resolve() == tmp_path
"""
)
reprec = pytester.inline_run()
reprec.assertoutcome(passed=1)
def test_tmp_path_too_long_on_parametrization(pytester: Pytester) -> None:
pytester.makepyfile(
"""
import pytest
@pytest.mark.parametrize("arg", ["1"*1000])
def test_some(arg, tmp_path):
tmp_path.joinpath("hello").touch()
"""
)
reprec = pytester.inline_run()
reprec.assertoutcome(passed=1)
def test_tmp_path_factory(pytester: Pytester) -> None:
pytester.makepyfile(
"""
import pytest
@pytest.fixture(scope='session')
def session_dir(tmp_path_factory):
return tmp_path_factory.mktemp('data', numbered=False)
def test_some(session_dir):
assert session_dir.is_dir()
"""
)
reprec = pytester.inline_run()
reprec.assertoutcome(passed=1)
def test_tmp_path_fallback_tox_env(pytester: Pytester, monkeypatch) -> None:
"""Test that tmp_path works even if environment variables required by getpass
module are missing (#1010).
"""
monkeypatch.delenv("USER", raising=False)
monkeypatch.delenv("USERNAME", raising=False)
pytester.makepyfile(
"""
def test_some(tmp_path):
assert tmp_path.is_dir()
"""
)
reprec = pytester.inline_run()
reprec.assertoutcome(passed=1)
@pytest.fixture
def break_getuser(monkeypatch):
monkeypatch.setattr("os.getuid", lambda: -1)
# taken from python 2.7/3.4
for envvar in ("LOGNAME", "USER", "LNAME", "USERNAME"):
monkeypatch.delenv(envvar, raising=False)
@pytest.mark.usefixtures("break_getuser")
@pytest.mark.skipif(sys.platform.startswith("win"), reason="no os.getuid on windows")
def test_tmp_path_fallback_uid_not_found(pytester: Pytester) -> None:
"""Test that tmp_path works even if the current process's user id does not
correspond to a valid user.
"""
pytester.makepyfile(
"""
def test_some(tmp_path):
assert tmp_path.is_dir()
"""
)
reprec = pytester.inline_run()
reprec.assertoutcome(passed=1)
@pytest.mark.usefixtures("break_getuser")
@pytest.mark.skipif(sys.platform.startswith("win"), reason="no os.getuid on windows")
def test_get_user_uid_not_found():
"""Test that get_user() function works even if the current process's
user id does not correspond to a valid user (e.g. running pytest in a
Docker container with 'docker run -u'.
"""
assert get_user() is None
@pytest.mark.skipif(not sys.platform.startswith("win"), reason="win only")
def test_get_user(monkeypatch):
"""Test that get_user() function works even if environment variables
required by getpass module are missing from the environment on Windows
(#1010).
"""
monkeypatch.delenv("USER", raising=False)
monkeypatch.delenv("USERNAME", raising=False)
assert get_user() is None
class TestNumberedDir:
PREFIX = "fun-"
def test_make(self, tmp_path):
for i in range(10):
d = make_numbered_dir(root=tmp_path, prefix=self.PREFIX)
assert d.name.startswith(self.PREFIX)
assert d.name.endswith(str(i))
symlink = tmp_path.joinpath(self.PREFIX + "current")
if symlink.exists():
# unix
assert symlink.is_symlink()
assert symlink.resolve() == d.resolve()
def test_cleanup_lock_create(self, tmp_path):
d = tmp_path.joinpath("test")
d.mkdir()
lockfile = create_cleanup_lock(d)
with pytest.raises(OSError, match="cannot create lockfile in .*"):
create_cleanup_lock(d)
lockfile.unlink()
def test_lock_register_cleanup_removal(self, tmp_path: Path) -> None:
lock = create_cleanup_lock(tmp_path)
registry: list[Callable[..., None]] = []
register_cleanup_lock_removal(lock, register=registry.append)
(cleanup_func,) = registry
assert lock.is_file()
cleanup_func(original_pid="intentionally_different")
assert lock.is_file()
cleanup_func()
assert not lock.exists()
cleanup_func()
assert not lock.exists()
def _do_cleanup(self, tmp_path: Path, keep: int = 2) -> None:
self.test_make(tmp_path)
cleanup_numbered_dir(
root=tmp_path,
prefix=self.PREFIX,
keep=keep,
consider_lock_dead_if_created_before=0,
)
def test_cleanup_keep(self, tmp_path):
self._do_cleanup(tmp_path)
a, b = (x for x in tmp_path.iterdir() if not x.is_symlink())
print(a, b)
def test_cleanup_keep_0(self, tmp_path: Path):
self._do_cleanup(tmp_path, 0)
dir_num = len(list(tmp_path.iterdir()))
assert dir_num == 0
def test_cleanup_locked(self, tmp_path):
p = make_numbered_dir(root=tmp_path, prefix=self.PREFIX)
create_cleanup_lock(p)
assert not pathlib.ensure_deletable(
p, consider_lock_dead_if_created_before=p.stat().st_mtime - 1
)
assert pathlib.ensure_deletable(
p, consider_lock_dead_if_created_before=p.stat().st_mtime + 1
)
def test_cleanup_ignores_symlink(self, tmp_path):
the_symlink = tmp_path / (self.PREFIX + "current")
attempt_symlink_to(the_symlink, tmp_path / (self.PREFIX + "5"))
self._do_cleanup(tmp_path)
def test_removal_accepts_lock(self, tmp_path):
folder = make_numbered_dir(root=tmp_path, prefix=self.PREFIX)
create_cleanup_lock(folder)
maybe_delete_a_numbered_dir(folder)
assert folder.is_dir()
class TestRmRf:
def test_rm_rf(self, tmp_path):
adir = tmp_path / "adir"
adir.mkdir()
rm_rf(adir)
assert not adir.exists()
adir.mkdir()
afile = adir / "afile"
afile.write_bytes(b"aa")
rm_rf(adir)
assert not adir.exists()
def test_rm_rf_with_read_only_file(self, tmp_path):
"""Ensure rm_rf can remove directories with read-only files in them (#5524)"""
fn = tmp_path / "dir/foo.txt"
fn.parent.mkdir()
fn.touch()
self.chmod_r(fn)
rm_rf(fn.parent)
assert not fn.parent.is_dir()
def chmod_r(self, path):
mode = os.stat(str(path)).st_mode
os.chmod(str(path), mode & ~stat.S_IWRITE)
def test_rm_rf_with_read_only_directory(self, tmp_path):
"""Ensure rm_rf can remove read-only directories (#5524)"""
adir = tmp_path / "dir"
adir.mkdir()
(adir / "foo.txt").touch()
self.chmod_r(adir)
rm_rf(adir)
assert not adir.is_dir()
def test_on_rm_rf_error(self, tmp_path: Path) -> None:
adir = tmp_path / "dir"
adir.mkdir()
fn = adir / "foo.txt"
fn.touch()
self.chmod_r(fn)
# unknown exception
with pytest.warns(pytest.PytestWarning):
exc_info1 = (RuntimeError, RuntimeError(), None)
on_rm_rf_error(os.unlink, str(fn), exc_info1, start_path=tmp_path)
assert fn.is_file()
# we ignore FileNotFoundError
exc_info2 = (FileNotFoundError, FileNotFoundError(), None)
assert not on_rm_rf_error(None, str(fn), exc_info2, start_path=tmp_path)
# unknown function
with pytest.warns(
pytest.PytestWarning,
match=r"^\(rm_rf\) unknown function None when removing .*foo.txt:\n<class 'PermissionError'>: ",
):
exc_info3 = (PermissionError, PermissionError(), None)
on_rm_rf_error(None, str(fn), exc_info3, start_path=tmp_path)
assert fn.is_file()
# ignored function
with warnings.catch_warnings(record=True) as w:
exc_info4 = PermissionError()
on_rm_rf_error(os.open, str(fn), exc_info4, start_path=tmp_path)
assert fn.is_file()
assert not [x.message for x in w]
exc_info5 = PermissionError()
on_rm_rf_error(os.unlink, str(fn), exc_info5, start_path=tmp_path)
assert not fn.is_file()
def attempt_symlink_to(path, to_path):
"""Try to make a symlink from "path" to "to_path", skipping in case this platform
does not support it or we don't have sufficient privileges (common on Windows)."""
try:
Path(path).symlink_to(Path(to_path))
except OSError:
pytest.skip("could not create symbolic link")
def test_basetemp_with_read_only_files(pytester: Pytester) -> None:
"""Integration test for #5524"""
pytester.makepyfile(
"""
import os
import stat
def test(tmp_path):
fn = tmp_path / 'foo.txt'
fn.write_text('hello', encoding='utf-8')
mode = os.stat(str(fn)).st_mode
os.chmod(str(fn), mode & ~stat.S_IREAD)
"""
)
result = pytester.runpytest("--basetemp=tmp")
assert result.ret == 0
# running a second time and ensure we don't crash
result = pytester.runpytest("--basetemp=tmp")
assert result.ret == 0
def test_tmp_path_factory_handles_invalid_dir_characters(
tmp_path_factory: TempPathFactory, monkeypatch: MonkeyPatch
) -> None:
monkeypatch.setattr("getpass.getuser", lambda: "os/<:*?;>agnostic")
# _basetemp / _given_basetemp are cached / set in parallel runs, patch them
monkeypatch.setattr(tmp_path_factory, "_basetemp", None)
monkeypatch.setattr(tmp_path_factory, "_given_basetemp", None)
p = tmp_path_factory.getbasetemp()
assert "pytest-of-unknown" in str(p)
@pytest.mark.skipif(not hasattr(os, "getuid"), reason="checks unix permissions")
def test_tmp_path_factory_create_directory_with_safe_permissions(
tmp_path: Path, monkeypatch: MonkeyPatch
) -> None:
"""Verify that pytest creates directories under /tmp with private permissions."""
# Use the test's tmp_path as the system temproot (/tmp).
monkeypatch.setenv("PYTEST_DEBUG_TEMPROOT", str(tmp_path))
tmp_factory = TempPathFactory(None, 3, "all", lambda *args: None, _ispytest=True)
basetemp = tmp_factory.getbasetemp()
# No world-readable permissions.
assert (basetemp.stat().st_mode & 0o077) == 0
# Parent too (pytest-of-foo).
assert (basetemp.parent.stat().st_mode & 0o077) == 0
@pytest.mark.skipif(not hasattr(os, "getuid"), reason="checks unix permissions")
def test_tmp_path_factory_fixes_up_world_readable_permissions(
tmp_path: Path, monkeypatch: MonkeyPatch
) -> None:
"""Verify that if a /tmp/pytest-of-foo directory already exists with
world-readable permissions, it is fixed.
pytest used to mkdir with such permissions, that's why we fix it up.
"""
# Use the test's tmp_path as the system temproot (/tmp).
monkeypatch.setenv("PYTEST_DEBUG_TEMPROOT", str(tmp_path))
tmp_factory = TempPathFactory(None, 3, "all", lambda *args: None, _ispytest=True)
basetemp = tmp_factory.getbasetemp()
# Before - simulate bad perms.
os.chmod(basetemp.parent, 0o777)
assert (basetemp.parent.stat().st_mode & 0o077) != 0
tmp_factory = TempPathFactory(None, 3, "all", lambda *args: None, _ispytest=True)
basetemp = tmp_factory.getbasetemp()
# After - fixed.
assert (basetemp.parent.stat().st_mode & 0o077) == 0