ReactiveX 是一個利用可觀察序列來組合非同步和基於事件的程式的函式庫。
它擴展了觀察者模式,以支援數據和/或事件的序列,並添加了運算子,讓您可以聲明式地組合序列,同時抽象出諸如底層線程、同步、線程安全、並行數據結構和非阻塞 I/O 等問題。
Observables 填補了缺口,成為訪問多個項目的非同步序列的理想方式 | ||
---|---|---|
單一項目 | 多個項目 | |
同步 | T getData() | Iterable<T> getData() |
非同步 | Future<T> getData() | Observable<T> getData() |
它有時被稱為“函數式響應式編程”,但這是一種誤稱。ReactiveX 可能是函數式的,也可能是響應式的,但“函數式響應式編程”是另一回事。一個主要區別是,函數式響應式編程操作隨著時間連續變化的值,而 ReactiveX 操作隨著時間發出的離散值。(有關函數式響應式編程的更精確資訊,請參閱Conal Elliott 的研究。)
ReactiveX 的 Observable 模型允許您使用與處理陣列等數據項集合相同的簡單、可組合的操作來處理非同步事件流。它使您擺脫了錯綜複雜的回呼函數,從而使您的程式碼更具可讀性,更不容易出錯。
像 Java Futures 這樣的技術對於單級非同步執行來說使用起來很簡單,但是當它們被嵌套時,它們開始增加不小的複雜性。
很難使用 Futures 來最佳化組合條件非同步執行流程(或者說是不可能的,因為每次請求的延遲時間在執行時會有所不同)。當然,這可以做到,但是它很快就會變得複雜(因此容易出錯),或者它過早地阻塞了 Future.get()
,從而消除了非同步執行的好處。
另一方面,ReactiveX Observables 是旨在用於組合非同步數據的流程和序列。
ReactiveX Observables 不僅支援發出單一標量值(如 Futures 所做的那樣),還支援發出值序列甚至無限流。Observable
是一個單一的抽象,可以應用於所有這些使用案例。Observable 具有與其鏡像表親 Iterable 相關聯的所有靈活性和優雅性。
ReactiveX 不偏向於某種特定的並行或非同步來源。可以使用線程池、事件循環、非阻塞 I/O、actor(例如來自 Akka)或任何適合您的需求、風格或專業知識的實作來實現 Observables。無論您的底層實作是阻塞還是非阻塞,以及您選擇如何實作它,用戶端程式碼都將其與 Observables 的所有互動視為非同步。
Observable 是同步/拉取的 Iterable 的非同步/推送“對偶” | ||
---|---|---|
事件 | Iterable(拉取) | Observable(推送) |
檢索數據 | T next() | onNext(T) |
發現錯誤 | 拋出 Exception | onError(Exception) |
完成 | !hasNext() | onCompleted() |
這個 Observable 是如何實作的? |
---|
public Observable<data> getData(); |
從觀察者的角度來看,這並不重要! |
|
重要的是:使用 ReactiveX,您可以在以後改變主意,並從根本上改變 Observable 實作的底層性質,而不會破壞 Observable 的消費者。
回呼函數透過不允許任何阻塞來解決過早阻塞 Future.get()
的問題。它們自然是高效的,因為它們在響應準備好時執行。
但是,與 Futures 一樣,雖然回呼函數很容易與單級非同步執行一起使用,但是在嵌套組合中,它們會變得難以控制。
ReactiveX 目前以多種語言實作,其方式尊重這些語言的慣用語,並且正以驚人的速度添加更多語言。
ReactiveX 提供一系列運算子,您可以使用這些運算子來篩選、選擇、轉換、組合和組成 Observables。這樣可以實現高效的執行和組合。
您可以將 Observable 類別視為 Iterable 的“推送”等效項,後者是“拉取”。使用 Iterable,使用者從生產者那裡拉取值,並且線程會一直阻塞,直到這些值到達。相比之下,使用 Observable,生產者會在值可用時將值推送給使用者。這種方法更靈活,因為值可以同步或非同步到達。
範例程式碼顯示了如何將類似的高階函數應用於 Iterable 和 Observable | |
---|---|
Iterable | Observable(可觀察對象) |
|
|
Observable 類型在 四人幫的觀察者模式中添加了兩個缺失的語義,以匹配 Iterable 類型中可用的語義
onCompleted
方法)onError
方法)有了這些補充,ReactiveX 就協調了 Iterable 和 Observable 類型。它們之間唯一的區別是數據流的方向。這非常重要,因為現在您可以對 Iterable 執行的任何操作,也可以對 Observable 執行。