Skip to content

Commit e46ea60

Browse files
committed
修改了包名,并增加了Distinct相关操作符
1 parent f06a517 commit e46ea60

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+209
-64
lines changed

src/main/java/Common/CommonObserver.java src/main/java/common/CommonObserver.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Common;
1+
package common;
22

33
import io.reactivex.Observer;
44
import io.reactivex.disposables.Disposable;

src/main/java/common/Shape.java

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package common;
2+
3+
import com.sun.prism.paint.Color;
4+
5+
/**
6+
* Author: andy.xwt
7+
* Date: 2019/11/28 19:15
8+
* Description:
9+
*/
10+
11+
12+
public class Shape {
13+
public Color color;
14+
public String type;
15+
16+
public Shape(Color color, String type) {
17+
this.color = color;
18+
this.type = type;
19+
}
20+
}

src/main/java/Backpressure/BackPressureOperator.java src/main/java/rxjava/backpressure/BackPressureOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Backpressure;
1+
package rxjava.backpressure;
22

33
import org.reactivestreams.Subscriber;
44
import org.reactivestreams.Subscription;

src/main/java/Backpressure/CommonSubscriber.java src/main/java/rxjava/backpressure/CommonSubscriber.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Backpressure;
1+
package rxjava.backpressure;
22

33
import org.reactivestreams.Subscriber;
44
import org.reactivestreams.Subscription;

src/main/java/Combining/CombineLatest.java src/main/java/rxjava/combining/CombineLatest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Combining;
1+
package rxjava.combining;
22

33
import java.util.concurrent.TimeUnit;
44

src/main/java/Combining/JoinOperator.java src/main/java/rxjava/combining/JoinOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Combining;
1+
package rxjava.combining;
22

33
import java.util.concurrent.TimeUnit;
44

src/main/java/Combining/MergeOperator.java src/main/java/rxjava/combining/MergeOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Combining;
1+
package rxjava.combining;
22

33
import java.util.concurrent.TimeUnit;
44

src/main/java/Combining/StartWithOperator.java src/main/java/rxjava/combining/StartWithOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
package Combining;
1+
package rxjava.combining;
22

3-
import Common.CommonObserver;
3+
import common.CommonObserver;
44
import io.reactivex.Observable;
55

