RxJava2 入门教程
背景
在一个舒适区呆久了,要么就废掉,要么就挣扎着跳出来。因为各种原因,过去的几个月并不是特别忙,趁机想研究点新技术,说是新技术其实也算不上,几年前的个人项目里就已经使用个RxJava、Retrofit和OKHttp来做网络请求架构,但是一直没有系统的去了解过什么是RxJava以及RxJava可以做什么事情,这次正好借机系统的学习一下RxJava,整理此文算是重新入门。本文不少地方参考了大神扔物线的给 Android 开发者的 RxJava 详解,只不过大神的文章是基于RxJava的,我则是基于RxJava2再重新梳理一遍,RxJava2相对于RxJava还是有不少变化的。
[DOC]
RxJava是什么
一千个人就有一千个哈姆雷特,关于RxJava的理解,可能每个人的理解多少有点差别,但是核心肯定是一样的。基于官方的解释,RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.,翻译过来就是“RxJava,Java虚拟机的响应式扩展,一个运行在java虚拟机上的库,通过可观察的队列来实现异步的、基于事件化的程序”。我的英文有点烂,凑合着翻译成这样了。几个核心关键词很重要,响应式、可观察的队列、异步、事件化。
响应式
什么是响应式,或者说什么是响应式编程,第一次接触到这个概念的时候很糊涂,难道还有非响应式么?程序不是都会响应么?不响应了不就是ANR了吗?要理解响应式,必须得配合另外一个关键词一起理解,数据流。最开始在做地图上标记maker的功能时,我自己想到过数据流这个概念,那时候还没接触到响应式编程,当时我把查询数据的代码封装成“事件数据源”,源源不断的释放出新的数据出来,经过各种过滤处理,比如说数据是否有效?数据量是否超载?是否有重复数据等等一系列的过滤之后,交给“事件消耗方”,即绘制在地图上。可以把数据流理解为一条河流,经过不断的过滤,操作甚至分叉之后,被消耗掉或者跟更多新的数据汇聚成新的一条河流。
而这里说到的数据其实就是一个个事件,比如点击一下屏幕、收到一个推送、我们执行一个SQL查询等,都可以抽象成一系列事件,而CPU处理器就是在不停的通过调度处理各种各样的事件运算。这些事件组成了数据流,我们所说的响应式编程,就是说对这些事件流做出反应。点击屏幕————>点亮手机,收到推送————>显示notification,SQL查询————>处理查询结果————>显示到UI上面。诸如此类,基于数据流和变化的一种编程范式就叫做响应式编程,把一切行为归纳为数据流,通过各种函数进行合并、过滤、转变等等。
流是响应式的核心,下面就是 “在按钮上点击” 事件流的简单示意图。
关于更多的响应式编程可以查看:The introduction to Reactive Programming you’ve been missing · GitHub
异步
这是RxJava主要的核心功能,它能够轻松的实现异步操作,比如SQL查询,以往的经验是开一个新的线程执行查询代码,结果返回后通知主线程更新UI。例如如下的实现方式:
new Thread() {
@Override
public void run() {
super.run();
//execute sql query
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
//update UI
}
});
}
}.start();
从代码上看,查询和更新UI两个行为通过嵌套的形式存在,倒也清晰。我们再换成RxJava实现看看
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) {
//execute sql query
Object dbResult = new Object();//模拟查询结果
emitter.onNext(dbResult);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object dbResult) {
//update UI
}
});
单从代码量上来看,使用RxJava实现的代码似乎是多于传统写法的,如果你习惯Lambda表达式的话,以上代码可以再精简一番,如下
Observable.create(emitter -> {
//execute sql query
Object dbResult = new Object();//模拟查询结果
emitter.onNext(dbResult);
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(dbResult -> {
//update UI
});
仔细看代码的话,会发现使用RxJava实现的代码都是链式调用,从上往下依次执行。而传统写法的代码,则是通过嵌套方式实现。一两层嵌套可能没什么影响,但是如果嵌套比较多的时候,比如请求网络数据之后需要查询数据库对比数据,经过复杂运算之后,再更新到UI上面。这时候RxJava的优势就体现出来了,把整个过程组成一个链,逐次执行,业务逻辑清晰,代码可读性也比较高。最讨厌那些各种代码缩进了,嵌套一多,缩进就会多的很恶心。
回到本小节主题上,关于异步,RxJava如何实现呢?上面代码里其实已经有所体现,就是观察者模式,RxJava把观察者模式体现的淋漓尽致。提到观察者模式,很多Android工程师第一想到的就是事件监听,通过对一个View添加一个监听器,即可实现一个订阅者关系。一旦View被点击了,OnClickListener即可获得到回调。而RxJava则是进行了更深一步的抽象,把可以触发点击事件的View抽象成为Observable(可观查的,即被观察者),而OnClickListener则被抽象成为Observer(观察者)。同时还有另外一步抽象,就是把setOnClickListener这个过程抽象成subscribe(订阅)。
简单画了个对比图,如下:
可观察的队列 和事件化
经过上面两个核心点的分析,可观察的队列理解起来并不难,在RxJava中每一步对事件流的加工组合成一个队列,依次执行,并且这个队列的每一步都是可以观察的,通过观察可以对执行结果进行处理。
而对于事件化,everything can be a stream,几乎一切事情都可以被事件流化,这就是响应式编程中的一句名言。
如何使用RxJava2
对于学习一门新的技术来说,我认为首先要先用最快的速度了解这门技术是什么,能干什么?比如通过维基百科或者一些使用母语写的文章熟悉基本概念,然后通过官方文档熟悉具体细节。经过上面一些篇幅介绍了什么是RxJava,接下来我将通过翻译RxJava官方的教程来梳理如何使用RxJava2。
添加依赖
只需要在你项目里添加如下Gradle编译依赖,即可使用RxJava2:
`compile "io.reactivex.rxjava2:rxjava:2.x.y"`
写此文时最新版本是2.1.16,所以需要替换上面脚本里对应的x、y。
Hello World
通过以下代码即可实现最简单的打印Hello World功能:
package rxjava.examples;
import io.reactivex.*;
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}
如果你的平台不支持Java 8 lambda表达式,你需要手动实例化一个内部类Consumer作为消费者。
import io.reactivex.functions.Consumer;
Flowable.just("Hello world")
.subscribe(new Consumer<String>() {
@Override public void accept(String s) {
System.out.println(s);
}
});
基本类型
RxJava2提供了一些基本的操作符类型:
- io.reactivex.Flowable: 事件流,支持响应式事件流和背压功能,比较复杂的一个操作符。
- io.reactivex.Observable: 事件流,不支持背压。基本上可以满足大部分需求。
- io.reactivex.Single: 仅支持一个正常事件或者异常事件。成功or失败,二选一。
- io.reactivex.Completable: 仅仅关心任务完成或者异常,不关心数据。完成or失败,二选一。
- io.reactivex.Maybe: 支持0或者1个数据,要么成功,要么失败。成功or完成or失败,三选一。
一些术语
Upstream, downstream 上游流,下游流
在RxJava中,数据流由三部分组成,数据源 + 一个或多个中间处理步骤 + 数据消费者或者再加工者。
source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
那么,如果我们处在operator2这里,向左看的话,左边的被称作上游流,右边的订阅者/消费者就被称作为下游流。当每一步骤的代码单独写到一行时,看起来更加直观。
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
Objects in motion 运动中的对象
在RxJava开发文档中, emission, emits, item, event, signal, data and message 被认为是同义词,他们用来表示沿着数据流传播的对象。
Backpressure 背压
当数据流通过异步方式运行,每一步可能以不同的速度执行不同的任务。为了防止某一步骤超过负载而导致出现各种问题,背压功能出现了。通过背压功能可以做好流程控制,确定当前步骤可以处理多少任务。这样就可以在无法得知上游发送多少数据的情况下限制内存使用。
在RxJava中只有Flowable类支持使用背压,而Observable被设计为无背压操作符,比如小的任务队列、GUI交互等等。至于其他类型,Single, Maybe 和 Completable不支持背压,当然也不应该支持。他们都仅仅操作一个事件而已,有足够的空间。
Assembly time 组装时间
事件流在准备期间,通过一系列的中间操作对数据加工,被称为组装时间:
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v* v)
.filter(v -> v % 3 == 0)
;
这个时候事件并没有流动起来,也没有其他事情发生。
Subscription time 订阅时间
这是一个临时状态,发生在 subscribe()被调用的时候。
flow.subscribe(System.out::println)
这时候订阅的接口会被触发(参考doOnSubscribe),同时一些代码块会被执行或者一些事件会被发出。
Runtime 运行时
当事件正在被发出,或者错误信号、完成信号被发出的时候,叫做运行时状态。
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
在上面代码中,当while开始执行时,就是运行时状态。
后台线程
使用RxJava比较常见的一个场景是:在后台线程进行运算、执行网络请求,然后把结果数据或者错误信息展示在UI线程上。
import io.reactivex.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // 模拟复杂的运算
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- 等待事件流完成
这种类型的链接方法称为流畅的API,类似于构建器模式。但是,RxJava的响应式类型是不可变的; 每个方法调用都会返回一个具有添加行为的新Flowable。仅仅为了演示,上面代码可以写成如下这样:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000);
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
通常,你可以把一些运算或者IO阻塞的行为通过subscribeOn运行在其他线程上面,一旦数据准备好了,你可以再通过observeOn来确保他们运行在前台线程或者GUI线程上。
线程调度器
RxJava操作符里不能直接使用Thread或者ExecutorService,而是通过使用Scheduler来实现并发和线程切换。RxJava2具有以下几个标准的线程调度器,可以通过Schedulers工具类来使用。
- Schedulers.computation(): 在后台对固定数量的专用线程运行计算密集型工作。通常作为异步操作符的默认调度器。
- Schedulers.io(): IO阻塞操作,有不定数量的线程。
- Schedulers.single(): 以顺序和FIFO方式在单线程上运行工作
- Schedulers.trampoline(): 在一个参与线程中以顺序和FIFO方式运行工作,通常用于测试目的。