Buffer

將 Observable 發出的項目定期收集到捆綁包中,並發出這些捆綁包,而不是一次發出一個項目

Buffer

Buffer 運算子會將發出項目的 Observable 轉換為發出這些項目緩衝集合的 Observable。在各種特定語言的 Buffer 實作中,有許多變體在於它們如何選擇哪些項目進入哪些緩衝區。

請注意,如果來源 Observable 發出 onError 通知,Buffer 會立即傳遞此通知,而不會先發出它正在組裝的緩衝區,即使該緩衝區包含來源 Observable 在發出錯誤通知之前發出的項目也是如此。

Window 運算子與 Buffer 類似,但會將項目收集到單獨的 Observable 中,而不是收集到資料結構中再重新發出。

另請參閱

特定語言資訊

RxCpp 實作了 Buffer 的兩個變體

buffer(count)

buffer(count)

buffer(count) 發出非重疊的緩衝區,形式為 vector,每個緩衝區最多包含來自來源 Observable 的 count 個項目(最後發出的 vector 可能少於 count 個項目)。

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) 從來源 Observable 發出的第一個項目開始建立新的緩衝區,然後每隔 skip 個項目建立一個,並以 count 個項目填滿每個緩衝區:初始項目和後續的 count-1 個項目。它會以 vector 的形式發出這些緩衝區。根據 countskip 的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。

在 RxGroovy 中,Buffer 有幾個變體

buffer(count)

buffer(count)

buffer(count) 發出非重疊的緩衝區,形式為 List,每個緩衝區最多包含來自來源 Observable 的 count 個項目(最後發出的 List 可能少於 count 個項目)。

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) 從來源 Observable 發出的第一個項目開始建立新的緩衝區,然後每隔 skip 個項目建立一個,並以 count 個項目填滿每個緩衝區:初始項目和後續的 count-1 個項目。它會以 List 的形式發出這些緩衝區。根據 countskip 的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

當它訂閱來源 Observable 時,buffer(bufferClosingSelector) 開始將其發出的項目收集到 List 中,並且它還會呼叫 bufferClosingSelector 來產生第二個 Observable。當這個第二個 Observable 發出 TClosing 物件時,buffer 會發出目前的 List 並重複此程序:開始新的 List 並呼叫 bufferClosingSelector 來建立新的 Observable 以進行監控。它會這樣做,直到來源 Observable 終止。

buffer(boundary[, initialCapacity])

buffer(boundary)

buffer(boundary) 會監控 Observable boundary。每當該 Observable 發出一個項目時,它會建立一個新的 List,開始收集來源 Observable 發出的項目,並發出先前的 List

buffer(bufferOpenings, bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) 會監控 Observable bufferOpenings,該 Observable 發出 BufferOpening 物件。每當它觀察到發出的項目時,它會建立一個新的 List,開始收集來源 Observable 發出的項目,並將 bufferOpenings Observable 傳遞到 closingSelector 函數中。該函數會傳回一個 Observable。buffer 會監控該 Observable,當它偵測到其中發出的項目時,它會關閉 List 並將其作為自己的發出項目發出。

buffer(timespan, unit[, scheduler])

buffer(timespan,unit)

buffer(timespan, unit) 會定期發出新的項目 List,每隔 timespan 的時間,其中包含自上次捆綁發出以來,或在第一個捆綁的情況下,自訂閱來源 Observable 以來,來源 Observable 發出的所有項目。此運算子的變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation Scheduler。

buffer(timespan, unit, count[, scheduler])

buffer(timespan,unit,count)

buffer(timespan, unit, count) 會為來源 Observable 發出的每 count 個項目發出一個新的項目 List,或者,如果自上次捆綁發出以來已過了 timespan,它會發出在該時間跨度內來源 Observable 發出的任意數量的項目捆綁包,即使這少於 count。此運算子的變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation scheduler。

buffer(timespan, timeshift, unit[, scheduler])

buffer(timespan,timeshift,unit)

