Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 67 additions & 80 deletions tests/test_fs_event.py
Original file line number Diff line number Diff line change
@@ -1,100 +1,87 @@
import asyncio
import contextlib
import os.path
import tempfile

from uvloop import _testbase as tb
from uvloop.loop import FileSystemEvent


class Test_UV_FS_EVENT_CHANGE(tb.UVTestCase):
async def _file_writer(self):
f = await self.q.get()
while True:
f.write('hello uvloop\n')
f.flush()
x = await self.q.get()
if x is None:
return

def fs_event_setup(self):
self.change_event_count = 0
self.fname = ''
self.q = asyncio.Queue()

def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
_d, fn = os.path.split(self.fname)
self.assertEqual(ev_fname, fn)
self.assertEqual(evt, FileSystemEvent.CHANGE)
self.change_event_count += 1
if self.change_event_count < 4:
self.q.put_nowait(0)
else:
self.q.put_nowait(None)
class Test_UV_FS_Event(tb.UVTestCase):
def setUp(self):
super().setUp()
self.exit_stack = contextlib.ExitStack()
self.tmp_dir = self.exit_stack.enter_context(
tempfile.TemporaryDirectory()
)

def tearDown(self):
self.exit_stack.close()
super().tearDown()

def test_fs_event_change(self):
self.fs_event_setup()

async def run(write_task):
self.q.put_nowait(tf)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()

with tempfile.NamedTemporaryFile('wt') as tf:
self.fname = tf.name.encode()
h = self.loop._monitor_fs(tf.name, self.event_cb)
change_event_count = 0
filename = "fs_event_change.txt"
path = os.path.join(self.tmp_dir, filename)
q = asyncio.Queue()

with open(path, 'wt') as f:
async def file_writer():
while True:
f.write('hello uvloop\n')
f.flush()
x = await q.get()
if x is None:
return

def event_cb(ev_fname: bytes, evt: FileSystemEvent):
nonlocal change_event_count
self.assertEqual(ev_fname, filename.encode())
self.assertEqual(evt, FileSystemEvent.CHANGE)
change_event_count += 1
if change_event_count < 4:
q.put_nowait(0)
else:
q.put_nowait(None)

h = self.loop._monitor_fs(path, event_cb)
self.loop.run_until_complete(
asyncio.sleep(0.1) # let monitor start
)
self.assertFalse(h.cancelled())

self.loop.run_until_complete(run(
self.loop.create_task(self._file_writer())))
self.loop.run_until_complete(asyncio.wait_for(file_writer(), 4))
h.cancel()
self.assertTrue(h.cancelled())

self.assertEqual(self.change_event_count, 4)


class Test_UV_FS_EVENT_RENAME(tb.UVTestCase):
async def _file_renamer(self):
await self.q.get()
os.rename(os.path.join(self.dname, self.changed_name),
os.path.join(self.dname, self.changed_name + "-new"))
await self.q.get()

def fs_event_setup(self):
self.dname = ''
self.changed_name = "hello_fs_event.txt"
self.changed_set = {self.changed_name, self.changed_name + '-new'}
self.q = asyncio.Queue()

def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
ev_fname = ev_fname.decode()
self.assertEqual(evt, FileSystemEvent.RENAME)
self.changed_set.remove(ev_fname)
if len(self.changed_set) == 0:
self.q.put_nowait(None)
self.assertEqual(change_event_count, 4)

def test_fs_event_rename(self):
self.fs_event_setup()

async def run(write_task):
self.q.put_nowait(0)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()

with tempfile.TemporaryDirectory() as td_name:
self.dname = td_name
f = open(os.path.join(td_name, self.changed_name), 'wt')
orig_name = "hello_fs_event.txt"
new_name = "hello_fs_event_rename.txt"
changed_set = {orig_name, new_name}
event = asyncio.Event()

async def file_renamer():
os.rename(os.path.join(self.tmp_dir, orig_name),
os.path.join(self.tmp_dir, new_name))
await event.wait()

def event_cb(ev_fname: bytes, evt: FileSystemEvent):
ev_fname = ev_fname.decode()
self.assertEqual(evt, FileSystemEvent.RENAME)
changed_set.discard(ev_fname)
if len(changed_set) == 0:
event.set()

with open(os.path.join(self.tmp_dir, orig_name), 'wt') as f:
f.write('hello!')
f.close()
h = self.loop._monitor_fs(td_name, self.event_cb)
self.assertFalse(h.cancelled())
h = self.loop._monitor_fs(self.tmp_dir, event_cb)
self.loop.run_until_complete(asyncio.sleep(0.5)) # let monitor start
self.assertFalse(h.cancelled())

self.loop.run_until_complete(run(
self.loop.create_task(self._file_renamer())))
h.cancel()
self.assertTrue(h.cancelled())
self.loop.run_until_complete(asyncio.wait_for(file_renamer(), 4))
h.cancel()
self.assertTrue(h.cancelled())

self.assertEqual(len(self.changed_set), 0)
self.assertEqual(len(changed_set), 0)
Loading