簡介

ReactiveX 的每個特定語言實作都實作了一組運算子。雖然實作之間有很多重疊之處,但也有些運算子僅在某些實作中實作。此外,每個實作都傾向於將其運算子命名為類似於該語言中其他上下文中已熟悉的類似方法。

串聯運算子

大多數運算子都對 Observable 進行操作並返回一個 Observable。這允許你一個接一個地以鏈的形式應用這些運算子。鏈中的每個運算子都會修改前一個運算子操作產生的 Observable。

還有其他模式,例如 Builder 模式,其中特定類別的各種方法透過方法的操作修改該物件,對該類別的項目進行操作。這些模式也允許你以類似的方式鏈接方法。但是,雖然在 Builder 模式中,方法在鏈中出現的順序通常並不重要,但在 Observable 運算子中,順序很重要

Observable 運算子鏈不會獨立地對源自該鏈的原始 Observable 進行操作,而是依次操作,每個運算子都對鏈中緊接在前一個運算子生成的 Observable 進行操作。

ReactiveX 的運算子

此頁面首先列出 ReactiveX 中可被視為「核心」運算子的內容,並連結到具有更深入資訊的頁面,說明這些運算子如何運作以及特定語言的 ReactiveX 版本如何實作這些運算子。

接下來是一個「決策樹」,可協助你選擇最適合你的使用案例的運算子。

最後,有一個字母順序的列表,列出 ReactiveX 的許多特定語言實作中可用的大多數運算子。這些連結到記錄最接近特定語言運算子的核心運算子的頁面(因此,例如,Rx.NET 的「SelectMany」運算子連結到 FlatMap ReactiveX 運算子的文件,其中「SelectMany」是 Rx.NET 的實作)。

如果你想實作自己的運算子,請參閱實作你自己的運算子

目錄

  1. 依類別分類的運算子
  2. Observable 運算子的決策樹
  3. Observable 運算子的字母順序列表

依類別分類的運算子

建立 Observables

產生新 Observables 的運算子。

  • Create — 透過程式設計呼叫觀察者方法,從頭開始建立 Observable
  • Defer — 在觀察者訂閱之前不要建立 Observable,並為每個觀察者建立一個新的 Observable
  • Empty/Never/Throw — 建立具有非常精確和有限行為的 Observables
  • From — 將其他物件或資料結構轉換為 Observable
  • Interval — 建立一個 Observable,該 Observable 發出以特定時間間隔間隔的整數序列
  • Just — 將一個物件或一組物件轉換為發出該物件或那些物件的 Observable
  • Range — 建立一個 Observable,該 Observable 發出一個連續的整數範圍
  • Repeat — 建立一個 Observable,該 Observable 重複發出特定項目或項目序列
  • Start — 建立一個 Observable,該 Observable 發出函數的傳回值
  • Timer — 建立一個 Observable,該 Observable 在給定的延遲後發出單個項目

轉換 Observables

轉換 Observable 發出的項目的運算子。

  • Buffer — 定期從 Observable 收集項目到捆綁包中,並發出這些捆綁包,而不是一次發出一個項目
  • FlatMap — 將 Observable 發出的項目轉換為 Observables,然後將這些項目的發射展平為單個 Observable
  • GroupBy — 將 Observable 分成一組 Observables,每個 Observable 發出原始 Observable 的不同項目組,按鍵組織
  • Map — 透過對每個項目應用函數來轉換 Observable 發出的項目
  • Scan — 依序將函數應用於 Observable 發出的每個項目,並發出每個後續值
  • Window — 定期將 Observable 的項目細分為 Observable 視窗,並發出這些視窗,而不是一次發出一個項目

篩選 Observables

有選擇地從來源 Observable 發出項目的運算子。

  • Debounce — 僅當特定時間範圍過去而沒有發出另一個項目時,才從 Observable 發出一個項目
  • Distinct — 抑制 Observable 發出的重複項目
  • ElementAt — 僅發出 Observable 發出的第 n 個項目
  • Filter — 僅從 Observable 發出通過謂詞測試的那些項目
  • First — 僅從 Observable 發出第一個項目,或符合條件的第一個項目
  • IgnoreElements — 不從 Observable 發出任何項目,但鏡像其終止通知
  • Last — 僅發出 Observable 發出的最後一個項目
  • Sample — 在週期性時間間隔內發出 Observable 發出的最新項目
  • Skip — 抑制 Observable 發出的前 n 個項目
  • SkipLast — 抑制 Observable 發出的最後 n 個項目
  • Take — 僅發出 Observable 發出的前 n 個項目
  • TakeLast — 僅發出 Observable 發出的最後 n 個項目

合併 Observables

