Skip to content

poll_write May Cause an Infinite Loop in the Upper Layer #70

@oven-yang

Description

@oven-yang

While reviewing the code, I noticed a potential issue with poll_write when handling write requests.

If a write operation fails, the error is kept only inside the background task, and the task's end of the pipe (the reader) is closed. However, subsequent write requests are still accepted as if nothing had happened: instead of reporting the earlier failure, they return a write length of 0. The error is surfaced only when the upper layer initiates a flush or close operation.

A common file I/O pattern is to split data into smaller chunks and write them sequentially. When the written length is less than the chunk size, the application incrementally writes the remaining data. Importantly, a write length of 0 is not typically treated as a failure.

Given this behavior, a single write failure could cause the upper layer to enter an infinite loop, as it repeatedly attempts to write data without detecting the underlying error.

To verify this, I created a simple demo that reproduces the issue. The demo is based on the implementation in the hdrs crate, because I ran into this problem while using opendal to access HDFS, where I observed a thread stuck in an infinite loop.

Demo code:

use std::io::{ErrorKind, Read, Result, Seek, SeekFrom, Write};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use blocking::Unblock;
use futures::lock::Mutex;
use futures::{ready, AsyncSeek, AsyncWrite, AsyncWriteExt};

/// Mock of the `hdrs` file handle used by this demo: it only remembers its path, and the
/// I/O impls below just log each call.
///
/// No `unsafe impl Send`/`unsafe impl Sync` is needed here: the only field is a `String`,
/// which is already `Send + Sync`, so the compiler derives both auto traits for `File`.
/// Should a raw (non-thread-safe) handle ever be added, re-introduce the impls together
/// with a `// SAFETY:` justification.
pub struct File {
    path: String,
}

impl Drop for File {
    /// Announces that the handle for `self.path` is being released.
    fn drop(&mut self) {
        let path: &str = &self.path;
        println!("close file {}", path);
    }
}

impl File {
    pub(crate) fn new(path: String) -> Self {
        File { path }
    }

    fn inner_seek(&self, offset: i64) -> Result<()> {
        println!(
            "inner_seek file:{}, offset:{}",
            self.path,
            offset
        );
        Ok(())
    }

    fn tell(&self) -> Result<i64> {
        println!("tell file:{}", self.path);
        Ok(0)
    }

    pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize> {
        println!(
            "read_at file:{}, from:{}, len:{}",
            self.path,
            offset,
            buf.len()
        );
        Ok(buf.len())
    }
}

impl Read for File {
    /// Pretends to fill `buf`: logs the request and reports every byte as read.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let requested = buf.len();
        println!("read file:{}, len:{}", self.path, requested);
        Ok(requested)
    }
}

impl Seek for File {
    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
        println!("seek file:{}, pos:{:?}", self.path, pos);
        match pos {
            SeekFrom::Start(n) => {
                self.inner_seek(n as i64)?;
                Ok(n)
            }
            SeekFrom::Current(n) => {
                let current = self.tell()?;
                let offset = (current + n) as u64;
                self.inner_seek(offset as i64)?;
                Ok(offset)
            }
            SeekFrom::End(n) => {
                self.inner_seek(n)?;
                Ok(n as u64)
            }
        }
    }
}

impl Write for File {
    /// Pretends to persist `buf`: logs the request and reports every byte as written.
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        let accepted = buf.len();
        println!("write file:{}, len:{}", self.path, accepted);
        Ok(accepted)
    }

    /// Logs the flush request; `write` retains no data, so this always succeeds.
    fn flush(&mut self) -> Result<()> {
        let path = &self.path;
        println!("flush file:{}", path);
        Ok(())
    }
}

impl Read for &File {
    /// Shared-reference mock read: logs the request and reports `buf` as completely filled.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let requested = buf.len();
        println!("read file:{}, len:{}", self.path, requested);
        Ok(requested)
    }
}

impl Seek for &File {
    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
        println!(
            "{} file:{}, pos:{:?}",
            std::any::type_name::<fn()>(),
            self.path,
            pos
        );
        match pos {
            SeekFrom::Start(n) => {
                self.inner_seek(n as i64)?;
                Ok(n)
            }
            SeekFrom::Current(n) => {
                let current = self.tell()?;
                let offset = (current + n) as u64;
                self.inner_seek(offset as i64)?;
                Ok(offset)
            }
            SeekFrom::End(n) => {
                self.inner_seek(n)?;
                Ok(n as u64)
            }
        }
    }
}

impl Write for &File {
    /// Simulated failing write: logs the attempt and always fails with
    /// `ErrorKind::Interrupted`, accepting no bytes.
    ///
    /// This is the failure the demo injects into the background writer.
    /// NOTE(review): std's `write_all`/`io::copy` treat `Interrupted` as retryable; confirm
    /// the writer driving this impl does not, or a non-retryable kind may model a hard
    /// failure more reliably.
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        let attempted = buf.len();
        println!("write& file:{}, len:{}", self.path, attempted);
        Err(std::io::Error::from(ErrorKind::Interrupted))
    }

    /// Logs the flush request and always succeeds.
    fn flush(&mut self) -> Result<()> {
        let path = &self.path;
        println!("flush& file:{}", path);
        Ok(())
    }
}

struct ArcFile(Arc<File>);

impl Read for ArcFile {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        (&*self.0).read(buf)
    }
}

impl Write for ArcFile {
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        (&*self.0).write(buf)
    }

    fn flush(&mut self) -> Result<()> {
        (&*self.0).flush()
    }
}

impl Seek for ArcFile {
    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
        (&*self.0).seek(pos)
    }
}

pub struct AsyncFile {
    _file: Arc<File>,
    unblock: Mutex<Unblock<ArcFile>>,
    read_pos: Option<Result<u64>>,
    is_dirty: bool,
}

impl AsyncFile {
    pub(crate) fn new(inner: File, is_dirty: bool) -> AsyncFile {
        let file = Arc::new(inner);
        let unblock = Mutex::new(Unblock::new(ArcFile(file.clone())));
        let read_pos = None;
        AsyncFile {
            _file: file,
            unblock,
            read_pos,
            is_dirty,
        }
    }

    fn poll_reposition(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
        if let Some(Ok(read_pos)) = self.read_pos {
            ready!(Pin::new(self.unblock.get_mut()).poll_seek(cx, SeekFrom::Start(read_pos)))?;
        }
        self.read_pos = None;
        Poll::Ready(Ok(()))
    }
}

impl futures::AsyncRead for AsyncFile {
    /// Reads through the worker, first capturing the current position if none is recorded.
    ///
    /// The capture result is stored as-is (a failed capture is kept as `Some(Err(..))` and
    /// not retried); a successfully captured position is advanced by the bytes read so a
    /// later write or seek can restore it.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        let needs_anchor = self.read_pos.is_none();
        if needs_anchor {
            let anchor = ready!(self.as_mut().poll_seek(cx, SeekFrom::Current(0)));
            self.read_pos = Some(anchor);
        }

        let worker = Pin::new(self.unblock.get_mut());
        let count = ready!(futures::AsyncRead::poll_read(worker, cx, buf))?;

        if let Some(Ok(pos)) = &mut self.read_pos {
            *pos += count as u64;
        }
        Poll::Ready(Ok(count))
    }
}

impl AsyncSeek for AsyncFile {
    /// Restores any recorded read position, then performs the requested seek on the worker.
    fn poll_seek(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<Result<u64>> {
        let this = Pin::get_mut(self);
        ready!(this.poll_reposition(cx))?;
        let worker = Pin::new(this.unblock.get_mut());
        worker.poll_seek(cx, pos)
    }
}

impl AsyncWrite for AsyncFile {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize>> {
        ready!(self.poll_reposition(cx))?;
        self.is_dirty = true;
        Pin::new(self.unblock.get_mut()).poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        if self.is_dirty {
            ready!(Pin::new(self.unblock.get_mut()).poll_flush(cx))?;
            self.is_dirty = false;
        }
        Poll::Ready(Ok(()))
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        Pin::new(self.unblock.get_mut()).poll_close(cx)
    }
}

/// Opens the mock file at `path` as an `AsyncFile` that starts out clean (`is_dirty == false`).
fn open(path: &str) -> AsyncFile {
    let handle = File::new(String::from(path));
    AsyncFile::new(handle, false)
}

/// Reproduction driver: writes the same 9-byte chunk once per second, forever, and prints
/// every result.
///
/// Every `&File` write fails, so this shows how that failure reaches the caller. A
/// `poll_write` that only reports `Ok(0)` after the first failed write leaves a loop like
/// this (or any chunked write loop) spinning indefinitely. A `poll_write` that propagates the
/// background writer's error makes each failure visible here instead.
#[tokio::main]
async fn main() {
    let mut file = open("file.txt");
    let pause = std::time::Duration::from_secs(1);
    loop {
        let outcome = file.write(b"123456789").await;
        println!("file written: {:?}", outcome);
        tokio::time::sleep(pause).await;
    }
}

Suggested Fix:

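Consider modifying poll_write to propagate the error immediately after a write failure, so the upper layer can handle it instead of looping forever. For example, when the inner Unblock::poll_write returns Ok(0) for a non-empty buffer, poll_write could poll the Unblock's flush to retrieve the stored error and return it, or return ErrorKind::WriteZero if no error is reported.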

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions