GroupBy

將一個 Observable 分割成一組 Observables,每個 Observable 發射來自原始 Observable 的不同子集的項目

GroupBy

GroupBy 運算子將發射項目的 Observable 分割成一個發射 Observables 的 Observable,其中每個 Observable 發射來自原始來源 Observable 的一些子集的項目。哪個項目最終出現在哪個 Observable 上,通常由一個判別函數決定,該函數評估每個項目並為其分配一個鍵。所有具有相同鍵的項目都由同一個 Observable 發射。

另請參閱

特定語言資訊

groupBy

RxGroovy 實作了 groupBy 運算子。它返回的 Observable 發射特定 Observable 子類別的項目 — GroupedObservable。實作 GroupedObservable 介面的物件還有一個額外的方法 — getkey — 您可以透過該方法檢索指定這個特定 GroupedObservable 項目的鍵。

以下範例程式碼使用 groupBy 將數字列表轉換為兩個列表,按數字是否為偶數分組

範例程式碼

def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = { return(0 == (it % 2)); };

numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete

另一個版本的 groupBy 允許您傳入一個轉換函數,該函數會變更元素,然後再由產生的 GroupedObservable 發射。

請注意,當 groupBy 將來源 Observable 分割成一個發射 GroupedObservable 的 Observable 時,每個 GroupedObservable 都會開始緩衝它在訂閱時將發射的項目。因此,如果您忽略任何這些 GroupedObservable(您既不訂閱它也不對它應用會訂閱它的運算子),這個緩衝區將會出現潛在的記憶體洩漏。因此,您不應該忽略您沒有興趣觀察的 GroupedObservable,而應該對它應用像 take(0) 這樣的運算子,以表示它可以丟棄其緩衝區。

如果您取消訂閱其中一個 GroupedObservable,或者如果您對 GroupedObservable 應用的像 take 這樣的運算子取消訂閱它,則該 GroupedObservable 將會終止。如果來源 Observable 後來發射的項目其鍵與以此方式終止的 GroupedObservable 相符,則 groupBy 將會建立並發射一個新的 GroupedObservable 來匹配該鍵。換句話說,取消訂閱 GroupedObservable不會導致 groupBy 吞掉其群組中的項目。例如,請參閱以下程式碼

範例程式碼

Observable.range(1,5)
          .groupBy({ 0 })
          .flatMap({ this.take(1) })
          .subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
2
3
4
5

在上面的程式碼中,來源 Observable 發射序列 { 1 2 3 4 5 }。當它發射此序列中的第一個項目時,groupBy 運算子會建立並發射一個鍵為 0GroupedObservableflatMap 運算子將 take(1) 運算子應用於該 GroupedObservable,這使其發射它發射的項目 (1),並且也會取消訂閱該 GroupedObservable,該 GroupedObservable 會被終止。當來源 Observable 發射其序列中的第二個項目時,groupBy 運算子會建立並發射一個具有相同鍵 (0) 的第二個 GroupedObservable 來取代被終止的那個。flatMap 再次將 take(1) 應用於這個新的 GroupedObservable,以檢索要發射的新項目 (2),並取消訂閱和終止該 GroupedObservable,並且此過程會對來源序列中的剩餘項目重複。

groupBy 預設情況下不會在任何特定的 Scheduler 上執行。

groupBy

RxJava 實作了 groupBy 運算子。它返回的 Observable 發射特定 Observable 子類別的項目 — GroupedObservable。實作 GroupedObservable 介面的物件還有一個額外的方法 — getkey — 您可以透過該方法檢索指定這個特定 GroupedObservable 項目的鍵。

另一個版本的 groupBy 允許您傳入一個轉換函數,該函數會變更元素,然後再由產生的 GroupedObservable 發射。

請注意,當 groupBy 將來源 Observable 分割成一個發射 GroupedObservable 的 Observable 時,每個 GroupedObservable 都會開始緩衝它在訂閱時將發射的項目。因此,如果您忽略任何這些 GroupedObservable(您既不訂閱它也不對它應用會訂閱它的運算子),這個緩衝區將會出現潛在的記憶體洩漏。因此,您不應該忽略您沒有興趣觀察的 GroupedObservable,而應該對它應用像 take(0) 這樣的運算子,以表示它可以丟棄其緩衝區。

如果您取消訂閱其中一個 GroupedObservable,則該 GroupedObservable 將會終止。如果來源 Observable 後來發射的項目其鍵與以此方式終止的 GroupedObservable 相符,則 groupBy 將會建立並發射一個新的 GroupedObservable 來匹配該鍵。

groupBy 預設情況下不會在任何特定的 Scheduler 上執行。

groupBy

RxJS 實作了 groupBy。它接受一到三個參數

  1. (必要)一個接受來自來源 Observable 的項目的函數,並返回其鍵
  2. 一個接受來自來源 Observable 的項目的函數,並返回一個項目,該項目將被產生的 Observables 之一替換發射
  3. 一個用於比較兩個鍵是否相同的函數(也就是說,具有兩個鍵的項目是否應在同一個 Observable 上發射)

範例程式碼

