Skip to content

Commit e31495e

Browse files
author
Thibault Wittemberg
committed
Merge branch 'feature/streamed'
2 parents c5cbf56 + bb3f4be commit e31495e

File tree

5 files changed

+149
-2
lines changed

5 files changed

+149
-2
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
**v0.4.0 - Bore:**
2+
3+
- AsyncStreams: new @Streamed property wrapper
4+
- AsyncSequences: finish Timer when canceled
5+
16
**v0.3.0 - Beryllium:**
27

38
- Operators: new Share operator

README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ AsyncSequences
3535
* [Passthrough](#Passthrough)
3636
* [CurrentValue](#CurrentValue)
3737
* [Replay](#Replay)
38+
* [Streamed](#Streamed)
3839

3940
### Operators
4041
* [Collect](#Collect)
@@ -297,6 +298,34 @@ for try await element in replay {
297298
}
298299
```
299300

301+
### Streamed
302+
303+
`Streamed` is a property wrapper that streams a property as an AsyncSequence. It is a structured concurrency equivalent to **Combine @Published**.
304+
305+
```swift
306+
class Weather {
307+
@Streamed var temperature: Double
308+
init(temperature: Double) {
309+
self.temperature = temperature
310+
}
311+
}
312+
313+
let weather = Weather(temperature: 20)
314+
Task {
315+
for try await element in weather.$temperature {
316+
print ("Temperature now: \(element)")
317+
}
318+
}
319+
320+
// ... later in the application flow
321+
322+
weather.temperature = 25
323+
324+
// will print:
325+
// Temperature now: 20.0
326+
// Temperature now: 25.0
327+
```
328+
300329
## Operators
301330

302331
### Collect

Sources/AsyncStreams/Streamed.swift

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
//
2+
// Streamed.swift
3+
//
4+
//
5+
// Created by Thibault Wittemberg on 20/03/2022.
6+
//
7+
8+
/// A type that streams a property marked with an attribute as an AsyncSequence.
9+
///
10+
/// Streaming a property with the `@Streamed` attribute creates an AsyncSequence of this type. You access the AsyncSequence with the `$` operator, as shown here:
11+
///
12+
/// class Weather {
13+
/// @Streamed var temperature: Double
14+
/// init(temperature: Double) {
15+
/// self.temperature = temperature
16+
/// }
17+
/// }
18+
///
19+
/// let weather = Weather(temperature: 20)
20+
/// Task {
21+
/// for try await element in weather.$temperature {
22+
/// print ("Temperature now: \(element)")
23+
/// }
24+
/// }
25+
///
26+
/// // ... later in the application flow
27+
///
28+
/// weather.temperature = 25
29+
///
30+
/// // Prints:
31+
/// // Temperature now: 20.0
32+
/// // Temperature now: 25.0
33+
@propertyWrapper public struct Streamed<Element> {
34+
let currentValue: AsyncStreams.CurrentValue<Element>
35+
36+
/// Creates the streamed instance with an initial wrapped value.
37+
///
38+
/// Don't use this initializer directly. Instead, create a property with the `@Streamed` attribute, as shown here:
39+
///
40+
/// @Streamed var lastUpdated: Date = Date()
41+
///
42+
/// - Parameter wrappedValue: The stream's initial value.
43+
public init(wrappedValue: Element) {
44+
self.currentValue = AsyncStreams.CurrentValue<Element>(wrappedValue)
45+
self.wrappedValue = wrappedValue
46+
}
47+
48+
public var wrappedValue: Element {
49+
willSet {
50+
self.currentValue.element = newValue
51+
}
52+
}
53+
54+
/// The property for which this instance exposes an AsyncSequence.
55+
///
56+
/// The ``Streamed/projectedValue`` is the property accessed with the `$` operator.
57+
public var projectedValue: AnyAsyncSequence<Element> {
58+
self.currentValue.eraseToAnyAsyncSequence()
59+
}
60+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
//
2+
// StreamedTests.swift
3+
//
4+
//
5+
// Created by Thibault Wittemberg on 20/03/2022.
6+
//
7+
8+
import AsyncExtensions
9+
import Combine
10+
import XCTest
11+
12+
final class StreamedTests: XCTestCase {
13+
@Streamed
14+
var sut: Int = 0
15+
16+
func test_streamed_gets_and_sets_element() {
17+
XCTAssertEqual(sut, 0)
18+
let newValue = Int.random(in: 0...100)
19+
self.sut = newValue
20+
XCTAssertEqual(sut, newValue)
21+
}
22+
23+
func test_streamed_projects_in_asyncSequence() {
24+
let firstElementIsReceivedExpectation = expectation(description: "The first element has been received")
25+
let fifthElementIsReceivedExpectation = expectation(description: "The fifth element has been received")
26+
27+
let expectedElements = [0, 1, 2, 3, 4]
28+
let task = Task(priority: .high) {
29+
var receivedElements = [Int]()
30+
for try await element in self.$sut {
31+
receivedElements.append(element)
32+
33+
if element == 0 {
34+
firstElementIsReceivedExpectation.fulfill()
35+
}
36+
if element == 4 {
37+
fifthElementIsReceivedExpectation.fulfill()
38+
XCTAssertEqual(receivedElements, expectedElements)
39+
}
40+
}
41+
}
42+
43+
wait(for: [firstElementIsReceivedExpectation], timeout: 1)
44+
45+
sut = 1
46+
sut = 2
47+
sut = 3
48+
sut = 4
49+
50+
wait(for: [fifthElementIsReceivedExpectation], timeout: 1)
51+
task.cancel()
52+
}
53+
}

Tests/Operators/AsyncSequence+MulticastTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ final class AsyncSequence_MulticastTests: XCTestCase {
7373
let stream = AsyncStreams.Passthrough<Int>()
7474
let sut = spyUpstreamSequence.multicast(stream)
7575

76-
Task {
76+
Task(priority: .high) {
7777
var receivedElement = [Int]()
7878
for try await element in sut {
7979
receivedElement.append(element)
@@ -82,7 +82,7 @@ final class AsyncSequence_MulticastTests: XCTestCase {
8282
tasksHaveFinishedExpectation.fulfill()
8383
}
8484

85-
Task {
85+
Task(priority: .high) {
8686
var receivedElement = [Int]()
8787
for try await element in sut {
8888
receivedElement.append(element)

0 commit comments

Comments
 (0)