Skip to content

Commit 65ea4b7

Browse files
authored
windows: Fix server hangs under some circumstance (notify-rs#674)
* Add coresponding test * fix * simplify example * fmt * clippy * Even simpler * Better fix * Refactor * Fix * Fix merge conflicts * Fix * add changelog * fix ci * lower case * typo
1 parent f5c6814 commit 65ea4b7

File tree

5 files changed

+94
-22
lines changed

5 files changed

+94
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
- FEATURE: added support for the [`flume`](https://docs.rs/flume) crate
55
- FIX: kqueue-backend: do not double unwatch top-level directory when recursively unwatching [#683]
66
- FIX: Return the crate error `PathNotFound` instead bubbling up the std::io error [#685] **breaking**
7+
- FIX: fix server hangs when trashing folders on Windows [#674]
78

89
## debouncer-full 0.6.0 (unreleased)
910
- FEATURE: allow `FileIdCache` trait implementations to choose ownership of the returned file-ids

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,6 @@ rstest = "0.24.0"
4646
serde = { version = "1.0.89", features = ["derive"] }
4747
serde_json = "1.0.39"
4848
tempfile = "3.10.0"
49+
trash = "5.2.2"
4950
walkdir = "2.4.0"
5051
windows-sys = "0.59.0"

notify/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,6 @@ serde_json.workspace = true
5555
tempfile.workspace = true
5656
nix.workspace = true
5757
insta.workspace = true
58+
59+
[target.'cfg(target_os = "windows")'.dev-dependencies]
60+
trash.workspace = true

notify/src/lib.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,4 +459,23 @@ mod tests {
459459

460460
panic!("did not receive expected event");
461461
}
462+
463+
#[test]
464+
#[cfg(target_os = "windows")]
465+
fn test_windows_trash_dir() -> std::result::Result<(), Box<dyn std::error::Error>> {
466+
let dir = tempdir()?;
467+
let child_dir = dir.path().join("child");
468+
fs::create_dir(&child_dir)?;
469+
470+
let mut watcher = recommended_watcher(|_| {
471+
// Do something with the event
472+
})?;
473+
watcher.watch(&child_dir, RecursiveMode::NonRecursive)?;
474+
475+
trash::delete(&child_dir)?;
476+
477+
watcher.watch(dir.path(), RecursiveMode::NonRecursive)?;
478+
479+
Ok(())
480+
}
462481
}

notify/src/windows.rs

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ use std::slice;
2020
use std::sync::{Arc, Mutex};
2121
use std::thread;
2222
use windows_sys::Win32::Foundation::{
23-
CloseHandle, ERROR_OPERATION_ABORTED, HANDLE, INVALID_HANDLE_VALUE, WAIT_OBJECT_0,
23+
CloseHandle, ERROR_ACCESS_DENIED, ERROR_OPERATION_ABORTED, ERROR_SUCCESS, HANDLE,
24+
INVALID_HANDLE_VALUE, WAIT_OBJECT_0,
2425
};
2526
use windows_sys::Win32::Storage::FileSystem::{
2627
CreateFileW, ReadDirectoryChangesW, FILE_ACTION_ADDED, FILE_ACTION_MODIFIED,
@@ -51,6 +52,13 @@ struct ReadDirectoryRequest {
5152
buffer: [u8; BUF_SIZE as usize],
5253
handle: HANDLE,
5354
data: ReadData,
55+
action_tx: Sender<Action>,
56+
}
57+
58+
impl ReadDirectoryRequest {
59+
fn unwatch(&self) {
60+
let _ = self.action_tx.send(Action::Unwatch(self.data.dir.clone()));
61+
}
5462
}
5563

5664
enum Action {
@@ -72,6 +80,7 @@ struct WatchState {
7280
}
7381

7482
struct ReadDirectoryChangesServer {
83+
tx: Sender<Action>,
7584
rx: Receiver<Action>,
7685
event_handler: Arc<Mutex<dyn EventHandler>>,
7786
meta_tx: Sender<MetaEvent>,
@@ -92,17 +101,21 @@ impl ReadDirectoryChangesServer {
92101
let sem_temp = wakeup_sem as u64;
93102
let _ = thread::Builder::new()
94103
.name("notify-rs windows loop".to_string())
95-
.spawn(move || {
96-
let wakeup_sem = sem_temp as HANDLE;
97-
let server = ReadDirectoryChangesServer {
98-
rx: action_rx,
99-
event_handler,
100-
meta_tx,
101-
cmd_tx,
102-
watches: HashMap::new(),
103-
wakeup_sem,
104-
};
105-
server.run();
104+
.spawn({
105+
let tx = action_tx.clone();
106+
move || {
107+
let wakeup_sem = sem_temp as HANDLE;
108+
let server = ReadDirectoryChangesServer {
109+
tx,
110+
rx: action_rx,
111+
event_handler,
112+
meta_tx,
113+
cmd_tx,
114+
watches: HashMap::new(),
115+
wakeup_sem,
116+
};
117+
server.run();
118+
}
106119
});
107120
action_tx
108121
}
@@ -206,7 +219,7 @@ impl ReadDirectoryChangesServer {
206219
};
207220
// every watcher gets its own semaphore to signal completion
208221
let semaphore = unsafe { CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
209-
if semaphore == ptr::null_mut() || semaphore == INVALID_HANDLE_VALUE {
222+
if semaphore.is_null() || semaphore == INVALID_HANDLE_VALUE {
210223
unsafe {
211224
CloseHandle(handle);
212225
}
@@ -223,7 +236,7 @@ impl ReadDirectoryChangesServer {
223236
complete_sem: semaphore,
224237
};
225238
self.watches.insert(path.clone(), ws);
226-
start_read(&rd, self.event_handler.clone(), handle);
239+
start_read(&rd, self.event_handler.clone(), handle, self.tx.clone());
227240
Ok(path)
228241
}
229242

@@ -254,12 +267,18 @@ fn stop_watch(ws: &WatchState, meta_tx: &Sender<MetaEvent>) {
254267
let _ = meta_tx.send(MetaEvent::SingleWatchComplete);
255268
}
256269

257-
fn start_read(rd: &ReadData, event_handler: Arc<Mutex<dyn EventHandler>>, handle: HANDLE) {
270+
fn start_read(
271+
rd: &ReadData,
272+
event_handler: Arc<Mutex<dyn EventHandler>>,
273+
handle: HANDLE,
274+
action_tx: Sender<Action>,
275+
) {
258276
let request = Box::new(ReadDirectoryRequest {
259277
event_handler,
260278
handle,
261279
buffer: [0u8; BUF_SIZE as usize],
262280
data: rd.clone(),
281+
action_tx,
263282
});
264283

265284
let flags = FILE_NOTIFY_CHANGE_FILE_NAME
@@ -317,15 +336,44 @@ unsafe extern "system" fn handle_event(
317336
let overlapped: Box<OVERLAPPED> = Box::from_raw(overlapped);
318337
let request: Box<ReadDirectoryRequest> = Box::from_raw(overlapped.hEvent as *mut _);
319338

320-
if error_code == ERROR_OPERATION_ABORTED {
321-
// received when dir is unwatched or watcher is shutdown; return and let overlapped/request
322-
// get drop-cleaned
323-
ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
324-
return;
339+
match error_code {
340+
ERROR_OPERATION_ABORTED => {
341+
// received when dir is unwatched or watcher is shutdown; return and let overlapped/request get drop-cleaned
342+
ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
343+
return;
344+
}
345+
ERROR_ACCESS_DENIED => {
346+
// This could happen when the watched directory is deleted or trashed, first check if it's the case.
347+
// If so, unwatch the directory and return, otherwise, continue to handle the event.
348+
if !request.data.dir.exists() {
349+
request.unwatch();
350+
ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
351+
return;
352+
}
353+
}
354+
ERROR_SUCCESS => {
355+
// Success, continue to handle the event
356+
}
357+
_ => {
358+
// Some unidentified error occurred, log and unwatch the directory, then return.
359+
log::error!(
360+
"unknown error in ReadDirectoryChangesW for directory {}: {}",
361+
request.data.dir.display(),
362+
error_code
363+
);
364+
request.unwatch();
365+
ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
366+
return;
367+
}
325368
}
326369

327370
// Get the next request queued up as soon as possible
328-
start_read(&request.data, request.event_handler.clone(), request.handle);
371+
start_read(
372+
&request.data,
373+
request.event_handler.clone(),
374+
request.handle,
375+
request.action_tx,
376+
);
329377

330378
// The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length
331379
// string as its last member. Each struct contains an offset for getting the next entry in
@@ -431,7 +479,7 @@ impl ReadDirectoryChangesWatcher {
431479
let (cmd_tx, cmd_rx) = unbounded();
432480

433481
let wakeup_sem = unsafe { CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
434-
if wakeup_sem == ptr::null_mut() || wakeup_sem == INVALID_HANDLE_VALUE {
482+
if wakeup_sem.is_null() || wakeup_sem == INVALID_HANDLE_VALUE {
435483
return Err(Error::generic("Failed to create wakeup semaphore."));
436484
}
437485

0 commit comments

Comments
 (0)