ReactiveX 的各種特定語言實作都有許多操作符,可用於將 Observable 或 Observable 發射的項目序列轉換為另一種物件或資料結構。其中一些會阻塞直到 Observable 終止,然後產生等效的物件或資料結構;其他的則會返回一個 Observable,該 Observable 會發射這類的物件或資料結構。
在 ReactiveX 的某些實作中,還有一個操作符會將 Observable 轉換為「阻塞式」Observable。阻塞式 Observable 擴展了普通的 Observable,提供一組方法,可對 Observable 發射的項目進行操作,並會阻塞。一些 To 操作符屬於此阻塞式 Observable 的擴展操作集合。
getIterator
操作符應用於 BlockingObservable
子類別,因此為了使用它,您必須先通過 BlockingObservable.from
方法或 Observable.toBlocking
操作符將來源 Observable 轉換為 BlockingObservable
。
此操作符將 Observable 轉換為 Iterator
,您可以使用它來迭代來源 Observable 發射的項目集合。
BlockingObservable.getIterator()
toFuture
操作符應用於 BlockingObservable
子類別,因此為了使用它,您必須先通過 BlockingObservable.from
方法或 Observable.toBlocking
操作符將來源 Observable 轉換為 BlockingObservable
。
此操作符將 Observable 轉換為 Future
,該 Future
將返回來源 Observable 發射的單個項目。如果來源 Observable 發射多個項目,則 Future
將收到 IllegalArgumentException
;如果它在沒有發射任何項目的情況下完成,則 Future
將收到 NoSuchElementException
。
如果您想將可能會發射多個項目的 Observable 轉換為 Future
,請嘗試類似以下的操作:myObservable.toList().toBlocking().toFuture()
。
BlockingObservable.toFuture()
toIterable
操作符應用於 BlockingObservable
子類別,因此為了使用它,您必須先通過 BlockingObservable.from
方法或 Observable.toBlocking
操作符將來源 Observable 轉換為 BlockingObservable
。
此操作符將 Observable 轉換為 Iterable
,您可以使用它來迭代來源 Observable 發射的項目集合。
BlockingObservable.toIterable()
通常,發射多個項目的 Observable 會針對每個此類項目調用其觀察者的 onNext
方法。您可以變更此行為,指示 Observable 組合這些多個項目的列表,然後只調用一次觀察者的 onNext
方法,並將整個列表傳遞給它,方法是將 toList
操作符應用於 Observable。
例如,以下有點無意義的程式碼會取得整數列表,將其轉換為 Observable,然後將該 Observable 轉換為發射原始列表做為單個項目的 Observable
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]); numbers.toList().subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
[1, 2, 3, 4, 5, 6, 7, 8, 9] Sequence complete
如果來源 Observable 在發射任何項目之前調用 onCompleted
,則 toList
返回的 Observable 將在調用 onCompleted
之前發射一個空列表。如果來源 Observable 調用 onError
,則 toList
返回的 Observable 將立即調用其觀察者的 onError
方法。
預設情況下,toList
不會在任何特定的排程器上運作。
toList()
toMap
操作符會將來源 Observable 發射的項目收集到一個映射 (預設情況下為 HashMap
,但您可以選擇提供一個工廠函式來產生另一種 Map
),然後發射該映射。您提供一個函式來產生每個發射項目的索引鍵。您也可以選擇提供一個函式來將發射項目轉換為要儲存在映射中的值 (預設情況下,項目本身就是此值)。
預設情況下,toMap
不會在任何特定的排程器上運作。
toMap(Func1)
toMap(Func1,Func1)
toMap(Func1,Func1,Func0)
toMultiMap
操作符與 toMap
類似,不同之處在於,它產生的映射也是 ArrayList
(預設情況下;或者您可以傳遞一個選擇性的工廠方法做為第四個參數,通過它來產生您偏好的集合類型)。
預設情況下,toMultiMap
不會在任何特定的排程器上運作。
toMultiMap(Func1)
toMultiMap(Func1,Func1)
toMultiMap(Func1,Func1,Func0)
toMultiMap(Func1,Func1,Func0,Func1)
toSortedList
操作符的行為與 toList
非常相似,不同之處在於它會對結果列表進行排序。預設情況下,它會透過 Comparable
介面以遞增順序自然地對列表進行排序。如果 Observable 發射的任何項目在相對於 Observable 發射的每個其他項目的類型而言,都不支援 Comparable
,則 toSortedList
將擲回例外狀況。但是,您也可以透過將一個函式傳遞到 toSortedList
來變更此預設行為,該函式會將兩個項目做為其參數並傳回一個數字;然後 toSortedList
將使用該函式而不是 Comparable
來排序項目。
例如,以下程式碼會取得一個未排序的整數列表,將其轉換為 Observable,然後將該 Observable 轉換為一個發射排序後的原始列表做為單個項目的 Observable
numbers = Observable.from([8, 6, 4, 2, 1, 3, 5, 7, 9]); numbers.toSortedList().subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
[1, 2, 3, 4, 5, 6, 7, 8, 9] Sequence complete
以下範例提供自己的排序函式:在這種情況下,該函式會根據數字與數字 5 的接近程度來排序數字。
numbers = Observable.from([8, 6, 4, 2, 1, 3, 5, 7, 9]); numbers.toSortedList({ n, m -> Math.abs(5-n) - Math.abs(5-m) }).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
[5, 6, 4, 3, 7, 8, 2, 1, 9] Sequence complete
預設情況下,toSortedList
不會在任何特定的排程器上運作。
toSortedList()
toSortedList(Func2)
RxGroovy 也有一個 nest
操作符,它有一個特定的用途:它會將來源 Observable 轉換為一個以該來源 Observable 做為其唯一項目來發射的 Observable。
getIterator
操作符應用於 BlockingObservable
子類別,因此為了使用它,您必須先通過 BlockingObservable.from
方法或 Observable.toBlocking
操作符將來源 Observable 轉換為 BlockingObservable
。
此操作符將 Observable 轉換為 Iterator
,您可以使用它來迭代來源 Observable 發射的項目集合。
BlockingObservable.getIterator()
toFuture
操作符應用於 BlockingObservable
子類別,因此為了使用它,您必須先通過 BlockingObservable.from
方法或 Observable.toBlocking
操作符將來源 Observable 轉換為 BlockingObservable
。
此操作符將 Observable 轉換為 Future
,該 Future
將返回來源 Observable 發射的單個項目。如果來源 Observable 發射多個項目,則 Future
將收到 IllegalArgumentException
;如果它在沒有發射任何項目的情況下完成,則 Future
將收到 NoSuchElementException
。
如果您想將可能會發射多個項目的 Observable 轉換為 Future
,請嘗試類似以下的操作:myObservable.toList().toBlocking().toFuture()
。
BlockingObservable.toFuture()
toIterable
操作符應用於 BlockingObservable
子類別,因此為了使用它,您必須先通過 BlockingObservable.from
方法或 Observable.toBlocking
操作符將來源 Observable 轉換為 BlockingObservable
。
此操作符將 Observable 轉換為 Iterable
,您可以使用它來迭代來源 Observable 發射的項目集合。
BlockingObservable.toIterable()
通常,發射多個項目的 Observable 會針對每個此類項目調用其觀察者的 onNext
方法。您可以變更此行為,指示 Observable 組合這些多個項目的列表,然後只調用一次觀察者的 onNext
方法,並將整個列表傳遞給它,方法是將 toList
操作符應用於 Observable。
如果來源 Observable 在發射任何項目之前調用 onCompleted
,則 toList
返回的 Observable 將在調用 onCompleted
之前發射一個空列表。如果來源 Observable 調用 onError
,則 toList
返回的 Observable 將立即調用其觀察者的 onError
方法。
預設情況下,toList
不會在任何特定的排程器上運作。
toList()
toMap
操作符會將來源 Observable 發射的項目收集到一個映射 (預設情況下為 HashMap
,但您可以選擇提供一個工廠函式來產生另一種 Map
),然後發射該映射。您提供一個函式來產生每個發射項目的索引鍵。您也可以選擇提供一個函式來將發射項目轉換為要儲存在映射中的值 (預設情況下,項目本身就是此值)。
預設情況下,toMap
不會在任何特定的排程器上運作。
toMap(Func1)
toMap(Func1,Func1)
toMap(Func1,Func1,Func0)
toMultiMap
操作符與 toMap
類似,不同之處在於,它產生的映射也是 ArrayList
(預設情況下;或者您可以傳遞一個選擇性的工廠方法做為第四個參數,通過它來產生您偏好的集合類型)。
預設情況下,toMultiMap
不會在任何特定的排程器上運作。
toMultiMap(Func1)
toMultiMap(Func1,Func1)
toMultiMap(Func1,Func1,Func0)
toMultiMap(Func1,Func1,Func0,Func1)
toSortedList
操作符的行為與 toList
非常相似,不同之處在於它會對結果列表進行排序。預設情況下,它會透過 Comparable
介面以遞增順序自然地對列表進行排序。如果 Observable 發射的任何項目在相對於 Observable 發射的每個其他項目的類型而言,都不支援 Comparable
,則 toSortedList
將擲回例外狀況。但是,您也可以透過將一個函式傳遞到 toSortedList
來變更此預設行為,該函式會將兩個項目做為其參數並傳回一個數字;然後 toSortedList
將使用該函式而不是 Comparable
來排序項目。
預設情況下,toSortedList
不會在任何特定的排程器上運作。
toSortedList()
toSortedList(Func2)
RxJava 也有一個 nest
操作符,它有一個特定的用途:它會將來源 Observable 轉換為一個以該來源 Observable 做為其唯一項目來發射的 Observable。
通常,發射多個項目的 Observable 會針對每個此類項目調用其觀察者的 onNext
方法。您可以變更此行為,指示 Observable 組合這些多個項目的陣列,然後只調用一次觀察者的 onNext
方法,並將整個陣列傳遞給它,方法是將 toArray
操作符應用於 Observable。
var source = Rx.Observable.timer(0, 1000) .take(5) .toArray(); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: [0,1,2,3,4] Completed
toArray
可在以下每個發行版本中找到
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
toMap
操作符會將來源 Observable 發射的項目收集到一個 Map
,然後發射該映射。您提供一個函式來產生每個發射項目的索引鍵。您也可以選擇提供一個函式來將發射項目轉換為要儲存在映射中的值 (預設情況下,項目本身就是此值)。
var source = Rx.Observable.timer(0, 1000) .take(5) .toMap(function (x) { return x * 2; }, function (x) { return x * 4; }); var subscription = source.subscribe( function (x) { var arr = []; x.forEach(function (value, key) { arr.push(value, key); }) console.log('Next: ' + arr); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: [0,0,2,4,4,8,6,12,8,16] Completed
toMap
可在以下每個發行版本中找到
rx.all.js
rx.all.compat.js
rx.aggregates.js
通常,發射多個項目的 Observable 會針對每個此類項目調用其觀察者的 onNext
方法。您可以變更此行為,指示 Observable 組合這些多個項目的 Set
,然後只調用一次觀察者的 onNext
方法,並將整個 Set
傳遞給它,方法是將 toSet
操作符應用於 Observable。
請注意,這僅在 ES6 環境中或經過 polyfill 的情況下有效。
var source = Rx.Observable.timer(0, 1000) .take(5) .toSet(); var subscription = source.subscribe( function (x) { var arr = []; x.forEach(function (i) { arr.push(i); }) console.log('Next: ' + arr); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: [0,1,2,3,4] Completed
toSet
可在以下每個發行版本中找到
rx.all.js
rx.all.compat.js
rx.aggregates.js
待定
RxPHP 將此操作符實作為 toArray
。
建立一個包含單個元素的 Observable 序列,該元素是一個包含來源序列所有元素的陣列。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/toArray/toArray.php $source = \Rx\Observable::fromArray([1, 2, 3, 4]); $observer = $createStdoutObserver(); $subscription = $source->toArray() ->subscribe(new CallbackObserver( function ($array) use ($observer) { $observer->onNext(json_encode($array)); }, [$observer, "onError"], [$observer, "onCompleted"] ));
Next value: [1,2,3,4] Complete!