
Conversation

@phausler (Member)

This allows for full customization of buffers with simple actor based isolation.

It needs considerably more tests, but this approach will allow us to not only offer more customization but also more extensive testing.

Some neat characteristics: the AsyncBuffer protocol can be implemented with an actor if that makes sense, it participates in rethrowing (so if folks want to throw when the buffer is full they can do so easily), but the sneakiest part here is that the buffer has an input type and an output type. That last part means that we can buffer say characters and emit strings. Or do things like buffer up strings and concatenate them etc.
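
For illustration, a minimal sketch of what the Input/Output split enables (hypothetical code, not part of this PR; it assumes the AsyncBuffer protocol requires push(_:) plus a pop() that returns an optional Output, as discussed later in this thread):

actor AsyncLineBuffer: AsyncBuffer {
  typealias Input = Character
  typealias Output = String

  private var completedLines: [String] = []
  private var currentLine = ""

  // Accumulate characters; each newline completes a buffered String.
  func push(_ element: Character) async {
    if element == "\n" {
      completedLines.append(currentLine)
      currentLine = ""
    } else {
      currentLine.append(element)
    }
  }

  // Emit whole lines, oldest first; nil when nothing is ready yet.
  func pop() async -> String? {
    guard !completedLines.isEmpty else { return nil }
    return completedLines.removeFirst()
  }
}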

@phausler requested a review from @kperryua, March 1, 2022 03:10

public actor AsyncLimitBuffer<Element: Sendable>: AsyncBuffer {
  public enum Policy: Sendable {
    case unbounded
Member

If this winds up being the easiest choice to make for clients, we could be leading them down the path of unbounded memory growth as well.

Member Author

I can remove the defaultness of that policy; I was just mirroring AsyncStream.

case .bufferingNewest(let limit):
  if buffer.count < limit {
    buffer.append(element)
  } else if buffer.count > 0 {
Member

This check seems superfluous.

Member Author

Unless the limit is 0, which could be valid as a buffer, right?

Member (@kperryua), Mar 1, 2022

If the limit is 0 (which seems weird to me, see below), then you will never have put anything in the buffer ever, and buffer.count will always be 0. The condition is still superfluous. Mathematically, the only time this condition matters is if both buffer.count and limit could be negative.
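
If the limit is required to be positive, the branch could collapse to something like this sketch (not the PR's final code):

case .bufferingNewest(let limit):
  if buffer.count >= limit {
    // Make room for the newest value by discarding the oldest one.
    buffer.removeFirst()
  }
  buffer.append(element)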

let policy: Policy

init(policy: Policy) {
  self.policy = policy
Member

For bufferingOldest and bufferingNewest we should have a precondition(limit > 0).

Member Author

Should it be precondition(limit > 0) or precondition(limit >= 0)?

Member

What's supposed to happen with limit == 0? Do all values get dropped? 0 is weird, because .bufferingNewest(0) and .bufferingOldest(0) seem like they would be equivalent.

Member Author

Yeah, a buffer of 0 seems silly; you should just remove the buffer in that case.
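
A sketch of the suggested guard in the initializer, assuming the Policy cases discussed above:

init(policy: Policy) {
  switch policy {
  case .bufferingNewest(let limit), .bufferingOldest(let limit):
    // A zero-capacity buffer is not meaningful; require a positive limit.
    precondition(limit > 0, "buffer limit must be greater than 0")
  default:
    break
  }
  self.policy = policy
}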

  func update(_ apply: @Sendable (inout T) -> Void) async {
    apply(&value)
  }
}
Member

This doesn't appear to be used.

Member Author

Yeah, the whole type is no longer used; will remove.

break
}
}

Member

We just discussed this in person, but while this looks better on the surface and seems to behave better in your testing, I still have to think very carefully about the location of the suspension points and the possibilities of actor interleaving messing with the state machine in a manner not too dissimilar from the previous ManagedCriticalState approach.

Member Author

Agreed, it definitely needs more testing to validate it.

Member

Unfortunately, I did find at least one instance where actor interleaving disrupts the state machine as written, putting the sequence into an undesirable state. I'm working on a fix, but it requires careful examination of the current state at every entry point and after every suspension point to the iterator's actor, since suspension can allow another call into the actor to proceed and therefore change the actor's state.
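
For context, the underlying hazard here is actor reentrancy: every await inside an actor-isolated method is a suspension point at which other calls into the actor may run and mutate its state. A standalone illustration (not code from this PR):

actor Counter {
  var value = 0

  func increment() async {
    let observed = value
    // Suspension point: another increment() call can run here and
    // change `value` before this one resumes.
    await Task.yield()
    // `observed` may be stale now, so blindly writing it back can
    // lose updates; state must be re-checked after every await.
    value = observed + 1
  }
}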

extension AsyncSequence where Element: Sendable {
  public func buffer<Buffer: AsyncBuffer>(_ createBuffer: @Sendable @escaping () -> Buffer) -> AsyncBufferSequence<Self, Buffer> where Buffer.Input == Element {
    AsyncBufferSequence(self, createBuffer: createBuffer)
  }
Member

Overall, I like the shape of this, and the amount of utility available here is very high.
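
For example, usage could look roughly like this (assuming a public initializer on AsyncLimitBuffer and a hypothetical upstream sequence):

// `upstream` stands in for any AsyncSequence whose Element is Int.
let buffered = upstream.buffer {
  AsyncLimitBuffer<Int>(policy: .bufferingNewest(10))
}

for await value in buffered {
  print(value)
}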

associatedtype Input: Sendable
associatedtype Output: Sendable

func push(_ element: Input) async
Member

Something that occurred to me: an async push() function essentially provides a backpressure mechanism, yeah? Could we, for instance, have a type of buffer that accepts a certain number of items, but then when full, stashes away a continuation in push() that only gets resumed on pop()?

Member Author

Yes, that is possible (a bit more complicated). Are you thinking of it having some sort of limit there too? Or does that become unneeded?

Member (@kperryua), Mar 1, 2022

Yeah, I'm thinking about an additional Policy value that is something like .bufferingWithBackpressure(Int) (terrible name) that happily collects upstream values while there's available space, making them quickly available to any less-hot downstream consumers. But instead of dropping oldest or newest values, it uses a continuation to exert backpressure on the Task that is consuming items from the base iterator.

Member Author

The problem with that is that we would need some sort of external mechanism to drive the push, because if push is awaiting a continuation, pop would never be able to enter (IIUC, given the actor model). It is worth exploring as another option.
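
A rough sketch of the continuation-based idea being discussed, written as a standalone actor and ignoring the entry/interleaving concerns raised above (hypothetical, single producer only):

actor BackpressureBuffer<Element: Sendable> {
  private var buffer: [Element] = []
  private let limit: Int
  private var parkedProducer: CheckedContinuation<Void, Never>?

  init(limit: Int) {
    precondition(limit > 0)
    self.limit = limit
  }

  func push(_ element: Element) async {
    if buffer.count >= limit {
      // Park the producer until a consumer makes room.
      await withCheckedContinuation { continuation in
        parkedProducer = continuation
      }
    }
    buffer.append(element)
  }

  func pop() -> Element? {
    guard !buffer.isEmpty else { return nil }
    let element = buffer.removeFirst()
    // Wake the parked producer now that there is space again.
    parkedProducer?.resume()
    parkedProducer = nil
    return element
  }
}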

kperryua added 3 commits March 8, 2022 17:24
…interleaving issues. Fixed error/termination handling. Validation diagram tests, using “delay next” operators
@kperryua (Member)

I think there is still some more work to be done here. More extensive testing. Perhaps new default buffer policies, including "throw when full" and "exert backpressure".

switch state {
case .idle(let iterator, let createBuffer):
  let bufferState = AsyncBufferState<Base.Element, Buffer.Output>()
  let buffer = Active(Envelope(iterator), buffer: createBuffer(), state: bufferState)
Member Author

After looking at it, I think the envelope thing is a bad idea; actors really should have Sendable requirements for their initialization parameters. That means that for the buffer, the base type's iterator should be Sendable.
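
A sketch of the constraint being suggested, with a hypothetical simplified actor standing in for the PR's Active/Envelope pair:

// Requiring a Sendable iterator up front avoids passing a non-Sendable
// value across the actor boundary at initialization time.
actor IterationActor<Iterator: AsyncIteratorProtocol & Sendable> {
  private var iterator: Iterator

  init(_ iterator: Iterator) {
    self.iterator = iterator
  }

  func next() async throws -> Iterator.Element? {
    try await iterator.next()
  }
}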

for continuation in pending {
  continuation.resume(returning: .success(nil))
}
pending = []
Member Author

Might want to set pending to empty before resuming, by the way; that way we don't run the risk of bad states (granted, here I think you are fine).
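
That reordering would look something like this sketch:

// Take and clear `pending` before resuming, so any re-entrant call
// cannot observe continuations that are about to be resumed.
let continuations = pending
pending = []
for continuation in continuations {
  continuation.resume(returning: .success(nil))
}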

@kperryua merged commit 6ba2c05 into main, Mar 14, 2022
@kperryua deleted the pr/buffer branch, March 14, 2022 20:41