排程器 (Scheduler)

如果你想在 Observable 操作符的級聯中引入多執行緒,你可以指示這些操作符(或特定的 Observables)在特定的 排程器 (Scheduler) 上執行。

一些 ReactiveX Observable 操作符具有將排程器作為參數的變體。這些變體指示操作符在特定的排程器上完成部分或全部工作。

預設情況下,一個 Observable 和你應用於它的操作符鏈將在呼叫其 Subscribe 方法的同一個執行緒上執行其工作並通知其觀察者。SubscribeOn 操作符通過指定 Observable 應在其上運作的不同排程器來改變這種行為。ObserveOn 操作符指定 Observable 將用於向其觀察者發送通知的不同排程器。

如此圖所示,SubscribeOn 操作符指定 Observable 將開始運作的執行緒,無論該操作符在操作符鏈中的哪個位置被呼叫。ObserveOn,另一方面,會影響 Observable 將在該操作符出現 *之下* 使用的執行緒。因此,你可以在 Observable 操作符鏈中的各個點多次呼叫 ObserveOn,以更改某些操作符運作所在的執行緒。

ObserveOn and SubscribeOn

另請參閱

特定語言資訊

待定

待定

排程器的種類

你可以從 Schedulers 類別 中描述的工廠方法取得排程器。下表顯示了你可以通過 RxGroovy 中的這些方法使用的排程器種類

排程器 (Scheduler)目的
Schedulers.computation( )適用於計算工作,例如事件迴圈和回調處理;不要將此排程器用於 I/O(請改用 Schedulers.io( ));預設情況下,執行緒數等於處理器數
Schedulers.from(executor)將指定的 Executor 用作排程器
Schedulers.immediate( )排程工作以在目前的執行緒中立即開始
Schedulers.io( )適用於 I/O 繫結的工作,例如非同步執行阻塞 I/O,此排程器由一個會根據需要成長的執行緒池支援;對於一般的計算工作,請切換到 Schedulers.computation( );預設情況下,Schedulers.io(&#8239) 是一個 CachedThreadScheduler,它有點像具有執行緒快取的新的執行緒排程器
Schedulers.newThread( )為每個工作單元建立新的執行緒
Schedulers.trampoline( )將工作排隊以在目前執行緒中任何已排隊的工作之後開始

RxGroovy Observable 操作符的預設排程器

RxGroovy 中的一些 Observable 操作符具有允許你設定操作符將用於(至少一部分)其操作的排程器的替代形式。其他操作符則不在任何特定的排程器上運作,或者在特定的預設排程器上運作。具有特定預設排程器的操作符包括

操作符排程器 (Scheduler)
buffer(timespan)computation (計算)
buffer(timespan, count)computation (計算)
buffer(timespan, timeshift)computation (計算)
debounce(timeout, unit)computation (計算)
delay(delay, unit)computation (計算)
delaySubscription(delay, unit)computation (計算)
interval (間隔)computation (計算)
repeat (重複)trampoline (彈簧床)
replay(time, unit)computation (計算)
replay(buffersize, time, unit)computation (計算)
replay(selector, time, unit)computation (計算)
replay(selector, buffersize, time, unit)computation (計算)
retry (重試)trampoline (彈簧床)
sample(period, unit)computation (計算)
skip(time, unit)computation (計算)
skipLast(time, unit)computation (計算)
take(time, unit)computation (計算)
takeLast(time, unit)computation (計算)
takeLast(count, time, unit)computation (計算)
takeLastBuffer(time, unit)computation (計算)
takeLastBuffer(count, time, unit)computation (計算)
throttleFirstcomputation (計算)
throttleLastcomputation (計算)
throttleWithTimeoutcomputation (計算)
timeIntervalcomputation (計算)
timeout(timeoutSelector)immediate (立即)
timeout(firstTimeoutSelector, timeoutSelector)immediate (立即)
timeout(timeoutSelector, other)immediate (立即)
timeout(timeout, timeUnit)computation (計算)
timeout(firstTimeoutSelector, timeoutSelector, other)immediate (立即)
timeout(timeout, timeUnit, other)computation (計算)
timer (計時器)computation (計算)
timestamp (時間戳記)computation (計算)
window(timespan)computation (計算)
window(timespan, count)computation (計算)
window(timespan, timeshift)computation (計算)

測試排程器

TestScheduler 允許你精確地手動控制排程器的時鐘行為。這對於測試依賴於時間上精確排列的動作的互動非常有用。這個排程器有三個額外的方法

advanceTimeTo(time,unit)
將排程器的時鐘推進到特定的時間點
advanceTimeBy(time,unit)
將排程器的時鐘向前推進特定的時間量
triggerActions( )
根據排程器的時鐘,啟動任何已排程在等於或早於目前時間的時間執行的未啟動動作

另請參閱

排程器的種類

你可以從 Schedulers 類別 中描述的工廠方法取得排程器。下表顯示了你可以通過 RxJava 中的這些方法使用的排程器種類

排程器 (Scheduler)目的
Schedulers.computation( )適用於計算工作,例如事件迴圈和回調處理;不要將此排程器用於 I/O(請改用 Schedulers.io( ));預設情況下,執行緒數等於處理器數
Schedulers.from(executor)將指定的 Executor 用作排程器
Schedulers.immediate( )排程工作以在目前的執行緒中立即開始
Schedulers.io( )適用於 I/O 繫結的工作,例如非同步執行阻塞 I/O,此排程器由一個會根據需要成長的執行緒池支援;對於一般的計算工作,請切換到 Schedulers.computation( );預設情況下,Schedulers.io(&#8239) 是一個 CachedThreadScheduler,它有點像具有執行緒快取的新的執行緒排程器
Schedulers.newThread( )為每個工作單元建立新的執行緒
Schedulers.trampoline( )將工作排隊以在目前執行緒中任何已排隊的工作之後開始

RxJava 1.x Observable 操作符的預設排程器

RxJava 中的一些 Observable 操作符具有允許你設定操作符將用於(至少一部分)其操作的排程器的替代形式。其他操作符則不在任何特定的排程器上運作,或者在特定的預設排程器上運作。具有特定預設排程器的操作符包括

操作符排程器 (Scheduler)
buffer(timespan)computation (計算)
buffer(timespan, count)computation (計算)
buffer(timespan, timeshift)computation (計算)
debounce(timeout, unit)computation (計算)
delay(delay, unit)computation (計算)
delaySubscription(delay, unit)computation (計算)
interval (間隔)computation (計算)
repeat (重複)trampoline (彈簧床)
replay(time, unit)computation (計算)
replay(buffersize, time, unit)computation (計算)
replay(selector, time, unit)computation (計算)
replay(selector, buffersize, time, unit)computation (計算)
retry (重試)trampoline (彈簧床)
sample(period, unit)computation (計算)
skip(time, unit)computation (計算)
skipLast(time, unit)computation (計算)
take(time, unit)computation (計算)
takeLast(time, unit)computation (計算)
takeLast(count, time, unit)computation (計算)
takeLastBuffer(time, unit)computation (計算)
takeLastBuffer(count, time, unit)computation (計算)
throttleFirstcomputation (計算)
throttleLastcomputation (計算)
throttleWithTimeoutcomputation (計算)
timeIntervalcomputation (計算)
timeout(timeoutSelector)immediate (立即)
timeout(firstTimeoutSelector, timeoutSelector)immediate (立即)
timeout(timeoutSelector, other)immediate (立即)
timeout(timeout, timeUnit)computation (計算)
timeout(firstTimeoutSelector, timeoutSelector, other)immediate (立即)
timeout(timeout, timeUnit, other)computation (計算)
timer (計時器)computation (計算)
timestamp (時間戳記)computation (計算)
window(timespan)computation (計算)
window(timespan, count)computation (計算)
window(timespan, timeshift)computation (計算)

使用排程器

除了將這些排程器傳遞給 RxJava Observable 操作符之外,你還可以使用它們在訂閱 (Subscription) 上排程你自己的工作。以下範例使用 schedule 方法,該方法來自 Scheduler.Worker 類別,以在 newThread 排程器上排程工作

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
    }

});
// some time later...
worker.unsubscribe();

