RxJava的Android开发之路 RxJava入门(一)

本系列文章主要翻译自O’Reilly出版的RxJava for Android App Development一书。

第一章 An Introduction to RxJava

陡峭的学习曲线,但会有大大的回报

I was pretty much dragged into RxJava by my coworkers…[RxJava] was a lot like git…when I first learned git, I didn’t really learn it. I just spent three weeks being mad at it…and then something clicked and I was like ‘Oh! I get it! And this is amazing and I love it!’ The same thing happened with RxJava -Dan Lew

就像Dan Lew在前面的引述中所说的那样,RxJava的学习是非常困难的。这是不幸的,理由我会在下一章节中给出,RxJava能使在Android应用中异步数据流的处理变的更简洁更灵活。在这一章中,我会给出基本的介绍。

如果你正在怀疑RxJava是否值得学习,鉴于其陡峭的学习曲线,你可以直接跳转到下一章节的第二部分。在那里,我会给出一个处理异步数据流场景,来对比于传统的方法,RxJava带给我们的优势。虽然你可能没有很好的理解这部分代码是怎么工作的,但是你会看到RxJava是如何快速地处理任务的,如果不用RxJava处理任务,这些代码看起来通常是凌乱的,不灵活的。在看过了RxJava使你的Android代码变得更清晰了之后,希望你能有动力回到这篇介绍中。

让我们从一个例子开始,来帮助我们理解RxJava吧。我们正在写HackerNews客户端,希望用户来读HackerNews的故事和评论。我们的HackerNews客户端可能像Figure1-1:

我们需要通过网络取HackerNews的数据,因为不能阻塞ui线程,所以我们需要 异步 。RxJava是一个库,允许我们把任何操作都描绘成 异步数据流 ,它可以在任何线程上创建,组合,也可以被多个对象在任意线程上消费。

你现在可能不会深有体会,但当你读完这章后你就会理解了。在前面RxJava的定义中,第一个可能模糊的或者说不熟悉的短语是”异步数据流”,现在让我们熟悉熟悉它吧。

Observables

RxJava的异步数据流是由Observables发射的。 reactive extensions website 称Observable是”异步/发射对偶于Iterable的同步/获取”。虽然Java的Iterable接口对偶于RxJava的Observables是不完美的,但有助于介绍Observables和异步数据流,所以有必要介绍一下Java的Iterable接口是如何工作的。

每当我们使用for-each语法遍历集合时,我们都会利用Iterables接口的优势。如果我们正在构建我们的HackerNews客户端,我们可能需要遍历故事列表并打印这些故事的标题:

for (Story story : stories) {
    Log.i(TAG, story.getTitle());
}

等价的代码如下:

for (Iterator<Story> iterator = stories.iterator(); iterator.hasNext();) {
    Story story = iterator.next();
    Log.i(TAG, story.getTitle());
}

正如我们前面看到的这段代码,Iterables暴露出Iterator,可以访问一个聚合对象中的元素,也决定不再有未被使用的元素遗留在集合里。从客户和接口交互的角度看,任何实现了Iterable接口的对象,都可以访问已经预先定义好终止点的数据流。

Observables在这方面很像Iterables:它们也可以使用预先定义好终止点的数据流。

Obervables和Iterators关键的不同点是,Observables提供了 异步数据流 的使用,而Iterables提供的是 同步 。使用来自Iterator的数据会阻塞线程直到元素被返回。另一方面,想要消费Observable发射出数据的对象,需要注册才能接收。

为了使这个区别更明显,请再思考上面的在集合Collection 中打印HackerNews故事标题的那段代码。被打印的故事不在内存中,每个故事不得不取自网络,同时我们还想在main线程上打印故事。在这里,我们需要故事流是一个异步流,那么使用Iterable来读取数据流中的每一个元素是不合适的。

相反地,我们应该使用Observable去读取由HackerNews API返回的每一个故事。我们知道通过调用Iterator的next()方法可以访问Iterable数据流中的元素。然而,我们不知道如何去读取Observable异步数据流中的元素。接下来带来RxJava中的第二个基本概念:Observer。

Observers

Observers是Observables异步数据流的消费者。Observers能以它们想要的方式响应由Observable发射出的数据。例如,下面的Observer打印了由Observable发射出的故事标题:

