ReactiveX 的每個特定語言實作都實作了一組運算子。雖然實作之間有很多重疊之處,但也有些運算子僅在某些實作中實作。此外,每個實作都傾向於將其運算子命名為類似於該語言中其他上下文中已熟悉的類似方法。
大多數運算子都對 Observable 進行操作並返回一個 Observable。這允許你一個接一個地以鏈的形式應用這些運算子。鏈中的每個運算子都會修改前一個運算子操作產生的 Observable。
還有其他模式,例如 Builder 模式,其中特定類別的各種方法透過方法的操作修改該物件,對該類別的項目進行操作。這些模式也允許你以類似的方式鏈接方法。但是,雖然在 Builder 模式中,方法在鏈中出現的順序通常並不重要,但在 Observable 運算子中,順序很重要。
Observable 運算子鏈不會獨立地對源自該鏈的原始 Observable 進行操作,而是依次操作,每個運算子都對鏈中緊接在前一個運算子生成的 Observable 進行操作。
此頁面首先列出 ReactiveX 中可被視為「核心」運算子的內容,並連結到具有更深入資訊的頁面,說明這些運算子如何運作以及特定語言的 ReactiveX 版本如何實作這些運算子。
接下來是一個「決策樹」,可協助你選擇最適合你的使用案例的運算子。
最後,有一個字母順序的列表,列出 ReactiveX 的許多特定語言實作中可用的大多數運算子。這些連結到記錄最接近特定語言運算子的核心運算子的頁面(因此,例如,Rx.NET 的「SelectMany」運算子連結到 FlatMap ReactiveX 運算子的文件,其中「SelectMany」是 Rx.NET 的實作)。
如果你想實作自己的運算子,請參閱實作你自己的運算子。
產生新 Observables 的運算子。
Create
— 透過程式設計呼叫觀察者方法,從頭開始建立 ObservableDefer
— 在觀察者訂閱之前不要建立 Observable,並為每個觀察者建立一個新的 ObservableEmpty
/Never
/Throw
— 建立具有非常精確和有限行為的 ObservablesFrom
— 將其他物件或資料結構轉換為 ObservableInterval
— 建立一個 Observable,該 Observable 發出以特定時間間隔間隔的整數序列Just
— 將一個物件或一組物件轉換為發出該物件或那些物件的 ObservableRange
— 建立一個 Observable,該 Observable 發出一個連續的整數範圍Repeat
— 建立一個 Observable,該 Observable 重複發出特定項目或項目序列Start
— 建立一個 Observable,該 Observable 發出函數的傳回值Timer
— 建立一個 Observable,該 Observable 在給定的延遲後發出單個項目轉換 Observable 發出的項目的運算子。
Buffer
— 定期從 Observable 收集項目到捆綁包中,並發出這些捆綁包,而不是一次發出一個項目FlatMap
— 將 Observable 發出的項目轉換為 Observables,然後將這些項目的發射展平為單個 ObservableGroupBy
— 將 Observable 分成一組 Observables,每個 Observable 發出原始 Observable 的不同項目組,按鍵組織Map
— 透過對每個項目應用函數來轉換 Observable 發出的項目Scan
— 依序將函數應用於 Observable 發出的每個項目,並發出每個後續值Window
— 定期將 Observable 的項目細分為 Observable 視窗,並發出這些視窗,而不是一次發出一個項目有選擇地從來源 Observable 發出項目的運算子。
Debounce
— 僅當特定時間範圍過去而沒有發出另一個項目時,才從 Observable 發出一個項目Distinct
— 抑制 Observable 發出的重複項目ElementAt
— 僅發出 Observable 發出的第 n 個項目Filter
— 僅從 Observable 發出通過謂詞測試的那些項目First
— 僅從 Observable 發出第一個項目,或符合條件的第一個項目IgnoreElements
— 不從 Observable 發出任何項目,但鏡像其終止通知Last
— 僅發出 Observable 發出的最後一個項目Sample
— 在週期性時間間隔內發出 Observable 發出的最新項目Skip
— 抑制 Observable 發出的前 n 個項目SkipLast
— 抑制 Observable 發出的最後 n 個項目Take
— 僅發出 Observable 發出的前 n 個項目TakeLast
— 僅發出 Observable 發出的最後 n 個項目使用多個來源 Observables 來建立單個 Observable 的運算子
And
/Then
/When
— 透過 Pattern
和 Plan
中介合併由兩個或多個 Observables 發出的一組項目CombineLatest
— 當兩個 Observables 中的任何一個發出一個項目時,透過指定的函數合併每個 Observable 發出的最新項目,並根據此函數的結果發出項目Join
— 當一個 Observable 的項目在根據另一個 Observable 發出的項目定義的時間視窗內發出時,合併兩個 Observables 發出的項目Merge
— 透過合併其發射,將多個 Observables 合併為一個StartWith
— 在開始發出來源 Observable 的項目之前,發出指定的項目序列Switch
— 將發出 Observables 的 Observable 轉換為單個 Observable,該 Observable 發出最近發出的那些 Observables 發出的項目Zip
— 透過指定的函數合併多個 Observables 的發射,並根據此函數的結果為每個組合發出單個項目協助從 Observable 的錯誤通知中復原的運算子
Catch
— 透過在沒有錯誤的情況下繼續該序列,從 onError
通知中復原Retry
— 如果來源 Observable 發送 onError
通知,則重新訂閱它,希望它在沒有錯誤的情況下完成用於處理 Observables 的有用運算子工具箱
Delay
— 將 Observable 的發射時間向前移動特定的量Do
— 註冊在各種 Observable 生命周期事件中採取的動作Materialize
/Dematerialize
— 表示發出的項目和發送的通知都作為發出的項目,或反轉此過程ObserveOn
— 指定觀察者將在此 Observable 上觀察的排程器Serialize
— 強制 Observable 進行序列化呼叫並表現良好Subscribe
— 對來自 Observable 的發射和通知執行操作SubscribeOn
— 指定 Observable 在訂閱時應使用的排程器TimeInterval
— 將發出項目的 Observable 轉換為發出這些發射之間經過的時間量的指示的 ObservableTimeout
— 鏡像來源 Observable,但如果在沒有發出任何項目的情況下經過特定時間段,則發出錯誤通知Timestamp
— 將時間戳記附加到 Observable 發出的每個項目Using
— 建立與 Observable 具有相同生命週期的可釋放資源評估一個或多個 Observables 或由 Observables 發出的項目的運算子
All
— 確定 Observable 發出的所有項目是否符合某些條件Amb
— 給定兩個或多個來源 Observables,僅發出來自這些 Observables 中第一個發出項目的所有項目Contains
— 確定 Observable 是否發出特定項目DefaultIfEmpty
— 發出來源 Observable 的項目,如果來源 Observable 沒有發出任何項目,則發出預設項目SequenceEqual
— 確定兩個 Observables 是否發出相同的項目序列SkipUntil
— 丟棄 Observable 發出的項目,直到第二個 Observable 發出項目SkipWhile
— 丟棄 Observable 發出的項目,直到指定條件變為 falseTakeUntil
— 在第二個 Observable 發射項目或終止後,捨棄 Observable 發射的項目TakeWhile
— 在指定條件變為 false 後,捨棄 Observable 發射的項目對 Observable 發射的整個項目序列進行操作的運算子
Average
— 計算 Observable 發射的數字的平均值並發射此平均值Concat
— 發射兩個或多個 Observables 的發射項,而不將它們交錯Count
— 計算來源 Observable 發射的項目數量,並僅發射此值Max
— 確定並發射 Observable 發射的最大值項目Min
— 確定並發射 Observable 發射的最小值項目Reduce
— 依序將函式套用至 Observable 發射的每個項目,並發射最終值Sum
— 計算 Observable 發射的數字總和並發射此總和具有更精確控制的訂閱動態的特殊 Observable
Connect
— 指示可連線的 Observable 開始向其訂閱者發射項目Publish
— 將普通 Observable 轉換為可連線的 ObservableRefCount
— 使可連線的 Observable 的行為像普通 ObservableReplay
— 確保所有觀察者看到相同的發射項目序列,即使他們在 Observable 開始發射項目後才訂閱To
— 將 Observable 轉換為另一個物件或資料結構此樹狀結構可以幫助您找到您正在尋找的 ReactiveX Observable 運算子。
Pattern
和 Plan
中介Notification
物件中Future
規範、核心運算子名稱以粗體表示。其他條目代表這些運算子的特定語言變體或主要 ReactiveX 核心運算子集合之外的特殊運算子。
聚合
All
Amb
ambArray
ambWith
and_
And
Any
apply
as_blocking
asObservable
AssertEqual
asyncAction
asyncFunc
Average
averageDouble
averageFloat
averageInteger
averageLong
blocking
blockingFirst
blockingForEach
blockingIterable
blockingLast
blockingLatest
blockingMostRecent
blockingNext
blockingSingle
blockingSubscribe
Buffer
bufferWithCount
bufferWithTime
bufferWithTimeOrCount
byLine
cache
cacheWithInitialCapacity
case
Cast
Catch
catchError
catchException
collect
collect
(RxScala 版本的 Filter
)collectInto
CombineLatest
combineLatestDelayError
combineLatestWith
Concat
concat_all
concatAll
concatArray
concatArrayDelayError
concatArrayEager
concatDelayError
concatEager
concatMap
concatMapDelayError
concatMapEager
concatMapEagerDelayError
concatMapIterable
concatMapObserver
concatMapTo
concatWith
連線
connect_forever
cons
Contains
controlled
Count
countLong
Create
cycle
Debounce
decode
DefaultIfEmpty
Defer
deferFuture
Delay
delaySubscription
delayWithSelector
Dematerialize
Distinct
distinctKey
distinctUntilChanged
distinctUntilKeyChanged
Do
doAction
doAfterTerminate
doOnComplete
doOnCompleted
doOnDispose
doOnEach
doOnError
doOnLifecycle
doOnNext
doOnRequest
doOnSubscribe
doOnTerminate
doOnUnsubscribe
doseq
doWhile
drop
dropRight
dropUntil
dropWhile
ElementAt
ElementAtOrDefault
Empty
emptyObservable
empty?
encode
ensures
error
every
exclusive
exists
expand
failWith
Filter
filterNot
Finally
finallyAction
finallyDo
find
findIndex
First
firstElement
FirstOrDefault
firstOrElse
FlatMap
flatMapFirst
flatMapIterable
flatMapIterableWith
flatMapLatest
flatMapObserver
flatMapWith
flatMapWithMaxConcurrent
flat_map_with_index
flatten
flattenDelayError
foldl
foldLeft
for
forall
ForEach
forEachFuture
forEachWhile
forIn
forkJoin
From
fromAction
fromArray
FromAsyncPattern
fromCallable
fromCallback
FromEvent
FromEventPattern
fromFunc0
fromFuture
fromIterable
fromIterator
from_list
fromNodeCallback
fromPromise
fromPublisher
fromRunnable
Generate
generateWithAbsoluteTime
generateWithRelativeTime
generator
GetEnumerator
getIterator
GroupBy
GroupByUntil
GroupJoin
head
headOption
headOrElse
if
ifThen
IgnoreElements
indexOf
interleave
interpose
Interval
intervalRange
into
isEmpty
items
Join
join
(字串)jortSort
jortSortUntil
Just
keep
keep-indexed
Last
lastElement
lastOption
LastOrDefault
lastOrElse
Latest
latest
(Rx.rb 版本的 Switch
)length
let
letBind
lift
limit
LongCount
ManySelect
Map
map
(RxClojure 版本的 Zip
)MapCat
mapCat
(RxClojure 版本的 Zip
)map-indexed
mapTo
mapWithIndex
Materialize
Max
MaxBy
Merge
mergeAll
mergeArray
mergeArrayDelayError
merge_concurrent
mergeDelayError
mergeObservable
mergeWith
Min
MinBy
MostRecent
Multicast
multicastWithSelector
nest
Never
Next
Next
(BlockingObservable 版本)none
nonEmpty
nth
ObserveOn
ObserveOnDispatcher
observeSingleOn
of
of_array
ofArrayChanges
of_enumerable
of_enumerator
ofObjectChanges
OfType
ofWithScheduler
onBackpressureBlock
onBackpressureBuffer
onBackpressureDrop
OnErrorResumeNext
onErrorReturn
onErrorReturnItem
onExceptionResumeNext
onTerminateDetach
orElse
pairs
pairwise
partition
partition-all
pausable
pausableBuffered
pluck
product
Publish
PublishLast
publish_synchronized
publishValue
raise_error
Range
Reduce
reduceWith
reductions
RefCount
Repeat
repeat_infinitely
repeatUntil
repeatWhen
Replay
rescue_error
rest
Retry
retry_infinitely
retryUntil
retryWhen
Return
returnElement
returnValue
runAsync
safeSubscribe
Sample
Scan
scanWith
scope
Select
(Map
的替代名稱)select
(Filter
的替代名稱)selectConcat
selectConcatObserver
SelectMany
selectManyObserver
select_switch
selectSwitch
selectSwitchFirst
selectWithMaxConcurrent
select_with_index
seq
SequenceEqual
sequence_eql?
SequenceEqualWith
Serialize
share
shareReplay
shareValue
Single
singleElement
SingleOrDefault
singleOption
singleOrElse
size
Skip
SkipLast
skipLastWithTime
SkipUntil
skipUntilWithTime
SkipWhile
skipWhileWithIndex
skip_with_time
slice
sliding
slidingBuffer
some
sort
sorted
sort-by
sorted-list-by
split
split-with
Start
startAsync
startFuture
StartWith
startWithArray
stringConcat
stopAndWait
subscribe
subscribeActual
SubscribeOn
SubscribeOnDispatcher
subscribeOnCompleted
subscribeOnError
subscribeOnNext
subscribeWith
Sum
sumDouble
sumFloat
sumInteger
sumLong
Switch
switchCase
switchIfEmpty
switchLatest
switchMap
switchMapDelayError
switchOnNext
switchOnNextDelayError
Synchronize
Take
take_with_time
takeFirst
TakeLast
takeLastBuffer
takeLastBufferWithTime
takeLastWithTime
takeRight
(另請參閱:TakeLast
)TakeUntil
takeUntilWithTime
TakeWhile
takeWhileWithIndex
tail
tap
tapOnCompleted
tapOnError
tapOnNext
Then
thenDo
Throttle
throttleFirst
throttleLast
throttleLatest
throttleWithSelector
throttleWithTimeout
Throw
throwError
throwException
TimeInterval
Timeout
timeoutWithSelector
Timer
Timestamp
To
to_a
ToArray
ToAsync
toBlocking
toBuffer
to_dict
ToDictionary
ToEnumerable
ToEvent
ToEventPattern
ToFlowable
ToFuture
to_h
toIndexedSeq
toIterable
toIterator
ToList
ToLookup
toMap
toMultiMap
ToObservable
toSet
toSortedList
toStream
ToTask
toTraversable
toVector
tumbling
tumblingBuffer
unsafeCreate
unsubscribeOn
Using
When
Where
while
whileDo
Window
windowWithCount
windowWithTime
windowWithTimeOrCount
windowed
withFilter
withLatestFrom
Zip
zipArray
zipIterable
zipWith
zipWithIndex
++
+:
:+