Zip

透過指定函數將多個 Observable 的發射項目組合在一起,並根據此函數的結果,針對每個組合發射單個項目。

Zip 方法會回傳一個 Observable,它會將您選擇的函數應用於兩個(或更多)其他 Observables 按順序發射的項目的組合,而此函數的結果將成為回傳的 Observable 發射的項目。它會嚴格按順序應用此函數,因此新 Observable 發射的第一個項目將是函數應用於 Observable #1 發射的第一個項目和 Observable #2 發射的第一個項目的結果;新的 zip-Observable 發射的第二個項目將是函數應用於 Observable #1 發射的第二個項目和 Observable #2 發射的第二個項目的結果;依此類推。它只會發射與發射最少項目的源 Observable 發射的項目數量一樣多的項目。

另請參閱

特定語言資訊

待定

RxGroovy 將此操作符實作為 zip 的多個變體,以及 zipWith,此操作符的實例函數版本。

zip

zip 的最後一個參數是一個函數,它接受每個正在壓縮的 Observable 的一個項目,並發射一個要由 zip 回傳的 Observable 發射的項目。您可以將要壓縮在一起的 Observables 作為兩個到九個單獨的參數,或作為一個單一參數提供給 zip:可以是 Observables 的 Iterable 或發射 Observables 的 Observable(如上圖所示)。

範例程式碼

odds  = Observable.from([1, 3, 5, 7, 9]);
evens = Observable.from([2, 4, 6]);

Observable.zip(odds, evens, {o, e -> [o, e]}).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[1, 2]
[3, 4]
[5, 6]
Sequence complete

請注意,在此範例中,結果 Observable 在發射三個項目後正常完成,這是兩個組成 Observables(evens,發射三個項目)中較短的一個發射的項目數量。

zipWith

此操作符的 zipWith 實例版本始終接受兩個參數。第一個參數可以是簡單的 Observable,也可以是可迭代物件(如上圖所示)。

zipzipWith 預設不在任何特定的 Scheduler 上運作。

RxJava 將此操作符實作為 zip 的多個變體,以及 zipWith,此操作符的實例函數版本。

zip

zip 的最後一個參數是一個函數,它接受每個正在壓縮的 Observable 的一個項目,並發射一個要由 zip 回傳的 Observable 發射的項目。您可以將要壓縮在一起的 Observables 作為兩個到九個單獨的參數,或作為一個單一參數提供給 zip:可以是 Observables 的 Iterable 或發射 Observables 的 Observable(如上圖所示)。

zipWith

此操作符的 zipWith 實例版本始終接受兩個參數。第一個參數可以是簡單的 Observable,也可以是可迭代物件(如上圖所示)。

zipzipWith 預設不在任何特定的 Scheduler 上運作。

RxJS 將此操作符實作為 zipzipArray

zip

zip 接受可變數量的 Observables 或 Promises 作為參數,後跟一個函數,該函數接受每個 Observables 發射或 Promises 解析的一個項目作為輸入,並產生一個要由結果 Observable 發射的單個項目。

範例程式碼

/* Using arguments */
var range = Rx.Observable.range(0, 5);

var source = Observable.zip(
    range,
    range.skip(1),
    range.skip(2),
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 0:1:2
Next: 1:2:3
Next: 2:3:4
Completed
/* Using promises and Observables */
var range = Rx.Observable.range(0, 5);

var source = Observable.zip(
    RSVP.Promise.resolve(0),
    RSVP.Promise.resolve(1),
    Rx.Observable.return(2)
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 0:1:2
Completed
zipArray

zipArray 接受可變數量的 Observables 作為參數,並回傳一個發射陣列的 Observable,每個陣列都包含每個來源 Observable 的第 n 個項目。

範例程式碼

var range = Rx.Observable.range(0, 5);

var source = Rx.Observable.zipArray(
    range,
    range.skip(1), 
    range.skip(2)
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });
Next: [0,1,2]
Next: [1,2,3]
Next: [2,3,4]
Completed
forkJoin

RxJS 也實作了一個類似的操作符 forkJoin。此操作符有兩種變體。第一種將每個來源 Observables 發射的最後一個元素收集到一個陣列中,並將此陣列作為自己的唯一發射項目發射。您可以將 Observables 列表作為單獨的參數或作為 Observables 的陣列傳遞給 forkJoin

var source = Rx.Observable.forkJoin(
    Rx.Observable.return(42),
    Rx.Observable.range(0, 10),
    Rx.Observable.fromArray([1,2,3]),
    RSVP.Promise.resolve(56)
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: [42, 9, 3, 56]
Completed
forkJoin

forkJoin 的第二種變體作為原型函數存在,您可以在一個來源 Observable 的實例上呼叫它,並將另一個來源 Observable 作為參數傳遞給它。作為第二個參數,您將一個函數傳遞給它,該函數將兩個來源 Observable 發射的最後一個項目組合到結果 Observable 要發射的唯一項目中。

var source1 = Rx.Observable.return(42);
var source2 = Rx.Observable.range(0, 3);

var source = source1.forkJoin(source2, function (s1, s2) {
    return s1 + s2;
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 44
Completed

forkJoin 可以在以下發行版中找到

  • rx.all.js
  • rx.all.compat.js
  • rx.experimental.js (需要 rx.jsrx.compat.jsrx.lite.jsrx.lite.compat.js)

待定

RxPHP 將此操作符實作為 zip

當所有 observable 序列在相應的索引處產生一個元素時,使用選擇器函數將指定的 observable 序列合併為一個 observable 序列。如果省略結果選擇器函數,則會產生一個包含 observable 序列在相應索引處元素的列表。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip.php

//Without a result selector
$range = \Rx\Observable::fromArray(range(0, 4));

$source = $range
    ->zip([
        $range->skip(1),
        $range->skip(2)
    ]);

$observer = $createStdoutObserver();

$subscription = $source
    ->subscribe(new CallbackObserver(
        function ($array) use ($observer) {
            $observer->onNext(json_encode($array));
        },
        [$observer, 'onError'],
        [$observer, 'onCompleted']
    ));

   
Next value: [0,1,2]
Next value: [1,2,3]
Next value: [2,3,4]
Complete!
    
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip-result-selector.php

//With a result selector
$range = \Rx\Observable::fromArray(range(0, 4));

$source = $range
    ->zip([
        $range->skip(1),
        $range->skip(2)
    ], function ($s1, $s2, $s3) {
        return $s1 . ':' . $s2 . ':' . $s3;
    });

$observer = $createStdoutObserver();

$subscription = $source->subscribe($createStdoutObserver());

   
Next value: 0:1:2
Next value: 1:2:3
Next value: 2:3:4
Complete!
    

RxPHP 還有一個操作符 forkJoin

平行執行所有 observable 序列並收集它們的最後一個元素。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/forkJoin/forkJoin.php

use Rx\Observable;

$obs1 = Observable::range(1, 4);
$obs2 = Observable::range(3, 5);
$obs3 = Observable::fromArray(['a', 'b', 'c']);

$observable = Observable::forkJoin([$obs1, $obs2, $obs3], function($v1, $v2, $v3) {
    return $v1 . $v2 . $v3;
});
$observable->subscribe($stdoutObserver);
   
Next value: 47c
Complete!
    

待定

待定