FlatMap 運算子透過將您指定的函式應用於來源 Observable 發射的每個項目來轉換 Observable,該函式會返回一個本身會發射項目的 Observable。FlatMap 接著會合併這些結果 Observables 的發射,並將這些合併的結果作為其自身的序列發射。
例如,當您有一個 Observable 發射一系列本身具有 Observable 成員的項目,或可以其他方式轉換為 Observables 時,此方法非常有用,以便您可以建立一個新的 Observable,它會發射這些項目的子 Observables 發射的完整項目集合。
請注意,FlatMap 會合併這些 Observables 的發射,因此它們可能會交錯。
在一些特定於語言的實作中,還有一個運算子不會交錯來自轉換後的 Observables 的發射,而是以嚴格的順序發射這些發射,通常稱為 ConcatMap 或類似的名稱。
RxGroovy 實作了 flatMap
運算子。
// this closure is an Observable that emits three numbers numbers = Observable.from([1, 2, 3]); // this closure is an Observable that emits two numbers based on what number it is passed multiples = { n -> Observable.from([ n*2, n*3 ]) }; numbers.flatMap(multiples).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
2 3 4 6 6 9 Sequence complete
請注意,如果由 flatMap
對來源 Observable 的項目對應的任何個別 Observables 透過呼叫 onError
中止,則 flatMap
產生的 Observable 本身會立即中止並呼叫 onError
。
此運算子的變體版本採用額外的 int
參數。此參數設定 flatMap
將嘗試對來源 Observable 發射的項目對應的 Observables 進行的最大並行訂閱數。當達到此最大數時,它將等待其中一個 Observables 終止,然後再訂閱另一個。
flatMap(Func1)
flatMap(Func1,int)
另一個版本的 flatMap
會為來源 Observable 的每個項目和通知建立 (和展平) 一個新的 Observable。
此運算子的變體版本採用額外的 int
參數。此參數設定 flatMap
將嘗試對來源 Observable 發射的項目對應的 Observables 進行的最大並行訂閱數。當達到此最大數時,它將等待其中一個 Observables 終止,然後再訂閱另一個。
flatMap(Func1,Func1,Func0)
flatMap(Func1,Func1,Func0,int)
另一個版本會將來自來源 Observable 的項目與這些來源項目觸發的 Observable 合併,並發射這些組合。
此運算子的變體版本採用額外的 int
參數。此參數設定 flatMap
將嘗試對來源 Observable 發射的項目對應的 Observables 進行的最大並行訂閱數。當達到此最大數時,它將等待其中一個 Observables 終止,然後再訂閱另一個。
flatMap(Func1,Func2)
flatMap(Func1,Func2,int)
flatMapIterable
變體配對來源項目和產生的 Iterable
,而不是來源項目和產生的 Observables,但其他方面的工作方式大致相同。
flatMapIterable(Func1)
flatMapIterable(Func1,Func2)
還有一個 concatMap
運算子,它類似於 flatMap
運算子的較簡單版本,但它會連接而不是合併結果 Observables,以產生其自身的序列。
concatMap(Func1)
RxGroovy 也實作了 switchMap
運算子。它的行為很像 flatMap
,除了每當來源 Observable 發射新項目時,它會取消訂閱並停止鏡像從先前發射的項目產生的 Observable,並且只開始鏡像目前的項目。
switchMap(Func1)
RxJava 實作了 flatMap
運算子。
請注意,如果由 flatMap
對來源 Observable 的項目對應的任何個別 Observables 透過呼叫 onError
中止,則 flatMap
產生的 Observable 本身會立即中止並呼叫 onError
。
此運算子的變體版本採用額外的 int
參數。此參數設定 flatMap
將嘗試對來源 Observable 發射的項目對應的 Observables 進行的最大並行訂閱數。當達到此最大數時,它將等待其中一個 Observables 終止,然後再訂閱另一個。
flatMap(Func1)
flatMap(Func1,int)
另一個版本的 flatMap
會為來源 Observable 的每個項目和通知建立 (和展平) 一個新的 Observable。
此運算子的變體版本採用額外的 int
參數。此參數設定 flatMap
將嘗試對來源 Observable 發射的項目對應的 Observables 進行的最大並行訂閱數。當達到此最大數時,它將等待其中一個 Observables 終止,然後再訂閱另一個。
flatMap(Func1,Func1,Func0)
flatMap(Func1,Func1,Func0,int)
另一個版本會將來自來源 Observable 的項目與這些來源項目觸發的 Observable 合併,並發射這些組合。
此運算子的變體版本採用額外的 int
參數。此參數設定 flatMap
將嘗試對來源 Observable 發射的項目對應的 Observables 進行的最大並行訂閱數。當達到此最大數時,它將等待其中一個 Observables 終止,然後再訂閱另一個。
flatMap(Func1,Func2)
flatMap(Func1,Func2,int)
flatMapIterable
變體配對來源項目和產生的 Iterable
,而不是來源項目和產生的 Observables,但其他方面的工作方式大致相同。
flatMapIterable(Func1)
flatMapIterable(Func1,Func2)
還有一個 concatMap
運算子,它類似於 flatMap
運算子的較簡單版本,但它會連接而不是合併結果 Observables,以產生其自身的序列。
concatMap(Func1)
RxJava 也實作了 switchMap
運算子。它的行為很像 flatMap
,除了每當來源 Observable 發射新項目時,它會取消訂閱並停止鏡像從先前發射的項目產生的 Observable,並且只開始鏡像目前的項目。
switchMap(Func1)
RxJS 有許多執行類似 FlatMap 操作的運算子。在 RxJS 中,將來源 Observable 發射的項目轉換為 Observables 的函式通常會將項目和項目在 Observable 序列中的索引作為參數。
RxJS 實作了基本的 flatMap
運算子。它有一個變體,允許您將轉換函式 (flatMap
的可選第二個參數) 應用於為來源 Observable 中的每個項目產生的 Observables 發射的項目,然後合併和發射這些項目。
如果您提供的函式將來自來源 Observables 的項目轉換為 Observables、Promise 或陣列,flatMap
的工作方式也一樣。
“selectMany
” 是 flatMap
的別名。
var source = Rx.Observable .range(1, 2) .selectMany(function (x) { return Rx.Observable.range(x, 2); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 2 Next: 3 Completed
// Using a promise var source = Rx.Observable.of(1,2,3,4) .selectMany(function (x, i) { return Promise.resolve(x + i); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 3 Next: 5 Next: 7 Completed
// Using an array Rx.Observable.of(1,2,3) .flatMap( function (x, i) { return [x,i]; }, function (x, y, ix, iy) { return x + y + ix + iy; } ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 2 Next: 2 Next: 5 Next: 5 Next: 8 Next: 8 Completed
在以下每個發行版中都找到了 flatMap
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
flatMapLatest
運算子的行為很像標準的 FlatMap 運算子,除了每當來源 Observable 發射新項目時,它會取消訂閱並停止鏡像從先前發射的項目產生的 Observable,並且只開始鏡像目前的項目。
“selectSwitch
” 是 flatMapLatest
的別名。
var source = Rx.Observable .range(1, 2) .flatMapLatest(function (x) { return Rx.Observable.range(x, 2); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 3 Completed
在以下每個發行版中都找到了 flatMapLatest
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
flatMapObserver
會為來源 Observable 的每個項目和通知建立 (和展平) 一個新的 Observable。它接受不同的轉換函式來回應 onNext
、onError
和 onCompleted
通知,並為每個通知返回 Observable。
“selectManyObserver
” 是 flatMapObserver
的別名。
var source = Rx.Observable.range(1, 3) .flatMapObserver( function (x, i) { return Rx.Observable.repeat(x, i); }, function (err) { return Rx.Observable.return(42); }, function () { return Rx.Observable.empty(); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 2 Next: 3 Next: 3 Completed
在以下每個發行版中都找到了 flatMapObserver
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
還有一個 concatMap
運算子,它類似於 flatMap
運算子,但它會連接而不是合併結果 Observables,以產生其自身的序列。
與 flatMap
一樣,如果您提供的函式將來自來源 Observables 的項目轉換為 Observables、Promise 或陣列,concatMap
的工作方式也一樣。
“selectConcat
” 是 concatMap
的別名。
在以下每個發行版中都找到了 concatMap
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
for
運算子 (及其別名 forIn
) 與 concatMap
非常相似,但它具有相反的靈活性。雖然 concatMap
在 Observable 來源上操作,並且可以使用 Observable、Promise 或陣列中介來產生其輸出序列;for
始終使用 Observables 作為其中介,但可以在來源是 Observable、Promise 或陣列上操作。
在以下每個發行版中都找到了 concatMap
rx.all.js
rx.all.compat.js
rx.experimental.js
(需要 rx.js
、rx.compat.js
、rx.lite.js
或 rx.lite.compat.js
其中之一)還有一個 concatMapObserver
運算子,它類似於 flatMapObserver
運算子,因為它會建立 Observables 來合併來自來源 Observable 的發射和終端通知,但它會連接而不是合併這些結果 Observables,以產生其自身的序列。
“selectConcatObserver
” 是 concatMapObserver
的別名。
在以下每個發行版中都找到了 concatMapObserver
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
manySelect
運算子通常被描述為「共單子綁定」。如果這讓您更清楚了,歡迎使用。否則,以下是解釋
manySelect
在內部將來源 Observable 發射的每個項目轉換為一個 Observable,該 Observable 會按相同的順序發射該項目和來源 Observable 後續發射的所有項目。因此,例如,它在內部將發射數字 1、2、3 的 Observable 轉換為三個 Observables:一個發射 1、2、3,一個發射 2、3,一個發射 3。
然後 manySelect
將這些 Observables 中的每一個傳遞到您提供的函式中,並發射來自 manySelect
返回的 Observable 的發射,以及這些函式呼叫的傳回值。
透過這種方式,結果 Observable 發射的每個項目都是來源 Observable 中相應項目及其之後來源 Observable 發射的所有項目的函式。
在以下每個發行版中都找到了 manySelect
rx.all.js
rx.all.compat.js
rx.experimental.js
manySelect
需要以下發行版之一
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxPHP 將此運算子實作為 flatMap
。
將可觀察序列的每個元素投影到可觀察序列,並將結果可觀察序列合併為一個可觀察序列。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/flatMap/flatMap.php $observable = Rx\Observable::range(1, 2); $selectManyObservable = $observable->flatMap(function ($value) { return Rx\Observable::range($value, 2); }); $selectManyObservable->subscribe($stdoutObserver);
Next value: 1 Next value: 2 Next value: 2 Next value: 3 Complete!
RxPHP 還有一個運算子 flatMapTo
。
將來源可觀察序列的每個元素投影到其他可觀察序列,並將結果可觀察序列合併為一個可觀察序列。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMapTo.php $obs = \Rx\Observable::interval(100) ->take(3) ->mapWithIndex(function ($i) { return $i; }); $source = Rx\Observable::range(0, 5) ->concatMapTo($obs); $subscription = $source->subscribe($stdoutObserver);
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 6 Next value: 7 Next value: 8 Next value: 9 Next value: 10 Next value: 11 Next value: 12 Next value: 13 Next value: 14 Complete!
RxPHP 還有一個運算子 selectMany
。
flatMap 的別名
RxPHP 還有一個運算子 flatMapLatest
。
略過可觀察序列中指定數量的元素,然後傳回剩餘的元素。將 Observable 發出的項目轉換為 Observables,並鏡射由最近轉換的 Observable 所發出的那些項目。flatMapLatest 操作符類似於上述的 flatMap 和 concatMap 方法,然而,flatMapLatest 並非發出該操作符透過轉換來自來源 Observable 的項目所生成的所有 Observables 發出的所有項目,而是僅發出來自每個此類轉換後 Observable 的項目,直到發出下一個此類 Observable,然後它會忽略前一個並開始發出由新的 Observable 發出的項目。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/flatMap/flatMapLatest.php $source = \Rx\Observable::range(1, 3) ->flatMapLatest(function ($x) { return \Rx\Observable::fromArray([$x . 'a', $x . 'b']); }); $source->subscribe($stdoutObserver);
Next value: 1a Next value: 2a Next value: 3a Next value: 3b Complete!
RxPHP 也有一個操作符 concatMap
。
將可觀察序列的每個元素投影到一個可觀察序列,並將產生的可觀察序列串聯成一個可觀察序列。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMap.php $source = Rx\Observable::range(0, 5) ->concatMap(function ($x, $i) { return \Rx\Observable::interval(100) ->take($x) ->map(function () use ($i) { return $i; }); }); $subscription = $source->subscribe($stdoutObserver);
Next value: 1 Next value: 2 Next value: 2 Next value: 3 Next value: 3 Next value: 3 Next value: 4 Next value: 4 Next value: 4 Next value: 4 Complete!
RxPHP 也有一個操作符 concatMapTo
。
將來源可觀察序列的每個元素投影到其他可觀察序列,並將結果可觀察序列合併為一個可觀察序列。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/concat/concatMapTo.php $obs = \Rx\Observable::interval(100) ->take(3) ->mapWithIndex(function ($i) { return $i; }); $source = Rx\Observable::range(0, 5) ->concatMapTo($obs); $subscription = $source->subscribe($stdoutObserver);
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 6 Next value: 7 Next value: 8 Next value: 9 Next value: 10 Next value: 11 Next value: 12 Next value: 13 Next value: 14 Complete!