#實作您自己的運算子

您可以實作您自己的 Observable 運算子。此頁面將說明如何實作。

如果您的運算子是設計用來產生 Observable,而不是轉換或響應來源 Observable,請使用 create( ) 方法,而不是嘗試手動實作 Observable。否則,請按照以下說明操作。

將您的自訂運算子與標準 RxJava 運算子鏈接

以下範例示範如何透過使用 lift( ) 運算子,將自訂運算子(在本範例中為:myOperator)與標準 RxJava 運算子鏈接在一起。

Observable foo = barObservable.ofType(Integer).map({it*2}).lift(new myOperator<T>()).map({"transformed by myOperator: " + it});

以下章節將說明如何建立您運算子的骨架,使其能正確地與 lift( ) 搭配運作。

實作您的運算子

將您的運算子定義為實作 Operator 介面的 public 類別,如下所示:

public class myOperator<T> implements Operator<T> {
  public myOperator( /* any necessary params here */ ) {
    /* any necessary initialization here */
  }

  @Override
  public Subscriber<? super T> call(final Subscriber<? super T> s) {
    return new Subscriber<t>(s) {
      @Override
      public void onCompleted() {
        /* add your own onCompleted behavior here, or just pass the completed notification through: */
        if(!s.isUnsubscribed()) {
          s.onCompleted();
        }
      }

      @Override
      public void onError(Throwable t) {
        /* add your own onError behavior here, or just pass the error notification through: */
        if(!s.isUnsubscribed()) {
          s.onError(t);
        }
      }

      @Override
      public void onNext(T item) {
        /* this example performs some sort of simple transformation on each incoming item and then passes it along */
        if(!s.isUnsubscribed()) {
          transformedItem = myOperatorTransformOperation(item);
          s.onNext(transformedItem);
        }
      }
    };
  }
}

其他注意事項

  • 您的運算子應在發送任何項目(或發送任何通知)給 Subscriber 之前,檢查其 Subscriber 的 isUnsubscribed( ) 狀態。不要浪費時間產生 Subscriber 不感興趣的項目。
  • 您的運算子應遵守 Observable 契約的核心原則
    • 它可以多次呼叫 Subscriber 的 onNext( ) 方法,但這些呼叫必須是非重疊的。
    • 它只能呼叫 Subscriber 的 onCompleted( )onError( ) 方法其中之一,而且只能呼叫一次,之後不得再呼叫 Subscriber 的 onNext( ) 方法。
    • 如果您無法保證您的運算子符合上述兩個原則,您可以加入 serialize( ) 運算子來強制執行正確的行為。
  • 不要在您的運算子內進行阻塞。
  • 通常最好是透過組合現有的運算子來組成新的運算子,盡可能地做到這一點,而不是重新發明輪子。RxJava 本身就是這樣處理一些標準運算子的,例如:
  • 如果您的運算子使用作為參數傳入的函數或 Lambda(例如,述詞),請注意這些可能會是例外狀況的來源,並準備好捕獲這些例外狀況,並透過 onError( ) 呼叫通知訂閱者。
  • 一般而言,應立即通知訂閱者錯誤情況,而不是努力先發送更多項目。
  • 在某些 ReactiveX 實作中,您的運算子可能需要對該實作的「背壓」策略敏感。(例如,請參閱:Dávid Karnok 的 運算子實作的陷阱(第 2 部分)。)

另請參閱