Merge

透過合併多個 Observables 的發射值,將其結合為一個

你可以使用 Merge 操作符來結合多個 Observables 的輸出,使其表現得像單一的 Observable。

Merge 可能會交錯合併的 Observables 所發射的項目 (一個類似的操作符,Concat,不會交錯合併項目,而是依序發射每個來源 Observable 的所有項目,然後才開始發射下一個來源 Observable 的項目)。

如上圖所示,任何來源 Observable 的 onError 通知都會立即傳遞給觀察者,並終止合併後的 Observable。

MergeDelayError

在許多 ReactiveX 實現中,還有第二個操作符 MergeDelayError,它會改變這種行為 — 保留 onError 通知,直到所有合併的 Observables 完成,然後才將其傳遞給觀察者。

另請參閱

特定語言資訊

在 RxClojure 中,這裡有六個相關的操作符

merge

merge 將兩個或多個 Observables 轉換為一個單一的 Observable,該 Observable 發射所有這些 Observables 發射的所有項目。

merge*

merge* 將發射 Observables 的 Observable 轉換為一個單一的 Observable,該 Observable 發射所有發射的 Observables 發射的所有項目。

merge-delay-error

merge-delay-errormerge 類似,但即使一個或多個 Observables 在仍在發射項目時以 onError 通知終止,它也會發射所有合併的 Observables 的所有項目。

merge-delay-error*merge* 的類似修改版本。

interleave*

interleavemerge 類似,但它更刻意地交錯來自來源 Observables 的項目:結果的 Observable 發射第一個來源 Observable 發射的第一個項目,然後是第二個來源 Observable 發射的第一個項目,依此類推,並且在到達最後一個來源 Observable 後,然後發射第一個來源 Observable 發射的第二個項目,第二個來源 Observable 發射的第二個項目,依此類推,直到所有來源 Observables 終止。

interleave* 類似,但操作的是 Observables 的 Observable。

RxCpp 將此操作符實作為 merge

merge

RxGroovy 將此操作符實作為 mergemergeWithmergeDelayError

merge

例如,以下程式碼將 oddsevens 合併為一個單一的 Observable。(subscribeOn 操作符使 odds 在與 evens 不同的執行緒上操作,以便兩個 Observables 可以同時發射項目,以演示 Merge 如何交錯這些項目。)

程式碼範例

odds  = Observable.from([1, 3, 5, 7]).subscribeOn(someScheduler);
evens = Observable.from([2, 4, 6]);

Observable.merge(odds,evens).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
3
2
5
4
7
6
Sequence complete

除了將多個 Observables (最多九個) 傳遞給 merge 之外,你還可以傳遞一個 Observables 的 List<> (或其他 Iterable)、一個 Observables 的陣列,甚至是發射 Observables 的 Observable,而 merge 會將它們的輸出合併到單一的 Observable 的輸出中

merge(List)

如果你傳入一個 Observables 的 Observable,你可以選擇傳入一個值,指示 merge 它應該嘗試同時訂閱的最大 Observables 數量。一旦達到此最大訂閱計數,它將不會訂閱來源 Observable 發射的任何其他 Observables,直到其中一個已訂閱的 Observables 發出 onCompleted 通知為止。

merge 的實例版本是 mergeWith,因此,例如,在上面的程式碼範例中,你也可以寫 odds.mergeWith(evens),而不是寫 Observable.merge(odds,evens)

如果傳遞給 merge 的任何個別 Observables 以 onError 通知終止,則 merge 產生的 Observable 本身會立即以 onError 通知終止。如果你希望合併繼續發射剩餘無錯誤的 Observables 的結果,然後再報告錯誤,請改用 mergeDelayError

mergeDelayError

mergeDelayError 的行為與 merge 非常相似。例外情況是當正在合併的 Observables 之一以 onError 通知終止時。如果 merge 發生這種情況,合併後的 Observable 將立即發出 onError 通知並終止。另一方面,mergeDelayError 會延遲報告錯誤,直到它給予它正在合併的任何其他非產生錯誤的 Observables 有機會完成發射它們的項目,並且它會自己發射這些項目,並且只有在所有其他合併的 Observables 完成後才會以 onError 通知終止。