buffer(timespan, timeshift, unit) 每隔 timeshift 的時間建立一個新的項目 List,並以自該時間起直到自捆綁包建立以來經過 timespan 時間,來源 Observable 發出的每個項目填滿此捆綁包,然後發出此 List 作為自己的發出項目。如果 timespan 長於 timeshift,則發出的捆綁包將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation scheduler。

您可以使用 Buffer 運算子來實作背壓(也就是說,處理 Observable 可能產生項目太快而觀察者無法消耗的情況)。

Buffer as a backpressure strategy

Buffer 可以將許多項目的序列減少為較少項目的緩衝區序列,使其更易於管理。例如,您可以定期以固定的時間間隔關閉並發出 bursty Observable 中的項目緩衝區。

範例程式碼

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
Buffer as a backpressure strategy

或者您可以發揮想像力,在 bursty 時段收集緩衝區中的項目,並在每個 bursty 結束時發出它們,方法是使用 Debounce 運算子向 buffer 運算子發出緩衝區關閉指示。

範例程式碼

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

在 RxJava 中,Buffer 有幾個變體

buffer(count)

buffer(count)

buffer(count) 發出非重疊的緩衝區,形式為 List,每個緩衝區最多包含來自來源 Observable 的 count 個項目(最後發出的 List 可能少於 count 個項目)。

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) 從來源 Observable 發出的第一個項目開始建立新的緩衝區,然後每隔 skip 個項目建立一個,並以 count 個項目填滿每個緩衝區:初始項目和後續的 count-1 個項目。它會以 List 的形式發出這些緩衝區。根據 countskip 的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

當它訂閱來源 Observable 時,buffer(bufferClosingSelector) 開始將其發出的項目收集到 List 中,並且它還會呼叫 bufferClosingSelector 來產生第二個 Observable。當這個第二個 Observable 發出 TClosing 物件時,buffer 會發出目前的 List 並重複此程序:開始新的 List 並呼叫 bufferClosingSelector 來建立新的 Observable 以進行監控。它會這樣做,直到來源 Observable 終止。

buffer(boundary)

buffer(boundary)

buffer(boundary) 會監控 Observable boundary。每當該 Observable 發出一個項目時,它會建立一個新的 List,開始收集來源 Observable 發出的項目,並發出先前的 List

buffer(bufferOpenings, bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) 會監控 Observable bufferOpenings,該 Observable 發出 BufferOpening 物件。每當它觀察到發出的項目時,它會建立一個新的 List,開始收集來源 Observable 發出的項目,並將 bufferOpenings Observable 傳遞到 closingSelector 函數中。該函數會傳回一個 Observable。buffer 會監控該 Observable,當它偵測到其中發出的項目時,它會關閉 List 並將其作為自己的發出項目發出。

buffer(timespan, unit[, scheduler])

buffer(timespan,unit)

buffer(timespan, unit) 會定期發出新的項目 List,每隔 timespan 的時間,其中包含自上次捆綁發出以來,或在第一個捆綁的情況下,自訂閱來源 Observable 以來,來源 Observable 發出的所有項目。此運算子的變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation scheduler。

buffer(timespan, unit, count[, scheduler])

buffer(timespan,unit,count)

buffer(timespan, unit, count) 會為來源 Observable 發出的每 count 個項目發出一個新的項目 List,或者,如果自上次捆綁發出以來已過了 timespan,它會發出在該時間跨度內來源 Observable 發出的任意數量的項目捆綁包,即使這少於 count。此運算子的變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation scheduler。

buffer(timespan, timeshift, unit[, scheduler])

buffer(timespan,timeshift,unit)

buffer(timespan, timeshift, unit) 每隔 timeshift 的時間建立一個新的項目 List,並以自該時間起直到自捆綁包建立以來經過 timespan 時間,來源 Observable 發出的每個項目填滿此捆綁包,然後發出此 List 作為自己的發出項目。如果 timespan 長於 timeshift,則發出的捆綁包將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation scheduler。

您可以使用 Buffer 運算子來實作背壓(也就是說,處理 Observable 可能產生項目太快而觀察者無法消耗的情況)。

Buffer as a backpressure strategy

