[feat] add shrink_to_fit() to Sender<T> and Receiver<T> #148
base: master
Conversation
I'm a bit undecided how I feel about this PR. On the one hand, I recognise that this is a problem. On the other, this feels like it exposes far too much of the implementation in a public API.
One alternative might be to check, in the pull path, whether `capacity > length * 2 + some constant` and, if so, shrink the internal queue automatically.
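A minimal sketch of that heuristic, assuming a hypothetical `Shared` type with a `Mutex<VecDeque<T>>` inside (flume's real internals are more involved):

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical shared channel state; stands in for flume's internals.
struct Shared<T> {
    queue: Mutex<VecDeque<T>>,
}

// Small constant so tiny queues never trigger a shrink.
const SHRINK_SLACK: usize = 32;

impl<T> Shared<T> {
    fn pull(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        let item = queue.pop_front();
        // Shrink only when capacity is far out of proportion to the live
        // length, so steady-state traffic never pays for a reallocation.
        if queue.capacity() > queue.len() * 2 + SHRINK_SLACK {
            queue.shrink_to_fit();
        }
        item
    }
}
```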
```rust
/// Discards excess capacity in the internal queue.
pub fn shrink_to_fit(&self) {
    self.shared.shrink_to_fit();
}
```

```rust
/// Returns the number of elements the internal queue can hold without reallocating.
pub fn queue_capacity(&self) -> usize {
    self.shared.queue_capacity()
}
```
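For context, a rough usage sketch of the API this PR proposes (these methods are not part of released flume):

```rust
fn main() {
    let (tx, rx) = flume::unbounded::<u64>();

    // A burst of traffic grows the internal queue's allocation.
    for i in 0..100_000 {
        tx.send(i).unwrap();
    }
    while rx.try_recv().is_ok() {}

    // With the proposed methods, the caller can opt in to returning the
    // excess capacity to the allocator, mirroring Vec::shrink_to_fit.
    rx.shrink_to_fit();
    assert!(rx.queue_capacity() >= rx.len());
}
```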
I don't think that `queue_capacity` should be externally exposed; the internal queue is very much an implementation detail.
```rust
pub fn shrink_to_fit(&self) {
    self.shared.shrink_to_fit();
}
```
I think a less precise name might be better here, to account for implementation changes: there's no saying whether we'll want to continue to use `std`'s queues in the future.
I completely agree. Perhaps `shrink_internal_queue()` is a better name?
Hey @zesterer, thanks for the review and sorry for the late response. I don't see how this could be solved without exposing a little bit of implementation detail, short of adding some sort of periodic task that shrinks the channels every now and then, or the automatic check you mentioned. In my opinion, instead of adding capacity checks, shrinking should be delegated to the user, to mimic the behavior of Rust's collections: that's how `Vec` and `HashMap` already work.
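For reference, the opt-in behavior of `std`'s collections that this comment appeals to:

```rust
fn main() {
    let mut v: Vec<u8> = Vec::with_capacity(4096);
    v.extend([1, 2, 3]);

    // Capacity is never given back automatically...
    assert!(v.capacity() >= 4096);

    // ...until the caller explicitly asks for it.
    v.shrink_to_fit();
    assert!(v.capacity() >= v.len());
}
```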
Regarding the […]: I think it might also be useful to expose a method like […].
@zesterer hey bud. Please review this whenever you get the chance. I've been using my own fork of flume in some projects, but I'd very much appreciate having this merged upstream. BTW, you probably want to consider running […].
My reluctance about all of this is that 'a vector with capacity' is not even the right mental model to use for an MPMC channel (unlike, say, a vector or a hashmap). That it happens to be implemented with one right now is largely incidental: it could just as easily be implemented with a […].

Out of interest, what specific use-case are you looking for? Would a more general […]?
I completely agree that the underlying storage structure could be basically anything, perhaps something as simple as […].
I use flume to asynchronously process events in long-running axum apps (e.g. saving results to the DB, sending webhook events, etc.). More often than not, I've seen memory usage go up and never come back down. I did some debugging and found that something as common as a network fluctuation could cause events to pile up in the queue, causing […]. While having an […]
The problem with this is that there's no neat relationship between the channel capacity and the capacity of the underlying […].

I think a reasonable approach here is to just have flume itself shrink the internal queue if its capacity exceeds its length by some load factor plus a small constant, to avoid overzealous reallocation at small sizes. This also means the user doesn't have to think about the problem at all.
I fully agree. I hadn't had a proper look at flume's codebase (even though it's small), but I just noticed there are about three queues. I think this issue can potentially be solved with a proper name? Perhaps something such as […]. If the other two queues ([…]) […]
That sounds good, but I think it takes away some of the user's power and visibility over the queue implementation. No matter how small or big the constant is, it's most likely going to cause problems: if it's too small and the queue fills up frequently, it'll hurt performance because of the constant de/allocation; if it's too big, the queue will never deallocate unused memory (which is what happens today). What I think would work best would be your […].

I know, I know... the underlying implementation could change, and the naming issue needs to be worked out, but as far as implementation changes go, that's very unlikely, considering flume is pretty much stable at this point.
Fixes: #147