Disconnecting a channel should, perhaps, drop unreceived messages #89

Open
itamarst opened this issue Sep 6, 2021 · 6 comments

@itamarst

itamarst commented Sep 6, 2021

This may or may not be a problem in general; in my case it caused a deadlock. Scenario:

struct Command {
    sender: flume::Sender<()>
}

impl Command {
    fn new() -> (Self, flume::Receiver<()>) {
        let (tx, rx) = flume::bounded(1);
        (Command { sender: tx }, rx)
    }

    fn run(&mut self) {
        do_something();
        self.sender.send(()).unwrap();
    }
}

// Placeholder for whatever work the command actually performs.
fn do_something() {}

fn run_command_thread(receiver: flume::Receiver<Command>) {
    for mut command in receiver {
        command.run();
    }
}

fn main() {
    let (tx, rx) = flume::unbounded();
    std::thread::spawn(move || {
        run_command_thread(rx);
    });
    for _ in 0..5000 {
        let (command, command_rx) = Command::new();
        tx.send(command).unwrap();
        // Wait for the command to finish; in some scenarios this blocks forever.
        command_rx.recv().unwrap();
    }
}

This is a pretty standard pattern for running commands in another thread and getting back a result.

The problem is that if the command thread panics before reading all commands, the main thread may block on command_rx.recv() forever: the corresponding sender is stuck inside a Command in the channel, and that Command will never be read because the command thread is dead.

The solution is to put a timeout on command_rx.recv() and check whether the command thread is still alive (via tx.is_disconnected()). But perhaps, more broadly, it makes sense to say that if there are no receivers left, anything in the channel gets dropped, because no one will ever touch it again anyway. Or maybe not. It's worth considering at least.
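To make the proposed "drop everything once the receivers are gone" rule concrete, here is a minimal, dependency-free sketch. It is a toy model, not flume's implementation: `ToyReceiver`, `Command` and `waiter_sees_disconnect` are hypothetical names, the command queue is a `Mutex<VecDeque>`, and the response channel is `std::sync::mpsc`. Dropping the receiver clears the queue, which drops the queued `Command` together with its response sender, so the waiting side gets a disconnect error instead of hanging.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};

/// Toy receiving half of a command queue (hypothetical, not flume).
/// Its Drop impl applies the proposed rule: when the receiver goes
/// away, everything still queued is dropped.
struct ToyReceiver<T> {
    queue: Arc<Mutex<VecDeque<T>>>,
}

impl<T> Drop for ToyReceiver<T> {
    fn drop(&mut self) {
        // Recover the queue even if a panicking thread poisoned the lock;
        // clearing it drops each item and everything the item owns.
        self.queue
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clear();
    }
}

/// A command that owns the sending half of its response channel.
struct Command {
    // Never read here: the sketch only cares about when it is dropped.
    #[allow(dead_code)]
    respond: mpsc::Sender<()>,
}

/// Queues one command, drops the receiver without running the command,
/// and reports whether the waiting side observes a disconnect.
fn waiter_sees_disconnect() -> bool {
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    let receiver = ToyReceiver { queue: Arc::clone(&queue) };
    let (respond, response_rx) = mpsc::channel();
    queue.lock().unwrap().push_back(Command { respond });
    // Simulate the command thread dying before it reads the queue.
    drop(receiver);
    // The queued Command (and its Sender) is gone, so recv returns
    // Err(RecvError) instead of blocking forever.
    response_rx.recv().is_err()
}
```

The toy clears the queue unconditionally; it does not address senders that expect a failed message to be handed back in the error value.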

@itamarst
Author

itamarst commented Sep 6, 2021

(I should add that I am hypothesizing that commands in the channel are not dropped when the last receiver is dropped; I haven't checked the code. But that's the only explanation that makes sense given the behavior I've seen.)

@zesterer
Owner

zesterer commented Sep 6, 2021

Hi,

Unfortunately, the problem is not quite as simple to solve as stated. When sending fails, the sender still expects to be able to get back the sent message as part of the error value because it was not received, so the last receiver to be dropped cannot simply empty the channel preemptively. The original sender must first indicate that it no longer cares about the status of the message, which is something that the current implementation does not facilitate. I'll need to think about this more carefully, but the potential solutions I can currently think of incur a non-trivial performance cost.
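The "failed send hands the message back" contract can be observed in a few lines. This sketch uses `std::sync::mpsc::sync_channel` rather than flume, purely to stay dependency-free; std's `SendError` also carries the unsent value. `blocked_send_returns_message` is a hypothetical helper name. A send that cannot complete on a full bounded channel fails once the receiver is dropped, and the value comes back inside the error, so whatever a dropping receiver flushes must not include values a sender is still waiting to get back.

```rust
use std::sync::mpsc::{sync_channel, SendError};
use std::thread;

/// Fills a bounded channel, starts a send that cannot complete, then drops
/// the receiver. The pending send fails and returns its value in the error.
fn blocked_send_returns_message() -> Result<(), SendError<u32>> {
    let (tx, rx) = sync_channel(1);
    tx.send(1).unwrap(); // occupies the only slot
    // This send cannot succeed while the slot is full.
    let handle = thread::spawn(move || tx.send(2));
    drop(rx); // disconnect the receiving side
    handle.join().unwrap()
}
```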

I should have time to look into this more carefully in a week.

@itamarst
Author

itamarst commented Sep 6, 2021

Yeah, I don't want a performance hit either; Flume's speed is one of its attractions.

Thinking about this a little (from my limited understanding), there are some general approaches to solving this:

  1. Documentation: Just document this explicitly, with some workarounds.
  2. Channels only: Focus just on my particular use case of response channels.
  3. General: Solve the problem more generally.

The latter two would probably still need some explanatory documentation of the problem, since it's a very easy-to-miss race condition (I only discovered it because I'm doing panic injection testing using a simplified equivalent of https://docs.rs/fail/).

My current solution is more or less:

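    fn wait_for_recv(&self, command_receiver: flume::Receiver<()>) {
        loop {
            // All receivers of the command channel are gone (the command
            // thread died), so no response will ever arrive.
            if self.sender.is_disconnected() {
                return;
            }
            // Wake up periodically so the liveness check above is re-run.
            match command_receiver.recv_timeout(std::time::Duration::from_millis(10)) {
                Ok(_) => return,
                Err(flume::RecvTimeoutError::Timeout) => continue,
                Err(flume::RecvTimeoutError::Disconnected) => return,
            }
        }
    }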

Something like this could go into the documentation as an example.
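A dependency-free variant of the same workaround might look like this. It is a sketch under stated assumptions, not flume API: the response arrives over `std::sync::mpsc`, liveness is checked with `std::thread::JoinHandle::is_finished` (stable since Rust 1.61) instead of `Sender::is_disconnected`, and `wait_for_response` is a hypothetical name. It returns `None` instead of returning silently, so the caller can tell "got a response" apart from "the worker is gone".

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::thread::JoinHandle;
use std::time::Duration;

/// Waits for a response, giving up once the worker thread has exited.
/// Returns Some(value) if a response arrived, None if it never will.
fn wait_for_response<T>(response: &Receiver<T>, worker: &JoinHandle<()>) -> Option<T> {
    loop {
        match response.recv_timeout(Duration::from_millis(10)) {
            Ok(value) => return Some(value),
            // Every response sender is gone: nothing can arrive anymore.
            Err(RecvTimeoutError::Disconnected) => return None,
            Err(RecvTimeoutError::Timeout) => {
                if worker.is_finished() {
                    // The reply may have been sent just before the worker
                    // exited, so take one last non-blocking look.
                    return response.try_recv().ok();
                }
            }
        }
    }
}
```

The 10 ms timeout mirrors the original snippet; a shorter timeout notices a dead worker sooner at the cost of more wake-ups.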

Or, Selector could be expanded so that in addition to waiting on send or recv, it can also wait on disconnect. This would be a channels-only solution.

Or, Receiver could grow a new method, disconnect_and_flush() which disconnects and drops everything left in the channel (being careful to avoid the potential race conditions involved). This would be a more general solution.

@itamarst
Author

itamarst commented Sep 6, 2021

Maybe instead of disconnect_and_flush(), you just need a way to disconnect a Receiver without dropping it, while still being able to read what's already in the channel. Something like stop_receiving():

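fn run_command_thread(receiver: flume::Receiver<Command>) {
    // AssertUnwindSafe is needed because the closure borrows the receiver
    // across the unwind boundary.
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        // Iterate by reference so `receiver` is still usable after a panic.
        for mut command in &receiver {
            command.run();
        }
    }));
    if let Err(e) = result {
        // Proposed (hypothetical) method: disconnect without dropping the Receiver.
        receiver.stop_receiving();
        // Drop everything still queued, including each Command's response sender.
        for command in receiver.drain() {
            drop(command);
        }
        std::panic::resume_unwind(e);
    }
}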

@zesterer
Owner

zesterer commented Sep 6, 2021

I'm going to look into this more carefully next week because I don't think I currently have the time to do it justice. That said, implementing this feature could potentially sit nicely with a wider refactor I've been planning to do for a while.

@itamarst
Author

itamarst commented Sep 6, 2021

Appreciate all your work on Flume! The above was just me braindumping ideas; if you never get to it, that's fine too, no pressure.
