一個 可連接的 Observable 類似於一個普通的 Observable,除了它在被訂閱時不會開始發射項目,而是只有在將 Connect 運算子應用於它時才會開始。 這樣,您可以在您選擇的時間提示 Observable 開始發射項目。
如果您在將 Observable 轉換為可連接的 Observable 之前,將 Replay 運算子應用於它,則產生的可連接的 Observable 將始終向任何未來的觀察者發射相同的完整序列,即使這些觀察者是在可連接的 Observable 開始向其他訂閱的觀察者發射項目之後才訂閱的。
在 RxGroovy 中,replay
運算子有多種變體,可以返回一個可連接的 Observable。 您必須先 Publish 這個可連接的 Observable,觀察者才能訂閱它,然後 Connect 它才能觀察其發射。
此種 replay
運算子的變體允許您設定最大緩衝區大小,以限制 replay
將緩衝並重播到後續觀察者的項目數量,和/或建立一個移動的時間窗口,該窗口定義何時發射的項目變得太舊而無法緩衝和重播。
replay()
replay(Scheduler)
replay(int)
replay(int,Scheduler)
replay(long,TimeUnit)
replay(long,TimeUnit,Scheduler)
replay(int,long,TimeUnit)
replay(int,long,TimeUnit,Scheduler)
還有一個 replay
的變體,它返回一個普通的 Observable。這些變體將一個轉換函數作為參數;此函數接受來源 Observable 發射的項目作為其參數,並返回一個將由結果 Observable 發射的項目。因此,實際上,此運算子不會重播來源 Observable,而是重播此函數 *轉換的* 來源 Observable。
此種 replay
運算子的變體允許您設定最大緩衝區大小,以限制 replay
將緩衝並重播到後續觀察者的項目數量,和/或建立一個移動的時間窗口,該窗口定義何時發射的項目變得太舊而無法緩衝和重播。
replay(Func1)
replay(Func1,Scheduler)
replay(Func1,int)
replay(Func1,int,Scheduler)
replay(Func1,long,TimeUnit)
replay(Func1,long,TimeUnit,Scheduler)
replay(Func1,int,long,TimeUnit)
replay(Func1,int,long,TimeUnit,Scheduler)
在 RxJava 中,replay
運算子有多種變體,可以返回一個可連接的 Observable。 您必須先 Publish 這個可連接的 Observable,觀察者才能訂閱它,然後 Connect 它才能觀察其發射。
此種 replay
運算子的變體允許您設定最大緩衝區大小,以限制 replay
將緩衝並重播到後續觀察者的項目數量,和/或建立一個移動的時間窗口,該窗口定義何時發射的項目變得太舊而無法緩衝和重播。
replay()
replay(Scheduler)
replay(int)
replay(int,Scheduler)
replay(long,TimeUnit)
replay(long,TimeUnit,Scheduler)
replay(int,long,TimeUnit)
replay(int,long,TimeUnit,Scheduler)
還有一個 replay
的變體,它返回一個普通的 Observable。這些變體將一個轉換函數作為參數;此函數接受來源 Observable 發射的項目作為其參數,並返回一個將由結果 Observable 發射的項目。因此,實際上,此運算子不會重播來源 Observable,而是重播此函數 *轉換的* 來源 Observable。
此種 replay
運算子的變體允許您設定最大緩衝區大小,以限制 replay
將緩衝並重播到後續觀察者的項目數量,和/或建立一個移動的時間窗口,該窗口定義何時發射的項目變得太舊而無法緩衝和重播。
replay(Func1)
replay(Func1,Scheduler)
replay(Func1,int)
replay(Func1,int,Scheduler)
replay(Func1,long,TimeUnit)
replay(Func1,long,TimeUnit,Scheduler)
replay(Func1,int,long,TimeUnit)
replay(Func1,int,long,TimeUnit,Scheduler)
在 RxJs 中,replay
運算子採用四個可選參數並返回一個普通的 Observable
selector
bufferSize
window
scheduler
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .do(function (x) { console.log('Side effect'); }); var published = source .replay(function (x) { return x.take(2).repeat(2); }, 3); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Next: SourceA0 Side effect Next: SourceB0 Side effect Next: SourceA1 Next: SourceA0 Next: SourceA1 Completed Side effect Next: SourceB1 Next: SourceB0 Next: SourceB1 Completed
還有一個 shareReplay
運算子,它會追蹤觀察者的數量,並在該數量降至零時與來源 Observable 斷開連接。 shareReplay
採用三個可選參數並返回一個普通的 Observable
bufferSize
window
scheduler
var interval = Rx.Observable.interval(1000); var source = interval .take(4) .doAction(function (x) { console.log('Side effect'); }); var published = source .shareReplay(3); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); // Creating a third subscription after the previous two subscriptions have // completed. Notice that no side effects result from this subscription, // because the notifications are cached and replayed. Rx.Observable .return(true) .delay(6000) .flatMap(published) .subscribe(createObserver('SourceC')); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Side effect Next: SourceA2 Next: SourceB2 Side effect Next: SourceA3 Next: SourceB3 Completed Completed Next: SourceC1 Next: SourceC2 Next: SourceC3 Completed
replay
和 shareReplay
位於以下發行版中
rx.all.js
rx.all.compat.js
rx.binding.js
(需要 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
待定
RxPHP 將此運算子實作為 replay
。
返回一個可觀察的序列,該序列是在一個可連接的可觀察序列上調用選擇器所產生的結果,該序列會共享對基礎序列的單一訂閱,並根據重播緩衝區的最大時間長度重播通知。 此運算子是使用 ReplaySubject 的 Multicast 的一種特殊形式。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/replay/replay.php $interval = \Rx\Observable::interval(1000); $source = $interval ->take(2) ->doOnNext(function ($x) { echo $x, ' something', PHP_EOL; echo 'Side effect', PHP_EOL; }); $published = $source ->replay(function (\Rx\Observable $x) { return $x->take(2)->repeat(2); }, 3); $published->subscribe($createStdoutObserver('SourceA ')); $published->subscribe($createStdoutObserver('SourceB '));
0 something Side effect 0 something Side effect SourceA Next value: 0 SourceB Next value: 0 SourceA Next value: 0 SourceB Next value: 0 SourceA Next value: 0 SourceB Next value: 0 SourceA Next value: 0 SourceA Complete! SourceB Next value: 0 SourceB Complete! 1 something Side effect 1 something Side effect
RxPHP 還有一個運算子 shareReplay
。
返回一個可觀察的序列,該序列會共享對基礎序列的單一訂閱,並根據重播緩衝區的最大時間長度重播通知。 此運算子是重播的一種特殊形式,當觀察者的數量從零變為一時會建立訂閱,然後與所有後續觀察者共享該訂閱,直到觀察者的數量回到零,此時會處置訂閱。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareReplay.php $interval = Rx\Observable::interval(1000); $source = $interval ->take(4) ->doOnNext(function ($x) { echo 'Side effect', PHP_EOL; }); $published = $source ->shareReplay(3); $published->subscribe($createStdoutObserver('SourceA ')); $published->subscribe($createStdoutObserver('SourceB ')); Rx\Observable ::of(true) ->concatMapTo(\Rx\Observable::timer(6000)) ->flatMap(function () use ($published) { return $published; }) ->subscribe($createStdoutObserver('SourceC '));
Side effect SourceA Next value: 0 SourceB Next value: 0 Side effect SourceA Next value: 1 SourceB Next value: 1 Side effect SourceA Next value: 2 SourceB Next value: 2 Side effect SourceA Next value: 3 SourceB Next value: 3 SourceA Complete! SourceB Complete! SourceC Next value: 1 SourceC Next value: 2 SourceC Next value: 3 SourceC Complete!
待定
待定