ReactiveX

ReactiveX 是一個利用可觀察序列來組合非同步和基於事件的程式的函式庫。

它擴展了觀察者模式,以支援數據和/或事件的序列,並添加了運算子,讓您可以聲明式地組合序列,同時抽象出諸如底層線程、同步、線程安全、並行數據結構和非阻塞 I/O 等問題。

Observables 填補了缺口,成為訪問多個項目的非同步序列的理想方式
單一項目多個項目
同步T getData()Iterable<T> getData()
非同步Future<T> getData()Observable<T> getData()

它有時被稱為“函數式響應式編程”,但這是一種誤稱。ReactiveX 可能是函數式的,也可能是響應式的,但“函數式響應式編程”是另一回事。一個主要區別是,函數式響應式編程操作隨著時間連續變化的值,而 ReactiveX 操作隨著時間發出的離散值。(有關函數式響應式編程的更精確資訊,請參閱Conal Elliott 的研究。)

為什麼要使用 Observables?

ReactiveX 的 Observable 模型允許您使用與處理陣列等數據項集合相同的簡單、可組合的操作來處理非同步事件流。它使您擺脫了錯綜複雜的回呼函數,從而使您的程式碼更具可讀性,更不容易出錯。

Observables 是可組合的

Java Futures 這樣的技術對於單級非同步執行來說使用起來很簡單,但是當它們被嵌套時,它們開始增加不小的複雜性

很難使用 Futures 來最佳化組合條件非同步執行流程(或者說是不可能的,因為每次請求的延遲時間在執行時會有所不同)。當然,這可以做到,但是它很快就會變得複雜(因此容易出錯),或者它過早地阻塞了 Future.get(),從而消除了非同步執行的好處。

另一方面,ReactiveX Observables 是旨在用於組合非同步數據的流程和序列

Observables 是靈活的

ReactiveX Observables 不僅支援發出單一標量值(如 Futures 所做的那樣),還支援發出值序列甚至無限流。Observable 是一個單一的抽象,可以應用於所有這些使用案例。Observable 具有與其鏡像表親 Iterable 相關聯的所有靈活性和優雅性。

</table>

Observables 的立場較不鮮明

ReactiveX 不偏向於某種特定的並行或非同步來源。可以使用線程池、事件循環、非阻塞 I/O、actor(例如來自 Akka)或任何適合您的需求、風格或專業知識的實作來實現 Observables。無論您的底層實作是阻塞還是非阻塞,以及您選擇如何實作它,用戶端程式碼都將其與 Observables 的所有互動視為非同步。

Observable 是同步/拉取的 Iterable 的非同步/推送“對偶”
事件Iterable(拉取)Observable(推送)
檢索數據T next()onNext(T)
發現錯誤拋出 ExceptiononError(Exception)
完成!hasNext()onCompleted()
這個 Observable 是如何實作的?
public Observable<data> getData();
從觀察者的角度來看,這並不重要!
  • 它是否在與呼叫者相同的線程上同步工作?
  • 它是否在一個不同的線程上非同步工作?
  • 它是否將其工作分散到多個線程上,這些線程可能會以任何順序將數據返回給呼叫者?
  • 它是否使用 Actor(或多個 Actor)而不是線程池?
  • 它是否使用帶有事件循環的 NIO 來進行非同步網路訪問?
  • 它是否使用事件循環將工作線程與回呼線程分開?

重要的是:使用 ReactiveX,您可以在以後改變主意,並從根本上改變 Observable 實作的底層性質,而不會破壞 Observable 的消費者。

回呼函數也有自己的問題

回呼函數透過不允許任何阻塞來解決過早阻塞 Future.get() 的問題。它們自然是高效的,因為它們在響應準備好時執行。

但是,與 Futures 一樣,雖然回呼函數很容易與單級非同步執行一起使用,但是在嵌套組合中,它們會變得難以控制

ReactiveX 是一種多語言實作

ReactiveX 目前以多種語言實作,其方式尊重這些語言的慣用語,並且正以驚人的速度添加更多語言。

響應式編程

ReactiveX 提供一系列運算子,您可以使用這些運算子來篩選、選擇、轉換、組合和組成 Observables。這樣可以實現高效的執行和組合。

您可以將 Observable 類別視為 Iterable 的“推送”等效項,後者是“拉取”。使用 Iterable,使用者從生產者那裡拉取值,並且線程會一直阻塞,直到這些值到達。相比之下,使用 Observable,生產者會在值可用時將值推送給使用者。這種方法更靈活,因為值可以同步或非同步到達。

範例程式碼顯示了如何將類似的高階函數應用於 Iterable 和 Observable
IterableObservable(可觀察對象)
getDataFromLocalMemory()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .forEach({ println "next => " + it })
getDataFromNetwork()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .subscribe({ println "onNext => " + it })

Observable 類型在 四人幫的觀察者模式中添加了兩個缺失的語義,以匹配 Iterable 類型中可用的語義

  1. 生產者向使用者發出訊號,表示沒有更多數據可用的能力(在這種情況下,Iterable 上的 foreach 循環會正常完成並返回;Observable 會呼叫其觀察者的 onCompleted 方法)
  2. 生產者向使用者發出訊號,表示發生錯誤的能力(如果迭代過程中發生錯誤,Iterable 會拋出異常;Observable 會呼叫其觀察者的 onError 方法)

有了這些補充,ReactiveX 就協調了 Iterable 和 Observable 類型。它們之間唯一的區別是數據流的方向。這非常重要,因為現在您可以對 Iterable 執行的任何操作,也可以對 Observable 執行。