storiesObservable.subscribe(new Observer<Story>() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onNext(Story story) {
        Log.i(TAG, story.getTitle());
    }

    //...
});

这段代码和之前的for-each代码是非常相似的。在这两段代码中,我们都消费了一个已经预先定义好终止点的数据流。当我们使用for-each语法遍历集合时,iterator.hasNext()方法返回false时遍历停止。相似的,在前面的这段代码中,当异步数据流中没有元素时,onCompleted()方法被调用。

两段代码的主要区别是,当我们遍历集合时,我们同步地打印故事的标题,而当我们订阅stringsObservable时,我们是异步地打印故事的标题。

Observer也能处理任何可能发生在Observable发出数据时的异常。Observer在onError()方法中处理这些errors。

为了看清楚为什么这是RxJava一个非常有用的特性,脑补一下由Observable发出的这些故事对象,是由JSON响应转换过来的。如果HackerNews API返回了一个有缺陷的JSON,那么转换成故事对象会发生异常,Observer可以调用onError()方法来处理JSON被解析时抛出的异常。

此时此刻,前面提到RxJava的两个概念会更清晰。让我们再来简单看看它的定义:

RxJava is a library that allows us to represent any operation as an asynchronous data stream that can be created on any thread, declaratively composed, and consumed by multiple objects on any thread.

我们仅仅看到了Observables允许我们把任何操作描绘成一个异步数据流。Observables和Iterables在他们提供有终止点的数据流使用上比较相似。我们现在也知道了Observables和Iterables的一个重要不同点:Observables暴露 异步 数据流然而Iterables暴露 同步 数据流。

Observers可以消费由Observables发射出的异步数据流。可以有多个Observers注册去接收由Observable发射的数据。Observers能处理任何可能发生在Observable发射数据时的异常,Observers也知道什么时候结束。

RxJava的定义到这里仍然会有一些是不清楚的。RxJava到底是如何允许我们把任何操作描绘成异步数据流的?换句话说,Observables是如何发射数据组成它们的异步数据流的?这些数据来自哪?这些问题我们将要在下一部分讲解。

Observable Creation and Subscribers

Observables发出异步数据流。Observables发射他们数据的方式和Iterable接口显示他们数据流的方式很相似。Iterables接口和Iterators是迭代器模式的一部分,迭代器模式的主要目的是:

提供一种方法顺序访问一个聚合对象中的各个元素,而又不暴露其内部的表示。

迭代器模式允许任何对象在不暴露底层实现情况下提供读取它的元素的方法。相似地,Observables以完全隐藏的方式提供了对异步数据流元素的使用,且独立于数据流被创建的过程。这也意味着Observables可以执行几乎任何的操作。

接下来用一个例子体现Observable的灵活性。Observables通常是由函数对象创建,这个函数对象可以取异步数据流中的数据并通知Subscriber这些数据可用。此外,Subscriber也就是Observer可以把它自己从Observable发射的数据中unsubscribe。

下面的代码会告诉你,如何创建Observable来发射从API服务器取回的HackerNews的故事:

Observable.create(new Observable.OnSubscribe<Story>() { //1
    @Override
    public void call(Subscriber<Story> subscriber) {
        if (!subscriber.isUnsubscribed()) { //2
            try {
                Story topStory = hackerNewsRestAdapter.getTopStory(); //3
                subscriber.onNext(topStory); //4
                Story newestStory = hackerNewsRestAdapter.getNewestStory();
                subscriber.onNext(newestStory);
                subscriber.onCompleted(); //5
            } catch (JsonParseException e) {
                subscriber.onError(e); //6
            }
        }
    }
});

让我们一步一步地浏览这段代码发生了什么:

  1. OnSubscribe 提供了这段代码什么时候被执行的线索:通过调用Observable.subscribe()方法,Observer注册去接收由这个Observable发射的数据时。
  2. 在发射数据前,我们可以检测Subscriber是否已经解订阅。请记住:Subscriber就像是Observer,它可以从发射数据的Observable中unsubscribe。
  3. 调用这个方法我们取出了HackerNews的数据。注意这是一个同步的方法调用,线程将阻塞直到故事被返回。
  4. 我们通知了已经订阅Observable的Observer,有新故事可用。Observer会被Subscriber封装并被传递进call()方法。Subscriber的封装,进一步简化了对封装好的Observer的调用。
  5. 当Observable的数据流中不再有故事发出时,我们会通知Observer调用onCompleted()方法。
  6. 如果JSON解析出现异常,我们会通知Observer调用onError()方法。

