你可以使用 Create 操作符從頭開始建立一個 Observable。你傳遞給這個操作符一個函數,該函數接受觀察者 (observer) 作為其參數。撰寫此函數使其行為如同一個 Observable — 透過適當地呼叫觀察者的 onNext
、onError
和 onCompleted
方法。
一個格式良好的有限 Observable 必須嘗試呼叫觀察者的 onCompleted
方法恰好一次,或呼叫其 onError
方法恰好一次,並且此後不得嘗試呼叫觀察者的任何其他方法。
待定
RxGroovy 將此操作符實作為 create
。
def myObservable = Observable.create({ aSubscriber -> try { for (int i = 1; i < 1000000; i++) { if (aSubscriber.isUnsubscribed()) { return; } aSubscriber.onNext(i); } if (!aSubscriber.isUnsubscribed()) { aSubscriber.onCompleted(); } } catch(Throwable t) { if (!aSubscriber.isUnsubscribed()) { aSubscriber.onError(t); } } })
檢查觀察者的 isUnsubscribed
狀態是一個好習慣,這樣你的 Observable 可以在沒有感興趣的觀察者時停止發射項目或執行昂貴的計算。
create
預設不會在任何特定的排程器 (Scheduler)上運作。
create(OnSubscribe)
RxJava 將此操作符實作為 create
。
從你傳遞給 create
的函數中檢查觀察者的 isUnsubscribed
狀態是一個好習慣,這樣你的 Observable 可以在沒有感興趣的觀察者時停止發射項目或執行昂貴的計算。
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
create
預設不會在任何特定的排程器 (Scheduler)上運作。
create(OnSubscribe)
RxJS 將此操作符實作為 create
(同一個操作符還有另一個名稱:createWithDisposable
)。
/* Using a function */ var source = Rx.Observable.create(function (observer) { observer.onNext(42); observer.onCompleted(); // Note that this is optional, you do not have to return this if you require no cleanup return function () { console.log('disposed'); }; }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
/* Using a disposable */ var source = Rx.Observable.create(function (observer) { observer.onNext(42); observer.onCompleted(); // Note that this is optional, you do not have to return this if you require no cleanup return Rx.Disposable.create(function () { console.log('disposed'); }); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
create
位於以下發行版本中
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
你可以使用 generate
操作符來建立簡單的 Observables,它們可以產生它們的下一個發射項,並且可以基於上一個發射項的值來決定何時終止。generate
的基本形式採用四個參數
true
) 或終止 Observable (false
)你還可以選擇傳入第五個參數,一個 排程器 (Scheduler),generate
將使用它來建立和發射其序列 (預設使用 currentThread
)。
var source = Rx.Observable.generate( 0, function (x) { return x < 3; }, function (x) { return x + 1; }, function (x) { return x; } ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Completed
generate
位於以下發行版本中
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
你可以使用 generateWithRelativeTime
操作符來建立簡單的 Observables,它們可以產生它們的下一個發射項,並且可以基於上一個發射項的值來決定何時終止。generateWithRelativeTime
的基本形式採用五個參數
true
) 或終止 Observable (false
)你還可以選擇傳入第六個參數,一個 排程器 (Scheduler),generate
將使用它來建立和發射其序列 (預設使用 currentThread
)。
var source = Rx.Observable.generateWithRelativeTime( 1, function (x) { return x < 4; }, function (x) { return x + 1; }, function (x) { return x; }, function (x) { return 100 * x; } ).timeInterval(); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: {value: 1, interval: 100} Next: {value: 2, interval: 200} Next: {value: 3, interval: 300} Completed
generateWithRelativeTime
位於以下發行版本中
rx.lite.js
rx.lite.compat.js
rx.time.js
(需要 rx.js
或 rx.compat.js
)你可以使用 generateWithAbsoluteTime
操作符來建立簡單的 Observables,它們可以產生它們的下一個發射項,並且可以基於上一個發射項的值來決定何時終止。generateWithAbsoluteTime
的基本形式採用五個參數
true
) 或終止 Observable (false
)Date
) 發射新項目你還可以選擇傳入第六個參數,一個 排程器 (Scheduler),generate
將使用它來建立和發射其序列 (預設使用 currentThread
)。
var source = Rx.Observable.generate( 1, function (x) { return x < 4; }, function (x) { return x + 1; }, function (x) { return x; }, function (x) { return Date.now() + (100 * x); } ).timeInterval(); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: {value: 1, interval: 100} Next: {value: 2, interval: 200} Next: {value: 3, interval: 300} Completed
generateWithAbsoluteTime
位於以下發行版本中
rx.time.js
(需要 rx.js
或 rx.compat.js
)RxPHP 將此操作符實作為 create
。
從指定的 subscribeAction 可呼叫實作建立一個 observable 序列。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/create/create.php //With static method $source = \Rx\Observable::create(function (\Rx\ObserverInterface $observer) { $observer->onNext(42); $observer->onCompleted(); return new CallbackDisposable(function () { echo "Disposed\n"; }); }); $subscription = $source->subscribe($createStdoutObserver());
Next value: 42 Complete! Disposed
RxSwift 將此操作符實作為 create
。
let source : Observable= Observable.create { observer in for i in 1...5 { observer.on(.next(i)) } observer.on(.completed) // Note that this is optional. If you require no cleanup you can return // `Disposables.create()` (which returns the `NopDisposable` singleton) return Disposables.create { print("disposed") } } source.subscribe { print($0) }
next(1) next(2) next(3) next(4) next(5) completed disposed
你可以使用 generate
操作符來建立簡單的 Observables,它們可以產生它們的下一個發射項,並且可以基於上一個發射項的值來決定何時終止。generate
的基本形式採用三個參數
true
) 或終止 Observable (false
)你還可以選擇傳入第四個參數,一個 排程器 (Scheduler),generate
將使用它來建立和發射其序列 (預設使用 CurrentThreadScheduler
)。
let source = Observable.generate( initialState: 0, condition: { $0 < 3 }, iterate: { $0 + 1 } ) source.subscribe { print($0) }
next(0) next(1) next(2) completed