File tree 5 files changed +173
-4
lines changed
5 files changed +173
-4
lines changed Original file line number Diff line number Diff line change @@ -14,22 +14,22 @@ public class CommonObserver implements Observer<Integer> {
14
14
15
15
16
16
public void onSubscribe (Disposable d ) {
17
- System .out .println ("onSubscribe: ---> " );
17
+ System .out .println ("--- onSubscribe---" );
18
18
}
19
19
20
20
@ Override
21
21
public void onNext (Integer integer ) {
22
- System .out .println ("Next : " + integer );
22
+ System .out .println ("onNext : " + integer );
23
23
}
24
24
25
25
@ Override
26
26
public void onError (Throwable error ) {
27
- System .err .println ("Error : " + error .getMessage ());
27
+ System .err .println ("onError : " + error .getMessage ());
28
28
}
29
29
30
30
@ Override
31
31
public void onComplete () {
32
- System .out .println ("Sequence complete. " );
32
+ System .out .println ("---onComplete--- " );
33
33
}
34
34
35
35
Original file line number Diff line number Diff line change
1
+ package Observables .Subject ;
2
+
3
+ import io .reactivex .Observable ;
4
+ import io .reactivex .functions .Consumer ;
5
+ import io .reactivex .subjects .AsyncSubject ;
6
+
7
+ /**
8
+ * Author: andy.xwt
9
+ * Date: 2019/11/20 00:28
10
+ * Description:
11
+ * 一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(如果原始Observable没有发射任何值,
12
+ * AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者。
13
+ * 如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
14
+ * https://mcxiaoke.gitbooks.io/rxdocs/content/Subject.html
15
+ */
16
+
17
+
18
+ class AsyncSubjectOpe {
19
+
20
+ public static void main (String [] args ) {
21
+ AsyncSubject <Integer > subject = AsyncSubject .create ();
22
+ Observable .range (1 , 4 ).subscribe (subject );
23
+
24
+ //第一个观察者可以收到4事件
25
+ subject .subscribe (new Consumer <Integer >() {
26
+ @ Override
27
+ public void accept (Integer integer ) throws Exception {
28
+ System .out .println ("first observer-->" +integer );
29
+ }
30
+ });
31
+
32
+ //第二个观察者只能收到4事件
33
+ subject .subscribe (new Consumer <Integer >() {
34
+ @ Override
35
+ public void accept (Integer integer ) throws Exception {
36
+ System .out .println ("second observer-->" +integer );
37
+ }
38
+ });
39
+ }
40
+ }
Original file line number Diff line number Diff line change
1
+ package Observables .Subject ;
2
+
3
+ import io .reactivex .functions .Consumer ;
4
+ import io .reactivex .subjects .BehaviorSubject ;
5
+
6
+ /**
7
+ * Author: andy.xwt
8
+ * Date: 2019/11/20 00:18
9
+ * Description:
10
+ * 当Observer订阅了一个BehaviorSubject,
11
+ * 它一开始就会释放Observable最近释放的一个数据对象,当还没有任何数据释放时,它则是一个默认值。接下来就会释放Observable释放的所有数据。
12
+ * 如果Observable因异常终止,BehaviorSubject将不会向后续的Observer释放数据,但是会向Observer传递一个异常通知。
13
+ * https://mcxiaoke.gitbooks.io/rxdocs/content/Subject.html
14
+ */
15
+
16
+
17
+ class BehaviorSubjectOpe {
18
+
19
+ public static void main (String [] args ) {
20
+ BehaviorSubject <Integer > subject = BehaviorSubject .createDefault (0 );
21
+
22
+ //第一个观察者可以收到0,1,2,3,4事件
23
+ subject .subscribe (new Consumer <Integer >() {
24
+ @ Override
25
+ public void accept (Integer integer ) throws Exception {
26
+ System .out .println ("first observer-->" + integer );
27
+ }
28
+ });
29
+
30
+ subject .onNext (1 );
31
+ subject .onNext (2 );
32
+
33
+ //第一个观察者可以收到2,3,4事件
34
+ subject .subscribe (new Consumer <Integer >() {
35
+ @ Override
36
+ public void accept (Integer integer ) throws Exception {
37
+ System .out .println ("second observer-->" + integer );
38
+ }
39
+ });
40
+ subject .onNext (3 );
41
+ subject .onNext (4 );
42
+
43
+
44
+ }
45
+ }
Original file line number Diff line number Diff line change
1
+ package Observables .Subject ;
2
+
3
+ import io .reactivex .functions .Consumer ;
4
+ import io .reactivex .subjects .PublishSubject ;
5
+
6
+ /**
7
+ * Author: andy.xwt
8
+ * Date: 2019/11/19 23:50
9
+ * Description:
10
+ * PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。需要注意的是,
11
+ * PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),
12
+ * 因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。
13
+ * 如果要确保来自原始Observable的所有数据都被分发,你需要这样做:
14
+ * 或者使用Create创建那个Observable以便手动给它引入"冷"Observable的行为(当所有观察者都已经订阅时才开始发射数据),
15
+ * 或者改用ReplaySubject。
16
+ * https://mcxiaoke.gitbooks.io/rxdocs/content/Subject.html
17
+ */
18
+
19
+
20
+ class PublishSubjectOpe {
21
+
22
+ public static void main (String [] args ) {
23
+ PublishSubject <Integer > subject = PublishSubject .create ();
24
+ //第一个观察者可以收到1,2,3事件
25
+ subject .subscribe (new Consumer <Integer >() {
26
+ @ Override
27
+ public void accept (Integer integer ) throws Exception {
28
+ System .out .println ("first observer-->" +integer );
29
+ }
30
+ });
31
+ subject .onNext (1 );
32
+ subject .onNext (2 );
33
+
34
+ //第二个观察者只能收到3事件
35
+ subject .subscribe (new Consumer <Integer >() {
36
+ @ Override
37
+ public void accept (Integer integer ) throws Exception {
38
+ System .out .println ("second observer-->" +integer );
39
+ }
40
+ });
41
+ subject .onNext (3 );
42
+ }
43
+ }
Original file line number Diff line number Diff line change
1
+ package Observables .Subject ;
2
+
3
+ import io .reactivex .Observable ;
4
+ import io .reactivex .functions .Consumer ;
5
+ import io .reactivex .subjects .ReplaySubject ;
6
+
7
+ /**
8
+ * Author: andy.xwt
9
+ * Date: 2019/11/19 23:59
10
+ * Description:
11
+ * 该Subject会接收数据,当被订阅时,将所有接收到的数据全部发送给订阅者。
12
+ * ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。
13
+ * 也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。
14
+ * https://mcxiaoke.gitbooks.io/rxdocs/content/Subject.html
15
+ */
16
+
17
+
18
+ class ReplaySubjectOpe {
19
+
20
+ public static void main (String [] args ) {
21
+ ReplaySubject <Integer > subject = ReplaySubject .create ();
22
+ Observable .range (1 , 4 ).subscribe (subject );
23
+
24
+ //第一个观察者可以收到1,2,3,4事件
25
+ subject .subscribe (new Consumer <Integer >() {
26
+ @ Override
27
+ public void accept (Integer integer ) throws Exception {
28
+ System .out .println ("first observer-->" +integer );
29
+ }
30
+ });
31
+
32
+ //第二个观察者只能收到1,2,3,4事件
33
+ subject .subscribe (new Consumer <Integer >() {
34
+ @ Override
35
+ public void accept (Integer integer ) throws Exception {
36
+ System .out .println ("second observer-->" +integer );
37
+ }
38
+ });
39
+
40
+ }
41
+ }
You can’t perform that action at this time.
0 commit comments