本篇文章已授权微信公众号 YYGeeker
独家发布转载请标明出处
CSDN学院课程地址
- RxJava2从入门到精通-初级篇:
- RxJava2从入门到精通-中级篇:
- RxJava2从入门到精通-进阶篇:
- RxJava2从入门到精通-源码分析篇:
3. RxJava操作符
RxJava操作符也是其精髓之一,可以通过一个简单的操作符,实现复杂的业务逻辑,甚至还可以将操作符组合起来(即RxJava的组合过程),完成更为复杂的业务需求。比如我们前面用到的.create()
,.subscribeOn()
,.observeOn()
,.subscribe()
都是RxJava的操作符之一,下面我们将对RxJava的操作符进行分析
掌握RxJava操作符前,首先要学会看得懂RxJava的图片,图片是RxJava主导的精髓,下面我们通过例子说明
这张图片我们先要分清楚概念上的东西,上下两行横向的直线区域代表着事件流,上面一行(上游)是我们的被观察者Observable
,下面一行(下游)是我们的观察者Observer
,事件流就是从上游的被观察者发送给下游的观察者的。而中间一行的flatMap区域则是我们的操作符部分,它可以对我们的数据进行变换操作。最后,数据流则是图片上的圆形、方形、菱形等区域,也是从上游流向下游的,不同的形状代表着不同的数据类型
这张图片并不是表示没有被观察者Observable
,而是Create方法本身就是创建了被观察者,所以可以将被观察者的上游省略。在进行事件的onNext()
分发后,执行onComplete()
事件,这样就表示事件流已经结束,后续如果上游继续发事件,则下游表示不接收。当事件流的onCompleted()
或者onError()
正好被调用过一次后,此后就不能再调用观察者的任何其它回调方法
在理解RxJava操作符之前,需要将这几个概念弄明白,整个操作符的章节都是围绕这几个概念进行的
- 事件流:通过发射器发射的事件,从发射事件到结束事件的过程,这一过程称为事件流
- 数据流:通过发射器发射的数据,从数据输入到数据输出的过程,这一过程称为数据流
- 被观察者:事件流的上游,即
Observable
,事件流开始的地方和数据流发射的地方 - 观察者:事件流的下游,即
Observer
,事件流结束的地方和数据流接收的地方
3.1 Creating Observables (创建操作符)
1、create
Observable
最原始的创建方式,创建出一个最简单的事件流,可以使用发射器发射特定的数据类型
public static void main(String[] args) { Observable .create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { for (int i = 1; i < 5; i++) { e.onNext(i); } e.onComplete(); } }) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } }, new Consumer () { @Override public void accept(Throwable throwable) throws Exception { } }, new Action() { @Override public void run() throws Exception { System.out.println("onComplete"); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onComplete复制代码
2、from
创建一个事件流并发出特定类型的数据流,其发射的数据流类型有如下几个操作符
public static void main(String[] args) { Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onNext=5复制代码
3、just
just操作符和from操作符很像,只是方法的参数有所差别,它可以接受多个参数
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onNext=5复制代码
4、defer
defer与just的区别是,just是直接将发射当前的数据流,而defer会等到订阅的时候,才会去执行它的call()回调,再去发射当前的数据流。复杂点的理解就是:defer操作符是将一组数据流在原有的事件流基础上缓存一个新的事件流,直到有人订阅的时候,才会创建它缓存的事件流
public static void main(String[] args) { i = 10; Observablejust = Observable.just(i, i); Observable
输出
onNext=10onNext=10onNext=15onNext=15onNext=20onNext=20复制代码
5、interval
interval操作符是按固定的时间间隔发射一个无限递增的整数数据流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行,interval默认在computation调度器上执行
public void interval() { Observable.interval(1, TimeUnit.SECONDS) .subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { System.out.println("onNext=" + aLong); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4......复制代码
6、range
range操作符发射一个范围内的有序整数数据流,你可以指定范围的起始和长度
public static void main(String[] args) { Observable.range(1, 5) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onNext=5复制代码
7、repeat
repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行
public static void main(String[] args) { Observable.just(1).repeat(5) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1onNext=1onNext=1onNext=1onNext=1复制代码
8、timer
timer操作符可以创建一个延时的事件流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行,默认在computation调度器上执行
public void timer() { Observable.timer(5, TimeUnit.SECONDS) .subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { System.out.println("onNext=" + aLong); } });}复制代码
输出
onNext=0复制代码
9、小结
- create():创建最简单的事件流
- from():创建事件流,可发送不同类型的数据流
- just():创建事件流,可发送多个参数的数据流
- defer():创建事件流,可缓存可激活事件流
- interval():创建延时重复的事件流
- range():创建事件流,可发送范围内的数据流
- repeat():创建可重复次数的事件流
- timer():创建一次延时的事件流
补充:interval()、timer()、delay()的区别
- interval():用于创建事件流,周期性重复发送
- timer():用于创建事件流,延时发送一次
- delay():用于事件流中,可以延时某次事件流的发送
3.2 Transforming Observables (转换操作符)
1、map
map操作符可以将数据流进行类型转换
public static void main(String[] args) { Observable.just(1).map(new Function() { @Override public String apply(Integer integer) throws Exception { return "发送过来的数据会被变成字符串" + integer; } }) .subscribe(new Consumer () { @Override public void accept(String s) throws Exception { System.out.println("onNext=" + s); } });}复制代码
输出
onNext=发送过来的数据会被变成字符串1复制代码
2、flatMap
flatMap操作符将数据流进行类型转换,然后将新的数据流传递给新的事件流进行分发,这里通过模拟请求登录的延时操作进行说明,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public void flatMap() { Observable.just(new UserParams("hensen", "123456")).flatMap(new Function>() { @Override public ObservableSource apply(UserParams userParams) throws Exception { return Observable.just(userParams.username + "登录成功").delay(2, TimeUnit.SECONDS); } }).subscribe(new Consumer () { @Override public void accept(String s) throws Exception { System.out.println(s); } });}public static class UserParams { public UserParams(String username, String password) { this.username = username; this.password = password; } public String username; public String password;}复制代码
输出
hensen登录成功复制代码
补充:
- concatMap与flatMap功能一样,唯一的区别就是concatMap是有序的,flatMap是乱序的
3、groupBy
groupBy操作符可以将发射出来的数据项进行分组,并将分组后的数据项保存在具有key-value映射的事件流中。groupBy具体的分组规则由groupBy操作符传递进来的函数参数Function
所决定的,它可以将key和value按照Function
的返回值进行分组,返回一个具有分组规则的事件流GroupedObservable
,注意这里分组出来的事件流是按照原始事件流的顺序输出的,我们可以通过sorted()
对数据项进行排序,然后输出有序的数据流。
public static void main(String[] args) { Observable.just("java", "c++", "c", "c#", "javaScript", "Android") .groupBy(new Function() { @Override public Character apply(String s) throws Exception { return s.charAt(0);//按首字母分组 } }) .subscribe(new Consumer >() { @Override public void accept(final GroupedObservable characterStringGroupedObservable) throws Exception { //排序后,直接订阅输出key和value characterStringGroupedObservable.sorted().subscribe(new Consumer () { @Override public void accept(String s) throws Exception { System.out.println("onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s); } }); } });}复制代码
输出
onNext= key:A value:AndroidonNext= key:c value:conNext= key:c value:c#onNext= key:c value:c++onNext= key:j value:javaonNext= key:j value:javaScript复制代码
4、scan
scan操作符会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。其应用场景有简单的累加计算,判断所有数据的最小值等
public static void main(String[] args) { Observable.just(8, 2, 13, 1, 15).scan(new BiFunction() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer < integer2 ? integer : integer2; } }) .subscribe(new Consumer () { @Override public void accept(Integer item) throws Exception { System.out.println("onNext=" + item); } });}复制代码
输出
onNext=8onNext=2onNext=2onNext=1onNext=1复制代码
5、buffer
buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。如果发射出来的数据不够缓存池的大小,则按照当前发射出来的数量进行输出。如果对buffer操作符设置了skip
参数,则buffer每次缓存池溢满时,会跳过指定的skip
数据项,然后再进行缓存和输出。
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9) .buffer(5).subscribe(new Consumer
>() { @Override public void accept(List integers) throws Exception { System.out.println("onNext=" + integers.toString()); }});复制代码
输出
onNext=[1, 2, 3, 4, 5]onNext=[6, 7, 8, 9]复制代码
6、window
window操作符和buffer操作符在功能上实现的效果是一样的,但window操作符最大区别在于同样是缓存一定数量的数据项,window操作符最终发射出来的是新的事件流integerObservable
,而buffer操作符发射出来的是新的数据流,也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理。
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9) .window(2, 1).subscribe(new Consumer>() { @Override public void accept(Observable integerObservable) throws Exception { integerObservable.subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } }); } });}复制代码
输出
onNext=1onNext=2onNext=2onNext=3onNext=3onNext=4onNext=4onNext=5onNext=5onNext=6onNext=6onNext=7onNext=7onNext=8onNext=8onNext=9onNext=9复制代码
7、小结
- map():对数据流的类型进行转换
- flatMap():对数据流的类型进行包装成另一个数据流
- groupby():对所有的数据流进行分组
- scan():对上一轮处理过后的数据流进行函数处理
- buffer():缓存发射的数据流到一定数量,随后发射出数据流集合
- window():缓存发射的数据流到一定数量,随后发射出新的事件流
3.3 Filtering Observables (过滤操作符)
1、debounce
debounce操作符会去过滤掉发射速率过快的数据项,下面的例子onNext
事件可以想象成按钮的点击事件,如果在2秒种内频繁的点击,则其点击事件会被忽略,当i为3的除数的时候,发射的事件的时间会超过规定忽略事件的时间,那么则允许触发点击事件。这就有点像我们频繁点击按钮,但始终只会触发一次点击事件,这样就不会导致重复去响应点击事件
public static void main(String[] args) { Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; i < 100; i++) { if (i % 3 == 0) { Thread.sleep(3000); } else { Thread.sleep(1000); } emitter.onNext(i); } } }).debounce(2, TimeUnit.SECONDS) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=2onNext=5onNext=8onNext=11onNext=14......复制代码
2、distinct
distinct操作符会过滤重复发送的数据项
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 1, 2, 3).distinct() .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4复制代码
3、elementAt
elementAt操作符只取指定的角标的事件
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 1, 2, 3).elementAt(0) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1复制代码
4、filter
filter操作符可以过滤指定函数的数据项
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 1, 2, 3) .filter(new Predicate() { @Override public boolean test(Integer integer) throws Exception { return integer > 2; } }) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=3onNext=4onNext=3复制代码
5、first
first操作符只发射第一项数据项
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 1, 2, 3) .first(7) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1复制代码
6、ignoreElements
ignoreElements操作符不发射任何数据,只发射事件流的终止通知
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 1, 2, 3) .ignoreElements() .subscribe(new Action() { @Override public void run() throws Exception { System.out.println("onComplete"); } });}复制代码
输出
onComplete复制代码
7、last
last操作符只发射最后一项数据
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 1, 2, 3) .last(7) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=3复制代码
8、sample
sample操作符会在指定的事件内从数据项中采集所需要的数据,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public void sample() { Observable.interval(1, TimeUnit.SECONDS) .sample(2, TimeUnit.SECONDS) .subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { System.out.println("onNext=" + aLong); } });}复制代码
输出
onNext=2onNext=4onNext=6onNext=8复制代码
9、skip
skip操作符可以忽略事件流发射的前N项数据项,只保留之后的数据
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .skip(3) .subscribe(new Consumer() { @Override public void accept(Integer i) throws Exception { System.out.println("onNext=" + i); } });}复制代码
输出
onNext=4onNext=5onNext=6onNext=7onNext=8复制代码
10、skipLast
skipLast操作符可以抑制事件流发射的后N项数据
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .skipLast(3) .subscribe(new Consumer() { @Override public void accept(Integer i) throws Exception { System.out.println("onNext=" + i); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onNext=5复制代码
11、take
take操作符可以在事件流中只发射前面的N项数据
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .take(3) .subscribe(new Consumer() { @Override public void accept(Integer i) throws Exception { System.out.println("onNext=" + i); } });}复制代码
输出
onNext=1onNext=2onNext=3复制代码
12、takeLast
takeLast操作符事件流只发射数据流的后N项数据项,忽略前面的数据项
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .takeLast(3) .subscribe(new Consumer() { @Override public void accept(Integer i) throws Exception { System.out.println("onNext=" + i); } });}复制代码
输出
onNext=6onNext=7onNext=8复制代码
还有一个操作符叫takeLastBuffer,它和takeLast类似,,唯一的不同是它把所有的数据项收集到一个List再发射,而不是依次发射一个
13、小结
- debounce():事件流只发射规定范围时间内的数据项
- distinct():事件流只发射不重复的数据项
- elementAt():事件流只发射第N个数据项
- filter():事件流只发射符合规定函数的数据项
- first():事件流只发射第一个数据项
- ignoreElements():忽略事件流的发射,只发射事件流的终止事件
- last():事件流只发射最后一项数据项
- sample():事件流对指定的时间间隔进行数据项的采样
- skip():事件流忽略前N个数据项
- skipLast():事件流忽略后N个数据项
- take():事件流只发射前N个数据项
- takeLast():事件流只发射后N个数据项
3.4 Combining Observables (组合操作符)
1、merge/concat
merge操作符可以合并两个事件流,如果在merge操作符上增加延时发送的操作,那么就会导致其发射的数据项是无序的,会跟着发射的时间点进行合并。虽然是将两个事件流合并成一个事件流进行发射,但在最终的一个事件流中,发射出来的却是两次数据流。由于concat操作符和merge操作符的效果是一样的,这里只举一例
merge和concat的区别
- merge():合并后发射的数据项是无序的
- concat():合并后发射的数据项是有序的
public static void main(String[] args) { Observablejust1 = Observable.just("A", "B", "C", "D", "E"); Observable just2 = Observable.just("1", "2", "3", "4", "5"); Observable.merge(just1, just2).subscribe(new Consumer () { @Override public void accept(Serializable serializable) throws Exception { System.out.println("onNext=" + serializable.toString()); } });}复制代码
输出
onNext=AonNext=BonNext=ConNext=DonNext=EonNext=1onNext=2onNext=3onNext=4onNext=5复制代码
2、zip
zip操作符是将两个数据流进行指定的函数规则合并
public static void main(String[] args) { Observablejust1 = Observable.just("A", "B", "C", "D", "E"); Observable just2 = Observable.just("1", "2", "3", "4", "5"); Observable.zip(just1, just2, new BiFunction () { @Override public String apply(String s, String s2) throws Exception { return s + s2; } }).subscribe(new Consumer () { @Override public void accept(String s) throws Exception { System.out.println("onNext=" + s); } });}复制代码
输出
onNext=A1onNext=B2onNext=C3onNext=D4onNext=E5复制代码
3、startWith
startWith操作符是将另一个数据流合并到原数据流的开头
public static void main(String[] args) { Observablejust1 = Observable.just("A", "B", "C", "D", "E"); Observable just2 = Observable.just("1", "2", "3", "4", "5"); just1.startWith(just2).subscribe(new Consumer () { @Override public void accept(String s) throws Exception { System.out.println("onNext=" + s); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onNext=5onNext=AonNext=BonNext=ConNext=DonNext=E复制代码
4、join
join操作符是有时间期限的合并操作符,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public void join() { Observablejust1 = Observable.just("A", "B", "C", "D", "E"); Observable just2 = Observable.interval(1, TimeUnit.SECONDS); just1.join(just2, new Function >() { @Override public ObservableSource apply(String s) throws Exception { return Observable.timer(3, TimeUnit.SECONDS); } }, new Function >() { @Override public ObservableSource apply(Long l) throws Exception { return Observable.timer(8, TimeUnit.SECONDS); } }, new BiFunction () { @Override public String apply(String s, Long l) throws Exception { return s + l; } }).subscribe(new Consumer () { @Override public void accept(String s) throws Exception { System.out.println("onNext=" + s); } });}复制代码
join操作符有三个函数需要设置
- 第一个函数:规定just2的过期期限
- 第二个函数:规定just1的过期期限
- 第三个函数:规定just1和just2的合并规则
由于just2的期限只有3秒的时间,而just2延时1秒发送一次,所以just2只发射了2次,其输出的结果就只能和just2输出的两次进行合并,其输出格式有点类似我们的排列组合
onNext=A0onNext=B0onNext=C0onNext=D0onNext=E0onNext=A1onNext=B1onNext=C1onNext=D1onNext=E1复制代码
5、combineLatest
conbineLatest操作符会寻找其他事件流最近发射的数据流进行合并,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public static String[] str = { "A", "B", "C", "D", "E"};public void combineLatest() { Observablejust1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function () { @Override public String apply(Long aLong) throws Exception { return str[(int) (aLong % 5)]; } }); Observable just2 = Observable.interval(1, TimeUnit.SECONDS); Observable.combineLatest(just1, just2, new BiFunction () { @Override public String apply(String s, Long l) throws Exception { return s + l; } }).subscribe(new Consumer () { @Override public void accept(String s) throws Exception { System.out.println("onNext=" + s); } });}复制代码
输出
onNext=A0onNext=B0onNext=B1onNext=C1onNext=C2onNext=D2onNext=D3onNext=E3onNext=E4onNext=A4onNext=A5复制代码
6、小结
- merge()/concat():无序/有序的合并两个数据流
- zip():两个数据流的数据项合并成一个数据流一同发出
- startWith():将待合并的数据流放在自身前面一同发出
- join():将数据流进行排列组合发出,不过数据流都是有时间期限的
- combineLatest():合并最近发射出的数据项成数据流一同发出
3.5 Error Handling Operators(错误处理操作符)
1、onErrorReturn
onErrorReturn操作符表示当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()
public static void main(String[] args) { Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { for (int i = 1; i < 5; i++) { if(i == 4){ e.onError(new Exception("onError crash")); } e.onNext(i); } } }) .onErrorReturn(new Function () { @Override public Integer apply(Throwable throwable) throws Exception { return -1; } }) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } }, new Consumer () { @Override public void accept(Throwable throwable) throws Exception { System.out.println("onError"); } }, new Action() { @Override public void run() throws Exception { System.out.println("onComplete"); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=-1onComplete复制代码
2、onErrorResumeNext
onErrorResumeNext操作符表示当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()
public static void main(String[] args) { Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { for (int i = 1; i < 5; i++) { if(i == 4){ e.onError(new Exception("onError crash")); } e.onNext(i); } } }) .onErrorResumeNext(new Function >() { @Override public ObservableSource apply(Throwable throwable) throws Exception { return Observable.just(-1); } }) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } }, new Consumer () { @Override public void accept(Throwable throwable) throws Exception { System.out.println("onError"); } }, new Action() { @Override public void run() throws Exception { System.out.println("onComplete"); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=-1onComplete复制代码
3、onExceptionResumeNext
onExceptionResumeNext操作符表示当错误发生时,如果onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,如果onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射
public static void main(String[] args) { Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { for (int i = 1; i < 5; i++) { if(i == 4){ e.onError(new Exception("onException crash")); //e.onError(new Error("onError crash")); } e.onNext(i); } } }) .onExceptionResumeNext(new ObservableSource () { @Override public void subscribe(Observer observer) { //备用事件流 observer.onNext(8); } }) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } }, new Consumer () { @Override public void accept(Throwable throwable) throws Exception { System.out.println("onError"); } }, new Action() { @Override public void run() throws Exception { System.out.println("onComplete"); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=8复制代码
4、retry
retry操作符表示当错误发生时,发射器会重新发射
public static void main(String[] args) { Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { for (int i = 1; i < 5; i++) { if (i == 4) { e.onError(new Exception("onError crash")); } e.onNext(i); } } }) .retry(1) .onErrorReturn(new Function () { @Override public Integer apply(Throwable throwable) throws Exception { return -1; } }) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } }, new Consumer () { @Override public void accept(Throwable throwable) throws Exception { System.out.println("onError"); } }, new Action() { @Override public void run() throws Exception { System.out.println("onComplete"); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=1onNext=2onNext=3onNext=-1onComplete复制代码
- retry():表示重试无限次
- retry(long times):表示重试指定次数
- retry(Func predicate):可以根据函数参数中的Throwable类型和重试次数决定本次需不需要重试
5、retryWhen
retryWhen操作符和retry操作符相似,区别在于retryWhen将错误Throwable传递给了函数进行处理并产生新的事件流进行处理,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
private static int retryCount = 0;private static int maxRetries = 2;public void retryWhen(){ Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { for (int i = 1; i < 5; i++) { if (i == 4) { e.onError(new Exception("onError crash")); } e.onNext(i); } } }) .retryWhen(new Function , ObservableSource >() { @Override public ObservableSource apply(Observable throwableObservable) throws Exception { return throwableObservable.flatMap(new Function >() { @Override public ObservableSource apply(Throwable throwable) throws Exception { if (++retryCount <= maxRetries) { // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed). System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount); return Observable.timer(1, TimeUnit.SECONDS); } return Observable.error(throwable); } }); } }) .onErrorReturn(new Function () { @Override public Integer apply(Throwable throwable) throws Exception { return -1; } }) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } }, new Consumer () { @Override public void accept(Throwable throwable) throws Exception { System.out.println("onError"); } }, new Action() { @Override public void run() throws Exception { System.out.println("onComplete"); } });}复制代码
输出
onNext=1onNext=2onNext=3get error, it will try after 1 seconds, retry count 1onNext=1onNext=2onNext=3get error, it will try after 1 seconds, retry count 2onNext=1onNext=2onNext=3onNext=-1onComplete复制代码
6、小结
- onErrorReturn():当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()
- onErrorResumeNext():当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()
- onExceptionResumeNext():当错误发生时,如果onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,如果onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射
- retry():当错误发生时,发射器会重新发射
- retryWhen():当错误发生时,根据Tharowble类型决定发射器是否重新发射
3.6 Observable Utility Operators(辅助性操作符)
1、delay
delay操作符可以延时某次事件发送的数据流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public void deley() { Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onNext=5复制代码
delay和delaySubscription的效果是一样的,只不过delay是对数据流的延时,而delaySubscription是对事件流的延时
2、do
do操作符可以监听整个事件流的生命周期,do操作符分为多个类型,而且每个类型的作用都不同
- doOnNext():接收每次发送的数据项
- doOnEach():接收每次发送的数据项
- doOnSubscribe():当事件流被订阅时被调用
- doOnDispose():当事件流被释放时被调用
- doOnComplete():当事件流被正常终止时被调用
- doOnError():当事件流被异常终止时被调用
- doOnTerminate():当事件流被终止之前被调用,无论正常终止还是异常终止都会调用
- doFinally():当事件流被终止之后被调用,无论正常终止还是异常终止都会调用
public static void main(String[] args) { Observable.just(1, 2, 3) .doOnNext(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("doOnNext"); } }) .doOnEach(new Consumer >() { @Override public void accept(Notification integerNotification) throws Exception { System.out.println("doOnEach"); } }) .doOnSubscribe(new Consumer () { @Override public void accept(Disposable disposable) throws Exception { System.out.println("doOnSubscribe"); } }) .doOnDispose(new Action() { @Override public void run() throws Exception { System.out.println("doOnDispose"); } }) .doOnTerminate(new Action() { @Override public void run() throws Exception { System.out.println("doOnTerminate"); } }) .doOnError(new Consumer () { @Override public void accept(Throwable throwable) throws Exception { System.out.println("doOnError"); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("doOnComplete"); } }) .doFinally(new Action() { @Override public void run() throws Exception { System.out.println("doFinally"); } }) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
doOnSubscribedoOnNextdoOnEachonNext=1doOnNextdoOnEachonNext=2doOnNextdoOnEachonNext=3doOnEachdoOnTerminatedoOnCompletedoFinally复制代码
3、materialize/dematerialize
materialize操作符将发射出的数据项转换成为一个Notification对象,而dematerialize操作符则是跟materialize操作符相反,这两个操作符有点类似我们Java对象的装箱和拆箱功能
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5).materialize() .subscribe(new Consumer>() { @Override public void accept(Notification integerNotification) throws Exception { System.out.println("onNext=" + integerNotification.getValue()); } }); Observable.just(1, 2, 3, 4, 5).materialize().dematerialize() .subscribe(new Consumer () { @Override public void accept(Object object) throws Exception { System.out.println("onNext=" + object.toString()); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onNext=5onNext=nullonNext=1onNext=2onNext=3onNext=4onNext=5复制代码
输出的时候,materialize会输出多个null,是因为null的事件为onCompleted事件,而dematerialize把onCompleted事件给去掉了,这个原因也可以从图片中看出来
4、serialize
serialize操作符可以将异步执行的事件流进行同步操作,直到事件流结束
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5).serialize() .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onNext=5复制代码
5、timeInterval
timeInterval操作符可以将发射的数据项转换为带有时间间隔的数据项,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public void timeInterval(){ Observable.interval(2, TimeUnit.SECONDS).timeInterval(TimeUnit.SECONDS) .subscribe(new Consumer>() { @Override public void accept(Timed longTimed) throws Exception { System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time()); } });}复制代码
输出
onNext=0 timeInterval=2onNext=1 timeInterval=2onNext=2 timeInterval=2onNext=3 timeInterval=2onNext=4 timeInterval=2复制代码
6、timeout
timeout操作符表示当发射的数据项超过了规定的限制时间,则发射onError事件,这里直接让程序超过规定的限制时间,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public void timeOut(){ Observable.interval(2, TimeUnit.SECONDS).timeout(1, TimeUnit.SECONDS) .subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { System.out.println("onNext=" + aLong); } }, new Consumer () { @Override public void accept(Throwable throwable) throws Exception { System.out.println("onError"); } });}复制代码
输出
onError复制代码
7、timestamp
timestamp操作符会给每个发射的数据项带上时间戳,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public void timeStamp() { Observable.interval(2, TimeUnit.SECONDS).timestamp(TimeUnit.MILLISECONDS) .subscribe(new Consumer>() { @Override public void accept(Timed longTimed) throws Exception { System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time()); } });}复制代码
输出
onNext=0 timeInterval=1525755132132onNext=1 timeInterval=1525755134168onNext=2 timeInterval=1525755136132onNext=3 timeInterval=1525755138132复制代码
8、using
using操作符可以让你的事件流存在一次性的数据项,即用完就将资源释放掉
using操作符接受三个参数:
- 一个用户创建一次性资源的工厂函数
- 一个用于创建一次性事件的工厂函数
- 一个用于释放资源的函数
public static class UserBean { String name; int age; public UserBean(String name, int age) { this.name = name; this.age = age; }}public static void main(String[] args) { Observable.using(new Callable() { @Override public UserBean call() throws Exception { //从网络中获取某个对象 return new UserBean("俊俊俊", 22); } }, new Function >() { @Override public ObservableSource apply(UserBean userBean) throws Exception { //拿出你想要的资源 return Observable.just(userBean.name); } }, new Consumer () { @Override public void accept(UserBean userBean) throws Exception { //释放对象 userBean = null; } }).subscribe(new Consumer () { @Override public void accept(Object o) throws Exception { System.out.println("onNext=" + o.toString()); } });}复制代码
输出
onNext=俊俊俊复制代码
9、to
to操作符可以将数据流中的数据项进行集合的转换,to操作符分为多个类型,而且每个类型的作用都不同
- toList():转换成List类型的集合
- toMap():转换成Map类型的集合
- toMultimap():转换成一对多(即<A类型,List<B类型>>)的Map类型的集合
- toSortedList():转换成具有排序的List类型的集合
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5).toList() .subscribe(new Consumer
>() { @Override public void accept(List integers) throws Exception { System.out.println("onNext=" + integers.toString()); } });}复制代码
输出
onNext=[1, 2, 3, 4, 5]复制代码
10、小结
- delay():延迟事件发射的数据项
- do():监听事件流的生命周期
- materialize()/dematerialize():对事件流进行装箱/拆箱
- serialize():同步事件流的发射
- timeInterval():对事件流增加时间间隔
- timeout():对事件流增加限定时间
- timestamp():对事件流增加时间戳
- using():对事件流增加一次性的资源
- to():对数据流中的数据项进行集合的转换
3.7 Conditional and Boolean Operators(条件和布尔操作符)
1、all
all操作符表示对所有数据项进行校验,如果所有都通过则返回true,否则返回false
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5) .all(new Predicate() { @Override public boolean test(Integer integer) throws Exception { return integer > 0; } }) .subscribe(new Consumer () { @Override public void accept(Boolean aBoolean) throws Exception { System.out.println("onNext=" + aBoolean); } });}复制代码
输出
onNext=true复制代码
2、contains
contains操作符表示事件流中发射的数据项当中是否包含有指定的数据项
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5) .contains(2) .subscribe(new Consumer() { @Override public void accept(Boolean aBoolean) throws Exception { System.out.println("onNext=" + aBoolean); } });}复制代码
输出
onNext=true复制代码
3、amb
amb操作符在多个事件流中只发射最先发出数据的事件流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public void amb(){ List> list = new ArrayList<>(); list.add(Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS)); list.add(Observable.just(4, 5, 6).delay(2, TimeUnit.SECONDS)); list.add(Observable.just(7, 8, 9).delay(1, TimeUnit.SECONDS)); Observable.amb(list) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=7onNext=8onNext=9复制代码
4、defaultIfEmpty
defaultIfEmpty操作符会在事件流没有发射任何数据时,发射一个指定的默认值
public static void main(String[] args) { Observable.empty() .defaultIfEmpty(-1) .subscribe(new Consumer() { @Override public void accept(Object o) throws Exception { System.out.println("onNext=" + o.toString()); } });}复制代码
输出
onNext=-1复制代码
5、sequenceEqual
sequenceEqual操作符可以判断两个数据流是否完全相等
public static void main(String[] args) { Observablejust1 = Observable.just(1, 2, 3); Observable just2 = Observable.just(1, 2, 3); Observable.sequenceEqual(just1, just2) .subscribe(new Consumer () { @Override public void accept(Boolean aBoolean) throws Exception { System.out.println("onNext=" + aBoolean); } });}复制代码
输出
onNext=true复制代码
6、skipUntil/skipWhile
skipUtils操作符是在两个事件流发射的时候,第一个事件流会等到第二个事件流开始发射的时候,第一个事件流才开始发射出数据项,它会忽略之前发射过的数据项,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public void skipUntil(){ Observablejust1 = Observable.interval(1, TimeUnit.SECONDS); Observable just2 = Observable.just(8).delay(3, TimeUnit.SECONDS); just1.skipUntil(just2) .subscribe(new Consumer () { @Override public void accept(Long aLong) throws Exception { System.out.println("onNext=" + aLong); } });}复制代码
输出
onNext=2onNext=3onNext=4onNext=5......复制代码
skipWhile操作符是在一个事件流中,从第一项数据项开始判断是否符合某个特定条件,如果判断值返回true,则不发射该数据项,继续从下一个数据项执行同样的判断,直到某个数据项的判断值返回false时,则终止判断,发射剩余的所有数据项。需要注意的是,这里只要一次判断为false则后面的所有数据项都不判断
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 5) .skipWhile(new Predicate() { @Override public boolean test(Integer integer) throws Exception { return integer < 3; } }) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=3onNext=4onNext=5复制代码
7、takeUntil/takeWhile
takeUntil操作符跟skipUntil类似,skip表示跳过的意思,而take表示取值的意思,takeUntil操作符是在两个事件流发射的时候,第一个事件流会等到第二个事件流开始发射的时候,第一个事件流停止发射数据项,它会忽略之后的数据项,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行
public void takeUntil(){ Observablejust1 = Observable.interval(1, TimeUnit.SECONDS); Observable just2 = Observable.just(8).delay(3, TimeUnit.SECONDS); just1.takeUntil(just2) .subscribe(new Consumer () { @Override public void accept(Long aLong) throws Exception { System.out.println("onNext=" + aLong); } });}复制代码
输出
onNext=0onNext=1复制代码
takeWhile操作符是在一个事件流中,从第一项数据项开始判断是否符合某个特定条件,如果判断值返回true,则发射该数据项,继续从下一个数据项执行同样的判断,直到某个数据项的判断值返回false时,则终止判断,且剩余的所有数据项不会发射。需要注意的是,这里只要一次判断为false则后面的所有数据项都不判断
public static void main(String[] args) { Observable.just(1, 2, 3, 4, 0) .takeWhile(new Predicate() { @Override public boolean test(Integer integer) throws Exception { return integer < 3; } }) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1onNext=2复制代码
8、小结
- all():对所有数据项进行校验
- contains():所有数据项是否包含指定数据项
- amb():多个事件流中,只发射最先发出的事件流
- defaultIfEmpty():如果数据流为空则发射默认数据项
- sequenceEqual():判断两个数据流是否完全相等
- skipUntil():当两个事件流发射时,第一个事件流的数据项会等到第二个事件流开始发射时才进行发射
- skipWhile():当发射的数据流达到某种条件时,才开始发射剩余所有数据项
- takeUntil():当两个事件流发射时,第一个事件流的数据项会等到第二个事件流开始发射时终止发射
- takeWhile():当发射的数据流达到某种条件时,才停止发射剩余所有数据项
3.8 Mathematical and Aggregate Operators(数学运算及聚合操作符)
数学运算操作符比较简单,对于数学运算操作符会放在小结中介绍,下面是对聚合操作符做介绍
1、reduce
reduce操作符跟scan操作符是一样的,会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。reduce与scan的唯一区别在于reduce只输出最后的结果,而scan会输出每一次的结果,这点从图片中也能看出来
public static void main(String[] args) { Observable.just(8, 2, 13, 1, 15).reduce(new BiFunction() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer < integer2 ? integer : integer2; } }) .subscribe(new Consumer () { @Override public void accept(Integer item) throws Exception { System.out.println("onNext=" + item); } });}复制代码
输出
onNext=1复制代码
2、collect
collect操作符跟reduce操作符类似,只不过collect增加了一个可改变数据结构的函数供我们处理
public static void main(String[] args) { Observable.just(8, 2, 13, 1, 15).collect(new Callable() { @Override public String call() throws Exception { return "A"; } }, new BiConsumer () { @Override public void accept(String s, Integer integer) throws Exception { System.out.println("onNext=" + s + " " + integer); } }).subscribe(new BiConsumer () { @Override public void accept(String s, Throwable throwable) throws Exception { System.out.println("onNext2=" + s); } });}复制代码
输出
onNext=A 8onNext=A 2onNext=A 13onNext=A 1onNext=A 15onNext2=A复制代码
3、小结
数学运算操作符的使用需要在gradle中添加rxjava-math的依赖
implementation 'io.reactivex:rxjava-math:1.0.0'复制代码
- average():求所有数据项的平均值
- max/min():求所有数据项的最大或最小值
- sum():求所有数据项的总和
- reduce():对上一轮处理过后的数据流进行函数处理,只返回最后的结果
- collect():对上一轮处理过后的数据流进行函数处理,可改变原始的数据结构
3.9 Connectable Observable(连接操作符)
1、publish
publish操作符是将普通的事件流转化成可连接的事件流ConnectableObservable
,它与普通的事件流不一样,ConnectableObservable
在没有调用connect()进行连接的情况下,事件流是不会发射数据的
public static void main(String[] args) { ConnectableObservableconnectableObservable = Observable.just(1, 2, 3, 4, 5).publish(); connectableObservable.subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
无复制代码
2、connect
connect操作符是将可连接的事件流进行连接并开始发射数据。这个方法需要注意的是,connect操作符必须在所有事件流被订阅后才开始发射数据。如果放在subscribe
之前的话,则订阅者是无法收到数据的。如果后面还有订阅者将订阅此次事件流,则会丢失已经调用了connect
后,发射出去的数据项
public static void main(String[] args) { ConnectableObservableconnectableObservable = Observable.just(1, 2, 3, 4, 5).publish(); connectableObservable.subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } }); connectableObservable.connect();}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onNext=5复制代码
3、refCount
refCount操作符可以将可连接的事件流转换成普通的事件流
public static void main(String[] args) { ConnectableObservableconnectableObservable = Observable.just(1, 2, 3, 4, 5).publish(); connectableObservable.refCount().subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { System.out.println("onNext=" + integer); } });}复制代码
输出
onNext=1onNext=2onNext=3onNext=4onNext=5复制代码
4、replay
replay操作符将弥补connect
操作符的缺陷,由于connect会让后面进行订阅的订阅者丢失之前发射出去的数据项,所以使用replay操作符可以将发射出去的数据项进行缓存,这样使得后面的订阅者都可以获得完整的数据项。这里需要注意的是,replay操作符不能和publish操作符同时使用,否则将不会发射数据。例子中,读者可以将replay操作符换成publish操作符,这时候的输出就会丢失前2秒发射的数据项
public void replay(){ ConnectableObservableconnectableObservable = Observable.interval(1, TimeUnit.SECONDS).replay(); connectableObservable.connect(); connectableObservable.delaySubscription(3, TimeUnit.SECONDS) .subscribe(new Consumer () { @Override public void accept(Long aLong) throws Exception { System.out.println("onNext=" + aLong); } });}复制代码
输出
onNext=0onNext=1onNext=2onNext=3onNext=4onNext=5......复制代码
5、小结
- publish():将普通的事件流转换成可连接的事件流
- connect():将可连接的事件流进行连接并发射数据
- refCount():将可连接的事件流转换成普通的事件流
- replay():缓存可连接的事件流中的所有数据项