Create

透過一個函數從頭開始建立一個 Observable

Create

你可以使用 Create 操作符從頭開始建立一個 Observable。你傳遞給這個操作符一個函數,該函數接受觀察者 (observer) 作為其參數。撰寫此函數使其行為如同一個 Observable — 透過適當地呼叫觀察者的 onNextonErroronCompleted 方法。

一個格式良好的有限 Observable 必須嘗試呼叫觀察者的 onCompleted 方法恰好一次,或呼叫其 onError 方法恰好一次,並且此後不得嘗試呼叫觀察者的任何其他方法。

另請參閱

特定語言資訊

待定

create

RxGroovy 將此操作符實作為 create

範例程式碼

def myObservable = Observable.create({ aSubscriber ->
  try {
    for (int i = 1; i < 1000000; i++) {
      if (aSubscriber.isUnsubscribed()) {
        return;
      }
      aSubscriber.onNext(i);
    }
    if (!aSubscriber.isUnsubscribed()) {
      aSubscriber.onCompleted();
    }
  } catch(Throwable t) {
    if (!aSubscriber.isUnsubscribed()) {
      aSubscriber.onError(t);
    }
  }
})

檢查觀察者的 isUnsubscribed 狀態是一個好習慣,這樣你的 Observable 可以在沒有感興趣的觀察者時停止發射項目或執行昂貴的計算。

create 預設不會在任何特定的排程器 (Scheduler)上運作。

create

RxJava 將此操作符實作為 create

從你傳遞給 create 的函數中檢查觀察者的 isUnsubscribed 狀態是一個好習慣,這樣你的 Observable 可以在沒有感興趣的觀察者時停止發射項目或執行昂貴的計算。

範例程式碼

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
 } ).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

create 預設不會在任何特定的排程器 (Scheduler)上運作。

create

RxJS 將此操作符實作為 create (同一個操作符還有另一個名稱:createWithDisposable)。

範例程式碼

/* Using a function */
var source = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();

    // Note that this is optional, you do not have to return this if you require no cleanup
    return function () { console.log('disposed'); };
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 42
Completed
/* Using a disposable */
var source = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();

    // Note that this is optional, you do not have to return this if you require no cleanup
    return Rx.Disposable.create(function () {
        console.log('disposed');
    });
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 42
Completed

create 位於以下發行版本中

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
generate

你可以使用 generate 操作符來建立簡單的 Observables,它們可以產生它們的下一個發射項,並且可以基於上一個發射項的值來決定何時終止。generate 的基本形式採用四個參數

  1. 要發射的第一個項目
  2. 一個函數,用於測試一個項目,以確定是否發射它 (true) 或終止 Observable (false)
  3. 一個函數,用於基於上一個項目的值產生下一個要測試和發射的項目
  4. 一個函數,用於在發射項目之前轉換它們

你還可以選擇傳入第五個參數,一個 排程器 (Scheduler)generate 將使用它來建立和發射其序列 (預設使用 currentThread)。

範例程式碼

var source = Rx.Observable.generate(
    0,
    function (x) { return x < 3; },
    function (x) { return x + 1; },
    function (x) { return x; }
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Completed

generate 位於以下發行版本中

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
generateWithRelativeTime

你可以使用 generateWithRelativeTime 操作符來建立簡單的 Observables,它們可以產生它們的下一個發射項,並且可以基於上一個發射項的值來決定何時終止。generateWithRelativeTime 的基本形式採用五個參數

  1. 要發射的第一個項目
  2. 一個函數,用於測試一個項目,以確定是否發射它 (true) 或終止 Observable (false)
  3. 一個函數,用於基於上一個項目的值產生下一個要測試和發射的項目
  4. 一個函數,用於在發射項目之前轉換它們
  5. 一個函數,指示產生器在發射上一個項目後,應該等待多少毫秒才能發射這個項目

你還可以選擇傳入第六個參數,一個 排程器 (Scheduler)generate 將使用它來建立和發射其序列 (預設使用 currentThread)。

範例程式碼

var source = Rx.Observable.generateWithRelativeTime(
    1,
    function (x) { return x < 4; },
    function (x) { return x + 1; },
    function (x) { return x; },
    function (x) { return 100 * x; }
).timeInterval();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: {value: 1, interval: 100}
Next: {value: 2, interval: 200}
Next: {value: 3, interval: 300}
Completed

generateWithRelativeTime 位於以下發行版本中

  • rx.lite.js
  • rx.lite.compat.js
  • rx.time.js (需要 rx.jsrx.compat.js)
generateWithAbsoluteTime

你可以使用 generateWithAbsoluteTime 操作符來建立簡單的 Observables,它們可以產生它們的下一個發射項,並且可以基於上一個發射項的值來決定何時終止。generateWithAbsoluteTime 的基本形式採用五個參數

  1. 要發射的第一個項目
  2. 一個函數,用於測試一個項目,以確定是否發射它 (true) 或終止 Observable (false)
  3. 一個函數,用於基於上一個項目的值產生下一個要測試和發射的項目
  4. 一個函數,用於在發射項目之前轉換它們
  5. 一個函數,指示產生器應該在什麼時間 (表示為 Date) 發射新項目

你還可以選擇傳入第六個參數,一個 排程器 (Scheduler)generate 將使用它來建立和發射其序列 (預設使用 currentThread)。

範例程式碼

var source = Rx.Observable.generate(
    1,
    function (x) { return x < 4; },
    function (x) { return x + 1; },
    function (x) { return x; },
    function (x) { return Date.now() + (100 * x); }
).timeInterval();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: {value: 1, interval: 100}
Next: {value: 2, interval: 200}
Next: {value: 3, interval: 300}
Completed

generateWithAbsoluteTime 位於以下發行版本中

  • rx.time.js (需要 rx.jsrx.compat.js)

RxPHP 將此操作符實作為 create

從指定的 subscribeAction 可呼叫實作建立一個 observable 序列。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/create/create.php

//With static method
$source = \Rx\Observable::create(function (\Rx\ObserverInterface $observer) {
    $observer->onNext(42);
    $observer->onCompleted();

    return new CallbackDisposable(function () {
        echo "Disposed\n";
    });
});

$subscription = $source->subscribe($createStdoutObserver());

   
Next value: 42
Complete!
Disposed
    
create

RxSwift 將此操作符實作為 create

範例程式碼

let source : Observable = Observable.create { observer in
    for i in 1...5 {
        observer.on(.next(i))
    }
    observer.on(.completed)

    // Note that this is optional. If you require no cleanup you can return
    // `Disposables.create()` (which returns the `NopDisposable` singleton)
    return Disposables.create {
        print("disposed")
    }
}

source.subscribe {
    print($0)
}
next(1)
next(2)
next(3)
next(4)
next(5)
completed
disposed
generate

你可以使用 generate 操作符來建立簡單的 Observables,它們可以產生它們的下一個發射項,並且可以基於上一個發射項的值來決定何時終止。generate 的基本形式採用三個參數

  1. 要發射的第一個項目
  2. 一個函數,用於測試一個項目,以確定是否發射它 (true) 或終止 Observable (false)
  3. 一個函數,用於基於上一個項目的值產生下一個要測試和發射的項目

你還可以選擇傳入第四個參數,一個 排程器 (Scheduler)generate 將使用它來建立和發射其序列 (預設使用 CurrentThreadScheduler)。

範例程式碼

let source = Observable.generate(
   initialState: 0,
   condition: { $0 < 3 },
   iterate: { $0 + 1 }
)

source.subscribe {
   print($0)
}
next(0)
next(1)
next(2)
completed