var codes = [
    { keyCode: 38}, // up
    { keyCode: 38}, // up
    { keyCode: 40}, // down
    { keyCode: 40}, // down
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 66}, // b
    { keyCode: 65}  // a
];

var source = Rx.Observable.fromArray(codes)
    .groupBy(
        function (x) { return x.keyCode; },
        function (x) { return x.keyCode; });

var subscription = source.subscribe(
    function (obs) {
        // Print the count
        obs.count().subscribe(function (x) {
            console.log('Count: ' + x);
        });
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Count: 2
Count: 2
Count: 2
Count: 2
Count: 1
Count: 1
Completed

在以下每個發行版中都可以找到 groupBy

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js
groupByUntil

RxJS 也實作了 groupByUntil。它會監視一個額外的 Observable,並且每當該 Observable 發射一個項目時,它就會關閉它已開啟的任何鍵控 Observable(如果來源 Observable 發射了其他符合該鍵的項目,它將會開啟新的 Observable)。groupByUntil 接受兩個到四個參數

  1. (必要)一個接受來自來源 Observable 的項目的函數,並返回其鍵
  2. 一個接受來自來源 Observable 的項目的函數,並返回一個項目,該項目將被產生的 Observables 之一替換發射
  3. (必要)一個返回 Observable 的函數,其發射會觸發任何開啟的 Observables 的終止
  4. 一個用於比較兩個鍵是否相同的函數(也就是說,具有兩個鍵的項目是否應在同一個 Observable 上發射)

範例程式碼

var codes = [
    { keyCode: 38}, // up
    { keyCode: 38}, // up
    { keyCode: 40}, // down
    { keyCode: 40}, // down
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 66}, // b
    { keyCode: 65}  // a
];

var source = Rx.Observable
    .for(codes, function (x) { return Rx.Observable.return(x).delay(1000); })
    .groupByUntil(
        function (x) { return x.keyCode; },
        function (x) { return x.keyCode; },
        function (x) { return Rx.Observable.timer(2000); });

var subscription = source.subscribe(
    function (obs) {
        // Print the count
        obs.count().subscribe(function (x) { console.log('Count: ' + x); });
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed

在以下每個發行版中都可以找到 groupByUntil

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js

RxPHP 將此運算子實作為 groupBy

根據指定的鍵選取器函數和比較器來分組可觀察序列的元素,並使用指定的函數選取產生的元素。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupBy.php

$observable = \Rx\Observable::fromArray([21, 42, 21, 42, 21, 42]);
$observable
    ->groupBy(
        function ($elem) {
            if ($elem === 42) {
                return 0;
            }

            return 1;
        },
        null,
        function ($key) {
            return $key;
        }
    )
    ->subscribe(function ($groupedObserver) use ($createStdoutObserver) {
        $groupedObserver->subscribe($createStdoutObserver($groupedObserver->getKey() . ": "));
    });

   
1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Complete!
0: Complete!
    

RxPHP 也有一個 groupByUntil 運算子。

根據指定的鍵選取器函數和比較器來分組可觀察序列的元素,並使用指定的函數選取產生的元素。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupByUntil.php

$codes = [
    ['id' => 38],
    ['id' => 38],
    ['id' => 40],
    ['id' => 40],
    ['id' => 37],
    ['id' => 39],
    ['id' => 37],
    ['id' => 39],
    ['id' => 66],
    ['id' => 65]
];

$source = Rx\Observable
    ::fromArray($codes)
    ->concatMap(function ($x) {
        return \Rx\Observable::timer(100)->mapTo($x);
    })
    ->groupByUntil(
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return Rx\Observable::timer(200);
        });

$subscription = $source->subscribe(new CallbackObserver(
    function (\Rx\Observable $obs) {
        // Print the count
        $obs->count()->subscribe(new CallbackObserver(
            function ($x) {
                echo 'Count: ', $x, PHP_EOL;
            }));
    },
    function (Throwable $err) {
        echo 'Error', $err->getMessage(), PHP_EOL;
    },
    function () {
        echo 'Completed', PHP_EOL;
    }));


   
Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed
    

RxPHP 還有一個 partition 運算子。

返回兩個可觀察對象,它們透過給定的函數分割來源的觀察。第一個可觀察對象將觸發謂詞返回 true 的那些值的觀察。第二個可觀察對象將觸發謂詞返回 false 的那些值的觀察。謂詞針對每個訂閱的觀察者執行一次。兩者也會傳播來源產生的所有錯誤觀察,並且每個觀察都會在來源完成時完成。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/partition/partition.php

list($evens, $odds) = \Rx\Observable::range(0, 10, \Rx\Scheduler::getImmediate())
    ->partition(function ($x) {
        return $x % 2 === 0;
    });

//Because we used the immediate scheduler with range, the subscriptions are not asynchronous.
$evens->subscribe($createStdoutObserver('Evens '));
$odds->subscribe($createStdoutObserver('Odds '));

   
Evens Next value: 0
Evens Next value: 2
Evens Next value: 4
Evens Next value: 6
Evens Next value: 8
Evens Complete!
Odds Next value: 1
Odds Next value: 3
Odds Next value: 5
Odds Next value: 7
Odds Next value: 9
Odds Complete!
    

待定