遞迴排程器

要排程遞迴呼叫,你可以在 Worker 物件上使用 schedule,然後使用 schedule(this)

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// some time later...
worker.unsubscribe();

檢查或設定已取消訂閱狀態

Worker 類別的物件實作 Subscription 介面,其中具有 isUnsubscribedunsubscribe 方法,因此你可以在訂閱被取消時停止工作,或者你可以從排程的工作中取消訂閱

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }

});

Worker 也是一個 Subscription,因此你可以(並且應該最終)呼叫其 unsubscribe 方法,以表示它可以停止工作並釋放資源

worker.unsubscribe();

延遲和定期排程器

你也可以使用 schedule 的版本,該版本會在給定的排程器上的某個時間跨度過去後,延遲你的動作。以下範例排程 someAction 在根據該排程器的時鐘過去 500 毫秒後在 someScheduler 上執行

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

另一個 Scheduler 方法允許你排程動作以固定間隔發生。以下範例排程 someAction 在過去 500 毫秒後在 someScheduler 上執行,然後每隔 250 毫秒執行一次

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

測試排程器

TestScheduler 允許你精確地手動控制排程器的時鐘行為。這對於測試依賴於時間上精確排列的動作的互動非常有用。這個排程器有三個額外的方法

advanceTimeTo(time,unit)
將排程器的時鐘推進到特定的時間點
advanceTimeBy(time,unit)
將排程器的時鐘向前推進特定的時間量
triggerActions( )
根據排程器的時鐘,啟動任何已排程在等於或早於目前時間的時間執行的未啟動動作

另請參閱

</div> </div> </div>

在 RxJS 中,你從 Rx.Scheduler 物件或獨立實作的物件中取得排程器。下表顯示了你在 RxJS 中可以使用的排程器種類:

排程器 (Scheduler)目的
Rx.Scheduler.currentThread盡快在目前執行緒上排程工作
Rx.HistoricalScheduler排程工作,就好像它發生在任意歷史時間一樣
Rx.Scheduler.immediate立即在目前執行緒上排程工作
Rx.TestScheduler用於單元測試;這允許你手動操作時間的移動
Rx.Scheduler.timeout通過定時回調排程工作

另請參閱

待定

待定

待定

待定

待定

</div>