在 ReactiveX 中,很容易遇到 Observable 發射項目的速度快於操作符或觀察者消耗它們的速度的情況。這就出現了一個問題,如何處理不斷增長、未消耗的項目積壓。
例如,想像使用 Zip 操作符來將兩個無限的 Observable 合併在一起,其中一個發射項目的頻率是另一個的兩倍。操作符的幼稚實現將不得不維護一個不斷擴展的緩衝區,來存放較快 Observable 發射的項目,以便最終與較慢的 Observable 發射的項目組合。這可能導致 ReactiveX 佔用大量的系統資源。
在 ReactiveX 中,有多種策略可以進行流量控制和背壓,以緩解快速生產的 Observable 遇到慢速消耗的觀察者時引起的問題,其中包括,在某些 ReactiveX 實現中,反應式拉取背壓和一些特定的背壓操作符。
冷的 Observable 發射特定的項目序列,但是當其觀察者認為方便時,可以開始發射此序列,並以觀察者期望的任何速率發射,而不會破壞序列的完整性。例如,如果將靜態的可迭代對象轉換為 Observable,則無論稍後何時訂閱或觀察這些項目的頻率如何,該 Observable 都會發射相同的項目序列。冷 Observable 發射的項目示例可能包括資料庫查詢、文件檢索或 Web 請求的結果。
熱的 Observable 在創建時立即開始生成要發射的項目。訂閱者通常從熱的 Observable 發射的項目序列的中間某處開始觀察,從建立訂閱後 Observable 發射的第一個項目開始。這樣的 Observable 以自己的速度發射項目,並且由其觀察者來跟上。熱的 Observable 發射的項目示例可能包括滑鼠和鍵盤事件、系統事件或股票價格。
當冷的 Observable 被多播時(當它被轉換為可連接的 Observable 並且它的 Connect 方法被調用時),它實際上變成了熱的,並且為了背壓和流量控制的目的,應該將其視為熱的 Observable。
冷的 Observable 非常適合某些 ReactiveX 實現所實現的反應式拉取背壓模型(在其他地方描述)。熱的 Observable 通常不適合反應式拉取模型,並且是其他流量控制策略的更好候選者,例如使用此頁面上描述的操作符,或者像 Buffer、Sample、Debounce 或 Window 的操作符。
待定
RxGroovy 實現了反應式拉取背壓,並且其許多操作符都支援這種形式的背壓。它還具有三個可以應用於尚未編寫為支援背壓的 Observable 的操作符
onBackpressureBuffer
維護來自源 Observable 的所有未觀察到的發射的緩衝區,並根據下游觀察者產生的請求將它們發射到下游觀察者。
在 RxGroovy 1.1 中引入的操作符版本允許您設置緩衝區的容量;應用此操作符將導致如果緩衝區溢出,則導致產生的 Observable 以錯誤終止。在同一版本中引入的第二個版本允許您設置一個 Action
,如果緩衝區溢出,onBackpressureBuffer
將會調用它。
onBackpressureBuffer()
onBackpressureBuffer(long)
(RxGroovy 1.1)onBackpressureBuffer(long, Action0)
(RxGroovy 1.1)
onBackpressureDrop
會丟棄來自源 Observable 的發射,除非有來自下游觀察者的待處理請求,在這種情況下,它將發射足夠的項目來滿足請求。
在 1.1 版本中引入的操作符版本會通過您作為參數傳遞的 Action
來通知您,何時丟棄了項目以及丟棄了哪個項目。
onBackpressureDrop()
onBackpressureLatest
(RxJava 1.1 中新增)會保留來自源 Observable 的最新發射項目,並在請求時立即將該項目發射給其觀察者。它會丟棄觀察者請求之間觀察到的任何其他項目。
onBackpressureLatest()
RxJava 實現了反應式拉取背壓,並且其許多操作符都支援這種形式的背壓。它還具有三個可以應用於尚未編寫為支援背壓的 Observable 的操作符
onBackpressureBuffer
維護來自源 Observable 的所有未觀察到的發射的緩衝區,並根據下游觀察者產生的請求將它們發射到下游觀察者。
在 RxJava 1.1 中引入的操作符版本允許您設置緩衝區的容量;應用此操作符將導致如果緩衝區溢出,則導致產生的 Observable 以錯誤終止。在同一版本中引入的第二個版本允許您設置一個 Action
,如果緩衝區溢出,onBackpressureBuffer
將會調用它。
onBackpressureBuffer()
onBackpressureBuffer(long)
(RxJava 1.1)onBackpressureBuffer(long, Action0)
(RxJava 1.1)
onBackpressureDrop
會丟棄來自源 Observable 的發射,除非有來自下游觀察者的待處理請求,在這種情況下,它將發射足夠的項目來滿足請求。
在 1.1 版本中引入的操作符版本會通過您作為參數傳遞的 Action
來通知您,何時丟棄了項目以及丟棄了哪個項目。
onBackpressureDrop()
onBackpressureLatest
(RxJava 1.1 中新增)會保留來自源 Observable 的最新發射項目,並在請求時立即將該項目發射給其觀察者。它會丟棄觀察者請求之間觀察到的任何其他項目。
onBackpressureLatest()
RxJS 通過使用 controlled
操作符將普通的 Observable 轉換為 ControlledObservable
來實現背壓。這會強制 Observable 尊重來自其觀察者的拉取 request
,而不是主動推送項目。
作為使用 request
從 ControlledObservable
拉取項目的替代方法,您可以將 stopAndWait
操作符應用於它。此操作符將在每次觀察者的 onNext
例程收到最新項目時,從 Observable 請求一個新項目。
第二種可能性是使用 windowed(
n)
。它的行為類似於 stopAndWait
,但是具有 n 個項目的內部緩衝區,這允許 ControlledObservable
不時地在觀察者前面運行。windowed(1)
等效於 stopAndWait
。
還有兩個操作符可以將普通的 Observable 轉換為 PausableObservable
。
如果您調用使用 pausable
操作符創建的 PausableObservable
的 pause
方法,它將丟棄(忽略)底層源 Observable 發射的任何項目,直到您調用其 resume
方法時,它才會繼續將發射的項目傳遞給其觀察者。
如果您調用使用 pausableBuffered
操作符創建的 PausableObservable
的 pause
方法,它將緩衝底層源 Observable 發射的任何項目,直到您調用其 resume
方法時,它才會發射那些緩衝的項目,然後繼續將任何其他發射的項目傳遞給其觀察者。
待定
待定
待定
待定