可觀察對象 (Observable)

在 ReactiveX 中,一個觀察者 訂閱一個可觀察對象。然後,該觀察者會對可觀察對象 發射 的任何項目或項目序列做出反應。這種模式有助於並行操作,因為它不需要在等待可觀察對象發射物件時阻塞,而是以觀察者的形式建立一個哨兵,準備在可觀察對象在未來的任何時間這樣做時做出適當的反應。

本頁解釋了反應式模式是什麼,以及什麼是可觀察對象和觀察者(以及觀察者如何訂閱可觀察對象)。其他頁面展示了如何使用 各種可觀察對象運算子 來連結可觀察對象並改變其行為。

此文件以「彈珠圖」佐以說明。以下是彈珠圖如何表示可觀察對象和可觀察對象的轉換

另請參閱

背景

在許多軟體程式設計任務中,您或多或少會預期您編寫的指令會按順序、一次一個、以您編寫的順序逐步執行並完成。但在 ReactiveX 中,許多指令可能會並行執行,其結果稍後會由「觀察者」以任意順序捕獲。您不是呼叫方法,而是定義一種以「可觀察對象」形式擷取和轉換資料的機制,然後將觀察者訂閱到該機制,此時先前定義的機制會啟動作用,觀察者則作為哨兵,在它們準備好時捕獲並回應其發射。

這種方法的優點是,當您有一堆彼此不相關的任務時,您可以同時啟動所有任務,而不是等待每個任務完成後再啟動下一個任務 — 這樣,您的整個任務包完成所需的時間只會與任務包中最長的任務一樣長。

有許多術語用於描述這種異步程式設計和設計模型。本文檔將使用以下術語:觀察者 訂閱一個可觀察對象。可觀察對象透過呼叫觀察者的方法,向其觀察者發射項目或傳送通知

在其他文檔和其他上下文中,我們稱之為「觀察者」的內容,有時會稱為「訂閱者」、「監看者」或「反應器」。一般而言,此模型通常稱為 「反應器模式」

建立觀察者

本頁在其範例中使用類似 Groovy 的偽代碼,但許多語言都有 ReactiveX 實作。

在普通的方法呼叫中 — 也就是說,不是 ReactiveX 中典型的異步並行呼叫 — 流程類似於這樣

  1. 呼叫一個方法。
  2. 將該方法的傳回值儲存在變數中。
  3. 使用該變數及其新值來執行一些有用的操作。

或者,類似於這樣

// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal

在異步模型中,流程更像這樣

  1. 定義一個方法,該方法可以使用異步呼叫的傳回值來執行一些有用的操作;此方法是觀察者的一部分。
  2. 將異步呼叫本身定義為可觀察對象
  3. 透過訂閱的方式將觀察者附加到該可觀察對象 (這也會啟動可觀察對象的動作)。
  4. 繼續您的工作;每當呼叫傳回時,觀察者的方法將開始對其傳回值或值 — 可觀察對象發射的項目 — 進行操作。

看起來像這樣

// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business

onNext、onCompleted 和 onError

Subscribe 方法是您將觀察者連接到可觀察對象的方式。您的觀察者會實作下列方法的一些子集

onNext
當可觀察對象發射項目時,可觀察對象會呼叫此方法。此方法將可觀察對象發射的項目作為參數。
onError
可觀察對象會呼叫此方法,表示它未能產生預期的資料或遇到其他錯誤。它不會再呼叫 onNextonCompletedonError 方法會將導致錯誤的指示作為其參數。
onCompleted
如果可觀察對象沒有遇到任何錯誤,它會在最後一次呼叫 onNext 後呼叫此方法。

根據 可觀察對象合約 的條款,它可能會呼叫 onNext 零次或多次,然後可能會在這些呼叫後呼叫 onCompletedonError 但兩者不會都呼叫,這將是它的最後一次呼叫。按照慣例,在本文件中,呼叫 onNext 通常稱為「發射」項目,而呼叫 onCompletedonError 則稱為「通知」。

更完整的 subscribe 呼叫範例如下

def myOnNext     = { item -> /* do something useful with item */ };
def myError      = { throwable -> /* react sensibly to a failed call */ };
def myComplete   = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business

另請參閱

取消訂閱

在某些 ReactiveX 實作中,有一個特殊的觀察者介面 Subscriber,它實作了一個 unsubscribe 方法。您可以呼叫此方法來表示 Subscriber 不再對它目前訂閱的任何可觀察對象感興趣。這些可觀察對象隨後可以 (如果它們沒有其他感興趣的觀察者) 選擇停止產生要發射的新項目。

此取消訂閱的結果將會透過套用至觀察者訂閱的可觀察對象的運算子鏈回溯,這將導致鏈中的每個連結停止發射項目。然而,這並不保證會立即發生,即使在沒有任何觀察者繼續觀察這些發射之後,可觀察對象仍有可能產生並嘗試發射一段時間的項目。