Buffer 可以將許多項目的序列減少為較少項目的緩衝區序列,使其更易於管理。例如,您可以定期以固定的時間間隔關閉並發出 bursty Observable 中的項目緩衝區。

範例程式碼

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
Buffer as a backpressure strategy

或者您可以發揮想像力,在 bursty 時段收集緩衝區中的項目,並在每個 bursty 結束時發出它們,方法是使用 Debounce 運算子向 buffer 運算子發出緩衝區關閉指示。

範例程式碼

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

另請參閱

RxJS 有四個 Buffer 運算子 — bufferbufferWithCountbufferWithTimebufferWithTimeOrCount,每個運算子都有不同的變體,以不同的方式管理哪些來源 Observable 項目作為哪些緩衝區的一部分發出。

buffer(bufferBoundaries)

buffer(bufferBoundaries)

buffer(bufferBoundaries) 會監控 Observable bufferBoundaries。每當該 Observable 發出一個項目時,它會建立一個新的集合,開始收集來源 Observable 發出的項目,並發出先前的集合。

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

當它訂閱來源 Observable 時,buffer(bufferClosingSelector) 開始將其發出的項目收集到集合中,並且它還會呼叫 bufferClosingSelector 來產生第二個 Observable。當這個第二個 Observable 發出一個項目時,buffer 會發出目前的集合並重複此程序:開始新的集合並呼叫 bufferClosingSelector 來建立新的 Observable 以進行監控。它會這樣做,直到來源 Observable 終止。

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) 會監控 Observable bufferOpenings,該 Observable 發出 BufferOpening 物件。每當它觀察到發出的項目時,它會建立一個新的集合,開始收集來源 Observable 發出的項目,並將 bufferOpenings Observable 傳遞到 bufferClosingSelector 函數中。該函數會傳回一個 Observable。buffer 會監控該 Observable,當它偵測到其中發出的項目時,它會發出目前的集合並開始新的集合。

buffer 存在於以下每個發行版本中

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js

buffer 需要以下其中一個發行版本

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

bufferWithCount(count)

bufferWithCount(count)

bufferWithCount(count) 發出非重疊的緩衝區,每個緩衝區最多包含來自來源 Observable 的 count 個項目(最後發出的緩衝區可能包含少於 count 個項目)。

bufferWithCount(count, skip)

bufferWithCount(count,skip)

bufferWithCount(count, skip) 會建立一個新的緩衝區,從來源 Observable 發出的第一個項目開始,然後每隔 skip 個項目建立一個新的緩衝區,並在每個緩衝區中填入 count 個項目:初始項目和後續的 count-1 個項目,並在緩衝區完成時發出每個緩衝區。根據 countskip 的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。

bufferWithCount 可以在以下各個發行版本中找到

  • rx.js
  • rx.compat.js
  • rx.all.js
  • rx.all.compat.js
  • rx.lite.extras.js

bufferWithTime(timeSpan)

bufferWithTime(timeSpan)

bufferWithTime(timeSpan) 會定期發出一個新的項目集合,每隔 timeSpan 毫秒,其中包含自上次捆綁發出以來(如果是第一個捆綁,則自訂閱來源 Observable 以來)來源 Observable 發出的所有項目。此運算子的這個變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout 排程器。

bufferWithTime(timeSpan, timeShift)

bufferWithTime(timeSpan,timeShift)

bufferWithTime(timeSpan, timeShift) 會每隔 timeShift 毫秒建立一個新的項目集合,並在這個捆綁中填入自該時間起,直到自集合建立以來經過 timeSpan 毫秒的時間內,來源 Observable 發出的每個項目,然後將此集合作為其自己的發出發出。如果 timeSpan 長於 timeShift,則發出的捆綁將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的這個變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout 排程器。

bufferWithTimeOrCount(timeSpan, count)

bufferWithTimeOrCount(timeSpan,count)

bufferWithTimeOrCount(timeSpan, count) 會針對來源 Observable 發出的每 count 個項目發出一個新的項目集合,或者,如果自上次集合發出以來已經過了 timeSpan 毫秒,它會發出一個包含該時間跨度內來源 Observable 發出的任意數量的項目的集合,即使這個數量少於 count。此運算子的這個變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout 排程器。

