Buffer 運算子會將發出項目的 Observable 轉換為發出這些項目緩衝集合的 Observable。在各種特定語言的 Buffer 實作中,有許多變體在於它們如何選擇哪些項目進入哪些緩衝區。
請注意,如果來源 Observable 發出 onError
通知,Buffer 會立即傳遞此通知,而不會先發出它正在組裝的緩衝區,即使該緩衝區包含來源 Observable 在發出錯誤通知之前發出的項目也是如此。
Window 運算子與 Buffer 類似,但會將項目收集到單獨的 Observable 中,而不是收集到資料結構中再重新發出。
RxCpp 實作了 Buffer 的兩個變體
buffer(count)
buffer(count)
發出非重疊的緩衝區,形式為 vector
,每個緩衝區最多包含來自來源 Observable 的 count
個項目(最後發出的 vector
可能少於 count
個項目)。
buffer(count, skip)
buffer(count, skip)
從來源 Observable 發出的第一個項目開始建立新的緩衝區,然後每隔 skip
個項目建立一個,並以 count
個項目填滿每個緩衝區:初始項目和後續的 count-1
個項目。它會以 vector
的形式發出這些緩衝區。根據 count
和 skip
的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。
在 RxGroovy 中,Buffer 有幾個變體
buffer(count)
buffer(count)
發出非重疊的緩衝區,形式為 List
,每個緩衝區最多包含來自來源 Observable 的 count
個項目(最後發出的 List
可能少於 count
個項目)。
buffer(int)
buffer(count, skip)
buffer(count, skip)
從來源 Observable 發出的第一個項目開始建立新的緩衝區,然後每隔 skip
個項目建立一個,並以 count
個項目填滿每個緩衝區:初始項目和後續的 count-1
個項目。它會以 List
的形式發出這些緩衝區。根據 count
和 skip
的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。
buffer(int,int)
buffer(bufferClosingSelector)
當它訂閱來源 Observable 時,buffer(bufferClosingSelector)
開始將其發出的項目收集到 List
中,並且它還會呼叫 bufferClosingSelector
來產生第二個 Observable。當這個第二個 Observable 發出 TClosing
物件時,buffer
會發出目前的 List
並重複此程序:開始新的 List
並呼叫 bufferClosingSelector
來建立新的 Observable 以進行監控。它會這樣做,直到來源 Observable 終止。
buffer(Func0)
buffer(boundary
[, initialCapacity
])
buffer(boundary)
會監控 Observable boundary
。每當該 Observable 發出一個項目時,它會建立一個新的 List
,開始收集來源 Observable 發出的項目,並發出先前的 List
。
buffer(Observable)
buffer(Observable,int)
buffer(bufferOpenings, bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
會監控 Observable bufferOpenings
,該 Observable 發出 BufferOpening
物件。每當它觀察到發出的項目時,它會建立一個新的 List
,開始收集來源 Observable 發出的項目,並將 bufferOpenings
Observable 傳遞到 closingSelector
函數中。該函數會傳回一個 Observable。buffer
會監控該 Observable,當它偵測到其中發出的項目時,它會關閉 List
並將其作為自己的發出項目發出。
buffer(Observable,Func1)
buffer(timespan, unit
[, scheduler
])
buffer(timespan, unit)
會定期發出新的項目 List
,每隔 timespan
的時間,其中包含自上次捆綁發出以來,或在第一個捆綁的情況下,自訂閱來源 Observable 以來,來源 Observable 發出的所有項目。此運算子的變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation
Scheduler。
buffer(long,TimeUnit)
buffer(long,TimeUnit,Scheduler)
buffer(timespan, unit, count
[, scheduler
])
buffer(timespan, unit, count)
會為來源 Observable 發出的每 count
個項目發出一個新的項目 List
,或者,如果自上次捆綁發出以來已過了 timespan
,它會發出在該時間跨度內來源 Observable 發出的任意數量的項目捆綁包,即使這少於 count
。此運算子的變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation
scheduler。
buffer(long,TimeUnit,int)
buffer(long,TimeUnit,int,Scheduler)
buffer(timespan, timeshift, unit
[, scheduler
])
buffer(timespan, timeshift, unit)
每隔 timeshift
的時間建立一個新的項目 List
,並以自該時間起直到自捆綁包建立以來經過 timespan
時間,來源 Observable 發出的每個項目填滿此捆綁包,然後發出此 List
作為自己的發出項目。如果 timespan
長於 timeshift
,則發出的捆綁包將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation
scheduler。
buffer(long,long,TimeUnit)
buffer(long,long,TimeUnit,Scheduler)
您可以使用 Buffer 運算子來實作背壓(也就是說,處理 Observable 可能產生項目太快而觀察者無法消耗的情況)。
Buffer 可以將許多項目的序列減少為較少項目的緩衝區序列,使其更易於管理。例如,您可以定期以固定的時間間隔關閉並發出 bursty Observable 中的項目緩衝區。
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
或者您可以發揮想像力,在 bursty 時段收集緩衝區中的項目,並在每個 bursty 結束時發出它們,方法是使用 Debounce 運算子向 buffer 運算子發出緩衝區關閉指示。
// we have to multicast the original bursty Observable so we can use it // both as our source and as the source for our buffer closing selector: Observable<Integer> burstyMulticast = bursty.publish().refCount(); // burstyDebounced will be our buffer closing selector: Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS); // and this, finally, is the Observable of buffers we're interested in: Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
在 RxJava 中,Buffer 有幾個變體
buffer(count)
buffer(count)
發出非重疊的緩衝區,形式為 List
,每個緩衝區最多包含來自來源 Observable 的 count
個項目(最後發出的 List
可能少於 count
個項目)。
buffer(int)
buffer(count, skip)
buffer(count, skip)
從來源 Observable 發出的第一個項目開始建立新的緩衝區,然後每隔 skip
個項目建立一個,並以 count
個項目填滿每個緩衝區:初始項目和後續的 count-1
個項目。它會以 List
的形式發出這些緩衝區。根據 count
和 skip
的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。
buffer(int,int)
buffer(bufferClosingSelector)
當它訂閱來源 Observable 時,buffer(bufferClosingSelector)
開始將其發出的項目收集到 List
中,並且它還會呼叫 bufferClosingSelector
來產生第二個 Observable。當這個第二個 Observable 發出 TClosing
物件時,buffer
會發出目前的 List
並重複此程序:開始新的 List
並呼叫 bufferClosingSelector
來建立新的 Observable 以進行監控。它會這樣做,直到來源 Observable 終止。
buffer(Func0)
buffer(boundary)
buffer(boundary)
會監控 Observable boundary
。每當該 Observable 發出一個項目時,它會建立一個新的 List
,開始收集來源 Observable 發出的項目,並發出先前的 List
。
buffer(Observable)
buffer(Observable,int)
buffer(bufferOpenings, bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
會監控 Observable bufferOpenings
,該 Observable 發出 BufferOpening
物件。每當它觀察到發出的項目時,它會建立一個新的 List
,開始收集來源 Observable 發出的項目,並將 bufferOpenings
Observable 傳遞到 closingSelector
函數中。該函數會傳回一個 Observable。buffer
會監控該 Observable,當它偵測到其中發出的項目時,它會關閉 List
並將其作為自己的發出項目發出。
buffer(Observable,Func1)
buffer(timespan, unit
[, scheduler
])
buffer(timespan, unit)
會定期發出新的項目 List
,每隔 timespan
的時間,其中包含自上次捆綁發出以來,或在第一個捆綁的情況下,自訂閱來源 Observable 以來,來源 Observable 發出的所有項目。此運算子的變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation
scheduler。
buffer(long,TimeUnit)
buffer(long,TimeUnit,Scheduler)
buffer(timespan, unit, count
[, scheduler
])
buffer(timespan, unit, count)
會為來源 Observable 發出的每 count
個項目發出一個新的項目 List
,或者,如果自上次捆綁發出以來已過了 timespan
,它會發出在該時間跨度內來源 Observable 發出的任意數量的項目捆綁包,即使這少於 count
。此運算子的變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation
scheduler。
buffer(long,TimeUnit,int)
buffer(long,TimeUnit,int,Scheduler)
buffer(timespan, timeshift, unit
[, scheduler
])
buffer(timespan, timeshift, unit)
每隔 timeshift
的時間建立一個新的項目 List
,並以自該時間起直到自捆綁包建立以來經過 timespan
時間,來源 Observable 發出的每個項目填滿此捆綁包,然後發出此 List
作為自己的發出項目。如果 timespan
長於 timeshift
,則發出的捆綁包將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation
scheduler。
buffer(long,long,TimeUnit)
buffer(long,long,TimeUnit,Scheduler)
您可以使用 Buffer 運算子來實作背壓(也就是說,處理 Observable 可能產生項目太快而觀察者無法消耗的情況)。
Buffer 可以將許多項目的序列減少為較少項目的緩衝區序列,使其更易於管理。例如,您可以定期以固定的時間間隔關閉並發出 bursty Observable 中的項目緩衝區。
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
或者您可以發揮想像力,在 bursty 時段收集緩衝區中的項目,並在每個 bursty 結束時發出它們,方法是使用 Debounce 運算子向 buffer 運算子發出緩衝區關閉指示。
// we have to multicast the original bursty Observable so we can use it // both as our source and as the source for our buffer closing selector: Observable<Integer> burstyMulticast = bursty.publish().refCount(); // burstyDebounced will be our buffer closing selector: Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS); // and this, finally, is the Observable of buffers we're interested in: Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
RxJS 有四個 Buffer 運算子 — buffer
、bufferWithCount
、bufferWithTime
和 bufferWithTimeOrCount
,每個運算子都有不同的變體,以不同的方式管理哪些來源 Observable 項目作為哪些緩衝區的一部分發出。
buffer(bufferBoundaries)
buffer(bufferBoundaries)
會監控 Observable bufferBoundaries
。每當該 Observable 發出一個項目時,它會建立一個新的集合,開始收集來源 Observable 發出的項目,並發出先前的集合。
buffer(bufferClosingSelector)
當它訂閱來源 Observable 時,buffer(bufferClosingSelector)
開始將其發出的項目收集到集合中,並且它還會呼叫 bufferClosingSelector
來產生第二個 Observable。當這個第二個 Observable 發出一個項目時,buffer
會發出目前的集合並重複此程序:開始新的集合並呼叫 bufferClosingSelector
來建立新的 Observable 以進行監控。它會這樣做,直到來源 Observable 終止。
buffer(bufferOpenings,bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
會監控 Observable bufferOpenings
,該 Observable 發出 BufferOpening
物件。每當它觀察到發出的項目時,它會建立一個新的集合,開始收集來源 Observable 發出的項目,並將 bufferOpenings
Observable 傳遞到 bufferClosingSelector
函數中。該函數會傳回一個 Observable。buffer
會監控該 Observable,當它偵測到其中發出的項目時,它會發出目前的集合並開始新的集合。
buffer
存在於以下每個發行版本中
rx.all.js
rx.all.compat.js
rx.coincidence.js
buffer
需要以下其中一個發行版本
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
bufferWithCount(count)
bufferWithCount(count)
發出非重疊的緩衝區,每個緩衝區最多包含來自來源 Observable 的 count
個項目(最後發出的緩衝區可能包含少於 count
個項目)。
bufferWithCount(count, skip)
bufferWithCount(count, skip)
會建立一個新的緩衝區,從來源 Observable 發出的第一個項目開始,然後每隔 skip
個項目建立一個新的緩衝區,並在每個緩衝區中填入 count
個項目:初始項目和後續的 count-1
個項目,並在緩衝區完成時發出每個緩衝區。根據 count
和 skip
的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。
bufferWithCount
可以在以下各個發行版本中找到
rx.js
rx.compat.js
rx.all.js
rx.all.compat.js
rx.lite.extras.js
bufferWithTime(timeSpan)
bufferWithTime(timeSpan)
會定期發出一個新的項目集合,每隔 timeSpan
毫秒,其中包含自上次捆綁發出以來(如果是第一個捆綁,則自訂閱來源 Observable 以來)來源 Observable 發出的所有項目。此運算子的這個變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout
排程器。
bufferWithTime(timeSpan, timeShift)
bufferWithTime(timeSpan, timeShift)
會每隔 timeShift
毫秒建立一個新的項目集合,並在這個捆綁中填入自該時間起,直到自集合建立以來經過 timeSpan
毫秒的時間內,來源 Observable 發出的每個項目,然後將此集合作為其自己的發出發出。如果 timeSpan
長於 timeShift
,則發出的捆綁將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的這個變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout
排程器。
bufferWithTimeOrCount(timeSpan, count)
bufferWithTimeOrCount(timeSpan, count)
會針對來源 Observable 發出的每 count
個項目發出一個新的項目集合,或者,如果自上次集合發出以來已經過了 timeSpan
毫秒,它會發出一個包含該時間跨度內來源 Observable 發出的任意數量的項目的集合,即使這個數量少於 count
。此運算子的這個變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout
排程器。
bufferWithTime
和 bufferWithTimeOrCount
可以在以下各個發行版本中找到
rx.all.js
rx.all.compat.js
rx.time.js
bufferWithTime
和 bufferWithTimeOrCount
需要以下發行版本之一
rx.time.js
需要 rx.js
或 rx.compat.js
rx.lite.js
或 rx.lite.compat.js
在 RxKotlin 中,有幾個 Buffer 的變體
buffer(count)
buffer(count)
發出非重疊的緩衝區,形式為 List
,每個緩衝區最多包含來自來源 Observable 的 count
個項目(最後發出的 List
可能少於 count
個項目)。
buffer(count, skip)
buffer(count, skip)
從來源 Observable 發出的第一個項目開始建立新的緩衝區,然後每隔 skip
個項目建立一個,並以 count
個項目填滿每個緩衝區:初始項目和後續的 count-1
個項目。它會以 List
的形式發出這些緩衝區。根據 count
和 skip
的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。
buffer(bufferClosingSelector)
當它訂閱來源 Observable 時,buffer(bufferClosingSelector)
開始將其發出的項目收集到 List
中,並且它還會呼叫 bufferClosingSelector
來產生第二個 Observable。當這個第二個 Observable 發出 TClosing
物件時,buffer
會發出目前的 List
並重複此程序:開始新的 List
並呼叫 bufferClosingSelector
來建立新的 Observable 以進行監控。它會這樣做,直到來源 Observable 終止。
buffer(boundary)
buffer(boundary)
會監控 Observable boundary
。每當該 Observable 發出一個項目時,它會建立一個新的 List
,開始收集來源 Observable 發出的項目,並發出先前的 List
。
buffer(bufferOpenings, bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
會監控 Observable bufferOpenings
,該 Observable 發出 BufferOpening
物件。每當它觀察到發出的項目時,它會建立一個新的 List
,開始收集來源 Observable 發出的項目,並將 bufferOpenings
Observable 傳遞到 closingSelector
函數中。該函數會傳回一個 Observable。buffer
會監控該 Observable,當它偵測到其中發出的項目時,它會關閉 List
並將其作為自己的發出項目發出。
buffer(timespan, unit
[, scheduler
])
buffer(timespan, unit)
會定期發出新的項目 List
,每隔 timespan
的時間,其中包含自上次捆綁發出以來,或在第一個捆綁的情況下,自訂閱來源 Observable 以來,來源 Observable 發出的所有項目。此運算子的變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation
scheduler。
buffer(timespan, unit, count
[, scheduler
])
buffer(timespan, unit, count)
會為來源 Observable 發出的每 count
個項目發出一個新的項目 List
,或者,如果自上次捆綁發出以來已過了 timespan
,它會發出在該時間跨度內來源 Observable 發出的任意數量的項目捆綁包,即使這少於 count
。此運算子的變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation
scheduler。
buffer(timespan, timeshift, unit
[, scheduler
])
buffer(timespan, timeshift, unit)
每隔 timeshift
的時間建立一個新的項目 List
,並以自該時間起直到自捆綁包建立以來經過 timespan
時間,來源 Observable 發出的每個項目填滿此捆綁包,然後發出此 List
作為自己的發出項目。如果 timespan
長於 timeshift
,則發出的捆綁包將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的變體也有一個版本,它接受 Scheduler
作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation
scheduler。
在 Rx.NET 中,有幾個 Buffer 的變體。對於每個變體,您可以將來源 Observable 作為第一個參數傳入,或者您可以將其作為來源 Observable 的實例方法呼叫(在這種情況下,您可以省略該參數)
Buffer(count)
Buffer(count)
會以 IList
的形式發出非重疊的緩衝區,每個緩衝區最多包含來源 Observable 的 count
個項目(最後發出的 IList
可能少於 count
個項目)。
Buffer(count, skip)
Buffer(count, skip)
會建立一個新的緩衝區,從來源 Observable 發出的第一個項目開始,然後每隔 skip
個項目建立一個新的緩衝區,並在每個緩衝區中填入 count
個項目:初始項目和後續的 count-1
個項目。它會將這些緩衝區作為 IList
發出。根據 count
和 skip
的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。
Buffer(bufferClosingSelector)
當它訂閱來源 Observable 時,Buffer(bufferClosingSelector)
開始將其發出收集到 IList
中,並且它還會呼叫 bufferClosingSelector
來產生第二個 Observable。當第二個 Observable 發出一個 TBufferClosing
物件時,Buffer
會發出目前的 IList
並重複此過程:開始新的 IList
並呼叫 bufferClosingSelector
來建立新的 Observable 以進行監控。它會這樣做直到來源 Observable 終止。
Buffer(bufferOpenings,bufferClosingSelector)
Buffer(bufferOpenings, bufferClosingSelector)
會監控一個 Observable,BufferOpenings
,它會發出 TBufferOpening
物件。每次它觀察到這樣發出的項目時,它都會建立一個新的 IList
來開始收集來源 Observable 發出的項目,並將 TBufferOpening
物件傳遞到 bufferClosingSelector
函式。該函式會傳回一個 Observable。Buffer
會監控該 Observable,當它偵測到它發出的項目時,它會關閉 IList
並將其作為自己的發出發出。
Buffer(timeSpan)
Buffer(timeSpan)
會定期發出一個新的項目 IList
,每隔 timeSpan
時間,其中包含自上次捆綁發出以來(如果是第一個列表,則自訂閱來源 Observable 以來)來源 Observable 發出的所有項目。此運算子的這個變體也有一個版本,它接受 IScheduler
作為參數,並使用它來管理時間跨度。
Buffer(timeSpan, count)
Buffer(timeSpan, count)
會針對來源 Observable 發出的每 count
個項目發出一個新的項目 IList
,或者,如果自上次列表發出以來已經過了 timeSpan
時間,它會發出一個包含該時間跨度內來源 Observable 發出的任意數量的項目的列表,即使這個數量少於 count
。此運算子的這個變體也有一個版本,它接受 IScheduler
作為參數,並使用它來管理時間跨度。
Buffer(timeSpan, timeShift)
Buffer(timeSpan, timeShift)
會每隔 timeShift
時間段建立一個新的項目 IList
,並在這個列表中填入自該時間起,直到自列表建立以來經過 timeSpan
時間內,來源 Observable 發出的每個項目,然後將此 IList
作為自己的發出發出。如果 timeSpan
長於 timeShift
,則發出的列表將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的這個變體也有一個版本,它接受 IScheduler
作為參數,並使用它來管理時間跨度。
RxPHP 將此運算子實作為 bufferWithCount
。
將可觀察序列的每個元素投影到零個或多個基於元素計數資訊產生的緩衝區中。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCount.php $source = Rx\Observable::range(1, 6) ->bufferWithCount(2) ->subscribe($stdoutObserver);
Next value: [1,2] Next value: [3,4] Next value: [5,6] Complete!
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCountAndSkip.php $source = Rx\Observable::range(1, 6) ->bufferWithCount(2, 1) ->subscribe($stdoutObserver);
Next value: [1,2] Next value: [2,3] Next value: [3,4] Next value: [4,5] Next value: [5,6] Next value: [6] Complete!
RxPY 有幾個 Buffer 變體:buffer
、buffer_with_count
、buffer_with_time
和 buffer_with_time_or_count
。對於每個這些變體,都有可選參數可以更改運算子的行為。在 RxPY 中,當運算子可能接受多個可選參數時,請務必在您呼叫運算子時,在參數列表中命名參數,以避免歧義。
buffer(buffer_openings)
buffer(buffer_openings=boundaryObservable)
會監控一個 Observable,buffer_openings
。每次該 Observable 發出一個項目時,它都會建立一個新的陣列來開始收集來源 Observable 發出的項目,並發出先前的陣列。
buffer(closing_selector)
buffer(closing_selector=closingSelector)
會在訂閱後立即開始收集來源 Observable 發出的項目,並且還會呼叫 closing_selector
函式來產生第二個 Observable。它會監控這個新的 Observable,並且當它完成或發出一個項目時,它會發出目前的陣列,開始一個新的陣列來收集來源 Observable 中的項目,並再次呼叫 closing_selector
來產生一個新的 Observable 以進行監控,以確定何時發出新的陣列。它會重複此過程直到來源 Observable 終止,然後發出最終的陣列。
buffer(closing_selector=openingSelector, buffer_closing_selector=closingSelector)
首先呼叫 closing_selector
來取得一個 Observable。它會監控這個 Observable,並且每當它發出一個項目時,buffer
就會建立一個新的陣列,開始將後續來源 Observable 發出的項目收集到這個陣列中,並呼叫 buffer_closing_selector
來取得一個新的 Observable 以管理該陣列的關閉。當這個新的 Observable 發出一個項目或終止時,buffer
會關閉並發出該 Observable 管理的陣列。
buffer_with_count(count)
buffer_with_count(count)
會以陣列的形式發出非重疊的緩衝區,每個緩衝區最多包含來源 Observable 的 count
個項目(最後發出的陣列可能少於 count
個項目)。
buffer_with_count(count, skip)
buffer_with_count(count, skip=skip)
會建立一個新的緩衝區,從來源 Observable 發出的第一個項目開始,然後每隔 skip
個項目建立一個新的緩衝區,並在每個緩衝區中填入 count
個項目:初始項目和後續的 count-1
個項目。它會將這些緩衝區作為陣列發出。根據 count
和 skip
的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。
buffer_with_time(timespan)
buffer_with_time(timespan)
會定期發出一個新的項目陣列,每隔 timespan
毫秒,其中包含自上次捆綁發出以來(如果是第一個捆綁,則自訂閱來源 Observable 以來)來源 Observable 發出的所有項目。此運算子的這個變體也有一個版本,它接受 scheduler
參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout
排程器。
buffer_with_time(timespan, timeshift)
buffer(timespan, timeshift=timeshift)
會每隔 timeshift
毫秒建立一個新的項目陣列,並在這個陣列中填入自該時間起,直到自陣列建立以來經過 timespan
毫秒的時間內,來源 Observable 發出的每個項目,然後將此陣列作為其自己的發出發出。如果 timespan
長於 timeshift
,則發出的陣列將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的這個變體也有一個版本,它接受 scheduler
參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout
排程器。
buffer_with_time_or_count(timespan, count)
buffer_with_time_or_count(timespan, count)
會針對來源 Observable 發出的每 count
個項目發出一個新的項目陣列,或者,如果自上次捆綁發出以來已經過了 timespan
毫秒,它會發出一個包含該時間跨度內來源 Observable 發出的任意數量的項目的陣列,即使這個數量少於 count
。此運算子的這個變體也有一個版本,它接受 scheduler
參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout
排程器。
Rx.rb 有三個 Buffer 運算子的變體
buffer_with_count(count)
buffer_with_count(count)
會以陣列的形式發出非重疊的緩衝區,每個緩衝區最多包含來源 Observable 的 count
個項目(最後發出的陣列可能少於 count
個項目)。
buffer_with_count(count,skip)
buffer_with_count(count, skip=skip)
會建立一個新的緩衝區,從來源 Observable 發出的第一個項目開始,然後每隔 skip
個項目建立一個新的緩衝區,並在每個緩衝區中填入 count
個項目:初始項目和後續的 count-1
個項目。它會將這些緩衝區作為陣列發出。根據 count
和 skip
的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。
buffer_with_time(timespan)
buffer_with_time(timespan)
會定期發出一個新的項目陣列,每隔 timespan
毫秒,其中包含自上次捆綁發出以來(如果是第一個捆綁,則自訂閱來源 Observable 以來)來源 Observable 發出的所有項目。
RxScala 有兩種 Buffer 變體 — slidingBuffer
和 tumblingBuffer
— 每一種都有不同的變體,這些變體以不同的方式組裝它們發出的緩衝區
slidingBuffer(count, skip)
slidingBuffer(count, skip)
會建立一個新的緩衝區,從來源 Observable 發出的第一個項目開始,然後每隔 skip
個項目建立一個,並以 count
個項目填滿每個緩衝區:初始項目和後續的 count-1
個項目。它會將這些緩衝區以 Seq
的形式發出。根據 count
和 skip
的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會出現間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。
slidingBuffer(timespan, timeshift)
slidingBuffer(timespan, timeshift)
每隔 timeshift
(一個 Duration
) 就會建立一個新的項目 Seq
,並以來源 Observable 從該時間點起發出的每個項目填滿此緩衝區,直到自緩衝區建立以來經過 timespan
(也是一個 Duration
) 為止,然後將此 Seq
作為自己的發射物發出。如果 timespan
比 timeshift
長,則發射的陣列將代表重疊的時間段,因此它們可能包含重複的項目。此運算子的這個變體還有一個版本,它會將 Scheduler
作為參數,並使用它來控制時間跨度。
slidingBuffer(openings, closings)
slidingBuffer(openings,closings)
監視 openings
Observable,並且每當它發出一個 Opening
項目時,slidingBuffer
會建立一個新的 Seq
,開始將來源 Observable 後續發出的項目收集到此緩衝區中,並呼叫 closings
以取得一個新的 Observable 來控制該緩衝區的關閉。當這個新的 Observable 發出一個項目或終止時,slidingBuffer
會關閉並發出由該 Observable 控制的 Seq
。
tumblingBuffer(count)
tumblingBuffer(count)
以 Seq
的形式發出非重疊的緩衝區,每個緩衝區最多包含來自來源 Observable 的 count
個項目(最後發出的緩衝區可能少於 count
個項目)。
tumblingBuffer(boundary)
tumblingBuffer(boundary)
監視一個 Observable,boundary
。每次該 Observable 發出一個項目時,它會建立一個新的 Seq
來開始收集來源 Observable 發出的項目,並發出先前的 Seq
。此運算子的這個變體有一個可選的第二個參數 initialCapacity
,您可以使用它來指示這些緩衝區的預期大小,以便更有效率地配置記憶體。
tumblingBuffer(timespan)
tumblingBuffer(timespan)
每隔 timespan
(一個 Duration
) 就會定期發出一個新的項目 Seq
,其中包含來源 Observable 自上一個捆綁發射以來或,如果這是第一個捆綁包,則自訂閱來源 Observable 以來發出的所有項目。此運算子的這個變體有一個可選的第二個參數 scheduler
,您可以使用它來設定要控制時間跨度計算的 Scheduler
。
tumblingBuffer(timespan, count)
tumblingBuffer(timespan, count)
會針對來源 Observable 發出的每 count
個項目發出一個新的項目 Seq
,或者,如果自上次捆綁發射以來已經過 timespan
(一個 Duration
),則它會發出一個 Seq
,其中包含來源 Observable 在該時間跨度內發射的任何項目數,即使這少於 count
。此運算子的這個變體有一個可選的第三個參數 scheduler
,您可以使用它來設定要控制時間跨度計算的 Scheduler
。