Zip
方法會回傳一個 Observable,它會將您選擇的函數應用於兩個(或更多)其他 Observables 按順序發射的項目的組合,而此函數的結果將成為回傳的 Observable 發射的項目。它會嚴格按順序應用此函數,因此新 Observable 發射的第一個項目將是函數應用於 Observable #1 發射的第一個項目和 Observable #2 發射的第一個項目的結果;新的 zip-Observable 發射的第二個項目將是函數應用於 Observable #1 發射的第二個項目和 Observable #2 發射的第二個項目的結果;依此類推。它只會發射與發射最少項目的源 Observable 發射的項目數量一樣多的項目。
待定
RxGroovy 將此操作符實作為 zip
的多個變體,以及 zipWith
,此操作符的實例函數版本。
zip
的最後一個參數是一個函數,它接受每個正在壓縮的 Observable 的一個項目,並發射一個要由 zip
回傳的 Observable 發射的項目。您可以將要壓縮在一起的 Observables 作為兩個到九個單獨的參數,或作為一個單一參數提供給 zip
:可以是 Observables 的 Iterable 或發射 Observables 的 Observable(如上圖所示)。
odds = Observable.from([1, 3, 5, 7, 9]); evens = Observable.from([2, 4, 6]); Observable.zip(odds, evens, {o, e -> [o, e]}).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
[1, 2] [3, 4] [5, 6] Sequence complete
請注意,在此範例中,結果 Observable 在發射三個項目後正常完成,這是兩個組成 Observables(evens
,發射三個項目)中較短的一個發射的項目數量。
zip(Iterable<Observable>,FuncN)
zip(Observable<Observable>,FuncN)
zip(Observable,Observable,Func2)
(也有最多可接受九個 Observables 的版本)此操作符的 zipWith
實例版本始終接受兩個參數。第一個參數可以是簡單的 Observable,也可以是可迭代物件(如上圖所示)。
zipWith(Observable,Func2)
zipWith(Iterable,Func2)
zip
和 zipWith
預設不在任何特定的 Scheduler 上運作。
RxJava 將此操作符實作為 zip
的多個變體,以及 zipWith
,此操作符的實例函數版本。
zip
的最後一個參數是一個函數,它接受每個正在壓縮的 Observable 的一個項目,並發射一個要由 zip
回傳的 Observable 發射的項目。您可以將要壓縮在一起的 Observables 作為兩個到九個單獨的參數,或作為一個單一參數提供給 zip
:可以是 Observables 的 Iterable 或發射 Observables 的 Observable(如上圖所示)。
zip(Iterable<Observable>,FuncN)
zip(Observable<Observable>,FuncN)
zip(Observable,Observable,Func2)
(也有最多可接受九個 Observables 的版本)此操作符的 zipWith
實例版本始終接受兩個參數。第一個參數可以是簡單的 Observable,也可以是可迭代物件(如上圖所示)。
zipWith(Observable,Func2)
zipWith(Iterable,Func2)
zip
和 zipWith
預設不在任何特定的 Scheduler 上運作。
RxJS 將此操作符實作為 zip
和 zipArray
。
zip
接受可變數量的 Observables 或 Promises 作為參數,後跟一個函數,該函數接受每個 Observables 發射或 Promises 解析的一個項目作為輸入,並產生一個要由結果 Observable 發射的單個項目。
/* Using arguments */ var range = Rx.Observable.range(0, 5); var source = Observable.zip( range, range.skip(1), range.skip(2), function (s1, s2, s3) { return s1 + ':' + s2 + ':' + s3; } ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0:1:2 Next: 1:2:3 Next: 2:3:4 Completed
/* Using promises and Observables */ var range = Rx.Observable.range(0, 5); var source = Observable.zip( RSVP.Promise.resolve(0), RSVP.Promise.resolve(1), Rx.Observable.return(2) function (s1, s2, s3) { return s1 + ':' + s2 + ':' + s3; } ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0:1:2 Completed
zipArray
接受可變數量的 Observables 作為參數,並回傳一個發射陣列的 Observable,每個陣列都包含每個來源 Observable 的第 n 個項目。
var range = Rx.Observable.range(0, 5); var source = Rx.Observable.zipArray( range, range.skip(1), range.skip(2) ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: [0,1,2] Next: [1,2,3] Next: [2,3,4] Completed
RxJS 也實作了一個類似的操作符 forkJoin
。此操作符有兩種變體。第一種將每個來源 Observables 發射的最後一個元素收集到一個陣列中,並將此陣列作為自己的唯一發射項目發射。您可以將 Observables 列表作為單獨的參數或作為 Observables 的陣列傳遞給 forkJoin
。
var source = Rx.Observable.forkJoin( Rx.Observable.return(42), Rx.Observable.range(0, 10), Rx.Observable.fromArray([1,2,3]), RSVP.Promise.resolve(56) ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: [42, 9, 3, 56] Completed
forkJoin
的第二種變體作為原型函數存在,您可以在一個來源 Observable 的實例上呼叫它,並將另一個來源 Observable 作為參數傳遞給它。作為第二個參數,您將一個函數傳遞給它,該函數將兩個來源 Observable 發射的最後一個項目組合到結果 Observable 要發射的唯一項目中。
var source1 = Rx.Observable.return(42); var source2 = Rx.Observable.range(0, 3); var source = source1.forkJoin(source2, function (s1, s2) { return s1 + s2; }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 44 Completed
forkJoin
可以在以下發行版中找到
rx.all.js
rx.all.compat.js
rx.experimental.js
(需要 rx.js
、rx.compat.js
、rx.lite.js
或 rx.lite.compat.js
)待定
RxPHP 將此操作符實作為 zip
。
當所有 observable 序列在相應的索引處產生一個元素時,使用選擇器函數將指定的 observable 序列合併為一個 observable 序列。如果省略結果選擇器函數,則會產生一個包含 observable 序列在相應索引處元素的列表。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip.php //Without a result selector $range = \Rx\Observable::fromArray(range(0, 4)); $source = $range ->zip([ $range->skip(1), $range->skip(2) ]); $observer = $createStdoutObserver(); $subscription = $source ->subscribe(new CallbackObserver( function ($array) use ($observer) { $observer->onNext(json_encode($array)); }, [$observer, 'onError'], [$observer, 'onCompleted'] ));
Next value: [0,1,2] Next value: [1,2,3] Next value: [2,3,4] Complete!
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip-result-selector.php //With a result selector $range = \Rx\Observable::fromArray(range(0, 4)); $source = $range ->zip([ $range->skip(1), $range->skip(2) ], function ($s1, $s2, $s3) { return $s1 . ':' . $s2 . ':' . $s3; }); $observer = $createStdoutObserver(); $subscription = $source->subscribe($createStdoutObserver());
Next value: 0:1:2 Next value: 1:2:3 Next value: 2:3:4 Complete!
RxPHP 還有一個操作符 forkJoin
。
平行執行所有 observable 序列並收集它們的最後一個元素。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/forkJoin/forkJoin.php use Rx\Observable; $obs1 = Observable::range(1, 4); $obs2 = Observable::range(3, 5); $obs3 = Observable::fromArray(['a', 'b', 'c']); $observable = Observable::forkJoin([$obs1, $obs2, $obs3], function($v1, $v2, $v3) { return $v1 . $v2 . $v3; }); $observable->subscribe($stdoutObserver);
Next value: 47c Complete!
待定
待定