bufferWithTimebufferWithTimeOrCount 可以在以下各個發行版本中找到

  • rx.all.js
  • rx.all.compat.js
  • rx.time.js

bufferWithTimebufferWithTimeOrCount 需要以下發行版本之一

  • rx.time.js 需要 rx.jsrx.compat.js
  • 否則: rx.lite.jsrx.lite.compat.js

在 RxKotlin 中,有幾個 Buffer 的變體

buffer(count)

buffer(count)

buffer(count) 發出非重疊的緩衝區,形式為 List,每個緩衝區最多包含來自來源 Observable 的 count 個項目(最後發出的 List 可能少於 count 個項目)。

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) 從來源 Observable 發出的第一個項目開始建立新的緩衝區,然後每隔 skip 個項目建立一個,並以 count 個項目填滿每個緩衝區:初始項目和後續的 count-1 個項目。它會以 List 的形式發出這些緩衝區。根據 countskip 的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

當它訂閱來源 Observable 時,buffer(bufferClosingSelector) 開始將其發出的項目收集到 List 中,並且它還會呼叫 bufferClosingSelector 來產生第二個 Observable。當這個第二個 Observable 發出 TClosing 物件時,buffer 會發出目前的 List 並重複此程序:開始新的 List 並呼叫 bufferClosingSelector 來建立新的 Observable 以進行監控。它會這樣做,直到來源 Observable 終止。

buffer(boundary)

buffer(boundary)

buffer(boundary) 會監控 Observable boundary。每當該 Observable 發出一個項目時,它會建立一個新的 List,開始收集來源 Observable 發出的項目,並發出先前的 List

buffer(bufferOpenings, bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) 會監控 Observable bufferOpenings,該 Observable 發出 BufferOpening 物件。每當它觀察到發出的項目時,它會建立一個新的 List,開始收集來源 Observable 發出的項目,並將 bufferOpenings Observable 傳遞到 closingSelector 函數中。該函數會傳回一個 Observable。buffer 會監控該 Observable,當它偵測到其中發出的項目時,它會關閉 List 並將其作為自己的發出項目發出。

buffer(timespan, unit[, scheduler])

buffer(timespan,unit)

buffer(timespan, unit) 會定期發出新的項目 List,每隔 timespan 的時間,其中包含自上次捆綁發出以來,或在第一個捆綁的情況下,自訂閱來源 Observable 以來,來源 Observable 發出的所有項目。此運算子的變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation scheduler。

buffer(timespan, unit, count[, scheduler])

buffer(timespan,unit,count)

buffer(timespan, unit, count) 會為來源 Observable 發出的每 count 個項目發出一個新的項目 List,或者,如果自上次捆綁發出以來已過了 timespan,它會發出在該時間跨度內來源 Observable 發出的任意數量的項目捆綁包,即使這少於 count。此運算子的變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation scheduler。

buffer(timespan, timeshift, unit[, scheduler])

buffer(timespan,timeshift,unit)

buffer(timespan, timeshift, unit) 每隔 timeshift 的時間建立一個新的項目 List,並以自該時間起直到自捆綁包建立以來經過 timespan 時間,來源 Observable 發出的每個項目填滿此捆綁包,然後發出此 List 作為自己的發出項目。如果 timespan 長於 timeshift,則發出的捆綁包將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的變體也有一個版本,它接受 Scheduler 作為參數,並使用它來管理時間跨度;預設情況下,此變體使用 computation scheduler。

在 Rx.NET 中,有幾個 Buffer 的變體。對於每個變體,您可以將來源 Observable 作為第一個參數傳入,或者您可以將其作為來源 Observable 的實例方法呼叫(在這種情況下,您可以省略該參數)

Buffer(count)

Buffer(count)

Buffer(count) 會以 IList 的形式發出非重疊的緩衝區,每個緩衝區最多包含來源 Observable 的 count 個項目(最後發出的 IList 可能少於 count 個項目)。

Buffer(count, skip)

Buffer(count,skip)