使用多個來源 Observables 來建立單個 Observable 的運算子

  • And/Then/When — 透過 PatternPlan 中介合併由兩個或多個 Observables 發出的一組項目
  • CombineLatest — 當兩個 Observables 中的任何一個發出一個項目時,透過指定的函數合併每個 Observable 發出的最新項目,並根據此函數的結果發出項目
  • Join — 當一個 Observable 的項目在根據另一個 Observable 發出的項目定義的時間視窗內發出時,合併兩個 Observables 發出的項目
  • Merge — 透過合併其發射,將多個 Observables 合併為一個
  • StartWith — 在開始發出來源 Observable 的項目之前,發出指定的項目序列
  • Switch — 將發出 Observables 的 Observable 轉換為單個 Observable,該 Observable 發出最近發出的那些 Observables 發出的項目
  • Zip — 透過指定的函數合併多個 Observables 的發射,並根據此函數的結果為每個組合發出單個項目

錯誤處理運算子

協助從 Observable 的錯誤通知中復原的運算子

  • Catch — 透過在沒有錯誤的情況下繼續該序列,從 onError 通知中復原
  • Retry — 如果來源 Observable 發送 onError 通知,則重新訂閱它,希望它在沒有錯誤的情況下完成

Observable 公用程式運算子

用於處理 Observables 的有用運算子工具箱

  • Delay — 將 Observable 的發射時間向前移動特定的量
  • Do — 註冊在各種 Observable 生命周期事件中採取的動作
  • Materialize/Dematerialize — 表示發出的項目和發送的通知都作為發出的項目,或反轉此過程
  • ObserveOn — 指定觀察者將在此 Observable 上觀察的排程器
  • Serialize — 強制 Observable 進行序列化呼叫並表現良好
  • Subscribe — 對來自 Observable 的發射和通知執行操作
  • SubscribeOn — 指定 Observable 在訂閱時應使用的排程器
  • TimeInterval — 將發出項目的 Observable 轉換為發出這些發射之間經過的時間量的指示的 Observable
  • Timeout — 鏡像來源 Observable,但如果在沒有發出任何項目的情況下經過特定時間段,則發出錯誤通知
  • Timestamp — 將時間戳記附加到 Observable 發出的每個項目
  • Using — 建立與 Observable 具有相同生命週期的可釋放資源

條件和布林運算子

評估一個或多個 Observables 或由 Observables 發出的項目的運算子

  • All — 確定 Observable 發出的所有項目是否符合某些條件
  • Amb — 給定兩個或多個來源 Observables,僅發出來自這些 Observables 中第一個發出項目的所有項目
  • Contains — 確定 Observable 是否發出特定項目
  • DefaultIfEmpty — 發出來源 Observable 的項目,如果來源 Observable 沒有發出任何項目,則發出預設項目
  • SequenceEqual — 確定兩個 Observables 是否發出相同的項目序列
  • SkipUntil — 丟棄 Observable 發出的項目,直到第二個 Observable 發出項目
  • SkipWhile — 丟棄 Observable 發出的項目,直到指定條件變為 false
  • TakeUntil — 在第二個 Observable 發射項目或終止後,捨棄 Observable 發射的項目
  • TakeWhile — 在指定條件變為 false 後,捨棄 Observable 發射的項目

數學和彙總運算子

對 Observable 發射的整個項目序列進行操作的運算子

  • Average — 計算 Observable 發射的數字的平均值並發射此平均值
  • Concat — 發射兩個或多個 Observables 的發射項,而不將它們交錯
  • Count — 計算來源 Observable 發射的項目數量,並僅發射此值
  • Max — 確定並發射 Observable 發射的最大值項目
  • Min — 確定並發射 Observable 發射的最小值項目
  • Reduce — 依序將函式套用至 Observable 發射的每個項目,並發射最終值
  • Sum — 計算 Observable 發射的數字總和並發射此總和

背壓運算子

  • 背壓運算子 — 用於處理產生項目速度快於其觀察者消耗速度的 Observables 的策略

可連線 Observable 運算子

具有更精確控制的訂閱動態的特殊 Observable

  • Connect — 指示可連線的 Observable 開始向其訂閱者發射項目
  • Publish — 將普通 Observable 轉換為可連線的 Observable
  • RefCount — 使可連線的 Observable 的行為像普通 Observable
  • Replay — 確保所有觀察者看到相同的發射項目序列,即使他們在 Observable 開始發射項目後才訂閱

轉換 Observables 的運算子

  • To — 將 Observable 轉換為另一個物件或資料結構

Observable 運算子的決策樹

此樹狀結構可以幫助您找到您正在尋找的 ReactiveX Observable 運算子。

