可連接的 Observable 類似於普通的 Observable,不同之處在於它不會在被訂閱時開始發射項目,而只會在 Connect 操作符被應用於它時才開始發射。通過這種方式,你可以提示 Observable 在你選擇的時間開始發射項目。
待定
RxGroovy 將此操作符實現為 publish
。
publish()
還有一個變體將函數作為參數。此函數將來源 Observable 發射的項目作為參數,並產生將由結果 Observable 發射的項目。
publish(Func1)
RxJava 將此操作符實現為 publish
。
publish()
還有一個變體將函數作為參數。此函數將 `ConnectableObservable` 作為參數,該 `ConnectableObservable` 與底層 Observable 序列共享單個訂閱。此函數產生並返回一個新的 Observable 序列。
publish(Func1)
在 RxJS 中,publish
操作符將函數作為參數。此函數將來源 Observable 發射的項目作為參數,並產生將由返回的 ConnectableObservable
發射的項目。
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publish(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed
publishValue
操作符除了上述函數外,還接收一個初始項目,該項目將在連接時由結果的 ConnectableObservable
發射,然後才會發射來源 Observable 的項目。但是,它不會將此初始項目發射到在連接時間之後訂閱的觀察者。
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publishValue(42); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Next: SourceA42 Next: SourceB42 Side effect Next: SourceA0 Next: SourceB0 Side effect Next: SourceA1 Next: SourceB1 Completed Completed
publishLast
操作符與 publish
類似,並將行為類似的函數作為其參數。它與 publish
的不同之處在於,它不是將該函數應用於並發射來源 Observable 在連接後發射的 *每個* 項目,而是僅將該函數應用於並發射來源 Observable 發射的 *最後一個* 項目,當來源 Observable 正常終止時。
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publishLast(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
Side effect Side effect Next: SourceA1 Completed Next: SourceB1 Completed
上述操作符在以下套件中可用
rx.all.js
rx.all.compat.js
rx.binding.js
(需要 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
RxJS 還有一個 multicast
操作符,它對普通的 Observable 進行操作,透過您指定的特定 Subject 多播該 Observable,將轉換函數應用於每個發射,然後將這些轉換的值作為其自己的普通 Observable 序列發射。對此新 Observable 的每個訂閱都會觸發對底層多播 Observable 的新訂閱。
var subject = new Rx.Subject(); var source = Rx.Observable.range(0, 3) .multicast(subject); var observer = Rx.Observer.create( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); } ); var subscription = source.subscribe(observer); subject.subscribe(observer); var connected = source.connect(); subscription.dispose();
Next: 0 Next: 0 Next: 1 Next: 1 Next: 2 Next: 2 Completed
multicast
操作符在以下套件中可用
rx.all.js
rx.all.compat.js
rx.binding.js
(需要 rx.lite.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
還有一個 let
操作符 (別名 letBind
可用於 IE9 之前的 Internet Explorer 等瀏覽器,其中 “let
” 是禁止的)。它類似於 multicast
,但不透過 Subject 多播底層 Observable
var obs = Rx.Observable.range(1, 3); var source = obs.let(function (o) { return o.concat(o); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 3 Next: 1 Next: 2 Next: 3 Completed
let
(或 letBind
) 操作符在以下套件中可用
rx.all.js
rx.all.compat.js
rx.experimental.js
它需要以下其中一個套件
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxPHP 將此操作符實現為 multicast
。
透過實例化的 subject,將來源序列通知多播到選取器函數內的所有序列使用。對結果序列的每個訂閱都會導致單獨的多播調用,公開選取器函數調用所產生的序列。對於具有固定 subject 類型的特殊化,請參閱 Publish、PublishLast 和 Replay。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/multicast/multicast.php $subject = new \Rx\Subject\Subject(); $source = \Rx\Observable::range(0, 3)->multicast($subject); $subscription = $source->subscribe($stdoutObserver); $subject->subscribe($stdoutObserver); $connected = $source->connect();
Next value: 0 Next value: 0 Next value: 1 Next value: 1 Next value: 2 Next value: 2 Complete!
RxPHP 還有一個操作符 multicastWithSelector
。
透過來自 subject 選取器工廠的實例化 subject,將來源序列通知多播到選取器函數內的所有序列使用。對結果序列的每個訂閱都會導致單獨的多播調用,公開選取器函數調用所產生的序列。對於具有固定 subject 類型的特殊化,請參閱 Publish、PublishLast 和 Replay。
RxPHP 還有一個操作符 publish
。
返回一個 observable 序列,該序列是在共享對底層序列的單個訂閱的可連接 observable 序列上調用選取器的結果。此操作符是使用常規 Subject 的 Multicast 特殊化。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publish.php /* With publish */ $interval = \Rx\Observable::range(0, 10); $source = $interval ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publish(); $published->subscribe($createStdoutObserver('SourceC ')); $published->subscribe($createStdoutObserver('SourceD ')); $published->connect();
Side effect SourceC Next value: 0 SourceD Next value: 0 Side effect SourceC Next value: 1 SourceD Next value: 1 SourceC Complete! SourceD Complete!
RxPHP 還有一個操作符 publishLast
。
返回一個 observable 序列,該序列是在共享對底層序列的單個訂閱的可連接 observable 序列上調用選取器的結果,該序列僅包含最後的通知。此操作符是使用 AsyncSubject 的 Multicast 特殊化。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishLast.php $range = \Rx\Observable::fromArray(range(0, 1000)); $source = $range ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publishLast(); $published->subscribe($createStdoutObserver('SourceA')); $published->subscribe($createStdoutObserver('SourceB')); $connection = $published->connect();
Side effect Side effect SourceANext value: 1 SourceBNext value: 1 SourceAComplete! SourceBComplete!
RxPHP 還有一個操作符 publishValue
。
返回一個 observable 序列,該序列是在共享對底層序列的單個訂閱並以 initialValue 開始的可連接 observable 序列上調用選取器的結果。此操作符是使用 BehaviorSubject 的 Multicast 特殊化。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishValue.php $range = \Rx\Observable::fromArray(range(0, 1000)); $source = $range ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publishValue(42); $published->subscribe($createStdoutObserver('SourceA')); $published->subscribe($createStdoutObserver('SourceB')); $connection = $published->connect();
SourceANext value: 42 SourceBNext value: 42 Side effect SourceANext value: 0 SourceBNext value: 0 Side effect SourceANext value: 1 SourceBNext value: 1 SourceAComplete! SourceBComplete!
待定