當您使用 Observables 時,如果所有要處理的資料都可以表示為 Observables,而不是 Observables 和其他類型的混合,可能會更方便。這讓您可以使用一組操作符來管理整個資料流的生命週期。
例如,Iterables 可以被認為是一種同步的 Observable;Futures 則是一種總是只發出單一項目的 Observable。透過明確地將這些物件轉換為 Observables,您可以讓它們與其他 Observables 互動。
因此,大多數 ReactiveX 實作都有方法可以讓您將某些特定於語言的物件和資料結構轉換為 Observables。
待定
在 RxGroovy 中,from
操作符可以轉換 Future、Iterable 或 Array。如果輸入是 Iterable 或 Array,則產生的 Observable 會發出 Iterable 或 Array 中包含的每個項目。
如果輸入是 Future,它將發出 get
呼叫的單一結果。您可以選擇傳遞接受 future 的 from
版本,並額外傳遞兩個參數,指示逾時時間跨度以及時間跨度所用的時間單位。如果 Future 在該時間跨度過去之前沒有回應值,則產生的 Observable 將以錯誤終止。
from
預設情況下不會在任何特定的 Scheduler 上運作,但是您可以傳遞將 Future 轉換為 Scheduler 的變體作為可選的第二個參數,它將使用該 Scheduler 來管理 Future。
from(array)
from(Iterable)
from(Future)
from(Future,Scheduler)
from(Future,timout,timeUnit)
此外,在 RxJavaAsyncUtil
套件中,您可以使用以下操作符將 actions、callables、functions 和 runnables 轉換為 Observables,這些 Observables 會發出這些事物的結果。
fromAction
fromCallable
fromFunc0
fromRunnable
請參閱 Start 操作符以取得有關這些操作符的更多資訊。
請注意,還有一個 from
操作符是可選的 StringObservable
類別的方法。它將字元流或 Reader
轉換為發出位元組陣列或字串的 Observable。
在單獨的 RxJavaAsyncUtil
套件中(預設情況下不包含在 RxGroovy 中),還有一個 runAsync
函數。將 runAsync
傳遞一個 Action
和一個 Scheduler
,它將傳回一個 StoppableObservable
,該 StoppableObservable
使用指定的 Action
來產生它發出的項目。
Action
接受一個 Observer
和一個 Subscription
。它使用 Subscription
來檢查 isUnsubscribed
條件,如果滿足該條件,它將停止發出項目。您也可以隨時透過呼叫其 unsubscribe
方法來手動停止 StoppableObservable
(這也會取消訂閱與 StoppableObservable
相關聯的 Subscription
)。
由於 runAsync
會立即調用 Action
並開始發出項目,因此在您透過此方法建立 StoppableObservable
和 Observer
準備好接收項目之間的時間間隔中,可能會遺失一些項目。如果這是個問題,您可以使用 runAsync
的變體,該變體也接受一個 Subject
,並傳遞一個 ReplaySubject
,您可以使用它來擷取其他遺失的項目。
StringObservable
類別(不是 RxGroovy 的預設部分)還包含 decode
操作符,該操作符將多位元組字元流轉換為發出尊重字元邊界的位元組陣列的 Observable。
在 RxJava 中,from
操作符可以轉換 Future、Iterable 或 Array。如果輸入是 Iterable 或 Array,則產生的 Observable 會發出 Iterable 或 Array 中包含的每個項目。
Integer[] items = { 0, 1, 2, 3, 4, 5 }; Observable myObservable = Observable.from(items); myObservable.subscribe( new Action1<Integer>() { @Override public void call(Integer item) { System.out.println(item); } }, new Action1<Throwable>() { @Override public void call(Throwable error) { System.out.println("Error encountered: " + error.getMessage()); } }, new Action0() { @Override public void call() { System.out.println("Sequence complete"); } } );
0 1 2 3 4 5 Sequence complete
如果輸入是 Future,它將發出 get
呼叫的單一結果。您可以選擇傳遞接受 future 的 from
版本,並額外傳遞兩個參數,指示逾時時間跨度以及時間跨度所用的時間單位。如果 Future 在該時間跨度過去之前沒有回應值,則產生的 Observable 將以錯誤終止。
from
預設情況下不會在任何特定的 Scheduler 上運作,但是您可以傳遞將 Future 轉換為 Scheduler 的變體作為可選的第二個參數,它將使用該 Scheduler 來管理 Future。
from(array)
from(Iterable)
from(Future)
from(Future,Scheduler)
from(Future,timout,timeUnit)
此外,在 RxJavaAsyncUtil
套件中,您可以使用以下操作符將 actions、callables、functions 和 runnables 轉換為 Observables,這些 Observables 會發出這些事物的結果。
fromAction
fromCallable
fromFunc0
fromRunnable
請參閱 Start 操作符以取得有關這些操作符的更多資訊。
請注意,還有一個 from
操作符是可選的 StringObservable
類別的方法。它將字元流或 Reader
轉換為發出位元組陣列或字串的 Observable。
在單獨的 RxJavaAsyncUtil
套件中(預設情況下不包含在 RxJava 中),還有一個 runAsync
函數。將 runAsync
傳遞一個 Action
和一個 Scheduler
,它將傳回一個 StoppableObservable
,該 StoppableObservable
使用指定的 Action
來產生它發出的項目。
Action
接受一個 Observer
和一個 Subscription
。它使用 Subscription
來檢查 isUnsubscribed
條件,如果滿足該條件,它將停止發出項目。您也可以隨時透過呼叫其 unsubscribe
方法來手動停止 StoppableObservable
(這也會取消訂閱與 StoppableObservable
相關聯的 Subscription
)。
由於 runAsync
會立即調用 Action
並開始發出項目,因此在您透過此方法建立 StoppableObservable
和 Observer
準備好接收項目之間的時間間隔中,可能會遺失一些項目。如果這是個問題,您可以使用 runAsync
的變體,該變體也接受一個 Subject
,並傳遞一個 ReplaySubject
,您可以使用它來擷取其他遺失的項目。
StringObservable
類別(不是 RxGroovy 的預設部分)還包含 decode
操作符,該操作符將多位元組字元流轉換為發出尊重字元邊界的位元組陣列的 Observable。
RxJS 中有幾個特殊的 From 變體
在 RxJS 中,from
操作符將類陣列或可迭代物件轉換為 Observable,該 Observable 發出該陣列或可迭代物件中的項目。在這種情況下,字串被視為字元陣列。
此操作符還採用三個額外的可選參數
// Array-like object (arguments) to Observable function f() { return Rx.Observable.from(arguments); } f(1, 2, 3).subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 3 Completed
// Any iterable object... // Set var s = new Set(['foo', window]); Rx.Observable.from(s).subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: foo Next: window Completed
// Map var m = new Map([[1, 2], [2, 4], [4, 8]]); Rx.Observable.from(m).subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: [1, 2] Next: [2, 4] Next: [4, 8] Completed
// String Rx.Observable.from("foo").subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: f Next: o Next: o Completed
// Using an arrow function as the map function to manipulate the elements Rx.Observable.from([1, 2, 3], function (x) { return x + x; }).subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 2 Next: 4 Next: 6 Completed
// Generate a sequence of numbers Rx.Observable.from({length: 5}, function(v, k) { return k; }).subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Next: 3 Next: 4 Completed
在以下發行版中可以找到 from
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
fromCallback
操作符接受一個函數作為參數,呼叫此函數,並將其傳回的值作為其單一發射發出。
此操作符還採用兩個額外的可選參數
var fs = require('fs'), Rx = require('rx'); // Wrap fs.exists var exists = Rx.Observable.fromCallback(fs.exists); // Check if file.txt exists var source = exists('file.txt'); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: true Completed
在以下發行版中可以找到 fromCallback
rx.all.js
rx.all.compat.js
rx.async.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.async.compat.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
還有一個 fromNodeCallback
操作符,專門用於 Node.js 中找到的回呼函數類型。
此操作符採用三個額外的可選參數
var fs = require('fs'), Rx = require('rx'); // Wrap fs.exists var rename = Rx.Observable.fromNodeCallback(fs.rename); // Rename file which returns no parameters except an error var source = rename('file1.txt', 'file2.txt'); var subscription = source.subscribe( function () { console.log('Next: success!'); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: success! Completed
在以下發行版中可以找到 fromNodeCallback
rx.async.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.async.compat.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
fromEvent
操作符接受一個「元素」和一個事件名稱作為參數,然後它會監聽該元素上發生的該名稱的事件。它會傳回發出這些事件的 Observable。「元素」可以是簡單的 DOM 元素,也可以是 NodeList、jQuery 元素、Zepto 元素、Angular 元素、Ember.js 元素或 EventEmitter。
此操作符還採用第三個可選參數:一個函數,它接受事件處理程式中的參數作為參數,並傳回要由產生的 Observable 發出的項目來代替事件。
// using a jQuery element var input = $('#input'); var source = Rx.Observable.fromEvent(input, 'click'); var subscription = source.subscribe( function (x) { console.log('Next: Clicked!'); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); input.trigger('click');
Next: Clicked!
// using a Node.js EventEmitter and the optional third parameter var EventEmitter = require('events').EventEmitter, Rx = require('rx'); var eventEmitter = new EventEmitter(); var source = Rx.Observable.fromEvent( eventEmitter, 'data', function (first, second) { return { foo: first, bar: second }; }); var subscription = source.subscribe( function (x) { console.log('Next: foo -' + x.foo + ', bar -' + x.bar); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); eventEmitter.emit('data', 'baz', 'quux');
Next: foo - baz, bar - quux
在以下發行版中可以找到 fromEvent
rx.async.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.async.compat.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
fromEventPattern
操作符類似,只是它不是接受一個元素和一個事件名稱作為參數,而是接受兩個函數作為參數。第一個函數將事件監聽器附加到各種元素上的各種事件;第二個函數移除這組監聽器。透過這種方式,您可以建立一個單一的 Observable,該 Observable 發出代表各種事件和各種目標元素的項目。
var input = $('#input'); var source = Rx.Observable.fromEventPattern( function add (h) { input.bind('click', h); }, function remove (h) { input.unbind('click', h); } ); var subscription = source.subscribe( function (x) { console.log('Next: Clicked!'); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); input.trigger('click');
Next: Clicked!
of
操作符接受多個項目作為參數,並傳回一個 Observable,該 Observable 按順序發出每個這些參數作為其發出的序列。
var source = Rx.Observable.of(1,2,3); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 3 Completed
在以下發行版中可以找到 of
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
這個操作符的一個變體稱為 ofWithScheduler
,它將 Scheduler 作為其第一個參數,並在此 Scheduler 上運作產生的 Observable。
還有一個 fromPromise
操作符,它將 Promise 轉換為 Observable,將其 resolve
呼叫轉換為 onNext
通知,並將其 reject
呼叫轉換為 onError
通知。
在以下發行版中可以找到 fromPromise
rx.async.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.async.compat.js
(需要 rx.binding.js
以及 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
var promise = new RSVP.Promise(function (resolve, reject) { resolve(42); }); var source = Rx.Observable.fromPromise(promise); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (e) { console.log('Error: ' + e); }, function ( ) { console.log('Completed'); });
Next: 42: Completed
var promise = new RSVP.Promise(function (resolve, reject) { reject(new Error('reason')); }); var source = Rx.Observable.fromPromise(promise); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (e) { console.log('Error: ' + e); }, function ( ) { console.log('Completed'); });
Error: Error: reject
還有一個 ofArrayChanges
操作符,它使用 Array.observe
方法監視陣列,並傳回一個 Observable,該 Observable 發出陣列中發生的任何變更。此操作符僅在 rx.all.js
發行版中找到。
var arr = [1,2,3]; var source = Rx.Observable.ofArrayChanges(arr); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (e) { console.log('Error: ' + e); }, function ( ) { console.log('Completed'); }); arr.push(4)
Next: {type: "splice", object: Array[4], index: 3, removed: Array[0], addedCount: 1}
類似的操作符是 ofObjectChanges
。它傳回一個 Observable,該 Observable 發出對特定物件所做的任何變更,如其 Object.observe
方法所報告。它也僅在 rx.all.js
發行版中找到。
var obj = {x: 1}; var source = Rx.Observable.ofObjectChanges(obj); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (e) { console.log('Error: ' + e); }, function ( ) { console.log('Completed'); }); obj.x = 42;
Next: {type: "update", object: Object, name: "x", oldValue: 1}
還有一個 pairs
操作符。此操作符接受一個物件,並傳回一個 Observable,該 Observable 以鍵/值對的形式發出該物件的屬性。
var obj = { foo: 42, bar: 56, baz: 78 }; var source = Rx.Observable.pairs(obj); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (e) { console.log('Error: ' + e); }, function ( ) { console.log('Completed'); });
Next: ['foo', 42] Next: ['bar', 56] Next: ['baz', 78] Completed
在以下發行版中可以找到 pairs
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxPHP 將此操作符實作為 fromArray
。
將陣列轉換為可觀察序列
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/fromArray/fromArray.php $source = \Rx\Observable::fromArray([1, 2, 3, 4]); $subscription = $source->subscribe($stdoutObserver); //Next value: 1 //Next value: 2 //Next value: 3 //Next value: 4 //Complete!
Next value: 1 Next value: 2 Next value: 3 Next value: 4 Complete!
RxPHP 還有一個操作符 fromIterator
。
將 Iterator 轉換為可觀察序列
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/iterator/iterator.php $generator = function () { for ($i = 1; $i <= 3; $i++) { yield $i; } return 4; }; $source = Rx\Observable::fromIterator($generator()); $source->subscribe($stdoutObserver);
Next value: 1 Next value: 2 Next value: 3 Next value: 4 Complete!
RxPHP 還有一個操作符 asObservable
。
隱藏可觀察序列的身份。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/asObservable/asObservable.php // Create subject $subject = new \Rx\Subject\AsyncSubject(); // Send a value $subject->onNext(42); $subject->onCompleted(); // Hide its type $source = $subject->asObservable(); $source->subscribe($stdoutObserver);
Next value: 42 Complete!
RxPHP 也具有 fromPromise
運算子。
將 Promise 轉換為可觀察物件。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/promise/fromPromise.php $promise = \React\Promise\resolve(42); $source = \Rx\Observable::fromPromise($promise); $subscription = $source->subscribe($stdoutObserver);
Next value: 42 Complete!
在 Swift 中,這是使用 Observable.from
類別方法實作的。
陣列中的每個元素都會作為一個發射值產生。此方法與 Observable.just
的不同之處在於,後者將整個陣列作為一個發射值發出。
let numbers = [1,2,3,4,5] let source = Observable.from(numbers) source.subscribe { print($0) }
next(1) next(2) next(3) next(4) next(5) completed