我想建立一個新的 Observable
它會發射特定的項目
Just
該項目是在訂閱時從呼叫的函式中傳回的
Start
該項目是在訂閱時從呼叫的 ActionCallableRunnable 或類似的項目中傳回的
From
在指定的延遲之後
Timer
它會從特定的 ArrayIterable 或類似項目中提取其發射項
From
透過從 Future 檢索它
Start
它從 Future 取得其序列
From
它會重複發射一個項目序列
Repeat
從頭開始,使用自訂邏輯
Create
針對每個訂閱的觀察者
Defer
它會發射一個整數序列
Range
在特定的時間間隔
Interval
在指定的延遲之後
Timer
它會在不發射任何項目的情況下完成
Empty
它什麼都不做
Never
我想透過組合其他 Observables 來建立一個 Observable
並以接收到的任何順序發射所有 Observables 中的所有項目
Merge
並一次發射一個 Observable 中的所有項目
Concat
透過依序組合兩個或多個 Observables 中的項目,得出要發射的新項目
每當每個 Observables 發射新項目時
Zip
每當任何 Observables 發射新項目時
CombineLatest
每當一個 Observable 在另一個 Observable 發射的項目定義的視窗中發射項目時
Join
透過 PatternPlan 中介
And/Then/When
並只發射最近發射的 Observables 中的項目
Switch
我想在轉換 Observable 的項目之後發射它們
一次使用一個函式
Map
透過發射對應 Observables 發射的所有項目
FlatMap
一次一個 Observable,按照它們發射的順序
ConcatMap
基於它們之前的所有項目
Scan
透過將時間戳記附加到它們
Timestamp
轉換為項目發射之前經過的時間量指標
TimeInterval
我想在時間上向前移動 Observable 發射的項目,然後重新發射它們
Delay
我想將 Observable 的項目通知轉換為項目並重新發射它們
透過將它們包裝在 Notification 物件中
Materialize
然後我可以再次使用
Dematerialize
我想忽略 Observable 發射的所有項目,只傳遞其完成/錯誤通知
IgnoreElements
我想鏡像一個 Observable,但在其序列中加上前置項目
StartWith
只有當其序列為空時
DefaultIfEmpty
我想從 Observable 收集項目,並將它們作為項目緩衝區重新發射
Buffer
只包含最後發射的項目
TakeLastBuffer
我想將一個 Observable 分割成多個 Observables
Window
以便相似的項目最終出現在同一個 Observable 上
GroupBy
我想檢索 Observable 發射的特定項目
它完成之前發射的最後一個項目
Last
它發射的唯一項目
Single
它發射的第一個項目
First
我想只重新發射 Observable 中的某些項目
透過篩選出那些不符合某些謂詞的項目
Filter
也就是說,只有第一個項目
First
也就是說,只有第一個
Take
也就是說,只有最後一個項目
Last
也就是說,只有第 n 個項目
ElementAt
也就是說,只有在第一個項目之後的項目
也就是說,在最初 n 個項目之後
Skip
也就是說,直到其中一個項目符合謂詞
SkipWhile
也就是說,在初始時間段之後
Skip
也就是說,在第二個 Observable 發射項目之後
SkipUntil
也就是說,除了最後幾個項目以外的項目
也就是說,除了最後 n 個項目以外的項目
SkipLast
也就是說,直到其中一個項目符合謂詞
TakeWhile
也就是說,除了在來源完成之前的時間段內發射的項目以外的項目
SkipLast
也就是說,除了在第二個 Observable 發射項目之後發射的項目以外的項目
TakeUntil
透過定期對 Observable 進行取樣
Sample
透過僅發射在某個持續時間內沒有其他項目跟隨的項目
Debounce
透過抑制與已發射項目重複的項目
Distinct
如果它們緊接著是它們的重複項
DistinctUntilChanged
透過在它開始發射項目之後的一段時間內延遲我的訂閱
DelaySubscription
我希望僅在 Observable 是發射項目的第一個 Observable 時,才重新發射該項目
Amb
我想評估 Observable 發射的整個項目序列
並發射一個單一布林值,指示所有項目是否通過某項測試
All
並發射一個單一布林值,指示 Observable 是否發射了任何項目(通過某項測試)
Contains
並發射一個單一布林值,指示 Observable 是否發射了任何項目
IsEmpty
並發射一個單一布林值,指示該序列是否與第二個 Observable 發射的序列相同
SequenceEqual
並發射所有值的平均值
Average
並發射所有值的總和
Sum
並發射一個指示序列中有多少項目的數字
Count
並發射具有最大值的項目
Max
並發射具有最小值的項目
Min
透過將聚合函式依次套用到每個項目並發射結果
Scan
我想將 Observable 發射的整個項目序列轉換為其他資料結構
To
我希望運算子在特定的 排程器上運作
SubscribeOn
當它通知觀察者時
ObserveOn
我希望 Observable 在發生某些事件時呼叫特定動作
Do
我希望 Observable 將錯誤通知觀察者
Throw
如果指定的時間段過去而沒有發射項目
Timeout
我希望 Observable 正常復原
從逾時切換到備份 Observable
Timeout
從上游錯誤通知
Catch
透過嘗試重新訂閱上游 Observable
Retry
我想建立一個具有與 Observable 相同生命週期的資源
Using
我想訂閱 Observable 並接收一個會阻塞直到 Observable 完成的 Future
Start
我希望 Observable 在被要求之前不開始向訂閱者發射項目
Publish
然後只發射其序列中的最後一個項目
PublishLast
然後發射完整序列,即使是那些在序列開始後才訂閱的人也是如此
Replay
但我希望它在所有訂閱者取消訂閱後消失
RefCount
然後我想要要求它開始
連線

另請參閱

Observable 運算子的字母順序列表

規範、核心運算子名稱以粗體表示。其他條目代表這些運算子的特定語言變體或主要 ReactiveX 核心運算子集合之外的特殊運算子。