一個 可連接的 Observable 類似於一個普通的 Observable,但它並不是在被訂閱時開始發射項目,而是在 Connect 操作符被應用到它時。這樣,你可以選擇在何時提示 Observable 開始發射項目。
RefCount 操作符自動化了連接和斷開與可連接 Observable 的過程。它作用於一個可連接的 Observable 並返回一個普通的 Observable。當第一個觀察者訂閱這個 Observable 時,RefCount 會連接到底層的可連接 Observable。RefCount 接著會追蹤有多少其他觀察者訂閱了它,並且只有在最後一個觀察者取消訂閱後才會斷開與底層可連接 Observable 的連接。
待定
RxGroovy 將此操作符實作為 refCount
。
refCount()
還有一個 share
操作符,它等同於將 publish
和 refCount
操作符按順序應用到一個 Observable 上。
share()
RxJava 將此操作符實作為 refCount
。
refCount()
還有一個 share
操作符,它等同於將 publish
和 refCount
操作符按順序應用到一個 Observable 上。
share()
RxJava 將此操作符實作為 refCount
。
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publish().refCount(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed Completed
refCount
在以下發行版本中找到
rx.all.js
rx.all.compat.js
rx.binding.js
(需要 rx.js
、rx.compat.js
、rx.lite.js
或 rx.lite.compat.js
)rx.lite.js
rx.lite.compat.js
還有一個 share
操作符,它等同於將 publish
和 refCount
操作符按順序應用到一個 Observable 上。一個名為 shareValue
的變體接受一個單一項目作為參數,它將在開始發射來自來源 Observable 的項目之前,發射給任何訂閱者。
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .do( function (x) { console.log('Side effect'); }); var published = source.share(); // When the number of observers subscribed to published observable goes from // 0 to 1, we connect to the underlying observable sequence. published.subscribe(createObserver('SourceA')); // When the second subscriber is added, no additional subscriptions are added to the // underlying observable sequence. As a result the operations that result in side // effects are not repeated per subscriber. published.subscribe(createObserver('SourceB')); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed
share
和 shareValue
在以下發行版本中找到
rx.all.js
rx.all.compat.js
rx.binding.js
(需要 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
RxPHP 將此操作符實作為 share
。
返回一個可觀察序列,它與底層序列共享一個單一訂閱。此操作符是 publish 的一個特例,它在觀察者的數量從零變為一時創建訂閱,然後與所有後續的觀察者共享該訂閱,直到觀察者的數量恢復為零,此時訂閱將被釋放。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/share.php //With Share $source = \Rx\Observable::interval(1000) ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->share(); $published->subscribe($createStdoutObserver('SourceA ')); $published->subscribe($createStdoutObserver('SourceB '));
Side effect SourceA Next value: 0 SourceB Next value: 0 Side effect SourceA Next value: 1 SourceB Next value: 1 SourceA Complete! SourceB Complete!
RxPHP 也有一個操作符 singleInstance
。
返回一個可觀察序列,它與底層序列共享一個單一訂閱。即使所有先前的訂閱都已結束,此可觀察序列也可以重新訂閱。此操作符的行為類似於 RxJS 5 中的 share()
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/singleInstance.php $interval = Rx\Observable::interval(1000); $source = $interval ->take(2) ->do(function () { echo 'Side effect', PHP_EOL; }); $single = $source->singleInstance(); // two simultaneous subscriptions, lasting 2 seconds $single->subscribe($createStdoutObserver('SourceA ')); $single->subscribe($createStdoutObserver('SourceB ')); \Rx\Observable::timer(5000)->subscribe(function () use ($single, &$createStdoutObserver) { // resubscribe two times again, more than 5 seconds later, // long after the original two subscriptions have ended $single->subscribe($createStdoutObserver('SourceC ')); $single->subscribe($createStdoutObserver('SourceD ')); });
Side effect SourceA Next value: 0 SourceB Next value: 0 Side effect SourceA Next value: 1 SourceB Next value: 1 SourceA Complete! SourceB Complete! Side effect SourceC Next value: 0 SourceD Next value: 0 Side effect SourceC Next value: 1 SourceD Next value: 1 SourceC Complete! SourceD Complete!
RxPHP 也有一個操作符 shareValue
。
返回一個可觀察序列,它與底層序列共享一個單一訂閱,並以 initialValue 開始。此操作符是 publishValue 的一個特例,它在觀察者的數量從零變為一時創建訂閱,然後與所有後續的觀察者共享該訂閱,直到觀察者的數量恢復為零,此時訂閱將被釋放。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareValue.php $source = \Rx\Observable::interval(1000) ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->shareValue(42); $published->subscribe($createStdoutObserver('SourceA ')); $published->subscribe($createStdoutObserver('SourceB '));
SourceA Next value: 42 SourceB Next value: 42 Side effect SourceA Next value: 0 SourceB Next value: 0 Side effect SourceA Next value: 1 SourceB Next value: 1 SourceA Complete! SourceB Complete!
待定