-
-
Notifications
You must be signed in to change notification settings - Fork 57
/
ObservableAsynchronous.java
247 lines (215 loc) · 9.06 KB
/
ObservableAsynchronous.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package rx.observables.scheduler;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Action1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author Pablo Perez
* Using the subscribeOn or observeOn operators, you specify in your observable that you want to execute all or some pipeline steps in another thread,
* making the pipeline asynchronous.
*/
public class ObservableAsynchronous {

    /** Upper bound, in seconds, that a test waits for an asynchronous pipeline to terminate. */
    private static final long AWAIT_TIMEOUT_SECONDS = 5;

    /** Number of pipelines started by {@link #testBackToMainThread()}. */
    private static final int BACK_TO_MAIN_PIPELINES = 5;

    /**
     * Running sum of the emitted numbers. It is written on the emitting thread and read by the
     * test thread in {@link #testObservableAsync()}, hence volatile.
     */
    volatile int total = 0;

    Integer[] numbers = {0, 1, 2, 3, 4};

    /** Shared consumer that prints every number it receives. */
    Action1<Integer> onNext = integer -> System.out.println("Number = " + integer);

    /** Work handed over by background pipelines, to be run by the test thread. */
    private final LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    /** Pipelines of {@link #testBackToMainThread()} that already terminated; only touched by the test thread. */
    private int finishedPipelines = 0;

    //************************DIFFERENCE BETWEEN subscribeOn AND observeOn***********************\\

    /**
     * Once observeOn appears in the pipeline, all the steps after it are executed on the thread
     * of the given scheduler; the steps before it stay on the subscribing thread.
     * Shall print (thread numbers may differ when other tests already created threads), e.g.
     * First step main
     * Second step RxNewThreadScheduler-2
     * Third step RxNewThreadScheduler-1
     */
    @Test
    public void testObservableObserverOn() throws InterruptedException {
        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        Observable.just(1)
                .doOnNext(number -> printStep("First step"))
                .observeOn(Schedulers.newThread())
                .doOnNext(number -> printStep("Second step"))
                .observeOn(Schedulers.newThread())
                .doOnNext(number -> printStep("Third step"))
                .subscribe(subscriber);
        awaitCompletion(subscriber);
    }

    /**
     * It does not matter at what point of the pipeline subscribeOn is placed: the source is
     * subscribed on the scheduler's thread, so every step runs on that thread.
     * Shall print, e.g.
     * First step RxNewThreadScheduler-1
     * Second step RxNewThreadScheduler-1
     */
    @Test
    public void testObservableSubscribeOn() throws InterruptedException {
        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        Observable.just(1)
                .doOnNext(number -> printStep("First step"))
                .subscribeOn(Schedulers.newThread())
                .doOnNext(number -> printStep("Second step"))
                .subscribe(subscriber);
        awaitCompletion(subscriber);
    }

    /**
     * subscribeOn and observeOn can be combined. subscribeOn, even when placed last, decides the
     * thread on which the source emits, so the first step runs there. observeOn then moves every
     * step after it (second and third) to its own thread.
     *
     * @throws InterruptedException declared for compatibility; the wait itself does not throw it
     */
    @Test
    public void testObservableObservableOnAndSubscribeOn() throws InterruptedException {
        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        Observable.just(1)
                .doOnNext(number -> printStep("First step"))
                .observeOn(Schedulers.newThread())
                .doOnNext(number -> printStep("Second step"))
                .doOnNext(number -> printStep("Third step"))
                .subscribeOn(Schedulers.newThread())
                .subscribe(subscriber);
        awaitCompletion(subscriber);
    }

    /**
     * subscribeOn and observeOn can be combined: the first step runs on the subscribeOn thread,
     * and the step after observeOn runs on the observeOn thread.
     *
     * @throws InterruptedException declared for compatibility; the wait itself does not throw it
     */
    @Test
    public void testObservableSubscribeOnAndObserverOn() throws InterruptedException {
        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        Observable.just(1)
                .doOnNext(number -> printStep("First step"))
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .doOnNext(number -> printStep("Second step"))
                .subscribe(subscriber);
        awaitCompletion(subscriber);
    }

    //************************DIFFERENCE BETWEEN ASYNC AND SYNC OBSERVABLE***************************\\

    /**
     * When the observable is subscribed with a scheduler, it is executed on another thread, so
     * the test thread continues immediately while the items are still being emitted.
     * Shall print
     * <p>
     * I finish before the observable finish. Items emitted:0
     * <p>
     * followed by (the first of these lines may race with the line above)
     * Items emitted:0
     * Items emitted:1
     * Items emitted:3
     * Items emitted:6
     * Items emitted:10
     */
    @Test
    public void testObservableAsync() throws InterruptedException {
        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        Observable.from(numbers)
                .doOnNext(increaseTotalItemsEmitted())
                .subscribeOn(Schedulers.newThread())
                .doOnNext(number -> System.out.println("Items emitted:" + total))
                .subscribe(subscriber);
        // Printed right after subscribing, i.e. before the background emission has finished.
        System.out.println("I finish before the observable finish. Items emitted:" + total);
        awaitCompletion(subscriber);
    }

