程式語言有許多方法可以取得計算結果的值,例如函數、未來 (futures)、動作 (actions)、可呼叫物件 (callables)、可執行物件 (runnables) 等等。這裡將這些在 Start 操作符分類下的東西,都使其行為像 Observable 一樣,這樣它們就可以在 Observable 串聯中與其他 Observable 鏈結。
待辦
待辦
Start 的各種 RxGroovy 實作位於可選的 rxjava-async
模組中。
rxjava-async
模組包含 start
操作符,它接受一個函數作為參數,呼叫該函數以檢索值,然後返回一個 Observable,該 Observable 將把該值發射給每個後續的訂閱者。
請注意,即使有多個訂閱者訂閱產生的 Observable,該函數也只會執行一次。
rxjava-async
模組還包括 toAsync
、asyncAction
和 asyncFunc
操作符。這些操作符接受一個函數或一個 Action 作為參數。如果是函數,此操作符的變體會呼叫該函數以檢索值,然後返回一個 Observable,該 Observable 會將該值發射給每個後續的訂閱者(就像 start
操作符一樣)。
如果是 Action,流程類似,但沒有返回值。在這種情況下,此操作符建立的 Observable 會在終止之前發射一個 null
。
請注意,即使有多個訂閱者訂閱產生的 Observable,該函數或 Action 也只會執行一次。
rxjava-async
模組還包括 startFuture
操作符。您將一個返回 Future
的函數傳遞給它。startFuture
會立即呼叫此函數以取得 Future
,並呼叫 Future
的 get
方法以嘗試取得其值。它會返回一個 Observable,並將此值發射給任何後續的訂閱者。
rxjava-async
模組還包括 deferFuture
操作符。您將一個返回 Future
的函數傳遞給它,該 Future
又返回一個 Observable。deferFuture
會返回一個 Observable,但在訂閱者訂閱它返回的 Observable 之前,不會呼叫您提供的函數。當它這樣做時,它會立即呼叫產生的 Future
上的 get
,然後將 Future
返回的 Observable 的發射內容反映為自己的發射內容。
這樣,您可以將一個返回 Observable 的 Future
包含在 Observable 的串聯中,作為其他 Observable 的同級。
rxjava-async
模組還包括 fromAction
操作符。它接受一個 Action
作為參數,並返回一個 Observable,該 Observable 會在 Action
終止時發射您傳遞給 fromAction
的項目。
rxjava-async
模組還包括 fromCallable
操作符。它接受一個 Callable
作為參數,並返回一個 Observable,該 Observable 會將此 callable 的結果作為其唯一的發射內容。
rxjava-async
模組還包括 fromRunnable
操作符。它接受一個 Runnable
作為參數,並返回一個 Observable,該 Observable 會在 Runnable
終止時發射您傳遞給 fromRunnable
的項目。
rxjava-async
模組還包括 forEachFuture
操作符。它並不是 Start 操作符的一個變體,而是它自己的東西。您將一些典型的訂閱者方法 (onNext
、onError
和 onCompleted
) 的子集傳遞給 forEachFuture
,而 Observable 會以通常的方式呼叫這些方法。但 forEachFuture
本身會返回一個 Future
,它會阻塞在 get
上,直到來源 Observable 完成,然後根據 Observable 的完成方式返回完成或錯誤。
如果需要一個會阻塞直到 Observable 完成的函數,您可以使用此方法。
rxjava-async
模組還包括 runAsync
操作符。它的特殊之處在於它會建立一個稱為 StoppableObservable
的 Observable 特殊化。
將一個 Action
和一個 Scheduler
傳遞給 runAsync
,它會返回一個 StoppableObservable
,該 StoppableObservable
使用指定的 Action
來產生它發射的項目。Action
接受一個 Observer
和一個 Subscription
。它使用 Subscription
來檢查 unsubscribed
條件,一旦滿足該條件,就會停止發射項目。您也可以隨時通過呼叫其 unsubscribe
方法來手動停止 StoppableObservable
(這也會取消與 StoppableObservable
相關聯的 Subscription
的訂閱)。
由於 runAsync
會立即呼叫 Action
並開始發射項目(也就是說,它會產生一個*熱* Observable),因此可能會在您使用此操作符建立 StoppableObservable
和訂閱者準備好接收項目之間的間隔中遺失一些項目。如果這是一個問題,您可以使用 runAsync
的變體,它也接受一個 Subject
,並傳遞一個 ReplaySubject
,您可以使用它來檢索那些遺失的項目。
在 RxGroovy 中,還有一個 From 操作符的版本,它會將 Future
轉換為 Observable,並以這種方式類似於 Start 操作符。
Start 的各種 RxJava 實作位於可選的 rxjava-async
模組中。
rxjava-async
模組包含 start
操作符,它接受一個函數作為參數,呼叫該函數以檢索值,然後返回一個 Observable,該 Observable 將把該值發射給每個後續的訂閱者。
請注意,即使有多個訂閱者訂閱產生的 Observable,該函數也只會執行一次。
rxjava-async
模組還包括 toAsync
、asyncAction
和 asyncFunc
操作符。這些操作符接受一個函數或一個 Action 作為參數。如果是函數,此操作符的變體會呼叫該函數以檢索值,然後返回一個 Observable,該 Observable 會將該值發射給每個後續的訂閱者(就像 start
操作符一樣)。
如果是 Action,流程類似,但沒有返回值。在這種情況下,此操作符建立的 Observable 會在終止之前發射一個 null
。
請注意,即使有多個訂閱者訂閱產生的 Observable,該函數或 Action 也只會執行一次。
rxjava-async
模組還包括 startFuture
操作符。您將一個返回 Future
的函數傳遞給它。startFuture
會立即呼叫此函數以取得 Future
,並呼叫 Future
的 get
方法以嘗試取得其值。它會返回一個 Observable,並將此值發射給任何後續的訂閱者。
rxjava-async
模組還包括 deferFuture
操作符。您將一個返回 Future
的函數傳遞給它,該 Future
又返回一個 Observable。deferFuture
會返回一個 Observable,但在訂閱者訂閱它返回的 Observable 之前,不會呼叫您提供的函數。當它這樣做時,它會立即呼叫產生的 Future
上的 get
,然後將 Future
返回的 Observable 的發射內容反映為自己的發射內容。
這樣,您可以將一個返回 Observable 的 Future
包含在 Observable 的串聯中,作為其他 Observable 的同級。
rxjava-async
模組還包括 fromAction
操作符。它接受一個 Action
作為參數,並返回一個 Observable,該 Observable 會在 Action
終止時發射您傳遞給 fromAction
的項目。
rxjava-async
模組還包括 fromCallable
操作符。它接受一個 Callable
作為參數,並返回一個 Observable,該 Observable 會將此 callable 的結果作為其唯一的發射內容。
rxjava-async
模組還包括 fromRunnable
操作符。它接受一個 Runnable
作為參數,並返回一個 Observable,該 Observable 會在 Runnable
終止時發射您傳遞給 fromRunnable
的項目。
rxjava-async
模組還包括 forEachFuture
操作符。它並不是 Start 操作符的一個變體,而是它自己的東西。您將一些典型的訂閱者方法 (onNext
、onError
和 onCompleted
) 的子集傳遞給 forEachFuture
,而 Observable 會以通常的方式呼叫這些方法。但 forEachFuture
本身會返回一個 Future
,它會阻塞在 get
上,直到來源 Observable 完成,然後根據 Observable 的完成方式返回完成或錯誤。
如果需要一個會阻塞直到 Observable 完成的函數,您可以使用此方法。
rxjava-async
模組還包括 runAsync
操作符。它的特殊之處在於它會建立一個稱為 StoppableObservable
的 Observable 特殊化。
將一個 Action
和一個 Scheduler
傳遞給 runAsync
,它會返回一個 StoppableObservable
,該 StoppableObservable
使用指定的 Action
來產生它發射的項目。Action
接受一個 Observer
和一個 Subscription
。它使用 Subscription
來檢查 unsubscribed
條件,一旦滿足該條件,就會停止發射項目。您也可以隨時通過呼叫其 unsubscribe
方法來手動停止 StoppableObservable
(這也會取消與 StoppableObservable
相關聯的 Subscription
的訂閱)。
由於 runAsync
會立即呼叫 Action
並開始發射項目(也就是說,它會產生一個*熱* Observable),因此可能會在您使用此操作符建立 StoppableObservable
和訂閱者準備好接收項目之間的間隔中遺失一些項目。如果這是一個問題,您可以使用 runAsync
的變體,它也接受一個 Subject
,並傳遞一個 ReplaySubject
,您可以使用它來檢索那些遺失的項目。
在 RxJava 中,還有一個 From 操作符的版本,它會將 Future
轉換為 Observable,並以這種方式類似於 Start 操作符。
RxJS 實作了 start
操作符。它將一個函數作為參數,該函數的返回值將是產生的 Observable 的發射內容,還可以選擇將該函數的任何其他參數和一個 Scheduler 作為參數,以便在該 Scheduler 上執行該函數。
var context = { value: 42 }; var source = Rx.Observable.start( function () { return this.value; }, context, Rx.Scheduler.timeout ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
start
位於以下發行版中
rx.async.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.async.compat.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
RxJS 也實作了 startAsync
操作符。它將一個異步函數作為參數,該異步函數的返回值將是產生的 Observable 的發射內容。
您可以使用 toAsync
方法將函數轉換為異步函數。它將一個函數、函數參數和 Scheduler 作為參數,並返回一個異步函數,該函數將在指定的 Scheduler 上呼叫。最後兩個參數是可選的;如果您未指定 Scheduler,則預設會使用 timeout
Scheduler。
var source = Rx.Observable.startAsync(function () { return RSVP.Promise.resolve(42); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
startAsync
位於以下發行版中
rx.async.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.async.compat.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
toAsync
位於以下發行版中
rx.async.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.async.compat.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)RxPHP 將此操作符實作為 start
。
在指定的 scheduler 上非同步呼叫指定的函數,並通過 observable 序列顯示結果。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/start/start.php $source = Rx\Observable::start(function () { return 42; }); $source->subscribe($stdoutObserver);
Next value: 42 Complete!
待辦
待辦