GroupBy 運算子將發射項目的 Observable 分割成一個發射 Observables 的 Observable,其中每個 Observable 發射來自原始來源 Observable 的一些子集的項目。哪個項目最終出現在哪個 Observable 上,通常由一個判別函數決定,該函數評估每個項目並為其分配一個鍵。所有具有相同鍵的項目都由同一個 Observable 發射。
RxGroovy 實作了 groupBy
運算子。它返回的 Observable 發射特定 Observable 子類別的項目 — GroupedObservable
。實作 GroupedObservable
介面的物件還有一個額外的方法 — getkey
— 您可以透過該方法檢索指定這個特定 GroupedObservable
項目的鍵。
以下範例程式碼使用 groupBy
將數字列表轉換為兩個列表,按數字是否為偶數分組
def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]); def groupFunc = { return(0 == (it % 2)); }; numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
[false, 1, 3, 5, 7, 9] [true, 2, 4, 6, 8] Sequence complete
另一個版本的 groupBy
允許您傳入一個轉換函數,該函數會變更元素,然後再由產生的 GroupedObservable
發射。
請注意,當 groupBy
將來源 Observable 分割成一個發射 GroupedObservable
的 Observable 時,每個 GroupedObservable
都會開始緩衝它在訂閱時將發射的項目。因此,如果您忽略任何這些 GroupedObservable
(您既不訂閱它也不對它應用會訂閱它的運算子),這個緩衝區將會出現潛在的記憶體洩漏。因此,您不應該忽略您沒有興趣觀察的 GroupedObservable
,而應該對它應用像 take(0)
這樣的運算子,以表示它可以丟棄其緩衝區。
如果您取消訂閱其中一個 GroupedObservable
,或者如果您對 GroupedObservable
應用的像 take
這樣的運算子取消訂閱它,則該 GroupedObservable
將會終止。如果來源 Observable 後來發射的項目其鍵與以此方式終止的 GroupedObservable
相符,則 groupBy
將會建立並發射一個新的 GroupedObservable
來匹配該鍵。換句話說,取消訂閱 GroupedObservable
將不會導致 groupBy
吞掉其群組中的項目。例如,請參閱以下程式碼
Observable.range(1,5) .groupBy({ 0 }) .flatMap({ this.take(1) }) .subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
1 2 3 4 5
在上面的程式碼中,來源 Observable 發射序列 { 1 2 3 4 5 }
。當它發射此序列中的第一個項目時,groupBy
運算子會建立並發射一個鍵為 0
的 GroupedObservable
。flatMap
運算子將 take(1)
運算子應用於該 GroupedObservable
,這使其發射它發射的項目 (1
),並且也會取消訂閱該 GroupedObservable
,該 GroupedObservable
會被終止。當來源 Observable 發射其序列中的第二個項目時,groupBy
運算子會建立並發射一個具有相同鍵 (0
) 的第二個 GroupedObservable
來取代被終止的那個。flatMap
再次將 take(1)
應用於這個新的 GroupedObservable
,以檢索要發射的新項目 (2
),並取消訂閱和終止該 GroupedObservable
,並且此過程會對來源序列中的剩餘項目重複。
groupBy
預設情況下不會在任何特定的 Scheduler 上執行。
groupBy(Func1)
groupBy(Func1,Func1)
RxJava 實作了 groupBy
運算子。它返回的 Observable 發射特定 Observable 子類別的項目 — GroupedObservable
。實作 GroupedObservable
介面的物件還有一個額外的方法 — getkey
— 您可以透過該方法檢索指定這個特定 GroupedObservable
項目的鍵。
另一個版本的 groupBy
允許您傳入一個轉換函數,該函數會變更元素,然後再由產生的 GroupedObservable
發射。
請注意,當 groupBy
將來源 Observable 分割成一個發射 GroupedObservable
的 Observable 時,每個 GroupedObservable
都會開始緩衝它在訂閱時將發射的項目。因此,如果您忽略任何這些 GroupedObservable
(您既不訂閱它也不對它應用會訂閱它的運算子),這個緩衝區將會出現潛在的記憶體洩漏。因此,您不應該忽略您沒有興趣觀察的 GroupedObservable
,而應該對它應用像 take(0)
這樣的運算子,以表示它可以丟棄其緩衝區。
如果您取消訂閱其中一個 GroupedObservable
,則該 GroupedObservable
將會終止。如果來源 Observable 後來發射的項目其鍵與以此方式終止的 GroupedObservable
相符,則 groupBy
將會建立並發射一個新的 GroupedObservable
來匹配該鍵。
groupBy
預設情況下不會在任何特定的 Scheduler 上執行。
groupBy(Func1)
groupBy(Func1,Func1)
RxJS 實作了 groupBy
。它接受一到三個參數
var codes = [ { keyCode: 38}, // up { keyCode: 38}, // up { keyCode: 40}, // down { keyCode: 40}, // down { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 66}, // b { keyCode: 65} // a ]; var source = Rx.Observable.fromArray(codes) .groupBy( function (x) { return x.keyCode; }, function (x) { return x.keyCode; }); var subscription = source.subscribe( function (obs) { // Print the count obs.count().subscribe(function (x) { console.log('Count: ' + x); }); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Count: 2 Count: 2 Count: 2 Count: 2 Count: 1 Count: 1 Completed
在以下每個發行版中都可以找到 groupBy
rx.all.js
rx.all.compat.js
rx.coincidence.js
RxJS 也實作了 groupByUntil
。它會監視一個額外的 Observable,並且每當該 Observable 發射一個項目時,它就會關閉它已開啟的任何鍵控 Observable(如果來源 Observable 發射了其他符合該鍵的項目,它將會開啟新的 Observable)。groupByUntil
接受兩個到四個參數
var codes = [ { keyCode: 38}, // up { keyCode: 38}, // up { keyCode: 40}, // down { keyCode: 40}, // down { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 37}, // left { keyCode: 39}, // right { keyCode: 66}, // b { keyCode: 65} // a ]; var source = Rx.Observable .for(codes, function (x) { return Rx.Observable.return(x).delay(1000); }) .groupByUntil( function (x) { return x.keyCode; }, function (x) { return x.keyCode; }, function (x) { return Rx.Observable.timer(2000); }); var subscription = source.subscribe( function (obs) { // Print the count obs.count().subscribe(function (x) { console.log('Count: ' + x); }); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Count: 2 Count: 2 Count: 1 Count: 1 Count: 1 Count: 1 Count: 1 Count: 1 Completed
在以下每個發行版中都可以找到 groupByUntil
rx.all.js
rx.all.compat.js
rx.coincidence.js
RxPHP 將此運算子實作為 groupBy
。
根據指定的鍵選取器函數和比較器來分組可觀察序列的元素,並使用指定的函數選取產生的元素。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupBy.php $observable = \Rx\Observable::fromArray([21, 42, 21, 42, 21, 42]); $observable ->groupBy( function ($elem) { if ($elem === 42) { return 0; } return 1; }, null, function ($key) { return $key; } ) ->subscribe(function ($groupedObserver) use ($createStdoutObserver) { $groupedObserver->subscribe($createStdoutObserver($groupedObserver->getKey() . ": ")); });
1: Next value: 21 0: Next value: 42 1: Next value: 21 0: Next value: 42 1: Next value: 21 0: Next value: 42 1: Complete! 0: Complete!
RxPHP 也有一個 groupByUntil
運算子。
根據指定的鍵選取器函數和比較器來分組可觀察序列的元素,並使用指定的函數選取產生的元素。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupByUntil.php $codes = [ ['id' => 38], ['id' => 38], ['id' => 40], ['id' => 40], ['id' => 37], ['id' => 39], ['id' => 37], ['id' => 39], ['id' => 66], ['id' => 65] ]; $source = Rx\Observable ::fromArray($codes) ->concatMap(function ($x) { return \Rx\Observable::timer(100)->mapTo($x); }) ->groupByUntil( function ($x) { return $x['id']; }, function ($x) { return $x['id']; }, function ($x) { return Rx\Observable::timer(200); }); $subscription = $source->subscribe(new CallbackObserver( function (\Rx\Observable $obs) { // Print the count $obs->count()->subscribe(new CallbackObserver( function ($x) { echo 'Count: ', $x, PHP_EOL; })); }, function (Throwable $err) { echo 'Error', $err->getMessage(), PHP_EOL; }, function () { echo 'Completed', PHP_EOL; }));
Count: 2 Count: 2 Count: 1 Count: 1 Count: 1 Count: 1 Count: 1 Count: 1 Completed
RxPHP 還有一個 partition
運算子。
返回兩個可觀察對象,它們透過給定的函數分割來源的觀察。第一個可觀察對象將觸發謂詞返回 true 的那些值的觀察。第二個可觀察對象將觸發謂詞返回 false 的那些值的觀察。謂詞針對每個訂閱的觀察者執行一次。兩者也會傳播來源產生的所有錯誤觀察,並且每個觀察都會在來源完成時完成。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/partition/partition.php list($evens, $odds) = \Rx\Observable::range(0, 10, \Rx\Scheduler::getImmediate()) ->partition(function ($x) { return $x % 2 === 0; }); //Because we used the immediate scheduler with range, the subscriptions are not asynchronous. $evens->subscribe($createStdoutObserver('Evens ')); $odds->subscribe($createStdoutObserver('Odds '));
Evens Next value: 0 Evens Next value: 2 Evens Next value: 4 Evens Next value: 6 Evens Next value: 8 Evens Complete! Odds Next value: 1 Odds Next value: 3 Odds Next value: 5 Odds Next value: 7 Odds Next value: 9 Odds Complete!
待定