由於可能有多個合併的 Observables 遇到錯誤,因此 mergeDelayError 可能會在 onError 通知中傳遞有關多個錯誤的資訊(它永遠不會多次調用觀察者的 onError 方法)。因此,如果你想知道這些錯誤的性質,你應該編寫觀察者的 onError 方法,使其接受 CompositeException 類別的參數。

mergeDelayError 的變體較少。你無法將 Observables 的 Iterable 或陣列傳遞給它,但你可以傳遞一個發射 Observables 的 Observable,或介於一到九個個別的 Observables 作為參數。沒有像 merge 那樣的 mergeDelayError 的實例方法版本。

RxJava 將此操作符實作為 mergemergeWithmergeDelayError

merge

程式碼範例

Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);

Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.

除了將多個 Observables (最多九個) 傳遞給 merge 之外,你還可以傳遞一個 Observables 的 List<> (或其他 Iterable)、一個 Observables 的陣列,甚至是發射 Observables 的 Observable,而 merge 會將它們的輸出合併到單一的 Observable 的輸出中

merge(List)

如果你傳入一個 Observables 的 Observable,你可以選擇傳入一個值,指示 merge 它應該嘗試同時訂閱的最大 Observables 數量。一旦達到此最大訂閱計數,它將不會訂閱來源 Observable 發射的任何其他 Observables,直到其中一個已訂閱的 Observables 發出 onCompleted 通知為止。

merge 的實例版本是 mergeWith,因此,例如,你也可以寫 odds.mergeWith(evens),而不是寫 Observable.merge(odds,evens)

如果傳遞給 merge 的任何個別 Observables 以 onError 通知終止,則 merge 產生的 Observable 本身會立即以 onError 通知終止。如果你希望合併繼續發射剩餘無錯誤的 Observables 的結果,然後再報告錯誤,請改用 mergeDelayError

mergeDelayError

mergeDelayError 的行為與 merge 非常相似。例外情況是當正在合併的 Observables 之一以 onError 通知終止時。如果 merge 發生這種情況,合併後的 Observable 將立即發出 onError 通知並終止。另一方面,mergeDelayError 會延遲報告錯誤,直到它給予它正在合併的任何其他非產生錯誤的 Observables 有機會完成發射它們的項目,並且它會自己發射這些項目,並且只有在所有其他合併的 Observables 完成後才會以 onError 通知終止。

由於可能有多個合併的 Observables 遇到錯誤,因此 mergeDelayError 可能會在 onError 通知中傳遞有關多個錯誤的資訊(它永遠不會多次調用觀察者的 onError 方法)。因此,如果你想知道這些錯誤的性質,你應該編寫觀察者的 onError 方法,使其接受 CompositeException 類別的參數。

mergeDelayError 的變體較少。你無法將 Observables 的 Iterable 或陣列傳遞給它,但你可以傳遞一個發射 Observables 的 Observable,或介於一到九個個別的 Observables 作為參數。沒有像 merge 那樣的 mergeDelayError 的實例方法版本。

merge

merge 的第一個變體是一個實例操作符,它接受可變數量的 Observables 作為參數,將這些 Observables 中的每一個與來源 (實例) Observables 合併,以產生其單一輸出 Observable。

merge 的第一個變體可以在以下發行版本中找到

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

merge 的第二個變體是一個原型 (類別) 操作符,它接受兩個參數。第二個參數是一個發射你要合併的 Observables 的 Observable。第一個參數是一個數字,指示你希望 merge 在任何時刻嘗試訂閱的最大發射 Observables 數量。一旦達到此最大訂閱計數,它將不會訂閱來源 Observable 發射的任何其他 Observables,直到其中一個已訂閱的 Observables 發出 onCompleted 通知為止。

merge 的第二個變體可以在以下發行版本中找到

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
mergeAll

mergeAllmerge 的第二個變體類似,只是它不允許你設定此最大訂閱計數。它只接受一個 Observables 的 Observable 的單一參數。

mergeAll 可以在以下發行版本中找到

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
mergeDelayError