66
/**

src/main/java/Combining/ZipOperator.java src/main/java/rxjava/combining/ZipOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Combining;
1+
package rxjava.combining;
22

33
import java.util.concurrent.TimeUnit;
44

src/main/java/ConditionalAndBoolean/TakeUntilOperator.java src/main/java/rxjava/conditional/TakeUntilOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package ConditionalAndBoolean;
1+
package rxjava.conditional;
22

33
import java.util.concurrent.TimeUnit;
44

src/main/java/Creating/CreateOperator.java src/main/java/rxjava/creating/CreateOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
package Creating;
1+
package rxjava.creating;
22

33
import io.reactivex.Observable;
44
import io.reactivex.ObservableEmitter;
55
import io.reactivex.ObservableOnSubscribe;
6-
import Common.CommonObserver;
6+
import common.CommonObserver;
77

88
/**
99
* Author: andy.xwt

src/main/java/Creating/DeferOperator.java src/main/java/rxjava/creating/DeferOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
package Creating;
1+
package rxjava.creating;
22

33
import java.util.concurrent.Callable;
44

55
import io.reactivex.Observable;
66
import io.reactivex.ObservableSource;
7-
import Common.CommonObserver;
7+
import common.CommonObserver;
88

99
/**
1010
* Author: andy.xwt

src/main/java/Creating/FromOperator.java src/main/java/rxjava/creating/FromOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
package Creating;
1+
package rxjava.creating;
22

33
import java.util.Arrays;
44
import java.util.List;
55

66
import io.reactivex.Observable;
7-
import Common.CommonObserver;
7+
import common.CommonObserver;
88

99
/**
1010
* Author: andy.xwt

src/main/java/Creating/IntervalOperator.java src/main/java/rxjava/creating/IntervalOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Creating;
1+
package rxjava.creating;
22

33
import java.util.concurrent.TimeUnit;
44

src/main/java/Creating/JustOperator.java src/main/java/rxjava/creating/JustOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Creating;
1+
package rxjava.creating;
22

33
import io.reactivex.Observable;
44
import io.reactivex.functions.Consumer;

src/main/java/Creating/NeverEmptyOperator.java src/main/java/rxjava/creating/NeverEmptyOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
package Creating;
1+
package rxjava.creating;
22

33
import io.reactivex.Observable;
4-
import Common.CommonObserver;
4+
import common.CommonObserver;
55

66
/**
77
* Author: andy.xwt

src/main/java/Creating/RangeOperator.java src/main/java/rxjava/creating/RangeOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
package Creating;
1+
package rxjava.creating;
22

33
import io.reactivex.Observable;
4-
import Common.CommonObserver;
4+
import common.CommonObserver;
55

66
/**
77
* Author: andy.xwt

src/main/java/Creating/RepeatOperator.java src/main/java/rxjava/creating/RepeatOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Creating;
1+
package rxjava.creating;
22

33
import io.reactivex.Observable;
44
import io.reactivex.functions.BooleanSupplier;

src/main/java/Creating/StartOperator.java src/main/java/rxjava/creating/StartOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
package Creating;
1+
package rxjava.creating;
22

33
import java.util.concurrent.Callable;
44

55
import io.reactivex.Observable;
6-
import Common.CommonObserver;
6+
import common.CommonObserver;
77

88
/**
99
* Author: andy.xwt

src/main/java/Creating/TimerOperator.java src/main/java/rxjava/creating/TimerOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Creating;
1+
package rxjava.creating;
22

33
import java.util.concurrent.TimeUnit;
44

src/main/java/ErrorHanding/ErrorOperator.java src/main/java/rxjava/error/ErrorOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
package ErrorHanding;
1+
package rxjava.error;
22

3-
import Common.CommonObserver;
3+
import common.CommonObserver;
44
import io.reactivex.Observable;
55
import io.reactivex.ObservableEmitter;
66
import io.reactivex.ObservableOnSubscribe;

src/main/java/ErrorHanding/RetryOperator.java src/main/java/rxjava/error/RetryOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
package ErrorHanding;
1+
package rxjava.error;
22

3-
import Common.CommonObserver;
3+
import common.CommonObserver;
44
import io.reactivex.Observable;
55
import io.reactivex.ObservableEmitter;
66
import io.reactivex.ObservableOnSubscribe;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package rxjava.filtering;
2+
3+
4+
import com.sun.prism.paint.Color;
5+
6+
import java.util.Arrays;
7+
import java.util.List;
8+
9+
import common.CommonObserver;
10+
import common.Shape;
11+
import io.reactivex.Observable;
12+
import io.reactivex.ObservableEmitter;
13+
import io.reactivex.ObservableOnSubscribe;
14+
import io.reactivex.functions.Consumer;
15+
import io.reactivex.functions.Function;
16+
17+
/**
18+
* Author: andy.xwt
19+
* Date: 2019/11/28 19:07
20+
* Description:抑制(过滤掉)重复的数据项
21+
* https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Distinct.html
22+
*/
23+
24+
25+
class DistinctOperator {
26+
27+
28+
public static List<common.Shape> mShapeLis = Arrays.asList(
29+
new common.Shape(Color.RED, "circular"),//红色圆形
30+
new common.Shape(Color.GREEN, "triangle"),//绿色三角形
31+
new common.Shape(Color.RED, "rhombus"),//红色菱形
32+
new common.Shape(Color.BLUE, "rhombus"),//蓝色菱形
33+
new common.Shape(Color.BLUE, "triangle"),//蓝色三角形
34+
new common.Shape(Color.WHITE, "triangle")//白色三角形
35+
);
36+
37+
38+
/**
39+
* Distinct的过滤规则是:只允许还没有发射过的数据项通过。
40+
* <p>
41+
* 在某些实现中,有一些变体允许你调整判定两个数据不同(distinct)的标准。还有一些实现只比较一项数据和它的直接前驱,因此只会从序列中过滤掉连续重复的数据。
42+
*/
43+
public static void testDistinct() {
44+
//输出1,2,3
45+
Observable.just(1, 2, 1, 1, 2, 3)
46+
.distinct()
47+
.subscribe(new CommonObserver());
48+
49+
}
50+
51+
52+
/**
53+
* 这个操作符有一个变体接受一个函数。这个函数根据原始Observable发射的数据项产生一个Key,然后,比较这些Key而不是数据本身,来判定两个数据是否是不同的。
54+
* http://reactivex.io/RxJava/javadoc/rx/Observable.html#distinct(rx.functions.Func1
55+
* 输出:
56+
* new common.Shape(Color.RED, "circular"),//红色圆形
57+
* new common.Shape(Color.GREEN, "triangle"),//绿色三角形
58+
* new common.Shape(Color.RED, "rhombus"),//红色菱形
59+
*/
60+
public static void testDistinctFun() {
61+
Observable.create(new ObservableOnSubscribe<Shape>() {
62+
@Override
63+
public void subscribe(ObservableEmitter<Shape> emitter) throws Exception {
64+
for (Shape shape : mShapeLis) {
65+
emitter.onNext(shape);
66+
}
67+
}
68+
}).distinct(new Function<Shape, String>() {
69+
@Override
70+
public String apply(Shape shape) throws Exception {
71+
return shape.type;//其实默认传入的是hashSet,这里返回的值就是hashset中的key
72+
}
73+
}).subscribe(new Consumer<Shape>() {
74+
@Override
75+
public void accept(Shape shape) throws Exception {
76+
System.out.println(shape.type + "---->" + shape.color);
77+
}
78+
});
79+
}
80+
81+
/**
82+
* RxJava还是实现了一个distinctUntilChanged操作符。
83+
* 它只判定一个数据和它的直接前驱是否是不同的!!!!!!!!!!!!!
84+
* 它只判定一个数据和它的直接前驱是否是不同的!!!!!!!!!!!!!
85+
* 它只判定一个数据和它的直接前驱是否是不同的!!!!!!!!!!!!!
86+
* 和distinct(Func1)一样,根据一个函数产生的Key判定两个相邻的数据项是不是不同的
87+
* distinct和distinctUntilChanged默认不在任何特定的调度器上执行。
88+
* <p>
89+
* 输出:
90+
* new common.Shape(Color.RED, "circular"),//红色圆形
91+
* new common.Shape(Color.GREEN, "triangle"),//绿色三角形
92+
* new common.Shape(Color.RED, "rhombus"),//红色菱形
93+
* new common.Shape(Color.BLUE, "rhombus"),//蓝色菱形
94+
* new common.Shape(Color.WHITE, "triangle")//白色三角形
95+
*/
96+
public static void testDistinctUntilChanged() {
97+
Observable.create(new ObservableOnSubscribe<Shape>() {
98+
@Override
99+
public void subscribe(ObservableEmitter<Shape> emitter) throws Exception {
100+
for (Shape shape : mShapeLis) {
101+
emitter.onNext(shape);
102+
}
103+
}
104+
//判断的是上一个节点
105+
}).distinctUntilChanged(new Function<Shape, Color>() {
106+
@Override
107+
public Color apply(Shape shape) throws Exception {
108+
return shape.color;
109+
110+
}
111+
}).subscribe(new Consumer<Shape>() {
112+
@Override
113+
public void accept(Shape shape) throws Exception {
114+
System.out.println(shape.type + "---->" + shape.color);
115+
}
116+
});
117+
}
118+
119+
public static void main(String[] args) {
120+
// testDistinct();
121+
testDistinctFun();
122+
// testDistinctUntilChanged();
123+
}
124+
125+
126+
}

