-
Notifications
You must be signed in to change notification settings - Fork 26
/
AsyncWithLatestFrom2Sequence.swift
227 lines (203 loc) · 7.3 KB
/
AsyncWithLatestFrom2Sequence.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
//
// AsyncWithLatestFrom2Sequence.swift
//
//
// Created by Thibault Wittemberg on 31/03/2022.
//
public extension AsyncSequence {
/// Combines `self` with two other ``AsyncSequence`` into a single ``AsyncSequence`` where each
/// element from `self` is aggregated with the latest known elements from the `other` sequences as a tuple.
///
/// Elements from `self` are dropped for as long as one of the `other` sequences has not yet produced
/// its first element.
///
/// Remark: as the `other` sequences are being iterated over in the context of their own ``Task``, there is no guarantee
/// that their latest known elements are the ones that have just been produced when the base sequence produces its next element.
///
/// ```
/// let base = AsyncPassthroughSubject<Int>()
/// let other1 = AsyncPassthroughSubject<String>()
/// let other2 = AsyncPassthroughSubject<String>()
/// let sequence = base.withLatest(from: other1, other2)
///
/// Task {
///   for try await element in sequence {
///     print(element)
///   }
/// }
///
/// await other1.send("a")
/// await other1.send("b")
///
/// await other2.send("c")
/// await other2.send("d")
///
/// // ... later in the application flow
///
/// await base.send(1)
///
/// // will print: (1, "b", "d")
/// ```
///
/// - Parameters:
///   - other1: the first other ``AsyncSequence``
///   - other2: the second other ``AsyncSequence``
/// - Returns: an ``AsyncWithLatestFrom2Sequence`` where elements are a tuple of an element from `self` and the
/// latest known elements from the `other` sequences.
func withLatest<Other1: AsyncSequence, Other2: AsyncSequence>(
from other1: Other1,
_ other2: Other2
) -> AsyncWithLatestFrom2Sequence<Self, Other1, Other2> {
AsyncWithLatestFrom2Sequence(self, other1, other2)
}
}
/// ``AsyncWithLatestFrom2Sequence`` is an ``AsyncSequence`` where elements are a tuple of an element from `base` and the
/// latest known elements from the `other` sequences.
///
/// Elements from `base` are dropped for as long as one of the `other` sequences has not produced its first element.
/// A failure recorded for an `other` sequence is thrown the next time `base` produces an element, whatever the
/// state of the remaining `other` sequence. The sequence finishes when `base` finishes, when an error is thrown
/// or when the iterating task is cancelled.
public struct AsyncWithLatestFrom2Sequence<Base: AsyncSequence, Other1: AsyncSequence, Other2: AsyncSequence>: AsyncSequence
where Other1: Sendable, Other2: Sendable, Other1.Element: Sendable, Other2.Element: Sendable {
  public typealias Element = (Base.Element, Other1.Element, Other2.Element)
  public typealias AsyncIterator = Iterator

  let base: Base
  let other1: Other1
  let other2: Other2

  // for testability purpose
  var onBaseElement: (@Sendable (Base.Element) -> Void)?
  var onOther1Element: (@Sendable (Other1.Element?) -> Void)?
  var onOther2Element: (@Sendable (Other2.Element?) -> Void)?

  init(_ base: Base, _ other1: Other1, _ other2: Other2) {
    self.base = base
    self.other1 = other1
    self.other2 = other2
  }

  /// Creates the iterator, forwards the testing hooks to it and immediately starts
  /// observing the `other` sequences.
  public func makeAsyncIterator() -> Iterator {
    var iterator = Iterator(
      base: self.base.makeAsyncIterator(),
      other1: self.other1,
      other2: self.other2
    )
    // The hooks must be set before `startOthers()` since the observing task captures them by value.
    iterator.onBaseElement = onBaseElement
    iterator.onOther1Element = onOther1Element
    iterator.onOther2Element = onOther2Element
    iterator.startOthers()
    return iterator
  }

  public struct Iterator: AsyncIteratorProtocol {
    /// Latest outcome recorded for `other1`: nothing yet, or its last element / its failure.
    enum Other1State {
      case idle
      case element(Result<Other1.Element, Error>)
    }

    /// Latest outcome recorded for `other2`: nothing yet, or its last element / its failure.
    enum Other2State {
      case idle
      case element(Result<Other2.Element, Error>)
    }

    /// State shared between the task observing the `other` sequences and `next()`.
    struct OthersState {
      var other1State: Other1State
      var other2State: Other2State

      static var idle: OthersState {
        OthersState(other1State: .idle, other2State: .idle)
      }
    }

    /// What to do with an element that `base` has just produced.
    enum BaseDecision {
      /// Drop the base element and wait for the next one.
      case pass
      /// Return the combined element, or throw the recorded failure.
      case returnElement(Result<Element, Error>)
    }

    var base: Base.AsyncIterator
    let other1: Other1
    let other2: Other2

    let othersState: ManagedCriticalState<OthersState>
    var othersTask: Task<Void, Never>?

    // Never reassigned: the flag is only mutated through its critical region.
    let isTerminated: ManagedCriticalState<Bool>

    // for testability purpose
    var onBaseElement: (@Sendable (Base.Element) -> Void)?
    var onOther1Element: (@Sendable (Other1.Element?) -> Void)?
    var onOther2Element: (@Sendable (Other2.Element?) -> Void)?

    /// Creates an iterator that is not yet observing the `other` sequences.
    ///
    /// Observation starts either explicitly with `startOthers()` (as done by `makeAsyncIterator()`)
    /// or lazily on the first call to `next()`.
    public init(base: Base.AsyncIterator, other1: Other1, other2: Other2) {
      self.base = base
      self.other1 = other1
      self.other2 = other2
      self.othersState = ManagedCriticalState(.idle)
      self.isTerminated = ManagedCriticalState(false)
    }

    /// Starts an unstructured task that iterates over both `other` sequences concurrently and records,
    /// for each of them, its most recent element or its failure in the shared state.
    mutating func startOthers() {
      self.othersTask = Task { [othersState, other1, other2, onOther1Element, onOther2Element] in
        await withTaskGroup(of: Void.self) { group in
          group.addTask {
            do {
              for try await element in other1 {
                othersState.withCriticalRegion { state in
                  state.other1State = .element(.success(element))
                }
                // The hook runs once the state is updated, so observers see a consistent state.
                onOther1Element?(element)
              }
            } catch {
              othersState.withCriticalRegion { state in
                state.other1State = .element(.failure(error))
              }
            }
          }

          group.addTask {
            do {
              for try await element in other2 {
                othersState.withCriticalRegion { state in
                  state.other2State = .element(.success(element))
                }
                // The hook runs once the state is updated, so observers see a consistent state.
                onOther2Element?(element)
              }
            } catch {
              othersState.withCriticalRegion { state in
                state.other2State = .element(.failure(error))
              }
            }
          }
        }
      }
    }

    /// Waits for the next `base` element and combines it with the latest elements from the `other` sequences.
    ///
    /// - Returns: the combined tuple, or `nil` once `base` has finished, the iterator has already
    ///   terminated or the iterating task has been cancelled.
    /// - Throws: the error thrown by `base`, or the failure recorded for one of the `other` sequences
    ///   (the failure from `other1` takes precedence over the one from `other2`).
    public mutating func next() async rethrows -> Element? {
      let shouldReturnNil = self.isTerminated.withCriticalRegion { $0 }
      guard !shouldReturnNil else { return nil }

      // An iterator created through the public initializer has not started observing the `other`
      // sequences yet; without this, every base element would be dropped forever.
      if self.othersTask == nil {
        self.startOthers()
      }

      return try await withTaskCancellationHandler { [isTerminated, othersTask] in
        isTerminated.withCriticalRegion { isTerminated in
          isTerminated = true
        }
        othersTask?.cancel()
      } operation: { [othersTask, othersState, onBaseElement] in
        do {
          while true {
            guard let baseElement = try await self.base.next() else {
              // `base` is finished: the whole sequence is finished and the others are no longer needed.
              isTerminated.withCriticalRegion { isTerminated in
                isTerminated = true
              }
              othersTask?.cancel()
              return nil
            }

            onBaseElement?(baseElement)

            let decision = othersState.withCriticalRegion { state -> BaseDecision in
              switch (state.other1State, state.other2State) {
              // Failures are checked first so that an error from one `other` sequence is never
              // hidden by the remaining one still being idle.
              case (.element(.failure(let otherError)), _):
                return .returnElement(.failure(otherError))
              case (_, .element(.failure(let otherError))):
                return .returnElement(.failure(otherError))
              case (.element(.success(let other1Element)), .element(.success(let other2Element))):
                return .returnElement(.success((baseElement, other1Element, other2Element)))
              case (.idle, _), (_, .idle):
                // At least one `other` sequence has not produced anything yet: drop the base element.
                return .pass
              }
            }

            switch decision {
            case .pass:
              continue
            case .returnElement(let result):
              return try result._rethrowGet()
            }
          }
        } catch {
          // Any failure (from `base` or from the others) terminates the sequence.
          isTerminated.withCriticalRegion { isTerminated in
            isTerminated = true
          }
          othersTask?.cancel()
          throw error
        }
      }
    }
  }
}
/// The sequence is `Sendable` as soon as `Base` is: `Other1` and `Other2` are already required to be
/// `Sendable` by the type's generic constraints, and the testing hooks are `@Sendable` closures.
extension AsyncWithLatestFrom2Sequence: Sendable where Base: Sendable { }