Buffer(count, skip) 會建立一個新的緩衝區,從來源 Observable 發出的第一個項目開始,然後每隔 skip 個項目建立一個新的緩衝區,並在每個緩衝區中填入 count 個項目:初始項目和後續的 count-1 個項目。它會將這些緩衝區作為 IList 發出。根據 countskip 的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。

Buffer(bufferClosingSelector)

Buffer(bufferClosingSelector)

當它訂閱來源 Observable 時,Buffer(bufferClosingSelector) 開始將其發出收集到 IList 中,並且它還會呼叫 bufferClosingSelector 來產生第二個 Observable。當第二個 Observable 發出一個 TBufferClosing 物件時,Buffer 會發出目前的 IList 並重複此過程:開始新的 IList 並呼叫 bufferClosingSelector 來建立新的 Observable 以進行監控。它會這樣做直到來源 Observable 終止。

Buffer(bufferOpenings,bufferClosingSelector)

Buffer(bufferOpenings,bufferClosingSelector)

Buffer(bufferOpenings, bufferClosingSelector) 會監控一個 Observable,BufferOpenings,它會發出 TBufferOpening 物件。每次它觀察到這樣發出的項目時,它都會建立一個新的 IList 來開始收集來源 Observable 發出的項目,並將 TBufferOpening 物件傳遞到 bufferClosingSelector 函式。該函式會傳回一個 Observable。Buffer 會監控該 Observable,當它偵測到它發出的項目時,它會關閉 IList 並將其作為自己的發出發出。

Buffer(timeSpan)

Buffer(timeSpan)

Buffer(timeSpan) 會定期發出一個新的項目 IList,每隔 timeSpan 時間,其中包含自上次捆綁發出以來(如果是第一個列表,則自訂閱來源 Observable 以來)來源 Observable 發出的所有項目。此運算子的這個變體也有一個版本,它接受 IScheduler 作為參數,並使用它來管理時間跨度。

Buffer(timeSpan, count)

Buffer(timeSpan,count)

Buffer(timeSpan, count) 會針對來源 Observable 發出的每 count 個項目發出一個新的項目 IList,或者,如果自上次列表發出以來已經過了 timeSpan 時間,它會發出一個包含該時間跨度內來源 Observable 發出的任意數量的項目的列表,即使這個數量少於 count。此運算子的這個變體也有一個版本,它接受 IScheduler 作為參數,並使用它來管理時間跨度。

Buffer(timeSpan, timeShift)

Buffer(timeSpan,timeShift)

Buffer(timeSpan, timeShift) 會每隔 timeShift 時間段建立一個新的項目 IList,並在這個列表中填入自該時間起,直到自列表建立以來經過 timeSpan 時間內,來源 Observable 發出的每個項目,然後將此 IList 作為自己的發出發出。如果 timeSpan 長於 timeShift,則發出的列表將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的這個變體也有一個版本,它接受 IScheduler 作為參數,並使用它來管理時間跨度。

RxPHP 將此運算子實作為 bufferWithCount

將可觀察序列的每個元素投影到零個或多個基於元素計數資訊產生的緩衝區中。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCount.php

$source = Rx\Observable::range(1, 6)
    ->bufferWithCount(2)
    ->subscribe($stdoutObserver);

   
Next value: [1,2]
Next value: [3,4]
Next value: [5,6]
Complete!
    
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCountAndSkip.php

$source = Rx\Observable::range(1, 6)
    ->bufferWithCount(2, 1)
    ->subscribe($stdoutObserver);

   
Next value: [1,2]
Next value: [2,3]
Next value: [3,4]
Next value: [4,5]
Next value: [5,6]
Next value: [6]
Complete!
    

RxPY 有幾個 Buffer 變體:bufferbuffer_with_countbuffer_with_timebuffer_with_time_or_count。對於每個這些變體,都有可選參數可以更改運算子的行為。在 RxPY 中,當運算子可能接受多個可選參數時,請務必在您呼叫運算子時,在參數列表中命名參數,以避免歧義。

buffer(buffer_openings)

buffer(buffer_openings)

buffer(buffer_openings=boundaryObservable) 會監控一個 Observable,buffer_openings。每次該 Observable 發出一個項目時,它都會建立一個新的陣列來開始收集來源 Observable 發出的項目,並發出先前的陣列。

