小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。
本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。
ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。
微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。
ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。
Observable拥有它的近亲Iterable的全部优雅与灵活。任何对Iterable的操作,你都可以对Observable使用。
RxJava是响应式编程(Reactive Extensions)的java实现,它基于观察者模式的实现了异步编程接口。
Rxjava 3.x 的github官网;
RxJava2将被支持到2021年2月28日,错误的会同时在2.x和3.x修复,但新功能只会在3.x上添加;
Rxjava 3.0的一些改变:官方Wiki;
Rxjava 3.x 文档可以在官方javadoc中找到;
使用Rxjava3.x之前的准备工作:
添加依赖
RxJava 在很长一段时间里以java6 为baseline( Android 运行时支持的锅),但在即将到来的 Android Studio 4预览中,一个叫做 desuging 的过程能够将许多 Java 7和8的特性,透明地转换成与 Java 6兼容的特性。因此我们可以将 RxJava 的基准提高到 java 8,并为许多 Java 8构造增加官方支持比如:Optional、Stream等,因此必须将项目的编译目标设置更改为 java8:
- Reactive 直译为反应性的,有活性的,根据上下文一般翻译为反应式、响应式
- Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念
- Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者
- Observer 观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现
- emit 直译为发射,发布,发出,含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,文章里一律译为发射
- items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项。
在RxJava中,数据以流的方式组织:Rxjava包括一个源数据流,源数据流后跟着若干个用于消费数据流的步骤。
在代码中,对于operator2来说,在它前面叫做上流,在它后面的叫做下流。
在RxJava的文档中,emission, emits, item, event, signal, data and message都被认为在数据流中被传递的数据对象。
当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题。
为此,RxJava带来了backpressure的概念。背压是一种流量的控制步骤,在不知道上流还有多少数据的情形下控制内存的使用,表示它们还能处理多少数据。背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
在Rxjava1.0中,有的Observable支持背压,有的不支持,为了解决这种问题,2.0把支持背压和不支持背压的Observable区分开来:支持背压的有Flowable类,不支持背压的有Observable,Single, Maybe and Completable类。
- 在订阅的时候如果使用FlowableSubscriber,那么需要通过s.request(Long.MAX_VALUE)去主动请求上游的数据项。如果遇到背压报错的时候,FlowableSubscriber默认已经将错误try-catch,并通过onError()进行回调,程序并不会崩溃;
- 在订阅的时候如果使用Consumer,那么不需要主动去请求上游数据,默认已经调用了s.request(Long.MAX_VALUE)。如果遇到背压报错、且对Throwable的Consumer没有new出来,则程序直接崩溃;
- 背压策略的上游的默认缓存池是128。
背压策略:
- error, 缓冲区大概在128
- buffer, 缓冲区在1000左右
- drop, 把存不下的事件丢弃
- latest, 只保留最新的
- missing, 缺省设置,不做任何操作
对于Android开发者而言,RxJava最简单的是通过调度器来方便地切换线程。在不同平台还有不同的调度器,例如我们Android的主线程:AndroidSchedulers.mainThread()。
RxJava事件发出去并不是置之不顾,要有合理的管理者来管理它们,在合适的时机要进行释放事件,这样才不会导致内存泄漏,这里的管理者我们称为事件调度器(或事件管理者)CompositeDisposable。
RxJava 3 中的基类相比RxJava 2 没啥改变,主要有以下几个基类:
- io.reactivex.Flowable:发送0个N个的数据,支持Reactive-Streams和背压
- io.reactivex.Observable:发送0个N个的数据,不支持背压,
- io.reactivex.Single:只能发送单个数据或者一个错误
- io.reactivex.Completable:没有发送任何数据,但只处理 onComplete 和 onError 事件。
- io.reactivex.Maybe:能够发射0或者1个数据,要么成功,要么失败。
Observable什么时候开始发射数据序列?这取决于Observable的实现,一个"热"的Observable可能一创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。
在一些ReactiveX实现里,还存在一种被称作Connectable的Observable,不管有没有观察者订阅它,这种Observable都不会开始发射数据,除非Connect方法被调用。
需要知道的是,RxJava以观察者模式为骨架,有两种常见的观察者模式:
- Observable(被观察者)/Observer(观察者)
- Flowable(被观察者)/Subscriber(观察者)
![](https://www.bianchenghao6.com/uploads/202410/10/8d4cb948fcc644d0.webp)
RxJava2/3中,Observeable用于订阅Observer,是不支持背压的,而Flowable用于订阅Subscriber,是支持背压(Backpressure)的。
Observable正常用法:
这种观察者模型不支持背压:当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException,消耗内存过大只会OOM。所以,当我们使用Observable/Observer的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线作为参考)。
输出如下:
Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。
当然,Flowable也可以通过creat()来创建:
Flowable虽然可以通过create()来创建,但是你必须指定背压的策略,以保证你创建的Flowable是支持背压的。
最常用的其实就是上面说的两种订阅观察者,但是一些情况下,我们也会用到一些其他的一类观察者比如
- Single/SingleObserver
- Completable/CompletableObserver
- Maybe/MaybeObserver
Single类似于Observable,不同的是,它总是只发射一个值,或者一个错误通知,而不是发射一系列的值(当然就不存在背压问题),所以当你使用一个单一连续事件流,这样你可以使用Single。Single观察者只包含两个事件,一个是正常处理成功的onSuccess,另一个是处理失败的onError。因此,不同于Observable需要三个方法onNext, onError, onCompleted,订阅Single只需要两个方法:
- onSuccess - Single发射单个的值到这个方法
- onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法
Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。
Single的操作符:
Single也可以组合使用多种操作,一些操作符让你可以混合使用Observable和Single:
操作符详细的图解可以参考英文文档:Single
如果你的观察者连onNext事件都不关心,可以使用Completable,它只有onComplete和onError两个事件:
要转换成其他类型的被观察者,也是可以使用toFlowable()、toObservable()等方法去转换。
如果你有一个需求是可能发送一个数据或者不会发送任何数据,这时候你就需要Maybe,它类似于Single和Completable的混合体。
Maybe可能会调用以下其中一种情况(也就是所谓的Maybe):
- onSuccess或者onError
- onComplete或者onError 可以看到onSuccess和onComplete是互斥的存在,例子代码如下:
要转换成其他类型的被观察者,也是可以使用toFlowable()、toObservable()等方法去转换。
上面就是Maybe/MaybeObserver的普通用法,你可以看到,实际上,这种观察者模式并不用于发送大量数据,而是发送单个数据,也就是说,当你只想要某个事件的结果(true or false)的时候,你可以用这种观察者模式
CompositeDisposable提供的方法中,都是对事件的管理
- dispose():释放所有事件
- clear():释放所有事件,实现同dispose()
- add():增加某个事件
- addAll():增加所有事件
- remove():移除某个事件并释放
- delete():移除某个事件
这是上面那些基类被观察者的上层接口:
其实我们可以看到,每一种观察者都继承自各自的接口,这也就把他们能完全的区分开,各自独立(特别是Observable和Flowable),保证了他们各自的拓展或者配套的操作符不会相互影响。
例如flatMap操作符实现:
假如你想为Flowable写一个自定义的操作符,那么只要保证Function< Publisher >中的类型实现了Publisher接口即可。这么说可能很抽象,大家不理解其实也没关系,因为并不推荐大家自定义操作符,RxJava中的操纵符的组合已经可以满足大家的需求了。
当然,你也会注意到上面那些接口中的subscribe()方法的返回类型为void了,在1.X中,这个方法一般会返回一个Subscription对象,用于取消订阅。现在,这个功能的对象已经被放到观察者Observer或者subscriber的内部实现方法中了,
Flowable/Subscriber
上面的实例中,onSubscribe(Subscription s)传入的参数s就肩负着取消订阅的功能,当然,他也可以用于请求上游的数据。
在Observable/observer中,传入的参数是另一个对象
Observable/Observer
在Observer接口中,onSubscribe(Disposable d)方法传入的Disposable也是用于取消订阅,基本功能是差不多的,只不过命名不一致,大家知道就好。
其实这种设计可以说还是符合逻辑的,因为取消订阅这个动作就只有观察者(Observer等)才能做的,现在把它并入到观察者内部,也算顺理成章吧。
Rxjava中,被观察者不能接收null作为数据源。
- 将eagerTruncate添加到replay运算符,以便head节点将在截断时丢失它保留的项引用 (#6532)
- 新增 X.fromSupplier() (#6529)
- 使用 Scheduler 添加 concatMap,保证 mapper 函数的运行位置 (#6538)
- 新增 startWithItem 和 startWithIterable (#6530)
- ConnectableFlowable/ConnetableFlowable 重新设计 (#6519)
- 将 as() 并入 to() (#6514)
- 更改 Maybe.defaultIfEmpty() 以返回 Single (#6517)
- 用 Supplier 代替 Callable (#6511)
- 将一些实验操作符推广到标准 (#6537)
- 从某些主题/处理器中删除 getValues() (#6516)
- 删除 replay(Scheduler) 及其重载 (#6539)
- 删除 dematerialize() (#6539)
- 删除 startWith(T|Iterable) (#6530)
- 删除 as() (#6514)
- Maybe.toSingle(T) (#6517)
- 删除 Flowable.subscribe(4 args) (#6517)
- 删除 Observable.subscribe(4 args) (#6517)
- 删除 Single.toCompletable() (#6517)
- 删除 Completable.blockingGet() (#6517)
因为写RxJava系列的文章时进行了很多阅读和参考,因此不分一二三等,将全系列的参考引用统一如下:
RxJava3 Wiki
RxJava3官方github
ReactiveX文档中文翻译
single
操作符系列讲的很好的文章:blog.csdn.net/weixin_4204…
基础介绍:blog.csdn.net/weixin_4204…
RxJava3的一些简介:juejin.im/post/5d1eef…
观察者被观察者入门RxJava的一篇好文章:juejin.im/post/…
关于背压一个很好的介绍:juejin.im/post/582d41…
RxLifecycle:github.com/trello/RxLi…
版权声明:
本文来源网络,所有图片文章版权属于原作者,如有侵权,联系删除。
本文网址:https://www.bianchenghao6.com/java-jiao-cheng/9719.html