如果你想在 Observable 操作符的級聯中引入多執行緒,你可以指示這些操作符(或特定的 Observables)在特定的 排程器 (Scheduler) 上執行。
一些 ReactiveX Observable 操作符具有將排程器作為參數的變體。這些變體指示操作符在特定的排程器上完成部分或全部工作。
預設情況下,一個 Observable 和你應用於它的操作符鏈將在呼叫其 Subscribe
方法的同一個執行緒上執行其工作並通知其觀察者。SubscribeOn 操作符通過指定 Observable 應在其上運作的不同排程器來改變這種行為。ObserveOn 操作符指定 Observable 將用於向其觀察者發送通知的不同排程器。
如此圖所示,SubscribeOn 操作符指定 Observable 將開始運作的執行緒,無論該操作符在操作符鏈中的哪個位置被呼叫。ObserveOn,另一方面,會影響 Observable 將在該操作符出現 *之下* 使用的執行緒。因此,你可以在 Observable 操作符鏈中的各個點多次呼叫 ObserveOn,以更改某些操作符運作所在的執行緒。
待定
待定
你可以從 Schedulers
類別 中描述的工廠方法取得排程器。下表顯示了你可以通過 RxGroovy 中的這些方法使用的排程器種類
排程器 (Scheduler) | 目的 |
---|---|
Schedulers.computation( ) | 適用於計算工作,例如事件迴圈和回調處理;不要將此排程器用於 I/O(請改用 Schedulers.io( ) );預設情況下,執行緒數等於處理器數 |
Schedulers.from(executor) | 將指定的 Executor 用作排程器 |
Schedulers.immediate( ) | 排程工作以在目前的執行緒中立即開始 |
Schedulers.io( ) | 適用於 I/O 繫結的工作,例如非同步執行阻塞 I/O,此排程器由一個會根據需要成長的執行緒池支援;對於一般的計算工作,請切換到 Schedulers.computation( ) ;預設情況下,Schedulers.io( ) 是一個 CachedThreadScheduler ,它有點像具有執行緒快取的新的執行緒排程器 |
Schedulers.newThread( ) | 為每個工作單元建立新的執行緒 |
Schedulers.trampoline( ) | 將工作排隊以在目前執行緒中任何已排隊的工作之後開始 |
RxGroovy 中的一些 Observable 操作符具有允許你設定操作符將用於(至少一部分)其操作的排程器的替代形式。其他操作符則不在任何特定的排程器上運作,或者在特定的預設排程器上運作。具有特定預設排程器的操作符包括
TestScheduler
允許你精確地手動控制排程器的時鐘行為。這對於測試依賴於時間上精確排列的動作的互動非常有用。這個排程器有三個額外的方法
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
你可以從 Schedulers
類別 中描述的工廠方法取得排程器。下表顯示了你可以通過 RxJava 中的這些方法使用的排程器種類
排程器 (Scheduler) | 目的 |
---|---|
Schedulers.computation( ) | 適用於計算工作,例如事件迴圈和回調處理;不要將此排程器用於 I/O(請改用 Schedulers.io( ) );預設情況下,執行緒數等於處理器數 |
Schedulers.from(executor) | 將指定的 Executor 用作排程器 |
Schedulers.immediate( ) | 排程工作以在目前的執行緒中立即開始 |
Schedulers.io( ) | 適用於 I/O 繫結的工作,例如非同步執行阻塞 I/O,此排程器由一個會根據需要成長的執行緒池支援;對於一般的計算工作,請切換到 Schedulers.computation( ) ;預設情況下,Schedulers.io( ) 是一個 CachedThreadScheduler ,它有點像具有執行緒快取的新的執行緒排程器 |
Schedulers.newThread( ) | 為每個工作單元建立新的執行緒 |
Schedulers.trampoline( ) | 將工作排隊以在目前執行緒中任何已排隊的工作之後開始 |
RxJava 中的一些 Observable 操作符具有允許你設定操作符將用於(至少一部分)其操作的排程器的替代形式。其他操作符則不在任何特定的排程器上運作,或者在特定的預設排程器上運作。具有特定預設排程器的操作符包括
除了將這些排程器傳遞給 RxJava Observable 操作符之外,你還可以使用它們在訂閱 (Subscription) 上排程你自己的工作。以下範例使用 schedule
方法,該方法來自 Scheduler.Worker
類別,以在 newThread
排程器上排程工作
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); } }); // some time later... worker.unsubscribe();
要排程遞迴呼叫,你可以在 Worker 物件上使用 schedule
,然後使用 schedule(this)
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); // recurse until unsubscribed (schedule will do nothing if unsubscribed) worker.schedule(this); } }); // some time later... worker.unsubscribe();
Worker
類別的物件實作 Subscription
介面,其中具有 isUnsubscribed
和 unsubscribe
方法,因此你可以在訂閱被取消時停止工作,或者你可以從排程的工作中取消訂閱
Worker worker = Schedulers.newThread().createWorker(); Subscription mySubscription = worker.schedule(new Action0() { @Override public void call() { while(!worker.isUnsubscribed()) { status = yourWork(); if(QUIT == status) { worker.unsubscribe(); } } } });
Worker
也是一個 Subscription
,因此你可以(並且應該最終)呼叫其 unsubscribe
方法,以表示它可以停止工作並釋放資源
worker.unsubscribe();
你也可以使用 schedule
的版本,該版本會在給定的排程器上的某個時間跨度過去後,延遲你的動作。以下範例排程 someAction
在根據該排程器的時鐘過去 500 毫秒後在 someScheduler
上執行
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
另一個 Scheduler
方法允許你排程動作以固定間隔發生。以下範例排程 someAction
在過去 500 毫秒後在 someScheduler
上執行,然後每隔 250 毫秒執行一次
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
TestScheduler
允許你精確地手動控制排程器的時鐘行為。這對於測試依賴於時間上精確排列的動作的互動非常有用。這個排程器有三個額外的方法
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
在 RxJS 中,你從 Rx.Scheduler
物件或獨立實作的物件中取得排程器。下表顯示了你在 RxJS 中可以使用的排程器種類:
排程器 (Scheduler) | 目的 |
---|---|
Rx.Scheduler.currentThread | 盡快在目前執行緒上排程工作 |
Rx.HistoricalScheduler | 排程工作,就好像它發生在任意歷史時間一樣 |
Rx.Scheduler.immediate | 立即在目前執行緒上排程工作 |
Rx.TestScheduler | 用於單元測試;這允許你手動操作時間的移動 |
Rx.Scheduler.timeout | 通過定時回調排程工作 |
待定
待定
待定
待定
待定