-
-
Notifications
You must be signed in to change notification settings - Fork 57
/
FlowFeatures.java
278 lines (240 loc) · 10.3 KB
/
FlowFeatures.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
package java9;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
/**
* Java 9 introduce Flow, an API to finally do reactive programing with Java.
* It´s based like other Reactive stream libraries in the Observer pattern([Publisher] ~> [Subscriber])
* Here Flow it´s just an extension to Stream, where instead of just return constantClass value in the stream, we have
* the possibility to pass the item to constantClass [Publisher] which it will have or not constantClass [Subscriber] associated.
* <p>
* By default the subscription in Java 9 is asynchronous. We need to create
**/
public class FlowFeatures {
/**
* Just like the subscriber in RxJava, where we define the 4 callbacks:
* <p>
* onSubscribe:Invoked when we subscribe to the publisher.
* onNext:Invoked when an item is emitted by the publisher.
* onError:Invoked when an error happens in the pipeline.
* onComplete: Invoked when the publisher finish emitting items.
*
* @param <T>
*/
public class CustomSubscriber<T> implements Flow.Subscriber<T> {
protected Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscription done:");
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
System.out.println("onError callback:" + t.getMessage());
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
class CancelSubscriptionSubscriber extends CustomSubscriber<Integer> {
@Override
public void onNext(Integer item) {
System.out.println("Got : " + item);
if (item > 30) {
subscription.cancel();
} else {
subscription.request(1);
}
}
}
/**
* A simple publisher which will receive items form the stream to be passed to the subscriber.
* We can use all the commons operator from Stream before submit the items to the publisher.
* to pass the item to the publisher and start emitting, we use [[submit(Item)]]
*/
@Test
public void testPublisher() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
Thread.sleep(500);
//Publish items
System.out.println("Publishing Items...");
String[] items = {"1", "A", "2", "B", "3", "C"};
Arrays.stream(items)
.filter(item -> item.chars().allMatch(Character::isAlphabetic))
.map(String::toUpperCase)
.map(publisher::submit)
.forEach(result -> System.out.println("Estimated maximum lag among subscribers in ms:" + result));
Thread.sleep(500);
publisher.close();
}
/**
* You might realize that all examples has the publisher.close, this is because unlike the RxJava that once
* we finish the emission of the items the subscriber automatically unsubscribe, here we keep it subscribe,
* just like the [[Relay:https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java]].
* So the only way to unsubscribe from the publisher is just close the publisher, which it will invoke the onComplete
* of all subscribers.
*/
@Test
public void testPublisherUntilWeClose() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
Thread.sleep(500);
//Publish items
System.out.println("Publishing Items...");
String[] items = {"1", "A", "2", "B", "3", "C"};
Arrays.stream(items)
.filter(item -> item.chars().allMatch(Character::isAlphabetic))
.map(String::toUpperCase)
.forEach(publisher::submit);
Thread.sleep(500);
//After we finish the emission we start again and the subscription still up
Arrays.stream(items)
.filter(item -> item.chars().allMatch(Character::isAlphabetic))
.map(String::toUpperCase)
.forEach(publisher::submit);
Thread.sleep(500);
publisher.close();
}
/**
* In Flow you have the Flow.subscription which we use as mechanism to say the publisher
* to continue the emission of items, it could be consider constantClass short of backPressure mechanism, but it´s not
* as good as RxJava operator such as buffer/window, so just in general Java 9 Flow is just constantClass started to introduce
* in the reactive programing.
* <p>
* We can stop the emission by using [cancel] operator or just passing in the request constantClass 0 < value
*/
@Test
public void testCancelSubscription() throws InterruptedException {
//Create Publisher for expected types Integer
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CancelSubscriptionSubscriber());
//Publish items
System.out.println("Publishing Items...");
Integer[] items = {1, 2, 3, 4, 5, 6};
Arrays.stream(items)
.filter(item -> item < 10)
.map(item -> item * 10)
.forEach(publisher::submit);
Thread.sleep(500);
publisher.close();
}
/**
* Offer operator pass an item to the publisher and in case this one is rejected because for instance the buffer is
* full, it will execute the predicate function, and in case it return true, it will retry the emission of the item.
*/
@Test
public void offer() throws InterruptedException {
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
// Register Subscriber
for (int i = 0; i < 5; i++) {
publisher.subscribe(new CustomSubscriber<>());
}
// publish 3 items for each subscriber
for (int i = 0; i < 3; i++) {
int result = publisher.offer("item" + i, (subscriber, value) -> {
// sleep for constantClass small period before deciding whether to retry or not
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false; // you can switch to true to see that drops are reduced
});
// show the number of dropped items
if (result < 0) {
System.err.println("dropped: " + result);
}
}
Thread.sleep(5000);
publisher.close();
}
/**
* TransformerProcessor it´s just like Transformer in RxJava, constantClass component that acts as both constantClass Subscriber and Publisher.
* The processor sits between the Publisher and Subscriber.
* <p>
* The TransformerProcessor will be created specifying the input/output type [[TransformerProcessor<String, Integer>]]
*/
public class TransformerProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function function;
private Flow.Subscription subscription;
TransformerProcessor(Function<? super T, ? extends R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit((R) function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable t) {
System.out.println("onError transformer:" + t.getMessage());
t.printStackTrace();
}
@Override
public void onComplete() {
close();
}
}
/**
* Using this class we will subscribe to the publisher and we will do the gateway to the subscriber, which it will
* subscribe to the Transformer class, all the items emitted by the publisher it will be passed to the transformer,
* and then to the subscriber.
*/
@Test
public void testTransformer() throws InterruptedException {
//Create Publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Create Processor
TransformerProcessor<String, Integer> transformProcessor = new TransformerProcessor<>(Integer::parseInt);
//Chain Processor and Subscriber
transformProcessor.subscribe(new CustomSubscriber<>());
publisher.subscribe(transformProcessor);
System.out.println("Publishing Items...");
String[] items = {"1", "2", "3", "4"};
Arrays.stream(items).forEach(publisher::submit);
Thread.sleep(500);
publisher.close();
}
/**
* Like other reactive API onError it will invoked in case something in the emission of the item goes wrong
*/
@Test
public void testOnError() throws InterruptedException {
//Create Processor
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformerProcessor<String, Integer> transformProcessor = new TransformerProcessor<>(Integer::parseInt);
//Chain Processor and Subscriber
transformProcessor.subscribe(new CustomSubscriber<>());
publisher.subscribe(transformProcessor);
System.out.println("Publishing Items...");
String[] items = {"wrong Number"};
Arrays.stream(items)
.forEach(publisher::submit);
Thread.sleep(500);
publisher.close();
}
}