buffer(closing_selector)

buffer(closing_selector)

buffer(closing_selector=closingSelector) 會在訂閱後立即開始收集來源 Observable 發出的項目,並且還會呼叫 closing_selector 函式來產生第二個 Observable。它會監控這個新的 Observable,並且當它完成或發出一個項目時,它會發出目前的陣列,開始一個新的陣列來收集來源 Observable 中的項目,並再次呼叫 closing_selector 來產生一個新的 Observable 以進行監控,以確定何時發出新的陣列。它會重複此過程直到來源 Observable 終止,然後發出最終的陣列。

buffer(closing_selector,buffer_closing_selector)

buffer(closing_selector=openingSelector, buffer_closing_selector=closingSelector) 首先呼叫 closing_selector 來取得一個 Observable。它會監控這個 Observable,並且每當它發出一個項目時,buffer 就會建立一個新的陣列,開始將後續來源 Observable 發出的項目收集到這個陣列中,並呼叫 buffer_closing_selector 來取得一個新的 Observable 以管理該陣列的關閉。當這個新的 Observable 發出一個項目或終止時,buffer 會關閉並發出該 Observable 管理的陣列。

buffer_with_count(count)

buffer_with_count(count)

buffer_with_count(count) 會以陣列的形式發出非重疊的緩衝區,每個緩衝區最多包含來源 Observable 的 count 個項目(最後發出的陣列可能少於 count 個項目)。

buffer_with_count(count, skip)

buffer_with_count(count,skip)

buffer_with_count(count, skip=skip) 會建立一個新的緩衝區,從來源 Observable 發出的第一個項目開始,然後每隔 skip 個項目建立一個新的緩衝區,並在每個緩衝區中填入 count 個項目:初始項目和後續的 count-1 個項目。它會將這些緩衝區作為陣列發出。根據 countskip 的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。

buffer_with_time(timespan)

buffer_with_time(timespan)

buffer_with_time(timespan) 會定期發出一個新的項目陣列,每隔 timespan 毫秒,其中包含自上次捆綁發出以來(如果是第一個捆綁,則自訂閱來源 Observable 以來)來源 Observable 發出的所有項目。此運算子的這個變體也有一個版本,它接受 scheduler 參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout 排程器。

buffer_with_time(timespan, timeshift)

buffer_with_time(timespan,timeshift)

buffer(timespan, timeshift=timeshift) 會每隔 timeshift 毫秒建立一個新的項目陣列,並在這個陣列中填入自該時間起,直到自陣列建立以來經過 timespan 毫秒的時間內,來源 Observable 發出的每個項目,然後將此陣列作為其自己的發出發出。如果 timespan 長於 timeshift,則發出的陣列將表示重疊的時間段,因此它們可能包含重複的項目。此運算子的這個變體也有一個版本,它接受 scheduler 參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout 排程器。

buffer_with_time_or_count(timespan, count)

buffer_with_time_or_count(timespan,count)

buffer_with_time_or_count(timespan, count) 會針對來源 Observable 發出的每 count 個項目發出一個新的項目陣列,或者,如果自上次捆綁發出以來已經過了 timespan 毫秒,它會發出一個包含該時間跨度內來源 Observable 發出的任意數量的項目的陣列,即使這個數量少於 count。此運算子的這個變體也有一個版本,它接受 scheduler 參數,並使用它來管理時間跨度;預設情況下,此變體使用 timeout 排程器。

Rx.rb 有三個 Buffer 運算子的變體

buffer_with_count(count)

buffer_with_count(count)

buffer_with_count(count) 會以陣列的形式發出非重疊的緩衝區,每個緩衝區最多包含來源 Observable 的 count 個項目(最後發出的陣列可能少於 count 個項目)。

buffer_with_count(count,skip)

buffer_with_count(count,skip)

buffer_with_count(count, skip=skip) 會建立一個新的緩衝區,從來源 Observable 發出的第一個項目開始,然後每隔 skip 個項目建立一個新的緩衝區,並在每個緩衝區中填入 count 個項目:初始項目和後續的 count-1 個項目。它會將這些緩衝區作為陣列發出。根據 countskip 的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會有間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。