src/main/java/Filtering/FilterOperator.java src/main/java/rxjava/filtering/FilterOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
package Filtering;
1+
package rxjava.filtering;
22

3-
import Common.CommonObserver;
3+
import common.CommonObserver;
44
import io.reactivex.Observable;
55
import io.reactivex.functions.Predicate;
66

src/main/java/Filtering/FirstOperator.java src/main/java/rxjava/filtering/FirstOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Filtering;
1+
package rxjava.filtering;
22

33
import io.reactivex.Observable;
44
import io.reactivex.functions.Consumer;

src/main/java/Filtering/OfTypeOperator.java src/main/java/rxjava/filtering/OfTypeOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Filtering;
1+
package rxjava.filtering;
22

33
import io.reactivex.Observable;
44
import io.reactivex.ObservableEmitter;

src/main/java/Filtering/SampleOperator.java src/main/java/rxjava/filtering/SampleOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package Filtering;
1+
package rxjava.filtering;
22

33
import java.util.concurrent.TimeUnit;
44

src/main/java/Filtering/TakeOperator.java src/main/java/rxjava/filtering/TakeOperator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
package Filtering;
1+
package rxjava.filtering;
22

33
import java.util.concurrent.TimeUnit;
44

5-
import Common.CommonObserver;
5+
import common.CommonObserver;
66
import io.reactivex.Observable;
77

