訂閱 (Subscribe)

對 Observable 發射 (emissions) 和通知進行操作

Subscribe 操作符是連接觀察者 (observer) 和 Observable 的橋樑。為了讓觀察者看到 Observable 發射的項目,或接收來自 Observable 的錯誤或完成通知,它必須先使用此操作符訂閱該 Observable。

典型的 Subscribe 操作符實現可能接受一到三個方法 (這些方法構成觀察者),或者它可能接受一個對象 (有時稱為 ObserverSubscriber),該對象實現包含這三個方法的介面。

onNext
每當 Observable 發射一個項目時,就會呼叫此方法。此方法將 Observable 發射的項目作為參數。
onError
Observable 呼叫此方法表示它未能產生預期的數據或遇到其他錯誤。這會停止 Observable,它將不會再呼叫 onNextonCompletedonError 方法將導致錯誤的原因作為參數 (有時是一個物件,如 Exception 或 Throwable,其他時候是一個簡單的字串,取決於實作)。
onCompleted
Observable 在最後一次呼叫 onNext 後呼叫此方法,前提是它沒有遇到任何錯誤。

如果 Observable 在觀察者訂閱之前不會開始發射項目,則稱其為「冷」Observable;如果 Observable 可能隨時開始發射項目,並且訂閱者可能在開始之後的某個時間點開始觀察發射項目的序列,而錯過訂閱時間之前發射的任何項目,則稱其為「熱」Observable。

另請參閱

特定語言資訊

RxGroovy 實作了數個 subscribe 的變體。

如果您沒有傳遞任何參數,它會觸發對底層 Observable 的訂閱,但會忽略其發射和通知。這會啟動一個冷的 Observable。

您也可以傳遞一到三個函數;這些函數將按以下方式解釋

  1. onNext
  2. onNextonError
  3. onNextonErroronCompleted

最後,您可以傳遞一個實作 ObserverSubscriber 介面的對象。Observer 介面包含先前描述的三個「on」方法。Subscriber 介面也實作了這些方法,並新增了一些額外的方法,這些方法有助於反應式回壓,並允許 Subscriber 在 Observable 完成之前取消訂閱。

呼叫 subscribe 會傳回一個實作 Subscription 介面的對象。此介面包含 unsubscribe 方法,您可以隨時呼叫該方法以斷開 subscribe 在 Observable 和觀察者 (或替代觀察者的方法) 之間建立的訂閱。

forEach 操作符是 subscribe 的較簡單版本。您可以傳遞一到三個函數,這些函數將按以下方式解釋

  1. onNext
  2. onNextonError
  3. onNextonErroronCompleted

subscribe 不同,forEach 不會傳回可用來取消訂閱的物件。您也沒有選擇傳遞具有此功能的參數。因此,只有在您確定需要處理來自 Observable 的所有發射和通知時,才應使用此操作符。

forEach

還有一個名為 forEachBlockingObservable 方法,它有點類似。為了使用它,您必須首先透過 BlockingObservable.from 方法或 Observable.toBlocking 操作符將您的來源 Observable 轉換為 BlockingObservable

BlockingObservable.forEach 接受單一函數作為參數,此函數的行為很像普通 Observable 訂閱中的 onNext 函數。forEach 操作符本身會阻塞直到 BlockingObservable 完成,並且它是透過解除阻塞 (而不是呼叫回呼函數) 來表示它已完成。如果遇到錯誤,它將拋出 RuntimeException (而不是呼叫類似於 onError 回呼的函數)。

RxJava 實作了數個 subscribe 的變體。

如果您沒有傳遞任何參數,它會觸發對底層 Observable 的訂閱,但會忽略其發射和通知。這會啟動一個冷的 Observable。

您也可以傳遞一到三個函數;這些函數將按以下方式解釋

  1. onNext
  2. onNextonError
  3. onNextonErroronCompleted

最後,您可以傳遞一個實作 ObserverSubscriber 介面的對象。Observer 介面包含先前描述的三個「on」方法。Subscriber 介面也實作了這些方法,並新增了一些額外的方法,這些方法有助於反應式回壓,並允許 Subscriber 在 Observable 完成之前取消訂閱。

呼叫 subscribe 會傳回一個實作 Subscription 介面的對象。此介面包含 unsubscribe 方法,您可以隨時呼叫該方法以斷開 subscribe 在 Observable 和觀察者 (或替代觀察者的方法) 之間建立的訂閱。

forEach 操作符是 subscribe 的較簡單版本。您可以傳遞一到三個函數,這些函數將按以下方式解釋

  1. onNext
  2. onNextonError
  3. onNextonErroronCompleted

subscribe 不同,forEach 不會傳回可用來取消訂閱的物件。您也沒有選擇傳遞具有此功能的參數。因此,只有在您確定需要處理來自 Observable 的所有發射和通知時,才應使用此操作符。

forEach

還有一個名為 forEachBlockingObservable 方法,它有點類似。為了使用它,您必須首先透過 BlockingObservable.from 方法或 Observable.toBlocking 操作符將您的來源 Observable 轉換為 BlockingObservable

BlockingObservable.forEach 接受單一函數作為參數,此函數的行為很像普通 Observable 訂閱中的 onNext 函數。forEach 操作符本身會阻塞直到 BlockingObservable 完成,並且它是透過解除阻塞 (而不是呼叫回呼函數) 來表示它已完成。如果遇到錯誤,它將拋出 RuntimeException (而不是呼叫類似於 onError 回呼的函數)。

在 RxJS 中,您可以透過兩種方式訂閱 Observable

  1. 使用 subscribeOnNextsubscribeOnCompletedsubscribeOnError 分別將單一函數訂閱到 Observable 的 onNextonCompletedonError 通知
  2. 透過將零到三個個別函數,或一個實作這三個函數的對象傳遞到 subscribeforEach 操作符 (這些操作符的行為相同) 來進行訂閱。

範例程式碼

var source = Rx.Observable.range(0, 3)

var subscription = source.subscribeOnNext(
  function (x) {
    console.log('Next: %s', x);
  });
Next: 0
Next: 1
Next: 2
var source = Rx.Observable.range(0, 3);

var subscription = source.subscribeOnCompleted(
  function () {
    console.log('Completed');
  });
Completed
var source = Rx.Observable.throw(new Error());

var subscription = source.subscribeOnError(
  function (err) {
    console.log('Error: %s', err);
  });
Error: Error
var observer = Rx.Observer.create(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

var source = Rx.Observable.range(0, 3)

var subscription = source.subscribe(observer);
Next: 0
Next: 1
Next: 2
Completed
var source = Rx.Observable.range(0, 3)

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Completed

本節中描述的函數都可以在以下每個發行版本中找到

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP 將此操作符實作為 subscribe