Skip to content

Commit 6ba2c05

Browse files
phauslerkperryua
andauthored
A full knobs-and-dials buffering strategy transformation (apple#61)
* A full knobs-and-dials buffering strategy transformation * Refine the AsyncBuffer type to require actor and add some additonal tests for custom buffer types * Migrate the internal state to an actor * Remove default unbounded policy from buffers * Remove some dead code tests and fix space removal to avoid superfluous checks * Simplification of nominal state machine, which helps eliminate actor interleaving issues. Fixed error/termination handling. Validation diagram tests, using “delay next” operators * Most robust behavior when multiple tasks are concurrently accessing the iterator * Removed Envelope, added base iterator Sendable requirement, and added conditional Sendable conformances * Minor future-proofing Co-authored-by: Kevin Perry <[email protected]>
1 parent f0f348a commit 6ba2c05

File tree

2 files changed

+758
-0
lines changed

2 files changed

+758
-0
lines changed
Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
actor AsyncBufferState<Input: Sendable, Output: Sendable> {
13+
enum TerminationState: Sendable, CustomStringConvertible {
14+
case running
15+
case baseFailure(Error) // An error from the base sequence has occurred. We need to process any buffered items before throwing the error. We can rely on it not emitting any more items.
16+
case baseTermination
17+
case terminal
18+
19+
var description: String {
20+
switch self {
21+
case .running: return "running"
22+
case .baseFailure: return "base failure"
23+
case .baseTermination: return "base termination"
24+
case .terminal: return "terminal"
25+
}
26+
}
27+
}
28+
29+
var pending = [UnsafeContinuation<Result<Output?, Error>, Never>]()
30+
var terminationState = TerminationState.running
31+
32+
init() { }
33+
34+
func drain<Buffer: AsyncBuffer>(buffer: Buffer) async where Buffer.Input == Input, Buffer.Output == Output {
35+
guard pending.count > 0 else {
36+
return
37+
}
38+
39+
do {
40+
if let value = try await buffer.pop() {
41+
pending.removeFirst().resume(returning: .success(value))
42+
} else {
43+
switch terminationState {
44+
case .running:
45+
// There's no value to report, because it was probably grabbed by next() before we could grab it. The pending continuation was either resumed by next() directly, or will be by a future enqueued value or base termination/failure.
46+
break
47+
case .baseFailure(let error):
48+
// Now that there are no more items in the buffer, we can finally report the base sequence's error and enter terminal state.
49+
pending.removeFirst().resume(returning: .failure(error))
50+
self.terminate()
51+
case .terminal, .baseTermination:
52+
self.terminate()
53+
}
54+
}
55+
} catch {
56+
// Errors thrown by the buffer immediately terminate the sequence.
57+
pending.removeFirst().resume(returning: .failure(error))
58+
self.terminate()
59+
}
60+
}
61+
62+
func enqueue<Buffer: AsyncBuffer>(_ item: Input, buffer: Buffer) async where Buffer.Input == Input, Buffer.Output == Output {
63+
await buffer.push(item)
64+
await drain(buffer: buffer)
65+
}
66+
67+
func fail<Buffer: AsyncBuffer>(_ error: Error, buffer: Buffer) async where Buffer.Input == Input, Buffer.Output == Output {
68+
terminationState = .baseFailure(error)
69+
await drain(buffer: buffer)
70+
}
71+
72+
func finish<Buffer: AsyncBuffer>(buffer: Buffer) async where Buffer.Input == Input, Buffer.Output == Output {
73+
if case .running = terminationState {
74+
terminationState = .baseTermination
75+
}
76+
await drain(buffer: buffer)
77+
}
78+
79+
func terminate() {
80+
terminationState = .terminal
81+
pending = []
82+
for continuation in pending {
83+
continuation.resume(returning: .success(nil))
84+
}
85+
}
86+
87+
func next<Buffer: AsyncBuffer>(buffer: Buffer) async throws -> Buffer.Output? where Buffer.Input == Input, Buffer.Output == Output {
88+
if case .terminal = terminationState {
89+
return nil
90+
}
91+
92+
do {
93+
while let value = try await buffer.pop() {
94+
if let continuation = pending.first {
95+
pending.removeFirst()
96+
continuation.resume(returning: .success(value))
97+
} else {
98+
return value
99+
}
100+
}
101+
} catch {
102+
// Errors thrown by the buffer immediately terminate the sequence.
103+
self.terminate()
104+
throw error
105+
}
106+
107+
switch terminationState {
108+
case .running:
109+
break
110+
case .baseFailure(let error):
111+
self.terminate()
112+
throw error
113+
case .baseTermination, .terminal:
114+
self.terminate()
115+
return nil
116+
}
117+
118+
let result: Result<Output?, Error> = await withUnsafeContinuation { continuation in
119+
pending.append(continuation)
120+
}
121+
return try result._rethrowGet()
122+
}
123+
}
124+
125+
@rethrows
126+
public protocol AsyncBuffer: Actor {
127+
associatedtype Input: Sendable
128+
associatedtype Output: Sendable
129+
130+
func push(_ element: Input) async
131+
func pop() async throws -> Output?
132+
}
133+
134+
public actor AsyncLimitBuffer<Element: Sendable>: AsyncBuffer {
135+
public enum Policy: Sendable {
136+
case unbounded
137+
case bufferingOldest(Int)
138+
case bufferingNewest(Int)
139+
}
140+
141+
var buffer = [Element]()
142+
let policy: Policy
143+
144+
init(policy: Policy) {
145+
// limits should always be greater than 0 items
146+
switch policy {
147+
case .bufferingNewest(let limit):
148+
precondition(limit > 0)
149+
case .bufferingOldest(let limit):
150+
precondition(limit > 0)
151+
default: break
152+
}
153+
self.policy = policy
154+
}
155+
156+
public func push(_ element: Element) async {
157+
switch policy {
158+
case .unbounded:
159+
buffer.append(element)
160+
case .bufferingOldest(let limit):
161+
if buffer.count < limit {
162+
buffer.append(element)
163+
}
164+
case .bufferingNewest(let limit):
165+
if buffer.count < limit {
166+
// there is space available
167+
buffer.append(element)
168+
} else {
169+
// no space is available and this should make some room
170+
buffer.removeFirst()
171+
buffer.append(element)
172+
}
173+
}
174+
}
175+
176+
public func pop() async -> Element? {
177+
guard buffer.count > 0 else {
178+
return nil
179+
}
180+
return buffer.removeFirst()
181+
}
182+
}
183+
184+
extension AsyncSequence where Element: Sendable {
185+
public func buffer<Buffer: AsyncBuffer>(_ createBuffer: @Sendable @escaping () -> Buffer) -> AsyncBufferSequence<Self, Buffer> where Buffer.Input == Element {
186+
AsyncBufferSequence(self, createBuffer: createBuffer)
187+
}
188+
189+
public func buffer(policy limit: AsyncLimitBuffer<Element>.Policy) -> AsyncBufferSequence<Self, AsyncLimitBuffer<Element>> {
190+
buffer {
191+
AsyncLimitBuffer(policy: limit)
192+
}
193+
}
194+
}
195+
196+
public struct AsyncBufferSequence<Base: AsyncSequence, Buffer: AsyncBuffer> where Base.Element == Buffer.Input, Base.AsyncIterator: Sendable {
197+
let base: Base
198+
let createBuffer: @Sendable () -> Buffer
199+
200+
init(_ base: Base, createBuffer: @Sendable @escaping () -> Buffer) {
201+
self.base = base
202+
self.createBuffer = createBuffer
203+
}
204+
}
205+
206+
extension AsyncBufferSequence: Sendable where Base: Sendable, Base.AsyncIterator: Sendable, Base.Element: Sendable { }
207+
extension AsyncBufferSequence.Iterator: Sendable where Base: Sendable, Base.AsyncIterator: Sendable, Base.Element: Sendable { }
208+
209+
extension AsyncBufferSequence: AsyncSequence {
210+
public typealias Element = Buffer.Output
211+
212+
public struct Iterator: AsyncIteratorProtocol {
213+
struct Active {
214+
var task: Task<Void, Never>?
215+
let buffer: Buffer
216+
let state: AsyncBufferState<Buffer.Input, Buffer.Output>
217+
218+
init(_ iterator: Base.AsyncIterator, buffer: Buffer, state: AsyncBufferState<Buffer.Input, Buffer.Output>) {
219+
self.buffer = buffer
220+
self.state = state
221+
task = Task {
222+
var iter = iterator
223+
do {
224+
while let item = try await iter.next() {
225+
await state.enqueue(item, buffer: buffer)
226+
}
227+
await state.finish(buffer: buffer)
228+
} catch {
229+
await state.fail(error, buffer: buffer)
230+
}
231+
}
232+
}
233+
234+
func next() async rethrows -> Element? {
235+
let result: Result<Element?, Error> = await withTaskCancellationHandler {
236+
task?.cancel()
237+
} operation: {
238+
do {
239+
let value = try await state.next(buffer: buffer)
240+
return .success(value)
241+
} catch {
242+
task?.cancel()
243+
return .failure(error)
244+
}
245+
}
246+
return try result._rethrowGet()
247+
}
248+
}
249+
250+
enum State {
251+
case idle(Base.AsyncIterator, @Sendable () -> Buffer)
252+
case active(Active)
253+
}
254+
255+
var state: State
256+
257+
init(_ iterator: Base.AsyncIterator, createBuffer: @Sendable @escaping () -> Buffer) {
258+
state = .idle(iterator, createBuffer)
259+
}
260+
261+
public mutating func next() async rethrows -> Element? {
262+
switch state {
263+
case .idle(let iterator, let createBuffer):
264+
let bufferState = AsyncBufferState<Base.Element, Buffer.Output>()
265+
let buffer = Active(iterator, buffer: createBuffer(), state: bufferState)
266+
state = .active(buffer)
267+
return try await buffer.next()
268+
case .active(let buffer):
269+
return try await buffer.next()
270+
}
271+
}
272+
}
273+
274+
public func makeAsyncIterator() -> Iterator {
275+
Iterator(base.makeAsyncIterator(), createBuffer: createBuffer)
276+
}
277+
}

0 commit comments

Comments
 (0)