buffer_with_time(timespan)

buffer_with_time(timespan)

buffer_with_time(timespan) 會定期發出一個新的項目陣列,每隔 timespan 毫秒,其中包含自上次捆綁發出以來(如果是第一個捆綁,則自訂閱來源 Observable 以來)來源 Observable 發出的所有項目。

RxScala 有兩種 Buffer 變體 — slidingBuffertumblingBuffer — 每一種都有不同的變體,這些變體以不同的方式組裝它們發出的緩衝區

slidingBuffer(count, skip)

slidingBuffer(count,skip)

slidingBuffer(count, skip) 會建立一個新的緩衝區,從來源 Observable 發出的第一個項目開始,然後每隔 skip 個項目建立一個,並以 count 個項目填滿每個緩衝區:初始項目和後續的 count-1 個項目。它會將這些緩衝區以 Seq 的形式發出。根據 countskip 的值,這些緩衝區可能會重疊(多個緩衝區可能包含相同的項目),或者它們可能會出現間隙(來源 Observable 發出的項目未在任何緩衝區中表示)。

slidingBuffer(timespan, timeshift)

slidingBuffer(timespan,timeshift)

slidingBuffer(timespan, timeshift) 每隔 timeshift (一個 Duration) 就會建立一個新的項目 Seq,並以來源 Observable 從該時間點起發出的每個項目填滿此緩衝區,直到自緩衝區建立以來經過 timespan (也是一個 Duration) 為止,然後將此 Seq 作為自己的發射物發出。如果 timespantimeshift 長,則發射的陣列將代表重疊的時間段,因此它們可能包含重複的項目。此運算子的這個變體還有一個版本,它會將 Scheduler 作為參數,並使用它來控制時間跨度。

slidingBuffer(openings, closings)

slidingBuffer(openings,closings)

slidingBuffer(openings,closings) 監視 openings Observable,並且每當它發出一個 Opening 項目時,slidingBuffer 會建立一個新的 Seq,開始將來源 Observable 後續發出的項目收集到此緩衝區中,並呼叫 closings 以取得一個新的 Observable 來控制該緩衝區的關閉。當這個新的 Observable 發出一個項目或終止時,slidingBuffer 會關閉並發出由該 Observable 控制的 Seq

tumblingBuffer(count)

tumblingBuffer(count)

tumblingBuffer(count)Seq 的形式發出非重疊的緩衝區,每個緩衝區最多包含來自來源 Observable 的 count 個項目(最後發出的緩衝區可能少於 count 個項目)。

tumblingBuffer(boundary)

tumblingBuffer(boundary)

tumblingBuffer(boundary) 監視一個 Observable,boundary。每次該 Observable 發出一個項目時,它會建立一個新的 Seq 來開始收集來源 Observable 發出的項目,並發出先前的 Seq。此運算子的這個變體有一個可選的第二個參數 initialCapacity,您可以使用它來指示這些緩衝區的預期大小,以便更有效率地配置記憶體。

tumblingBuffer(timespan)

tumblingBuffer(timespan)

tumblingBuffer(timespan) 每隔 timespan (一個 Duration) 就會定期發出一個新的項目 Seq,其中包含來源 Observable 自上一個捆綁發射以來或,如果這是第一個捆綁包,則自訂閱來源 Observable 以來發出的所有項目。此運算子的這個變體有一個可選的第二個參數 scheduler,您可以使用它來設定要控制時間跨度計算的 Scheduler

tumblingBuffer(timespan, count)

tumblingBuffer(timespan,count)

tumblingBuffer(timespan, count) 會針對來源 Observable 發出的每 count 個項目發出一個新的項目 Seq,或者,如果自上次捆綁發射以來已經過 timespan (一個 Duration),則它會發出一個 Seq,其中包含來源 Observable 在該時間跨度內發射的任何項目數,即使這少於 count。此運算子的這個變體有一個可選的第三個參數 scheduler,您可以使用它來設定要控制時間跨度計算的 Scheduler