在Activitys中创建Observables可能导致内存泄露

理由我们会在下一章节中给出,当你在Activity中调用Observable.create()方法时要小心。前面的代码如果在Activity中调用可能导致内存泄漏。

正如你之前看到的那段代码一样,Observables能通过相当多的操作创建。Observable被创建的灵活性和Iterables接口很相似。实现了Iterable接口的任何对象, 都可以暴露出同步数据流。同样地,Observable的数据流能被任何对象创建,只要将对象传递进Observable.OnSubscribe方法中即可。

到这里,聪明的读者可能会好奇Observables是否真的发射出了异步数据流。思考前面的例子,他们可能想知道,当Observable.subscribe()方法被调用时,是否在Observable.OnSubscribe函数对象中的call()方法也被调用了?方法的调用是否锁住了hackerNewsRestAdapter中的同步方法?是不是直到调用Observable.subscribe()阻塞了主线程Observable才完成发射由hackerNewsRestAdapter返回的故事?

这确实是一个好问题。在这里Observable.subscribe()方法会阻塞主线程。然而RxJava的另一个部分能阻止它的发生:Scheduler。

Schedulers

Schedulers决定了Observables在哪个线程发射他们的异步数据流,也决定了Observers在哪个线程消费这些数据流。对Observable应用正确的Scheduler会阻止Observable.OnSubscribe的call()方法运行在main线程上:

Observable.create(new Observable.OnSubscribe<Story>() {
// ...
}).subscribeOn(Schedulers.io());

顾名思义,Schedulers.io()会返回一个Scheduler,用来调度Observable.OnSubscribe对象里面的call()方法跑在一个I/O线程上。

使用Scheduler还有另一个方法:observeOn()。使用这个方法决定Observer在哪一个线程上消费由Observable subscribeOn()方法发出的数据,因此你能把observeOn()方法关联到通过subscribeOn()返回的Observable上:

