RxJava2 入门教程

背景

在一个舒适区呆久了,要么就废掉,要么就挣扎着跳出来。因为各种原因,过去的几个月并不是特别忙,趁机想研究点新技术,说是新技术其实也算不上,几年前的个人项目里就已经使用个RxJava、Retrofit和OKHttp来做网络请求架构,但是一直没有系统的去了解过什么是RxJava以及RxJava可以做什么事情,这次正好借机系统的学习一下RxJava,整理此文算是重新入门。本文不少地方参考了大神扔物线的给 Android 开发者的 RxJava 详解,只不过大神的文章是基于RxJava的,我则是基于RxJava2再重新梳理一遍,RxJava2相对于RxJava还是有不少变化的。

[DOC]

RxJava是什么

一千个人就有一千个哈姆雷特,关于RxJava的理解,可能每个人的理解多少有点差别,但是核心肯定是一样的。基于官方的解释,RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.,翻译过来就是“RxJava,Java虚拟机的响应式扩展,一个运行在java虚拟机上的库,通过可观察的队列来实现异步的、基于事件化的程序”。我的英文有点烂,凑合着翻译成这样了。几个核心关键词很重要,响应式可观察的队列异步事件化

响应式

什么是响应式,或者说什么是响应式编程,第一次接触到这个概念的时候很糊涂,难道还有非响应式么?程序不是都会响应么?不响应了不就是ANR了吗?要理解响应式,必须得配合另外一个关键词一起理解,数据流。最开始在做地图上标记maker的功能时,我自己想到过数据流这个概念,那时候还没接触到响应式编程,当时我把查询数据的代码封装成“事件数据源”,源源不断的释放出新的数据出来,经过各种过滤处理,比如说数据是否有效?数据量是否超载?是否有重复数据等等一系列的过滤之后,交给“事件消耗方”,即绘制在地图上。可以把数据流理解为一条河流,经过不断的过滤,操作甚至分叉之后,被消耗掉或者跟更多新的数据汇聚成新的一条河流。

而这里说到的数据其实就是一个个事件,比如点击一下屏幕、收到一个推送、我们执行一个SQL查询等,都可以抽象成一系列事件,而CPU处理器就是在不停的通过调度处理各种各样的事件运算。这些事件组成了数据流,我们所说的响应式编程,就是说对这些事件流做出反应。点击屏幕————>点亮手机,收到推送————>显示notification,SQL查询————>处理查询结果————>显示到UI上面。诸如此类,基于数据流和变化的一种编程范式就叫做响应式编程,把一切行为归纳为数据流,通过各种函数进行合并、过滤、转变等等。
流是响应式的核心,下面就是 “在按钮上点击” 事件流的简单示意图。 20180628153015267661416.png 关于更多的响应式编程可以查看:The introduction to Reactive Programming you’ve been missing · GitHub

异步

这是RxJava主要的核心功能,它能够轻松的实现异步操作,比如SQL查询,以往的经验是开一个新的线程执行查询代码,结果返回后通知主线程更新UI。例如如下的实现方式:

new Thread() {
    @Override
    public void run() {
        super.run();
          //execute sql query
        getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            //update UI
                        }
                    });
    }
}.start();

从代码上看,查询和更新UI两个行为通过嵌套的形式存在,倒也清晰。我们再换成RxJava实现看看

Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
      public void subscribe(ObservableEmitter<Object> emitter) {
            //execute sql query
            Object dbResult = new Object();//模拟查询结果
            emitter.onNext(dbResult);
    }
})
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object dbResult) {
                //update UI
          }
        });

单从代码量上来看,使用RxJava实现的代码似乎是多于传统写法的,如果你习惯Lambda表达式的话,以上代码可以再精简一番,如下

Observable.create(emitter -> {
    //execute sql query
    Object dbResult = new Object();//模拟查询结果
    emitter.onNext(dbResult);
})
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(dbResult -> {
            //update UI
        });

仔细看代码的话,会发现使用RxJava实现的代码都是链式调用,从上往下依次执行。而传统写法的代码,则是通过嵌套方式实现。一两层嵌套可能没什么影响,但是如果嵌套比较多的时候,比如请求网络数据之后需要查询数据库对比数据,经过复杂运算之后,再更新到UI上面。这时候RxJava的优势就体现出来了,把整个过程组成一个链,逐次执行,业务逻辑清晰,代码可读性也比较高。最讨厌那些各种代码缩进了,嵌套一多,缩进就会多的很恶心。

回到本小节主题上,关于异步,RxJava如何实现呢?上面代码里其实已经有所体现,就是观察者模式,RxJava把观察者模式体现的淋漓尽致。提到观察者模式,很多Android工程师第一想到的就是事件监听,通过对一个View添加一个监听器,即可实现一个订阅者关系。一旦View被点击了,OnClickListener即可获得到回调。而RxJava则是进行了更深一步的抽象,把可以触发点击事件的View抽象成为Observable(可观查的,即被观察者),而OnClickListener则被抽象成为Observer(观察者)。同时还有另外一步抽象,就是把setOnClickListener这个过程抽象成subscribe(订阅)。
简单画了个对比图,如下: 20180628153015571191248.png

可观察的队列 和事件化

