-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConditionalOperators.java
146 lines (131 loc) · 3.84 KB
/
ConditionalOperators.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package java_rxjava134;
import static java_rxjava134.Commons.DEFAULT_VALUE;
import static java_rxjava134.Commons.subscribePrint;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import rx.Observable;
/**
* Conditional operators
* 1. amb
* 2. ambWith
* 3. takeUntil
* 4. takeWhile
* 5. skipUntil
* 6. defaultIfEmpty
*
* @author Tanglong
*
*/
public class ConditionalOperators {

    /** Period, in milliseconds, between two words of the stream built by {@link #timedWords()}. */
    private static final long WORD_PERIOD_MS = 100L;

    /** Exclusive upper bound, in milliseconds, of the random delays used by {@link #testAmb()}. */
    private static final int MAX_RANDOM_DELAY_MS = 1000;

    /**
     * Runs every conditional-operator demo in sequence.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        testAmb();
        testAmbWith();
        testTakeUntil();
        testTakeWhile();
        testSkipUntil();
        testDefaultIfEmpty();
    }

    /**
     * Demonstrates {@code Observable.amb}.
     *
     * <p>{@code amb} mirrors the source Observable that reacts first, whatever that
     * first reaction is (an item, an OnError or an OnCompleted notification); the
     * other sources are ignored.
     */
    private static void testAmb() {
        Observable<String> s1 = Observable.just("S11", "S12");
        Observable<String> s2 = Observable.just("S21", "S22");
        // Neither source is delayed, so the first argument (s1) is expected to win.
        subscribePrint(Observable.amb(s1, s2), "Amb 1");

        Random random = new Random();
        Observable<String> source1 = Observable
                .just("data from source 1")
                .delay(random.nextInt(MAX_RANDOM_DELAY_MS), TimeUnit.MILLISECONDS);
        Observable<String> source2 = Observable
                .just("data from source 2")
                .delay(random.nextInt(MAX_RANDOM_DELAY_MS), TimeUnit.MILLISECONDS);
        // Whichever source drew the shorter random delay is the one that gets printed.
        subscribePrint(Observable.amb(source2, source1), "Amb 2");

        // Wait slightly longer than the largest possible delay so the winner's output
        // is not interleaved with the demos that follow.
        pause(MAX_RANDOM_DELAY_MS + 200L);
    }

    /**
     * Demonstrates {@code ambWith}, the instance-method form of {@code amb}.
     */
    private static void testAmbWith() {
        Observable<String> s1 = Observable.just("S11", "S12");
        Observable<String> s2 = Observable.just("S21", "S22");
        // Neither source is delayed; the receiver (s2) is expected to win here.
        // The label differs from the one used in testAmb() so the two outputs can be told apart.
        subscribePrint(s2.ambWith(s1), "ambWith");
    }

    /**
     * Demonstrates {@code takeUntil} with a second Observable.
     *
     * <p>The word stream is relayed until the second Observable emits its first item.
     * NOTE(review): with a word every 100 ms and the first tick at 800 ms, all six
     * words are expected to be delivered before the tick, so the cut-off may not be
     * visible with these timings; confirm and lower the interval if it should be.
     */
    private static void testTakeUntil() {
        Observable<Long> stopSignal = Observable.interval(800L, TimeUnit.MILLISECONDS);
        subscribePrint(timedWords().takeUntil(stopSignal), "takeUntil");
        pause(1500L);
    }

    /**
     * Demonstrates {@code takeWhile}.
     *
     * <p>Words are relayed while they are longer than two characters; the stream ends
     * at the first word that is not ("I" for the words used here).
     */
    private static void testTakeWhile() {
        subscribePrint(timedWords().takeWhile(word -> word.length() > 2), "takeWhile");
        pause(1000L);
    }

    /**
     * Demonstrates {@code skipUntil} with a second Observable.
     *
     * <p>Words are dropped until the second Observable emits its first item (after
     * about 200 ms here); the words arriving afterwards are relayed.
     */
    private static void testSkipUntil() {
        Observable<Long> startSignal = Observable.interval(200L, TimeUnit.MILLISECONDS);
        subscribePrint(timedWords().skipUntil(startSignal), "skipUntil");
        pause(1000L);
    }

    /**
     * Demonstrates {@code defaultIfEmpty}: an empty Observable is given
     * {@code DEFAULT_VALUE} as a fallback item.
     */
    private static void testDefaultIfEmpty() {
        Observable<Object> test = Observable
                .empty()
                .defaultIfEmpty(DEFAULT_VALUE);
        subscribePrint(test, "defaultIfEmpty");
    }

    /**
     * Builds the word stream shared by the timed demos: six fixed words zipped with a
     * {@link #WORD_PERIOD_MS} ms interval, keeping only the word of each pair.
     *
     * @return a new Observable of the demo words, paced by the interval
     */
    private static Observable<String> timedWords() {
        return Observable
                .just("I'm", "Tanglong", "I", "am", "learning", "RxJava")
                .zipWith(
                        Observable.interval(WORD_PERIOD_MS, TimeUnit.MILLISECONDS),
                        (word, tick) -> word
                );
    }

    /**
     * Blocks the calling thread so that time-based Observables get a chance to emit.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is restored
     * instead of being swallowed, so callers further up can still observe it.
     *
     * @param millis how long to wait, in milliseconds
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}