Skip to content

Coding(4): RxJava

clarkehe edited this page Aug 22, 2016 · 14 revisions

最近接触到了RxJava,了解其功能及使用,并引入到项目中。通过RxJava取代之前通过设置回调来调用异步接口的方式,对RxJava背后的“思想”有了一个大概的了解。

RxJava是ReactiveX的Java实现。ReactiveX是一种编程模型,官网描述如下:

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

ReactiveX是进行异步处理的一个库,基于事件流,使用的是观察者模式。

讲解分析与使用RxJava的资料已经很多了,有一篇《给 Android 开发者的 RxJava 详解》写的不错,可以参考下。这里谈下自己使用的体会与理解。

使用RxJava来处理异步逻辑更直观,将异步的逻辑串起来,而不是回调嵌套。

举个例子,有一段代码逻辑,其实现分两步,先做一步,第一步完成后,获取其结果,进行第二步操作,第一步与第二步都是同步的处理。伪码如下:

int doStep1();
int doStep2(int retValue);

void funDoSomething(){
    int retValue = doStep1();
    int result   = doStep2(retValue);
}

这样的代码逻辑看起来很清楚。如果第一步操作是异步,调用doStep1不能同步返回结果,这个代码该怎么写。

interface IResultCallBack{
    void onResult(int retValue)
}

void doStep1(IResultCallBack callback);
int doStep2(int retValue);

void funDoSomething(){
    doStep1(new IResultCallBack(){
        @Override
        void onResult(int retValue){
             int result = doStep2(retValue);
        }
     });
}

相比之下,使用回调之后,代码就没那么直观了。

RxJava可以解决这种回调嵌套的问题。先看下使用了RxJava后的代码。

Observable<Integer> doStep1();
int doStep2(int retValue);

void funDoSomething(){
     Observable<Integer> observable = doStep1();
     observable.subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(int retValue) {
             int result = doStep2(retValue);
        }
     };
}

//按照RxJava的习惯,doStep1返回的值,如果还要处理,可通过map来实现,这样onNext得到的就是最终的结果。
void funDoSomething(){
     // 异步处理第一步
     Observable<Integer> observable = doStep1();
     // 同步第二步处理
     observable.map(new Func1<Integer, Integer>(){
         @Override
         public Integer call(Integer retValue){
                int result = doStep2(retValue);
                return result;
     })
     // 结果
     .subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(int result) {
        // got result
        }
     });
}

如果操作逻辑中第一步、第二步都是异常的,按照使用回调的逻辑,我们会这么写代码:

interface IResultCallBack{
    void onResult(int retValue)
}

void doStep1(IResultCallBack callback);
void doStep2(int retValue, IResultCallBack callback);

void funDoSomething(){
    doStep1(new IResultCallBack(){
        @Override
        void onResult(final int retValue){
             doStep2(retValue, new IResultCallBack(){
                 @Override
                 void onResult(int result){
                 // got result
                 }
              });
        }
     });
}

这样就出现了回调的嵌套,一层回调就有一层代码的缩进,整个代码逻辑更不直观。

RxJava可以有更直观的实现,通过flatMap将两个异步的操作串起来,将原来嵌套的逻辑,变成了链式的串行逻辑。当操作的逻辑越复杂,RxJava这种直观的优势体现的越明显。

Observable<Integer> doStep1();
Observable<Integer> doStep2(int retValue);

void funDoSomething(){
     // 异步处理第一步
     Observable<Integer> observable = doStep1();
     // 异步处理第二步
     observable.flatMap(new Fun1<Integer, Observable<Integer>>(){
         @Override
         public Observable<Integer> call(Integer retValue){
                return doStep2(retValue);
         }
     })
     // 结果
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(int result) {
         // got result
        }
     };
}

RxJava还有两个比较重要的特性,一个是Schedule,用于线程的调度;另一个体现在Operator上,提供了各种原生的运算逻辑,可查看参考资料的用的法。

[参考资料]
RxJava Essentials 中文翻译版
ReactiveX介绍
RxJava 进阶之用例总结(part1)
Grokking RxJava系列

Clone this wiki locally