如果傳遞給 mergemergeAll 的任何個別 Observables 以 onError 通知終止,則產生的 Observable 將立即以 onError 通知終止。如果你希望合併繼續發射剩餘無錯誤的 Observables 的結果,然後再報告錯誤,請改用 mergeDelayError

程式碼範例

var source1 = Rx.Observable.of(1,2,3);
var source2 = Rx.Observable.throwError(new Error('whoops!'));
var source3 = Rx.Observable.of(4,5,6);

var merged = Rx.Observable.mergeDelayError(source1, source2, source3);

var subscription = merged.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); }
  function () { console.log('Completed' } );
1
2
3
4
5
6
Error: Error: whoops!

mergeDelayError 可以在以下發行版本中找到

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxKotlin 將此操作符實作為 mergemergeWithmergeDelayError

merge

除了將多個 Observables (最多九個) 傳遞給 merge 之外,你還可以傳遞一個 Observables 的 List<> (或其他 Iterable)、一個 Observables 的陣列,甚至是發射 Observables 的 Observable,而 merge 會將它們的輸出合併到單一的 Observable 的輸出中

merge(List)

如果你傳入一個 Observables 的 Observable,你可以選擇傳入一個值,指示 merge 它應該嘗試同時訂閱的最大 Observables 數量。一旦達到此最大訂閱計數,它將不會訂閱來源 Observable 發射的任何其他 Observables,直到其中一個已訂閱的 Observables 發出 onCompleted 通知為止。

merge 的實例版本是 mergeWith,因此,例如,你也可以寫 odds.mergeWith(evens),而不是寫 Observable.merge(odds,evens)

如果傳遞給 merge 的任何個別 Observables 以 onError 通知終止,則 merge 產生的 Observable 本身會立即以 onError 通知終止。如果你希望合併繼續發射剩餘無錯誤的 Observables 的結果,然後再報告錯誤,請改用 mergeDelayError

mergeDelayError

mergeDelayError 的行為與 merge 非常相似。例外情況是當正在合併的 Observables 之一以 onError 通知終止時。如果 merge 發生這種情況,合併後的 Observable 將立即發出 onError 通知並終止。另一方面,mergeDelayError 會延遲報告錯誤,直到它給予它正在合併的任何其他非產生錯誤的 Observables 有機會完成發射它們的項目,並且它會自己發射這些項目,並且只有在所有其他合併的 Observables 完成後才會以 onError 通知終止。

由於可能有多個合併的 Observables 遇到錯誤,因此 mergeDelayError 可能會在 onError 通知中傳遞有關多個錯誤的資訊(它永遠不會多次調用觀察者的 onError 方法)。因此,如果你想知道這些錯誤的性質,你應該編寫觀察者的 onError 方法,使其接受 CompositeException 類別的參數。

mergeDelayError 的變體較少。你無法將 Observables 的 Iterable 或陣列傳遞給它,但你可以傳遞一個發射 Observables 的 Observable,或介於一到九個個別的 Observables 作為參數。沒有像 merge 那樣的 mergeDelayError 的實例方法版本。

Rx.NET 將此操作符實作為 Merge

Merge

你可以將 Observables 的陣列、Observables 的可列舉物件、Observables 的 Observable 或兩個個別的 Observables 傳遞給 Merge

如果您傳遞一個 Observables 的 Enumerable 或 Observable,您可以選擇傳遞一個整數,表示它應嘗試同時訂閱這些 Observables 的最大數量。一旦達到此最大訂閱計數,它將暫停訂閱來源 Observable 發出的任何其他 Observables,直到其中一個已訂閱的 Observables 發出 onCompleted 通知。

RxPHP 將此運算符實作為 merge

將一個 Observable 與另一個 Observable 結合,方法是將它們的發射合併到一個單一的 Observable 中。

程式碼範例

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/merge/merge.php

$observable       = Rx\Observable::of(42)->repeat();
$otherObservable  = Rx\Observable::of(21)->repeat();
$mergedObservable = $observable
    ->merge($otherObservable)
    ->take(10);

$disposable = $mergedObservable->subscribe($stdoutObserver);

   
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Complete!
    

RxPHP 也有一個運算符 mergeAll

將一個 observables 的 observable 序列合併成一個 observable 序列。