Observable.create(new Observable.OnSubscribe<Story>() {
//...
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

AndroidSchedulers.mainThread()实际上不属于RxJava库,但是可以通过依赖RxAndroid库来使用。调用observeOn()方法主要点在于,你可以修改Observer消费由Observable发出的数据的线程。

subscribeOn()和observeOn()两个方法是你能修改由Observable发出的数据流通用的方式:operators。我们将要在下一章节中讨论operators。现在,让我们回到RxJava的定义:

RxJava is a library that allows us to represent any operation as an asynchronous data stream that can be created on any thread, declaratively composed, and consumed by multiple objects on any thread.

这一节我们讲述了RxJava是如何允许我们在任意一个线程上创建并消费异步数据流的。这个定义里面还有一个地方是我们不是很清楚的, 组合 。不清楚的这个 组合 其实是和操作符相关的。

Operators

在前面章节中我们讨论了Schedulers,在Observable.subscribeOn()和Observable.observeOn()方法中使用。这两个方法都是operators。Operators允许我们组合Observables。为了更好的理解operators,让我们来简短地分解定义中的组合。

组合Observable仅仅只是”制作”一个复杂的Observable。在复杂的功能实现上,由operators组合而成的Obserable和函数组合是非常相似的。在函数组合中,复杂的功能是通过一个函数的输出作为另一个函数的输入实现的。

例如,考虑Math.ceil(int x)函数。它返回要么比x大,要么与x相等的integer类型的整数。例如,Math.ceil(1.2)将要返回2.0。现在,假设我们有一个takeTwentyPercent(double x)方法简单地返回传进去值的20%。如果我们想要写一个方法计算小费,我们能组合Math.ceil()和takeTwentyPercent()去定义这个函数:

double calculateGenerousTip(double bill) {
    return takeTwentyPercent(Math.ceil(bill));
}

复杂的函数calculateGenerousTip()是传递Math.ceil(bill)的返回值作为takeTwentyPercent()输入得到的结果组成的。

操作符允许我们像calculateGenerousTip()方法的组成那样组合Observables。一个操作符是被应用于一个源Observable并且作为应用的结果它将返回一个新的Observable。例如,看下面的代码,源Observable是storiesObservable:

Observable<String> ioStoriesObservable = storiesObservable.subscribeOn(Schedulers.io());

ioStoriesObservable,作为应用了subscribeOn操作符的结果返回的当然是一个Observable了。在操作符被应用之后,返回的Observable是更加复杂的:它表现不同于源Observable的地方在于它发射出的数据在一个I/O线程。

我们可以拿subscribeOn操作符返回的Observable和另一个操作符去进一步地组合成最终我们想要订阅的Observable。把subscribeOn和observeOn两个操作符链接在一起,确保故事标题的异步流在后台线程发射并且消费在main线程:

Observable<String> androidFriendlyStoriesObservable = storiesObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

我们看到Observable的组合就像calculateGenerousTip()方法的组合一样,传递Math.ceil()方法的输出作为takeTwentyPercent()方法的输入。相似地,传递应用了subscribeOn操作符的输出作为应用observeOn操作符的输入来组合成androidFriendlyStoriesObservable。

operators的这种组合方式声明了Observable。当我们使用operator时,我们可以简单地指定我们想要组合的Observable,而不是提供一个跳出组成我们Observable的行为实现。例如,当我们应用observeOn和subsceibeOn操作符时,我们就不是被迫的和Threads,Executors,Handlers一起工作。反而,我们可以简单地将Scheduler传递进这些操作符中并且这个Scheduler有责任确保组合成我们想要的Observable。这样,RxJava帮我们避免了复杂的和易于出错的异步数据的传输。

组合一个”android friendly”的Observable, 在后台线程发射数据并且在主线程将这些数据传递给Observer就像你使用操作符开始的一样。联系上下文,看一看操作符是如何被使用的,是一种学习操作符是如何工作的并且它是如何帮助你的非常好的方式。这是下一章节我们将要详细讨论的事情。

现在,我们简单地介绍另一个在出现在我们HackerNews故事例子中的操作符。map操作符允许我们把一个发射 故事 的Observable变成一个发射这些 故事标题 的Observable。看起来像这样:

Observable.create(new Observable.OnSubscribe<Story>() {
    // Emitting story objects...
})
.map(new Func1<Story, String>() {
    @Override
    public String call(Story story) {
        return story.getTitle();
    }
})

map操作符会返回一个新的Observable 用来发射故事的标题,故事由Observable.create()创建的Observable发出。

此时此刻,我们已经足够的了解了RxJava是如何允许我们整洁地和声明性地处理异步数据。由于操作符的强大,我们能以一个发射HackerNews故事的Observable和在UI线程上消费的需求开始,应用一系列的操作符,在I/O线程发射HackerNews的故事, 但是将这些故事的标题传递给UI线程上的Observers。

看起来像这样:

Observale.create(new Observable.OnSubscribe<Story>() {
    //Emitting story objects...
})
.map(new Func1<Story, String>() {
    @Override
    public String call(Story story) {
        return story.getTitle();
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

多个操作符链在一起的混乱

一些Android的开发者推荐使用Retrolambda插件,这个库把Java 8的lambda特性兼容到Java 6, 一个被Android完全支持的Java版本。Dan Lew在他的博客中推荐这个库。然而,Jake Wharton大神,指出了一个使用Retrolambda的重要缺陷:IDE中的代码不会匹配跑在设备上的代码,因为Retrolambda为了移植lambda特性重写了字节码。

在决定是否使用Retrolambda之前需要记住的是,Android Studio能折叠被传递进多种多样的RxJava方法的函数对象,使这些对象看起来像lambda。对于我来说,这能减轻对Retrolambda的需要。

Conclusion

在这篇文章的开始,我给出了RxJava的一般定义:

RxJava is a library that allows us to represent any operation as an asynchronous data stream that can be created on any thread, declaratively composed, and consumed by multiple objects on any thread.

此时此刻,你应该掌握这个定义并且你应该能将定义的各个部分映射到确定的概念或对象中去。RxJava让我们把任何操作描绘成一个异步数据流,允许我们通过Observable.OnSubscribe函数对象创建Observables,获取数据并且通过调用Observers的onNext(), onError()和onCompleted()方法,通知已注册的Observers,数据流中的新数据,错误,或者是数据流的完成。RxJava的Schedulers允许我们改变创建和消费异步数据流的线程。通过操作符的使用,被应用到Observables的这些Schedulers,允许我们从一个简单的Observables声明性地组合成复杂的Observables。

我来评几句
登录后评论

已发表评论数()