Skip to content

Commit 29255b1

Browse files
committed
Async streaming
1 parent 6c050d5 commit 29255b1

17 files changed

+1016
-2
lines changed

Package.swift

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,24 @@ let availabilityMacros: [SwiftSetting] = [
2020
),
2121
]
2222

23+
let extraSettings: [SwiftSetting] = [
24+
.strictMemorySafety(),
25+
.enableExperimentalFeature("SuppressedAssociatedTypes"),
26+
.enableExperimentalFeature("LifetimeDependence"),
27+
.enableExperimentalFeature("Lifetimes"),
28+
.enableUpcomingFeature("LifetimeDependence"),
29+
.enableUpcomingFeature("NonisolatedNonsendingByDefault"),
30+
.enableUpcomingFeature("InferIsolatedConformances"),
31+
.enableUpcomingFeature("ExistentialAny"),
32+
.enableUpcomingFeature("MemberImportVisibility"),
33+
.enableUpcomingFeature("InternalImportsByDefault"),
34+
]
35+
2336
let package = Package(
2437
name: "swift-async-algorithms",
2538
products: [
26-
.library(name: "AsyncAlgorithms", targets: ["AsyncAlgorithms"])
39+
.library(name: "AsyncAlgorithms", targets: ["AsyncAlgorithms"]),
40+
.library(name: "AsyncStreaming", targets: ["AsyncStreaming"])
2741
],
2842
targets: [
2943
.target(
@@ -36,6 +50,14 @@ let package = Package(
3650
.enableExperimentalFeature("StrictConcurrency=complete")
3751
]
3852
),
53+
.target(
54+
name: "AsyncStreaming",
55+
dependencies: [
56+
.product(name: "DequeModule", package: "swift-collections"),
57+
.product(name: "BasicContainers", package: "swift-collections"),
58+
],
59+
swiftSettings: extraSettings + [.swiftLanguageMode(.v6)]
60+
),
3961
.target(
4062
name: "AsyncSequenceValidation",
4163
dependencies: ["_CAsyncSequenceValidationSupport", "AsyncAlgorithms"],
@@ -51,6 +73,13 @@ let package = Package(
5173
.enableExperimentalFeature("StrictConcurrency=complete")
5274
]
5375
),
76+
.testTarget(
77+
name: "AsyncStreamingTests",
78+
dependencies: [
79+
.target(name: "AsyncStreaming"),
80+
],
81+
swiftSettings: extraSettings + [.swiftLanguageMode(.v6)]
82+
),
5483
.testTarget(
5584
name: "AsyncAlgorithmsTests",
5685
dependencies: [
@@ -95,7 +124,10 @@ let package = Package(
95124

96125
if Context.environment["SWIFTCI_USE_LOCAL_DEPS"] == nil {
97126
package.dependencies += [
98-
.package(url: "https://github.com/apple/swift-collections.git", from: "1.1.0")
127+
.package(
128+
url: "https://github.com/FranzBusch/swift-collections.git",
129+
revision: "53408db248f5bea068579343c47aa0a542dce6c9",
130+
)
99131
]
100132
} else {
101133
package.dependencies += [
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *)
2+
public protocol BorrowMutableIteratorProtocol<Element>: ~Copyable, ~Escapable {
3+
associatedtype Element: ~Copyable
4+
5+
// TODO: This is using a closure since we need the exclusive modifier to be
6+
// able to extract mutable spans from the underlying owning buffer
7+
mutating func nextSpan<Return, Failure>(
8+
maximumCount: Int?,
9+
body: (inout MutableSpan<Element>) async throws(Failure) -> Return
10+
) async throws(Failure) -> Return
11+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/// An enumeration that represents one of two possible error types.
2+
///
3+
/// ``EitherError`` provides a type-safe way to represent errors that can be one of two distinct
4+
/// error types.
5+
public enum EitherError<First: Error, Second: Error>: Error {
6+
/// An error of the first type.
7+
///
8+
/// The associated value contains the specific error instance of type `First`.
9+
case first(First)
10+
11+
/// An error of the second type.
12+
///
13+
/// The associated value contains the specific error instance of type `Second`.
14+
case second(Second)
15+
16+
/// Throws the underlying error by unwrapping this either error.
17+
///
18+
/// This method extracts and throws the actual error contained within the either error,
19+
/// whether it's the first or second type. This is useful when you need to propagate
20+
/// the original error without the either error wrapper.
21+
///
22+
/// - Throws: The underlying error, either of type `First` or `Second`.
23+
///
24+
/// ## Example
25+
///
26+
/// ```swift
27+
/// do {
28+
/// // Some operation that returns EitherError
29+
/// let result = try await operation()
30+
/// } catch let eitherError as EitherError<NetworkError, ParseError> {
31+
/// try eitherError.unwrap() // Throws the original error
32+
/// }
33+
/// ```
34+
public func unwrap() throws {
35+
switch self {
36+
case .first(let first):
37+
throw first
38+
case .second(let second):
39+
throw second
40+
}
41+
}
42+
}
43+
44+
@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *)
45+
struct File {
46+
@_lifetime(&self)
47+
mutating func read() async -> Span<UInt8> {
48+
fatalError()
49+
}
50+
51+
mutating func write(span: Span<UInt8>) async {
52+
fatalError()
53+
}
54+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *)
2+
extension InlineArray where Element: ~Copyable {
3+
package static func one(value: consuming Element) -> InlineArray<1, Element> {
4+
return InlineArray<1, Element>(first: value) { _ in fatalError() }
5+
}
6+
7+
package static func zero(of elementType: Element.Type = Element.self) -> InlineArray<0, Element> {
8+
return InlineArray<0, Element> { _ in }
9+
}
10+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
extension Optional where Wrapped: ~Copyable {
2+
@inlinable
3+
mutating func takeSending() -> sending Self {
4+
let result = consume self
5+
self = nil
6+
return result
7+
}
8+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *)
2+
extension Array {
3+
/// Creates an async reader that provides access to the array's elements.
4+
///
5+
/// This method converts an array into an ``AsyncReader`` implementation, allowing
6+
/// the array's elements to be read through the async reader interface.
7+
///
8+
/// - Returns: An ``AsyncReader`` that produces all elements of the array.
9+
///
10+
/// ## Example
11+
///
12+
/// ```swift
13+
/// let numbers = [1, 2, 3, 4, 5]
14+
/// var reader = numbers.asyncReader()
15+
///
16+
/// try await reader.forEach { span in
17+
/// print("Read \(span.count) numbers")
18+
/// }
19+
/// ```
20+
public func asyncReader() -> some AsyncReader<Element, Never> & SendableMetatype & ~Escapable {
21+
return ArrayAsyncReader(array: self)
22+
}
23+
}
24+
25+
/// An async reader implementation that provides array elements through the AsyncReader interface.
26+
///
27+
/// This internal reader type wraps an array and delivers its elements through the ``AsyncReader``
28+
/// protocol. It maintains a current read position and can deliver elements in chunks based on
29+
/// the requested maximum count.
30+
@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *)
31+
struct ArrayAsyncReader<Element>: AsyncReader, BorrowMutableIteratorProtocol {
32+
typealias ReadIterator = Self
33+
typealias Element = Element
34+
typealias ReadElement = Element
35+
typealias ReadFailure = Never
36+
37+
var array: [Element]
38+
var index: Array<Element>.Index
39+
40+
init(array: [Element]) {
41+
self.array = array
42+
self.index = array.startIndex
43+
}
44+
45+
mutating func nextSpan<Return, Failure>(
46+
maximumCount: Int?,
47+
body: (inout MutableSpan<Element>) async throws(Failure) -> Return
48+
) async throws(Failure) -> Return {
49+
guard self.index < self.array.endIndex else {
50+
var mutableSpan = self.array.mutableSpan
51+
var empty = mutableSpan._mutatingExtracting(last: 0)
52+
53+
return try await body(&empty)
54+
}
55+
56+
guard let maximumCount else {
57+
let index = self.index
58+
self.index = self.array.span.indices.endIndex
59+
var mutableSpan = self.array.mutableSpan
60+
var sizedMutableSpan = mutableSpan._mutatingExtracting(index...)
61+
return try await body(&sizedMutableSpan)
62+
}
63+
let endIndex = min(
64+
self.array.span.indices.endIndex,
65+
self.index.advanced(
66+
by: maximumCount
67+
)
68+
)
69+
self.index = endIndex
70+
let index = self.index
71+
var mutableSpan = self.array.mutableSpan
72+
var sizedMutableSpan = mutableSpan._mutatingExtracting(index..<endIndex)
73+
return try await body(&sizedMutableSpan)
74+
}
75+
76+
mutating func read<Return, Failure>(
77+
body: (inout ArrayAsyncReader<Element>) async throws(Failure) -> Return
78+
) async throws(EitherError<Never, Failure>) -> Return {
79+
do {
80+
return try await body(&self)
81+
} catch {
82+
throw.second(error)
83+
}
84+
}
85+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// swift-format-ignore: AmbiguousTrailingClosureOverload
2+
@available(macOS 26.0, iOS 26.0, watchOS 26.0, tvOS 26.0, visionOS 26.0, *)
3+
extension AsyncReader where Self: ~Copyable, Self: ~Escapable {
4+
/// Iterates over all elements from the reader, executing the provided body for each span.
5+
///
6+
/// This method continuously reads elements from the async reader until the stream ends,
7+
/// executing the provided closure for each span of elements read. The iteration terminates
8+
/// when the reader produces an empty span, indicating the end of the stream.
9+
///
10+
/// - Parameter body: An asynchronous closure that processes each span of elements read
11+
/// from the stream. The closure receives a `Span<ReadElement>` for each read operation.
12+
///
13+
/// - Throws: An `EitherError` containing either a `ReadFailure` from the read operation
14+
/// or a `Failure` from the body closure.
15+
///
16+
/// ## Example
17+
///
18+
/// ```swift
19+
/// var fileReader: FileAsyncReader = ...
20+
///
21+
/// // Process each chunk of data from the file
22+
/// try await fileReader.forEach { chunk in
23+
/// print("Processing \(chunk.count) elements")
24+
/// // Process the chunk
25+
/// }
26+
/// ```
27+
public consuming func forEach<Failure: Error>(
28+
body: (inout MutableSpan<ReadElement>) async throws(Failure) -> Void
29+
) async throws(EitherError<ReadFailure, Failure>) {
30+
var shouldContinue = true
31+
while shouldContinue {
32+
try await self.read { (iterator) throws(Failure) -> Void in
33+
try await iterator.nextSpan(maximumCount: 0) { (next) throws(Failure) -> Void in
34+
guard next.count > 0 else {
35+
shouldContinue = false
36+
return
37+
}
38+
39+
try await body(&next)
40+
}
41+
}
42+
}
43+
}
44+
45+
/// Iterates over all elements from the reader, executing the provided body for each span.
46+
///
47+
/// This method continuously reads elements from the async reader until the stream ends,
48+
/// executing the provided closure for each span of elements read. The iteration terminates
49+
/// when the reader produces an empty span, indicating the end of the stream.
50+
///
51+
/// - Parameter body: An asynchronous closure that processes each span of elements read
52+
/// from the stream. The closure receives a `Span<ReadElement>` for each read operation.
53+
///
54+
/// - Throws: An error of type `Failure` from the body closure. Since this reader never fails,
55+
/// only the body closure can throw errors.
56+
///
57+
/// ## Example
58+
///
59+
/// ```swift
60+
/// var fileReader: FileAsyncReader = ...
61+
///
62+
/// // Process each chunk of data from the file
63+
/// try await fileReader.forEach { chunk in
64+
/// print("Processing \(chunk.count) elements")
65+
/// // Process the chunk
66+
/// }
67+
/// ```
68+
public consuming func forEach<Failure: Error>(
69+
body: (inout MutableSpan<ReadElement>) async throws(Failure) -> Void
70+
) async throws(Failure) where ReadFailure == Never {
71+
var shouldContinue = true
72+
while shouldContinue {
73+
do {
74+
try await self.read { (iterator) throws(Failure) -> Void in
75+
try await iterator.nextSpan(maximumCount: 0) { (next) throws(Failure) -> Void in
76+
guard next.count > 0 else {
77+
shouldContinue = false
78+
return
79+
}
80+
81+
try await body(&next)
82+
}
83+
}
84+
} catch {
85+
switch error {
86+
case .first:
87+
fatalError()
88+
case .second(let error):
89+
throw error
90+
}
91+
}
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)