Replay

確保所有觀察者看到相同的發射項目序列,即使他們在 Observable 開始發射項目後才訂閱

Replay

一個 可連接的 Observable 類似於一個普通的 Observable,除了它在被訂閱時不會開始發射項目,而是只有在將 Connect 運算子應用於它時才會開始。 這樣,您可以在您選擇的時間提示 Observable 開始發射項目。

如果您在將 Observable 轉換為可連接的 Observable 之前,將 Replay 運算子應用於它,則產生的可連接的 Observable 將始終向任何未來的觀察者發射相同的完整序列,即使這些觀察者是在可連接的 Observable 開始向其他訂閱的觀察者發射項目之後才訂閱的。

另請參閱

特定語言資訊

replay

在 RxGroovy 中,replay 運算子有多種變體,可以返回一個可連接的 Observable。 您必須先 Publish 這個可連接的 Observable,觀察者才能訂閱它,然後 Connect 它才能觀察其發射。

此種 replay 運算子的變體允許您設定最大緩衝區大小,以限制 replay 將緩衝並重播到後續觀察者的項目數量,和/或建立一個移動的時間窗口,該窗口定義何時發射的項目變得太舊而無法緩衝和重播。

replay

還有一個 replay 的變體,它返回一個普通的 Observable。這些變體將一個轉換函數作為參數;此函數接受來源 Observable 發射的項目作為其參數,並返回一個將由結果 Observable 發射的項目。因此,實際上,此運算子不會重播來源 Observable,而是重播此函數 *轉換的* 來源 Observable。

此種 replay 運算子的變體允許您設定最大緩衝區大小,以限制 replay 將緩衝並重播到後續觀察者的項目數量,和/或建立一個移動的時間窗口,該窗口定義何時發射的項目變得太舊而無法緩衝和重播。

replay

在 RxJava 中,replay 運算子有多種變體,可以返回一個可連接的 Observable。 您必須先 Publish 這個可連接的 Observable,觀察者才能訂閱它,然後 Connect 它才能觀察其發射。

此種 replay 運算子的變體允許您設定最大緩衝區大小,以限制 replay 將緩衝並重播到後續觀察者的項目數量,和/或建立一個移動的時間窗口,該窗口定義何時發射的項目變得太舊而無法緩衝和重播。

replay

還有一個 replay 的變體,它返回一個普通的 Observable。這些變體將一個轉換函數作為參數;此函數接受來源 Observable 發射的項目作為其參數,並返回一個將由結果 Observable 發射的項目。因此,實際上,此運算子不會重播來源 Observable,而是重播此函數 *轉換的* 來源 Observable。

此種 replay 運算子的變體允許您設定最大緩衝區大小,以限制 replay 將緩衝並重播到後續觀察者的項目數量,和/或建立一個移動的時間窗口,該窗口定義何時發射的項目變得太舊而無法緩衝和重播。

replay

在 RxJs 中,replay 運算子採用四個可選參數並返回一個普通的 Observable

selector
一個轉換函數,它接受來源 Observable 發射的項目作為其參數,並返回一個將由結果 Observable 發射的項目
bufferSize
緩衝並重播給後續觀察者的最大項目數
window
在此緩衝區中項目可能被丟棄而不會發射給後續觀察者的存在時間(以毫秒為單位)
scheduler
此運算子將在其上運作的 Scheduler

程式碼範例

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .do(function (x) {
        console.log('Side effect');
    });

var published = source
    .replay(function (x) {
        return x.take(2).repeat(2);
    }, 3);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Side effect
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceA0
Next: SourceA1
Completed
Side effect
Next: SourceB1
Next: SourceB0
Next: SourceB1
Completed

還有一個 shareReplay 運算子,它會追蹤觀察者的數量,並在該數量降至零時與來源 Observable 斷開連接。 shareReplay 採用三個可選參數並返回一個普通的 Observable

bufferSize
緩衝並重播給後續觀察者的最大項目數
window
在此緩衝區中項目可能被丟棄而不會發射給後續觀察者的存在時間(以毫秒為單位)
scheduler
此運算子將在其上運作的 Scheduler

程式碼範例

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(4)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source
    .shareReplay(3);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

// Creating a third subscription after the previous two subscriptions have
// completed. Notice that no side effects result from this subscription,
// because the notifications are cached and replayed.
Rx.Observable
    .return(true)
    .delay(6000)
    .flatMap(published)
    .subscribe(createObserver('SourceC'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Side effect
Next: SourceA2
Next: SourceB2
Side effect
Next: SourceA3
Next: SourceB3
Completed
Completed
Next: SourceC1
Next: SourceC2
Next: SourceC3
Completed

replayshareReplay 位於以下發行版中

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (需要 rx.jsrx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

待定

RxPHP 將此運算子實作為 replay

返回一個可觀察的序列,該序列是在一個可連接的可觀察序列上調用選擇器所產生的結果,該序列會共享對基礎序列的單一訂閱,並根據重播緩衝區的最大時間長度重播通知。 此運算子是使用 ReplaySubject 的 Multicast 的一種特殊形式。

程式碼範例

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/replay/replay.php

$interval = \Rx\Observable::interval(1000);

$source = $interval
    ->take(2)
    ->doOnNext(function ($x) {
        echo $x, ' something', PHP_EOL;
        echo 'Side effect', PHP_EOL;
    });

$published = $source
    ->replay(function (\Rx\Observable $x) {
        return $x->take(2)->repeat(2);
    }, 3);

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

   
0 something
Side effect
0 something
Side effect
SourceA Next value: 0
SourceB Next value: 0
SourceA Next value: 0
SourceB Next value: 0
SourceA Next value: 0
SourceB Next value: 0
SourceA Next value: 0
SourceA Complete!
SourceB Next value: 0
SourceB Complete!
1 something
Side effect
1 something
Side effect
    

RxPHP 還有一個運算子 shareReplay

返回一個可觀察的序列,該序列會共享對基礎序列的單一訂閱,並根據重播緩衝區的最大時間長度重播通知。 此運算子是重播的一種特殊形式,當觀察者的數量從零變為一時會建立訂閱,然後與所有後續觀察者共享該訂閱,直到觀察者的數量回到零,此時會處置訂閱。

程式碼範例

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareReplay.php

$interval = Rx\Observable::interval(1000);

$source = $interval
    ->take(4)
    ->doOnNext(function ($x) {
        echo 'Side effect', PHP_EOL;
    });

$published = $source
    ->shareReplay(3);

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

Rx\Observable
    ::of(true)
    ->concatMapTo(\Rx\Observable::timer(6000))
    ->flatMap(function () use ($published) {
        return $published;
    })
    ->subscribe($createStdoutObserver('SourceC '));

   
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
Side effect
SourceA Next value: 2
SourceB Next value: 2
Side effect
SourceA Next value: 3
SourceB Next value: 3
SourceA Complete!
SourceB Complete!
SourceC Next value: 1
SourceC Next value: 2
SourceC Next value: 3
SourceC Complete!
    

待定

待定