程式碼範例

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/merge/merge-all.php

$sources = Rx\Observable::range(0, 3)
    ->map(function ($x) {
        return Rx\Observable::range($x, 3);
    });

$merged = $sources->mergeAll();

$disposable = $merged->subscribe($stdoutObserver);

   
Next value: 0
Next value: 1
Next value: 1
Next value: 2
Next value: 2
Next value: 2
Next value: 3
Next value: 3
Next value: 4
Complete!
    

RxPY 將此運算符實作為 mergemerge_all/merge_observable

merge

您可以將一組 Observables 作為個別參數傳遞給 merge,也可以將它們作為包含這些 Observables 的陣列的單一參數傳遞。

merge_all

merge_all 及其別名 merge_observable 將一個發射 Observables 的 Observable 作為其單一參數。它們合併所有這些 Observables 的發射以建立它們自己的 Observable。

Rx.rb 將此運算符實作為 mergemerge_concurrentmerge_all

merge

merge 將第二個 Observable 合併到它正在操作的 Observable 中,以建立一個新的合併 Observable。

merge_concurrent 操作一個發射 Observables 的 Observable,將每個這些 Observables 的發射合併到它自己的發射中。您可以選擇性地傳遞一個整數參數,表示 merge_concurrent 應嘗試同時訂閱多少個這些發射的 Observables。一旦達到此最大訂閱計數,它將暫停訂閱來源 Observable 發出的任何其他 Observables,直到其中一個已訂閱的 Observables 發出 onCompleted 通知。預設值為 1,使其等同於 merge_all

merge_all

merge_all 就像 merge_concurrent(1)。它一次訂閱每個發射的 Observable,將其發射鏡像為自己的發射,並等待訂閱下一個 Observable,直到目前的一個以 onCompleted 通知終止。在這方面,它更像是一個 Concat 變體。

RxScala 將此運算符實作為 flattenflattenDelayErrormergemergeDelayError

merge

merge 將第二個 Observable 作為參數,並將該 Observable 與應用 merge 運算符的 Observable 合併,以建立一個新的輸出 Observable。

mergeDelayError

mergeDelayError 類似於 merge,但即使其中一個 Observable 在另一個 Observable 完成發射項目之前以 onError 通知終止,它始終會發射兩個 Observables 的所有項目。

flatten

flatten 將一個發射 Observables 的 Observable 作為其參數。它合併每個這些 Observables 發射的項目,以建立其自己的單一 Observable 序列。此運算符的一個變體允許您傳遞一個 Int,表示您希望 flatten 嘗試在任何時間訂閱的最大發射 Observables 數量。如果達到此最大訂閱計數,它將暫停訂閱來源 Observable 發出的任何其他 Observables,直到其中一個已訂閱的 Observables 發出 onCompleted 通知。

flattenDelayError 類似於 flatten,但即使一個或多個這些 Observables 在其他 Observables 完成發射項目之前以 onError 通知終止,它始終會發射所有發射的 Observables 的所有項目。

RxSwift 將此運算符實作為 merge

merge

merge 將一個發射 Observables 的 Observable 作為其參數。它合併每個這些 Observables 發射的項目,以建立其自己的單一 Observable 序列。

此運算符的一個變體 merge(maxConcurrent:) 允許您傳遞一個 Int,表示您希望 merge 嘗試在任何時間訂閱的最大發射 Observables 數量。如果達到此最大訂閱計數,它將暫停訂閱來源 Observable 發出的任何其他 Observables,直到其中一個已訂閱的 Observables 發出 onCompleted 通知。

程式碼範例

let subject1 = PublishSubject()
let subject2 = PublishSubject()

Observable.of(subject1, subject2)
   .merge()
   .subscribe {
       print($0)
   }

subject1.on(.Next(10))
subject1.on(.Next(11))
subject1.on(.Next(12))
subject2.on(.Next(20))
subject2.on(.Next(21))
subject1.on(.Next(14))
subject1.on(.Completed)
subject2.on(.Next(22))
subject2.on(.Completed)
Next(10)
Next(11)
Next(12)
Next(20)
Next(21)
Next(14)
Next(22)
Completed