-
Notifications
You must be signed in to change notification settings - Fork 0
/
Concat.swift
116 lines (109 loc) · 3.22 KB
/
Concat.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/// Concatenates the given producers in the order they are listed.
///
/// This is a variadic convenience that forwards to the array-based `concat`.
///
/// - Parameter sources: Zero or more producers to join.
/// - Returns: The producer returned by the array-based `concat` for `sources`.
public func concat<T>(_ sources: Producer<T> ...) -> Producer<T> {
    // A variadic parameter is an array inside the function body; the explicit
    // annotation makes the call below select the array overload.
    let producers: [Producer<T>] = sources
    return concat(producers)
}
/// Concatenates an array of producers into a single producer.
///
/// The producers are chained pairwise from left to right with the operator
/// form of `concat`, so each element is appended after everything before it.
///
/// - Parameter sources: The producers to join, in order.
/// - Returns: `empty()` when `sources` has no elements, the single element
///   itself when there is exactly one, otherwise the left-to-right chain.
public func concat<T>(_ sources: Array<Producer<T>>) -> Producer<T> {
    // Binding the first element doubles as the emptiness check and removes
    // the need to force-unwrap an iterator.
    guard let first = sources.first else {
        return empty()
    }
    var combined: Producer<T> = first
    for next in sources.dropFirst() {
        // `concat(next)` yields an operator that appends `next` to its input.
        combined = concat(next)(combined)
    }
    return combined
}
/// Builds an operator that continues with `secondProducer` once the upstream
/// producer has completed.
///
/// For every subscription the operator:
/// - subscribes to the first (upstream) producer and relays its `.next`
///   payloads to the sink;
/// - hands the sink a talkback of its own that forwards `.next` payloads to
///   whichever producer's talkback is currently stored and ignores `.start`;
/// - on the first producer's `.completed`, either relays that completion to
///   the sink (when the sink already sent `.completed` through its talkback,
///   or the payload reports `isForcedToComplete`) or subscribes to
///   `secondProducer` and relays its `.next` and `.completed` payloads.
///
/// Every callback body runs inside `withLock` on a `RecursiveLock` that is
/// created once per subscription.
///
/// - Parameter secondProducer: The producer to subscribe to after the first
///   producer completes.
/// - Returns: An operator mapping the first producer to the combined producer.
public func concat<T>(_ secondProducer: @escaping Producer<T>) -> Operator<T, T> {
    return { firstProducer in
        return { sink in
            let mutex: RecursiveLock = RecursiveLock(label: DOMAIN("concat"))
            // Talkback of the producer that is currently feeding the sink.
            var upstream: SourceTalkback<T>? = nil
            // Set once the sink has sent `.completed` through its talkback.
            var sinkHasCompleted = false

            // Subscribes to the second producer and relays its payloads to the sink.
            func startSecondProducer() {
                secondProducer { event in
                    mutex.withLock {
                        switch event {
                        case let .start(secondTalkback):
                            // Swap the stored talkback so later `.next` payloads from
                            // the sink reach the second producer, then send it
                            // `.next(.none)` right away.
                            upstream = secondTalkback
                            upstream?(.next(.none))
                        case .next, .completed:
                            sink(event)
                        }
                    }
                }
            }

            firstProducer { event in
                mutex.withLock {
                    switch event {
                    case let .start(firstTalkback):
                        upstream = firstTalkback
                        // The sink never sees a producer's talkback directly; it
                        // gets this wrapper instead.
                        sink(.start({ request in
                            mutex.withLock {
                                switch request {
                                case .start:
                                    return
                                case .next:
                                    upstream?(request)
                                case .completed:
                                    // NOTE(review): the completion is answered locally and
                                    // is not forwarded to the stored talkback - confirm
                                    // this is the intended cancellation behavior.
                                    sinkHasCompleted = true
                                    sink(.completed(.finished))
                                }
                            }
                        }))
                    case .next:
                        sink(event)
                    case .completed:
                        if !sinkHasCompleted && !event.isForcedToComplete {
                            startSecondProducer()
                        } else {
                            sink(event)
                        }
                    }
                }
            }
        }
    }
}
/// Builds an operator that flattens a producer of producers by concatenation.
///
/// The operator subscribes to the outer source, sends `.next(.none)` to its
/// talkback on `.start` and again after each received inner producer, and
/// collects the inner producers in arrival order. Nothing reaches the sink
/// until the outer source completes:
/// - if the completion reports `isForcedToComplete`, or no inner producer was
///   collected, the completion is passed to the sink;
/// - otherwise the sink is subscribed to the array-based `concat` of the
///   collected producers.
///
/// - Returns: An operator mapping a producer of producers to a producer of
///   their elements.
public func concat<T>() -> Operator<Producer<T>, T> {
    return { outer in
        return { sink in
            var outerTalkback: SourceTalkback<Producer<T>>? = nil
            var collected: [Producer<T>] = []
            outer { event in
                switch event {
                case let .start(tb):
                    outerTalkback = tb
                    outerTalkback?(.next(.none))
                case let .next(inner):
                    collected.append(inner)
                    outerTalkback?(.next(.none))
                case let .completed(completion):
                    let passCompletionThrough = event.isForcedToComplete || collected.isEmpty
                    if passCompletionThrough {
                        sink(.completed(completion))
                    } else {
                        concat(collected)(sink)
                    }
                }
            }
        }
    }
}
/// Builds an operator that appends `secondProducer` after its input producer.
///
/// - Parameter secondProducer: The producer to run after the input producer.
/// - Returns: An operator equivalent to applying `concat(secondProducer)` to
///   the input producer.
public func append<T>(_ secondProducer: @escaping Producer<T>) -> Operator<T, T> {
    return { firstProducer in
        let appendSecond: Operator<T, T> = concat(secondProducer)
        return appendSecond(firstProducer)
    }
}
/// Builds an operator that appends a sequence-backed producer after its input
/// producer.
///
/// The trailing producer is created by passing the sequence's `makeIterator`
/// method to `from` each time the operator is applied (presumably it emits the
/// sequence's elements - confirm against `from`).
///
/// - Parameter sequence: The sequence whose producer follows the input producer.
/// - Returns: An operator that concatenates the input producer with the
///   producer built from `sequence`.
public func append<S: Sequence>(_ sequence: S) -> Operator<S.Element, S.Element> {
    return { source in
        let tail: Producer<S.Element> = from(sequence.makeIterator)
        return concat(tail)(source)
    }
}
/// Builds an operator that runs `firstProducer` before its input producer.
///
/// - Parameter firstProducer: The producer to run ahead of the input producer.
/// - Returns: An operator equivalent to applying `concat(input)` to
///   `firstProducer`.
public func prepend<T>(_ firstProducer: @escaping Producer<T>) -> Operator<T, T> {
    return { secondProducer in
        let followedBySecond: Operator<T, T> = concat(secondProducer)
        return followedBySecond(firstProducer)
    }
}
/// Builds an operator that runs a sequence-backed producer before its input
/// producer.
///
/// The leading producer is created by passing the sequence's `makeIterator`
/// method to `from` each time the operator is applied (presumably it emits the
/// sequence's elements - confirm against `from`).
///
/// - Parameter sequence: The sequence whose producer precedes the input producer.
/// - Returns: An operator that concatenates the producer built from `sequence`
///   with the input producer.
public func prepend<S: Sequence>(_ sequence: S) -> Operator<S.Element, S.Element> {
    return { source in
        let followedBySource: Operator<S.Element, S.Element> = concat(source)
        let head: Producer<S.Element> = from(sequence.makeIterator)
        return followedBySource(head)
    }
}