A full knobs-and-dials buffering strategy transformation #61
Conversation
…ests for custom buffer types
```swift
public actor AsyncLimitBuffer<Element: Sendable>: AsyncBuffer {
  public enum Policy: Sendable {
    case unbounded
```
If this winds up being the easiest choice to make for clients, we could be leading them down the path of unbounded memory growth as well.
I can remove the default for that policy; I was just mirroring AsyncStream.
```swift
case .bufferingNewest(let limit):
  if buffer.count < limit {
    buffer.append(element)
  } else if buffer.count > 0 {
```
This check seems superfluous.
Unless the limit is 0, which could be valid for a buffer, right?
If the limit is 0 (which seems weird to me, see below), then you will never have put anything in the buffer ever, and buffer.count will always be 0. The condition is still superfluous. Mathematically, the only time this condition matters is if both buffer.count and limit could be negative.
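For illustration, here is a minimal standalone sketch of that simplification (the function name and the surrounding storage are assumptions, not the PR's code): with a positive limit, reaching the else branch implies the buffer already holds `limit` elements, so no extra count check is needed.

```swift
// Hypothetical sketch, not the PR's implementation: with limit > 0, the else
// branch can only be reached when the buffer already holds `limit` elements.
func pushNewest<Element>(_ element: Element, into buffer: inout [Element], limit: Int) {
  if buffer.count < limit {
    buffer.append(element)
  } else {
    buffer.removeFirst()   // safe here: buffer.count == limit, and limit > 0
    buffer.append(element)
  }
}
```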
```swift
let policy: Policy

init(policy: Policy) {
  self.policy = policy
```
For bufferingOldest and bufferingNewest we should have a precondition(limit > 0).
Should it be precondition(limit > 0) or precondition(limit >= 0)?
What's supposed to happen with limit == 0? Do all values get dropped? 0 is weird, because .bufferingNewest(0) and .bufferingOldest(0) seem like they would be equivalent.
Yeah, a buffer of 0 seems silly; you should just remove the buffer in that case.
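A minimal sketch of the suggested guard, assuming the Policy shape shown earlier (the surrounding type is hypothetical):

```swift
// Hypothetical sketch of the suggested precondition; mirrors the Policy shape
// shown above but is not the PR's code.
enum Policy: Sendable {
  case unbounded
  case bufferingOldest(Int)
  case bufferingNewest(Int)
}

struct BufferConfiguration {
  let policy: Policy

  init(policy: Policy) {
    switch policy {
    case .bufferingOldest(let limit), .bufferingNewest(let limit):
      // A limit of 0 means every value would be dropped, which is better
      // expressed by not inserting a buffer at all, so reject it up front.
      precondition(limit > 0, "limit must be greater than 0")
    case .unbounded:
      break
    }
    self.policy = policy
  }
}
```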
```swift
  func update(_ apply: @Sendable (inout T) -> Void) async {
    apply(&value)
  }
}
```
This doesn't appear to be used.
Yeah, the whole type is no longer used; I will remove it.
```swift
    break
  }
}
```
We just discussed this in person, but while this looks better on the surface and seems to behave better in your testing, I still have to think very carefully about the location of the suspension points and the possibilities of actor interleaving messing with the state machine in a manner not too dissimilar from the previous ManagedCriticalState approach.
Agreed, it definitely needs more testing to validate it.
Unfortunately, I did find at least one instance where actor interleaving disrupts the state machine as written, putting the sequence into an undesirable state. I'm working on a fix, but it requires careful examination of the current state at every entry point and after every suspension point to the iterator's actor, since suspension can allow another call into the actor to proceed and therefore change the actor's state.
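As a general illustration of the hazard (unrelated to the buffer's actual types): every await inside an actor method is a suspension point, other calls can run against the actor while it is suspended, and any state observed before the await has to be re-checked afterwards.

```swift
// Generic illustration of actor reentrancy; not the PR's code.
actor Counter {
  var value = 0

  func incrementSlowly() async {
    let observed = value
    // Suspension point: other calls into this actor may run here and
    // mutate `value` before this method resumes.
    try? await Task.sleep(nanoseconds: 1_000_000)
    // `value` may no longer equal `observed`; assuming otherwise is exactly
    // the kind of interleaving bug described above.
    value = observed + 1
  }
}
```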
```swift
extension AsyncSequence where Element: Sendable {
  public func buffer<Buffer: AsyncBuffer>(_ createBuffer: @Sendable @escaping () -> Buffer) -> AsyncBufferSequence<Self, Buffer> where Buffer.Input == Element {
    AsyncBufferSequence(self, createBuffer: createBuffer)
  }
```
Overall, I like the shape of this, and the amount of utility available here is very high.
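For example, usage might look like the following (the AsyncLimitBuffer initializer and its access level are assumptions based on the snippets in this thread):

```swift
// Hypothetical usage of the buffer(_:) extension shown above.
func consume() async {
  let base = AsyncStream<Int> { continuation in
    for i in 0..<100 { continuation.yield(i) }
    continuation.finish()
  }

  // Assumes AsyncLimitBuffer exposes an initializer taking a Policy.
  let buffered = base.buffer {
    AsyncLimitBuffer<Int>(policy: .bufferingNewest(10))
  }

  for await value in buffered {
    print(value)
  }
}
```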
```swift
associatedtype Input: Sendable
associatedtype Output: Sendable

func push(_ element: Input) async
```
Something that occurred to me: an async push() function essentially provides a backpressure mechanism, yeah? Could we, for instance, have a type of buffer that accepts a certain number of items, but then when full, stashes away a continuation in push() that only gets resumed on pop()?
Yes, that is possible (a bit more complicated). Are you thinking of it having some sort of limit there too, or does that become unnecessary?
Yeah, I'm thinking about an additional Policy value that is something like .bufferingWithBackpressure(Int) (terrible name) that happily collects upstream values while there's available space, making them quickly available to any less-hot downstream consumers. But instead of dropping oldest or newest values, it uses a continuation to exert backpressure on the Task that is consuming items from the base iterator.
The problem with that is that we would need some sort of external mechanism to push, because if the push is awaiting a continuation, the pop would never be able to enter (IIUC, for the actor model). It is worth exploring as another option.
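A rough sketch of the general shape being discussed, independent of the PR's protocol (whether a suspended push composes cleanly with the actor-based transformation is exactly the open question raised here):

```swift
// Hypothetical sketch of a backpressure-exerting buffer; not the PR's design.
// For simplicity it assumes a single producer, so only one continuation is
// ever parked at a time.
actor BackpressureBuffer<Element: Sendable> {
  private var buffer = [Element]()
  private let limit: Int
  private var parkedPush: CheckedContinuation<Void, Never>?

  init(limit: Int) {
    precondition(limit > 0)
    self.limit = limit
  }

  func push(_ element: Element) async {
    if buffer.count >= limit {
      // Full: suspend the producer until a consumer makes room via pop().
      await withCheckedContinuation { continuation in
        parkedPush = continuation
      }
    }
    buffer.append(element)
  }

  func pop() -> Element? {
    guard !buffer.isEmpty else { return nil }
    let element = buffer.removeFirst()
    if let continuation = parkedPush {
      // Clear state before resuming, then let the producer continue.
      parkedPush = nil
      continuation.resume()
    }
    return element
  }
}
```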
…interleaving issues. Fixed error/termination handling. Validation diagram tests, using “delay next” operators
I think there is still some more work to be done here: more extensive testing, and perhaps new default buffer policies, including "throw when full" and "exert backpressure".
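For the "throw when full" idea, a minimal sketch might look like this (independent of the AsyncBuffer protocol's exact requirements, which are only partially visible in this diff):

```swift
// Hypothetical sketch of a "throw when full" policy; the error type and
// push/pop shape are assumptions, not the PR's API.
struct BufferFullError: Error {}

actor ThrowingLimitBuffer<Element: Sendable> {
  private var buffer = [Element]()
  private let limit: Int

  init(limit: Int) {
    precondition(limit > 0)
    self.limit = limit
  }

  func push(_ element: Element) throws {
    guard buffer.count < limit else {
      // Instead of silently dropping oldest or newest values, surface the
      // overflow to the consumer as an error.
      throw BufferFullError()
    }
    buffer.append(element)
  }

  func pop() -> Element? {
    buffer.isEmpty ? nil : buffer.removeFirst()
  }
}
```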
```swift
switch state {
case .idle(let iterator, let createBuffer):
  let bufferState = AsyncBufferState<Base.Element, Buffer.Output>()
  let buffer = Active(Envelope(iterator), buffer: createBuffer(), state: bufferState)
```
After looking at it, I think the envelope thing is a bad idea; actors really should have Sendable requirements for their initialization parameters. That means the buffer must require the base type's iterator to be Sendable.
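As a generic illustration of that constraint (not the PR's types), requiring Sendable on the value handed to the actor's initializer avoids smuggling a non-Sendable iterator across in a wrapper:

```swift
// Generic illustration only: the actor's initialization parameter carries an
// explicit Sendable requirement rather than being wrapped in an envelope type.
actor IteratorDriver<Iterator: AsyncIteratorProtocol & Sendable> where Iterator.Element: Sendable {
  private var iterator: Iterator

  init(_ iterator: Iterator) {
    self.iterator = iterator
  }

  func next() async throws -> Iterator.Element? {
    try await iterator.next()
  }
}
```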
… conditional Sendable conformances
```swift
for continuation in pending {
  continuation.resume(returning: .success(nil))
}
pending = []
```
You might want to set pending to empty before resuming, by the way; that way we don't run the risk of bad states (granted, here I think you are fine).
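A minimal sketch of the suggested ordering (the continuation's result type is an assumption based on the snippet above):

```swift
// Hypothetical sketch: clear the stored continuations before resuming them,
// so any re-entrant call triggered by a resume sees already-emptied state.
func drainPending(_ pending: inout [CheckedContinuation<Result<Int?, Error>, Never>]) {
  let continuations = pending
  pending = []
  for continuation in continuations {
    continuation.resume(returning: .success(nil))
  }
}
```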
This allows for full customization of buffers with simple actor-based isolation.
It needs considerably more tests, but this approach will allow us to not only offer more customization but also more extensive testing.
Some neat characteristics: the AsyncBuffer protocol can be implemented with an actor if that makes sense, and it participates in rethrowing (so if folks want to throw when the buffer is full they can do so easily). But the sneakiest part here is that the buffer has an input type and an output type. That last part means that we can buffer, say, characters and emit strings, or do things like buffer up strings and concatenate them, etc.
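For instance, a minimal sketch of the input/output idea, mirroring the push/pop shape visible in the diff (the exact protocol requirements are assumed, so this does not declare conformance):

```swift
// Hypothetical sketch of Input != Output: buffer Characters, emit Strings.
// It mirrors the associatedtype Input/Output and async push shape shown in
// the diff, but does not claim to match the protocol's full requirements.
actor LineAssemblingBuffer {
  typealias Input = Character
  typealias Output = String

  private var pending = ""

  func push(_ element: Character) async {
    pending.append(element)
  }

  // Emits a complete line once a newline has been buffered, otherwise nil.
  func pop() async -> String? {
    guard let newlineIndex = pending.firstIndex(of: "\n") else { return nil }
    let line = String(pending[..<newlineIndex])
    pending.removeSubrange(...newlineIndex)
    return line
  }
}
```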