    /**
     * When the observable is subscribed without subscribeOn, it is executed on the calling
     * thread, so subscribe only returns after every item has been emitted.
     * Shall print
     * <p>
     * Items emitted:0
     * Items emitted:1
     * Items emitted:3
     * Items emitted:6
     * Items emitted:10
     * I finish after the observable finish. Items emitted:10
     */
    @Test
    public void testObservableSync() {
        Observable.from(numbers)
                .doOnNext(increaseTotalItemsEmitted())
                .subscribe(number -> System.out.println("Items emitted:" + total));
        System.out.println("I finish after the observable finish. Items emitted:" + total);
    }

    /**
     * Builds an action that adds the received number to {@link #total} and then pauses 100 ms
     * to simulate slow work.
     *
     * @return the accumulating action
     */
    private Action1<Integer> increaseTotalItemsEmitted() {
        return number -> {
            total += number;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Keep the interruption visible to the code that owns this thread.
                Thread.currentThread().interrupt();
            }
        };
    }

    //***********************+DIFFERENCE BETWEEN SCHEDULERS ********************************\\

    /**
     * Schedulers.io() is backed by a thread pool that grows as needed, so we do not know which
     * thread will be used whenever we call Schedulers.io(). The numbers of the two pipelines
     * may therefore be interleaved in the output.
     */
    @Test
    public void io() {
        TestSubscriber<Integer> first = new TestSubscriber<>();
        TestSubscriber<Integer> second = new TestSubscriber<>();
        Observable.just(1, 2, 3, 4, 5)
                .subscribeOn(Schedulers.io())
                .doOnNext(onNext)
                .subscribe(first);
        Observable.just(6, 7, 8, 9, 10)
                .subscribeOn(Schedulers.io())
                .doOnNext(onNext)
                .subscribe(second);
        // Without waiting, the test could end before the io threads printed anything.
        awaitCompletion(first);
        awaitCompletion(second);
    }

    /**
     * All jobs that subscribe on trampoline() are queued and executed one by one on the calling
     * thread, so no waiting is required here.
     */
    @Test
    public void trampoline() {
        Observable.just(2, 4, 6, 8, 10)
                .subscribeOn(Schedulers.trampoline())
                .subscribe(onNext);
        Observable.just(1, 3, 5, 7, 9)
                .subscribeOn(Schedulers.trampoline())
                .subscribe(onNext);
    }

    /**
     * Using flatMap and subscribeOn we can process every item on its own thread.
     */
    @Test
    public void itemPerThread() {
        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        Observable.just(1, 2)
                .flatMap(item -> Observable.just(item)
                        .subscribeOn(Schedulers.newThread())
                        .doOnNext(i -> System.out.println("Thread:" + Thread.currentThread())))
                .doOnNext(System.out::println)
                .subscribe(subscriber);
        awaitCompletion(subscriber);
    }

    /**
     * Runs the slow part of several pipelines on io threads and delivers their results back on
     * the test thread: the pipelines enqueue their delivery work into {@link #tasks}, and this
     * method runs that work until every pipeline has terminated.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for work
     */
    @Test
    public void testBackToMainThread() throws InterruptedException {
        for (int value = 1; value <= BACK_TO_MAIN_PIPELINES; value++) {
            processValue(value);
        }
        // Block on the queue instead of sleeping a fixed time and hoping all work has arrived.
        while (finishedPipelines < BACK_TO_MAIN_PIPELINES) {
            Runnable task = tasks.poll(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (task == null) {
                throw new AssertionError("Timed out waiting for pipelines; finished "
                        + finishedPipelines + " of " + BACK_TO_MAIN_PIPELINES);
            }
            task.run();
        }
        System.out.println("done");
    }

    /**
     * Starts a pipeline whose slow step runs on an io thread and whose result is handed to
     * {@link #tasks}, to be delivered by whichever thread runs the queued work.
     *
     * @param value the value to emit
     */
    private void processValue(int value) {
        Observable.just(value)
                .subscribeOn(Schedulers.io())
                .doOnNext(number -> processExecution())
                .observeOn(Schedulers.from(command -> tasks.add(command)))
                // Placed after observeOn, so it runs on the thread that drains the task queue.
                .doOnTerminate(() -> finishedPipelines++)
                .subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x));
    }

    /**
     * Simulates a slow operation: prints the executing thread and pauses for two seconds.
     */
    private void processExecution() {
        System.out.println("Execution in " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the code that owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Prints the given step label followed by the name of the current thread.
     *
     * @param step label of the pipeline step, e.g. "First step"
     */
    private static void printStep(String step) {
        System.out.println(step + " " + Thread.currentThread().getName());
    }

    /**
     * Waits until the pipeline observed by the subscriber terminates and verifies that it
     * terminated without an error, so a pipeline that never finishes fails the test instead of
     * passing silently.
     *
     * @param subscriber the subscriber that was subscribed to the pipeline under test
     */
    private static void awaitCompletion(TestSubscriber<?> subscriber) {
        subscriber.awaitTerminalEvent(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        subscriber.assertTerminalEvent();
        subscriber.assertNoErrors();
    }
}