關於命名慣例的一些注意事項

ReactiveX 的每個特定語言實作都有其自身的命名怪癖。沒有規範的命名標準,儘管實作之間存在許多共同點。

此外,其中一些名稱在其他上下文中具有不同的含義,或者在特定實作語言的慣用語中顯得笨拙。

例如,有 onEvent 命名模式 (例如 onNextonCompletedonError)。在某些情況下,此類名稱會指示事件處理常式註冊所使用的方法。然而,在 ReactiveX 中,它們會命名事件處理常式本身。

「熱」和「冷」可觀察對象

可觀察對象何時開始發射其項目序列?這取決於可觀察對象。「熱」可觀察對象可能會在建立後立即開始發射項目,因此稍後訂閱該可觀察對象的任何觀察者可能會在序列的中間某處開始觀察。另一方面,「冷」可觀察對象會等到觀察者訂閱它之後才會開始發射項目,因此保證此類觀察者會從頭開始看到整個序列。

在某些 ReactiveX 實作中,還有一個稱為「可連線」可觀察對象的東西。只有在呼叫其 Connect 方法時,此類可觀察對象才會開始發射項目,無論是否有任何觀察者訂閱它。

透過可觀察對象運算子進行組合

可觀察對象和觀察者只是 ReactiveX 的開始。就它們本身而言,它們只不過是標準觀察者模式的輕微擴展,更適合處理事件序列而不是單一回呼。

真正的威力來自「反應式擴展」(因此稱為「ReactiveX」) — 允許您轉換、組合、操作和處理可觀察對象發射的項目序列的運算子。

這些 Rx 運算子允許您以宣告式的方式將異步序列組合在一起,並具有回呼的所有效率優勢,但沒有通常與異步系統相關聯的巢狀回呼處理常式的缺點。

此文件將關於各種運算子及其使用範例的資訊分組到以下頁面

建立可觀察對象
CreateDeferEmpty/Never/ThrowFromIntervalJustRangeRepeatStartTimer
轉換可觀察對象項目
BufferFlatMapGroupByMapScanWindow
篩選可觀察對象
DebounceDistinctElementAtFilterFirstIgnoreElementsLastSampleSkipSkipLastTakeTakeLast
組合可觀察對象
And/Then/WhenCombineLatestJoinMergeStartWithSwitchZip
錯誤處理運算子
CatchRetry
公用程式運算子
DelayDoMaterialize/DematerializeObserveOnSerializeSubscribeSubscribeOnTimeIntervalTimeoutTimestampUsing
條件和布林運算子
AllAmbContainsDefaultIfEmptySequenceEqualSkipUntilSkipWhileTakeUntilTakeWhile
數學和彙總運算子
AverageConcatCountMaxMinReduceSum
轉換可觀察對象
To
可連線的可觀察對象運算子
ConnectPublishRefCountReplay
背壓運算子
強制執行特定流量控制原則的各種運算子

這些頁面包含一些不屬於 ReactiveX 核心,但在一個或多個特定語言實作和/或選用模組中實作的運算子資訊。

鏈結運算子

大多數運算子都會對可觀察對象進行操作並傳回可觀察對象。這允許您依序一個接一個地套用這些運算子,形成一個鏈。鏈中的每個運算子都會修改前一個運算子運算產生的可觀察對象。

還有其他模式,例如建造者模式(Builder Pattern),其中特定類別的多種方法會透過修改該物件來操作該類別的項目。這些模式也允許你以類似的方式鏈式調用方法。但雖然在建造者模式中,方法在鏈中的出現順序通常不重要,但在 Observable 運算符中,順序很重要

Observable 運算符的鏈不是獨立地對產生鏈的原始 Observable 進行操作,而是依序操作,每個運算符都對鏈中緊接在其前的運算符產生的 Observable 進行操作。

Null 值

在某些 ReactiveX 實作中,例如 RxJava 2.xRxJava 3.x,由於現在必須與 Reactive Streams 規範相容,因此不再允許 null 值。

一般來說,null 在某種意義上是模糊的,因為沒有好的方法來區分「未呈現」的指示和返回 null 而不是拋出異常的錯誤。如果真的需要這樣的「未呈現」指示,請考慮使用 Optional<T>(在 Java 8+ 中)或類似的結構來包裝可能為 null 的 T,並讓它在鏈中傳遞。

是的,使用這種方法會引入記憶體開銷和間接性(以及一些不便)。但是,如果仔細想想,以這種方式支援 null 會在所有地方引入開銷(就像 RxJava 1.x 一樣),即使在絕大多數用例中這大多是不必要的。

因此,RxJava 2.xRxJava 3.x 會積極檢查參數是否為 null、使用者提供的回呼函式是否回傳 null,並且幾乎永遠不會使用 null 調用使用者提供的回呼函式,除非在特定運算符的文件中另有說明。