[feat] add shrink_to_fit() to Sender<T> and Receiver<T> #148


Open · wants to merge 7 commits into master

Conversation

@rakbladsvalsen commented May 7, 2024

Fixes: #147

@zesterer (Owner) left a comment


I'm a bit undecided how I feel about this PR. On the one hand, I recognise that this is a problem. On the other, this feels like it exposes far too much of the implementation in a public API.

One alternative might be to check, in the pull path, whether capacity > length * 2 + some constant and if so, shrink the internal queue automatically.

Comment on lines +806 to +814
/// Discards excess capacity in the internal queue.
pub fn shrink_to_fit(&self) {
self.shared.shrink_to_fit();
}

/// Returns the number of elements the internal queue can hold without reallocating.
pub fn queue_capacity(&self) -> usize {
self.shared.queue_capacity()
}
@zesterer (Owner)

I don't think that queue_capacity should be externally exposed; the internal queue is very much an implementation detail.

Comment on lines +807 to +809
pub fn shrink_to_fit(&self) {
self.shared.shrink_to_fit();
}
@zesterer (Owner)

I think a less precise name might be better here to account for implementation changes: there's no saying whether we'll want to continue to use std's queues in the future.

@rakbladsvalsen (Author)

I completely agree. Perhaps shrink_internal_queue() is a better name?

@rakbladsvalsen (Author)

> I'm a bit undecided how I feel about this PR. On the one hand, I recognise that this is a problem. On the other, this feels like it exposes far too much of the implementation in a public API.

Hey @zesterer, thanks for the review and sorry for the late response. I don't see how this could be solved without exposing at least some implementation details, short of adding either a periodic task that shrinks the channels every now and then, or the automatic check you mentioned.

In my opinion, instead of adding capacity checks, shrinking should be delegated to the user to mimic the behavior of Rust's collections. That's how Vec and HashMap work, and it keeps flume simple and fast when channels are short-lived.

@rakbladsvalsen (Author) commented Jul 4, 2024

Regarding the capacity method: it's necessary because otherwise calling shrink would be a shot in the dark. Without it, users can't know the internal capacity at all, and thus can't know when to call the shrink method (short of a periodic task that shrinks the channel unconditionally; with the capacity method, they can instead shrink only when capacity > length * 2).
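The caller-side heuristic described here can be sketched against a plain `VecDeque` standing in for the internal queue (the `queue_capacity()`/`shrink_to_fit()` pair is this PR's proposal, not a released flume API, so the sketch avoids it and models the idea directly):

```rust
use std::collections::VecDeque;

// Caller-side shrink heuristic from the comment above, modeled on a plain
// VecDeque rather than flume's Sender/Receiver (illustrative only): shrink
// only once capacity exceeds twice the length, to avoid constant reallocation.
fn maybe_shrink<T>(queue: &mut VecDeque<T>) -> bool {
    if queue.capacity() > queue.len() * 2 {
        queue.shrink_to_fit();
        true
    } else {
        false
    }
}

fn main() {
    // Simulate a queue that ballooned during a traffic spike and then drained.
    let mut q: VecDeque<u32> = VecDeque::with_capacity(1024);
    q.extend(0..8);
    assert!(maybe_shrink(&mut q)); // capacity (1024) > len * 2 (16), so we shrink
    assert!(q.capacity() < 1024);
    println!("len = {}, capacity = {}", q.len(), q.capacity());
}
```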

I think it might also be useful to expose a method like shrink_to (https://doc.rust-lang.org/std/vec/struct.Vec.html#method.shrink_to), but at this point we might as well just add an unsafe method (not actually memory-unsafe, but marked unsafe to discourage casual use) that returns a mutable reference to the internal queue. Users could then do whatever they want: shrink the queue, read the capacity, and so on. This keeps the code simple and flexible at the same time.
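The "escape hatch" idea above could look roughly like the sketch below; `Shared` and `raw_queue` are hypothetical stand-ins for flume's internal shared state, not its actual API:

```rust
use std::collections::VecDeque;
use std::sync::{Mutex, MutexGuard};

// Hypothetical stand-in for flume's internal shared state.
struct Shared<T> {
    queue: Mutex<VecDeque<T>>,
}

impl<T> Shared<T> {
    /// Hands back the internal queue so callers can shrink it, inspect its
    /// capacity, etc. Not memory-unsafe: the `unsafe` marker only signals
    /// "implementation detail, no stability guarantees", per the comment above.
    unsafe fn raw_queue(&self) -> MutexGuard<'_, VecDeque<T>> {
        self.queue.lock().unwrap()
    }
}

fn main() {
    let shared = Shared {
        queue: Mutex::new(VecDeque::<u32>::with_capacity(1024)),
    };
    // A caller opting into the escape hatch can shrink with a lower bound:
    unsafe { shared.raw_queue() }.shrink_to(16);
    let cap = unsafe { shared.raw_queue() }.capacity();
    assert!(cap >= 16 && cap < 1024);
    println!("capacity after shrink_to(16): {}", cap);
}
```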

@rakbladsvalsen (Author) commented Feb 5, 2025

@zesterer hey bud. Please review this whenever you get the chance. I've been using my own fork of flume in some projects, but I'd very much appreciate having this merged upstream.

BTW, you probably want to consider running cargo fmt on your codebase. Anyone using VS Code/Codium with rust-analyzer will run into problems because the repo's code is unformatted and rust-analyzer formats on save.

@zesterer (Owner) commented Feb 9, 2025

> Regarding the capacity method, it's necessary because otherwise calling shrink would be a shot in the dark. Without that method, users won't know the internal capacity at all, and thus, won't know when to call the shrink method (unless they have a periodic task that shrinks the channel, but users might implement some sort of shrink when capacity > length * 2).
>
> I think it might also be useful to expose a method like shrink_to (https://doc.rust-lang.org/std/vec/struct.Vec.html#method.shrink_to), but at this point we might as well just add an unsafe method (not actually unsafe, but at least to discourage users from using it) that returns a mutable reference to the internal queue. Then the users will be able to do whatever they want, including shrinking the queue, reading the capacity and so on. This keeps code simple and flexible at the same time.

My reluctance about all of this is that 'a vector with capacity' is not even the right mental model to use for an MPMC channel (unlike, say, a vector or a hashmap). That it happens to be implemented with one right now is largely incidental: it could just as easily be implemented with a HashSet, a linked list, a tree, or any number of other underlying constructs, which would lead the implementation to not be compatible with this API.

Out of interest, what specific use-case are you looking for? Would a more general .optimize() method with a description like 'minimises internal data structures to save memory' be adequate for your use-case? This seems more likely to be forward-compatible with future implementations.
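The `.optimize()` idea described above could look roughly like this; the struct and its fields are hypothetical stand-ins for flume's internals, and the point is that the signature names no concrete data structure, so it survives implementation changes:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical stand-in for flume's internal shared state; the fields are
// illustrative, not flume's actual layout.
struct Shared<T> {
    items: Mutex<VecDeque<T>>,
    waiting: Mutex<VecDeque<usize>>, // e.g. parked-thread bookkeeping
}

impl<T> Shared<T> {
    /// Minimises internal data structures to save memory. Deliberately vague:
    /// it promises nothing about which structures exist or how they shrink.
    fn optimize(&self) {
        self.items.lock().unwrap().shrink_to_fit();
        self.waiting.lock().unwrap().shrink_to_fit();
    }
}

fn main() {
    let shared = Shared {
        items: Mutex::new(VecDeque::<u32>::with_capacity(4096)),
        waiting: Mutex::new(VecDeque::new()),
    };
    shared.optimize();
    let cap = shared.items.lock().unwrap().capacity();
    assert!(cap < 4096);
    println!("items capacity after optimize(): {}", cap);
}
```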

@rakbladsvalsen (Author)

> My reluctance about all of this is that 'a vector with capacity' is not even the right mental model to use for an MPMC channel (unlike, say, a vector or a hashmap).

I completely agree that the underlying storage structure could be basically anything, perhaps something as simple as [T; N]. But I don't agree that this is incidental: VecDeque is a good choice, gets the job done, and is efficient and simple. Most channel libraries use either a VecDeque or a [T; N] FIFO anyway.
This is just my opinion, but realistically, considering flume hasn't seen many changes in the last year, I find it very unlikely that VecDeque will be replaced with something else.

> Out of interest, what specific use-case are you looking for? Would a more general .optimize() method with a description like 'minimises internal data structures to save memory' be adequate for your use-case? This seems more likely to be forward-compatible with future implementations.

I use flume to asynchronously process events in long-running axum apps (e.g. saving results to a DB, sending webhook events, etc.). More often than not I've seen memory usage go up and never come back down. I did some debugging and found that something as common as a network fluctuation could cause events to pile up in the queue, forcing flume to allocate more memory to hold the excess items. Yes, I know a big bounded channel, timeouts, etc. could be used, but I'd still much rather be able to reclaim unused memory than shrug and say "the channel wastes memory even when it's empty, but it is what it is".
I specifically use the capacity method to expose metrics in Grafana. There are better ways to instrument code, but knowing how much memory the channel was using at any given moment definitely helps.

While having an optimize() method would be better than nothing, I don't think it would be adequate. How would users know when to call optimize()? What if users actually want capacity information to decide whether to call optimize() at all?

@zesterer (Owner)

> While having an optimize() method would be better than not having anything at all, I don't think it would be adequate. How would users know when to call optimize()? What if users actually want to know capacity information to decide whether to call optimize() or not?

The problem with this is that there's no neat relationship between the channel capacity and the capacity of the underlying VecDeques: rendezvous queues have a channel capacity of 0, but in practice their internal queue does sometimes contain items. There is also more than one VecDeque (for example, for waiting threads) and I don't want that to be exposed.

I think a reasonable approach here is to just have flume itself shrink the internal queue when its capacity exceeds its length by some load factor plus a small constant, the constant being there to avoid overzealous reallocation at small sizes. This way the user doesn't have to think about the problem at all.
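The library-side heuristic suggested here can be sketched as follows; the load factor and slack constant are illustrative values, not anything flume has committed to, and the `VecDeque` stands in for the internal queue:

```rust
use std::collections::VecDeque;

// Sketch of the pull-path heuristic: shrink only when capacity exceeds
// len * LOAD_FACTOR + MIN_SLACK. The slack constant keeps small queues from
// being shrunk (and reallocated) over and over; both values are illustrative.
const LOAD_FACTOR: usize = 2;
const MIN_SLACK: usize = 32;

fn shrink_if_oversized<T>(queue: &mut VecDeque<T>) -> bool {
    if queue.capacity() > queue.len() * LOAD_FACTOR + MIN_SLACK {
        queue.shrink_to_fit();
        true
    } else {
        false
    }
}

fn main() {
    // A small queue stays untouched even when mostly empty...
    let mut small: VecDeque<u8> = VecDeque::with_capacity(16);
    assert!(!shrink_if_oversized(&mut small));
    // ...while a queue that ballooned and then drained gets shrunk.
    let mut big: VecDeque<u8> = VecDeque::with_capacity(4096);
    big.extend(0..8);
    assert!(shrink_if_oversized(&mut big));
    assert!(big.capacity() < 4096);
    println!("big queue capacity after shrink: {}", big.capacity());
}
```

Calling this on the pull path trades a little per-receive bookkeeping for not needing any public capacity API, which is the trade-off under discussion.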

@rakbladsvalsen (Author) commented May 2, 2025

> The problem with this is that there's no neat relationship between the channel capacity and the capacity of the underlying VecDeque

I fully agree. I hadn't taken a close look at flume's codebase (even though it's small), but I just noticed there are about three queues. Could this be solved with a more precise name? Perhaps something like item_queue_shrink_to_fit?

If the other two queues (sending and waiting) are never going to grow significantly over time, even with slow consumers, then I don't see why we should worry about naming conventions (since the only "relevant" queue is the item queue).

> ... if its capacity exceeds its length by some load factor + a small constant

That sounds good, but I think it takes away some of the user's power and visibility over the queue. No matter how small or big the constant is, it's likely to cause problems: if it's too small and the queue fills up frequently, the constant de/allocation will hurt performance. If it's too big, the queue will never deallocate unused memory (which is what happens today).

What I think would work best is your .optimize() approach, freeing resources used by all three internal queues, combined with a method to inspect the item queue's size. That way users can choose to free resources whenever they find it convenient.

I know, I know... the underlying implementation could change, and the naming issue needs to be worked out, but an implementation change seems very unlikely, considering flume is pretty much stable at this point.
