-
Notifications
You must be signed in to change notification settings - Fork 565
/
Copy pathtest_fs_event.py
100 lines (81 loc) · 3.11 KB
/
test_fs_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import asyncio
import os.path
import tempfile
from uvloop import _testbase as tb
from uvloop.loop import FileSystemEvent
class Test_UV_FS_EVENT_CHANGE(tb.UVTestCase):
async def _file_writer(self):
f = await self.q.get()
while True:
f.write('hello uvloop\n')
f.flush()
x = await self.q.get()
if x is None:
return
def fs_event_setup(self):
self.change_event_count = 0
self.fname = ''
self.q = asyncio.Queue()
def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
_d, fn = os.path.split(self.fname)
self.assertEqual(ev_fname, fn)
self.assertEqual(evt, FileSystemEvent.CHANGE)
self.change_event_count += 1
if self.change_event_count < 4:
self.q.put_nowait(0)
else:
self.q.put_nowait(None)
def test_fs_event_change(self):
self.fs_event_setup()
async def run(write_task):
self.q.put_nowait(tf)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()
with tempfile.NamedTemporaryFile('wt') as tf:
self.fname = tf.name.encode()
h = self.loop._monitor_fs(tf.name, self.event_cb)
self.assertFalse(h.cancelled())
self.loop.run_until_complete(run(
self.loop.create_task(self._file_writer())))
h.cancel()
self.assertTrue(h.cancelled())
self.assertEqual(self.change_event_count, 4)
class Test_UV_FS_EVENT_RENAME(tb.UVTestCase):
async def _file_renamer(self):
await self.q.get()
os.rename(os.path.join(self.dname, self.changed_name),
os.path.join(self.dname, self.changed_name + "-new"))
await self.q.get()
def fs_event_setup(self):
self.dname = ''
self.changed_name = "hello_fs_event.txt"
self.changed_set = {self.changed_name, self.changed_name + '-new'}
self.q = asyncio.Queue()
def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
ev_fname = ev_fname.decode()
self.assertEqual(evt, FileSystemEvent.RENAME)
self.changed_set.remove(ev_fname)
if len(self.changed_set) == 0:
self.q.put_nowait(None)
def test_fs_event_rename(self):
self.fs_event_setup()
async def run(write_task):
self.q.put_nowait(0)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()
with tempfile.TemporaryDirectory() as td_name:
self.dname = td_name
f = open(os.path.join(td_name, self.changed_name), 'wt')
f.write('hello!')
f.close()
h = self.loop._monitor_fs(td_name, self.event_cb)
self.assertFalse(h.cancelled())
self.loop.run_until_complete(run(
self.loop.create_task(self._file_renamer())))
h.cancel()
self.assertTrue(h.cancelled())
self.assertEqual(len(self.changed_set), 0)