Publish

將普通的 Observable 轉換為可連接的 Observable

Publish

可連接的 Observable 類似於普通的 Observable,不同之處在於它不會在被訂閱時開始發射項目,而只會在 Connect 操作符被應用於它時才開始發射。通過這種方式,你可以提示 Observable 在你選擇的時間開始發射項目。

另請參閱

特定語言資訊

待定

publish

RxGroovy 將此操作符實現為 publish

publish

還有一個變體將函數作為參數。此函數將來源 Observable 發射的項目作為參數,並產生將由結果 Observable 發射的項目。

publish

RxJava 將此操作符實現為 publish

publish

還有一個變體將函數作為參數。此函數將 `ConnectableObservable` 作為參數,該 `ConnectableObservable` 與底層 Observable 序列共享單個訂閱。此函數產生並返回一個新的 Observable 序列。

publish

在 RxJS 中,publish 操作符將函數作為參數。此函數將來源 Observable 發射的項目作為參數,並產生將由返回的 ConnectableObservable 發射的項目。

範例程式碼

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publish();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
publishValue

publishValue 操作符除了上述函數外,還接收一個初始項目,該項目將在連接時由結果的 ConnectableObservable 發射,然後才會發射來源 Observable 的項目。但是,它不會將此初始項目發射到在連接時間之後訂閱的觀察者。

範例程式碼

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publishValue(42);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Next: SourceA42
Next: SourceB42
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed
publishLast

publishLast 操作符與 publish 類似,並將行為類似的函數作為其參數。它與 publish 的不同之處在於,它不是將該函數應用於並發射來源 Observable 在連接後發射的 *每個* 項目,而是僅將該函數應用於並發射來源 Observable 發射的 *最後一個* 項目,當來源 Observable 正常終止時。

範例程式碼

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publishLast();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Side effect
Next: SourceA1
Completed
Next: SourceB1
Completed

上述操作符在以下套件中可用

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (需要 rx.jsrx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

RxJS 還有一個 multicast 操作符,它對普通的 Observable 進行操作,透過您指定的特定 Subject 多播該 Observable,將轉換函數應用於每個發射,然後將這些轉換的值作為其自己的普通 Observable 序列發射。對此新 Observable 的每個訂閱都會觸發對底層多播 Observable 的新訂閱。

範例程式碼

var subject = new Rx.Subject();
var source = Rx.Observable.range(0, 3)
    .multicast(subject);

var observer = Rx.Observer.create(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); }
);

var subscription = source.subscribe(observer);
subject.subscribe(observer);

var connected = source.connect();

subscription.dispose();
Next: 0
Next: 0
Next: 1
Next: 1
Next: 2
Next: 2
Completed

multicast 操作符在以下套件中可用

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (需要 rx.lite.jsrx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

還有一個 let 操作符 (別名 letBind 可用於 IE9 之前的 Internet Explorer 等瀏覽器,其中 “let” 是禁止的)。它類似於 multicast,但不透過 Subject 多播底層 Observable

範例程式碼

var obs = Rx.Observable.range(1, 3);

var source = obs.let(function (o) { return o.concat(o); });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 3
Next: 1
Next: 2
Next: 3
Completed

let (或 letBind) 操作符在以下套件中可用

  • rx.all.js
  • rx.all.compat.js
  • rx.experimental.js

它需要以下其中一個套件

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP 將此操作符實現為 multicast

透過實例化的 subject,將來源序列通知多播到選取器函數內的所有序列使用。對結果序列的每個訂閱都會導致單獨的多播調用,公開選取器函數調用所產生的序列。對於具有固定 subject 類型的特殊化,請參閱 Publish、PublishLast 和 Replay。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/multicast/multicast.php

$subject = new \Rx\Subject\Subject();
$source  = \Rx\Observable::range(0, 3)->multicast($subject);

$subscription = $source->subscribe($stdoutObserver);
$subject->subscribe($stdoutObserver);

$connected = $source->connect();

   
Next value: 0
Next value: 0
Next value: 1
Next value: 1
Next value: 2
Next value: 2
Complete!
    

RxPHP 還有一個操作符 multicastWithSelector

透過來自 subject 選取器工廠的實例化 subject,將來源序列通知多播到選取器函數內的所有序列使用。對結果序列的每個訂閱都會導致單獨的多播調用,公開選取器函數調用所產生的序列。對於具有固定 subject 類型的特殊化,請參閱 Publish、PublishLast 和 Replay。

RxPHP 還有一個操作符 publish

返回一個 observable 序列,該序列是在共享對底層序列的單個訂閱的可連接 observable 序列上調用選取器的結果。此操作符是使用常規 Subject 的 Multicast 特殊化。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publish.php

/* With publish */
$interval = \Rx\Observable::range(0, 10);

$source = $interval
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->publish();

$published->subscribe($createStdoutObserver('SourceC '));
$published->subscribe($createStdoutObserver('SourceD '));

$published->connect();

   
Side effect
SourceC Next value: 0
SourceD Next value: 0
Side effect
SourceC Next value: 1
SourceD Next value: 1
SourceC Complete!
SourceD Complete!
    

RxPHP 還有一個操作符 publishLast

返回一個 observable 序列,該序列是在共享對底層序列的單個訂閱的可連接 observable 序列上調用選取器的結果,該序列僅包含最後的通知。此操作符是使用 AsyncSubject 的 Multicast 特殊化。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishLast.php

$range = \Rx\Observable::fromArray(range(0, 1000));

$source = $range
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->publishLast();

$published->subscribe($createStdoutObserver('SourceA'));
$published->subscribe($createStdoutObserver('SourceB'));

$connection = $published->connect();

   
Side effect
Side effect
SourceANext value: 1
SourceBNext value: 1
SourceAComplete!
SourceBComplete!
    

RxPHP 還有一個操作符 publishValue

返回一個 observable 序列,該序列是在共享對底層序列的單個訂閱並以 initialValue 開始的可連接 observable 序列上調用選取器的結果。此操作符是使用 BehaviorSubject 的 Multicast 特殊化。

範例程式碼

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishValue.php

$range = \Rx\Observable::fromArray(range(0, 1000));

$source = $range
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->publishValue(42);

$published->subscribe($createStdoutObserver('SourceA'));
$published->subscribe($createStdoutObserver('SourceB'));

$connection = $published->connect();

   
SourceANext value: 42
SourceBNext value: 42
Side effect
SourceANext value: 0
SourceBNext value: 0
Side effect
SourceANext value: 1
SourceBNext value: 1
SourceAComplete!
SourceBComplete!
    

待定