RefCount

使可連接的 Observable 行為像普通的 Observable

RefCount

一個 可連接的 Observable 類似於一個普通的 Observable,但它並不是在被訂閱時開始發射項目,而是在 Connect 操作符被應用到它時。這樣,你可以選擇在何時提示 Observable 開始發射項目。

RefCount 操作符自動化了連接和斷開與可連接 Observable 的過程。它作用於一個可連接的 Observable 並返回一個普通的 Observable。當第一個觀察者訂閱這個 Observable 時,RefCount 會連接到底層的可連接 Observable。RefCount 接著會追蹤有多少其他觀察者訂閱了它,並且只有在最後一個觀察者取消訂閱後才會斷開與底層可連接 Observable 的連接。

參見

特定語言資訊

待定

待定

refCount

RxGroovy 將此操作符實作為 refCount

還有一個 share 操作符,它等同於將 publishrefCount 操作符按順序應用到一個 Observable 上。

refCount

RxJava 將此操作符實作為 refCount

還有一個 share 操作符,它等同於將 publishrefCount 操作符按順序應用到一個 Observable 上。

refCount

RxJava 將此操作符實作為 refCount

範例程式碼

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) { console.log('Side effect'); });

var published = source.publish().refCount();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed

refCount 在以下發行版本中找到

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (需要 rx.jsrx.compat.jsrx.lite.jsrx.lite.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

還有一個 share 操作符,它等同於將 publishrefCount 操作符按順序應用到一個 Observable 上。一個名為 shareValue 的變體接受一個單一項目作為參數,它將在開始發射來自來源 Observable 的項目之前,發射給任何訂閱者。

範例程式碼

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .do(
        function (x) { console.log('Side effect'); });

var published = source.share();

// When the number of observers subscribed to published observable goes from
// 0 to 1, we connect to the underlying observable sequence.
published.subscribe(createObserver('SourceA'));
// When the second subscriber is added, no additional subscriptions are added to the
// underlying observable sequence. As a result the operations that result in side
// effects are not repeated per subscriber.
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed

shareshareValue 在以下發行版本中找到

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (需要 rx.jsrx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

RxPHP 將此操作符實作為 share

返回一個可觀察序列,它與底層序列共享一個單一訂閱。此操作符是 publish 的一個特例,它在觀察者的數量從零變為一時創建訂閱,然後與所有後續的觀察者共享該訂閱,直到觀察者的數量恢復為零,此時訂閱將被釋放。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/share.php

//With Share
$source = \Rx\Observable::interval(1000)
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->share();

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

   
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
    

RxPHP 也有一個操作符 singleInstance

返回一個可觀察序列,它與底層序列共享一個單一訂閱。即使所有先前的訂閱都已結束,此可觀察序列也可以重新訂閱。此操作符的行為類似於 RxJS 5 中的 share()

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/singleInstance.php

$interval = Rx\Observable::interval(1000);

$source = $interval
    ->take(2)
    ->do(function () {
        echo 'Side effect', PHP_EOL;
    });

$single = $source->singleInstance();

// two simultaneous subscriptions, lasting 2 seconds
$single->subscribe($createStdoutObserver('SourceA '));
$single->subscribe($createStdoutObserver('SourceB '));

\Rx\Observable::timer(5000)->subscribe(function () use ($single, &$createStdoutObserver) {
    // resubscribe two times again, more than 5 seconds later,
    // long after the original two subscriptions have ended
    $single->subscribe($createStdoutObserver('SourceC '));
    $single->subscribe($createStdoutObserver('SourceD '));
});

   
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
Side effect
SourceC Next value: 0
SourceD Next value: 0
Side effect
SourceC Next value: 1
SourceD Next value: 1
SourceC Complete!
SourceD Complete!
    

RxPHP 也有一個操作符 shareValue

返回一個可觀察序列,它與底層序列共享一個單一訂閱,並以 initialValue 開始。此操作符是 publishValue 的一個特例,它在觀察者的數量從零變為一時創建訂閱,然後與所有後續的觀察者共享該訂閱,直到觀察者的數量恢復為零,此時訂閱將被釋放。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareValue.php

$source = \Rx\Observable::interval(1000)
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->shareValue(42);

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

   
SourceA Next value: 42
SourceB Next value: 42
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
    

待定