88
/**

src/main/java/Mathematical/ConcatOperator.java src/main/java/rxjava/mathematical/ConcatOperator.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
package Mathematical;
1+
package rxjava.mathematical;
22

3-
import Common.CommonObserver;
3+
import common.CommonObserver;
44
import io.reactivex.Observable;
55

66
/**
77
* Author: andy.xwt
88
* Date: 2019-02-06 00:33
99
* Description:concat操作符 将多个被观察者发送的事件组合起来
10-
* 与{@link Combining.MergeOperator}的区别,是按照被观察者的顺序一个一个发送事件的。不会出现抢占的情况
10+
* 与{@link rxjava.combining.MergeOperator}的区别,是按照被观察者的顺序一个一个发送事件的。不会出现抢占的情况
1111
*/
1212

1313
public class ConcatOperator {
@@ -26,7 +26,7 @@ static void testConcatArray() {
2626
}
2727
//delayError的作用是当整个事件流出现异常时,不会中断事件的传递,而是等到正常事件发送完毕后,才发送错误事件
2828
static void testConcatDelayError() {
29-
Observable.concatArrayDelayError(Observable.range(1, 5), Observable.error(new RuntimeException("error")), Observable.range(5, 5))
29+
Observable.concatArrayDelayError(Observable.range(1, 5), Observable.error(new RuntimeException("rxjava/error")), Observable.range(5, 5))
3030
.subscribe(new CommonObserver());
3131

3232
}

src/main/java/Observables/CompletableObservable.java src/main/java/rxjava/observables/CompletableObservable.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
package Observables;
1+
package rxjava.observables;
22

3-
import Common.CommonObserver;
3+
import common.CommonObserver;
44
import io.reactivex.Completable;
55
import io.reactivex.CompletableEmitter;
66
import io.reactivex.CompletableOnSubscribe;
@@ -24,7 +24,7 @@ private static void normalUse() {
2424
@Override
2525
public void subscribe(CompletableEmitter emitter) throws Exception {
2626
// emitter.onComplete();
27-
emitter.onError(new Throwable("error"));
27+
emitter.onError(new Throwable("rxjava/error"));
2828
}
2929
}).subscribe(new Action() {
3030
@Override

0 commit comments

Comments
 (0)