Skip to content

Commit 6d3d520

Browse files
authored
Merge pull request #14 from AsyncCommunity/improve/handle-error-withLatestFrom
operators: handle errors in other for withLatestFrom
2 parents 0aafcc4 + db10b3f commit 6d3d520

File tree

3 files changed

+71
-8
lines changed

3 files changed

+71
-8
lines changed

Sources/Operators/AsyncSequence+SwitchToLatest.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ public extension AsyncSequence where Element: AsyncSequence {
2222
/// // will print:
2323
/// a3, b3
2424
/// ```
25-
/// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration over the upstream sequence (nil by default)
25+
/// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration
26+
/// over the upstream sequence (nil by default)
2627
///
2728
/// - Returns: The async sequence that republishes elements sent by the most recently received async sequence.
2829
func switchToLatest(upstreamPriority: TaskPriority? = nil) -> AsyncSwitchToLatestSequence<Self> {

Sources/Operators/AsyncSequence+WithLatestFrom.swift

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ public extension AsyncSequence {
3535
///
3636
/// - returns: An async sequence emitting the result of combining each value of the self
3737
/// with the latest value from the other sequence. If the other sequence finishes, the returned sequence
38-
/// will finish with the next value from self.
38+
/// will finish with the next value from self. if the other sequence fails, the returned sequence will fail
39+
/// with the next value from self.
3940
///
4041
func withLatestFrom<OtherAsyncSequence: AsyncSequence>(
4142
_ other: OtherAsyncSequence,
@@ -77,6 +78,8 @@ public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence,
7778

7879
final class OtherIteratorManager {
7980
var otherElement: OtherAsyncSequence.Element?
81+
var otherError: Error?
82+
8083
var otherIterator: OtherAsyncSequence.AsyncIterator
8184
var hasStarted = false
8285

@@ -98,12 +101,16 @@ public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence,
98101
self.otherElement = try await self.otherIterator.next()
99102

100103
Task(priority: self.otherPriority) { [weak self] in
101-
while let element = try await self?.otherIterator.next() {
102-
guard !Task.isCancelled else { break }
103-
104-
self?.otherElement = element
104+
do {
105+
while let element = try await self?.otherIterator.next() {
106+
guard !Task.isCancelled else { break }
107+
108+
self?.otherElement = element
109+
}
110+
self?.otherElement = nil
111+
} catch {
112+
self?.otherError = error
105113
}
106-
self?.otherElement = nil
107114
}
108115
}
109116
}
@@ -132,6 +139,10 @@ public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence,
132139
let upstreamElement = try await self.upstreamAsyncIterator.next()
133140
let otherElement = self.otherIteratorManager.otherElement
134141

142+
if let otherError = self.otherIteratorManager.otherError {
143+
throw otherError
144+
}
145+
135146
guard let nonNilUpstreamElement = upstreamElement,
136147
let nonNilOtherElement = otherElement else {
137148
return nil

Tests/Operators/AsyncSequence+WithLatestFromTests.swift

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ final class AsyncSequence_WithLatestFromTests: XCTestCase {
7878
wait(for: [sutIsFinishedExpectation], timeout: 2)
7979
}
8080

81-
func test_withLatestFrom_finishes_if_upstream_finishes() async throws {
81+
func test_withLatestFrom_finishes_when_upstream_finishes() async throws {
8282
let upstream = AsyncStreams.CurrentValue<Int>(1)
8383
let other = AsyncStreams.CurrentValue<String>("1")
8484

@@ -125,4 +125,55 @@ final class AsyncSequence_WithLatestFromTests: XCTestCase {
125125

126126
wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished
127127
}
128+
129+
func test_withLatestFrom_throws_when_other_throws() {
130+
let canSendErrorInOtherSequenceExpectation = expectation(description: "The first element has been emitted by the upstrean sequence")
131+
let secondElementSentInUpstreamExpectation = expectation(description: "2 has been sent in the upstream sequence")
132+
let otherHasFailedExpectation = expectation(description: "The other sequence has failed")
133+
let sutHasFailedExpectation = expectation(description: "The sut has failed")
134+
135+
let upstream = AsyncStreams.CurrentValue<Int>(1)
136+
let other = AsyncStreams.CurrentValue<String>("1")
137+
138+
let sut = upstream.withLatestFrom(other, otherPriority: .high)
139+
140+
// monitoring the other sequence's error
141+
Task(priority: .high) {
142+
do {
143+
try await other.collect { _ in }
144+
} catch {
145+
otherHasFailedExpectation.fulfill()
146+
}
147+
}
148+
149+
Task(priority: .low) {
150+
var firstElement: (Int, String)?
151+
do {
152+
for try await element in sut {
153+
firstElement = element
154+
if firstElement?.0 == 1 {
155+
// first element is received, make other fail
156+
canSendErrorInOtherSequenceExpectation.fulfill()
157+
wait(for: [secondElementSentInUpstreamExpectation], timeout: 5)
158+
}
159+
}
160+
} catch {
161+
XCTAssertEqual(firstElement?.0, 1)
162+
XCTAssertEqual(firstElement?.1, "1")
163+
sutHasFailedExpectation.fulfill()
164+
}
165+
}
166+
167+
wait(for: [canSendErrorInOtherSequenceExpectation], timeout: 5) // one element has been emitted, we can send error in the other seq
168+
169+
other.send(termination: .failure(NSError(domain: "", code: 1)))
170+
171+
wait(for: [otherHasFailedExpectation], timeout: 5)
172+
173+
upstream.send(2)
174+
175+
secondElementSentInUpstreamExpectation.fulfill() // we can release the lock
176+
177+
wait(for: [sutHasFailedExpectation], timeout: 5) // task has been cancelled and has finished
178+
}
128179
}

0 commit comments

Comments
 (0)