你可以使用 Take 操作符修改 Observable,只發射 Observable 發出的前 n 個項目,然後完成,並忽略其餘的項目。
take
待辦
在 RxGroovy 中,此操作符以 take
實作。
如果你在 Observable 上使用 take(n)
操作符 (或其同義詞 limit(n)
),且該 Observable 在完成之前發射的項目少於 n 個,則新的,經 take
修改的 Observable 將不會拋出異常或調用 onError
,而只會在完成之前發射相同數量的較少項目。
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8]); numbers.take(3).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
1 2 3 Sequence complete
這個版本的 take
預設不會在任何特定的 Scheduler 上操作。
take(int)
還有一個版本的 take
採用時間長度而不是項目數量。它會產生一個 Observable,該 Observable 只發射在來源 Observable 的生命週期中初始持續時間內發射的項目。你可以通過將時間長度和時間單位作為參數傳遞給 take
來設定此持續時間。
這個版本的 take
預設在 computation
Scheduler 上操作,但你也可以選擇性地傳遞你選擇的 Scheduler 作為第三個參數。
take(long,TimeUnit)
take(long,TimeUnit,Scheduler)
在 RxJava 中,此操作符以 take
實作。
如果你在 Observable 上使用 take(n)
操作符 (或其同義詞 limit(n)
),且該 Observable 在完成之前發射的項目少於 n 個,則新的,經 take
修改的 Observable 將不會拋出異常或調用 onError
,而只會在完成之前發射相同數量的較少項目。
Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .take(4) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
這個版本的 take
預設不會在任何特定的 Scheduler 上操作。
take(int)
還有一個版本的 take
採用時間長度而不是項目數量。它會產生一個 Observable,該 Observable 只發射在來源 Observable 的生命週期中初始持續時間內發射的項目。你可以通過將時間長度和時間單位作為參數傳遞給 take
來設定此持續時間。
這個版本的 take
預設在 computation
Scheduler 上操作,但你也可以選擇性地傳遞你選擇的 Scheduler 作為第三個參數。
take(long,TimeUnit)
take(long,TimeUnit,Scheduler)
RxJS 實作了 take
操作符。
var source = Rx.Observable.range(0, 5) .take(3); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Completed
對於 take(0)
的特殊情況,你也可以傳遞一個 Scheduler 作為第二個參數,take
將使用該 Scheduler 立即排程對 onCompleted
的調用。
take
在以下每個發行版中都可找到
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxJS 還實作了 takeUntilWithTime
操作符,它與 take
類似,不同之處在於它不是採用特定數量的項目,而是採用在初始時間段內發射的所有項目。你可以通過以下兩種格式之一傳遞參數給 takeUntilWithTime
來建立此時間段
Date
你也可以選擇性地傳遞一個 Scheduler 作為第二個參數,計時器將在該 Scheduler 上操作 (takeUntilWithTime
預設使用 timeout
Scheduler)。
var source = Rx.Observable.timer(0, 1000) .takeUntilWithTime(5000); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Next: 3 Next: 4 Completed
takeUntilWithTime
在以下每個發行版中都可找到
rx.all.js
rx.all.compat.js
rx.time.js
(需要 rx.js
或 rx.compat.js
)rx.lite.js
rx.lite.compat.js
待辦
RxPHP 將此操作符實作為 take
。
從 observable 序列的開頭返回指定數量的連續元素
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/take.php $observable = Rx\Observable::fromArray([21, 42, 63]); $observable ->take(2) ->subscribe($stdoutObserver);
Next value: 21 Next value: 42 Complete!
RxPHP 還有一個操作符 takeUntil
。
從來源 observable 序列返回值,直到另一個 observable 序列產生一個值。
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/takeUntil.php $source = \Rx\Observable::interval(105) ->takeUntil(\Rx\Observable::timer(1000)); $subscription = $source->subscribe($stdoutObserver);
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 6 Next value: 7 Next value: 8 Complete!
待辦
待辦
待辦