you dun goofed #4284

Closed
jesse99 opened this issue Dec 25, 2012 · 6 comments
Labels
A-concurrency Area: Concurrency

Comments

@jesse99
Contributor

jesse99 commented Dec 25, 2012

After switching my code to use pipes, I'm getting task failures with the above message. I finally tracked it down to cleanup code in a task that sends close messages to other tasks right before it exits. Sleeping briefly after sending the close messages seems to avoid the failure.

I should point out that the message is an "impossible" case from pipes::sender_terminate.
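
For readers on current Rust, the shutdown pattern described above (send a close message to each task, then exit) can be sketched without a sleep by waiting on each task's join handle. This is only an illustration under stated assumptions: it uses `std::sync::mpsc` and `std::thread` rather than the 2012 `pipes` module, and the names `Control`, `spawn_worker`, and `close_all` are hypothetical, not taken from the real code.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical control message, standing in for the "close" message described above.
enum Control {
    Close,
}

/// Spawns a worker that counts the messages it handles until told to close.
fn spawn_worker() -> (Sender<Control>, JoinHandle<u32>) {
    let (tx, rx): (Sender<Control>, Receiver<Control>) = channel();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        // recv() returns Err once every sender is dropped, which also ends the loop.
        while let Ok(msg) = rx.recv() {
            handled += 1;
            match msg {
                Control::Close => break,
            }
        }
        handled
    });
    (tx, handle)
}

/// Sends Close to every worker, then waits for each one to finish.
/// Joining replaces the "sleep for a bit" workaround.
fn close_all(workers: Vec<(Sender<Control>, JoinHandle<u32>)>) -> u32 {
    let mut total = 0;
    for (tx, handle) in workers {
        // A send error only means the worker has already exited.
        let _ = tx.send(Control::Close);
        total += handle.join().expect("worker panicked");
    }
    total
}

fn main() {
    let workers: Vec<_> = (0..3).map(|_| spawn_worker()).collect();
    let total = close_all(workers);
    println!("workers handled {} close messages", total);
}
```

Joining makes the exit order explicit, so the sender cannot go away while a receiver is still mid-shutdown. Whether that ordering was the actual cause of the failure here is not something this sketch establishes.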

@jesse99
Contributor Author

jesse99 commented Dec 25, 2012

I spent quite a while trying to come up with a decent test case but was unable to. The code below is similar to what I am doing, although unfortunately it does not reproduce the problem.

extern mod std;

fn handle_connection(notify: pipes::Chan<()>)
{
    do task::spawn_sched(task::ManualThreads(2)) |move notify|
    {
        let (push_port, push_chan) = pipes::stream();
        let push_chan = pipes::SharedChan(push_chan);

        let mut tasks = ~[];
        loop
        {
            let exit_chan = pusher(push_chan.clone());
            vec::push(&mut tasks, exit_chan);

            // In the real code this happens in response to a message sent
            // as a result of a socket closing.
            if tasks.len() == 1
            {
                notify.send(());
                close_tasks(tasks);
                break;
            }

            push_port.recv();
            error!("received");
        }
    }
}

fn close_tasks(tasks: &[pipes::Chan<()>])
{
    for tasks.each |task|
    {
        task.send(());
    }
    // In the real code I get the failure unless I sleep for a bit here.
}

fn pusher(push_chan: pipes::SharedChan<~str>) -> pipes::Chan<()>
{
    let (exit_port, exit_chan) = pipes::stream();
    let push_chan = push_chan.clone();
    do task::spawn_sched(task::ThreadPerCore) |move push_chan, move exit_port|
    {
        while !exit_port.peek()
        {
            error!("sending");
            push_chan.send(~"hmm");
            libc::funcs::posix88::unistd::sleep(1);
        }
    }
    exit_chan
}

fn main()
{
    let (port, chan) = pipes::stream();
    handle_connection(chan);
    port.recv();
}

@jesse99
Contributor Author

jesse99 commented Dec 25, 2012

I managed to get a repro while trying to reproduce a different pipes problem. The code is a bit complex but should still be a reasonable test case:

extern mod std;
use core::send_map::linear::{LinearMap};

enum StateMesg
{
    AddListener(~str, pipes::Chan<int>),    // str is used to identify the listener
    RemoveListener(~str),
    Shutdown,
}

type StatePort = pipes::Port<StateMesg>;
type StateChan = pipes::Chan<StateMesg>;

pub enum ControlEvent
{
    RefreshEvent,
    CloseEvent,
}
pub type PushChan = pipes::SharedChan<~str>;
pub type ControlPort = pipes::Port<ControlEvent>;
pub type ControlChan = pipes::Chan<ControlEvent>;
pub type OpenSse = fn~ (channel: PushChan) -> ControlChan;
pub type SseTasks = ~[(~str, ControlChan)];

fn main()
{
    let state_chan = pipes::SharedChan(manage_state());
    let up: OpenSse = |push| {uptime_sse(state_chan.clone(), push)};

    let (exit_port, exit_chan) = pipes::stream();
    handle_connection(exit_port, up);

    libc::funcs::posix88::unistd::sleep(3);
//    state_chan.send(Shutdown);
    exit_chan.send(());

    // Not quite sure how to shutdown the manage_state task so for now
    // we'll forcibly exit.
    libc::funcs::posix88::unistd::sleep(1);
    libc::exit(0);
}

fn manage_state() -> StateChan
{
    error!("starting manage_state");
    let (state_port, state_chan) : (StatePort, StateChan) = pipes::stream();
    do task::spawn_sched(task::ManualThreads(1)) |move state_port|
    {
        let mut time = 0;
        let mut listeners = LinearMap();
        loop
        {
            time += 1;
            libc::funcs::posix88::unistd::sleep(1);
            error!("sending new state");
            for listeners.each_value |ch: &pipes::Chan<int>| {ch.send(copy(time))};

            if state_port.peek()
            {
                match state_port.recv()
                {
                    AddListener(key, ch) =>
                    {
                        let added = listeners.insert(key, ch);
                        assert added;
                    }
                    RemoveListener(key) =>
                    {
                        listeners.remove(&key);
                    }
                    Shutdown =>
                    {
                        error!("exiting manage_state");
                        break;
                    }
                }
            }
        }
    }
    state_chan
}

fn uptime_sse(registrar: pipes::SharedChan<StateMesg>, push: PushChan) -> ControlChan
{
    error!("starting sse client");
    let (control_port, control_chan): (ControlPort, ControlChan) = pipes::stream();
    do task::spawn_sched(task::ThreadPerCore) |move control_port, move registrar, move push|
    {
        let (notify_port, notify_chan) = pipes::stream();

        let key = fmt!("uptime %?", ptr::addr_of(&notify_port));
        registrar.send(AddListener(copy key, notify_chan));

        loop
        {
            let mut time = 0;
            match pipes::select2i(&notify_port, &control_port)
            {
                either::Left(_) =>
                {
                    error!("pushing new state");
                    let new_time = notify_port.recv();
                    time = new_time;
                    push.send(fmt!("retry: 5000\ndata: %?\n\n", time));
                }
                either::Right(_) =>
                {
                    match control_port.recv()
                    {
                        RefreshEvent =>
                        {
                            push.send(fmt!("retry: 5000\ndata: %?\n\n", time));
                        }
                        CloseEvent =>
                        {
                            error!("exiting sse client");
                            registrar.send(RemoveListener(key));
                            break;
                        }
                    }
                }
            }
        }
    }
    control_chan
}

pub fn handle_connection(exit: pipes::Port<()>, opener: OpenSse)
{
    error!("starting connection");
    do task::spawn_sched(task::ManualThreads(1))
    {
        let (sse_port, sse_chan) = pipes::stream();
        let sse_chan = pipes::SharedChan(sse_chan);

        let mut sse_tasks = ~[];
        let control_chan = opener(sse_chan);
        vec::push(&mut sse_tasks, (~"some path", control_chan));

        loop
        {
            match pipes::select2i(&exit, &sse_port)
            {
                either::Left(_) =>
                {
                    error!("exiting connection");
                    exit.recv();
                    close_sses(&sse_tasks);
                    break;
                }
                either::Right(_) =>
                {
                    let body = sse_port.recv();
                    error!("received %s state", body);
                }
            }
        }
    }
}

pub fn close_sses(tasks: &SseTasks)
{
    error!("closing all sse");
    for tasks.each |&(_path, control_ch)|
    {
        control_ch.send(CloseEvent);
    };

    // With this commented out we get the "you dun goofed" failure.
//    libc::funcs::posix88::unistd::sleep(1);
}

When I run this I get:

RUST_LOG=3 && rustc test.rs
warning: no debug symbols in executable (-arch x86_64)
rust: ~"starting manage_state"
rust: ~"starting connection"
rust: ~"starting sse client"
rust: ~"sending new state"
rust: ~"sending new state"
rust: ~"pushing new state"
rust: ~"received retry: 5000\ndata: 2\n\n state"
rust: ~"exiting connection"
rust: ~"closing all sse"
rust: task failed at 'you dun goofed', test.rs:1
rust: task failed at 'connection closed', test.rs:1
rust: domain main @0x7fb3fb81a610 root task failed
rust: ~"sending new state"

@bblum
Contributor

bblum commented Feb 28, 2013

@eholk

@Aatch
Contributor

Aatch commented Jun 7, 2013

This still happens. Here is an updated test case:

extern mod extra;
use std::hashmap::HashMap;
use std::*;

enum StateMesg
{
    AddListener(~str, comm::Chan<int>),    // str is used to identify the listener
    RemoveListener(~str),
    Shutdown,
}

type StatePort = comm::Port<StateMesg>;
type StateChan = comm::Chan<StateMesg>;

pub enum ControlEvent
{
    RefreshEvent,
    CloseEvent,
}
pub type PushChan = comm::SharedChan<~str>;
pub type ControlPort = comm::Port<ControlEvent>;
pub type ControlChan = comm::Chan<ControlEvent>;
pub type OpenSse = ~fn (channel: PushChan) -> ControlChan;
pub type SseTasks = ~[(~str, ControlChan)];

fn main()
{
    let state_chan = comm::SharedChan::new(manage_state());
    let up: OpenSse = |push| {uptime_sse(state_chan.clone(), push)};

    let (exit_port, exit_chan) = comm::stream();
    handle_connection(exit_port, up);

    unsafe {
        libc::sleep(3);
        exit_chan.send(());

        libc::sleep(1);
        libc::exit(0);
    }
}

fn manage_state() -> StateChan
{
    error!("starting manage_state");
    let (state_port, state_chan) : (StatePort, StateChan) = comm::stream();
    do task::spawn_sched(task::ManualThreads(1))
    {
        let mut time = 0;
        let mut listeners = HashMap::new();
        loop
        {
            time += 1;
            unsafe {
                libc::sleep(1);
            }
            error!("sending new state");
            for listeners.each_value |ch: &comm::Chan<int>| {ch.send(time)};

            if state_port.peek()
            {
                match state_port.recv()
                {
                    AddListener(key, ch) =>
                    {
                        let added = listeners.insert(key, ch);
                        assert!(added);
                    }
                    RemoveListener(key) =>
                    {
                        listeners.remove(&key);
                    }
                    Shutdown =>
                    {
                        error!("exiting manage_state");
                        break;
                    }
                }
            }
        }
    }
    state_chan
}

fn uptime_sse(registrar: comm::SharedChan<StateMesg>, push: PushChan) -> ControlChan
{
    error!("starting sse client");
    let (control_port, control_chan): (ControlPort, ControlChan) = comm::stream();
    let control_cell = cell::Cell(control_port);
    do task::spawn_sched(task::ThreadPerCore)
    {
        let mut (notify_port, notify_chan) = comm::stream();

        let mut control_port = control_cell.take();

        let key = fmt!("uptime %?", ptr::to_unsafe_ptr(&notify_port));
        registrar.send(AddListener(copy key, notify_chan));

        loop
        {
            let mut time = 0;
            match comm::select2i(&mut notify_port, &mut control_port)
            {
                either::Left(_) =>
                {
                    error!("pushing new state");
                    let new_time = notify_port.recv();
                    time = new_time;
                    push.send(fmt!("retry: 5000\ndata: %?\n\n", time));
                }
                either::Right(_) =>
                {
                    match control_port.recv()
                    {
                        RefreshEvent =>
                        {
                            push.send(fmt!("retry: 5000\ndata: %?\n\n", time));
                        }
                        CloseEvent =>
                        {
                            error!("exiting sse client");
                            registrar.send(RemoveListener(key));
                            break;
                        }
                    }
                }
            }
        }
    }
    control_chan
}

pub fn handle_connection(exit: comm::Port<()>, opener: OpenSse)
{
    let exit_cell = cell::Cell(exit);
    error!("starting connection");
    do task::spawn_sched(task::ManualThreads(1))
    {
        let mut (sse_port, sse_chan) = comm::stream();
        let sse_chan = comm::SharedChan::new(sse_chan);
        let mut exit = exit_cell.take();

        let mut sse_tasks = ~[];
        let control_chan = opener(sse_chan);
        vec::push(&mut sse_tasks, (~"some path", control_chan));

        loop
        {
            match comm::select2i(&mut exit, &mut sse_port)
            {
                either::Left(_) =>
                {
                    error!("exiting connection");
                    exit.recv();
                    close_sses(&sse_tasks);
                    break;
                }
                either::Right(_) =>
                {
                    let body = sse_port.recv();
                    error!("received %s state", body);
                }
            }
        }
    }
}

pub fn close_sses(tasks: &SseTasks)
{
    error!("closing all sse");
    for tasks.each |&(_path, control_ch)|
    {
        control_ch.send(CloseEvent);
    };
    // With this commented out we get the "you dun goofed" failure.
//    libc::funcs::posix88::unistd::sleep(1);
}

@ghost ghost assigned bblum Jul 14, 2013
@bblum
Contributor

bblum commented Jul 17, 2013

This is not a bug. At the bottom of the test case is the unsafe argument pattern |&(_path, control_ch)|, which illegally copies a noncopyable channel, resulting in double destruction. The program is no longer accepted (even after being re-modernized).
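
As a rough illustration of the point above in current Rust: destructuring a borrowed tuple by value would move the non-copyable channel out from behind a reference, and the compiler now rejects that. The sketch below assumes `std::sync::mpsc::Sender` in place of the old `comm::Chan`, and reuses the names `SseTasks`, `ControlEvent`, and `close_sses` from the test case with modernized types, so it is not the original program.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Debug, PartialEq)]
enum ControlEvent {
    CloseEvent,
}

// Assumed modern equivalent of `~[(~str, ControlChan)]`.
type SseTasks = Vec<(String, Sender<ControlEvent>)>;

/// Sends CloseEvent to every task, borrowing each channel instead of moving it.
/// Returns how many sends succeeded.
fn close_sses(tasks: &SseTasks) -> usize {
    let mut sent = 0;
    // Writing `for &(_path, control_ch) in tasks` here does not compile:
    // it would move the non-Copy Sender out of a shared reference.
    for (_path, control_ch) in tasks {
        if control_ch.send(ControlEvent::CloseEvent).is_ok() {
            sent += 1;
        }
    }
    sent
}

fn main() {
    let (tx, rx): (Sender<ControlEvent>, Receiver<ControlEvent>) = channel();
    let tasks: SseTasks = vec![("some path".to_string(), tx)];
    let sent = close_sses(&tasks);
    println!("sent {} close event(s)", sent);
    assert_eq!(rx.recv().unwrap(), ControlEvent::CloseEvent);
}
```

With the borrowing pattern each channel is dropped exactly once, when `tasks` itself goes out of scope, which is the invariant the old pattern violated.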

@bblum bblum closed this as completed Jul 17, 2013
@eholk
Contributor

eholk commented Jul 18, 2013

I'm glad to see cases I thought were impossible are apparently impossible.

RalfJung pushed a commit to RalfJung/rust that referenced this issue Apr 29, 2025