经过上面两个核心点的分析,可观察的队列理解起来并不难,在RxJava中每一步对事件流的加工组合成一个队列,依次执行,并且这个队列的每一步都是可以观察的,通过观察可以对执行结果进行处理。
而对于事件化,everything can be a stream,几乎一切事情都可以被事件流化,这就是响应式编程中的一句名言。

如何使用RxJava2

对于学习一门新的技术来说,我认为首先要先用最快的速度了解这门技术是什么,能干什么?比如通过维基百科或者一些使用母语写的文章熟悉基本概念,然后通过官方文档熟悉具体细节。经过上面一些篇幅介绍了什么是RxJava,接下来我将通过翻译RxJava官方的教程来梳理如何使用RxJava2。

添加依赖

只需要在你项目里添加如下Gradle编译依赖,即可使用RxJava2:

 `compile "io.reactivex.rxjava2:rxjava:2.x.y"`

写此文时最新版本是2.1.16,所以需要替换上面脚本里对应的x、y。

Hello World

通过以下代码即可实现最简单的打印Hello World功能:

package rxjava.examples;

import io.reactivex.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}

如果你的平台不支持Java 8 lambda表达式,你需要手动实例化一个内部类Consumer作为消费者。

import io.reactivex.functions.Consumer;

Flowable.just("Hello world")
  .subscribe(new Consumer<String>() {
      @Override public void accept(String s) {
          System.out.println(s);
      }
  });

基本类型

RxJava2提供了一些基本的操作符类型:

  • io.reactivex.Flowable: 事件流,支持响应式事件流和背压功能,比较复杂的一个操作符。
  • io.reactivex.Observable: 事件流,不支持背压。基本上可以满足大部分需求。
  • io.reactivex.Single: 仅支持一个正常事件或者异常事件。成功or失败,二选一。
  • io.reactivex.Completable: 仅仅关心任务完成或者异常,不关心数据。完成or失败,二选一。
  • io.reactivex.Maybe: 支持0或者1个数据,要么成功,要么失败。成功or完成or失败,三选一。

一些术语

Upstream, downstream 上游流,下游流

在RxJava中,数据流由三部分组成,数据源 + 一个或多个中间处理步骤 + 数据消费者或者再加工者。

source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

那么,如果我们处在operator2这里,向左看的话,左边的被称作上游流,右边的订阅者/消费者就被称作为下游流。当每一步骤的代码单独写到一行时,看起来更加直观。

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

Objects in motion 运动中的对象

在RxJava开发文档中, emission, emits, item, event, signal, data and message 被认为是同义词,他们用来表示沿着数据流传播的对象。

Backpressure 背压

当数据流通过异步方式运行,每一步可能以不同的速度执行不同的任务。为了防止某一步骤超过负载而导致出现各种问题,背压功能出现了。通过背压功能可以做好流程控制,确定当前步骤可以处理多少任务。这样就可以在无法得知上游发送多少数据的情况下限制内存使用。
在RxJava中只有Flowable类支持使用背压,而Observable被设计为无背压操作符,比如小的任务队列、GUI交互等等。至于其他类型,Single, MaybeCompletable不支持背压,当然也不应该支持。他们都仅仅操作一个事件而已,有足够的空间。

Assembly time 组装时间

事件流在准备期间,通过一系列的中间操作对数据加工,被称为组装时间:

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v* v)
.filter(v -> v % 3 == 0)
;

这个时候事件并没有流动起来,也没有其他事情发生。

Subscription time 订阅时间

这是一个临时状态,发生在 subscribe()被调用的时候。

flow.subscribe(System.out::println)

这时候订阅的接口会被触发(参考doOnSubscribe),同时一些代码块会被执行或者一些事件会被发出。

Runtime 运行时

当事件正在被发出,或者错误信号、完成信号被发出的时候,叫做运行时状态。

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

在上面代码中,当while开始执行时,就是运行时状态。

后台线程

使用RxJava比较常见的一个场景是:在后台线程进行运算、执行网络请求,然后把结果数据或者错误信息展示在UI线程上。

import io.reactivex.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  模拟复杂的运算
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- 等待事件流完成

这种类型的链接方法称为流畅的API,类似于构建器模式。但是,RxJava的响应式类型是不可变的; 每个方法调用都会返回一个具有添加行为的新Flowable。仅仅为了演示,上面代码可以写成如下这样:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000);
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

通常,你可以把一些运算或者IO阻塞的行为通过subscribeOn运行在其他线程上面,一旦数据准备好了,你可以再通过observeOn来确保他们运行在前台线程或者GUI线程上。

线程调度器

RxJava操作符里不能直接使用Thread或者ExecutorService,而是通过使用Scheduler来实现并发和线程切换。RxJava2具有以下几个标准的线程调度器,可以通过Schedulers工具类来使用。

  • Schedulers.computation(): 在后台对固定数量的专用线程运行计算密集型工作。通常作为异步操作符的默认调度器。
  • Schedulers.io(): IO阻塞操作,有不定数量的线程。
  • Schedulers.single(): 以顺序和FIFO方式在单线程上运行工作
  • Schedulers.trampoline(): 在一个参与线程中以顺序和FIFO方式运行工作,通常用于测试目的。

流中的并发性

并行处理

依赖子流

延续性

类型转换

操作符命名约定

进一步阅读

Copyright © tedxiong.com 2017 all right reserved,powered by Gitbook该文章修改时间: 2018-06-28 17:04:31

results matching ""

    No results matching ""